idempotency channel heartbeat

connection_manager
YuQing 2020-09-16 10:46:52 +08:00
parent 6433e3e8d9
commit 4d1dfb9cab
7 changed files with 195 additions and 32 deletions

View File

@ -37,7 +37,51 @@ typedef struct {
static ClientChannelContext channel_context;
static int init_htable(ClientChannelHashtable *htable, const int hint_capacity)
IdempotencyClientConfig g_idempotency_client_cfg = {false, 3, 300};
static int load_client_channel_config(IniFullContext *ini_ctx)
{
g_idempotency_client_cfg.channel_htable_capacity = iniGetIntValue(
ini_ctx->section_name, "channel_htable_capacity",
ini_ctx->context, 1361);
if (g_idempotency_client_cfg.channel_htable_capacity < 163) {
logWarning("file: "__FILE__", line: %d, "
"config file: %s, channel_htable_capacity: %d is "
"too small, set to 163", __LINE__, ini_ctx->filename,
g_idempotency_client_cfg.channel_htable_capacity);
g_idempotency_client_cfg.channel_htable_capacity = 163;
}
g_idempotency_client_cfg.channel_heartbeat_interval = iniGetIntValue(
ini_ctx->section_name, "channel_heartbeat_interval",
ini_ctx->context, 3);
if (g_idempotency_client_cfg.channel_heartbeat_interval <= 0) {
logWarning("file: "__FILE__", line: %d, "
"config file: %s, channel_heartbeat_interval: %d is "
"invalid, set to 3", __LINE__, ini_ctx->filename,
g_idempotency_client_cfg.channel_heartbeat_interval);
g_idempotency_client_cfg.channel_heartbeat_interval = 3;
}
g_idempotency_client_cfg.channel_max_idle_time = iniGetIntValue(
ini_ctx->section_name, "channel_max_idle_time",
ini_ctx->context, 3);
return 0;
}
void idempotency_client_channel_config_to_string_ex(
char *output, const int size, const bool add_comma)
{
snprintf(output, size, "channel_htable_capacity=%d, "
"channel_heartbeat_interval=%ds, "
"channel_max_idle_time=%ds%s",
g_idempotency_client_cfg.channel_htable_capacity,
g_idempotency_client_cfg.channel_heartbeat_interval,
g_idempotency_client_cfg.channel_max_idle_time,
(add_comma ? ", " : ""));
}
static int init_htable(ClientChannelHashtable *htable)
{
int result;
int bytes;
@ -46,11 +90,8 @@ static int init_htable(ClientChannelHashtable *htable, const int hint_capacity)
return result;
}
if (hint_capacity <= 1024) {
htable->capacity = 1361;
} else {
htable->capacity = fc_ceil_prime(hint_capacity);
}
htable->capacity = fc_ceil_prime(g_idempotency_client_cfg.
channel_htable_capacity);
bytes = sizeof(IdempotencyClientChannel *) * htable->capacity;
htable->buckets = (IdempotencyClientChannel **)fc_malloc(bytes);
if (htable->buckets == NULL) {
@ -84,9 +125,14 @@ static int idempotency_channel_alloc_init(void *element, void *args)
(&((IdempotencyClientReceipt *)NULL)->next));
}
int client_channel_init_ex(const int hint_capacity)
int client_channel_init(IniFullContext *ini_ctx)
{
int result;
if ((result=load_client_channel_config(ini_ctx)) != 0) {
return result;
}
if ((result=fast_mblock_init_ex1(&channel_context.channel_allocator,
"channel_info", sizeof(IdempotencyClientChannel),
64, 0, idempotency_channel_alloc_init, NULL, true)) != 0)
@ -94,7 +140,7 @@ int client_channel_init_ex(const int hint_capacity)
return result;
}
if ((result=init_htable(&channel_context.htable, hint_capacity)) != 0) {
if ((result=init_htable(&channel_context.htable)) != 0) {
return result;
}
@ -130,7 +176,7 @@ struct fast_task_info *alloc_channel_task(IdempotencyClientChannel *channel,
task->thread_data = g_sf_context.thread_data +
hash_code % g_sf_context.work_threads;
channel->in_ioevent = 1;
channel->last_connect_time = get_current_time();
channel->last_connect_time = g_current_time;
if ((*err_no=sf_nio_notify(task, SF_NIO_STAGE_CONNECT)) != 0) {
channel->in_ioevent = 0;
free_queue_push(task);
@ -143,16 +189,14 @@ int idempotency_client_channel_check_reconnect(
IdempotencyClientChannel *channel)
{
int result;
time_t current_time;
if (!__sync_bool_compare_and_swap(&channel->in_ioevent, 0, 1)) {
return 0;
}
current_time = get_current_time();
if (channel->last_connect_time >= current_time) {
if (channel->last_connect_time >= g_current_time) {
sleep(1);
channel->last_connect_time = ++current_time;
channel->last_connect_time = g_current_time;
}
logDebug("file: "__FILE__", line: %d, "
@ -162,7 +206,8 @@ int idempotency_client_channel_check_reconnect(
channel->task->canceled = false;
if ((result=sf_nio_notify(channel->task, SF_NIO_STAGE_CONNECT)) == 0) {
channel->last_connect_time = current_time;
channel->last_connect_time = g_current_time;
channel->last_report_time = g_current_time;
} else {
__sync_bool_compare_and_swap(&channel->in_ioevent, 1, 0); //rollback
}

View File

@ -3,6 +3,7 @@
#ifndef IDEMPOTENCY_CLIENT_CHANNEL_H
#define IDEMPOTENCY_CLIENT_CHANNEL_H
#include "fastcommon/ini_file_reader.h"
#include "fastcommon/pthread_func.h"
#include "fastcommon/sched_thread.h"
#include "fastcommon/fc_atomic.h"
@ -12,11 +13,17 @@
extern "C" {
#endif
#define client_channel_init() client_channel_init_ex(0)
extern IdempotencyClientConfig g_idempotency_client_cfg;
int client_channel_init_ex(const int hint_capacity);
int client_channel_init(IniFullContext *ini_ctx);
void client_channel_destroy();
#define idempotency_client_channel_config_to_string(output, size) \
idempotency_client_channel_config_to_string_ex(output, size, false)
void idempotency_client_channel_config_to_string_ex(
char *output, const int size, const bool add_comma);
struct idempotency_client_channel *idempotency_client_channel_get(
const char *server_ip, const short server_port,
const int timeout, int *err_no);

View File

@ -7,6 +7,12 @@
#include "fastcommon/fc_list.h"
#include "fastcommon/fc_queue.h"
typedef struct idempotency_client_config {
int channel_htable_capacity;
int channel_heartbeat_interval;
int channel_max_idle_time;
} IdempotencyClientConfig;
typedef struct idempotency_client_receipt {
uint64_t req_id;
struct idempotency_client_receipt *next;
@ -16,10 +22,11 @@ typedef struct idempotency_client_channel {
volatile uint32_t id; //channel id, 0 for invalid
volatile int key; //channel key
volatile char in_ioevent;
volatile char in_heartbeat;
//volatile char in_heartbeat;
volatile char established;
time_t last_connect_time;
time_t last_pkg_time; //last communication time
time_t last_connect_time; //for connect frequency control
time_t last_pkg_time; //last communication time
time_t last_report_time; //last report time for rpc receipt
pthread_lock_cond_pair_t lc_pair; //for channel valid check and notify
volatile uint64_t next_req_id;
struct fast_mblock_man receipt_allocator;
@ -32,6 +39,10 @@ typedef struct idempotency_client_channel {
typedef struct idempotency_receipt_thread_context {
struct fc_list_head head; //LRU head for hearbeat
struct {
time_t heartbeat;
time_t idle;
} last_check_times;
} IdempotencyReceiptThreadContext;
#ifdef __cplusplus

View File

@ -168,6 +168,29 @@ static int check_report_req_receipt(struct fast_task_info *task,
return sf_send_add_event(task);
}
static int close_channel_request(struct fast_task_info *task)
{
IdempotencyClientChannel *channel;
SFCommonProtoHeader *header;
channel = (IdempotencyClientChannel *)task->arg;
idempotency_client_channel_set_id_key(channel, 0, 0);
header = (SFCommonProtoHeader *)task->data;
SF_PROTO_SET_HEADER(header, SF_SERVICE_PROTO_CLOSE_CHANNEL_REQ, 0);
task->length = sizeof(SFCommonProtoHeader);
return sf_send_add_event(task);
}
static int active_test_request(struct fast_task_info *task)
{
SFCommonProtoHeader *header;
header = (SFCommonProtoHeader *)task->data;
SF_PROTO_SET_HEADER(header, SF_PROTO_ACTIVE_TEST_REQ, 0);
task->length = sizeof(SFCommonProtoHeader);
return sf_send_add_event(task);
}
static inline void update_lru_chain(struct fast_task_info *task)
{
IdempotencyReceiptThreadContext *thread_ctx;
@ -192,6 +215,8 @@ static int report_req_receipt_request(struct fast_task_info *task,
if (count == 0) {
result = sf_set_read_event(task);
} else if (update_lru) {
((IdempotencyClientChannel *)task->arg)->
last_report_time = g_current_time;
update_lru_chain(task);
}
@ -336,6 +361,15 @@ static int receipt_deal_task(struct fast_task_info *task)
case SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP:
result = deal_report_req_receipt_response(task);
break;
case SF_PROTO_ACTIVE_TEST_RESP:
result = 0;
break;
case SF_SERVICE_PROTO_CLOSE_CHANNEL_RESP:
result = ECONNRESET; //force to close socket
logDebug("file: "__FILE__", line: %d, "
"close channel to server %s:%d !!!",
__LINE__, task->server_ip, task->port);
break;
default:
logError("file: "__FILE__", line: %d, "
"response from server %s:%d, unexpect cmd: %d (%s)",
@ -356,16 +390,64 @@ static int receipt_deal_task(struct fast_task_info *task)
return result > 0 ? -1 * result : result;
}
static int receipt_thread_loop_callback(struct nio_thread_data *thread_data)
static void receipt_thread_check_heartbeat(
IdempotencyReceiptThreadContext *thread_ctx)
{
IdempotencyClientChannel *channel;
IdempotencyClientChannel *tmp;
IdempotencyReceiptThreadContext *thread_ctx;
thread_ctx = (IdempotencyReceiptThreadContext *)thread_data->arg;
fc_list_for_each_entry_safe(channel, tmp, &thread_ctx->head, dlink) {
//check heartbeat
//channel->task
if (g_current_time - channel->last_pkg_time <
g_idempotency_client_cfg.channel_heartbeat_interval)
{
break;
}
if (sf_nio_task_is_idle(channel->task)) {
channel->last_pkg_time = g_current_time;
active_test_request(channel->task);
}
}
}
static void receipt_thread_close_idle_channel(
IdempotencyReceiptThreadContext *thread_ctx)
{
IdempotencyClientChannel *channel;
IdempotencyClientChannel *tmp;
fc_list_for_each_entry_safe(channel, tmp, &thread_ctx->head, dlink) {
if (!sf_nio_task_is_idle(channel->task)) {
continue;
}
if (g_current_time - channel->last_report_time >
g_idempotency_client_cfg.channel_max_idle_time)
{
logDebug("file: "__FILE__", line: %d, "
"close channel to server %s:%d because idle too long",
__LINE__, channel->task->server_ip, channel->task->port);
close_channel_request(channel->task);
}
}
}
static int receipt_thread_loop_callback(struct nio_thread_data *thread_data)
{
IdempotencyReceiptThreadContext *thread_ctx;
thread_ctx = (IdempotencyReceiptThreadContext *)thread_data->arg;
if (g_current_time - thread_ctx->last_check_times.heartbeat > 0) {
thread_ctx->last_check_times.heartbeat = g_current_time;
receipt_thread_check_heartbeat(thread_ctx);
}
if ((g_idempotency_client_cfg.channel_max_idle_time > 0) &&
(g_current_time - thread_ctx->last_check_times.idle >
g_idempotency_client_cfg.channel_max_idle_time))
{
thread_ctx->last_check_times.idle = g_current_time;
receipt_thread_close_idle_channel(thread_ctx);
}
return 0;
@ -382,11 +464,15 @@ static void *receipt_alloc_thread_extra_data(const int thread_index)
int receipt_handler_init()
{
receipt_thread_contexts = (IdempotencyReceiptThreadContext *)fc_malloc(
sizeof(IdempotencyReceiptThreadContext) * SF_G_WORK_THREADS);
int bytes;
bytes = sizeof(IdempotencyReceiptThreadContext) * SF_G_WORK_THREADS;
receipt_thread_contexts = (IdempotencyReceiptThreadContext *)
fc_malloc(bytes);
if (receipt_thread_contexts == NULL) {
return ENOMEM;
}
memset(receipt_thread_contexts, 0, bytes);
return sf_service_init_ex2(&g_sf_context,
receipt_alloc_thread_extra_data, receipt_thread_loop_callback,

View File

@ -56,6 +56,9 @@ int sf_nio_notify_ex(struct fast_task_info *task, const int new_stage,
#define sf_nio_notify(task, new_stage) \
sf_nio_notify_ex(task, new_stage, __FILE__, __LINE__)
#define sf_nio_task_is_idle(task) \
(task->offset == 0 && task->length == 0)
int sf_set_read_event(struct fast_task_info *task);
void sf_task_switch_thread(struct fast_task_info *task,

View File

@ -187,6 +187,12 @@ int sf_recv_response(ConnectionInfo *conn, SFResponseInfo *response,
const char *sf_get_cmd_caption(const int cmd)
{
switch (cmd) {
case SF_PROTO_ACK:
return "ACK";
case SF_PROTO_ACTIVE_TEST_REQ:
return "ACTIVE_TEST_REQ";
case SF_PROTO_ACTIVE_TEST_RESP:
return "ACTIVE_TEST_RESP";
case SF_SERVICE_PROTO_SETUP_CHANNEL_REQ:
return "SETUP_CHANNEL_REQ";
case SF_SERVICE_PROTO_SETUP_CHANNEL_RESP:

View File

@ -10,13 +10,18 @@
#include "fastcommon/sockopt.h"
#include "sf_types.h"
#define SF_PROTO_ACK 116
#define SF_PROTO_ACTIVE_TEST_REQ 117
#define SF_PROTO_ACTIVE_TEST_RESP 118
//for request idempotency
#define SF_SERVICE_PROTO_SETUP_CHANNEL_REQ 111
#define SF_SERVICE_PROTO_SETUP_CHANNEL_RESP 112
#define SF_SERVICE_PROTO_CLOSE_CHANNEL_REQ 113
#define SF_SERVICE_PROTO_CLOSE_CHANNEL_RESP 114
#define SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ 115
#define SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP 116
#define SF_SERVICE_PROTO_SETUP_CHANNEL_REQ 121
#define SF_SERVICE_PROTO_SETUP_CHANNEL_RESP 122
#define SF_SERVICE_PROTO_CLOSE_CHANNEL_REQ 123
#define SF_SERVICE_PROTO_CLOSE_CHANNEL_RESP 124
#define SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ 125
#define SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP 126
#define SF_PROTO_MAGIC_CHAR '@'
#define SF_PROTO_SET_MAGIC(m) \