From fa329720526f706bd5977c4ac6cc8103522d577e Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Tue, 4 Nov 2025 15:40:00 +0800 Subject: [PATCH] move use_io_uring and use_send_zc to struct sf_context --- src/idempotency/client/client_channel.c | 4 +- src/sf_global.c | 77 ++++++--------------- src/sf_nio.c | 89 ++++++++++++------------- src/sf_service.c | 6 +- src/sf_service.h | 3 +- src/sf_types.h | 8 ++- 6 files changed, 74 insertions(+), 113 deletions(-) diff --git a/src/idempotency/client/client_channel.c b/src/idempotency/client/client_channel.c index c5f9225..bd79657 100644 --- a/src/idempotency/client/client_channel.c +++ b/src/idempotency/client/client_channel.c @@ -217,7 +217,9 @@ int idempotency_client_channel_check_reconnect( char formatted_ip[FORMATTED_IP_SIZE]; #if IOEVENT_USE_URING - if (FC_ATOMIC_GET(channel->task->reffer_count) > 1) { + struct fast_task_info *task; + task = channel->task; + if (SF_CTX->use_io_uring && FC_ATOMIC_GET(task->reffer_count) > 1) { return 0; } #endif diff --git a/src/sf_global.c b/src/sf_global.c index f6a3aa1..4b4d1dc 100644 --- a/src/sf_global.c +++ b/src/sf_global.c @@ -47,9 +47,13 @@ SFGlobalVariables g_sf_global_vars = { {0, 0}, NULL, {NULL, 0} }; -SFContext g_sf_context = {{'\0'}, NULL, 0, false, sf_address_family_auto, - {{AF_UNSPEC, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}}, - {AF_UNSPEC, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}}}, +SFContext g_sf_context = {{'\0'}, NULL, 0, false, +#if IOEVENT_USE_URING + false, false, +#endif + sf_address_family_auto, {{AF_UNSPEC, {{true, fc_comm_type_sock}, + {false, fc_comm_type_rdma}}}, + {AF_UNSPEC, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}}}, {DEFAULT_MAX_CONNECTONS, SF_DEF_MAX_PACKAGE_SIZE, SF_DEF_MIN_BUFF_SIZE, SF_DEF_MAX_BUFF_SIZE}, 1, DEFAULT_WORK_THREADS, 0, true, true, {false, 0, 0}, {sf_task_finish_clean_up} @@ -477,8 +481,7 @@ static int load_rdma_apis(SFContext *sf_context, SFNetworkHandler *handler) } static int init_network_handler(SFContext *sf_context, - SFNetworkHandler *handler, SFAddressFamilyHandler *fh, - const bool use_send_zc) + SFNetworkHandler *handler, SFAddressFamilyHandler *fh) { handler->fh = fh; handler->inner.handler = handler; @@ -499,18 +502,10 @@ static int init_network_handler(SFContext *sf_context, handler->send_data = sf_socket_send_data; handler->recv_data = sf_socket_recv_data; handler->post_recv = NULL; -#if IOEVENT_USE_URING - handler->use_io_uring = true; - handler->use_send_zc = use_send_zc; -#else - handler->use_io_uring = false; - handler->use_send_zc = false; -#endif return 0; } else { handler->inner.id = NULL; handler->outer.id = NULL; - handler->use_io_uring = false; return load_rdma_apis(sf_context, handler); } } @@ -748,9 +743,7 @@ int sf_load_context_from_config_ex(SFContext *sf_context, if (!handler->enabled) { continue; } - if ((result=init_network_handler(sf_context, handler, - fh, use_send_zc)) != 0) - { + if ((result=init_network_handler(sf_context, handler, fh)) != 0) { return result; } } @@ -769,9 +762,13 @@ int sf_load_context_from_config_ex(SFContext *sf_context, rdma_handler->inner.enabled = sock_handler->inner.enabled; rdma_handler->outer.port = sock_handler->outer.port; rdma_handler->outer.enabled = sock_handler->outer.enabled; - } +#if IOEVENT_USE_URING + sf_context->use_io_uring = (config->comm_type == fc_comm_type_sock); + sf_context->use_send_zc = use_send_zc; +#endif + sf_context->accept_threads = iniGetIntValue( config->ini_ctx.section_name, "accept_threads", config->ini_ctx.context, 1); @@ -907,31 +904,6 @@ static const char *get_address_family_caption( } } -#if IOEVENT_USE_URING -static void get_io_uring_configs(const SFContext *sf_context, - bool *use_io_uring, bool *use_send_zc) -{ - int i; - const SFAddressFamilyHandler *fh; - const SFNetworkHandler *handler; - const SFNetworkHandler *end; - - *use_io_uring = false; - *use_send_zc = false; - for (i=0; ihandlers + i; - end = fh->handlers + SF_NETWORK_HANDLER_COUNT; - for (handler=fh->handlers; handlerenabled && handler->use_io_uring) { - *use_io_uring = true; - *use_send_zc = handler->use_send_zc; - return; - } - } - } -} -#endif - void sf_context_config_to_string(const SFContext *sf_context, char *output, const int size) { @@ -941,10 +913,6 @@ void sf_context_config_to_string(const SFContext *sf_context, char outer_bind_addr[2 * IP_ADDRESS_SIZE + 2]; int i; int len; -#if IOEVENT_USE_URING - bool use_io_uring; - bool use_send_zc; -#endif *inner_bind_addr = '\0'; *outer_bind_addr = '\0'; @@ -987,9 +955,9 @@ void sf_context_config_to_string(const SFContext *sf_context, sf_context->accept_threads, sf_context->work_threads); #if IOEVENT_USE_URING - get_io_uring_configs(sf_context, &use_io_uring, &use_send_zc); - len += snprintf(output + len, size - len, ", use_io_uring=%d" - ", use_send_zc=%d", use_io_uring, use_send_zc); + len += snprintf(output + len, size - len, ", use_io_uring=%d, " + "use_send_zc=%d", sf_context->use_io_uring, + sf_context->use_send_zc); #endif } @@ -1040,10 +1008,6 @@ void sf_global_config_to_string_ex(const char *max_pkg_size_item_nm, int max_pkg_size; int min_buff_size; int max_buff_size; -#if IOEVENT_USE_URING - bool use_io_uring; - bool use_send_zc; -#endif char pkg_buff[256]; max_pkg_size = g_sf_global_vars.net_buffer_cfg.max_pkg_size - @@ -1073,10 +1037,9 @@ void sf_global_config_to_string_ex(const char *max_pkg_size_item_nm, g_sf_global_vars.thread_stack_size / 1024, pkg_buff); #if IOEVENT_USE_URING - get_io_uring_configs(&g_sf_context, &use_io_uring, &use_send_zc); - len += snprintf(output + len, size - len, - "use_io_uring=%d, use_send_zc=%d, ", - use_io_uring, use_send_zc); + len += snprintf(output + len, size - len, "use_io_uring=%d, " + "use_send_zc=%d, ", g_sf_context.use_io_uring, + g_sf_context.use_send_zc); #endif len += snprintf(output + len, size - len, diff --git a/src/sf_nio.c b/src/sf_nio.c index 14d3311..abe7bb1 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -73,11 +73,7 @@ static int sf_uring_cancel_done(int sock, const int event, void *arg) task = (struct fast_task_info *)arg; if (event != IOEVENT_TIMEOUT) { - if (task->handler->use_io_uring || (FC_URING_OP_TYPE(task) != - IORING_OP_NOP && task->event.res == -ECANCELED)) - { - CLEAR_OP_TYPE_AND_RELEASE_TASK(task); - } + CLEAR_OP_TYPE_AND_RELEASE_TASK(task); } return 0; } @@ -86,18 +82,16 @@ static int sf_uring_cancel_done(int sock, const int event, void *arg) void sf_task_detach_thread(struct fast_task_info *task) { #if IOEVENT_USE_URING - bool need_cancel; - if (task->handler->use_io_uring) { - need_cancel = (FC_URING_OP_TYPE(task) != IORING_OP_NOP); + if (SF_CTX->use_io_uring) { + if (FC_URING_OP_TYPE(task) != IORING_OP_NOP) { + task->event.callback = (IOEventCallback)sf_uring_cancel_done; + uring_prep_cancel(task); + } } else { - need_cancel = true; +#endif + ioevent_detach(&task->thread_data->ev_puller, task->event.fd); +#if IOEVENT_USE_URING } - if (need_cancel) { - task->event.callback = (IOEventCallback)sf_uring_cancel_done; - uring_prep_cancel(task); - } -#else - ioevent_detach(&task->thread_data->ev_puller, task->event.fd); #endif if (task->event.timer.expires > 0) { @@ -128,7 +122,7 @@ static inline void release_iovec_buffer(struct fast_task_info *task) void sf_socket_close_connection(struct fast_task_info *task) { #if IOEVENT_USE_URING - if (task->handler->use_io_uring) { + if (SF_CTX->use_io_uring) { if (uring_prep_close_fd(task) != 0) { close(task->event.fd); } @@ -152,7 +146,7 @@ void sf_task_finish_clean_up(struct fast_task_info *task) sf_task_detach_thread(task); #if IOEVENT_USE_URING - if (!task->handler->use_io_uring) { + if (!SF_CTX->use_io_uring) { #endif task->handler->close_connection(task); __sync_fetch_and_sub(&g_sf_global_vars. @@ -225,7 +219,7 @@ static inline int set_read_event(struct fast_task_info *task) int result; #if IOEVENT_USE_URING - if (task->handler->use_io_uring) { + if (SF_CTX->use_io_uring) { if (FC_URING_OP_TYPE(task) != IORING_OP_NOP) { if (FC_URING_OP_TYPE(task) == IORING_OP_RECV) { logWarning("file: "__FILE__", line: %d, " @@ -276,7 +270,7 @@ static inline int set_read_event(struct fast_task_info *task) int sf_set_read_event(struct fast_task_info *task) { #if IOEVENT_USE_URING - if (task->handler->use_io_uring) { + if (SF_CTX->use_io_uring) { return 0; } #endif @@ -295,8 +289,7 @@ static inline int sf_ioevent_add(struct fast_task_info *task) result = ioevent_set(task, task->thread_data, task->event.fd, IOEVENT_READ, (IOEventCallback)sf_client_sock_read, - SF_CTX->net_buffer_cfg.network_timeout, - task->handler->use_io_uring); + SF_CTX->net_buffer_cfg.network_timeout); return result > 0 ? -1 * result : result; } @@ -338,7 +331,7 @@ static int sf_client_connect_done(int sock, const int event, void *arg) task = (struct fast_task_info *)arg; if (task->canceled) { #if IOEVENT_USE_URING - if (task->handler->use_io_uring && event != IOEVENT_TIMEOUT) { + if (SF_CTX->use_io_uring && event != IOEVENT_TIMEOUT) { CLEAR_OP_TYPE_AND_RELEASE_TASK(task); } #endif @@ -349,7 +342,7 @@ static int sf_client_connect_done(int sock, const int event, void *arg) result = ETIMEDOUT; } else { #if IOEVENT_USE_URING - if (task->handler->use_io_uring) { + if (SF_CTX->use_io_uring) { CLEAR_OP_TYPE_AND_RELEASE_TASK(task); result = (task->event.res < 0 ? -1 * task->event.res : task->event.res); @@ -394,7 +387,7 @@ int sf_socket_async_connect_server(struct fast_task_info *task) int result; #if IOEVENT_USE_URING - if (task->handler->use_io_uring) { + if (SF_CTX->use_io_uring) { if ((result=uring_prep_connect(task)) != 0) { return result; } @@ -423,7 +416,7 @@ static int sf_async_connect_server(struct fast_task_info *task) result = ioevent_set(task, task->thread_data, task->event.fd, IOEVENT_READ | IOEVENT_WRITE, (IOEventCallback) sf_client_connect_done, SF_CTX->net_buffer_cfg. - connect_timeout, task->handler->use_io_uring); + connect_timeout); return result > 0 ? -1 * result : result; } else { if (SF_CTX->callbacks.connect_done != NULL) { @@ -472,7 +465,7 @@ static int sf_nio_deal_task(struct fast_task_info *task, const int stage) case SF_NIO_STAGE_RECV: task->nio_stages.current = SF_NIO_STAGE_RECV; if ((result=set_read_event(task)) == 0) { - if (!task->handler->use_io_uring) { + if (!SF_CTX->use_io_uring) { if (sf_client_sock_read(task->event.fd, IOEVENT_READ, task) < 0) { @@ -655,11 +648,11 @@ int sf_send_add_event(struct fast_task_info *task) /* direct send */ task->nio_stages.current = SF_NIO_STAGE_SEND; #if IOEVENT_USE_URING - if (task->handler->use_io_uring) { + if (SF_CTX->use_io_uring) { if (task->event.callback != (IOEventCallback)sf_client_sock_write) { task->event.callback = (IOEventCallback)sf_client_sock_write; } - if (task->handler->use_send_zc) { + if (SF_CTX->use_send_zc) { return uring_prep_first_send_zc(task); } else { return uring_prep_first_send(task); @@ -699,7 +692,7 @@ static inline int check_task(struct fast_task_info *task, } #if IOEVENT_USE_URING - if (task->handler->use_io_uring) { + if (SF_CTX->use_io_uring) { logWarning("file: "__FILE__", line: %d, " "client ip: %s, event: %d, expect stage: %d, " "but current stage: %d, close connection", @@ -729,7 +722,7 @@ static inline int check_task(struct fast_task_info *task, #if IOEVENT_USE_URING static inline int prepare_next_send(struct fast_task_info *task) { - if (task->handler->use_send_zc) { + if (SF_CTX->use_send_zc) { return uring_prep_next_send_zc(task); } else { return uring_prep_next_send(task); @@ -744,7 +737,7 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, int result; #if IOEVENT_USE_URING - if (task->handler->use_io_uring) { + if (SF_CTX->use_io_uring) { bytes = task->event.res; } else { #endif @@ -762,7 +755,7 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, if (bytes < 0) { #if IOEVENT_USE_URING - if (task->handler->use_io_uring) { + if (SF_CTX->use_io_uring) { result = -bytes; } else { #endif @@ -772,7 +765,7 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, #endif if (result == EAGAIN || result == EWOULDBLOCK) { #if IOEVENT_USE_URING - if (task->handler->use_io_uring) { + if (SF_CTX->use_io_uring) { if (prepare_next_send(task) != 0) { return -1; } @@ -787,7 +780,7 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, *action = sf_comm_action_break; return 0; - } else if (result == EINTR && !task->handler->use_io_uring) { + } else if (result == EINTR && !SF_CTX->use_io_uring) { /* should try again */ logDebug("file: "__FILE__", line: %d, " "client ip: %s, ignore interupt signal", @@ -814,8 +807,8 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, task->send.ptr->offset += bytes; if (task->send.ptr->offset >= task->send.ptr->length) { #if IOEVENT_USE_URING - if (FC_URING_IS_SEND_ZC(task) && task->thread_data-> - ev_puller.send_zc_done_notify) + if (SF_CTX->use_io_uring && FC_URING_IS_SEND_ZC(task) && + task->thread_data->ev_puller.send_zc_done_notify) { *action = sf_comm_action_break; *send_done = false; @@ -863,7 +856,7 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, } #if IOEVENT_USE_URING - if (task->handler->use_io_uring) { + if (SF_CTX->use_io_uring) { if (!(FC_URING_IS_SEND_ZC(task) && task->thread_data-> ev_puller.send_zc_done_notify)) { @@ -894,7 +887,7 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, bool new_alloc; #if IOEVENT_USE_URING - if (task->handler->use_io_uring) { + if (SF_CTX->use_io_uring) { bytes = task->event.res; } else { #endif @@ -919,7 +912,7 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, if (bytes < 0) { #if IOEVENT_USE_URING - if (task->handler->use_io_uring) { + if (SF_CTX->use_io_uring) { result = -bytes; } else { #endif @@ -929,7 +922,7 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, #endif if (result == EAGAIN || result == EWOULDBLOCK) { #if IOEVENT_USE_URING - if (task->handler->use_io_uring) { + if (SF_CTX->use_io_uring) { if (prepare_next_recv(task) != 0) { return -1; } @@ -937,7 +930,7 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, #endif *action = sf_comm_action_break; return 0; - } else if (result == EINTR && !task->handler->use_io_uring) { + } else if (result == EINTR && !SF_CTX->use_io_uring) { /* should try again */ logDebug("file: "__FILE__", line: %d, " "client ip: %s, ignore interupt signal", @@ -982,7 +975,7 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, if (task->recv.ptr->length == 0) { //pkg header if (task->recv.ptr->offset < SF_CTX->header_size) { #if IOEVENT_USE_URING - if (task->handler->use_io_uring) { + if (SF_CTX->use_io_uring) { if (prepare_next_recv(task) != 0) { return -1; } @@ -1046,7 +1039,7 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, *action = sf_comm_action_finish; } else { #if IOEVENT_USE_URING - if (task->handler->use_io_uring) { + if (SF_CTX->use_io_uring) { if (prepare_next_recv(task) != 0) { return -1; } @@ -1205,7 +1198,7 @@ static int sf_client_sock_read(int sock, const int event, void *arg) task = (struct fast_task_info *)arg; if ((result=check_task(task, event, SF_NIO_STAGE_RECV)) != 0) { #if IOEVENT_USE_URING - if (task->handler->use_io_uring && event != IOEVENT_TIMEOUT) { + if (SF_CTX->use_io_uring && event != IOEVENT_TIMEOUT) { CLEAR_OP_TYPE_AND_RELEASE_TASK(task); } #endif @@ -1245,7 +1238,7 @@ static int sf_client_sock_read(int sock, const int event, void *arg) } #if IOEVENT_USE_URING - if (task->handler->use_io_uring) { + if (SF_CTX->use_io_uring) { CLEAR_OP_TYPE_AND_RELEASE_TASK(task); } #endif @@ -1320,7 +1313,7 @@ static int sock_write_done(struct fast_task_info *task, } #if IOEVENT_USE_URING - if (!task->handler->use_io_uring || task->nio_stages. + if (!SF_CTX->use_io_uring || task->nio_stages. current == SF_NIO_STAGE_RECV) { #endif @@ -1349,7 +1342,7 @@ static int sf_client_sock_write(int sock, const int event, void *arg) task = (struct fast_task_info *)arg; if ((result=check_task(task, event, SF_NIO_STAGE_SEND)) != 0) { #if IOEVENT_USE_URING - if (task->handler->use_io_uring && event != IOEVENT_TIMEOUT) { + if (SF_CTX->use_io_uring && event != IOEVENT_TIMEOUT) { if (event == IOEVENT_NOTIFY || !(FC_URING_IS_SEND_ZC(task) && task->thread_data->ev_puller.send_zc_done_notify)) { @@ -1398,7 +1391,7 @@ static int sf_client_sock_write(int sock, const int event, void *arg) return result == 0 ? 0 : -1; } - if (task->handler->use_io_uring) { + if (SF_CTX->use_io_uring) { if (!(FC_URING_IS_SEND_ZC(task) && task->thread_data-> ev_puller.send_zc_done_notify)) { diff --git a/src/sf_service.c b/src/sf_service.c index 7907aef..2abaff3 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -33,7 +33,6 @@ #include "fastcommon/sched_thread.h" #include "fastcommon/ioevent_loop.h" #include "fastcommon/fc_memory.h" -#include "sf_nio.h" #include "sf_proto.h" #include "sf_util.h" #include "sf_service.h" @@ -242,8 +241,9 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, thread_data->arg = NULL; } - if ((result=ioevent_init(&thread_data->ev_puller, sf_context->name, - max_entries, net_timeout_ms, extra_events)) != 0) + if ((result=ioevent_init(&thread_data->ev_puller, sf_context-> + name, sf_context->use_io_uring, max_entries, + net_timeout_ms, extra_events)) != 0) { char prompt[256]; #if IOEVENT_USE_URING diff --git a/src/sf_service.h b/src/sf_service.h index f806806..2c51e09 100644 --- a/src/sf_service.h +++ b/src/sf_service.h @@ -27,6 +27,7 @@ #include "sf_types.h" #include "sf_proto.h" #include "sf_global.h" +#include "sf_nio.h" typedef void* (*sf_alloc_thread_extra_data_callback)(const int thread_index); typedef void (*sf_sig_quit_handler)(int sig); @@ -225,7 +226,7 @@ static inline void sf_release_task(struct fast_task_info *task) */ #if IOEVENT_USE_URING - if (task->handler->use_io_uring) { + if (SF_CTX->use_io_uring) { task->handler->close_connection(task); __sync_fetch_and_sub(&g_sf_global_vars. connection_stat.current_count, 1); diff --git a/src/sf_types.h b/src/sf_types.h index 4627cad..35bfaf3 100644 --- a/src/sf_types.h +++ b/src/sf_types.h @@ -119,8 +119,6 @@ struct sf_address_family_handler; typedef struct sf_network_handler { bool enabled; bool explicit_post_recv; - bool use_io_uring; //since v1.2.9 - bool use_send_zc; //since v1.2.9 FCCommunicationType comm_type; struct sf_address_family_handler *fh; struct ibv_pd *pd; @@ -182,7 +180,11 @@ typedef struct sf_context { struct nio_thread_data *thread_data; volatile int thread_count; - bool is_client; //since v1.2.5 + bool is_client; //since v1.2.5 +#if IOEVENT_USE_URING + bool use_io_uring; //since v1.2.9 + bool use_send_zc; //since v1.2.9 +#endif SFAddressFamily address_family; SFAddressFamilyHandler handlers[SF_ADDRESS_FAMILY_COUNT];