diff --git a/HISTORY b/HISTORY index ee7b7fe..77590d2 100644 --- a/HISTORY +++ b/HISTORY @@ -1,10 +1,11 @@ -Version 1.70 2023-09-18 +Version 1.70 2023-09-20 * get full mac address of infiniband NIC under Linux * struct fast_task_info add field conn for RDMA connection * server_id_func.[hc]: support communication type * connection_pool.[hc] support callbacks for RDMA * nio thread data support busy_polling_callback + * connection_pool.[hc] support thread local for performance Version 1.69 2023-08-05 * bugfixed: array_allocator_alloc MUST init the array diff --git a/src/connection_pool.c b/src/connection_pool.c index bc2602e..f2c5835 100644 --- a/src/connection_pool.c +++ b/src/connection_pool.c @@ -44,7 +44,33 @@ static int node_init_for_rdma(ConnectionNode *node, node->conn = (ConnectionInfo *)(node + 1); node->conn->arg1 = node->conn->args + cp->extra_data_size; return G_RDMA_CONNECTION_CALLBACKS.init_connection(node->conn, - cp->extra_params.buffer_size, cp->extra_params.pd); + cp->extra_params.rdma.buffer_size, cp->extra_params.rdma.pd); +} + +static void cp_tls_destroy(void *ptr) +{ + ConnectionThreadHashTable *htable; + ConnectionNode **pp; + ConnectionNode **end; + ConnectionNode *current; + ConnectionNode *node; + + htable = ptr; + end = htable->buckets + htable->cp->extra_params.tls.htable_capacity; + for (pp=htable->buckets; ppnext; + conn_pool_close_connection(htable->cp, current->conn); + } while (node != NULL); + } + + free(ptr); } int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout, @@ -89,8 +115,10 @@ int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout, cp->extra_params = *extra_params; } else { extra_connection_size = 0; - cp->extra_params.buffer_size = 0; - cp->extra_params.pd = NULL; + cp->extra_params.tls.enabled = false; + cp->extra_params.tls.htable_capacity = 0; + cp->extra_params.rdma.buffer_size = 0; + cp->extra_params.rdma.pd = NULL; obj_init_func = (fast_mblock_object_init_func)node_init_for_socket; } if ((result=fast_mblock_init_ex1(&cp->node_allocator, "cpool-node", @@ -101,6 +129,19 @@ int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout, return result; } + logDebug("cp: %p, tls.enabled: %d, htable_capacity: %d", + cp, cp->extra_params.tls.enabled, + cp->extra_params.tls.htable_capacity); + + if (cp->extra_params.tls.enabled) { + if ((result=pthread_key_create(&cp->tls_key, cp_tls_destroy)) != 0) { + logError("file: "__FILE__", line: %d, " + "pthread_key_create fail, errno: %d, error info: %s", + __LINE__, result, STRERROR(result)); + return result; + } + } + return fc_hash_init(&(cp->hash_array), fc_simple_hash, init_capacity, 0.75); } @@ -229,23 +270,21 @@ int conn_pool_async_connect_server_ex(ConnectionInfo *conn, static inline void conn_pool_get_key(const ConnectionInfo *conn, char *key, int *key_len) { - *key_len = sprintf(key, "%s_%u", conn->ip_addr, conn->port); + *key_len = sprintf(key, "%s-%u", conn->ip_addr, conn->port); } -ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp, - const ConnectionInfo *conn, const char *service_name, int *err_no) +static ConnectionInfo *get_connection(ConnectionPool *cp, + const ConnectionInfo *conn, const string_t *key, + const char *service_name, int *err_no) { - char key[INET6_ADDRSTRLEN + 8]; - int key_len; ConnectionManager *cm; ConnectionNode *node; ConnectionInfo *ci; time_t current_time; - conn_pool_get_key(conn, key, &key_len); - pthread_mutex_lock(&cp->lock); - cm = (ConnectionManager *)fc_hash_find(&cp->hash_array, key, key_len); + cm = (ConnectionManager *)fc_hash_find( + &cp->hash_array, key->str, key->len); if (cm == NULL) { cm = (ConnectionManager *)fast_mblock_alloc_object( @@ -268,7 +307,7 @@ ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp, pthread_mutex_unlock(&cp->lock); return NULL; } - fc_hash_insert(&cp->hash_array, key, key_len, cm); + fc_hash_insert(&cp->hash_array, key->str, key->len, cm); } pthread_mutex_unlock(&cp->lock); @@ -410,18 +449,84 @@ ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp, } } -int conn_pool_close_connection_ex(ConnectionPool *cp, ConnectionInfo *conn, - const bool bForce) +ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp, + const ConnectionInfo *conn, const char *service_name, int *err_no) +{ + string_t key; + int bytes; + uint32_t hash_code; + ConnectionNode **bucket; + ConnectionNode *node; + ConnectionInfo *ci; + char key_buff[INET6_ADDRSTRLEN + 8]; + ConnectionThreadHashTable *htable; + + key.str = key_buff; + conn_pool_get_key(conn, key.str, &key.len); + if (!cp->extra_params.tls.enabled) { + return get_connection(cp, conn, &key, service_name, err_no); + } + + htable = pthread_getspecific(cp->tls_key); + if (htable == NULL) { + bytes = sizeof(ConnectionThreadHashTable) + sizeof(ConnectionNode *) * + cp->extra_params.tls.htable_capacity; + htable = fc_malloc(bytes); + memset(htable, 0, bytes); + + htable->buckets = (ConnectionNode **)(htable + 1); + htable->cp = cp; + if ((*err_no=pthread_setspecific(cp->tls_key, htable)) != 0) { + logError("file: "__FILE__", line: %d, " + "pthread_setspecific fail, errno: %d, error info: %s", + __LINE__, *err_no, STRERROR(*err_no)); + return NULL; + } + } + + hash_code = fc_simple_hash(key.str, key.len); + bucket = htable->buckets + hash_code % cp-> + extra_params.tls.htable_capacity; + if (*bucket == NULL) { + node = NULL; + } else if (FC_CONNECTION_SERVER_EQUAL1(*conn, *(*bucket)->conn)) { + node = *bucket; + } else { + node = (*bucket)->next; + while (node != NULL) { + if (FC_CONNECTION_SERVER_EQUAL1(*conn, *node->conn)) { + break; + } + node = node->next; + } + } + + if (node != NULL) { + *err_no = 0; + return node->conn; + } else { + if ((ci=get_connection(cp, conn, &key, service_name, + err_no)) == NULL) + { + return NULL; + } + + node = (ConnectionNode *)((char *)ci - sizeof(ConnectionNode)); + node->next = *bucket; + *bucket = node; + *err_no = 0; + return ci; + } +} + +static int close_connection(ConnectionPool *cp, ConnectionInfo *conn, + const string_t *key, const bool bForce) { - char key[INET6_ADDRSTRLEN + 8]; - int key_len; ConnectionManager *cm; ConnectionNode *node; - conn_pool_get_key(conn, key, &key_len); - pthread_mutex_lock(&cp->lock); - cm = (ConnectionManager *)fc_hash_find(&cp->hash_array, key, key_len); + cm = (ConnectionManager *)fc_hash_find(&cp->hash_array, key->str, key->len); pthread_mutex_unlock(&cp->lock); if (cm == NULL) { @@ -431,7 +536,7 @@ int conn_pool_close_connection_ex(ConnectionPool *cp, ConnectionInfo *conn, return ENOENT; } - node = (ConnectionNode *)(((char *)conn) - sizeof(ConnectionNode)); + node = (ConnectionNode *)((char *)conn - sizeof(ConnectionNode)); if (node->manager != cm) { logError("file: "__FILE__", line: %d, " \ @@ -480,6 +585,64 @@ int conn_pool_close_connection_ex(ConnectionPool *cp, ConnectionInfo *conn, return 0; } +int conn_pool_close_connection_ex(ConnectionPool *cp, + ConnectionInfo *conn, const bool bForce) +{ + string_t key; + uint32_t hash_code; + ConnectionNode **bucket; + ConnectionNode *previous; + ConnectionNode *node; + char key_buff[INET6_ADDRSTRLEN + 8]; + ConnectionThreadHashTable *htable; + + key.str = key_buff; + conn_pool_get_key(conn, key.str, &key.len); + if (!cp->extra_params.tls.enabled) { + return close_connection(cp, conn, &key, bForce); + } + + if (!bForce) { + return 0; + } + + htable = pthread_getspecific(cp->tls_key); + if (htable == NULL) { + return close_connection(cp, conn, &key, bForce); + } + + hash_code = fc_simple_hash(key.str, key.len); + bucket = htable->buckets + hash_code % cp-> + extra_params.tls.htable_capacity; + if (*bucket == NULL) { + node = NULL; + previous = NULL; + } else if ((*bucket)->conn == conn) { + node = *bucket; + previous = NULL; + } else { + previous = *bucket; + node = (*bucket)->next; + while (node != NULL) { + if (node->conn == conn) { + break; + } + previous = node; + node = node->next; + } + } + + if (node != NULL) { + if (previous == NULL) { + *bucket = node->next; + } else { + previous->next = node->next; + } + } + + return close_connection(cp, conn, &key, bForce); +} + static int _conn_count_walk(const int index, const HashData *data, void *args) { int *count; @@ -656,8 +819,8 @@ ConnectionInfo *conn_pool_alloc_connection_ex( if (comm_type == fc_comm_type_rdma) { conn->arg1 = conn->args + extra_data_size; if ((*err_no=G_RDMA_CONNECTION_CALLBACKS.init_connection( - conn, extra_params->buffer_size, - extra_params->pd)) != 0) + conn, extra_params->rdma.buffer_size, + extra_params->rdma.pd)) != 0) { free(conn); return NULL; @@ -685,13 +848,18 @@ int conn_pool_set_rdma_extra_params(ConnectionExtraParams *extra_params, } if (server_group->comm_type == fc_comm_type_sock) { - extra_params->buffer_size = 0; - extra_params->pd = NULL; + extra_params->tls.enabled = false; + extra_params->tls.htable_capacity = 0; + extra_params->rdma.buffer_size = 0; + extra_params->rdma.pd = NULL; return 0; } else { + extra_params->tls.enabled = true; + extra_params->tls.htable_capacity = fc_ceil_prime( + FC_SID_SERVER_COUNT(*server_cfg)); first_server = FC_SID_SERVERS(*server_cfg); - extra_params->buffer_size = server_cfg->buffer_size + padding_size; - extra_params->pd = fc_alloc_rdma_pd(G_RDMA_CONNECTION_CALLBACKS. + extra_params->rdma.buffer_size = server_cfg->buffer_size + padding_size; + extra_params->rdma.pd = fc_alloc_rdma_pd(G_RDMA_CONNECTION_CALLBACKS. alloc_pd, &first_server->group_addrs[server_group_index]. address_array, &result); return result; diff --git a/src/connection_pool.h b/src/connection_pool.h index 466232f..b38f33b 100644 --- a/src/connection_pool.h +++ b/src/connection_pool.h @@ -113,8 +113,15 @@ typedef struct { } ConnectionCallbacks; typedef struct { - int buffer_size; - struct ibv_pd *pd; + struct { + bool enabled; + int htable_capacity; + } tls; //for thread local + + struct { + int buffer_size; + struct ibv_pd *pd; + } rdma; } ConnectionExtraParams; typedef int (*fc_connection_callback_func)(ConnectionInfo *conn, void *args); @@ -135,8 +142,15 @@ typedef struct tagConnectionManager { pthread_mutex_t lock; } ConnectionManager; +struct tagConnectionPool; + +typedef struct { + ConnectionNode **buckets; + struct tagConnectionPool *cp; +} ConnectionThreadHashTable; + typedef struct tagConnectionPool { - HashArray hash_array; //key is ip:port, value is ConnectionManager + HashArray hash_array; //key is ip-port, value is ConnectionManager pthread_mutex_t lock; int connect_timeout_ms; int max_count_per_entry; //0 means no limit @@ -163,6 +177,7 @@ typedef struct tagConnectionPool { int extra_data_size; ConnectionExtraParams extra_params; + pthread_key_t tls_key; //for ConnectionThreadHashTable } ConnectionPool; extern ConnectionCallbacks g_connection_callbacks; @@ -273,8 +288,8 @@ ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp, * bForce: set true to close the socket, else only push back to connection pool * return 0 for success, != 0 for error */ -int conn_pool_close_connection_ex(ConnectionPool *cp, ConnectionInfo *conn, - const bool bForce); +int conn_pool_close_connection_ex(ConnectionPool *cp, + ConnectionInfo *conn, const bool bForce); /** * disconnect from the server