Compare commits
15 Commits
0f75b039f6
...
53dd39500f
| Author | SHA1 | Date |
|---|---|---|
|
|
53dd39500f | |
|
|
772a9a6895 | |
|
|
932751d392 | |
|
|
817ff547da | |
|
|
926cd40114 | |
|
|
b688973cf9 | |
|
|
b16526e8f7 | |
|
|
68079fc468 | |
|
|
3dcc1c570d | |
|
|
cf0950ea62 | |
|
|
263171c4fe | |
|
|
a2ab8a0c01 | |
|
|
ecee21f289 | |
|
|
bc5af8a58b | |
|
|
f0ee6ce73f |
|
|
@ -2,7 +2,7 @@
|
||||||
%define CommitVersion %(echo $COMMIT_VERSION)
|
%define CommitVersion %(echo $COMMIT_VERSION)
|
||||||
|
|
||||||
Name: libserverframe
|
Name: libserverframe
|
||||||
Version: 1.2.8
|
Version: 1.2.9
|
||||||
Release: 1%{?dist}
|
Release: 1%{?dist}
|
||||||
Summary: network framework library
|
Summary: network framework library
|
||||||
License: AGPL v3.0
|
License: AGPL v3.0
|
||||||
|
|
@ -12,9 +12,9 @@ Source: http://github.com/happyfish100/libserverframe/%{name}-%{version}.tar.gz
|
||||||
|
|
||||||
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root-%(%{__id_u} -n)
|
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root-%(%{__id_u} -n)
|
||||||
|
|
||||||
BuildRequires: libfastcommon-devel >= 1.0.78
|
BuildRequires: libfastcommon-devel >= 1.0.81
|
||||||
Requires: %__cp %__mv %__chmod %__grep %__mkdir %__install %__id
|
Requires: %__cp %__mv %__chmod %__grep %__mkdir %__install %__id
|
||||||
Requires: libfastcommon >= 1.0.78
|
Requires: libfastcommon >= 1.0.81
|
||||||
|
|
||||||
%description
|
%description
|
||||||
common framework library
|
common framework library
|
||||||
|
|
|
||||||
6
make.sh
6
make.sh
|
|
@ -9,10 +9,13 @@ DEBUG_FLAG=0
|
||||||
|
|
||||||
if [ -f /usr/include/fastcommon/_os_define.h ]; then
|
if [ -f /usr/include/fastcommon/_os_define.h ]; then
|
||||||
OS_BITS=$(grep -F OS_BITS /usr/include/fastcommon/_os_define.h | awk '{print $NF;}')
|
OS_BITS=$(grep -F OS_BITS /usr/include/fastcommon/_os_define.h | awk '{print $NF;}')
|
||||||
|
USE_URING=$(grep -F IOEVENT_USE_URING /usr/include/fastcommon/_os_define.h | awk '{print $NF;}')
|
||||||
elif [ -f /usr/local/include/fastcommon/_os_define.h ]; then
|
elif [ -f /usr/local/include/fastcommon/_os_define.h ]; then
|
||||||
OS_BITS=$(grep -F OS_BITS /usr/local/include/fastcommon/_os_define.h | awk '{print $NF;}')
|
OS_BITS=$(grep -F OS_BITS /usr/local/include/fastcommon/_os_define.h | awk '{print $NF;}')
|
||||||
|
USE_URING=$(grep -F IOEVENT_USE_URING /usr/local/include/fastcommon/_os_define.h | awk '{print $NF;}')
|
||||||
else
|
else
|
||||||
OS_BITS=64
|
OS_BITS=64
|
||||||
|
USE_URING=''
|
||||||
fi
|
fi
|
||||||
|
|
||||||
uname=$(uname)
|
uname=$(uname)
|
||||||
|
|
@ -49,6 +52,9 @@ LIBS=''
|
||||||
uname=$(uname)
|
uname=$(uname)
|
||||||
if [ "$uname" = "Linux" ]; then
|
if [ "$uname" = "Linux" ]; then
|
||||||
CFLAGS="$CFLAGS"
|
CFLAGS="$CFLAGS"
|
||||||
|
if [ -n "$USE_URING" ]; then
|
||||||
|
LIBS="$LIBS -luring"
|
||||||
|
fi
|
||||||
elif [ "$uname" = "FreeBSD" ] || [ "$uname" = "Darwin" ]; then
|
elif [ "$uname" = "FreeBSD" ] || [ "$uname" = "Darwin" ]; then
|
||||||
CFLAGS="$CFLAGS"
|
CFLAGS="$CFLAGS"
|
||||||
if [ "$uname" = "Darwin" ]; then
|
if [ "$uname" = "Darwin" ]; then
|
||||||
|
|
|
||||||
|
|
@ -174,7 +174,6 @@ static struct fast_task_info *alloc_channel_task(IdempotencyClientChannel
|
||||||
*channel, const uint32_t hash_code, const FCCommunicationType comm_type,
|
*channel, const uint32_t hash_code, const FCCommunicationType comm_type,
|
||||||
const char *server_ip, const uint16_t port, int *err_no)
|
const char *server_ip, const uint16_t port, int *err_no)
|
||||||
{
|
{
|
||||||
int len;
|
|
||||||
struct fast_task_info *task;
|
struct fast_task_info *task;
|
||||||
SFAddressFamilyHandler *fh;
|
SFAddressFamilyHandler *fh;
|
||||||
SFNetworkHandler *handler;
|
SFNetworkHandler *handler;
|
||||||
|
|
@ -195,12 +194,7 @@ static struct fast_task_info *alloc_channel_task(IdempotencyClientChannel
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
len = strlen(server_ip);
|
fc_safe_strcpy(task->server_ip, server_ip);
|
||||||
if (len >= sizeof(task->server_ip)) {
|
|
||||||
len = sizeof(task->server_ip) - 1;
|
|
||||||
}
|
|
||||||
memcpy(task->server_ip, server_ip, len);
|
|
||||||
*(task->server_ip + len) = '\0';
|
|
||||||
task->port = port;
|
task->port = port;
|
||||||
task->arg = channel;
|
task->arg = channel;
|
||||||
task->thread_data = g_sf_context.thread_data +
|
task->thread_data = g_sf_context.thread_data +
|
||||||
|
|
@ -209,7 +203,8 @@ static struct fast_task_info *alloc_channel_task(IdempotencyClientChannel
|
||||||
channel->last_connect_time = g_current_time;
|
channel->last_connect_time = g_current_time;
|
||||||
if ((*err_no=sf_nio_notify(task, SF_NIO_STAGE_CONNECT)) != 0) {
|
if ((*err_no=sf_nio_notify(task, SF_NIO_STAGE_CONNECT)) != 0) {
|
||||||
channel->in_ioevent = 0; //rollback
|
channel->in_ioevent = 0; //rollback
|
||||||
sf_release_task(task);
|
__sync_sub_and_fetch(&task->reffer_count, 1);
|
||||||
|
free_queue_push(task);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
return task;
|
return task;
|
||||||
|
|
@ -221,6 +216,12 @@ int idempotency_client_channel_check_reconnect(
|
||||||
int result;
|
int result;
|
||||||
char formatted_ip[FORMATTED_IP_SIZE];
|
char formatted_ip[FORMATTED_IP_SIZE];
|
||||||
|
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
if (FC_ATOMIC_GET(channel->task->reffer_count) > 1) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
if (!__sync_bool_compare_and_swap(&channel->in_ioevent, 0, 1)) {
|
if (!__sync_bool_compare_and_swap(&channel->in_ioevent, 0, 1)) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
@ -237,6 +238,9 @@ int idempotency_client_channel_check_reconnect(
|
||||||
formatted_ip, channel->task->port);
|
formatted_ip, channel->task->port);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (channel->task->event.fd >= 0) {
|
||||||
|
channel->task->handler->close_connection(channel->task);
|
||||||
|
}
|
||||||
__sync_bool_compare_and_swap(&channel->task->canceled, 1, 0);
|
__sync_bool_compare_and_swap(&channel->task->canceled, 1, 0);
|
||||||
if ((result=sf_nio_notify(channel->task, SF_NIO_STAGE_CONNECT)) == 0) {
|
if ((result=sf_nio_notify(channel->task, SF_NIO_STAGE_CONNECT)) == 0) {
|
||||||
channel->last_connect_time = g_current_time;
|
channel->last_connect_time = g_current_time;
|
||||||
|
|
@ -348,8 +352,8 @@ int idempotency_client_channel_push(struct idempotency_client_channel *channel,
|
||||||
receipt->req_id = req_id;
|
receipt->req_id = req_id;
|
||||||
fc_queue_push_ex(&channel->queue, receipt, ¬ify);
|
fc_queue_push_ex(&channel->queue, receipt, ¬ify);
|
||||||
if (notify) {
|
if (notify) {
|
||||||
if (__sync_add_and_fetch(&channel->in_ioevent, 0)) {
|
if (FC_ATOMIC_GET(channel->in_ioevent)) {
|
||||||
if (__sync_add_and_fetch(&channel->established, 0)) {
|
if (FC_ATOMIC_GET(channel->established)) {
|
||||||
sf_nio_notify(channel->task, SF_NIO_STAGE_CONTINUE);
|
sf_nio_notify(channel->task, SF_NIO_STAGE_CONTINUE);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
|
||||||
|
|
@ -76,13 +76,13 @@ static inline void idempotency_client_channel_set_id_key(
|
||||||
static inline int idempotency_client_channel_check_wait_ex(
|
static inline int idempotency_client_channel_check_wait_ex(
|
||||||
struct idempotency_client_channel *channel, const int timeout)
|
struct idempotency_client_channel *channel, const int timeout)
|
||||||
{
|
{
|
||||||
if (__sync_add_and_fetch(&channel->established, 0)) {
|
if (FC_ATOMIC_GET(channel->established)) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
idempotency_client_channel_check_reconnect(channel);
|
idempotency_client_channel_check_reconnect(channel);
|
||||||
lcp_timedwait_sec(&channel->lcp, timeout);
|
lcp_timedwait_sec(&channel->lcp, timeout);
|
||||||
if (__sync_add_and_fetch(&channel->established, 0)) {
|
if (FC_ATOMIC_GET(channel->established)) {
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
/*
|
/*
|
||||||
|
|
|
||||||
|
|
@ -49,6 +49,10 @@ static IdempotencyReceiptGlobalVars receipt_global_vars;
|
||||||
|
|
||||||
static int receipt_init_task(struct fast_task_info *task, void *arg)
|
static int receipt_init_task(struct fast_task_info *task, void *arg)
|
||||||
{
|
{
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
FC_URING_IS_CLIENT(task) = true;
|
||||||
|
#endif
|
||||||
|
|
||||||
if (RDMA_INIT_CONNECTION != NULL) {
|
if (RDMA_INIT_CONNECTION != NULL) {
|
||||||
return RDMA_INIT_CONNECTION(task, arg);
|
return RDMA_INIT_CONNECTION(task, arg);
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -92,7 +96,6 @@ static void receipt_task_finish_cleanup(struct fast_task_info *task)
|
||||||
|
|
||||||
if (task->event.fd >= 0) {
|
if (task->event.fd >= 0) {
|
||||||
sf_task_detach_thread(task);
|
sf_task_detach_thread(task);
|
||||||
task->handler->close_connection(task);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
sf_nio_reset_task_length(task);
|
sf_nio_reset_task_length(task);
|
||||||
|
|
@ -282,7 +285,7 @@ static int deal_setup_channel_response(struct fast_task_info *task)
|
||||||
}
|
}
|
||||||
|
|
||||||
channel = (IdempotencyClientChannel *)task->arg;
|
channel = (IdempotencyClientChannel *)task->arg;
|
||||||
if (__sync_add_and_fetch(&channel->established, 0)) {
|
if (FC_ATOMIC_GET(channel->established)) {
|
||||||
format_ip_address(task->server_ip, formatted_ip);
|
format_ip_address(task->server_ip, formatted_ip);
|
||||||
logWarning("file: "__FILE__", line: %d, "
|
logWarning("file: "__FILE__", line: %d, "
|
||||||
"response from server %s:%u, unexpected cmd: "
|
"response from server %s:%u, unexpected cmd: "
|
||||||
|
|
@ -520,6 +523,7 @@ static int do_init(FCAddressPtrArray *address_array)
|
||||||
{
|
{
|
||||||
const int task_arg_size = 0;
|
const int task_arg_size = 0;
|
||||||
const bool double_buffers = false;
|
const bool double_buffers = false;
|
||||||
|
const bool need_shrink_task_buffer = false;
|
||||||
const bool explicit_post_recv = false;
|
const bool explicit_post_recv = false;
|
||||||
int result;
|
int result;
|
||||||
int bytes;
|
int bytes;
|
||||||
|
|
@ -552,8 +556,8 @@ static int do_init(FCAddressPtrArray *address_array)
|
||||||
NULL, sf_proto_set_body_length, NULL, NULL, receipt_deal_task,
|
NULL, sf_proto_set_body_length, NULL, NULL, receipt_deal_task,
|
||||||
receipt_task_finish_cleanup, receipt_recv_timeout_callback,
|
receipt_task_finish_cleanup, receipt_recv_timeout_callback,
|
||||||
1000, sizeof(SFCommonProtoHeader), TASK_PADDING_SIZE,
|
1000, sizeof(SFCommonProtoHeader), TASK_PADDING_SIZE,
|
||||||
task_arg_size, double_buffers, explicit_post_recv,
|
task_arg_size, double_buffers, need_shrink_task_buffer,
|
||||||
receipt_init_task, pd, NULL);
|
explicit_post_recv, receipt_init_task, pd, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
int receipt_handler_init(FCAddressPtrArray *address_array)
|
int receipt_handler_init(FCAddressPtrArray *address_array)
|
||||||
|
|
@ -565,7 +569,6 @@ int receipt_handler_init(FCAddressPtrArray *address_array)
|
||||||
}
|
}
|
||||||
|
|
||||||
sf_enable_thread_notify(true);
|
sf_enable_thread_notify(true);
|
||||||
sf_set_remove_from_ready_list(false);
|
|
||||||
fc_sleep_ms(100);
|
fc_sleep_ms(100);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
||||||
|
|
@ -51,7 +51,7 @@ SFContext g_sf_context = {{'\0'}, NULL, 0, false, sf_address_family_auto,
|
||||||
{{AF_UNSPEC, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}},
|
{{AF_UNSPEC, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}},
|
||||||
{AF_UNSPEC, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}}},
|
{AF_UNSPEC, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}}},
|
||||||
{DEFAULT_MAX_CONNECTONS, SF_DEF_MAX_PACKAGE_SIZE, SF_DEF_MIN_BUFF_SIZE,
|
{DEFAULT_MAX_CONNECTONS, SF_DEF_MAX_PACKAGE_SIZE, SF_DEF_MIN_BUFF_SIZE,
|
||||||
SF_DEF_MAX_BUFF_SIZE}, 1, DEFAULT_WORK_THREADS, 0, true, true, true,
|
SF_DEF_MAX_BUFF_SIZE}, 1, DEFAULT_WORK_THREADS, 0, true, true,
|
||||||
{false, 0, 0}, {sf_task_finish_clean_up}
|
{false, 0, 0}, {sf_task_finish_clean_up}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -477,7 +477,8 @@ static int load_rdma_apis(SFContext *sf_context, SFNetworkHandler *handler)
|
||||||
}
|
}
|
||||||
|
|
||||||
static int init_network_handler(SFContext *sf_context,
|
static int init_network_handler(SFContext *sf_context,
|
||||||
SFNetworkHandler *handler, SFAddressFamilyHandler *fh)
|
SFNetworkHandler *handler, SFAddressFamilyHandler *fh,
|
||||||
|
const bool use_send_zc)
|
||||||
{
|
{
|
||||||
handler->fh = fh;
|
handler->fh = fh;
|
||||||
handler->inner.handler = handler;
|
handler->inner.handler = handler;
|
||||||
|
|
@ -498,10 +499,18 @@ static int init_network_handler(SFContext *sf_context,
|
||||||
handler->send_data = sf_socket_send_data;
|
handler->send_data = sf_socket_send_data;
|
||||||
handler->recv_data = sf_socket_recv_data;
|
handler->recv_data = sf_socket_recv_data;
|
||||||
handler->post_recv = NULL;
|
handler->post_recv = NULL;
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
handler->use_io_uring = true;
|
||||||
|
handler->use_send_zc = use_send_zc;
|
||||||
|
#else
|
||||||
|
handler->use_io_uring = false;
|
||||||
|
handler->use_send_zc = false;
|
||||||
|
#endif
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
handler->inner.id = NULL;
|
handler->inner.id = NULL;
|
||||||
handler->outer.id = NULL;
|
handler->outer.id = NULL;
|
||||||
|
handler->use_io_uring = false;
|
||||||
return load_rdma_apis(sf_context, handler);
|
return load_rdma_apis(sf_context, handler);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -669,6 +678,8 @@ int sf_load_context_from_config_ex(SFContext *sf_context,
|
||||||
int inner_port;
|
int inner_port;
|
||||||
int outer_port;
|
int outer_port;
|
||||||
int port;
|
int port;
|
||||||
|
bool global_use_send_zc;
|
||||||
|
bool use_send_zc;
|
||||||
int i;
|
int i;
|
||||||
int result;
|
int result;
|
||||||
|
|
||||||
|
|
@ -705,6 +716,15 @@ int sf_load_context_from_config_ex(SFContext *sf_context,
|
||||||
outer_port = config->default_outer_port;
|
outer_port = config->default_outer_port;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
global_use_send_zc = iniGetBoolValue(NULL, "use_send_zc",
|
||||||
|
config->ini_ctx.context, true);
|
||||||
|
if (config->ini_ctx.section_name == NULL) {
|
||||||
|
use_send_zc = global_use_send_zc;
|
||||||
|
} else {
|
||||||
|
use_send_zc = iniGetBoolValue(config->ini_ctx.section_name,
|
||||||
|
"use_send_zc", config->ini_ctx.context, global_use_send_zc);
|
||||||
|
}
|
||||||
|
|
||||||
for (i=0; i<SF_ADDRESS_FAMILY_COUNT; i++) {
|
for (i=0; i<SF_ADDRESS_FAMILY_COUNT; i++) {
|
||||||
fh = sf_context->handlers + i;
|
fh = sf_context->handlers + i;
|
||||||
fh->ctx = sf_context;
|
fh->ctx = sf_context;
|
||||||
|
|
@ -728,7 +748,9 @@ int sf_load_context_from_config_ex(SFContext *sf_context,
|
||||||
if (!handler->enabled) {
|
if (!handler->enabled) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if ((result=init_network_handler(sf_context, handler, fh)) != 0) {
|
if ((result=init_network_handler(sf_context, handler,
|
||||||
|
fh, use_send_zc)) != 0)
|
||||||
|
{
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -885,6 +907,31 @@ static const char *get_address_family_caption(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
static void get_io_uring_configs(const SFContext *sf_context,
|
||||||
|
bool *use_io_uring, bool *use_send_zc)
|
||||||
|
{
|
||||||
|
int i;
|
||||||
|
const SFAddressFamilyHandler *fh;
|
||||||
|
const SFNetworkHandler *handler;
|
||||||
|
const SFNetworkHandler *end;
|
||||||
|
|
||||||
|
*use_io_uring = false;
|
||||||
|
*use_send_zc = false;
|
||||||
|
for (i=0; i<SF_ADDRESS_FAMILY_COUNT; i++) {
|
||||||
|
fh = sf_context->handlers + i;
|
||||||
|
end = fh->handlers + SF_NETWORK_HANDLER_COUNT;
|
||||||
|
for (handler=fh->handlers; handler<end; handler++) {
|
||||||
|
if (handler->enabled && handler->use_io_uring) {
|
||||||
|
*use_io_uring = true;
|
||||||
|
*use_send_zc = handler->use_send_zc;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
void sf_context_config_to_string(const SFContext *sf_context,
|
void sf_context_config_to_string(const SFContext *sf_context,
|
||||||
char *output, const int size)
|
char *output, const int size)
|
||||||
{
|
{
|
||||||
|
|
@ -894,6 +941,10 @@ void sf_context_config_to_string(const SFContext *sf_context,
|
||||||
char outer_bind_addr[2 * IP_ADDRESS_SIZE + 2];
|
char outer_bind_addr[2 * IP_ADDRESS_SIZE + 2];
|
||||||
int i;
|
int i;
|
||||||
int len;
|
int len;
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
bool use_io_uring;
|
||||||
|
bool use_send_zc;
|
||||||
|
#endif
|
||||||
|
|
||||||
*inner_bind_addr = '\0';
|
*inner_bind_addr = '\0';
|
||||||
*outer_bind_addr = '\0';
|
*outer_bind_addr = '\0';
|
||||||
|
|
@ -934,6 +985,12 @@ void sf_context_config_to_string(const SFContext *sf_context,
|
||||||
", address_family=%s, accept_threads=%d, work_threads=%d",
|
", address_family=%s, accept_threads=%d, work_threads=%d",
|
||||||
get_address_family_caption(sf_context->address_family),
|
get_address_family_caption(sf_context->address_family),
|
||||||
sf_context->accept_threads, sf_context->work_threads);
|
sf_context->accept_threads, sf_context->work_threads);
|
||||||
|
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
get_io_uring_configs(sf_context, &use_io_uring, &use_send_zc);
|
||||||
|
len += snprintf(output + len, size - len, ", use_io_uring=%d"
|
||||||
|
", use_send_zc=%d", use_io_uring, use_send_zc);
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
void sf_log_config_to_string_ex(SFLogConfig *log_cfg, const char *caption,
|
void sf_log_config_to_string_ex(SFLogConfig *log_cfg, const char *caption,
|
||||||
|
|
@ -983,6 +1040,10 @@ void sf_global_config_to_string_ex(const char *max_pkg_size_item_nm,
|
||||||
int max_pkg_size;
|
int max_pkg_size;
|
||||||
int min_buff_size;
|
int min_buff_size;
|
||||||
int max_buff_size;
|
int max_buff_size;
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
bool use_io_uring;
|
||||||
|
bool use_send_zc;
|
||||||
|
#endif
|
||||||
char pkg_buff[256];
|
char pkg_buff[256];
|
||||||
|
|
||||||
max_pkg_size = g_sf_global_vars.net_buffer_cfg.max_pkg_size -
|
max_pkg_size = g_sf_global_vars.net_buffer_cfg.max_pkg_size -
|
||||||
|
|
@ -1004,18 +1065,28 @@ void sf_global_config_to_string_ex(const char *max_pkg_size_item_nm,
|
||||||
|
|
||||||
len = snprintf(output, size,
|
len = snprintf(output, size,
|
||||||
"base_path=%s, max_connections=%d, connect_timeout=%d, "
|
"base_path=%s, max_connections=%d, connect_timeout=%d, "
|
||||||
"network_timeout=%d, thread_stack_size=%d KB, "
|
"network_timeout=%d, thread_stack_size=%d KB, %s, ",
|
||||||
"%s, tcp_quick_ack=%d, log_level=%s, "
|
SF_G_BASE_PATH_STR,
|
||||||
"run_by_group=%s, run_by_user=%s, ", SF_G_BASE_PATH_STR,
|
|
||||||
g_sf_global_vars.net_buffer_cfg.max_connections,
|
g_sf_global_vars.net_buffer_cfg.max_connections,
|
||||||
g_sf_global_vars.net_buffer_cfg.connect_timeout,
|
g_sf_global_vars.net_buffer_cfg.connect_timeout,
|
||||||
g_sf_global_vars.net_buffer_cfg.network_timeout,
|
g_sf_global_vars.net_buffer_cfg.network_timeout,
|
||||||
g_sf_global_vars.thread_stack_size / 1024,
|
g_sf_global_vars.thread_stack_size / 1024, pkg_buff);
|
||||||
pkg_buff, g_sf_global_vars.tcp_quick_ack,
|
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
get_io_uring_configs(&g_sf_context, &use_io_uring, &use_send_zc);
|
||||||
|
len += snprintf(output + len, size - len,
|
||||||
|
"use_io_uring=%d, use_send_zc=%d, ",
|
||||||
|
use_io_uring, use_send_zc);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
len += snprintf(output + len, size - len,
|
||||||
|
"tcp_quick_ack=%d, "
|
||||||
|
"log_level=%s, "
|
||||||
|
"run_by_group=%s, run_by_user=%s, ",
|
||||||
|
g_sf_global_vars.tcp_quick_ack,
|
||||||
log_get_level_caption(),
|
log_get_level_caption(),
|
||||||
g_sf_global_vars.run_by.group,
|
g_sf_global_vars.run_by.group,
|
||||||
g_sf_global_vars.run_by.user
|
g_sf_global_vars.run_by.user);
|
||||||
);
|
|
||||||
|
|
||||||
sf_log_config_to_string(&g_sf_global_vars.error_log,
|
sf_log_config_to_string(&g_sf_global_vars.error_log,
|
||||||
"error-log", output + len, size - len);
|
"error-log", output + len, size - len);
|
||||||
|
|
|
||||||
636
src/sf_nio.c
636
src/sf_nio.c
|
|
@ -41,6 +41,9 @@
|
||||||
#include "sf_service.h"
|
#include "sf_service.h"
|
||||||
#include "sf_nio.h"
|
#include "sf_nio.h"
|
||||||
|
|
||||||
|
static int sf_client_sock_write(int sock, const int event, void *arg);
|
||||||
|
static int sf_client_sock_read(int sock, const int event, void *arg);
|
||||||
|
|
||||||
void sf_set_parameters_ex(SFContext *sf_context, const int header_size,
|
void sf_set_parameters_ex(SFContext *sf_context, const int header_size,
|
||||||
sf_set_body_length_callback set_body_length_func,
|
sf_set_body_length_callback set_body_length_func,
|
||||||
sf_alloc_recv_buffer_callback alloc_recv_buffer_func,
|
sf_alloc_recv_buffer_callback alloc_recv_buffer_func,
|
||||||
|
|
@ -59,19 +62,45 @@ void sf_set_parameters_ex(SFContext *sf_context, const int header_size,
|
||||||
sf_context->callbacks.release_buffer = release_buffer_callback;
|
sf_context->callbacks.release_buffer = release_buffer_callback;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
#define CLEAR_OP_TYPE_AND_RELEASE_TASK(task) \
|
||||||
|
FC_URING_OP_TYPE(task) = IORING_OP_NOP; \
|
||||||
|
sf_release_task(task)
|
||||||
|
|
||||||
|
static int sf_uring_cancel_done(int sock, const int event, void *arg)
|
||||||
|
{
|
||||||
|
struct fast_task_info *task;
|
||||||
|
|
||||||
|
task = (struct fast_task_info *)arg;
|
||||||
|
if (event != IOEVENT_TIMEOUT) {
|
||||||
|
CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
void sf_task_detach_thread(struct fast_task_info *task)
|
void sf_task_detach_thread(struct fast_task_info *task)
|
||||||
{
|
{
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
bool need_cancel;
|
||||||
|
if (task->handler->use_io_uring) {
|
||||||
|
need_cancel = (FC_URING_OP_TYPE(task) != IORING_OP_NOP);
|
||||||
|
} else {
|
||||||
|
need_cancel = true;
|
||||||
|
}
|
||||||
|
if (need_cancel) {
|
||||||
|
task->event.callback = (IOEventCallback)sf_uring_cancel_done;
|
||||||
|
uring_prep_cancel(task);
|
||||||
|
}
|
||||||
|
#else
|
||||||
ioevent_detach(&task->thread_data->ev_puller, task->event.fd);
|
ioevent_detach(&task->thread_data->ev_puller, task->event.fd);
|
||||||
|
#endif
|
||||||
|
|
||||||
if (task->event.timer.expires > 0) {
|
if (task->event.timer.expires > 0) {
|
||||||
fast_timer_remove(&task->thread_data->timer,
|
fast_timer_remove(&task->thread_data->timer,
|
||||||
&task->event.timer);
|
&task->event.timer);
|
||||||
task->event.timer.expires = 0;
|
task->event.timer.expires = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (SF_CTX->remove_from_ready_list) {
|
|
||||||
ioevent_remove(&task->thread_data->ev_puller, task);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void sf_task_switch_thread(struct fast_task_info *task,
|
void sf_task_switch_thread(struct fast_task_info *task,
|
||||||
|
|
@ -92,18 +121,24 @@ static inline void release_iovec_buffer(struct fast_task_info *task)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void sf_socket_close_connection(struct fast_task_info *task)
|
||||||
|
{
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
if (task->handler->use_io_uring) {
|
||||||
|
if (uring_prep_close_fd(task) != 0) {
|
||||||
|
close(task->event.fd);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
#endif
|
||||||
|
close(task->event.fd);
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
task->event.fd = -1;
|
||||||
|
}
|
||||||
|
|
||||||
void sf_task_finish_clean_up(struct fast_task_info *task)
|
void sf_task_finish_clean_up(struct fast_task_info *task)
|
||||||
{
|
{
|
||||||
/*
|
|
||||||
assert(task->event.fd >= 0);
|
|
||||||
if (task->event.fd < 0) {
|
|
||||||
logWarning("file: "__FILE__", line: %d, "
|
|
||||||
"task: %p already cleaned",
|
|
||||||
__LINE__, task);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
if (task->finish_callback != NULL) {
|
if (task->finish_callback != NULL) {
|
||||||
task->finish_callback(task);
|
task->finish_callback(task);
|
||||||
task->finish_callback = NULL;
|
task->finish_callback = NULL;
|
||||||
|
|
@ -111,9 +146,18 @@ void sf_task_finish_clean_up(struct fast_task_info *task)
|
||||||
|
|
||||||
release_iovec_buffer(task);
|
release_iovec_buffer(task);
|
||||||
sf_task_detach_thread(task);
|
sf_task_detach_thread(task);
|
||||||
task->handler->close_connection(task);
|
|
||||||
|
|
||||||
__sync_fetch_and_sub(&g_sf_global_vars.connection_stat.current_count, 1);
|
#if IOEVENT_USE_URING
|
||||||
|
if (!task->handler->use_io_uring) {
|
||||||
|
#endif
|
||||||
|
task->handler->close_connection(task);
|
||||||
|
__sync_fetch_and_sub(&g_sf_global_vars.
|
||||||
|
connection_stat.current_count, 1);
|
||||||
|
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
sf_release_task(task);
|
sf_release_task(task);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -139,46 +183,116 @@ static inline int set_write_event(struct fast_task_info *task)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
static inline int prepare_first_recv(struct fast_task_info *task)
|
||||||
|
{
|
||||||
|
if (SF_CTX->callbacks.alloc_recv_buffer != NULL) {
|
||||||
|
return uring_prep_recv_data(task, task->recv.ptr->data,
|
||||||
|
SF_CTX->header_size);
|
||||||
|
} else {
|
||||||
|
return uring_prep_first_recv(task);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline int prepare_next_recv(struct fast_task_info *task)
|
||||||
|
{
|
||||||
|
int recv_bytes;
|
||||||
|
|
||||||
|
if (task->recv.ptr->length == 0) { //recv header
|
||||||
|
recv_bytes = SF_CTX->header_size - task->recv.ptr->offset;
|
||||||
|
return uring_prep_recv_data(task, task->recv.ptr->data +
|
||||||
|
task->recv.ptr->offset, recv_bytes);
|
||||||
|
} else {
|
||||||
|
recv_bytes = task->recv.ptr->length - task->recv.ptr->offset;
|
||||||
|
if (task->recv_body == NULL) {
|
||||||
|
return uring_prep_recv_data(task, task->recv.ptr->data +
|
||||||
|
task->recv.ptr->offset, recv_bytes);
|
||||||
|
} else {
|
||||||
|
return uring_prep_recv_data(task, task->recv_body +
|
||||||
|
(task->recv.ptr->offset - SF_CTX->
|
||||||
|
header_size), recv_bytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
static inline int set_read_event(struct fast_task_info *task)
|
static inline int set_read_event(struct fast_task_info *task)
|
||||||
{
|
{
|
||||||
int result;
|
int result;
|
||||||
|
|
||||||
if (task->event.callback == (IOEventCallback)sf_client_sock_read) {
|
#if IOEVENT_USE_URING
|
||||||
return 0;
|
if (task->handler->use_io_uring) {
|
||||||
}
|
if (FC_URING_OP_TYPE(task) != IORING_OP_NOP) {
|
||||||
|
if (FC_URING_OP_TYPE(task) == IORING_OP_RECV) {
|
||||||
|
logWarning("file: "__FILE__", line: %d, "
|
||||||
|
"trigger recv again!", __LINE__);
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
logWarning("file: "__FILE__", line: %d, "
|
||||||
|
"another operation in progress, op_type: %d!",
|
||||||
|
__LINE__, FC_URING_OP_TYPE(task));
|
||||||
|
return EBUSY;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
task->event.callback = (IOEventCallback)sf_client_sock_read;
|
if (task->event.callback != (IOEventCallback)sf_client_sock_read) {
|
||||||
if (ioevent_modify(&task->thread_data->ev_puller,
|
task->event.callback = (IOEventCallback)sf_client_sock_read;
|
||||||
task->event.fd, IOEVENT_READ, task) != 0)
|
}
|
||||||
{
|
if ((result=prepare_first_recv(task)) != 0) {
|
||||||
result = errno != 0 ? errno : ENOENT;
|
ioevent_add_to_deleted_list(task);
|
||||||
ioevent_add_to_deleted_list(task);
|
return result;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
#endif
|
||||||
|
|
||||||
logError("file: "__FILE__", line: %d, "
|
if (task->event.callback == (IOEventCallback)sf_client_sock_read) {
|
||||||
"ioevent_modify fail, "
|
return 0;
|
||||||
"errno: %d, error info: %s",
|
}
|
||||||
__LINE__, result, strerror(result));
|
task->event.callback = (IOEventCallback)sf_client_sock_read;
|
||||||
return result;
|
if (ioevent_modify(&task->thread_data->ev_puller,
|
||||||
|
task->event.fd, IOEVENT_READ, task) != 0)
|
||||||
|
{
|
||||||
|
result = errno != 0 ? errno : ENOENT;
|
||||||
|
ioevent_add_to_deleted_list(task);
|
||||||
|
|
||||||
|
logError("file: "__FILE__", line: %d, "
|
||||||
|
"ioevent_modify fail, "
|
||||||
|
"errno: %d, error info: %s",
|
||||||
|
__LINE__, result, strerror(result));
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int sf_set_read_event(struct fast_task_info *task)
|
int sf_set_read_event(struct fast_task_info *task)
|
||||||
{
|
{
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
if (task->handler->use_io_uring) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
/* reset recv offset and length */
|
||||||
task->recv.ptr->offset = 0;
|
task->recv.ptr->offset = 0;
|
||||||
task->recv.ptr->length = 0;
|
task->recv.ptr->length = 0;
|
||||||
|
|
||||||
task->nio_stages.current = SF_NIO_STAGE_RECV;
|
task->nio_stages.current = SF_NIO_STAGE_RECV;
|
||||||
return set_read_event(task);
|
return set_read_event(task);
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline int sf_ioevent_add(struct fast_task_info *task,
|
static inline int sf_ioevent_add(struct fast_task_info *task)
|
||||||
IOEventCallback callback, const int timeout)
|
|
||||||
{
|
{
|
||||||
int result;
|
int result;
|
||||||
|
|
||||||
result = ioevent_set(task, task->thread_data, task->event.fd,
|
result = ioevent_set(task, task->thread_data, task->event.fd,
|
||||||
IOEVENT_READ, callback, timeout);
|
IOEVENT_READ, (IOEventCallback)sf_client_sock_read,
|
||||||
|
SF_CTX->net_buffer_cfg.network_timeout,
|
||||||
|
task->handler->use_io_uring);
|
||||||
return result > 0 ? -1 * result : result;
|
return result > 0 ? -1 * result : result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -196,8 +310,7 @@ static inline void inc_connection_current_count()
|
||||||
static inline int sf_nio_init(struct fast_task_info *task)
|
static inline int sf_nio_init(struct fast_task_info *task)
|
||||||
{
|
{
|
||||||
inc_connection_current_count();
|
inc_connection_current_count();
|
||||||
return sf_ioevent_add(task, (IOEventCallback)sf_client_sock_read,
|
return sf_ioevent_add(task);
|
||||||
SF_CTX->net_buffer_cfg.network_timeout);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int sf_socket_async_connect_check(struct fast_task_info *task)
|
int sf_socket_async_connect_check(struct fast_task_info *task)
|
||||||
|
|
@ -212,7 +325,7 @@ int sf_socket_async_connect_check(struct fast_task_info *task)
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int sf_client_connect_done(int sock, short event, void *arg)
|
static int sf_client_connect_done(int sock, const int event, void *arg)
|
||||||
{
|
{
|
||||||
int result;
|
int result;
|
||||||
struct fast_task_info *task;
|
struct fast_task_info *task;
|
||||||
|
|
@ -220,16 +333,31 @@ static int sf_client_connect_done(int sock, short event, void *arg)
|
||||||
|
|
||||||
task = (struct fast_task_info *)arg;
|
task = (struct fast_task_info *)arg;
|
||||||
if (task->canceled) {
|
if (task->canceled) {
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
if (task->handler->use_io_uring && event != IOEVENT_TIMEOUT) {
|
||||||
|
CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
return ENOTCONN;
|
return ENOTCONN;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (event & IOEVENT_TIMEOUT) {
|
if (event == IOEVENT_TIMEOUT) {
|
||||||
result = ETIMEDOUT;
|
result = ETIMEDOUT;
|
||||||
} else {
|
} else {
|
||||||
result = task->handler->async_connect_check(task);
|
#if IOEVENT_USE_URING
|
||||||
if (result == EINPROGRESS) {
|
if (task->handler->use_io_uring) {
|
||||||
return 0;
|
CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
|
||||||
|
result = (task->event.res < 0 ? -1 * task->event.res :
|
||||||
|
task->event.res);
|
||||||
|
} else {
|
||||||
|
#endif
|
||||||
|
result = task->handler->async_connect_check(task);
|
||||||
|
if (result == EINPROGRESS) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
if (SF_CTX->callbacks.connect_done != NULL) {
|
if (SF_CTX->callbacks.connect_done != NULL) {
|
||||||
|
|
@ -260,14 +388,26 @@ static int sf_client_connect_done(int sock, short event, void *arg)
|
||||||
int sf_socket_async_connect_server(struct fast_task_info *task)
|
int sf_socket_async_connect_server(struct fast_task_info *task)
|
||||||
{
|
{
|
||||||
int result;
|
int result;
|
||||||
if ((task->event.fd=socketCreateEx2(AF_UNSPEC, task->server_ip,
|
|
||||||
O_NONBLOCK, NULL, &result)) < 0)
|
|
||||||
{
|
|
||||||
return result > 0 ? -1 * result : result;
|
|
||||||
}
|
|
||||||
|
|
||||||
return asyncconnectserverbyip(task->event.fd,
|
#if IOEVENT_USE_URING
|
||||||
task->server_ip, task->port);
|
if (task->handler->use_io_uring) {
|
||||||
|
if ((result=uring_prep_connect(task)) != 0) {
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
return EINPROGRESS;
|
||||||
|
} else {
|
||||||
|
#endif
|
||||||
|
if ((task->event.fd=socketCreateEx2(AF_UNSPEC, task->server_ip,
|
||||||
|
O_NONBLOCK, NULL, &result)) < 0)
|
||||||
|
{
|
||||||
|
return result > 0 ? -1 * result : result;
|
||||||
|
}
|
||||||
|
|
||||||
|
return asyncconnectserverbyip(task->event.fd,
|
||||||
|
task->server_ip, task->port);
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
}
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
static int sf_async_connect_server(struct fast_task_info *task)
|
static int sf_async_connect_server(struct fast_task_info *task)
|
||||||
|
|
@ -279,7 +419,7 @@ static int sf_async_connect_server(struct fast_task_info *task)
|
||||||
result = ioevent_set(task, task->thread_data, task->event.fd,
|
result = ioevent_set(task, task->thread_data, task->event.fd,
|
||||||
IOEVENT_READ | IOEVENT_WRITE, (IOEventCallback)
|
IOEVENT_READ | IOEVENT_WRITE, (IOEventCallback)
|
||||||
sf_client_connect_done, SF_CTX->net_buffer_cfg.
|
sf_client_connect_done, SF_CTX->net_buffer_cfg.
|
||||||
connect_timeout);
|
connect_timeout, task->handler->use_io_uring);
|
||||||
return result > 0 ? -1 * result : result;
|
return result > 0 ? -1 * result : result;
|
||||||
} else {
|
} else {
|
||||||
if (SF_CTX->callbacks.connect_done != NULL) {
|
if (SF_CTX->callbacks.connect_done != NULL) {
|
||||||
|
|
@ -287,10 +427,7 @@ static int sf_async_connect_server(struct fast_task_info *task)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (result == 0) {
|
if (result == 0) {
|
||||||
if ((result=sf_ioevent_add(task, (IOEventCallback)
|
if ((result=sf_ioevent_add(task)) != 0) {
|
||||||
sf_client_sock_read, SF_CTX->
|
|
||||||
net_buffer_cfg.network_timeout)) != 0)
|
|
||||||
{
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -331,10 +468,12 @@ static int sf_nio_deal_task(struct fast_task_info *task, const int stage)
|
||||||
case SF_NIO_STAGE_RECV:
|
case SF_NIO_STAGE_RECV:
|
||||||
task->nio_stages.current = SF_NIO_STAGE_RECV;
|
task->nio_stages.current = SF_NIO_STAGE_RECV;
|
||||||
if ((result=set_read_event(task)) == 0) {
|
if ((result=set_read_event(task)) == 0) {
|
||||||
if (sf_client_sock_read(task->event.fd,
|
if (!task->handler->use_io_uring) {
|
||||||
IOEVENT_READ, task) < 0)
|
if (sf_client_sock_read(task->event.fd,
|
||||||
{
|
IOEVENT_READ, task) < 0)
|
||||||
result = errno != 0 ? errno : EIO;
|
{
|
||||||
|
result = errno != 0 ? errno : EIO;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
@ -345,10 +484,7 @@ static int sf_nio_deal_task(struct fast_task_info *task, const int stage)
|
||||||
result = SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_CONTINUE);
|
result = SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_CONTINUE);
|
||||||
break;
|
break;
|
||||||
case SF_NIO_STAGE_FORWARDED: //forward by other thread
|
case SF_NIO_STAGE_FORWARDED: //forward by other thread
|
||||||
if ((result=sf_ioevent_add(task, (IOEventCallback)
|
if ((result=sf_ioevent_add(task)) == 0) {
|
||||||
sf_client_sock_read,
|
|
||||||
SF_CTX->net_buffer_cfg.network_timeout)) == 0)
|
|
||||||
{
|
|
||||||
result = SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_SEND);
|
result = SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_SEND);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
@ -435,7 +571,7 @@ int sf_nio_notify(struct fast_task_info *task, const int stage)
|
||||||
{
|
{
|
||||||
result = errno != 0 ? errno : EIO;
|
result = errno != 0 ? errno : EIO;
|
||||||
logError("file: "__FILE__", line: %d, "
|
logError("file: "__FILE__", line: %d, "
|
||||||
"write eventfd %d fail, errno: %d, error info: %s",
|
"write to fd %d fail, errno: %d, error info: %s",
|
||||||
__LINE__, FC_NOTIFY_WRITE_FD(task->thread_data),
|
__LINE__, FC_NOTIFY_WRITE_FD(task->thread_data),
|
||||||
result, STRERROR(result));
|
result, STRERROR(result));
|
||||||
return result;
|
return result;
|
||||||
|
|
@ -463,26 +599,32 @@ static inline void deal_notified_task(struct fast_task_info *task,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void sf_recv_notify_read(int sock, short event, void *arg)
|
void sf_recv_notify_read(int fd, const int event, void *arg)
|
||||||
{
|
{
|
||||||
int64_t n;
|
int64_t n;
|
||||||
int stage;
|
int stage;
|
||||||
struct nio_thread_data *thread_data;
|
struct ioevent_notify_entry *notify_entry;
|
||||||
struct fast_task_info *task;
|
struct fast_task_info *task;
|
||||||
struct fast_task_info *current;
|
struct fast_task_info *current;
|
||||||
|
|
||||||
thread_data = ((struct ioevent_notify_entry *)arg)->thread_data;
|
notify_entry = (struct ioevent_notify_entry *)arg;
|
||||||
if (read(sock, &n, sizeof(n)) < 0) {
|
if (read(fd, &n, sizeof(n)) < 0) {
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
if (errno == EAGAIN) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
logWarning("file: "__FILE__", line: %d, "
|
logWarning("file: "__FILE__", line: %d, "
|
||||||
"read from eventfd %d fail, errno: %d, error info: %s",
|
"read from fd %d fail, errno: %d, error info: %s",
|
||||||
__LINE__, sock, errno, STRERROR(errno));
|
__LINE__, fd, errno, STRERROR(errno));
|
||||||
}
|
}
|
||||||
|
|
||||||
PTHREAD_MUTEX_LOCK(&thread_data->waiting_queue.lock);
|
PTHREAD_MUTEX_LOCK(¬ify_entry->thread_data->waiting_queue.lock);
|
||||||
current = thread_data->waiting_queue.head;
|
current = notify_entry->thread_data->waiting_queue.head;
|
||||||
thread_data->waiting_queue.head = NULL;
|
notify_entry->thread_data->waiting_queue.head = NULL;
|
||||||
thread_data->waiting_queue.tail = NULL;
|
notify_entry->thread_data->waiting_queue.tail = NULL;
|
||||||
PTHREAD_MUTEX_UNLOCK(&thread_data->waiting_queue.lock);
|
PTHREAD_MUTEX_UNLOCK(¬ify_entry->thread_data->waiting_queue.lock);
|
||||||
|
|
||||||
while (current != NULL) {
|
while (current != NULL) {
|
||||||
task = current;
|
task = current;
|
||||||
|
|
@ -508,16 +650,31 @@ int sf_send_add_event(struct fast_task_info *task)
|
||||||
if (task->send.ptr->length > 0) {
|
if (task->send.ptr->length > 0) {
|
||||||
/* direct send */
|
/* direct send */
|
||||||
task->nio_stages.current = SF_NIO_STAGE_SEND;
|
task->nio_stages.current = SF_NIO_STAGE_SEND;
|
||||||
if (sf_client_sock_write(task->event.fd, IOEVENT_WRITE, task) < 0) {
|
#if IOEVENT_USE_URING
|
||||||
return errno != 0 ? errno : EIO;
|
if (task->handler->use_io_uring) {
|
||||||
|
if (task->event.callback != (IOEventCallback)sf_client_sock_write) {
|
||||||
|
task->event.callback = (IOEventCallback)sf_client_sock_write;
|
||||||
|
}
|
||||||
|
if (task->handler->use_send_zc) {
|
||||||
|
return uring_prep_first_send_zc(task);
|
||||||
|
} else {
|
||||||
|
return uring_prep_first_send(task);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
#endif
|
||||||
|
if (sf_client_sock_write(task->event.fd, IOEVENT_WRITE, task) < 0) {
|
||||||
|
return errno != 0 ? errno : EIO;
|
||||||
|
}
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline int check_task(struct fast_task_info *task,
|
static inline int check_task(struct fast_task_info *task,
|
||||||
const short event, const int expect_stage)
|
const int event, const int expect_stage)
|
||||||
{
|
{
|
||||||
if (task->canceled) {
|
if (task->canceled) {
|
||||||
return ENOTCONN;
|
return ENOTCONN;
|
||||||
|
|
@ -537,6 +694,18 @@ static inline int check_task(struct fast_task_info *task,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
if (task->handler->use_io_uring) {
|
||||||
|
logWarning("file: "__FILE__", line: %d, "
|
||||||
|
"client ip: %s, event: %d, expect stage: %d, "
|
||||||
|
"but current stage: %d, close connection",
|
||||||
|
__LINE__, task->client_ip, event,
|
||||||
|
expect_stage, task->nio_stages.current);
|
||||||
|
ioevent_add_to_deleted_list(task);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
if (task->handler->comm_type == fc_comm_type_sock) {
|
if (task->handler->comm_type == fc_comm_type_sock) {
|
||||||
if (tcp_socket_connected(task->event.fd)) {
|
if (tcp_socket_connected(task->event.fd)) {
|
||||||
return EAGAIN;
|
return EAGAIN;
|
||||||
|
|
@ -553,28 +722,69 @@ static inline int check_task(struct fast_task_info *task,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
static inline int prepare_next_send(struct fast_task_info *task)
|
||||||
|
{
|
||||||
|
if (task->handler->use_send_zc) {
|
||||||
|
return uring_prep_next_send_zc(task);
|
||||||
|
} else {
|
||||||
|
return uring_prep_next_send(task);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
ssize_t sf_socket_send_data(struct fast_task_info *task,
|
ssize_t sf_socket_send_data(struct fast_task_info *task,
|
||||||
SFCommAction *action, bool *send_done)
|
SFCommAction *action, bool *send_done)
|
||||||
{
|
{
|
||||||
int bytes;
|
int bytes;
|
||||||
|
int result;
|
||||||
|
|
||||||
if (task->iovec_array.iovs != NULL) {
|
#if IOEVENT_USE_URING
|
||||||
bytes = writev(task->event.fd, task->iovec_array.iovs,
|
if (task->handler->use_io_uring) {
|
||||||
FC_MIN(task->iovec_array.count, IOV_MAX));
|
bytes = task->event.res;
|
||||||
} else {
|
} else {
|
||||||
bytes = write(task->event.fd, task->send.ptr->data +
|
#endif
|
||||||
task->send.ptr->offset, task->send.ptr->length -
|
if (task->iovec_array.iovs != NULL) {
|
||||||
task->send.ptr->offset);
|
bytes = writev(task->event.fd, task->iovec_array.iovs,
|
||||||
|
FC_MIN(task->iovec_array.count, IOV_MAX));
|
||||||
|
} else {
|
||||||
|
bytes = write(task->event.fd, task->send.ptr->data +
|
||||||
|
task->send.ptr->offset, task->send.ptr->length -
|
||||||
|
task->send.ptr->offset);
|
||||||
|
}
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
if (bytes < 0) {
|
if (bytes < 0) {
|
||||||
if (errno == EAGAIN || errno == EWOULDBLOCK)
|
#if IOEVENT_USE_URING
|
||||||
{
|
if (task->handler->use_io_uring) {
|
||||||
if (set_write_event(task) != 0) {
|
result = -bytes;
|
||||||
return -1;
|
} else {
|
||||||
|
#endif
|
||||||
|
result = errno;
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
if (result == EAGAIN || result == EWOULDBLOCK) {
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
if (task->handler->use_io_uring) {
|
||||||
|
if (prepare_next_send(task) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
#endif
|
||||||
|
if (set_write_event(task) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
*action = sf_comm_action_break;
|
*action = sf_comm_action_break;
|
||||||
return 0;
|
return 0;
|
||||||
} else if (errno == EINTR) { //should retry
|
} else if (result == EINTR && !task->handler->use_io_uring) {
|
||||||
|
/* should try again */
|
||||||
logDebug("file: "__FILE__", line: %d, "
|
logDebug("file: "__FILE__", line: %d, "
|
||||||
"client ip: %s, ignore interupt signal",
|
"client ip: %s, ignore interupt signal",
|
||||||
__LINE__, task->client_ip);
|
__LINE__, task->client_ip);
|
||||||
|
|
@ -585,30 +795,40 @@ ssize_t sf_socket_send_data(struct fast_task_info *task,
|
||||||
"client ip: %s, send fail, task offset: %d, length: %d, "
|
"client ip: %s, send fail, task offset: %d, length: %d, "
|
||||||
"errno: %d, error info: %s", __LINE__, task->client_ip,
|
"errno: %d, error info: %s", __LINE__, task->client_ip,
|
||||||
task->send.ptr->offset, task->send.ptr->length,
|
task->send.ptr->offset, task->send.ptr->length,
|
||||||
errno, strerror(errno));
|
result, strerror(result));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
} else if (bytes == 0) {
|
} else if (bytes == 0) {
|
||||||
logWarning("file: "__FILE__", line: %d, "
|
logWarning("file: "__FILE__", line: %d, "
|
||||||
"client ip: %s, sock: %d, task length: %d, offset: %d, "
|
"client ip: %s, task length: %d, offset: %d, "
|
||||||
"send failed, connection disconnected", __LINE__,
|
"send failed, connection disconnected", __LINE__,
|
||||||
task->client_ip, task->event.fd, task->send.ptr->length,
|
task->client_ip, task->send.ptr->length,
|
||||||
task->send.ptr->offset);
|
task->send.ptr->offset);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
task->send.ptr->offset += bytes;
|
task->send.ptr->offset += bytes;
|
||||||
if (task->send.ptr->offset >= task->send.ptr->length) {
|
if (task->send.ptr->offset >= task->send.ptr->length) {
|
||||||
if (task->send.ptr != task->recv.ptr) { //double buffers
|
#if IOEVENT_USE_URING
|
||||||
task->send.ptr->offset = 0;
|
if (FC_URING_IS_SEND_ZC(task) && task->thread_data->
|
||||||
task->send.ptr->length = 0;
|
ev_puller.send_zc_done_notify)
|
||||||
}
|
{
|
||||||
*action = sf_comm_action_finish;
|
*action = sf_comm_action_break;
|
||||||
*send_done = true;
|
*send_done = false;
|
||||||
} else {
|
} else {
|
||||||
*action = sf_comm_action_continue;
|
#endif
|
||||||
*send_done = false;
|
if (task->send.ptr != task->recv.ptr) { //double buffers
|
||||||
|
task->send.ptr->offset = 0;
|
||||||
|
task->send.ptr->length = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
*action = sf_comm_action_finish;
|
||||||
|
*send_done = true;
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
} else {
|
||||||
/* set next writev iovec array */
|
/* set next writev iovec array */
|
||||||
if (task->iovec_array.iovs != NULL) {
|
if (task->iovec_array.iovs != NULL) {
|
||||||
struct iovec *iov;
|
struct iovec *iov;
|
||||||
|
|
@ -637,6 +857,25 @@ ssize_t sf_socket_send_data(struct fast_task_info *task,
|
||||||
task->iovec_array.iovs = iov;
|
task->iovec_array.iovs = iov;
|
||||||
task->iovec_array.count = end - iov;
|
task->iovec_array.count = end - iov;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
if (task->handler->use_io_uring) {
|
||||||
|
if (!(FC_URING_IS_SEND_ZC(task) && task->thread_data->
|
||||||
|
ev_puller.send_zc_done_notify))
|
||||||
|
{
|
||||||
|
if (prepare_next_send(task) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*action = sf_comm_action_break;
|
||||||
|
} else {
|
||||||
|
#endif
|
||||||
|
*action = sf_comm_action_continue;
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
*send_done = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
return bytes;
|
return bytes;
|
||||||
|
|
@ -646,30 +885,56 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task,
|
||||||
const bool call_post_recv, SFCommAction *action)
|
const bool call_post_recv, SFCommAction *action)
|
||||||
{
|
{
|
||||||
int bytes;
|
int bytes;
|
||||||
|
int result;
|
||||||
int recv_bytes;
|
int recv_bytes;
|
||||||
bool new_alloc;
|
bool new_alloc;
|
||||||
|
|
||||||
if (task->recv.ptr->length == 0) { //recv header
|
#if IOEVENT_USE_URING
|
||||||
recv_bytes = SF_CTX->header_size - task->recv.ptr->offset;
|
if (task->handler->use_io_uring) {
|
||||||
bytes = read(task->event.fd, task->recv.ptr->data +
|
bytes = task->event.res;
|
||||||
task->recv.ptr->offset, recv_bytes);
|
|
||||||
} else {
|
} else {
|
||||||
recv_bytes = task->recv.ptr->length - task->recv.ptr->offset;
|
#endif
|
||||||
if (task->recv_body == NULL) {
|
if (task->recv.ptr->length == 0) { //recv header
|
||||||
|
recv_bytes = SF_CTX->header_size - task->recv.ptr->offset;
|
||||||
bytes = read(task->event.fd, task->recv.ptr->data +
|
bytes = read(task->event.fd, task->recv.ptr->data +
|
||||||
task->recv.ptr->offset, recv_bytes);
|
task->recv.ptr->offset, recv_bytes);
|
||||||
} else {
|
} else {
|
||||||
bytes = read(task->event.fd, task->recv_body +
|
recv_bytes = task->recv.ptr->length - task->recv.ptr->offset;
|
||||||
(task->recv.ptr->offset - SF_CTX->
|
if (task->recv_body == NULL) {
|
||||||
header_size), recv_bytes);
|
bytes = read(task->event.fd, task->recv.ptr->data +
|
||||||
|
task->recv.ptr->offset, recv_bytes);
|
||||||
|
} else {
|
||||||
|
bytes = read(task->event.fd, task->recv_body +
|
||||||
|
(task->recv.ptr->offset - SF_CTX->
|
||||||
|
header_size), recv_bytes);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
if (bytes < 0) {
|
if (bytes < 0) {
|
||||||
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
#if IOEVENT_USE_URING
|
||||||
|
if (task->handler->use_io_uring) {
|
||||||
|
result = -bytes;
|
||||||
|
} else {
|
||||||
|
#endif
|
||||||
|
result = errno;
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
if (result == EAGAIN || result == EWOULDBLOCK) {
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
if (task->handler->use_io_uring) {
|
||||||
|
if (prepare_next_recv(task) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif
|
||||||
*action = sf_comm_action_break;
|
*action = sf_comm_action_break;
|
||||||
return 0;
|
return 0;
|
||||||
} else if (errno == EINTR) { //should retry
|
} else if (result == EINTR && !task->handler->use_io_uring) {
|
||||||
|
/* should try again */
|
||||||
logDebug("file: "__FILE__", line: %d, "
|
logDebug("file: "__FILE__", line: %d, "
|
||||||
"client ip: %s, ignore interupt signal",
|
"client ip: %s, ignore interupt signal",
|
||||||
__LINE__, task->client_ip);
|
__LINE__, task->client_ip);
|
||||||
|
|
@ -680,7 +945,7 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task,
|
||||||
"client ip: %s, recv fail, "
|
"client ip: %s, recv fail, "
|
||||||
"errno: %d, error info: %s",
|
"errno: %d, error info: %s",
|
||||||
__LINE__, task->client_ip,
|
__LINE__, task->client_ip,
|
||||||
errno, strerror(errno));
|
result, strerror(result));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
} else if (bytes == 0) {
|
} else if (bytes == 0) {
|
||||||
|
|
@ -712,7 +977,18 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task,
|
||||||
task->recv.ptr->offset += bytes;
|
task->recv.ptr->offset += bytes;
|
||||||
if (task->recv.ptr->length == 0) { //pkg header
|
if (task->recv.ptr->length == 0) { //pkg header
|
||||||
if (task->recv.ptr->offset < SF_CTX->header_size) {
|
if (task->recv.ptr->offset < SF_CTX->header_size) {
|
||||||
*action = sf_comm_action_continue;
|
#if IOEVENT_USE_URING
|
||||||
|
if (task->handler->use_io_uring) {
|
||||||
|
if (prepare_next_recv(task) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
*action = sf_comm_action_break;
|
||||||
|
} else {
|
||||||
|
#endif
|
||||||
|
*action = sf_comm_action_continue;
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
}
|
||||||
|
#endif
|
||||||
return bytes;
|
return bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -765,7 +1041,18 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task,
|
||||||
if (task->recv.ptr->offset >= task->recv.ptr->length) { //recv done
|
if (task->recv.ptr->offset >= task->recv.ptr->length) { //recv done
|
||||||
*action = sf_comm_action_finish;
|
*action = sf_comm_action_finish;
|
||||||
} else {
|
} else {
|
||||||
*action = sf_comm_action_continue;
|
#if IOEVENT_USE_URING
|
||||||
|
if (task->handler->use_io_uring) {
|
||||||
|
if (prepare_next_recv(task) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
*action = sf_comm_action_break;
|
||||||
|
} else {
|
||||||
|
#endif
|
||||||
|
*action = sf_comm_action_continue;
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
}
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
return bytes;
|
return bytes;
|
||||||
|
|
@ -841,8 +1128,7 @@ static int calc_iops_and_remove_polling(struct fast_task_info *task)
|
||||||
ioevent_set_timeout(&task->thread_data->ev_puller,
|
ioevent_set_timeout(&task->thread_data->ev_puller,
|
||||||
task->thread_data->timeout_ms);
|
task->thread_data->timeout_ms);
|
||||||
}
|
}
|
||||||
result = sf_ioevent_add(task, (IOEventCallback)
|
result = sf_ioevent_add(task);
|
||||||
sf_client_sock_read, SF_CTX->net_buffer_cfg.network_timeout);
|
|
||||||
|
|
||||||
format_ip_address(task->client_ip, formatted_ip);
|
format_ip_address(task->client_ip, formatted_ip);
|
||||||
logInfo("file: "__FILE__", line: %d, client: %s:%u, "
|
logInfo("file: "__FILE__", line: %d, client: %s:%u, "
|
||||||
|
|
@ -904,7 +1190,7 @@ int sf_rdma_busy_polling_callback(struct nio_thread_data *thread_data)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int sf_client_sock_read(int sock, short event, void *arg)
|
static int sf_client_sock_read(int sock, const int event, void *arg)
|
||||||
{
|
{
|
||||||
int result;
|
int result;
|
||||||
int bytes;
|
int bytes;
|
||||||
|
|
@ -914,10 +1200,15 @@ int sf_client_sock_read(int sock, short event, void *arg)
|
||||||
|
|
||||||
task = (struct fast_task_info *)arg;
|
task = (struct fast_task_info *)arg;
|
||||||
if ((result=check_task(task, event, SF_NIO_STAGE_RECV)) != 0) {
|
if ((result=check_task(task, event, SF_NIO_STAGE_RECV)) != 0) {
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
if (task->handler->use_io_uring && event != IOEVENT_TIMEOUT) {
|
||||||
|
CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
return result >= 0 ? 0 : -1;
|
return result >= 0 ? 0 : -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (event & IOEVENT_TIMEOUT) {
|
if (event == IOEVENT_TIMEOUT) {
|
||||||
if (task->recv.ptr->offset == 0 && task->req_count > 0) {
|
if (task->recv.ptr->offset == 0 && task->req_count > 0) {
|
||||||
if (SF_CTX->callbacks.task_timeout != NULL) {
|
if (SF_CTX->callbacks.task_timeout != NULL) {
|
||||||
if (SF_CTX->callbacks.task_timeout(task) != 0) {
|
if (SF_CTX->callbacks.task_timeout(task) != 0) {
|
||||||
|
|
@ -929,7 +1220,8 @@ int sf_client_sock_read(int sock, short event, void *arg)
|
||||||
task->event.timer.expires = g_current_time +
|
task->event.timer.expires = g_current_time +
|
||||||
SF_CTX->net_buffer_cfg.network_timeout;
|
SF_CTX->net_buffer_cfg.network_timeout;
|
||||||
fast_timer_add(&task->thread_data->timer,
|
fast_timer_add(&task->thread_data->timer,
|
||||||
&task->event.timer);
|
&task->event.timer);
|
||||||
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
if (task->recv.ptr->length > 0) {
|
if (task->recv.ptr->length > 0) {
|
||||||
logWarning("file: "__FILE__", line: %d, "
|
logWarning("file: "__FILE__", line: %d, "
|
||||||
|
|
@ -946,10 +1238,14 @@ int sf_client_sock_read(int sock, short event, void *arg)
|
||||||
ioevent_add_to_deleted_list(task);
|
ioevent_add_to_deleted_list(task);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
if (task->handler->use_io_uring) {
|
||||||
|
CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
total_read = 0;
|
total_read = 0;
|
||||||
action = sf_comm_action_continue;
|
action = sf_comm_action_continue;
|
||||||
while (1) {
|
while (1) {
|
||||||
|
|
@ -996,23 +1292,63 @@ int sf_client_sock_read(int sock, short event, void *arg)
|
||||||
return total_read;
|
return total_read;
|
||||||
}
|
}
|
||||||
|
|
||||||
int sf_client_sock_write(int sock, short event, void *arg)
|
static int sock_write_done(struct fast_task_info *task,
|
||||||
|
const int length, const bool send_done)
|
||||||
|
{
|
||||||
|
int next_stage;
|
||||||
|
|
||||||
|
release_iovec_buffer(task);
|
||||||
|
task->recv.ptr->offset = 0;
|
||||||
|
task->recv.ptr->length = 0;
|
||||||
|
|
||||||
|
if (SF_CTX->callbacks.send_done == NULL || !send_done) {
|
||||||
|
task->nio_stages.current = SF_NIO_STAGE_RECV;
|
||||||
|
} else {
|
||||||
|
if (SF_CTX->callbacks.send_done(task,
|
||||||
|
length, &next_stage) != 0)
|
||||||
|
{
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (task->nio_stages.current != next_stage) {
|
||||||
|
task->nio_stages.current = next_stage;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (task->nio_stages.current == SF_NIO_STAGE_RECV) {
|
||||||
|
if (set_read_event(task) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int sf_client_sock_write(int sock, const int event, void *arg)
|
||||||
{
|
{
|
||||||
int result;
|
int result;
|
||||||
int bytes;
|
int bytes;
|
||||||
int total_write;
|
int total_write;
|
||||||
int length;
|
int length;
|
||||||
int next_stage;
|
|
||||||
SFCommAction action;
|
SFCommAction action;
|
||||||
bool send_done;
|
bool send_done;
|
||||||
struct fast_task_info *task;
|
struct fast_task_info *task;
|
||||||
|
|
||||||
task = (struct fast_task_info *)arg;
|
task = (struct fast_task_info *)arg;
|
||||||
if ((result=check_task(task, event, SF_NIO_STAGE_SEND)) != 0) {
|
if ((result=check_task(task, event, SF_NIO_STAGE_SEND)) != 0) {
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
if (task->handler->use_io_uring && event != IOEVENT_TIMEOUT) {
|
||||||
|
if (event == IOEVENT_NOTIFY || !(FC_URING_IS_SEND_ZC(task) &&
|
||||||
|
task->thread_data->ev_puller.send_zc_done_notify))
|
||||||
|
{
|
||||||
|
CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif
|
||||||
return result >= 0 ? 0 : -1;
|
return result >= 0 ? 0 : -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (event & IOEVENT_TIMEOUT) {
|
if (event == IOEVENT_TIMEOUT) {
|
||||||
logError("file: "__FILE__", line: %d, "
|
logError("file: "__FILE__", line: %d, "
|
||||||
"client ip: %s, send timeout. total length: %d, offset: %d, "
|
"client ip: %s, send timeout. total length: %d, offset: %d, "
|
||||||
"remain: %d", __LINE__, task->client_ip, task->send.ptr->length,
|
"remain: %d", __LINE__, task->client_ip, task->send.ptr->length,
|
||||||
|
|
@ -1023,6 +1359,42 @@ int sf_client_sock_write(int sock, short event, void *arg)
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
if (event == IOEVENT_NOTIFY) {
|
||||||
|
if (!FC_URING_IS_SEND_ZC(task)) {
|
||||||
|
logWarning("file: "__FILE__", line: %d, "
|
||||||
|
"unexpected io_uring notify!", __LINE__);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
FC_URING_OP_TYPE(task) = IORING_OP_NOP;
|
||||||
|
if (!task->canceled) {
|
||||||
|
if (task->send.ptr->offset >= task->send.ptr->length) {
|
||||||
|
length = task->send.ptr->length;
|
||||||
|
if (task->send.ptr != task->recv.ptr) { //double buffers
|
||||||
|
task->send.ptr->offset = 0;
|
||||||
|
task->send.ptr->length = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
result = sock_write_done(task, length, true);
|
||||||
|
} else {
|
||||||
|
result = prepare_next_send(task);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sf_release_task(task);
|
||||||
|
return result == 0 ? 0 : -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (task->handler->use_io_uring) {
|
||||||
|
if (!(FC_URING_IS_SEND_ZC(task) && task->thread_data->
|
||||||
|
ev_puller.send_zc_done_notify))
|
||||||
|
{
|
||||||
|
CLEAR_OP_TYPE_AND_RELEASE_TASK(task);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
total_write = 0;
|
total_write = 0;
|
||||||
length = task->send.ptr->length;
|
length = task->send.ptr->length;
|
||||||
action = sf_comm_action_continue;
|
action = sf_comm_action_continue;
|
||||||
|
|
@ -1038,27 +1410,9 @@ int sf_client_sock_write(int sock, short event, void *arg)
|
||||||
|
|
||||||
total_write += bytes;
|
total_write += bytes;
|
||||||
if (action == sf_comm_action_finish) {
|
if (action == sf_comm_action_finish) {
|
||||||
release_iovec_buffer(task);
|
if (sock_write_done(task, length, send_done) != 0) {
|
||||||
task->recv.ptr->offset = 0;
|
|
||||||
task->recv.ptr->length = 0;
|
|
||||||
if (set_read_event(task) != 0) {
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (SF_CTX->callbacks.send_done == NULL || !send_done) {
|
|
||||||
task->nio_stages.current = SF_NIO_STAGE_RECV;
|
|
||||||
} else {
|
|
||||||
if (SF_CTX->callbacks.send_done(task,
|
|
||||||
length, &next_stage) != 0)
|
|
||||||
{
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (task->nio_stages.current != next_stage) {
|
|
||||||
task->nio_stages.current = next_stage;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
break;
|
break;
|
||||||
} else if (action == sf_comm_action_break) {
|
} else if (action == sf_comm_action_break) {
|
||||||
break;
|
break;
|
||||||
|
|
|
||||||
19
src/sf_nio.h
19
src/sf_nio.h
|
|
@ -68,15 +68,6 @@ static inline void sf_set_connect_done_callback_ex(SFContext *sf_context,
|
||||||
sf_set_connect_done_callback_ex(&g_sf_context, done_callback)
|
sf_set_connect_done_callback_ex(&g_sf_context, done_callback)
|
||||||
|
|
||||||
|
|
||||||
static inline void sf_set_remove_from_ready_list_ex(
|
|
||||||
SFContext *sf_context, const bool enabled)
|
|
||||||
{
|
|
||||||
sf_context->remove_from_ready_list = enabled;
|
|
||||||
}
|
|
||||||
|
|
||||||
#define sf_set_remove_from_ready_list(enabled) \
|
|
||||||
sf_set_remove_from_ready_list_ex(&g_sf_context, enabled);
|
|
||||||
|
|
||||||
static inline TaskCleanUpCallback sf_get_task_cleanup_callback_ex(
|
static inline TaskCleanUpCallback sf_get_task_cleanup_callback_ex(
|
||||||
SFContext *sf_context)
|
SFContext *sf_context)
|
||||||
{
|
{
|
||||||
|
|
@ -99,10 +90,9 @@ static inline void sf_nio_reset_task_length(struct fast_task_info *task)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void sf_recv_notify_read(int sock, short event, void *arg);
|
void sf_socket_close_connection(struct fast_task_info *task);
|
||||||
|
void sf_recv_notify_read(int sock, const int event, void *arg);
|
||||||
int sf_send_add_event(struct fast_task_info *task);
|
int sf_send_add_event(struct fast_task_info *task);
|
||||||
int sf_client_sock_write(int sock, short event, void *arg);
|
|
||||||
int sf_client_sock_read(int sock, short event, void *arg);
|
|
||||||
|
|
||||||
void sf_task_finish_clean_up(struct fast_task_info *task);
|
void sf_task_finish_clean_up(struct fast_task_info *task);
|
||||||
|
|
||||||
|
|
@ -158,11 +148,6 @@ static inline int sf_nio_forward_request(struct fast_task_info *task,
|
||||||
return sf_nio_notify(task, SF_NIO_STAGE_FORWARDED);
|
return sf_nio_notify(task, SF_NIO_STAGE_FORWARDED);
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline bool sf_client_sock_in_read_stage(struct fast_task_info *task)
|
|
||||||
{
|
|
||||||
return (task->event.callback == (IOEventCallback)sf_client_sock_read);
|
|
||||||
}
|
|
||||||
|
|
||||||
static inline void sf_nio_add_to_deleted_list(struct nio_thread_data
|
static inline void sf_nio_add_to_deleted_list(struct nio_thread_data
|
||||||
*thread_data, struct fast_task_info *task)
|
*thread_data, struct fast_task_info *task)
|
||||||
{
|
{
|
||||||
|
|
|
||||||
118
src/sf_service.c
118
src/sf_service.c
|
|
@ -34,8 +34,8 @@
|
||||||
#include "fastcommon/ioevent_loop.h"
|
#include "fastcommon/ioevent_loop.h"
|
||||||
#include "fastcommon/fc_memory.h"
|
#include "fastcommon/fc_memory.h"
|
||||||
#include "sf_nio.h"
|
#include "sf_nio.h"
|
||||||
|
#include "sf_proto.h"
|
||||||
#include "sf_util.h"
|
#include "sf_util.h"
|
||||||
#include "sf_global.h"
|
|
||||||
#include "sf_service.h"
|
#include "sf_service.h"
|
||||||
|
|
||||||
#if defined(OS_LINUX)
|
#if defined(OS_LINUX)
|
||||||
|
|
@ -61,12 +61,14 @@ struct worker_thread_context {
|
||||||
static void *worker_thread_entrance(void *arg);
|
static void *worker_thread_entrance(void *arg);
|
||||||
|
|
||||||
static int sf_init_free_queue(SFContext *sf_context, const char *name,
|
static int sf_init_free_queue(SFContext *sf_context, const char *name,
|
||||||
const bool double_buffers, const int task_padding_size,
|
const bool double_buffers, const bool need_shrink_task_buffer,
|
||||||
const int task_arg_size, TaskInitCallback init_callback,
|
const int task_padding_size, const int task_arg_size,
|
||||||
void *init_arg)
|
TaskInitCallback init_callback, void *init_arg)
|
||||||
{
|
{
|
||||||
int result;
|
int result;
|
||||||
|
int buffer_size;
|
||||||
int m;
|
int m;
|
||||||
|
int max_m;
|
||||||
int alloc_conn_once;
|
int alloc_conn_once;
|
||||||
|
|
||||||
if ((result=set_rand_seed()) != 0) {
|
if ((result=set_rand_seed()) != 0) {
|
||||||
|
|
@ -75,17 +77,26 @@ static int sf_init_free_queue(SFContext *sf_context, const char *name,
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
m = sf_context->net_buffer_cfg.min_buff_size / (64 * 1024);
|
if (strcmp(name, "cluster") == 0 || strcmp(name, "replica") == 0) {
|
||||||
|
buffer_size = FC_MAX(4 * 1024 * 1024, sf_context->
|
||||||
|
net_buffer_cfg.max_buff_size);
|
||||||
|
max_m = 64;
|
||||||
|
} else {
|
||||||
|
buffer_size = sf_context->net_buffer_cfg.min_buff_size;
|
||||||
|
max_m = 16;
|
||||||
|
}
|
||||||
|
m = buffer_size / (64 * 1024);
|
||||||
if (m == 0) {
|
if (m == 0) {
|
||||||
m = 1;
|
m = 1;
|
||||||
} else if (m > 16) {
|
} else if (m > max_m) {
|
||||||
m = 16;
|
m = max_m;
|
||||||
}
|
}
|
||||||
alloc_conn_once = 256 / m;
|
alloc_conn_once = 256 / m;
|
||||||
|
|
||||||
return free_queue_init_ex2(&sf_context->free_queue, name, double_buffers,
|
return free_queue_init_ex2(&sf_context->free_queue, name, double_buffers,
|
||||||
sf_context->net_buffer_cfg.max_connections, alloc_conn_once,
|
need_shrink_task_buffer, sf_context->net_buffer_cfg.max_connections,
|
||||||
sf_context->net_buffer_cfg.min_buff_size, sf_context->
|
alloc_conn_once, sf_context->net_buffer_cfg.min_buff_size,
|
||||||
net_buffer_cfg.max_buff_size, task_padding_size,
|
sf_context->net_buffer_cfg.max_buff_size, task_padding_size,
|
||||||
task_arg_size, init_callback, init_arg);
|
task_arg_size, init_callback, init_arg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -101,12 +112,14 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
|
||||||
sf_recv_timeout_callback timeout_callback, const int net_timeout_ms,
|
sf_recv_timeout_callback timeout_callback, const int net_timeout_ms,
|
||||||
const int proto_header_size, const int task_padding_size,
|
const int proto_header_size, const int task_padding_size,
|
||||||
const int task_arg_size, const bool double_buffers,
|
const int task_arg_size, const bool double_buffers,
|
||||||
const bool explicit_post_recv, TaskInitCallback init_callback,
|
const bool need_shrink_task_buffer, const bool explicit_post_recv,
|
||||||
void *init_arg, sf_release_buffer_callback release_buffer_callback)
|
TaskInitCallback init_callback, void *init_arg,
|
||||||
|
sf_release_buffer_callback release_buffer_callback)
|
||||||
{
|
{
|
||||||
int result;
|
int result;
|
||||||
int bytes;
|
int bytes;
|
||||||
int extra_events;
|
int extra_events;
|
||||||
|
int max_entries;
|
||||||
int i;
|
int i;
|
||||||
struct worker_thread_context *thread_contexts;
|
struct worker_thread_context *thread_contexts;
|
||||||
struct worker_thread_context *thread_ctx;
|
struct worker_thread_context *thread_ctx;
|
||||||
|
|
@ -132,8 +145,8 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((result=sf_init_free_queue(sf_context, name, double_buffers,
|
if ((result=sf_init_free_queue(sf_context, name, double_buffers,
|
||||||
task_padding_size, task_arg_size, init_callback,
|
need_shrink_task_buffer, task_padding_size,
|
||||||
init_arg)) != 0)
|
task_arg_size, init_callback, init_arg)) != 0)
|
||||||
{
|
{
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
@ -161,7 +174,11 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
|
||||||
|
|
||||||
if (SF_G_EPOLL_EDGE_TRIGGER) {
|
if (SF_G_EPOLL_EDGE_TRIGGER) {
|
||||||
#ifdef OS_LINUX
|
#ifdef OS_LINUX
|
||||||
|
#if IOEVENT_USE_EPOLL
|
||||||
extra_events = EPOLLET;
|
extra_events = EPOLLET;
|
||||||
|
#else
|
||||||
|
extra_events = 0;
|
||||||
|
#endif
|
||||||
#elif defined(OS_FREEBSD)
|
#elif defined(OS_FREEBSD)
|
||||||
extra_events = EV_CLEAR;
|
extra_events = EV_CLEAR;
|
||||||
#else
|
#else
|
||||||
|
|
@ -171,6 +188,36 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
|
||||||
extra_events = 0;
|
extra_events = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
max_entries = (sf_context->net_buffer_cfg.max_connections +
|
||||||
|
sf_context->work_threads - 1) / sf_context->work_threads;
|
||||||
|
if (strcmp(sf_context->name, "service") == 0) {
|
||||||
|
if (max_entries < 4 * 1024) {
|
||||||
|
max_entries = max_entries * 2;
|
||||||
|
} else if (max_entries < 8 * 1024) {
|
||||||
|
max_entries = (max_entries * 3) / 2;
|
||||||
|
} else if (max_entries < 16 * 1024) {
|
||||||
|
max_entries = (max_entries * 5) / 4;
|
||||||
|
} else if (max_entries < 32 * 1024) {
|
||||||
|
max_entries = (max_entries * 6) / 5;
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
if (max_entries > 32 * 1024) {
|
||||||
|
max_entries = 32 * 1024;
|
||||||
|
}
|
||||||
|
#else
|
||||||
|
} else if (max_entries < 64 * 1024) {
|
||||||
|
max_entries = (max_entries * 11) / 10;
|
||||||
|
} else if (max_entries < 128 * 1024) {
|
||||||
|
max_entries = (max_entries * 21) / 20;
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (max_entries < 1024) {
|
||||||
|
max_entries += 8;
|
||||||
|
} else {
|
||||||
|
max_entries = 1024;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
g_current_time = time(NULL);
|
g_current_time = time(NULL);
|
||||||
sf_context->thread_count = 0;
|
sf_context->thread_count = 0;
|
||||||
data_end = sf_context->thread_data + sf_context->work_threads;
|
data_end = sf_context->thread_data + sf_context->work_threads;
|
||||||
|
|
@ -195,18 +242,37 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
|
||||||
thread_data->arg = NULL;
|
thread_data->arg = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ioevent_init(&thread_data->ev_puller, 2 + sf_context->
|
if ((result=ioevent_init(&thread_data->ev_puller, sf_context->name,
|
||||||
net_buffer_cfg.max_connections, net_timeout_ms,
|
max_entries, net_timeout_ms, extra_events)) != 0)
|
||||||
extra_events) != 0)
|
|
||||||
{
|
{
|
||||||
result = errno != 0 ? errno : ENOMEM;
|
char prompt[256];
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
if (result == EPERM) {
|
||||||
|
strcpy(prompt, " make sure kernel.io_uring_disabled set to 0");
|
||||||
|
} else if (result == EINVAL) {
|
||||||
|
sprintf(prompt, " maybe max_connections: %d is too large"
|
||||||
|
" or [%s]'s work_threads: %d is too small",
|
||||||
|
sf_context->net_buffer_cfg.max_connections,
|
||||||
|
sf_context->name, sf_context->work_threads);
|
||||||
|
} else {
|
||||||
|
*prompt = '\0';
|
||||||
|
}
|
||||||
|
#else
|
||||||
|
*prompt = '\0';
|
||||||
|
#endif
|
||||||
|
|
||||||
logError("file: "__FILE__", line: %d, "
|
logError("file: "__FILE__", line: %d, "
|
||||||
"ioevent_init fail, "
|
"ioevent_init fail, errno: %d, error info: %s.%s"
|
||||||
"errno: %d, error info: %s",
|
, __LINE__, result, strerror(result), prompt);
|
||||||
__LINE__, result, strerror(result));
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
if (send_done_callback != NULL) {
|
||||||
|
ioevent_set_send_zc_done_notify(&thread_data->ev_puller, true);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
result = fast_timer_init(&thread_data->timer, 2 * sf_context->
|
result = fast_timer_init(&thread_data->timer, 2 * sf_context->
|
||||||
net_buffer_cfg.network_timeout, g_current_time);
|
net_buffer_cfg.network_timeout, g_current_time);
|
||||||
if (result != 0) {
|
if (result != 0) {
|
||||||
|
|
@ -487,7 +553,9 @@ struct fast_task_info *sf_socket_accept_connection(SFListener *listener)
|
||||||
}
|
}
|
||||||
FC_SET_CLOEXEC(incomesock);
|
FC_SET_CLOEXEC(incomesock);
|
||||||
|
|
||||||
if ((task=sf_alloc_init_task(listener->handler, incomesock)) == NULL) {
|
if ((task=sf_alloc_init_server_task(listener->handler,
|
||||||
|
incomesock)) == NULL)
|
||||||
|
{
|
||||||
close(incomesock);
|
close(incomesock);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
@ -498,12 +566,6 @@ struct fast_task_info *sf_socket_accept_connection(SFListener *listener)
|
||||||
return task;
|
return task;
|
||||||
}
|
}
|
||||||
|
|
||||||
void sf_socket_close_connection(struct fast_task_info *task)
|
|
||||||
{
|
|
||||||
close(task->event.fd);
|
|
||||||
task->event.fd = -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
void sf_socket_close_ex(SFContext *sf_context)
|
void sf_socket_close_ex(SFContext *sf_context)
|
||||||
{
|
{
|
||||||
int i;
|
int i;
|
||||||
|
|
|
||||||
|
|
@ -25,6 +25,8 @@
|
||||||
#include "fastcommon/ioevent.h"
|
#include "fastcommon/ioevent.h"
|
||||||
#include "fastcommon/fast_task_queue.h"
|
#include "fastcommon/fast_task_queue.h"
|
||||||
#include "sf_types.h"
|
#include "sf_types.h"
|
||||||
|
#include "sf_proto.h"
|
||||||
|
#include "sf_global.h"
|
||||||
|
|
||||||
typedef void* (*sf_alloc_thread_extra_data_callback)(const int thread_index);
|
typedef void* (*sf_alloc_thread_extra_data_callback)(const int thread_index);
|
||||||
typedef void (*sf_sig_quit_handler)(int sig);
|
typedef void (*sf_sig_quit_handler)(int sig);
|
||||||
|
|
@ -45,8 +47,9 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
|
||||||
sf_recv_timeout_callback timeout_callback, const int net_timeout_ms,
|
sf_recv_timeout_callback timeout_callback, const int net_timeout_ms,
|
||||||
const int proto_header_size, const int task_padding_size,
|
const int proto_header_size, const int task_padding_size,
|
||||||
const int task_arg_size, const bool double_buffers,
|
const int task_arg_size, const bool double_buffers,
|
||||||
const bool explicit_post_recv, TaskInitCallback init_callback,
|
const bool need_shrink_task_buffer, const bool explicit_post_recv,
|
||||||
void *init_arg, sf_release_buffer_callback release_buffer_callback);
|
TaskInitCallback init_callback, void *init_arg,
|
||||||
|
sf_release_buffer_callback release_buffer_callback);
|
||||||
|
|
||||||
#define sf_service_init_ex(sf_context, name, alloc_thread_extra_data_callback,\
|
#define sf_service_init_ex(sf_context, name, alloc_thread_extra_data_callback,\
|
||||||
thread_loop_callback, accept_done_callback, set_body_length_func, \
|
thread_loop_callback, accept_done_callback, set_body_length_func, \
|
||||||
|
|
@ -56,7 +59,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
|
||||||
thread_loop_callback, accept_done_callback, set_body_length_func, \
|
thread_loop_callback, accept_done_callback, set_body_length_func, \
|
||||||
NULL, send_done_callback, deal_func, task_cleanup_func, \
|
NULL, send_done_callback, deal_func, task_cleanup_func, \
|
||||||
timeout_callback, net_timeout_ms, proto_header_size, \
|
timeout_callback, net_timeout_ms, proto_header_size, \
|
||||||
0, task_arg_size, false, false, NULL, NULL, NULL)
|
0, task_arg_size, false, true, false, NULL, NULL, NULL)
|
||||||
|
|
||||||
#define sf_service_init(name, alloc_thread_extra_data_callback, \
|
#define sf_service_init(name, alloc_thread_extra_data_callback, \
|
||||||
thread_loop_callback, accept_done_callback, set_body_length_func, \
|
thread_loop_callback, accept_done_callback, set_body_length_func, \
|
||||||
|
|
@ -65,8 +68,8 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
|
||||||
sf_service_init_ex2(&g_sf_context, name, alloc_thread_extra_data_callback, \
|
sf_service_init_ex2(&g_sf_context, name, alloc_thread_extra_data_callback, \
|
||||||
thread_loop_callback, accept_done_callback, set_body_length_func, NULL,\
|
thread_loop_callback, accept_done_callback, set_body_length_func, NULL,\
|
||||||
send_done_callback, deal_func, task_cleanup_func, timeout_callback, \
|
send_done_callback, deal_func, task_cleanup_func, timeout_callback, \
|
||||||
net_timeout_ms, proto_header_size, 0, task_arg_size, false, false, \
|
net_timeout_ms, proto_header_size, 0, task_arg_size, false, true, \
|
||||||
NULL, NULL, NULL)
|
false, NULL, NULL, NULL)
|
||||||
|
|
||||||
int sf_service_destroy_ex(SFContext *sf_context);
|
int sf_service_destroy_ex(SFContext *sf_context);
|
||||||
|
|
||||||
|
|
@ -108,8 +111,6 @@ int sf_socket_create_server(SFListener *listener,
|
||||||
void sf_socket_close_server(SFListener *listener);
|
void sf_socket_close_server(SFListener *listener);
|
||||||
struct fast_task_info *sf_socket_accept_connection(SFListener *listener);
|
struct fast_task_info *sf_socket_accept_connection(SFListener *listener);
|
||||||
|
|
||||||
void sf_socket_close_connection(struct fast_task_info *task);
|
|
||||||
|
|
||||||
int sf_socket_server_ex(SFContext *sf_context);
|
int sf_socket_server_ex(SFContext *sf_context);
|
||||||
#define sf_socket_server() sf_socket_server_ex(&g_sf_context)
|
#define sf_socket_server() sf_socket_server_ex(&g_sf_context)
|
||||||
|
|
||||||
|
|
@ -163,6 +164,11 @@ static inline struct fast_task_info *sf_alloc_init_task_ex(
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (task->shrinked) {
|
||||||
|
task->shrinked = false;
|
||||||
|
sf_proto_init_task_magic(task);
|
||||||
|
}
|
||||||
|
|
||||||
__sync_add_and_fetch(&task->reffer_count, reffer_count);
|
__sync_add_and_fetch(&task->reffer_count, reffer_count);
|
||||||
__sync_bool_compare_and_swap(&task->canceled, 1, 0);
|
__sync_bool_compare_and_swap(&task->canceled, 1, 0);
|
||||||
task->handler = handler;
|
task->handler = handler;
|
||||||
|
|
@ -170,12 +176,42 @@ static inline struct fast_task_info *sf_alloc_init_task_ex(
|
||||||
return task;
|
return task;
|
||||||
}
|
}
|
||||||
|
|
||||||
#define sf_hold_task_ex(task, inc_count) __sync_add_and_fetch( \
|
#define sf_hold_task_ex(task, inc_count) fc_hold_task_ex(task, inc_count)
|
||||||
&task->reffer_count, inc_count)
|
#define sf_hold_task(task) fc_hold_task(task)
|
||||||
#define sf_hold_task(task) sf_hold_task_ex(task, 1)
|
|
||||||
|
|
||||||
#define sf_alloc_init_task(handler, fd) sf_alloc_init_task_ex(handler, fd, 1)
|
#define sf_alloc_init_task(handler, fd) sf_alloc_init_task_ex(handler, fd, 1)
|
||||||
|
|
||||||
|
static inline struct fast_task_info *sf_alloc_init_server_task(
|
||||||
|
SFNetworkHandler *handler, const int fd)
|
||||||
|
{
|
||||||
|
const int reffer_count = 1;
|
||||||
|
struct fast_task_info *task;
|
||||||
|
|
||||||
|
if ((task=sf_alloc_init_task_ex(handler, fd, reffer_count)) != NULL) {
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
FC_URING_IS_CLIENT(task) = false;
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
return task;
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline struct fast_task_info *sf_alloc_init_client_task(
|
||||||
|
SFNetworkHandler *handler)
|
||||||
|
{
|
||||||
|
const int fd = -1;
|
||||||
|
const int reffer_count = 1;
|
||||||
|
struct fast_task_info *task;
|
||||||
|
|
||||||
|
if ((task=sf_alloc_init_task_ex(handler, fd, reffer_count)) != NULL) {
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
FC_URING_IS_CLIENT(task) = true;
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
return task;
|
||||||
|
}
|
||||||
|
|
||||||
static inline void sf_release_task(struct fast_task_info *task)
|
static inline void sf_release_task(struct fast_task_info *task)
|
||||||
{
|
{
|
||||||
if (__sync_sub_and_fetch(&task->reffer_count, 1) == 0) {
|
if (__sync_sub_and_fetch(&task->reffer_count, 1) == 0) {
|
||||||
|
|
@ -187,6 +223,15 @@ static inline void sf_release_task(struct fast_task_info *task)
|
||||||
"used: %d, freed: %d", __LINE__, task,
|
"used: %d, freed: %d", __LINE__, task,
|
||||||
alloc_count, alloc_count - free_count, free_count);
|
alloc_count, alloc_count - free_count, free_count);
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
if (task->handler->use_io_uring) {
|
||||||
|
task->handler->close_connection(task);
|
||||||
|
__sync_fetch_and_sub(&g_sf_global_vars.
|
||||||
|
connection_stat.current_count, 1);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
free_queue_push(task);
|
free_queue_push(task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -262,6 +307,19 @@ static inline SFNetworkHandler *sf_get_rdma_network_handler3(
|
||||||
return sf_get_rdma_network_handler(sf_context3);
|
return sf_get_rdma_network_handler(sf_context3);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static inline bool sf_get_double_buffers_flag(FCServerGroupInfo *server_group)
|
||||||
|
{
|
||||||
|
if (server_group->comm_type == fc_comm_type_sock) {
|
||||||
|
#if IOEVENT_USE_URING
|
||||||
|
return true;
|
||||||
|
#else
|
||||||
|
return false;
|
||||||
|
#endif
|
||||||
|
} else { //RDMA
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
||||||
|
|
@ -115,9 +115,12 @@ typedef struct sf_listener {
|
||||||
|
|
||||||
struct sf_context;
|
struct sf_context;
|
||||||
struct sf_address_family_handler;
|
struct sf_address_family_handler;
|
||||||
|
|
||||||
typedef struct sf_network_handler {
|
typedef struct sf_network_handler {
|
||||||
bool enabled;
|
bool enabled;
|
||||||
bool explicit_post_recv;
|
bool explicit_post_recv;
|
||||||
|
bool use_io_uring; //since v1.2.9
|
||||||
|
bool use_send_zc; //since v1.2.9
|
||||||
FCCommunicationType comm_type;
|
FCCommunicationType comm_type;
|
||||||
struct sf_address_family_handler *fh;
|
struct sf_address_family_handler *fh;
|
||||||
struct ibv_pd *pd;
|
struct ibv_pd *pd;
|
||||||
|
|
@ -179,7 +182,6 @@ typedef struct sf_context {
|
||||||
struct nio_thread_data *thread_data;
|
struct nio_thread_data *thread_data;
|
||||||
volatile int thread_count;
|
volatile int thread_count;
|
||||||
|
|
||||||
//int rdma_port_offset;
|
|
||||||
bool is_client; //since v1.2.5
|
bool is_client; //since v1.2.5
|
||||||
SFAddressFamily address_family;
|
SFAddressFamily address_family;
|
||||||
SFAddressFamilyHandler handlers[SF_ADDRESS_FAMILY_COUNT];
|
SFAddressFamilyHandler handlers[SF_ADDRESS_FAMILY_COUNT];
|
||||||
|
|
@ -190,7 +192,6 @@ typedef struct sf_context {
|
||||||
int work_threads;
|
int work_threads;
|
||||||
|
|
||||||
int header_size;
|
int header_size;
|
||||||
bool remove_from_ready_list;
|
|
||||||
bool realloc_task_buffer;
|
bool realloc_task_buffer;
|
||||||
bool connect_need_log; //for client connect
|
bool connect_need_log; //for client connect
|
||||||
FCSmartPollingConfig smart_polling;
|
FCSmartPollingConfig smart_polling;
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue