src/connection_pool.[hc]: restore pthread mutex lock
parent
7fbb5c620b
commit
cf16c41054
3
HISTORY
3
HISTORY
|
|
@ -1,8 +1,7 @@
|
||||||
|
|
||||||
Version 1.78 2025-08-02
|
Version 1.78 2025-08-03
|
||||||
* getIpaddrByName: normalize ip addr when input addr is IPv4 or IPv6
|
* getIpaddrByName: normalize ip addr when input addr is IPv4 or IPv6
|
||||||
* add files: spinlock.[hc]
|
* add files: spinlock.[hc]
|
||||||
* connection_pool.[hc]: use CAS instead of pthread mutex lock
|
|
||||||
* shared_func.[hc]: change int2buff, buff2int etc. functions to static inline
|
* shared_func.[hc]: change int2buff, buff2int etc. functions to static inline
|
||||||
|
|
||||||
Version 1.77 2025-03-18
|
Version 1.77 2025-03-18
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,6 @@
|
||||||
#include "shared_func.h"
|
#include "shared_func.h"
|
||||||
#include "sched_thread.h"
|
#include "sched_thread.h"
|
||||||
#include "server_id_func.h"
|
#include "server_id_func.h"
|
||||||
#include "fc_atomic.h"
|
|
||||||
#include "connection_pool.h"
|
#include "connection_pool.h"
|
||||||
|
|
||||||
ConnectionCallbacks g_connection_callbacks = {
|
ConnectionCallbacks g_connection_callbacks = {
|
||||||
|
|
@ -63,9 +62,6 @@ static int close_conn(ConnectionPool *cp, ConnectionManager *cm,
|
||||||
ConnectionInfo *conn, const bool bForce)
|
ConnectionInfo *conn, const bool bForce)
|
||||||
{
|
{
|
||||||
ConnectionNode *node;
|
ConnectionNode *node;
|
||||||
int64_t index;
|
|
||||||
int64_t head;
|
|
||||||
int64_t tail;
|
|
||||||
char formatted_ip[FORMATTED_IP_SIZE];
|
char formatted_ip[FORMATTED_IP_SIZE];
|
||||||
|
|
||||||
node = (ConnectionNode *)((char *)conn - sizeof(ConnectionNode));
|
node = (ConnectionNode *)((char *)conn - sizeof(ConnectionNode));
|
||||||
|
|
@ -78,109 +74,80 @@ static int close_conn(ConnectionPool *cp, ConnectionManager *cm,
|
||||||
return EINVAL;
|
return EINVAL;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (1) {
|
if (bForce)
|
||||||
head = FC_ATOMIC_GET(cm->ring.head);
|
{
|
||||||
tail = FC_ATOMIC_GET(cm->ring.tail);
|
cm->total_count--;
|
||||||
if (bForce || (head - tail) >= cp->ring_size - 1) {
|
|
||||||
FC_ATOMIC_DEC(cm->total_count);
|
|
||||||
|
|
||||||
if (FC_LOG_BY_LEVEL(LOG_DEBUG)) {
|
if (FC_LOG_BY_LEVEL(LOG_DEBUG)) {
|
||||||
format_ip_address(conn->ip_addr, formatted_ip);
|
format_ip_address(conn->ip_addr, formatted_ip);
|
||||||
logDebug("file: "__FILE__", line: %d, "
|
logDebug("file: "__FILE__", line: %d, "
|
||||||
"server %s:%u, release connection: %d, "
|
"server %s:%u, release connection: %d, "
|
||||||
"total_count: %d, free_count: %d",
|
"total_count: %d, free_count: %d",
|
||||||
__LINE__, formatted_ip, conn->port,
|
__LINE__, formatted_ip, conn->port,
|
||||||
conn->sock, cm->total_count, cm->free_count);
|
conn->sock, cm->total_count, cm->free_count);
|
||||||
}
|
|
||||||
|
|
||||||
G_COMMON_CONNECTION_CALLBACKS[conn->comm_type].
|
|
||||||
close_connection(conn);
|
|
||||||
fast_mblock_free_object(&cp->node_allocator, node);
|
|
||||||
|
|
||||||
if (bForce) {
|
|
||||||
for (index=FC_ATOMIC_GET(cm->ring.tail); index<head; index++) {
|
|
||||||
node = (ConnectionNode *)FC_ATOMIC_GET(
|
|
||||||
cm->ring.nodes[index % cp->ring_size]);
|
|
||||||
if (node != NULL) {
|
|
||||||
node->conn->validate_flag = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (__sync_bool_compare_and_swap(&cm->ring.nodes[
|
G_COMMON_CONNECTION_CALLBACKS[conn->comm_type].
|
||||||
head % cp->ring_size], NULL, node))
|
close_connection(conn);
|
||||||
|
fast_mblock_free_object(&cp->node_allocator, node);
|
||||||
|
|
||||||
|
node = cm->head;
|
||||||
|
while (node != NULL)
|
||||||
{
|
{
|
||||||
if (__sync_bool_compare_and_swap(&cm->ring.head, head, head + 1)) {
|
node->conn->validate_flag = true;
|
||||||
node->atime = get_current_time();
|
node = node->next;
|
||||||
FC_ATOMIC_INC(cm->free_count);
|
|
||||||
|
|
||||||
if (FC_LOG_BY_LEVEL(LOG_DEBUG)) {
|
|
||||||
format_ip_address(conn->ip_addr, formatted_ip);
|
|
||||||
logDebug("file: "__FILE__", line: %d, "
|
|
||||||
"server %s:%u, free connection: %d, "
|
|
||||||
"total_count: %d, free_count: %d",
|
|
||||||
__LINE__, formatted_ip, conn->port,
|
|
||||||
conn->sock, cm->total_count, cm->free_count);
|
|
||||||
}
|
|
||||||
|
|
||||||
break;
|
|
||||||
} else { //rollback
|
|
||||||
__sync_bool_compare_and_swap(&cm->ring.nodes[
|
|
||||||
head % cp->ring_size], node, NULL);
|
|
||||||
sched_yield();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
node->atime = get_current_time();
|
||||||
|
node->next = cm->head;
|
||||||
|
cm->head = node;
|
||||||
|
cm->free_count++;
|
||||||
|
|
||||||
|
if (FC_LOG_BY_LEVEL(LOG_DEBUG)) {
|
||||||
|
format_ip_address(conn->ip_addr, formatted_ip);
|
||||||
|
logDebug("file: "__FILE__", line: %d, "
|
||||||
|
"server %s:%u, free connection: %d, "
|
||||||
|
"total_count: %d, free_count: %d",
|
||||||
|
__LINE__, formatted_ip, conn->port,
|
||||||
|
conn->sock, cm->total_count, cm->free_count);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline ConnectionManager *do_find_manager(
|
static ConnectionManager *find_manager(ConnectionPool *cp,
|
||||||
ConnectionManager *head, const string_t *key)
|
ConnectionBucket *bucket, const string_t *key,
|
||||||
|
const bool need_create)
|
||||||
{
|
{
|
||||||
ConnectionManager *cm;
|
ConnectionManager *cm;
|
||||||
|
|
||||||
if (fc_string_equal(&head->key, key)) //fast path
|
if (bucket->head != NULL)
|
||||||
{
|
{
|
||||||
return head;
|
if (fc_string_equal(&bucket->head->key, key)) //fast path
|
||||||
}
|
|
||||||
|
|
||||||
cm = (ConnectionManager *)FC_ATOMIC_GET(head->next);
|
|
||||||
while (cm != NULL)
|
|
||||||
{
|
|
||||||
if (fc_string_equal(&cm->key, key))
|
|
||||||
{
|
{
|
||||||
return cm;
|
return bucket->head;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
cm = bucket->head->next;
|
||||||
|
while (cm != NULL)
|
||||||
|
{
|
||||||
|
if (fc_string_equal(&cm->key, key))
|
||||||
|
{
|
||||||
|
return cm;
|
||||||
|
}
|
||||||
|
cm = cm->next;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
cm = (ConnectionManager *)FC_ATOMIC_GET(cm->next);
|
|
||||||
}
|
}
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
static ConnectionManager *find_manager(ConnectionBucket *bucket,
|
if (!need_create)
|
||||||
const string_t *key)
|
|
||||||
{
|
|
||||||
ConnectionManager *head;
|
|
||||||
|
|
||||||
head = (ConnectionManager *)FC_ATOMIC_GET(bucket->head);
|
|
||||||
if (head == NULL)
|
|
||||||
{
|
{
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
return do_find_manager(head, key);
|
|
||||||
}
|
|
||||||
|
|
||||||
static ConnectionManager *create_manager(ConnectionPool *cp,
|
|
||||||
ConnectionBucket *bucket, ConnectionManager *old_head,
|
|
||||||
const string_t *key, int *result)
|
|
||||||
{
|
|
||||||
ConnectionManager *cm;
|
|
||||||
char *buff;
|
|
||||||
int aligned_key_len;
|
|
||||||
int bytes;
|
|
||||||
|
|
||||||
cm = (ConnectionManager *)fast_mblock_alloc_object(
|
cm = (ConnectionManager *)fast_mblock_alloc_object(
|
||||||
&cp->manager_allocator);
|
&cp->manager_allocator);
|
||||||
|
|
@ -189,65 +156,23 @@ static ConnectionManager *create_manager(ConnectionPool *cp,
|
||||||
logError("file: "__FILE__", line: %d, "
|
logError("file: "__FILE__", line: %d, "
|
||||||
"malloc %d bytes fail", __LINE__,
|
"malloc %d bytes fail", __LINE__,
|
||||||
(int)sizeof(ConnectionManager));
|
(int)sizeof(ConnectionManager));
|
||||||
*result = ENOMEM;
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
cm->ring.head = cm->ring.tail = 0;
|
cm->head = NULL;
|
||||||
cm->total_count = 0;
|
cm->total_count = 0;
|
||||||
cm->free_count = 0;
|
cm->free_count = 0;
|
||||||
|
if ((cm->key.str=fc_malloc(key->len + 1)) == NULL)
|
||||||
aligned_key_len = MEM_ALIGN(key->len);
|
|
||||||
bytes = aligned_key_len + sizeof(ConnectionNode *) * cp->ring_size;
|
|
||||||
if ((buff=fc_malloc(bytes)) == NULL)
|
|
||||||
{
|
{
|
||||||
*result = ENOMEM;
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
memset(buff, 0, bytes);
|
memcpy(cm->key.str, key->str, key->len + 1);
|
||||||
cm->key.str = buff;
|
|
||||||
cm->ring.nodes = (ConnectionNode **)(buff + aligned_key_len);
|
|
||||||
memcpy(cm->key.str, key->str, key->len);
|
|
||||||
cm->key.len = key->len;
|
cm->key.len = key->len;
|
||||||
|
|
||||||
//add to manager chain
|
//add to manager chain
|
||||||
FC_ATOMIC_SET(cm->next, old_head);
|
cm->next = bucket->head;
|
||||||
if (__sync_bool_compare_and_swap(&bucket->head, old_head, cm))
|
bucket->head = cm;
|
||||||
{
|
return cm;
|
||||||
return cm;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
fast_mblock_free_object(&cp->manager_allocator, cm);
|
|
||||||
*result = EAGAIN;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static ConnectionManager *get_manager(ConnectionPool *cp,
|
|
||||||
ConnectionBucket *bucket, const string_t *key)
|
|
||||||
{
|
|
||||||
ConnectionManager *head;
|
|
||||||
ConnectionManager *cm;
|
|
||||||
int result = 0;
|
|
||||||
|
|
||||||
do {
|
|
||||||
head = (ConnectionManager *)FC_ATOMIC_GET(bucket->head);
|
|
||||||
if (head != NULL)
|
|
||||||
{
|
|
||||||
if ((cm=do_find_manager(head, key)) != NULL)
|
|
||||||
{
|
|
||||||
return cm;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((cm=create_manager(cp, bucket, head, key, &result)) != NULL)
|
|
||||||
{
|
|
||||||
return cm;
|
|
||||||
}
|
|
||||||
} while (result == EAGAIN);
|
|
||||||
|
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int close_connection(ConnectionPool *cp, ConnectionInfo *conn,
|
static int close_connection(ConnectionPool *cp, ConnectionInfo *conn,
|
||||||
|
|
@ -259,7 +184,8 @@ static int close_connection(ConnectionPool *cp, ConnectionInfo *conn,
|
||||||
int result;
|
int result;
|
||||||
|
|
||||||
bucket = cp->hashtable.buckets + hash_code % cp->hashtable.capacity;
|
bucket = cp->hashtable.buckets + hash_code % cp->hashtable.capacity;
|
||||||
if ((cm=find_manager(bucket, key)) != NULL)
|
pthread_mutex_lock(&bucket->lock);
|
||||||
|
if ((cm=find_manager(cp, bucket, key, false)) != NULL)
|
||||||
{
|
{
|
||||||
result = close_conn(cp, cm, conn, bForce);
|
result = close_conn(cp, cm, conn, bForce);
|
||||||
}
|
}
|
||||||
|
|
@ -271,6 +197,7 @@ static int close_connection(ConnectionPool *cp, ConnectionInfo *conn,
|
||||||
__LINE__, formatted_ip, conn->port);
|
__LINE__, formatted_ip, conn->port);
|
||||||
result = ENOENT;
|
result = ENOENT;
|
||||||
}
|
}
|
||||||
|
pthread_mutex_unlock(&bucket->lock);
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
@ -311,6 +238,7 @@ static void cp_tls_destroy(void *ptr)
|
||||||
static int init_hashtable(ConnectionPool *cp, const int htable_capacity)
|
static int init_hashtable(ConnectionPool *cp, const int htable_capacity)
|
||||||
{
|
{
|
||||||
int bytes;
|
int bytes;
|
||||||
|
int result;
|
||||||
unsigned int *hash_capacity;
|
unsigned int *hash_capacity;
|
||||||
ConnectionBucket *bucket;
|
ConnectionBucket *bucket;
|
||||||
ConnectionBucket *end;
|
ConnectionBucket *end;
|
||||||
|
|
@ -335,6 +263,10 @@ static int init_hashtable(ConnectionPool *cp, const int htable_capacity)
|
||||||
for (bucket=cp->hashtable.buckets; bucket<end; bucket++)
|
for (bucket=cp->hashtable.buckets; bucket<end; bucket++)
|
||||||
{
|
{
|
||||||
bucket->head = NULL;
|
bucket->head = NULL;
|
||||||
|
if ((result=init_pthread_lock(&bucket->lock)) != 0)
|
||||||
|
{
|
||||||
|
return result;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
@ -355,12 +287,12 @@ int conn_pool_init_ex1(ConnectionPool *cp, const int connect_timeout,
|
||||||
cp->connect_timeout_ms = connect_timeout * 1000;
|
cp->connect_timeout_ms = connect_timeout * 1000;
|
||||||
cp->max_count_per_entry = max_count_per_entry;
|
cp->max_count_per_entry = max_count_per_entry;
|
||||||
cp->max_idle_time = max_idle_time;
|
cp->max_idle_time = max_idle_time;
|
||||||
cp->ring_size = max_count_per_entry > 0 ? max_count_per_entry : 1024;
|
|
||||||
cp->extra_data_size = extra_data_size;
|
cp->extra_data_size = extra_data_size;
|
||||||
cp->connect_done_callback.func = connect_done_func;
|
cp->connect_done_callback.func = connect_done_func;
|
||||||
cp->connect_done_callback.args = connect_done_args;
|
cp->connect_done_callback.args = connect_done_args;
|
||||||
cp->validate_callback.func = validate_func;
|
cp->validate_callback.func = validate_func;
|
||||||
cp->validate_callback.args = validate_args;
|
cp->validate_callback.args = validate_args;
|
||||||
|
|
||||||
if ((result=fast_mblock_init_ex1(&cp->manager_allocator, "cpool-manager",
|
if ((result=fast_mblock_init_ex1(&cp->manager_allocator, "cpool-manager",
|
||||||
sizeof(ConnectionManager), 256, alloc_elements_limit,
|
sizeof(ConnectionManager), 256, alloc_elements_limit,
|
||||||
NULL, NULL, true)) != 0)
|
NULL, NULL, true)) != 0)
|
||||||
|
|
@ -423,31 +355,31 @@ static void conn_pool_hash_walk(ConnectionPool *cp,
|
||||||
end = cp->hashtable.buckets + cp->hashtable.capacity;
|
end = cp->hashtable.buckets + cp->hashtable.capacity;
|
||||||
for (bucket=cp->hashtable.buckets; bucket<end; bucket++)
|
for (bucket=cp->hashtable.buckets; bucket<end; bucket++)
|
||||||
{
|
{
|
||||||
cm = (ConnectionManager *)FC_ATOMIC_GET(bucket->head);
|
pthread_mutex_lock(&bucket->lock);
|
||||||
|
cm = bucket->head;
|
||||||
while (cm != NULL)
|
while (cm != NULL)
|
||||||
{
|
{
|
||||||
current = cm;
|
current = cm;
|
||||||
cm = (ConnectionManager *)FC_ATOMIC_GET(cm->next);
|
cm = cm->next;
|
||||||
callback(cp, current, args);
|
callback(cp, current, args);
|
||||||
}
|
}
|
||||||
|
pthread_mutex_unlock(&bucket->lock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cp_destroy_walk_callback(ConnectionPool *cp,
|
static void cp_destroy_walk_callback(ConnectionPool *cp,
|
||||||
ConnectionManager *cm, void *args)
|
ConnectionManager *cm, void *args)
|
||||||
{
|
{
|
||||||
int64_t index;
|
|
||||||
int64_t head;
|
|
||||||
ConnectionNode *node;
|
ConnectionNode *node;
|
||||||
|
ConnectionNode *deleted;
|
||||||
|
|
||||||
head = FC_ATOMIC_GET(cm->ring.head);
|
node = cm->head;
|
||||||
for (index=FC_ATOMIC_GET(cm->ring.tail); index<head; index++) {
|
while (node != NULL)
|
||||||
node = (ConnectionNode *)FC_ATOMIC_GET(
|
{
|
||||||
cm->ring.nodes[index % cp->ring_size]);
|
deleted = node;
|
||||||
if (node != NULL) {
|
node = node->next;
|
||||||
G_COMMON_CONNECTION_CALLBACKS[node->conn->comm_type].
|
G_COMMON_CONNECTION_CALLBACKS[deleted->conn->comm_type].
|
||||||
close_connection(node->conn);
|
close_connection(deleted->conn);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
free(cm->key.str);
|
free(cm->key.str);
|
||||||
|
|
@ -455,11 +387,19 @@ static void cp_destroy_walk_callback(ConnectionPool *cp,
|
||||||
|
|
||||||
void conn_pool_destroy(ConnectionPool *cp)
|
void conn_pool_destroy(ConnectionPool *cp)
|
||||||
{
|
{
|
||||||
|
ConnectionBucket *bucket;
|
||||||
|
ConnectionBucket *end;
|
||||||
|
|
||||||
if (cp->hashtable.buckets == NULL) {
|
if (cp->hashtable.buckets == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
conn_pool_hash_walk(cp, cp_destroy_walk_callback, cp);
|
conn_pool_hash_walk(cp, cp_destroy_walk_callback, cp);
|
||||||
|
|
||||||
|
end = cp->hashtable.buckets + cp->hashtable.capacity;
|
||||||
|
for (bucket=cp->hashtable.buckets; bucket<end; bucket++) {
|
||||||
|
pthread_mutex_destroy(&bucket->lock);
|
||||||
|
}
|
||||||
free(cp->hashtable.buckets);
|
free(cp->hashtable.buckets);
|
||||||
cp->hashtable.buckets = NULL;
|
cp->hashtable.buckets = NULL;
|
||||||
|
|
||||||
|
|
@ -557,11 +497,10 @@ int conn_pool_async_connect_server_ex(ConnectionInfo *conn,
|
||||||
}
|
}
|
||||||
|
|
||||||
static ConnectionInfo *get_conn(ConnectionPool *cp,
|
static ConnectionInfo *get_conn(ConnectionPool *cp,
|
||||||
ConnectionManager *cm, const ConnectionInfo *conn,
|
ConnectionManager *cm, pthread_mutex_t *lock,
|
||||||
const char *service_name, int *err_no)
|
const ConnectionInfo *conn, const char *service_name,
|
||||||
|
int *err_no)
|
||||||
{
|
{
|
||||||
int64_t tail;
|
|
||||||
int index;
|
|
||||||
ConnectionNode *node;
|
ConnectionNode *node;
|
||||||
ConnectionInfo *ci;
|
ConnectionInfo *ci;
|
||||||
char formatted_ip[FORMATTED_IP_SIZE];
|
char formatted_ip[FORMATTED_IP_SIZE];
|
||||||
|
|
@ -570,11 +509,10 @@ static ConnectionInfo *get_conn(ConnectionPool *cp,
|
||||||
current_time = get_current_time();
|
current_time = get_current_time();
|
||||||
while (1)
|
while (1)
|
||||||
{
|
{
|
||||||
tail = FC_ATOMIC_GET(cm->ring.tail);
|
if (cm->head == NULL)
|
||||||
if (tail == FC_ATOMIC_GET(cm->ring.head)) //empty
|
|
||||||
{
|
{
|
||||||
if ((cp->max_count_per_entry > 0) && FC_ATOMIC_GET(
|
if ((cp->max_count_per_entry > 0) &&
|
||||||
cm->total_count) >= cp->max_count_per_entry)
|
(cm->total_count >= cp->max_count_per_entry))
|
||||||
{
|
{
|
||||||
format_ip_address(conn->ip_addr, formatted_ip);
|
format_ip_address(conn->ip_addr, formatted_ip);
|
||||||
*err_no = ENOSPC;
|
*err_no = ENOSPC;
|
||||||
|
|
@ -600,7 +538,9 @@ static ConnectionInfo *get_conn(ConnectionPool *cp,
|
||||||
node->manager = cm;
|
node->manager = cm;
|
||||||
node->next = NULL;
|
node->next = NULL;
|
||||||
node->atime = 0;
|
node->atime = 0;
|
||||||
FC_ATOMIC_INC(cm->total_count);
|
|
||||||
|
cm->total_count++;
|
||||||
|
pthread_mutex_unlock(lock);
|
||||||
|
|
||||||
memcpy(node->conn->ip_addr, conn->ip_addr, sizeof(conn->ip_addr));
|
memcpy(node->conn->ip_addr, conn->ip_addr, sizeof(conn->ip_addr));
|
||||||
node->conn->port = conn->port;
|
node->conn->port = conn->port;
|
||||||
|
|
@ -622,7 +562,8 @@ static ConnectionInfo *get_conn(ConnectionPool *cp,
|
||||||
close_connection(node->conn);
|
close_connection(node->conn);
|
||||||
fast_mblock_free_object(&cp->node_allocator, node);
|
fast_mblock_free_object(&cp->node_allocator, node);
|
||||||
|
|
||||||
FC_ATOMIC_DEC(cm->total_count); //rollback
|
pthread_mutex_lock(lock);
|
||||||
|
cm->total_count--; //rollback
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -635,27 +576,19 @@ static ConnectionInfo *get_conn(ConnectionPool *cp,
|
||||||
node->conn->sock, cm->total_count,
|
node->conn->sock, cm->total_count,
|
||||||
cm->free_count);
|
cm->free_count);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pthread_mutex_lock(lock);
|
||||||
return node->conn;
|
return node->conn;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
bool invalid;
|
bool invalid;
|
||||||
|
|
||||||
if (!__sync_bool_compare_and_swap(&cm->ring.tail, tail, tail + 1)) {
|
node = cm->head;
|
||||||
sched_yield();
|
ci = node->conn;
|
||||||
continue;
|
cm->head = node->next;
|
||||||
}
|
cm->free_count--;
|
||||||
|
|
||||||
index = tail % cp->ring_size;
|
|
||||||
node = (ConnectionNode *)FC_ATOMIC_GET(
|
|
||||||
cm->ring.nodes[index]);
|
|
||||||
if (node == NULL) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
__sync_bool_compare_and_swap(&cm->ring.nodes[index], node, NULL);
|
|
||||||
|
|
||||||
ci = node->conn;
|
|
||||||
FC_ATOMIC_DEC(cm->free_count);
|
|
||||||
if (current_time - node->atime > cp->max_idle_time)
|
if (current_time - node->atime > cp->max_idle_time)
|
||||||
{
|
{
|
||||||
if (cp->validate_callback.func != NULL)
|
if (cp->validate_callback.func != NULL)
|
||||||
|
|
@ -685,7 +618,8 @@ static ConnectionInfo *get_conn(ConnectionPool *cp,
|
||||||
|
|
||||||
if (invalid)
|
if (invalid)
|
||||||
{
|
{
|
||||||
FC_ATOMIC_DEC(cm->total_count);
|
cm->total_count--;
|
||||||
|
|
||||||
if (FC_LOG_BY_LEVEL(LOG_DEBUG)) {
|
if (FC_LOG_BY_LEVEL(LOG_DEBUG)) {
|
||||||
format_ip_address(conn->ip_addr, formatted_ip);
|
format_ip_address(conn->ip_addr, formatted_ip);
|
||||||
logDebug("file: "__FILE__", line: %d, "
|
logDebug("file: "__FILE__", line: %d, "
|
||||||
|
|
@ -728,9 +662,10 @@ static ConnectionInfo *get_connection(ConnectionPool *cp,
|
||||||
ConnectionInfo *ci;
|
ConnectionInfo *ci;
|
||||||
|
|
||||||
bucket = cp->hashtable.buckets + hash_code % cp->hashtable.capacity;
|
bucket = cp->hashtable.buckets + hash_code % cp->hashtable.capacity;
|
||||||
if ((cm=get_manager(cp, bucket, key)) != NULL)
|
pthread_mutex_lock(&bucket->lock);
|
||||||
|
if ((cm=find_manager(cp, bucket, key, true)) != NULL)
|
||||||
{
|
{
|
||||||
ci = get_conn(cp, cm, conn, service_name, err_no);
|
ci = get_conn(cp, cm, &bucket->lock, conn, service_name, err_no);
|
||||||
if (ci != NULL)
|
if (ci != NULL)
|
||||||
{
|
{
|
||||||
ci->shared = shared;
|
ci->shared = shared;
|
||||||
|
|
@ -741,6 +676,7 @@ static ConnectionInfo *get_connection(ConnectionPool *cp,
|
||||||
*err_no = ENOMEM;
|
*err_no = ENOMEM;
|
||||||
ci = NULL;
|
ci = NULL;
|
||||||
}
|
}
|
||||||
|
pthread_mutex_unlock(&bucket->lock);
|
||||||
return ci;
|
return ci;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -889,8 +825,8 @@ static void cp_stat_walk_callback(ConnectionPool *cp,
|
||||||
|
|
||||||
stat = args;
|
stat = args;
|
||||||
stat->server_count++;
|
stat->server_count++;
|
||||||
stat->connection.total_count += FC_ATOMIC_GET(cm->total_count);
|
stat->connection.total_count += cm->total_count;
|
||||||
stat->connection.free_count += FC_ATOMIC_GET(cm->free_count);
|
stat->connection.free_count += cm->free_count;
|
||||||
}
|
}
|
||||||
|
|
||||||
void conn_pool_stat(ConnectionPool *cp, ConnectionPoolStat *stat)
|
void conn_pool_stat(ConnectionPool *cp, ConnectionPoolStat *stat)
|
||||||
|
|
|
||||||
|
|
@ -147,24 +147,21 @@ struct tagConnectionManager;
|
||||||
typedef struct tagConnectionNode {
|
typedef struct tagConnectionNode {
|
||||||
ConnectionInfo *conn;
|
ConnectionInfo *conn;
|
||||||
struct tagConnectionManager *manager;
|
struct tagConnectionManager *manager;
|
||||||
struct tagConnectionNode *next; //for thread local
|
struct tagConnectionNode *next;
|
||||||
time_t atime; //last access time
|
time_t atime; //last access time
|
||||||
} ConnectionNode;
|
} ConnectionNode;
|
||||||
|
|
||||||
typedef struct tagConnectionManager {
|
typedef struct tagConnectionManager {
|
||||||
string_t key;
|
string_t key;
|
||||||
struct {
|
ConnectionNode *head;
|
||||||
ConnectionNode **nodes;
|
int total_count; //total connections
|
||||||
volatile int64_t head; //producer
|
int free_count; //free connections
|
||||||
volatile int64_t tail; //consumer
|
struct tagConnectionManager *next;
|
||||||
} ring;
|
|
||||||
volatile int total_count; //total connections
|
|
||||||
volatile int free_count; //free connections
|
|
||||||
volatile struct tagConnectionManager *next;
|
|
||||||
} ConnectionManager;
|
} ConnectionManager;
|
||||||
|
|
||||||
typedef struct tagConnectionBucket {
|
typedef struct tagConnectionBucket {
|
||||||
volatile ConnectionManager *head;
|
ConnectionManager *head;
|
||||||
|
pthread_mutex_t lock;
|
||||||
} ConnectionBucket;
|
} ConnectionBucket;
|
||||||
|
|
||||||
struct tagConnectionPool;
|
struct tagConnectionPool;
|
||||||
|
|
@ -182,7 +179,6 @@ typedef struct tagConnectionPool {
|
||||||
|
|
||||||
int connect_timeout_ms;
|
int connect_timeout_ms;
|
||||||
int max_count_per_entry; //0 means no limit
|
int max_count_per_entry; //0 means no limit
|
||||||
int ring_size;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
connections whose idle time exceeds this time will be closed
|
connections whose idle time exceeds this time will be closed
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue