diff --git a/make.sh b/make.sh index f966463..292589c 100755 --- a/make.sh +++ b/make.sh @@ -9,10 +9,13 @@ DEBUG_FLAG=0 if [ -f /usr/include/fastcommon/_os_define.h ]; then OS_BITS=$(grep -F OS_BITS /usr/include/fastcommon/_os_define.h | awk '{print $NF;}') + USE_URING=$(grep -F IOEVENT_USE_URING /usr/include/fastcommon/_os_define.h | awk '{print $NF;}') elif [ -f /usr/local/include/fastcommon/_os_define.h ]; then OS_BITS=$(grep -F OS_BITS /usr/local/include/fastcommon/_os_define.h | awk '{print $NF;}') + USE_URING=$(grep -F IOEVENT_USE_URING /usr/local/include/fastcommon/_os_define.h | awk '{print $NF;}') else OS_BITS=64 + USE_URING='' fi uname=$(uname) @@ -49,6 +52,9 @@ LIBS='' uname=$(uname) if [ "$uname" = "Linux" ]; then CFLAGS="$CFLAGS" + if [ -n "$USE_URING" ]; then + LIBS="$LIBS -luring" + fi elif [ "$uname" = "FreeBSD" ] || [ "$uname" = "Darwin" ]; then CFLAGS="$CFLAGS" if [ "$uname" = "Darwin" ]; then diff --git a/src/idempotency/client/client_channel.c b/src/idempotency/client/client_channel.c index 118c937..c5f9225 100644 --- a/src/idempotency/client/client_channel.c +++ b/src/idempotency/client/client_channel.c @@ -174,7 +174,6 @@ static struct fast_task_info *alloc_channel_task(IdempotencyClientChannel *channel, const uint32_t hash_code, const FCCommunicationType comm_type, const char *server_ip, const uint16_t port, int *err_no) { - int len; struct fast_task_info *task; SFAddressFamilyHandler *fh; SFNetworkHandler *handler; @@ -195,12 +194,7 @@ static struct fast_task_info *alloc_channel_task(IdempotencyClientChannel return NULL; } - len = strlen(server_ip); - if (len >= sizeof(task->server_ip)) { - len = sizeof(task->server_ip) - 1; - } - memcpy(task->server_ip, server_ip, len); - *(task->server_ip + len) = '\0'; + fc_safe_strcpy(task->server_ip, server_ip); task->port = port; task->arg = channel; task->thread_data = g_sf_context.thread_data + @@ -209,7 +203,8 @@ static struct fast_task_info *alloc_channel_task(IdempotencyClientChannel 
channel->last_connect_time = g_current_time; if ((*err_no=sf_nio_notify(task, SF_NIO_STAGE_CONNECT)) != 0) { channel->in_ioevent = 0; //rollback - sf_release_task(task); + __sync_sub_and_fetch(&task->reffer_count, 1); + free_queue_push(task); return NULL; } return task; @@ -221,6 +216,12 @@ int idempotency_client_channel_check_reconnect( int result; char formatted_ip[FORMATTED_IP_SIZE]; +#if IOEVENT_USE_URING + if (FC_ATOMIC_GET(channel->task->reffer_count) > 1) { + return 0; + } +#endif + if (!__sync_bool_compare_and_swap(&channel->in_ioevent, 0, 1)) { return 0; } @@ -237,6 +238,9 @@ int idempotency_client_channel_check_reconnect( formatted_ip, channel->task->port); } + if (channel->task->event.fd >= 0) { + channel->task->handler->close_connection(channel->task); + } __sync_bool_compare_and_swap(&channel->task->canceled, 1, 0); if ((result=sf_nio_notify(channel->task, SF_NIO_STAGE_CONNECT)) == 0) { channel->last_connect_time = g_current_time; @@ -348,8 +352,8 @@ int idempotency_client_channel_push(struct idempotency_client_channel *channel, receipt->req_id = req_id; fc_queue_push_ex(&channel->queue, receipt, &notify); if (notify) { - if (__sync_add_and_fetch(&channel->in_ioevent, 0)) { - if (__sync_add_and_fetch(&channel->established, 0)) { + if (FC_ATOMIC_GET(channel->in_ioevent)) { + if (FC_ATOMIC_GET(channel->established)) { sf_nio_notify(channel->task, SF_NIO_STAGE_CONTINUE); } } else { diff --git a/src/idempotency/client/client_channel.h b/src/idempotency/client/client_channel.h index 8fa82be..44de026 100644 --- a/src/idempotency/client/client_channel.h +++ b/src/idempotency/client/client_channel.h @@ -76,13 +76,13 @@ static inline void idempotency_client_channel_set_id_key( static inline int idempotency_client_channel_check_wait_ex( struct idempotency_client_channel *channel, const int timeout) { - if (__sync_add_and_fetch(&channel->established, 0)) { + if (FC_ATOMIC_GET(channel->established)) { return 0; } idempotency_client_channel_check_reconnect(channel); 
lcp_timedwait_sec(&channel->lcp, timeout); - if (__sync_add_and_fetch(&channel->established, 0)) { + if (FC_ATOMIC_GET(channel->established)) { return 0; } else { /* diff --git a/src/idempotency/client/receipt_handler.c b/src/idempotency/client/receipt_handler.c index 0a44d01..061d3ef 100644 --- a/src/idempotency/client/receipt_handler.c +++ b/src/idempotency/client/receipt_handler.c @@ -49,6 +49,10 @@ static IdempotencyReceiptGlobalVars receipt_global_vars; static int receipt_init_task(struct fast_task_info *task, void *arg) { +#if IOEVENT_USE_URING + FC_URING_IS_CLIENT(task) = true; +#endif + if (RDMA_INIT_CONNECTION != NULL) { return RDMA_INIT_CONNECTION(task, arg); } else { @@ -92,7 +96,6 @@ static void receipt_task_finish_cleanup(struct fast_task_info *task) if (task->event.fd >= 0) { sf_task_detach_thread(task); - task->handler->close_connection(task); } sf_nio_reset_task_length(task); @@ -282,7 +285,7 @@ static int deal_setup_channel_response(struct fast_task_info *task) } channel = (IdempotencyClientChannel *)task->arg; - if (__sync_add_and_fetch(&channel->established, 0)) { + if (FC_ATOMIC_GET(channel->established)) { format_ip_address(task->server_ip, formatted_ip); logWarning("file: "__FILE__", line: %d, " "response from server %s:%u, unexpected cmd: " @@ -520,6 +523,7 @@ static int do_init(FCAddressPtrArray *address_array) { const int task_arg_size = 0; const bool double_buffers = false; + const bool need_shrink_task_buffer = false; const bool explicit_post_recv = false; int result; int bytes; @@ -552,8 +556,8 @@ static int do_init(FCAddressPtrArray *address_array) NULL, sf_proto_set_body_length, NULL, NULL, receipt_deal_task, receipt_task_finish_cleanup, receipt_recv_timeout_callback, 1000, sizeof(SFCommonProtoHeader), TASK_PADDING_SIZE, - task_arg_size, double_buffers, explicit_post_recv, - receipt_init_task, pd, NULL); + task_arg_size, double_buffers, need_shrink_task_buffer, + explicit_post_recv, receipt_init_task, pd, NULL); } int 
receipt_handler_init(FCAddressPtrArray *address_array) @@ -565,7 +569,6 @@ int receipt_handler_init(FCAddressPtrArray *address_array) } sf_enable_thread_notify(true); - sf_set_remove_from_ready_list(false); fc_sleep_ms(100); return 0; diff --git a/src/sf_global.c b/src/sf_global.c index 26c7046..f6a3aa1 100644 --- a/src/sf_global.c +++ b/src/sf_global.c @@ -51,7 +51,7 @@ SFContext g_sf_context = {{'\0'}, NULL, 0, false, sf_address_family_auto, {{AF_UNSPEC, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}}, {AF_UNSPEC, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}}}, {DEFAULT_MAX_CONNECTONS, SF_DEF_MAX_PACKAGE_SIZE, SF_DEF_MIN_BUFF_SIZE, - SF_DEF_MAX_BUFF_SIZE}, 1, DEFAULT_WORK_THREADS, 0, true, true, true, + SF_DEF_MAX_BUFF_SIZE}, 1, DEFAULT_WORK_THREADS, 0, true, true, {false, 0, 0}, {sf_task_finish_clean_up} }; @@ -477,7 +477,8 @@ static int load_rdma_apis(SFContext *sf_context, SFNetworkHandler *handler) } static int init_network_handler(SFContext *sf_context, - SFNetworkHandler *handler, SFAddressFamilyHandler *fh) + SFNetworkHandler *handler, SFAddressFamilyHandler *fh, + const bool use_send_zc) { handler->fh = fh; handler->inner.handler = handler; @@ -498,10 +499,18 @@ static int init_network_handler(SFContext *sf_context, handler->send_data = sf_socket_send_data; handler->recv_data = sf_socket_recv_data; handler->post_recv = NULL; +#if IOEVENT_USE_URING + handler->use_io_uring = true; + handler->use_send_zc = use_send_zc; +#else + handler->use_io_uring = false; + handler->use_send_zc = false; +#endif return 0; } else { handler->inner.id = NULL; handler->outer.id = NULL; + handler->use_io_uring = false; return load_rdma_apis(sf_context, handler); } } @@ -669,6 +678,8 @@ int sf_load_context_from_config_ex(SFContext *sf_context, int inner_port; int outer_port; int port; + bool global_use_send_zc; + bool use_send_zc; int i; int result; @@ -705,6 +716,15 @@ int sf_load_context_from_config_ex(SFContext *sf_context, outer_port = 
config->default_outer_port; } + global_use_send_zc = iniGetBoolValue(NULL, "use_send_zc", + config->ini_ctx.context, true); + if (config->ini_ctx.section_name == NULL) { + use_send_zc = global_use_send_zc; + } else { + use_send_zc = iniGetBoolValue(config->ini_ctx.section_name, + "use_send_zc", config->ini_ctx.context, global_use_send_zc); + } + for (i=0; ihandlers + i; fh->ctx = sf_context; @@ -728,7 +748,9 @@ int sf_load_context_from_config_ex(SFContext *sf_context, if (!handler->enabled) { continue; } - if ((result=init_network_handler(sf_context, handler, fh)) != 0) { + if ((result=init_network_handler(sf_context, handler, + fh, use_send_zc)) != 0) + { return result; } } @@ -885,6 +907,31 @@ static const char *get_address_family_caption( } } +#if IOEVENT_USE_URING +static void get_io_uring_configs(const SFContext *sf_context, + bool *use_io_uring, bool *use_send_zc) +{ + int i; + const SFAddressFamilyHandler *fh; + const SFNetworkHandler *handler; + const SFNetworkHandler *end; + + *use_io_uring = false; + *use_send_zc = false; + for (i=0; ihandlers + i; + end = fh->handlers + SF_NETWORK_HANDLER_COUNT; + for (handler=fh->handlers; handlerenabled && handler->use_io_uring) { + *use_io_uring = true; + *use_send_zc = handler->use_send_zc; + return; + } + } + } +} +#endif + void sf_context_config_to_string(const SFContext *sf_context, char *output, const int size) { @@ -894,6 +941,10 @@ void sf_context_config_to_string(const SFContext *sf_context, char outer_bind_addr[2 * IP_ADDRESS_SIZE + 2]; int i; int len; +#if IOEVENT_USE_URING + bool use_io_uring; + bool use_send_zc; +#endif *inner_bind_addr = '\0'; *outer_bind_addr = '\0'; @@ -934,6 +985,12 @@ void sf_context_config_to_string(const SFContext *sf_context, ", address_family=%s, accept_threads=%d, work_threads=%d", get_address_family_caption(sf_context->address_family), sf_context->accept_threads, sf_context->work_threads); + +#if IOEVENT_USE_URING + get_io_uring_configs(sf_context, &use_io_uring, &use_send_zc); 
+ len += snprintf(output + len, size - len, ", use_io_uring=%d" + ", use_send_zc=%d", use_io_uring, use_send_zc); +#endif } void sf_log_config_to_string_ex(SFLogConfig *log_cfg, const char *caption, @@ -983,6 +1040,10 @@ void sf_global_config_to_string_ex(const char *max_pkg_size_item_nm, int max_pkg_size; int min_buff_size; int max_buff_size; +#if IOEVENT_USE_URING + bool use_io_uring; + bool use_send_zc; +#endif char pkg_buff[256]; max_pkg_size = g_sf_global_vars.net_buffer_cfg.max_pkg_size - @@ -1004,18 +1065,28 @@ void sf_global_config_to_string_ex(const char *max_pkg_size_item_nm, len = snprintf(output, size, "base_path=%s, max_connections=%d, connect_timeout=%d, " - "network_timeout=%d, thread_stack_size=%d KB, " - "%s, tcp_quick_ack=%d, log_level=%s, " - "run_by_group=%s, run_by_user=%s, ", SF_G_BASE_PATH_STR, + "network_timeout=%d, thread_stack_size=%d KB, %s, ", + SF_G_BASE_PATH_STR, g_sf_global_vars.net_buffer_cfg.max_connections, g_sf_global_vars.net_buffer_cfg.connect_timeout, g_sf_global_vars.net_buffer_cfg.network_timeout, - g_sf_global_vars.thread_stack_size / 1024, - pkg_buff, g_sf_global_vars.tcp_quick_ack, + g_sf_global_vars.thread_stack_size / 1024, pkg_buff); + +#if IOEVENT_USE_URING + get_io_uring_configs(&g_sf_context, &use_io_uring, &use_send_zc); + len += snprintf(output + len, size - len, + "use_io_uring=%d, use_send_zc=%d, ", + use_io_uring, use_send_zc); +#endif + + len += snprintf(output + len, size - len, + "tcp_quick_ack=%d, " + "log_level=%s, " + "run_by_group=%s, run_by_user=%s, ", + g_sf_global_vars.tcp_quick_ack, log_get_level_caption(), g_sf_global_vars.run_by.group, - g_sf_global_vars.run_by.user - ); + g_sf_global_vars.run_by.user); sf_log_config_to_string(&g_sf_global_vars.error_log, "error-log", output + len, size - len); diff --git a/src/sf_nio.c b/src/sf_nio.c index 1d4d60f..e92f190 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -41,6 +41,9 @@ #include "sf_service.h" #include "sf_nio.h" +static int sf_client_sock_write(int 
sock, const int event, void *arg); +static int sf_client_sock_read(int sock, const int event, void *arg); + void sf_set_parameters_ex(SFContext *sf_context, const int header_size, sf_set_body_length_callback set_body_length_func, sf_alloc_recv_buffer_callback alloc_recv_buffer_func, @@ -59,19 +62,45 @@ void sf_set_parameters_ex(SFContext *sf_context, const int header_size, sf_context->callbacks.release_buffer = release_buffer_callback; } +#if IOEVENT_USE_URING +#define CLEAR_OP_TYPE_AND_RELEASE_TASK(task) \ + FC_URING_OP_TYPE(task) = IORING_OP_NOP; \ + sf_release_task(task) + +static int sf_uring_cancel_done(int sock, const int event, void *arg) +{ + struct fast_task_info *task; + + task = (struct fast_task_info *)arg; + if (event != IOEVENT_TIMEOUT) { + CLEAR_OP_TYPE_AND_RELEASE_TASK(task); + } + return 0; +} +#endif + void sf_task_detach_thread(struct fast_task_info *task) { +#if IOEVENT_USE_URING + bool need_cancel; + if (task->handler->use_io_uring) { + need_cancel = (FC_URING_OP_TYPE(task) != IORING_OP_NOP); + } else { + need_cancel = true; + } + if (need_cancel) { + task->event.callback = (IOEventCallback)sf_uring_cancel_done; + uring_prep_cancel(task); + } +#else ioevent_detach(&task->thread_data->ev_puller, task->event.fd); +#endif if (task->event.timer.expires > 0) { fast_timer_remove(&task->thread_data->timer, &task->event.timer); task->event.timer.expires = 0; } - - if (SF_CTX->remove_from_ready_list) { - ioevent_remove(&task->thread_data->ev_puller, task); - } } void sf_task_switch_thread(struct fast_task_info *task, @@ -92,18 +121,24 @@ static inline void release_iovec_buffer(struct fast_task_info *task) } } +void sf_socket_close_connection(struct fast_task_info *task) +{ +#if IOEVENT_USE_URING + if (task->handler->use_io_uring) { + if (uring_prep_close_fd(task) != 0) { + close(task->event.fd); + } + } else { +#endif + close(task->event.fd); +#if IOEVENT_USE_URING + } +#endif + task->event.fd = -1; +} + void sf_task_finish_clean_up(struct 
fast_task_info *task) { - /* - assert(task->event.fd >= 0); - if (task->event.fd < 0) { - logWarning("file: "__FILE__", line: %d, " - "task: %p already cleaned", - __LINE__, task); - return; - } - */ - if (task->finish_callback != NULL) { task->finish_callback(task); task->finish_callback = NULL; @@ -111,9 +146,18 @@ void sf_task_finish_clean_up(struct fast_task_info *task) release_iovec_buffer(task); sf_task_detach_thread(task); - task->handler->close_connection(task); - __sync_fetch_and_sub(&g_sf_global_vars.connection_stat.current_count, 1); +#if IOEVENT_USE_URING + if (!task->handler->use_io_uring) { +#endif + task->handler->close_connection(task); + __sync_fetch_and_sub(&g_sf_global_vars. + connection_stat.current_count, 1); + +#if IOEVENT_USE_URING + } +#endif + sf_release_task(task); } @@ -139,46 +183,116 @@ static inline int set_write_event(struct fast_task_info *task) return 0; } +#if IOEVENT_USE_URING +static inline int prepare_first_recv(struct fast_task_info *task) +{ + if (SF_CTX->callbacks.alloc_recv_buffer != NULL) { + return uring_prep_recv_data(task, task->recv.ptr->data, + SF_CTX->header_size); + } else { + return uring_prep_first_recv(task); + } +} + +static inline int prepare_next_recv(struct fast_task_info *task) +{ + int recv_bytes; + + if (task->recv.ptr->length == 0) { //recv header + recv_bytes = SF_CTX->header_size - task->recv.ptr->offset; + return uring_prep_recv_data(task, task->recv.ptr->data + + task->recv.ptr->offset, recv_bytes); + } else { + recv_bytes = task->recv.ptr->length - task->recv.ptr->offset; + if (task->recv_body == NULL) { + return uring_prep_recv_data(task, task->recv.ptr->data + + task->recv.ptr->offset, recv_bytes); + } else { + return uring_prep_recv_data(task, task->recv_body + + (task->recv.ptr->offset - SF_CTX-> + header_size), recv_bytes); + } + } +} +#endif + static inline int set_read_event(struct fast_task_info *task) { int result; - if (task->event.callback == (IOEventCallback)sf_client_sock_read) { - return 
0; - } +#if IOEVENT_USE_URING + if (task->handler->use_io_uring) { + if (FC_URING_OP_TYPE(task) != IORING_OP_NOP) { + if (FC_URING_OP_TYPE(task) == IORING_OP_RECV) { + logWarning("file: "__FILE__", line: %d, " + "trigger recv again!", __LINE__); + return 0; + } else { + logWarning("file: "__FILE__", line: %d, " + "another operation in progress, op_type: %d!", + __LINE__, FC_URING_OP_TYPE(task)); + return EBUSY; + } + } - task->event.callback = (IOEventCallback)sf_client_sock_read; - if (ioevent_modify(&task->thread_data->ev_puller, - task->event.fd, IOEVENT_READ, task) != 0) - { - result = errno != 0 ? errno : ENOENT; - ioevent_add_to_deleted_list(task); + if (task->event.callback != (IOEventCallback)sf_client_sock_read) { + task->event.callback = (IOEventCallback)sf_client_sock_read; + } + if ((result=prepare_first_recv(task)) != 0) { + ioevent_add_to_deleted_list(task); + return result; + } + } else { +#endif - logError("file: "__FILE__", line: %d, " - "ioevent_modify fail, " - "errno: %d, error info: %s", - __LINE__, result, strerror(result)); - return result; + if (task->event.callback == (IOEventCallback)sf_client_sock_read) { + return 0; + } + task->event.callback = (IOEventCallback)sf_client_sock_read; + if (ioevent_modify(&task->thread_data->ev_puller, + task->event.fd, IOEVENT_READ, task) != 0) + { + result = errno != 0 ? 
errno : ENOENT; + ioevent_add_to_deleted_list(task); + + logError("file: "__FILE__", line: %d, " + "ioevent_modify fail, " + "errno: %d, error info: %s", + __LINE__, result, strerror(result)); + return result; + } + +#if IOEVENT_USE_URING } +#endif return 0; } int sf_set_read_event(struct fast_task_info *task) { +#if IOEVENT_USE_URING + if (task->handler->use_io_uring) { + return 0; + } +#endif + + /* reset recv offset and length */ task->recv.ptr->offset = 0; task->recv.ptr->length = 0; + task->nio_stages.current = SF_NIO_STAGE_RECV; return set_read_event(task); } -static inline int sf_ioevent_add(struct fast_task_info *task, - IOEventCallback callback, const int timeout) +static inline int sf_ioevent_add(struct fast_task_info *task) { int result; result = ioevent_set(task, task->thread_data, task->event.fd, - IOEVENT_READ, callback, timeout); + IOEVENT_READ, (IOEventCallback)sf_client_sock_read, + SF_CTX->net_buffer_cfg.network_timeout, + task->handler->use_io_uring); return result > 0 ? 
-1 * result : result; } @@ -196,8 +310,7 @@ static inline void inc_connection_current_count() static inline int sf_nio_init(struct fast_task_info *task) { inc_connection_current_count(); - return sf_ioevent_add(task, (IOEventCallback)sf_client_sock_read, - SF_CTX->net_buffer_cfg.network_timeout); + return sf_ioevent_add(task); } int sf_socket_async_connect_check(struct fast_task_info *task) @@ -212,7 +325,7 @@ int sf_socket_async_connect_check(struct fast_task_info *task) return result; } -static int sf_client_connect_done(int sock, short event, void *arg) +static int sf_client_connect_done(int sock, const int event, void *arg) { int result; struct fast_task_info *task; @@ -220,16 +333,31 @@ static int sf_client_connect_done(int sock, short event, void *arg) task = (struct fast_task_info *)arg; if (task->canceled) { +#if IOEVENT_USE_URING + if (task->handler->use_io_uring && event != IOEVENT_TIMEOUT) { + CLEAR_OP_TYPE_AND_RELEASE_TASK(task); + } +#endif return ENOTCONN; } - if (event & IOEVENT_TIMEOUT) { + if (event == IOEVENT_TIMEOUT) { result = ETIMEDOUT; } else { - result = task->handler->async_connect_check(task); - if (result == EINPROGRESS) { - return 0; +#if IOEVENT_USE_URING + if (task->handler->use_io_uring) { + CLEAR_OP_TYPE_AND_RELEASE_TASK(task); + result = (task->event.res < 0 ? -1 * task->event.res : + task->event.res); + } else { +#endif + result = task->handler->async_connect_check(task); + if (result == EINPROGRESS) { + return 0; + } +#if IOEVENT_USE_URING } +#endif } if (SF_CTX->callbacks.connect_done != NULL) { @@ -260,14 +388,26 @@ static int sf_client_connect_done(int sock, short event, void *arg) int sf_socket_async_connect_server(struct fast_task_info *task) { int result; - if ((task->event.fd=socketCreateEx2(AF_UNSPEC, task->server_ip, - O_NONBLOCK, NULL, &result)) < 0) - { - return result > 0 ? 
-1 * result : result; - } - return asyncconnectserverbyip(task->event.fd, - task->server_ip, task->port); +#if IOEVENT_USE_URING + if (task->handler->use_io_uring) { + if ((result=uring_prep_connect(task)) != 0) { + return result; + } + return EINPROGRESS; + } else { +#endif + if ((task->event.fd=socketCreateEx2(AF_UNSPEC, task->server_ip, + O_NONBLOCK, NULL, &result)) < 0) + { + return result > 0 ? -1 * result : result; + } + + return asyncconnectserverbyip(task->event.fd, + task->server_ip, task->port); +#if IOEVENT_USE_URING + } +#endif } static int sf_async_connect_server(struct fast_task_info *task) @@ -279,7 +419,7 @@ static int sf_async_connect_server(struct fast_task_info *task) result = ioevent_set(task, task->thread_data, task->event.fd, IOEVENT_READ | IOEVENT_WRITE, (IOEventCallback) sf_client_connect_done, SF_CTX->net_buffer_cfg. - connect_timeout); + connect_timeout, task->handler->use_io_uring); return result > 0 ? -1 * result : result; } else { if (SF_CTX->callbacks.connect_done != NULL) { @@ -287,10 +427,7 @@ static int sf_async_connect_server(struct fast_task_info *task) } if (result == 0) { - if ((result=sf_ioevent_add(task, (IOEventCallback) - sf_client_sock_read, SF_CTX-> - net_buffer_cfg.network_timeout)) != 0) - { + if ((result=sf_ioevent_add(task)) != 0) { return result; } @@ -331,10 +468,12 @@ static int sf_nio_deal_task(struct fast_task_info *task, const int stage) case SF_NIO_STAGE_RECV: task->nio_stages.current = SF_NIO_STAGE_RECV; if ((result=set_read_event(task)) == 0) { - if (sf_client_sock_read(task->event.fd, - IOEVENT_READ, task) < 0) - { - result = errno != 0 ? errno : EIO; + if (!task->handler->use_io_uring) { + if (sf_client_sock_read(task->event.fd, + IOEVENT_READ, task) < 0) + { + result = errno != 0 ? 
errno : EIO; + } } } break; @@ -345,10 +484,7 @@ static int sf_nio_deal_task(struct fast_task_info *task, const int stage) result = SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_CONTINUE); break; case SF_NIO_STAGE_FORWARDED: //forward by other thread - if ((result=sf_ioevent_add(task, (IOEventCallback) - sf_client_sock_read, - SF_CTX->net_buffer_cfg.network_timeout)) == 0) - { + if ((result=sf_ioevent_add(task)) == 0) { result = SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_SEND); } break; @@ -435,7 +571,7 @@ int sf_nio_notify(struct fast_task_info *task, const int stage) { result = errno != 0 ? errno : EIO; logError("file: "__FILE__", line: %d, " - "write eventfd %d fail, errno: %d, error info: %s", + "write to fd %d fail, errno: %d, error info: %s", __LINE__, FC_NOTIFY_WRITE_FD(task->thread_data), result, STRERROR(result)); return result; @@ -463,26 +599,32 @@ static inline void deal_notified_task(struct fast_task_info *task, } } -void sf_recv_notify_read(int sock, short event, void *arg) +void sf_recv_notify_read(int fd, const int event, void *arg) { int64_t n; int stage; - struct nio_thread_data *thread_data; + struct ioevent_notify_entry *notify_entry; struct fast_task_info *task; struct fast_task_info *current; - thread_data = ((struct ioevent_notify_entry *)arg)->thread_data; - if (read(sock, &n, sizeof(n)) < 0) { + notify_entry = (struct ioevent_notify_entry *)arg; + if (read(fd, &n, sizeof(n)) < 0) { +#if IOEVENT_USE_URING + if (errno == EAGAIN) { + return; + } +#endif + logWarning("file: "__FILE__", line: %d, " - "read from eventfd %d fail, errno: %d, error info: %s", - __LINE__, sock, errno, STRERROR(errno)); + "read from fd %d fail, errno: %d, error info: %s", + __LINE__, fd, errno, STRERROR(errno)); } - PTHREAD_MUTEX_LOCK(&thread_data->waiting_queue.lock); - current = thread_data->waiting_queue.head; - thread_data->waiting_queue.head = NULL; - thread_data->waiting_queue.tail = NULL; - PTHREAD_MUTEX_UNLOCK(&thread_data->waiting_queue.lock); + 
PTHREAD_MUTEX_LOCK(&notify_entry->thread_data->waiting_queue.lock); + current = notify_entry->thread_data->waiting_queue.head; + notify_entry->thread_data->waiting_queue.head = NULL; + notify_entry->thread_data->waiting_queue.tail = NULL; + PTHREAD_MUTEX_UNLOCK(&notify_entry->thread_data->waiting_queue.lock); while (current != NULL) { task = current; @@ -508,16 +650,31 @@ int sf_send_add_event(struct fast_task_info *task) if (task->send.ptr->length > 0) { /* direct send */ task->nio_stages.current = SF_NIO_STAGE_SEND; - if (sf_client_sock_write(task->event.fd, IOEVENT_WRITE, task) < 0) { - return errno != 0 ? errno : EIO; +#if IOEVENT_USE_URING + if (task->handler->use_io_uring) { + if (task->event.callback != (IOEventCallback)sf_client_sock_write) { + task->event.callback = (IOEventCallback)sf_client_sock_write; + } + if (task->handler->use_send_zc) { + return uring_prep_first_send_zc(task); + } else { + return uring_prep_first_send(task); + } + } else { +#endif + if (sf_client_sock_write(task->event.fd, IOEVENT_WRITE, task) < 0) { + return errno != 0 ? 
errno : EIO; + } +#if IOEVENT_USE_URING } +#endif } return 0; } static inline int check_task(struct fast_task_info *task, - const short event, const int expect_stage) + const int event, const int expect_stage) { if (task->canceled) { return ENOTCONN; @@ -537,6 +694,18 @@ static inline int check_task(struct fast_task_info *task, return 0; } +#if IOEVENT_USE_URING + if (task->handler->use_io_uring) { + logWarning("file: "__FILE__", line: %d, " + "client ip: %s, event: %d, expect stage: %d, " + "but current stage: %d, close connection", + __LINE__, task->client_ip, event, + expect_stage, task->nio_stages.current); + ioevent_add_to_deleted_list(task); + return -1; + } +#endif + if (task->handler->comm_type == fc_comm_type_sock) { if (tcp_socket_connected(task->event.fd)) { return EAGAIN; @@ -553,28 +722,69 @@ static inline int check_task(struct fast_task_info *task, } } +#if IOEVENT_USE_URING +static inline int prepare_next_send(struct fast_task_info *task) +{ + if (task->handler->use_send_zc) { + return uring_prep_next_send_zc(task); + } else { + return uring_prep_next_send(task); + } +} +#endif + ssize_t sf_socket_send_data(struct fast_task_info *task, SFCommAction *action, bool *send_done) { int bytes; + int result; - if (task->iovec_array.iovs != NULL) { - bytes = writev(task->event.fd, task->iovec_array.iovs, - FC_MIN(task->iovec_array.count, IOV_MAX)); +#if IOEVENT_USE_URING + if (task->handler->use_io_uring) { + bytes = task->event.res; } else { - bytes = write(task->event.fd, task->send.ptr->data + - task->send.ptr->offset, task->send.ptr->length - - task->send.ptr->offset); +#endif + if (task->iovec_array.iovs != NULL) { + bytes = writev(task->event.fd, task->iovec_array.iovs, + FC_MIN(task->iovec_array.count, IOV_MAX)); + } else { + bytes = write(task->event.fd, task->send.ptr->data + + task->send.ptr->offset, task->send.ptr->length - + task->send.ptr->offset); + } +#if IOEVENT_USE_URING } +#endif + if (bytes < 0) { - if (errno == EAGAIN || errno == 
EWOULDBLOCK) - { - if (set_write_event(task) != 0) { - return -1; +#if IOEVENT_USE_URING + if (task->handler->use_io_uring) { + result = -bytes; + } else { +#endif + result = errno; +#if IOEVENT_USE_URING + } +#endif + if (result == EAGAIN || result == EWOULDBLOCK) { +#if IOEVENT_USE_URING + if (task->handler->use_io_uring) { + if (prepare_next_send(task) != 0) { + return -1; + } + } else { +#endif + if (set_write_event(task) != 0) { + return -1; + } +#if IOEVENT_USE_URING } +#endif + *action = sf_comm_action_break; return 0; - } else if (errno == EINTR) { //should retry + } else if (result == EINTR && !task->handler->use_io_uring) { + /* should try again */ logDebug("file: "__FILE__", line: %d, " "client ip: %s, ignore interupt signal", __LINE__, task->client_ip); @@ -585,30 +795,40 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, "client ip: %s, send fail, task offset: %d, length: %d, " "errno: %d, error info: %s", __LINE__, task->client_ip, task->send.ptr->offset, task->send.ptr->length, - errno, strerror(errno)); + result, strerror(result)); return -1; } } else if (bytes == 0) { logWarning("file: "__FILE__", line: %d, " - "client ip: %s, sock: %d, task length: %d, offset: %d, " + "client ip: %s, task length: %d, offset: %d, " "send failed, connection disconnected", __LINE__, - task->client_ip, task->event.fd, task->send.ptr->length, + task->client_ip, task->send.ptr->length, task->send.ptr->offset); return -1; } task->send.ptr->offset += bytes; if (task->send.ptr->offset >= task->send.ptr->length) { - if (task->send.ptr != task->recv.ptr) { //double buffers - task->send.ptr->offset = 0; - task->send.ptr->length = 0; - } - *action = sf_comm_action_finish; - *send_done = true; - } else { - *action = sf_comm_action_continue; - *send_done = false; +#if IOEVENT_USE_URING + if (FC_URING_IS_SEND_ZC(task) && task->thread_data-> + ev_puller.send_zc_done_notify) + { + *action = sf_comm_action_break; + *send_done = false; + } else { +#endif + if (task->send.ptr 
!= task->recv.ptr) { //double buffers + task->send.ptr->offset = 0; + task->send.ptr->length = 0; + } + *action = sf_comm_action_finish; + *send_done = true; +#if IOEVENT_USE_URING + } +#endif + + } else { /* set next writev iovec array */ if (task->iovec_array.iovs != NULL) { struct iovec *iov; @@ -637,6 +857,25 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, task->iovec_array.iovs = iov; task->iovec_array.count = end - iov; } + +#if IOEVENT_USE_URING + if (task->handler->use_io_uring) { + if (!(FC_URING_IS_SEND_ZC(task) && task->thread_data-> + ev_puller.send_zc_done_notify)) + { + if (prepare_next_send(task) != 0) { + return -1; + } + } + *action = sf_comm_action_break; + } else { +#endif + *action = sf_comm_action_continue; +#if IOEVENT_USE_URING + } +#endif + + *send_done = false; } return bytes; @@ -646,30 +885,56 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, const bool call_post_recv, SFCommAction *action) { int bytes; + int result; int recv_bytes; bool new_alloc; - if (task->recv.ptr->length == 0) { //recv header - recv_bytes = SF_CTX->header_size - task->recv.ptr->offset; - bytes = read(task->event.fd, task->recv.ptr->data + - task->recv.ptr->offset, recv_bytes); +#if IOEVENT_USE_URING + if (task->handler->use_io_uring) { + bytes = task->event.res; } else { - recv_bytes = task->recv.ptr->length - task->recv.ptr->offset; - if (task->recv_body == NULL) { +#endif + if (task->recv.ptr->length == 0) { //recv header + recv_bytes = SF_CTX->header_size - task->recv.ptr->offset; bytes = read(task->event.fd, task->recv.ptr->data + task->recv.ptr->offset, recv_bytes); } else { - bytes = read(task->event.fd, task->recv_body + - (task->recv.ptr->offset - SF_CTX-> - header_size), recv_bytes); + recv_bytes = task->recv.ptr->length - task->recv.ptr->offset; + if (task->recv_body == NULL) { + bytes = read(task->event.fd, task->recv.ptr->data + + task->recv.ptr->offset, recv_bytes); + } else { + bytes = read(task->event.fd, task->recv_body + + 
(task->recv.ptr->offset - SF_CTX-> + header_size), recv_bytes); + } } +#if IOEVENT_USE_URING } +#endif if (bytes < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { +#if IOEVENT_USE_URING + if (task->handler->use_io_uring) { + result = -bytes; + } else { +#endif + result = errno; +#if IOEVENT_USE_URING + } +#endif + if (result == EAGAIN || result == EWOULDBLOCK) { +#if IOEVENT_USE_URING + if (task->handler->use_io_uring) { + if (prepare_next_recv(task) != 0) { + return -1; + } + } +#endif *action = sf_comm_action_break; return 0; - } else if (errno == EINTR) { //should retry + } else if (result == EINTR && !task->handler->use_io_uring) { + /* should try again */ logDebug("file: "__FILE__", line: %d, " "client ip: %s, ignore interupt signal", __LINE__, task->client_ip); @@ -680,7 +945,7 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, "client ip: %s, recv fail, " "errno: %d, error info: %s", __LINE__, task->client_ip, - errno, strerror(errno)); + result, strerror(result)); return -1; } } else if (bytes == 0) { @@ -712,7 +977,18 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, task->recv.ptr->offset += bytes; if (task->recv.ptr->length == 0) { //pkg header if (task->recv.ptr->offset < SF_CTX->header_size) { - *action = sf_comm_action_continue; +#if IOEVENT_USE_URING + if (task->handler->use_io_uring) { + if (prepare_next_recv(task) != 0) { + return -1; + } + *action = sf_comm_action_break; + } else { +#endif + *action = sf_comm_action_continue; +#if IOEVENT_USE_URING + } +#endif return bytes; } @@ -765,7 +1041,18 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, if (task->recv.ptr->offset >= task->recv.ptr->length) { //recv done *action = sf_comm_action_finish; } else { - *action = sf_comm_action_continue; +#if IOEVENT_USE_URING + if (task->handler->use_io_uring) { + if (prepare_next_recv(task) != 0) { + return -1; + } + *action = sf_comm_action_break; + } else { +#endif + *action = sf_comm_action_continue; +#if IOEVENT_USE_URING + 
} +#endif } return bytes; @@ -841,8 +1128,7 @@ static int calc_iops_and_remove_polling(struct fast_task_info *task) ioevent_set_timeout(&task->thread_data->ev_puller, task->thread_data->timeout_ms); } - result = sf_ioevent_add(task, (IOEventCallback) - sf_client_sock_read, SF_CTX->net_buffer_cfg.network_timeout); + result = sf_ioevent_add(task); format_ip_address(task->client_ip, formatted_ip); logInfo("file: "__FILE__", line: %d, client: %s:%u, " @@ -904,7 +1190,7 @@ int sf_rdma_busy_polling_callback(struct nio_thread_data *thread_data) return 0; } -int sf_client_sock_read(int sock, short event, void *arg) +static int sf_client_sock_read(int sock, const int event, void *arg) { int result; int bytes; @@ -914,10 +1200,15 @@ int sf_client_sock_read(int sock, short event, void *arg) task = (struct fast_task_info *)arg; if ((result=check_task(task, event, SF_NIO_STAGE_RECV)) != 0) { +#if IOEVENT_USE_URING + if (task->handler->use_io_uring && event != IOEVENT_TIMEOUT) { + CLEAR_OP_TYPE_AND_RELEASE_TASK(task); + } +#endif return result >= 0 ? 
0 : -1; } - if (event & IOEVENT_TIMEOUT) { + if (event == IOEVENT_TIMEOUT) { if (task->recv.ptr->offset == 0 && task->req_count > 0) { if (SF_CTX->callbacks.task_timeout != NULL) { if (SF_CTX->callbacks.task_timeout(task) != 0) { @@ -929,7 +1220,8 @@ int sf_client_sock_read(int sock, short event, void *arg) task->event.timer.expires = g_current_time + SF_CTX->net_buffer_cfg.network_timeout; fast_timer_add(&task->thread_data->timer, - &task->event.timer); + &task->event.timer); + return 0; } else { if (task->recv.ptr->length > 0) { logWarning("file: "__FILE__", line: %d, " @@ -946,10 +1238,14 @@ int sf_client_sock_read(int sock, short event, void *arg) ioevent_add_to_deleted_list(task); return -1; } - - return 0; } +#if IOEVENT_USE_URING + if (task->handler->use_io_uring) { + CLEAR_OP_TYPE_AND_RELEASE_TASK(task); + } +#endif + total_read = 0; action = sf_comm_action_continue; while (1) { @@ -996,23 +1292,63 @@ int sf_client_sock_read(int sock, short event, void *arg) return total_read; } -int sf_client_sock_write(int sock, short event, void *arg) +static int sock_write_done(struct fast_task_info *task, + const int length, const bool send_done) +{ + int next_stage; + + release_iovec_buffer(task); + task->recv.ptr->offset = 0; + task->recv.ptr->length = 0; + + if (SF_CTX->callbacks.send_done == NULL || !send_done) { + task->nio_stages.current = SF_NIO_STAGE_RECV; + } else { + if (SF_CTX->callbacks.send_done(task, + length, &next_stage) != 0) + { + return -1; + } + + if (task->nio_stages.current != next_stage) { + task->nio_stages.current = next_stage; + } + } + + if (task->nio_stages.current == SF_NIO_STAGE_RECV) { + if (set_read_event(task) != 0) { + return -1; + } + } + + return 0; +} + +static int sf_client_sock_write(int sock, const int event, void *arg) { int result; int bytes; int total_write; int length; - int next_stage; SFCommAction action; bool send_done; struct fast_task_info *task; task = (struct fast_task_info *)arg; if ((result=check_task(task, event, 
SF_NIO_STAGE_SEND)) != 0) { +#if IOEVENT_USE_URING + if (task->handler->use_io_uring && event != IOEVENT_TIMEOUT) { + if (event == IOEVENT_NOTIFY || !(FC_URING_IS_SEND_ZC(task) && + task->thread_data->ev_puller.send_zc_done_notify)) + { + CLEAR_OP_TYPE_AND_RELEASE_TASK(task); + } + } +#endif return result >= 0 ? 0 : -1; } - if (event & IOEVENT_TIMEOUT) { + if (event == IOEVENT_TIMEOUT) { logError("file: "__FILE__", line: %d, " "client ip: %s, send timeout. total length: %d, offset: %d, " "remain: %d", __LINE__, task->client_ip, task->send.ptr->length, @@ -1023,6 +1359,42 @@ int sf_client_sock_write(int sock, short event, void *arg) return -1; } +#if IOEVENT_USE_URING + if (event == IOEVENT_NOTIFY) { + if (!FC_URING_IS_SEND_ZC(task)) { + logWarning("file: "__FILE__", line: %d, " + "unexpected io_uring notify!", __LINE__); + return -1; + } + + FC_URING_OP_TYPE(task) = IORING_OP_NOP; + if (!task->canceled) { + if (task->send.ptr->offset >= task->send.ptr->length) { + length = task->send.ptr->length; + if (task->send.ptr != task->recv.ptr) { //double buffers + task->send.ptr->offset = 0; + task->send.ptr->length = 0; + } + + result = sock_write_done(task, length, true); + } else { + result = prepare_next_send(task); + } + } + + sf_release_task(task); + return result == 0 ? 
0 : -1; + } + + if (task->handler->use_io_uring) { + if (!(FC_URING_IS_SEND_ZC(task) && task->thread_data-> + ev_puller.send_zc_done_notify)) + { + CLEAR_OP_TYPE_AND_RELEASE_TASK(task); + } + } +#endif + total_write = 0; length = task->send.ptr->length; action = sf_comm_action_continue; @@ -1038,27 +1410,9 @@ int sf_client_sock_write(int sock, short event, void *arg) total_write += bytes; if (action == sf_comm_action_finish) { - release_iovec_buffer(task); - task->recv.ptr->offset = 0; - task->recv.ptr->length = 0; - if (set_read_event(task) != 0) { + if (sock_write_done(task, length, send_done) != 0) { return -1; } - - if (SF_CTX->callbacks.send_done == NULL || !send_done) { - task->nio_stages.current = SF_NIO_STAGE_RECV; - } else { - if (SF_CTX->callbacks.send_done(task, - length, &next_stage) != 0) - { - return -1; - } - - if (task->nio_stages.current != next_stage) { - task->nio_stages.current = next_stage; - } - } - break; } else if (action == sf_comm_action_break) { break; diff --git a/src/sf_nio.h b/src/sf_nio.h index 2e93f3f..03e756d 100644 --- a/src/sf_nio.h +++ b/src/sf_nio.h @@ -68,15 +68,6 @@ static inline void sf_set_connect_done_callback_ex(SFContext *sf_context, sf_set_connect_done_callback_ex(&g_sf_context, done_callback) -static inline void sf_set_remove_from_ready_list_ex( - SFContext *sf_context, const bool enabled) -{ - sf_context->remove_from_ready_list = enabled; -} - -#define sf_set_remove_from_ready_list(enabled) \ - sf_set_remove_from_ready_list_ex(&g_sf_context, enabled); - static inline TaskCleanUpCallback sf_get_task_cleanup_callback_ex( SFContext *sf_context) { @@ -99,10 +90,9 @@ static inline void sf_nio_reset_task_length(struct fast_task_info *task) } } -void sf_recv_notify_read(int sock, short event, void *arg); +void sf_socket_close_connection(struct fast_task_info *task); +void sf_recv_notify_read(int sock, const int event, void *arg); int sf_send_add_event(struct fast_task_info *task); -int sf_client_sock_write(int sock, short 
event, void *arg); -int sf_client_sock_read(int sock, short event, void *arg); void sf_task_finish_clean_up(struct fast_task_info *task); @@ -158,11 +148,6 @@ static inline int sf_nio_forward_request(struct fast_task_info *task, return sf_nio_notify(task, SF_NIO_STAGE_FORWARDED); } -static inline bool sf_client_sock_in_read_stage(struct fast_task_info *task) -{ - return (task->event.callback == (IOEventCallback)sf_client_sock_read); -} - static inline void sf_nio_add_to_deleted_list(struct nio_thread_data *thread_data, struct fast_task_info *task) { diff --git a/src/sf_service.c b/src/sf_service.c index 9f1f0b6..7907aef 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -34,8 +34,8 @@ #include "fastcommon/ioevent_loop.h" #include "fastcommon/fc_memory.h" #include "sf_nio.h" +#include "sf_proto.h" #include "sf_util.h" -#include "sf_global.h" #include "sf_service.h" #if defined(OS_LINUX) @@ -61,12 +61,14 @@ struct worker_thread_context { static void *worker_thread_entrance(void *arg); static int sf_init_free_queue(SFContext *sf_context, const char *name, - const bool double_buffers, const int task_padding_size, - const int task_arg_size, TaskInitCallback init_callback, - void *init_arg) + const bool double_buffers, const bool need_shrink_task_buffer, + const int task_padding_size, const int task_arg_size, + TaskInitCallback init_callback, void *init_arg) { int result; + int buffer_size; int m; + int max_m; int alloc_conn_once; if ((result=set_rand_seed()) != 0) { @@ -75,17 +77,26 @@ static int sf_init_free_queue(SFContext *sf_context, const char *name, return result; } - m = sf_context->net_buffer_cfg.min_buff_size / (64 * 1024); + if (strcmp(name, "cluster") == 0 || strcmp(name, "replica") == 0) { + buffer_size = FC_MAX(4 * 1024 * 1024, sf_context-> + net_buffer_cfg.max_buff_size); + max_m = 64; + } else { + buffer_size = sf_context->net_buffer_cfg.min_buff_size; + max_m = 16; + } + m = buffer_size / (64 * 1024); if (m == 0) { m = 1; - } else if (m > 16) { - m 
= 16; + } else if (m > max_m) { + m = max_m; } alloc_conn_once = 256 / m; + return free_queue_init_ex2(&sf_context->free_queue, name, double_buffers, - sf_context->net_buffer_cfg.max_connections, alloc_conn_once, - sf_context->net_buffer_cfg.min_buff_size, sf_context-> - net_buffer_cfg.max_buff_size, task_padding_size, + need_shrink_task_buffer, sf_context->net_buffer_cfg.max_connections, + alloc_conn_once, sf_context->net_buffer_cfg.min_buff_size, + sf_context->net_buffer_cfg.max_buff_size, task_padding_size, task_arg_size, init_callback, init_arg); } @@ -101,12 +112,14 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, sf_recv_timeout_callback timeout_callback, const int net_timeout_ms, const int proto_header_size, const int task_padding_size, const int task_arg_size, const bool double_buffers, - const bool explicit_post_recv, TaskInitCallback init_callback, - void *init_arg, sf_release_buffer_callback release_buffer_callback) + const bool need_shrink_task_buffer, const bool explicit_post_recv, + TaskInitCallback init_callback, void *init_arg, + sf_release_buffer_callback release_buffer_callback) { int result; int bytes; int extra_events; + int max_entries; int i; struct worker_thread_context *thread_contexts; struct worker_thread_context *thread_ctx; @@ -132,8 +145,8 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, } if ((result=sf_init_free_queue(sf_context, name, double_buffers, - task_padding_size, task_arg_size, init_callback, - init_arg)) != 0) + need_shrink_task_buffer, task_padding_size, + task_arg_size, init_callback, init_arg)) != 0) { return result; } @@ -161,7 +174,11 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, if (SF_G_EPOLL_EDGE_TRIGGER) { #ifdef OS_LINUX +#if IOEVENT_USE_EPOLL extra_events = EPOLLET; +#else + extra_events = 0; +#endif #elif defined(OS_FREEBSD) extra_events = EV_CLEAR; #else @@ -171,6 +188,36 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, extra_events = 
0; } + max_entries = (sf_context->net_buffer_cfg.max_connections + + sf_context->work_threads - 1) / sf_context->work_threads; + if (strcmp(sf_context->name, "service") == 0) { + if (max_entries < 4 * 1024) { + max_entries = max_entries * 2; + } else if (max_entries < 8 * 1024) { + max_entries = (max_entries * 3) / 2; + } else if (max_entries < 16 * 1024) { + max_entries = (max_entries * 5) / 4; + } else if (max_entries < 32 * 1024) { + max_entries = (max_entries * 6) / 5; +#if IOEVENT_USE_URING + if (max_entries > 32 * 1024) { + max_entries = 32 * 1024; + } +#else + } else if (max_entries < 64 * 1024) { + max_entries = (max_entries * 11) / 10; + } else if (max_entries < 128 * 1024) { + max_entries = (max_entries * 21) / 20; +#endif + } + } else { + if (max_entries < 1024) { + max_entries += 8; + } else { + max_entries = 1024; + } + } + g_current_time = time(NULL); sf_context->thread_count = 0; data_end = sf_context->thread_data + sf_context->work_threads; @@ -195,18 +242,37 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, thread_data->arg = NULL; } - if (ioevent_init(&thread_data->ev_puller, 2 + sf_context-> - net_buffer_cfg.max_connections, net_timeout_ms, - extra_events) != 0) + if ((result=ioevent_init(&thread_data->ev_puller, sf_context->name, + max_entries, net_timeout_ms, extra_events)) != 0) { - result = errno != 0 ? 
errno : ENOMEM; + char prompt[256]; +#if IOEVENT_USE_URING + if (result == EPERM) { + strcpy(prompt, " make sure kernel.io_uring_disabled set to 0"); + } else if (result == EINVAL) { + sprintf(prompt, " maybe max_connections: %d is too large" + " or [%s]'s work_threads: %d is too small", + sf_context->net_buffer_cfg.max_connections, + sf_context->name, sf_context->work_threads); + } else { + *prompt = '\0'; + } +#else + *prompt = '\0'; +#endif + logError("file: "__FILE__", line: %d, " - "ioevent_init fail, " - "errno: %d, error info: %s", - __LINE__, result, strerror(result)); + "ioevent_init fail, errno: %d, error info: %s.%s" + , __LINE__, result, strerror(result), prompt); return result; } +#if IOEVENT_USE_URING + if (send_done_callback != NULL) { + ioevent_set_send_zc_done_notify(&thread_data->ev_puller, true); + } +#endif + result = fast_timer_init(&thread_data->timer, 2 * sf_context-> net_buffer_cfg.network_timeout, g_current_time); if (result != 0) { @@ -487,7 +553,9 @@ struct fast_task_info *sf_socket_accept_connection(SFListener *listener) } FC_SET_CLOEXEC(incomesock); - if ((task=sf_alloc_init_task(listener->handler, incomesock)) == NULL) { + if ((task=sf_alloc_init_server_task(listener->handler, + incomesock)) == NULL) + { close(incomesock); return NULL; } @@ -498,12 +566,6 @@ struct fast_task_info *sf_socket_accept_connection(SFListener *listener) return task; } -void sf_socket_close_connection(struct fast_task_info *task) -{ - close(task->event.fd); - task->event.fd = -1; -} - void sf_socket_close_ex(SFContext *sf_context) { int i; diff --git a/src/sf_service.h b/src/sf_service.h index 0980328..f806806 100644 --- a/src/sf_service.h +++ b/src/sf_service.h @@ -25,6 +25,8 @@ #include "fastcommon/ioevent.h" #include "fastcommon/fast_task_queue.h" #include "sf_types.h" +#include "sf_proto.h" +#include "sf_global.h" typedef void* (*sf_alloc_thread_extra_data_callback)(const int thread_index); typedef void (*sf_sig_quit_handler)(int sig); @@ -45,8 +47,9 @@ 
int sf_service_init_ex2(SFContext *sf_context, const char *name, sf_recv_timeout_callback timeout_callback, const int net_timeout_ms, const int proto_header_size, const int task_padding_size, const int task_arg_size, const bool double_buffers, - const bool explicit_post_recv, TaskInitCallback init_callback, - void *init_arg, sf_release_buffer_callback release_buffer_callback); + const bool need_shrink_task_buffer, const bool explicit_post_recv, + TaskInitCallback init_callback, void *init_arg, + sf_release_buffer_callback release_buffer_callback); #define sf_service_init_ex(sf_context, name, alloc_thread_extra_data_callback,\ thread_loop_callback, accept_done_callback, set_body_length_func, \ @@ -56,7 +59,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, thread_loop_callback, accept_done_callback, set_body_length_func, \ NULL, send_done_callback, deal_func, task_cleanup_func, \ timeout_callback, net_timeout_ms, proto_header_size, \ - 0, task_arg_size, false, false, NULL, NULL, NULL) + 0, task_arg_size, false, true, false, NULL, NULL, NULL) #define sf_service_init(name, alloc_thread_extra_data_callback, \ thread_loop_callback, accept_done_callback, set_body_length_func, \ @@ -65,8 +68,8 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, sf_service_init_ex2(&g_sf_context, name, alloc_thread_extra_data_callback, \ thread_loop_callback, accept_done_callback, set_body_length_func, NULL,\ send_done_callback, deal_func, task_cleanup_func, timeout_callback, \ - net_timeout_ms, proto_header_size, 0, task_arg_size, false, false, \ - NULL, NULL, NULL) + net_timeout_ms, proto_header_size, 0, task_arg_size, false, true, \ + false, NULL, NULL, NULL) int sf_service_destroy_ex(SFContext *sf_context); @@ -108,8 +111,6 @@ int sf_socket_create_server(SFListener *listener, void sf_socket_close_server(SFListener *listener); struct fast_task_info *sf_socket_accept_connection(SFListener *listener); -void sf_socket_close_connection(struct 
fast_task_info *task); - int sf_socket_server_ex(SFContext *sf_context); #define sf_socket_server() sf_socket_server_ex(&g_sf_context) @@ -163,6 +164,11 @@ static inline struct fast_task_info *sf_alloc_init_task_ex( return NULL; } + if (task->shrinked) { + task->shrinked = false; + sf_proto_init_task_magic(task); + } + __sync_add_and_fetch(&task->reffer_count, reffer_count); __sync_bool_compare_and_swap(&task->canceled, 1, 0); task->handler = handler; @@ -170,12 +176,42 @@ static inline struct fast_task_info *sf_alloc_init_task_ex( return task; } -#define sf_hold_task_ex(task, inc_count) __sync_add_and_fetch( \ - &task->reffer_count, inc_count) -#define sf_hold_task(task) sf_hold_task_ex(task, 1) +#define sf_hold_task_ex(task, inc_count) fc_hold_task_ex(task, inc_count) +#define sf_hold_task(task) fc_hold_task(task) #define sf_alloc_init_task(handler, fd) sf_alloc_init_task_ex(handler, fd, 1) +static inline struct fast_task_info *sf_alloc_init_server_task( + SFNetworkHandler *handler, const int fd) +{ + const int reffer_count = 1; + struct fast_task_info *task; + + if ((task=sf_alloc_init_task_ex(handler, fd, reffer_count)) != NULL) { +#if IOEVENT_USE_URING + FC_URING_IS_CLIENT(task) = false; +#endif + } + + return task; +} + +static inline struct fast_task_info *sf_alloc_init_client_task( + SFNetworkHandler *handler) +{ + const int fd = -1; + const int reffer_count = 1; + struct fast_task_info *task; + + if ((task=sf_alloc_init_task_ex(handler, fd, reffer_count)) != NULL) { +#if IOEVENT_USE_URING + FC_URING_IS_CLIENT(task) = true; +#endif + } + + return task; +} + static inline void sf_release_task(struct fast_task_info *task) { if (__sync_sub_and_fetch(&task->reffer_count, 1) == 0) { @@ -187,6 +223,15 @@ static inline void sf_release_task(struct fast_task_info *task) "used: %d, freed: %d", __LINE__, task, alloc_count, alloc_count - free_count, free_count); */ + +#if IOEVENT_USE_URING + if (task->handler->use_io_uring) { + task->handler->close_connection(task); + 
__sync_fetch_and_sub(&g_sf_global_vars. + connection_stat.current_count, 1); + } +#endif + free_queue_push(task); } } @@ -262,6 +307,19 @@ static inline SFNetworkHandler *sf_get_rdma_network_handler3( return sf_get_rdma_network_handler(sf_context3); } +static inline bool sf_get_double_buffers_flag(FCServerGroupInfo *server_group) +{ + if (server_group->comm_type == fc_comm_type_sock) { +#if IOEVENT_USE_URING + return true; +#else + return false; +#endif + } else { //RDMA + return true; + } +} + #ifdef __cplusplus } #endif diff --git a/src/sf_types.h b/src/sf_types.h index 6548a35..4627cad 100644 --- a/src/sf_types.h +++ b/src/sf_types.h @@ -115,9 +115,12 @@ typedef struct sf_listener { struct sf_context; struct sf_address_family_handler; + typedef struct sf_network_handler { bool enabled; bool explicit_post_recv; + bool use_io_uring; //since v1.2.9 + bool use_send_zc; //since v1.2.9 FCCommunicationType comm_type; struct sf_address_family_handler *fh; struct ibv_pd *pd; @@ -179,7 +182,6 @@ typedef struct sf_context { struct nio_thread_data *thread_data; volatile int thread_count; - //int rdma_port_offset; bool is_client; //since v1.2.5 SFAddressFamily address_family; SFAddressFamilyHandler handlers[SF_ADDRESS_FAMILY_COUNT]; @@ -190,7 +192,6 @@ typedef struct sf_context { int work_threads; int header_size; - bool remove_from_ready_list; bool realloc_task_buffer; bool connect_need_log; //for client connect FCSmartPollingConfig smart_polling;