From b60912bfd6270acac053235b2f4b72c8f471cfc2 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Wed, 9 Sep 2020 14:46:58 +0800 Subject: [PATCH] task nio_stage use atomic opt. --- src/sf_define.h | 27 +++++++---- src/sf_global.h | 2 + src/sf_nio.c | 117 +++++++++++++++++++++++++++++++++--------------- src/sf_nio.h | 41 +++++++++++++---- 4 files changed, 134 insertions(+), 53 deletions(-) diff --git a/src/sf_define.h b/src/sf_define.h index d4ef901..83e5590 100644 --- a/src/sf_define.h +++ b/src/sf_define.h @@ -10,14 +10,25 @@ #define SF_DEF_MIN_BUFF_SIZE (64 * 1024) #define SF_DEF_MAX_BUFF_SIZE (64 * 1024) -#define SF_NIO_STAGE_INIT 0 //set ioevent -#define SF_NIO_STAGE_CONNECT 1 //do connect (client only) -#define SF_NIO_STAGE_HANDSHAKE 2 //notify the thread to handshake (client only) -#define SF_NIO_STAGE_RECV 3 //do recv -#define SF_NIO_STAGE_SEND 4 //do send -#define SF_NIO_STAGE_FORWARDED 5 //deal the forwarded request -#define SF_NIO_STAGE_CONTINUE 6 //notify the thread continue deal -#define SF_NIO_STAGE_CLOSE 9 //cleanup the task +#define SF_NIO_STAGE_INIT 0 //set ioevent +#define SF_NIO_STAGE_CONNECT 1 //do connect (client only) +#define SF_NIO_STAGE_HANDSHAKE 2 //notify the thread to handshake (client only) +#define SF_NIO_STAGE_RECV 4 //do recv +#define SF_NIO_STAGE_SEND 8 //do send +#define SF_NIO_STAGE_FORWARDED 16 //deal the forwarded request +#define SF_NIO_STAGE_CONTINUE 32 //notify the thread continue deal +#define SF_NIO_STAGE_CLOSE 256 //cleanup the task + +#define SF_NIO_FLAG_INPROGRESS 1024 +#define SF_NIO_STAGE_FLAGS (SF_NIO_FLAG_INPROGRESS) +#define SF_NIO_STAGE_RECV_INPROGRESS (SF_NIO_STAGE_RECV | SF_NIO_FLAG_INPROGRESS) +#define SF_NIO_STAGE_SEND_INPROGRESS (SF_NIO_STAGE_SEND | SF_NIO_FLAG_INPROGRESS) + +#define SF_NIO_TASK_STAGE_FETCH(task) __sync_add_and_fetch(&task->nio_stage, 0) +#define SF_NIO_STAGE_ONLY(stage) (stage & (~SF_NIO_STAGE_FLAGS)) + +#define SF_NIO_STAGE_IS_INPROGRESS(stage) \ + ((stage & SF_NIO_FLAG_INPROGRESS) != 0) #ifdef __cplusplus extern "C" { diff --git a/src/sf_global.h b/src/sf_global.h index 5651b50..a8c0ea1 100644 --- a/src/sf_global.h +++ b/src/sf_global.h @@ -50,7 +50,9 @@ extern SFContext g_sf_context; #define SF_G_CONTINUE_FLAG g_sf_global_vars.continue_flag #define SF_G_CONNECT_TIMEOUT g_sf_global_vars.connect_timeout #define SF_G_NETWORK_TIMEOUT g_sf_global_vars.network_timeout +#define SF_G_MAX_CONNECTIONS g_sf_global_vars.max_connections #define SF_G_THREAD_STACK_SIZE g_sf_global_vars.thread_stack_size + #define SF_G_WORK_THREADS g_sf_context.work_threads #define SF_G_ALIVE_THREAD_COUNT g_sf_context.thread_count #define SF_G_THREAD_INDEX(tdata) (int)(tdata - g_sf_context.thread_data) diff --git a/src/sf_nio.c b/src/sf_nio.c index 64f54c7..f75ae18 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -8,7 +8,7 @@ #include #include #include -#include +//#include #include #include #include @@ -113,7 +113,7 @@ int sf_set_read_event(struct fast_task_info *task) { int result; - task->nio_stage = SF_NIO_STAGE_RECV; + sf_nio_set_stage(task, SF_NIO_STAGE_RECV); if (task->event.callback == (IOEventCallback)sf_client_sock_read) { return 0; } @@ -184,7 +184,7 @@ static int sf_client_sock_connect(int sock, short event, void *arg) return -1; } - task->nio_stage = SF_NIO_STAGE_HANDSHAKE; + sf_nio_set_stage(task, SF_NIO_STAGE_HANDSHAKE); return SF_CTX->deal_task(task); } @@ -207,7 +207,7 @@ static int sf_connect_server(struct fast_task_info *task) return result; } - task->nio_stage = SF_NIO_STAGE_HANDSHAKE; + sf_nio_set_stage(task, SF_NIO_STAGE_HANDSHAKE); return SF_CTX->deal_task(task); } else if (result == EINPROGRESS) { return sf_ioevent_add(task, (IOEventCallback) @@ -224,9 +224,12 @@ static int sf_connect_server(struct fast_task_info *task) static int sf_nio_deal_task(struct fast_task_info *task) { int result; - switch (task->nio_stage) { + int stage; + + stage = SF_NIO_TASK_STAGE_FETCH(task); + switch (SF_NIO_STAGE_ONLY(stage)) { case SF_NIO_STAGE_INIT: - task->nio_stage = SF_NIO_STAGE_RECV; + sf_nio_set_stage(task, SF_NIO_STAGE_RECV); result = sf_nio_init(task); break; case SF_NIO_STAGE_CONNECT: @@ -259,7 +262,7 @@ static int sf_nio_deal_task(struct fast_task_info *task) default: logError("file: "__FILE__", line: %d, " "client ip: %s, invalid stage: %d", - __LINE__, task->client_ip, task->nio_stage); + __LINE__, task->client_ip, stage); result = -EINVAL; break; } @@ -271,13 +274,37 @@ static int sf_nio_deal_task(struct fast_task_info *task) return result; } -int sf_nio_notify(struct fast_task_info *task, const int stage) +int sf_nio_notify(struct fast_task_info *task, const int new_stage) { int64_t n; int result; + int old_stage; bool notify; - task->nio_stage = stage; + old_stage = SF_NIO_TASK_STAGE_FETCH(task); + if (!(new_stage == SF_NIO_STAGE_INIT || + new_stage == SF_NIO_STAGE_CONNECT || + new_stage == SF_NIO_STAGE_CLOSE)) + { + if (SF_NIO_STAGE_IS_INPROGRESS(old_stage)) { + logWarning("file: "__FILE__", line: %d, " + "client ip: %s, nio stage in progress, " + "current stage: %d, skip set to %d", __LINE__, + task->client_ip, old_stage, new_stage); + return EBUSY; + } + } + + if (!__sync_bool_compare_and_swap(&task->nio_stage, + old_stage, new_stage)) + { + logWarning("file: "__FILE__", line: %d, " + "client ip: %s, skip set stage to %d because stage " + "changed, current stage: %d", __LINE__, task->client_ip, + new_stage, SF_NIO_TASK_STAGE_FETCH(task)); + return EEXIST; + } + task->next = NULL; pthread_mutex_lock(&task->thread_data->waiting_queue.lock); @@ -342,6 +369,7 @@ int sf_send_add_event(struct fast_task_info *task) task->offset = 0; if (task->length > 0) { /* direct send */ + sf_nio_set_stage(task, SF_NIO_STAGE_SEND); if (sf_client_sock_write(task->event.fd, IOEVENT_WRITE, task) < 0) { return errno != 0 ? errno : EIO; } @@ -352,17 +380,19 @@ int sf_send_add_event(struct fast_task_info *task) int sf_client_sock_read(int sock, short event, void *arg) { + int stage; int bytes; int recv_bytes; int total_read; struct fast_task_info *task; task = (struct fast_task_info *)arg; - if (task->canceled || (task->nio_stage != SF_NIO_STAGE_RECV)) { + stage = SF_NIO_TASK_STAGE_FETCH(task); + if (task->canceled || (SF_NIO_STAGE_ONLY(stage) != SF_NIO_STAGE_RECV)) { return 0; } - assert(sock >= 0); + //assert(sock >= 0); if (event & IOEVENT_TIMEOUT) { if (task->offset == 0 && task->req_count > 0) { if (SF_CTX->timeout_callback != NULL) { @@ -376,16 +406,14 @@ int sf_client_sock_read(int sock, short event, void *arg) task->network_timeout; fast_timer_add(&task->thread_data->timer, &task->event.timer); - } - else { + } else { if (task->length > 0) { logWarning("file: "__FILE__", line: %d, " "client ip: %s, recv timeout, " "recv offset: %d, expect length: %d", __LINE__, task->client_ip, task->offset, task->length); - } - else { + } else { logWarning("file: "__FILE__", line: %d, " "client ip: %s, req_count: %"PRId64", recv timeout", __LINE__, task->client_ip, task->req_count); @@ -407,6 +435,18 @@ int sf_client_sock_read(int sock, short event, void *arg) return -1; } + if (stage != SF_NIO_STAGE_RECV_INPROGRESS) { + if (!__sync_bool_compare_and_swap(&task->nio_stage, + stage, SF_NIO_STAGE_RECV_INPROGRESS)) + { + logWarning("file: "__FILE__", line: %d, " + "client ip: %s, nio stage change from %d to %d, " + "skip read!", __LINE__, task->client_ip, stage, + SF_NIO_TASK_STAGE_FETCH(task)); + return 0; + } + } + total_read = 0; while (1) { fast_timer_modify(&task->thread_data->timer, @@ -414,8 +454,7 @@ int sf_client_sock_read(int sock, short event, void *arg) task->network_timeout); if (task->length == 0) { //recv header recv_bytes = SF_CTX->header_size - task->offset; - } - else { + } else { recv_bytes = task->length - task->offset; } @@ -423,14 +462,12 @@ int sf_client_sock_read(int sock, short event, void *arg) if (bytes < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { break; - } - else if (errno == EINTR) { //should retry + } else if (errno == EINTR) { //should retry logDebug("file: "__FILE__", line: %d, " "client ip: %s, ignore interupt signal", __LINE__, task->client_ip); continue; - } - else { + } else { logWarning("file: "__FILE__", line: %d, " "client ip: %s, recv fail, " "errno: %d, error info: %s", @@ -440,8 +477,7 @@ int sf_client_sock_read(int sock, short event, void *arg) iovent_add_to_deleted_list(task); return -1; } - } - else if (bytes == 0) { + } else if (bytes == 0) { if (task->offset > 0) { if (task->length > 0) { logWarning("file: "__FILE__", line: %d, " @@ -450,16 +486,14 @@ int sf_client_sock_read(int sock, short event, void *arg) "recv pkg length: %d", __LINE__, task->client_ip, task->length, task->offset); - } - else { + } else { logWarning("file: "__FILE__", line: %d, " "client ip: %s, connection " "disconnected, recv pkg length: %d", __LINE__, task->client_ip, task->offset); } - } - else { + } else { logDebug("file: "__FILE__", line: %d, " "client ip: %s, sock: %d, recv fail, " "connection disconnected", @@ -537,7 +571,7 @@ int sf_client_sock_read(int sock, short event, void *arg) if (task->offset >= task->length) { //recv done task->req_count++; - task->nio_stage = SF_NIO_STAGE_SEND; + sf_nio_set_stage(task, SF_NIO_STAGE_SEND); if (SF_CTX->deal_task(task) < 0) { //fatal error iovent_add_to_deleted_list(task); return -1; @@ -551,13 +585,15 @@ int sf_client_sock_read(int sock, short event, void *arg) int sf_client_sock_write(int sock, short event, void *arg) { + int stage; int bytes; int total_write; struct fast_task_info *task; - assert(sock >= 0); + //assert(sock >= 0); task = (struct fast_task_info *)arg; - if (task->canceled) { + stage = SF_NIO_TASK_STAGE_FETCH(task); + if (task->canceled || (SF_NIO_STAGE_ONLY(stage) != SF_NIO_STAGE_SEND)) { return 0; } @@ -580,6 +616,18 @@ int sf_client_sock_write(int sock, short event, void *arg) return -1; } + if (stage != SF_NIO_STAGE_SEND_INPROGRESS) { + if (!__sync_bool_compare_and_swap(&task->nio_stage, + stage, SF_NIO_STAGE_SEND_INPROGRESS)) + { + logWarning("file: "__FILE__", line: %d, " + "client ip: %s, nio stage change from %d to %d, " + "skip write!", __LINE__, task->client_ip, stage, + SF_NIO_TASK_STAGE_FETCH(task)); + return 0; + } + } + total_write = 0; while (1) { fast_timer_modify(&task->thread_data->timer, @@ -595,14 +643,12 @@ int sf_client_sock_write(int sock, short event, void *arg) return -1; } break; - } - else if (errno == EINTR) { //should retry + } else if (errno == EINTR) { //should retry logDebug("file: "__FILE__", line: %d, " "client ip: %s, ignore interupt signal", __LINE__, task->client_ip); continue; - } - else { + } else { logWarning("file: "__FILE__", line: %d, " "client ip: %s, send fail, " "errno: %d, error info: %s", @@ -612,8 +658,7 @@ int sf_client_sock_write(int sock, short event, void *arg) iovent_add_to_deleted_list(task); return -1; } - } - else if (bytes == 0) { + } else if (bytes == 0) { logWarning("file: "__FILE__", line: %d, " "client ip: %s, sock: %d, send failed, " "connection disconnected", diff --git a/src/sf_nio.h b/src/sf_nio.h index 8d7511c..90f98d2 100644 --- a/src/sf_nio.h +++ b/src/sf_nio.h @@ -44,29 +44,52 @@ static inline TaskCleanUpCallback sf_get_task_cleanup_func_ex( sf_get_task_cleanup_func_ex(&g_sf_context) void sf_recv_notify_read(int sock, short event, void *arg); -int sf_send_add_event(struct fast_task_info *pTask); +int sf_send_add_event(struct fast_task_info *task); int sf_client_sock_write(int sock, short event, void *arg); int sf_client_sock_read(int sock, short event, void *arg); -void sf_task_finish_clean_up(struct fast_task_info *pTask); +void sf_task_finish_clean_up(struct fast_task_info *task); -int sf_nio_notify(struct fast_task_info *pTask, const int stage); +int sf_nio_notify(struct fast_task_info *task, const int new_stage); int sf_set_read_event(struct fast_task_info *task); -void sf_task_switch_thread(struct fast_task_info *pTask, +void sf_task_switch_thread(struct fast_task_info *task, const int new_thread_index); -static inline int sf_nio_forward_request(struct fast_task_info *pTask, +static inline int sf_nio_forward_request(struct fast_task_info *task, const int new_thread_index) { - sf_task_switch_thread(pTask, new_thread_index); - return sf_nio_notify(pTask, SF_NIO_STAGE_FORWARDED); + sf_task_switch_thread(task, new_thread_index); + return sf_nio_notify(task, SF_NIO_STAGE_FORWARDED); } -static inline bool sf_client_sock_in_read_stage(struct fast_task_info *pTask) +static inline bool sf_client_sock_in_read_stage(struct fast_task_info *task) { - return (pTask->event.callback == (IOEventCallback)sf_client_sock_read); + return (task->event.callback == (IOEventCallback)sf_client_sock_read); +} + +static inline void sf_nio_set_stage(struct fast_task_info *task, + const int new_stage) +{ + int old_stage; + old_stage = __sync_add_and_fetch(&task->nio_stage, 0); + if (new_stage != old_stage) { + __sync_bool_compare_and_swap(&task->nio_stage, old_stage, new_stage); + } +} + +static inline bool sf_nio_swap_stage(struct fast_task_info *task, + const int old_stage, const int new_stage) +{ + return __sync_bool_compare_and_swap(&task->nio_stage, old_stage, new_stage); +} + +static inline bool sf_nio_task_inprogress(struct fast_task_info *task) +{ + int stage; + stage = __sync_add_and_fetch(&task->nio_stage, 0); + return SF_NIO_STAGE_IS_INPROGRESS(stage); } #ifdef __cplusplus