diff --git a/src/sf_nio.c b/src/sf_nio.c index 45a4a03..e92f190 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -809,13 +809,25 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, task->send.ptr->offset += bytes; if (task->send.ptr->offset >= task->send.ptr->length) { - if (task->send.ptr != task->recv.ptr) { //double buffers - task->send.ptr->offset = 0; - task->send.ptr->length = 0; - } +#if IOEVENT_USE_URING + if (FC_URING_IS_SEND_ZC(task) && task->thread_data-> + ev_puller.send_zc_done_notify) + { + *action = sf_comm_action_break; + *send_done = false; + } else { +#endif + if (task->send.ptr != task->recv.ptr) { //double buffers + task->send.ptr->offset = 0; + task->send.ptr->length = 0; + } + + *action = sf_comm_action_finish; + *send_done = true; +#if IOEVENT_USE_URING + } +#endif - *action = sf_comm_action_finish; - *send_done = true; } else { /* set next writev iovec array */ if (task->iovec_array.iovs != NULL) { @@ -848,8 +860,12 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, #if IOEVENT_USE_URING if (task->handler->use_io_uring) { - if (prepare_next_send(task) != 0) { - return -1; + if (!(FC_URING_IS_SEND_ZC(task) && task->thread_data-> + ev_puller.send_zc_done_notify)) + { + if (prepare_next_send(task) != 0) { + return -1; + } } *action = sf_comm_action_break; } else { @@ -1276,13 +1292,44 @@ static int sf_client_sock_read(int sock, const int event, void *arg) return total_read; } +static int sock_write_done(struct fast_task_info *task, + const int length, const bool send_done) +{ + int next_stage; + + release_iovec_buffer(task); + task->recv.ptr->offset = 0; + task->recv.ptr->length = 0; + + if (SF_CTX->callbacks.send_done == NULL || !send_done) { + task->nio_stages.current = SF_NIO_STAGE_RECV; + } else { + if (SF_CTX->callbacks.send_done(task, + length, &next_stage) != 0) + { + return -1; + } + + if (task->nio_stages.current != next_stage) { + task->nio_stages.current = next_stage; + } + } + + if (task->nio_stages.current == SF_NIO_STAGE_RECV) { + if (set_read_event(task) != 0) { + return -1; + } + } + + return 0; +} + static int sf_client_sock_write(int sock, const int event, void *arg) { int result; int bytes; int total_write; int length; - int next_stage; SFCommAction action; bool send_done; struct fast_task_info *task; @@ -1291,7 +1338,11 @@ static int sf_client_sock_write(int sock, const int event, void *arg) if ((result=check_task(task, event, SF_NIO_STAGE_SEND)) != 0) { #if IOEVENT_USE_URING if (task->handler->use_io_uring && event != IOEVENT_TIMEOUT) { - CLEAR_OP_TYPE_AND_RELEASE_TASK(task); + if (event == IOEVENT_NOTIFY || !(FC_URING_IS_SEND_ZC(task) && + task->thread_data->ev_puller.send_zc_done_notify)) + { + CLEAR_OP_TYPE_AND_RELEASE_TASK(task); + } } #endif return result >= 0 ? 0 : -1; @@ -1309,8 +1360,38 @@ static int sf_client_sock_write(int sock, const int event, void *arg) } #if IOEVENT_USE_URING + if (event == IOEVENT_NOTIFY) { + if (!FC_URING_IS_SEND_ZC(task)) { + logWarning("file: "__FILE__", line: %d, " + "unexpected io_uring notify!", __LINE__); + return -1; + } + + FC_URING_OP_TYPE(task) = IORING_OP_NOP; + if (!task->canceled) { + if (task->send.ptr->offset >= task->send.ptr->length) { + length = task->send.ptr->length; + if (task->send.ptr != task->recv.ptr) { //double buffers + task->send.ptr->offset = 0; + task->send.ptr->length = 0; + } + + result = sock_write_done(task, length, true); + } else { + result = prepare_next_send(task); + } + } + + sf_release_task(task); + return result == 0 ? 0 : -1; + } + if (task->handler->use_io_uring) { - CLEAR_OP_TYPE_AND_RELEASE_TASK(task); + if (!(FC_URING_IS_SEND_ZC(task) && task->thread_data-> + ev_puller.send_zc_done_notify)) + { + CLEAR_OP_TYPE_AND_RELEASE_TASK(task); + } } #endif @@ -1329,30 +1410,9 @@ static int sf_client_sock_write(int sock, const int event, void *arg) total_write += bytes; if (action == sf_comm_action_finish) { - release_iovec_buffer(task); - task->recv.ptr->offset = 0; - task->recv.ptr->length = 0; - - if (SF_CTX->callbacks.send_done == NULL || !send_done) { - task->nio_stages.current = SF_NIO_STAGE_RECV; - } else { - if (SF_CTX->callbacks.send_done(task, - length, &next_stage) != 0) - { - return -1; - } - - if (task->nio_stages.current != next_stage) { - task->nio_stages.current = next_stage; - } + if (sock_write_done(task, length, send_done) != 0) { + return -1; } - - if (task->nio_stages.current == SF_NIO_STAGE_RECV) { - if (set_read_event(task) != 0) { - return -1; - } - } - break; } else if (action == sf_comm_action_break) { break; diff --git a/src/sf_service.c b/src/sf_service.c index 1e98cdb..4aa7e81 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -77,13 +77,13 @@ static int sf_init_free_queue(SFContext *sf_context, const char *name, return result; } - if (strcmp(name, "service") == 0) { - buffer_size = sf_context->net_buffer_cfg.min_buff_size; - max_m = 16; - } else { + if (strcmp(name, "cluster") == 0 || strcmp(name, "replica") == 0) { buffer_size = FC_MAX(4 * 1024 * 1024, sf_context-> net_buffer_cfg.max_buff_size); max_m = 64; + } else { + buffer_size = sf_context->net_buffer_cfg.min_buff_size; + max_m = 16; } m = buffer_size / (64 * 1024); if (m == 0) { @@ -92,6 +92,7 @@ static int sf_init_free_queue(SFContext *sf_context, const char *name, m = max_m; } alloc_conn_once = 256 / m; + return free_queue_init_ex2(&sf_context->free_queue, name, double_buffers, need_shrink_task_buffer, sf_context->net_buffer_cfg.max_connections, alloc_conn_once, sf_context->net_buffer_cfg.min_buff_size, @@ -266,6 +267,12 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, return result; } +#if IOEVENT_USE_URING + if (send_done_callback != NULL) { + ioevent_set_send_zc_done_notify(&thread_data->ev_puller, true); + } +#endif + result = fast_timer_init(&thread_data->timer, 2 * sf_context-> net_buffer_cfg.network_timeout, g_current_time); if (result != 0) {