send zc done notify callback for recycling buffer

use_iouring
YuQing 2025-10-20 10:34:47 +08:00
parent 817ff547da
commit 932751d392
2 changed files with 105 additions and 38 deletions

View File

@ -809,6 +809,14 @@ ssize_t sf_socket_send_data(struct fast_task_info *task,
task->send.ptr->offset += bytes; task->send.ptr->offset += bytes;
if (task->send.ptr->offset >= task->send.ptr->length) { if (task->send.ptr->offset >= task->send.ptr->length) {
#if IOEVENT_USE_URING
if (FC_URING_IS_SEND_ZC(task) && task->thread_data->
ev_puller.send_zc_done_notify)
{
*action = sf_comm_action_break;
*send_done = false;
} else {
#endif
if (task->send.ptr != task->recv.ptr) { //double buffers if (task->send.ptr != task->recv.ptr) { //double buffers
task->send.ptr->offset = 0; task->send.ptr->offset = 0;
task->send.ptr->length = 0; task->send.ptr->length = 0;
@ -816,6 +824,10 @@ ssize_t sf_socket_send_data(struct fast_task_info *task,
*action = sf_comm_action_finish; *action = sf_comm_action_finish;
*send_done = true; *send_done = true;
#if IOEVENT_USE_URING
}
#endif
} else { } else {
/* set next writev iovec array */ /* set next writev iovec array */
if (task->iovec_array.iovs != NULL) { if (task->iovec_array.iovs != NULL) {
@ -848,9 +860,13 @@ ssize_t sf_socket_send_data(struct fast_task_info *task,
#if IOEVENT_USE_URING #if IOEVENT_USE_URING
if (task->handler->use_io_uring) { if (task->handler->use_io_uring) {
if (!(FC_URING_IS_SEND_ZC(task) && task->thread_data->
ev_puller.send_zc_done_notify))
{
if (prepare_next_send(task) != 0) { if (prepare_next_send(task) != 0) {
return -1; return -1;
} }
}
*action = sf_comm_action_break; *action = sf_comm_action_break;
} else { } else {
#endif #endif
@ -1276,59 +1292,11 @@ static int sf_client_sock_read(int sock, const int event, void *arg)
return total_read; return total_read;
} }
static int sf_client_sock_write(int sock, const int event, void *arg) static int sock_write_done(struct fast_task_info *task,
const int length, const bool send_done)
{ {
int result;
int bytes;
int total_write;
int length;
int next_stage; int next_stage;
SFCommAction action;
bool send_done;
struct fast_task_info *task;
task = (struct fast_task_info *)arg;
if ((result=check_task(task, event, SF_NIO_STAGE_SEND)) != 0) {
#if IOEVENT_USE_URING
if (task->handler->use_io_uring && event != IOEVENT_TIMEOUT) {
CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
}
#endif
return result >= 0 ? 0 : -1;
}
if (event == IOEVENT_TIMEOUT) {
logError("file: "__FILE__", line: %d, "
"client ip: %s, send timeout. total length: %d, offset: %d, "
"remain: %d", __LINE__, task->client_ip, task->send.ptr->length,
task->send.ptr->offset, task->send.ptr->length -
task->send.ptr->offset);
ioevent_add_to_deleted_list(task);
return -1;
}
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
}
#endif
total_write = 0;
length = task->send.ptr->length;
action = sf_comm_action_continue;
while (1) {
fast_timer_modify(&task->thread_data->timer,
&task->event.timer, g_current_time +
SF_CTX->net_buffer_cfg.network_timeout);
if ((bytes=task->handler->send_data(task, &action, &send_done)) < 0) {
ioevent_add_to_deleted_list(task);
return -1;
}
total_write += bytes;
if (action == sf_comm_action_finish) {
release_iovec_buffer(task); release_iovec_buffer(task);
task->recv.ptr->offset = 0; task->recv.ptr->offset = 0;
task->recv.ptr->length = 0; task->recv.ptr->length = 0;
@ -1353,6 +1321,98 @@ static int sf_client_sock_write(int sock, const int event, void *arg)
} }
} }
return 0;
}
static int sf_client_sock_write(int sock, const int event, void *arg)
{
int result;
int bytes;
int total_write;
int length;
SFCommAction action;
bool send_done;
struct fast_task_info *task;
task = (struct fast_task_info *)arg;
if ((result=check_task(task, event, SF_NIO_STAGE_SEND)) != 0) {
#if IOEVENT_USE_URING
if (task->handler->use_io_uring && event != IOEVENT_TIMEOUT) {
if (event == IOEVENT_NOTIFY || !(FC_URING_IS_SEND_ZC(task) &&
task->thread_data->ev_puller.send_zc_done_notify))
{
CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
}
}
#endif
return result >= 0 ? 0 : -1;
}
if (event == IOEVENT_TIMEOUT) {
logError("file: "__FILE__", line: %d, "
"client ip: %s, send timeout. total length: %d, offset: %d, "
"remain: %d", __LINE__, task->client_ip, task->send.ptr->length,
task->send.ptr->offset, task->send.ptr->length -
task->send.ptr->offset);
ioevent_add_to_deleted_list(task);
return -1;
}
#if IOEVENT_USE_URING
if (event == IOEVENT_NOTIFY) {
if (!FC_URING_IS_SEND_ZC(task)) {
logWarning("file: "__FILE__", line: %d, "
"unexpected io_uring notify!", __LINE__);
return -1;
}
FC_URING_OP_TYPE(task) = IORING_OP_NOP;
if (!task->canceled) {
if (task->send.ptr->offset >= task->send.ptr->length) {
length = task->send.ptr->length;
if (task->send.ptr != task->recv.ptr) { //double buffers
task->send.ptr->offset = 0;
task->send.ptr->length = 0;
}
result = sock_write_done(task, length, true);
} else {
result = prepare_next_send(task);
}
}
sf_release_task(task);
return result == 0 ? 0 : -1;
}
if (task->handler->use_io_uring) {
if (!(FC_URING_IS_SEND_ZC(task) && task->thread_data->
ev_puller.send_zc_done_notify))
{
CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
}
}
#endif
total_write = 0;
length = task->send.ptr->length;
action = sf_comm_action_continue;
while (1) {
fast_timer_modify(&task->thread_data->timer,
&task->event.timer, g_current_time +
SF_CTX->net_buffer_cfg.network_timeout);
if ((bytes=task->handler->send_data(task, &action, &send_done)) < 0) {
ioevent_add_to_deleted_list(task);
return -1;
}
total_write += bytes;
if (action == sf_comm_action_finish) {
if (sock_write_done(task, length, send_done) != 0) {
return -1;
}
break; break;
} else if (action == sf_comm_action_break) { } else if (action == sf_comm_action_break) {
break; break;

View File

@ -77,13 +77,13 @@ static int sf_init_free_queue(SFContext *sf_context, const char *name,
return result; return result;
} }
if (strcmp(name, "service") == 0) { if (strcmp(name, "cluster") == 0 || strcmp(name, "replica") == 0) {
buffer_size = sf_context->net_buffer_cfg.min_buff_size;
max_m = 16;
} else {
buffer_size = FC_MAX(4 * 1024 * 1024, sf_context-> buffer_size = FC_MAX(4 * 1024 * 1024, sf_context->
net_buffer_cfg.max_buff_size); net_buffer_cfg.max_buff_size);
max_m = 64; max_m = 64;
} else {
buffer_size = sf_context->net_buffer_cfg.min_buff_size;
max_m = 16;
} }
m = buffer_size / (64 * 1024); m = buffer_size / (64 * 1024);
if (m == 0) { if (m == 0) {
@ -92,6 +92,7 @@ static int sf_init_free_queue(SFContext *sf_context, const char *name,
m = max_m; m = max_m;
} }
alloc_conn_once = 256 / m; alloc_conn_once = 256 / m;
return free_queue_init_ex2(&sf_context->free_queue, name, double_buffers, return free_queue_init_ex2(&sf_context->free_queue, name, double_buffers,
need_shrink_task_buffer, sf_context->net_buffer_cfg.max_connections, need_shrink_task_buffer, sf_context->net_buffer_cfg.max_connections,
alloc_conn_once, sf_context->net_buffer_cfg.min_buff_size, alloc_conn_once, sf_context->net_buffer_cfg.min_buff_size,
@ -266,6 +267,12 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
return result; return result;
} }
#if IOEVENT_USE_URING
if (send_done_callback != NULL) {
ioevent_set_send_zc_done_notify(&thread_data->ev_puller, true);
}
#endif
result = fast_timer_init(&thread_data->timer, 2 * sf_context-> result = fast_timer_init(&thread_data->timer, 2 * sf_context->
net_buffer_cfg.network_timeout, g_current_time); net_buffer_cfg.network_timeout, g_current_time);
if (result != 0) { if (result != 0) {