diff --git a/src/connection_pool.c b/src/connection_pool.c index 2aa4510..9b1734b 100644 --- a/src/connection_pool.c +++ b/src/connection_pool.c @@ -21,11 +21,14 @@ #include "sockopt.h" #include "shared_func.h" #include "sched_thread.h" +#include "server_id_func.h" #include "connection_pool.h" ConnectionCallbacks g_connection_callbacks = { - {{conn_pool_connect_server_ex1, conn_pool_disconnect_server}, - {NULL, NULL}}, {NULL} + false, {{conn_pool_connect_server_ex1, + conn_pool_disconnect_server, + conn_pool_is_connected}, + {NULL, NULL, NULL}}, {NULL} }; static int node_init_for_socket(ConnectionNode *node, @@ -61,7 +64,7 @@ int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout, { return result; } - cp->connect_timeout = connect_timeout; + cp->connect_timeout_ms = connect_timeout * 1000; cp->max_count_per_entry = max_count_per_entry; cp->max_idle_time = max_idle_time; cp->extra_data_size = extra_data_size; @@ -139,17 +142,22 @@ void conn_pool_destroy(ConnectionPool *cp) pthread_mutex_destroy(&cp->lock); } -void conn_pool_disconnect_server(ConnectionInfo *pConnection) +void conn_pool_disconnect_server(ConnectionInfo *conn) { - if (pConnection->sock >= 0) - { - close(pConnection->sock); - pConnection->sock = -1; - } + if (conn->sock >= 0) + { + close(conn->sock); + conn->sock = -1; + } +} + +bool conn_pool_is_connected(ConnectionInfo *conn) +{ + return (conn->sock >= 0); } int conn_pool_connect_server_ex1(ConnectionInfo *conn, - const char *service_name, const int connect_timeout, + const char *service_name, const int connect_timeout_ms, const char *bind_ipaddr, const bool log_connect_error) { int result; @@ -166,7 +174,7 @@ int conn_pool_connect_server_ex1(ConnectionInfo *conn, } if ((result=connectserverbyip_nb(conn->sock, conn->ip_addr, - conn->port, connect_timeout)) != 0) + conn->port, connect_timeout_ms / 1000)) != 0) { if (log_connect_error) { @@ -308,7 +316,7 @@ ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp, node->conn->validate_flag = false; *err_no = G_COMMON_CONNECTION_CALLBACKS[conn->comm_type]. make_connection(node->conn, service_name, - cp->connect_timeout, NULL, true); + cp->connect_timeout_ms, NULL, true); if (*err_no == 0 && cp->connect_done_callback.func != NULL) { *err_no = cp->connect_done_callback.func(node->conn, @@ -586,6 +594,10 @@ int conn_pool_global_init_for_rdma() const char *library = "libfastrdma.so"; void *dlhandle; + if (g_connection_callbacks.inited) { + return 0; + } + dlhandle = dlopen(library, RTLD_LAZY); if (dlhandle == NULL) { logError("file: "__FILE__", line: %d, " @@ -598,6 +610,8 @@ int conn_pool_global_init_for_rdma() make_connection); LOAD_API(G_COMMON_CONNECTION_CALLBACKS[fc_comm_type_rdma], close_connection); + LOAD_API(G_COMMON_CONNECTION_CALLBACKS[fc_comm_type_rdma], + is_connected); LOAD_API(G_RDMA_CONNECTION_CALLBACKS, alloc_pd); LOAD_API(G_RDMA_CONNECTION_CALLBACKS, get_connection_size); @@ -605,11 +619,77 @@ int conn_pool_global_init_for_rdma() LOAD_API(G_RDMA_CONNECTION_CALLBACKS, make_connection); LOAD_API(G_RDMA_CONNECTION_CALLBACKS, close_connection); LOAD_API(G_RDMA_CONNECTION_CALLBACKS, destroy_connection); + LOAD_API(G_RDMA_CONNECTION_CALLBACKS, is_connected); LOAD_API(G_RDMA_CONNECTION_CALLBACKS, get_buffer); LOAD_API(G_RDMA_CONNECTION_CALLBACKS, request_by_buf1); LOAD_API(G_RDMA_CONNECTION_CALLBACKS, request_by_buf2); LOAD_API(G_RDMA_CONNECTION_CALLBACKS, request_by_iov); LOAD_API(G_RDMA_CONNECTION_CALLBACKS, request_by_mix); + g_connection_callbacks.inited = true; return 0; } + +ConnectionInfo *conn_pool_alloc_connection_ex( + const FCCommunicationType comm_type, + const int extra_data_size, + const ConnectionExtraParams *extra_params, + int *err_no) +{ + ConnectionInfo *conn; + int bytes; + + if (comm_type == fc_comm_type_rdma) { + bytes = sizeof(ConnectionInfo) + extra_data_size + + G_RDMA_CONNECTION_CALLBACKS.get_connection_size(); + } else { + bytes = sizeof(ConnectionInfo) + extra_data_size; + } + if ((conn=fc_malloc(bytes)) == NULL) { + *err_no = ENOMEM; + return NULL; + } + memset(conn, 0, bytes); + + if (comm_type == fc_comm_type_rdma) { + conn->arg1 = conn->args + extra_data_size; + if ((*err_no=G_RDMA_CONNECTION_CALLBACKS.init_connection( + conn, extra_params->buffer_size, + extra_params->pd)) != 0) + { + free(conn); + return NULL; + } + } else { + *err_no = 0; + } + + conn->comm_type = comm_type; + return conn; +} + +int conn_pool_set_rdma_extra_params(ConnectionExtraParams *extra_params, + struct fc_server_config *server_cfg, const int server_group_index) +{ + const int padding_size = 1024; + FCServerGroupInfo *server_group; + FCServerInfo *first_server; + int result; + + if ((server_group=fc_server_get_group_by_index(server_cfg, + server_group_index)) == NULL) + { + return ENOENT; + } + + if (server_group->comm_type == fc_comm_type_sock) { + return 0; + } else { + first_server = FC_SID_SERVERS(*server_cfg); + extra_params->buffer_size = server_group->buffer_size + padding_size; + extra_params->pd = fc_alloc_rdma_pd(G_RDMA_CONNECTION_CALLBACKS. + alloc_pd, &first_server->group_addrs[server_group_index]. + address_array, &result); + return result; + } +} diff --git a/src/connection_pool.h b/src/connection_pool.h index f3b5ef4..466232f 100644 --- a/src/connection_pool.h +++ b/src/connection_pool.h @@ -57,6 +57,7 @@ typedef struct { char args[0]; //for extra data } ConnectionInfo; +struct fc_server_config; struct ibv_pd; typedef struct ibv_pd *(*fc_alloc_pd_callback)(const char **ip_addrs, const int count, const int port); @@ -66,6 +67,7 @@ typedef int (*fc_init_connection_callback)(ConnectionInfo *conn, typedef int (*fc_make_connection_callback)(ConnectionInfo *conn, const char *service_name, const int timeout_ms, const char *bind_ipaddr, const bool log_connect_error); +typedef bool (*fc_is_connected_callback)(ConnectionInfo *conn); typedef void (*fc_close_connection_callback)(ConnectionInfo *conn); typedef void (*fc_destroy_connection_callback)(ConnectionInfo *conn); @@ -85,6 +87,7 @@ typedef int (*fc_rdma_request_by_mix_callback)(ConnectionInfo *conn, typedef struct { fc_make_connection_callback make_connection; fc_close_connection_callback close_connection; + fc_is_connected_callback is_connected; } CommonConnectionCallbacks; typedef struct { @@ -94,6 +97,7 @@ typedef struct { fc_make_connection_callback make_connection; fc_close_connection_callback close_connection; fc_destroy_connection_callback destroy_connection; + fc_is_connected_callback is_connected; fc_rdma_get_buffer_callback get_buffer; fc_rdma_request_by_buf1_callback request_by_buf1; @@ -103,6 +107,7 @@ typedef struct { } RDMAConnectionCallbacks; typedef struct { + bool inited; CommonConnectionCallbacks common_callbacks[2]; RDMAConnectionCallbacks rdma_callbacks; } ConnectionCallbacks; @@ -133,7 +138,7 @@ typedef struct tagConnectionManager { typedef struct tagConnectionPool { HashArray hash_array; //key is ip:port, value is ConnectionManager pthread_mutex_t lock; - int connect_timeout; + int connect_timeout_ms; int max_count_per_entry; //0 means no limit /* @@ -274,76 +279,78 @@ int conn_pool_close_connection_ex(ConnectionPool *cp, ConnectionInfo *conn, /** * disconnect from the server * parameters: -* pConnection: the connection +* conn: the connection * return 0 for success, != 0 for error */ -void conn_pool_disconnect_server(ConnectionInfo *pConnection); +void conn_pool_disconnect_server(ConnectionInfo *conn); + +bool conn_pool_is_connected(ConnectionInfo *conn); /** * connect to the server * parameters: * pConnection: the connection * service_name: the service name to log -* connect_timeout: the connect timeout in seconds +* connect_timeout_ms: the connect timeout in milliseconds * bind_ipaddr: the ip address to bind, NULL or empty for any * log_connect_error: if log error info when connect fail * NOTE: pConnection->sock will be closed when it >= 0 before connect * return 0 for success, != 0 for error */ int conn_pool_connect_server_ex1(ConnectionInfo *conn, - const char *service_name, const int connect_timeout, + const char *service_name, const int connect_timeout_ms, const char *bind_ipaddr, const bool log_connect_error); /** * connect to the server * parameters: * pConnection: the connection -* connect_timeout: the connect timeout in seconds +* connect_timeout_ms: the connect timeout in milliseconds * bind_ipaddr: the ip address to bind, NULL or empty for any * log_connect_error: if log error info when connect fail * NOTE: pConnection->sock will be closed when it >= 0 before connect * return 0 for success, != 0 for error */ static inline int conn_pool_connect_server_ex(ConnectionInfo *pConnection, - const int connect_timeout, const char *bind_ipaddr, + const int connect_timeout_ms, const char *bind_ipaddr, const bool log_connect_error) { const char *service_name = NULL; return conn_pool_connect_server_ex1(pConnection, service_name, - connect_timeout, bind_ipaddr, log_connect_error); + connect_timeout_ms, bind_ipaddr, log_connect_error); } /** * connect to the server * parameters: * pConnection: the connection -* connect_timeout: the connect timeout in seconds +* connect_timeout_ms: the connect timeout in seconds * NOTE: pConnection->sock will be closed when it >= 0 before connect * return 0 for success, != 0 for error */ static inline int conn_pool_connect_server(ConnectionInfo *pConnection, - const int connect_timeout) + const int connect_timeout_ms) { const char *service_name = NULL; const char *bind_ipaddr = NULL; return conn_pool_connect_server_ex1(pConnection, service_name, - connect_timeout, bind_ipaddr, true); + connect_timeout_ms, bind_ipaddr, true); } /** * connect to the server * parameters: * pConnection: the connection -* connect_timeout: the connect timeout in seconds +* connect_timeout_ms: the connect timeout in seconds * return 0 for success, != 0 for error */ static inline int conn_pool_connect_server_anyway(ConnectionInfo *pConnection, - const int connect_timeout) + const int connect_timeout_ms) { const char *service_name = NULL; const char *bind_ipaddr = NULL; pConnection->sock = -1; return conn_pool_connect_server_ex1(pConnection, service_name, - connect_timeout, bind_ipaddr, true); + connect_timeout_ms, bind_ipaddr, true); } /** @@ -422,6 +429,25 @@ static inline int conn_pool_compare_ip_and_port(const char *ip1, return port1 - port2; } +ConnectionInfo *conn_pool_alloc_connection_ex( + const FCCommunicationType comm_type, + const int extra_data_size, + const ConnectionExtraParams *extra_params, + int *err_no); + +static inline ConnectionInfo *conn_pool_alloc_connection( + const FCCommunicationType comm_type, + const ConnectionExtraParams *extra_params, + int *err_no) +{ + const int extra_data_size = 0; + return conn_pool_alloc_connection_ex(comm_type, + extra_data_size, extra_params, err_no); +} + +int conn_pool_set_rdma_extra_params(ConnectionExtraParams *extra_params, + struct fc_server_config *server_cfg, const int server_group_index); + static inline const char *fc_comm_type_str(const FCCommunicationType type) { switch (type) { diff --git a/src/server_id_func.c b/src/server_id_func.c index 0c25028..ef7b96e 100644 --- a/src/server_id_func.c +++ b/src/server_id_func.c @@ -1571,63 +1571,6 @@ void fc_server_to_log(FCServerConfig *ctx) fc_server_log_servers(ctx); } -ConnectionInfo *fc_server_check_connect_ex(FCAddressPtrArray *addr_array, - const char *service_name, const int connect_timeout, - const char *bind_ipaddr, const bool log_connect_error, int *err_no) -{ - FCAddressInfo **current; - FCAddressInfo **addr; - FCAddressInfo **end; - - if (addr_array->count <= 0) { - *err_no = ENOENT; - return NULL; - } - - current = addr_array->addrs + addr_array->index; - if ((*current)->conn.sock >= 0) { - return &(*current)->conn; - } - - if ((*err_no=conn_pool_connect_server_ex1(&(*current)->conn, - service_name, connect_timeout, bind_ipaddr, - log_connect_error)) == 0) - { - return &(*current)->conn; - } - - if (addr_array->count == 1) { - return NULL; - } - - end = addr_array->addrs + addr_array->count; - for (addr=addr_array->addrs; addrconn, - service_name, connect_timeout, bind_ipaddr, - log_connect_error)) == 0) - { - addr_array->index = addr - addr_array->addrs; - return &(*addr)->conn; - } - } - - return NULL; -} - -void fc_server_disconnect(FCAddressPtrArray *addr_array) -{ - FCAddressInfo **current; - - current = addr_array->addrs + addr_array->index; - if ((*current)->conn.sock >= 0) { - close((*current)->conn.sock); - (*current)->conn.sock = -1; - } -} - int fc_server_make_connection_ex(FCAddressPtrArray *addr_array, ConnectionInfo *conn, const char *service_name, const int connect_timeout, const char *bind_ipaddr, @@ -1643,10 +1586,11 @@ int fc_server_make_connection_ex(FCAddressPtrArray *addr_array, } current = addr_array->addrs + addr_array->index; - *conn = (*current)->conn; - conn->sock = -1; - if ((result=conn_pool_connect_server_ex1(conn, - service_name, connect_timeout, + conn_pool_set_server_info(conn, (*current)->conn.ip_addr, + (*current)->conn.port); + conn->comm_type = (*current)->conn.comm_type; + if ((result=G_COMMON_CONNECTION_CALLBACKS[conn->comm_type]. + make_connection(conn, service_name, connect_timeout * 1000, bind_ipaddr, log_connect_error)) == 0) { return 0; @@ -1662,10 +1606,10 @@ int fc_server_make_connection_ex(FCAddressPtrArray *addr_array, continue; } - *conn = (*addr)->conn; - conn->sock = -1; - if ((result=conn_pool_connect_server_ex1(conn, - service_name, connect_timeout, + conn_pool_set_server_info(conn, (*addr)->conn.ip_addr, + (*addr)->conn.port); + if ((result=G_COMMON_CONNECTION_CALLBACKS[conn->comm_type]. + make_connection(conn, service_name, connect_timeout * 1000, bind_ipaddr, log_connect_error)) == 0) { addr_array->index = addr - addr_array->addrs; diff --git a/src/server_id_func.h b/src/server_id_func.h index c034936..6255e0b 100644 --- a/src/server_id_func.h +++ b/src/server_id_func.h @@ -113,7 +113,7 @@ typedef struct FCServerMap *maps; } FCServerMapArray; -typedef struct +typedef struct fc_server_config { int default_port; int min_hosts_each_group; @@ -223,17 +223,6 @@ int fc_server_to_config_string(FCServerConfig *ctx, FastBuffer *buffer); void fc_server_to_log(FCServerConfig *ctx); -ConnectionInfo *fc_server_check_connect_ex(FCAddressPtrArray *addr_array, - const char *service_name, const int connect_timeout, - const char *bind_ipaddr, const bool log_connect_error, int *err_no); - -#define fc_server_check_connect(addr_array, service_name, \ - connect_timeout, err_no) \ - fc_server_check_connect_ex(addr_array, service_name, \ - connect_timeout, NULL, true, err_no) - -void fc_server_disconnect(FCAddressPtrArray *addr_array); - const FCAddressInfo *fc_server_get_address_by_peer( FCAddressPtrArray *addr_array, const char *peer_ip);