diff --git a/HISTORY b/HISTORY index 5baeaef..ee7b7fe 100644 --- a/HISTORY +++ b/HISTORY @@ -1,9 +1,10 @@ -Version 1.70 2023-09-10 +Version 1.70 2023-09-18 * get full mac address of infiniband NIC under Linux * struct fast_task_info add field conn for RDMA connection * server_id_func.[hc]: support communication type * connection_pool.[hc] support callbacks for RDMA + * nio thread data support busy_polling_callback Version 1.69 2023-08-05 * bugfixed: array_allocator_alloc MUST init the array diff --git a/src/connection_pool.c b/src/connection_pool.c index 073a78f..bc2602e 100644 --- a/src/connection_pool.c +++ b/src/connection_pool.c @@ -690,7 +690,7 @@ int conn_pool_set_rdma_extra_params(ConnectionExtraParams *extra_params, return 0; } else { first_server = FC_SID_SERVERS(*server_cfg); - extra_params->buffer_size = server_group->buffer_size + padding_size; + extra_params->buffer_size = server_cfg->buffer_size + padding_size; extra_params->pd = fc_alloc_rdma_pd(G_RDMA_CONNECTION_CALLBACKS. alloc_pd, &first_server->group_addrs[server_group_index]. address_array, &result); diff --git a/src/fast_task_queue.h b/src/fast_task_queue.h index 70d9459..592beaf 100644 --- a/src/fast_task_queue.h +++ b/src/fast_task_queue.h @@ -25,6 +25,7 @@ #include "common_define.h" #include "ioevent.h" #include "fast_timer.h" +#include "fc_list.h" #define FC_NOTIFY_READ_FD(tdata) (tdata)->pipe_fds[0] #define FC_NOTIFY_WRITE_FD(tdata) (tdata)->pipe_fds[1] @@ -59,6 +60,7 @@ struct nio_thread_data int pipe_fds[2]; //for notify struct fast_task_info *deleted_list; //tasks for cleanup ThreadLoopCallback thread_loop_callback; + ThreadLoopCallback busy_polling_callback; void *arg; //extra argument pointer struct { struct fast_task_info *head; @@ -70,6 +72,8 @@ struct nio_thread_data bool enabled; volatile int64_t counter; } notify; //for thread notify + + struct fc_list_head polling_queue; //for RDMA busy polling }; struct ioevent_notify_entry @@ -107,6 +111,13 @@ struct fast_task_info short connect_timeout; //for client side short network_timeout; int64_t req_count; //request count + struct { + int64_t last_req_count; + uint32_t last_calc_time; + uint16_t continuous_count; + bool in_queue; + struct fc_list_head dlink; //for polling queue + } polling; //for RDMA busy polling TaskContinueCallback continue_callback; //for continue stage TaskFinishCallback finish_callback; struct nio_thread_data *thread_data; diff --git a/src/ioevent_loop.c b/src/ioevent_loop.c index a6de380..ef6f499 100644 --- a/src/ioevent_loop.c +++ b/src/ioevent_loop.c @@ -152,6 +152,11 @@ int ioevent_loop(struct nio_thread_data *pThreadData, task = pThreadData->deleted_list; pThreadData->deleted_list = task->next; + if (task->polling.in_queue) + { + fc_list_del_init(&task->polling.dlink); + task->polling.in_queue = false; + } clean_up_callback(task); count++; } @@ -188,6 +193,10 @@ int ioevent_loop(struct nio_thread_data *pThreadData, { pThreadData->thread_loop_callback(pThreadData); } + if (pThreadData->busy_polling_callback != NULL) + { + pThreadData->busy_polling_callback(pThreadData); + } } return 0; diff --git a/src/server_id_func.c b/src/server_id_func.c index ef7b96e..7ecfeb0 100644 --- a/src/server_id_func.c +++ b/src/server_id_func.c @@ -342,13 +342,11 @@ static int fc_server_load_one_group(FCServerConfig *ctx, { int result; FCServerGroupInfo *group; - IniFullContext full_ini_ctx; char new_name[FAST_INI_ITEM_NAME_SIZE]; char *port_str; char *net_type; char *ip_prefix; char *comm_type; - int buffer_size; strcpy(new_name, section_name); group = ctx->group_array.groups + ctx->group_array.count; @@ -407,14 +405,16 @@ static int fc_server_load_one_group(FCServerConfig *ctx, } if (group->comm_type == fc_comm_type_sock) { - group->buffer_size = 0; + group->smart_polling.enabled = false; + group->smart_polling.switch_on_iops = 0; + group->smart_polling.switch_on_count = 0; } else { - FAST_INI_SET_FULL_CTX_EX(full_ini_ctx, config_filename, - section_name, ini_context); - buffer_size = iniGetByteValue(section_name, "buffer_size", - ini_context, 256 * 1024); - group->buffer_size = iniCheckAndCorrectIntValue(&full_ini_ctx, - "buffer_size", buffer_size, 8 * 1024, 8 * 1024 * 1024); + group->smart_polling.enabled = iniGetBoolValue(section_name, + "smart_polling", ini_context, true); + group->smart_polling.switch_on_iops = iniGetIntValue(section_name, + "polling_switch_on_iops", ini_context, 10240); + group->smart_polling.switch_on_count = iniGetIntValue(section_name, + "polling_switch_on_count", ini_context, 3); } ctx->group_array.count++; @@ -1226,6 +1226,11 @@ static int fc_server_load_data(FCServerConfig *ctx, IniContext *ini_context, const char *config_filename) { int result; + int buffer_size; + bool have_rdma; + IniFullContext full_ini_ctx; + FCServerGroupInfo *group; + FCServerGroupInfo *end; if ((result=fc_server_load_groups(ctx, config_filename, ini_context)) != 0) @@ -1233,6 +1238,26 @@ static int fc_server_load_data(FCServerConfig *ctx, return result; } + have_rdma = false; + end = ctx->group_array.groups + ctx->group_array.count; + for (group=ctx->group_array.groups; groupcomm_type != fc_comm_type_sock) { + have_rdma = true; + break; + } + } + + if (have_rdma) { + FAST_INI_SET_FULL_CTX_EX(full_ini_ctx, config_filename, + NULL, ini_context); + buffer_size = iniGetByteValue(NULL, "buffer_size", + ini_context, 256 * 1024); + ctx->buffer_size = iniCheckAndCorrectIntValue(&full_ini_ctx, + "buffer_size", buffer_size, 8 * 1024, 8 * 1024 * 1024); + } else { + ctx->buffer_size = 0; + } + if ((result=fc_server_load_servers(ctx, config_filename, ini_context)) != 0) { @@ -1403,9 +1428,13 @@ static int fc_groups_to_string(FCServerConfig *ctx, FastBuffer *buffer) if (group->comm_type != fc_comm_type_sock) { fast_buffer_append(buffer, "communication = %s\n" - "buffer_size = %d\n", + "smart_polling = %d\n" + "polling_switch_on_iops = %d\n" + "polling_switch_on_count = %d\n", fc_comm_type_str(group->comm_type), - group->buffer_size); + group->smart_polling.enabled, + group->smart_polling.switch_on_iops, + group->smart_polling.switch_on_count); } fast_buffer_append(buffer, @@ -1488,6 +1517,14 @@ int fc_server_to_config_string(FCServerConfig *ctx, FastBuffer *buffer) { int result; + if (ctx->buffer_size > 0) { + if ((result=fast_buffer_check(buffer, 1024)) != 0) { + return result; + } + fast_buffer_append(buffer, "buffer_size = %d KB", + ctx->buffer_size / 1024); + } + fc_server_clear_server_port(&ctx->group_array); if ((result=fc_groups_to_string(ctx, buffer)) != 0) { return result; @@ -1509,9 +1546,12 @@ static void fc_server_log_groups(FCServerConfig *ctx) group->group_name.len, group->group_name.str, group->port); if (group->comm_type != fc_comm_type_sock) { - p += sprintf(p, ", communication: %s, buffer_size: %d KB", + p += sprintf(p, ", communication: %s, smart_polling: %d, " + "polling_switch_on_iops: %d, polling_switch_on_count: %d", fc_comm_type_str(group->comm_type), - group->buffer_size / 1024); + group->smart_polling.enabled, + group->smart_polling.switch_on_iops, + group->smart_polling.switch_on_count); } p += sprintf(p, ", net_type: %s, ip_prefix: %.*s", get_net_type_caption(group->filter.net_type), @@ -1567,6 +1607,9 @@ static void fc_server_log_servers(FCServerConfig *ctx) void fc_server_to_log(FCServerConfig *ctx) { + if (ctx->buffer_size > 0) { + logInfo("buffer_size: %d KB", ctx->buffer_size / 1024); + } fc_server_log_groups(ctx); fc_server_log_servers(ctx); } diff --git a/src/server_id_func.h b/src/server_id_func.h index 6255e0b..edcc8e0 100644 --- a/src/server_id_func.h +++ b/src/server_id_func.h @@ -52,13 +52,20 @@ typedef struct { FCAddressInfo **addrs; } FCAddressPtrArray; +typedef struct +{ + bool enabled; + int switch_on_iops; + int switch_on_count; +} FCSmartPollingConfig; + typedef struct { string_t group_name; int port; //default port int server_port; //port in server section FCCommunicationType comm_type; - int buffer_size; //for RDMA + FCSmartPollingConfig smart_polling; struct { int net_type; string_t ip_prefix; @@ -118,6 +125,7 @@ typedef struct fc_server_config int default_port; int min_hosts_each_group; bool share_between_groups; //if an address shared between different groups + int buffer_size; //for RDMA FCServerGroupArray group_array; struct { FCServerInfoArray by_id; //sorted by server id