Compare commits
14 Commits
| Author | SHA1 | Date |
|---|---|---|
|
|
4adf6b3227 | |
|
|
f4a799402e | |
|
|
27510e9641 | |
|
|
848077797b | |
|
|
d22f9da49c | |
|
|
5495455fa7 | |
|
|
4da0ff251c | |
|
|
2444eac6ce | |
|
|
a52cc2d5d4 | |
|
|
c4af33a497 | |
|
|
fa32972052 | |
|
|
688211fbcd | |
|
|
1b2f521b99 | |
|
|
ddc528d69d |
|
|
@ -1,3 +1,21 @@
|
|||
libserverframe (1.2.11-1) unstable; urgency=medium
|
||||
|
||||
* upgrade to 1.2.11-1
|
||||
|
||||
-- YuQing <384681@qq.com> Sun, 23 Nov 2025 10:48:22 +0000
|
||||
|
||||
libserverframe (1.2.11-1) unstable; urgency=medium
|
||||
|
||||
* upgrade to 1.2.11-1
|
||||
|
||||
-- YuQing <384681@qq.com> Sun, 23 Nov 2025 10:00:56 +0000
|
||||
|
||||
libserverframe (1.2.11-1) unstable; urgency=medium
|
||||
|
||||
* upgrade to 1.2.11-1
|
||||
|
||||
-- YuQing <384681@qq.com> Sun, 23 Nov 2025 09:06:43 +0000
|
||||
|
||||
libserverframe (1.2.8-1) unstable; urgency=medium
|
||||
|
||||
* upgrade to 1.2.8-1
|
||||
|
|
|
|||
|
|
@ -1 +1 @@
|
|||
libfastcommon:Version=1.0.78
|
||||
libfastcommon:Version=1.0.83
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
%define CommitVersion %(echo $COMMIT_VERSION)
|
||||
|
||||
Name: libserverframe
|
||||
Version: 1.2.9
|
||||
Version: 1.2.11
|
||||
Release: 1%{?dist}
|
||||
Summary: network framework library
|
||||
License: AGPL v3.0
|
||||
|
|
@ -12,9 +12,9 @@ Source: http://github.com/happyfish100/libserverframe/%{name}-%{version}.tar.gz
|
|||
|
||||
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root-%(%{__id_u} -n)
|
||||
|
||||
BuildRequires: libfastcommon-devel >= 1.0.81
|
||||
BuildRequires: libfastcommon-devel >= 1.0.83
|
||||
Requires: %__cp %__mv %__chmod %__grep %__mkdir %__install %__id
|
||||
Requires: libfastcommon >= 1.0.81
|
||||
Requires: libfastcommon >= 1.0.83
|
||||
|
||||
%description
|
||||
common framework library
|
||||
|
|
|
|||
|
|
@ -217,7 +217,9 @@ int idempotency_client_channel_check_reconnect(
|
|||
char formatted_ip[FORMATTED_IP_SIZE];
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
if (FC_ATOMIC_GET(channel->task->reffer_count) > 1) {
|
||||
struct fast_task_info *task;
|
||||
task = channel->task;
|
||||
if (SF_CTX->use_io_uring && FC_ATOMIC_GET(task->reffer_count) > 1) {
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
|
|
|||
|
|
@ -47,9 +47,10 @@ SFGlobalVariables g_sf_global_vars = {
|
|||
{0, 0}, NULL, {NULL, 0}
|
||||
};
|
||||
|
||||
SFContext g_sf_context = {{'\0'}, NULL, 0, false, sf_address_family_auto,
|
||||
{{AF_UNSPEC, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}},
|
||||
{AF_UNSPEC, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}}},
|
||||
SFContext g_sf_context = {{'\0'}, NULL, 0, false, false, false,
|
||||
sf_address_family_auto, {{AF_UNSPEC, {{true, fc_comm_type_sock},
|
||||
{false, fc_comm_type_rdma}}},
|
||||
{AF_UNSPEC, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}}},
|
||||
{DEFAULT_MAX_CONNECTONS, SF_DEF_MAX_PACKAGE_SIZE, SF_DEF_MIN_BUFF_SIZE,
|
||||
SF_DEF_MAX_BUFF_SIZE}, 1, DEFAULT_WORK_THREADS, 0, true, true,
|
||||
{false, 0, 0}, {sf_task_finish_clean_up}
|
||||
|
|
@ -477,8 +478,7 @@ static int load_rdma_apis(SFContext *sf_context, SFNetworkHandler *handler)
|
|||
}
|
||||
|
||||
static int init_network_handler(SFContext *sf_context,
|
||||
SFNetworkHandler *handler, SFAddressFamilyHandler *fh,
|
||||
const bool use_send_zc)
|
||||
SFNetworkHandler *handler, SFAddressFamilyHandler *fh)
|
||||
{
|
||||
handler->fh = fh;
|
||||
handler->inner.handler = handler;
|
||||
|
|
@ -499,18 +499,10 @@ static int init_network_handler(SFContext *sf_context,
|
|||
handler->send_data = sf_socket_send_data;
|
||||
handler->recv_data = sf_socket_recv_data;
|
||||
handler->post_recv = NULL;
|
||||
#if IOEVENT_USE_URING
|
||||
handler->use_io_uring = true;
|
||||
handler->use_send_zc = use_send_zc;
|
||||
#else
|
||||
handler->use_io_uring = false;
|
||||
handler->use_send_zc = false;
|
||||
#endif
|
||||
return 0;
|
||||
} else {
|
||||
handler->inner.id = NULL;
|
||||
handler->outer.id = NULL;
|
||||
handler->use_io_uring = false;
|
||||
return load_rdma_apis(sf_context, handler);
|
||||
}
|
||||
}
|
||||
|
|
@ -678,8 +670,10 @@ int sf_load_context_from_config_ex(SFContext *sf_context,
|
|||
int inner_port;
|
||||
int outer_port;
|
||||
int port;
|
||||
#if IOEVENT_USE_URING
|
||||
bool global_use_send_zc;
|
||||
bool use_send_zc;
|
||||
#endif
|
||||
int i;
|
||||
int result;
|
||||
|
||||
|
|
@ -716,6 +710,7 @@ int sf_load_context_from_config_ex(SFContext *sf_context,
|
|||
outer_port = config->default_outer_port;
|
||||
}
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
global_use_send_zc = iniGetBoolValue(NULL, "use_send_zc",
|
||||
config->ini_ctx.context, true);
|
||||
if (config->ini_ctx.section_name == NULL) {
|
||||
|
|
@ -724,6 +719,7 @@ int sf_load_context_from_config_ex(SFContext *sf_context,
|
|||
use_send_zc = iniGetBoolValue(config->ini_ctx.section_name,
|
||||
"use_send_zc", config->ini_ctx.context, global_use_send_zc);
|
||||
}
|
||||
#endif
|
||||
|
||||
for (i=0; i<SF_ADDRESS_FAMILY_COUNT; i++) {
|
||||
fh = sf_context->handlers + i;
|
||||
|
|
@ -748,9 +744,7 @@ int sf_load_context_from_config_ex(SFContext *sf_context,
|
|||
if (!handler->enabled) {
|
||||
continue;
|
||||
}
|
||||
if ((result=init_network_handler(sf_context, handler,
|
||||
fh, use_send_zc)) != 0)
|
||||
{
|
||||
if ((result=init_network_handler(sf_context, handler, fh)) != 0) {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
|
@ -769,9 +763,15 @@ int sf_load_context_from_config_ex(SFContext *sf_context,
|
|||
rdma_handler->inner.enabled = sock_handler->inner.enabled;
|
||||
rdma_handler->outer.port = sock_handler->outer.port;
|
||||
rdma_handler->outer.enabled = sock_handler->outer.enabled;
|
||||
|
||||
}
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
sf_context->use_io_uring = (config->comm_type == fc_comm_type_sock);
|
||||
sf_context->use_send_zc = sf_context->use_io_uring ? use_send_zc : false;
|
||||
#else
|
||||
sf_context->use_io_uring = false;
|
||||
#endif
|
||||
|
||||
sf_context->accept_threads = iniGetIntValue(
|
||||
config->ini_ctx.section_name,
|
||||
"accept_threads", config->ini_ctx.context, 1);
|
||||
|
|
@ -907,31 +907,6 @@ static const char *get_address_family_caption(
|
|||
}
|
||||
}
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
static void get_io_uring_configs(const SFContext *sf_context,
|
||||
bool *use_io_uring, bool *use_send_zc)
|
||||
{
|
||||
int i;
|
||||
const SFAddressFamilyHandler *fh;
|
||||
const SFNetworkHandler *handler;
|
||||
const SFNetworkHandler *end;
|
||||
|
||||
*use_io_uring = false;
|
||||
*use_send_zc = false;
|
||||
for (i=0; i<SF_ADDRESS_FAMILY_COUNT; i++) {
|
||||
fh = sf_context->handlers + i;
|
||||
end = fh->handlers + SF_NETWORK_HANDLER_COUNT;
|
||||
for (handler=fh->handlers; handler<end; handler++) {
|
||||
if (handler->enabled && handler->use_io_uring) {
|
||||
*use_io_uring = true;
|
||||
*use_send_zc = handler->use_send_zc;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
void sf_context_config_to_string(const SFContext *sf_context,
|
||||
char *output, const int size)
|
||||
{
|
||||
|
|
@ -941,10 +916,6 @@ void sf_context_config_to_string(const SFContext *sf_context,
|
|||
char outer_bind_addr[2 * IP_ADDRESS_SIZE + 2];
|
||||
int i;
|
||||
int len;
|
||||
#if IOEVENT_USE_URING
|
||||
bool use_io_uring;
|
||||
bool use_send_zc;
|
||||
#endif
|
||||
|
||||
*inner_bind_addr = '\0';
|
||||
*outer_bind_addr = '\0';
|
||||
|
|
@ -987,9 +958,9 @@ void sf_context_config_to_string(const SFContext *sf_context,
|
|||
sf_context->accept_threads, sf_context->work_threads);
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
get_io_uring_configs(sf_context, &use_io_uring, &use_send_zc);
|
||||
len += snprintf(output + len, size - len, ", use_io_uring=%d"
|
||||
", use_send_zc=%d", use_io_uring, use_send_zc);
|
||||
len += snprintf(output + len, size - len, ", use_io_uring=%d, "
|
||||
"use_send_zc=%d", sf_context->use_io_uring,
|
||||
sf_context->use_send_zc);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
|
@ -1040,10 +1011,6 @@ void sf_global_config_to_string_ex(const char *max_pkg_size_item_nm,
|
|||
int max_pkg_size;
|
||||
int min_buff_size;
|
||||
int max_buff_size;
|
||||
#if IOEVENT_USE_URING
|
||||
bool use_io_uring;
|
||||
bool use_send_zc;
|
||||
#endif
|
||||
char pkg_buff[256];
|
||||
|
||||
max_pkg_size = g_sf_global_vars.net_buffer_cfg.max_pkg_size -
|
||||
|
|
@ -1073,10 +1040,9 @@ void sf_global_config_to_string_ex(const char *max_pkg_size_item_nm,
|
|||
g_sf_global_vars.thread_stack_size / 1024, pkg_buff);
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
get_io_uring_configs(&g_sf_context, &use_io_uring, &use_send_zc);
|
||||
len += snprintf(output + len, size - len,
|
||||
"use_io_uring=%d, use_send_zc=%d, ",
|
||||
use_io_uring, use_send_zc);
|
||||
len += snprintf(output + len, size - len, "use_io_uring=%d, "
|
||||
"use_send_zc=%d, ", g_sf_context.use_io_uring,
|
||||
g_sf_context.use_send_zc);
|
||||
#endif
|
||||
|
||||
len += snprintf(output + len, size - len,
|
||||
|
|
|
|||
96
src/sf_nio.c
96
src/sf_nio.c
|
|
@ -82,18 +82,16 @@ static int sf_uring_cancel_done(int sock, const int event, void *arg)
|
|||
void sf_task_detach_thread(struct fast_task_info *task)
|
||||
{
|
||||
#if IOEVENT_USE_URING
|
||||
bool need_cancel;
|
||||
if (task->handler->use_io_uring) {
|
||||
need_cancel = (FC_URING_OP_TYPE(task) != IORING_OP_NOP);
|
||||
if (SF_CTX->use_io_uring) {
|
||||
if (FC_URING_OP_TYPE(task) != IORING_OP_NOP) {
|
||||
task->event.callback = (IOEventCallback)sf_uring_cancel_done;
|
||||
uring_prep_cancel(task);
|
||||
}
|
||||
} else {
|
||||
need_cancel = true;
|
||||
#endif
|
||||
ioevent_detach(&task->thread_data->ev_puller, task->event.fd);
|
||||
#if IOEVENT_USE_URING
|
||||
}
|
||||
if (need_cancel) {
|
||||
task->event.callback = (IOEventCallback)sf_uring_cancel_done;
|
||||
uring_prep_cancel(task);
|
||||
}
|
||||
#else
|
||||
ioevent_detach(&task->thread_data->ev_puller, task->event.fd);
|
||||
#endif
|
||||
|
||||
if (task->event.timer.expires > 0) {
|
||||
|
|
@ -124,7 +122,7 @@ static inline void release_iovec_buffer(struct fast_task_info *task)
|
|||
void sf_socket_close_connection(struct fast_task_info *task)
|
||||
{
|
||||
#if IOEVENT_USE_URING
|
||||
if (task->handler->use_io_uring) {
|
||||
if (SF_CTX->use_io_uring) {
|
||||
if (uring_prep_close_fd(task) != 0) {
|
||||
close(task->event.fd);
|
||||
}
|
||||
|
|
@ -139,16 +137,11 @@ void sf_socket_close_connection(struct fast_task_info *task)
|
|||
|
||||
void sf_task_finish_clean_up(struct fast_task_info *task)
|
||||
{
|
||||
if (task->finish_callback != NULL) {
|
||||
task->finish_callback(task);
|
||||
task->finish_callback = NULL;
|
||||
}
|
||||
|
||||
release_iovec_buffer(task);
|
||||
sf_task_detach_thread(task);
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
if (!task->handler->use_io_uring) {
|
||||
if (!SF_CTX->use_io_uring) {
|
||||
#endif
|
||||
task->handler->close_connection(task);
|
||||
__sync_fetch_and_sub(&g_sf_global_vars.
|
||||
|
|
@ -221,7 +214,7 @@ static inline int set_read_event(struct fast_task_info *task)
|
|||
int result;
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
if (task->handler->use_io_uring) {
|
||||
if (SF_CTX->use_io_uring) {
|
||||
if (FC_URING_OP_TYPE(task) != IORING_OP_NOP) {
|
||||
if (FC_URING_OP_TYPE(task) == IORING_OP_RECV) {
|
||||
logWarning("file: "__FILE__", line: %d, "
|
||||
|
|
@ -272,7 +265,7 @@ static inline int set_read_event(struct fast_task_info *task)
|
|||
int sf_set_read_event(struct fast_task_info *task)
|
||||
{
|
||||
#if IOEVENT_USE_URING
|
||||
if (task->handler->use_io_uring) {
|
||||
if (SF_CTX->use_io_uring) {
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
|
@ -291,8 +284,7 @@ static inline int sf_ioevent_add(struct fast_task_info *task)
|
|||
|
||||
result = ioevent_set(task, task->thread_data, task->event.fd,
|
||||
IOEVENT_READ, (IOEventCallback)sf_client_sock_read,
|
||||
SF_CTX->net_buffer_cfg.network_timeout,
|
||||
task->handler->use_io_uring);
|
||||
SF_CTX->net_buffer_cfg.network_timeout);
|
||||
return result > 0 ? -1 * result : result;
|
||||
}
|
||||
|
||||
|
|
@ -334,7 +326,7 @@ static int sf_client_connect_done(int sock, const int event, void *arg)
|
|||
task = (struct fast_task_info *)arg;
|
||||
if (task->canceled) {
|
||||
#if IOEVENT_USE_URING
|
||||
if (task->handler->use_io_uring && event != IOEVENT_TIMEOUT) {
|
||||
if (SF_CTX->use_io_uring && event != IOEVENT_TIMEOUT) {
|
||||
CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
|
||||
}
|
||||
#endif
|
||||
|
|
@ -345,7 +337,7 @@ static int sf_client_connect_done(int sock, const int event, void *arg)
|
|||
result = ETIMEDOUT;
|
||||
} else {
|
||||
#if IOEVENT_USE_URING
|
||||
if (task->handler->use_io_uring) {
|
||||
if (SF_CTX->use_io_uring) {
|
||||
CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
|
||||
result = (task->event.res < 0 ? -1 * task->event.res :
|
||||
task->event.res);
|
||||
|
|
@ -390,7 +382,7 @@ int sf_socket_async_connect_server(struct fast_task_info *task)
|
|||
int result;
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
if (task->handler->use_io_uring) {
|
||||
if (SF_CTX->use_io_uring) {
|
||||
if ((result=uring_prep_connect(task)) != 0) {
|
||||
return result;
|
||||
}
|
||||
|
|
@ -419,7 +411,7 @@ static int sf_async_connect_server(struct fast_task_info *task)
|
|||
result = ioevent_set(task, task->thread_data, task->event.fd,
|
||||
IOEVENT_READ | IOEVENT_WRITE, (IOEventCallback)
|
||||
sf_client_connect_done, SF_CTX->net_buffer_cfg.
|
||||
connect_timeout, task->handler->use_io_uring);
|
||||
connect_timeout);
|
||||
return result > 0 ? -1 * result : result;
|
||||
} else {
|
||||
if (SF_CTX->callbacks.connect_done != NULL) {
|
||||
|
|
@ -468,7 +460,7 @@ static int sf_nio_deal_task(struct fast_task_info *task, const int stage)
|
|||
case SF_NIO_STAGE_RECV:
|
||||
task->nio_stages.current = SF_NIO_STAGE_RECV;
|
||||
if ((result=set_read_event(task)) == 0) {
|
||||
if (!task->handler->use_io_uring) {
|
||||
if (!SF_CTX->use_io_uring) {
|
||||
if (sf_client_sock_read(task->event.fd,
|
||||
IOEVENT_READ, task) < 0)
|
||||
{
|
||||
|
|
@ -552,6 +544,7 @@ int sf_nio_notify(struct fast_task_info *task, const int stage)
|
|||
}
|
||||
}
|
||||
|
||||
sf_hold_task(task); //since 1.2.11
|
||||
PTHREAD_MUTEX_LOCK(&task->thread_data->waiting_queue.lock);
|
||||
task->notify_next = NULL;
|
||||
if (task->thread_data->waiting_queue.tail == NULL) {
|
||||
|
|
@ -641,6 +634,7 @@ void sf_recv_notify_read(int fd, const int event, void *arg)
|
|||
__sync_bool_compare_and_swap(&task->nio_stages.notify,
|
||||
stage, SF_NIO_STAGE_NONE);
|
||||
}
|
||||
sf_release_task(task); //since 1.2.11
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -651,11 +645,11 @@ int sf_send_add_event(struct fast_task_info *task)
|
|||
/* direct send */
|
||||
task->nio_stages.current = SF_NIO_STAGE_SEND;
|
||||
#if IOEVENT_USE_URING
|
||||
if (task->handler->use_io_uring) {
|
||||
if (SF_CTX->use_io_uring) {
|
||||
if (task->event.callback != (IOEventCallback)sf_client_sock_write) {
|
||||
task->event.callback = (IOEventCallback)sf_client_sock_write;
|
||||
}
|
||||
if (task->handler->use_send_zc) {
|
||||
if (SF_CTX->use_send_zc) {
|
||||
return uring_prep_first_send_zc(task);
|
||||
} else {
|
||||
return uring_prep_first_send(task);
|
||||
|
|
@ -695,7 +689,7 @@ static inline int check_task(struct fast_task_info *task,
|
|||
}
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
if (task->handler->use_io_uring) {
|
||||
if (SF_CTX->use_io_uring) {
|
||||
logWarning("file: "__FILE__", line: %d, "
|
||||
"client ip: %s, event: %d, expect stage: %d, "
|
||||
"but current stage: %d, close connection",
|
||||
|
|
@ -725,7 +719,7 @@ static inline int check_task(struct fast_task_info *task,
|
|||
#if IOEVENT_USE_URING
|
||||
static inline int prepare_next_send(struct fast_task_info *task)
|
||||
{
|
||||
if (task->handler->use_send_zc) {
|
||||
if (SF_CTX->use_send_zc) {
|
||||
return uring_prep_next_send_zc(task);
|
||||
} else {
|
||||
return uring_prep_next_send(task);
|
||||
|
|
@ -740,7 +734,7 @@ ssize_t sf_socket_send_data(struct fast_task_info *task,
|
|||
int result;
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
if (task->handler->use_io_uring) {
|
||||
if (SF_CTX->use_io_uring) {
|
||||
bytes = task->event.res;
|
||||
} else {
|
||||
#endif
|
||||
|
|
@ -758,7 +752,7 @@ ssize_t sf_socket_send_data(struct fast_task_info *task,
|
|||
|
||||
if (bytes < 0) {
|
||||
#if IOEVENT_USE_URING
|
||||
if (task->handler->use_io_uring) {
|
||||
if (SF_CTX->use_io_uring) {
|
||||
result = -bytes;
|
||||
} else {
|
||||
#endif
|
||||
|
|
@ -768,7 +762,7 @@ ssize_t sf_socket_send_data(struct fast_task_info *task,
|
|||
#endif
|
||||
if (result == EAGAIN || result == EWOULDBLOCK) {
|
||||
#if IOEVENT_USE_URING
|
||||
if (task->handler->use_io_uring) {
|
||||
if (SF_CTX->use_io_uring) {
|
||||
if (prepare_next_send(task) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
|
@ -783,7 +777,7 @@ ssize_t sf_socket_send_data(struct fast_task_info *task,
|
|||
|
||||
*action = sf_comm_action_break;
|
||||
return 0;
|
||||
} else if (result == EINTR && !task->handler->use_io_uring) {
|
||||
} else if (result == EINTR && !SF_CTX->use_io_uring) {
|
||||
/* should try again */
|
||||
logDebug("file: "__FILE__", line: %d, "
|
||||
"client ip: %s, ignore interupt signal",
|
||||
|
|
@ -810,8 +804,8 @@ ssize_t sf_socket_send_data(struct fast_task_info *task,
|
|||
task->send.ptr->offset += bytes;
|
||||
if (task->send.ptr->offset >= task->send.ptr->length) {
|
||||
#if IOEVENT_USE_URING
|
||||
if (FC_URING_IS_SEND_ZC(task) && task->thread_data->
|
||||
ev_puller.send_zc_done_notify)
|
||||
if (SF_CTX->use_io_uring && FC_URING_IS_SEND_ZC(task) &&
|
||||
task->thread_data->ev_puller.send_zc_done_notify)
|
||||
{
|
||||
*action = sf_comm_action_break;
|
||||
*send_done = false;
|
||||
|
|
@ -859,7 +853,7 @@ ssize_t sf_socket_send_data(struct fast_task_info *task,
|
|||
}
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
if (task->handler->use_io_uring) {
|
||||
if (SF_CTX->use_io_uring) {
|
||||
if (!(FC_URING_IS_SEND_ZC(task) && task->thread_data->
|
||||
ev_puller.send_zc_done_notify))
|
||||
{
|
||||
|
|
@ -890,7 +884,7 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task,
|
|||
bool new_alloc;
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
if (task->handler->use_io_uring) {
|
||||
if (SF_CTX->use_io_uring) {
|
||||
bytes = task->event.res;
|
||||
} else {
|
||||
#endif
|
||||
|
|
@ -915,7 +909,7 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task,
|
|||
|
||||
if (bytes < 0) {
|
||||
#if IOEVENT_USE_URING
|
||||
if (task->handler->use_io_uring) {
|
||||
if (SF_CTX->use_io_uring) {
|
||||
result = -bytes;
|
||||
} else {
|
||||
#endif
|
||||
|
|
@ -925,7 +919,7 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task,
|
|||
#endif
|
||||
if (result == EAGAIN || result == EWOULDBLOCK) {
|
||||
#if IOEVENT_USE_URING
|
||||
if (task->handler->use_io_uring) {
|
||||
if (SF_CTX->use_io_uring) {
|
||||
if (prepare_next_recv(task) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
|
@ -933,7 +927,7 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task,
|
|||
#endif
|
||||
*action = sf_comm_action_break;
|
||||
return 0;
|
||||
} else if (result == EINTR && !task->handler->use_io_uring) {
|
||||
} else if (result == EINTR && !SF_CTX->use_io_uring) {
|
||||
/* should try again */
|
||||
logDebug("file: "__FILE__", line: %d, "
|
||||
"client ip: %s, ignore interupt signal",
|
||||
|
|
@ -978,7 +972,7 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task,
|
|||
if (task->recv.ptr->length == 0) { //pkg header
|
||||
if (task->recv.ptr->offset < SF_CTX->header_size) {
|
||||
#if IOEVENT_USE_URING
|
||||
if (task->handler->use_io_uring) {
|
||||
if (SF_CTX->use_io_uring) {
|
||||
if (prepare_next_recv(task) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
|
@ -1042,7 +1036,7 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task,
|
|||
*action = sf_comm_action_finish;
|
||||
} else {
|
||||
#if IOEVENT_USE_URING
|
||||
if (task->handler->use_io_uring) {
|
||||
if (SF_CTX->use_io_uring) {
|
||||
if (prepare_next_recv(task) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
|
@ -1201,7 +1195,7 @@ static int sf_client_sock_read(int sock, const int event, void *arg)
|
|||
task = (struct fast_task_info *)arg;
|
||||
if ((result=check_task(task, event, SF_NIO_STAGE_RECV)) != 0) {
|
||||
#if IOEVENT_USE_URING
|
||||
if (task->handler->use_io_uring && event != IOEVENT_TIMEOUT) {
|
||||
if (SF_CTX->use_io_uring && event != IOEVENT_TIMEOUT) {
|
||||
CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
|
||||
}
|
||||
#endif
|
||||
|
|
@ -1241,7 +1235,7 @@ static int sf_client_sock_read(int sock, const int event, void *arg)
|
|||
}
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
if (task->handler->use_io_uring) {
|
||||
if (SF_CTX->use_io_uring) {
|
||||
CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
|
||||
}
|
||||
#endif
|
||||
|
|
@ -1307,6 +1301,7 @@ static int sock_write_done(struct fast_task_info *task,
|
|||
if (SF_CTX->callbacks.send_done(task,
|
||||
length, &next_stage) != 0)
|
||||
{
|
||||
ioevent_add_to_deleted_list(task);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
@ -1316,7 +1311,7 @@ static int sock_write_done(struct fast_task_info *task,
|
|||
}
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
if (!task->handler->use_io_uring || task->nio_stages.
|
||||
if (!SF_CTX->use_io_uring || task->nio_stages.
|
||||
current == SF_NIO_STAGE_RECV)
|
||||
{
|
||||
#endif
|
||||
|
|
@ -1345,7 +1340,7 @@ static int sf_client_sock_write(int sock, const int event, void *arg)
|
|||
task = (struct fast_task_info *)arg;
|
||||
if ((result=check_task(task, event, SF_NIO_STAGE_SEND)) != 0) {
|
||||
#if IOEVENT_USE_URING
|
||||
if (task->handler->use_io_uring && event != IOEVENT_TIMEOUT) {
|
||||
if (SF_CTX->use_io_uring && event != IOEVENT_TIMEOUT) {
|
||||
if (event == IOEVENT_NOTIFY || !(FC_URING_IS_SEND_ZC(task) &&
|
||||
task->thread_data->ev_puller.send_zc_done_notify))
|
||||
{
|
||||
|
|
@ -1394,7 +1389,7 @@ static int sf_client_sock_write(int sock, const int event, void *arg)
|
|||
return result == 0 ? 0 : -1;
|
||||
}
|
||||
|
||||
if (task->handler->use_io_uring) {
|
||||
if (SF_CTX->use_io_uring) {
|
||||
if (!(FC_URING_IS_SEND_ZC(task) && task->thread_data->
|
||||
ev_puller.send_zc_done_notify))
|
||||
{
|
||||
|
|
@ -1429,3 +1424,8 @@ static int sf_client_sock_write(int sock, const int event, void *arg)
|
|||
|
||||
return total_write;
|
||||
}
|
||||
|
||||
bool sf_client_sock_in_read_stage(struct fast_task_info *task)
|
||||
{
|
||||
return (task->event.callback == (IOEventCallback)sf_client_sock_read);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -158,6 +158,8 @@ static inline void sf_nio_add_to_deleted_list(struct nio_thread_data
|
|||
}
|
||||
}
|
||||
|
||||
bool sf_client_sock_in_read_stage(struct fast_task_info *task);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
|||
|
|
@ -33,7 +33,6 @@
|
|||
#include "fastcommon/sched_thread.h"
|
||||
#include "fastcommon/ioevent_loop.h"
|
||||
#include "fastcommon/fc_memory.h"
|
||||
#include "sf_nio.h"
|
||||
#include "sf_proto.h"
|
||||
#include "sf_util.h"
|
||||
#include "sf_service.h"
|
||||
|
|
@ -190,7 +189,15 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
|
|||
|
||||
max_entries = (sf_context->net_buffer_cfg.max_connections +
|
||||
sf_context->work_threads - 1) / sf_context->work_threads;
|
||||
if (strcmp(sf_context->name, "service") == 0) {
|
||||
if (strcmp(sf_context->name, "cluster") == 0 ||
|
||||
strcmp(sf_context->name, "replica") == 0)
|
||||
{
|
||||
if (max_entries < 1024) {
|
||||
max_entries += 8;
|
||||
} else {
|
||||
max_entries = 1024;
|
||||
}
|
||||
} else {
|
||||
if (max_entries < 4 * 1024) {
|
||||
max_entries = max_entries * 2;
|
||||
} else if (max_entries < 8 * 1024) {
|
||||
|
|
@ -199,23 +206,19 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
|
|||
max_entries = (max_entries * 5) / 4;
|
||||
} else if (max_entries < 32 * 1024) {
|
||||
max_entries = (max_entries * 6) / 5;
|
||||
#if IOEVENT_USE_URING
|
||||
if (max_entries > 32 * 1024) {
|
||||
max_entries = 32 * 1024;
|
||||
}
|
||||
#else
|
||||
} else if (max_entries < 64 * 1024) {
|
||||
max_entries = (max_entries * 11) / 10;
|
||||
} else if (max_entries < 128 * 1024) {
|
||||
max_entries = (max_entries * 21) / 20;
|
||||
}
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
if (sf_context->use_io_uring) {
|
||||
if (max_entries > 32 * 1024) {
|
||||
max_entries = 32 * 1024;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
} else {
|
||||
if (max_entries < 1024) {
|
||||
max_entries += 8;
|
||||
} else {
|
||||
max_entries = 1024;
|
||||
}
|
||||
}
|
||||
|
||||
g_current_time = time(NULL);
|
||||
|
|
@ -242,23 +245,29 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
|
|||
thread_data->arg = NULL;
|
||||
}
|
||||
|
||||
if ((result=ioevent_init(&thread_data->ev_puller, sf_context->name,
|
||||
max_entries, net_timeout_ms, extra_events)) != 0)
|
||||
if ((result=ioevent_init(&thread_data->ev_puller, sf_context->
|
||||
name, sf_context->use_io_uring, max_entries,
|
||||
net_timeout_ms, extra_events)) != 0)
|
||||
{
|
||||
char prompt[256];
|
||||
#if IOEVENT_USE_URING
|
||||
if (result == EPERM) {
|
||||
strcpy(prompt, " make sure kernel.io_uring_disabled set to 0");
|
||||
} else if (result == EINVAL) {
|
||||
sprintf(prompt, " maybe max_connections: %d is too large"
|
||||
" or [%s]'s work_threads: %d is too small",
|
||||
sf_context->net_buffer_cfg.max_connections,
|
||||
sf_context->name, sf_context->work_threads);
|
||||
if (sf_context->use_io_uring) {
|
||||
if (result == EPERM) {
|
||||
strcpy(prompt, " make sure kernel."
|
||||
"io_uring_disabled set to 0");
|
||||
} else if (result == EINVAL) {
|
||||
sprintf(prompt, " maybe max_connections: %d is too large"
|
||||
" or [%s]'s work_threads: %d is too small",
|
||||
sf_context->net_buffer_cfg.max_connections,
|
||||
sf_context->name, sf_context->work_threads);
|
||||
} else {
|
||||
*prompt = '\0';
|
||||
}
|
||||
} else {
|
||||
#endif
|
||||
*prompt = '\0';
|
||||
#if IOEVENT_USE_URING
|
||||
}
|
||||
#else
|
||||
*prompt = '\0';
|
||||
#endif
|
||||
|
||||
logError("file: "__FILE__", line: %d, "
|
||||
|
|
@ -268,7 +277,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
|
|||
}
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
if (send_done_callback != NULL) {
|
||||
if (sf_context->use_io_uring && send_done_callback != NULL) {
|
||||
ioevent_set_send_zc_done_notify(&thread_data->ev_puller, true);
|
||||
}
|
||||
#endif
|
||||
|
|
|
|||
|
|
@ -27,6 +27,7 @@
|
|||
#include "sf_types.h"
|
||||
#include "sf_proto.h"
|
||||
#include "sf_global.h"
|
||||
#include "sf_nio.h"
|
||||
|
||||
typedef void* (*sf_alloc_thread_extra_data_callback)(const int thread_index);
|
||||
typedef void (*sf_sig_quit_handler)(int sig);
|
||||
|
|
@ -225,7 +226,7 @@ static inline void sf_release_task(struct fast_task_info *task)
|
|||
*/
|
||||
|
||||
#if IOEVENT_USE_URING
|
||||
if (task->handler->use_io_uring) {
|
||||
if (SF_CTX->use_io_uring) {
|
||||
task->handler->close_connection(task);
|
||||
__sync_fetch_and_sub(&g_sf_global_vars.
|
||||
connection_stat.current_count, 1);
|
||||
|
|
|
|||
|
|
@ -119,8 +119,6 @@ struct sf_address_family_handler;
|
|||
typedef struct sf_network_handler {
|
||||
bool enabled;
|
||||
bool explicit_post_recv;
|
||||
bool use_io_uring; //since v1.2.9
|
||||
bool use_send_zc; //since v1.2.9
|
||||
FCCommunicationType comm_type;
|
||||
struct sf_address_family_handler *fh;
|
||||
struct ibv_pd *pd;
|
||||
|
|
@ -182,7 +180,9 @@ typedef struct sf_context {
|
|||
struct nio_thread_data *thread_data;
|
||||
volatile int thread_count;
|
||||
|
||||
bool is_client; //since v1.2.5
|
||||
bool is_client; //since v1.2.5
|
||||
bool use_io_uring; //since v1.2.9
|
||||
bool use_send_zc; //since v1.2.9
|
||||
SFAddressFamily address_family;
|
||||
SFAddressFamilyHandler handlers[SF_ADDRESS_FAMILY_COUNT];
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue