diff --git a/HISTORY b/HISTORY index f8fa1e1..9c9d8ab 100644 --- a/HISTORY +++ b/HISTORY @@ -1,5 +1,5 @@ -Version 1.44 2020-04-24 +Version 1.44 2020-04-26 * add test file src/tests/test_pthread_lock.c * add uniq_skiplist.[hc] * add function split_string_ex @@ -26,6 +26,7 @@ Version 1.44 2020-04-24 * bugfixed: call fast_mblock_destroy in common_blocked_queue_destroy * add function fc_get_file_line_count_ex * uniq_skiplist add function find_ge and support bidirection + * connection_pool support validate connection on error Version 1.43 2019-12-25 * replace function call system to getExecResult, diff --git a/libfastcommon.spec b/libfastcommon.spec index 8baadb6..3d0945e 100644 --- a/libfastcommon.spec +++ b/libfastcommon.spec @@ -4,7 +4,7 @@ %define CommitVersion %(echo $COMMIT_VERSION) Name: libfastcommon -Version: 1.0.43 +Version: 1.0.44 Release: 1%{?dist} Summary: c common functions library extracted from my open source projects FastDFS License: LGPL diff --git a/src/common_define.h b/src/common_define.h index e884e2d..a690e7f 100644 --- a/src/common_define.h +++ b/src/common_define.h @@ -124,6 +124,8 @@ extern int pthread_mutexattr_settype(pthread_mutexattr_t *attr, int kind); #define FC_IS_LETTER(ch) ((ch >= 'A' && ch <= 'Z') || (ch >= 'a' && ch <= 'z')) #define FC_IS_UPPER_LETTER(ch) (ch >= 'A' && ch <= 'Z') #define FC_IS_LOWER_LETTER(ch) (ch >= 'a' && ch <= 'z') +#define FC_MIN(v1, v2) (v1 < v2 ? v1 : v2) +#define FC_MAX(v1, v2) (v1 > v2 ? v1 : v2) #define STRERROR(no) (strerror(no) != NULL ? strerror(no) : "Unkown error") diff --git a/src/connection_pool.c b/src/connection_pool.c index e9684fd..806b58e 100644 --- a/src/connection_pool.c +++ b/src/connection_pool.c @@ -15,11 +15,13 @@ #include "sched_thread.h" #include "connection_pool.h" -int conn_pool_init_ex(ConnectionPool *cp, int connect_timeout, \ +int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout, const int max_count_per_entry, const int max_idle_time, - const int socket_domain) + const int socket_domain, const int htable_init_capacity, + fc_validate_connection_func validate_func, void *validate_args) { int result; + int init_capacity; if ((result=init_pthread_lock(&cp->lock)) != 0) { @@ -29,22 +31,34 @@ int conn_pool_init_ex(ConnectionPool *cp, int connect_timeout, \ cp->max_count_per_entry = max_count_per_entry; cp->max_idle_time = max_idle_time; cp->socket_domain = socket_domain; + cp->validate_callback.func = validate_func; + cp->validate_callback.args = validate_args; - return hash_init(&(cp->hash_array), simple_hash, 1024, 0.75); + init_capacity = htable_init_capacity > 0 ? htable_init_capacity : 256; + if ((result=fast_mblock_init_ex1(&cp->manager_allocator, "cpool_manager", + sizeof(ConnectionManager), init_capacity, NULL, NULL, + false)) != 0) + { + return result; + } + + if ((result=fast_mblock_init_ex1(&cp->node_allocator, "cpool_node", + sizeof(ConnectionNode) + sizeof(ConnectionInfo), + 4 * init_capacity, NULL, NULL, false)) != 0) + { + return result; + } + + return hash_init(&(cp->hash_array), simple_hash, init_capacity, 0.75); } -int conn_pool_init(ConnectionPool *cp, int connect_timeout, - const int max_count_per_entry, const int max_idle_time) -{ - const int socket_domain = AF_INET; - return conn_pool_init_ex(cp, connect_timeout, max_count_per_entry, - max_idle_time, socket_domain); -} - -int coon_pool_close_connections(const int index, const HashData *data, void *args) +static int coon_pool_close_connections(const int index, + const HashData *data, void *args) { + ConnectionPool *cp; ConnectionManager *cm; + cp = (ConnectionPool *)args; cm = (ConnectionManager *)data->value; if (cm != NULL) { @@ -58,9 +72,10 @@ int coon_pool_close_connections(const int index, const HashData *data, void *arg node = node->next; conn_pool_disconnect_server(deleted->conn); - free(deleted); + fast_mblock_free_object(&cp->node_allocator, deleted); } - free(cm); + + fast_mblock_free_object(&cp->manager_allocator, cm); } return 0; @@ -69,7 +84,7 @@ int coon_pool_close_connections(const int index, const HashData *data, void *arg void conn_pool_destroy(ConnectionPool *cp) { pthread_mutex_lock(&cp->lock); - hash_walk(&(cp->hash_array), coon_pool_close_connections, NULL); + hash_walk(&(cp->hash_array), coon_pool_close_connections, cp); hash_destroy(&(cp->hash_array)); pthread_mutex_unlock(&cp->lock); @@ -163,7 +178,6 @@ ConnectionInfo *conn_pool_get_connection(ConnectionPool *cp, { char key[INET6_ADDRSTRLEN + 8]; int key_len; - int bytes; char *p; ConnectionManager *cm; ConnectionNode *node; @@ -176,15 +190,14 @@ ConnectionInfo *conn_pool_get_connection(ConnectionPool *cp, cm = (ConnectionManager *)hash_find(&cp->hash_array, key, key_len); if (cm == NULL) { - cm = (ConnectionManager *)malloc(sizeof(ConnectionManager)); + cm = (ConnectionManager *)fast_mblock_alloc_object( + &cp->manager_allocator); if (cm == NULL) { - *err_no = errno != 0 ? errno : ENOMEM; - logError("file: "__FILE__", line: %d, " \ - "malloc %d bytes fail, errno: %d, " \ - "error info: %s", __LINE__, \ - (int)sizeof(ConnectionManager), \ - *err_no, STRERROR(*err_no)); + *err_no = ENOMEM; + logError("file: "__FILE__", line: %d, " + "malloc %d bytes fail", __LINE__, + (int)sizeof(ConnectionManager)); pthread_mutex_unlock(&cp->lock); return NULL; } @@ -220,18 +233,16 @@ ConnectionInfo *conn_pool_get_connection(ConnectionPool *cp, return NULL; } - bytes = sizeof(ConnectionNode) + sizeof(ConnectionInfo); - p = (char *)malloc(bytes); + p = (char *)fast_mblock_alloc_object(&cp->node_allocator); if (p == NULL) - { - *err_no = errno != 0 ? errno : ENOMEM; - logError("file: "__FILE__", line: %d, " \ - "malloc %d bytes fail, errno: %d, " \ - "error info: %s", __LINE__, \ - bytes, *err_no, STRERROR(*err_no)); - pthread_mutex_unlock(&cm->lock); - return NULL; - } + { + *err_no = ENOMEM; + logError("file: "__FILE__", line: %d, " + "malloc %d bytes fail", __LINE__, (int) + (sizeof(ConnectionNode) + sizeof(ConnectionInfo))); + pthread_mutex_unlock(&cm->lock); + return NULL; + } node = (ConnectionNode *)p; node->conn = (ConnectionInfo *)(p + sizeof(ConnectionNode)); @@ -245,15 +256,16 @@ ConnectionInfo *conn_pool_get_connection(ConnectionPool *cp, memcpy(node->conn, conn, sizeof(ConnectionInfo)); node->conn->socket_domain = cp->socket_domain; node->conn->sock = -1; - *err_no = conn_pool_connect_server(node->conn, \ + node->conn->validate_flag = false; + *err_no = conn_pool_connect_server(node->conn, cp->connect_timeout); if (*err_no != 0) { pthread_mutex_lock(&cm->lock); cm->total_count--; //rollback + fast_mblock_free_object(&cp->node_allocator, p); pthread_mutex_unlock(&cm->lock); - free(p); return NULL; } @@ -267,6 +279,8 @@ ConnectionInfo *conn_pool_get_connection(ConnectionPool *cp, } else { + bool invalid; + node = cm->head; ci = node->conn; cm->head = node->next; @@ -274,6 +288,28 @@ ConnectionInfo *conn_pool_get_connection(ConnectionPool *cp, if (current_time - node->atime > cp->max_idle_time) { + invalid = true; + } + else if (ci->validate_flag) + { + ci->validate_flag = false; + if (cp->validate_callback.func != NULL) + { + invalid = cp->validate_callback.func(ci, + cp->validate_callback.args) != 0; + } + else + { + invalid = false; + } + } + else + { + invalid = false; + } + + if (invalid) + { cm->total_count--; logDebug("file: "__FILE__", line: %d, " \ @@ -287,7 +323,7 @@ ConnectionInfo *conn_pool_get_connection(ConnectionPool *cp, cm->free_count); conn_pool_disconnect_server(ci); - free(node); + fast_mblock_free_object(&cp->node_allocator, node); continue; } @@ -335,18 +371,25 @@ int conn_pool_close_connection_ex(ConnectionPool *cp, ConnectionInfo *conn, pthread_mutex_lock(&cm->lock); if (bForce) - { - cm->total_count--; + { + cm->total_count--; - logDebug("file: "__FILE__", line: %d, " \ - "server %s:%d, release connection: %d, " \ - "total_count: %d, free_count: %d", - __LINE__, conn->ip_addr, conn->port, - conn->sock, cm->total_count, cm->free_count); + logDebug("file: "__FILE__", line: %d, " + "server %s:%d, release connection: %d, " + "total_count: %d, free_count: %d", + __LINE__, conn->ip_addr, conn->port, + conn->sock, cm->total_count, cm->free_count); - conn_pool_disconnect_server(conn); - free(node); - } + conn_pool_disconnect_server(conn); + fast_mblock_free_object(&cp->node_allocator, node); + + node = cm->head; + while (node != NULL) + { + node->conn->validate_flag = true; + node = node->next; + } + } else { node->atime = get_current_time(); diff --git a/src/connection_pool.h b/src/connection_pool.h index 135e490..2e041a4 100644 --- a/src/connection_pool.h +++ b/src/connection_pool.h @@ -16,9 +16,10 @@ #include #include #include "common_define.h" +#include "fast_mblock.h" +#include "ini_file_reader.h" #include "pthread_func.h" #include "hash.h" -#include "ini_file_reader.h" #ifdef __cplusplus extern "C" { @@ -35,11 +36,14 @@ extern "C" { typedef struct { int sock; - int port; + short port; + short socket_domain; //socket domain, AF_INET, AF_INET6 or AF_UNSPEC for auto dedect + bool validate_flag; //for connection pool char ip_addr[INET6_ADDRSTRLEN]; - int socket_domain; //socket domain, AF_INET, AF_INET6 or AF_UNSPEC for auto dedect } ConnectionInfo; +typedef int (*fc_validate_connection_func)(ConnectionInfo *conn, void *args); + struct tagConnectionManager; typedef struct tagConnectionNode { @@ -68,8 +72,33 @@ typedef struct tagConnectionPool { */ int max_idle_time; int socket_domain; //socket domain + + struct fast_mblock_man manager_allocator; + struct fast_mblock_man node_allocator; + + struct { + fc_validate_connection_func func; + void *args; + } validate_callback; } ConnectionPool; +/** +* init ex function +* parameters: +* cp: the ConnectionPool +* connect_timeout: the connect timeout in seconds +* max_count_per_entry: max connection count per host:port +* max_idle_time: reconnect the server after max idle time in seconds +* socket_domain: the socket domain +* validate_func: the validate connection callback +* validate_args: the args for validate connection callback +* return 0 for success, != 0 for error +*/ +int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout, + const int max_count_per_entry, const int max_idle_time, + const int socket_domain, const int htable_init_capacity, + fc_validate_connection_func validate_func, void *validate_args); + /** * init ex function * parameters: @@ -80,9 +109,14 @@ typedef struct tagConnectionPool { * socket_domain: the socket domain * return 0 for success, != 0 for error */ -int conn_pool_init_ex(ConnectionPool *cp, int connect_timeout, +static inline int conn_pool_init_ex(ConnectionPool *cp, int connect_timeout, const int max_count_per_entry, const int max_idle_time, - const int socket_domain); + const int socket_domain) +{ + const int htable_init_capacity = 0; + return conn_pool_init_ex1(cp, connect_timeout, max_count_per_entry, + max_idle_time, socket_domain, htable_init_capacity, NULL, NULL); +} /** * init function @@ -93,8 +127,14 @@ int conn_pool_init_ex(ConnectionPool *cp, int connect_timeout, * max_idle_time: reconnect the server after max idle time in seconds * return 0 for success, != 0 for error */ -int conn_pool_init(ConnectionPool *cp, int connect_timeout, - const int max_count_per_entry, const int max_idle_time); +static inline int conn_pool_init(ConnectionPool *cp, int connect_timeout, + const int max_count_per_entry, const int max_idle_time) +{ + const int socket_domain = AF_INET; + const int htable_init_capacity = 0; + return conn_pool_init_ex1(cp, connect_timeout, max_count_per_entry, + max_idle_time, socket_domain, htable_init_capacity, NULL, NULL); +} /** * destroy function diff --git a/src/fast_mblock.h b/src/fast_mblock.h index 27d8ddb..084ed27 100644 --- a/src/fast_mblock.h +++ b/src/fast_mblock.h @@ -165,7 +165,8 @@ parameters: return error no, 0 for success, != 0 fail */ static inline int fast_mblock_init_ex1(struct fast_mblock_man *mblock, - const char *name, const int element_size, const int alloc_elements_once, + const char *name, const int element_size, + const int alloc_elements_once, fast_mblock_alloc_init_func init_func, void *init_args, const bool need_lock) { diff --git a/src/tests/test_uniq_skiplist.c b/src/tests/test_uniq_skiplist.c index 74928f0..ac07b95 100644 --- a/src/tests/test_uniq_skiplist.c +++ b/src/tests/test_uniq_skiplist.c @@ -254,7 +254,7 @@ int main(int argc, char *argv[]) srand(time(NULL)); fast_mblock_manager_init(); - result = uniq_skiplist_init_ex(&factory, LEVEL_COUNT, compare_func, + result = uniq_skiplist_init_ex2(&factory, LEVEL_COUNT, compare_func, free_test_func, 0, MIN_ALLOC_ONCE, 0, true); if (result != 0) { return result;