sf_connection_manager.c: make_master_connection refined

recovery_and_balance
YuQing 2022-04-15 16:58:42 +08:00
parent 7259eaf6ac
commit a57709de93
4 changed files with 74 additions and 46 deletions

View File

@ -80,7 +80,19 @@ static inline int idempotency_client_channel_check_wait_ex(
idempotency_client_channel_check_reconnect(channel);
lcp_timedwait_sec(&channel->lc_pair, timeout);
return __sync_add_and_fetch(&channel->established, 0) ? 0 : ETIMEDOUT;
if (__sync_add_and_fetch(&channel->established, 0)) {
return 0;
} else {
/*
logInfo("file: "__FILE__", line: %d, "
"channel_check fail, server %s:%u, in_ioevent: %d, "
"canceled: %d, req count: %"PRId64, __LINE__, channel->task->server_ip,
channel->task->port, __sync_add_and_fetch(&channel->
in_ioevent, 0), __sync_add_and_fetch(&channel->
task->canceled, 0), channel->task->req_count);
*/
return ETIMEDOUT;
}
}
#ifdef __cplusplus

View File

@ -68,8 +68,10 @@ static int receipt_recv_timeout_callback(struct fast_task_info *task)
__LINE__, task->server_ip, task->port);
} else {
logError("file: "__FILE__", line: %d, "
"communication with server %s:%u timeout",
__LINE__, task->server_ip, task->port);
"communication with server %s:%u timeout, "
"channel established: %d", __LINE__,
task->server_ip, task->port,
FC_ATOMIC_GET(channel->established));
}
return ETIMEDOUT;
@ -85,8 +87,11 @@ static void receipt_task_finish_cleanup(struct fast_task_info *task)
task->event.fd = -1;
}
channel = (IdempotencyClientChannel *)task->arg;
task->length = 0;
task->offset = 0;
task->req_count = 0;
channel = (IdempotencyClientChannel *)task->arg;
fc_list_del_init(&channel->dlink);
__sync_bool_compare_and_swap(&channel->established, 1, 0);
__sync_bool_compare_and_swap(&channel->in_ioevent, 1, 0);
@ -334,12 +339,15 @@ static int receipt_deal_task(struct fast_task_info *task, const int stage)
setup_channel_request(task);
result = 0;
break;
} else if (stage == SF_NIO_STAGE_CONTINUE && task->length == 0) {
if (((IdempotencyClientChannel *)task->arg)->established) {
report_req_receipt_request(task, true);
} else {
sf_set_read_event(task); //trigger read event
} else if (stage == SF_NIO_STAGE_CONTINUE) {
if (task->length == 0 && task->offset == 0) {
if (((IdempotencyClientChannel *)task->arg)->established) {
report_req_receipt_request(task, true);
} else if (task->req_count > 0) {
sf_set_read_event(task); //trigger read event
}
}
result = 0;
break;
}

View File

@ -25,10 +25,10 @@
#include "fastcommon/logger.h"
#include "sf_configs.h"
#define DEFAULT_RETRY_MAX_INTERVAL_MS 5000
#define DEFAULT_CONNECT_RETRY_TIMES 10
#define DEFAULT_RETRY_MAX_INTERVAL_MS 3000
#define DEFAULT_CONNECT_RETRY_TIMES 200
#define DEFAULT_CONNECT_RETRY_INTERVAL_MS 100
#define DEFAULT_NETWORK_RETRY_TIMES 10
#define DEFAULT_NETWORK_RETRY_TIMES 200
#define DEFAULT_NETWORK_RETRY_INTERVAL_MS 100
int sf_load_net_retry_config(SFNetRetryConfig *net_retry_cfg,

View File

@ -144,34 +144,6 @@ static inline void set_connection_params(ConnectionInfo *conn,
cparam->cm.old_alives = old_alives;
}
static inline ConnectionInfo *make_master_connection(SFConnectionManager *cm,
SFCMConnGroupEntry *group, int *err_no)
{
SFCMServerEntry *master;
ConnectionInfo *conn;
master = (SFCMServerEntry *)FC_ATOMIC_GET(group->master);
if (master != NULL) {
if ((conn=make_connection(cm, master->addr_array,
err_no)) != NULL)
{
if (cm->common_cfg->read_rule == sf_data_read_rule_master_only) {
set_connection_params(conn, master, NULL);
} else {
SFCMServerPtrArray *alives;
alives = (SFCMServerPtrArray *)FC_ATOMIC_GET(group->alives);
set_connection_params(conn, master, alives);
}
return conn;
}
__sync_bool_compare_and_swap(&group->master, master, NULL);
}
*err_no = SF_RETRIABLE_ERROR_NO_SERVER;
return NULL;
}
static inline int push_to_detect_queue(SFConnectionManager *cm,
SFCMConnGroupEntry *group, SFCMServerPtrArray *alives)
{
@ -251,6 +223,34 @@ static int remove_from_alives(SFConnectionManager *cm,
return 0;
}
static inline ConnectionInfo *make_master_connection(SFConnectionManager *cm,
SFCMConnGroupEntry *group, int *err_no)
{
SFCMServerEntry *master;
ConnectionInfo *conn;
SFCMServerPtrArray *alives;
master = (SFCMServerEntry *)FC_ATOMIC_GET(group->master);
if (master != NULL) {
if ((conn=make_connection(cm, master->addr_array,
err_no)) != NULL)
{
alives = (SFCMServerPtrArray *)FC_ATOMIC_GET(group->alives);
set_connection_params(conn, master, alives);
return conn;
} else {
alives = (SFCMServerPtrArray *)FC_ATOMIC_GET(group->alives);
if (alives != NULL) {
remove_from_alives(cm, group, alives, master);
}
__sync_bool_compare_and_swap(&group->master, master, NULL);
}
}
*err_no = SF_RETRIABLE_ERROR_NO_SERVER;
return NULL;
}
static inline ConnectionInfo *make_readable_connection(SFConnectionManager *cm,
SFCMConnGroupEntry *group, SFCMServerPtrArray *alives,
const int index, int *err_no)
@ -286,6 +286,15 @@ static ConnectionInfo *get_master_connection(SFConnectionManager *cm,
return conn;
}
/*
logInfo("file: "__FILE__", line: %d, "
"retry_count: %d, interval_ms: %d, data group id: %d, "
"master: %p, alive count: %d, all count: %d", __LINE__,
retry_count, net_retry_ctx.interval_ms, group->id,
FC_ATOMIC_GET(group->master), ((SFCMServerPtrArray *)
FC_ATOMIC_GET(group->alives))->count, group->all.count);
*/
*err_no = get_group_servers(cm, group);
if (*err_no == 0) {
*err_no = SF_RETRIABLE_ERROR_NO_SERVER; //for try again
@ -296,8 +305,8 @@ static ConnectionInfo *get_master_connection(SFConnectionManager *cm,
}
logError("file: "__FILE__", line: %d, "
"get_master_connection fail, retry count: %d, errno: %d",
__LINE__, retry_count, *err_no);
"get_master_connection fail, group id: %d, retry count: %d, "
"errno: %d", __LINE__, group->id, retry_count, *err_no);
return NULL;
}
@ -377,12 +386,11 @@ static void close_connection(SFConnectionManager *cm, ConnectionInfo *conn)
if (cparam->cm.sentry != NULL) {
server = cparam->cm.sentry;
group = cm->groups.entries + server->group_index;
if (cparam->cm.old_alives == NULL) {
__sync_bool_compare_and_swap(&group->master, server, NULL);
} else {
if (cparam->cm.old_alives != NULL) {
remove_from_alives(cm, group, cparam->cm.old_alives, server);
cparam->cm.old_alives = NULL;
}
__sync_bool_compare_and_swap(&group->master, server, NULL);
cparam->cm.sentry = NULL;
}
@ -422,7 +430,7 @@ static ConnectionInfo *get_leader_connection(SFConnectionManager *cm,
return conn;
}
release_connection(cm, conn);
if ((conn=get_spec_connection(cm,&leader.conn,
if ((conn=get_spec_connection(cm, &leader.conn,
err_no)) == NULL)
{
break;