connection_pool.[hc] support thread local for performance
parent
1c1cb6d5e7
commit
e0bbe89d23
3
HISTORY
3
HISTORY
|
|
@ -1,10 +1,11 @@
|
||||||
|
|
||||||
Version 1.70 2023-09-18
|
Version 1.70 2023-09-20
|
||||||
* get full mac address of infiniband NIC under Linux
|
* get full mac address of infiniband NIC under Linux
|
||||||
* struct fast_task_info add field conn for RDMA connection
|
* struct fast_task_info add field conn for RDMA connection
|
||||||
* server_id_func.[hc]: support communication type
|
* server_id_func.[hc]: support communication type
|
||||||
* connection_pool.[hc] support callbacks for RDMA
|
* connection_pool.[hc] support callbacks for RDMA
|
||||||
* nio thread data support busy_polling_callback
|
* nio thread data support busy_polling_callback
|
||||||
|
* connection_pool.[hc] support thread local for performance
|
||||||
|
|
||||||
Version 1.69 2023-08-05
|
Version 1.69 2023-08-05
|
||||||
* bugfixed: array_allocator_alloc MUST init the array
|
* bugfixed: array_allocator_alloc MUST init the array
|
||||||
|
|
|
||||||
|
|
@ -44,7 +44,33 @@ static int node_init_for_rdma(ConnectionNode *node,
|
||||||
node->conn = (ConnectionInfo *)(node + 1);
|
node->conn = (ConnectionInfo *)(node + 1);
|
||||||
node->conn->arg1 = node->conn->args + cp->extra_data_size;
|
node->conn->arg1 = node->conn->args + cp->extra_data_size;
|
||||||
return G_RDMA_CONNECTION_CALLBACKS.init_connection(node->conn,
|
return G_RDMA_CONNECTION_CALLBACKS.init_connection(node->conn,
|
||||||
cp->extra_params.buffer_size, cp->extra_params.pd);
|
cp->extra_params.rdma.buffer_size, cp->extra_params.rdma.pd);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void cp_tls_destroy(void *ptr)
|
||||||
|
{
|
||||||
|
ConnectionThreadHashTable *htable;
|
||||||
|
ConnectionNode **pp;
|
||||||
|
ConnectionNode **end;
|
||||||
|
ConnectionNode *current;
|
||||||
|
ConnectionNode *node;
|
||||||
|
|
||||||
|
htable = ptr;
|
||||||
|
end = htable->buckets + htable->cp->extra_params.tls.htable_capacity;
|
||||||
|
for (pp=htable->buckets; pp<end; pp++) {
|
||||||
|
if (*pp == NULL) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
node = *pp;
|
||||||
|
do {
|
||||||
|
current = node;
|
||||||
|
node = node->next;
|
||||||
|
conn_pool_close_connection(htable->cp, current->conn);
|
||||||
|
} while (node != NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
free(ptr);
|
||||||
}
|
}
|
||||||
|
|
||||||
int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout,
|
int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout,
|
||||||
|
|
@ -89,8 +115,10 @@ int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout,
|
||||||
cp->extra_params = *extra_params;
|
cp->extra_params = *extra_params;
|
||||||
} else {
|
} else {
|
||||||
extra_connection_size = 0;
|
extra_connection_size = 0;
|
||||||
cp->extra_params.buffer_size = 0;
|
cp->extra_params.tls.enabled = false;
|
||||||
cp->extra_params.pd = NULL;
|
cp->extra_params.tls.htable_capacity = 0;
|
||||||
|
cp->extra_params.rdma.buffer_size = 0;
|
||||||
|
cp->extra_params.rdma.pd = NULL;
|
||||||
obj_init_func = (fast_mblock_object_init_func)node_init_for_socket;
|
obj_init_func = (fast_mblock_object_init_func)node_init_for_socket;
|
||||||
}
|
}
|
||||||
if ((result=fast_mblock_init_ex1(&cp->node_allocator, "cpool-node",
|
if ((result=fast_mblock_init_ex1(&cp->node_allocator, "cpool-node",
|
||||||
|
|
@ -101,6 +129,19 @@ int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout,
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logDebug("cp: %p, tls.enabled: %d, htable_capacity: %d",
|
||||||
|
cp, cp->extra_params.tls.enabled,
|
||||||
|
cp->extra_params.tls.htable_capacity);
|
||||||
|
|
||||||
|
if (cp->extra_params.tls.enabled) {
|
||||||
|
if ((result=pthread_key_create(&cp->tls_key, cp_tls_destroy)) != 0) {
|
||||||
|
logError("file: "__FILE__", line: %d, "
|
||||||
|
"pthread_key_create fail, errno: %d, error info: %s",
|
||||||
|
__LINE__, result, STRERROR(result));
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return fc_hash_init(&(cp->hash_array), fc_simple_hash, init_capacity, 0.75);
|
return fc_hash_init(&(cp->hash_array), fc_simple_hash, init_capacity, 0.75);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -229,23 +270,21 @@ int conn_pool_async_connect_server_ex(ConnectionInfo *conn,
|
||||||
static inline void conn_pool_get_key(const ConnectionInfo *conn,
|
static inline void conn_pool_get_key(const ConnectionInfo *conn,
|
||||||
char *key, int *key_len)
|
char *key, int *key_len)
|
||||||
{
|
{
|
||||||
*key_len = sprintf(key, "%s_%u", conn->ip_addr, conn->port);
|
*key_len = sprintf(key, "%s-%u", conn->ip_addr, conn->port);
|
||||||
}
|
}
|
||||||
|
|
||||||
ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp,
|
static ConnectionInfo *get_connection(ConnectionPool *cp,
|
||||||
const ConnectionInfo *conn, const char *service_name, int *err_no)
|
const ConnectionInfo *conn, const string_t *key,
|
||||||
|
const char *service_name, int *err_no)
|
||||||
{
|
{
|
||||||
char key[INET6_ADDRSTRLEN + 8];
|
|
||||||
int key_len;
|
|
||||||
ConnectionManager *cm;
|
ConnectionManager *cm;
|
||||||
ConnectionNode *node;
|
ConnectionNode *node;
|
||||||
ConnectionInfo *ci;
|
ConnectionInfo *ci;
|
||||||
time_t current_time;
|
time_t current_time;
|
||||||
|
|
||||||
conn_pool_get_key(conn, key, &key_len);
|
|
||||||
|
|
||||||
pthread_mutex_lock(&cp->lock);
|
pthread_mutex_lock(&cp->lock);
|
||||||
cm = (ConnectionManager *)fc_hash_find(&cp->hash_array, key, key_len);
|
cm = (ConnectionManager *)fc_hash_find(
|
||||||
|
&cp->hash_array, key->str, key->len);
|
||||||
if (cm == NULL)
|
if (cm == NULL)
|
||||||
{
|
{
|
||||||
cm = (ConnectionManager *)fast_mblock_alloc_object(
|
cm = (ConnectionManager *)fast_mblock_alloc_object(
|
||||||
|
|
@ -268,7 +307,7 @@ ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp,
|
||||||
pthread_mutex_unlock(&cp->lock);
|
pthread_mutex_unlock(&cp->lock);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
fc_hash_insert(&cp->hash_array, key, key_len, cm);
|
fc_hash_insert(&cp->hash_array, key->str, key->len, cm);
|
||||||
}
|
}
|
||||||
pthread_mutex_unlock(&cp->lock);
|
pthread_mutex_unlock(&cp->lock);
|
||||||
|
|
||||||
|
|
@ -410,18 +449,84 @@ ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int conn_pool_close_connection_ex(ConnectionPool *cp, ConnectionInfo *conn,
|
ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp,
|
||||||
const bool bForce)
|
const ConnectionInfo *conn, const char *service_name, int *err_no)
|
||||||
|
{
|
||||||
|
string_t key;
|
||||||
|
int bytes;
|
||||||
|
uint32_t hash_code;
|
||||||
|
ConnectionNode **bucket;
|
||||||
|
ConnectionNode *node;
|
||||||
|
ConnectionInfo *ci;
|
||||||
|
char key_buff[INET6_ADDRSTRLEN + 8];
|
||||||
|
ConnectionThreadHashTable *htable;
|
||||||
|
|
||||||
|
key.str = key_buff;
|
||||||
|
conn_pool_get_key(conn, key.str, &key.len);
|
||||||
|
if (!cp->extra_params.tls.enabled) {
|
||||||
|
return get_connection(cp, conn, &key, service_name, err_no);
|
||||||
|
}
|
||||||
|
|
||||||
|
htable = pthread_getspecific(cp->tls_key);
|
||||||
|
if (htable == NULL) {
|
||||||
|
bytes = sizeof(ConnectionThreadHashTable) + sizeof(ConnectionNode *) *
|
||||||
|
cp->extra_params.tls.htable_capacity;
|
||||||
|
htable = fc_malloc(bytes);
|
||||||
|
memset(htable, 0, bytes);
|
||||||
|
|
||||||
|
htable->buckets = (ConnectionNode **)(htable + 1);
|
||||||
|
htable->cp = cp;
|
||||||
|
if ((*err_no=pthread_setspecific(cp->tls_key, htable)) != 0) {
|
||||||
|
logError("file: "__FILE__", line: %d, "
|
||||||
|
"pthread_setspecific fail, errno: %d, error info: %s",
|
||||||
|
__LINE__, *err_no, STRERROR(*err_no));
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
hash_code = fc_simple_hash(key.str, key.len);
|
||||||
|
bucket = htable->buckets + hash_code % cp->
|
||||||
|
extra_params.tls.htable_capacity;
|
||||||
|
if (*bucket == NULL) {
|
||||||
|
node = NULL;
|
||||||
|
} else if (FC_CONNECTION_SERVER_EQUAL1(*conn, *(*bucket)->conn)) {
|
||||||
|
node = *bucket;
|
||||||
|
} else {
|
||||||
|
node = (*bucket)->next;
|
||||||
|
while (node != NULL) {
|
||||||
|
if (FC_CONNECTION_SERVER_EQUAL1(*conn, *node->conn)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
node = node->next;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (node != NULL) {
|
||||||
|
*err_no = 0;
|
||||||
|
return node->conn;
|
||||||
|
} else {
|
||||||
|
if ((ci=get_connection(cp, conn, &key, service_name,
|
||||||
|
err_no)) == NULL)
|
||||||
|
{
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
node = (ConnectionNode *)((char *)ci - sizeof(ConnectionNode));
|
||||||
|
node->next = *bucket;
|
||||||
|
*bucket = node;
|
||||||
|
*err_no = 0;
|
||||||
|
return ci;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int close_connection(ConnectionPool *cp, ConnectionInfo *conn,
|
||||||
|
const string_t *key, const bool bForce)
|
||||||
{
|
{
|
||||||
char key[INET6_ADDRSTRLEN + 8];
|
|
||||||
int key_len;
|
|
||||||
ConnectionManager *cm;
|
ConnectionManager *cm;
|
||||||
ConnectionNode *node;
|
ConnectionNode *node;
|
||||||
|
|
||||||
conn_pool_get_key(conn, key, &key_len);
|
|
||||||
|
|
||||||
pthread_mutex_lock(&cp->lock);
|
pthread_mutex_lock(&cp->lock);
|
||||||
cm = (ConnectionManager *)fc_hash_find(&cp->hash_array, key, key_len);
|
cm = (ConnectionManager *)fc_hash_find(&cp->hash_array, key->str, key->len);
|
||||||
pthread_mutex_unlock(&cp->lock);
|
pthread_mutex_unlock(&cp->lock);
|
||||||
if (cm == NULL)
|
if (cm == NULL)
|
||||||
{
|
{
|
||||||
|
|
@ -431,7 +536,7 @@ int conn_pool_close_connection_ex(ConnectionPool *cp, ConnectionInfo *conn,
|
||||||
return ENOENT;
|
return ENOENT;
|
||||||
}
|
}
|
||||||
|
|
||||||
node = (ConnectionNode *)(((char *)conn) - sizeof(ConnectionNode));
|
node = (ConnectionNode *)((char *)conn - sizeof(ConnectionNode));
|
||||||
if (node->manager != cm)
|
if (node->manager != cm)
|
||||||
{
|
{
|
||||||
logError("file: "__FILE__", line: %d, " \
|
logError("file: "__FILE__", line: %d, " \
|
||||||
|
|
@ -480,6 +585,64 @@ int conn_pool_close_connection_ex(ConnectionPool *cp, ConnectionInfo *conn,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int conn_pool_close_connection_ex(ConnectionPool *cp,
|
||||||
|
ConnectionInfo *conn, const bool bForce)
|
||||||
|
{
|
||||||
|
string_t key;
|
||||||
|
uint32_t hash_code;
|
||||||
|
ConnectionNode **bucket;
|
||||||
|
ConnectionNode *previous;
|
||||||
|
ConnectionNode *node;
|
||||||
|
char key_buff[INET6_ADDRSTRLEN + 8];
|
||||||
|
ConnectionThreadHashTable *htable;
|
||||||
|
|
||||||
|
key.str = key_buff;
|
||||||
|
conn_pool_get_key(conn, key.str, &key.len);
|
||||||
|
if (!cp->extra_params.tls.enabled) {
|
||||||
|
return close_connection(cp, conn, &key, bForce);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!bForce) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
htable = pthread_getspecific(cp->tls_key);
|
||||||
|
if (htable == NULL) {
|
||||||
|
return close_connection(cp, conn, &key, bForce);
|
||||||
|
}
|
||||||
|
|
||||||
|
hash_code = fc_simple_hash(key.str, key.len);
|
||||||
|
bucket = htable->buckets + hash_code % cp->
|
||||||
|
extra_params.tls.htable_capacity;
|
||||||
|
if (*bucket == NULL) {
|
||||||
|
node = NULL;
|
||||||
|
previous = NULL;
|
||||||
|
} else if ((*bucket)->conn == conn) {
|
||||||
|
node = *bucket;
|
||||||
|
previous = NULL;
|
||||||
|
} else {
|
||||||
|
previous = *bucket;
|
||||||
|
node = (*bucket)->next;
|
||||||
|
while (node != NULL) {
|
||||||
|
if (node->conn == conn) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
previous = node;
|
||||||
|
node = node->next;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (node != NULL) {
|
||||||
|
if (previous == NULL) {
|
||||||
|
*bucket = node->next;
|
||||||
|
} else {
|
||||||
|
previous->next = node->next;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return close_connection(cp, conn, &key, bForce);
|
||||||
|
}
|
||||||
|
|
||||||
static int _conn_count_walk(const int index, const HashData *data, void *args)
|
static int _conn_count_walk(const int index, const HashData *data, void *args)
|
||||||
{
|
{
|
||||||
int *count;
|
int *count;
|
||||||
|
|
@ -656,8 +819,8 @@ ConnectionInfo *conn_pool_alloc_connection_ex(
|
||||||
if (comm_type == fc_comm_type_rdma) {
|
if (comm_type == fc_comm_type_rdma) {
|
||||||
conn->arg1 = conn->args + extra_data_size;
|
conn->arg1 = conn->args + extra_data_size;
|
||||||
if ((*err_no=G_RDMA_CONNECTION_CALLBACKS.init_connection(
|
if ((*err_no=G_RDMA_CONNECTION_CALLBACKS.init_connection(
|
||||||
conn, extra_params->buffer_size,
|
conn, extra_params->rdma.buffer_size,
|
||||||
extra_params->pd)) != 0)
|
extra_params->rdma.pd)) != 0)
|
||||||
{
|
{
|
||||||
free(conn);
|
free(conn);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
@ -685,13 +848,18 @@ int conn_pool_set_rdma_extra_params(ConnectionExtraParams *extra_params,
|
||||||
}
|
}
|
||||||
|
|
||||||
if (server_group->comm_type == fc_comm_type_sock) {
|
if (server_group->comm_type == fc_comm_type_sock) {
|
||||||
extra_params->buffer_size = 0;
|
extra_params->tls.enabled = false;
|
||||||
extra_params->pd = NULL;
|
extra_params->tls.htable_capacity = 0;
|
||||||
|
extra_params->rdma.buffer_size = 0;
|
||||||
|
extra_params->rdma.pd = NULL;
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
|
extra_params->tls.enabled = true;
|
||||||
|
extra_params->tls.htable_capacity = fc_ceil_prime(
|
||||||
|
FC_SID_SERVER_COUNT(*server_cfg));
|
||||||
first_server = FC_SID_SERVERS(*server_cfg);
|
first_server = FC_SID_SERVERS(*server_cfg);
|
||||||
extra_params->buffer_size = server_cfg->buffer_size + padding_size;
|
extra_params->rdma.buffer_size = server_cfg->buffer_size + padding_size;
|
||||||
extra_params->pd = fc_alloc_rdma_pd(G_RDMA_CONNECTION_CALLBACKS.
|
extra_params->rdma.pd = fc_alloc_rdma_pd(G_RDMA_CONNECTION_CALLBACKS.
|
||||||
alloc_pd, &first_server->group_addrs[server_group_index].
|
alloc_pd, &first_server->group_addrs[server_group_index].
|
||||||
address_array, &result);
|
address_array, &result);
|
||||||
return result;
|
return result;
|
||||||
|
|
|
||||||
|
|
@ -113,8 +113,15 @@ typedef struct {
|
||||||
} ConnectionCallbacks;
|
} ConnectionCallbacks;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int buffer_size;
|
struct {
|
||||||
struct ibv_pd *pd;
|
bool enabled;
|
||||||
|
int htable_capacity;
|
||||||
|
} tls; //for thread local
|
||||||
|
|
||||||
|
struct {
|
||||||
|
int buffer_size;
|
||||||
|
struct ibv_pd *pd;
|
||||||
|
} rdma;
|
||||||
} ConnectionExtraParams;
|
} ConnectionExtraParams;
|
||||||
|
|
||||||
typedef int (*fc_connection_callback_func)(ConnectionInfo *conn, void *args);
|
typedef int (*fc_connection_callback_func)(ConnectionInfo *conn, void *args);
|
||||||
|
|
@ -135,8 +142,15 @@ typedef struct tagConnectionManager {
|
||||||
pthread_mutex_t lock;
|
pthread_mutex_t lock;
|
||||||
} ConnectionManager;
|
} ConnectionManager;
|
||||||
|
|
||||||
|
struct tagConnectionPool;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
ConnectionNode **buckets;
|
||||||
|
struct tagConnectionPool *cp;
|
||||||
|
} ConnectionThreadHashTable;
|
||||||
|
|
||||||
typedef struct tagConnectionPool {
|
typedef struct tagConnectionPool {
|
||||||
HashArray hash_array; //key is ip:port, value is ConnectionManager
|
HashArray hash_array; //key is ip-port, value is ConnectionManager
|
||||||
pthread_mutex_t lock;
|
pthread_mutex_t lock;
|
||||||
int connect_timeout_ms;
|
int connect_timeout_ms;
|
||||||
int max_count_per_entry; //0 means no limit
|
int max_count_per_entry; //0 means no limit
|
||||||
|
|
@ -163,6 +177,7 @@ typedef struct tagConnectionPool {
|
||||||
|
|
||||||
int extra_data_size;
|
int extra_data_size;
|
||||||
ConnectionExtraParams extra_params;
|
ConnectionExtraParams extra_params;
|
||||||
|
pthread_key_t tls_key; //for ConnectionThreadHashTable
|
||||||
} ConnectionPool;
|
} ConnectionPool;
|
||||||
|
|
||||||
extern ConnectionCallbacks g_connection_callbacks;
|
extern ConnectionCallbacks g_connection_callbacks;
|
||||||
|
|
@ -273,8 +288,8 @@ ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp,
|
||||||
* bForce: set true to close the socket, else only push back to connection pool
|
* bForce: set true to close the socket, else only push back to connection pool
|
||||||
* return 0 for success, != 0 for error
|
* return 0 for success, != 0 for error
|
||||||
*/
|
*/
|
||||||
int conn_pool_close_connection_ex(ConnectionPool *cp, ConnectionInfo *conn,
|
int conn_pool_close_connection_ex(ConnectionPool *cp,
|
||||||
const bool bForce);
|
ConnectionInfo *conn, const bool bForce);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* disconnect from the server
|
* disconnect from the server
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue