From f3e24601d25d29dfb786c18bac939c362e1a92f3 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Mon, 22 Feb 2021 19:37:57 +0800 Subject: [PATCH] connection manager: support detect server for alive --- src/sf_connection_manager.c | 89 +++++++++++++++++++++++++++++-------- src/sf_connection_manager.h | 37 ++++++++------- 2 files changed, 92 insertions(+), 34 deletions(-) diff --git a/src/sf_connection_manager.c b/src/sf_connection_manager.c index 210a6f8..ad13b26 100644 --- a/src/sf_connection_manager.c +++ b/src/sf_connection_manager.c @@ -171,6 +171,22 @@ static inline ConnectionInfo *make_master_connection(SFConnectionManager *cm, return NULL; } +static inline int push_to_detect_queue(SFConnectionManager *cm, + SFCMConnGroupEntry *group, SFCMServerPtrArray *alives) +{ + if (!cm->alive_detect.bg_thread_enabled) { + return 0; + } + + if (alives->count < group->all.count) { + if (__sync_bool_compare_and_swap(&group->in_queue, 0, 1)) { + return common_blocked_queue_push(&cm->alive_detect.queue, group); + } + } + + return 0; +} + static inline bool alive_array_cas(SFConnectionManager *cm, SFCMConnGroupEntry *group, SFCMServerPtrArray *old_alives, SFCMServerPtrArray *new_alives) @@ -178,9 +194,15 @@ static inline bool alive_array_cas(SFConnectionManager *cm, if (__sync_bool_compare_and_swap(&group->alives, old_alives, new_alives)) { + logInfo("file: "__FILE__", line: %d, " + "[%s] group_id: %d, old alive server count: %d, " + "new alive server count: %d", __LINE__, cm->module_name, + group->id, old_alives->count, new_alives->count); + fast_mblock_delay_free_object(&cm->sptr_array_allocator, old_alives, (cm->common_cfg->connect_timeout + cm->common_cfg-> network_timeout) * group->all.count); + push_to_detect_queue(cm, group, new_alives); return true; } else { fast_mblock_free_object(&cm->sptr_array_allocator, new_alives); @@ -450,10 +472,7 @@ static int validate_connection_callback(ConnectionInfo *conn, void *args) static int init_group_array(SFConnectionManager *cm, SFCMConnGroupArray *garray, const int group_count) { - int result; int bytes; - SFCMConnGroupEntry *group; - SFCMConnGroupEntry *end; bytes = sizeof(SFCMConnGroupEntry) * group_count; garray->entries = (SFCMConnGroupEntry *)fc_malloc(bytes); @@ -461,24 +480,16 @@ static int init_group_array(SFConnectionManager *cm, return ENOMEM; } memset(garray->entries, 0, bytes); - - end = garray->entries + group_count; - for (group=garray->entries; grouplock)) != 0) { - return result; - } - } - garray->count = group_count; return 0; } int sf_connection_manager_init_ex(SFConnectionManager *cm, - const SFClientCommonConfig *common_cfg, const int group_count, - const int server_group_index, const int server_count, - const int max_count_per_entry, const int max_idle_time, - fc_connection_callback_func connect_done_callback, void *args, - const bool bg_thread_enabled) + const char *module_name, const SFClientCommonConfig *common_cfg, + const int group_count, const int server_group_index, + const int server_count, const int max_count_per_entry, + const int max_idle_time, fc_connection_callback_func + connect_done_callback, void *args, const bool bg_thread_enabled) { const int socket_domain = AF_INET; int htable_init_capacity; @@ -501,9 +512,18 @@ int sf_connection_manager_init_ex(SFConnectionManager *cm, return result; } + if (bg_thread_enabled) { + if ((result=common_blocked_queue_init(&cm-> + alive_detect.queue)) != 0) + { + return result; + } + } + cm->server_group_index = server_group_index; + cm->module_name = module_name; cm->common_cfg = common_cfg; - cm->bg_thread_enabled = bg_thread_enabled; + cm->alive_detect.bg_thread_enabled = bg_thread_enabled; cm->max_servers_per_group = 0; cm->extra = NULL; @@ -678,6 +698,7 @@ static int do_get_group_servers(SFConnectionManager *cm, { return result; } + old_alives = (SFCMServerPtrArray *)FC_ATOMIC_GET(group->alives); if (sptr_array_compare(old_alives, new_alives) == 0) { fast_mblock_free_object(&cm->sptr_array_allocator, new_alives); @@ -764,13 +785,43 @@ static int get_group_servers(SFConnectionManager *cm, return get_group_servers_by_all(cm, group); } +static void deal_nodes(SFConnectionManager *cm, + struct common_blocked_node *node) +{ + SFCMConnGroupEntry *group; + SFCMServerPtrArray *alives; + + do { + group = (SFCMConnGroupEntry *)node->data; + __sync_bool_compare_and_swap(&group->in_queue, 1, 0); + alives = (SFCMServerPtrArray *)FC_ATOMIC_GET(group->alives); + if (alives->count < group->all.count) { + if (get_group_servers(cm, group) != 0) { + push_to_detect_queue(cm, group, (SFCMServerPtrArray *) + FC_ATOMIC_GET(group->alives)); + } + } + + node = node->next; + } while (node != NULL); +} + static void *connection_manager_thread_func(void *arg) { SFConnectionManager *cm; + struct common_blocked_node *head; cm = (SFConnectionManager *)arg; while (1) { - //TODO + sleep(1); + if ((head=common_blocked_queue_pop_all_nodes(&cm-> + alive_detect.queue)) == NULL) + { + continue; + } + + deal_nodes(cm, head); + common_blocked_queue_free_all_nodes(&cm->alive_detect.queue, head); } return NULL; @@ -820,7 +871,7 @@ int sf_connection_manager_start(SFConnectionManager *cm) __sync_bool_compare_and_swap(&group->alives, NULL, sptr_array); } - if (cm->bg_thread_enabled) { + if (cm->alive_detect.bg_thread_enabled) { return fc_create_thread(&tid, connection_manager_thread_func, cm, SF_G_THREAD_STACK_SIZE); } else { diff --git a/src/sf_connection_manager.h b/src/sf_connection_manager.h index 88779f7..6d48235 100644 --- a/src/sf_connection_manager.h +++ b/src/sf_connection_manager.h @@ -18,6 +18,7 @@ #ifndef _SF_CONNECTION_MANAGER_H #define _SF_CONNECTION_MANAGER_H +#include "fastcommon/common_blocked_queue.h" #include "fastcommon/server_id_func.h" #include "fastcommon/connection_pool.h" #include "sf_types.h" @@ -62,10 +63,10 @@ typedef struct sf_cm_server_ptr_array { typedef struct sf_cm_conn_group_entry { int id; + volatile char in_queue; //if in active detect queue SFCMServerArray all; volatile SFCMServerEntry *master; volatile SFCMServerPtrArray *alives; - pthread_mutex_t lock; } SFCMConnGroupEntry; typedef struct sf_cm_conn_group_array { @@ -113,32 +114,38 @@ typedef struct sf_cm_simple_extra { typedef struct sf_connection_manager { short server_group_index; short max_servers_per_group; - bool bg_thread_enabled; + struct { + bool bg_thread_enabled; + struct common_blocked_queue queue; + } alive_detect; + const char *module_name; const SFClientCommonConfig *common_cfg; SFCMConnGroupArray groups; ConnectionPool cpool; struct fast_mblock_man sptr_array_allocator; //element: SFCMServerPtrArray SFCMOperations ops; - SFCMSimpleExtra *extra; //for simple + SFCMSimpleExtra *extra; //for simple connection manager } SFConnectionManager; int sf_connection_manager_init_ex(SFConnectionManager *cm, - const SFClientCommonConfig *common_cfg, const int group_count, - const int server_group_index, const int server_count, - const int max_count_per_entry, const int max_idle_time, - fc_connection_callback_func connect_done_callback, void *args, - const bool bg_thread_enabled); + const char *module_name, const SFClientCommonConfig *common_cfg, + const int group_count, const int server_group_index, + const int server_count, const int max_count_per_entry, + const int max_idle_time, fc_connection_callback_func + connect_done_callback, void *args, const bool bg_thread_enabled); static inline int sf_connection_manager_init(SFConnectionManager *cm, - const SFClientCommonConfig *common_cfg, const int group_count, - const int server_group_index, const int server_count, - const int max_count_per_entry, const int max_idle_time, - fc_connection_callback_func connect_done_callback, void *args) + const char *module_name, const SFClientCommonConfig *common_cfg, + const int group_count, const int server_group_index, + const int server_count, const int max_count_per_entry, + const int max_idle_time, fc_connection_callback_func + connect_done_callback, void *args) { const bool bg_thread_enabled = true; - return sf_connection_manager_init_ex(cm, common_cfg, group_count, - server_group_index, server_count, max_count_per_entry, - max_idle_time, connect_done_callback, args, bg_thread_enabled); + return sf_connection_manager_init_ex(cm, module_name, + common_cfg, group_count, server_group_index, + server_count, max_count_per_entry, max_idle_time, + connect_done_callback, args, bg_thread_enabled); } int sf_connection_manager_add(SFConnectionManager *cm, const int group_id,