Compare commits

...

32 Commits

Author SHA1 Message Date
vazmin 4adf6b3227 gh actions: upgrade to 1.2.11-1 2025-11-23 10:48:22 +00:00
vazmin f4a799402e gh actions: upgrade to 1.2.11-1 2025-11-23 10:00:56 +00:00
vazmin 27510e9641 gh actions: upgrade to 1.2.11-1 2025-11-23 09:06:43 +00:00
YuQing 848077797b upgrade version to 1.2.11 2025-11-16 17:01:06 +08:00
YuQing d22f9da49c bugfixed: MUST call sf_hold_task in sf_nio_notify for rare case 2025-11-16 15:29:38 +08:00
YuQing 5495455fa7 do NOT call task->finish_callback 2025-11-15 11:14:14 +08:00
YuQing 4da0ff251c upgrade version to 1.2.10 2025-11-11 09:57:18 +08:00
YuQing 2444eac6ce declare use_send_zc field anyway 2025-11-11 09:52:42 +08:00
YuQing a52cc2d5d4 check sf_context->use_io_uring more 2025-11-05 09:58:37 +08:00
YuQing c4af33a497 declare use_io_uring correctly 2025-11-04 15:55:33 +08:00
YuQing fa32972052 move use_io_uring and use_send_zc to struct sf_context 2025-11-04 15:40:00 +08:00
YuQing 688211fbcd correct compile error 2025-11-03 15:22:45 +08:00
YuQing 1b2f521b99 uring cancel callback release task correctly 2025-11-03 14:56:29 +08:00
YuQing ddc528d69d restore function sf_client_sock_in_read_stage 2025-11-02 15:02:54 +08:00
YuQing 32d443b497 MUST call set_read_event anyway after socket send done 2025-10-27 12:28:24 +08:00
YuQing 53dd39500f upgrade version to 1.2.9 2025-10-26 12:27:49 +08:00
YuQing 772a9a6895 Merge remote-tracking branch 'origin/use_iouring' 2025-10-26 12:26:53 +08:00
YuQing 932751d392 send zc done notify callback for recycling buffer 2025-10-20 10:34:47 +08:00
YuQing 817ff547da set alloc_conn_once and max_entries gracefully 2025-10-12 12:28:11 +08:00
YuQing 926cd40114 ioevent_init: set max entries for io_uring gracefully 2025-10-12 10:24:35 +08:00
YuQing b688973cf9 change use_send_zc's default value to true 2025-10-09 14:01:32 +08:00
YuQing b16526e8f7 bug fixed: check_task process correctly 2025-10-06 20:55:29 +08:00
YuQing 68079fc468 IOEventCallback: change event type from short to int 2025-10-05 16:53:21 +08:00
YuQing 3dcc1c570d call sf_proto_init_task_magic when task->shrinked 2025-10-03 21:06:58 +08:00
YuQing cf0950ea62 sf_set_read_event just skipped when use_io_uring is true 2025-10-03 11:33:26 +08:00
YuQing 263171c4fe async_connect use io_uring 2025-09-30 11:26:11 +08:00
YuQing a2ab8a0c01 adapt Linux io_uring OK 2025-09-27 15:41:56 +08:00
YuQing 0f75b039f6 sf_logger_set_schedule_entry change to sf_logger_set_schedule_entries 2025-09-26 19:57:03 +08:00
YuQing ecee21f289 socket send and recv adapt to io_uring 2025-09-25 15:54:38 +08:00
YuQing bc5af8a58b struct sf_network_handler add field use_iouring 2025-09-24 15:59:27 +08:00
YuQing f0ee6ce73f struct sf_context remove field: remove_from_ready_list 2025-09-21 15:08:08 +08:00
vazmin aef9d803d1 gh actions: upgrade to 1.2.8-1 2025-08-16 16:32:03 +00:00
16 changed files with 805 additions and 249 deletions
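Most of the hunks below follow one pattern: io_uring support is compiled in behind the IOEVENT_USE_URING macro and then switched on per context through SF_CTX->use_io_uring, with the legacy epoll/kqueue path kept as the else branch. A minimal sketch of that gating shape, not the library's code (the helper names and stub bodies here are hypothetical stand-ins):

#include <stdio.h>
#include <stdbool.h>

#ifndef IOEVENT_USE_URING
#define IOEVENT_USE_URING 0   /* assumption: normally defined by fastcommon's _os_define.h */
#endif

struct ctx { bool use_io_uring; };   /* mirrors SFContext.use_io_uring */

#if IOEVENT_USE_URING
static int submit_uring_recv(void)  { puts("SQE: IORING_OP_RECV"); return 0; }
#endif
static int arm_epoll_readable(void) { puts("epoll: EPOLLIN");      return 0; }

static int set_read_event_sketch(const struct ctx *ctx)
{
#if IOEVENT_USE_URING
    if (ctx->use_io_uring) {       /* completion-based: submit, wait for CQE */
        return submit_uring_recv();
    }
#endif
    return arm_epoll_readable();   /* readiness-based fallback */
}

int main(void)
{
    struct ctx c = {true};
    return set_read_event_sketch(&c);
}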

debian/changelog (vendored): 24 lines changed

@@ -1,3 +1,27 @@
+libserverframe (1.2.11-1) unstable; urgency=medium
+
+  * upgrade to 1.2.11-1
+
+ -- YuQing <384681@qq.com>  Sun, 23 Nov 2025 10:48:22 +0000
+
+libserverframe (1.2.11-1) unstable; urgency=medium
+
+  * upgrade to 1.2.11-1
+
+ -- YuQing <384681@qq.com>  Sun, 23 Nov 2025 10:00:56 +0000
+
+libserverframe (1.2.11-1) unstable; urgency=medium
+
+  * upgrade to 1.2.11-1
+
+ -- YuQing <384681@qq.com>  Sun, 23 Nov 2025 09:06:43 +0000
+
+libserverframe (1.2.8-1) unstable; urgency=medium
+
+  * upgrade to 1.2.8-1
+
+ -- YuQing <384681@qq.com>  Sat, 16 Aug 2025 16:32:03 +0000
+
 libserverframe (1.2.7-1) unstable; urgency=medium
 
   * upgrade to 1.2.7-1

debian/substvars (vendored): 2 lines changed

@@ -1 +1 @@
-libfastcommon:Version=1.0.77
+libfastcommon:Version=1.0.83

(RPM spec file; path not shown)

@@ -2,7 +2,7 @@
 %define CommitVersion %(echo $COMMIT_VERSION)
 
 Name: libserverframe
-Version: 1.2.8
+Version: 1.2.11
 Release: 1%{?dist}
 Summary: network framework library
 License: AGPL v3.0
@@ -12,9 +12,9 @@ Source: http://github.com/happyfish100/libserverframe/%{name}-%{version}.tar.gz
 BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root-%(%{__id_u} -n)
-BuildRequires: libfastcommon-devel >= 1.0.78
+BuildRequires: libfastcommon-devel >= 1.0.83
 Requires: %__cp %__mv %__chmod %__grep %__mkdir %__install %__id
-Requires: libfastcommon >= 1.0.78
+Requires: libfastcommon >= 1.0.83
 
 %description
 common framework library

(shell build script; path not shown)

@@ -9,10 +9,13 @@ DEBUG_FLAG=0
 if [ -f /usr/include/fastcommon/_os_define.h ]; then
   OS_BITS=$(grep -F OS_BITS /usr/include/fastcommon/_os_define.h | awk '{print $NF;}')
+  USE_URING=$(grep -F IOEVENT_USE_URING /usr/include/fastcommon/_os_define.h | awk '{print $NF;}')
 elif [ -f /usr/local/include/fastcommon/_os_define.h ]; then
   OS_BITS=$(grep -F OS_BITS /usr/local/include/fastcommon/_os_define.h | awk '{print $NF;}')
+  USE_URING=$(grep -F IOEVENT_USE_URING /usr/local/include/fastcommon/_os_define.h | awk '{print $NF;}')
 else
   OS_BITS=64
+  USE_URING=''
 fi
 
 uname=$(uname)
@@ -49,6 +52,9 @@ LIBS=''
 uname=$(uname)
 if [ "$uname" = "Linux" ]; then
   CFLAGS="$CFLAGS"
+  if [ -n "$USE_URING" ]; then
+    LIBS="$LIBS -luring"
+  fi
 elif [ "$uname" = "FreeBSD" ] || [ "$uname" = "Darwin" ]; then
   CFLAGS="$CFLAGS"
   if [ "$uname" = "Darwin" ]; then

(idempotency client channel source; path not shown)

@@ -174,7 +174,6 @@ static struct fast_task_info *alloc_channel_task(IdempotencyClientChannel
         *channel, const uint32_t hash_code, const FCCommunicationType comm_type,
         const char *server_ip, const uint16_t port, int *err_no)
 {
-    int len;
     struct fast_task_info *task;
     SFAddressFamilyHandler *fh;
     SFNetworkHandler *handler;
@@ -195,12 +194,7 @@
         return NULL;
     }
 
-    len = strlen(server_ip);
-    if (len >= sizeof(task->server_ip)) {
-        len = sizeof(task->server_ip) - 1;
-    }
-    memcpy(task->server_ip, server_ip, len);
-    *(task->server_ip + len) = '\0';
+    fc_safe_strcpy(task->server_ip, server_ip);
     task->port = port;
     task->arg = channel;
     task->thread_data = g_sf_context.thread_data +
@@ -209,7 +203,8 @@
     channel->last_connect_time = g_current_time;
     if ((*err_no=sf_nio_notify(task, SF_NIO_STAGE_CONNECT)) != 0) {
         channel->in_ioevent = 0;  //rollback
-        sf_release_task(task);
+        __sync_sub_and_fetch(&task->reffer_count, 1);
+        free_queue_push(task);
         return NULL;
     }
     return task;
@@ -221,6 +216,14 @@ int idempotency_client_channel_check_reconnect(
     int result;
     char formatted_ip[FORMATTED_IP_SIZE];
 
+#if IOEVENT_USE_URING
+    struct fast_task_info *task;
+
+    task = channel->task;
+    if (SF_CTX->use_io_uring && FC_ATOMIC_GET(task->reffer_count) > 1) {
+        return 0;
+    }
+#endif
     if (!__sync_bool_compare_and_swap(&channel->in_ioevent, 0, 1)) {
         return 0;
     }
@@ -237,6 +240,9 @@
                 formatted_ip, channel->task->port);
     }
 
+    if (channel->task->event.fd >= 0) {
+        channel->task->handler->close_connection(channel->task);
+    }
     __sync_bool_compare_and_swap(&channel->task->canceled, 1, 0);
     if ((result=sf_nio_notify(channel->task, SF_NIO_STAGE_CONNECT)) == 0) {
         channel->last_connect_time = g_current_time;
@@ -348,8 +354,8 @@ int idempotency_client_channel_push(struct idempotency_client_channel *channel,
     receipt->req_id = req_id;
     fc_queue_push_ex(&channel->queue, receipt, &notify);
     if (notify) {
-        if (__sync_add_and_fetch(&channel->in_ioevent, 0)) {
-            if (__sync_add_and_fetch(&channel->established, 0)) {
+        if (FC_ATOMIC_GET(channel->in_ioevent)) {
+            if (FC_ATOMIC_GET(channel->established)) {
                 sf_nio_notify(channel->task, SF_NIO_STAGE_CONTINUE);
             }
         } else {
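The new reconnect guard above skips reconnection while task->reffer_count is above one, i.e. while an io_uring operation still holds a reference on the task. A small sketch of that hold/check/release discipline (the counter and __sync atomics mirror reffer_count and FC_ATOMIC_GET; the helper bodies are my assumption of the pattern, not copied from the library):

#include <stdio.h>

/* One reference for the connection itself, plus one per in-flight io_uring
 * operation.  While refs > 1 an SQE is outstanding, so reconnecting or
 * freeing the task must wait for the CQE. */
struct task { int reffer_count; };

static void hold(struct task *t)    { __sync_add_and_fetch(&t->reffer_count, 1); }
static void release(struct task *t) { __sync_sub_and_fetch(&t->reffer_count, 1); }

static int check_reconnect_sketch(struct task *t)
{
    if (__sync_add_and_fetch(&t->reffer_count, 0) > 1) {
        return 0;   /* op in flight: defer, matching the hunk above */
    }
    puts("safe to reconnect");
    return 1;
}

int main(void)
{
    struct task t = {1};            /* base reference */
    hold(&t);                       /* submit: the op takes a second reference */
    check_reconnect_sketch(&t);     /* deferred */
    release(&t);                    /* completion */
    return !check_reconnect_sketch(&t);
}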

(idempotency client channel header; path not shown)

@@ -76,13 +76,13 @@ static inline void idempotency_client_channel_set_id_key(
 static inline int idempotency_client_channel_check_wait_ex(
         struct idempotency_client_channel *channel, const int timeout)
 {
-    if (__sync_add_and_fetch(&channel->established, 0)) {
+    if (FC_ATOMIC_GET(channel->established)) {
         return 0;
     }
 
     idempotency_client_channel_check_reconnect(channel);
     lcp_timedwait_sec(&channel->lcp, timeout);
-    if (__sync_add_and_fetch(&channel->established, 0)) {
+    if (FC_ATOMIC_GET(channel->established)) {
         return 0;
     } else {
         /*
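Several hunks in this compare swap __sync_add_and_fetch(&x, 0) for FC_ATOMIC_GET(x). Both are atomic reads; adding zero was only a trick to get a barriered load. A sketch of the equivalence (FC_ATOMIC_GET's real definition lives in libfastcommon; the stand-in macro here is an assumption):

#include <assert.h>

/* Stand-in for libfastcommon's macro; the real definition may differ. */
#define FC_ATOMIC_GET_SKETCH(var)  __sync_add_and_fetch(&(var), 0)

int main(void)
{
    int established = 1;
    /* the old spelling and the new macro read the same value atomically */
    assert(__sync_add_and_fetch(&established, 0) ==
           FC_ATOMIC_GET_SKETCH(established));
    return 0;
}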

(idempotency receipt handler source; path not shown)

@@ -49,6 +49,10 @@ static IdempotencyReceiptGlobalVars receipt_global_vars;
 static int receipt_init_task(struct fast_task_info *task, void *arg)
 {
+#if IOEVENT_USE_URING
+    FC_URING_IS_CLIENT(task) = true;
+#endif
+
     if (RDMA_INIT_CONNECTION != NULL) {
         return RDMA_INIT_CONNECTION(task, arg);
     } else {
@@ -92,7 +96,6 @@ static void receipt_task_finish_cleanup(struct fast_task_info *task)
     if (task->event.fd >= 0) {
         sf_task_detach_thread(task);
-        task->handler->close_connection(task);
     }
 
     sf_nio_reset_task_length(task);
@@ -282,7 +285,7 @@ static int deal_setup_channel_response(struct fast_task_info *task)
     }
 
     channel = (IdempotencyClientChannel *)task->arg;
-    if (__sync_add_and_fetch(&channel->established, 0)) {
+    if (FC_ATOMIC_GET(channel->established)) {
         format_ip_address(task->server_ip, formatted_ip);
         logWarning("file: "__FILE__", line: %d, "
                 "response from server %s:%u, unexpected cmd: "
@@ -520,6 +523,7 @@ static int do_init(FCAddressPtrArray *address_array)
 {
     const int task_arg_size = 0;
     const bool double_buffers = false;
+    const bool need_shrink_task_buffer = false;
     const bool explicit_post_recv = false;
     int result;
     int bytes;
@@ -552,8 +556,8 @@
             NULL, sf_proto_set_body_length, NULL, NULL, receipt_deal_task,
             receipt_task_finish_cleanup, receipt_recv_timeout_callback,
             1000, sizeof(SFCommonProtoHeader), TASK_PADDING_SIZE,
-            task_arg_size, double_buffers, explicit_post_recv,
-            receipt_init_task, pd, NULL);
+            task_arg_size, double_buffers, need_shrink_task_buffer,
+            explicit_post_recv, receipt_init_task, pd, NULL);
 }
@@ -565,7 +569,6 @@ int receipt_handler_init(FCAddressPtrArray *address_array)
     }
 
     sf_enable_thread_notify(true);
-    sf_set_remove_from_ready_list(false);
     fc_sleep_ms(100);
     return 0;

(binlog and logging definitions header; path not shown)

@@ -96,6 +96,8 @@
 #define SF_BINLOG_SOURCE_USER    'U'  //by user call
 #define SF_BINLOG_SOURCE_REPLAY  'R'  //by binlog replay
 
+#define SF_LOG_SCHEDULE_ENTRIES_COUNT  3
+
 #ifdef __cplusplus
 extern "C" {
 #endif

(global variables and context source; path not shown)

@@ -47,11 +47,12 @@ SFGlobalVariables g_sf_global_vars = {
     {0, 0}, NULL, {NULL, 0}
 };
 
-SFContext g_sf_context = {{'\0'}, NULL, 0, false, sf_address_family_auto,
-    {{AF_UNSPEC, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}},
-    {AF_UNSPEC, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}}},
+SFContext g_sf_context = {{'\0'}, NULL, 0, false, false, false,
+    sf_address_family_auto, {{AF_UNSPEC, {{true, fc_comm_type_sock},
+    {false, fc_comm_type_rdma}}},
+    {AF_UNSPEC, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}}},
     {DEFAULT_MAX_CONNECTONS, SF_DEF_MAX_PACKAGE_SIZE, SF_DEF_MIN_BUFF_SIZE,
-    SF_DEF_MAX_BUFF_SIZE}, 1, DEFAULT_WORK_THREADS, 0, true, true, true,
+    SF_DEF_MAX_BUFF_SIZE}, 1, DEFAULT_WORK_THREADS, 0, true, true,
     {false, 0, 0}, {sf_task_finish_clean_up}
 };
@@ -669,6 +670,10 @@ int sf_load_context_from_config_ex(SFContext *sf_context,
     int inner_port;
     int outer_port;
     int port;
+#if IOEVENT_USE_URING
+    bool global_use_send_zc;
+    bool use_send_zc;
+#endif
     int i;
     int result;
@@ -705,6 +710,17 @@
         outer_port = config->default_outer_port;
     }
 
+#if IOEVENT_USE_URING
+    global_use_send_zc = iniGetBoolValue(NULL, "use_send_zc",
+            config->ini_ctx.context, true);
+    if (config->ini_ctx.section_name == NULL) {
+        use_send_zc = global_use_send_zc;
+    } else {
+        use_send_zc = iniGetBoolValue(config->ini_ctx.section_name,
+                "use_send_zc", config->ini_ctx.context, global_use_send_zc);
+    }
+#endif
+
     for (i=0; i<SF_ADDRESS_FAMILY_COUNT; i++) {
         fh = sf_context->handlers + i;
         fh->ctx = sf_context;
@@ -747,9 +763,15 @@
         rdma_handler->inner.enabled = sock_handler->inner.enabled;
         rdma_handler->outer.port = sock_handler->outer.port;
         rdma_handler->outer.enabled = sock_handler->outer.enabled;
     }
 
+#if IOEVENT_USE_URING
+    sf_context->use_io_uring = (config->comm_type == fc_comm_type_sock);
+    sf_context->use_send_zc = sf_context->use_io_uring ? use_send_zc : false;
+#else
+    sf_context->use_io_uring = false;
+#endif
+
     sf_context->accept_threads = iniGetIntValue(
             config->ini_ctx.section_name,
             "accept_threads", config->ini_ctx.context, 1);
@@ -934,6 +956,12 @@ void sf_context_config_to_string(const SFContext *sf_context,
             ", address_family=%s, accept_threads=%d, work_threads=%d",
             get_address_family_caption(sf_context->address_family),
             sf_context->accept_threads, sf_context->work_threads);
+
+#if IOEVENT_USE_URING
+    len += snprintf(output + len, size - len, ", use_io_uring=%d, "
+            "use_send_zc=%d", sf_context->use_io_uring,
+            sf_context->use_send_zc);
+#endif
 }
 
 void sf_log_config_to_string_ex(SFLogConfig *log_cfg, const char *caption,
@@ -1004,18 +1032,27 @@
     len = snprintf(output, size,
             "base_path=%s, max_connections=%d, connect_timeout=%d, "
-            "network_timeout=%d, thread_stack_size=%d KB, "
-            "%s, tcp_quick_ack=%d, log_level=%s, "
-            "run_by_group=%s, run_by_user=%s, ", SF_G_BASE_PATH_STR,
+            "network_timeout=%d, thread_stack_size=%d KB, %s, ",
+            SF_G_BASE_PATH_STR,
             g_sf_global_vars.net_buffer_cfg.max_connections,
             g_sf_global_vars.net_buffer_cfg.connect_timeout,
             g_sf_global_vars.net_buffer_cfg.network_timeout,
-            g_sf_global_vars.thread_stack_size / 1024,
-            pkg_buff, g_sf_global_vars.tcp_quick_ack,
+            g_sf_global_vars.thread_stack_size / 1024, pkg_buff);
+
+#if IOEVENT_USE_URING
+    len += snprintf(output + len, size - len, "use_io_uring=%d, "
+            "use_send_zc=%d, ", g_sf_context.use_io_uring,
+            g_sf_context.use_send_zc);
+#endif
+
+    len += snprintf(output + len, size - len,
+            "tcp_quick_ack=%d, "
+            "log_level=%s, "
+            "run_by_group=%s, run_by_user=%s, ",
+            g_sf_global_vars.tcp_quick_ack,
             log_get_level_caption(),
             g_sf_global_vars.run_by.group,
-            g_sf_global_vars.run_by.user
-            );
+            g_sf_global_vars.run_by.user);
 
     sf_log_config_to_string(&g_sf_global_vars.error_log,
             "error-log", output + len, size - len);

(nio source; path not shown)

@@ -41,6 +41,9 @@
 #include "sf_service.h"
 #include "sf_nio.h"
 
+static int sf_client_sock_write(int sock, const int event, void *arg);
+static int sf_client_sock_read(int sock, const int event, void *arg);
+
 void sf_set_parameters_ex(SFContext *sf_context, const int header_size,
         sf_set_body_length_callback set_body_length_func,
         sf_alloc_recv_buffer_callback alloc_recv_buffer_func,
@@ -59,19 +62,43 @@
     sf_context->callbacks.release_buffer = release_buffer_callback;
 }
 
+#if IOEVENT_USE_URING
+#define CLEAR_OP_TYPE_AND_RELEASE_TASK(task) \
+    FC_URING_OP_TYPE(task) = IORING_OP_NOP;  \
+    sf_release_task(task)
+
+static int sf_uring_cancel_done(int sock, const int event, void *arg)
+{
+    struct fast_task_info *task;
+
+    task = (struct fast_task_info *)arg;
+    if (event != IOEVENT_TIMEOUT) {
+        CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
+    }
+    return 0;
+}
+#endif
+
 void sf_task_detach_thread(struct fast_task_info *task)
 {
-    ioevent_detach(&task->thread_data->ev_puller, task->event.fd);
+#if IOEVENT_USE_URING
+    if (SF_CTX->use_io_uring) {
+        if (FC_URING_OP_TYPE(task) != IORING_OP_NOP) {
+            task->event.callback = (IOEventCallback)sf_uring_cancel_done;
+            uring_prep_cancel(task);
+        }
+    } else {
+#endif
+        ioevent_detach(&task->thread_data->ev_puller, task->event.fd);
+#if IOEVENT_USE_URING
+    }
+#endif
+
     if (task->event.timer.expires > 0) {
         fast_timer_remove(&task->thread_data->timer,
                 &task->event.timer);
         task->event.timer.expires = 0;
     }
-
-    if (SF_CTX->remove_from_ready_list) {
-        ioevent_remove(&task->thread_data->ev_puller, task);
-    }
 }
 
 void sf_task_switch_thread(struct fast_task_info *task,
@@ -92,28 +119,38 @@ static inline void release_iovec_buffer(struct fast_task_info *task)
     }
 }
 
+void sf_socket_close_connection(struct fast_task_info *task)
+{
+#if IOEVENT_USE_URING
+    if (SF_CTX->use_io_uring) {
+        if (uring_prep_close_fd(task) != 0) {
+            close(task->event.fd);
+        }
+    } else {
+#endif
+        close(task->event.fd);
+#if IOEVENT_USE_URING
+    }
+#endif
+    task->event.fd = -1;
+}
+
 void sf_task_finish_clean_up(struct fast_task_info *task)
 {
-    /*
-    assert(task->event.fd >= 0);
-    if (task->event.fd < 0) {
-        logWarning("file: "__FILE__", line: %d, "
-                "task: %p already cleaned",
-                __LINE__, task);
-        return;
-    }
-    */
-
-    if (task->finish_callback != NULL) {
-        task->finish_callback(task);
-        task->finish_callback = NULL;
-    }
-
     release_iovec_buffer(task);
     sf_task_detach_thread(task);
-    task->handler->close_connection(task);
-    __sync_fetch_and_sub(&g_sf_global_vars.connection_stat.current_count, 1);
+
+#if IOEVENT_USE_URING
+    if (!SF_CTX->use_io_uring) {
+#endif
+        task->handler->close_connection(task);
+        __sync_fetch_and_sub(&g_sf_global_vars.
+                connection_stat.current_count, 1);
+#if IOEVENT_USE_URING
+    }
+#endif
+
     sf_release_task(task);
 }
@@ -139,46 +176,115 @@ static inline int set_write_event(struct fast_task_info *task)
     return 0;
 }
 
+#if IOEVENT_USE_URING
+static inline int prepare_first_recv(struct fast_task_info *task)
+{
+    if (SF_CTX->callbacks.alloc_recv_buffer != NULL) {
+        return uring_prep_recv_data(task, task->recv.ptr->data,
+                SF_CTX->header_size);
+    } else {
+        return uring_prep_first_recv(task);
+    }
+}
+
+static inline int prepare_next_recv(struct fast_task_info *task)
+{
+    int recv_bytes;
+
+    if (task->recv.ptr->length == 0) {  //recv header
+        recv_bytes = SF_CTX->header_size - task->recv.ptr->offset;
+        return uring_prep_recv_data(task, task->recv.ptr->data +
+                task->recv.ptr->offset, recv_bytes);
+    } else {
+        recv_bytes = task->recv.ptr->length - task->recv.ptr->offset;
+        if (task->recv_body == NULL) {
+            return uring_prep_recv_data(task, task->recv.ptr->data +
+                    task->recv.ptr->offset, recv_bytes);
+        } else {
+            return uring_prep_recv_data(task, task->recv_body +
+                    (task->recv.ptr->offset - SF_CTX->
+                     header_size), recv_bytes);
+        }
+    }
+}
+#endif
+
 static inline int set_read_event(struct fast_task_info *task)
 {
     int result;
 
-    if (task->event.callback == (IOEventCallback)sf_client_sock_read) {
-        return 0;
-    }
-
-    task->event.callback = (IOEventCallback)sf_client_sock_read;
-    if (ioevent_modify(&task->thread_data->ev_puller,
-                task->event.fd, IOEVENT_READ, task) != 0)
-    {
-        result = errno != 0 ? errno : ENOENT;
-        ioevent_add_to_deleted_list(task);
-
-        logError("file: "__FILE__", line: %d, "
-                "ioevent_modify fail, "
-                "errno: %d, error info: %s",
-                __LINE__, result, strerror(result));
-        return result;
+#if IOEVENT_USE_URING
+    if (SF_CTX->use_io_uring) {
+        if (FC_URING_OP_TYPE(task) != IORING_OP_NOP) {
+            if (FC_URING_OP_TYPE(task) == IORING_OP_RECV) {
+                logWarning("file: "__FILE__", line: %d, "
+                        "trigger recv again!", __LINE__);
+                return 0;
+            } else {
+                logWarning("file: "__FILE__", line: %d, "
+                        "another operation in progress, op_type: %d!",
+                        __LINE__, FC_URING_OP_TYPE(task));
+                return EBUSY;
+            }
+        }
+
+        if (task->event.callback != (IOEventCallback)sf_client_sock_read) {
+            task->event.callback = (IOEventCallback)sf_client_sock_read;
+        }
+        if ((result=prepare_first_recv(task)) != 0) {
+            ioevent_add_to_deleted_list(task);
+            return result;
+        }
+    } else {
+#endif
+        if (task->event.callback == (IOEventCallback)sf_client_sock_read) {
+            return 0;
+        }
+
+        task->event.callback = (IOEventCallback)sf_client_sock_read;
+        if (ioevent_modify(&task->thread_data->ev_puller,
+                    task->event.fd, IOEVENT_READ, task) != 0)
+        {
+            result = errno != 0 ? errno : ENOENT;
+            ioevent_add_to_deleted_list(task);
+
+            logError("file: "__FILE__", line: %d, "
+                    "ioevent_modify fail, "
+                    "errno: %d, error info: %s",
+                    __LINE__, result, strerror(result));
+            return result;
+        }
+#if IOEVENT_USE_URING
     }
+#endif
 
     return 0;
 }
 
 int sf_set_read_event(struct fast_task_info *task)
 {
+#if IOEVENT_USE_URING
+    if (SF_CTX->use_io_uring) {
+        return 0;
+    }
+#endif
+
+    /* reset recv offset and length */
     task->recv.ptr->offset = 0;
     task->recv.ptr->length = 0;
     task->nio_stages.current = SF_NIO_STAGE_RECV;
     return set_read_event(task);
 }
 
-static inline int sf_ioevent_add(struct fast_task_info *task,
-        IOEventCallback callback, const int timeout)
+static inline int sf_ioevent_add(struct fast_task_info *task)
 {
     int result;
 
     result = ioevent_set(task, task->thread_data, task->event.fd,
-            IOEVENT_READ, callback, timeout);
+            IOEVENT_READ, (IOEventCallback)sf_client_sock_read,
+            SF_CTX->net_buffer_cfg.network_timeout);
     return result > 0 ? -1 * result : result;
 }
@@ -196,8 +302,7 @@ static inline void inc_connection_current_count()
 static inline int sf_nio_init(struct fast_task_info *task)
 {
     inc_connection_current_count();
-    return sf_ioevent_add(task, (IOEventCallback)sf_client_sock_read,
-            SF_CTX->net_buffer_cfg.network_timeout);
+    return sf_ioevent_add(task);
 }
 
 int sf_socket_async_connect_check(struct fast_task_info *task)
@@ -212,7 +317,7 @@ int sf_socket_async_connect_check(struct fast_task_info *task)
     return result;
 }
 
-static int sf_client_connect_done(int sock, short event, void *arg)
+static int sf_client_connect_done(int sock, const int event, void *arg)
 {
     int result;
     struct fast_task_info *task;
@@ -220,16 +325,31 @@
     task = (struct fast_task_info *)arg;
     if (task->canceled) {
+#if IOEVENT_USE_URING
+        if (SF_CTX->use_io_uring && event != IOEVENT_TIMEOUT) {
+            CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
+        }
+#endif
         return ENOTCONN;
     }
 
-    if (event & IOEVENT_TIMEOUT) {
+    if (event == IOEVENT_TIMEOUT) {
         result = ETIMEDOUT;
     } else {
-        result = task->handler->async_connect_check(task);
-        if (result == EINPROGRESS) {
-            return 0;
+#if IOEVENT_USE_URING
+        if (SF_CTX->use_io_uring) {
+            CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
+            result = (task->event.res < 0 ? -1 * task->event.res :
+                    task->event.res);
+        } else {
+#endif
+            result = task->handler->async_connect_check(task);
+            if (result == EINPROGRESS) {
+                return 0;
+            }
+#if IOEVENT_USE_URING
         }
+#endif
     }
 
     if (SF_CTX->callbacks.connect_done != NULL) {
@@ -260,14 +380,26 @@
 int sf_socket_async_connect_server(struct fast_task_info *task)
 {
     int result;
 
-    if ((task->event.fd=socketCreateEx2(AF_UNSPEC, task->server_ip,
-                    O_NONBLOCK, NULL, &result)) < 0)
-    {
-        return result > 0 ? -1 * result : result;
-    }
+#if IOEVENT_USE_URING
+    if (SF_CTX->use_io_uring) {
+        if ((result=uring_prep_connect(task)) != 0) {
+            return result;
+        }
+        return EINPROGRESS;
+    } else {
+#endif
+        if ((task->event.fd=socketCreateEx2(AF_UNSPEC, task->server_ip,
+                        O_NONBLOCK, NULL, &result)) < 0)
+        {
+            return result > 0 ? -1 * result : result;
+        }
 
-    return asyncconnectserverbyip(task->event.fd,
-            task->server_ip, task->port);
+        return asyncconnectserverbyip(task->event.fd,
+                task->server_ip, task->port);
+#if IOEVENT_USE_URING
+    }
+#endif
 }
 
 static int sf_async_connect_server(struct fast_task_info *task)
@@ -287,10 +419,7 @@
     }
 
     if (result == 0) {
-        if ((result=sf_ioevent_add(task, (IOEventCallback)
-                        sf_client_sock_read, SF_CTX->
-                        net_buffer_cfg.network_timeout)) != 0)
-        {
+        if ((result=sf_ioevent_add(task)) != 0) {
             return result;
         }
@@ -331,10 +460,12 @@ static int sf_nio_deal_task(struct fast_task_info *task, const int stage)
         case SF_NIO_STAGE_RECV:
             task->nio_stages.current = SF_NIO_STAGE_RECV;
             if ((result=set_read_event(task)) == 0) {
-                if (sf_client_sock_read(task->event.fd,
-                            IOEVENT_READ, task) < 0)
-                {
-                    result = errno != 0 ? errno : EIO;
+                if (!SF_CTX->use_io_uring) {
+                    if (sf_client_sock_read(task->event.fd,
+                                IOEVENT_READ, task) < 0)
+                    {
+                        result = errno != 0 ? errno : EIO;
+                    }
                 }
             }
             break;
@@ -345,10 +476,7 @@ static int sf_nio_deal_task(struct fast_task_info *task, const int stage)
             result = SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_CONTINUE);
             break;
         case SF_NIO_STAGE_FORWARDED:  //forward by other thread
-            if ((result=sf_ioevent_add(task, (IOEventCallback)
-                            sf_client_sock_read,
-                            SF_CTX->net_buffer_cfg.network_timeout)) == 0)
-            {
+            if ((result=sf_ioevent_add(task)) == 0) {
                 result = SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_SEND);
             }
             break;
@@ -416,6 +544,7 @@ int sf_nio_notify(struct fast_task_info *task, const int stage)
         }
     }
 
+    sf_hold_task(task);  //since 1.2.11
     PTHREAD_MUTEX_LOCK(&task->thread_data->waiting_queue.lock);
     task->notify_next = NULL;
     if (task->thread_data->waiting_queue.tail == NULL) {
@@ -435,7 +564,7 @@
     {
         result = errno != 0 ? errno : EIO;
         logError("file: "__FILE__", line: %d, "
-                "write eventfd %d fail, errno: %d, error info: %s",
+                "write to fd %d fail, errno: %d, error info: %s",
                 __LINE__, FC_NOTIFY_WRITE_FD(task->thread_data),
                 result, STRERROR(result));
         return result;
@@ -463,26 +592,32 @@ static inline void deal_notified_task(struct fast_task_info *task,
     }
 }
 
-void sf_recv_notify_read(int sock, short event, void *arg)
+void sf_recv_notify_read(int fd, const int event, void *arg)
 {
     int64_t n;
     int stage;
-    struct nio_thread_data *thread_data;
+    struct ioevent_notify_entry *notify_entry;
     struct fast_task_info *task;
     struct fast_task_info *current;
 
-    thread_data = ((struct ioevent_notify_entry *)arg)->thread_data;
-    if (read(sock, &n, sizeof(n)) < 0) {
+    notify_entry = (struct ioevent_notify_entry *)arg;
+    if (read(fd, &n, sizeof(n)) < 0) {
+#if IOEVENT_USE_URING
+        if (errno == EAGAIN) {
+            return;
+        }
+#endif
         logWarning("file: "__FILE__", line: %d, "
-                "read from eventfd %d fail, errno: %d, error info: %s",
-                __LINE__, sock, errno, STRERROR(errno));
+                "read from fd %d fail, errno: %d, error info: %s",
+                __LINE__, fd, errno, STRERROR(errno));
     }
 
-    PTHREAD_MUTEX_LOCK(&thread_data->waiting_queue.lock);
-    current = thread_data->waiting_queue.head;
-    thread_data->waiting_queue.head = NULL;
-    thread_data->waiting_queue.tail = NULL;
-    PTHREAD_MUTEX_UNLOCK(&thread_data->waiting_queue.lock);
+    PTHREAD_MUTEX_LOCK(&notify_entry->thread_data->waiting_queue.lock);
+    current = notify_entry->thread_data->waiting_queue.head;
+    notify_entry->thread_data->waiting_queue.head = NULL;
+    notify_entry->thread_data->waiting_queue.tail = NULL;
+    PTHREAD_MUTEX_UNLOCK(&notify_entry->thread_data->waiting_queue.lock);
 
     while (current != NULL) {
         task = current;
@@ -499,6 +634,7 @@
             __sync_bool_compare_and_swap(&task->nio_stages.notify,
                     stage, SF_NIO_STAGE_NONE);
         }
+        sf_release_task(task);  //since 1.2.11
     }
 }
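The two `//since 1.2.11` lines are the fix from commit d22f9da4 ("bugfixed: MUST call sf_hold_task in sf_nio_notify for rare case"): sf_nio_notify takes a reference before queuing the task and sf_recv_notify_read drops it after the notification is consumed, so a task canceled in between cannot be freed while still sitting in the waiting queue. A compact sketch of the pairing (types reduced to the bare minimum and the queue lock omitted; not the library's structures):

#include <stdio.h>

struct task { int refs; struct task *next; };
struct queue { struct task *head; };

static void notify_sketch(struct queue *q, struct task *t)
{
    __sync_add_and_fetch(&t->refs, 1);   /* hold before enqueue (since 1.2.11) */
    t->next = q->head;
    q->head = t;
}

static void drain_sketch(struct queue *q)
{
    struct task *t;
    while ((t = q->head) != NULL) {
        q->head = t->next;
        /* ... deal_notified_task(t) ... */
        __sync_sub_and_fetch(&t->refs, 1);   /* release after consuming */
    }
}

int main(void)
{
    struct task t = {1, NULL};
    struct queue q = {NULL};
    notify_sketch(&q, &t);
    drain_sketch(&q);
    printf("refs=%d\n", t.refs);   /* back to the base reference: 1 */
    return 0;
}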
@@ -508,16 +644,31 @@ int sf_send_add_event(struct fast_task_info *task)
     if (task->send.ptr->length > 0) {
         /* direct send */
         task->nio_stages.current = SF_NIO_STAGE_SEND;
-        if (sf_client_sock_write(task->event.fd, IOEVENT_WRITE, task) < 0) {
-            return errno != 0 ? errno : EIO;
+#if IOEVENT_USE_URING
+        if (SF_CTX->use_io_uring) {
+            if (task->event.callback != (IOEventCallback)sf_client_sock_write) {
+                task->event.callback = (IOEventCallback)sf_client_sock_write;
+            }
+            if (SF_CTX->use_send_zc) {
+                return uring_prep_first_send_zc(task);
+            } else {
+                return uring_prep_first_send(task);
+            }
+        } else {
+#endif
+            if (sf_client_sock_write(task->event.fd, IOEVENT_WRITE, task) < 0) {
+                return errno != 0 ? errno : EIO;
+            }
+#if IOEVENT_USE_URING
         }
+#endif
     }
 
     return 0;
 }
 
 static inline int check_task(struct fast_task_info *task,
-        const short event, const int expect_stage)
+        const int event, const int expect_stage)
 {
     if (task->canceled) {
         return ENOTCONN;
@@ -537,6 +688,18 @@ static inline int check_task(struct fast_task_info *task,
         return 0;
     }
 
+#if IOEVENT_USE_URING
+    if (SF_CTX->use_io_uring) {
+        logWarning("file: "__FILE__", line: %d, "
+                "client ip: %s, event: %d, expect stage: %d, "
+                "but current stage: %d, close connection",
+                __LINE__, task->client_ip, event,
+                expect_stage, task->nio_stages.current);
+        ioevent_add_to_deleted_list(task);
+        return -1;
+    }
+#endif
+
     if (task->handler->comm_type == fc_comm_type_sock) {
         if (tcp_socket_connected(task->event.fd)) {
             return EAGAIN;
@@ -553,28 +716,69 @@
         }
     }
 }
 
+#if IOEVENT_USE_URING
+static inline int prepare_next_send(struct fast_task_info *task)
+{
+    if (SF_CTX->use_send_zc) {
+        return uring_prep_next_send_zc(task);
+    } else {
+        return uring_prep_next_send(task);
+    }
+}
+#endif
+
 ssize_t sf_socket_send_data(struct fast_task_info *task,
         SFCommAction *action, bool *send_done)
 {
     int bytes;
+    int result;
 
-    if (task->iovec_array.iovs != NULL) {
-        bytes = writev(task->event.fd, task->iovec_array.iovs,
-                FC_MIN(task->iovec_array.count, IOV_MAX));
-    } else {
-        bytes = write(task->event.fd, task->send.ptr->data +
-                task->send.ptr->offset, task->send.ptr->length -
-                task->send.ptr->offset);
+#if IOEVENT_USE_URING
+    if (SF_CTX->use_io_uring) {
+        bytes = task->event.res;
+    } else {
+#endif
+        if (task->iovec_array.iovs != NULL) {
+            bytes = writev(task->event.fd, task->iovec_array.iovs,
+                    FC_MIN(task->iovec_array.count, IOV_MAX));
+        } else {
+            bytes = write(task->event.fd, task->send.ptr->data +
+                    task->send.ptr->offset, task->send.ptr->length -
+                    task->send.ptr->offset);
+        }
+#if IOEVENT_USE_URING
     }
+#endif
 
     if (bytes < 0) {
-        if (errno == EAGAIN || errno == EWOULDBLOCK)
-        {
-            if (set_write_event(task) != 0) {
-                return -1;
+#if IOEVENT_USE_URING
+        if (SF_CTX->use_io_uring) {
+            result = -bytes;
+        } else {
+#endif
+            result = errno;
+#if IOEVENT_USE_URING
+        }
+#endif
+        if (result == EAGAIN || result == EWOULDBLOCK) {
+#if IOEVENT_USE_URING
+            if (SF_CTX->use_io_uring) {
+                if (prepare_next_send(task) != 0) {
+                    return -1;
+                }
+            } else {
+#endif
+                if (set_write_event(task) != 0) {
+                    return -1;
+                }
+#if IOEVENT_USE_URING
             }
+#endif
             *action = sf_comm_action_break;
             return 0;
-        } else if (errno == EINTR) {  //should retry
+        } else if (result == EINTR && !SF_CTX->use_io_uring) {
+            /* should try again */
             logDebug("file: "__FILE__", line: %d, "
                     "client ip: %s, ignore interupt signal",
                     __LINE__, task->client_ip);
@@ -585,30 +789,40 @@
         } else {
             logError("file: "__FILE__", line: %d, "
                     "client ip: %s, send fail, task offset: %d, length: %d, "
                     "errno: %d, error info: %s", __LINE__, task->client_ip,
                     task->send.ptr->offset, task->send.ptr->length,
-                    errno, strerror(errno));
+                    result, strerror(result));
             return -1;
         }
     } else if (bytes == 0) {
         logWarning("file: "__FILE__", line: %d, "
-                "client ip: %s, sock: %d, task length: %d, offset: %d, "
+                "client ip: %s, task length: %d, offset: %d, "
                 "send failed, connection disconnected", __LINE__,
-                task->client_ip, task->event.fd, task->send.ptr->length,
+                task->client_ip, task->send.ptr->length,
                 task->send.ptr->offset);
         return -1;
     }
 
     task->send.ptr->offset += bytes;
     if (task->send.ptr->offset >= task->send.ptr->length) {
-        if (task->send.ptr != task->recv.ptr) {  //double buffers
-            task->send.ptr->offset = 0;
-            task->send.ptr->length = 0;
-        }
-        *action = sf_comm_action_finish;
-        *send_done = true;
-    } else {
-        *action = sf_comm_action_continue;
-        *send_done = false;
+#if IOEVENT_USE_URING
+        if (SF_CTX->use_io_uring && FC_URING_IS_SEND_ZC(task) &&
+                task->thread_data->ev_puller.send_zc_done_notify)
+        {
+            *action = sf_comm_action_break;
+            *send_done = false;
+        } else {
+#endif
+            if (task->send.ptr != task->recv.ptr) {  //double buffers
+                task->send.ptr->offset = 0;
+                task->send.ptr->length = 0;
+            }
+            *action = sf_comm_action_finish;
+            *send_done = true;
+#if IOEVENT_USE_URING
+        }
+#endif
+    } else {
         /* set next writev iovec array */
         if (task->iovec_array.iovs != NULL) {
             struct iovec *iov;
@@ -637,6 +851,25 @@
             task->iovec_array.iovs = iov;
             task->iovec_array.count = end - iov;
         }
+
+#if IOEVENT_USE_URING
+        if (SF_CTX->use_io_uring) {
+            if (!(FC_URING_IS_SEND_ZC(task) && task->thread_data->
+                        ev_puller.send_zc_done_notify))
+            {
+                if (prepare_next_send(task) != 0) {
+                    return -1;
+                }
+            }
+            *action = sf_comm_action_break;
+        } else {
+#endif
+            *action = sf_comm_action_continue;
+#if IOEVENT_USE_URING
+        }
+#endif
+        *send_done = false;
     }
 
     return bytes;
@@ -646,30 +879,56 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task,
         const bool call_post_recv, SFCommAction *action)
 {
     int bytes;
+    int result;
     int recv_bytes;
     bool new_alloc;
 
-    if (task->recv.ptr->length == 0) {  //recv header
-        recv_bytes = SF_CTX->header_size - task->recv.ptr->offset;
-        bytes = read(task->event.fd, task->recv.ptr->data +
-                task->recv.ptr->offset, recv_bytes);
+#if IOEVENT_USE_URING
+    if (SF_CTX->use_io_uring) {
+        bytes = task->event.res;
     } else {
-        recv_bytes = task->recv.ptr->length - task->recv.ptr->offset;
-        if (task->recv_body == NULL) {
+#endif
+        if (task->recv.ptr->length == 0) {  //recv header
+            recv_bytes = SF_CTX->header_size - task->recv.ptr->offset;
             bytes = read(task->event.fd, task->recv.ptr->data +
                     task->recv.ptr->offset, recv_bytes);
         } else {
-            bytes = read(task->event.fd, task->recv_body +
-                    (task->recv.ptr->offset - SF_CTX->
-                     header_size), recv_bytes);
+            recv_bytes = task->recv.ptr->length - task->recv.ptr->offset;
+            if (task->recv_body == NULL) {
+                bytes = read(task->event.fd, task->recv.ptr->data +
+                        task->recv.ptr->offset, recv_bytes);
+            } else {
+                bytes = read(task->event.fd, task->recv_body +
+                        (task->recv.ptr->offset - SF_CTX->
+                         header_size), recv_bytes);
+            }
         }
+#if IOEVENT_USE_URING
     }
+#endif
 
     if (bytes < 0) {
-        if (errno == EAGAIN || errno == EWOULDBLOCK) {
+#if IOEVENT_USE_URING
+        if (SF_CTX->use_io_uring) {
+            result = -bytes;
+        } else {
+#endif
+            result = errno;
+#if IOEVENT_USE_URING
+        }
+#endif
+        if (result == EAGAIN || result == EWOULDBLOCK) {
+#if IOEVENT_USE_URING
+            if (SF_CTX->use_io_uring) {
+                if (prepare_next_recv(task) != 0) {
+                    return -1;
+                }
+            }
+#endif
             *action = sf_comm_action_break;
             return 0;
-        } else if (errno == EINTR) {  //should retry
+        } else if (result == EINTR && !SF_CTX->use_io_uring) {
+            /* should try again */
             logDebug("file: "__FILE__", line: %d, "
                     "client ip: %s, ignore interupt signal",
                     __LINE__, task->client_ip);
@@ -680,7 +939,7 @@
                     "client ip: %s, recv fail, "
                     "errno: %d, error info: %s",
                     __LINE__, task->client_ip,
-                    errno, strerror(errno));
+                    result, strerror(result));
             return -1;
         }
     } else if (bytes == 0) {
@@ -712,7 +971,18 @@
     task->recv.ptr->offset += bytes;
     if (task->recv.ptr->length == 0) {  //pkg header
         if (task->recv.ptr->offset < SF_CTX->header_size) {
-            *action = sf_comm_action_continue;
+#if IOEVENT_USE_URING
+            if (SF_CTX->use_io_uring) {
+                if (prepare_next_recv(task) != 0) {
+                    return -1;
+                }
+                *action = sf_comm_action_break;
+            } else {
+#endif
+                *action = sf_comm_action_continue;
+#if IOEVENT_USE_URING
+            }
+#endif
             return bytes;
         }
@@ -765,7 +1035,18 @@
     if (task->recv.ptr->offset >= task->recv.ptr->length) {  //recv done
         *action = sf_comm_action_finish;
     } else {
-        *action = sf_comm_action_continue;
+#if IOEVENT_USE_URING
+        if (SF_CTX->use_io_uring) {
+            if (prepare_next_recv(task) != 0) {
+                return -1;
+            }
+            *action = sf_comm_action_break;
+        } else {
+#endif
+            *action = sf_comm_action_continue;
+#if IOEVENT_USE_URING
+        }
+#endif
     }
 
     return bytes;
@@ -841,8 +1122,7 @@ static int calc_iops_and_remove_polling(struct fast_task_info *task)
         ioevent_set_timeout(&task->thread_data->ev_puller,
                 task->thread_data->timeout_ms);
     }
-    result = sf_ioevent_add(task, (IOEventCallback)
-            sf_client_sock_read, SF_CTX->net_buffer_cfg.network_timeout);
+    result = sf_ioevent_add(task);
 
     format_ip_address(task->client_ip, formatted_ip);
     logInfo("file: "__FILE__", line: %d, client: %s:%u, "
@@ -904,7 +1184,7 @@ int sf_rdma_busy_polling_callback(struct nio_thread_data *thread_data)
     return 0;
 }
 
-int sf_client_sock_read(int sock, short event, void *arg)
+static int sf_client_sock_read(int sock, const int event, void *arg)
 {
     int result;
     int bytes;
@@ -914,10 +1194,15 @@
     task = (struct fast_task_info *)arg;
     if ((result=check_task(task, event, SF_NIO_STAGE_RECV)) != 0) {
+#if IOEVENT_USE_URING
+        if (SF_CTX->use_io_uring && event != IOEVENT_TIMEOUT) {
+            CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
+        }
+#endif
         return result >= 0 ? 0 : -1;
     }
 
-    if (event & IOEVENT_TIMEOUT) {
+    if (event == IOEVENT_TIMEOUT) {
         if (task->recv.ptr->offset == 0 && task->req_count > 0) {
             if (SF_CTX->callbacks.task_timeout != NULL) {
                 if (SF_CTX->callbacks.task_timeout(task) != 0) {
@@ -929,7 +1214,8 @@
             task->event.timer.expires = g_current_time +
                     SF_CTX->net_buffer_cfg.network_timeout;
             fast_timer_add(&task->thread_data->timer,
                     &task->event.timer);
+            return 0;
         } else {
             if (task->recv.ptr->length > 0) {
                 logWarning("file: "__FILE__", line: %d, "
@@ -946,10 +1232,14 @@
             ioevent_add_to_deleted_list(task);
             return -1;
         }
-
-        return 0;
     }
 
+#if IOEVENT_USE_URING
+    if (SF_CTX->use_io_uring) {
+        CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
+    }
+#endif
+
     total_read = 0;
     action = sf_comm_action_continue;
     while (1) {
@@ -996,23 +1286,72 @@
     return total_read;
 }
 
-int sf_client_sock_write(int sock, short event, void *arg)
+static int sock_write_done(struct fast_task_info *task,
+        const int length, const bool send_done)
+{
+    int next_stage;
+
+    release_iovec_buffer(task);
+    task->recv.ptr->offset = 0;
+    task->recv.ptr->length = 0;
+
+    if (SF_CTX->callbacks.send_done == NULL || !send_done) {
+        task->nio_stages.current = SF_NIO_STAGE_RECV;
+    } else {
+        if (SF_CTX->callbacks.send_done(task,
+                    length, &next_stage) != 0)
+        {
+            ioevent_add_to_deleted_list(task);
+            return -1;
+        }
+        if (task->nio_stages.current != next_stage) {
+            task->nio_stages.current = next_stage;
+        }
+    }
+
+#if IOEVENT_USE_URING
+    if (!SF_CTX->use_io_uring || task->nio_stages.
+            current == SF_NIO_STAGE_RECV)
+    {
+#endif
+        if (set_read_event(task) != 0) {
+            return -1;
+        }
+#if IOEVENT_USE_URING
+    }
+#endif
+
+    return 0;
+}
+
+static int sf_client_sock_write(int sock, const int event, void *arg)
 {
     int result;
     int bytes;
     int total_write;
     int length;
-    int next_stage;
     SFCommAction action;
     bool send_done;
     struct fast_task_info *task;
 
     task = (struct fast_task_info *)arg;
     if ((result=check_task(task, event, SF_NIO_STAGE_SEND)) != 0) {
+#if IOEVENT_USE_URING
+        if (SF_CTX->use_io_uring && event != IOEVENT_TIMEOUT) {
+            if (event == IOEVENT_NOTIFY || !(FC_URING_IS_SEND_ZC(task) &&
+                        task->thread_data->ev_puller.send_zc_done_notify))
+            {
+                CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
+            }
+        }
+#endif
         return result >= 0 ? 0 : -1;
     }
 
-    if (event & IOEVENT_TIMEOUT) {
+    if (event == IOEVENT_TIMEOUT) {
         logError("file: "__FILE__", line: %d, "
                 "client ip: %s, send timeout. total length: %d, offset: %d, "
                 "remain: %d", __LINE__, task->client_ip, task->send.ptr->length,
@@ -1023,6 +1362,42 @@
         return -1;
     }
 
+#if IOEVENT_USE_URING
+    if (event == IOEVENT_NOTIFY) {
+        if (!FC_URING_IS_SEND_ZC(task)) {
+            logWarning("file: "__FILE__", line: %d, "
+                    "unexpected io_uring notify!", __LINE__);
+            return -1;
+        }
+
+        FC_URING_OP_TYPE(task) = IORING_OP_NOP;
+        if (!task->canceled) {
+            if (task->send.ptr->offset >= task->send.ptr->length) {
+                length = task->send.ptr->length;
+                if (task->send.ptr != task->recv.ptr) {  //double buffers
+                    task->send.ptr->offset = 0;
+                    task->send.ptr->length = 0;
+                }
+                result = sock_write_done(task, length, true);
+            } else {
+                result = prepare_next_send(task);
+            }
+        }
+        sf_release_task(task);
+        return result == 0 ? 0 : -1;
+    }
+
+    if (SF_CTX->use_io_uring) {
+        if (!(FC_URING_IS_SEND_ZC(task) && task->thread_data->
+                    ev_puller.send_zc_done_notify))
+        {
+            CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
+        }
+    }
+#endif
+
     total_write = 0;
     length = task->send.ptr->length;
     action = sf_comm_action_continue;
@@ -1038,27 +1413,9 @@
         total_write += bytes;
 
         if (action == sf_comm_action_finish) {
-            release_iovec_buffer(task);
-            task->recv.ptr->offset = 0;
-            task->recv.ptr->length = 0;
-            if (set_read_event(task) != 0) {
+            if (sock_write_done(task, length, send_done) != 0) {
                 return -1;
             }
-
-            if (SF_CTX->callbacks.send_done == NULL || !send_done) {
-                task->nio_stages.current = SF_NIO_STAGE_RECV;
-            } else {
-                if (SF_CTX->callbacks.send_done(task,
-                            length, &next_stage) != 0)
-                {
-                    return -1;
-                }
-                if (task->nio_stages.current != next_stage) {
-                    task->nio_stages.current = next_stage;
-                }
-            }
             break;
         } else if (action == sf_comm_action_break) {
             break;
@@ -1067,3 +1424,8 @@
 
     return total_write;
 }
+
+bool sf_client_sock_in_read_stage(struct fast_task_info *task)
+{
+    return (task->event.callback == (IOEventCallback)sf_client_sock_read);
+}
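The IOEVENT_NOTIFY branch above exists because io_uring zero-copy sends complete twice: a first CQE reports the byte count (flagged IORING_CQE_F_MORE while the kernel still pins the buffer), and a later CQE flagged IORING_CQE_F_NOTIF signals that the buffer may be reused, which is when sf_nio can recycle it. A reduced liburing sketch of consuming both completions, standalone and not the ioevent wrapper the library uses (build with `cc demo.c -luring`; IORING_OP_SEND_ZC needs kernel 6.0+):

#include <liburing.h>
#include <stdio.h>
#include <sys/socket.h>

int main(void)
{
    int sv[2];
    struct io_uring ring;
    struct io_uring_sqe *sqe;
    struct io_uring_cqe *cqe;
    static const char msg[] = "hello";

    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0) return 1;
    if (io_uring_queue_init(8, &ring, 0) != 0) return 1;

    sqe = io_uring_get_sqe(&ring);
    io_uring_prep_send_zc(sqe, sv[0], msg, sizeof(msg), 0, 0);
    io_uring_submit(&ring);

    /* first CQE: byte count, F_MORE set while the buffer is still pinned */
    io_uring_wait_cqe(&ring, &cqe);
    printf("sent %d bytes, more=%d\n", cqe->res,
            !!(cqe->flags & IORING_CQE_F_MORE));
    io_uring_cqe_seen(&ring, cqe);

    /* second CQE: F_NOTIF, buffer may now be recycled */
    io_uring_wait_cqe(&ring, &cqe);
    printf("notif=%d\n", !!(cqe->flags & IORING_CQE_F_NOTIF));
    io_uring_cqe_seen(&ring, cqe);

    io_uring_queue_exit(&ring);
    return 0;
}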

(nio header; path not shown)

@@ -68,15 +68,6 @@ static inline void sf_set_connect_done_callback_ex(SFContext *sf_context,
     sf_set_connect_done_callback_ex(&g_sf_context, done_callback)
 
-static inline void sf_set_remove_from_ready_list_ex(
-        SFContext *sf_context, const bool enabled)
-{
-    sf_context->remove_from_ready_list = enabled;
-}
-
-#define sf_set_remove_from_ready_list(enabled) \
-    sf_set_remove_from_ready_list_ex(&g_sf_context, enabled);
-
 static inline TaskCleanUpCallback sf_get_task_cleanup_callback_ex(
         SFContext *sf_context)
 {
@@ -99,10 +90,9 @@ static inline void sf_nio_reset_task_length(struct fast_task_info *task)
     }
 }
 
-void sf_recv_notify_read(int sock, short event, void *arg);
+void sf_socket_close_connection(struct fast_task_info *task);
+void sf_recv_notify_read(int sock, const int event, void *arg);
 int sf_send_add_event(struct fast_task_info *task);
-int sf_client_sock_write(int sock, short event, void *arg);
-int sf_client_sock_read(int sock, short event, void *arg);
 
 void sf_task_finish_clean_up(struct fast_task_info *task);
@@ -158,11 +148,6 @@ static inline int sf_nio_forward_request(struct fast_task_info *task,
     return sf_nio_notify(task, SF_NIO_STAGE_FORWARDED);
 }
 
-static inline bool sf_client_sock_in_read_stage(struct fast_task_info *task)
-{
-    return (task->event.callback == (IOEventCallback)sf_client_sock_read);
-}
-
 static inline void sf_nio_add_to_deleted_list(struct nio_thread_data
         *thread_data, struct fast_task_info *task)
 {
@@ -173,6 +158,8 @@ static inline void sf_nio_add_to_deleted_list(struct nio_thread_data
     }
 }
 
+bool sf_client_sock_in_read_stage(struct fast_task_info *task);
+
 #ifdef __cplusplus
 }
 #endif

(service source; path not shown)

@@ -33,9 +33,8 @@
 #include "fastcommon/sched_thread.h"
 #include "fastcommon/ioevent_loop.h"
 #include "fastcommon/fc_memory.h"
-#include "sf_nio.h"
+#include "sf_proto.h"
 #include "sf_util.h"
-#include "sf_global.h"
 #include "sf_service.h"
 
 #if defined(OS_LINUX)
@@ -61,12 +60,14 @@ struct worker_thread_context {
 static void *worker_thread_entrance(void *arg);
 
 static int sf_init_free_queue(SFContext *sf_context, const char *name,
-        const bool double_buffers, const int task_padding_size,
-        const int task_arg_size, TaskInitCallback init_callback,
-        void *init_arg)
+        const bool double_buffers, const bool need_shrink_task_buffer,
+        const int task_padding_size, const int task_arg_size,
+        TaskInitCallback init_callback, void *init_arg)
 {
     int result;
+    int buffer_size;
     int m;
+    int max_m;
     int alloc_conn_once;
 
     if ((result=set_rand_seed()) != 0) {
@@ -75,17 +76,26 @@ static int sf_init_free_queue(SFContext *sf_context, const char *name,
         return result;
     }
 
-    m = sf_context->net_buffer_cfg.min_buff_size / (64 * 1024);
+    if (strcmp(name, "cluster") == 0 || strcmp(name, "replica") == 0) {
+        buffer_size = FC_MAX(4 * 1024 * 1024, sf_context->
+                net_buffer_cfg.max_buff_size);
+        max_m = 64;
+    } else {
+        buffer_size = sf_context->net_buffer_cfg.min_buff_size;
+        max_m = 16;
+    }
+    m = buffer_size / (64 * 1024);
     if (m == 0) {
         m = 1;
-    } else if (m > 16) {
-        m = 16;
+    } else if (m > max_m) {
+        m = max_m;
     }
     alloc_conn_once = 256 / m;
     return free_queue_init_ex2(&sf_context->free_queue, name, double_buffers,
-            sf_context->net_buffer_cfg.max_connections, alloc_conn_once,
-            sf_context->net_buffer_cfg.min_buff_size, sf_context->
-            net_buffer_cfg.max_buff_size, task_padding_size,
-            task_arg_size, init_callback, init_arg);
+            need_shrink_task_buffer, sf_context->net_buffer_cfg.max_connections,
+            alloc_conn_once, sf_context->net_buffer_cfg.min_buff_size,
+            sf_context->net_buffer_cfg.max_buff_size, task_padding_size,
+            task_arg_size, init_callback, init_arg);
 }
@@ -101,12 +111,14 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
         sf_recv_timeout_callback timeout_callback, const int net_timeout_ms,
         const int proto_header_size, const int task_padding_size,
         const int task_arg_size, const bool double_buffers,
-        const bool explicit_post_recv, TaskInitCallback init_callback,
-        void *init_arg, sf_release_buffer_callback release_buffer_callback)
+        const bool need_shrink_task_buffer, const bool explicit_post_recv,
+        TaskInitCallback init_callback, void *init_arg,
+        sf_release_buffer_callback release_buffer_callback)
 {
     int result;
     int bytes;
     int extra_events;
+    int max_entries;
     int i;
     struct worker_thread_context *thread_contexts;
     struct worker_thread_context *thread_ctx;
@@ -132,8 +144,8 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
     }
 
     if ((result=sf_init_free_queue(sf_context, name, double_buffers,
-                    task_padding_size, task_arg_size, init_callback,
-                    init_arg)) != 0)
+                    need_shrink_task_buffer, task_padding_size,
+                    task_arg_size, init_callback, init_arg)) != 0)
     {
         return result;
     }
@@ -161,7 +173,11 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
     if (SF_G_EPOLL_EDGE_TRIGGER) {
 #ifdef OS_LINUX
+#if IOEVENT_USE_EPOLL
         extra_events = EPOLLET;
+#else
+        extra_events = 0;
+#endif
 #elif defined(OS_FREEBSD)
         extra_events = EV_CLEAR;
 #else
@ -171,6 +187,40 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
extra_events = 0; extra_events = 0;
} }
    max_entries = (sf_context->net_buffer_cfg.max_connections +
            sf_context->work_threads - 1) / sf_context->work_threads;
    if (strcmp(sf_context->name, "cluster") == 0 ||
            strcmp(sf_context->name, "replica") == 0)
    {
        if (max_entries < 1024) {
            max_entries += 8;
        } else {
            max_entries = 1024;
        }
    } else {
        if (max_entries < 4 * 1024) {
            max_entries = max_entries * 2;
        } else if (max_entries < 8 * 1024) {
            max_entries = (max_entries * 3) / 2;
        } else if (max_entries < 16 * 1024) {
            max_entries = (max_entries * 5) / 4;
        } else if (max_entries < 32 * 1024) {
            max_entries = (max_entries * 6) / 5;
        } else if (max_entries < 64 * 1024) {
            max_entries = (max_entries * 11) / 10;
        } else if (max_entries < 128 * 1024) {
            max_entries = (max_entries * 21) / 20;
        }

#if IOEVENT_USE_URING
        if (sf_context->use_io_uring) {
            if (max_entries > 32 * 1024) {
                max_entries = 32 * 1024;
            }
        }
#endif
    }
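For the general (non cluster/replica) branch above, the per-thread budget starts at max_connections spread across the work threads and is padded by a factor that shrinks as the count grows, capped at 32K entries when io_uring is enabled; cluster and replica contexts are instead pinned near 1024 entries. A runnable sketch of the general branch only (demo_max_entries is illustrative):

    #include <stdio.h>

    /* small rings get x2 headroom, large rings grow by ever-smaller
     * factors; with io_uring the ring is capped at 32K entries */
    static int demo_max_entries(int max_connections, int work_threads,
            int use_io_uring)
    {
        int n = (max_connections + work_threads - 1) / work_threads;

        if (n < 4 * 1024) {
            n = n * 2;
        } else if (n < 8 * 1024) {
            n = (n * 3) / 2;
        } else if (n < 16 * 1024) {
            n = (n * 5) / 4;
        } else if (n < 32 * 1024) {
            n = (n * 6) / 5;
        } else if (n < 64 * 1024) {
            n = (n * 11) / 10;
        } else if (n < 128 * 1024) {
            n = (n * 21) / 20;
        }
        if (use_io_uring && n > 32 * 1024) {
            n = 32 * 1024;
        }
        return n;
    }

    int main()
    {
        printf("%d\n", demo_max_entries(10240, 4, 1));    /* 2560  -> 5120  */
        printf("%d\n", demo_max_entries(65536, 4, 1));    /* 16384 -> 19660 */
        printf("%d\n", demo_max_entries(1 << 20, 4, 1));  /* 262144 -> 32768 */
        return 0;
    }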
    g_current_time = time(NULL);
    sf_context->thread_count = 0;
    data_end = sf_context->thread_data + sf_context->work_threads;
@ -195,18 +245,43 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
            thread_data->arg = NULL;
        }

        if ((result=ioevent_init(&thread_data->ev_puller, sf_context->
                name, sf_context->use_io_uring, max_entries,
                net_timeout_ms, extra_events)) != 0)
        {
            char prompt[256];

#if IOEVENT_USE_URING
            if (sf_context->use_io_uring) {
                if (result == EPERM) {
                    strcpy(prompt, " make sure kernel."
                            "io_uring_disabled set to 0");
                } else if (result == EINVAL) {
                    sprintf(prompt, " maybe max_connections: %d is too large"
                            " or [%s]'s work_threads: %d is too small",
                            sf_context->net_buffer_cfg.max_connections,
                            sf_context->name, sf_context->work_threads);
                } else {
                    *prompt = '\0';
                }
            } else {
#endif
                *prompt = '\0';
#if IOEVENT_USE_URING
            }
#endif

            logError("file: "__FILE__", line: %d, "
                    "ioevent_init fail, errno: %d, error info: %s.%s",
                    __LINE__, result, strerror(result), prompt);
            return result;
        }
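The EPERM hint above refers to the kernel.io_uring_disabled sysctl introduced in Linux 6.6 (0 = enabled for all, 1 = restricted, 2 = disabled entirely). A hedged pre-flight probe an application could run before turning on use_io_uring, reading the proc file directly (the helper name is illustrative, not part of libserverframe):

    #include <stdio.h>

    /* returns the sysctl value, or -1 if the file does not exist
     * (older kernels, where io_uring is unconditionally available) */
    static int demo_io_uring_disabled_value(void)
    {
        FILE *fp;
        int value;

        if ((fp = fopen("/proc/sys/kernel/io_uring_disabled", "r")) == NULL) {
            return -1;
        }
        if (fscanf(fp, "%d", &value) != 1) {
            value = -1;
        }
        fclose(fp);
        return value;
    }

    int main()
    {
        printf("kernel.io_uring_disabled = %d\n",
                demo_io_uring_disabled_value());
        return 0;
    }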
#if IOEVENT_USE_URING
        if (sf_context->use_io_uring && send_done_callback != NULL) {
            ioevent_set_send_zc_done_notify(&thread_data->ev_puller, true);
        }
#endif

        result = fast_timer_init(&thread_data->timer, 2 * sf_context->
                net_buffer_cfg.network_timeout, g_current_time);
        if (result != 0) {
@ -487,7 +562,9 @@ struct fast_task_info *sf_socket_accept_connection(SFListener *listener)
    }

    FC_SET_CLOEXEC(incomesock);
    if ((task=sf_alloc_init_server_task(listener->handler,
                    incomesock)) == NULL)
    {
        close(incomesock);
        return NULL;
    }
@ -498,12 +575,6 @@ struct fast_task_info *sf_socket_accept_connection(SFListener *listener)
    return task;
}
void sf_socket_close_connection(struct fast_task_info *task)
{
    close(task->event.fd);
    task->event.fd = -1;
}
void sf_socket_close_ex(SFContext *sf_context)
{
    int i;
@ -782,15 +853,13 @@ int sf_setup_signal_handler()
    return 0;
}

#define LOG_SCHEDULE_ENTRIES_COUNT 3

int sf_startup_schedule(pthread_t *schedule_tid)
{
    ScheduleArray scheduleArray;
    ScheduleEntry scheduleEntries[SF_LOG_SCHEDULE_ENTRIES_COUNT];

    scheduleArray.entries = scheduleEntries;
    sf_logger_setup_schedule(&g_log_context, &g_sf_global_vars.error_log,
            &scheduleArray);
    return sched_start(&scheduleArray, schedule_tid,
            g_sf_global_vars.thread_stack_size, (bool * volatile)
@ -801,7 +870,7 @@ int sf_add_slow_log_schedule(SFSlowLogContext *slowlog_ctx)
{
    int result;
    ScheduleArray scheduleArray;
    ScheduleEntry scheduleEntries[SF_LOG_SCHEDULE_ENTRIES_COUNT];

    if (!slowlog_ctx->cfg.enabled) {
        return 0;
@ -814,8 +883,8 @@ int sf_add_slow_log_schedule(SFSlowLogContext *slowlog_ctx)
    }

    scheduleArray.entries = scheduleEntries;
    sf_logger_setup_schedule(&slowlog_ctx->ctx, &slowlog_ctx->
            cfg.log_cfg, &scheduleArray);
    return sched_add_entries(&scheduleArray);
}
View File
@ -25,6 +25,9 @@
#include "fastcommon/ioevent.h" #include "fastcommon/ioevent.h"
#include "fastcommon/fast_task_queue.h" #include "fastcommon/fast_task_queue.h"
#include "sf_types.h" #include "sf_types.h"
#include "sf_proto.h"
#include "sf_global.h"
#include "sf_nio.h"
typedef void* (*sf_alloc_thread_extra_data_callback)(const int thread_index);
typedef void (*sf_sig_quit_handler)(int sig);
@ -45,8 +48,9 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
        sf_recv_timeout_callback timeout_callback, const int net_timeout_ms,
        const int proto_header_size, const int task_padding_size,
        const int task_arg_size, const bool double_buffers,
        const bool need_shrink_task_buffer, const bool explicit_post_recv,
        TaskInitCallback init_callback, void *init_arg,
        sf_release_buffer_callback release_buffer_callback);
#define sf_service_init_ex(sf_context, name, alloc_thread_extra_data_callback,\
    thread_loop_callback, accept_done_callback, set_body_length_func, \
@ -56,7 +60,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
    thread_loop_callback, accept_done_callback, set_body_length_func, \
    NULL, send_done_callback, deal_func, task_cleanup_func, \
    timeout_callback, net_timeout_ms, proto_header_size, \
    0, task_arg_size, false, true, false, NULL, NULL, NULL)
#define sf_service_init(name, alloc_thread_extra_data_callback, \
    thread_loop_callback, accept_done_callback, set_body_length_func, \
@ -65,8 +69,8 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
    sf_service_init_ex2(&g_sf_context, name, alloc_thread_extra_data_callback, \
    thread_loop_callback, accept_done_callback, set_body_length_func, NULL,\
    send_done_callback, deal_func, task_cleanup_func, timeout_callback, \
    net_timeout_ms, proto_header_size, 0, task_arg_size, false, true, \
    false, NULL, NULL, NULL)
int sf_service_destroy_ex(SFContext *sf_context);
@ -108,8 +112,6 @@ int sf_socket_create_server(SFListener *listener,
void sf_socket_close_server(SFListener *listener);
struct fast_task_info *sf_socket_accept_connection(SFListener *listener);
void sf_socket_close_connection(struct fast_task_info *task);
int sf_socket_server_ex(SFContext *sf_context);
#define sf_socket_server() sf_socket_server_ex(&g_sf_context)
@ -163,6 +165,11 @@ static inline struct fast_task_info *sf_alloc_init_task_ex(
        return NULL;
    }
    if (task->shrinked) {
        task->shrinked = false;
        sf_proto_init_task_magic(task);
    }
    __sync_add_and_fetch(&task->reffer_count, reffer_count);
    __sync_bool_compare_and_swap(&task->canceled, 1, 0);
    task->handler = handler;
@ -170,12 +177,42 @@ static inline struct fast_task_info *sf_alloc_init_task_ex(
    return task;
}

#define sf_hold_task_ex(task, inc_count) fc_hold_task_ex(task, inc_count)
#define sf_hold_task(task) fc_hold_task(task)

#define sf_alloc_init_task(handler, fd) sf_alloc_init_task_ex(handler, fd, 1)
static inline struct fast_task_info *sf_alloc_init_server_task(
        SFNetworkHandler *handler, const int fd)
{
    const int reffer_count = 1;
    struct fast_task_info *task;

    if ((task=sf_alloc_init_task_ex(handler, fd, reffer_count)) != NULL) {
#if IOEVENT_USE_URING
        FC_URING_IS_CLIENT(task) = false;
#endif
    }

    return task;
}

static inline struct fast_task_info *sf_alloc_init_client_task(
        SFNetworkHandler *handler)
{
    const int fd = -1;
    const int reffer_count = 1;
    struct fast_task_info *task;

    if ((task=sf_alloc_init_task_ex(handler, fd, reffer_count)) != NULL) {
#if IOEVENT_USE_URING
        FC_URING_IS_CLIENT(task) = true;
#endif
    }

    return task;
}
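A usage sketch of the two allocators above, assuming an SFNetworkHandler obtained from an initialized context: the accept path binds a live fd up front, while a client task starts with fd == -1 and receives its socket at connect time; under io_uring each helper also tags the task so cancellation and cleanup take the matching path (function names here are illustrative):

    #include "sf_service.h"

    /* server side: wrap an fd returned by accept() */
    static struct fast_task_info *demo_on_accept(SFNetworkHandler *handler,
            const int incomesock)
    {
        return sf_alloc_init_server_task(handler, incomesock);
    }

    /* client side: allocate first, connect later */
    static struct fast_task_info *demo_new_outgoing(SFNetworkHandler *handler)
    {
        return sf_alloc_init_client_task(handler);
    }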
static inline void sf_release_task(struct fast_task_info *task)
{
    if (__sync_sub_and_fetch(&task->reffer_count, 1) == 0) {
@ -187,6 +224,15 @@ static inline void sf_release_task(struct fast_task_info *task)
"used: %d, freed: %d", __LINE__, task, "used: %d, freed: %d", __LINE__, task,
alloc_count, alloc_count - free_count, free_count); alloc_count, alloc_count - free_count, free_count);
*/ */
#if IOEVENT_USE_URING
if (SF_CTX->use_io_uring) {
task->handler->close_connection(task);
__sync_fetch_and_sub(&g_sf_global_vars.
connection_stat.current_count, 1);
}
#endif
free_queue_push(task); free_queue_push(task);
} }
} }
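A minimal sketch of the hold/release contract this function completes: every extra hold must be balanced by exactly one release, and with use_io_uring enabled the final release is now also what closes the socket through handler->close_connection() and decrements the global connection counter (the demo function name is illustrative):

    #include "sf_service.h"

    /* keep a task alive across an async hand-off, then release it;
     * at refcount zero sf_release_task() recycles the buffer and,
     * under io_uring, closes the connection as well */
    static void demo_defer_task(struct fast_task_info *task)
    {
        sf_hold_task(task);     /* reffer_count: n -> n + 1 */
        /* ... queue the task to another thread here ... */
        sf_release_task(task);  /* balances the hold above */
    }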
@ -262,6 +308,19 @@ static inline SFNetworkHandler *sf_get_rdma_network_handler3(
    return sf_get_rdma_network_handler(sf_context3);
}
static inline bool sf_get_double_buffers_flag(FCServerGroupInfo *server_group)
{
    if (server_group->comm_type == fc_comm_type_sock) {
#if IOEVENT_USE_URING
        return true;
#else
        return false;
#endif
    } else {  //RDMA
        return true;
    }
}
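A usage sketch, assuming server_group comes from the usual fastcommon server-config loading: the helper supplies the double_buffers argument of sf_service_init_ex2, so RDMA and io_uring-backed sockets get two task buffers while plain epoll/kqueue sockets keep one (the sf_service_init* convenience macros hard-code false):

    #include "sf_service.h"

    /* illustrative: derive double_buffers from the configured transport
     * before calling sf_service_init_ex2() directly */
    static bool demo_choose_double_buffers(FCServerGroupInfo *server_group)
    {
        return sf_get_double_buffers_flag(server_group);
    }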
#ifdef __cplusplus
}
#endif
View File
@ -115,6 +115,7 @@ typedef struct sf_listener {
struct sf_context;
struct sf_address_family_handler;

typedef struct sf_network_handler {
    bool enabled;
    bool explicit_post_recv;
@ -179,8 +180,9 @@ typedef struct sf_context {
    struct nio_thread_data *thread_data;
    volatile int thread_count;
    bool is_client;     //since v1.2.5
    bool use_io_uring;  //since v1.2.9
    bool use_send_zc;   //since v1.2.9
    SFAddressFamily address_family;
    SFAddressFamilyHandler handlers[SF_ADDRESS_FAMILY_COUNT];
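Since both new switches are plain struct fields, a server that loads its own configuration can opt in before service initialization; a minimal sketch (the assignments are an assumption for illustration — per the changelog, use_send_zc already defaults to true):

    #include "sf_types.h"

    /* illustrative: enable the io_uring backend and zero-copy send on a
     * context before sf_service_init_ex2() spins up the worker threads */
    static void demo_enable_uring(SFContext *sf_context)
    {
        sf_context->use_io_uring = true;  /* event backend, since v1.2.9 */
        sf_context->use_send_zc = true;   /* zero-copy send when supported */
    }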
@ -190,7 +192,6 @@ typedef struct sf_context {
    int work_threads;
    int header_size;
    bool remove_from_ready_list;
    bool realloc_task_buffer;
    bool connect_need_log;  //for client connect
    FCSmartPollingConfig smart_polling;
View File
@ -281,7 +281,7 @@ int sf_logger_init(LogContext *pContext, const char *filename_prefix)
    return 0;
}

ScheduleEntry *sf_logger_set_schedule_entries(struct log_context *pContext,
        SFLogConfig *log_cfg, ScheduleEntry *pScheduleEntry)
{
    INIT_SCHEDULE_ENTRY(*pScheduleEntry, sched_generate_next_id(),
View File
@ -96,14 +96,14 @@ void sf_parse_cmd_option_bool(int argc, char *argv[],
int sf_logger_init(LogContext *pContext, const char *filename_prefix);

ScheduleEntry *sf_logger_set_schedule_entries(struct log_context *pContext,
        SFLogConfig *log_cfg, ScheduleEntry *pScheduleEntry);

static inline void sf_logger_setup_schedule(struct log_context *pContext,
        SFLogConfig *log_cfg, ScheduleArray *scheduleArray)
{
    ScheduleEntry *scheduleEntry;

    scheduleEntry = sf_logger_set_schedule_entries(pContext,
            log_cfg, scheduleArray->entries);
    scheduleArray->count = scheduleEntry - scheduleArray->entries;
}
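A usage sketch mirroring sf_startup_schedule and sf_add_slow_log_schedule from this changeset: reserve SF_LOG_SCHEDULE_ENTRIES_COUNT entries, let the renamed helper fill them, then register the array with the running scheduler (assumes the usual libserverframe and fastcommon headers; the caller name is illustrative):

    #include "sf_util.h"
    #include "fastcommon/sched_thread.h"

    /* illustrative caller: schedule log rotation/sync for one logger and
     * add the entries to an already-started scheduler thread */
    static int demo_add_log_schedule(LogContext *log_ctx, SFLogConfig *log_cfg)
    {
        ScheduleArray scheduleArray;
        ScheduleEntry scheduleEntries[SF_LOG_SCHEDULE_ENTRIES_COUNT];

        scheduleArray.entries = scheduleEntries;
        sf_logger_setup_schedule(log_ctx, log_cfg, &scheduleArray);
        return sched_add_entries(&scheduleArray);
    }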