diff --git a/src/sf_global.c b/src/sf_global.c index c525f11..b6fd0d0 100644 --- a/src/sf_global.c +++ b/src/sf_global.c @@ -477,7 +477,8 @@ static int load_rdma_apis(SFContext *sf_context, SFNetworkHandler *handler) } static int init_network_handler(SFContext *sf_context, - SFNetworkHandler *handler, SFAddressFamilyHandler *fh) + SFNetworkHandler *handler, SFAddressFamilyHandler *fh, + const bool use_send_zc) { handler->fh = fh; handler->inner.handler = handler; @@ -499,15 +500,17 @@ static int init_network_handler(SFContext *sf_context, handler->recv_data = sf_socket_recv_data; handler->post_recv = NULL; #if IOEVENT_USE_URING - handler->use_iouring = true; + handler->use_io_uring = true; + handler->use_send_zc = use_send_zc; #else - handler->use_iouring = false; + handler->use_io_uring = false; + handler->use_send_zc = false; #endif return 0; } else { handler->inner.id = NULL; handler->outer.id = NULL; - handler->use_iouring = false; + handler->use_io_uring = false; return load_rdma_apis(sf_context, handler); } } @@ -675,6 +678,7 @@ int sf_load_context_from_config_ex(SFContext *sf_context, int inner_port; int outer_port; int port; + bool use_send_zc; int i; int result; @@ -711,6 +715,8 @@ int sf_load_context_from_config_ex(SFContext *sf_context, outer_port = config->default_outer_port; } + use_send_zc = iniGetBoolValue(config->ini_ctx.section_name, + "use_send_zc", config->ini_ctx.context, false); for (i=0; ihandlers + i; fh->ctx = sf_context; @@ -734,7 +740,9 @@ int sf_load_context_from_config_ex(SFContext *sf_context, if (!handler->enabled) { continue; } - if ((result=init_network_handler(sf_context, handler, fh)) != 0) { + if ((result=init_network_handler(sf_context, handler, + fh, use_send_zc)) != 0) + { return result; } } @@ -985,11 +993,19 @@ void sf_slow_log_config_to_string(SFSlowLogConfig *slow_log_cfg, void sf_global_config_to_string_ex(const char *max_pkg_size_item_nm, char *output, const int size) { + int i; int len; int max_pkg_size; int min_buff_size; int max_buff_size; +#if IOEVENT_USE_URING + bool use_io_uring; + bool use_send_zc; +#endif char pkg_buff[256]; + SFAddressFamilyHandler *fh; + SFNetworkHandler *handler; + SFNetworkHandler *end; max_pkg_size = g_sf_global_vars.net_buffer_cfg.max_pkg_size - g_sf_global_vars.task_buffer_extra_size; @@ -1008,20 +1024,45 @@ void sf_global_config_to_string_ex(const char *max_pkg_size_item_nm, min_buff_size / 1024, max_buff_size / 1024); } +#if IOEVENT_USE_URING + use_io_uring = false; + use_send_zc = false; + for (i=0; ihandlers + SF_NETWORK_HANDLER_COUNT; + for (handler=fh->handlers; handlerenabled && handler->use_io_uring) { + use_io_uring = true; + use_send_zc = handler->use_send_zc; + break; + } + } + } +#endif + len = snprintf(output, size, "base_path=%s, max_connections=%d, connect_timeout=%d, " - "network_timeout=%d, thread_stack_size=%d KB, " - "%s, tcp_quick_ack=%d, log_level=%s, " - "run_by_group=%s, run_by_user=%s, ", SF_G_BASE_PATH_STR, + "network_timeout=%d, thread_stack_size=%d KB, %s, ", + SF_G_BASE_PATH_STR, g_sf_global_vars.net_buffer_cfg.max_connections, g_sf_global_vars.net_buffer_cfg.connect_timeout, g_sf_global_vars.net_buffer_cfg.network_timeout, - g_sf_global_vars.thread_stack_size / 1024, - pkg_buff, g_sf_global_vars.tcp_quick_ack, + g_sf_global_vars.thread_stack_size / 1024, pkg_buff); + +#if IOEVENT_USE_URING + len += snprintf(output + len, size - len, + "use_io_uring=%d, use_send_zc=%d, ", + use_io_uring, use_send_zc); +#endif + + len += snprintf(output + len, size - len, + "tcp_quick_ack=%d, " + "log_level=%s, " + "run_by_group=%s, run_by_user=%s, ", + g_sf_global_vars.tcp_quick_ack, log_get_level_caption(), g_sf_global_vars.run_by.group, - g_sf_global_vars.run_by.user - ); + g_sf_global_vars.run_by.user); sf_log_config_to_string(&g_sf_global_vars.error_log, "error-log", output + len, size - len); diff --git a/src/sf_nio.c b/src/sf_nio.c index 3da4898..a87d248 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -93,16 +93,6 @@ static inline void release_iovec_buffer(struct fast_task_info *task) void sf_task_finish_clean_up(struct fast_task_info *task) { - /* - assert(task->event.fd >= 0); - if (task->event.fd < 0) { - logWarning("file: "__FILE__", line: %d, " - "task: %p already cleaned", - __LINE__, task); - return; - } - */ - if (task->finish_callback != NULL) { task->finish_callback(task); task->finish_callback = NULL; @@ -112,6 +102,12 @@ void sf_task_finish_clean_up(struct fast_task_info *task) sf_task_detach_thread(task); task->handler->close_connection(task); + if (task->handler->use_io_uring) { +#if IOEVENT_USE_URING + FC_URING_OP_TYPE(task) = IORING_OP_NOP; +#endif + } + __sync_fetch_and_sub(&g_sf_global_vars.connection_stat.current_count, 1); sf_release_task(task); } @@ -138,27 +134,91 @@ static inline int set_write_event(struct fast_task_info *task) return 0; } +static inline int prepare_first_recv(struct fast_task_info *task) +{ +#if IOEVENT_USE_URING + if (SF_CTX->callbacks.alloc_recv_buffer != NULL) { + return uring_prep_recv_data(task, task->recv.ptr->data, + SF_CTX->header_size); + } else { + return uring_prep_first_recv(task); + } +#else + logError("file: "__FILE__", line: %d, " + "some mistakes happen!", __LINE__); + return EBUSY; +#endif +} + +static inline int prepare_next_recv(struct fast_task_info *task) +{ +#if IOEVENT_USE_URING + int recv_bytes; + + if (task->recv.ptr->length == 0) { //recv header + recv_bytes = SF_CTX->header_size - task->recv.ptr->offset; + return uring_prep_recv_data(task, task->recv.ptr->data + + task->recv.ptr->offset, recv_bytes); + } else { + recv_bytes = task->recv.ptr->length - task->recv.ptr->offset; + if (task->recv_body == NULL) { + return uring_prep_recv_data(task, task->recv.ptr->data + + task->recv.ptr->offset, recv_bytes); + } else { + return uring_prep_recv_data(task, task->recv_body + + (task->recv.ptr->offset - SF_CTX-> + header_size), recv_bytes); + } + } +#else + logError("file: "__FILE__", line: %d, " + "some mistakes happen!", __LINE__); + return EBUSY; +#endif +} + static inline int set_read_event(struct fast_task_info *task) { int result; - if (task->event.callback == (IOEventCallback)sf_client_sock_read) { - return 0; - } - - task->event.callback = (IOEventCallback)sf_client_sock_read; - - if (ioevent_modify(&task->thread_data->ev_puller, - task->event.fd, IOEVENT_READ, task) != 0) - { - result = errno != 0 ? errno : ENOENT; - ioevent_add_to_deleted_list(task); + if (task->handler->use_io_uring) { +#if IOEVENT_USE_URING + if (FC_URING_OP_TYPE(task) == IORING_OP_RECV) { + logWarning("file: "__FILE__", line: %d, " + "trigger recv again!", __LINE__); + return 0; + } + if ((result=prepare_first_recv(task)) != 0) { + ioevent_add_to_deleted_list(task); + return result; + } + if (task->event.callback != (IOEventCallback)sf_client_sock_read) { + task->event.callback = (IOEventCallback)sf_client_sock_read; + } +#else logError("file: "__FILE__", line: %d, " - "ioevent_modify fail, " - "errno: %d, error info: %s", - __LINE__, result, strerror(result)); - return result; + "some mistakes happen!", __LINE__); + return EBUSY; +#endif + } else { + if (task->event.callback == (IOEventCallback)sf_client_sock_read) { + return 0; + } + if (ioevent_modify(&task->thread_data->ev_puller, + task->event.fd, IOEVENT_READ, task) != 0) + { + result = errno != 0 ? errno : ENOENT; + ioevent_add_to_deleted_list(task); + + logError("file: "__FILE__", line: %d, " + "ioevent_modify fail, " + "errno: %d, error info: %s", + __LINE__, result, strerror(result)); + return result; + } + + task->event.callback = (IOEventCallback)sf_client_sock_read; } return 0; @@ -179,7 +239,7 @@ static inline int sf_ioevent_add(struct fast_task_info *task) result = ioevent_set(task, task->thread_data, task->event.fd, IOEVENT_READ, (IOEventCallback)sf_client_sock_read, SF_CTX->net_buffer_cfg.network_timeout, - task->handler->use_iouring); + task->handler->use_io_uring); return result > 0 ? -1 * result : result; } @@ -279,7 +339,7 @@ static int sf_async_connect_server(struct fast_task_info *task) result = ioevent_set(task, task->thread_data, task->event.fd, IOEVENT_READ | IOEVENT_WRITE, (IOEventCallback) sf_client_connect_done, SF_CTX->net_buffer_cfg. - connect_timeout, task->handler->use_iouring); + connect_timeout, task->handler->use_io_uring); return result > 0 ? -1 * result : result; } else { if (SF_CTX->callbacks.connect_done != NULL) { @@ -328,10 +388,12 @@ static int sf_nio_deal_task(struct fast_task_info *task, const int stage) case SF_NIO_STAGE_RECV: task->nio_stages.current = SF_NIO_STAGE_RECV; if ((result=set_read_event(task)) == 0) { - if (sf_client_sock_read(task->event.fd, - IOEVENT_READ, task) < 0) - { - result = errno != 0 ? errno : EIO; + if (!task->handler->use_io_uring) { + if (sf_client_sock_read(task->event.fd, + IOEVENT_READ, task) < 0) + { + result = errno != 0 ? errno : EIO; + } } } break; @@ -502,8 +564,25 @@ int sf_send_add_event(struct fast_task_info *task) if (task->send.ptr->length > 0) { /* direct send */ task->nio_stages.current = SF_NIO_STAGE_SEND; - if (sf_client_sock_write(task->event.fd, IOEVENT_WRITE, task) < 0) { - return errno != 0 ? errno : EIO; + if (task->handler->use_io_uring) { +#if IOEVENT_USE_URING + if (task->event.callback != (IOEventCallback)sf_client_sock_write) { + task->event.callback = (IOEventCallback)sf_client_sock_write; + } + if (task->handler->use_send_zc) { + return uring_prep_first_send_zc(task); + } else { + return uring_prep_first_send(task); + } +#else + logError("file: "__FILE__", line: %d, " + "some mistakes happen!", __LINE__); + return EBUSY; +#endif + } else { + if (sf_client_sock_write(task->event.fd, IOEVENT_WRITE, task) < 0) { + return errno != 0 ? errno : EIO; + } } } @@ -547,28 +626,69 @@ static inline int check_task(struct fast_task_info *task, } } +static inline int prepare_next_send(struct fast_task_info *task) +{ +#if IOEVENT_USE_URING + if (task->handler->use_send_zc) { + return uring_prep_next_send_zc(task); + } else { + return uring_prep_next_send(task); + } +#else + logError("file: "__FILE__", line: %d, " + "some mistakes happen!", __LINE__); + return EBUSY; +#endif +} + ssize_t sf_socket_send_data(struct fast_task_info *task, SFCommAction *action, bool *send_done) { int bytes; + int result; - if (task->iovec_array.iovs != NULL) { - bytes = writev(task->event.fd, task->iovec_array.iovs, - FC_MIN(task->iovec_array.count, IOV_MAX)); + if (task->handler->use_io_uring) { +#if IOEVENT_USE_URING + bytes = task->event.res; +#else + logError("file: "__FILE__", line: %d, " + "some mistakes happen!", __LINE__); + return -1; +#endif } else { - bytes = write(task->event.fd, task->send.ptr->data + - task->send.ptr->offset, task->send.ptr->length - - task->send.ptr->offset); + if (task->iovec_array.iovs != NULL) { + bytes = writev(task->event.fd, task->iovec_array.iovs, + FC_MIN(task->iovec_array.count, IOV_MAX)); + } else { + bytes = write(task->event.fd, task->send.ptr->data + + task->send.ptr->offset, task->send.ptr->length - + task->send.ptr->offset); + } } + if (bytes < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK) - { - if (set_write_event(task) != 0) { - return -1; + if (task->handler->use_io_uring) { +#if IOEVENT_USE_URING + result = -bytes; +#endif + } else { + result = errno; + } + if (result == EAGAIN || result == EWOULDBLOCK) { + if (task->handler->use_io_uring) { + if (prepare_next_send(task) != 0) { + return -1; + } + } else { + if (set_write_event(task) != 0) { + return -1; + } } + *action = sf_comm_action_break; return 0; - } else if (errno == EINTR) { //should retry + } else if (result == EINTR && !task->handler->use_io_uring) { + /* should try again */ logDebug("file: "__FILE__", line: %d, " "client ip: %s, ignore interupt signal", __LINE__, task->client_ip); @@ -579,7 +699,7 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, "client ip: %s, send fail, task offset: %d, length: %d, " "errno: %d, error info: %s", __LINE__, task->client_ip, task->send.ptr->offset, task->send.ptr->length, - errno, strerror(errno)); + result, strerror(result)); return -1; } } else if (bytes == 0) { @@ -597,12 +717,15 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, task->send.ptr->offset = 0; task->send.ptr->length = 0; } + + if (task->handler->use_io_uring) { +#if IOEVENT_USE_URING + FC_URING_OP_TYPE(task) = IORING_OP_NOP; +#endif + } *action = sf_comm_action_finish; *send_done = true; } else { - *action = sf_comm_action_continue; - *send_done = false; - /* set next writev iovec array */ if (task->iovec_array.iovs != NULL) { struct iovec *iov; @@ -631,6 +754,17 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, task->iovec_array.iovs = iov; task->iovec_array.count = end - iov; } + + if (task->handler->use_io_uring) { + if (prepare_next_send(task) != 0) { + return -1; + } + *action = sf_comm_action_break; + } else { + *action = sf_comm_action_continue; + } + + *send_done = false; } return bytes; @@ -640,30 +774,54 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, const bool call_post_recv, SFCommAction *action) { int bytes; + int result; int recv_bytes; bool new_alloc; - if (task->recv.ptr->length == 0) { //recv header - recv_bytes = SF_CTX->header_size - task->recv.ptr->offset; - bytes = read(task->event.fd, task->recv.ptr->data + - task->recv.ptr->offset, recv_bytes); + if (task->handler->use_io_uring) { +#if IOEVENT_USE_URING + bytes = task->event.res; +#else + logError("file: "__FILE__", line: %d, " + "some mistakes happen!", __LINE__); + return -1; +#endif } else { - recv_bytes = task->recv.ptr->length - task->recv.ptr->offset; - if (task->recv_body == NULL) { + if (task->recv.ptr->length == 0) { //recv header + recv_bytes = SF_CTX->header_size - task->recv.ptr->offset; bytes = read(task->event.fd, task->recv.ptr->data + task->recv.ptr->offset, recv_bytes); } else { - bytes = read(task->event.fd, task->recv_body + - (task->recv.ptr->offset - SF_CTX-> - header_size), recv_bytes); + recv_bytes = task->recv.ptr->length - task->recv.ptr->offset; + if (task->recv_body == NULL) { + bytes = read(task->event.fd, task->recv.ptr->data + + task->recv.ptr->offset, recv_bytes); + } else { + bytes = read(task->event.fd, task->recv_body + + (task->recv.ptr->offset - SF_CTX-> + header_size), recv_bytes); + } } } if (bytes < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { + if (task->handler->use_io_uring) { +#if IOEVENT_USE_URING + result = -bytes; +#endif + } else { + result = errno; + } + if (result == EAGAIN || result == EWOULDBLOCK) { + if (task->handler->use_io_uring) { + if (prepare_next_recv(task) != 0) { + return -1; + } + } *action = sf_comm_action_break; return 0; - } else if (errno == EINTR) { //should retry + } else if (result == EINTR && !task->handler->use_io_uring) { + /* should try again */ logDebug("file: "__FILE__", line: %d, " "client ip: %s, ignore interupt signal", __LINE__, task->client_ip); @@ -674,7 +832,7 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, "client ip: %s, recv fail, " "errno: %d, error info: %s", __LINE__, task->client_ip, - errno, strerror(errno)); + result, strerror(result)); return -1; } } else if (bytes == 0) { @@ -706,7 +864,14 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, task->recv.ptr->offset += bytes; if (task->recv.ptr->length == 0) { //pkg header if (task->recv.ptr->offset < SF_CTX->header_size) { - *action = sf_comm_action_continue; + if (task->handler->use_io_uring) { + if (prepare_next_recv(task) != 0) { + return -1; + } + *action = sf_comm_action_break; + } else { + *action = sf_comm_action_continue; + } return bytes; } @@ -757,9 +922,21 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, } if (task->recv.ptr->offset >= task->recv.ptr->length) { //recv done + if (task->handler->use_io_uring) { +#if IOEVENT_USE_URING + FC_URING_OP_TYPE(task) = IORING_OP_NOP; +#endif + } *action = sf_comm_action_finish; } else { - *action = sf_comm_action_continue; + if (task->handler->use_io_uring) { + if (prepare_next_recv(task) != 0) { + return -1; + } + *action = sf_comm_action_break; + } else { + *action = sf_comm_action_continue; + } } return bytes; diff --git a/src/sf_types.h b/src/sf_types.h index 3e7bca8..4627cad 100644 --- a/src/sf_types.h +++ b/src/sf_types.h @@ -119,7 +119,8 @@ struct sf_address_family_handler; typedef struct sf_network_handler { bool enabled; bool explicit_post_recv; - bool use_iouring; //since v1.2.9 + bool use_io_uring; //since v1.2.9 + bool use_send_zc; //since v1.2.9 FCCommunicationType comm_type; struct sf_address_family_handler *fh; struct ibv_pd *pd;