From a2ab8a0c01431a547c2bfc45165e3f3b81231f1b Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Sat, 27 Sep 2025 15:41:56 +0800 Subject: [PATCH] adapt Linux io_uring OK --- src/sf_global.c | 6 +- src/sf_nio.c | 195 ++++++++++++++++++++++++++++++----------------- src/sf_nio.h | 1 + src/sf_service.c | 7 -- src/sf_service.h | 17 +++-- 5 files changed, 143 insertions(+), 83 deletions(-) diff --git a/src/sf_global.c b/src/sf_global.c index b6fd0d0..1473203 100644 --- a/src/sf_global.c +++ b/src/sf_global.c @@ -993,19 +993,19 @@ void sf_slow_log_config_to_string(SFSlowLogConfig *slow_log_cfg, void sf_global_config_to_string_ex(const char *max_pkg_size_item_nm, char *output, const int size) { - int i; int len; int max_pkg_size; int min_buff_size; int max_buff_size; #if IOEVENT_USE_URING + int i; bool use_io_uring; bool use_send_zc; -#endif - char pkg_buff[256]; SFAddressFamilyHandler *fh; SFNetworkHandler *handler; SFNetworkHandler *end; +#endif + char pkg_buff[256]; max_pkg_size = g_sf_global_vars.net_buffer_cfg.max_pkg_size - g_sf_global_vars.task_buffer_extra_size; diff --git a/src/sf_nio.c b/src/sf_nio.c index a87d248..3c3bba1 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -62,9 +62,37 @@ void sf_set_parameters_ex(SFContext *sf_context, const int header_size, sf_context->callbacks.release_buffer = release_buffer_callback; } +#if IOEVENT_USE_URING +#define CLEAR_OP_TYPE_AND_RELEASE_TASK(task) \ + FC_URING_OP_TYPE(task) = IORING_OP_NOP; \ + sf_release_task(task) + +static int sf_uring_cancel_done(int sock, short event, void *arg) +{ + struct fast_task_info *task; + + task = (struct fast_task_info *)arg; + CLEAR_OP_TYPE_AND_RELEASE_TASK(task); + return 0; +} +#endif + void sf_task_detach_thread(struct fast_task_info *task) { +#if IOEVENT_USE_URING + bool need_cancel; + if (task->handler->use_io_uring) { + need_cancel = (FC_URING_OP_TYPE(task) != IORING_OP_NOP); + } else { + need_cancel = true; + } + if (need_cancel) { + task->event.callback = (IOEventCallback)sf_uring_cancel_done; + uring_prep_cancel(task); + } +#else ioevent_detach(&task->thread_data->ev_puller, task->event.fd); +#endif if (task->event.timer.expires > 0) { fast_timer_remove(&task->thread_data->timer, @@ -91,6 +119,22 @@ static inline void release_iovec_buffer(struct fast_task_info *task) } } +void sf_socket_close_connection(struct fast_task_info *task) +{ +#if IOEVENT_USE_URING + if (task->handler->use_io_uring) { + if (uring_prep_close_fd(task) != 0) { + close(task->event.fd); + } + } else { +#endif + close(task->event.fd); +#if IOEVENT_USE_URING + } +#endif + task->event.fd = -1; +} + void sf_task_finish_clean_up(struct fast_task_info *task) { if (task->finish_callback != NULL) { @@ -100,15 +144,18 @@ void sf_task_finish_clean_up(struct fast_task_info *task) release_iovec_buffer(task); sf_task_detach_thread(task); - task->handler->close_connection(task); - if (task->handler->use_io_uring) { #if IOEVENT_USE_URING - FC_URING_OP_TYPE(task) = IORING_OP_NOP; + if (!task->handler->use_io_uring) { +#endif + task->handler->close_connection(task); + __sync_fetch_and_sub(&g_sf_global_vars. + connection_stat.current_count, 1); + +#if IOEVENT_USE_URING + } #endif - } - __sync_fetch_and_sub(&g_sf_global_vars.connection_stat.current_count, 1); sf_release_task(task); } @@ -134,25 +181,19 @@ static inline int set_write_event(struct fast_task_info *task) return 0; } +#if IOEVENT_USE_URING static inline int prepare_first_recv(struct fast_task_info *task) { -#if IOEVENT_USE_URING if (SF_CTX->callbacks.alloc_recv_buffer != NULL) { return uring_prep_recv_data(task, task->recv.ptr->data, SF_CTX->header_size); } else { return uring_prep_first_recv(task); } -#else - logError("file: "__FILE__", line: %d, " - "some mistakes happen!", __LINE__); - return EBUSY; -#endif } static inline int prepare_next_recv(struct fast_task_info *task) { -#if IOEVENT_USE_URING int recv_bytes; if (task->recv.ptr->length == 0) { //recv header @@ -170,41 +211,35 @@ static inline int prepare_next_recv(struct fast_task_info *task) header_size), recv_bytes); } } -#else - logError("file: "__FILE__", line: %d, " - "some mistakes happen!", __LINE__); - return EBUSY; -#endif } +#endif static inline int set_read_event(struct fast_task_info *task) { int result; - if (task->handler->use_io_uring) { #if IOEVENT_USE_URING + if (task->handler->use_io_uring) { if (FC_URING_OP_TYPE(task) == IORING_OP_RECV) { logWarning("file: "__FILE__", line: %d, " "trigger recv again!", __LINE__); return 0; } + if (task->event.callback != (IOEventCallback)sf_client_sock_read) { + task->event.callback = (IOEventCallback)sf_client_sock_read; + } if ((result=prepare_first_recv(task)) != 0) { ioevent_add_to_deleted_list(task); return result; } - if (task->event.callback != (IOEventCallback)sf_client_sock_read) { - task->event.callback = (IOEventCallback)sf_client_sock_read; - } -#else - logError("file: "__FILE__", line: %d, " - "some mistakes happen!", __LINE__); - return EBUSY; -#endif } else { +#endif + if (task->event.callback == (IOEventCallback)sf_client_sock_read) { return 0; } + task->event.callback = (IOEventCallback)sf_client_sock_read; if (ioevent_modify(&task->thread_data->ev_puller, task->event.fd, IOEVENT_READ, task) != 0) { @@ -218,8 +253,9 @@ static inline int set_read_event(struct fast_task_info *task) return result; } - task->event.callback = (IOEventCallback)sf_client_sock_read; +#if IOEVENT_USE_URING } +#endif return 0; } @@ -564,8 +600,8 @@ int sf_send_add_event(struct fast_task_info *task) if (task->send.ptr->length > 0) { /* direct send */ task->nio_stages.current = SF_NIO_STAGE_SEND; - if (task->handler->use_io_uring) { #if IOEVENT_USE_URING + if (task->handler->use_io_uring) { if (task->event.callback != (IOEventCallback)sf_client_sock_write) { task->event.callback = (IOEventCallback)sf_client_sock_write; } @@ -574,16 +610,14 @@ int sf_send_add_event(struct fast_task_info *task) } else { return uring_prep_first_send(task); } -#else - logError("file: "__FILE__", line: %d, " - "some mistakes happen!", __LINE__); - return EBUSY; -#endif } else { +#endif if (sf_client_sock_write(task->event.fd, IOEVENT_WRITE, task) < 0) { return errno != 0 ? errno : EIO; } +#if IOEVENT_USE_URING } +#endif } return 0; @@ -626,20 +660,16 @@ static inline int check_task(struct fast_task_info *task, } } +#if IOEVENT_USE_URING static inline int prepare_next_send(struct fast_task_info *task) { -#if IOEVENT_USE_URING if (task->handler->use_send_zc) { return uring_prep_next_send_zc(task); } else { return uring_prep_next_send(task); } -#else - logError("file: "__FILE__", line: %d, " - "some mistakes happen!", __LINE__); - return EBUSY; -#endif } +#endif ssize_t sf_socket_send_data(struct fast_task_info *task, SFCommAction *action, bool *send_done) @@ -647,15 +677,11 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, int bytes; int result; - if (task->handler->use_io_uring) { #if IOEVENT_USE_URING + if (task->handler->use_io_uring) { bytes = task->event.res; -#else - logError("file: "__FILE__", line: %d, " - "some mistakes happen!", __LINE__); - return -1; -#endif } else { +#endif if (task->iovec_array.iovs != NULL) { bytes = writev(task->event.fd, task->iovec_array.iovs, FC_MIN(task->iovec_array.count, IOV_MAX)); @@ -664,26 +690,34 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, task->send.ptr->offset, task->send.ptr->length - task->send.ptr->offset); } +#if IOEVENT_USE_URING } +#endif if (bytes < 0) { - if (task->handler->use_io_uring) { #if IOEVENT_USE_URING + if (task->handler->use_io_uring) { result = -bytes; -#endif } else { +#endif result = errno; +#if IOEVENT_USE_URING } +#endif if (result == EAGAIN || result == EWOULDBLOCK) { +#if IOEVENT_USE_URING if (task->handler->use_io_uring) { if (prepare_next_send(task) != 0) { return -1; } } else { +#endif if (set_write_event(task) != 0) { return -1; } +#if IOEVENT_USE_URING } +#endif *action = sf_comm_action_break; return 0; @@ -718,11 +752,6 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, task->send.ptr->length = 0; } - if (task->handler->use_io_uring) { -#if IOEVENT_USE_URING - FC_URING_OP_TYPE(task) = IORING_OP_NOP; -#endif - } *action = sf_comm_action_finish; *send_done = true; } else { @@ -755,14 +784,18 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, task->iovec_array.count = end - iov; } +#if IOEVENT_USE_URING if (task->handler->use_io_uring) { if (prepare_next_send(task) != 0) { return -1; } *action = sf_comm_action_break; } else { +#endif *action = sf_comm_action_continue; +#if IOEVENT_USE_URING } +#endif *send_done = false; } @@ -778,15 +811,11 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, int recv_bytes; bool new_alloc; - if (task->handler->use_io_uring) { #if IOEVENT_USE_URING + if (task->handler->use_io_uring) { bytes = task->event.res; -#else - logError("file: "__FILE__", line: %d, " - "some mistakes happen!", __LINE__); - return -1; -#endif } else { +#endif if (task->recv.ptr->length == 0) { //recv header recv_bytes = SF_CTX->header_size - task->recv.ptr->offset; bytes = read(task->event.fd, task->recv.ptr->data + @@ -802,22 +831,28 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, header_size), recv_bytes); } } +#if IOEVENT_USE_URING } +#endif if (bytes < 0) { - if (task->handler->use_io_uring) { #if IOEVENT_USE_URING + if (task->handler->use_io_uring) { result = -bytes; -#endif } else { +#endif result = errno; +#if IOEVENT_USE_URING } +#endif if (result == EAGAIN || result == EWOULDBLOCK) { +#if IOEVENT_USE_URING if (task->handler->use_io_uring) { if (prepare_next_recv(task) != 0) { return -1; } } +#endif *action = sf_comm_action_break; return 0; } else if (result == EINTR && !task->handler->use_io_uring) { @@ -864,14 +899,18 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, task->recv.ptr->offset += bytes; if (task->recv.ptr->length == 0) { //pkg header if (task->recv.ptr->offset < SF_CTX->header_size) { +#if IOEVENT_USE_URING if (task->handler->use_io_uring) { if (prepare_next_recv(task) != 0) { return -1; } *action = sf_comm_action_break; } else { +#endif *action = sf_comm_action_continue; +#if IOEVENT_USE_URING } +#endif return bytes; } @@ -922,21 +961,20 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, } if (task->recv.ptr->offset >= task->recv.ptr->length) { //recv done - if (task->handler->use_io_uring) { -#if IOEVENT_USE_URING - FC_URING_OP_TYPE(task) = IORING_OP_NOP; -#endif - } *action = sf_comm_action_finish; } else { +#if IOEVENT_USE_URING if (task->handler->use_io_uring) { if (prepare_next_recv(task) != 0) { return -1; } *action = sf_comm_action_break; } else { +#endif *action = sf_comm_action_continue; +#if IOEVENT_USE_URING } +#endif } return bytes; @@ -1084,6 +1122,11 @@ static int sf_client_sock_read(int sock, short event, void *arg) task = (struct fast_task_info *)arg; if ((result=check_task(task, event, SF_NIO_STAGE_RECV)) != 0) { +#if IOEVENT_USE_URING + if (task->handler->use_io_uring && (event != IOEVENT_TIMEOUT)) { + CLEAR_OP_TYPE_AND_RELEASE_TASK(task); + } +#endif return result >= 0 ? 0 : -1; } @@ -1099,7 +1142,8 @@ static int sf_client_sock_read(int sock, short event, void *arg) task->event.timer.expires = g_current_time + SF_CTX->net_buffer_cfg.network_timeout; fast_timer_add(&task->thread_data->timer, - &task->event.timer); + &task->event.timer); + return 0; } else { if (task->recv.ptr->length > 0) { logWarning("file: "__FILE__", line: %d, " @@ -1116,10 +1160,14 @@ static int sf_client_sock_read(int sock, short event, void *arg) ioevent_add_to_deleted_list(task); return -1; } - - return 0; } +#if IOEVENT_USE_URING + if (task->handler->use_io_uring) { + CLEAR_OP_TYPE_AND_RELEASE_TASK(task); + } +#endif + total_read = 0; action = sf_comm_action_continue; while (1) { @@ -1179,6 +1227,11 @@ static int sf_client_sock_write(int sock, short event, void *arg) task = (struct fast_task_info *)arg; if ((result=check_task(task, event, SF_NIO_STAGE_SEND)) != 0) { +#if IOEVENT_USE_URING + if (task->handler->use_io_uring && (event != IOEVENT_TIMEOUT)) { + CLEAR_OP_TYPE_AND_RELEASE_TASK(task); + } +#endif return result >= 0 ? 0 : -1; } @@ -1193,6 +1246,12 @@ static int sf_client_sock_write(int sock, short event, void *arg) return -1; } +#if IOEVENT_USE_URING + if (task->handler->use_io_uring) { + CLEAR_OP_TYPE_AND_RELEASE_TASK(task); + } +#endif + total_write = 0; length = task->send.ptr->length; action = sf_comm_action_continue; diff --git a/src/sf_nio.h b/src/sf_nio.h index 8411781..599904e 100644 --- a/src/sf_nio.h +++ b/src/sf_nio.h @@ -90,6 +90,7 @@ static inline void sf_nio_reset_task_length(struct fast_task_info *task) } } +void sf_socket_close_connection(struct fast_task_info *task); void sf_recv_notify_read(int sock, short event, void *arg); int sf_send_add_event(struct fast_task_info *task); diff --git a/src/sf_service.c b/src/sf_service.c index aa6df86..efd3389 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -35,7 +35,6 @@ #include "fastcommon/fc_memory.h" #include "sf_nio.h" #include "sf_util.h" -#include "sf_global.h" #include "sf_service.h" #if defined(OS_LINUX) @@ -502,12 +501,6 @@ struct fast_task_info *sf_socket_accept_connection(SFListener *listener) return task; } -void sf_socket_close_connection(struct fast_task_info *task) -{ - close(task->event.fd); - task->event.fd = -1; -} - void sf_socket_close_ex(SFContext *sf_context) { int i; diff --git a/src/sf_service.h b/src/sf_service.h index 0980328..813e8f2 100644 --- a/src/sf_service.h +++ b/src/sf_service.h @@ -25,6 +25,7 @@ #include "fastcommon/ioevent.h" #include "fastcommon/fast_task_queue.h" #include "sf_types.h" +#include "sf_global.h" typedef void* (*sf_alloc_thread_extra_data_callback)(const int thread_index); typedef void (*sf_sig_quit_handler)(int sig); @@ -108,8 +109,6 @@ int sf_socket_create_server(SFListener *listener, void sf_socket_close_server(SFListener *listener); struct fast_task_info *sf_socket_accept_connection(SFListener *listener); -void sf_socket_close_connection(struct fast_task_info *task); - int sf_socket_server_ex(SFContext *sf_context); #define sf_socket_server() sf_socket_server_ex(&g_sf_context) @@ -170,9 +169,9 @@ static inline struct fast_task_info *sf_alloc_init_task_ex( return task; } -#define sf_hold_task_ex(task, inc_count) __sync_add_and_fetch( \ - &task->reffer_count, inc_count) -#define sf_hold_task(task) sf_hold_task_ex(task, 1) +#define sf_hold_task_ex(task, inc_count) \ + fc_hold_task_ex(task, inc_count) +#define sf_hold_task(task) fc_hold_task(task) #define sf_alloc_init_task(handler, fd) sf_alloc_init_task_ex(handler, fd, 1) @@ -187,6 +186,14 @@ static inline void sf_release_task(struct fast_task_info *task) "used: %d, freed: %d", __LINE__, task, alloc_count, alloc_count - free_count, free_count); */ + +#if IOEVENT_USE_URING + if (task->handler->use_io_uring) { + task->handler->close_connection(task); + __sync_fetch_and_sub(&g_sf_global_vars. + connection_stat.current_count, 1); + } +#endif free_queue_push(task); } }