Compare commits

...

14 Commits

Author SHA1 Message Date
vazmin 4adf6b3227 gh actions: upgrade to 1.2.11-1 2025-11-23 10:48:22 +00:00
vazmin f4a799402e gh actions: upgrade to 1.2.11-1 2025-11-23 10:00:56 +00:00
vazmin 27510e9641 gh actions: upgrade to 1.2.11-1 2025-11-23 09:06:43 +00:00
YuQing 848077797b upgrade version to 1.2.11 2025-11-16 17:01:06 +08:00
YuQing d22f9da49c bugfixed: MUST call sf_hold_task in sf_nio_notify for rare case 2025-11-16 15:29:38 +08:00
YuQing 5495455fa7 do NOT call task->finish_callback 2025-11-15 11:14:14 +08:00
YuQing 4da0ff251c upgrade version to 1.2.10 2025-11-11 09:57:18 +08:00
YuQing 2444eac6ce declare use_send_zc field anyway 2025-11-11 09:52:42 +08:00
YuQing a52cc2d5d4 check sf_context->use_io_uring more 2025-11-05 09:58:37 +08:00
YuQing c4af33a497 declare use_io_uring correctly 2025-11-04 15:55:33 +08:00
YuQing fa32972052 move use_io_uring and use_send_zc to struct sf_context 2025-11-04 15:40:00 +08:00
YuQing 688211fbcd correct compile error 2025-11-03 15:22:45 +08:00
YuQing 1b2f521b99 uring cancel callback release task correctly 2025-11-03 14:56:29 +08:00
YuQing ddc528d69d restore function sf_client_sock_in_read_stage 2025-11-02 15:02:54 +08:00
10 changed files with 138 additions and 140 deletions

18
debian/changelog vendored
View File

@ -1,3 +1,21 @@
libserverframe (1.2.11-1) unstable; urgency=medium
* upgrade to 1.2.11-1
-- YuQing <384681@qq.com> Sun, 23 Nov 2025 10:48:22 +0000
libserverframe (1.2.11-1) unstable; urgency=medium
* upgrade to 1.2.11-1
-- YuQing <384681@qq.com> Sun, 23 Nov 2025 10:00:56 +0000
libserverframe (1.2.11-1) unstable; urgency=medium
* upgrade to 1.2.11-1
-- YuQing <384681@qq.com> Sun, 23 Nov 2025 09:06:43 +0000
libserverframe (1.2.8-1) unstable; urgency=medium
* upgrade to 1.2.8-1

2
debian/substvars vendored
View File

@ -1 +1 @@
libfastcommon:Version=1.0.78
libfastcommon:Version=1.0.83

View File

@ -2,7 +2,7 @@
%define CommitVersion %(echo $COMMIT_VERSION)
Name: libserverframe
Version: 1.2.9
Version: 1.2.11
Release: 1%{?dist}
Summary: network framework library
License: AGPL v3.0
@ -12,9 +12,9 @@ Source: http://github.com/happyfish100/libserverframe/%{name}-%{version}.tar.gz
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root-%(%{__id_u} -n)
BuildRequires: libfastcommon-devel >= 1.0.81
BuildRequires: libfastcommon-devel >= 1.0.83
Requires: %__cp %__mv %__chmod %__grep %__mkdir %__install %__id
Requires: libfastcommon >= 1.0.81
Requires: libfastcommon >= 1.0.83
%description
common framework library

View File

@ -217,7 +217,9 @@ int idempotency_client_channel_check_reconnect(
char formatted_ip[FORMATTED_IP_SIZE];
#if IOEVENT_USE_URING
if (FC_ATOMIC_GET(channel->task->reffer_count) > 1) {
struct fast_task_info *task;
task = channel->task;
if (SF_CTX->use_io_uring && FC_ATOMIC_GET(task->reffer_count) > 1) {
return 0;
}
#endif

View File

@ -47,9 +47,10 @@ SFGlobalVariables g_sf_global_vars = {
{0, 0}, NULL, {NULL, 0}
};
SFContext g_sf_context = {{'\0'}, NULL, 0, false, sf_address_family_auto,
{{AF_UNSPEC, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}},
{AF_UNSPEC, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}}},
SFContext g_sf_context = {{'\0'}, NULL, 0, false, false, false,
sf_address_family_auto, {{AF_UNSPEC, {{true, fc_comm_type_sock},
{false, fc_comm_type_rdma}}},
{AF_UNSPEC, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}}},
{DEFAULT_MAX_CONNECTONS, SF_DEF_MAX_PACKAGE_SIZE, SF_DEF_MIN_BUFF_SIZE,
SF_DEF_MAX_BUFF_SIZE}, 1, DEFAULT_WORK_THREADS, 0, true, true,
{false, 0, 0}, {sf_task_finish_clean_up}
@ -477,8 +478,7 @@ static int load_rdma_apis(SFContext *sf_context, SFNetworkHandler *handler)
}
static int init_network_handler(SFContext *sf_context,
SFNetworkHandler *handler, SFAddressFamilyHandler *fh,
const bool use_send_zc)
SFNetworkHandler *handler, SFAddressFamilyHandler *fh)
{
handler->fh = fh;
handler->inner.handler = handler;
@ -499,18 +499,10 @@ static int init_network_handler(SFContext *sf_context,
handler->send_data = sf_socket_send_data;
handler->recv_data = sf_socket_recv_data;
handler->post_recv = NULL;
#if IOEVENT_USE_URING
handler->use_io_uring = true;
handler->use_send_zc = use_send_zc;
#else
handler->use_io_uring = false;
handler->use_send_zc = false;
#endif
return 0;
} else {
handler->inner.id = NULL;
handler->outer.id = NULL;
handler->use_io_uring = false;
return load_rdma_apis(sf_context, handler);
}
}
@ -678,8 +670,10 @@ int sf_load_context_from_config_ex(SFContext *sf_context,
int inner_port;
int outer_port;
int port;
#if IOEVENT_USE_URING
bool global_use_send_zc;
bool use_send_zc;
#endif
int i;
int result;
@ -716,6 +710,7 @@ int sf_load_context_from_config_ex(SFContext *sf_context,
outer_port = config->default_outer_port;
}
#if IOEVENT_USE_URING
global_use_send_zc = iniGetBoolValue(NULL, "use_send_zc",
config->ini_ctx.context, true);
if (config->ini_ctx.section_name == NULL) {
@ -724,6 +719,7 @@ int sf_load_context_from_config_ex(SFContext *sf_context,
use_send_zc = iniGetBoolValue(config->ini_ctx.section_name,
"use_send_zc", config->ini_ctx.context, global_use_send_zc);
}
#endif
for (i=0; i<SF_ADDRESS_FAMILY_COUNT; i++) {
fh = sf_context->handlers + i;
@ -748,9 +744,7 @@ int sf_load_context_from_config_ex(SFContext *sf_context,
if (!handler->enabled) {
continue;
}
if ((result=init_network_handler(sf_context, handler,
fh, use_send_zc)) != 0)
{
if ((result=init_network_handler(sf_context, handler, fh)) != 0) {
return result;
}
}
@ -769,9 +763,15 @@ int sf_load_context_from_config_ex(SFContext *sf_context,
rdma_handler->inner.enabled = sock_handler->inner.enabled;
rdma_handler->outer.port = sock_handler->outer.port;
rdma_handler->outer.enabled = sock_handler->outer.enabled;
}
#if IOEVENT_USE_URING
sf_context->use_io_uring = (config->comm_type == fc_comm_type_sock);
sf_context->use_send_zc = sf_context->use_io_uring ? use_send_zc : false;
#else
sf_context->use_io_uring = false;
#endif
sf_context->accept_threads = iniGetIntValue(
config->ini_ctx.section_name,
"accept_threads", config->ini_ctx.context, 1);
@ -907,31 +907,6 @@ static const char *get_address_family_caption(
}
}
#if IOEVENT_USE_URING
static void get_io_uring_configs(const SFContext *sf_context,
bool *use_io_uring, bool *use_send_zc)
{
int i;
const SFAddressFamilyHandler *fh;
const SFNetworkHandler *handler;
const SFNetworkHandler *end;
*use_io_uring = false;
*use_send_zc = false;
for (i=0; i<SF_ADDRESS_FAMILY_COUNT; i++) {
fh = sf_context->handlers + i;
end = fh->handlers + SF_NETWORK_HANDLER_COUNT;
for (handler=fh->handlers; handler<end; handler++) {
if (handler->enabled && handler->use_io_uring) {
*use_io_uring = true;
*use_send_zc = handler->use_send_zc;
return;
}
}
}
}
#endif
void sf_context_config_to_string(const SFContext *sf_context,
char *output, const int size)
{
@ -941,10 +916,6 @@ void sf_context_config_to_string(const SFContext *sf_context,
char outer_bind_addr[2 * IP_ADDRESS_SIZE + 2];
int i;
int len;
#if IOEVENT_USE_URING
bool use_io_uring;
bool use_send_zc;
#endif
*inner_bind_addr = '\0';
*outer_bind_addr = '\0';
@ -987,9 +958,9 @@ void sf_context_config_to_string(const SFContext *sf_context,
sf_context->accept_threads, sf_context->work_threads);
#if IOEVENT_USE_URING
get_io_uring_configs(sf_context, &use_io_uring, &use_send_zc);
len += snprintf(output + len, size - len, ", use_io_uring=%d"
", use_send_zc=%d", use_io_uring, use_send_zc);
len += snprintf(output + len, size - len, ", use_io_uring=%d, "
"use_send_zc=%d", sf_context->use_io_uring,
sf_context->use_send_zc);
#endif
}
@ -1040,10 +1011,6 @@ void sf_global_config_to_string_ex(const char *max_pkg_size_item_nm,
int max_pkg_size;
int min_buff_size;
int max_buff_size;
#if IOEVENT_USE_URING
bool use_io_uring;
bool use_send_zc;
#endif
char pkg_buff[256];
max_pkg_size = g_sf_global_vars.net_buffer_cfg.max_pkg_size -
@ -1073,10 +1040,9 @@ void sf_global_config_to_string_ex(const char *max_pkg_size_item_nm,
g_sf_global_vars.thread_stack_size / 1024, pkg_buff);
#if IOEVENT_USE_URING
get_io_uring_configs(&g_sf_context, &use_io_uring, &use_send_zc);
len += snprintf(output + len, size - len,
"use_io_uring=%d, use_send_zc=%d, ",
use_io_uring, use_send_zc);
len += snprintf(output + len, size - len, "use_io_uring=%d, "
"use_send_zc=%d, ", g_sf_context.use_io_uring,
g_sf_context.use_send_zc);
#endif
len += snprintf(output + len, size - len,

View File

@ -82,18 +82,16 @@ static int sf_uring_cancel_done(int sock, const int event, void *arg)
void sf_task_detach_thread(struct fast_task_info *task)
{
#if IOEVENT_USE_URING
bool need_cancel;
if (task->handler->use_io_uring) {
need_cancel = (FC_URING_OP_TYPE(task) != IORING_OP_NOP);
if (SF_CTX->use_io_uring) {
if (FC_URING_OP_TYPE(task) != IORING_OP_NOP) {
task->event.callback = (IOEventCallback)sf_uring_cancel_done;
uring_prep_cancel(task);
}
} else {
need_cancel = true;
#endif
ioevent_detach(&task->thread_data->ev_puller, task->event.fd);
#if IOEVENT_USE_URING
}
if (need_cancel) {
task->event.callback = (IOEventCallback)sf_uring_cancel_done;
uring_prep_cancel(task);
}
#else
ioevent_detach(&task->thread_data->ev_puller, task->event.fd);
#endif
if (task->event.timer.expires > 0) {
@ -124,7 +122,7 @@ static inline void release_iovec_buffer(struct fast_task_info *task)
void sf_socket_close_connection(struct fast_task_info *task)
{
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
if (SF_CTX->use_io_uring) {
if (uring_prep_close_fd(task) != 0) {
close(task->event.fd);
}
@ -139,16 +137,11 @@ void sf_socket_close_connection(struct fast_task_info *task)
void sf_task_finish_clean_up(struct fast_task_info *task)
{
if (task->finish_callback != NULL) {
task->finish_callback(task);
task->finish_callback = NULL;
}
release_iovec_buffer(task);
sf_task_detach_thread(task);
#if IOEVENT_USE_URING
if (!task->handler->use_io_uring) {
if (!SF_CTX->use_io_uring) {
#endif
task->handler->close_connection(task);
__sync_fetch_and_sub(&g_sf_global_vars.
@ -221,7 +214,7 @@ static inline int set_read_event(struct fast_task_info *task)
int result;
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
if (SF_CTX->use_io_uring) {
if (FC_URING_OP_TYPE(task) != IORING_OP_NOP) {
if (FC_URING_OP_TYPE(task) == IORING_OP_RECV) {
logWarning("file: "__FILE__", line: %d, "
@ -272,7 +265,7 @@ static inline int set_read_event(struct fast_task_info *task)
int sf_set_read_event(struct fast_task_info *task)
{
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
if (SF_CTX->use_io_uring) {
return 0;
}
#endif
@ -291,8 +284,7 @@ static inline int sf_ioevent_add(struct fast_task_info *task)
result = ioevent_set(task, task->thread_data, task->event.fd,
IOEVENT_READ, (IOEventCallback)sf_client_sock_read,
SF_CTX->net_buffer_cfg.network_timeout,
task->handler->use_io_uring);
SF_CTX->net_buffer_cfg.network_timeout);
return result > 0 ? -1 * result : result;
}
@ -334,7 +326,7 @@ static int sf_client_connect_done(int sock, const int event, void *arg)
task = (struct fast_task_info *)arg;
if (task->canceled) {
#if IOEVENT_USE_URING
if (task->handler->use_io_uring && event != IOEVENT_TIMEOUT) {
if (SF_CTX->use_io_uring && event != IOEVENT_TIMEOUT) {
CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
}
#endif
@ -345,7 +337,7 @@ static int sf_client_connect_done(int sock, const int event, void *arg)
result = ETIMEDOUT;
} else {
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
if (SF_CTX->use_io_uring) {
CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
result = (task->event.res < 0 ? -1 * task->event.res :
task->event.res);
@ -390,7 +382,7 @@ int sf_socket_async_connect_server(struct fast_task_info *task)
int result;
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
if (SF_CTX->use_io_uring) {
if ((result=uring_prep_connect(task)) != 0) {
return result;
}
@ -419,7 +411,7 @@ static int sf_async_connect_server(struct fast_task_info *task)
result = ioevent_set(task, task->thread_data, task->event.fd,
IOEVENT_READ | IOEVENT_WRITE, (IOEventCallback)
sf_client_connect_done, SF_CTX->net_buffer_cfg.
connect_timeout, task->handler->use_io_uring);
connect_timeout);
return result > 0 ? -1 * result : result;
} else {
if (SF_CTX->callbacks.connect_done != NULL) {
@ -468,7 +460,7 @@ static int sf_nio_deal_task(struct fast_task_info *task, const int stage)
case SF_NIO_STAGE_RECV:
task->nio_stages.current = SF_NIO_STAGE_RECV;
if ((result=set_read_event(task)) == 0) {
if (!task->handler->use_io_uring) {
if (!SF_CTX->use_io_uring) {
if (sf_client_sock_read(task->event.fd,
IOEVENT_READ, task) < 0)
{
@ -552,6 +544,7 @@ int sf_nio_notify(struct fast_task_info *task, const int stage)
}
}
sf_hold_task(task); //since 1.2.11
PTHREAD_MUTEX_LOCK(&task->thread_data->waiting_queue.lock);
task->notify_next = NULL;
if (task->thread_data->waiting_queue.tail == NULL) {
@ -641,6 +634,7 @@ void sf_recv_notify_read(int fd, const int event, void *arg)
__sync_bool_compare_and_swap(&task->nio_stages.notify,
stage, SF_NIO_STAGE_NONE);
}
sf_release_task(task); //since 1.2.11
}
}
@ -651,11 +645,11 @@ int sf_send_add_event(struct fast_task_info *task)
/* direct send */
task->nio_stages.current = SF_NIO_STAGE_SEND;
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
if (SF_CTX->use_io_uring) {
if (task->event.callback != (IOEventCallback)sf_client_sock_write) {
task->event.callback = (IOEventCallback)sf_client_sock_write;
}
if (task->handler->use_send_zc) {
if (SF_CTX->use_send_zc) {
return uring_prep_first_send_zc(task);
} else {
return uring_prep_first_send(task);
@ -695,7 +689,7 @@ static inline int check_task(struct fast_task_info *task,
}
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
if (SF_CTX->use_io_uring) {
logWarning("file: "__FILE__", line: %d, "
"client ip: %s, event: %d, expect stage: %d, "
"but current stage: %d, close connection",
@ -725,7 +719,7 @@ static inline int check_task(struct fast_task_info *task,
#if IOEVENT_USE_URING
static inline int prepare_next_send(struct fast_task_info *task)
{
if (task->handler->use_send_zc) {
if (SF_CTX->use_send_zc) {
return uring_prep_next_send_zc(task);
} else {
return uring_prep_next_send(task);
@ -740,7 +734,7 @@ ssize_t sf_socket_send_data(struct fast_task_info *task,
int result;
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
if (SF_CTX->use_io_uring) {
bytes = task->event.res;
} else {
#endif
@ -758,7 +752,7 @@ ssize_t sf_socket_send_data(struct fast_task_info *task,
if (bytes < 0) {
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
if (SF_CTX->use_io_uring) {
result = -bytes;
} else {
#endif
@ -768,7 +762,7 @@ ssize_t sf_socket_send_data(struct fast_task_info *task,
#endif
if (result == EAGAIN || result == EWOULDBLOCK) {
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
if (SF_CTX->use_io_uring) {
if (prepare_next_send(task) != 0) {
return -1;
}
@ -783,7 +777,7 @@ ssize_t sf_socket_send_data(struct fast_task_info *task,
*action = sf_comm_action_break;
return 0;
} else if (result == EINTR && !task->handler->use_io_uring) {
} else if (result == EINTR && !SF_CTX->use_io_uring) {
/* should try again */
logDebug("file: "__FILE__", line: %d, "
"client ip: %s, ignore interupt signal",
@ -810,8 +804,8 @@ ssize_t sf_socket_send_data(struct fast_task_info *task,
task->send.ptr->offset += bytes;
if (task->send.ptr->offset >= task->send.ptr->length) {
#if IOEVENT_USE_URING
if (FC_URING_IS_SEND_ZC(task) && task->thread_data->
ev_puller.send_zc_done_notify)
if (SF_CTX->use_io_uring && FC_URING_IS_SEND_ZC(task) &&
task->thread_data->ev_puller.send_zc_done_notify)
{
*action = sf_comm_action_break;
*send_done = false;
@ -859,7 +853,7 @@ ssize_t sf_socket_send_data(struct fast_task_info *task,
}
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
if (SF_CTX->use_io_uring) {
if (!(FC_URING_IS_SEND_ZC(task) && task->thread_data->
ev_puller.send_zc_done_notify))
{
@ -890,7 +884,7 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task,
bool new_alloc;
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
if (SF_CTX->use_io_uring) {
bytes = task->event.res;
} else {
#endif
@ -915,7 +909,7 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task,
if (bytes < 0) {
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
if (SF_CTX->use_io_uring) {
result = -bytes;
} else {
#endif
@ -925,7 +919,7 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task,
#endif
if (result == EAGAIN || result == EWOULDBLOCK) {
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
if (SF_CTX->use_io_uring) {
if (prepare_next_recv(task) != 0) {
return -1;
}
@ -933,7 +927,7 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task,
#endif
*action = sf_comm_action_break;
return 0;
} else if (result == EINTR && !task->handler->use_io_uring) {
} else if (result == EINTR && !SF_CTX->use_io_uring) {
/* should try again */
logDebug("file: "__FILE__", line: %d, "
"client ip: %s, ignore interupt signal",
@ -978,7 +972,7 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task,
if (task->recv.ptr->length == 0) { //pkg header
if (task->recv.ptr->offset < SF_CTX->header_size) {
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
if (SF_CTX->use_io_uring) {
if (prepare_next_recv(task) != 0) {
return -1;
}
@ -1042,7 +1036,7 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task,
*action = sf_comm_action_finish;
} else {
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
if (SF_CTX->use_io_uring) {
if (prepare_next_recv(task) != 0) {
return -1;
}
@ -1201,7 +1195,7 @@ static int sf_client_sock_read(int sock, const int event, void *arg)
task = (struct fast_task_info *)arg;
if ((result=check_task(task, event, SF_NIO_STAGE_RECV)) != 0) {
#if IOEVENT_USE_URING
if (task->handler->use_io_uring && event != IOEVENT_TIMEOUT) {
if (SF_CTX->use_io_uring && event != IOEVENT_TIMEOUT) {
CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
}
#endif
@ -1241,7 +1235,7 @@ static int sf_client_sock_read(int sock, const int event, void *arg)
}
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
if (SF_CTX->use_io_uring) {
CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
}
#endif
@ -1307,6 +1301,7 @@ static int sock_write_done(struct fast_task_info *task,
if (SF_CTX->callbacks.send_done(task,
length, &next_stage) != 0)
{
ioevent_add_to_deleted_list(task);
return -1;
}
@ -1316,7 +1311,7 @@ static int sock_write_done(struct fast_task_info *task,
}
#if IOEVENT_USE_URING
if (!task->handler->use_io_uring || task->nio_stages.
if (!SF_CTX->use_io_uring || task->nio_stages.
current == SF_NIO_STAGE_RECV)
{
#endif
@ -1345,7 +1340,7 @@ static int sf_client_sock_write(int sock, const int event, void *arg)
task = (struct fast_task_info *)arg;
if ((result=check_task(task, event, SF_NIO_STAGE_SEND)) != 0) {
#if IOEVENT_USE_URING
if (task->handler->use_io_uring && event != IOEVENT_TIMEOUT) {
if (SF_CTX->use_io_uring && event != IOEVENT_TIMEOUT) {
if (event == IOEVENT_NOTIFY || !(FC_URING_IS_SEND_ZC(task) &&
task->thread_data->ev_puller.send_zc_done_notify))
{
@ -1394,7 +1389,7 @@ static int sf_client_sock_write(int sock, const int event, void *arg)
return result == 0 ? 0 : -1;
}
if (task->handler->use_io_uring) {
if (SF_CTX->use_io_uring) {
if (!(FC_URING_IS_SEND_ZC(task) && task->thread_data->
ev_puller.send_zc_done_notify))
{
@ -1429,3 +1424,8 @@ static int sf_client_sock_write(int sock, const int event, void *arg)
return total_write;
}
bool sf_client_sock_in_read_stage(struct fast_task_info *task)
{
return (task->event.callback == (IOEventCallback)sf_client_sock_read);
}

View File

@ -158,6 +158,8 @@ static inline void sf_nio_add_to_deleted_list(struct nio_thread_data
}
}
bool sf_client_sock_in_read_stage(struct fast_task_info *task);
#ifdef __cplusplus
}
#endif

View File

@ -33,7 +33,6 @@
#include "fastcommon/sched_thread.h"
#include "fastcommon/ioevent_loop.h"
#include "fastcommon/fc_memory.h"
#include "sf_nio.h"
#include "sf_proto.h"
#include "sf_util.h"
#include "sf_service.h"
@ -190,7 +189,15 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
max_entries = (sf_context->net_buffer_cfg.max_connections +
sf_context->work_threads - 1) / sf_context->work_threads;
if (strcmp(sf_context->name, "service") == 0) {
if (strcmp(sf_context->name, "cluster") == 0 ||
strcmp(sf_context->name, "replica") == 0)
{
if (max_entries < 1024) {
max_entries += 8;
} else {
max_entries = 1024;
}
} else {
if (max_entries < 4 * 1024) {
max_entries = max_entries * 2;
} else if (max_entries < 8 * 1024) {
@ -199,23 +206,19 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
max_entries = (max_entries * 5) / 4;
} else if (max_entries < 32 * 1024) {
max_entries = (max_entries * 6) / 5;
#if IOEVENT_USE_URING
if (max_entries > 32 * 1024) {
max_entries = 32 * 1024;
}
#else
} else if (max_entries < 64 * 1024) {
max_entries = (max_entries * 11) / 10;
} else if (max_entries < 128 * 1024) {
max_entries = (max_entries * 21) / 20;
}
#if IOEVENT_USE_URING
if (sf_context->use_io_uring) {
if (max_entries > 32 * 1024) {
max_entries = 32 * 1024;
}
}
#endif
}
} else {
if (max_entries < 1024) {
max_entries += 8;
} else {
max_entries = 1024;
}
}
g_current_time = time(NULL);
@ -242,23 +245,29 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
thread_data->arg = NULL;
}
if ((result=ioevent_init(&thread_data->ev_puller, sf_context->name,
max_entries, net_timeout_ms, extra_events)) != 0)
if ((result=ioevent_init(&thread_data->ev_puller, sf_context->
name, sf_context->use_io_uring, max_entries,
net_timeout_ms, extra_events)) != 0)
{
char prompt[256];
#if IOEVENT_USE_URING
if (result == EPERM) {
strcpy(prompt, " make sure kernel.io_uring_disabled set to 0");
} else if (result == EINVAL) {
sprintf(prompt, " maybe max_connections: %d is too large"
" or [%s]'s work_threads: %d is too small",
sf_context->net_buffer_cfg.max_connections,
sf_context->name, sf_context->work_threads);
if (sf_context->use_io_uring) {
if (result == EPERM) {
strcpy(prompt, " make sure kernel."
"io_uring_disabled set to 0");
} else if (result == EINVAL) {
sprintf(prompt, " maybe max_connections: %d is too large"
" or [%s]'s work_threads: %d is too small",
sf_context->net_buffer_cfg.max_connections,
sf_context->name, sf_context->work_threads);
} else {
*prompt = '\0';
}
} else {
#endif
*prompt = '\0';
#if IOEVENT_USE_URING
}
#else
*prompt = '\0';
#endif
logError("file: "__FILE__", line: %d, "
@ -268,7 +277,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
}
#if IOEVENT_USE_URING
if (send_done_callback != NULL) {
if (sf_context->use_io_uring && send_done_callback != NULL) {
ioevent_set_send_zc_done_notify(&thread_data->ev_puller, true);
}
#endif

View File

@ -27,6 +27,7 @@
#include "sf_types.h"
#include "sf_proto.h"
#include "sf_global.h"
#include "sf_nio.h"
typedef void* (*sf_alloc_thread_extra_data_callback)(const int thread_index);
typedef void (*sf_sig_quit_handler)(int sig);
@ -225,7 +226,7 @@ static inline void sf_release_task(struct fast_task_info *task)
*/
#if IOEVENT_USE_URING
if (task->handler->use_io_uring) {
if (SF_CTX->use_io_uring) {
task->handler->close_connection(task);
__sync_fetch_and_sub(&g_sf_global_vars.
connection_stat.current_count, 1);

View File

@ -119,8 +119,6 @@ struct sf_address_family_handler;
typedef struct sf_network_handler {
bool enabled;
bool explicit_post_recv;
bool use_io_uring; //since v1.2.9
bool use_send_zc; //since v1.2.9
FCCommunicationType comm_type;
struct sf_address_family_handler *fh;
struct ibv_pd *pd;
@ -182,7 +180,9 @@ typedef struct sf_context {
struct nio_thread_data *thread_data;
volatile int thread_count;
bool is_client; //since v1.2.5
bool is_client; //since v1.2.5
bool use_io_uring; //since v1.2.9
bool use_send_zc; //since v1.2.9
SFAddressFamily address_family;
SFAddressFamilyHandler handlers[SF_ADDRESS_FAMILY_COUNT];