diff --git a/src/idempotency/client/client_channel.c b/src/idempotency/client/client_channel.c index 813253c..a398b86 100644 --- a/src/idempotency/client/client_channel.c +++ b/src/idempotency/client/client_channel.c @@ -37,7 +37,51 @@ typedef struct { static ClientChannelContext channel_context; -static int init_htable(ClientChannelHashtable *htable, const int hint_capacity) +IdempotencyClientConfig g_idempotency_client_cfg = {false, 3, 300}; + +static int load_client_channel_config(IniFullContext *ini_ctx) +{ + g_idempotency_client_cfg.channel_htable_capacity = iniGetIntValue( + ini_ctx->section_name, "channel_htable_capacity", + ini_ctx->context, 1361); + if (g_idempotency_client_cfg.channel_htable_capacity < 163) { + logWarning("file: "__FILE__", line: %d, " + "config file: %s, channel_htable_capacity: %d is " + "too small, set to 163", __LINE__, ini_ctx->filename, + g_idempotency_client_cfg.channel_htable_capacity); + g_idempotency_client_cfg.channel_htable_capacity = 163; + } + + g_idempotency_client_cfg.channel_heartbeat_interval = iniGetIntValue( + ini_ctx->section_name, "channel_heartbeat_interval", + ini_ctx->context, 3); + if (g_idempotency_client_cfg.channel_heartbeat_interval <= 0) { + logWarning("file: "__FILE__", line: %d, " + "config file: %s, channel_heartbeat_interval: %d is " + "invalid, set to 3", __LINE__, ini_ctx->filename, + g_idempotency_client_cfg.channel_heartbeat_interval); + g_idempotency_client_cfg.channel_heartbeat_interval = 3; + } + + g_idempotency_client_cfg.channel_max_idle_time = iniGetIntValue( + ini_ctx->section_name, "channel_max_idle_time", + ini_ctx->context, 3); + return 0; +} + +void idempotency_client_channel_config_to_string_ex( + char *output, const int size, const bool add_comma) +{ + snprintf(output, size, "channel_htable_capacity=%d, " + "channel_heartbeat_interval=%ds, " + "channel_max_idle_time=%ds%s", + g_idempotency_client_cfg.channel_htable_capacity, + g_idempotency_client_cfg.channel_heartbeat_interval, + g_idempotency_client_cfg.channel_max_idle_time, + (add_comma ? ", " : "")); +} + +static int init_htable(ClientChannelHashtable *htable) { int result; int bytes; @@ -46,11 +90,8 @@ static int init_htable(ClientChannelHashtable *htable, const int hint_capacity) return result; } - if (hint_capacity <= 1024) { - htable->capacity = 1361; - } else { - htable->capacity = fc_ceil_prime(hint_capacity); - } + htable->capacity = fc_ceil_prime(g_idempotency_client_cfg. + channel_htable_capacity); bytes = sizeof(IdempotencyClientChannel *) * htable->capacity; htable->buckets = (IdempotencyClientChannel **)fc_malloc(bytes); if (htable->buckets == NULL) { @@ -84,9 +125,14 @@ static int idempotency_channel_alloc_init(void *element, void *args) (&((IdempotencyClientReceipt *)NULL)->next)); } -int client_channel_init_ex(const int hint_capacity) +int client_channel_init(IniFullContext *ini_ctx) { int result; + + if ((result=load_client_channel_config(ini_ctx)) != 0) { + return result; + } + if ((result=fast_mblock_init_ex1(&channel_context.channel_allocator, "channel_info", sizeof(IdempotencyClientChannel), 64, 0, idempotency_channel_alloc_init, NULL, true)) != 0) @@ -94,7 +140,7 @@ int client_channel_init_ex(const int hint_capacity) return result; } - if ((result=init_htable(&channel_context.htable, hint_capacity)) != 0) { + if ((result=init_htable(&channel_context.htable)) != 0) { return result; } @@ -130,7 +176,7 @@ struct fast_task_info *alloc_channel_task(IdempotencyClientChannel *channel, task->thread_data = g_sf_context.thread_data + hash_code % g_sf_context.work_threads; channel->in_ioevent = 1; - channel->last_connect_time = get_current_time(); + channel->last_connect_time = g_current_time; if ((*err_no=sf_nio_notify(task, SF_NIO_STAGE_CONNECT)) != 0) { channel->in_ioevent = 0; free_queue_push(task); @@ -143,16 +189,14 @@ int idempotency_client_channel_check_reconnect( IdempotencyClientChannel *channel) { int result; - time_t current_time; if (!__sync_bool_compare_and_swap(&channel->in_ioevent, 0, 1)) { return 0; } - current_time = get_current_time(); - if (channel->last_connect_time >= current_time) { + if (channel->last_connect_time >= g_current_time) { sleep(1); - channel->last_connect_time = ++current_time; + channel->last_connect_time = g_current_time; } logDebug("file: "__FILE__", line: %d, " @@ -162,7 +206,8 @@ int idempotency_client_channel_check_reconnect( channel->task->canceled = false; if ((result=sf_nio_notify(channel->task, SF_NIO_STAGE_CONNECT)) == 0) { - channel->last_connect_time = current_time; + channel->last_connect_time = g_current_time; + channel->last_report_time = g_current_time; } else { __sync_bool_compare_and_swap(&channel->in_ioevent, 1, 0); //rollback } diff --git a/src/idempotency/client/client_channel.h b/src/idempotency/client/client_channel.h index b10c8f2..e0e6949 100644 --- a/src/idempotency/client/client_channel.h +++ b/src/idempotency/client/client_channel.h @@ -3,6 +3,7 @@ #ifndef IDEMPOTENCY_CLIENT_CHANNEL_H #define IDEMPOTENCY_CLIENT_CHANNEL_H +#include "fastcommon/ini_file_reader.h" #include "fastcommon/pthread_func.h" #include "fastcommon/sched_thread.h" #include "fastcommon/fc_atomic.h" @@ -12,11 +13,17 @@ extern "C" { #endif -#define client_channel_init() client_channel_init_ex(0) +extern IdempotencyClientConfig g_idempotency_client_cfg; -int client_channel_init_ex(const int hint_capacity); +int client_channel_init(IniFullContext *ini_ctx); void client_channel_destroy(); +#define idempotency_client_channel_config_to_string(output, size) \ + idempotency_client_channel_config_to_string_ex(output, size, false) + +void idempotency_client_channel_config_to_string_ex( + char *output, const int size, const bool add_comma); + struct idempotency_client_channel *idempotency_client_channel_get( const char *server_ip, const short server_port, const int timeout, int *err_no); diff --git a/src/idempotency/client/client_types.h b/src/idempotency/client/client_types.h index 58606dc..e03515a 100644 --- a/src/idempotency/client/client_types.h +++ b/src/idempotency/client/client_types.h @@ -7,6 +7,12 @@ #include "fastcommon/fc_list.h" #include "fastcommon/fc_queue.h" +typedef struct idempotency_client_config { + int channel_htable_capacity; + int channel_heartbeat_interval; + int channel_max_idle_time; +} IdempotencyClientConfig; + typedef struct idempotency_client_receipt { uint64_t req_id; struct idempotency_client_receipt *next; @@ -16,10 +22,11 @@ typedef struct idempotency_client_channel { volatile uint32_t id; //channel id, 0 for invalid volatile int key; //channel key volatile char in_ioevent; - volatile char in_heartbeat; + //volatile char in_heartbeat; volatile char established; - time_t last_connect_time; - time_t last_pkg_time; //last communication time + time_t last_connect_time; //for connect frequency control + time_t last_pkg_time; //last communication time + time_t last_report_time; //last report time for rpc receipt pthread_lock_cond_pair_t lc_pair; //for channel valid check and notify volatile uint64_t next_req_id; struct fast_mblock_man receipt_allocator; @@ -32,6 +39,10 @@ typedef struct idempotency_client_channel { typedef struct idempotency_receipt_thread_context { struct fc_list_head head; //LRU head for hearbeat + struct { + time_t heartbeat; + time_t idle; + } last_check_times; } IdempotencyReceiptThreadContext; #ifdef __cplusplus diff --git a/src/idempotency/client/receipt_handler.c b/src/idempotency/client/receipt_handler.c index 0806294..69a4300 100644 --- a/src/idempotency/client/receipt_handler.c +++ b/src/idempotency/client/receipt_handler.c @@ -168,6 +168,29 @@ static int check_report_req_receipt(struct fast_task_info *task, return sf_send_add_event(task); } +static int close_channel_request(struct fast_task_info *task) +{ + IdempotencyClientChannel *channel; + SFCommonProtoHeader *header; + + channel = (IdempotencyClientChannel *)task->arg; + idempotency_client_channel_set_id_key(channel, 0, 0); + + header = (SFCommonProtoHeader *)task->data; + SF_PROTO_SET_HEADER(header, SF_SERVICE_PROTO_CLOSE_CHANNEL_REQ, 0); + task->length = sizeof(SFCommonProtoHeader); + return sf_send_add_event(task); +} + +static int active_test_request(struct fast_task_info *task) +{ + SFCommonProtoHeader *header; + header = (SFCommonProtoHeader *)task->data; + SF_PROTO_SET_HEADER(header, SF_PROTO_ACTIVE_TEST_REQ, 0); + task->length = sizeof(SFCommonProtoHeader); + return sf_send_add_event(task); +} + static inline void update_lru_chain(struct fast_task_info *task) { IdempotencyReceiptThreadContext *thread_ctx; @@ -192,6 +215,8 @@ static int report_req_receipt_request(struct fast_task_info *task, if (count == 0) { result = sf_set_read_event(task); } else if (update_lru) { + ((IdempotencyClientChannel *)task->arg)-> + last_report_time = g_current_time; update_lru_chain(task); } @@ -336,6 +361,15 @@ static int receipt_deal_task(struct fast_task_info *task) case SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP: result = deal_report_req_receipt_response(task); break; + case SF_PROTO_ACTIVE_TEST_RESP: + result = 0; + break; + case SF_SERVICE_PROTO_CLOSE_CHANNEL_RESP: + result = ECONNRESET; //force to close socket + logDebug("file: "__FILE__", line: %d, " + "close channel to server %s:%d !!!", + __LINE__, task->server_ip, task->port); + break; default: logError("file: "__FILE__", line: %d, " "response from server %s:%d, unexpect cmd: %d (%s)", @@ -356,16 +390,64 @@ static int receipt_deal_task(struct fast_task_info *task) return result > 0 ? -1 * result : result; } -static int receipt_thread_loop_callback(struct nio_thread_data *thread_data) +static void receipt_thread_check_heartbeat( + IdempotencyReceiptThreadContext *thread_ctx) { IdempotencyClientChannel *channel; IdempotencyClientChannel *tmp; - IdempotencyReceiptThreadContext *thread_ctx; - thread_ctx = (IdempotencyReceiptThreadContext *)thread_data->arg; fc_list_for_each_entry_safe(channel, tmp, &thread_ctx->head, dlink) { - //check heartbeat - //channel->task + if (g_current_time - channel->last_pkg_time < + g_idempotency_client_cfg.channel_heartbeat_interval) + { + break; + } + + if (sf_nio_task_is_idle(channel->task)) { + channel->last_pkg_time = g_current_time; + active_test_request(channel->task); + } + } +} + +static void receipt_thread_close_idle_channel( + IdempotencyReceiptThreadContext *thread_ctx) +{ + IdempotencyClientChannel *channel; + IdempotencyClientChannel *tmp; + + fc_list_for_each_entry_safe(channel, tmp, &thread_ctx->head, dlink) { + if (!sf_nio_task_is_idle(channel->task)) { + continue; + } + + if (g_current_time - channel->last_report_time > + g_idempotency_client_cfg.channel_max_idle_time) + { + logDebug("file: "__FILE__", line: %d, " + "close channel to server %s:%d because idle too long", + __LINE__, channel->task->server_ip, channel->task->port); + close_channel_request(channel->task); + } + } +} + +static int receipt_thread_loop_callback(struct nio_thread_data *thread_data) +{ + IdempotencyReceiptThreadContext *thread_ctx; + thread_ctx = (IdempotencyReceiptThreadContext *)thread_data->arg; + + if (g_current_time - thread_ctx->last_check_times.heartbeat > 0) { + thread_ctx->last_check_times.heartbeat = g_current_time; + receipt_thread_check_heartbeat(thread_ctx); + } + + if ((g_idempotency_client_cfg.channel_max_idle_time > 0) && + (g_current_time - thread_ctx->last_check_times.idle > + g_idempotency_client_cfg.channel_max_idle_time)) + { + thread_ctx->last_check_times.idle = g_current_time; + receipt_thread_close_idle_channel(thread_ctx); } return 0; @@ -382,11 +464,15 @@ static void *receipt_alloc_thread_extra_data(const int thread_index) int receipt_handler_init() { - receipt_thread_contexts = (IdempotencyReceiptThreadContext *)fc_malloc( - sizeof(IdempotencyReceiptThreadContext) * SF_G_WORK_THREADS); + int bytes; + + bytes = sizeof(IdempotencyReceiptThreadContext) * SF_G_WORK_THREADS; + receipt_thread_contexts = (IdempotencyReceiptThreadContext *) + fc_malloc(bytes); if (receipt_thread_contexts == NULL) { return ENOMEM; } + memset(receipt_thread_contexts, 0, bytes); return sf_service_init_ex2(&g_sf_context, receipt_alloc_thread_extra_data, receipt_thread_loop_callback, diff --git a/src/sf_nio.h b/src/sf_nio.h index 63fa149..5d972cc 100644 --- a/src/sf_nio.h +++ b/src/sf_nio.h @@ -56,6 +56,9 @@ int sf_nio_notify_ex(struct fast_task_info *task, const int new_stage, #define sf_nio_notify(task, new_stage) \ sf_nio_notify_ex(task, new_stage, __FILE__, __LINE__) +#define sf_nio_task_is_idle(task) \ + (task->offset == 0 && task->length == 0) + int sf_set_read_event(struct fast_task_info *task); void sf_task_switch_thread(struct fast_task_info *task, diff --git a/src/sf_proto.c b/src/sf_proto.c index b15d90a..9d5ff71 100644 --- a/src/sf_proto.c +++ b/src/sf_proto.c @@ -187,6 +187,12 @@ int sf_recv_response(ConnectionInfo *conn, SFResponseInfo *response, const char *sf_get_cmd_caption(const int cmd) { switch (cmd) { + case SF_PROTO_ACK: + return "ACK"; + case SF_PROTO_ACTIVE_TEST_REQ: + return "ACTIVE_TEST_REQ"; + case SF_PROTO_ACTIVE_TEST_RESP: + return "ACTIVE_TEST_RESP"; case SF_SERVICE_PROTO_SETUP_CHANNEL_REQ: return "SETUP_CHANNEL_REQ"; case SF_SERVICE_PROTO_SETUP_CHANNEL_RESP: diff --git a/src/sf_proto.h b/src/sf_proto.h index 2b486de..ee739d0 100644 --- a/src/sf_proto.h +++ b/src/sf_proto.h @@ -10,13 +10,18 @@ #include "fastcommon/sockopt.h" #include "sf_types.h" +#define SF_PROTO_ACK 116 + +#define SF_PROTO_ACTIVE_TEST_REQ 117 +#define SF_PROTO_ACTIVE_TEST_RESP 118 + //for request idempotency -#define SF_SERVICE_PROTO_SETUP_CHANNEL_REQ 111 -#define SF_SERVICE_PROTO_SETUP_CHANNEL_RESP 112 -#define SF_SERVICE_PROTO_CLOSE_CHANNEL_REQ 113 -#define SF_SERVICE_PROTO_CLOSE_CHANNEL_RESP 114 -#define SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ 115 -#define SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP 116 +#define SF_SERVICE_PROTO_SETUP_CHANNEL_REQ 121 +#define SF_SERVICE_PROTO_SETUP_CHANNEL_RESP 122 +#define SF_SERVICE_PROTO_CLOSE_CHANNEL_REQ 123 +#define SF_SERVICE_PROTO_CLOSE_CHANNEL_RESP 124 +#define SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ 125 +#define SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP 126 #define SF_PROTO_MAGIC_CHAR '@' #define SF_PROTO_SET_MAGIC(m) \