adapt Linux io_uring OK

use_iouring
YuQing 2025-09-27 15:41:56 +08:00
parent ecee21f289
commit a2ab8a0c01
5 changed files with 143 additions and 83 deletions

View File

@ -993,19 +993,19 @@ void sf_slow_log_config_to_string(SFSlowLogConfig *slow_log_cfg,
void sf_global_config_to_string_ex(const char *max_pkg_size_item_nm,
char *output, const int size)
{
int i;
int len;
int max_pkg_size;
int min_buff_size;
int max_buff_size;
#if IOEVENT_USE_URING
int i;
bool use_io_uring;
bool use_send_zc;
#endif
char pkg_buff[256];
SFAddressFamilyHandler *fh;
SFNetworkHandler *handler;
SFNetworkHandler *end;
#endif
char pkg_buff[256];
max_pkg_size = g_sf_global_vars.net_buffer_cfg.max_pkg_size -
g_sf_global_vars.task_buffer_extra_size;

View File

@ -62,9 +62,37 @@ void sf_set_parameters_ex(SFContext *sf_context, const int header_size,
sf_context->callbacks.release_buffer = release_buffer_callback;
}
#if IOEVENT_USE_URING
#define CLEAR_OP_TYPE_AND_RELEASE_TASK(task) \
FC_URING_OP_TYPE(task) = IORING_OP_NOP; \
sf_release_task(task)
static int sf_uring_cancel_done(int sock, short event, void *arg)
{
struct fast_task_info *task;
task = (struct fast_task_info *)arg;
CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
return 0;
}
#endif
void sf_task_detach_thread(struct fast_task_info *task)
{
#if IOEVENT_USE_URING
bool need_cancel;
if (task->handler->use_io_uring) {
need_cancel = (FC_URING_OP_TYPE(task) != IORING_OP_NOP);
} else {
need_cancel = true;
}
if (need_cancel) {
task->event.callback = (IOEventCallback)sf_uring_cancel_done;
uring_prep_cancel(task);
}
#else
ioevent_detach(&task->thread_data->ev_puller, task->event.fd);
#endif
if (task->event.timer.expires > 0) {
fast_timer_remove(&task->thread_data->timer,
@ -91,6 +119,22 @@ static inline void release_iovec_buffer(struct fast_task_info *task)
}
}
void sf_socket_close_connection(struct fast_task_info *task)
{
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
if (uring_prep_close_fd(task) != 0) {
close(task->event.fd);
}
} else {
#endif
close(task->event.fd);
#if IOEVENT_USE_URING
}
#endif
task->event.fd = -1;
}
void sf_task_finish_clean_up(struct fast_task_info *task)
{
if (task->finish_callback != NULL) {
@ -100,15 +144,18 @@ void sf_task_finish_clean_up(struct fast_task_info *task)
release_iovec_buffer(task);
sf_task_detach_thread(task);
task->handler->close_connection(task);
if (task->handler->use_io_uring) {
#if IOEVENT_USE_URING
FC_URING_OP_TYPE(task) = IORING_OP_NOP;
if (!task->handler->use_io_uring) {
#endif
task->handler->close_connection(task);
__sync_fetch_and_sub(&g_sf_global_vars.
connection_stat.current_count, 1);
#if IOEVENT_USE_URING
}
#endif
}
__sync_fetch_and_sub(&g_sf_global_vars.connection_stat.current_count, 1);
sf_release_task(task);
}
@ -134,25 +181,19 @@ static inline int set_write_event(struct fast_task_info *task)
return 0;
}
#if IOEVENT_USE_URING
static inline int prepare_first_recv(struct fast_task_info *task)
{
#if IOEVENT_USE_URING
if (SF_CTX->callbacks.alloc_recv_buffer != NULL) {
return uring_prep_recv_data(task, task->recv.ptr->data,
SF_CTX->header_size);
} else {
return uring_prep_first_recv(task);
}
#else
logError("file: "__FILE__", line: %d, "
"some mistakes happen!", __LINE__);
return EBUSY;
#endif
}
static inline int prepare_next_recv(struct fast_task_info *task)
{
#if IOEVENT_USE_URING
int recv_bytes;
if (task->recv.ptr->length == 0) { //recv header
@ -170,41 +211,35 @@ static inline int prepare_next_recv(struct fast_task_info *task)
header_size), recv_bytes);
}
}
#else
logError("file: "__FILE__", line: %d, "
"some mistakes happen!", __LINE__);
return EBUSY;
#endif
}
#endif
static inline int set_read_event(struct fast_task_info *task)
{
int result;
if (task->handler->use_io_uring) {
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
if (FC_URING_OP_TYPE(task) == IORING_OP_RECV) {
logWarning("file: "__FILE__", line: %d, "
"trigger recv again!", __LINE__);
return 0;
}
if (task->event.callback != (IOEventCallback)sf_client_sock_read) {
task->event.callback = (IOEventCallback)sf_client_sock_read;
}
if ((result=prepare_first_recv(task)) != 0) {
ioevent_add_to_deleted_list(task);
return result;
}
if (task->event.callback != (IOEventCallback)sf_client_sock_read) {
task->event.callback = (IOEventCallback)sf_client_sock_read;
}
#else
logError("file: "__FILE__", line: %d, "
"some mistakes happen!", __LINE__);
return EBUSY;
#endif
} else {
#endif
if (task->event.callback == (IOEventCallback)sf_client_sock_read) {
return 0;
}
task->event.callback = (IOEventCallback)sf_client_sock_read;
if (ioevent_modify(&task->thread_data->ev_puller,
task->event.fd, IOEVENT_READ, task) != 0)
{
@ -218,8 +253,9 @@ static inline int set_read_event(struct fast_task_info *task)
return result;
}
task->event.callback = (IOEventCallback)sf_client_sock_read;
#if IOEVENT_USE_URING
}
#endif
return 0;
}
@ -564,8 +600,8 @@ int sf_send_add_event(struct fast_task_info *task)
if (task->send.ptr->length > 0) {
/* direct send */
task->nio_stages.current = SF_NIO_STAGE_SEND;
if (task->handler->use_io_uring) {
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
if (task->event.callback != (IOEventCallback)sf_client_sock_write) {
task->event.callback = (IOEventCallback)sf_client_sock_write;
}
@ -574,16 +610,14 @@ int sf_send_add_event(struct fast_task_info *task)
} else {
return uring_prep_first_send(task);
}
#else
logError("file: "__FILE__", line: %d, "
"some mistakes happen!", __LINE__);
return EBUSY;
#endif
} else {
#endif
if (sf_client_sock_write(task->event.fd, IOEVENT_WRITE, task) < 0) {
return errno != 0 ? errno : EIO;
}
#if IOEVENT_USE_URING
}
#endif
}
return 0;
@ -626,20 +660,16 @@ static inline int check_task(struct fast_task_info *task,
}
}
#if IOEVENT_USE_URING
static inline int prepare_next_send(struct fast_task_info *task)
{
#if IOEVENT_USE_URING
if (task->handler->use_send_zc) {
return uring_prep_next_send_zc(task);
} else {
return uring_prep_next_send(task);
}
#else
logError("file: "__FILE__", line: %d, "
"some mistakes happen!", __LINE__);
return EBUSY;
#endif
}
#endif
ssize_t sf_socket_send_data(struct fast_task_info *task,
SFCommAction *action, bool *send_done)
@ -647,15 +677,11 @@ ssize_t sf_socket_send_data(struct fast_task_info *task,
int bytes;
int result;
if (task->handler->use_io_uring) {
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
bytes = task->event.res;
#else
logError("file: "__FILE__", line: %d, "
"some mistakes happen!", __LINE__);
return -1;
#endif
} else {
#endif
if (task->iovec_array.iovs != NULL) {
bytes = writev(task->event.fd, task->iovec_array.iovs,
FC_MIN(task->iovec_array.count, IOV_MAX));
@ -664,26 +690,34 @@ ssize_t sf_socket_send_data(struct fast_task_info *task,
task->send.ptr->offset, task->send.ptr->length -
task->send.ptr->offset);
}
#if IOEVENT_USE_URING
}
#endif
if (bytes < 0) {
if (task->handler->use_io_uring) {
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
result = -bytes;
#endif
} else {
#endif
result = errno;
#if IOEVENT_USE_URING
}
#endif
if (result == EAGAIN || result == EWOULDBLOCK) {
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
if (prepare_next_send(task) != 0) {
return -1;
}
} else {
#endif
if (set_write_event(task) != 0) {
return -1;
}
#if IOEVENT_USE_URING
}
#endif
*action = sf_comm_action_break;
return 0;
@ -718,11 +752,6 @@ ssize_t sf_socket_send_data(struct fast_task_info *task,
task->send.ptr->length = 0;
}
if (task->handler->use_io_uring) {
#if IOEVENT_USE_URING
FC_URING_OP_TYPE(task) = IORING_OP_NOP;
#endif
}
*action = sf_comm_action_finish;
*send_done = true;
} else {
@ -755,14 +784,18 @@ ssize_t sf_socket_send_data(struct fast_task_info *task,
task->iovec_array.count = end - iov;
}
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
if (prepare_next_send(task) != 0) {
return -1;
}
*action = sf_comm_action_break;
} else {
#endif
*action = sf_comm_action_continue;
#if IOEVENT_USE_URING
}
#endif
*send_done = false;
}
@ -778,15 +811,11 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task,
int recv_bytes;
bool new_alloc;
if (task->handler->use_io_uring) {
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
bytes = task->event.res;
#else
logError("file: "__FILE__", line: %d, "
"some mistakes happen!", __LINE__);
return -1;
#endif
} else {
#endif
if (task->recv.ptr->length == 0) { //recv header
recv_bytes = SF_CTX->header_size - task->recv.ptr->offset;
bytes = read(task->event.fd, task->recv.ptr->data +
@ -802,22 +831,28 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task,
header_size), recv_bytes);
}
}
#if IOEVENT_USE_URING
}
#endif
if (bytes < 0) {
if (task->handler->use_io_uring) {
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
result = -bytes;
#endif
} else {
#endif
result = errno;
#if IOEVENT_USE_URING
}
#endif
if (result == EAGAIN || result == EWOULDBLOCK) {
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
if (prepare_next_recv(task) != 0) {
return -1;
}
}
#endif
*action = sf_comm_action_break;
return 0;
} else if (result == EINTR && !task->handler->use_io_uring) {
@ -864,14 +899,18 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task,
task->recv.ptr->offset += bytes;
if (task->recv.ptr->length == 0) { //pkg header
if (task->recv.ptr->offset < SF_CTX->header_size) {
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
if (prepare_next_recv(task) != 0) {
return -1;
}
*action = sf_comm_action_break;
} else {
#endif
*action = sf_comm_action_continue;
#if IOEVENT_USE_URING
}
#endif
return bytes;
}
@ -922,21 +961,20 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task,
}
if (task->recv.ptr->offset >= task->recv.ptr->length) { //recv done
if (task->handler->use_io_uring) {
#if IOEVENT_USE_URING
FC_URING_OP_TYPE(task) = IORING_OP_NOP;
#endif
}
*action = sf_comm_action_finish;
} else {
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
if (prepare_next_recv(task) != 0) {
return -1;
}
*action = sf_comm_action_break;
} else {
#endif
*action = sf_comm_action_continue;
#if IOEVENT_USE_URING
}
#endif
}
return bytes;
@ -1084,6 +1122,11 @@ static int sf_client_sock_read(int sock, short event, void *arg)
task = (struct fast_task_info *)arg;
if ((result=check_task(task, event, SF_NIO_STAGE_RECV)) != 0) {
#if IOEVENT_USE_URING
if (task->handler->use_io_uring && (event != IOEVENT_TIMEOUT)) {
CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
}
#endif
return result >= 0 ? 0 : -1;
}
@ -1100,6 +1143,7 @@ static int sf_client_sock_read(int sock, short event, void *arg)
SF_CTX->net_buffer_cfg.network_timeout;
fast_timer_add(&task->thread_data->timer,
&task->event.timer);
return 0;
} else {
if (task->recv.ptr->length > 0) {
logWarning("file: "__FILE__", line: %d, "
@ -1116,10 +1160,14 @@ static int sf_client_sock_read(int sock, short event, void *arg)
ioevent_add_to_deleted_list(task);
return -1;
}
return 0;
}
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
}
#endif
total_read = 0;
action = sf_comm_action_continue;
while (1) {
@ -1179,6 +1227,11 @@ static int sf_client_sock_write(int sock, short event, void *arg)
task = (struct fast_task_info *)arg;
if ((result=check_task(task, event, SF_NIO_STAGE_SEND)) != 0) {
#if IOEVENT_USE_URING
if (task->handler->use_io_uring && (event != IOEVENT_TIMEOUT)) {
CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
}
#endif
return result >= 0 ? 0 : -1;
}
@ -1193,6 +1246,12 @@ static int sf_client_sock_write(int sock, short event, void *arg)
return -1;
}
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
}
#endif
total_write = 0;
length = task->send.ptr->length;
action = sf_comm_action_continue;

View File

@ -90,6 +90,7 @@ static inline void sf_nio_reset_task_length(struct fast_task_info *task)
}
}
void sf_socket_close_connection(struct fast_task_info *task);
void sf_recv_notify_read(int sock, short event, void *arg);
int sf_send_add_event(struct fast_task_info *task);

View File

@ -35,7 +35,6 @@
#include "fastcommon/fc_memory.h"
#include "sf_nio.h"
#include "sf_util.h"
#include "sf_global.h"
#include "sf_service.h"
#if defined(OS_LINUX)
@ -502,12 +501,6 @@ struct fast_task_info *sf_socket_accept_connection(SFListener *listener)
return task;
}
void sf_socket_close_connection(struct fast_task_info *task)
{
close(task->event.fd);
task->event.fd = -1;
}
void sf_socket_close_ex(SFContext *sf_context)
{
int i;

View File

@ -25,6 +25,7 @@
#include "fastcommon/ioevent.h"
#include "fastcommon/fast_task_queue.h"
#include "sf_types.h"
#include "sf_global.h"
typedef void* (*sf_alloc_thread_extra_data_callback)(const int thread_index);
typedef void (*sf_sig_quit_handler)(int sig);
@ -108,8 +109,6 @@ int sf_socket_create_server(SFListener *listener,
void sf_socket_close_server(SFListener *listener);
struct fast_task_info *sf_socket_accept_connection(SFListener *listener);
void sf_socket_close_connection(struct fast_task_info *task);
int sf_socket_server_ex(SFContext *sf_context);
#define sf_socket_server() sf_socket_server_ex(&g_sf_context)
@ -170,9 +169,9 @@ static inline struct fast_task_info *sf_alloc_init_task_ex(
return task;
}
#define sf_hold_task_ex(task, inc_count) __sync_add_and_fetch( \
&task->reffer_count, inc_count)
#define sf_hold_task(task) sf_hold_task_ex(task, 1)
#define sf_hold_task_ex(task, inc_count) \
fc_hold_task_ex(task, inc_count)
#define sf_hold_task(task) fc_hold_task(task)
#define sf_alloc_init_task(handler, fd) sf_alloc_init_task_ex(handler, fd, 1)
@ -187,6 +186,14 @@ static inline void sf_release_task(struct fast_task_info *task)
"used: %d, freed: %d", __LINE__, task,
alloc_count, alloc_count - free_count, free_count);
*/
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
task->handler->close_connection(task);
__sync_fetch_and_sub(&g_sf_global_vars.
connection_stat.current_count, 1);
}
#endif
free_queue_push(task);
}
}