load connection_thread_local from cluster.conf

support_rdma
YuQing 2023-09-20 10:43:05 +08:00
parent e0bbe89d23
commit 7b0631e37a
4 changed files with 84 additions and 8 deletions

View File

@ -847,16 +847,35 @@ int conn_pool_set_rdma_extra_params(ConnectionExtraParams *extra_params,
return ENOENT;
}
if (server_group->comm_type == fc_comm_type_sock) {
extra_params->tls.enabled = false;
switch (server_cfg->connection_thread_local) {
case fc_connection_thread_local_auto:
if (server_group->comm_type == fc_comm_type_sock) {
extra_params->tls.enabled = false;
} else {
extra_params->tls.enabled = (FC_SID_SERVER_COUNT(
*server_cfg) <= 64);
}
break;
case fc_connection_thread_local_yes:
extra_params->tls.enabled = true;
break;
default:
extra_params->tls.enabled = false;
break;
}
if (extra_params->tls.enabled) {
extra_params->tls.htable_capacity = fc_ceil_prime(
FC_SID_SERVER_COUNT(*server_cfg));
} else {
extra_params->tls.htable_capacity = 0;
}
if (server_group->comm_type == fc_comm_type_sock) {
extra_params->rdma.buffer_size = 0;
extra_params->rdma.pd = NULL;
return 0;
} else {
extra_params->tls.enabled = true;
extra_params->tls.htable_capacity = fc_ceil_prime(
FC_SID_SERVER_COUNT(*server_cfg));
first_server = FC_SID_SERVERS(*server_cfg);
extra_params->rdma.buffer_size = server_cfg->buffer_size + padding_size;
extra_params->rdma.pd = fc_alloc_rdma_pd(G_RDMA_CONNECTION_CALLBACKS.

View File

@ -128,12 +128,15 @@ int ioevent_loop(struct nio_thread_data *thread_data,
last_check_time = g_current_time;
while (*continue_flag)
{
if (thread_data->ev_puller.timeout == 0)
{
#ifdef OS_LINUX
if (thread_data->ev_puller.timeout == 0) {
sched_pull = (sched_counter++ & 8) != 0;
} else {
sched_pull = true;
}
#else
sched_pull = true;
#endif
if (sched_pull)
{

View File

@ -1222,6 +1222,30 @@ static int fc_server_load_servers(FCServerConfig *ctx,
return result;
}
static void load_connection_thread_local(FCServerConfig *ctx,
IniContext *ini_context, const char *config_filename)
{
char *connection_thread_local;
connection_thread_local = iniGetStrValue(NULL,
"connection_thread_local", ini_context);
if (connection_thread_local == NULL || *connection_thread_local == '\0') {
ctx->connection_thread_local = fc_connection_thread_local_auto;
} else if (strcasecmp(connection_thread_local, "auto") == 0) {
ctx->connection_thread_local = fc_connection_thread_local_auto;
} else if (strcasecmp(connection_thread_local, "yes") == 0) {
ctx->connection_thread_local = fc_connection_thread_local_yes;
} else if (strcasecmp(connection_thread_local, "no") == 0) {
ctx->connection_thread_local = fc_connection_thread_local_no;
} else {
logWarning("file: "__FILE__", line: %d, "
"config file: %s, invalid connection_thread_local: %s, "
"set to auto!", __LINE__, config_filename,
connection_thread_local);
ctx->connection_thread_local = fc_connection_thread_local_auto;
}
}
static int fc_server_load_data(FCServerConfig *ctx,
IniContext *ini_context, const char *config_filename)
{
@ -1257,6 +1281,7 @@ static int fc_server_load_data(FCServerConfig *ctx,
} else {
ctx->buffer_size = 0;
}
load_connection_thread_local(ctx, ini_context, config_filename);
if ((result=fc_server_load_servers(ctx, config_filename,
ini_context)) != 0)
@ -1607,9 +1632,16 @@ static void fc_server_log_servers(FCServerConfig *ctx)
void fc_server_to_log(FCServerConfig *ctx)
{
char buff[256];
char *p;
p = buff + sprintf(buff, "connection_thread_local: %s",
fc_connection_thread_local_str(ctx->connection_thread_local));
if (ctx->buffer_size > 0) {
logInfo("buffer_size: %d KB", ctx->buffer_size / 1024);
p += sprintf(p, ", buffer_size: %d KB", ctx->buffer_size / 1024);
}
log_it1(LOG_INFO, buff, p - buff);
fc_server_log_groups(ctx);
fc_server_log_servers(ctx);
}

View File

@ -120,12 +120,19 @@ typedef struct
FCServerMap *maps;
} FCServerMapArray;
typedef enum {
fc_connection_thread_local_auto,
fc_connection_thread_local_yes,
fc_connection_thread_local_no
} FCServerConnThreadLocal;
typedef struct fc_server_config
{
int default_port;
int min_hosts_each_group;
bool share_between_groups; //if an address shared between different groups
int buffer_size; //for RDMA
FCServerConnThreadLocal connection_thread_local;
FCServerGroupArray group_array;
struct {
FCServerInfoArray by_id; //sorted by server id
@ -247,6 +254,21 @@ int fc_server_make_connection_ex(FCAddressPtrArray *addr_array,
struct ibv_pd *fc_alloc_rdma_pd(fc_alloc_pd_callback alloc_pd,
FCAddressPtrArray *address_array, int *result);
static inline const char *fc_connection_thread_local_str(
const FCServerConnThreadLocal value)
{
switch (value) {
case fc_connection_thread_local_auto:
return "auto";
case fc_connection_thread_local_yes:
return "yes";
case fc_connection_thread_local_no:
return "no";
default:
return "unkown";
}
}
#ifdef __cplusplus
}
#endif