From cf16c41054c98b93369948e8e610fc265ac0cb61 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Sun, 3 Aug 2025 15:18:23 +0800 Subject: [PATCH] src/connection_pool.[hc]: restore pthread mutex lock --- HISTORY | 3 +- src/connection_pool.c | 290 ++++++++++++++++-------------------------- src/connection_pool.h | 18 +-- 3 files changed, 121 insertions(+), 190 deletions(-) diff --git a/HISTORY b/HISTORY index acb8afa..dd0893a 100644 --- a/HISTORY +++ b/HISTORY @@ -1,8 +1,7 @@ -Version 1.78 2025-08-02 +Version 1.78 2025-08-03 * getIpaddrByName: normalize ip addr when input addr is IPv4 or IPv6 * add files: spinlock.[hc] - * connection_pool.[hc]: use CAS instead of pthread mutex lock * shared_func.[hc]: change int2buff, buff2int etc. functions to static inline Version 1.77 2025-03-18 diff --git a/src/connection_pool.c b/src/connection_pool.c index 10ecb6d..e5b5abd 100644 --- a/src/connection_pool.c +++ b/src/connection_pool.c @@ -23,7 +23,6 @@ #include "shared_func.h" #include "sched_thread.h" #include "server_id_func.h" -#include "fc_atomic.h" #include "connection_pool.h" ConnectionCallbacks g_connection_callbacks = { @@ -63,9 +62,6 @@ static int close_conn(ConnectionPool *cp, ConnectionManager *cm, ConnectionInfo *conn, const bool bForce) { ConnectionNode *node; - int64_t index; - int64_t head; - int64_t tail; char formatted_ip[FORMATTED_IP_SIZE]; node = (ConnectionNode *)((char *)conn - sizeof(ConnectionNode)); @@ -78,109 +74,80 @@ static int close_conn(ConnectionPool *cp, ConnectionManager *cm, return EINVAL; } - while (1) { - head = FC_ATOMIC_GET(cm->ring.head); - tail = FC_ATOMIC_GET(cm->ring.tail); - if (bForce || (head - tail) >= cp->ring_size - 1) { - FC_ATOMIC_DEC(cm->total_count); + if (bForce) + { + cm->total_count--; - if (FC_LOG_BY_LEVEL(LOG_DEBUG)) { - format_ip_address(conn->ip_addr, formatted_ip); - logDebug("file: "__FILE__", line: %d, " - "server %s:%u, release connection: %d, " - "total_count: %d, free_count: %d", - __LINE__, formatted_ip, conn->port, - conn->sock, cm->total_count, cm->free_count); - } - - G_COMMON_CONNECTION_CALLBACKS[conn->comm_type]. - close_connection(conn); - fast_mblock_free_object(&cp->node_allocator, node); - - if (bForce) { - for (index=FC_ATOMIC_GET(cm->ring.tail); indexring.nodes[index % cp->ring_size]); - if (node != NULL) { - node->conn->validate_flag = true; - } - } - } - - break; + if (FC_LOG_BY_LEVEL(LOG_DEBUG)) { + format_ip_address(conn->ip_addr, formatted_ip); + logDebug("file: "__FILE__", line: %d, " + "server %s:%u, release connection: %d, " + "total_count: %d, free_count: %d", + __LINE__, formatted_ip, conn->port, + conn->sock, cm->total_count, cm->free_count); } - if (__sync_bool_compare_and_swap(&cm->ring.nodes[ - head % cp->ring_size], NULL, node)) + G_COMMON_CONNECTION_CALLBACKS[conn->comm_type]. + close_connection(conn); + fast_mblock_free_object(&cp->node_allocator, node); + + node = cm->head; + while (node != NULL) { - if (__sync_bool_compare_and_swap(&cm->ring.head, head, head + 1)) { - node->atime = get_current_time(); - FC_ATOMIC_INC(cm->free_count); - - if (FC_LOG_BY_LEVEL(LOG_DEBUG)) { - format_ip_address(conn->ip_addr, formatted_ip); - logDebug("file: "__FILE__", line: %d, " - "server %s:%u, free connection: %d, " - "total_count: %d, free_count: %d", - __LINE__, formatted_ip, conn->port, - conn->sock, cm->total_count, cm->free_count); - } - - break; - } else { //rollback - __sync_bool_compare_and_swap(&cm->ring.nodes[ - head % cp->ring_size], node, NULL); - sched_yield(); - } + node->conn->validate_flag = true; + node = node->next; } } + else + { + node->atime = get_current_time(); + node->next = cm->head; + cm->head = node; + cm->free_count++; + + if (FC_LOG_BY_LEVEL(LOG_DEBUG)) { + format_ip_address(conn->ip_addr, formatted_ip); + logDebug("file: "__FILE__", line: %d, " + "server %s:%u, free connection: %d, " + "total_count: %d, free_count: %d", + __LINE__, formatted_ip, conn->port, + conn->sock, cm->total_count, cm->free_count); + } + } return 0; } -static inline ConnectionManager *do_find_manager( - ConnectionManager *head, const string_t *key) +static ConnectionManager *find_manager(ConnectionPool *cp, + ConnectionBucket *bucket, const string_t *key, + const bool need_create) { ConnectionManager *cm; - if (fc_string_equal(&head->key, key)) //fast path + if (bucket->head != NULL) { - return head; - } - - cm = (ConnectionManager *)FC_ATOMIC_GET(head->next); - while (cm != NULL) - { - if (fc_string_equal(&cm->key, key)) + if (fc_string_equal(&bucket->head->key, key)) //fast path { - return cm; + return bucket->head; + } + else + { + cm = bucket->head->next; + while (cm != NULL) + { + if (fc_string_equal(&cm->key, key)) + { + return cm; + } + cm = cm->next; + } } - cm = (ConnectionManager *)FC_ATOMIC_GET(cm->next); } - return NULL; -} -static ConnectionManager *find_manager(ConnectionBucket *bucket, - const string_t *key) -{ - ConnectionManager *head; - - head = (ConnectionManager *)FC_ATOMIC_GET(bucket->head); - if (head == NULL) + if (!need_create) { return NULL; } - return do_find_manager(head, key); -} - -static ConnectionManager *create_manager(ConnectionPool *cp, - ConnectionBucket *bucket, ConnectionManager *old_head, - const string_t *key, int *result) -{ - ConnectionManager *cm; - char *buff; - int aligned_key_len; - int bytes; cm = (ConnectionManager *)fast_mblock_alloc_object( &cp->manager_allocator); @@ -189,65 +156,23 @@ static ConnectionManager *create_manager(ConnectionPool *cp, logError("file: "__FILE__", line: %d, " "malloc %d bytes fail", __LINE__, (int)sizeof(ConnectionManager)); - *result = ENOMEM; return NULL; } - cm->ring.head = cm->ring.tail = 0; + cm->head = NULL; cm->total_count = 0; cm->free_count = 0; - - aligned_key_len = MEM_ALIGN(key->len); - bytes = aligned_key_len + sizeof(ConnectionNode *) * cp->ring_size; - if ((buff=fc_malloc(bytes)) == NULL) + if ((cm->key.str=fc_malloc(key->len + 1)) == NULL) { - *result = ENOMEM; return NULL; } - memset(buff, 0, bytes); - cm->key.str = buff; - cm->ring.nodes = (ConnectionNode **)(buff + aligned_key_len); - memcpy(cm->key.str, key->str, key->len); + memcpy(cm->key.str, key->str, key->len + 1); cm->key.len = key->len; //add to manager chain - FC_ATOMIC_SET(cm->next, old_head); - if (__sync_bool_compare_and_swap(&bucket->head, old_head, cm)) - { - return cm; - } - else - { - fast_mblock_free_object(&cp->manager_allocator, cm); - *result = EAGAIN; - return NULL; - } -} - -static ConnectionManager *get_manager(ConnectionPool *cp, - ConnectionBucket *bucket, const string_t *key) -{ - ConnectionManager *head; - ConnectionManager *cm; - int result = 0; - - do { - head = (ConnectionManager *)FC_ATOMIC_GET(bucket->head); - if (head != NULL) - { - if ((cm=do_find_manager(head, key)) != NULL) - { - return cm; - } - } - - if ((cm=create_manager(cp, bucket, head, key, &result)) != NULL) - { - return cm; - } - } while (result == EAGAIN); - - return NULL; + cm->next = bucket->head; + bucket->head = cm; + return cm; } static int close_connection(ConnectionPool *cp, ConnectionInfo *conn, @@ -259,7 +184,8 @@ static int close_connection(ConnectionPool *cp, ConnectionInfo *conn, int result; bucket = cp->hashtable.buckets + hash_code % cp->hashtable.capacity; - if ((cm=find_manager(bucket, key)) != NULL) + pthread_mutex_lock(&bucket->lock); + if ((cm=find_manager(cp, bucket, key, false)) != NULL) { result = close_conn(cp, cm, conn, bForce); } @@ -271,6 +197,7 @@ static int close_connection(ConnectionPool *cp, ConnectionInfo *conn, __LINE__, formatted_ip, conn->port); result = ENOENT; } + pthread_mutex_unlock(&bucket->lock); return result; } @@ -311,6 +238,7 @@ static void cp_tls_destroy(void *ptr) static int init_hashtable(ConnectionPool *cp, const int htable_capacity) { int bytes; + int result; unsigned int *hash_capacity; ConnectionBucket *bucket; ConnectionBucket *end; @@ -335,6 +263,10 @@ static int init_hashtable(ConnectionPool *cp, const int htable_capacity) for (bucket=cp->hashtable.buckets; buckethead = NULL; + if ((result=init_pthread_lock(&bucket->lock)) != 0) + { + return result; + } } return 0; @@ -355,12 +287,12 @@ int conn_pool_init_ex1(ConnectionPool *cp, const int connect_timeout, cp->connect_timeout_ms = connect_timeout * 1000; cp->max_count_per_entry = max_count_per_entry; cp->max_idle_time = max_idle_time; - cp->ring_size = max_count_per_entry > 0 ? max_count_per_entry : 1024; cp->extra_data_size = extra_data_size; cp->connect_done_callback.func = connect_done_func; cp->connect_done_callback.args = connect_done_args; cp->validate_callback.func = validate_func; cp->validate_callback.args = validate_args; + if ((result=fast_mblock_init_ex1(&cp->manager_allocator, "cpool-manager", sizeof(ConnectionManager), 256, alloc_elements_limit, NULL, NULL, true)) != 0) @@ -423,31 +355,31 @@ static void conn_pool_hash_walk(ConnectionPool *cp, end = cp->hashtable.buckets + cp->hashtable.capacity; for (bucket=cp->hashtable.buckets; buckethead); + pthread_mutex_lock(&bucket->lock); + cm = bucket->head; while (cm != NULL) { current = cm; - cm = (ConnectionManager *)FC_ATOMIC_GET(cm->next); + cm = cm->next; callback(cp, current, args); } + pthread_mutex_unlock(&bucket->lock); } } static void cp_destroy_walk_callback(ConnectionPool *cp, ConnectionManager *cm, void *args) { - int64_t index; - int64_t head; ConnectionNode *node; + ConnectionNode *deleted; - head = FC_ATOMIC_GET(cm->ring.head); - for (index=FC_ATOMIC_GET(cm->ring.tail); indexring.nodes[index % cp->ring_size]); - if (node != NULL) { - G_COMMON_CONNECTION_CALLBACKS[node->conn->comm_type]. - close_connection(node->conn); - } + node = cm->head; + while (node != NULL) + { + deleted = node; + node = node->next; + G_COMMON_CONNECTION_CALLBACKS[deleted->conn->comm_type]. + close_connection(deleted->conn); } free(cm->key.str); @@ -455,11 +387,19 @@ static void cp_destroy_walk_callback(ConnectionPool *cp, void conn_pool_destroy(ConnectionPool *cp) { + ConnectionBucket *bucket; + ConnectionBucket *end; + if (cp->hashtable.buckets == NULL) { return; } conn_pool_hash_walk(cp, cp_destroy_walk_callback, cp); + + end = cp->hashtable.buckets + cp->hashtable.capacity; + for (bucket=cp->hashtable.buckets; bucketlock); + } free(cp->hashtable.buckets); cp->hashtable.buckets = NULL; @@ -557,11 +497,10 @@ int conn_pool_async_connect_server_ex(ConnectionInfo *conn, } static ConnectionInfo *get_conn(ConnectionPool *cp, - ConnectionManager *cm, const ConnectionInfo *conn, - const char *service_name, int *err_no) + ConnectionManager *cm, pthread_mutex_t *lock, + const ConnectionInfo *conn, const char *service_name, + int *err_no) { - int64_t tail; - int index; ConnectionNode *node; ConnectionInfo *ci; char formatted_ip[FORMATTED_IP_SIZE]; @@ -570,11 +509,10 @@ static ConnectionInfo *get_conn(ConnectionPool *cp, current_time = get_current_time(); while (1) { - tail = FC_ATOMIC_GET(cm->ring.tail); - if (tail == FC_ATOMIC_GET(cm->ring.head)) //empty + if (cm->head == NULL) { - if ((cp->max_count_per_entry > 0) && FC_ATOMIC_GET( - cm->total_count) >= cp->max_count_per_entry) + if ((cp->max_count_per_entry > 0) && + (cm->total_count >= cp->max_count_per_entry)) { format_ip_address(conn->ip_addr, formatted_ip); *err_no = ENOSPC; @@ -600,7 +538,9 @@ static ConnectionInfo *get_conn(ConnectionPool *cp, node->manager = cm; node->next = NULL; node->atime = 0; - FC_ATOMIC_INC(cm->total_count); + + cm->total_count++; + pthread_mutex_unlock(lock); memcpy(node->conn->ip_addr, conn->ip_addr, sizeof(conn->ip_addr)); node->conn->port = conn->port; @@ -622,7 +562,8 @@ static ConnectionInfo *get_conn(ConnectionPool *cp, close_connection(node->conn); fast_mblock_free_object(&cp->node_allocator, node); - FC_ATOMIC_DEC(cm->total_count); //rollback + pthread_mutex_lock(lock); + cm->total_count--; //rollback return NULL; } @@ -635,27 +576,19 @@ static ConnectionInfo *get_conn(ConnectionPool *cp, node->conn->sock, cm->total_count, cm->free_count); } + + pthread_mutex_lock(lock); return node->conn; } else { bool invalid; - if (!__sync_bool_compare_and_swap(&cm->ring.tail, tail, tail + 1)) { - sched_yield(); - continue; - } + node = cm->head; + ci = node->conn; + cm->head = node->next; + cm->free_count--; - index = tail % cp->ring_size; - node = (ConnectionNode *)FC_ATOMIC_GET( - cm->ring.nodes[index]); - if (node == NULL) { - continue; - } - __sync_bool_compare_and_swap(&cm->ring.nodes[index], node, NULL); - - ci = node->conn; - FC_ATOMIC_DEC(cm->free_count); if (current_time - node->atime > cp->max_idle_time) { if (cp->validate_callback.func != NULL) @@ -685,7 +618,8 @@ static ConnectionInfo *get_conn(ConnectionPool *cp, if (invalid) { - FC_ATOMIC_DEC(cm->total_count); + cm->total_count--; + if (FC_LOG_BY_LEVEL(LOG_DEBUG)) { format_ip_address(conn->ip_addr, formatted_ip); logDebug("file: "__FILE__", line: %d, " @@ -728,9 +662,10 @@ static ConnectionInfo *get_connection(ConnectionPool *cp, ConnectionInfo *ci; bucket = cp->hashtable.buckets + hash_code % cp->hashtable.capacity; - if ((cm=get_manager(cp, bucket, key)) != NULL) + pthread_mutex_lock(&bucket->lock); + if ((cm=find_manager(cp, bucket, key, true)) != NULL) { - ci = get_conn(cp, cm, conn, service_name, err_no); + ci = get_conn(cp, cm, &bucket->lock, conn, service_name, err_no); if (ci != NULL) { ci->shared = shared; @@ -741,6 +676,7 @@ static ConnectionInfo *get_connection(ConnectionPool *cp, *err_no = ENOMEM; ci = NULL; } + pthread_mutex_unlock(&bucket->lock); return ci; } @@ -889,8 +825,8 @@ static void cp_stat_walk_callback(ConnectionPool *cp, stat = args; stat->server_count++; - stat->connection.total_count += FC_ATOMIC_GET(cm->total_count); - stat->connection.free_count += FC_ATOMIC_GET(cm->free_count); + stat->connection.total_count += cm->total_count; + stat->connection.free_count += cm->free_count; } void conn_pool_stat(ConnectionPool *cp, ConnectionPoolStat *stat) diff --git a/src/connection_pool.h b/src/connection_pool.h index ae2c649..b3f670c 100644 --- a/src/connection_pool.h +++ b/src/connection_pool.h @@ -147,24 +147,21 @@ struct tagConnectionManager; typedef struct tagConnectionNode { ConnectionInfo *conn; struct tagConnectionManager *manager; - struct tagConnectionNode *next; //for thread local + struct tagConnectionNode *next; time_t atime; //last access time } ConnectionNode; typedef struct tagConnectionManager { string_t key; - struct { - ConnectionNode **nodes; - volatile int64_t head; //producer - volatile int64_t tail; //consumer - } ring; - volatile int total_count; //total connections - volatile int free_count; //free connections - volatile struct tagConnectionManager *next; + ConnectionNode *head; + int total_count; //total connections + int free_count; //free connections + struct tagConnectionManager *next; } ConnectionManager; typedef struct tagConnectionBucket { - volatile ConnectionManager *head; + ConnectionManager *head; + pthread_mutex_t lock; } ConnectionBucket; struct tagConnectionPool; @@ -182,7 +179,6 @@ typedef struct tagConnectionPool { int connect_timeout_ms; int max_count_per_entry; //0 means no limit - int ring_size; /* connections whose idle time exceeds this time will be closed