conn_pool_alloc_connection impl.

support_rdma
YuQing 2023-09-12 16:01:29 +08:00
parent c9687df03a
commit db49d54a37
4 changed files with 142 additions and 103 deletions

View File

@ -21,11 +21,14 @@
#include "sockopt.h"
#include "shared_func.h"
#include "sched_thread.h"
#include "server_id_func.h"
#include "connection_pool.h"
ConnectionCallbacks g_connection_callbacks = {
{{conn_pool_connect_server_ex1, conn_pool_disconnect_server},
{NULL, NULL}}, {NULL}
false, {{conn_pool_connect_server_ex1,
conn_pool_disconnect_server,
conn_pool_is_connected},
{NULL, NULL, NULL}}, {NULL}
};
static int node_init_for_socket(ConnectionNode *node,
@ -61,7 +64,7 @@ int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout,
{
return result;
}
cp->connect_timeout = connect_timeout;
cp->connect_timeout_ms = connect_timeout * 1000;
cp->max_count_per_entry = max_count_per_entry;
cp->max_idle_time = max_idle_time;
cp->extra_data_size = extra_data_size;
@ -139,17 +142,22 @@ void conn_pool_destroy(ConnectionPool *cp)
pthread_mutex_destroy(&cp->lock);
}
void conn_pool_disconnect_server(ConnectionInfo *pConnection)
void conn_pool_disconnect_server(ConnectionInfo *conn)
{
if (pConnection->sock >= 0)
{
close(pConnection->sock);
pConnection->sock = -1;
}
if (conn->sock >= 0)
{
close(conn->sock);
conn->sock = -1;
}
}
bool conn_pool_is_connected(ConnectionInfo *conn)
{
return (conn->sock >= 0);
}
int conn_pool_connect_server_ex1(ConnectionInfo *conn,
const char *service_name, const int connect_timeout,
const char *service_name, const int connect_timeout_ms,
const char *bind_ipaddr, const bool log_connect_error)
{
int result;
@ -166,7 +174,7 @@ int conn_pool_connect_server_ex1(ConnectionInfo *conn,
}
if ((result=connectserverbyip_nb(conn->sock, conn->ip_addr,
conn->port, connect_timeout)) != 0)
conn->port, connect_timeout_ms / 1000)) != 0)
{
if (log_connect_error)
{
@ -308,7 +316,7 @@ ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp,
node->conn->validate_flag = false;
*err_no = G_COMMON_CONNECTION_CALLBACKS[conn->comm_type].
make_connection(node->conn, service_name,
cp->connect_timeout, NULL, true);
cp->connect_timeout_ms, NULL, true);
if (*err_no == 0 && cp->connect_done_callback.func != NULL)
{
*err_no = cp->connect_done_callback.func(node->conn,
@ -586,6 +594,10 @@ int conn_pool_global_init_for_rdma()
const char *library = "libfastrdma.so";
void *dlhandle;
if (g_connection_callbacks.inited) {
return 0;
}
dlhandle = dlopen(library, RTLD_LAZY);
if (dlhandle == NULL) {
logError("file: "__FILE__", line: %d, "
@ -598,6 +610,8 @@ int conn_pool_global_init_for_rdma()
make_connection);
LOAD_API(G_COMMON_CONNECTION_CALLBACKS[fc_comm_type_rdma],
close_connection);
LOAD_API(G_COMMON_CONNECTION_CALLBACKS[fc_comm_type_rdma],
is_connected);
LOAD_API(G_RDMA_CONNECTION_CALLBACKS, alloc_pd);
LOAD_API(G_RDMA_CONNECTION_CALLBACKS, get_connection_size);
@ -605,11 +619,77 @@ int conn_pool_global_init_for_rdma()
LOAD_API(G_RDMA_CONNECTION_CALLBACKS, make_connection);
LOAD_API(G_RDMA_CONNECTION_CALLBACKS, close_connection);
LOAD_API(G_RDMA_CONNECTION_CALLBACKS, destroy_connection);
LOAD_API(G_RDMA_CONNECTION_CALLBACKS, is_connected);
LOAD_API(G_RDMA_CONNECTION_CALLBACKS, get_buffer);
LOAD_API(G_RDMA_CONNECTION_CALLBACKS, request_by_buf1);
LOAD_API(G_RDMA_CONNECTION_CALLBACKS, request_by_buf2);
LOAD_API(G_RDMA_CONNECTION_CALLBACKS, request_by_iov);
LOAD_API(G_RDMA_CONNECTION_CALLBACKS, request_by_mix);
g_connection_callbacks.inited = true;
return 0;
}
ConnectionInfo *conn_pool_alloc_connection_ex(
const FCCommunicationType comm_type,
const int extra_data_size,
const ConnectionExtraParams *extra_params,
int *err_no)
{
ConnectionInfo *conn;
int bytes;
if (comm_type == fc_comm_type_rdma) {
bytes = sizeof(ConnectionInfo) + extra_data_size +
G_RDMA_CONNECTION_CALLBACKS.get_connection_size();
} else {
bytes = sizeof(ConnectionInfo) + extra_data_size;
}
if ((conn=fc_malloc(bytes)) == NULL) {
*err_no = ENOMEM;
return NULL;
}
memset(conn, 0, bytes);
if (comm_type == fc_comm_type_rdma) {
conn->arg1 = conn->args + extra_data_size;
if ((*err_no=G_RDMA_CONNECTION_CALLBACKS.init_connection(
conn, extra_params->buffer_size,
extra_params->pd)) != 0)
{
free(conn);
return NULL;
}
} else {
*err_no = 0;
}
conn->comm_type = comm_type;
return conn;
}
int conn_pool_set_rdma_extra_params(ConnectionExtraParams *extra_params,
struct fc_server_config *server_cfg, const int server_group_index)
{
const int padding_size = 1024;
FCServerGroupInfo *server_group;
FCServerInfo *first_server;
int result;
if ((server_group=fc_server_get_group_by_index(server_cfg,
server_group_index)) == NULL)
{
return ENOENT;
}
if (server_group->comm_type == fc_comm_type_sock) {
return 0;
} else {
first_server = FC_SID_SERVERS(*server_cfg);
extra_params->buffer_size = server_group->buffer_size + padding_size;
extra_params->pd = fc_alloc_rdma_pd(G_RDMA_CONNECTION_CALLBACKS.
alloc_pd, &first_server->group_addrs[server_group_index].
address_array, &result);
return result;
}
}

View File

@ -57,6 +57,7 @@ typedef struct {
char args[0]; //for extra data
} ConnectionInfo;
struct fc_server_config;
struct ibv_pd;
typedef struct ibv_pd *(*fc_alloc_pd_callback)(const char **ip_addrs,
const int count, const int port);
@ -66,6 +67,7 @@ typedef int (*fc_init_connection_callback)(ConnectionInfo *conn,
typedef int (*fc_make_connection_callback)(ConnectionInfo *conn,
const char *service_name, const int timeout_ms,
const char *bind_ipaddr, const bool log_connect_error);
typedef bool (*fc_is_connected_callback)(ConnectionInfo *conn);
typedef void (*fc_close_connection_callback)(ConnectionInfo *conn);
typedef void (*fc_destroy_connection_callback)(ConnectionInfo *conn);
@ -85,6 +87,7 @@ typedef int (*fc_rdma_request_by_mix_callback)(ConnectionInfo *conn,
typedef struct {
fc_make_connection_callback make_connection;
fc_close_connection_callback close_connection;
fc_is_connected_callback is_connected;
} CommonConnectionCallbacks;
typedef struct {
@ -94,6 +97,7 @@ typedef struct {
fc_make_connection_callback make_connection;
fc_close_connection_callback close_connection;
fc_destroy_connection_callback destroy_connection;
fc_is_connected_callback is_connected;
fc_rdma_get_buffer_callback get_buffer;
fc_rdma_request_by_buf1_callback request_by_buf1;
@ -103,6 +107,7 @@ typedef struct {
} RDMAConnectionCallbacks;
typedef struct {
bool inited;
CommonConnectionCallbacks common_callbacks[2];
RDMAConnectionCallbacks rdma_callbacks;
} ConnectionCallbacks;
@ -133,7 +138,7 @@ typedef struct tagConnectionManager {
typedef struct tagConnectionPool {
HashArray hash_array; //key is ip:port, value is ConnectionManager
pthread_mutex_t lock;
int connect_timeout;
int connect_timeout_ms;
int max_count_per_entry; //0 means no limit
/*
@ -274,76 +279,78 @@ int conn_pool_close_connection_ex(ConnectionPool *cp, ConnectionInfo *conn,
/**
* disconnect from the server
* parameters:
* pConnection: the connection
* conn: the connection
* return 0 for success, != 0 for error
*/
void conn_pool_disconnect_server(ConnectionInfo *pConnection);
void conn_pool_disconnect_server(ConnectionInfo *conn);
bool conn_pool_is_connected(ConnectionInfo *conn);
/**
* connect to the server
* parameters:
* pConnection: the connection
* service_name: the service name to log
* connect_timeout: the connect timeout in seconds
* connect_timeout_ms: the connect timeout in milliseconds
* bind_ipaddr: the ip address to bind, NULL or empty for any
* log_connect_error: if log error info when connect fail
* NOTE: pConnection->sock will be closed when it >= 0 before connect
* return 0 for success, != 0 for error
*/
int conn_pool_connect_server_ex1(ConnectionInfo *conn,
const char *service_name, const int connect_timeout,
const char *service_name, const int connect_timeout_ms,
const char *bind_ipaddr, const bool log_connect_error);
/**
* connect to the server
* parameters:
* pConnection: the connection
* connect_timeout: the connect timeout in seconds
* connect_timeout_ms: the connect timeout in milliseconds
* bind_ipaddr: the ip address to bind, NULL or empty for any
* log_connect_error: if log error info when connect fail
* NOTE: pConnection->sock will be closed when it >= 0 before connect
* return 0 for success, != 0 for error
*/
static inline int conn_pool_connect_server_ex(ConnectionInfo *pConnection,
const int connect_timeout, const char *bind_ipaddr,
const int connect_timeout_ms, const char *bind_ipaddr,
const bool log_connect_error)
{
const char *service_name = NULL;
return conn_pool_connect_server_ex1(pConnection, service_name,
connect_timeout, bind_ipaddr, log_connect_error);
connect_timeout_ms, bind_ipaddr, log_connect_error);
}
/**
* connect to the server
* parameters:
* pConnection: the connection
* connect_timeout: the connect timeout in seconds
* connect_timeout_ms: the connect timeout in seconds
* NOTE: pConnection->sock will be closed when it >= 0 before connect
* return 0 for success, != 0 for error
*/
static inline int conn_pool_connect_server(ConnectionInfo *pConnection,
const int connect_timeout)
const int connect_timeout_ms)
{
const char *service_name = NULL;
const char *bind_ipaddr = NULL;
return conn_pool_connect_server_ex1(pConnection, service_name,
connect_timeout, bind_ipaddr, true);
connect_timeout_ms, bind_ipaddr, true);
}
/**
* connect to the server
* parameters:
* pConnection: the connection
* connect_timeout: the connect timeout in seconds
* connect_timeout_ms: the connect timeout in seconds
* return 0 for success, != 0 for error
*/
static inline int conn_pool_connect_server_anyway(ConnectionInfo *pConnection,
const int connect_timeout)
const int connect_timeout_ms)
{
const char *service_name = NULL;
const char *bind_ipaddr = NULL;
pConnection->sock = -1;
return conn_pool_connect_server_ex1(pConnection, service_name,
connect_timeout, bind_ipaddr, true);
connect_timeout_ms, bind_ipaddr, true);
}
/**
@ -422,6 +429,25 @@ static inline int conn_pool_compare_ip_and_port(const char *ip1,
return port1 - port2;
}
ConnectionInfo *conn_pool_alloc_connection_ex(
const FCCommunicationType comm_type,
const int extra_data_size,
const ConnectionExtraParams *extra_params,
int *err_no);
static inline ConnectionInfo *conn_pool_alloc_connection(
const FCCommunicationType comm_type,
const ConnectionExtraParams *extra_params,
int *err_no)
{
const int extra_data_size = 0;
return conn_pool_alloc_connection_ex(comm_type,
extra_data_size, extra_params, err_no);
}
int conn_pool_set_rdma_extra_params(ConnectionExtraParams *extra_params,
struct fc_server_config *server_cfg, const int server_group_index);
static inline const char *fc_comm_type_str(const FCCommunicationType type)
{
switch (type) {

View File

@ -1571,63 +1571,6 @@ void fc_server_to_log(FCServerConfig *ctx)
fc_server_log_servers(ctx);
}
ConnectionInfo *fc_server_check_connect_ex(FCAddressPtrArray *addr_array,
const char *service_name, const int connect_timeout,
const char *bind_ipaddr, const bool log_connect_error, int *err_no)
{
FCAddressInfo **current;
FCAddressInfo **addr;
FCAddressInfo **end;
if (addr_array->count <= 0) {
*err_no = ENOENT;
return NULL;
}
current = addr_array->addrs + addr_array->index;
if ((*current)->conn.sock >= 0) {
return &(*current)->conn;
}
if ((*err_no=conn_pool_connect_server_ex1(&(*current)->conn,
service_name, connect_timeout, bind_ipaddr,
log_connect_error)) == 0)
{
return &(*current)->conn;
}
if (addr_array->count == 1) {
return NULL;
}
end = addr_array->addrs + addr_array->count;
for (addr=addr_array->addrs; addr<end; addr++) {
if (addr == current) {
continue;
}
if ((*err_no=conn_pool_connect_server_ex1(&(*addr)->conn,
service_name, connect_timeout, bind_ipaddr,
log_connect_error)) == 0)
{
addr_array->index = addr - addr_array->addrs;
return &(*addr)->conn;
}
}
return NULL;
}
void fc_server_disconnect(FCAddressPtrArray *addr_array)
{
FCAddressInfo **current;
current = addr_array->addrs + addr_array->index;
if ((*current)->conn.sock >= 0) {
close((*current)->conn.sock);
(*current)->conn.sock = -1;
}
}
int fc_server_make_connection_ex(FCAddressPtrArray *addr_array,
ConnectionInfo *conn, const char *service_name,
const int connect_timeout, const char *bind_ipaddr,
@ -1643,10 +1586,11 @@ int fc_server_make_connection_ex(FCAddressPtrArray *addr_array,
}
current = addr_array->addrs + addr_array->index;
*conn = (*current)->conn;
conn->sock = -1;
if ((result=conn_pool_connect_server_ex1(conn,
service_name, connect_timeout,
conn_pool_set_server_info(conn, (*current)->conn.ip_addr,
(*current)->conn.port);
conn->comm_type = (*current)->conn.comm_type;
if ((result=G_COMMON_CONNECTION_CALLBACKS[conn->comm_type].
make_connection(conn, service_name, connect_timeout * 1000,
bind_ipaddr, log_connect_error)) == 0)
{
return 0;
@ -1662,10 +1606,10 @@ int fc_server_make_connection_ex(FCAddressPtrArray *addr_array,
continue;
}
*conn = (*addr)->conn;
conn->sock = -1;
if ((result=conn_pool_connect_server_ex1(conn,
service_name, connect_timeout,
conn_pool_set_server_info(conn, (*addr)->conn.ip_addr,
(*addr)->conn.port);
if ((result=G_COMMON_CONNECTION_CALLBACKS[conn->comm_type].
make_connection(conn, service_name, connect_timeout * 1000,
bind_ipaddr, log_connect_error)) == 0)
{
addr_array->index = addr - addr_array->addrs;

View File

@ -113,7 +113,7 @@ typedef struct
FCServerMap *maps;
} FCServerMapArray;
typedef struct
typedef struct fc_server_config
{
int default_port;
int min_hosts_each_group;
@ -223,17 +223,6 @@ int fc_server_to_config_string(FCServerConfig *ctx, FastBuffer *buffer);
void fc_server_to_log(FCServerConfig *ctx);
ConnectionInfo *fc_server_check_connect_ex(FCAddressPtrArray *addr_array,
const char *service_name, const int connect_timeout,
const char *bind_ipaddr, const bool log_connect_error, int *err_no);
#define fc_server_check_connect(addr_array, service_name, \
connect_timeout, err_no) \
fc_server_check_connect_ex(addr_array, service_name, \
connect_timeout, NULL, true, err_no)
void fc_server_disconnect(FCAddressPtrArray *addr_array);
const FCAddressInfo *fc_server_get_address_by_peer(
FCAddressPtrArray *addr_array, const char *peer_ip);