net buffer config for each sf_context instance

use_iouring
YuQing 2024-02-20 09:53:52 +08:00
parent d5a9f40a66
commit 78d65ba2c6
10 changed files with 131 additions and 107 deletions

View File

@ -50,8 +50,6 @@ static IdempotencyReceiptGlobalVars receipt_global_vars;
static int receipt_init_task(struct fast_task_info *task) static int receipt_init_task(struct fast_task_info *task)
{ {
task->connect_timeout = SF_G_CONNECT_TIMEOUT; //for client side
task->network_timeout = SF_G_NETWORK_TIMEOUT;
if (RDMA_INIT_CONNECTION != NULL) { if (RDMA_INIT_CONNECTION != NULL) {
return RDMA_INIT_CONNECTION(task, RDMA_PD); return RDMA_INIT_CONNECTION(task, RDMA_PD);
} else { } else {

View File

@ -80,13 +80,23 @@ extern "C" {
return 0; return 0;
} }
static inline void sf_buffered_writer_destroy(SFBufferedWriter *writer) static inline int sf_buffered_writer_destroy(SFBufferedWriter *writer)
{ {
int result;
if (writer->fd >= 0) { if (writer->fd >= 0) {
if (fsync(writer->fd) != 0) {
result = errno != 0 ? errno : EIO;
logError("file: "__FILE__", line: %d, "
"fsync to file %s fail, errno: %d, error info: %s",
__LINE__, writer->filename, result, STRERROR(result));
return result;
}
close(writer->fd); close(writer->fd);
writer->fd = -1; writer->fd = -1;
} }
sf_binlog_buffer_destroy(&writer->buffer); sf_binlog_buffer_destroy(&writer->buffer);
return 0;
} }
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -38,7 +38,7 @@ int sf_connect_to_server(const char *ip_addr, const int port, int *sock)
if(*sock < 0) { if(*sock < 0) {
return errno != 0 ? errno : ENOMEM; return errno != 0 ? errno : ENOMEM;
} }
tcpsetserveropt(*sock, g_sf_global_vars.network_timeout); tcpsetserveropt(*sock, g_sf_global_vars.net_buffer_cfg.network_timeout);
if ((result=tcpsetnonblockopt(*sock)) != 0) { if ((result=tcpsetnonblockopt(*sock)) != 0) {
close(*sock); close(*sock);
@ -47,8 +47,8 @@ int sf_connect_to_server(const char *ip_addr, const int port, int *sock)
} }
FC_SET_CLOEXEC(*sock); FC_SET_CLOEXEC(*sock);
if ((result=connectserverbyip_nb(*sock, ip_addr, port, if ((result=connectserverbyip_nb(*sock, ip_addr, port, g_sf_global_vars.
g_sf_global_vars.connect_timeout)) != 0) net_buffer_cfg.connect_timeout)) != 0)
{ {
close(*sock); close(*sock);
*sock = -1; *sock = -1;

View File

@ -37,11 +37,11 @@
#include "sf_global.h" #include "sf_global.h"
SFGlobalVariables g_sf_global_vars = { SFGlobalVariables g_sf_global_vars = {
SF_DEFAULT_CONNECT_TIMEOUT, SF_DEFAULT_NETWORK_TIMEOUT,
{{'/', 't', 'm', 'p', '\0'}, false}, {{'/', 't', 'm', 'p', '\0'}, false},
true, true, false, DEFAULT_MAX_CONNECTONS, true, true, false, { SF_DEFAULT_CONNECT_TIMEOUT,
SF_DEFAULT_NETWORK_TIMEOUT, DEFAULT_MAX_CONNECTONS,
SF_DEF_MAX_PACKAGE_SIZE, SF_DEF_MIN_BUFF_SIZE, SF_DEF_MAX_PACKAGE_SIZE, SF_DEF_MIN_BUFF_SIZE,
SF_DEF_MAX_BUFF_SIZE, 0, SF_DEF_THREAD_STACK_SIZE, 0, SF_DEF_MAX_BUFF_SIZE}, 0, SF_DEF_THREAD_STACK_SIZE, 0,
{false, 0, 0, {'\0'}, {'\0'}}, {false, 0, 0, {'\0'}, {'\0'}},
{SF_DEF_SYNC_LOG_BUFF_INTERVAL, false}, {SF_DEF_SYNC_LOG_BUFF_INTERVAL, false},
{0, 0}, NULL, {NULL, 0} {0, 0}, NULL, {NULL, 0}
@ -50,54 +50,48 @@ SFGlobalVariables g_sf_global_vars = {
SFContext g_sf_context = {{'\0'}, NULL, 0, sf_address_family_auto, SFContext g_sf_context = {{'\0'}, NULL, 0, sf_address_family_auto,
{{AF_UNSPEC, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}}, {{AF_UNSPEC, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}},
{AF_UNSPEC, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}}}, {AF_UNSPEC, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}}},
1, DEFAULT_WORK_THREADS, 0, true, true, true, {false, 0, 0}, {DEFAULT_MAX_CONNECTONS, SF_DEF_MAX_PACKAGE_SIZE, SF_DEF_MIN_BUFF_SIZE,
{sf_task_finish_clean_up} SF_DEF_MAX_BUFF_SIZE}, 1, DEFAULT_WORK_THREADS, 0, true, true, true,
{false, 0, 0}, {sf_task_finish_clean_up}
}; };
static int load_network_parameters(IniFullContext *ini_ctx, static int load_network_parameters(IniFullContext *ini_ctx,
const char *max_pkg_size_item_nm, const int fixed_buff_size, const char *max_pkg_size_item_nm, const int fixed_buff_size,
const int task_buffer_extra_size) const int task_buffer_extra_size, SFNetBufferConfig *net_buffer_cfg)
{ {
int result;
int padding_buff_size; int padding_buff_size;
char *pMinBuffSize; char *pMinBuffSize;
char *pMaxBuffSize; char *pMaxBuffSize;
g_sf_global_vars.connect_timeout = iniGetIntValueEx(ini_ctx-> net_buffer_cfg->connect_timeout = iniGetIntValueEx(ini_ctx->
section_name, "connect_timeout", ini_ctx->context, section_name, "connect_timeout", ini_ctx->context,
SF_DEFAULT_CONNECT_TIMEOUT, true); SF_DEFAULT_CONNECT_TIMEOUT, true);
if (g_sf_global_vars.connect_timeout <= 0) { if (net_buffer_cfg->connect_timeout <= 0) {
g_sf_global_vars.connect_timeout = SF_DEFAULT_CONNECT_TIMEOUT; net_buffer_cfg->connect_timeout = SF_DEFAULT_CONNECT_TIMEOUT;
} }
g_sf_global_vars.network_timeout = iniGetIntValueEx(ini_ctx-> net_buffer_cfg->network_timeout = iniGetIntValueEx(ini_ctx->
section_name, "network_timeout", ini_ctx->context, section_name, "network_timeout", ini_ctx->context,
SF_DEFAULT_NETWORK_TIMEOUT, true); SF_DEFAULT_NETWORK_TIMEOUT, true);
if (g_sf_global_vars.network_timeout <= 0) { if (net_buffer_cfg->network_timeout <= 0) {
g_sf_global_vars.network_timeout = SF_DEFAULT_NETWORK_TIMEOUT; net_buffer_cfg->network_timeout = SF_DEFAULT_NETWORK_TIMEOUT;
} }
g_sf_global_vars.max_connections = iniGetIntValueEx(ini_ctx->section_name, net_buffer_cfg->max_connections = iniGetIntValueEx(ini_ctx->section_name,
"max_connections", ini_ctx->context, DEFAULT_MAX_CONNECTONS, true); "max_connections", ini_ctx->context, DEFAULT_MAX_CONNECTONS, true);
if (g_sf_global_vars.max_connections <= 0) { if (net_buffer_cfg->max_connections <= 0) {
g_sf_global_vars.max_connections = DEFAULT_MAX_CONNECTONS; net_buffer_cfg->max_connections = DEFAULT_MAX_CONNECTONS;
}
if ((result=set_rlimit(RLIMIT_NOFILE, g_sf_global_vars.
max_connections)) != 0)
{
return result;
} }
if (fixed_buff_size > 0) { if (fixed_buff_size > 0) {
padding_buff_size = fixed_buff_size + task_buffer_extra_size; padding_buff_size = fixed_buff_size + task_buffer_extra_size;
g_sf_global_vars.min_buff_size = padding_buff_size; net_buffer_cfg->min_buff_size = padding_buff_size;
g_sf_global_vars.max_buff_size = padding_buff_size; net_buffer_cfg->max_buff_size = padding_buff_size;
g_sf_global_vars.max_pkg_size = padding_buff_size; net_buffer_cfg->max_pkg_size = padding_buff_size;
return 0; return 0;
} }
g_sf_global_vars.max_pkg_size = iniGetByteCorrectValueEx(ini_ctx, net_buffer_cfg->max_pkg_size = iniGetByteCorrectValueEx(ini_ctx,
max_pkg_size_item_nm, SF_DEF_MAX_PACKAGE_SIZE, 1, 8192, max_pkg_size_item_nm, SF_DEF_MAX_PACKAGE_SIZE, 1, 8192,
SF_MAX_NETWORK_BUFF_SIZE, true); SF_MAX_NETWORK_BUFF_SIZE, true);
pMinBuffSize = iniGetStrValueEx(ini_ctx->section_name, pMinBuffSize = iniGetStrValueEx(ini_ctx->section_name,
@ -105,34 +99,34 @@ static int load_network_parameters(IniFullContext *ini_ctx,
pMaxBuffSize = iniGetStrValueEx(ini_ctx->section_name, pMaxBuffSize = iniGetStrValueEx(ini_ctx->section_name,
"max_buff_size", ini_ctx->context, true); "max_buff_size", ini_ctx->context, true);
if (pMinBuffSize == NULL || pMaxBuffSize == NULL) { if (pMinBuffSize == NULL || pMaxBuffSize == NULL) {
g_sf_global_vars.min_buff_size = g_sf_global_vars.max_pkg_size; net_buffer_cfg->min_buff_size = net_buffer_cfg->max_pkg_size;
g_sf_global_vars.max_buff_size = g_sf_global_vars.max_pkg_size; net_buffer_cfg->max_buff_size = net_buffer_cfg->max_pkg_size;
} else { } else {
g_sf_global_vars.min_buff_size = iniGetByteCorrectValueEx(ini_ctx, net_buffer_cfg->min_buff_size = iniGetByteCorrectValueEx(ini_ctx,
"min_buff_size", SF_DEF_MIN_BUFF_SIZE, 1, 4096, "min_buff_size", SF_DEF_MIN_BUFF_SIZE, 1, 4096,
SF_MAX_NETWORK_BUFF_SIZE, true); SF_MAX_NETWORK_BUFF_SIZE, true);
g_sf_global_vars.max_buff_size = iniGetByteCorrectValueEx(ini_ctx, net_buffer_cfg->max_buff_size = iniGetByteCorrectValueEx(ini_ctx,
"max_buff_size", SF_DEF_MAX_BUFF_SIZE, 1, 8192, "max_buff_size", SF_DEF_MAX_BUFF_SIZE, 1, 8192,
SF_MAX_NETWORK_BUFF_SIZE, true); SF_MAX_NETWORK_BUFF_SIZE, true);
if (g_sf_global_vars.max_buff_size < g_sf_global_vars.max_pkg_size) { if (net_buffer_cfg->max_buff_size < net_buffer_cfg->max_pkg_size) {
g_sf_global_vars.max_buff_size = g_sf_global_vars.max_pkg_size; net_buffer_cfg->max_buff_size = net_buffer_cfg->max_pkg_size;
} }
if (g_sf_global_vars.max_buff_size < g_sf_global_vars.min_buff_size) { if (net_buffer_cfg->max_buff_size < net_buffer_cfg->min_buff_size) {
logWarning("file: "__FILE__", line: %d, " logWarning("file: "__FILE__", line: %d, "
"max_buff_size: %d < min_buff_size: %d, " "max_buff_size: %d < min_buff_size: %d, "
"set max_buff_size to min_buff_size", __LINE__, "set max_buff_size to min_buff_size", __LINE__,
g_sf_global_vars.max_buff_size, net_buffer_cfg->max_buff_size,
g_sf_global_vars.min_buff_size); net_buffer_cfg->min_buff_size);
g_sf_global_vars.max_buff_size = g_sf_global_vars.min_buff_size; net_buffer_cfg->max_buff_size = net_buffer_cfg->min_buff_size;
} }
} }
if (task_buffer_extra_size > 0) { if (task_buffer_extra_size > 0) {
g_sf_global_vars.min_buff_size += task_buffer_extra_size; net_buffer_cfg->min_buff_size += task_buffer_extra_size;
g_sf_global_vars.max_buff_size += task_buffer_extra_size; net_buffer_cfg->max_buff_size += task_buffer_extra_size;
if (g_sf_global_vars.max_pkg_size < g_sf_global_vars.max_buff_size) { if (net_buffer_cfg->max_pkg_size < net_buffer_cfg->max_buff_size) {
g_sf_global_vars.max_pkg_size = g_sf_global_vars.max_buff_size; net_buffer_cfg->max_pkg_size = net_buffer_cfg->max_buff_size;
} }
} }
@ -305,7 +299,14 @@ int sf_load_global_config_ex(const char *log_filename_prefix,
tcp_set_quick_ack(g_sf_global_vars.tcp_quick_ack); tcp_set_quick_ack(g_sf_global_vars.tcp_quick_ack);
if (load_network_params) { if (load_network_params) {
if ((result=load_network_parameters(ini_ctx, max_pkg_size_item_nm, if ((result=load_network_parameters(ini_ctx, max_pkg_size_item_nm,
fixed_buff_size, task_buffer_extra_size)) != 0) fixed_buff_size, task_buffer_extra_size,
&g_sf_global_vars.net_buffer_cfg)) != 0)
{
return result;
}
if ((result=set_rlimit(RLIMIT_NOFILE, g_sf_global_vars.
net_buffer_cfg.max_connections)) != 0)
{ {
return result; return result;
} }
@ -416,7 +417,8 @@ int sf_load_config_ex(const char *log_filename_prefix,
{ {
return result; return result;
} }
return sf_load_context_from_config_ex(&g_sf_context, config); return sf_load_context_from_config_ex(&g_sf_context, config,
fixed_buff_size, task_buffer_extra_size);
} }
#define API_PREFIX_NAME "fast_rdma_" #define API_PREFIX_NAME "fast_rdma_"
@ -639,7 +641,8 @@ static int load_address_family(SFContext *sf_context,
} }
int sf_load_context_from_config_ex(SFContext *sf_context, int sf_load_context_from_config_ex(SFContext *sf_context,
SFContextIniConfig *config) SFContextIniConfig *config, const int fixed_buff_size,
const int task_buffer_extra_size)
{ {
SFAddressFamilyHandler *fh; SFAddressFamilyHandler *fh;
SFNetworkHandler *sock_handler; SFNetworkHandler *sock_handler;
@ -764,6 +767,14 @@ int sf_load_context_from_config_ex(SFContext *sf_context,
return result; return result;
} }
if ((result=load_network_parameters(&config->ini_ctx, config->
max_pkg_size_item_name, fixed_buff_size,
task_buffer_extra_size, &sf_context->
net_buffer_cfg)) != 0)
{
return result;
}
return 0; return 0;
} }
@ -960,11 +971,11 @@ void sf_global_config_to_string_ex(const char *max_pkg_size_item_nm,
int max_buff_size; int max_buff_size;
char pkg_buff[256]; char pkg_buff[256];
max_pkg_size = g_sf_global_vars.max_pkg_size - max_pkg_size = g_sf_global_vars.net_buffer_cfg.max_pkg_size -
g_sf_global_vars.task_buffer_extra_size; g_sf_global_vars.task_buffer_extra_size;
min_buff_size = g_sf_global_vars.min_buff_size - min_buff_size = g_sf_global_vars.net_buffer_cfg.min_buff_size -
g_sf_global_vars.task_buffer_extra_size; g_sf_global_vars.task_buffer_extra_size;
max_buff_size = g_sf_global_vars.max_buff_size - max_buff_size = g_sf_global_vars.net_buffer_cfg.max_buff_size -
g_sf_global_vars.task_buffer_extra_size; g_sf_global_vars.task_buffer_extra_size;
if (min_buff_size == max_buff_size && max_pkg_size == max_buff_size) { if (min_buff_size == max_buff_size && max_pkg_size == max_buff_size) {
@ -982,9 +993,9 @@ void sf_global_config_to_string_ex(const char *max_pkg_size_item_nm,
"network_timeout=%d, thread_stack_size=%d KB, " "network_timeout=%d, thread_stack_size=%d KB, "
"%s, tcp_quick_ack=%d, log_level=%s, " "%s, tcp_quick_ack=%d, log_level=%s, "
"run_by_group=%s, run_by_user=%s, ", SF_G_BASE_PATH_STR, "run_by_group=%s, run_by_user=%s, ", SF_G_BASE_PATH_STR,
g_sf_global_vars.max_connections, g_sf_global_vars.net_buffer_cfg.max_connections,
g_sf_global_vars.connect_timeout, g_sf_global_vars.net_buffer_cfg.connect_timeout,
g_sf_global_vars.network_timeout, g_sf_global_vars.net_buffer_cfg.network_timeout,
g_sf_global_vars.thread_stack_size / 1024, g_sf_global_vars.thread_stack_size / 1024,
pkg_buff, g_sf_global_vars.tcp_quick_ack, pkg_buff, g_sf_global_vars.tcp_quick_ack,
log_get_level_caption(), log_get_level_caption(),

View File

@ -30,8 +30,6 @@ typedef struct sf_connection_stat {
} SFConnectionStat; } SFConnectionStat;
typedef struct sf_global_variables { typedef struct sf_global_variables {
int connect_timeout;
int network_timeout;
struct { struct {
char str[MAX_PATH_SIZE]; char str[MAX_PATH_SIZE];
bool inited; bool inited;
@ -41,10 +39,9 @@ typedef struct sf_global_variables {
volatile bool continue_flag; volatile bool continue_flag;
bool tcp_quick_ack; bool tcp_quick_ack;
bool epoll_edge_trigger; bool epoll_edge_trigger;
int max_connections;
int max_pkg_size; SFNetBufferConfig net_buffer_cfg;
int min_buff_size;
int max_buff_size;
int task_buffer_extra_size; int task_buffer_extra_size;
int thread_stack_size; int thread_stack_size;
@ -84,8 +81,8 @@ extern SFContext g_sf_context;
#define SF_G_BASE_PATH_INITED g_sf_global_vars.base_path.inited #define SF_G_BASE_PATH_INITED g_sf_global_vars.base_path.inited
#define SF_G_BASE_PATH_CREATED g_sf_global_vars.base_path.created #define SF_G_BASE_PATH_CREATED g_sf_global_vars.base_path.created
#define SF_G_CONTINUE_FLAG g_sf_global_vars.continue_flag #define SF_G_CONTINUE_FLAG g_sf_global_vars.continue_flag
#define SF_G_CONNECT_TIMEOUT g_sf_global_vars.connect_timeout #define SF_G_CONNECT_TIMEOUT g_sf_global_vars.net_buffer_cfg.connect_timeout
#define SF_G_NETWORK_TIMEOUT g_sf_global_vars.network_timeout #define SF_G_NETWORK_TIMEOUT g_sf_global_vars.net_buffer_cfg.network_timeout
#define SF_G_MAX_CONNECTIONS g_sf_global_vars.max_connections #define SF_G_MAX_CONNECTIONS g_sf_global_vars.max_connections
#define SF_G_THREAD_STACK_SIZE g_sf_global_vars.thread_stack_size #define SF_G_THREAD_STACK_SIZE g_sf_global_vars.thread_stack_size
#define SF_G_UP_TIME g_sf_global_vars.up_time #define SF_G_UP_TIME g_sf_global_vars.up_time
@ -236,20 +233,23 @@ static inline int sf_load_config(const char *log_filename_prefix,
} }
int sf_load_context_from_config_ex(SFContext *sf_context, int sf_load_context_from_config_ex(SFContext *sf_context,
SFContextIniConfig *config); SFContextIniConfig *config, const int fixed_buff_size,
const int task_buffer_extra_size);
static inline int sf_load_context_from_config(SFContext *sf_context, static inline int sf_load_context_from_config(SFContext *sf_context,
const FCCommunicationType comm_type, const FCCommunicationType comm_type,
const char *filename, IniContext *pIniContext, const char *filename, IniContext *pIniContext,
const char *section_name, const int default_inner_port, const char *section_name, const int default_inner_port,
const int default_outer_port) const int default_outer_port, const int fixed_buff_size,
const int task_buffer_extra_size)
{ {
SFContextIniConfig config; SFContextIniConfig config;
SF_SET_CONTEXT_INI_CONFIG(config, comm_type, filename, pIniContext, SF_SET_CONTEXT_INI_CONFIG(config, comm_type, filename, pIniContext,
section_name, default_inner_port, default_outer_port, section_name, default_inner_port, default_outer_port,
DEFAULT_WORK_THREADS); DEFAULT_WORK_THREADS);
return sf_load_context_from_config_ex(sf_context, &config); return sf_load_context_from_config_ex(sf_context, &config,
fixed_buff_size, task_buffer_extra_size);
} }
int sf_alloc_rdma_pd(SFContext *sf_context, int sf_alloc_rdma_pd(SFContext *sf_context,

View File

@ -197,7 +197,7 @@ static inline int sf_nio_init(struct fast_task_info *task)
{ {
inc_connection_current_count(); inc_connection_current_count();
return sf_ioevent_add(task, (IOEventCallback)sf_client_sock_read, return sf_ioevent_add(task, (IOEventCallback)sf_client_sock_read,
task->network_timeout); SF_CTX->net_buffer_cfg.network_timeout);
} }
int sf_socket_async_connect_check(struct fast_task_info *task) int sf_socket_async_connect_check(struct fast_task_info *task)
@ -274,7 +274,8 @@ static int sf_async_connect_server(struct fast_task_info *task)
if ((result=task->handler->async_connect_server(task)) == EINPROGRESS) { if ((result=task->handler->async_connect_server(task)) == EINPROGRESS) {
result = ioevent_set(task, task->thread_data, task->event.fd, result = ioevent_set(task, task->thread_data, task->event.fd,
IOEVENT_READ | IOEVENT_WRITE, (IOEventCallback) IOEVENT_READ | IOEVENT_WRITE, (IOEventCallback)
sf_client_connect_done, task->connect_timeout); sf_client_connect_done, SF_CTX->net_buffer_cfg.
connect_timeout);
return result > 0 ? -1 * result : result; return result > 0 ? -1 * result : result;
} else { } else {
if (SF_CTX->callbacks.connect_done != NULL) { if (SF_CTX->callbacks.connect_done != NULL) {
@ -283,7 +284,8 @@ static int sf_async_connect_server(struct fast_task_info *task)
if (result == 0) { if (result == 0) {
if ((result=sf_ioevent_add(task, (IOEventCallback) if ((result=sf_ioevent_add(task, (IOEventCallback)
sf_client_sock_read, task->network_timeout)) != 0) sf_client_sock_read, SF_CTX->
net_buffer_cfg.network_timeout)) != 0)
{ {
return result; return result;
} }
@ -339,7 +341,7 @@ static int sf_nio_deal_task(struct fast_task_info *task, const int stage)
case SF_NIO_STAGE_FORWARDED: //forward by other thread case SF_NIO_STAGE_FORWARDED: //forward by other thread
if ((result=sf_ioevent_add(task, (IOEventCallback) if ((result=sf_ioevent_add(task, (IOEventCallback)
sf_client_sock_read, sf_client_sock_read,
task->network_timeout)) == 0) SF_CTX->net_buffer_cfg.network_timeout)) == 0)
{ {
result = SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_SEND); result = SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_SEND);
} }
@ -831,7 +833,7 @@ static int calc_iops_and_remove_polling(struct fast_task_info *task)
task->thread_data->timeout_ms); task->thread_data->timeout_ms);
} }
result = sf_ioevent_add(task, (IOEventCallback) result = sf_ioevent_add(task, (IOEventCallback)
sf_client_sock_read, task->network_timeout); sf_client_sock_read, SF_CTX->net_buffer_cfg.network_timeout);
logInfo("file: "__FILE__", line: %d, client: %s:%u, " logInfo("file: "__FILE__", line: %d, client: %s:%u, "
"remove polling iops: %"PRId64, __LINE__, "remove polling iops: %"PRId64, __LINE__,
@ -915,7 +917,7 @@ int sf_client_sock_read(int sock, short event, void *arg)
} }
task->event.timer.expires = g_current_time + task->event.timer.expires = g_current_time +
task->network_timeout; SF_CTX->net_buffer_cfg.network_timeout;
fast_timer_add(&task->thread_data->timer, fast_timer_add(&task->thread_data->timer,
&task->event.timer); &task->event.timer);
} else { } else {
@ -943,7 +945,7 @@ int sf_client_sock_read(int sock, short event, void *arg)
while (1) { while (1) {
fast_timer_modify(&task->thread_data->timer, fast_timer_modify(&task->thread_data->timer,
&task->event.timer, g_current_time + &task->event.timer, g_current_time +
task->network_timeout); SF_CTX->net_buffer_cfg.network_timeout);
if ((bytes=task->handler->recv_data(task, !task->handler-> if ((bytes=task->handler->recv_data(task, !task->handler->
explicit_post_recv, &action)) < 0) explicit_post_recv, &action)) < 0)
@ -1017,7 +1019,7 @@ int sf_client_sock_write(int sock, short event, void *arg)
while (1) { while (1) {
fast_timer_modify(&task->thread_data->timer, fast_timer_modify(&task->thread_data->timer,
&task->event.timer, g_current_time + &task->event.timer, g_current_time +
task->network_timeout); SF_CTX->net_buffer_cfg.network_timeout);
if ((bytes=task->handler->send_data(task, &action, &send_done)) < 0) { if ((bytes=task->handler->send_data(task, &action, &send_done)) < 0) {
ioevent_add_to_deleted_list(task); ioevent_add_to_deleted_list(task);

View File

@ -27,7 +27,8 @@
#include "sf_types.h" #include "sf_types.h"
#include "sf_global.h" #include "sf_global.h"
#define SF_CTX (task->handler->fh->ctx) #define SF_CTX (task->handler->fh->ctx)
#define SF_NET_BUFFER_CFG SF_CTX->net_buffer_cfg
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -128,12 +129,12 @@ static inline int sf_set_body_length(struct fast_task_info *task)
} }
task->recv.ptr->length += SF_CTX->header_size; task->recv.ptr->length += SF_CTX->header_size;
if (task->recv.ptr->length > g_sf_global_vars.max_pkg_size) { if (task->recv.ptr->length > SF_NET_BUFFER_CFG.max_pkg_size) {
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"client ip: %s, pkg length: %d > " "client ip: %s, pkg length: %d > "
"max pkg size: %d", __LINE__, "max pkg size: %d", __LINE__,
task->client_ip, task->recv.ptr->length, task->client_ip, task->recv.ptr->length,
g_sf_global_vars.max_pkg_size); SF_NET_BUFFER_CFG.max_pkg_size);
return -1; return -1;
} }

View File

@ -58,19 +58,11 @@ struct worker_thread_context {
struct nio_thread_data *thread_data; struct nio_thread_data *thread_data;
}; };
int sf_init_task(struct fast_task_info *task)
{
task->connect_timeout = SF_G_CONNECT_TIMEOUT; //for client side
task->network_timeout = SF_G_NETWORK_TIMEOUT;
return 0;
}
static void *worker_thread_entrance(void *arg); static void *worker_thread_entrance(void *arg);
static int sf_init_free_queue(struct fast_task_queue *free_queue, static int sf_init_free_queue(SFContext *sf_context, const char *name,
const char *name, const bool double_buffers, const bool double_buffers, const int task_padding_size,
const int task_padding_size, const int task_arg_size, const int task_arg_size, TaskInitCallback init_callback)
TaskInitCallback init_callback)
{ {
int result; int result;
int m; int m;
@ -82,19 +74,18 @@ static int sf_init_free_queue(struct fast_task_queue *free_queue,
return result; return result;
} }
m = g_sf_global_vars.min_buff_size / (64 * 1024); m = sf_context->net_buffer_cfg.min_buff_size / (64 * 1024);
if (m == 0) { if (m == 0) {
m = 1; m = 1;
} else if (m > 16) { } else if (m > 16) {
m = 16; m = 16;
} }
alloc_conn_once = 256 / m; alloc_conn_once = 256 / m;
return free_queue_init_ex2(free_queue, name, double_buffers, return free_queue_init_ex2(&sf_context->free_queue, name, double_buffers,
g_sf_global_vars.max_connections, alloc_conn_once, sf_context->net_buffer_cfg.max_connections, alloc_conn_once,
g_sf_global_vars.min_buff_size, g_sf_global_vars. sf_context->net_buffer_cfg.min_buff_size, sf_context->
max_buff_size, task_padding_size, task_arg_size, net_buffer_cfg.max_buff_size, task_padding_size,
(init_callback != NULL ? init_callback : task_arg_size, init_callback);
sf_init_task));
} }
int sf_service_init_ex2(SFContext *sf_context, const char *name, int sf_service_init_ex2(SFContext *sf_context, const char *name,
@ -125,8 +116,8 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
snprintf(sf_context->name, sizeof(sf_context->name), "%s", name); snprintf(sf_context->name, sizeof(sf_context->name), "%s", name);
sf_context->connect_need_log = true; sf_context->connect_need_log = true;
sf_context->realloc_task_buffer = g_sf_global_vars. sf_context->realloc_task_buffer = sf_context->net_buffer_cfg.
min_buff_size < g_sf_global_vars.max_buff_size; min_buff_size < sf_context->net_buffer_cfg.max_buff_size;
sf_context->callbacks.accept_done = accept_done_callback; sf_context->callbacks.accept_done = accept_done_callback;
sf_set_parameters_ex(sf_context, proto_header_size, sf_set_parameters_ex(sf_context, proto_header_size,
set_body_length_func, alloc_recv_buffer_func, set_body_length_func, alloc_recv_buffer_func,
@ -139,9 +130,8 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
} }
} }
if ((result=sf_init_free_queue(&sf_context->free_queue, if ((result=sf_init_free_queue(sf_context, name, double_buffers,
name, double_buffers, task_padding_size, task_padding_size, task_arg_size, init_callback)) != 0)
task_arg_size, init_callback)) != 0)
{ {
return result; return result;
} }
@ -203,8 +193,9 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
thread_data->arg = NULL; thread_data->arg = NULL;
} }
if (ioevent_init(&thread_data->ev_puller, 2 + g_sf_global_vars. if (ioevent_init(&thread_data->ev_puller, 2 + sf_context->
max_connections, net_timeout_ms, extra_events) != 0) net_buffer_cfg.max_connections, net_timeout_ms,
extra_events) != 0)
{ {
result = errno != 0 ? errno : ENOMEM; result = errno != 0 ? errno : ENOMEM;
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
@ -214,8 +205,8 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
return result; return result;
} }
result = fast_timer_init(&thread_data->timer, result = fast_timer_init(&thread_data->timer, 2 * sf_context->
2 * g_sf_global_vars.network_timeout, g_current_time); net_buffer_cfg.network_timeout, g_current_time);
if (result != 0) { if (result != 0) {
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"fast_timer_init fail, errno: %d, error info: %s", "fast_timer_init fail, errno: %d, error info: %s",
@ -359,7 +350,9 @@ int sf_socket_create_server(SFListener *listener,
return result; return result;
} }
if ((result=tcpsetserveropt(listener->sock, SF_G_NETWORK_TIMEOUT)) != 0) { if ((result=tcpsetserveropt(listener->sock, listener->handler->
fh->ctx->net_buffer_cfg.network_timeout)) != 0)
{
return result; return result;
} }

View File

@ -148,8 +148,6 @@ void sf_notify_all_threads_ex(SFContext *sf_context);
void sf_set_sig_quit_handler(sf_sig_quit_handler quit_handler); void sf_set_sig_quit_handler(sf_sig_quit_handler quit_handler);
int sf_init_task(struct fast_task_info *task);
static inline struct fast_task_info *sf_alloc_init_task_ex( static inline struct fast_task_info *sf_alloc_init_task_ex(
SFNetworkHandler *handler, const int fd, SFNetworkHandler *handler, const int fd,
const int reffer_count) const int reffer_count)

View File

@ -164,6 +164,15 @@ typedef struct sf_address_family_handler {
struct sf_context *ctx; struct sf_context *ctx;
} SFAddressFamilyHandler; } SFAddressFamilyHandler;
typedef struct sf_net_buffer_config {
int connect_timeout;
int network_timeout;
int max_connections;
int max_pkg_size;
int min_buff_size;
int max_buff_size;
} SFNetBufferConfig;
typedef struct sf_context { typedef struct sf_context {
char name[64]; char name[64];
struct nio_thread_data *thread_data; struct nio_thread_data *thread_data;
@ -173,6 +182,8 @@ typedef struct sf_context {
SFAddressFamily address_family; SFAddressFamily address_family;
SFAddressFamilyHandler handlers[SF_ADDRESS_FAMILY_COUNT]; SFAddressFamilyHandler handlers[SF_ADDRESS_FAMILY_COUNT];
SFNetBufferConfig net_buffer_cfg;
int accept_threads; int accept_threads;
int work_threads; int work_threads;