Compare commits
No commits in common. "c4af33a497eeff30ea57ead224d0553df9ce4e97" and "688211fbcd29ae973d394efb731b6ffc9f21ce51" have entirely different histories.
c4af33a497
...
688211fbcd
|
|
@ -217,9 +217,7 @@ int idempotency_client_channel_check_reconnect(
|
|||
char formatted_ip[FORMATTED_IP_SIZE];
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
struct fast_task_info *task;
|
||||
task = channel->task;
|
||||
if (SF_CTX->use_io_uring && FC_ATOMIC_GET(task->reffer_count) > 1) {
|
||||
if (FC_ATOMIC_GET(channel->task->reffer_count) > 1) {
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
|
|
|||
|
|
@ -47,12 +47,8 @@ SFGlobalVariables g_sf_global_vars = {
|
|||
{0, 0}, NULL, {NULL, 0}
|
||||
};
|
||||
|
||||
SFContext g_sf_context = {{'\0'}, NULL, 0, false, false,
|
||||
#if IOEVENT_USE_URING
|
||||
false,
|
||||
#endif
|
||||
sf_address_family_auto, {{AF_UNSPEC, {{true, fc_comm_type_sock},
|
||||
{false, fc_comm_type_rdma}}},
|
||||
SFContext g_sf_context = {{'\0'}, NULL, 0, false, sf_address_family_auto,
|
||||
{{AF_UNSPEC, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}},
|
||||
{AF_UNSPEC, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}}},
|
||||
{DEFAULT_MAX_CONNECTONS, SF_DEF_MAX_PACKAGE_SIZE, SF_DEF_MIN_BUFF_SIZE,
|
||||
SF_DEF_MAX_BUFF_SIZE}, 1, DEFAULT_WORK_THREADS, 0, true, true,
|
||||
|
|
@ -481,7 +477,8 @@ static int load_rdma_apis(SFContext *sf_context, SFNetworkHandler *handler)
|
|||
}
|
||||
|
||||
static int init_network_handler(SFContext *sf_context,
|
||||
SFNetworkHandler *handler, SFAddressFamilyHandler *fh)
|
||||
SFNetworkHandler *handler, SFAddressFamilyHandler *fh,
|
||||
const bool use_send_zc)
|
||||
{
|
||||
handler->fh = fh;
|
||||
handler->inner.handler = handler;
|
||||
|
|
@ -502,10 +499,18 @@ static int init_network_handler(SFContext *sf_context,
|
|||
handler->send_data = sf_socket_send_data;
|
||||
handler->recv_data = sf_socket_recv_data;
|
||||
handler->post_recv = NULL;
|
||||
#if IOEVENT_USE_URING
|
||||
handler->use_io_uring = true;
|
||||
handler->use_send_zc = use_send_zc;
|
||||
#else
|
||||
handler->use_io_uring = false;
|
||||
handler->use_send_zc = false;
|
||||
#endif
|
||||
return 0;
|
||||
} else {
|
||||
handler->inner.id = NULL;
|
||||
handler->outer.id = NULL;
|
||||
handler->use_io_uring = false;
|
||||
return load_rdma_apis(sf_context, handler);
|
||||
}
|
||||
}
|
||||
|
|
@ -673,10 +678,8 @@ int sf_load_context_from_config_ex(SFContext *sf_context,
|
|||
int inner_port;
|
||||
int outer_port;
|
||||
int port;
|
||||
#if IOEVENT_USE_URING
|
||||
bool global_use_send_zc;
|
||||
bool use_send_zc;
|
||||
#endif
|
||||
int i;
|
||||
int result;
|
||||
|
||||
|
|
@ -713,7 +716,6 @@ int sf_load_context_from_config_ex(SFContext *sf_context,
|
|||
outer_port = config->default_outer_port;
|
||||
}
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
global_use_send_zc = iniGetBoolValue(NULL, "use_send_zc",
|
||||
config->ini_ctx.context, true);
|
||||
if (config->ini_ctx.section_name == NULL) {
|
||||
|
|
@ -722,7 +724,6 @@ int sf_load_context_from_config_ex(SFContext *sf_context,
|
|||
use_send_zc = iniGetBoolValue(config->ini_ctx.section_name,
|
||||
"use_send_zc", config->ini_ctx.context, global_use_send_zc);
|
||||
}
|
||||
#endif
|
||||
|
||||
for (i=0; i<SF_ADDRESS_FAMILY_COUNT; i++) {
|
||||
fh = sf_context->handlers + i;
|
||||
|
|
@ -747,7 +748,9 @@ int sf_load_context_from_config_ex(SFContext *sf_context,
|
|||
if (!handler->enabled) {
|
||||
continue;
|
||||
}
|
||||
if ((result=init_network_handler(sf_context, handler, fh)) != 0) {
|
||||
if ((result=init_network_handler(sf_context, handler,
|
||||
fh, use_send_zc)) != 0)
|
||||
{
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
|
@ -766,14 +769,8 @@ int sf_load_context_from_config_ex(SFContext *sf_context,
|
|||
rdma_handler->inner.enabled = sock_handler->inner.enabled;
|
||||
rdma_handler->outer.port = sock_handler->outer.port;
|
||||
rdma_handler->outer.enabled = sock_handler->outer.enabled;
|
||||
}
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
sf_context->use_io_uring = (config->comm_type == fc_comm_type_sock);
|
||||
sf_context->use_send_zc = use_send_zc;
|
||||
#else
|
||||
sf_context->use_io_uring = false;
|
||||
#endif
|
||||
}
|
||||
|
||||
sf_context->accept_threads = iniGetIntValue(
|
||||
config->ini_ctx.section_name,
|
||||
|
|
@ -910,6 +907,31 @@ static const char *get_address_family_caption(
|
|||
}
|
||||
}
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
static void get_io_uring_configs(const SFContext *sf_context,
|
||||
bool *use_io_uring, bool *use_send_zc)
|
||||
{
|
||||
int i;
|
||||
const SFAddressFamilyHandler *fh;
|
||||
const SFNetworkHandler *handler;
|
||||
const SFNetworkHandler *end;
|
||||
|
||||
*use_io_uring = false;
|
||||
*use_send_zc = false;
|
||||
for (i=0; i<SF_ADDRESS_FAMILY_COUNT; i++) {
|
||||
fh = sf_context->handlers + i;
|
||||
end = fh->handlers + SF_NETWORK_HANDLER_COUNT;
|
||||
for (handler=fh->handlers; handler<end; handler++) {
|
||||
if (handler->enabled && handler->use_io_uring) {
|
||||
*use_io_uring = true;
|
||||
*use_send_zc = handler->use_send_zc;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
void sf_context_config_to_string(const SFContext *sf_context,
|
||||
char *output, const int size)
|
||||
{
|
||||
|
|
@ -919,6 +941,10 @@ void sf_context_config_to_string(const SFContext *sf_context,
|
|||
char outer_bind_addr[2 * IP_ADDRESS_SIZE + 2];
|
||||
int i;
|
||||
int len;
|
||||
#if IOEVENT_USE_URING
|
||||
bool use_io_uring;
|
||||
bool use_send_zc;
|
||||
#endif
|
||||
|
||||
*inner_bind_addr = '\0';
|
||||
*outer_bind_addr = '\0';
|
||||
|
|
@ -961,9 +987,9 @@ void sf_context_config_to_string(const SFContext *sf_context,
|
|||
sf_context->accept_threads, sf_context->work_threads);
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
len += snprintf(output + len, size - len, ", use_io_uring=%d, "
|
||||
"use_send_zc=%d", sf_context->use_io_uring,
|
||||
sf_context->use_send_zc);
|
||||
get_io_uring_configs(sf_context, &use_io_uring, &use_send_zc);
|
||||
len += snprintf(output + len, size - len, ", use_io_uring=%d"
|
||||
", use_send_zc=%d", use_io_uring, use_send_zc);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
|
@ -1014,6 +1040,10 @@ void sf_global_config_to_string_ex(const char *max_pkg_size_item_nm,
|
|||
int max_pkg_size;
|
||||
int min_buff_size;
|
||||
int max_buff_size;
|
||||
#if IOEVENT_USE_URING
|
||||
bool use_io_uring;
|
||||
bool use_send_zc;
|
||||
#endif
|
||||
char pkg_buff[256];
|
||||
|
||||
max_pkg_size = g_sf_global_vars.net_buffer_cfg.max_pkg_size -
|
||||
|
|
@ -1043,9 +1073,10 @@ void sf_global_config_to_string_ex(const char *max_pkg_size_item_nm,
|
|||
g_sf_global_vars.thread_stack_size / 1024, pkg_buff);
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
len += snprintf(output + len, size - len, "use_io_uring=%d, "
|
||||
"use_send_zc=%d, ", g_sf_context.use_io_uring,
|
||||
g_sf_context.use_send_zc);
|
||||
get_io_uring_configs(&g_sf_context, &use_io_uring, &use_send_zc);
|
||||
len += snprintf(output + len, size - len,
|
||||
"use_io_uring=%d, use_send_zc=%d, ",
|
||||
use_io_uring, use_send_zc);
|
||||
#endif
|
||||
|
||||
len += snprintf(output + len, size - len,
|
||||
|
|
|
|||
83
src/sf_nio.c
83
src/sf_nio.c
|
|
@ -73,8 +73,12 @@ static int sf_uring_cancel_done(int sock, const int event, void *arg)
|
|||
|
||||
task = (struct fast_task_info *)arg;
|
||||
if (event != IOEVENT_TIMEOUT) {
|
||||
if (task->handler->use_io_uring || (FC_URING_OP_TYPE(task) !=
|
||||
IORING_OP_NOP && task->event.res == -ECANCELED))
|
||||
{
|
||||
CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
|
@ -82,16 +86,18 @@ static int sf_uring_cancel_done(int sock, const int event, void *arg)
|
|||
void sf_task_detach_thread(struct fast_task_info *task)
|
||||
{
|
||||
#if IOEVENT_USE_URING
|
||||
if (SF_CTX->use_io_uring) {
|
||||
if (FC_URING_OP_TYPE(task) != IORING_OP_NOP) {
|
||||
bool need_cancel;
|
||||
if (task->handler->use_io_uring) {
|
||||
need_cancel = (FC_URING_OP_TYPE(task) != IORING_OP_NOP);
|
||||
} else {
|
||||
need_cancel = true;
|
||||
}
|
||||
if (need_cancel) {
|
||||
task->event.callback = (IOEventCallback)sf_uring_cancel_done;
|
||||
uring_prep_cancel(task);
|
||||
}
|
||||
} else {
|
||||
#endif
|
||||
#else
|
||||
ioevent_detach(&task->thread_data->ev_puller, task->event.fd);
|
||||
#if IOEVENT_USE_URING
|
||||
}
|
||||
#endif
|
||||
|
||||
if (task->event.timer.expires > 0) {
|
||||
|
|
@ -122,7 +128,7 @@ static inline void release_iovec_buffer(struct fast_task_info *task)
|
|||
void sf_socket_close_connection(struct fast_task_info *task)
|
||||
{
|
||||
#if IOEVENT_USE_URING
|
||||
if (SF_CTX->use_io_uring) {
|
||||
if (task->handler->use_io_uring) {
|
||||
if (uring_prep_close_fd(task) != 0) {
|
||||
close(task->event.fd);
|
||||
}
|
||||
|
|
@ -146,7 +152,7 @@ void sf_task_finish_clean_up(struct fast_task_info *task)
|
|||
sf_task_detach_thread(task);
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
if (!SF_CTX->use_io_uring) {
|
||||
if (!task->handler->use_io_uring) {
|
||||
#endif
|
||||
task->handler->close_connection(task);
|
||||
__sync_fetch_and_sub(&g_sf_global_vars.
|
||||
|
|
@ -219,7 +225,7 @@ static inline int set_read_event(struct fast_task_info *task)
|
|||
int result;
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
if (SF_CTX->use_io_uring) {
|
||||
if (task->handler->use_io_uring) {
|
||||
if (FC_URING_OP_TYPE(task) != IORING_OP_NOP) {
|
||||
if (FC_URING_OP_TYPE(task) == IORING_OP_RECV) {
|
||||
logWarning("file: "__FILE__", line: %d, "
|
||||
|
|
@ -270,7 +276,7 @@ static inline int set_read_event(struct fast_task_info *task)
|
|||
int sf_set_read_event(struct fast_task_info *task)
|
||||
{
|
||||
#if IOEVENT_USE_URING
|
||||
if (SF_CTX->use_io_uring) {
|
||||
if (task->handler->use_io_uring) {
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
|
@ -289,7 +295,8 @@ static inline int sf_ioevent_add(struct fast_task_info *task)
|
|||
|
||||
result = ioevent_set(task, task->thread_data, task->event.fd,
|
||||
IOEVENT_READ, (IOEventCallback)sf_client_sock_read,
|
||||
SF_CTX->net_buffer_cfg.network_timeout);
|
||||
SF_CTX->net_buffer_cfg.network_timeout,
|
||||
task->handler->use_io_uring);
|
||||
return result > 0 ? -1 * result : result;
|
||||
}
|
||||
|
||||
|
|
@ -331,7 +338,7 @@ static int sf_client_connect_done(int sock, const int event, void *arg)
|
|||
task = (struct fast_task_info *)arg;
|
||||
if (task->canceled) {
|
||||
#if IOEVENT_USE_URING
|
||||
if (SF_CTX->use_io_uring && event != IOEVENT_TIMEOUT) {
|
||||
if (task->handler->use_io_uring && event != IOEVENT_TIMEOUT) {
|
||||
CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
|
||||
}
|
||||
#endif
|
||||
|
|
@ -342,7 +349,7 @@ static int sf_client_connect_done(int sock, const int event, void *arg)
|
|||
result = ETIMEDOUT;
|
||||
} else {
|
||||
#if IOEVENT_USE_URING
|
||||
if (SF_CTX->use_io_uring) {
|
||||
if (task->handler->use_io_uring) {
|
||||
CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
|
||||
result = (task->event.res < 0 ? -1 * task->event.res :
|
||||
task->event.res);
|
||||
|
|
@ -387,7 +394,7 @@ int sf_socket_async_connect_server(struct fast_task_info *task)
|
|||
int result;
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
if (SF_CTX->use_io_uring) {
|
||||
if (task->handler->use_io_uring) {
|
||||
if ((result=uring_prep_connect(task)) != 0) {
|
||||
return result;
|
||||
}
|
||||
|
|
@ -416,7 +423,7 @@ static int sf_async_connect_server(struct fast_task_info *task)
|
|||
result = ioevent_set(task, task->thread_data, task->event.fd,
|
||||
IOEVENT_READ | IOEVENT_WRITE, (IOEventCallback)
|
||||
sf_client_connect_done, SF_CTX->net_buffer_cfg.
|
||||
connect_timeout);
|
||||
connect_timeout, task->handler->use_io_uring);
|
||||
return result > 0 ? -1 * result : result;
|
||||
} else {
|
||||
if (SF_CTX->callbacks.connect_done != NULL) {
|
||||
|
|
@ -465,7 +472,7 @@ static int sf_nio_deal_task(struct fast_task_info *task, const int stage)
|
|||
case SF_NIO_STAGE_RECV:
|
||||
task->nio_stages.current = SF_NIO_STAGE_RECV;
|
||||
if ((result=set_read_event(task)) == 0) {
|
||||
if (!SF_CTX->use_io_uring) {
|
||||
if (!task->handler->use_io_uring) {
|
||||
if (sf_client_sock_read(task->event.fd,
|
||||
IOEVENT_READ, task) < 0)
|
||||
{
|
||||
|
|
@ -648,11 +655,11 @@ int sf_send_add_event(struct fast_task_info *task)
|
|||
/* direct send */
|
||||
task->nio_stages.current = SF_NIO_STAGE_SEND;
|
||||
#if IOEVENT_USE_URING
|
||||
if (SF_CTX->use_io_uring) {
|
||||
if (task->handler->use_io_uring) {
|
||||
if (task->event.callback != (IOEventCallback)sf_client_sock_write) {
|
||||
task->event.callback = (IOEventCallback)sf_client_sock_write;
|
||||
}
|
||||
if (SF_CTX->use_send_zc) {
|
||||
if (task->handler->use_send_zc) {
|
||||
return uring_prep_first_send_zc(task);
|
||||
} else {
|
||||
return uring_prep_first_send(task);
|
||||
|
|
@ -692,7 +699,7 @@ static inline int check_task(struct fast_task_info *task,
|
|||
}
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
if (SF_CTX->use_io_uring) {
|
||||
if (task->handler->use_io_uring) {
|
||||
logWarning("file: "__FILE__", line: %d, "
|
||||
"client ip: %s, event: %d, expect stage: %d, "
|
||||
"but current stage: %d, close connection",
|
||||
|
|
@ -722,7 +729,7 @@ static inline int check_task(struct fast_task_info *task,
|
|||
#if IOEVENT_USE_URING
|
||||
static inline int prepare_next_send(struct fast_task_info *task)
|
||||
{
|
||||
if (SF_CTX->use_send_zc) {
|
||||
if (task->handler->use_send_zc) {
|
||||
return uring_prep_next_send_zc(task);
|
||||
} else {
|
||||
return uring_prep_next_send(task);
|
||||
|
|
@ -737,7 +744,7 @@ ssize_t sf_socket_send_data(struct fast_task_info *task,
|
|||
int result;
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
if (SF_CTX->use_io_uring) {
|
||||
if (task->handler->use_io_uring) {
|
||||
bytes = task->event.res;
|
||||
} else {
|
||||
#endif
|
||||
|
|
@ -755,7 +762,7 @@ ssize_t sf_socket_send_data(struct fast_task_info *task,
|
|||
|
||||
if (bytes < 0) {
|
||||
#if IOEVENT_USE_URING
|
||||
if (SF_CTX->use_io_uring) {
|
||||
if (task->handler->use_io_uring) {
|
||||
result = -bytes;
|
||||
} else {
|
||||
#endif
|
||||
|
|
@ -765,7 +772,7 @@ ssize_t sf_socket_send_data(struct fast_task_info *task,
|
|||
#endif
|
||||
if (result == EAGAIN || result == EWOULDBLOCK) {
|
||||
#if IOEVENT_USE_URING
|
||||
if (SF_CTX->use_io_uring) {
|
||||
if (task->handler->use_io_uring) {
|
||||
if (prepare_next_send(task) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
|
@ -780,7 +787,7 @@ ssize_t sf_socket_send_data(struct fast_task_info *task,
|
|||
|
||||
*action = sf_comm_action_break;
|
||||
return 0;
|
||||
} else if (result == EINTR && !SF_CTX->use_io_uring) {
|
||||
} else if (result == EINTR && !task->handler->use_io_uring) {
|
||||
/* should try again */
|
||||
logDebug("file: "__FILE__", line: %d, "
|
||||
"client ip: %s, ignore interupt signal",
|
||||
|
|
@ -807,8 +814,8 @@ ssize_t sf_socket_send_data(struct fast_task_info *task,
|
|||
task->send.ptr->offset += bytes;
|
||||
if (task->send.ptr->offset >= task->send.ptr->length) {
|
||||
#if IOEVENT_USE_URING
|
||||
if (SF_CTX->use_io_uring && FC_URING_IS_SEND_ZC(task) &&
|
||||
task->thread_data->ev_puller.send_zc_done_notify)
|
||||
if (FC_URING_IS_SEND_ZC(task) && task->thread_data->
|
||||
ev_puller.send_zc_done_notify)
|
||||
{
|
||||
*action = sf_comm_action_break;
|
||||
*send_done = false;
|
||||
|
|
@ -856,7 +863,7 @@ ssize_t sf_socket_send_data(struct fast_task_info *task,
|
|||
}
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
if (SF_CTX->use_io_uring) {
|
||||
if (task->handler->use_io_uring) {
|
||||
if (!(FC_URING_IS_SEND_ZC(task) && task->thread_data->
|
||||
ev_puller.send_zc_done_notify))
|
||||
{
|
||||
|
|
@ -887,7 +894,7 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task,
|
|||
bool new_alloc;
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
if (SF_CTX->use_io_uring) {
|
||||
if (task->handler->use_io_uring) {
|
||||
bytes = task->event.res;
|
||||
} else {
|
||||
#endif
|
||||
|
|
@ -912,7 +919,7 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task,
|
|||
|
||||
if (bytes < 0) {
|
||||
#if IOEVENT_USE_URING
|
||||
if (SF_CTX->use_io_uring) {
|
||||
if (task->handler->use_io_uring) {
|
||||
result = -bytes;
|
||||
} else {
|
||||
#endif
|
||||
|
|
@ -922,7 +929,7 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task,
|
|||
#endif
|
||||
if (result == EAGAIN || result == EWOULDBLOCK) {
|
||||
#if IOEVENT_USE_URING
|
||||
if (SF_CTX->use_io_uring) {
|
||||
if (task->handler->use_io_uring) {
|
||||
if (prepare_next_recv(task) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
|
@ -930,7 +937,7 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task,
|
|||
#endif
|
||||
*action = sf_comm_action_break;
|
||||
return 0;
|
||||
} else if (result == EINTR && !SF_CTX->use_io_uring) {
|
||||
} else if (result == EINTR && !task->handler->use_io_uring) {
|
||||
/* should try again */
|
||||
logDebug("file: "__FILE__", line: %d, "
|
||||
"client ip: %s, ignore interupt signal",
|
||||
|
|
@ -975,7 +982,7 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task,
|
|||
if (task->recv.ptr->length == 0) { //pkg header
|
||||
if (task->recv.ptr->offset < SF_CTX->header_size) {
|
||||
#if IOEVENT_USE_URING
|
||||
if (SF_CTX->use_io_uring) {
|
||||
if (task->handler->use_io_uring) {
|
||||
if (prepare_next_recv(task) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
|
@ -1039,7 +1046,7 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task,
|
|||
*action = sf_comm_action_finish;
|
||||
} else {
|
||||
#if IOEVENT_USE_URING
|
||||
if (SF_CTX->use_io_uring) {
|
||||
if (task->handler->use_io_uring) {
|
||||
if (prepare_next_recv(task) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
|
@ -1198,7 +1205,7 @@ static int sf_client_sock_read(int sock, const int event, void *arg)
|
|||
task = (struct fast_task_info *)arg;
|
||||
if ((result=check_task(task, event, SF_NIO_STAGE_RECV)) != 0) {
|
||||
#if IOEVENT_USE_URING
|
||||
if (SF_CTX->use_io_uring && event != IOEVENT_TIMEOUT) {
|
||||
if (task->handler->use_io_uring && event != IOEVENT_TIMEOUT) {
|
||||
CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
|
||||
}
|
||||
#endif
|
||||
|
|
@ -1238,7 +1245,7 @@ static int sf_client_sock_read(int sock, const int event, void *arg)
|
|||
}
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
if (SF_CTX->use_io_uring) {
|
||||
if (task->handler->use_io_uring) {
|
||||
CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
|
||||
}
|
||||
#endif
|
||||
|
|
@ -1313,7 +1320,7 @@ static int sock_write_done(struct fast_task_info *task,
|
|||
}
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
if (!SF_CTX->use_io_uring || task->nio_stages.
|
||||
if (!task->handler->use_io_uring || task->nio_stages.
|
||||
current == SF_NIO_STAGE_RECV)
|
||||
{
|
||||
#endif
|
||||
|
|
@ -1342,7 +1349,7 @@ static int sf_client_sock_write(int sock, const int event, void *arg)
|
|||
task = (struct fast_task_info *)arg;
|
||||
if ((result=check_task(task, event, SF_NIO_STAGE_SEND)) != 0) {
|
||||
#if IOEVENT_USE_URING
|
||||
if (SF_CTX->use_io_uring && event != IOEVENT_TIMEOUT) {
|
||||
if (task->handler->use_io_uring && event != IOEVENT_TIMEOUT) {
|
||||
if (event == IOEVENT_NOTIFY || !(FC_URING_IS_SEND_ZC(task) &&
|
||||
task->thread_data->ev_puller.send_zc_done_notify))
|
||||
{
|
||||
|
|
@ -1391,7 +1398,7 @@ static int sf_client_sock_write(int sock, const int event, void *arg)
|
|||
return result == 0 ? 0 : -1;
|
||||
}
|
||||
|
||||
if (SF_CTX->use_io_uring) {
|
||||
if (task->handler->use_io_uring) {
|
||||
if (!(FC_URING_IS_SEND_ZC(task) && task->thread_data->
|
||||
ev_puller.send_zc_done_notify))
|
||||
{
|
||||
|
|
|
|||
|
|
@ -33,6 +33,7 @@
|
|||
#include "fastcommon/sched_thread.h"
|
||||
#include "fastcommon/ioevent_loop.h"
|
||||
#include "fastcommon/fc_memory.h"
|
||||
#include "sf_nio.h"
|
||||
#include "sf_proto.h"
|
||||
#include "sf_util.h"
|
||||
#include "sf_service.h"
|
||||
|
|
@ -241,9 +242,8 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
|
|||
thread_data->arg = NULL;
|
||||
}
|
||||
|
||||
if ((result=ioevent_init(&thread_data->ev_puller, sf_context->
|
||||
name, sf_context->use_io_uring, max_entries,
|
||||
net_timeout_ms, extra_events)) != 0)
|
||||
if ((result=ioevent_init(&thread_data->ev_puller, sf_context->name,
|
||||
max_entries, net_timeout_ms, extra_events)) != 0)
|
||||
{
|
||||
char prompt[256];
|
||||
#if IOEVENT_USE_URING
|
||||
|
|
|
|||
|
|
@ -27,7 +27,6 @@
|
|||
#include "sf_types.h"
|
||||
#include "sf_proto.h"
|
||||
#include "sf_global.h"
|
||||
#include "sf_nio.h"
|
||||
|
||||
typedef void* (*sf_alloc_thread_extra_data_callback)(const int thread_index);
|
||||
typedef void (*sf_sig_quit_handler)(int sig);
|
||||
|
|
@ -226,7 +225,7 @@ static inline void sf_release_task(struct fast_task_info *task)
|
|||
*/
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
if (SF_CTX->use_io_uring) {
|
||||
if (task->handler->use_io_uring) {
|
||||
task->handler->close_connection(task);
|
||||
__sync_fetch_and_sub(&g_sf_global_vars.
|
||||
connection_stat.current_count, 1);
|
||||
|
|
|
|||
|
|
@ -119,6 +119,8 @@ struct sf_address_family_handler;
|
|||
typedef struct sf_network_handler {
|
||||
bool enabled;
|
||||
bool explicit_post_recv;
|
||||
bool use_io_uring; //since v1.2.9
|
||||
bool use_send_zc; //since v1.2.9
|
||||
FCCommunicationType comm_type;
|
||||
struct sf_address_family_handler *fh;
|
||||
struct ibv_pd *pd;
|
||||
|
|
@ -181,10 +183,6 @@ typedef struct sf_context {
|
|||
volatile int thread_count;
|
||||
|
||||
bool is_client; //since v1.2.5
|
||||
bool use_io_uring; //since v1.2.9
|
||||
#if IOEVENT_USE_URING
|
||||
bool use_send_zc; //since v1.2.9
|
||||
#endif
|
||||
SFAddressFamily address_family;
|
||||
SFAddressFamilyHandler handlers[SF_ADDRESS_FAMILY_COUNT];
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue