nio thread data support busy_polling_callback

support_rdma
YuQing 2023-09-18 16:17:34 +08:00
parent b4e5a26ba0
commit 70c44ea490
6 changed files with 88 additions and 16 deletions

View File

@@ -1,9 +1,10 @@
Version 1.70 2023-09-10
Version 1.70 2023-09-18
* get full mac address of infiniband NIC under Linux
* struct fast_task_info add field conn for RDMA connection
* server_id_func.[hc]: support communication type
* connection_pool.[hc] support callbacks for RDMA
* nio thread data support busy_polling_callback
Version 1.69 2023-08-05
* bugfixed: array_allocator_alloc MUST init the array

View File

@@ -690,7 +690,7 @@ int conn_pool_set_rdma_extra_params(ConnectionExtraParams *extra_params,
return 0;
} else {
first_server = FC_SID_SERVERS(*server_cfg);
extra_params->buffer_size = server_group->buffer_size + padding_size;
extra_params->buffer_size = server_cfg->buffer_size + padding_size;
extra_params->pd = fc_alloc_rdma_pd(G_RDMA_CONNECTION_CALLBACKS.
alloc_pd, &first_server->group_addrs[server_group_index].
address_array, &result);

View File

@@ -25,6 +25,7 @@
#include "common_define.h"
#include "ioevent.h"
#include "fast_timer.h"
#include "fc_list.h"
#define FC_NOTIFY_READ_FD(tdata) (tdata)->pipe_fds[0]
#define FC_NOTIFY_WRITE_FD(tdata) (tdata)->pipe_fds[1]
@@ -59,6 +60,7 @@ struct nio_thread_data
int pipe_fds[2]; //for notify
struct fast_task_info *deleted_list; //tasks for cleanup
ThreadLoopCallback thread_loop_callback;
ThreadLoopCallback busy_polling_callback;
void *arg; //extra argument pointer
struct {
struct fast_task_info *head;
@@ -70,6 +72,8 @@ struct nio_thread_data
bool enabled;
volatile int64_t counter;
} notify; //for thread notify
struct fc_list_head polling_queue; //for RDMA busy polling
};
struct ioevent_notify_entry
@@ -107,6 +111,13 @@ struct fast_task_info
short connect_timeout; //for client side
short network_timeout;
int64_t req_count; //request count
struct {
int64_t last_req_count;
uint32_t last_calc_time;
uint16_t continuous_count;
bool in_queue;
struct fc_list_head dlink; //for polling queue
} polling; //for RDMA busy polling
TaskContinueCallback continue_callback; //for continue stage
TaskFinishCallback finish_callback;
struct nio_thread_data *thread_data;
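
The new polling fields are bookkeeping for a per-connection "smart polling" switch: snapshots of req_count plus a timestamp let the owner thread estimate a connection's request rate and, once it stays above a threshold for enough consecutive checks, link the task into the thread's polling_queue. The switching policy itself is not part of this hunk; the sketch below only illustrates how the fields could fit together. It assumes the kernel-style fc_list_add_tail/fc_list_del_init helpers from fc_list.h, takes the thresholds (FCSmartPollingConfig's switch_on_iops / switch_on_count) as parameters, and smart_polling_check is a hypothetical name.

    #include <time.h>
    #include "fast_task_queue.h"   /* declares fast_task_info and nio_thread_data */

    /* Hypothetical helper: decide whether this connection is busy enough to be
     * polled actively instead of waiting for ioevent notifications. */
    static void smart_polling_check(struct fast_task_info *task,
            const int switch_on_iops, const int switch_on_count)
    {
        uint32_t current_time;
        int64_t iops;

        current_time = (uint32_t)time(NULL);
        if (current_time <= task->polling.last_calc_time) {
            return;   /* need at least a one second window */
        }

        iops = (task->req_count - task->polling.last_req_count) /
                (current_time - task->polling.last_calc_time);
        task->polling.last_req_count = task->req_count;
        task->polling.last_calc_time = current_time;

        if (iops >= switch_on_iops) {
            if (!task->polling.in_queue &&
                    ++task->polling.continuous_count >= switch_on_count)
            {
                /* busy for switch_on_count consecutive intervals:
                   add to the owner thread's busy polling queue */
                fc_list_add_tail(&task->polling.dlink,
                        &task->thread_data->polling_queue);
                task->polling.in_queue = true;
            }
        } else {
            task->polling.continuous_count = 0;
            if (task->polling.in_queue) {
                fc_list_del_init(&task->polling.dlink);
                task->polling.in_queue = false;
            }
        }
    }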

View File

@@ -152,6 +152,11 @@ int ioevent_loop(struct nio_thread_data *pThreadData,
task = pThreadData->deleted_list;
pThreadData->deleted_list = task->next;
if (task->polling.in_queue)
{
fc_list_del_init(&task->polling.dlink);
task->polling.in_queue = false;
}
clean_up_callback(task);
count++;
}
@@ -188,6 +193,10 @@ int ioevent_loop(struct nio_thread_data *pThreadData,
{
pThreadData->thread_loop_callback(pThreadData);
}
if (pThreadData->busy_polling_callback != NULL)
{
pThreadData->busy_polling_callback(pThreadData);
}
}
return 0;
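
ioevent_loop() now invokes busy_polling_callback on every pass of the event loop (after the regular thread_loop_callback) and unlinks a task from the polling queue before cleanup. Below is a minimal sketch of what such a callback and its registration might look like, assuming ThreadLoopCallback is void (*)(struct nio_thread_data *), that fc_list_for_each_entry is the usual kernel-style iterator from fc_list.h, and with rdma_poll_connection() as a hypothetical stand-in for whatever the RDMA layer actually uses to drain a connection's completion queue.

    #include "fast_task_queue.h"   /* fast_task_info, nio_thread_data */

    extern void rdma_poll_connection(struct fast_task_info *task);  /* hypothetical */

    /* Walk the thread's busy polling queue once per event loop iteration. */
    static void my_busy_polling_callback(struct nio_thread_data *thread_data)
    {
        struct fast_task_info *task;

        fc_list_for_each_entry(task, &thread_data->polling_queue, polling.dlink) {
            rdma_poll_connection(task);   /* poll without waiting for epoll/kqueue */
        }
    }

    /* registration, wherever the nio thread data is set up */
    static void setup_busy_polling(struct nio_thread_data *thread_data)
    {
        thread_data->busy_polling_callback = my_busy_polling_callback;
    }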

View File

@@ -342,13 +342,11 @@ static int fc_server_load_one_group(FCServerConfig *ctx,
{
int result;
FCServerGroupInfo *group;
IniFullContext full_ini_ctx;
char new_name[FAST_INI_ITEM_NAME_SIZE];
char *port_str;
char *net_type;
char *ip_prefix;
char *comm_type;
int buffer_size;
strcpy(new_name, section_name);
group = ctx->group_array.groups + ctx->group_array.count;
@@ -407,14 +405,16 @@ static int fc_server_load_one_group(FCServerConfig *ctx,
}
if (group->comm_type == fc_comm_type_sock) {
group->buffer_size = 0;
group->smart_polling.enabled = false;
group->smart_polling.switch_on_iops = 0;
group->smart_polling.switch_on_count = 0;
} else {
FAST_INI_SET_FULL_CTX_EX(full_ini_ctx, config_filename,
section_name, ini_context);
buffer_size = iniGetByteValue(section_name, "buffer_size",
ini_context, 256 * 1024);
group->buffer_size = iniCheckAndCorrectIntValue(&full_ini_ctx,
"buffer_size", buffer_size, 8 * 1024, 8 * 1024 * 1024);
group->smart_polling.enabled = iniGetBoolValue(section_name,
"smart_polling", ini_context, true);
group->smart_polling.switch_on_iops = iniGetIntValue(section_name,
"polling_switch_on_iops", ini_context, 10240);
group->smart_polling.switch_on_count = iniGetIntValue(section_name,
"polling_switch_on_count", ini_context, 3);
}
ctx->group_array.count++;
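
For a non-socket (RDMA) group the loader now reads three smart polling items instead of the per-group buffer_size. In the server config file that corresponds to keys like the following; the section name and port are only illustrative, the communication value follows fc_comm_type_str(), and the defaults shown are the ones used in the code above.

    [group-cluster]
    port = 11011
    communication = rdma
    smart_polling = true             # default: true
    polling_switch_on_iops = 10240   # default: 10240
    polling_switch_on_count = 3      # default: 3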
@@ -1226,6 +1226,11 @@ static int fc_server_load_data(FCServerConfig *ctx,
IniContext *ini_context, const char *config_filename)
{
int result;
int buffer_size;
bool have_rdma;
IniFullContext full_ini_ctx;
FCServerGroupInfo *group;
FCServerGroupInfo *end;
if ((result=fc_server_load_groups(ctx, config_filename,
ini_context)) != 0)
@@ -1233,6 +1238,26 @@ static int fc_server_load_data(FCServerConfig *ctx,
return result;
}
have_rdma = false;
end = ctx->group_array.groups + ctx->group_array.count;
for (group=ctx->group_array.groups; group<end; group++) {
if (group->comm_type != fc_comm_type_sock) {
have_rdma = true;
break;
}
}
if (have_rdma) {
FAST_INI_SET_FULL_CTX_EX(full_ini_ctx, config_filename,
NULL, ini_context);
buffer_size = iniGetByteValue(NULL, "buffer_size",
ini_context, 256 * 1024);
ctx->buffer_size = iniCheckAndCorrectIntValue(&full_ini_ctx,
"buffer_size", buffer_size, 8 * 1024, 8 * 1024 * 1024);
} else {
ctx->buffer_size = 0;
}
if ((result=fc_server_load_servers(ctx, config_filename,
ini_context)) != 0)
{
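
buffer_size, in turn, moves from the group sections to the global part of the config file: it is read once (and only when at least one group uses RDMA), clamped to the 8 KB - 8 MB range with a 256 KB default, and stored in FCServerConfig. In the config file that is simply a top-level line such as:

    # global section, only used when at least one group communicates over RDMA
    buffer_size = 256KB    # default: 256KB, accepted range: 8KB - 8MB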
@@ -1403,9 +1428,13 @@ static int fc_groups_to_string(FCServerConfig *ctx, FastBuffer *buffer)
if (group->comm_type != fc_comm_type_sock) {
fast_buffer_append(buffer,
"communication = %s\n"
"buffer_size = %d\n",
"smart_polling = %d\n"
"polling_switch_on_iops = %d\n"
"polling_switch_on_count = %d\n",
fc_comm_type_str(group->comm_type),
group->buffer_size);
group->smart_polling.enabled,
group->smart_polling.switch_on_iops,
group->smart_polling.switch_on_count);
}
fast_buffer_append(buffer,
@@ -1488,6 +1517,14 @@ int fc_server_to_config_string(FCServerConfig *ctx, FastBuffer *buffer)
{
int result;
if (ctx->buffer_size > 0) {
if ((result=fast_buffer_check(buffer, 1024)) != 0) {
return result;
}
fast_buffer_append(buffer, "buffer_size = %d KB",
ctx->buffer_size / 1024);
}
fc_server_clear_server_port(&ctx->group_array);
if ((result=fc_groups_to_string(ctx, buffer)) != 0) {
return result;
@@ -1509,9 +1546,12 @@ static void fc_server_log_groups(FCServerConfig *ctx)
group->group_name.len, group->group_name.str,
group->port);
if (group->comm_type != fc_comm_type_sock) {
p += sprintf(p, ", communication: %s, buffer_size: %d KB",
p += sprintf(p, ", communication: %s, smart_polling: %d, "
"polling_switch_on_iops: %d, polling_switch_on_count: %d",
fc_comm_type_str(group->comm_type),
group->buffer_size / 1024);
group->smart_polling.enabled,
group->smart_polling.switch_on_iops,
group->smart_polling.switch_on_count);
}
p += sprintf(p, ", net_type: %s, ip_prefix: %.*s",
get_net_type_caption(group->filter.net_type),
@@ -1567,6 +1607,9 @@ static void fc_server_log_servers(FCServerConfig *ctx)
void fc_server_to_log(FCServerConfig *ctx)
{
if (ctx->buffer_size > 0) {
logInfo("buffer_size: %d KB", ctx->buffer_size / 1024);
}
fc_server_log_groups(ctx);
fc_server_log_servers(ctx);
}

View File

@@ -52,13 +52,20 @@ typedef struct {
FCAddressInfo **addrs;
} FCAddressPtrArray;
typedef struct
{
bool enabled;
int switch_on_iops;
int switch_on_count;
} FCSmartPollingConfig;
typedef struct
{
string_t group_name;
int port; //default port
int server_port; //port in server section
FCCommunicationType comm_type;
int buffer_size; //for RDMA
FCSmartPollingConfig smart_polling;
struct {
int net_type;
string_t ip_prefix;
@@ -118,6 +125,7 @@ typedef struct fc_server_config
int default_port;
int min_hosts_each_group;
bool share_between_groups; //if an address shared between different groups
int buffer_size; //for RDMA
FCServerGroupArray group_array;
struct {
FCServerInfoArray by_id; //sorted by server id
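
After loading, callers can reach the new settings through FCServerConfig: ctx->buffer_size for the RDMA buffer size and group->smart_polling per group. A small sketch of walking the parsed groups; the helper name is hypothetical, while the fields and fc_comm_type_sock come from this header.

    #include <stdbool.h>
    #include "server_id_func.h"

    /* Hypothetical helper: does any RDMA group have smart polling enabled? */
    static bool server_config_wants_busy_polling(FCServerConfig *ctx)
    {
        FCServerGroupInfo *group;
        FCServerGroupInfo *end;

        end = ctx->group_array.groups + ctx->group_array.count;
        for (group = ctx->group_array.groups; group < end; group++) {
            if (group->comm_type != fc_comm_type_sock &&
                    group->smart_polling.enabled)
            {
                return true;
            }
        }
        return false;
    }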