diff --git a/src/connection_pool.c b/src/connection_pool.c index 7c6cbae..ad0207e 100644 --- a/src/connection_pool.c +++ b/src/connection_pool.c @@ -24,9 +24,9 @@ #include "connection_pool.h" ConnectionCallbacks g_connection_callbacks[2] = { - {NULL, NULL, conn_pool_connect_server_ex1, + {NULL, NULL, NULL, conn_pool_connect_server_ex1, conn_pool_disconnect_server, NULL}, - {NULL, NULL, NULL, NULL, NULL} + {NULL, NULL, NULL, NULL, NULL, NULL} }; static int node_init_for_socket(ConnectionNode *node, @@ -41,8 +41,8 @@ static int node_init_for_rdma(ConnectionNode *node, { node->conn = (ConnectionInfo *)(node + 1); node->conn->arg1 = node->conn->args + cp->extra_data_size; - return g_connection_callbacks[fc_comm_type_rdma].init_connection( - node->conn, cp->extra_params.buffer_size, cp->extra_params.pd); + return G_RDMA_CONNECTION_CALLBACKS.init_connection(node->conn, + cp->extra_params.buffer_size, cp->extra_params.pd); } int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout, @@ -81,8 +81,8 @@ int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout, } if (extra_params != NULL) { - extra_connection_size = g_connection_callbacks - [fc_comm_type_rdma].get_connection_size(); + extra_connection_size = G_RDMA_CONNECTION_CALLBACKS. + get_connection_size(); obj_init_func = (fast_mblock_object_init_func)node_init_for_rdma; cp->extra_params = *extra_params; } else { @@ -595,6 +595,7 @@ int conn_pool_global_init_for_rdma() } callbacks = g_connection_callbacks + fc_comm_type_rdma; + LOAD_API(callbacks, alloc_pd); LOAD_API(callbacks, get_connection_size); LOAD_API(callbacks, init_connection); LOAD_API(callbacks, make_connection); diff --git a/src/connection_pool.h b/src/connection_pool.h index daf794a..69f6a6f 100644 --- a/src/connection_pool.h +++ b/src/connection_pool.h @@ -57,6 +57,9 @@ typedef struct { char args[0]; //for extra data } ConnectionInfo; +struct ibv_pd; +typedef struct ibv_pd *(*fc_alloc_pd_callback)(const char **ip_addrs, + const int count, const int port); typedef int (*fc_get_connection_size_callback)(); typedef int (*fc_init_connection_callback)(ConnectionInfo *conn, const int buffer_size, void *arg); @@ -67,6 +70,7 @@ typedef void (*fc_close_connection_callback)(ConnectionInfo *conn); typedef void (*fc_destroy_connection_callback)(ConnectionInfo *conn); typedef struct { + fc_alloc_pd_callback alloc_pd; fc_get_connection_size_callback get_connection_size; fc_init_connection_callback init_connection; fc_make_connection_callback make_connection; @@ -74,7 +78,6 @@ typedef struct { fc_destroy_connection_callback destroy_connection; } ConnectionCallbacks; -struct ibv_pd; typedef struct { int buffer_size; struct ibv_pd *pd; @@ -132,6 +135,8 @@ extern ConnectionCallbacks g_connection_callbacks[2]; int conn_pool_global_init_for_rdma(); +#define G_RDMA_CONNECTION_CALLBACKS g_connection_callbacks[fc_comm_type_rdma] + /** * init ex function * parameters: @@ -394,6 +399,8 @@ static inline const char *fc_comm_type_str(const FCCommunicationType type) return "socket"; case fc_comm_type_rdma: return "rdma"; + case fc_comm_type_both: + return "both"; default: return "unkown"; } diff --git a/src/server_id_func.c b/src/server_id_func.c index 19790d7..0c25028 100644 --- a/src/server_id_func.c +++ b/src/server_id_func.c @@ -342,11 +342,13 @@ static int fc_server_load_one_group(FCServerConfig *ctx, { int result; FCServerGroupInfo *group; + IniFullContext full_ini_ctx; char new_name[FAST_INI_ITEM_NAME_SIZE]; char *port_str; char *net_type; char *ip_prefix; char *comm_type; + int buffer_size; strcpy(new_name, section_name); group = ctx->group_array.groups + ctx->group_array.count; @@ -404,6 +406,17 @@ static int fc_server_load_one_group(FCServerConfig *ctx, return result; } + if (group->comm_type == fc_comm_type_sock) { + group->buffer_size = 0; + } else { + FAST_INI_SET_FULL_CTX_EX(full_ini_ctx, config_filename, + section_name, ini_context); + buffer_size = iniGetByteValue(section_name, "buffer_size", + ini_context, 256 * 1024); + group->buffer_size = iniCheckAndCorrectIntValue(&full_ini_ctx, + "buffer_size", buffer_size, 8 * 1024, 8 * 1024 * 1024); + } + ctx->group_array.count++; return 0; } @@ -1382,13 +1395,22 @@ static int fc_groups_to_string(FCServerConfig *ctx, FastBuffer *buffer) fast_buffer_append(buffer, "[%s%.*s]\n" - "port = %d\n" - "communication = %s\n" - "net_type = %s\n" - "ip_prefix = %.*s\n\n", + "port = %d\n", GROUP_SECTION_PREFIX_STR, group->group_name.len, group->group_name.str, - group->port, fc_comm_type_str(group->comm_type), + group->port); + + if (group->comm_type != fc_comm_type_sock) { + fast_buffer_append(buffer, + "communication = %s\n" + "buffer_size = %d\n", + fc_comm_type_str(group->comm_type), + group->buffer_size); + } + + fast_buffer_append(buffer, + "net_type = %s\n" + "ip_prefix = %.*s\n\n", net_type_caption, group->filter.ip_prefix.len, group->filter.ip_prefix.str); } @@ -1478,14 +1500,24 @@ static void fc_server_log_groups(FCServerConfig *ctx) { FCServerGroupInfo *group; FCServerGroupInfo *end; + char buff[1024]; + char *p; end = ctx->group_array.groups + ctx->group_array.count; for (group=ctx->group_array.groups; groupgroup_name.len, group->group_name.str, - group->port, fc_comm_type_str(group->comm_type), + p = buff + sprintf(buff, "group_name: %.*s, port: %d", + group->group_name.len, group->group_name.str, + group->port); + if (group->comm_type != fc_comm_type_sock) { + p += sprintf(p, ", communication: %s, buffer_size: %d KB", + fc_comm_type_str(group->comm_type), + group->buffer_size / 1024); + } + p += sprintf(p, ", net_type: %s, ip_prefix: %.*s", get_net_type_caption(group->filter.net_type), group->filter.ip_prefix.len, group->filter.ip_prefix.str); + + log_it1(LOG_INFO, buff, p - buff); } } @@ -1668,3 +1700,34 @@ const FCAddressInfo *fc_server_get_address_by_peer( return *(addr_array->addrs); } + +struct ibv_pd *fc_alloc_rdma_pd(fc_alloc_pd_callback alloc_pd, + FCAddressPtrArray *address_array, int *result) +{ + char *ip_addrs[FC_MAX_SERVER_IP_COUNT]; + char **ip_addr; + FCAddressInfo **addr; + FCAddressInfo **end; + struct ibv_pd *pd; + int port; + + if (address_array->count == 0) { + port = 0; + } else { + port = address_array->addrs[0]->conn.port; + } + + end = address_array->addrs + address_array->count; + for (addr=address_array->addrs, ip_addr=ip_addrs; addrconn.ip_addr; + } + + if ((pd=alloc_pd((const char **)ip_addrs, address_array-> + count, port)) != NULL) + { + *result = 0; + } else { + *result = ENODEV; + } + return pd; +} diff --git a/src/server_id_func.h b/src/server_id_func.h index 2355e90..c034936 100644 --- a/src/server_id_func.h +++ b/src/server_id_func.h @@ -58,6 +58,7 @@ typedef struct int port; //default port int server_port; //port in server section FCCommunicationType comm_type; + int buffer_size; //for RDMA struct { int net_type; string_t ip_prefix; @@ -246,6 +247,9 @@ int fc_server_make_connection_ex(FCAddressPtrArray *addr_array, fc_server_make_connection_ex(addr_array, conn, \ service_name, connect_timeout, NULL, true) +struct ibv_pd *fc_alloc_rdma_pd(fc_alloc_pd_callback alloc_pd, + FCAddressPtrArray *address_array, int *result); + #ifdef __cplusplus } #endif