diff --git a/src/connection_pool.c b/src/connection_pool.c index 806b58e..0dfefe5 100644 --- a/src/connection_pool.c +++ b/src/connection_pool.c @@ -18,7 +18,9 @@ int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout, const int max_count_per_entry, const int max_idle_time, const int socket_domain, const int htable_init_capacity, - fc_validate_connection_func validate_func, void *validate_args) + fc_connection_callback_func connect_done_func, void *connect_done_args, + fc_connection_callback_func validate_func, void *validate_args, + const int extra_data_size) { int result; int init_capacity; @@ -31,6 +33,8 @@ int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout, cp->max_count_per_entry = max_count_per_entry; cp->max_idle_time = max_idle_time; cp->socket_domain = socket_domain; + cp->connect_done_callback.func = connect_done_func; + cp->connect_done_callback.args = connect_done_args; cp->validate_callback.func = validate_func; cp->validate_callback.args = validate_args; @@ -43,8 +47,8 @@ int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout, } if ((result=fast_mblock_init_ex1(&cp->node_allocator, "cpool_node", - sizeof(ConnectionNode) + sizeof(ConnectionInfo), - 4 * init_capacity, NULL, NULL, false)) != 0) + sizeof(ConnectionNode) + sizeof(ConnectionInfo) + + extra_data_size, init_capacity, NULL, NULL, false)) != 0) { return result; } @@ -178,7 +182,6 @@ ConnectionInfo *conn_pool_get_connection(ConnectionPool *cp, { char key[INET6_ADDRSTRLEN + 8]; int key_len; - char *p; ConnectionManager *cm; ConnectionNode *node; ConnectionInfo *ci; @@ -233,8 +236,9 @@ ConnectionInfo *conn_pool_get_connection(ConnectionPool *cp, return NULL; } - p = (char *)fast_mblock_alloc_object(&cp->node_allocator); - if (p == NULL) + node = (ConnectionNode *)fast_mblock_alloc_object( + &cp->node_allocator); + if (node == NULL) { *err_no = ENOMEM; logError("file: "__FILE__", line: %d, " @@ -244,8 +248,7 @@ ConnectionInfo *conn_pool_get_connection(ConnectionPool *cp, return NULL; } - node = (ConnectionNode *)p; - node->conn = (ConnectionInfo *)(p + sizeof(ConnectionNode)); + node->conn = (ConnectionInfo *)(node + 1); node->manager = cm; node->next = NULL; node->atime = 0; @@ -259,11 +262,21 @@ ConnectionInfo *conn_pool_get_connection(ConnectionPool *cp, node->conn->validate_flag = false; *err_no = conn_pool_connect_server(node->conn, cp->connect_timeout); + if (*err_no == 0 && cp->connect_done_callback.func != NULL) + { + *err_no = cp->connect_done_callback.func(node->conn, + cp->connect_done_callback.args); + } if (*err_no != 0) { + if (node->conn->sock >= 0) + { + close(node->conn->sock); + node->conn->sock = -1; + } pthread_mutex_lock(&cm->lock); cm->total_count--; //rollback - fast_mblock_free_object(&cp->node_allocator, p); + fast_mblock_free_object(&cp->node_allocator, node); pthread_mutex_unlock(&cm->lock); return NULL; @@ -287,10 +300,19 @@ ConnectionInfo *conn_pool_get_connection(ConnectionPool *cp, cm->free_count--; if (current_time - node->atime > cp->max_idle_time) - { + { + if (cp->validate_callback.func != NULL) + { + ci->validate_flag = true; + } invalid = true; } - else if (ci->validate_flag) + else + { + invalid = false; + } + + if (ci->validate_flag) { ci->validate_flag = false; if (cp->validate_callback.func != NULL) @@ -303,10 +325,6 @@ ConnectionInfo *conn_pool_get_connection(ConnectionPool *cp, invalid = false; } } - else - { - invalid = false; - } if (invalid) { diff --git a/src/connection_pool.h b/src/connection_pool.h index 2e041a4..b21d561 100644 --- a/src/connection_pool.h +++ b/src/connection_pool.h @@ -40,9 +40,10 @@ typedef struct short socket_domain; //socket domain, AF_INET, AF_INET6 or AF_UNSPEC for auto dedect bool validate_flag; //for connection pool char ip_addr[INET6_ADDRSTRLEN]; + char args[0]; //for extra data } ConnectionInfo; -typedef int (*fc_validate_connection_func)(ConnectionInfo *conn, void *args); +typedef int (*fc_connection_callback_func)(ConnectionInfo *conn, void *args); struct tagConnectionManager; @@ -77,7 +78,12 @@ typedef struct tagConnectionPool { struct fast_mblock_man node_allocator; struct { - fc_validate_connection_func func; + fc_connection_callback_func func; + void *args; + } connect_done_callback; + + struct { + fc_connection_callback_func func; void *args; } validate_callback; } ConnectionPool; @@ -90,14 +96,20 @@ typedef struct tagConnectionPool { * max_count_per_entry: max connection count per host:port * max_idle_time: reconnect the server after max idle time in seconds * socket_domain: the socket domain +* htable_init_capacity: the init capacity of connection hash table +* connect_done_func: the connect done connection callback +* connect_done_args: the args for connect done connection callback * validate_func: the validate connection callback * validate_args: the args for validate connection callback +* extra_data_size: the extra data size of connection * return 0 for success, != 0 for error */ int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout, const int max_count_per_entry, const int max_idle_time, const int socket_domain, const int htable_init_capacity, - fc_validate_connection_func validate_func, void *validate_args); + fc_connection_callback_func connect_done_func, void *connect_done_args, + fc_connection_callback_func validate_func, void *validate_args, + const int extra_data_size); /** * init ex function @@ -114,8 +126,10 @@ static inline int conn_pool_init_ex(ConnectionPool *cp, int connect_timeout, const int socket_domain) { const int htable_init_capacity = 0; + const int extra_data_size = 0; return conn_pool_init_ex1(cp, connect_timeout, max_count_per_entry, - max_idle_time, socket_domain, htable_init_capacity, NULL, NULL); + max_idle_time, socket_domain, htable_init_capacity, + NULL, NULL, NULL, NULL, extra_data_size); } /** @@ -132,8 +146,10 @@ static inline int conn_pool_init(ConnectionPool *cp, int connect_timeout, { const int socket_domain = AF_INET; const int htable_init_capacity = 0; + const int extra_data_size = 0; return conn_pool_init_ex1(cp, connect_timeout, max_count_per_entry, - max_idle_time, socket_domain, htable_init_capacity, NULL, NULL); + max_idle_time, socket_domain, htable_init_capacity, + NULL, NULL, NULL, NULL, extra_data_size); } /**