From f0ee6ce73f0cf6f9e249a30304e0ef1b5c0812d4 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Sun, 21 Sep 2025 15:08:08 +0800 Subject: [PATCH 01/13] struct sf_context remove field: remove_from_ready_list --- src/idempotency/client/receipt_handler.c | 1 - src/sf_global.c | 2 +- src/sf_nio.c | 4 ---- src/sf_nio.h | 9 --------- src/sf_service.c | 4 ++++ src/sf_types.h | 1 - 6 files changed, 5 insertions(+), 16 deletions(-) diff --git a/src/idempotency/client/receipt_handler.c b/src/idempotency/client/receipt_handler.c index 0a44d01..e0c1d06 100644 --- a/src/idempotency/client/receipt_handler.c +++ b/src/idempotency/client/receipt_handler.c @@ -565,7 +565,6 @@ int receipt_handler_init(FCAddressPtrArray *address_array) } sf_enable_thread_notify(true); - sf_set_remove_from_ready_list(false); fc_sleep_ms(100); return 0; diff --git a/src/sf_global.c b/src/sf_global.c index 26c7046..3c91200 100644 --- a/src/sf_global.c +++ b/src/sf_global.c @@ -51,7 +51,7 @@ SFContext g_sf_context = {{'\0'}, NULL, 0, false, sf_address_family_auto, {{AF_UNSPEC, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}}, {AF_UNSPEC, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}}}, {DEFAULT_MAX_CONNECTONS, SF_DEF_MAX_PACKAGE_SIZE, SF_DEF_MIN_BUFF_SIZE, - SF_DEF_MAX_BUFF_SIZE}, 1, DEFAULT_WORK_THREADS, 0, true, true, true, + SF_DEF_MAX_BUFF_SIZE}, 1, DEFAULT_WORK_THREADS, 0, true, true, {false, 0, 0}, {sf_task_finish_clean_up} }; diff --git a/src/sf_nio.c b/src/sf_nio.c index 1d4d60f..4eb374d 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -68,10 +68,6 @@ void sf_task_detach_thread(struct fast_task_info *task) &task->event.timer); task->event.timer.expires = 0; } - - if (SF_CTX->remove_from_ready_list) { - ioevent_remove(&task->thread_data->ev_puller, task); - } } void sf_task_switch_thread(struct fast_task_info *task, diff --git a/src/sf_nio.h b/src/sf_nio.h index 2e93f3f..64e5403 100644 --- a/src/sf_nio.h +++ b/src/sf_nio.h @@ -68,15 +68,6 @@ static inline void sf_set_connect_done_callback_ex(SFContext *sf_context, sf_set_connect_done_callback_ex(&g_sf_context, done_callback) -static inline void sf_set_remove_from_ready_list_ex( - SFContext *sf_context, const bool enabled) -{ - sf_context->remove_from_ready_list = enabled; -} - -#define sf_set_remove_from_ready_list(enabled) \ - sf_set_remove_from_ready_list_ex(&g_sf_context, enabled); - static inline TaskCleanUpCallback sf_get_task_cleanup_callback_ex( SFContext *sf_context) { diff --git a/src/sf_service.c b/src/sf_service.c index 685ed01..aa6df86 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -161,7 +161,11 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, if (SF_G_EPOLL_EDGE_TRIGGER) { #ifdef OS_LINUX +#if IOEVENT_USE_EPOLL extra_events = EPOLLET; +#else + extra_events = 0; +#endif #elif defined(OS_FREEBSD) extra_events = EV_CLEAR; #else diff --git a/src/sf_types.h b/src/sf_types.h index 6548a35..2a946e2 100644 --- a/src/sf_types.h +++ b/src/sf_types.h @@ -190,7 +190,6 @@ typedef struct sf_context { int work_threads; int header_size; - bool remove_from_ready_list; bool realloc_task_buffer; bool connect_need_log; //for client connect FCSmartPollingConfig smart_polling; From bc5af8a58b60727fb663c5cdecae4558a119fddf Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Wed, 24 Sep 2025 15:59:27 +0800 Subject: [PATCH 02/13] struct sf_network_handler add field use_iouring --- src/sf_global.c | 6 ++++++ src/sf_nio.c | 42 +++++++++++++++++++++--------------------- src/sf_nio.h | 7 ------- src/sf_types.h | 3 ++- 4 files changed, 29 insertions(+), 29 deletions(-) diff --git a/src/sf_global.c b/src/sf_global.c index 3c91200..c525f11 100644 --- a/src/sf_global.c +++ b/src/sf_global.c @@ -498,10 +498,16 @@ static int init_network_handler(SFContext *sf_context, handler->send_data = sf_socket_send_data; handler->recv_data = sf_socket_recv_data; handler->post_recv = NULL; +#if IOEVENT_USE_URING + handler->use_iouring = true; +#else + handler->use_iouring = false; +#endif return 0; } else { handler->inner.id = NULL; handler->outer.id = NULL; + handler->use_iouring = false; return load_rdma_apis(sf_context, handler); } } diff --git a/src/sf_nio.c b/src/sf_nio.c index 4eb374d..3da4898 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -41,6 +41,9 @@ #include "sf_service.h" #include "sf_nio.h" +static int sf_client_sock_write(int sock, short event, void *arg); +static int sf_client_sock_read(int sock, short event, void *arg); + void sf_set_parameters_ex(SFContext *sf_context, const int header_size, sf_set_body_length_callback set_body_length_func, sf_alloc_recv_buffer_callback alloc_recv_buffer_func, @@ -144,6 +147,7 @@ static inline int set_read_event(struct fast_task_info *task) } task->event.callback = (IOEventCallback)sf_client_sock_read; + if (ioevent_modify(&task->thread_data->ev_puller, task->event.fd, IOEVENT_READ, task) != 0) { @@ -168,13 +172,14 @@ int sf_set_read_event(struct fast_task_info *task) return set_read_event(task); } -static inline int sf_ioevent_add(struct fast_task_info *task, - IOEventCallback callback, const int timeout) +static inline int sf_ioevent_add(struct fast_task_info *task) { int result; result = ioevent_set(task, task->thread_data, task->event.fd, - IOEVENT_READ, callback, timeout); + IOEVENT_READ, (IOEventCallback)sf_client_sock_read, + SF_CTX->net_buffer_cfg.network_timeout, + task->handler->use_iouring); return result > 0 ? -1 * result : result; } @@ -192,8 +197,7 @@ static inline void inc_connection_current_count() static inline int sf_nio_init(struct fast_task_info *task) { inc_connection_current_count(); - return sf_ioevent_add(task, (IOEventCallback)sf_client_sock_read, - SF_CTX->net_buffer_cfg.network_timeout); + return sf_ioevent_add(task); } int sf_socket_async_connect_check(struct fast_task_info *task) @@ -275,7 +279,7 @@ static int sf_async_connect_server(struct fast_task_info *task) result = ioevent_set(task, task->thread_data, task->event.fd, IOEVENT_READ | IOEVENT_WRITE, (IOEventCallback) sf_client_connect_done, SF_CTX->net_buffer_cfg. - connect_timeout); + connect_timeout, task->handler->use_iouring); return result > 0 ? -1 * result : result; } else { if (SF_CTX->callbacks.connect_done != NULL) { @@ -283,10 +287,7 @@ static int sf_async_connect_server(struct fast_task_info *task) } if (result == 0) { - if ((result=sf_ioevent_add(task, (IOEventCallback) - sf_client_sock_read, SF_CTX-> - net_buffer_cfg.network_timeout)) != 0) - { + if ((result=sf_ioevent_add(task)) != 0) { return result; } @@ -341,10 +342,7 @@ static int sf_nio_deal_task(struct fast_task_info *task, const int stage) result = SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_CONTINUE); break; case SF_NIO_STAGE_FORWARDED: //forward by other thread - if ((result=sf_ioevent_add(task, (IOEventCallback) - sf_client_sock_read, - SF_CTX->net_buffer_cfg.network_timeout)) == 0) - { + if ((result=sf_ioevent_add(task)) == 0) { result = SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_SEND); } break; @@ -837,8 +835,7 @@ static int calc_iops_and_remove_polling(struct fast_task_info *task) ioevent_set_timeout(&task->thread_data->ev_puller, task->thread_data->timeout_ms); } - result = sf_ioevent_add(task, (IOEventCallback) - sf_client_sock_read, SF_CTX->net_buffer_cfg.network_timeout); + result = sf_ioevent_add(task); format_ip_address(task->client_ip, formatted_ip); logInfo("file: "__FILE__", line: %d, client: %s:%u, " @@ -900,7 +897,7 @@ int sf_rdma_busy_polling_callback(struct nio_thread_data *thread_data) return 0; } -int sf_client_sock_read(int sock, short event, void *arg) +static int sf_client_sock_read(int sock, short event, void *arg) { int result; int bytes; @@ -992,7 +989,7 @@ int sf_client_sock_read(int sock, short event, void *arg) return total_read; } -int sf_client_sock_write(int sock, short event, void *arg) +static int sf_client_sock_write(int sock, short event, void *arg) { int result; int bytes; @@ -1037,9 +1034,6 @@ int sf_client_sock_write(int sock, short event, void *arg) release_iovec_buffer(task); task->recv.ptr->offset = 0; task->recv.ptr->length = 0; - if (set_read_event(task) != 0) { - return -1; - } if (SF_CTX->callbacks.send_done == NULL || !send_done) { task->nio_stages.current = SF_NIO_STAGE_RECV; @@ -1055,6 +1049,12 @@ int sf_client_sock_write(int sock, short event, void *arg) } } + if (task->nio_stages.current == SF_NIO_STAGE_RECV) { + if (set_read_event(task) != 0) { + return -1; + } + } + break; } else if (action == sf_comm_action_break) { break; diff --git a/src/sf_nio.h b/src/sf_nio.h index 64e5403..8411781 100644 --- a/src/sf_nio.h +++ b/src/sf_nio.h @@ -92,8 +92,6 @@ static inline void sf_nio_reset_task_length(struct fast_task_info *task) void sf_recv_notify_read(int sock, short event, void *arg); int sf_send_add_event(struct fast_task_info *task); -int sf_client_sock_write(int sock, short event, void *arg); -int sf_client_sock_read(int sock, short event, void *arg); void sf_task_finish_clean_up(struct fast_task_info *task); @@ -149,11 +147,6 @@ static inline int sf_nio_forward_request(struct fast_task_info *task, return sf_nio_notify(task, SF_NIO_STAGE_FORWARDED); } -static inline bool sf_client_sock_in_read_stage(struct fast_task_info *task) -{ - return (task->event.callback == (IOEventCallback)sf_client_sock_read); -} - static inline void sf_nio_add_to_deleted_list(struct nio_thread_data *thread_data, struct fast_task_info *task) { diff --git a/src/sf_types.h b/src/sf_types.h index 2a946e2..3e7bca8 100644 --- a/src/sf_types.h +++ b/src/sf_types.h @@ -115,9 +115,11 @@ typedef struct sf_listener { struct sf_context; struct sf_address_family_handler; + typedef struct sf_network_handler { bool enabled; bool explicit_post_recv; + bool use_iouring; //since v1.2.9 FCCommunicationType comm_type; struct sf_address_family_handler *fh; struct ibv_pd *pd; @@ -179,7 +181,6 @@ typedef struct sf_context { struct nio_thread_data *thread_data; volatile int thread_count; - //int rdma_port_offset; bool is_client; //since v1.2.5 SFAddressFamily address_family; SFAddressFamilyHandler handlers[SF_ADDRESS_FAMILY_COUNT]; From ecee21f289dd5ab170ef9e14a93587357c146ac3 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Thu, 25 Sep 2025 15:54:38 +0800 Subject: [PATCH 03/13] socket send and recv adapt to io_uring --- src/sf_global.c | 65 +++++++++-- src/sf_nio.c | 301 ++++++++++++++++++++++++++++++++++++++---------- src/sf_types.h | 3 +- 3 files changed, 294 insertions(+), 75 deletions(-) diff --git a/src/sf_global.c b/src/sf_global.c index c525f11..b6fd0d0 100644 --- a/src/sf_global.c +++ b/src/sf_global.c @@ -477,7 +477,8 @@ static int load_rdma_apis(SFContext *sf_context, SFNetworkHandler *handler) } static int init_network_handler(SFContext *sf_context, - SFNetworkHandler *handler, SFAddressFamilyHandler *fh) + SFNetworkHandler *handler, SFAddressFamilyHandler *fh, + const bool use_send_zc) { handler->fh = fh; handler->inner.handler = handler; @@ -499,15 +500,17 @@ static int init_network_handler(SFContext *sf_context, handler->recv_data = sf_socket_recv_data; handler->post_recv = NULL; #if IOEVENT_USE_URING - handler->use_iouring = true; + handler->use_io_uring = true; + handler->use_send_zc = use_send_zc; #else - handler->use_iouring = false; + handler->use_io_uring = false; + handler->use_send_zc = false; #endif return 0; } else { handler->inner.id = NULL; handler->outer.id = NULL; - handler->use_iouring = false; + handler->use_io_uring = false; return load_rdma_apis(sf_context, handler); } } @@ -675,6 +678,7 @@ int sf_load_context_from_config_ex(SFContext *sf_context, int inner_port; int outer_port; int port; + bool use_send_zc; int i; int result; @@ -711,6 +715,8 @@ int sf_load_context_from_config_ex(SFContext *sf_context, outer_port = config->default_outer_port; } + use_send_zc = iniGetBoolValue(config->ini_ctx.section_name, + "use_send_zc", config->ini_ctx.context, false); for (i=0; ihandlers + i; fh->ctx = sf_context; @@ -734,7 +740,9 @@ int sf_load_context_from_config_ex(SFContext *sf_context, if (!handler->enabled) { continue; } - if ((result=init_network_handler(sf_context, handler, fh)) != 0) { + if ((result=init_network_handler(sf_context, handler, + fh, use_send_zc)) != 0) + { return result; } } @@ -985,11 +993,19 @@ void sf_slow_log_config_to_string(SFSlowLogConfig *slow_log_cfg, void sf_global_config_to_string_ex(const char *max_pkg_size_item_nm, char *output, const int size) { + int i; int len; int max_pkg_size; int min_buff_size; int max_buff_size; +#if IOEVENT_USE_URING + bool use_io_uring; + bool use_send_zc; +#endif char pkg_buff[256]; + SFAddressFamilyHandler *fh; + SFNetworkHandler *handler; + SFNetworkHandler *end; max_pkg_size = g_sf_global_vars.net_buffer_cfg.max_pkg_size - g_sf_global_vars.task_buffer_extra_size; @@ -1008,20 +1024,45 @@ void sf_global_config_to_string_ex(const char *max_pkg_size_item_nm, min_buff_size / 1024, max_buff_size / 1024); } +#if IOEVENT_USE_URING + use_io_uring = false; + use_send_zc = false; + for (i=0; ihandlers + SF_NETWORK_HANDLER_COUNT; + for (handler=fh->handlers; handlerenabled && handler->use_io_uring) { + use_io_uring = true; + use_send_zc = handler->use_send_zc; + break; + } + } + } +#endif + len = snprintf(output, size, "base_path=%s, max_connections=%d, connect_timeout=%d, " - "network_timeout=%d, thread_stack_size=%d KB, " - "%s, tcp_quick_ack=%d, log_level=%s, " - "run_by_group=%s, run_by_user=%s, ", SF_G_BASE_PATH_STR, + "network_timeout=%d, thread_stack_size=%d KB, %s, ", + SF_G_BASE_PATH_STR, g_sf_global_vars.net_buffer_cfg.max_connections, g_sf_global_vars.net_buffer_cfg.connect_timeout, g_sf_global_vars.net_buffer_cfg.network_timeout, - g_sf_global_vars.thread_stack_size / 1024, - pkg_buff, g_sf_global_vars.tcp_quick_ack, + g_sf_global_vars.thread_stack_size / 1024, pkg_buff); + +#if IOEVENT_USE_URING + len += snprintf(output + len, size - len, + "use_io_uring=%d, use_send_zc=%d, ", + use_io_uring, use_send_zc); +#endif + + len += snprintf(output + len, size - len, + "tcp_quick_ack=%d, " + "log_level=%s, " + "run_by_group=%s, run_by_user=%s, ", + g_sf_global_vars.tcp_quick_ack, log_get_level_caption(), g_sf_global_vars.run_by.group, - g_sf_global_vars.run_by.user - ); + g_sf_global_vars.run_by.user); sf_log_config_to_string(&g_sf_global_vars.error_log, "error-log", output + len, size - len); diff --git a/src/sf_nio.c b/src/sf_nio.c index 3da4898..a87d248 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -93,16 +93,6 @@ static inline void release_iovec_buffer(struct fast_task_info *task) void sf_task_finish_clean_up(struct fast_task_info *task) { - /* - assert(task->event.fd >= 0); - if (task->event.fd < 0) { - logWarning("file: "__FILE__", line: %d, " - "task: %p already cleaned", - __LINE__, task); - return; - } - */ - if (task->finish_callback != NULL) { task->finish_callback(task); task->finish_callback = NULL; @@ -112,6 +102,12 @@ void sf_task_finish_clean_up(struct fast_task_info *task) sf_task_detach_thread(task); task->handler->close_connection(task); + if (task->handler->use_io_uring) { +#if IOEVENT_USE_URING + FC_URING_OP_TYPE(task) = IORING_OP_NOP; +#endif + } + __sync_fetch_and_sub(&g_sf_global_vars.connection_stat.current_count, 1); sf_release_task(task); } @@ -138,27 +134,91 @@ static inline int set_write_event(struct fast_task_info *task) return 0; } +static inline int prepare_first_recv(struct fast_task_info *task) +{ +#if IOEVENT_USE_URING + if (SF_CTX->callbacks.alloc_recv_buffer != NULL) { + return uring_prep_recv_data(task, task->recv.ptr->data, + SF_CTX->header_size); + } else { + return uring_prep_first_recv(task); + } +#else + logError("file: "__FILE__", line: %d, " + "some mistakes happen!", __LINE__); + return EBUSY; +#endif +} + +static inline int prepare_next_recv(struct fast_task_info *task) +{ +#if IOEVENT_USE_URING + int recv_bytes; + + if (task->recv.ptr->length == 0) { //recv header + recv_bytes = SF_CTX->header_size - task->recv.ptr->offset; + return uring_prep_recv_data(task, task->recv.ptr->data + + task->recv.ptr->offset, recv_bytes); + } else { + recv_bytes = task->recv.ptr->length - task->recv.ptr->offset; + if (task->recv_body == NULL) { + return uring_prep_recv_data(task, task->recv.ptr->data + + task->recv.ptr->offset, recv_bytes); + } else { + return uring_prep_recv_data(task, task->recv_body + + (task->recv.ptr->offset - SF_CTX-> + header_size), recv_bytes); + } + } +#else + logError("file: "__FILE__", line: %d, " + "some mistakes happen!", __LINE__); + return EBUSY; +#endif +} + static inline int set_read_event(struct fast_task_info *task) { int result; - if (task->event.callback == (IOEventCallback)sf_client_sock_read) { - return 0; - } - - task->event.callback = (IOEventCallback)sf_client_sock_read; - - if (ioevent_modify(&task->thread_data->ev_puller, - task->event.fd, IOEVENT_READ, task) != 0) - { - result = errno != 0 ? errno : ENOENT; - ioevent_add_to_deleted_list(task); + if (task->handler->use_io_uring) { +#if IOEVENT_USE_URING + if (FC_URING_OP_TYPE(task) == IORING_OP_RECV) { + logWarning("file: "__FILE__", line: %d, " + "trigger recv again!", __LINE__); + return 0; + } + if ((result=prepare_first_recv(task)) != 0) { + ioevent_add_to_deleted_list(task); + return result; + } + if (task->event.callback != (IOEventCallback)sf_client_sock_read) { + task->event.callback = (IOEventCallback)sf_client_sock_read; + } +#else logError("file: "__FILE__", line: %d, " - "ioevent_modify fail, " - "errno: %d, error info: %s", - __LINE__, result, strerror(result)); - return result; + "some mistakes happen!", __LINE__); + return EBUSY; +#endif + } else { + if (task->event.callback == (IOEventCallback)sf_client_sock_read) { + return 0; + } + if (ioevent_modify(&task->thread_data->ev_puller, + task->event.fd, IOEVENT_READ, task) != 0) + { + result = errno != 0 ? errno : ENOENT; + ioevent_add_to_deleted_list(task); + + logError("file: "__FILE__", line: %d, " + "ioevent_modify fail, " + "errno: %d, error info: %s", + __LINE__, result, strerror(result)); + return result; + } + + task->event.callback = (IOEventCallback)sf_client_sock_read; } return 0; @@ -179,7 +239,7 @@ static inline int sf_ioevent_add(struct fast_task_info *task) result = ioevent_set(task, task->thread_data, task->event.fd, IOEVENT_READ, (IOEventCallback)sf_client_sock_read, SF_CTX->net_buffer_cfg.network_timeout, - task->handler->use_iouring); + task->handler->use_io_uring); return result > 0 ? -1 * result : result; } @@ -279,7 +339,7 @@ static int sf_async_connect_server(struct fast_task_info *task) result = ioevent_set(task, task->thread_data, task->event.fd, IOEVENT_READ | IOEVENT_WRITE, (IOEventCallback) sf_client_connect_done, SF_CTX->net_buffer_cfg. - connect_timeout, task->handler->use_iouring); + connect_timeout, task->handler->use_io_uring); return result > 0 ? -1 * result : result; } else { if (SF_CTX->callbacks.connect_done != NULL) { @@ -328,10 +388,12 @@ static int sf_nio_deal_task(struct fast_task_info *task, const int stage) case SF_NIO_STAGE_RECV: task->nio_stages.current = SF_NIO_STAGE_RECV; if ((result=set_read_event(task)) == 0) { - if (sf_client_sock_read(task->event.fd, - IOEVENT_READ, task) < 0) - { - result = errno != 0 ? errno : EIO; + if (!task->handler->use_io_uring) { + if (sf_client_sock_read(task->event.fd, + IOEVENT_READ, task) < 0) + { + result = errno != 0 ? errno : EIO; + } } } break; @@ -502,8 +564,25 @@ int sf_send_add_event(struct fast_task_info *task) if (task->send.ptr->length > 0) { /* direct send */ task->nio_stages.current = SF_NIO_STAGE_SEND; - if (sf_client_sock_write(task->event.fd, IOEVENT_WRITE, task) < 0) { - return errno != 0 ? errno : EIO; + if (task->handler->use_io_uring) { +#if IOEVENT_USE_URING + if (task->event.callback != (IOEventCallback)sf_client_sock_write) { + task->event.callback = (IOEventCallback)sf_client_sock_write; + } + if (task->handler->use_send_zc) { + return uring_prep_first_send_zc(task); + } else { + return uring_prep_first_send(task); + } +#else + logError("file: "__FILE__", line: %d, " + "some mistakes happen!", __LINE__); + return EBUSY; +#endif + } else { + if (sf_client_sock_write(task->event.fd, IOEVENT_WRITE, task) < 0) { + return errno != 0 ? errno : EIO; + } } } @@ -547,28 +626,69 @@ static inline int check_task(struct fast_task_info *task, } } +static inline int prepare_next_send(struct fast_task_info *task) +{ +#if IOEVENT_USE_URING + if (task->handler->use_send_zc) { + return uring_prep_next_send_zc(task); + } else { + return uring_prep_next_send(task); + } +#else + logError("file: "__FILE__", line: %d, " + "some mistakes happen!", __LINE__); + return EBUSY; +#endif +} + ssize_t sf_socket_send_data(struct fast_task_info *task, SFCommAction *action, bool *send_done) { int bytes; + int result; - if (task->iovec_array.iovs != NULL) { - bytes = writev(task->event.fd, task->iovec_array.iovs, - FC_MIN(task->iovec_array.count, IOV_MAX)); + if (task->handler->use_io_uring) { +#if IOEVENT_USE_URING + bytes = task->event.res; +#else + logError("file: "__FILE__", line: %d, " + "some mistakes happen!", __LINE__); + return -1; +#endif } else { - bytes = write(task->event.fd, task->send.ptr->data + - task->send.ptr->offset, task->send.ptr->length - - task->send.ptr->offset); + if (task->iovec_array.iovs != NULL) { + bytes = writev(task->event.fd, task->iovec_array.iovs, + FC_MIN(task->iovec_array.count, IOV_MAX)); + } else { + bytes = write(task->event.fd, task->send.ptr->data + + task->send.ptr->offset, task->send.ptr->length - + task->send.ptr->offset); + } } + if (bytes < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK) - { - if (set_write_event(task) != 0) { - return -1; + if (task->handler->use_io_uring) { +#if IOEVENT_USE_URING + result = -bytes; +#endif + } else { + result = errno; + } + if (result == EAGAIN || result == EWOULDBLOCK) { + if (task->handler->use_io_uring) { + if (prepare_next_send(task) != 0) { + return -1; + } + } else { + if (set_write_event(task) != 0) { + return -1; + } } + *action = sf_comm_action_break; return 0; - } else if (errno == EINTR) { //should retry + } else if (result == EINTR && !task->handler->use_io_uring) { + /* should try again */ logDebug("file: "__FILE__", line: %d, " "client ip: %s, ignore interupt signal", __LINE__, task->client_ip); @@ -579,7 +699,7 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, "client ip: %s, send fail, task offset: %d, length: %d, " "errno: %d, error info: %s", __LINE__, task->client_ip, task->send.ptr->offset, task->send.ptr->length, - errno, strerror(errno)); + result, strerror(result)); return -1; } } else if (bytes == 0) { @@ -597,12 +717,15 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, task->send.ptr->offset = 0; task->send.ptr->length = 0; } + + if (task->handler->use_io_uring) { +#if IOEVENT_USE_URING + FC_URING_OP_TYPE(task) = IORING_OP_NOP; +#endif + } *action = sf_comm_action_finish; *send_done = true; } else { - *action = sf_comm_action_continue; - *send_done = false; - /* set next writev iovec array */ if (task->iovec_array.iovs != NULL) { struct iovec *iov; @@ -631,6 +754,17 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, task->iovec_array.iovs = iov; task->iovec_array.count = end - iov; } + + if (task->handler->use_io_uring) { + if (prepare_next_send(task) != 0) { + return -1; + } + *action = sf_comm_action_break; + } else { + *action = sf_comm_action_continue; + } + + *send_done = false; } return bytes; @@ -640,30 +774,54 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, const bool call_post_recv, SFCommAction *action) { int bytes; + int result; int recv_bytes; bool new_alloc; - if (task->recv.ptr->length == 0) { //recv header - recv_bytes = SF_CTX->header_size - task->recv.ptr->offset; - bytes = read(task->event.fd, task->recv.ptr->data + - task->recv.ptr->offset, recv_bytes); + if (task->handler->use_io_uring) { +#if IOEVENT_USE_URING + bytes = task->event.res; +#else + logError("file: "__FILE__", line: %d, " + "some mistakes happen!", __LINE__); + return -1; +#endif } else { - recv_bytes = task->recv.ptr->length - task->recv.ptr->offset; - if (task->recv_body == NULL) { + if (task->recv.ptr->length == 0) { //recv header + recv_bytes = SF_CTX->header_size - task->recv.ptr->offset; bytes = read(task->event.fd, task->recv.ptr->data + task->recv.ptr->offset, recv_bytes); } else { - bytes = read(task->event.fd, task->recv_body + - (task->recv.ptr->offset - SF_CTX-> - header_size), recv_bytes); + recv_bytes = task->recv.ptr->length - task->recv.ptr->offset; + if (task->recv_body == NULL) { + bytes = read(task->event.fd, task->recv.ptr->data + + task->recv.ptr->offset, recv_bytes); + } else { + bytes = read(task->event.fd, task->recv_body + + (task->recv.ptr->offset - SF_CTX-> + header_size), recv_bytes); + } } } if (bytes < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { + if (task->handler->use_io_uring) { +#if IOEVENT_USE_URING + result = -bytes; +#endif + } else { + result = errno; + } + if (result == EAGAIN || result == EWOULDBLOCK) { + if (task->handler->use_io_uring) { + if (prepare_next_recv(task) != 0) { + return -1; + } + } *action = sf_comm_action_break; return 0; - } else if (errno == EINTR) { //should retry + } else if (result == EINTR && !task->handler->use_io_uring) { + /* should try again */ logDebug("file: "__FILE__", line: %d, " "client ip: %s, ignore interupt signal", __LINE__, task->client_ip); @@ -674,7 +832,7 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, "client ip: %s, recv fail, " "errno: %d, error info: %s", __LINE__, task->client_ip, - errno, strerror(errno)); + result, strerror(result)); return -1; } } else if (bytes == 0) { @@ -706,7 +864,14 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, task->recv.ptr->offset += bytes; if (task->recv.ptr->length == 0) { //pkg header if (task->recv.ptr->offset < SF_CTX->header_size) { - *action = sf_comm_action_continue; + if (task->handler->use_io_uring) { + if (prepare_next_recv(task) != 0) { + return -1; + } + *action = sf_comm_action_break; + } else { + *action = sf_comm_action_continue; + } return bytes; } @@ -757,9 +922,21 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, } if (task->recv.ptr->offset >= task->recv.ptr->length) { //recv done + if (task->handler->use_io_uring) { +#if IOEVENT_USE_URING + FC_URING_OP_TYPE(task) = IORING_OP_NOP; +#endif + } *action = sf_comm_action_finish; } else { - *action = sf_comm_action_continue; + if (task->handler->use_io_uring) { + if (prepare_next_recv(task) != 0) { + return -1; + } + *action = sf_comm_action_break; + } else { + *action = sf_comm_action_continue; + } } return bytes; diff --git a/src/sf_types.h b/src/sf_types.h index 3e7bca8..4627cad 100644 --- a/src/sf_types.h +++ b/src/sf_types.h @@ -119,7 +119,8 @@ struct sf_address_family_handler; typedef struct sf_network_handler { bool enabled; bool explicit_post_recv; - bool use_iouring; //since v1.2.9 + bool use_io_uring; //since v1.2.9 + bool use_send_zc; //since v1.2.9 FCCommunicationType comm_type; struct sf_address_family_handler *fh; struct ibv_pd *pd; From a2ab8a0c01431a547c2bfc45165e3f3b81231f1b Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Sat, 27 Sep 2025 15:41:56 +0800 Subject: [PATCH 04/13] adapt Linux io_uring OK --- src/sf_global.c | 6 +- src/sf_nio.c | 195 ++++++++++++++++++++++++++++++----------------- src/sf_nio.h | 1 + src/sf_service.c | 7 -- src/sf_service.h | 17 +++-- 5 files changed, 143 insertions(+), 83 deletions(-) diff --git a/src/sf_global.c b/src/sf_global.c index b6fd0d0..1473203 100644 --- a/src/sf_global.c +++ b/src/sf_global.c @@ -993,19 +993,19 @@ void sf_slow_log_config_to_string(SFSlowLogConfig *slow_log_cfg, void sf_global_config_to_string_ex(const char *max_pkg_size_item_nm, char *output, const int size) { - int i; int len; int max_pkg_size; int min_buff_size; int max_buff_size; #if IOEVENT_USE_URING + int i; bool use_io_uring; bool use_send_zc; -#endif - char pkg_buff[256]; SFAddressFamilyHandler *fh; SFNetworkHandler *handler; SFNetworkHandler *end; +#endif + char pkg_buff[256]; max_pkg_size = g_sf_global_vars.net_buffer_cfg.max_pkg_size - g_sf_global_vars.task_buffer_extra_size; diff --git a/src/sf_nio.c b/src/sf_nio.c index a87d248..3c3bba1 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -62,9 +62,37 @@ void sf_set_parameters_ex(SFContext *sf_context, const int header_size, sf_context->callbacks.release_buffer = release_buffer_callback; } +#if IOEVENT_USE_URING +#define CLEAR_OP_TYPE_AND_RELEASE_TASK(task) \ + FC_URING_OP_TYPE(task) = IORING_OP_NOP; \ + sf_release_task(task) + +static int sf_uring_cancel_done(int sock, short event, void *arg) +{ + struct fast_task_info *task; + + task = (struct fast_task_info *)arg; + CLEAR_OP_TYPE_AND_RELEASE_TASK(task); + return 0; +} +#endif + void sf_task_detach_thread(struct fast_task_info *task) { +#if IOEVENT_USE_URING + bool need_cancel; + if (task->handler->use_io_uring) { + need_cancel = (FC_URING_OP_TYPE(task) != IORING_OP_NOP); + } else { + need_cancel = true; + } + if (need_cancel) { + task->event.callback = (IOEventCallback)sf_uring_cancel_done; + uring_prep_cancel(task); + } +#else ioevent_detach(&task->thread_data->ev_puller, task->event.fd); +#endif if (task->event.timer.expires > 0) { fast_timer_remove(&task->thread_data->timer, @@ -91,6 +119,22 @@ static inline void release_iovec_buffer(struct fast_task_info *task) } } +void sf_socket_close_connection(struct fast_task_info *task) +{ +#if IOEVENT_USE_URING + if (task->handler->use_io_uring) { + if (uring_prep_close_fd(task) != 0) { + close(task->event.fd); + } + } else { +#endif + close(task->event.fd); +#if IOEVENT_USE_URING + } +#endif + task->event.fd = -1; +} + void sf_task_finish_clean_up(struct fast_task_info *task) { if (task->finish_callback != NULL) { @@ -100,15 +144,18 @@ void sf_task_finish_clean_up(struct fast_task_info *task) release_iovec_buffer(task); sf_task_detach_thread(task); - task->handler->close_connection(task); - if (task->handler->use_io_uring) { #if IOEVENT_USE_URING - FC_URING_OP_TYPE(task) = IORING_OP_NOP; + if (!task->handler->use_io_uring) { +#endif + task->handler->close_connection(task); + __sync_fetch_and_sub(&g_sf_global_vars. + connection_stat.current_count, 1); + +#if IOEVENT_USE_URING + } #endif - } - __sync_fetch_and_sub(&g_sf_global_vars.connection_stat.current_count, 1); sf_release_task(task); } @@ -134,25 +181,19 @@ static inline int set_write_event(struct fast_task_info *task) return 0; } +#if IOEVENT_USE_URING static inline int prepare_first_recv(struct fast_task_info *task) { -#if IOEVENT_USE_URING if (SF_CTX->callbacks.alloc_recv_buffer != NULL) { return uring_prep_recv_data(task, task->recv.ptr->data, SF_CTX->header_size); } else { return uring_prep_first_recv(task); } -#else - logError("file: "__FILE__", line: %d, " - "some mistakes happen!", __LINE__); - return EBUSY; -#endif } static inline int prepare_next_recv(struct fast_task_info *task) { -#if IOEVENT_USE_URING int recv_bytes; if (task->recv.ptr->length == 0) { //recv header @@ -170,41 +211,35 @@ static inline int prepare_next_recv(struct fast_task_info *task) header_size), recv_bytes); } } -#else - logError("file: "__FILE__", line: %d, " - "some mistakes happen!", __LINE__); - return EBUSY; -#endif } +#endif static inline int set_read_event(struct fast_task_info *task) { int result; - if (task->handler->use_io_uring) { #if IOEVENT_USE_URING + if (task->handler->use_io_uring) { if (FC_URING_OP_TYPE(task) == IORING_OP_RECV) { logWarning("file: "__FILE__", line: %d, " "trigger recv again!", __LINE__); return 0; } + if (task->event.callback != (IOEventCallback)sf_client_sock_read) { + task->event.callback = (IOEventCallback)sf_client_sock_read; + } if ((result=prepare_first_recv(task)) != 0) { ioevent_add_to_deleted_list(task); return result; } - if (task->event.callback != (IOEventCallback)sf_client_sock_read) { - task->event.callback = (IOEventCallback)sf_client_sock_read; - } -#else - logError("file: "__FILE__", line: %d, " - "some mistakes happen!", __LINE__); - return EBUSY; -#endif } else { +#endif + if (task->event.callback == (IOEventCallback)sf_client_sock_read) { return 0; } + task->event.callback = (IOEventCallback)sf_client_sock_read; if (ioevent_modify(&task->thread_data->ev_puller, task->event.fd, IOEVENT_READ, task) != 0) { @@ -218,8 +253,9 @@ static inline int set_read_event(struct fast_task_info *task) return result; } - task->event.callback = (IOEventCallback)sf_client_sock_read; +#if IOEVENT_USE_URING } +#endif return 0; } @@ -564,8 +600,8 @@ int sf_send_add_event(struct fast_task_info *task) if (task->send.ptr->length > 0) { /* direct send */ task->nio_stages.current = SF_NIO_STAGE_SEND; - if (task->handler->use_io_uring) { #if IOEVENT_USE_URING + if (task->handler->use_io_uring) { if (task->event.callback != (IOEventCallback)sf_client_sock_write) { task->event.callback = (IOEventCallback)sf_client_sock_write; } @@ -574,16 +610,14 @@ int sf_send_add_event(struct fast_task_info *task) } else { return uring_prep_first_send(task); } -#else - logError("file: "__FILE__", line: %d, " - "some mistakes happen!", __LINE__); - return EBUSY; -#endif } else { +#endif if (sf_client_sock_write(task->event.fd, IOEVENT_WRITE, task) < 0) { return errno != 0 ? errno : EIO; } +#if IOEVENT_USE_URING } +#endif } return 0; @@ -626,20 +660,16 @@ static inline int check_task(struct fast_task_info *task, } } +#if IOEVENT_USE_URING static inline int prepare_next_send(struct fast_task_info *task) { -#if IOEVENT_USE_URING if (task->handler->use_send_zc) { return uring_prep_next_send_zc(task); } else { return uring_prep_next_send(task); } -#else - logError("file: "__FILE__", line: %d, " - "some mistakes happen!", __LINE__); - return EBUSY; -#endif } +#endif ssize_t sf_socket_send_data(struct fast_task_info *task, SFCommAction *action, bool *send_done) @@ -647,15 +677,11 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, int bytes; int result; - if (task->handler->use_io_uring) { #if IOEVENT_USE_URING + if (task->handler->use_io_uring) { bytes = task->event.res; -#else - logError("file: "__FILE__", line: %d, " - "some mistakes happen!", __LINE__); - return -1; -#endif } else { +#endif if (task->iovec_array.iovs != NULL) { bytes = writev(task->event.fd, task->iovec_array.iovs, FC_MIN(task->iovec_array.count, IOV_MAX)); @@ -664,26 +690,34 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, task->send.ptr->offset, task->send.ptr->length - task->send.ptr->offset); } +#if IOEVENT_USE_URING } +#endif if (bytes < 0) { - if (task->handler->use_io_uring) { #if IOEVENT_USE_URING + if (task->handler->use_io_uring) { result = -bytes; -#endif } else { +#endif result = errno; +#if IOEVENT_USE_URING } +#endif if (result == EAGAIN || result == EWOULDBLOCK) { +#if IOEVENT_USE_URING if (task->handler->use_io_uring) { if (prepare_next_send(task) != 0) { return -1; } } else { +#endif if (set_write_event(task) != 0) { return -1; } +#if IOEVENT_USE_URING } +#endif *action = sf_comm_action_break; return 0; @@ -718,11 +752,6 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, task->send.ptr->length = 0; } - if (task->handler->use_io_uring) { -#if IOEVENT_USE_URING - FC_URING_OP_TYPE(task) = IORING_OP_NOP; -#endif - } *action = sf_comm_action_finish; *send_done = true; } else { @@ -755,14 +784,18 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, task->iovec_array.count = end - iov; } +#if IOEVENT_USE_URING if (task->handler->use_io_uring) { if (prepare_next_send(task) != 0) { return -1; } *action = sf_comm_action_break; } else { +#endif *action = sf_comm_action_continue; +#if IOEVENT_USE_URING } +#endif *send_done = false; } @@ -778,15 +811,11 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, int recv_bytes; bool new_alloc; - if (task->handler->use_io_uring) { #if IOEVENT_USE_URING + if (task->handler->use_io_uring) { bytes = task->event.res; -#else - logError("file: "__FILE__", line: %d, " - "some mistakes happen!", __LINE__); - return -1; -#endif } else { +#endif if (task->recv.ptr->length == 0) { //recv header recv_bytes = SF_CTX->header_size - task->recv.ptr->offset; bytes = read(task->event.fd, task->recv.ptr->data + @@ -802,22 +831,28 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, header_size), recv_bytes); } } +#if IOEVENT_USE_URING } +#endif if (bytes < 0) { - if (task->handler->use_io_uring) { #if IOEVENT_USE_URING + if (task->handler->use_io_uring) { result = -bytes; -#endif } else { +#endif result = errno; +#if IOEVENT_USE_URING } +#endif if (result == EAGAIN || result == EWOULDBLOCK) { +#if IOEVENT_USE_URING if (task->handler->use_io_uring) { if (prepare_next_recv(task) != 0) { return -1; } } +#endif *action = sf_comm_action_break; return 0; } else if (result == EINTR && !task->handler->use_io_uring) { @@ -864,14 +899,18 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, task->recv.ptr->offset += bytes; if (task->recv.ptr->length == 0) { //pkg header if (task->recv.ptr->offset < SF_CTX->header_size) { +#if IOEVENT_USE_URING if (task->handler->use_io_uring) { if (prepare_next_recv(task) != 0) { return -1; } *action = sf_comm_action_break; } else { +#endif *action = sf_comm_action_continue; +#if IOEVENT_USE_URING } +#endif return bytes; } @@ -922,21 +961,20 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, } if (task->recv.ptr->offset >= task->recv.ptr->length) { //recv done - if (task->handler->use_io_uring) { -#if IOEVENT_USE_URING - FC_URING_OP_TYPE(task) = IORING_OP_NOP; -#endif - } *action = sf_comm_action_finish; } else { +#if IOEVENT_USE_URING if (task->handler->use_io_uring) { if (prepare_next_recv(task) != 0) { return -1; } *action = sf_comm_action_break; } else { +#endif *action = sf_comm_action_continue; +#if IOEVENT_USE_URING } +#endif } return bytes; @@ -1084,6 +1122,11 @@ static int sf_client_sock_read(int sock, short event, void *arg) task = (struct fast_task_info *)arg; if ((result=check_task(task, event, SF_NIO_STAGE_RECV)) != 0) { +#if IOEVENT_USE_URING + if (task->handler->use_io_uring && (event != IOEVENT_TIMEOUT)) { + CLEAR_OP_TYPE_AND_RELEASE_TASK(task); + } +#endif return result >= 0 ? 0 : -1; } @@ -1099,7 +1142,8 @@ static int sf_client_sock_read(int sock, short event, void *arg) task->event.timer.expires = g_current_time + SF_CTX->net_buffer_cfg.network_timeout; fast_timer_add(&task->thread_data->timer, - &task->event.timer); + &task->event.timer); + return 0; } else { if (task->recv.ptr->length > 0) { logWarning("file: "__FILE__", line: %d, " @@ -1116,10 +1160,14 @@ static int sf_client_sock_read(int sock, short event, void *arg) ioevent_add_to_deleted_list(task); return -1; } - - return 0; } +#if IOEVENT_USE_URING + if (task->handler->use_io_uring) { + CLEAR_OP_TYPE_AND_RELEASE_TASK(task); + } +#endif + total_read = 0; action = sf_comm_action_continue; while (1) { @@ -1179,6 +1227,11 @@ static int sf_client_sock_write(int sock, short event, void *arg) task = (struct fast_task_info *)arg; if ((result=check_task(task, event, SF_NIO_STAGE_SEND)) != 0) { +#if IOEVENT_USE_URING + if (task->handler->use_io_uring && (event != IOEVENT_TIMEOUT)) { + CLEAR_OP_TYPE_AND_RELEASE_TASK(task); + } +#endif return result >= 0 ? 0 : -1; } @@ -1193,6 +1246,12 @@ static int sf_client_sock_write(int sock, short event, void *arg) return -1; } +#if IOEVENT_USE_URING + if (task->handler->use_io_uring) { + CLEAR_OP_TYPE_AND_RELEASE_TASK(task); + } +#endif + total_write = 0; length = task->send.ptr->length; action = sf_comm_action_continue; diff --git a/src/sf_nio.h b/src/sf_nio.h index 8411781..599904e 100644 --- a/src/sf_nio.h +++ b/src/sf_nio.h @@ -90,6 +90,7 @@ static inline void sf_nio_reset_task_length(struct fast_task_info *task) } } +void sf_socket_close_connection(struct fast_task_info *task); void sf_recv_notify_read(int sock, short event, void *arg); int sf_send_add_event(struct fast_task_info *task); diff --git a/src/sf_service.c b/src/sf_service.c index aa6df86..efd3389 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -35,7 +35,6 @@ #include "fastcommon/fc_memory.h" #include "sf_nio.h" #include "sf_util.h" -#include "sf_global.h" #include "sf_service.h" #if defined(OS_LINUX) @@ -502,12 +501,6 @@ struct fast_task_info *sf_socket_accept_connection(SFListener *listener) return task; } -void sf_socket_close_connection(struct fast_task_info *task) -{ - close(task->event.fd); - task->event.fd = -1; -} - void sf_socket_close_ex(SFContext *sf_context) { int i; diff --git a/src/sf_service.h b/src/sf_service.h index 0980328..813e8f2 100644 --- a/src/sf_service.h +++ b/src/sf_service.h @@ -25,6 +25,7 @@ #include "fastcommon/ioevent.h" #include "fastcommon/fast_task_queue.h" #include "sf_types.h" +#include "sf_global.h" typedef void* (*sf_alloc_thread_extra_data_callback)(const int thread_index); typedef void (*sf_sig_quit_handler)(int sig); @@ -108,8 +109,6 @@ int sf_socket_create_server(SFListener *listener, void sf_socket_close_server(SFListener *listener); struct fast_task_info *sf_socket_accept_connection(SFListener *listener); -void sf_socket_close_connection(struct fast_task_info *task); - int sf_socket_server_ex(SFContext *sf_context); #define sf_socket_server() sf_socket_server_ex(&g_sf_context) @@ -170,9 +169,9 @@ static inline struct fast_task_info *sf_alloc_init_task_ex( return task; } -#define sf_hold_task_ex(task, inc_count) __sync_add_and_fetch( \ - &task->reffer_count, inc_count) -#define sf_hold_task(task) sf_hold_task_ex(task, 1) +#define sf_hold_task_ex(task, inc_count) \ + fc_hold_task_ex(task, inc_count) +#define sf_hold_task(task) fc_hold_task(task) #define sf_alloc_init_task(handler, fd) sf_alloc_init_task_ex(handler, fd, 1) @@ -187,6 +186,14 @@ static inline void sf_release_task(struct fast_task_info *task) "used: %d, freed: %d", __LINE__, task, alloc_count, alloc_count - free_count, free_count); */ + +#if IOEVENT_USE_URING + if (task->handler->use_io_uring) { + task->handler->close_connection(task); + __sync_fetch_and_sub(&g_sf_global_vars. + connection_stat.current_count, 1); + } +#endif free_queue_push(task); } } From 263171c4fee4fb003e050b692fbb35253ecb3714 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Tue, 30 Sep 2025 11:26:11 +0800 Subject: [PATCH 05/13] async_connect use io_uring --- make.sh | 6 +++++ src/sf_nio.c | 62 ++++++++++++++++++++++++++++++++++++++++------------ 2 files changed, 54 insertions(+), 14 deletions(-) diff --git a/make.sh b/make.sh index f966463..292589c 100755 --- a/make.sh +++ b/make.sh @@ -9,10 +9,13 @@ DEBUG_FLAG=0 if [ -f /usr/include/fastcommon/_os_define.h ]; then OS_BITS=$(grep -F OS_BITS /usr/include/fastcommon/_os_define.h | awk '{print $NF;}') + USE_URING=$(grep -F IOEVENT_USE_URING /usr/include/fastcommon/_os_define.h | awk '{print $NF;}') elif [ -f /usr/local/include/fastcommon/_os_define.h ]; then OS_BITS=$(grep -F OS_BITS /usr/local/include/fastcommon/_os_define.h | awk '{print $NF;}') + USE_URING=$(grep -F IOEVENT_USE_URING /usr/local/include/fastcommon/_os_define.h | awk '{print $NF;}') else OS_BITS=64 + USE_URING='' fi uname=$(uname) @@ -49,6 +52,9 @@ LIBS='' uname=$(uname) if [ "$uname" = "Linux" ]; then CFLAGS="$CFLAGS" + if [ -n "$USE_URING" ]; then + LIBS="$LIBS -luring" + fi elif [ "$uname" = "FreeBSD" ] || [ "$uname" = "Darwin" ]; then CFLAGS="$CFLAGS" if [ "$uname" = "Darwin" ]; then diff --git a/src/sf_nio.c b/src/sf_nio.c index 3c3bba1..e733ac3 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -220,10 +220,17 @@ static inline int set_read_event(struct fast_task_info *task) #if IOEVENT_USE_URING if (task->handler->use_io_uring) { - if (FC_URING_OP_TYPE(task) == IORING_OP_RECV) { - logWarning("file: "__FILE__", line: %d, " - "trigger recv again!", __LINE__); - return 0; + if (FC_URING_OP_TYPE(task) != IORING_OP_NOP) { + if (FC_URING_OP_TYPE(task) == IORING_OP_RECV) { + logWarning("file: "__FILE__", line: %d, " + "trigger recv again!", __LINE__); + return 0; + } else { + logWarning("file: "__FILE__", line: %d, " + "another operation in progress, op_type: %d!", + __LINE__, FC_URING_OP_TYPE(task)); + return EBUSY; + } } if (task->event.callback != (IOEventCallback)sf_client_sock_read) { @@ -316,16 +323,31 @@ static int sf_client_connect_done(int sock, short event, void *arg) task = (struct fast_task_info *)arg; if (task->canceled) { +#if IOEVENT_USE_URING + if (task->handler->use_io_uring && event != IOEVENT_TIMEOUT) { + CLEAR_OP_TYPE_AND_RELEASE_TASK(task); + } +#endif return ENOTCONN; } if (event & IOEVENT_TIMEOUT) { result = ETIMEDOUT; } else { - result = task->handler->async_connect_check(task); - if (result == EINPROGRESS) { - return 0; +#if IOEVENT_USE_URING + if (task->handler->use_io_uring) { + CLEAR_OP_TYPE_AND_RELEASE_TASK(task); + result = (task->event.res < 0 ? -1 * task->event.res : + task->event.res); + } else { +#endif + result = task->handler->async_connect_check(task); + if (result == EINPROGRESS) { + return 0; + } +#if IOEVENT_USE_URING } +#endif } if (SF_CTX->callbacks.connect_done != NULL) { @@ -356,14 +378,26 @@ static int sf_client_connect_done(int sock, short event, void *arg) int sf_socket_async_connect_server(struct fast_task_info *task) { int result; - if ((task->event.fd=socketCreateEx2(AF_UNSPEC, task->server_ip, - O_NONBLOCK, NULL, &result)) < 0) - { - return result > 0 ? -1 * result : result; - } - return asyncconnectserverbyip(task->event.fd, - task->server_ip, task->port); +#if IOEVENT_USE_URING + if (task->handler->use_io_uring) { + if ((result=uring_prep_connect(task)) != 0) { + return result; + } + return EINPROGRESS; + } else { +#endif + if ((task->event.fd=socketCreateEx2(AF_UNSPEC, task->server_ip, + O_NONBLOCK, NULL, &result)) < 0) + { + return result > 0 ? -1 * result : result; + } + + return asyncconnectserverbyip(task->event.fd, + task->server_ip, task->port); +#if IOEVENT_USE_URING + } +#endif } static int sf_async_connect_server(struct fast_task_info *task) From cf0950ea625e924102c451a0ef6253c9f519fa86 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Fri, 3 Oct 2025 11:33:26 +0800 Subject: [PATCH 06/13] sf_set_read_event just skipped when use_io_uring is true --- src/idempotency/client/client_channel.c | 24 +++++++----- src/idempotency/client/client_channel.h | 4 +- src/idempotency/client/receipt_handler.c | 7 +++- src/sf_nio.c | 40 +++++++++++++------- src/sf_service.c | 4 +- src/sf_service.h | 48 +++++++++++++++++++++++- 6 files changed, 96 insertions(+), 31 deletions(-) diff --git a/src/idempotency/client/client_channel.c b/src/idempotency/client/client_channel.c index 118c937..c5f9225 100644 --- a/src/idempotency/client/client_channel.c +++ b/src/idempotency/client/client_channel.c @@ -174,7 +174,6 @@ static struct fast_task_info *alloc_channel_task(IdempotencyClientChannel *channel, const uint32_t hash_code, const FCCommunicationType comm_type, const char *server_ip, const uint16_t port, int *err_no) { - int len; struct fast_task_info *task; SFAddressFamilyHandler *fh; SFNetworkHandler *handler; @@ -195,12 +194,7 @@ static struct fast_task_info *alloc_channel_task(IdempotencyClientChannel return NULL; } - len = strlen(server_ip); - if (len >= sizeof(task->server_ip)) { - len = sizeof(task->server_ip) - 1; - } - memcpy(task->server_ip, server_ip, len); - *(task->server_ip + len) = '\0'; + fc_safe_strcpy(task->server_ip, server_ip); task->port = port; task->arg = channel; task->thread_data = g_sf_context.thread_data + @@ -209,7 +203,8 @@ static struct fast_task_info *alloc_channel_task(IdempotencyClientChannel channel->last_connect_time = g_current_time; if ((*err_no=sf_nio_notify(task, SF_NIO_STAGE_CONNECT)) != 0) { channel->in_ioevent = 0; //rollback - sf_release_task(task); + __sync_sub_and_fetch(&task->reffer_count, 1); + free_queue_push(task); return NULL; } return task; @@ -221,6 +216,12 @@ int idempotency_client_channel_check_reconnect( int result; char formatted_ip[FORMATTED_IP_SIZE]; +#if IOEVENT_USE_URING + if (FC_ATOMIC_GET(channel->task->reffer_count) > 1) { + return 0; + } +#endif + if (!__sync_bool_compare_and_swap(&channel->in_ioevent, 0, 1)) { return 0; } @@ -237,6 +238,9 @@ int idempotency_client_channel_check_reconnect( formatted_ip, channel->task->port); } + if (channel->task->event.fd >= 0) { + channel->task->handler->close_connection(channel->task); + } __sync_bool_compare_and_swap(&channel->task->canceled, 1, 0); if ((result=sf_nio_notify(channel->task, SF_NIO_STAGE_CONNECT)) == 0) { channel->last_connect_time = g_current_time; @@ -348,8 +352,8 @@ int idempotency_client_channel_push(struct idempotency_client_channel *channel, receipt->req_id = req_id; fc_queue_push_ex(&channel->queue, receipt, ¬ify); if (notify) { - if (__sync_add_and_fetch(&channel->in_ioevent, 0)) { - if (__sync_add_and_fetch(&channel->established, 0)) { + if (FC_ATOMIC_GET(channel->in_ioevent)) { + if (FC_ATOMIC_GET(channel->established)) { sf_nio_notify(channel->task, SF_NIO_STAGE_CONTINUE); } } else { diff --git a/src/idempotency/client/client_channel.h b/src/idempotency/client/client_channel.h index 8fa82be..44de026 100644 --- a/src/idempotency/client/client_channel.h +++ b/src/idempotency/client/client_channel.h @@ -76,13 +76,13 @@ static inline void idempotency_client_channel_set_id_key( static inline int idempotency_client_channel_check_wait_ex( struct idempotency_client_channel *channel, const int timeout) { - if (__sync_add_and_fetch(&channel->established, 0)) { + if (FC_ATOMIC_GET(channel->established)) { return 0; } idempotency_client_channel_check_reconnect(channel); lcp_timedwait_sec(&channel->lcp, timeout); - if (__sync_add_and_fetch(&channel->established, 0)) { + if (FC_ATOMIC_GET(channel->established)) { return 0; } else { /* diff --git a/src/idempotency/client/receipt_handler.c b/src/idempotency/client/receipt_handler.c index e0c1d06..b6db0bf 100644 --- a/src/idempotency/client/receipt_handler.c +++ b/src/idempotency/client/receipt_handler.c @@ -49,6 +49,10 @@ static IdempotencyReceiptGlobalVars receipt_global_vars; static int receipt_init_task(struct fast_task_info *task, void *arg) { +#if IOEVENT_USE_URING + FC_URING_IS_CLIENT(task) = true; +#endif + if (RDMA_INIT_CONNECTION != NULL) { return RDMA_INIT_CONNECTION(task, arg); } else { @@ -92,7 +96,6 @@ static void receipt_task_finish_cleanup(struct fast_task_info *task) if (task->event.fd >= 0) { sf_task_detach_thread(task); - task->handler->close_connection(task); } sf_nio_reset_task_length(task); @@ -282,7 +285,7 @@ static int deal_setup_channel_response(struct fast_task_info *task) } channel = (IdempotencyClientChannel *)task->arg; - if (__sync_add_and_fetch(&channel->established, 0)) { + if (FC_ATOMIC_GET(channel->established)) { format_ip_address(task->server_ip, formatted_ip); logWarning("file: "__FILE__", line: %d, " "response from server %s:%u, unexpected cmd: " diff --git a/src/sf_nio.c b/src/sf_nio.c index e733ac3..44cbc58 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -269,6 +269,12 @@ static inline int set_read_event(struct fast_task_info *task) int sf_set_read_event(struct fast_task_info *task) { +#if IOEVENT_USE_URING + if (task->handler->use_io_uring) { + return 0; + } +#endif + task->recv.ptr->offset = 0; task->recv.ptr->length = 0; task->nio_stages.current = SF_NIO_STAGE_RECV; @@ -561,7 +567,7 @@ int sf_nio_notify(struct fast_task_info *task, const int stage) { result = errno != 0 ? errno : EIO; logError("file: "__FILE__", line: %d, " - "write eventfd %d fail, errno: %d, error info: %s", + "write to fd %d fail, errno: %d, error info: %s", __LINE__, FC_NOTIFY_WRITE_FD(task->thread_data), result, STRERROR(result)); return result; @@ -589,26 +595,32 @@ static inline void deal_notified_task(struct fast_task_info *task, } } -void sf_recv_notify_read(int sock, short event, void *arg) +void sf_recv_notify_read(int fd, short event, void *arg) { int64_t n; int stage; - struct nio_thread_data *thread_data; + struct ioevent_notify_entry *notify_entry; struct fast_task_info *task; struct fast_task_info *current; - thread_data = ((struct ioevent_notify_entry *)arg)->thread_data; - if (read(sock, &n, sizeof(n)) < 0) { + notify_entry = (struct ioevent_notify_entry *)arg; + if (read(fd, &n, sizeof(n)) < 0) { +#if IOEVENT_USE_URING + if (errno == EAGAIN) { + return; + } +#endif + logWarning("file: "__FILE__", line: %d, " - "read from eventfd %d fail, errno: %d, error info: %s", - __LINE__, sock, errno, STRERROR(errno)); + "read from fd %d fail, errno: %d, error info: %s", + __LINE__, fd, errno, STRERROR(errno)); } - PTHREAD_MUTEX_LOCK(&thread_data->waiting_queue.lock); - current = thread_data->waiting_queue.head; - thread_data->waiting_queue.head = NULL; - thread_data->waiting_queue.tail = NULL; - PTHREAD_MUTEX_UNLOCK(&thread_data->waiting_queue.lock); + PTHREAD_MUTEX_LOCK(¬ify_entry->thread_data->waiting_queue.lock); + current = notify_entry->thread_data->waiting_queue.head; + notify_entry->thread_data->waiting_queue.head = NULL; + notify_entry->thread_data->waiting_queue.tail = NULL; + PTHREAD_MUTEX_UNLOCK(¬ify_entry->thread_data->waiting_queue.lock); while (current != NULL) { task = current; @@ -772,9 +784,9 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, } } else if (bytes == 0) { logWarning("file: "__FILE__", line: %d, " - "client ip: %s, sock: %d, task length: %d, offset: %d, " + "client ip: %s, task length: %d, offset: %d, " "send failed, connection disconnected", __LINE__, - task->client_ip, task->event.fd, task->send.ptr->length, + task->client_ip, task->send.ptr->length, task->send.ptr->offset); return -1; } diff --git a/src/sf_service.c b/src/sf_service.c index efd3389..a9adfee 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -490,7 +490,9 @@ struct fast_task_info *sf_socket_accept_connection(SFListener *listener) } FC_SET_CLOEXEC(incomesock); - if ((task=sf_alloc_init_task(listener->handler, incomesock)) == NULL) { + if ((task=sf_alloc_init_server_task(listener->handler, + incomesock)) == NULL) + { close(incomesock); return NULL; } diff --git a/src/sf_service.h b/src/sf_service.h index 813e8f2..eba2c7f 100644 --- a/src/sf_service.h +++ b/src/sf_service.h @@ -169,12 +169,42 @@ static inline struct fast_task_info *sf_alloc_init_task_ex( return task; } -#define sf_hold_task_ex(task, inc_count) \ - fc_hold_task_ex(task, inc_count) +#define sf_hold_task_ex(task, inc_count) fc_hold_task_ex(task, inc_count) #define sf_hold_task(task) fc_hold_task(task) #define sf_alloc_init_task(handler, fd) sf_alloc_init_task_ex(handler, fd, 1) +static inline struct fast_task_info *sf_alloc_init_server_task( + SFNetworkHandler *handler, const int fd) +{ + const int reffer_count = 1; + struct fast_task_info *task; + + if ((task=sf_alloc_init_task_ex(handler, fd, reffer_count)) != NULL) { +#if IOEVENT_USE_URING + FC_URING_IS_CLIENT(task) = false; +#endif + } + + return task; +} + +static inline struct fast_task_info *sf_alloc_init_client_task( + SFNetworkHandler *handler) +{ + const int fd = -1; + const int reffer_count = 1; + struct fast_task_info *task; + + if ((task=sf_alloc_init_task_ex(handler, fd, reffer_count)) != NULL) { +#if IOEVENT_USE_URING + FC_URING_IS_CLIENT(task) = true; +#endif + } + + return task; +} + static inline void sf_release_task(struct fast_task_info *task) { if (__sync_sub_and_fetch(&task->reffer_count, 1) == 0) { @@ -194,6 +224,7 @@ static inline void sf_release_task(struct fast_task_info *task) connection_stat.current_count, 1); } #endif + free_queue_push(task); } } @@ -269,6 +300,19 @@ static inline SFNetworkHandler *sf_get_rdma_network_handler3( return sf_get_rdma_network_handler(sf_context3); } +static inline bool sf_get_double_buffers_flag(FCServerGroupInfo *server_group) +{ + if (server_group->comm_type == fc_comm_type_sock) { +#if IOEVENT_USE_URING + return true; +#else + return false; +#endif + } else { //RDMA + return true; + } +} + #ifdef __cplusplus } #endif From 3dcc1c570d318356f8992ed36eded5331f221fc0 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Fri, 3 Oct 2025 21:06:58 +0800 Subject: [PATCH 07/13] call sf_proto_init_task_magic when task->shrinked --- src/idempotency/client/receipt_handler.c | 5 +++-- src/sf_nio.c | 6 ++++-- src/sf_service.c | 22 ++++++++++++---------- src/sf_service.h | 17 ++++++++++++----- 4 files changed, 31 insertions(+), 19 deletions(-) diff --git a/src/idempotency/client/receipt_handler.c b/src/idempotency/client/receipt_handler.c index b6db0bf..061d3ef 100644 --- a/src/idempotency/client/receipt_handler.c +++ b/src/idempotency/client/receipt_handler.c @@ -523,6 +523,7 @@ static int do_init(FCAddressPtrArray *address_array) { const int task_arg_size = 0; const bool double_buffers = false; + const bool need_shrink_task_buffer = false; const bool explicit_post_recv = false; int result; int bytes; @@ -555,8 +556,8 @@ static int do_init(FCAddressPtrArray *address_array) NULL, sf_proto_set_body_length, NULL, NULL, receipt_deal_task, receipt_task_finish_cleanup, receipt_recv_timeout_callback, 1000, sizeof(SFCommonProtoHeader), TASK_PADDING_SIZE, - task_arg_size, double_buffers, explicit_post_recv, - receipt_init_task, pd, NULL); + task_arg_size, double_buffers, need_shrink_task_buffer, + explicit_post_recv, receipt_init_task, pd, NULL); } int receipt_handler_init(FCAddressPtrArray *address_array) diff --git a/src/sf_nio.c b/src/sf_nio.c index 44cbc58..291583c 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -269,14 +269,16 @@ static inline int set_read_event(struct fast_task_info *task) int sf_set_read_event(struct fast_task_info *task) { + /* reset recv offset and length */ + task->recv.ptr->offset = 0; + task->recv.ptr->length = 0; + #if IOEVENT_USE_URING if (task->handler->use_io_uring) { return 0; } #endif - task->recv.ptr->offset = 0; - task->recv.ptr->length = 0; task->nio_stages.current = SF_NIO_STAGE_RECV; return set_read_event(task); } diff --git a/src/sf_service.c b/src/sf_service.c index a9adfee..26393b7 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -34,6 +34,7 @@ #include "fastcommon/ioevent_loop.h" #include "fastcommon/fc_memory.h" #include "sf_nio.h" +#include "sf_proto.h" #include "sf_util.h" #include "sf_service.h" @@ -60,9 +61,9 @@ struct worker_thread_context { static void *worker_thread_entrance(void *arg); static int sf_init_free_queue(SFContext *sf_context, const char *name, - const bool double_buffers, const int task_padding_size, - const int task_arg_size, TaskInitCallback init_callback, - void *init_arg) + const bool double_buffers, const bool need_shrink_task_buffer, + const int task_padding_size, const int task_arg_size, + TaskInitCallback init_callback, void *init_arg) { int result; int m; @@ -82,9 +83,9 @@ static int sf_init_free_queue(SFContext *sf_context, const char *name, } alloc_conn_once = 256 / m; return free_queue_init_ex2(&sf_context->free_queue, name, double_buffers, - sf_context->net_buffer_cfg.max_connections, alloc_conn_once, - sf_context->net_buffer_cfg.min_buff_size, sf_context-> - net_buffer_cfg.max_buff_size, task_padding_size, + need_shrink_task_buffer, sf_context->net_buffer_cfg.max_connections, + alloc_conn_once, sf_context->net_buffer_cfg.min_buff_size, + sf_context->net_buffer_cfg.max_buff_size, task_padding_size, task_arg_size, init_callback, init_arg); } @@ -100,8 +101,9 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, sf_recv_timeout_callback timeout_callback, const int net_timeout_ms, const int proto_header_size, const int task_padding_size, const int task_arg_size, const bool double_buffers, - const bool explicit_post_recv, TaskInitCallback init_callback, - void *init_arg, sf_release_buffer_callback release_buffer_callback) + const bool need_shrink_task_buffer, const bool explicit_post_recv, + TaskInitCallback init_callback, void *init_arg, + sf_release_buffer_callback release_buffer_callback) { int result; int bytes; @@ -131,8 +133,8 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, } if ((result=sf_init_free_queue(sf_context, name, double_buffers, - task_padding_size, task_arg_size, init_callback, - init_arg)) != 0) + need_shrink_task_buffer, task_padding_size, + task_arg_size, init_callback, init_arg)) != 0) { return result; } diff --git a/src/sf_service.h b/src/sf_service.h index eba2c7f..f806806 100644 --- a/src/sf_service.h +++ b/src/sf_service.h @@ -25,6 +25,7 @@ #include "fastcommon/ioevent.h" #include "fastcommon/fast_task_queue.h" #include "sf_types.h" +#include "sf_proto.h" #include "sf_global.h" typedef void* (*sf_alloc_thread_extra_data_callback)(const int thread_index); @@ -46,8 +47,9 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, sf_recv_timeout_callback timeout_callback, const int net_timeout_ms, const int proto_header_size, const int task_padding_size, const int task_arg_size, const bool double_buffers, - const bool explicit_post_recv, TaskInitCallback init_callback, - void *init_arg, sf_release_buffer_callback release_buffer_callback); + const bool need_shrink_task_buffer, const bool explicit_post_recv, + TaskInitCallback init_callback, void *init_arg, + sf_release_buffer_callback release_buffer_callback); #define sf_service_init_ex(sf_context, name, alloc_thread_extra_data_callback,\ thread_loop_callback, accept_done_callback, set_body_length_func, \ @@ -57,7 +59,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, thread_loop_callback, accept_done_callback, set_body_length_func, \ NULL, send_done_callback, deal_func, task_cleanup_func, \ timeout_callback, net_timeout_ms, proto_header_size, \ - 0, task_arg_size, false, false, NULL, NULL, NULL) + 0, task_arg_size, false, true, false, NULL, NULL, NULL) #define sf_service_init(name, alloc_thread_extra_data_callback, \ thread_loop_callback, accept_done_callback, set_body_length_func, \ @@ -66,8 +68,8 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, sf_service_init_ex2(&g_sf_context, name, alloc_thread_extra_data_callback, \ thread_loop_callback, accept_done_callback, set_body_length_func, NULL,\ send_done_callback, deal_func, task_cleanup_func, timeout_callback, \ - net_timeout_ms, proto_header_size, 0, task_arg_size, false, false, \ - NULL, NULL, NULL) + net_timeout_ms, proto_header_size, 0, task_arg_size, false, true, \ + false, NULL, NULL, NULL) int sf_service_destroy_ex(SFContext *sf_context); @@ -162,6 +164,11 @@ static inline struct fast_task_info *sf_alloc_init_task_ex( return NULL; } + if (task->shrinked) { + task->shrinked = false; + sf_proto_init_task_magic(task); + } + __sync_add_and_fetch(&task->reffer_count, reffer_count); __sync_bool_compare_and_swap(&task->canceled, 1, 0); task->handler = handler; From 68079fc4688270d2dc02ed8503102f3ef2c00e47 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Sun, 5 Oct 2025 16:53:21 +0800 Subject: [PATCH 08/13] IOEventCallback: change event type from short to int --- src/sf_nio.c | 38 ++++++++++++++++++++------------------ src/sf_nio.h | 2 +- 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/src/sf_nio.c b/src/sf_nio.c index 291583c..a4d9bd0 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -41,8 +41,8 @@ #include "sf_service.h" #include "sf_nio.h" -static int sf_client_sock_write(int sock, short event, void *arg); -static int sf_client_sock_read(int sock, short event, void *arg); +static int sf_client_sock_write(int sock, const int event, void *arg); +static int sf_client_sock_read(int sock, const int event, void *arg); void sf_set_parameters_ex(SFContext *sf_context, const int header_size, sf_set_body_length_callback set_body_length_func, @@ -67,12 +67,14 @@ void sf_set_parameters_ex(SFContext *sf_context, const int header_size, FC_URING_OP_TYPE(task) = IORING_OP_NOP; \ sf_release_task(task) -static int sf_uring_cancel_done(int sock, short event, void *arg) +static int sf_uring_cancel_done(int sock, const int event, void *arg) { struct fast_task_info *task; task = (struct fast_task_info *)arg; - CLEAR_OP_TYPE_AND_RELEASE_TASK(task); + if (event != IOEVENT_TIMEOUT) { + CLEAR_OP_TYPE_AND_RELEASE_TASK(task); + } return 0; } #endif @@ -269,16 +271,16 @@ static inline int set_read_event(struct fast_task_info *task) int sf_set_read_event(struct fast_task_info *task) { - /* reset recv offset and length */ - task->recv.ptr->offset = 0; - task->recv.ptr->length = 0; - #if IOEVENT_USE_URING if (task->handler->use_io_uring) { return 0; } #endif + /* reset recv offset and length */ + task->recv.ptr->offset = 0; + task->recv.ptr->length = 0; + task->nio_stages.current = SF_NIO_STAGE_RECV; return set_read_event(task); } @@ -323,7 +325,7 @@ int sf_socket_async_connect_check(struct fast_task_info *task) return result; } -static int sf_client_connect_done(int sock, short event, void *arg) +static int sf_client_connect_done(int sock, const int event, void *arg) { int result; struct fast_task_info *task; @@ -339,7 +341,7 @@ static int sf_client_connect_done(int sock, short event, void *arg) return ENOTCONN; } - if (event & IOEVENT_TIMEOUT) { + if (event == IOEVENT_TIMEOUT) { result = ETIMEDOUT; } else { #if IOEVENT_USE_URING @@ -597,7 +599,7 @@ static inline void deal_notified_task(struct fast_task_info *task, } } -void sf_recv_notify_read(int fd, short event, void *arg) +void sf_recv_notify_read(int fd, const int event, void *arg) { int64_t n; int stage; @@ -672,7 +674,7 @@ int sf_send_add_event(struct fast_task_info *task) } static inline int check_task(struct fast_task_info *task, - const short event, const int expect_stage) + const int event, const int expect_stage) { if (task->canceled) { return ENOTCONN; @@ -1160,7 +1162,7 @@ int sf_rdma_busy_polling_callback(struct nio_thread_data *thread_data) return 0; } -static int sf_client_sock_read(int sock, short event, void *arg) +static int sf_client_sock_read(int sock, const int event, void *arg) { int result; int bytes; @@ -1171,14 +1173,14 @@ static int sf_client_sock_read(int sock, short event, void *arg) task = (struct fast_task_info *)arg; if ((result=check_task(task, event, SF_NIO_STAGE_RECV)) != 0) { #if IOEVENT_USE_URING - if (task->handler->use_io_uring && (event != IOEVENT_TIMEOUT)) { + if (task->handler->use_io_uring && event != IOEVENT_TIMEOUT) { CLEAR_OP_TYPE_AND_RELEASE_TASK(task); } #endif return result >= 0 ? 0 : -1; } - if (event & IOEVENT_TIMEOUT) { + if (event == IOEVENT_TIMEOUT) { if (task->recv.ptr->offset == 0 && task->req_count > 0) { if (SF_CTX->callbacks.task_timeout != NULL) { if (SF_CTX->callbacks.task_timeout(task) != 0) { @@ -1262,7 +1264,7 @@ static int sf_client_sock_read(int sock, short event, void *arg) return total_read; } -static int sf_client_sock_write(int sock, short event, void *arg) +static int sf_client_sock_write(int sock, const int event, void *arg) { int result; int bytes; @@ -1276,14 +1278,14 @@ static int sf_client_sock_write(int sock, short event, void *arg) task = (struct fast_task_info *)arg; if ((result=check_task(task, event, SF_NIO_STAGE_SEND)) != 0) { #if IOEVENT_USE_URING - if (task->handler->use_io_uring && (event != IOEVENT_TIMEOUT)) { + if (task->handler->use_io_uring && event != IOEVENT_TIMEOUT) { CLEAR_OP_TYPE_AND_RELEASE_TASK(task); } #endif return result >= 0 ? 0 : -1; } - if (event & IOEVENT_TIMEOUT) { + if (event == IOEVENT_TIMEOUT) { logError("file: "__FILE__", line: %d, " "client ip: %s, send timeout. total length: %d, offset: %d, " "remain: %d", __LINE__, task->client_ip, task->send.ptr->length, diff --git a/src/sf_nio.h b/src/sf_nio.h index 599904e..03e756d 100644 --- a/src/sf_nio.h +++ b/src/sf_nio.h @@ -91,7 +91,7 @@ static inline void sf_nio_reset_task_length(struct fast_task_info *task) } void sf_socket_close_connection(struct fast_task_info *task); -void sf_recv_notify_read(int sock, short event, void *arg); +void sf_recv_notify_read(int sock, const int event, void *arg); int sf_send_add_event(struct fast_task_info *task); void sf_task_finish_clean_up(struct fast_task_info *task); From b16526e8f7adaed52eaa4595ba7c523ea37ec707 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Mon, 6 Oct 2025 20:55:29 +0800 Subject: [PATCH 09/13] bug fixed: check_task process correctly --- src/sf_global.c | 66 ++++++++++++++++++++++++++++++++---------------- src/sf_nio.c | 12 +++++++++ src/sf_service.c | 6 ++--- 3 files changed, 59 insertions(+), 25 deletions(-) diff --git a/src/sf_global.c b/src/sf_global.c index 1473203..2097e0e 100644 --- a/src/sf_global.c +++ b/src/sf_global.c @@ -678,6 +678,7 @@ int sf_load_context_from_config_ex(SFContext *sf_context, int inner_port; int outer_port; int port; + bool global_use_send_zc; bool use_send_zc; int i; int result; @@ -715,8 +716,15 @@ int sf_load_context_from_config_ex(SFContext *sf_context, outer_port = config->default_outer_port; } - use_send_zc = iniGetBoolValue(config->ini_ctx.section_name, - "use_send_zc", config->ini_ctx.context, false); + global_use_send_zc = iniGetBoolValue(NULL, "use_send_zc", + config->ini_ctx.context, false); + if (config->ini_ctx.section_name == NULL) { + use_send_zc = global_use_send_zc; + } else { + use_send_zc = iniGetBoolValue(config->ini_ctx.section_name, + "use_send_zc", config->ini_ctx.context, global_use_send_zc); + } + for (i=0; ihandlers + i; fh->ctx = sf_context; @@ -899,6 +907,29 @@ static const char *get_address_family_caption( } } +static void get_io_uring_configs(const SFContext *sf_context, + bool *use_io_uring, bool *use_send_zc) +{ + int i; + const SFAddressFamilyHandler *fh; + const SFNetworkHandler *handler; + const SFNetworkHandler *end; + + *use_io_uring = false; + *use_send_zc = false; + for (i=0; ihandlers + i; + end = fh->handlers + SF_NETWORK_HANDLER_COUNT; + for (handler=fh->handlers; handlerenabled && handler->use_io_uring) { + *use_io_uring = true; + *use_send_zc = handler->use_send_zc; + return; + } + } + } +} + void sf_context_config_to_string(const SFContext *sf_context, char *output, const int size) { @@ -908,6 +939,10 @@ void sf_context_config_to_string(const SFContext *sf_context, char outer_bind_addr[2 * IP_ADDRESS_SIZE + 2]; int i; int len; +#if IOEVENT_USE_URING + bool use_io_uring; + bool use_send_zc; +#endif *inner_bind_addr = '\0'; *outer_bind_addr = '\0'; @@ -948,6 +983,12 @@ void sf_context_config_to_string(const SFContext *sf_context, ", address_family=%s, accept_threads=%d, work_threads=%d", get_address_family_caption(sf_context->address_family), sf_context->accept_threads, sf_context->work_threads); + +#if IOEVENT_USE_URING + get_io_uring_configs(sf_context, &use_io_uring, &use_send_zc); + len += snprintf(output + len, size - len, ", use_io_uring=%d" + ", use_send_zc=%d", use_io_uring, use_send_zc); +#endif } void sf_log_config_to_string_ex(SFLogConfig *log_cfg, const char *caption, @@ -998,12 +1039,8 @@ void sf_global_config_to_string_ex(const char *max_pkg_size_item_nm, int min_buff_size; int max_buff_size; #if IOEVENT_USE_URING - int i; bool use_io_uring; bool use_send_zc; - SFAddressFamilyHandler *fh; - SFNetworkHandler *handler; - SFNetworkHandler *end; #endif char pkg_buff[256]; @@ -1024,22 +1061,6 @@ void sf_global_config_to_string_ex(const char *max_pkg_size_item_nm, min_buff_size / 1024, max_buff_size / 1024); } -#if IOEVENT_USE_URING - use_io_uring = false; - use_send_zc = false; - for (i=0; ihandlers + SF_NETWORK_HANDLER_COUNT; - for (handler=fh->handlers; handlerenabled && handler->use_io_uring) { - use_io_uring = true; - use_send_zc = handler->use_send_zc; - break; - } - } - } -#endif - len = snprintf(output, size, "base_path=%s, max_connections=%d, connect_timeout=%d, " "network_timeout=%d, thread_stack_size=%d KB, %s, ", @@ -1050,6 +1071,7 @@ void sf_global_config_to_string_ex(const char *max_pkg_size_item_nm, g_sf_global_vars.thread_stack_size / 1024, pkg_buff); #if IOEVENT_USE_URING + get_io_uring_configs(&g_sf_context, &use_io_uring, &use_send_zc); len += snprintf(output + len, size - len, "use_io_uring=%d, use_send_zc=%d, ", use_io_uring, use_send_zc); diff --git a/src/sf_nio.c b/src/sf_nio.c index a4d9bd0..45a4a03 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -694,6 +694,18 @@ static inline int check_task(struct fast_task_info *task, return 0; } +#if IOEVENT_USE_URING + if (task->handler->use_io_uring) { + logWarning("file: "__FILE__", line: %d, " + "client ip: %s, event: %d, expect stage: %d, " + "but current stage: %d, close connection", + __LINE__, task->client_ip, event, + expect_stage, task->nio_stages.current); + ioevent_add_to_deleted_list(task); + return -1; + } +#endif + if (task->handler->comm_type == fc_comm_type_sock) { if (tcp_socket_connected(task->event.fd)) { return EAGAIN; diff --git a/src/sf_service.c b/src/sf_service.c index 26393b7..f8ce26d 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -200,9 +200,9 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, thread_data->arg = NULL; } - if (ioevent_init(&thread_data->ev_puller, 2 + sf_context-> - net_buffer_cfg.max_connections, net_timeout_ms, - extra_events) != 0) + if (ioevent_init(&thread_data->ev_puller, sf_context->name, + 2 + sf_context->net_buffer_cfg.max_connections, + net_timeout_ms, extra_events) != 0) { result = errno != 0 ? errno : ENOMEM; logError("file: "__FILE__", line: %d, " From b688973cf98f46308aabe5dad8fc560010fb9c62 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Thu, 9 Oct 2025 14:01:32 +0800 Subject: [PATCH 10/13] change use_send_zc's default value to true --- src/sf_global.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sf_global.c b/src/sf_global.c index 2097e0e..9f5c174 100644 --- a/src/sf_global.c +++ b/src/sf_global.c @@ -717,7 +717,7 @@ int sf_load_context_from_config_ex(SFContext *sf_context, } global_use_send_zc = iniGetBoolValue(NULL, "use_send_zc", - config->ini_ctx.context, false); + config->ini_ctx.context, true); if (config->ini_ctx.section_name == NULL) { use_send_zc = global_use_send_zc; } else { From 926cd40114e37a3f08d47795aaf2aac092b4591d Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Sun, 12 Oct 2025 10:24:35 +0800 Subject: [PATCH 11/13] ioevent_init: set max entries for io_uring gracefully --- src/sf_global.c | 2 ++ src/sf_service.c | 35 ++++++++++++++++++++++++++++------- 2 files changed, 30 insertions(+), 7 deletions(-) diff --git a/src/sf_global.c b/src/sf_global.c index 9f5c174..f6a3aa1 100644 --- a/src/sf_global.c +++ b/src/sf_global.c @@ -907,6 +907,7 @@ static const char *get_address_family_caption( } } +#if IOEVENT_USE_URING static void get_io_uring_configs(const SFContext *sf_context, bool *use_io_uring, bool *use_send_zc) { @@ -929,6 +930,7 @@ static void get_io_uring_configs(const SFContext *sf_context, } } } +#endif void sf_context_config_to_string(const SFContext *sf_context, char *output, const int size) diff --git a/src/sf_service.c b/src/sf_service.c index f8ce26d..23de1b8 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -108,6 +108,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, int result; int bytes; int extra_events; + int max_entries; int i; struct worker_thread_context *thread_contexts; struct worker_thread_context *thread_ctx; @@ -200,15 +201,35 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, thread_data->arg = NULL; } - if (ioevent_init(&thread_data->ev_puller, sf_context->name, - 2 + sf_context->net_buffer_cfg.max_connections, - net_timeout_ms, extra_events) != 0) +#if IOEVENT_USE_URING + if (sf_context->net_buffer_cfg.max_connections < 16 * 1024) { + max_entries = 2 * sf_context->net_buffer_cfg.max_connections; + } else { + max_entries = sf_context->net_buffer_cfg.max_connections; + } +#else + max_entries = 2 + sf_context->net_buffer_cfg.max_connections; +#endif + if ((result=ioevent_init(&thread_data->ev_puller, sf_context->name, + max_entries, net_timeout_ms, extra_events)) != 0) { - result = errno != 0 ? errno : ENOMEM; + char prompt[256]; +#if IOEVENT_USE_URING + if (result == EPERM) { + strcpy(prompt, " make sure kernel.io_uring_disabled set to 0"); + } else if (result == EINVAL) { + sprintf(prompt, " maybe max_connections: %d is too large", + sf_context->net_buffer_cfg.max_connections); + } else { + *prompt = '\0'; + } +#else + *prompt = '\0'; +#endif + logError("file: "__FILE__", line: %d, " - "ioevent_init fail, " - "errno: %d, error info: %s", - __LINE__, result, strerror(result)); + "ioevent_init fail, errno: %d, error info: %s.%s" + , __LINE__, result, strerror(result), prompt); return result; } From 817ff547da90ecf77c6724ab5614c1c2c0cfb2c1 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Sun, 12 Oct 2025 12:28:11 +0800 Subject: [PATCH 12/13] set alloc_conn_once and max_entries gracefully --- src/sf_service.c | 61 +++++++++++++++++++++++++++++++++++++----------- 1 file changed, 47 insertions(+), 14 deletions(-) diff --git a/src/sf_service.c b/src/sf_service.c index 23de1b8..1e98cdb 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -66,7 +66,9 @@ static int sf_init_free_queue(SFContext *sf_context, const char *name, TaskInitCallback init_callback, void *init_arg) { int result; + int buffer_size; int m; + int max_m; int alloc_conn_once; if ((result=set_rand_seed()) != 0) { @@ -75,11 +77,19 @@ static int sf_init_free_queue(SFContext *sf_context, const char *name, return result; } - m = sf_context->net_buffer_cfg.min_buff_size / (64 * 1024); + if (strcmp(name, "service") == 0) { + buffer_size = sf_context->net_buffer_cfg.min_buff_size; + max_m = 16; + } else { + buffer_size = FC_MAX(4 * 1024 * 1024, sf_context-> + net_buffer_cfg.max_buff_size); + max_m = 64; + } + m = buffer_size / (64 * 1024); if (m == 0) { m = 1; - } else if (m > 16) { - m = 16; + } else if (m > max_m) { + m = max_m; } alloc_conn_once = 256 / m; return free_queue_init_ex2(&sf_context->free_queue, name, double_buffers, @@ -177,6 +187,36 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, extra_events = 0; } + max_entries = (sf_context->net_buffer_cfg.max_connections + + sf_context->work_threads - 1) / sf_context->work_threads; + if (strcmp(sf_context->name, "service") == 0) { + if (max_entries < 4 * 1024) { + max_entries = max_entries * 2; + } else if (max_entries < 8 * 1024) { + max_entries = (max_entries * 3) / 2; + } else if (max_entries < 16 * 1024) { + max_entries = (max_entries * 5) / 4; + } else if (max_entries < 32 * 1024) { + max_entries = (max_entries * 6) / 5; +#if IOEVENT_USE_URING + if (max_entries > 32 * 1024) { + max_entries = 32 * 1024; + } +#else + } else if (max_entries < 64 * 1024) { + max_entries = (max_entries * 11) / 10; + } else if (max_entries < 128 * 1024) { + max_entries = (max_entries * 21) / 20; +#endif + } + } else { + if (max_entries < 1024) { + max_entries += 8; + } else { + max_entries = 1024; + } + } + g_current_time = time(NULL); sf_context->thread_count = 0; data_end = sf_context->thread_data + sf_context->work_threads; @@ -201,15 +241,6 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, thread_data->arg = NULL; } -#if IOEVENT_USE_URING - if (sf_context->net_buffer_cfg.max_connections < 16 * 1024) { - max_entries = 2 * sf_context->net_buffer_cfg.max_connections; - } else { - max_entries = sf_context->net_buffer_cfg.max_connections; - } -#else - max_entries = 2 + sf_context->net_buffer_cfg.max_connections; -#endif if ((result=ioevent_init(&thread_data->ev_puller, sf_context->name, max_entries, net_timeout_ms, extra_events)) != 0) { @@ -218,8 +249,10 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, if (result == EPERM) { strcpy(prompt, " make sure kernel.io_uring_disabled set to 0"); } else if (result == EINVAL) { - sprintf(prompt, " maybe max_connections: %d is too large", - sf_context->net_buffer_cfg.max_connections); + sprintf(prompt, " maybe max_connections: %d is too large" + " or [%s]'s work_threads: %d is too small", + sf_context->net_buffer_cfg.max_connections, + sf_context->name, sf_context->work_threads); } else { *prompt = '\0'; } From 932751d3926f43f54622b19af79b38bc47baa4b2 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Mon, 20 Oct 2025 10:34:47 +0800 Subject: [PATCH 13/13] send zc done notify callback for recycling buffer --- src/sf_nio.c | 128 ++++++++++++++++++++++++++++++++++------------- src/sf_service.c | 15 ++++-- 2 files changed, 105 insertions(+), 38 deletions(-) diff --git a/src/sf_nio.c b/src/sf_nio.c index 45a4a03..e92f190 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -809,13 +809,25 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, task->send.ptr->offset += bytes; if (task->send.ptr->offset >= task->send.ptr->length) { - if (task->send.ptr != task->recv.ptr) { //double buffers - task->send.ptr->offset = 0; - task->send.ptr->length = 0; - } +#if IOEVENT_USE_URING + if (FC_URING_IS_SEND_ZC(task) && task->thread_data-> + ev_puller.send_zc_done_notify) + { + *action = sf_comm_action_break; + *send_done = false; + } else { +#endif + if (task->send.ptr != task->recv.ptr) { //double buffers + task->send.ptr->offset = 0; + task->send.ptr->length = 0; + } + + *action = sf_comm_action_finish; + *send_done = true; +#if IOEVENT_USE_URING + } +#endif - *action = sf_comm_action_finish; - *send_done = true; } else { /* set next writev iovec array */ if (task->iovec_array.iovs != NULL) { @@ -848,8 +860,12 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, #if IOEVENT_USE_URING if (task->handler->use_io_uring) { - if (prepare_next_send(task) != 0) { - return -1; + if (!(FC_URING_IS_SEND_ZC(task) && task->thread_data-> + ev_puller.send_zc_done_notify)) + { + if (prepare_next_send(task) != 0) { + return -1; + } } *action = sf_comm_action_break; } else { @@ -1276,13 +1292,44 @@ static int sf_client_sock_read(int sock, const int event, void *arg) return total_read; } +static int sock_write_done(struct fast_task_info *task, + const int length, const bool send_done) +{ + int next_stage; + + release_iovec_buffer(task); + task->recv.ptr->offset = 0; + task->recv.ptr->length = 0; + + if (SF_CTX->callbacks.send_done == NULL || !send_done) { + task->nio_stages.current = SF_NIO_STAGE_RECV; + } else { + if (SF_CTX->callbacks.send_done(task, + length, &next_stage) != 0) + { + return -1; + } + + if (task->nio_stages.current != next_stage) { + task->nio_stages.current = next_stage; + } + } + + if (task->nio_stages.current == SF_NIO_STAGE_RECV) { + if (set_read_event(task) != 0) { + return -1; + } + } + + return 0; +} + static int sf_client_sock_write(int sock, const int event, void *arg) { int result; int bytes; int total_write; int length; - int next_stage; SFCommAction action; bool send_done; struct fast_task_info *task; @@ -1291,7 +1338,11 @@ static int sf_client_sock_write(int sock, const int event, void *arg) if ((result=check_task(task, event, SF_NIO_STAGE_SEND)) != 0) { #if IOEVENT_USE_URING if (task->handler->use_io_uring && event != IOEVENT_TIMEOUT) { - CLEAR_OP_TYPE_AND_RELEASE_TASK(task); + if (event == IOEVENT_NOTIFY || !(FC_URING_IS_SEND_ZC(task) && + task->thread_data->ev_puller.send_zc_done_notify)) + { + CLEAR_OP_TYPE_AND_RELEASE_TASK(task); + } } #endif return result >= 0 ? 0 : -1; @@ -1309,8 +1360,38 @@ static int sf_client_sock_write(int sock, const int event, void *arg) } #if IOEVENT_USE_URING + if (event == IOEVENT_NOTIFY) { + if (!FC_URING_IS_SEND_ZC(task)) { + logWarning("file: "__FILE__", line: %d, " + "unexpected io_uring notify!", __LINE__); + return -1; + } + + FC_URING_OP_TYPE(task) = IORING_OP_NOP; + if (!task->canceled) { + if (task->send.ptr->offset >= task->send.ptr->length) { + length = task->send.ptr->length; + if (task->send.ptr != task->recv.ptr) { //double buffers + task->send.ptr->offset = 0; + task->send.ptr->length = 0; + } + + result = sock_write_done(task, length, true); + } else { + result = prepare_next_send(task); + } + } + + sf_release_task(task); + return result == 0 ? 0 : -1; + } + if (task->handler->use_io_uring) { - CLEAR_OP_TYPE_AND_RELEASE_TASK(task); + if (!(FC_URING_IS_SEND_ZC(task) && task->thread_data-> + ev_puller.send_zc_done_notify)) + { + CLEAR_OP_TYPE_AND_RELEASE_TASK(task); + } } #endif @@ -1329,30 +1410,9 @@ static int sf_client_sock_write(int sock, const int event, void *arg) total_write += bytes; if (action == sf_comm_action_finish) { - release_iovec_buffer(task); - task->recv.ptr->offset = 0; - task->recv.ptr->length = 0; - - if (SF_CTX->callbacks.send_done == NULL || !send_done) { - task->nio_stages.current = SF_NIO_STAGE_RECV; - } else { - if (SF_CTX->callbacks.send_done(task, - length, &next_stage) != 0) - { - return -1; - } - - if (task->nio_stages.current != next_stage) { - task->nio_stages.current = next_stage; - } + if (sock_write_done(task, length, send_done) != 0) { + return -1; } - - if (task->nio_stages.current == SF_NIO_STAGE_RECV) { - if (set_read_event(task) != 0) { - return -1; - } - } - break; } else if (action == sf_comm_action_break) { break; diff --git a/src/sf_service.c b/src/sf_service.c index 1e98cdb..4aa7e81 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -77,13 +77,13 @@ static int sf_init_free_queue(SFContext *sf_context, const char *name, return result; } - if (strcmp(name, "service") == 0) { - buffer_size = sf_context->net_buffer_cfg.min_buff_size; - max_m = 16; - } else { + if (strcmp(name, "cluster") == 0 || strcmp(name, "replica") == 0) { buffer_size = FC_MAX(4 * 1024 * 1024, sf_context-> net_buffer_cfg.max_buff_size); max_m = 64; + } else { + buffer_size = sf_context->net_buffer_cfg.min_buff_size; + max_m = 16; } m = buffer_size / (64 * 1024); if (m == 0) { @@ -92,6 +92,7 @@ static int sf_init_free_queue(SFContext *sf_context, const char *name, m = max_m; } alloc_conn_once = 256 / m; + return free_queue_init_ex2(&sf_context->free_queue, name, double_buffers, need_shrink_task_buffer, sf_context->net_buffer_cfg.max_connections, alloc_conn_once, sf_context->net_buffer_cfg.min_buff_size, @@ -266,6 +267,12 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, return result; } +#if IOEVENT_USE_URING + if (send_done_callback != NULL) { + ioevent_set_send_zc_done_notify(&thread_data->ev_puller, true); + } +#endif + result = fast_timer_init(&thread_data->timer, 2 * sf_context-> net_buffer_cfg.network_timeout, g_current_time); if (result != 0) {