From 5139ec468276223681d17f3173e562c16b432753 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Sun, 10 Sep 2023 20:54:24 +0800 Subject: [PATCH] connection_pool.[hc] support callbacks for RDMA --- HISTORY | 3 +- src/connection_pool.c | 118 +++++++++++++++++++++++++++++++++--------- src/connection_pool.h | 46 +++++++++++++--- 3 files changed, 136 insertions(+), 31 deletions(-) diff --git a/HISTORY b/HISTORY index 3236f5f..5baeaef 100644 --- a/HISTORY +++ b/HISTORY @@ -1,8 +1,9 @@ -Version 1.70 2023-09-06 +Version 1.70 2023-09-10 * get full mac address of infiniband NIC under Linux * struct fast_task_info add field conn for RDMA connection * server_id_func.[hc]: support communication type + * connection_pool.[hc] support callbacks for RDMA Version 1.69 2023-08-05 * bugfixed: array_allocator_alloc MUST init the array diff --git a/src/connection_pool.c b/src/connection_pool.c index fe3f37c..7c6cbae 100644 --- a/src/connection_pool.c +++ b/src/connection_pool.c @@ -16,22 +16,47 @@ #include #include #include +#include #include "logger.h" #include "sockopt.h" #include "shared_func.h" #include "sched_thread.h" #include "connection_pool.h" +ConnectionCallbacks g_connection_callbacks[2] = { + {NULL, NULL, conn_pool_connect_server_ex1, + conn_pool_disconnect_server, NULL}, + {NULL, NULL, NULL, NULL, NULL} +}; + +static int node_init_for_socket(ConnectionNode *node, + ConnectionPool *cp) +{ + node->conn = (ConnectionInfo *)(node + 1); + return 0; +} + +static int node_init_for_rdma(ConnectionNode *node, + ConnectionPool *cp) +{ + node->conn = (ConnectionInfo *)(node + 1); + node->conn->arg1 = node->conn->args + cp->extra_data_size; + return g_connection_callbacks[fc_comm_type_rdma].init_connection( + node->conn, cp->extra_params.buffer_size, cp->extra_params.pd); +} + int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout, const int max_count_per_entry, const int max_idle_time, const int socket_domain, const int htable_init_capacity, fc_connection_callback_func connect_done_func, void *connect_done_args, fc_connection_callback_func validate_func, void *validate_args, - const int extra_data_size) + const int extra_data_size, const ConnectionExtraParams *extra_params) { const int64_t alloc_elements_limit = 0; int result; int init_capacity; + int extra_connection_size; + fast_mblock_object_init_func obj_init_func; if ((result=init_pthread_lock(&cp->lock)) != 0) { @@ -40,6 +65,7 @@ int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout, cp->connect_timeout = connect_timeout; cp->max_count_per_entry = max_count_per_entry; cp->max_idle_time = max_idle_time; + cp->extra_data_size = extra_data_size; cp->socket_domain = socket_domain; cp->connect_done_callback.func = connect_done_func; cp->connect_done_callback.args = connect_done_args; @@ -54,10 +80,19 @@ int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout, return result; } + if (extra_params != NULL) { + extra_connection_size = g_connection_callbacks + [fc_comm_type_rdma].get_connection_size(); + obj_init_func = (fast_mblock_object_init_func)node_init_for_rdma; + cp->extra_params = *extra_params; + } else { + extra_connection_size = 0; + obj_init_func = (fast_mblock_object_init_func)node_init_for_socket; + } if ((result=fast_mblock_init_ex1(&cp->node_allocator, "cpool-node", sizeof(ConnectionNode) + sizeof(ConnectionInfo) + - extra_data_size, init_capacity, alloc_elements_limit, - NULL, NULL, true)) != 0) + extra_data_size + extra_connection_size, init_capacity, + alloc_elements_limit, obj_init_func, cp, true)) != 0) { return result; } @@ -84,7 +119,8 @@ static int coon_pool_close_connections(const int index, deleted = node; node = node->next; - conn_pool_disconnect_server(deleted->conn); + g_connection_callbacks[deleted->conn->comm_type]. + close_connection(deleted->conn); fast_mblock_free_object(&cp->node_allocator, deleted); } @@ -258,7 +294,6 @@ ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp, return NULL; } - node->conn = (ConnectionInfo *)(node + 1); node->manager = cm; node->next = NULL; node->atime = 0; @@ -266,12 +301,15 @@ ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp, cm->total_count++; pthread_mutex_unlock(&cm->lock); - memcpy(node->conn, conn, sizeof(ConnectionInfo)); + memcpy(node->conn->ip_addr, conn->ip_addr, sizeof(conn->ip_addr)); + node->conn->port = conn->port; + node->conn->comm_type = conn->comm_type; node->conn->socket_domain = cp->socket_domain; node->conn->sock = -1; node->conn->validate_flag = false; - *err_no = conn_pool_connect_server_ex1(node->conn, - service_name, cp->connect_timeout, NULL, true); + *err_no = g_connection_callbacks[conn->comm_type]. + make_connection(node->conn, service_name, + cp->connect_timeout, NULL, true); if (*err_no == 0 && cp->connect_done_callback.func != NULL) { *err_no = cp->connect_done_callback.func(node->conn, @@ -279,11 +317,8 @@ ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp, } if (*err_no != 0) { - if (node->conn->sock >= 0) - { - close(node->conn->sock); - node->conn->sock = -1; - } + g_connection_callbacks[conn->comm_type]. + close_connection(node->conn); pthread_mutex_lock(&cm->lock); cm->total_count--; //rollback fast_mblock_free_object(&cp->node_allocator, node); @@ -340,17 +375,15 @@ ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp, { cm->total_count--; - logDebug("file: "__FILE__", line: %d, " \ - "server %s:%u, connection: %d idle " \ - "time: %d exceeds max idle time: %d, "\ - "total_count: %d, free_count: %d", \ - __LINE__, conn->ip_addr, conn->port, \ - ci->sock, \ - (int)(current_time - node->atime), \ - cp->max_idle_time, cm->total_count, \ - cm->free_count); + logDebug("file: "__FILE__", line: %d, " + "server %s:%u, connection: %d idle " + "time: %d exceeds max idle time: %d, " + "total_count: %d, free_count: %d", __LINE__, + conn->ip_addr, conn->port, ci->sock, (int) + (current_time - node->atime), cp->max_idle_time, + cm->total_count, cm->free_count); - conn_pool_disconnect_server(ci); + g_connection_callbacks[ci->comm_type].close_connection(ci); fast_mblock_free_object(&cp->node_allocator, node); continue; } @@ -408,7 +441,7 @@ int conn_pool_close_connection_ex(ConnectionPool *cp, ConnectionInfo *conn, __LINE__, conn->ip_addr, conn->port, conn->sock, cm->total_count, cm->free_count); - conn_pool_disconnect_server(conn); + g_connection_callbacks[conn->comm_type].close_connection(conn); fast_mblock_free_object(&cp->node_allocator, node); node = cm->head; @@ -533,3 +566,40 @@ int conn_pool_load_server_info(IniContext *pIniContext, const char *filename, return conn_pool_parse_server_info(pServerStr, pServerInfo, default_port); } + +#define API_PREFIX_NAME "fast_rdma_client_" + +#define LOAD_API(callbacks, fname) \ + do { \ + callbacks->fname = dlsym(dlhandle, API_PREFIX_NAME#fname); \ + if (callbacks->fname == NULL) { \ + logError("file: "__FILE__", line: %d, " \ + "dlsym api %s fail, error info: %s", \ + __LINE__, API_PREFIX_NAME#fname, dlerror()); \ + return ENOENT; \ + } \ + } while (0) + +int conn_pool_global_init_for_rdma() +{ + const char *library = "libfastrdma.so"; + ConnectionCallbacks *callbacks; + void *dlhandle; + + dlhandle = dlopen(library, RTLD_LAZY); + if (dlhandle == NULL) { + logError("file: "__FILE__", line: %d, " + "dlopen %s fail, error info: %s", + __LINE__, library, dlerror()); + return EFAULT; + } + + callbacks = g_connection_callbacks + fc_comm_type_rdma; + LOAD_API(callbacks, get_connection_size); + LOAD_API(callbacks, init_connection); + LOAD_API(callbacks, make_connection); + LOAD_API(callbacks, close_connection); + LOAD_API(callbacks, destroy_connection); + + return 0; +} diff --git a/src/connection_pool.h b/src/connection_pool.h index fa8deed..daf794a 100644 --- a/src/connection_pool.h +++ b/src/connection_pool.h @@ -47,15 +47,39 @@ typedef enum { } FCCommunicationType; typedef struct { - int sock; - uint16_t port; + int sock; + uint16_t port; short socket_domain; //socket domain, AF_INET, AF_INET6 or AF_UNSPEC for auto dedect FCCommunicationType comm_type; bool validate_flag; //for connection pool - char ip_addr[INET6_ADDRSTRLEN]; + char ip_addr[INET6_ADDRSTRLEN]; + void *arg1; //for RDMA char args[0]; //for extra data } ConnectionInfo; +typedef int (*fc_get_connection_size_callback)(); +typedef int (*fc_init_connection_callback)(ConnectionInfo *conn, + const int buffer_size, void *arg); +typedef int (*fc_make_connection_callback)(ConnectionInfo *conn, + const char *service_name, const int timeout_ms, + const char *bind_ipaddr, const bool log_connect_error); +typedef void (*fc_close_connection_callback)(ConnectionInfo *conn); +typedef void (*fc_destroy_connection_callback)(ConnectionInfo *conn); + +typedef struct { + fc_get_connection_size_callback get_connection_size; + fc_init_connection_callback init_connection; + fc_make_connection_callback make_connection; + fc_close_connection_callback close_connection; + fc_destroy_connection_callback destroy_connection; +} ConnectionCallbacks; + +struct ibv_pd; +typedef struct { + int buffer_size; + struct ibv_pd *pd; +} ConnectionExtraParams; + typedef int (*fc_connection_callback_func)(ConnectionInfo *conn, void *args); struct tagConnectionManager; @@ -99,8 +123,15 @@ typedef struct tagConnectionPool { fc_connection_callback_func func; void *args; } validate_callback; + + int extra_data_size; + ConnectionExtraParams extra_params; } ConnectionPool; +extern ConnectionCallbacks g_connection_callbacks[2]; + +int conn_pool_global_init_for_rdma(); + /** * init ex function * parameters: @@ -115,6 +146,7 @@ typedef struct tagConnectionPool { * validate_func: the validate connection callback * validate_args: the args for validate connection callback * extra_data_size: the extra data size of connection +* extra_params: for RDMA * return 0 for success, != 0 for error */ int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout, @@ -122,7 +154,7 @@ int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout, const int socket_domain, const int htable_init_capacity, fc_connection_callback_func connect_done_func, void *connect_done_args, fc_connection_callback_func validate_func, void *validate_args, - const int extra_data_size); + const int extra_data_size, const ConnectionExtraParams *extra_params); /** * init ex function @@ -140,9 +172,10 @@ static inline int conn_pool_init_ex(ConnectionPool *cp, int connect_timeout, { const int htable_init_capacity = 0; const int extra_data_size = 0; + const ConnectionExtraParams *extra_params = NULL; return conn_pool_init_ex1(cp, connect_timeout, max_count_per_entry, max_idle_time, socket_domain, htable_init_capacity, - NULL, NULL, NULL, NULL, extra_data_size); + NULL, NULL, NULL, NULL, extra_data_size, extra_params); } /** @@ -160,9 +193,10 @@ static inline int conn_pool_init(ConnectionPool *cp, int connect_timeout, const int socket_domain = AF_INET; const int htable_init_capacity = 0; const int extra_data_size = 0; + const ConnectionExtraParams *extra_params = NULL; return conn_pool_init_ex1(cp, connect_timeout, max_count_per_entry, max_idle_time, socket_domain, htable_init_capacity, - NULL, NULL, NULL, NULL, extra_data_size); + NULL, NULL, NULL, NULL, extra_data_size, extra_params); } /**