diff --git a/src/idempotency/client/rpc_wrapper.h b/src/idempotency/client/rpc_wrapper.h index 1bf8c63..432ab66 100644 --- a/src/idempotency/client/rpc_wrapper.h +++ b/src/idempotency/client/rpc_wrapper.h @@ -19,7 +19,7 @@ #include "../../sf_configs.h" -#define SF_CLIENT_IDEMPOTENCY_UPDATE_WRAPPER(client_ctx, \ +#define SF_CLIENT_IDEMPOTENCY_UPDATE_WRAPPER(client_ctx, conn_manager, \ GET_MASTER_CONNECTION, get_conn_arg1, update_callback, ...) \ ConnectionInfo *conn; \ IdempotencyClientChannel *old_channel; \ @@ -30,19 +30,19 @@ uint64_t req_id; \ SFNetRetryIntervalContext net_retry_ctx; \ \ - if ((conn=GET_MASTER_CONNECTION(client_ctx, \ + if ((conn=GET_MASTER_CONNECTION(conn_manager, \ get_conn_arg1, &result)) == NULL) \ { \ return SF_UNIX_ERRNO(result, EIO); \ } \ - connection_params = client_ctx->conn_manager. \ - get_connection_params(client_ctx, conn); \ + connection_params = (conn_manager)->ops. \ + get_connection_params(conn_manager, conn); \ idempotency_enabled = client_ctx->idempotency_enabled && \ connection_params != NULL; \ \ sf_init_net_retry_interval_context(&net_retry_ctx, \ - &client_ctx->net_retry_cfg.interval_mm, \ - &client_ctx->net_retry_cfg.network); \ + &client_ctx->common_cfg.net_retry_cfg.interval_mm, \ + &client_ctx->common_cfg.net_retry_cfg.network); \ \ while (1) { \ if (idempotency_enabled) { \ @@ -81,7 +81,7 @@ if ((conn_result=sf_proto_rebind_idempotency_channel( \ conn, connection_params->channel->id, \ connection_params->channel->key, \ - client_ctx->network_timeout)) == 0) \ + client_ctx->common_cfg.network_timeout)) == 0) \ { \ continue; \ } \ @@ -89,21 +89,21 @@ } \ \ SF_NET_RETRY_CHECK_AND_SLEEP(net_retry_ctx, client_ctx-> \ - net_retry_cfg.network.times, ++i, result); \ + common_cfg.net_retry_cfg.network.times, ++i, result); \ /* \ logInfo("file: "__FILE__", line: %d, func: %s, " \ "net retry result: %d, retry count: %d", \ __LINE__, __FUNCTION__, result, i); \ */ \ - SF_CLIENT_RELEASE_CONNECTION(client_ctx, conn, conn_result); \ - if ((conn=GET_MASTER_CONNECTION(client_ctx, \ + SF_CLIENT_RELEASE_CONNECTION(conn_manager, conn, conn_result); \ + if ((conn=GET_MASTER_CONNECTION(conn_manager, \ get_conn_arg1, &result)) == NULL) \ { \ return SF_UNIX_ERRNO(result, EIO); \ } \ \ - connection_params = client_ctx->conn_manager. \ - get_connection_params(client_ctx, conn); \ + connection_params = (conn_manager)->ops. \ + get_connection_params(conn_manager, conn); \ if (connection_params != NULL && connection_params->channel != \ old_channel) \ { \ @@ -124,26 +124,26 @@ break; \ } \ \ - SF_CLIENT_RELEASE_CONNECTION(client_ctx, conn, result); \ + SF_CLIENT_RELEASE_CONNECTION(conn_manager, conn, result); \ return SF_UNIX_ERRNO(result, EIO) -#define SF_CLIENT_IDEMPOTENCY_QUERY_WRAPPER(client_ctx, \ +#define SF_CLIENT_IDEMPOTENCY_QUERY_WRAPPER(client_ctx, conn_manager, \ GET_READABLE_CONNECTION, get_conn_arg1, query_callback, ...) \ ConnectionInfo *conn; \ int result; \ int i; \ SFNetRetryIntervalContext net_retry_ctx; \ \ - if ((conn=GET_READABLE_CONNECTION(client_ctx, \ + if ((conn=GET_READABLE_CONNECTION(conn_manager, \ get_conn_arg1, &result)) == NULL) \ { \ return SF_UNIX_ERRNO(result, EIO); \ } \ \ sf_init_net_retry_interval_context(&net_retry_ctx, \ - &client_ctx->net_retry_cfg.interval_mm, \ - &client_ctx->net_retry_cfg.network); \ + &client_ctx->common_cfg.net_retry_cfg.interval_mm, \ + &client_ctx->common_cfg.net_retry_cfg.network); \ i = 0; \ while (1) { \ if ((result=query_callback(client_ctx, \ @@ -152,21 +152,21 @@ break; \ } \ SF_NET_RETRY_CHECK_AND_SLEEP(net_retry_ctx, client_ctx-> \ - net_retry_cfg.network.times, ++i, result); \ + common_cfg.net_retry_cfg.network.times, ++i, result); \ /* \ logInfo("file: "__FILE__", line: %d, func: %s, " \ "net retry result: %d, retry count: %d", \ __LINE__, __FUNCTION__, result, i); \ */ \ - SF_CLIENT_RELEASE_CONNECTION(client_ctx, conn, result); \ - if ((conn=GET_READABLE_CONNECTION(client_ctx, \ + SF_CLIENT_RELEASE_CONNECTION(conn_manager, conn, result); \ + if ((conn=GET_READABLE_CONNECTION(conn_manager, \ get_conn_arg1, &result)) == NULL) \ { \ return SF_UNIX_ERRNO(result, EIO); \ } \ } \ \ - SF_CLIENT_RELEASE_CONNECTION(client_ctx, conn, result); \ + SF_CLIENT_RELEASE_CONNECTION(conn_manager, conn, result); \ return SF_UNIX_ERRNO(result, EIO) diff --git a/src/sf_connection_manager.c b/src/sf_connection_manager.c index 578e469..1d5c502 100644 --- a/src/sf_connection_manager.c +++ b/src/sf_connection_manager.c @@ -354,13 +354,13 @@ static void close_connection(SFConnectionManager *cm, ConnectionInfo *conn) if (cparam->cm.sentry != NULL) { server = cparam->cm.sentry; group = cm->groups.entries + server->group_index; - if (cm->common_cfg->read_rule == sf_data_read_rule_master_only) { + if (cparam->cm.old_alives == NULL) { __sync_bool_compare_and_swap(&group->master, server, NULL); } else { remove_from_alives(cm, group, cparam->cm.old_alives, server); + cparam->cm.old_alives = NULL; } cparam->cm.sentry = NULL; - cparam->cm.old_alives = NULL; } conn_pool_close_connection_ex(&cm->cpool, conn, true); @@ -448,8 +448,7 @@ static int validate_connection_callback(ConnectionInfo *conn, void *args) } static int init_group_array(SFConnectionManager *cm, - SFCMConnGroupArray *garray, const int group_count, - const int min_group_id) + SFCMConnGroupArray *garray, const int group_count) { int result; int bytes; @@ -465,24 +464,20 @@ static int init_group_array(SFConnectionManager *cm, end = garray->entries + group_count; for (group=garray->entries; groupcm = cm; if ((result=init_pthread_lock(&group->lock)) != 0) { return result; } } garray->count = group_count; - garray->min_group_id = min_group_id; - garray->max_group_id = min_group_id + group_count - 1; return 0; } int sf_connection_manager_init(SFConnectionManager *cm, const SFClientCommonConfig *common_cfg, const int group_count, - const int min_group_id, const int server_group_index, - const int server_count, const int max_count_per_entry, - const int max_idle_time, fc_connection_callback_func - connect_done_callback, void *args) + const int server_group_index, const int server_count, + const int max_count_per_entry, const int max_idle_time, + fc_connection_callback_func connect_done_callback, void *args) { const int socket_domain = AF_INET; int htable_init_capacity; @@ -501,9 +496,7 @@ int sf_connection_manager_init(SFConnectionManager *cm, return result; } - if ((result=init_group_array(cm, &cm->groups, - group_count, min_group_id)) != 0) - { + if ((result=init_group_array(cm, &cm->groups, group_count)) != 0) { return result; } @@ -532,20 +525,20 @@ int sf_connection_manager_add(SFConnectionManager *cm, const int group_id, SFCMServerEntry *entry; int group_index; - if (group_id < cm->groups.min_group_id) { + if (group_id < 1) { logError("file: "__FILE__", line: %d, " - "invalid group id: %d which < min group id: %d", - __LINE__, group_id, cm->groups.min_group_id); + "invalid group id: %d < 1", + __LINE__, group_id); return EINVAL; } - if (group_id > cm->groups.max_group_id) { + if (group_id > cm->groups.count) { logError("file: "__FILE__", line: %d, " - "invalid group id: %d which > max group id: %d", - __LINE__, group_id, cm->groups.max_group_id); + "invalid group id: %d > group count: %d", + __LINE__, group_id, cm->groups.count); return EINVAL; } - group_index = group_id - cm->groups.min_group_id; + group_index = group_id - 1; group = cm->groups.entries + group_index; group->id = group_id; group->all.servers = (SFCMServerEntry *)fc_malloc( diff --git a/src/sf_connection_manager.h b/src/sf_connection_manager.h index 3c6730f..790d86e 100644 --- a/src/sf_connection_manager.h +++ b/src/sf_connection_manager.h @@ -65,15 +65,12 @@ typedef struct sf_cm_conn_group_entry { SFCMServerArray all; volatile SFCMServerEntry *master; volatile SFCMServerPtrArray *alives; - struct sf_connection_manager *cm; pthread_mutex_t lock; } SFCMConnGroupEntry; typedef struct sf_cm_conn_group_array { SFCMConnGroupEntry *entries; int count; - int min_group_id; - int max_group_id; } SFCMConnGroupArray; typedef struct sf_cm_operations { @@ -116,10 +113,9 @@ typedef struct sf_connection_manager { int sf_connection_manager_init(SFConnectionManager *cm, const SFClientCommonConfig *common_cfg, const int group_count, - const int min_group_id, const int server_group_index, - const int server_count, const int max_count_per_entry, - const int max_idle_time, fc_connection_callback_func - connect_done_callback, void *args); + const int server_group_index, const int server_count, + const int max_count_per_entry, const int max_idle_time, + fc_connection_callback_func connect_done_callback, void *args); int sf_connection_manager_add(SFConnectionManager *cm, const int group_id, FCServerInfo **servers, const int count); diff --git a/src/sf_proto.h b/src/sf_proto.h index c0c76cc..84dcb9c 100644 --- a/src/sf_proto.h +++ b/src/sf_proto.h @@ -377,12 +377,12 @@ int sf_proto_get_leader(ConnectionInfo *conn, const int network_timeout, SFClientServerEntry *leader); -#define SF_CLIENT_RELEASE_CONNECTION(client_ctx, conn, result) \ - do { \ +#define SF_CLIENT_RELEASE_CONNECTION(cm, conn, result) \ + do { \ if (SF_FORCE_CLOSE_CONNECTION_ERROR(result)) { \ - client_ctx->conn_manager.close_connection(client_ctx, conn); \ - } else if (client_ctx->conn_manager.release_connection != NULL) { \ - client_ctx->conn_manager.release_connection(client_ctx, conn); \ + (cm)->ops.close_connection(cm, conn); \ + } else if ((cm)->ops.release_connection != NULL) { \ + (cm)->ops.release_connection(cm, conn); \ } \ } while (0)