From c1ae024da5b72abab95bd211ca2cfbded43a5a5f Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Sun, 12 Feb 2023 10:38:46 +0800 Subject: [PATCH] enable epoll edge trigger by global variable epoll_edge_trigger --- src/sf_global.h | 3 +++ src/sf_nio.c | 59 ++++++++++++++++++++++++++++++++++++++++-------- src/sf_service.c | 17 ++++++++++++-- 3 files changed, 68 insertions(+), 11 deletions(-) diff --git a/src/sf_global.h b/src/sf_global.h index f017404..e6cb1e4 100644 --- a/src/sf_global.h +++ b/src/sf_global.h @@ -40,6 +40,7 @@ typedef struct sf_global_variables { volatile bool continue_flag; bool tcp_quick_ack; + bool epoll_edge_trigger; int max_connections; int max_pkg_size; int min_buff_size; @@ -98,6 +99,8 @@ extern SFContext g_sf_context; #define SF_G_ERROR_HANDLER g_sf_global_vars.error_handler #define SF_G_EMPTY_STRING g_sf_global_vars.empty +#define SF_G_EPOLL_EDGE_TRIGGER g_sf_global_vars.epoll_edge_trigger + #define SF_WORK_THREADS(sf_context) sf_context.work_threads #define SF_ALIVE_THREAD_COUNT(sf_context) sf_context.thread_count #define SF_THREAD_INDEX(sf_context, tdata) (int)(tdata - sf_context.thread_data) diff --git a/src/sf_nio.c b/src/sf_nio.c index 0883802..659d4c7 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -37,6 +37,7 @@ #include "fastcommon/sockopt.h" #include "fastcommon/fast_task_queue.h" #include "fastcommon/ioevent_loop.h" +#include "fastcommon/fc_atomic.h" #include "sf_global.h" #include "sf_service.h" #include "sf_nio.h" @@ -143,7 +144,7 @@ static inline int set_write_event(struct fast_task_info *task) return 0; } -int sf_set_read_event(struct fast_task_info *task) +static int setup_read_event(struct fast_task_info *task) { int result; @@ -169,6 +170,27 @@ int sf_set_read_event(struct fast_task_info *task) return 0; } +int sf_set_read_event(struct fast_task_info *task) +{ + int result; + + if ((result=setup_read_event(task)) != 0) { + return result; + } + + if (SF_G_EPOLL_EDGE_TRIGGER) { + if (FC_ATOMIC_GET(task->nio_stages.notify) == SF_NIO_STAGE_SEND) { + __sync_bool_compare_and_swap(&task->nio_stages.next, + SF_NIO_STAGE_NONE, SF_NIO_STAGE_RECV); + return 0; + } else { + return sf_nio_notify(task, SF_NIO_STAGE_RECV); + } + } else { + return 0; + } +} + static inline int sf_ioevent_add(struct fast_task_info *task, IOEventCallback callback, const int timeout) { @@ -276,8 +298,7 @@ static int sf_nio_deal_task(struct fast_task_info *task, const int stage) result = sf_connect_server(task); break; case SF_NIO_STAGE_RECV: - if ((result=sf_set_read_event(task)) == 0) - { + if ((result=setup_read_event(task)) == 0) { sf_client_sock_read(task->event.fd, IOEVENT_READ, task); } @@ -321,7 +342,7 @@ int sf_nio_notify(struct fast_task_info *task, const int stage) int old_stage; bool notify; - if (__sync_add_and_fetch(&task->canceled, 0)) { + if (FC_ATOMIC_GET(task->canceled)) { if (stage == SF_NIO_STAGE_CONTINUE) { if (task->continue_callback != NULL) { return task->continue_callback(task); @@ -342,12 +363,22 @@ int sf_nio_notify(struct fast_task_info *task, const int stage) while (!__sync_bool_compare_and_swap(&task->nio_stages.notify, SF_NIO_STAGE_NONE, stage)) { - old_stage = __sync_fetch_and_add(&task->nio_stages.notify, 0); + old_stage = FC_ATOMIC_GET(task->nio_stages.notify); if (old_stage == stage) { logDebug("file: "__FILE__", line: %d, " "current stage: %d equals to the target, skip set", __LINE__, stage); return 0; + + } else if (SF_G_EPOLL_EDGE_TRIGGER && ( + (old_stage == SF_NIO_STAGE_RECV && + stage == SF_NIO_STAGE_SEND) || + (old_stage == SF_NIO_STAGE_SEND && + stage == SF_NIO_STAGE_RECV))) + { + __sync_bool_compare_and_swap(&task->nio_stages. + next, SF_NIO_STAGE_NONE, stage); + return 0; } else if (old_stage != SF_NIO_STAGE_NONE) { logWarning("file: "__FILE__", line: %d, " "current stage: %d != %d, skip set stage to %d", @@ -390,6 +421,7 @@ void sf_recv_notify_read(int sock, short event, void *arg) { int64_t n; int stage; + int next; struct nio_thread_data *thread_data; struct fast_task_info *task; struct fast_task_info *current; @@ -410,7 +442,7 @@ void sf_recv_notify_read(int sock, short event, void *arg) task = current; current = current->next; - stage = __sync_add_and_fetch(&task->nio_stages.notify, 0); + stage = FC_ATOMIC_GET(task->nio_stages.notify); if (!task->canceled) { if (stage == SF_NIO_STAGE_CONTINUE) { /* MUST set to SF_NIO_STAGE_NONE first for re-entry */ @@ -422,6 +454,15 @@ void sf_recv_notify_read(int sock, short event, void *arg) __sync_bool_compare_and_swap(&task->nio_stages.notify, stage, SF_NIO_STAGE_NONE); } + + if (SF_G_EPOLL_EDGE_TRIGGER) { + next = FC_ATOMIC_GET(task->nio_stages.next); + if (next != SF_NIO_STAGE_NONE) { + sf_nio_notify(task, next); + __sync_bool_compare_and_swap(&task->nio_stages.next, + next, SF_NIO_STAGE_NONE); + } + } } else { if (stage != SF_NIO_STAGE_NONE) { if (stage == SF_NIO_STAGE_CONTINUE) { @@ -749,9 +790,9 @@ int sf_client_sock_write(int sock, short event, void *arg) } } else if (bytes == 0) { logWarning("file: "__FILE__", line: %d, " - "client ip: %s, sock: %d, send failed, " - "connection disconnected", - __LINE__, task->client_ip, sock); + "client ip: %s, sock: %d, task length: %d, offset: %d, " + "send failed, connection disconnected", __LINE__, + task->client_ip, sock, task->length, task->offset); ioevent_add_to_deleted_list(task); return -1; diff --git a/src/sf_service.c b/src/sf_service.c index 819b817..d74ebf6 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -131,6 +131,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, { int result; int bytes; + int extra_events; struct worker_thread_context *thread_contexts; struct worker_thread_context *thread_ctx; struct nio_thread_data *thread_data; @@ -172,6 +173,18 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, return ENOMEM; } + if (SF_G_EPOLL_EDGE_TRIGGER) { +#ifdef OS_LINUX + extra_events = EPOLLET; +#elif defined(OS_FREEBSD) + extra_events = EV_CLEAR; +#else + extra_events = 0; +#endif + } else { + extra_events = 0; + } + g_current_time = time(NULL); sf_context->thread_count = 0; data_end = sf_context->thread_data + sf_context->work_threads; @@ -187,8 +200,8 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, thread_data->arg = NULL; } - if (ioevent_init(&thread_data->ev_puller, - g_sf_global_vars.max_connections + 2, net_timeout_ms, 0) != 0) + if (ioevent_init(&thread_data->ev_puller, 2 + g_sf_global_vars. + max_connections, net_timeout_ms, extra_events) != 0) { result = errno != 0 ? errno : ENOMEM; logError("file: "__FILE__", line: %d, "