faststore use this connection manager

connection_manager
YuQing 2021-02-20 12:49:11 +08:00
parent 4aeec5385a
commit 47ed8fb46c
4 changed files with 43 additions and 54 deletions

View File

@ -19,7 +19,7 @@
#include "../../sf_configs.h" #include "../../sf_configs.h"
#define SF_CLIENT_IDEMPOTENCY_UPDATE_WRAPPER(client_ctx, \ #define SF_CLIENT_IDEMPOTENCY_UPDATE_WRAPPER(client_ctx, conn_manager, \
GET_MASTER_CONNECTION, get_conn_arg1, update_callback, ...) \ GET_MASTER_CONNECTION, get_conn_arg1, update_callback, ...) \
ConnectionInfo *conn; \ ConnectionInfo *conn; \
IdempotencyClientChannel *old_channel; \ IdempotencyClientChannel *old_channel; \
@ -30,19 +30,19 @@
uint64_t req_id; \ uint64_t req_id; \
SFNetRetryIntervalContext net_retry_ctx; \ SFNetRetryIntervalContext net_retry_ctx; \
\ \
if ((conn=GET_MASTER_CONNECTION(client_ctx, \ if ((conn=GET_MASTER_CONNECTION(conn_manager, \
get_conn_arg1, &result)) == NULL) \ get_conn_arg1, &result)) == NULL) \
{ \ { \
return SF_UNIX_ERRNO(result, EIO); \ return SF_UNIX_ERRNO(result, EIO); \
} \ } \
connection_params = client_ctx->conn_manager. \ connection_params = (conn_manager)->ops. \
get_connection_params(client_ctx, conn); \ get_connection_params(conn_manager, conn); \
idempotency_enabled = client_ctx->idempotency_enabled && \ idempotency_enabled = client_ctx->idempotency_enabled && \
connection_params != NULL; \ connection_params != NULL; \
\ \
sf_init_net_retry_interval_context(&net_retry_ctx, \ sf_init_net_retry_interval_context(&net_retry_ctx, \
&client_ctx->net_retry_cfg.interval_mm, \ &client_ctx->common_cfg.net_retry_cfg.interval_mm, \
&client_ctx->net_retry_cfg.network); \ &client_ctx->common_cfg.net_retry_cfg.network); \
\ \
while (1) { \ while (1) { \
if (idempotency_enabled) { \ if (idempotency_enabled) { \
@ -81,7 +81,7 @@
if ((conn_result=sf_proto_rebind_idempotency_channel( \ if ((conn_result=sf_proto_rebind_idempotency_channel( \
conn, connection_params->channel->id, \ conn, connection_params->channel->id, \
connection_params->channel->key, \ connection_params->channel->key, \
client_ctx->network_timeout)) == 0) \ client_ctx->common_cfg.network_timeout)) == 0) \
{ \ { \
continue; \ continue; \
} \ } \
@ -89,21 +89,21 @@
} \ } \
\ \
SF_NET_RETRY_CHECK_AND_SLEEP(net_retry_ctx, client_ctx-> \ SF_NET_RETRY_CHECK_AND_SLEEP(net_retry_ctx, client_ctx-> \
net_retry_cfg.network.times, ++i, result); \ common_cfg.net_retry_cfg.network.times, ++i, result); \
/* \ /* \
logInfo("file: "__FILE__", line: %d, func: %s, " \ logInfo("file: "__FILE__", line: %d, func: %s, " \
"net retry result: %d, retry count: %d", \ "net retry result: %d, retry count: %d", \
__LINE__, __FUNCTION__, result, i); \ __LINE__, __FUNCTION__, result, i); \
*/ \ */ \
SF_CLIENT_RELEASE_CONNECTION(client_ctx, conn, conn_result); \ SF_CLIENT_RELEASE_CONNECTION(conn_manager, conn, conn_result); \
if ((conn=GET_MASTER_CONNECTION(client_ctx, \ if ((conn=GET_MASTER_CONNECTION(conn_manager, \
get_conn_arg1, &result)) == NULL) \ get_conn_arg1, &result)) == NULL) \
{ \ { \
return SF_UNIX_ERRNO(result, EIO); \ return SF_UNIX_ERRNO(result, EIO); \
} \ } \
\ \
connection_params = client_ctx->conn_manager. \ connection_params = (conn_manager)->ops. \
get_connection_params(client_ctx, conn); \ get_connection_params(conn_manager, conn); \
if (connection_params != NULL && connection_params->channel != \ if (connection_params != NULL && connection_params->channel != \
old_channel) \ old_channel) \
{ \ { \
@ -124,26 +124,26 @@
break; \ break; \
} \ } \
\ \
SF_CLIENT_RELEASE_CONNECTION(client_ctx, conn, result); \ SF_CLIENT_RELEASE_CONNECTION(conn_manager, conn, result); \
return SF_UNIX_ERRNO(result, EIO) return SF_UNIX_ERRNO(result, EIO)
#define SF_CLIENT_IDEMPOTENCY_QUERY_WRAPPER(client_ctx, \ #define SF_CLIENT_IDEMPOTENCY_QUERY_WRAPPER(client_ctx, conn_manager, \
GET_READABLE_CONNECTION, get_conn_arg1, query_callback, ...) \ GET_READABLE_CONNECTION, get_conn_arg1, query_callback, ...) \
ConnectionInfo *conn; \ ConnectionInfo *conn; \
int result; \ int result; \
int i; \ int i; \
SFNetRetryIntervalContext net_retry_ctx; \ SFNetRetryIntervalContext net_retry_ctx; \
\ \
if ((conn=GET_READABLE_CONNECTION(client_ctx, \ if ((conn=GET_READABLE_CONNECTION(conn_manager, \
get_conn_arg1, &result)) == NULL) \ get_conn_arg1, &result)) == NULL) \
{ \ { \
return SF_UNIX_ERRNO(result, EIO); \ return SF_UNIX_ERRNO(result, EIO); \
} \ } \
\ \
sf_init_net_retry_interval_context(&net_retry_ctx, \ sf_init_net_retry_interval_context(&net_retry_ctx, \
&client_ctx->net_retry_cfg.interval_mm, \ &client_ctx->common_cfg.net_retry_cfg.interval_mm, \
&client_ctx->net_retry_cfg.network); \ &client_ctx->common_cfg.net_retry_cfg.network); \
i = 0; \ i = 0; \
while (1) { \ while (1) { \
if ((result=query_callback(client_ctx, \ if ((result=query_callback(client_ctx, \
@ -152,21 +152,21 @@
break; \ break; \
} \ } \
SF_NET_RETRY_CHECK_AND_SLEEP(net_retry_ctx, client_ctx-> \ SF_NET_RETRY_CHECK_AND_SLEEP(net_retry_ctx, client_ctx-> \
net_retry_cfg.network.times, ++i, result); \ common_cfg.net_retry_cfg.network.times, ++i, result); \
/* \ /* \
logInfo("file: "__FILE__", line: %d, func: %s, " \ logInfo("file: "__FILE__", line: %d, func: %s, " \
"net retry result: %d, retry count: %d", \ "net retry result: %d, retry count: %d", \
__LINE__, __FUNCTION__, result, i); \ __LINE__, __FUNCTION__, result, i); \
*/ \ */ \
SF_CLIENT_RELEASE_CONNECTION(client_ctx, conn, result); \ SF_CLIENT_RELEASE_CONNECTION(conn_manager, conn, result); \
if ((conn=GET_READABLE_CONNECTION(client_ctx, \ if ((conn=GET_READABLE_CONNECTION(conn_manager, \
get_conn_arg1, &result)) == NULL) \ get_conn_arg1, &result)) == NULL) \
{ \ { \
return SF_UNIX_ERRNO(result, EIO); \ return SF_UNIX_ERRNO(result, EIO); \
} \ } \
} \ } \
\ \
SF_CLIENT_RELEASE_CONNECTION(client_ctx, conn, result); \ SF_CLIENT_RELEASE_CONNECTION(conn_manager, conn, result); \
return SF_UNIX_ERRNO(result, EIO) return SF_UNIX_ERRNO(result, EIO)

View File

@ -354,13 +354,13 @@ static void close_connection(SFConnectionManager *cm, ConnectionInfo *conn)
if (cparam->cm.sentry != NULL) { if (cparam->cm.sentry != NULL) {
server = cparam->cm.sentry; server = cparam->cm.sentry;
group = cm->groups.entries + server->group_index; group = cm->groups.entries + server->group_index;
if (cm->common_cfg->read_rule == sf_data_read_rule_master_only) { if (cparam->cm.old_alives == NULL) {
__sync_bool_compare_and_swap(&group->master, server, NULL); __sync_bool_compare_and_swap(&group->master, server, NULL);
} else { } else {
remove_from_alives(cm, group, cparam->cm.old_alives, server); remove_from_alives(cm, group, cparam->cm.old_alives, server);
cparam->cm.old_alives = NULL;
} }
cparam->cm.sentry = NULL; cparam->cm.sentry = NULL;
cparam->cm.old_alives = NULL;
} }
conn_pool_close_connection_ex(&cm->cpool, conn, true); conn_pool_close_connection_ex(&cm->cpool, conn, true);
@ -448,8 +448,7 @@ static int validate_connection_callback(ConnectionInfo *conn, void *args)
} }
static int init_group_array(SFConnectionManager *cm, static int init_group_array(SFConnectionManager *cm,
SFCMConnGroupArray *garray, const int group_count, SFCMConnGroupArray *garray, const int group_count)
const int min_group_id)
{ {
int result; int result;
int bytes; int bytes;
@ -465,24 +464,20 @@ static int init_group_array(SFConnectionManager *cm,
end = garray->entries + group_count; end = garray->entries + group_count;
for (group=garray->entries; group<end; group++) { for (group=garray->entries; group<end; group++) {
group->cm = cm;
if ((result=init_pthread_lock(&group->lock)) != 0) { if ((result=init_pthread_lock(&group->lock)) != 0) {
return result; return result;
} }
} }
garray->count = group_count; garray->count = group_count;
garray->min_group_id = min_group_id;
garray->max_group_id = min_group_id + group_count - 1;
return 0; return 0;
} }
int sf_connection_manager_init(SFConnectionManager *cm, int sf_connection_manager_init(SFConnectionManager *cm,
const SFClientCommonConfig *common_cfg, const int group_count, const SFClientCommonConfig *common_cfg, const int group_count,
const int min_group_id, const int server_group_index, const int server_group_index, const int server_count,
const int server_count, const int max_count_per_entry, const int max_count_per_entry, const int max_idle_time,
const int max_idle_time, fc_connection_callback_func fc_connection_callback_func connect_done_callback, void *args)
connect_done_callback, void *args)
{ {
const int socket_domain = AF_INET; const int socket_domain = AF_INET;
int htable_init_capacity; int htable_init_capacity;
@ -501,9 +496,7 @@ int sf_connection_manager_init(SFConnectionManager *cm,
return result; return result;
} }
if ((result=init_group_array(cm, &cm->groups, if ((result=init_group_array(cm, &cm->groups, group_count)) != 0) {
group_count, min_group_id)) != 0)
{
return result; return result;
} }
@ -532,20 +525,20 @@ int sf_connection_manager_add(SFConnectionManager *cm, const int group_id,
SFCMServerEntry *entry; SFCMServerEntry *entry;
int group_index; int group_index;
if (group_id < cm->groups.min_group_id) { if (group_id < 1) {
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"invalid group id: %d which < min group id: %d", "invalid group id: %d < 1",
__LINE__, group_id, cm->groups.min_group_id); __LINE__, group_id);
return EINVAL; return EINVAL;
} }
if (group_id > cm->groups.max_group_id) { if (group_id > cm->groups.count) {
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"invalid group id: %d which > max group id: %d", "invalid group id: %d > group count: %d",
__LINE__, group_id, cm->groups.max_group_id); __LINE__, group_id, cm->groups.count);
return EINVAL; return EINVAL;
} }
group_index = group_id - cm->groups.min_group_id; group_index = group_id - 1;
group = cm->groups.entries + group_index; group = cm->groups.entries + group_index;
group->id = group_id; group->id = group_id;
group->all.servers = (SFCMServerEntry *)fc_malloc( group->all.servers = (SFCMServerEntry *)fc_malloc(

View File

@ -65,15 +65,12 @@ typedef struct sf_cm_conn_group_entry {
SFCMServerArray all; SFCMServerArray all;
volatile SFCMServerEntry *master; volatile SFCMServerEntry *master;
volatile SFCMServerPtrArray *alives; volatile SFCMServerPtrArray *alives;
struct sf_connection_manager *cm;
pthread_mutex_t lock; pthread_mutex_t lock;
} SFCMConnGroupEntry; } SFCMConnGroupEntry;
typedef struct sf_cm_conn_group_array { typedef struct sf_cm_conn_group_array {
SFCMConnGroupEntry *entries; SFCMConnGroupEntry *entries;
int count; int count;
int min_group_id;
int max_group_id;
} SFCMConnGroupArray; } SFCMConnGroupArray;
typedef struct sf_cm_operations { typedef struct sf_cm_operations {
@ -116,10 +113,9 @@ typedef struct sf_connection_manager {
int sf_connection_manager_init(SFConnectionManager *cm, int sf_connection_manager_init(SFConnectionManager *cm,
const SFClientCommonConfig *common_cfg, const int group_count, const SFClientCommonConfig *common_cfg, const int group_count,
const int min_group_id, const int server_group_index, const int server_group_index, const int server_count,
const int server_count, const int max_count_per_entry, const int max_count_per_entry, const int max_idle_time,
const int max_idle_time, fc_connection_callback_func fc_connection_callback_func connect_done_callback, void *args);
connect_done_callback, void *args);
int sf_connection_manager_add(SFConnectionManager *cm, const int group_id, int sf_connection_manager_add(SFConnectionManager *cm, const int group_id,
FCServerInfo **servers, const int count); FCServerInfo **servers, const int count);

View File

@ -377,12 +377,12 @@ int sf_proto_get_leader(ConnectionInfo *conn,
const int network_timeout, const int network_timeout,
SFClientServerEntry *leader); SFClientServerEntry *leader);
#define SF_CLIENT_RELEASE_CONNECTION(client_ctx, conn, result) \ #define SF_CLIENT_RELEASE_CONNECTION(cm, conn, result) \
do { \ do { \
if (SF_FORCE_CLOSE_CONNECTION_ERROR(result)) { \ if (SF_FORCE_CLOSE_CONNECTION_ERROR(result)) { \
client_ctx->conn_manager.close_connection(client_ctx, conn); \ (cm)->ops.close_connection(cm, conn); \
} else if (client_ctx->conn_manager.release_connection != NULL) { \ } else if ((cm)->ops.release_connection != NULL) { \
client_ctx->conn_manager.release_connection(client_ctx, conn); \ (cm)->ops.release_connection(cm, conn); \
} \ } \
} while (0) } while (0)