function fc_alloc_rdma_pd impl.

support_rdma
YuQing 2023-09-11 11:32:32 +08:00
parent 5139ec4682
commit d24023aee7
4 changed files with 90 additions and 15 deletions

View File

@ -24,9 +24,9 @@
#include "connection_pool.h" #include "connection_pool.h"
ConnectionCallbacks g_connection_callbacks[2] = { ConnectionCallbacks g_connection_callbacks[2] = {
{NULL, NULL, conn_pool_connect_server_ex1, {NULL, NULL, NULL, conn_pool_connect_server_ex1,
conn_pool_disconnect_server, NULL}, conn_pool_disconnect_server, NULL},
{NULL, NULL, NULL, NULL, NULL} {NULL, NULL, NULL, NULL, NULL, NULL}
}; };
static int node_init_for_socket(ConnectionNode *node, static int node_init_for_socket(ConnectionNode *node,
@ -41,8 +41,8 @@ static int node_init_for_rdma(ConnectionNode *node,
{ {
node->conn = (ConnectionInfo *)(node + 1); node->conn = (ConnectionInfo *)(node + 1);
node->conn->arg1 = node->conn->args + cp->extra_data_size; node->conn->arg1 = node->conn->args + cp->extra_data_size;
return g_connection_callbacks[fc_comm_type_rdma].init_connection( return G_RDMA_CONNECTION_CALLBACKS.init_connection(node->conn,
node->conn, cp->extra_params.buffer_size, cp->extra_params.pd); cp->extra_params.buffer_size, cp->extra_params.pd);
} }
int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout, int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout,
@ -81,8 +81,8 @@ int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout,
} }
if (extra_params != NULL) { if (extra_params != NULL) {
extra_connection_size = g_connection_callbacks extra_connection_size = G_RDMA_CONNECTION_CALLBACKS.
[fc_comm_type_rdma].get_connection_size(); get_connection_size();
obj_init_func = (fast_mblock_object_init_func)node_init_for_rdma; obj_init_func = (fast_mblock_object_init_func)node_init_for_rdma;
cp->extra_params = *extra_params; cp->extra_params = *extra_params;
} else { } else {
@ -595,6 +595,7 @@ int conn_pool_global_init_for_rdma()
} }
callbacks = g_connection_callbacks + fc_comm_type_rdma; callbacks = g_connection_callbacks + fc_comm_type_rdma;
LOAD_API(callbacks, alloc_pd);
LOAD_API(callbacks, get_connection_size); LOAD_API(callbacks, get_connection_size);
LOAD_API(callbacks, init_connection); LOAD_API(callbacks, init_connection);
LOAD_API(callbacks, make_connection); LOAD_API(callbacks, make_connection);

View File

@ -57,6 +57,9 @@ typedef struct {
char args[0]; //for extra data char args[0]; //for extra data
} ConnectionInfo; } ConnectionInfo;
struct ibv_pd;
typedef struct ibv_pd *(*fc_alloc_pd_callback)(const char **ip_addrs,
const int count, const int port);
typedef int (*fc_get_connection_size_callback)(); typedef int (*fc_get_connection_size_callback)();
typedef int (*fc_init_connection_callback)(ConnectionInfo *conn, typedef int (*fc_init_connection_callback)(ConnectionInfo *conn,
const int buffer_size, void *arg); const int buffer_size, void *arg);
@ -67,6 +70,7 @@ typedef void (*fc_close_connection_callback)(ConnectionInfo *conn);
typedef void (*fc_destroy_connection_callback)(ConnectionInfo *conn); typedef void (*fc_destroy_connection_callback)(ConnectionInfo *conn);
typedef struct { typedef struct {
fc_alloc_pd_callback alloc_pd;
fc_get_connection_size_callback get_connection_size; fc_get_connection_size_callback get_connection_size;
fc_init_connection_callback init_connection; fc_init_connection_callback init_connection;
fc_make_connection_callback make_connection; fc_make_connection_callback make_connection;
@ -74,7 +78,6 @@ typedef struct {
fc_destroy_connection_callback destroy_connection; fc_destroy_connection_callback destroy_connection;
} ConnectionCallbacks; } ConnectionCallbacks;
struct ibv_pd;
typedef struct { typedef struct {
int buffer_size; int buffer_size;
struct ibv_pd *pd; struct ibv_pd *pd;
@ -132,6 +135,8 @@ extern ConnectionCallbacks g_connection_callbacks[2];
int conn_pool_global_init_for_rdma(); int conn_pool_global_init_for_rdma();
#define G_RDMA_CONNECTION_CALLBACKS g_connection_callbacks[fc_comm_type_rdma]
/** /**
* init ex function * init ex function
* parameters: * parameters:
@ -394,6 +399,8 @@ static inline const char *fc_comm_type_str(const FCCommunicationType type)
return "socket"; return "socket";
case fc_comm_type_rdma: case fc_comm_type_rdma:
return "rdma"; return "rdma";
case fc_comm_type_both:
return "both";
default: default:
return "unkown"; return "unkown";
} }

View File

@ -342,11 +342,13 @@ static int fc_server_load_one_group(FCServerConfig *ctx,
{ {
int result; int result;
FCServerGroupInfo *group; FCServerGroupInfo *group;
IniFullContext full_ini_ctx;
char new_name[FAST_INI_ITEM_NAME_SIZE]; char new_name[FAST_INI_ITEM_NAME_SIZE];
char *port_str; char *port_str;
char *net_type; char *net_type;
char *ip_prefix; char *ip_prefix;
char *comm_type; char *comm_type;
int buffer_size;
strcpy(new_name, section_name); strcpy(new_name, section_name);
group = ctx->group_array.groups + ctx->group_array.count; group = ctx->group_array.groups + ctx->group_array.count;
@ -404,6 +406,17 @@ static int fc_server_load_one_group(FCServerConfig *ctx,
return result; return result;
} }
if (group->comm_type == fc_comm_type_sock) {
group->buffer_size = 0;
} else {
FAST_INI_SET_FULL_CTX_EX(full_ini_ctx, config_filename,
section_name, ini_context);
buffer_size = iniGetByteValue(section_name, "buffer_size",
ini_context, 256 * 1024);
group->buffer_size = iniCheckAndCorrectIntValue(&full_ini_ctx,
"buffer_size", buffer_size, 8 * 1024, 8 * 1024 * 1024);
}
ctx->group_array.count++; ctx->group_array.count++;
return 0; return 0;
} }
@ -1382,13 +1395,22 @@ static int fc_groups_to_string(FCServerConfig *ctx, FastBuffer *buffer)
fast_buffer_append(buffer, fast_buffer_append(buffer,
"[%s%.*s]\n" "[%s%.*s]\n"
"port = %d\n" "port = %d\n",
"communication = %s\n"
"net_type = %s\n"
"ip_prefix = %.*s\n\n",
GROUP_SECTION_PREFIX_STR, GROUP_SECTION_PREFIX_STR,
group->group_name.len, group->group_name.str, group->group_name.len, group->group_name.str,
group->port, fc_comm_type_str(group->comm_type), group->port);
if (group->comm_type != fc_comm_type_sock) {
fast_buffer_append(buffer,
"communication = %s\n"
"buffer_size = %d\n",
fc_comm_type_str(group->comm_type),
group->buffer_size);
}
fast_buffer_append(buffer,
"net_type = %s\n"
"ip_prefix = %.*s\n\n",
net_type_caption, group->filter.ip_prefix.len, net_type_caption, group->filter.ip_prefix.len,
group->filter.ip_prefix.str); group->filter.ip_prefix.str);
} }
@ -1478,14 +1500,24 @@ static void fc_server_log_groups(FCServerConfig *ctx)
{ {
FCServerGroupInfo *group; FCServerGroupInfo *group;
FCServerGroupInfo *end; FCServerGroupInfo *end;
char buff[1024];
char *p;
end = ctx->group_array.groups + ctx->group_array.count; end = ctx->group_array.groups + ctx->group_array.count;
for (group=ctx->group_array.groups; group<end; group++) { for (group=ctx->group_array.groups; group<end; group++) {
logInfo("group_name: %.*s, port: %d, communication: %s, net_type: %s, " p = buff + sprintf(buff, "group_name: %.*s, port: %d",
"ip_prefix: %.*s", group->group_name.len, group->group_name.str, group->group_name.len, group->group_name.str,
group->port, fc_comm_type_str(group->comm_type), group->port);
if (group->comm_type != fc_comm_type_sock) {
p += sprintf(p, ", communication: %s, buffer_size: %d KB",
fc_comm_type_str(group->comm_type),
group->buffer_size / 1024);
}
p += sprintf(p, ", net_type: %s, ip_prefix: %.*s",
get_net_type_caption(group->filter.net_type), get_net_type_caption(group->filter.net_type),
group->filter.ip_prefix.len, group->filter.ip_prefix.str); group->filter.ip_prefix.len, group->filter.ip_prefix.str);
log_it1(LOG_INFO, buff, p - buff);
} }
} }
@ -1668,3 +1700,34 @@ const FCAddressInfo *fc_server_get_address_by_peer(
return *(addr_array->addrs); return *(addr_array->addrs);
} }
struct ibv_pd *fc_alloc_rdma_pd(fc_alloc_pd_callback alloc_pd,
FCAddressPtrArray *address_array, int *result)
{
char *ip_addrs[FC_MAX_SERVER_IP_COUNT];
char **ip_addr;
FCAddressInfo **addr;
FCAddressInfo **end;
struct ibv_pd *pd;
int port;
if (address_array->count == 0) {
port = 0;
} else {
port = address_array->addrs[0]->conn.port;
}
end = address_array->addrs + address_array->count;
for (addr=address_array->addrs, ip_addr=ip_addrs; addr<end; addr++) {
*ip_addr = (*addr)->conn.ip_addr;
}
if ((pd=alloc_pd((const char **)ip_addrs, address_array->
count, port)) != NULL)
{
*result = 0;
} else {
*result = ENODEV;
}
return pd;
}

View File

@ -58,6 +58,7 @@ typedef struct
int port; //default port int port; //default port
int server_port; //port in server section int server_port; //port in server section
FCCommunicationType comm_type; FCCommunicationType comm_type;
int buffer_size; //for RDMA
struct { struct {
int net_type; int net_type;
string_t ip_prefix; string_t ip_prefix;
@ -246,6 +247,9 @@ int fc_server_make_connection_ex(FCAddressPtrArray *addr_array,
fc_server_make_connection_ex(addr_array, conn, \ fc_server_make_connection_ex(addr_array, conn, \
service_name, connect_timeout, NULL, true) service_name, connect_timeout, NULL, true)
struct ibv_pd *fc_alloc_rdma_pd(fc_alloc_pd_callback alloc_pd,
FCAddressPtrArray *address_array, int *result);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif