From a57709de937dd4bbda82432bd5574660499c2bf9 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Fri, 15 Apr 2022 16:58:42 +0800 Subject: [PATCH] sf_connection_manager.c: make_master_connection refined --- src/idempotency/client/client_channel.h | 14 ++++- src/idempotency/client/receipt_handler.c | 24 +++++--- src/sf_configs.c | 6 +- src/sf_connection_manager.c | 76 +++++++++++++----------- 4 files changed, 74 insertions(+), 46 deletions(-) diff --git a/src/idempotency/client/client_channel.h b/src/idempotency/client/client_channel.h index 59bc3ed..bc58cc4 100644 --- a/src/idempotency/client/client_channel.h +++ b/src/idempotency/client/client_channel.h @@ -80,7 +80,19 @@ static inline int idempotency_client_channel_check_wait_ex( idempotency_client_channel_check_reconnect(channel); lcp_timedwait_sec(&channel->lc_pair, timeout); - return __sync_add_and_fetch(&channel->established, 0) ? 0 : ETIMEDOUT; + if (__sync_add_and_fetch(&channel->established, 0)) { + return 0; + } else { + /* + logInfo("file: "__FILE__", line: %d, " + "channel_check fail, server %s:%u, in_ioevent: %d, " + "canceled: %d, req count: %"PRId64, __LINE__, channel->task->server_ip, + channel->task->port, __sync_add_and_fetch(&channel-> + in_ioevent, 0), __sync_add_and_fetch(&channel-> + task->canceled, 0), channel->task->req_count); + */ + return ETIMEDOUT; + } } #ifdef __cplusplus diff --git a/src/idempotency/client/receipt_handler.c b/src/idempotency/client/receipt_handler.c index 61c8d8c..0ce09d5 100644 --- a/src/idempotency/client/receipt_handler.c +++ b/src/idempotency/client/receipt_handler.c @@ -68,8 +68,10 @@ static int receipt_recv_timeout_callback(struct fast_task_info *task) __LINE__, task->server_ip, task->port); } else { logError("file: "__FILE__", line: %d, " - "communication with server %s:%u timeout", - __LINE__, task->server_ip, task->port); + "communication with server %s:%u timeout, " + "channel established: %d", __LINE__, + task->server_ip, task->port, + FC_ATOMIC_GET(channel->established)); } return ETIMEDOUT; @@ -85,8 +87,11 @@ static void receipt_task_finish_cleanup(struct fast_task_info *task) task->event.fd = -1; } - channel = (IdempotencyClientChannel *)task->arg; + task->length = 0; + task->offset = 0; + task->req_count = 0; + channel = (IdempotencyClientChannel *)task->arg; fc_list_del_init(&channel->dlink); __sync_bool_compare_and_swap(&channel->established, 1, 0); __sync_bool_compare_and_swap(&channel->in_ioevent, 1, 0); @@ -334,12 +339,15 @@ static int receipt_deal_task(struct fast_task_info *task, const int stage) setup_channel_request(task); result = 0; break; - } else if (stage == SF_NIO_STAGE_CONTINUE && task->length == 0) { - if (((IdempotencyClientChannel *)task->arg)->established) { - report_req_receipt_request(task, true); - } else { - sf_set_read_event(task); //trigger read event + } else if (stage == SF_NIO_STAGE_CONTINUE) { + if (task->length == 0 && task->offset == 0) { + if (((IdempotencyClientChannel *)task->arg)->established) { + report_req_receipt_request(task, true); + } else if (task->req_count > 0) { + sf_set_read_event(task); //trigger read event + } } + result = 0; break; } diff --git a/src/sf_configs.c b/src/sf_configs.c index dce8439..f088515 100644 --- a/src/sf_configs.c +++ b/src/sf_configs.c @@ -25,10 +25,10 @@ #include "fastcommon/logger.h" #include "sf_configs.h" -#define DEFAULT_RETRY_MAX_INTERVAL_MS 5000 -#define DEFAULT_CONNECT_RETRY_TIMES 10 +#define DEFAULT_RETRY_MAX_INTERVAL_MS 3000 +#define DEFAULT_CONNECT_RETRY_TIMES 200 #define DEFAULT_CONNECT_RETRY_INTERVAL_MS 100 -#define DEFAULT_NETWORK_RETRY_TIMES 10 +#define DEFAULT_NETWORK_RETRY_TIMES 200 #define DEFAULT_NETWORK_RETRY_INTERVAL_MS 100 int sf_load_net_retry_config(SFNetRetryConfig *net_retry_cfg, diff --git a/src/sf_connection_manager.c b/src/sf_connection_manager.c index 71e4b68..d85577e 100644 --- a/src/sf_connection_manager.c +++ b/src/sf_connection_manager.c @@ -144,34 +144,6 @@ static inline void set_connection_params(ConnectionInfo *conn, cparam->cm.old_alives = old_alives; } -static inline ConnectionInfo *make_master_connection(SFConnectionManager *cm, - SFCMConnGroupEntry *group, int *err_no) -{ - SFCMServerEntry *master; - ConnectionInfo *conn; - - master = (SFCMServerEntry *)FC_ATOMIC_GET(group->master); - if (master != NULL) { - if ((conn=make_connection(cm, master->addr_array, - err_no)) != NULL) - { - if (cm->common_cfg->read_rule == sf_data_read_rule_master_only) { - set_connection_params(conn, master, NULL); - } else { - SFCMServerPtrArray *alives; - alives = (SFCMServerPtrArray *)FC_ATOMIC_GET(group->alives); - set_connection_params(conn, master, alives); - } - return conn; - } - - __sync_bool_compare_and_swap(&group->master, master, NULL); - } - - *err_no = SF_RETRIABLE_ERROR_NO_SERVER; - return NULL; -} - static inline int push_to_detect_queue(SFConnectionManager *cm, SFCMConnGroupEntry *group, SFCMServerPtrArray *alives) { @@ -251,6 +223,34 @@ static int remove_from_alives(SFConnectionManager *cm, return 0; } +static inline ConnectionInfo *make_master_connection(SFConnectionManager *cm, + SFCMConnGroupEntry *group, int *err_no) +{ + SFCMServerEntry *master; + ConnectionInfo *conn; + SFCMServerPtrArray *alives; + + master = (SFCMServerEntry *)FC_ATOMIC_GET(group->master); + if (master != NULL) { + if ((conn=make_connection(cm, master->addr_array, + err_no)) != NULL) + { + alives = (SFCMServerPtrArray *)FC_ATOMIC_GET(group->alives); + set_connection_params(conn, master, alives); + return conn; + } else { + alives = (SFCMServerPtrArray *)FC_ATOMIC_GET(group->alives); + if (alives != NULL) { + remove_from_alives(cm, group, alives, master); + } + __sync_bool_compare_and_swap(&group->master, master, NULL); + } + } + + *err_no = SF_RETRIABLE_ERROR_NO_SERVER; + return NULL; +} + static inline ConnectionInfo *make_readable_connection(SFConnectionManager *cm, SFCMConnGroupEntry *group, SFCMServerPtrArray *alives, const int index, int *err_no) @@ -286,6 +286,15 @@ static ConnectionInfo *get_master_connection(SFConnectionManager *cm, return conn; } + /* + logInfo("file: "__FILE__", line: %d, " + "retry_count: %d, interval_ms: %d, data group id: %d, " + "master: %p, alive count: %d, all count: %d", __LINE__, + retry_count, net_retry_ctx.interval_ms, group->id, + FC_ATOMIC_GET(group->master), ((SFCMServerPtrArray *) + FC_ATOMIC_GET(group->alives))->count, group->all.count); + */ + *err_no = get_group_servers(cm, group); if (*err_no == 0) { *err_no = SF_RETRIABLE_ERROR_NO_SERVER; //for try again @@ -296,8 +305,8 @@ static ConnectionInfo *get_master_connection(SFConnectionManager *cm, } logError("file: "__FILE__", line: %d, " - "get_master_connection fail, retry count: %d, errno: %d", - __LINE__, retry_count, *err_no); + "get_master_connection fail, group id: %d, retry count: %d, " + "errno: %d", __LINE__, group->id, retry_count, *err_no); return NULL; } @@ -377,12 +386,11 @@ static void close_connection(SFConnectionManager *cm, ConnectionInfo *conn) if (cparam->cm.sentry != NULL) { server = cparam->cm.sentry; group = cm->groups.entries + server->group_index; - if (cparam->cm.old_alives == NULL) { - __sync_bool_compare_and_swap(&group->master, server, NULL); - } else { + if (cparam->cm.old_alives != NULL) { remove_from_alives(cm, group, cparam->cm.old_alives, server); cparam->cm.old_alives = NULL; } + __sync_bool_compare_and_swap(&group->master, server, NULL); cparam->cm.sentry = NULL; } @@ -422,7 +430,7 @@ static ConnectionInfo *get_leader_connection(SFConnectionManager *cm, return conn; } release_connection(cm, conn); - if ((conn=get_spec_connection(cm,&leader.conn, + if ((conn=get_spec_connection(cm, &leader.conn, err_no)) == NULL) { break;