diff --git a/src/sf_global.c b/src/sf_global.c index 3c91200..c525f11 100644 --- a/src/sf_global.c +++ b/src/sf_global.c @@ -498,10 +498,16 @@ static int init_network_handler(SFContext *sf_context, handler->send_data = sf_socket_send_data; handler->recv_data = sf_socket_recv_data; handler->post_recv = NULL; +#if IOEVENT_USE_URING + handler->use_iouring = true; +#else + handler->use_iouring = false; +#endif return 0; } else { handler->inner.id = NULL; handler->outer.id = NULL; + handler->use_iouring = false; return load_rdma_apis(sf_context, handler); } } diff --git a/src/sf_nio.c b/src/sf_nio.c index 4eb374d..3da4898 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -41,6 +41,9 @@ #include "sf_service.h" #include "sf_nio.h" +static int sf_client_sock_write(int sock, short event, void *arg); +static int sf_client_sock_read(int sock, short event, void *arg); + void sf_set_parameters_ex(SFContext *sf_context, const int header_size, sf_set_body_length_callback set_body_length_func, sf_alloc_recv_buffer_callback alloc_recv_buffer_func, @@ -144,6 +147,7 @@ static inline int set_read_event(struct fast_task_info *task) } task->event.callback = (IOEventCallback)sf_client_sock_read; + if (ioevent_modify(&task->thread_data->ev_puller, task->event.fd, IOEVENT_READ, task) != 0) { @@ -168,13 +172,14 @@ int sf_set_read_event(struct fast_task_info *task) return set_read_event(task); } -static inline int sf_ioevent_add(struct fast_task_info *task, - IOEventCallback callback, const int timeout) +static inline int sf_ioevent_add(struct fast_task_info *task) { int result; result = ioevent_set(task, task->thread_data, task->event.fd, - IOEVENT_READ, callback, timeout); + IOEVENT_READ, (IOEventCallback)sf_client_sock_read, + SF_CTX->net_buffer_cfg.network_timeout, + task->handler->use_iouring); return result > 0 ? -1 * result : result; } @@ -192,8 +197,7 @@ static inline void inc_connection_current_count() static inline int sf_nio_init(struct fast_task_info *task) { inc_connection_current_count(); - return sf_ioevent_add(task, (IOEventCallback)sf_client_sock_read, - SF_CTX->net_buffer_cfg.network_timeout); + return sf_ioevent_add(task); } int sf_socket_async_connect_check(struct fast_task_info *task) @@ -275,7 +279,7 @@ static int sf_async_connect_server(struct fast_task_info *task) result = ioevent_set(task, task->thread_data, task->event.fd, IOEVENT_READ | IOEVENT_WRITE, (IOEventCallback) sf_client_connect_done, SF_CTX->net_buffer_cfg. - connect_timeout); + connect_timeout, task->handler->use_iouring); return result > 0 ? -1 * result : result; } else { if (SF_CTX->callbacks.connect_done != NULL) { @@ -283,10 +287,7 @@ static int sf_async_connect_server(struct fast_task_info *task) } if (result == 0) { - if ((result=sf_ioevent_add(task, (IOEventCallback) - sf_client_sock_read, SF_CTX-> - net_buffer_cfg.network_timeout)) != 0) - { + if ((result=sf_ioevent_add(task)) != 0) { return result; } @@ -341,10 +342,7 @@ static int sf_nio_deal_task(struct fast_task_info *task, const int stage) result = SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_CONTINUE); break; case SF_NIO_STAGE_FORWARDED: //forward by other thread - if ((result=sf_ioevent_add(task, (IOEventCallback) - sf_client_sock_read, - SF_CTX->net_buffer_cfg.network_timeout)) == 0) - { + if ((result=sf_ioevent_add(task)) == 0) { result = SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_SEND); } break; @@ -837,8 +835,7 @@ static int calc_iops_and_remove_polling(struct fast_task_info *task) ioevent_set_timeout(&task->thread_data->ev_puller, task->thread_data->timeout_ms); } - result = sf_ioevent_add(task, (IOEventCallback) - sf_client_sock_read, SF_CTX->net_buffer_cfg.network_timeout); + result = sf_ioevent_add(task); format_ip_address(task->client_ip, formatted_ip); logInfo("file: "__FILE__", line: %d, client: %s:%u, " @@ -900,7 +897,7 @@ int sf_rdma_busy_polling_callback(struct nio_thread_data *thread_data) return 0; } -int sf_client_sock_read(int sock, short event, void *arg) +static int sf_client_sock_read(int sock, short event, void *arg) { int result; int bytes; @@ -992,7 +989,7 @@ int sf_client_sock_read(int sock, short event, void *arg) return total_read; } -int sf_client_sock_write(int sock, short event, void *arg) +static int sf_client_sock_write(int sock, short event, void *arg) { int result; int bytes; @@ -1037,9 +1034,6 @@ int sf_client_sock_write(int sock, short event, void *arg) release_iovec_buffer(task); task->recv.ptr->offset = 0; task->recv.ptr->length = 0; - if (set_read_event(task) != 0) { - return -1; - } if (SF_CTX->callbacks.send_done == NULL || !send_done) { task->nio_stages.current = SF_NIO_STAGE_RECV; @@ -1055,6 +1049,12 @@ int sf_client_sock_write(int sock, short event, void *arg) } } + if (task->nio_stages.current == SF_NIO_STAGE_RECV) { + if (set_read_event(task) != 0) { + return -1; + } + } + break; } else if (action == sf_comm_action_break) { break; diff --git a/src/sf_nio.h b/src/sf_nio.h index 64e5403..8411781 100644 --- a/src/sf_nio.h +++ b/src/sf_nio.h @@ -92,8 +92,6 @@ static inline void sf_nio_reset_task_length(struct fast_task_info *task) void sf_recv_notify_read(int sock, short event, void *arg); int sf_send_add_event(struct fast_task_info *task); -int sf_client_sock_write(int sock, short event, void *arg); -int sf_client_sock_read(int sock, short event, void *arg); void sf_task_finish_clean_up(struct fast_task_info *task); @@ -149,11 +147,6 @@ static inline int sf_nio_forward_request(struct fast_task_info *task, return sf_nio_notify(task, SF_NIO_STAGE_FORWARDED); } -static inline bool sf_client_sock_in_read_stage(struct fast_task_info *task) -{ - return (task->event.callback == (IOEventCallback)sf_client_sock_read); -} - static inline void sf_nio_add_to_deleted_list(struct nio_thread_data *thread_data, struct fast_task_info *task) { diff --git a/src/sf_types.h b/src/sf_types.h index 2a946e2..3e7bca8 100644 --- a/src/sf_types.h +++ b/src/sf_types.h @@ -115,9 +115,11 @@ typedef struct sf_listener { struct sf_context; struct sf_address_family_handler; + typedef struct sf_network_handler { bool enabled; bool explicit_post_recv; + bool use_iouring; //since v1.2.9 FCCommunicationType comm_type; struct sf_address_family_handler *fh; struct ibv_pd *pd; @@ -179,7 +181,6 @@ typedef struct sf_context { struct nio_thread_data *thread_data; volatile int thread_count; - //int rdma_port_offset; bool is_client; //since v1.2.5 SFAddressFamily address_family; SFAddressFamilyHandler handlers[SF_ADDRESS_FAMILY_COUNT];