connection_pool.[hc] support callbacks for RDMA

support_rdma
YuQing 2023-09-10 20:54:24 +08:00
parent bc3a65ee19
commit 5139ec4682
3 changed files with 136 additions and 31 deletions

View File

@ -1,8 +1,9 @@
Version 1.70 2023-09-06 Version 1.70 2023-09-10
* get full mac address of infiniband NIC under Linux * get full mac address of infiniband NIC under Linux
* struct fast_task_info add field conn for RDMA connection * struct fast_task_info add field conn for RDMA connection
* server_id_func.[hc]: support communication type * server_id_func.[hc]: support communication type
* connection_pool.[hc] support callbacks for RDMA
Version 1.69 2023-08-05 Version 1.69 2023-08-05
* bugfixed: array_allocator_alloc MUST init the array * bugfixed: array_allocator_alloc MUST init the array

View File

@ -16,22 +16,47 @@
#include <netdb.h> #include <netdb.h>
#include <unistd.h> #include <unistd.h>
#include <errno.h> #include <errno.h>
#include <dlfcn.h>
#include "logger.h" #include "logger.h"
#include "sockopt.h" #include "sockopt.h"
#include "shared_func.h" #include "shared_func.h"
#include "sched_thread.h" #include "sched_thread.h"
#include "connection_pool.h" #include "connection_pool.h"
ConnectionCallbacks g_connection_callbacks[2] = {
{NULL, NULL, conn_pool_connect_server_ex1,
conn_pool_disconnect_server, NULL},
{NULL, NULL, NULL, NULL, NULL}
};
static int node_init_for_socket(ConnectionNode *node,
ConnectionPool *cp)
{
node->conn = (ConnectionInfo *)(node + 1);
return 0;
}
static int node_init_for_rdma(ConnectionNode *node,
ConnectionPool *cp)
{
node->conn = (ConnectionInfo *)(node + 1);
node->conn->arg1 = node->conn->args + cp->extra_data_size;
return g_connection_callbacks[fc_comm_type_rdma].init_connection(
node->conn, cp->extra_params.buffer_size, cp->extra_params.pd);
}
int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout, int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout,
const int max_count_per_entry, const int max_idle_time, const int max_count_per_entry, const int max_idle_time,
const int socket_domain, const int htable_init_capacity, const int socket_domain, const int htable_init_capacity,
fc_connection_callback_func connect_done_func, void *connect_done_args, fc_connection_callback_func connect_done_func, void *connect_done_args,
fc_connection_callback_func validate_func, void *validate_args, fc_connection_callback_func validate_func, void *validate_args,
const int extra_data_size) const int extra_data_size, const ConnectionExtraParams *extra_params)
{ {
const int64_t alloc_elements_limit = 0; const int64_t alloc_elements_limit = 0;
int result; int result;
int init_capacity; int init_capacity;
int extra_connection_size;
fast_mblock_object_init_func obj_init_func;
if ((result=init_pthread_lock(&cp->lock)) != 0) if ((result=init_pthread_lock(&cp->lock)) != 0)
{ {
@ -40,6 +65,7 @@ int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout,
cp->connect_timeout = connect_timeout; cp->connect_timeout = connect_timeout;
cp->max_count_per_entry = max_count_per_entry; cp->max_count_per_entry = max_count_per_entry;
cp->max_idle_time = max_idle_time; cp->max_idle_time = max_idle_time;
cp->extra_data_size = extra_data_size;
cp->socket_domain = socket_domain; cp->socket_domain = socket_domain;
cp->connect_done_callback.func = connect_done_func; cp->connect_done_callback.func = connect_done_func;
cp->connect_done_callback.args = connect_done_args; cp->connect_done_callback.args = connect_done_args;
@ -54,10 +80,19 @@ int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout,
return result; return result;
} }
if (extra_params != NULL) {
extra_connection_size = g_connection_callbacks
[fc_comm_type_rdma].get_connection_size();
obj_init_func = (fast_mblock_object_init_func)node_init_for_rdma;
cp->extra_params = *extra_params;
} else {
extra_connection_size = 0;
obj_init_func = (fast_mblock_object_init_func)node_init_for_socket;
}
if ((result=fast_mblock_init_ex1(&cp->node_allocator, "cpool-node", if ((result=fast_mblock_init_ex1(&cp->node_allocator, "cpool-node",
sizeof(ConnectionNode) + sizeof(ConnectionInfo) + sizeof(ConnectionNode) + sizeof(ConnectionInfo) +
extra_data_size, init_capacity, alloc_elements_limit, extra_data_size + extra_connection_size, init_capacity,
NULL, NULL, true)) != 0) alloc_elements_limit, obj_init_func, cp, true)) != 0)
{ {
return result; return result;
} }
@ -84,7 +119,8 @@ static int coon_pool_close_connections(const int index,
deleted = node; deleted = node;
node = node->next; node = node->next;
conn_pool_disconnect_server(deleted->conn); g_connection_callbacks[deleted->conn->comm_type].
close_connection(deleted->conn);
fast_mblock_free_object(&cp->node_allocator, deleted); fast_mblock_free_object(&cp->node_allocator, deleted);
} }
@ -258,7 +294,6 @@ ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp,
return NULL; return NULL;
} }
node->conn = (ConnectionInfo *)(node + 1);
node->manager = cm; node->manager = cm;
node->next = NULL; node->next = NULL;
node->atime = 0; node->atime = 0;
@ -266,12 +301,15 @@ ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp,
cm->total_count++; cm->total_count++;
pthread_mutex_unlock(&cm->lock); pthread_mutex_unlock(&cm->lock);
memcpy(node->conn, conn, sizeof(ConnectionInfo)); memcpy(node->conn->ip_addr, conn->ip_addr, sizeof(conn->ip_addr));
node->conn->port = conn->port;
node->conn->comm_type = conn->comm_type;
node->conn->socket_domain = cp->socket_domain; node->conn->socket_domain = cp->socket_domain;
node->conn->sock = -1; node->conn->sock = -1;
node->conn->validate_flag = false; node->conn->validate_flag = false;
*err_no = conn_pool_connect_server_ex1(node->conn, *err_no = g_connection_callbacks[conn->comm_type].
service_name, cp->connect_timeout, NULL, true); make_connection(node->conn, service_name,
cp->connect_timeout, NULL, true);
if (*err_no == 0 && cp->connect_done_callback.func != NULL) if (*err_no == 0 && cp->connect_done_callback.func != NULL)
{ {
*err_no = cp->connect_done_callback.func(node->conn, *err_no = cp->connect_done_callback.func(node->conn,
@ -279,11 +317,8 @@ ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp,
} }
if (*err_no != 0) if (*err_no != 0)
{ {
if (node->conn->sock >= 0) g_connection_callbacks[conn->comm_type].
{ close_connection(node->conn);
close(node->conn->sock);
node->conn->sock = -1;
}
pthread_mutex_lock(&cm->lock); pthread_mutex_lock(&cm->lock);
cm->total_count--; //rollback cm->total_count--; //rollback
fast_mblock_free_object(&cp->node_allocator, node); fast_mblock_free_object(&cp->node_allocator, node);
@ -340,17 +375,15 @@ ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp,
{ {
cm->total_count--; cm->total_count--;
logDebug("file: "__FILE__", line: %d, " \ logDebug("file: "__FILE__", line: %d, "
"server %s:%u, connection: %d idle " \ "server %s:%u, connection: %d idle "
"time: %d exceeds max idle time: %d, "\ "time: %d exceeds max idle time: %d, "
"total_count: %d, free_count: %d", \ "total_count: %d, free_count: %d", __LINE__,
__LINE__, conn->ip_addr, conn->port, \ conn->ip_addr, conn->port, ci->sock, (int)
ci->sock, \ (current_time - node->atime), cp->max_idle_time,
(int)(current_time - node->atime), \ cm->total_count, cm->free_count);
cp->max_idle_time, cm->total_count, \
cm->free_count);
conn_pool_disconnect_server(ci); g_connection_callbacks[ci->comm_type].close_connection(ci);
fast_mblock_free_object(&cp->node_allocator, node); fast_mblock_free_object(&cp->node_allocator, node);
continue; continue;
} }
@ -408,7 +441,7 @@ int conn_pool_close_connection_ex(ConnectionPool *cp, ConnectionInfo *conn,
__LINE__, conn->ip_addr, conn->port, __LINE__, conn->ip_addr, conn->port,
conn->sock, cm->total_count, cm->free_count); conn->sock, cm->total_count, cm->free_count);
conn_pool_disconnect_server(conn); g_connection_callbacks[conn->comm_type].close_connection(conn);
fast_mblock_free_object(&cp->node_allocator, node); fast_mblock_free_object(&cp->node_allocator, node);
node = cm->head; node = cm->head;
@ -533,3 +566,40 @@ int conn_pool_load_server_info(IniContext *pIniContext, const char *filename,
return conn_pool_parse_server_info(pServerStr, pServerInfo, default_port); return conn_pool_parse_server_info(pServerStr, pServerInfo, default_port);
} }
#define API_PREFIX_NAME "fast_rdma_client_"
#define LOAD_API(callbacks, fname) \
do { \
callbacks->fname = dlsym(dlhandle, API_PREFIX_NAME#fname); \
if (callbacks->fname == NULL) { \
logError("file: "__FILE__", line: %d, " \
"dlsym api %s fail, error info: %s", \
__LINE__, API_PREFIX_NAME#fname, dlerror()); \
return ENOENT; \
} \
} while (0)
int conn_pool_global_init_for_rdma()
{
const char *library = "libfastrdma.so";
ConnectionCallbacks *callbacks;
void *dlhandle;
dlhandle = dlopen(library, RTLD_LAZY);
if (dlhandle == NULL) {
logError("file: "__FILE__", line: %d, "
"dlopen %s fail, error info: %s",
__LINE__, library, dlerror());
return EFAULT;
}
callbacks = g_connection_callbacks + fc_comm_type_rdma;
LOAD_API(callbacks, get_connection_size);
LOAD_API(callbacks, init_connection);
LOAD_API(callbacks, make_connection);
LOAD_API(callbacks, close_connection);
LOAD_API(callbacks, destroy_connection);
return 0;
}

View File

@ -53,9 +53,33 @@ typedef struct {
FCCommunicationType comm_type; FCCommunicationType comm_type;
bool validate_flag; //for connection pool bool validate_flag; //for connection pool
char ip_addr[INET6_ADDRSTRLEN]; char ip_addr[INET6_ADDRSTRLEN];
void *arg1; //for RDMA
char args[0]; //for extra data char args[0]; //for extra data
} ConnectionInfo; } ConnectionInfo;
typedef int (*fc_get_connection_size_callback)();
typedef int (*fc_init_connection_callback)(ConnectionInfo *conn,
const int buffer_size, void *arg);
typedef int (*fc_make_connection_callback)(ConnectionInfo *conn,
const char *service_name, const int timeout_ms,
const char *bind_ipaddr, const bool log_connect_error);
typedef void (*fc_close_connection_callback)(ConnectionInfo *conn);
typedef void (*fc_destroy_connection_callback)(ConnectionInfo *conn);
typedef struct {
fc_get_connection_size_callback get_connection_size;
fc_init_connection_callback init_connection;
fc_make_connection_callback make_connection;
fc_close_connection_callback close_connection;
fc_destroy_connection_callback destroy_connection;
} ConnectionCallbacks;
struct ibv_pd;
typedef struct {
int buffer_size;
struct ibv_pd *pd;
} ConnectionExtraParams;
typedef int (*fc_connection_callback_func)(ConnectionInfo *conn, void *args); typedef int (*fc_connection_callback_func)(ConnectionInfo *conn, void *args);
struct tagConnectionManager; struct tagConnectionManager;
@ -99,8 +123,15 @@ typedef struct tagConnectionPool {
fc_connection_callback_func func; fc_connection_callback_func func;
void *args; void *args;
} validate_callback; } validate_callback;
int extra_data_size;
ConnectionExtraParams extra_params;
} ConnectionPool; } ConnectionPool;
extern ConnectionCallbacks g_connection_callbacks[2];
int conn_pool_global_init_for_rdma();
/** /**
* init ex function * init ex function
* parameters: * parameters:
@ -115,6 +146,7 @@ typedef struct tagConnectionPool {
* validate_func: the validate connection callback * validate_func: the validate connection callback
* validate_args: the args for validate connection callback * validate_args: the args for validate connection callback
* extra_data_size: the extra data size of connection * extra_data_size: the extra data size of connection
* extra_params: for RDMA
* return 0 for success, != 0 for error * return 0 for success, != 0 for error
*/ */
int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout, int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout,
@ -122,7 +154,7 @@ int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout,
const int socket_domain, const int htable_init_capacity, const int socket_domain, const int htable_init_capacity,
fc_connection_callback_func connect_done_func, void *connect_done_args, fc_connection_callback_func connect_done_func, void *connect_done_args,
fc_connection_callback_func validate_func, void *validate_args, fc_connection_callback_func validate_func, void *validate_args,
const int extra_data_size); const int extra_data_size, const ConnectionExtraParams *extra_params);
/** /**
* init ex function * init ex function
@ -140,9 +172,10 @@ static inline int conn_pool_init_ex(ConnectionPool *cp, int connect_timeout,
{ {
const int htable_init_capacity = 0; const int htable_init_capacity = 0;
const int extra_data_size = 0; const int extra_data_size = 0;
const ConnectionExtraParams *extra_params = NULL;
return conn_pool_init_ex1(cp, connect_timeout, max_count_per_entry, return conn_pool_init_ex1(cp, connect_timeout, max_count_per_entry,
max_idle_time, socket_domain, htable_init_capacity, max_idle_time, socket_domain, htable_init_capacity,
NULL, NULL, NULL, NULL, extra_data_size); NULL, NULL, NULL, NULL, extra_data_size, extra_params);
} }
/** /**
@ -160,9 +193,10 @@ static inline int conn_pool_init(ConnectionPool *cp, int connect_timeout,
const int socket_domain = AF_INET; const int socket_domain = AF_INET;
const int htable_init_capacity = 0; const int htable_init_capacity = 0;
const int extra_data_size = 0; const int extra_data_size = 0;
const ConnectionExtraParams *extra_params = NULL;
return conn_pool_init_ex1(cp, connect_timeout, max_count_per_entry, return conn_pool_init_ex1(cp, connect_timeout, max_count_per_entry,
max_idle_time, socket_domain, htable_init_capacity, max_idle_time, socket_domain, htable_init_capacity,
NULL, NULL, NULL, NULL, extra_data_size); NULL, NULL, NULL, NULL, extra_data_size, extra_params);
} }
/** /**