connection manager: support detect server for alive

connection_manager
YuQing 2021-02-22 19:37:57 +08:00
parent f7ac526284
commit f3e24601d2
2 changed files with 92 additions and 34 deletions

View File

@ -171,6 +171,22 @@ static inline ConnectionInfo *make_master_connection(SFConnectionManager *cm,
return NULL;
}
static inline int push_to_detect_queue(SFConnectionManager *cm,
SFCMConnGroupEntry *group, SFCMServerPtrArray *alives)
{
if (!cm->alive_detect.bg_thread_enabled) {
return 0;
}
if (alives->count < group->all.count) {
if (__sync_bool_compare_and_swap(&group->in_queue, 0, 1)) {
return common_blocked_queue_push(&cm->alive_detect.queue, group);
}
}
return 0;
}
static inline bool alive_array_cas(SFConnectionManager *cm,
SFCMConnGroupEntry *group, SFCMServerPtrArray *old_alives,
SFCMServerPtrArray *new_alives)
@ -178,9 +194,15 @@ static inline bool alive_array_cas(SFConnectionManager *cm,
if (__sync_bool_compare_and_swap(&group->alives,
old_alives, new_alives))
{
logInfo("file: "__FILE__", line: %d, "
"[%s] group_id: %d, old alive server count: %d, "
"new alive server count: %d", __LINE__, cm->module_name,
group->id, old_alives->count, new_alives->count);
fast_mblock_delay_free_object(&cm->sptr_array_allocator, old_alives,
(cm->common_cfg->connect_timeout + cm->common_cfg->
network_timeout) * group->all.count);
push_to_detect_queue(cm, group, new_alives);
return true;
} else {
fast_mblock_free_object(&cm->sptr_array_allocator, new_alives);
@ -450,10 +472,7 @@ static int validate_connection_callback(ConnectionInfo *conn, void *args)
static int init_group_array(SFConnectionManager *cm,
SFCMConnGroupArray *garray, const int group_count)
{
int result;
int bytes;
SFCMConnGroupEntry *group;
SFCMConnGroupEntry *end;
bytes = sizeof(SFCMConnGroupEntry) * group_count;
garray->entries = (SFCMConnGroupEntry *)fc_malloc(bytes);
@ -461,24 +480,16 @@ static int init_group_array(SFConnectionManager *cm,
return ENOMEM;
}
memset(garray->entries, 0, bytes);
end = garray->entries + group_count;
for (group=garray->entries; group<end; group++) {
if ((result=init_pthread_lock(&group->lock)) != 0) {
return result;
}
}
garray->count = group_count;
return 0;
}
int sf_connection_manager_init_ex(SFConnectionManager *cm,
const SFClientCommonConfig *common_cfg, const int group_count,
const int server_group_index, const int server_count,
const int max_count_per_entry, const int max_idle_time,
fc_connection_callback_func connect_done_callback, void *args,
const bool bg_thread_enabled)
const char *module_name, const SFClientCommonConfig *common_cfg,
const int group_count, const int server_group_index,
const int server_count, const int max_count_per_entry,
const int max_idle_time, fc_connection_callback_func
connect_done_callback, void *args, const bool bg_thread_enabled)
{
const int socket_domain = AF_INET;
int htable_init_capacity;
@ -501,9 +512,18 @@ int sf_connection_manager_init_ex(SFConnectionManager *cm,
return result;
}
if (bg_thread_enabled) {
if ((result=common_blocked_queue_init(&cm->
alive_detect.queue)) != 0)
{
return result;
}
}
cm->server_group_index = server_group_index;
cm->module_name = module_name;
cm->common_cfg = common_cfg;
cm->bg_thread_enabled = bg_thread_enabled;
cm->alive_detect.bg_thread_enabled = bg_thread_enabled;
cm->max_servers_per_group = 0;
cm->extra = NULL;
@ -678,6 +698,7 @@ static int do_get_group_servers(SFConnectionManager *cm,
{
return result;
}
old_alives = (SFCMServerPtrArray *)FC_ATOMIC_GET(group->alives);
if (sptr_array_compare(old_alives, new_alives) == 0) {
fast_mblock_free_object(&cm->sptr_array_allocator, new_alives);
@ -764,13 +785,43 @@ static int get_group_servers(SFConnectionManager *cm,
return get_group_servers_by_all(cm, group);
}
static void deal_nodes(SFConnectionManager *cm,
struct common_blocked_node *node)
{
SFCMConnGroupEntry *group;
SFCMServerPtrArray *alives;
do {
group = (SFCMConnGroupEntry *)node->data;
__sync_bool_compare_and_swap(&group->in_queue, 1, 0);
alives = (SFCMServerPtrArray *)FC_ATOMIC_GET(group->alives);
if (alives->count < group->all.count) {
if (get_group_servers(cm, group) != 0) {
push_to_detect_queue(cm, group, (SFCMServerPtrArray *)
FC_ATOMIC_GET(group->alives));
}
}
node = node->next;
} while (node != NULL);
}
static void *connection_manager_thread_func(void *arg)
{
SFConnectionManager *cm;
struct common_blocked_node *head;
cm = (SFConnectionManager *)arg;
while (1) {
//TODO
sleep(1);
if ((head=common_blocked_queue_pop_all_nodes(&cm->
alive_detect.queue)) == NULL)
{
continue;
}
deal_nodes(cm, head);
common_blocked_queue_free_all_nodes(&cm->alive_detect.queue, head);
}
return NULL;
@ -820,7 +871,7 @@ int sf_connection_manager_start(SFConnectionManager *cm)
__sync_bool_compare_and_swap(&group->alives, NULL, sptr_array);
}
if (cm->bg_thread_enabled) {
if (cm->alive_detect.bg_thread_enabled) {
return fc_create_thread(&tid, connection_manager_thread_func,
cm, SF_G_THREAD_STACK_SIZE);
} else {

View File

@ -18,6 +18,7 @@
#ifndef _SF_CONNECTION_MANAGER_H
#define _SF_CONNECTION_MANAGER_H
#include "fastcommon/common_blocked_queue.h"
#include "fastcommon/server_id_func.h"
#include "fastcommon/connection_pool.h"
#include "sf_types.h"
@ -62,10 +63,10 @@ typedef struct sf_cm_server_ptr_array {
typedef struct sf_cm_conn_group_entry {
int id;
volatile char in_queue; //if in active detect queue
SFCMServerArray all;
volatile SFCMServerEntry *master;
volatile SFCMServerPtrArray *alives;
pthread_mutex_t lock;
} SFCMConnGroupEntry;
typedef struct sf_cm_conn_group_array {
@ -113,32 +114,38 @@ typedef struct sf_cm_simple_extra {
typedef struct sf_connection_manager {
short server_group_index;
short max_servers_per_group;
bool bg_thread_enabled;
struct {
bool bg_thread_enabled;
struct common_blocked_queue queue;
} alive_detect;
const char *module_name;
const SFClientCommonConfig *common_cfg;
SFCMConnGroupArray groups;
ConnectionPool cpool;
struct fast_mblock_man sptr_array_allocator; //element: SFCMServerPtrArray
SFCMOperations ops;
SFCMSimpleExtra *extra; //for simple
SFCMSimpleExtra *extra; //for simple connection manager
} SFConnectionManager;
int sf_connection_manager_init_ex(SFConnectionManager *cm,
const SFClientCommonConfig *common_cfg, const int group_count,
const int server_group_index, const int server_count,
const int max_count_per_entry, const int max_idle_time,
fc_connection_callback_func connect_done_callback, void *args,
const bool bg_thread_enabled);
const char *module_name, const SFClientCommonConfig *common_cfg,
const int group_count, const int server_group_index,
const int server_count, const int max_count_per_entry,
const int max_idle_time, fc_connection_callback_func
connect_done_callback, void *args, const bool bg_thread_enabled);
static inline int sf_connection_manager_init(SFConnectionManager *cm,
const SFClientCommonConfig *common_cfg, const int group_count,
const int server_group_index, const int server_count,
const int max_count_per_entry, const int max_idle_time,
fc_connection_callback_func connect_done_callback, void *args)
const char *module_name, const SFClientCommonConfig *common_cfg,
const int group_count, const int server_group_index,
const int server_count, const int max_count_per_entry,
const int max_idle_time, fc_connection_callback_func
connect_done_callback, void *args)
{
const bool bg_thread_enabled = true;
return sf_connection_manager_init_ex(cm, common_cfg, group_count,
server_group_index, server_count, max_count_per_entry,
max_idle_time, connect_done_callback, args, bg_thread_enabled);
return sf_connection_manager_init_ex(cm, module_name,
common_cfg, group_count, server_group_index,
server_count, max_count_per_entry, max_idle_time,
connect_done_callback, args, bg_thread_enabled);
}
int sf_connection_manager_add(SFConnectionManager *cm, const int group_id,