struct sf_network_handler add field use_iouring

use_iouring
YuQing 2025-09-24 15:59:27 +08:00
parent f0ee6ce73f
commit bc5af8a58b
4 changed files with 29 additions and 29 deletions

View File

@ -498,10 +498,16 @@ static int init_network_handler(SFContext *sf_context,
handler->send_data = sf_socket_send_data;
handler->recv_data = sf_socket_recv_data;
handler->post_recv = NULL;
#if IOEVENT_USE_URING
handler->use_iouring = true;
#else
handler->use_iouring = false;
#endif
return 0;
} else {
handler->inner.id = NULL;
handler->outer.id = NULL;
handler->use_iouring = false;
return load_rdma_apis(sf_context, handler);
}
}

View File

@ -41,6 +41,9 @@
#include "sf_service.h"
#include "sf_nio.h"
static int sf_client_sock_write(int sock, short event, void *arg);
static int sf_client_sock_read(int sock, short event, void *arg);
void sf_set_parameters_ex(SFContext *sf_context, const int header_size,
sf_set_body_length_callback set_body_length_func,
sf_alloc_recv_buffer_callback alloc_recv_buffer_func,
@ -144,6 +147,7 @@ static inline int set_read_event(struct fast_task_info *task)
}
task->event.callback = (IOEventCallback)sf_client_sock_read;
if (ioevent_modify(&task->thread_data->ev_puller,
task->event.fd, IOEVENT_READ, task) != 0)
{
@ -168,13 +172,14 @@ int sf_set_read_event(struct fast_task_info *task)
return set_read_event(task);
}
static inline int sf_ioevent_add(struct fast_task_info *task,
IOEventCallback callback, const int timeout)
static inline int sf_ioevent_add(struct fast_task_info *task)
{
int result;
result = ioevent_set(task, task->thread_data, task->event.fd,
IOEVENT_READ, callback, timeout);
IOEVENT_READ, (IOEventCallback)sf_client_sock_read,
SF_CTX->net_buffer_cfg.network_timeout,
task->handler->use_iouring);
return result > 0 ? -1 * result : result;
}
@ -192,8 +197,7 @@ static inline void inc_connection_current_count()
static inline int sf_nio_init(struct fast_task_info *task)
{
inc_connection_current_count();
return sf_ioevent_add(task, (IOEventCallback)sf_client_sock_read,
SF_CTX->net_buffer_cfg.network_timeout);
return sf_ioevent_add(task);
}
int sf_socket_async_connect_check(struct fast_task_info *task)
@ -275,7 +279,7 @@ static int sf_async_connect_server(struct fast_task_info *task)
result = ioevent_set(task, task->thread_data, task->event.fd,
IOEVENT_READ | IOEVENT_WRITE, (IOEventCallback)
sf_client_connect_done, SF_CTX->net_buffer_cfg.
connect_timeout);
connect_timeout, task->handler->use_iouring);
return result > 0 ? -1 * result : result;
} else {
if (SF_CTX->callbacks.connect_done != NULL) {
@ -283,10 +287,7 @@ static int sf_async_connect_server(struct fast_task_info *task)
}
if (result == 0) {
if ((result=sf_ioevent_add(task, (IOEventCallback)
sf_client_sock_read, SF_CTX->
net_buffer_cfg.network_timeout)) != 0)
{
if ((result=sf_ioevent_add(task)) != 0) {
return result;
}
@ -341,10 +342,7 @@ static int sf_nio_deal_task(struct fast_task_info *task, const int stage)
result = SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_CONTINUE);
break;
case SF_NIO_STAGE_FORWARDED: //forward by other thread
if ((result=sf_ioevent_add(task, (IOEventCallback)
sf_client_sock_read,
SF_CTX->net_buffer_cfg.network_timeout)) == 0)
{
if ((result=sf_ioevent_add(task)) == 0) {
result = SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_SEND);
}
break;
@ -837,8 +835,7 @@ static int calc_iops_and_remove_polling(struct fast_task_info *task)
ioevent_set_timeout(&task->thread_data->ev_puller,
task->thread_data->timeout_ms);
}
result = sf_ioevent_add(task, (IOEventCallback)
sf_client_sock_read, SF_CTX->net_buffer_cfg.network_timeout);
result = sf_ioevent_add(task);
format_ip_address(task->client_ip, formatted_ip);
logInfo("file: "__FILE__", line: %d, client: %s:%u, "
@ -900,7 +897,7 @@ int sf_rdma_busy_polling_callback(struct nio_thread_data *thread_data)
return 0;
}
int sf_client_sock_read(int sock, short event, void *arg)
static int sf_client_sock_read(int sock, short event, void *arg)
{
int result;
int bytes;
@ -992,7 +989,7 @@ int sf_client_sock_read(int sock, short event, void *arg)
return total_read;
}
int sf_client_sock_write(int sock, short event, void *arg)
static int sf_client_sock_write(int sock, short event, void *arg)
{
int result;
int bytes;
@ -1037,9 +1034,6 @@ int sf_client_sock_write(int sock, short event, void *arg)
release_iovec_buffer(task);
task->recv.ptr->offset = 0;
task->recv.ptr->length = 0;
if (set_read_event(task) != 0) {
return -1;
}
if (SF_CTX->callbacks.send_done == NULL || !send_done) {
task->nio_stages.current = SF_NIO_STAGE_RECV;
@ -1055,6 +1049,12 @@ int sf_client_sock_write(int sock, short event, void *arg)
}
}
if (task->nio_stages.current == SF_NIO_STAGE_RECV) {
if (set_read_event(task) != 0) {
return -1;
}
}
break;
} else if (action == sf_comm_action_break) {
break;

View File

@ -92,8 +92,6 @@ static inline void sf_nio_reset_task_length(struct fast_task_info *task)
void sf_recv_notify_read(int sock, short event, void *arg);
int sf_send_add_event(struct fast_task_info *task);
int sf_client_sock_write(int sock, short event, void *arg);
int sf_client_sock_read(int sock, short event, void *arg);
void sf_task_finish_clean_up(struct fast_task_info *task);
@ -149,11 +147,6 @@ static inline int sf_nio_forward_request(struct fast_task_info *task,
return sf_nio_notify(task, SF_NIO_STAGE_FORWARDED);
}
static inline bool sf_client_sock_in_read_stage(struct fast_task_info *task)
{
return (task->event.callback == (IOEventCallback)sf_client_sock_read);
}
static inline void sf_nio_add_to_deleted_list(struct nio_thread_data
*thread_data, struct fast_task_info *task)
{

View File

@ -115,9 +115,11 @@ typedef struct sf_listener {
struct sf_context;
struct sf_address_family_handler;
typedef struct sf_network_handler {
bool enabled;
bool explicit_post_recv;
bool use_iouring; //since v1.2.9
FCCommunicationType comm_type;
struct sf_address_family_handler *fh;
struct ibv_pd *pd;
@ -179,7 +181,6 @@ typedef struct sf_context {
struct nio_thread_data *thread_data;
volatile int thread_count;
//int rdma_port_offset;
bool is_client; //since v1.2.5
SFAddressFamily address_family;
SFAddressFamilyHandler handlers[SF_ADDRESS_FAMILY_COUNT];