connection info support extra data

pull/37/head
YuQing 2020-04-26 20:19:46 +08:00
parent c8bf9f24ef
commit e4a5cadfe1
2 changed files with 54 additions and 20 deletions

View File

@ -18,7 +18,9 @@
int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout,
const int max_count_per_entry, const int max_idle_time,
const int socket_domain, const int htable_init_capacity,
fc_validate_connection_func validate_func, void *validate_args)
fc_connection_callback_func connect_done_func, void *connect_done_args,
fc_connection_callback_func validate_func, void *validate_args,
const int extra_data_size)
{
int result;
int init_capacity;
@ -31,6 +33,8 @@ int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout,
cp->max_count_per_entry = max_count_per_entry;
cp->max_idle_time = max_idle_time;
cp->socket_domain = socket_domain;
cp->connect_done_callback.func = connect_done_func;
cp->connect_done_callback.args = connect_done_args;
cp->validate_callback.func = validate_func;
cp->validate_callback.args = validate_args;
@ -43,8 +47,8 @@ int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout,
}
if ((result=fast_mblock_init_ex1(&cp->node_allocator, "cpool_node",
sizeof(ConnectionNode) + sizeof(ConnectionInfo),
4 * init_capacity, NULL, NULL, false)) != 0)
sizeof(ConnectionNode) + sizeof(ConnectionInfo) +
extra_data_size, init_capacity, NULL, NULL, false)) != 0)
{
return result;
}
@ -178,7 +182,6 @@ ConnectionInfo *conn_pool_get_connection(ConnectionPool *cp,
{
char key[INET6_ADDRSTRLEN + 8];
int key_len;
char *p;
ConnectionManager *cm;
ConnectionNode *node;
ConnectionInfo *ci;
@ -233,8 +236,9 @@ ConnectionInfo *conn_pool_get_connection(ConnectionPool *cp,
return NULL;
}
p = (char *)fast_mblock_alloc_object(&cp->node_allocator);
if (p == NULL)
node = (ConnectionNode *)fast_mblock_alloc_object(
&cp->node_allocator);
if (node == NULL)
{
*err_no = ENOMEM;
logError("file: "__FILE__", line: %d, "
@ -244,8 +248,7 @@ ConnectionInfo *conn_pool_get_connection(ConnectionPool *cp,
return NULL;
}
node = (ConnectionNode *)p;
node->conn = (ConnectionInfo *)(p + sizeof(ConnectionNode));
node->conn = (ConnectionInfo *)(node + 1);
node->manager = cm;
node->next = NULL;
node->atime = 0;
@ -259,11 +262,21 @@ ConnectionInfo *conn_pool_get_connection(ConnectionPool *cp,
node->conn->validate_flag = false;
*err_no = conn_pool_connect_server(node->conn,
cp->connect_timeout);
if (*err_no == 0 && cp->connect_done_callback.func != NULL)
{
*err_no = cp->connect_done_callback.func(node->conn,
cp->connect_done_callback.args);
}
if (*err_no != 0)
{
if (node->conn->sock >= 0)
{
close(node->conn->sock);
node->conn->sock = -1;
}
pthread_mutex_lock(&cm->lock);
cm->total_count--; //rollback
fast_mblock_free_object(&cp->node_allocator, p);
fast_mblock_free_object(&cp->node_allocator, node);
pthread_mutex_unlock(&cm->lock);
return NULL;
@ -287,10 +300,19 @@ ConnectionInfo *conn_pool_get_connection(ConnectionPool *cp,
cm->free_count--;
if (current_time - node->atime > cp->max_idle_time)
{
{
if (cp->validate_callback.func != NULL)
{
ci->validate_flag = true;
}
invalid = true;
}
else if (ci->validate_flag)
else
{
invalid = false;
}
if (ci->validate_flag)
{
ci->validate_flag = false;
if (cp->validate_callback.func != NULL)
@ -303,10 +325,6 @@ ConnectionInfo *conn_pool_get_connection(ConnectionPool *cp,
invalid = false;
}
}
else
{
invalid = false;
}
if (invalid)
{

View File

@ -40,9 +40,10 @@ typedef struct
short socket_domain; //socket domain, AF_INET, AF_INET6 or AF_UNSPEC for auto dedect
bool validate_flag; //for connection pool
char ip_addr[INET6_ADDRSTRLEN];
char args[0]; //for extra data
} ConnectionInfo;
typedef int (*fc_validate_connection_func)(ConnectionInfo *conn, void *args);
typedef int (*fc_connection_callback_func)(ConnectionInfo *conn, void *args);
struct tagConnectionManager;
@ -77,7 +78,12 @@ typedef struct tagConnectionPool {
struct fast_mblock_man node_allocator;
struct {
fc_validate_connection_func func;
fc_connection_callback_func func;
void *args;
} connect_done_callback;
struct {
fc_connection_callback_func func;
void *args;
} validate_callback;
} ConnectionPool;
@ -90,14 +96,20 @@ typedef struct tagConnectionPool {
* max_count_per_entry: max connection count per host:port
* max_idle_time: reconnect the server after max idle time in seconds
* socket_domain: the socket domain
* htable_init_capacity: the init capacity of connection hash table
* connect_done_func: the connect done connection callback
* connect_done_args: the args for connect done connection callback
* validate_func: the validate connection callback
* validate_args: the args for validate connection callback
* extra_data_size: the extra data size of connection
* return 0 for success, != 0 for error
*/
int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout,
const int max_count_per_entry, const int max_idle_time,
const int socket_domain, const int htable_init_capacity,
fc_validate_connection_func validate_func, void *validate_args);
fc_connection_callback_func connect_done_func, void *connect_done_args,
fc_connection_callback_func validate_func, void *validate_args,
const int extra_data_size);
/**
* init ex function
@ -114,8 +126,10 @@ static inline int conn_pool_init_ex(ConnectionPool *cp, int connect_timeout,
const int socket_domain)
{
const int htable_init_capacity = 0;
const int extra_data_size = 0;
return conn_pool_init_ex1(cp, connect_timeout, max_count_per_entry,
max_idle_time, socket_domain, htable_init_capacity, NULL, NULL);
max_idle_time, socket_domain, htable_init_capacity,
NULL, NULL, NULL, NULL, extra_data_size);
}
/**
@ -132,8 +146,10 @@ static inline int conn_pool_init(ConnectionPool *cp, int connect_timeout,
{
const int socket_domain = AF_INET;
const int htable_init_capacity = 0;
const int extra_data_size = 0;
return conn_pool_init_ex1(cp, connect_timeout, max_count_per_entry,
max_idle_time, socket_domain, htable_init_capacity, NULL, NULL);
max_idle_time, socket_domain, htable_init_capacity,
NULL, NULL, NULL, NULL, extra_data_size);
}
/**