diff --git a/src/sf_configs.h b/src/sf_configs.h index 1085e5d..76ac295 100644 --- a/src/sf_configs.h +++ b/src/sf_configs.h @@ -24,8 +24,8 @@ #include "sf_types.h" typedef struct sf_net_retry_interval_context { - SFNetRetryIntervalModeMaxPair *mm; - SFNetRetryTimesIntervalPair *ti; + const SFNetRetryIntervalModeMaxPair *mm; + const SFNetRetryTimesIntervalPair *ti; int interval_ms; } SFNetRetryIntervalContext; @@ -45,8 +45,8 @@ static inline void sf_reset_net_retry_interval(SFNetRetryIntervalContext *ctx) } static inline void sf_init_net_retry_interval_context( - SFNetRetryIntervalContext *ctx, SFNetRetryIntervalModeMaxPair *mm, - SFNetRetryTimesIntervalPair *ti) + SFNetRetryIntervalContext *ctx, const SFNetRetryIntervalModeMaxPair *mm, + const SFNetRetryTimesIntervalPair *ti) { ctx->mm = mm; ctx->ti = ti; diff --git a/src/sf_connection_manager.c b/src/sf_connection_manager.c index 74ce361..5690f97 100644 --- a/src/sf_connection_manager.c +++ b/src/sf_connection_manager.c @@ -24,12 +24,16 @@ #include #include #include "sf_global.h" +#include "sf_configs.h" #include "sf_proto.h" #include "fastcommon/shared_func.h" #include "fastcommon/logger.h" #include "fastcommon/fc_atomic.h" #include "sf_connection_manager.h" +static int get_group_servers(SFConnectionManager *cm, + SFCMConnGroupEntry *group); + static ConnectionInfo *get_spec_connection(SFConnectionManager *cm, const ConnectionInfo *target, int *err_no) { @@ -77,6 +81,327 @@ static ConnectionInfo *make_connection(SFConnectionManager *cm, return NULL; } +static ConnectionInfo *get_server_connection(SFConnectionManager *cm, + FCServerInfo *server, int *err_no) +{ + FCAddressPtrArray *addr_array; + ConnectionInfo *conn; + + addr_array = &server->group_addrs[cm->server_group_index].address_array; + if ((conn=make_connection(cm, addr_array, err_no)) == NULL) { + logError("file: "__FILE__", line: %d, " + "server id: %d, get_server_connection fail", + __LINE__, server->id); + } + return conn; +} + +static ConnectionInfo *get_connection(SFConnectionManager *cm, + const int group_index, int *err_no) +{ + SFCMServerArray *server_array; + ConnectionInfo *conn; + uint32_t server_hash_code; + int server_index; + int i; + + server_array = &cm->groups.entries[group_index].all; + server_hash_code = rand(); + server_index = server_hash_code % server_array->count; + if ((conn=make_connection(cm, server_array->servers[server_index]. + addr_array, err_no)) != NULL) + { + return conn; + } + + if (server_array->count > 1) { + for (i=0; icount; i++) { + if (i == server_index) { + continue; + } + + if ((conn=make_connection(cm, server_array->servers[i]. + addr_array, err_no)) != NULL) + { + return conn; + } + } + } + + logError("file: "__FILE__", line: %d, " + "data group index: %d, get_connection fail, " + "configured server count: %d", __LINE__, + group_index, server_array->count); + return NULL; +} + +static inline ConnectionInfo *make_master_connection(SFConnectionManager *cm, + SFCMConnGroupEntry *group, int *err_no) +{ + SFCMServerEntry *master; + ConnectionInfo *conn; + + master = (SFCMServerEntry *)FC_ATOMIC_GET(group->master); + if (master != NULL) { + if ((conn=make_connection(cm, master->addr_array, + err_no)) != NULL) + { + return conn; + } + + __sync_bool_compare_and_swap(&group->master, master, NULL); + } + + return NULL; +} + +static inline bool alive_array_cas(SFConnectionManager *cm, + SFCMConnGroupEntry *group, SFCMServerPtrArray *old_alives, + SFCMServerPtrArray *new_alives) +{ + if (__sync_bool_compare_and_swap(&group->alives, + old_alives, new_alives)) + { + fast_mblock_delay_free_object(&cm->sptr_array_allocator, old_alives, + (cm->common_cfg->connect_timeout + cm->common_cfg-> + network_timeout) * group->all.count); + return true; + } else { + fast_mblock_free_object(&cm->sptr_array_allocator, new_alives); + return false; + } +} + +static int remove_from_alives(SFConnectionManager *cm, + SFCMConnGroupEntry *group, SFCMServerPtrArray *old_alives, + SFCMServerEntry *server) +{ + SFCMServerPtrArray *new_alives; + SFCMServerEntry **pp; + SFCMServerEntry **dest; + SFCMServerEntry **end; + + new_alives = (SFCMServerPtrArray *)FC_ATOMIC_GET(group->alives); + if (new_alives != old_alives) { + return 0; + } + + new_alives = (SFCMServerPtrArray *)fast_mblock_alloc_object( + &cm->sptr_array_allocator); + if (new_alives == NULL) { + return ENOMEM; + } + + dest = new_alives->servers; + end = old_alives->servers + old_alives->count; + for (pp=old_alives->servers; ppcount = dest - new_alives->servers; + if (alive_array_cas(cm, group, old_alives, new_alives)) { + SFCMServerEntry *master; + master = (SFCMServerEntry *)FC_ATOMIC_GET(group->master); + if (master == server) { + __sync_bool_compare_and_swap(&group->master, master, NULL); + } + } + + return 0; +} + +static inline ConnectionInfo *make_readable_connection(SFConnectionManager *cm, + SFCMConnGroupEntry *group, SFCMServerPtrArray *alives, + const int index, int *err_no) +{ + ConnectionInfo *conn; + + if ((conn=make_connection(cm, alives->servers[index]-> + addr_array, err_no)) == NULL) + { + remove_from_alives(cm, group, alives, alives->servers[index]); + } + + return conn; +} + +static ConnectionInfo *get_master_connection(SFConnectionManager *cm, + const int group_index, int *err_no) +{ + SFCMConnGroupEntry *group; + ConnectionInfo *conn; + SFNetRetryIntervalContext net_retry_ctx; + int i; + + group = cm->groups.entries + group_index; + sf_init_net_retry_interval_context(&net_retry_ctx, + &cm->common_cfg->net_retry_cfg.interval_mm, + &cm->common_cfg->net_retry_cfg.connect); + i = 0; + while (1) { + if ((conn=make_master_connection(cm, group, err_no)) != NULL) { + return conn; + } + + *err_no = get_group_servers(cm, group); + if (*err_no == 0) { + *err_no = SF_RETRIABLE_ERROR_NO_SERVER; //for try again + } + SF_NET_RETRY_CHECK_AND_SLEEP(net_retry_ctx, + cm->common_cfg->net_retry_cfg. + connect.times, ++i, *err_no); + } + + logError("file: "__FILE__", line: %d, " + "get_master_connection fail, errno: %d", + __LINE__, *err_no); + return NULL; +} + +static ConnectionInfo *get_readable_connection(SFConnectionManager *cm, + const int group_index, int *err_no) +{ + SFCMConnGroupEntry *group; + SFCMServerPtrArray *alives; + ConnectionInfo *conn; + SFNetRetryIntervalContext net_retry_ctx; + uint32_t index; + int i; + + group = cm->groups.entries + group_index; + if ((cm->common_cfg->read_rule == sf_data_read_rule_master_only) || + (group->all.count == 1)) + { + return get_master_connection(cm, group_index, err_no); + } + + sf_init_net_retry_interval_context(&net_retry_ctx, + &cm->common_cfg->net_retry_cfg.interval_mm, + &cm->common_cfg->net_retry_cfg.connect); + i = 0; + while (1) { + alives = (SFCMServerPtrArray *)FC_ATOMIC_GET(group->alives); + if (alives->count > 0) { + index = rand() % alives->count; + if ((conn=make_readable_connection(cm, group, alives, + index, err_no)) != NULL) + { + return conn; + } + } + + if (cm->common_cfg->read_rule == sf_data_read_rule_slave_first) { + if ((conn=make_master_connection(cm, group, err_no)) != NULL) { + return conn; + } + } + + *err_no = get_group_servers(cm, group); + if (*err_no == 0) { + *err_no = SF_RETRIABLE_ERROR_NO_SERVER; //for try again + } + SF_NET_RETRY_CHECK_AND_SLEEP(net_retry_ctx, + cm->common_cfg->net_retry_cfg. + connect.times, ++i, *err_no); + } + + logError("file: "__FILE__", line: %d, " + "get_readable_connection fail, errno: %d", + __LINE__, *err_no); + return NULL; +} + +static void release_connection(SFConnectionManager *cm, + ConnectionInfo *conn) +{ + //TODO + if (((SFConnectionParameters *)conn->args)->group_id > 0) { + ((SFConnectionParameters *)conn->args)->group_id = 0; + } + + conn_pool_close_connection_ex(&cm->cpool, conn, false); +} + +static void close_connection(SFConnectionManager *cm, ConnectionInfo *conn) +{ + //TODO + if (((SFConnectionParameters *)conn->args)->group_id > 0) { + int group_index; + group_index = ((SFConnectionParameters *)conn->args)-> + group_id - 1; + ((SFConnectionParameters *)conn->args)->group_id = 0; + } + + conn_pool_close_connection_ex(&cm->cpool, conn, true); +} + +static ConnectionInfo *get_leader_connection(SFConnectionManager *cm, + FCServerInfo *server, int *err_no) +{ + ConnectionInfo *conn; + SFClientServerEntry leader; + SFNetRetryIntervalContext net_retry_ctx; + int i; + int connect_fails; + + sf_init_net_retry_interval_context(&net_retry_ctx, + &cm->common_cfg->net_retry_cfg.interval_mm, + &cm->common_cfg->net_retry_cfg.connect); + i = connect_fails = 0; + while (1) { + do { + if ((conn=get_server_connection(cm, server, + err_no)) == NULL) + { + connect_fails++; + break; + } + + /* + if ((*err_no=fs_client_proto_get_leader(client_ctx, + conn, &leader)) != 0) + { + close_connection(cm, conn); + break; + } + */ + + if (FC_CONNECTION_SERVER_EQUAL1(*conn, leader.conn)) { + return conn; + } + release_connection(cm, conn); + if ((conn=get_spec_connection(cm, &leader.conn, err_no)) == NULL) { + break; + } + + return conn; + } while (0); + + if (connect_fails == 2) { + break; + } + + SF_NET_RETRY_CHECK_AND_SLEEP(net_retry_ctx, + cm->common_cfg->net_retry_cfg. + connect.times, ++i, *err_no); + } + + logWarning("file: "__FILE__", line: %d, " + "get_leader_connection fail, server id: %d, %s:%u, errno: %d", + __LINE__, server->id, server->group_addrs[cm->server_group_index]. + address_array.addrs[0]->conn.ip_addr, server->group_addrs[cm-> + server_group_index].address_array.addrs[0]->conn.port, *err_no); + return NULL; +} + +static const struct sf_connection_parameters *get_connection_params( + SFConnectionManager *cm, ConnectionInfo *conn) +{ + return (SFConnectionParameters *)conn->args; +} + static int validate_connection_callback(ConnectionInfo *conn, void *args) { SFConnectionManager *cm; @@ -93,7 +418,8 @@ static int validate_connection_callback(ConnectionInfo *conn, void *args) return result; } -static int init_group_array(SFCMConnGroupArray *garray, const int group_count, +static int init_group_array(SFConnectionManager *cm, + SFCMConnGroupArray *garray, const int group_count, const int min_group_id) { int result; @@ -110,6 +436,7 @@ static int init_group_array(SFCMConnGroupArray *garray, const int group_count, end = garray->entries + group_count; for (group=garray->entries; groupcm = cm; if ((result=init_pthread_lock(&group->lock)) != 0) { return result; } @@ -145,8 +472,8 @@ int sf_connection_manager_init(SFConnectionManager *cm, return result; } - if ((result=init_group_array(&cm->groups, group_count, - min_group_id)) != 0) + if ((result=init_group_array(cm, &cm->groups, + group_count, min_group_id)) != 0) { return result; } @@ -154,6 +481,16 @@ int sf_connection_manager_init(SFConnectionManager *cm, cm->server_group_index = server_group_index; cm->common_cfg = common_cfg; cm->max_servers_per_group = 0; + + cm->ops.get_connection = get_connection; + cm->ops.get_server_connection = get_server_connection; + cm->ops.get_spec_connection = get_spec_connection; + cm->ops.get_master_connection = get_master_connection; + cm->ops.get_readable_connection = get_readable_connection; + cm->ops.get_leader_connection = get_leader_connection; + cm->ops.release_connection = release_connection; + cm->ops.close_connection = close_connection; + cm->ops.get_connection_params = get_connection_params; return 0; } @@ -164,6 +501,7 @@ int sf_connection_manager_add(SFConnectionManager *cm, const int group_id, FCServerInfo **server; FCServerInfo **end; SFCMServerEntry *entry; + int group_index; if (group_id < cm->groups.min_group_id) { logError("file: "__FILE__", line: %d, " @@ -178,7 +516,8 @@ int sf_connection_manager_add(SFConnectionManager *cm, const int group_id, return EINVAL; } - group = cm->groups.entries + (group_id - cm->groups.min_group_id); + group_index = group_id - cm->groups.min_group_id; + group = cm->groups.entries + group_index; group->id = group_id; group->all.servers = (SFCMServerEntry *)fc_malloc( sizeof(SFCMServerEntry) * count); @@ -192,9 +531,9 @@ int sf_connection_manager_add(SFConnectionManager *cm, const int group_id, serverid = (*server)->id; + entry->group_index = group_index; entry->addr_array = &(*server)->group_addrs[ cm->server_group_index].address_array; - entry->conn = NULL; } if (count > cm->max_servers_per_group) { @@ -320,16 +659,7 @@ static int do_get_group_servers(SFConnectionManager *cm, return 0; } - if (__sync_bool_compare_and_swap(&group->alives, - old_alives, new_alives)) - { - fast_mblock_delay_free_object(&cm->sptr_array_allocator, old_alives, - (cm->common_cfg->connect_timeout + cm->common_cfg-> - network_timeout) * group->all.count); - } else { - fast_mblock_free_object(&cm->sptr_array_allocator, new_alives); - } - + alive_array_cas(cm, group, old_alives, new_alives); return 0; } diff --git a/src/sf_connection_manager.h b/src/sf_connection_manager.h index 33750a3..3c6730f 100644 --- a/src/sf_connection_manager.h +++ b/src/sf_connection_manager.h @@ -22,9 +22,31 @@ #include "fastcommon/connection_pool.h" #include "sf_types.h" +struct sf_connection_manager; + +typedef ConnectionInfo *(*sf_get_connection_func)( + struct sf_connection_manager *cm, + const int group_index, int *err_no); + +typedef ConnectionInfo *(*sf_get_server_connection_func)( + struct sf_connection_manager *cm, + FCServerInfo *server, int *err_no); + +typedef ConnectionInfo *(*sf_get_spec_connection_func)( + struct sf_connection_manager *cm, + const ConnectionInfo *target, int *err_no); + +typedef void (*sf_release_connection_func)( + struct sf_connection_manager *cm, ConnectionInfo *conn); +typedef void (*sf_close_connection_func)( + struct sf_connection_manager *cm, ConnectionInfo *conn); + +typedef const struct sf_connection_parameters * (*sf_get_connection_parameters)( + struct sf_connection_manager *cm, ConnectionInfo *conn); + typedef struct sf_cm_server_entry { int id; - ConnectionInfo *conn; + int group_index; FCAddressPtrArray *addr_array; } SFCMServerEntry; @@ -43,6 +65,7 @@ typedef struct sf_cm_conn_group_entry { SFCMServerArray all; volatile SFCMServerEntry *master; volatile SFCMServerPtrArray *alives; + struct sf_connection_manager *cm; pthread_mutex_t lock; } SFCMConnGroupEntry; @@ -53,6 +76,34 @@ typedef struct sf_cm_conn_group_array { int max_group_id; } SFCMConnGroupArray; +typedef struct sf_cm_operations { + /* get the specify connection by ip and port */ + sf_get_spec_connection_func get_spec_connection; + + /* get one connection of the configured servers by data group */ + sf_get_connection_func get_connection; + + /* get one connection of the server */ + sf_get_server_connection_func get_server_connection; + + /* get the master connection from the server */ + sf_get_connection_func get_master_connection; + + /* get one readable connection from the server */ + sf_get_connection_func get_readable_connection; + + /* get the leader connection from the server */ + sf_get_server_connection_func get_leader_connection; + + /* push back to connection pool when use connection pool */ + sf_release_connection_func release_connection; + + /* disconnect the connecton on network error */ + sf_close_connection_func close_connection; + + sf_get_connection_parameters get_connection_params; +} SFCMOperations; + typedef struct sf_connection_manager { int server_group_index; int max_servers_per_group; @@ -60,6 +111,7 @@ typedef struct sf_connection_manager { SFCMConnGroupArray groups; ConnectionPool cpool; struct fast_mblock_man sptr_array_allocator; //element: SFCMServerPtrArray + SFCMOperations ops; } SFConnectionManager; int sf_connection_manager_init(SFConnectionManager *cm, diff --git a/src/sf_proto.c b/src/sf_proto.c index 56cb8ec..8448203 100644 --- a/src/sf_proto.c +++ b/src/sf_proto.c @@ -411,7 +411,7 @@ int sf_proto_get_group_servers(ConnectionInfo *conn, } body_header = (SFProtoGetGroupServersRespBodyHeader *)in_buff; - count = buff2short(body_header->count); + count = buff2int(body_header->count); if (count <= 0) { logError("file: "__FILE__", line: %d, " "server %s:%d response server count: %d <= 0", diff --git a/src/sf_proto.h b/src/sf_proto.h index 09cba36..db0efdc 100644 --- a/src/sf_proto.h +++ b/src/sf_proto.h @@ -95,16 +95,19 @@ typedef struct sf_common_proto_header { typedef struct sf_proto_get_group_servers_req { char group_id[4]; + char padding[4]; } SFProtoGetGroupServersReq; typedef struct sf_proto_get_group_servers_resp_body_header { - char count[2]; + char count[4]; + char padding[4]; } SFProtoGetGroupServersRespBodyHeader; typedef struct sf_proto_get_group_servers_resp_body_part { + char server_id[4]; char is_master; char is_active; - char server_id[4]; + char padding[2]; } SFProtoGetGroupServersRespBodyPart; typedef struct sf_proto_idempotency_additional_header { @@ -139,9 +142,10 @@ typedef struct sf_proto_report_req_receipt_body { typedef struct sf_group_server_info { int id; + bool is_leader; bool is_master; bool is_active; - char padding[2]; + char padding[1]; } SFGroupServerInfo; typedef struct sf_group_server_array { @@ -150,6 +154,11 @@ typedef struct sf_group_server_array { int count; } SFGroupServerArray; +typedef struct sf_client_server_entry { + int server_id; + ConnectionInfo conn; +} SFClientServerEntry; + #ifdef __cplusplus extern "C" { #endif