Merge branch 'recovery_and_balance'

vote_node
YuQing 2022-04-24 08:26:18 +08:00
commit dcd024019b
13 changed files with 367 additions and 130 deletions

View File

@ -2,7 +2,7 @@
%define CommitVersion %(echo $COMMIT_VERSION)
Name: libserverframe
Version: 1.1.13
Version: 1.1.14
Release: 1%{?dist}
Summary: network framework library
License: AGPL v3.0
@ -12,9 +12,9 @@ Source: http://github.com/happyfish100/libserverframe/%{name}-%{version}.tar.gz
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root-%(%{__id_u} -n)
BuildRequires: libfastcommon-devel >= 1.0.56
BuildRequires: libfastcommon-devel >= 1.0.57
Requires: %__cp %__mv %__chmod %__grep %__mkdir %__install %__id
Requires: libfastcommon >= 1.0.56
Requires: libfastcommon >= 1.0.57
%description
common framework library

View File

@ -80,7 +80,19 @@ static inline int idempotency_client_channel_check_wait_ex(
idempotency_client_channel_check_reconnect(channel);
lcp_timedwait_sec(&channel->lc_pair, timeout);
return __sync_add_and_fetch(&channel->established, 0) ? 0 : ETIMEDOUT;
if (__sync_add_and_fetch(&channel->established, 0)) {
return 0;
} else {
/*
logInfo("file: "__FILE__", line: %d, "
"channel_check fail, server %s:%u, in_ioevent: %d, "
"canceled: %d, req count: %"PRId64, __LINE__, channel->task->server_ip,
channel->task->port, __sync_add_and_fetch(&channel->
in_ioevent, 0), __sync_add_and_fetch(&channel->
task->canceled, 0), channel->task->req_count);
*/
return ETIMEDOUT;
}
}
#ifdef __cplusplus

View File

@ -68,8 +68,10 @@ static int receipt_recv_timeout_callback(struct fast_task_info *task)
__LINE__, task->server_ip, task->port);
} else {
logError("file: "__FILE__", line: %d, "
"communication with server %s:%u timeout",
__LINE__, task->server_ip, task->port);
"communication with server %s:%u timeout, "
"channel established: %d", __LINE__,
task->server_ip, task->port,
FC_ATOMIC_GET(channel->established));
}
return ETIMEDOUT;
@ -85,8 +87,11 @@ static void receipt_task_finish_cleanup(struct fast_task_info *task)
task->event.fd = -1;
}
channel = (IdempotencyClientChannel *)task->arg;
task->length = 0;
task->offset = 0;
task->req_count = 0;
channel = (IdempotencyClientChannel *)task->arg;
fc_list_del_init(&channel->dlink);
__sync_bool_compare_and_swap(&channel->established, 1, 0);
__sync_bool_compare_and_swap(&channel->in_ioevent, 1, 0);
@ -334,12 +339,15 @@ static int receipt_deal_task(struct fast_task_info *task, const int stage)
setup_channel_request(task);
result = 0;
break;
} else if (stage == SF_NIO_STAGE_CONTINUE && task->length == 0) {
} else if (stage == SF_NIO_STAGE_CONTINUE) {
if (task->length == 0 && task->offset == 0) {
if (((IdempotencyClientChannel *)task->arg)->established) {
report_req_receipt_request(task, true);
} else {
} else if (task->req_count > 0) {
sf_set_read_event(task); //trigger read event
}
}
result = 0;
break;
}

View File

@ -34,6 +34,8 @@
#include "sf_func.h"
#include "sf_binlog_writer.h"
#define ERRNO_THREAD_EXIT -1000
static inline void binlog_writer_set_next_version(SFBinlogWriterInfo *writer,
const uint64_t next_version)
{
@ -44,7 +46,8 @@ static inline void binlog_writer_set_next_version(SFBinlogWriterInfo *writer,
}
#define deal_binlog_one_record(wb) \
sf_file_writer_deal_buffer(&wb->writer->fw, &wb->bf, wb->version.last)
sf_file_writer_deal_versioned_buffer(&wb->writer->fw, \
&wb->bf, wb->version.last)
#define GET_WBUFFER_VERSION_COUNT(wb) \
(((wb)->version.last - (wb)->version.first) + 1)
@ -190,18 +193,23 @@ static int deal_binlog_records(SFBinlogWriterThread *thread,
switch (current->type) {
case SF_BINLOG_BUFFER_TYPE_CHANGE_ORDER_TYPE:
thread->order_by = current->version.first;
current->writer->order_by = current->version.first;
fast_mblock_free_object(&current->writer->
thread->mblock, current);
break;
case SF_BINLOG_BUFFER_TYPE_NOTIFY_EXIT:
flush_writer_files(thread);
return ERRNO_THREAD_EXIT;
case SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION:
if (thread->order_by != SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION) {
if (current->writer->order_by !=
SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION)
{
logWarning("file: "__FILE__", line: %d, "
"subdir_name: %s, invalid order by: %d != %d, "
"maybe some mistake happen", __LINE__,
current->writer->fw.cfg.subdir_name, thread->order_by,
SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION);
current->writer->fw.cfg.subdir_name,
current->writer->order_by,
SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION);
}
if (current->writer->version_ctx.ring.waiting_count != 0) {
@ -231,7 +239,9 @@ static int deal_binlog_records(SFBinlogWriterThread *thread,
current->writer->fw.total_count++;
add_to_flush_writer_queue(thread, current->writer);
if (thread->order_by == SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION) {
if (current->writer->order_by ==
SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION)
{
/* NOTE: current maybe be released in the deal function */
if ((result=deal_record_by_version(current)) != 0) {
return result;
@ -257,7 +267,10 @@ void sf_binlog_writer_finish(SFBinlogWriterInfo *writer)
int count;
if (writer->fw.file.name != NULL) {
fc_queue_terminate(&writer->thread->queue);
while (!fc_queue_empty(&writer->thread->queue)) {
fc_sleep_ms(10);
}
sf_binlog_writer_notify_exit(writer);
count = 0;
while (writer->thread->running && ++count < 300) {
@ -290,6 +303,7 @@ static void *binlog_writer_func(void *arg)
{
SFBinlogWriterThread *thread;
SFBinlogWriterBuffer *wb_head;
int result;
thread = (SFBinlogWriterThread *)arg;
@ -309,12 +323,15 @@ static void *binlog_writer_func(void *arg)
continue;
}
if (deal_binlog_records(thread, wb_head) != 0) {
if ((result=deal_binlog_records(thread, wb_head)) != 0) {
if (result != ERRNO_THREAD_EXIT) {
logCrit("file: "__FILE__", line: %d, "
"deal_binlog_records fail, "
"program exit!", __LINE__);
sf_terminate_myself();
}
break;
}
}
thread->running = false;
@ -341,13 +358,23 @@ static int binlog_wbuffer_alloc_init(void *element, void *args)
return 0;
}
static void binlog_wbuffer_destroy_func(void *element, void *args)
{
SFBinlogWriterBuffer *wbuffer;
wbuffer = (SFBinlogWriterBuffer *)element;
if (wbuffer->bf.buff != NULL) {
free(wbuffer->bf.buff);
}
}
int sf_binlog_writer_init_normal(SFBinlogWriterInfo *writer,
const char *data_path, const char *subdir_name,
const int buffer_size)
{
writer->flush.in_queue = false;
return sf_file_writer_init_normal(&writer->fw,
data_path, subdir_name, buffer_size);
memset(writer, 0, sizeof(*writer));
writer->order_by = SF_BINLOG_WRITER_TYPE_ORDER_BY_NONE;
return sf_file_writer_init(&writer->fw, data_path,
subdir_name, buffer_size);
}
int sf_binlog_writer_init_by_version(SFBinlogWriterInfo *writer,
@ -367,36 +394,43 @@ int sf_binlog_writer_init_by_version(SFBinlogWriterInfo *writer,
writer->version_ctx.ring.waiting_count = 0;
writer->version_ctx.ring.max_waitings = 0;
writer->version_ctx.change_count = 0;
writer->order_by = SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION;
binlog_writer_set_next_version(writer, next_version);
return sf_binlog_writer_init_normal(writer,
data_path, subdir_name, buffer_size);
writer->flush.in_queue = false;
return sf_file_writer_init(&writer->fw, data_path,
subdir_name, buffer_size);
}
int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread,
const char *name, SFBinlogWriterInfo *writer, const short order_mode,
const short order_by, const int max_record_size,
const int writer_count, const bool use_fixed_buffer_size)
const int max_record_size, const int writer_count,
const bool use_fixed_buffer_size)
{
const int alloc_elements_once = 1024;
int result;
int element_size;
pthread_t tid;
int result;
struct fast_mblock_object_callbacks callbacks;
snprintf(thread->name, sizeof(thread->name), "%s", name);
thread->order_mode = order_mode;
thread->order_by = order_by;
thread->use_fixed_buffer_size = use_fixed_buffer_size;
writer->fw.cfg.max_record_size = max_record_size;
writer->thread = thread;
callbacks.init_func = binlog_wbuffer_alloc_init;
callbacks.args = writer;
element_size = sizeof(SFBinlogWriterBuffer);
if (use_fixed_buffer_size) {
element_size += max_record_size;
callbacks.destroy_func = NULL;
} else {
callbacks.destroy_func = binlog_wbuffer_destroy_func;
}
if ((result=fast_mblock_init_ex1(&thread->mblock, "binlog-wbuffer",
if ((result=fast_mblock_init_ex2(&thread->mblock, "binlog-wbuffer",
element_size, alloc_elements_once, 0,
binlog_wbuffer_alloc_init, writer, true)) != 0)
&callbacks, true, NULL)) != 0)
{
return result;
}
@ -417,12 +451,12 @@ int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer,
{
SFBinlogWriterBuffer *buffer;
if (writer->thread->order_by == order_by) {
if (writer->order_by == order_by) {
return 0;
}
if (!(order_by == SF_BINLOG_THREAD_TYPE_ORDER_BY_NONE ||
order_by == SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION))
if (!(order_by == SF_BINLOG_WRITER_TYPE_ORDER_BY_NONE ||
order_by == SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION))
{
logError("file: "__FILE__", line: %d, "
"invalid order by: %d!", __LINE__, order_by);
@ -437,6 +471,15 @@ int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer,
return EINVAL;
}
if (order_by == SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION) {
if (writer->version_ctx.ring.slots == NULL) {
logError("file: "__FILE__", line: %d, "
"the writer is NOT versioned writer, can't "
"set order by to %d!", __LINE__, order_by);
return EINVAL;
}
}
if ((buffer=sf_binlog_writer_alloc_versioned_buffer_ex(writer, order_by,
order_by, SF_BINLOG_BUFFER_TYPE_CHANGE_ORDER_TYPE)) == NULL)
{
@ -447,17 +490,31 @@ int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer,
return 0;
}
static inline int sf_binlog_writer_push_directive(SFBinlogWriterInfo *writer,
const int buffer_type, const int64_t version)
{
SFBinlogWriterBuffer *buffer;
if ((buffer=sf_binlog_writer_alloc_versioned_buffer_ex(writer,
version, version, buffer_type)) == NULL)
{
return ENOMEM;
}
fc_queue_push(&writer->thread->queue, buffer);
return 0;
}
int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer,
const int64_t next_version)
{
SFBinlogWriterBuffer *buffer;
return sf_binlog_writer_push_directive(writer,
SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION,
next_version);
}
if ((buffer=sf_binlog_writer_alloc_versioned_buffer_ex(writer, next_version,
next_version, SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION)) == NULL)
int sf_binlog_writer_notify_exit(SFBinlogWriterInfo *writer)
{
return ENOMEM;
}
fc_queue_push(&writer->thread->queue, buffer);
return 0;
return sf_binlog_writer_push_directive(writer,
SF_BINLOG_BUFFER_TYPE_NOTIFY_EXIT, 0);
}

View File

@ -25,12 +25,13 @@
#define SF_BINLOG_THREAD_ORDER_MODE_FIXED 0
#define SF_BINLOG_THREAD_ORDER_MODE_VARY 1
#define SF_BINLOG_THREAD_TYPE_ORDER_BY_NONE 0
#define SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION 1
#define SF_BINLOG_WRITER_TYPE_ORDER_BY_NONE 0
#define SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION 1
#define SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE 0 //default type, must be 0
#define SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION 1
#define SF_BINLOG_BUFFER_TYPE_CHANGE_ORDER_TYPE 2
#define SF_BINLOG_BUFFER_TYPE_NOTIFY_EXIT 3
#define SF_BINLOG_BUFFER_SET_VERSION(buffer, ver) \
(buffer)->version.first = (buffer)->version.last = ver
@ -64,7 +65,6 @@ typedef struct binlog_writer_thread {
volatile bool running;
bool use_fixed_buffer_size;
short order_mode;
short order_by;
struct {
struct sf_binlog_writer_info *head;
struct sf_binlog_writer_info *tail;
@ -81,6 +81,7 @@ typedef struct sf_binlog_writer_info {
} version_ctx;
SFBinlogWriterThread *thread;
short order_by;
struct {
bool in_queue;
struct sf_binlog_writer_info *next;
@ -107,14 +108,13 @@ int sf_binlog_writer_init_by_version(SFBinlogWriterInfo *writer,
int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread,
const char *name, SFBinlogWriterInfo *writer, const short order_mode,
const short order_by, const int max_record_size,
const int writer_count, const bool use_fixed_buffer_size);
const int max_record_size, const int writer_count,
const bool use_fixed_buffer_size);
#define sf_binlog_writer_init_thread(thread, name, \
writer, order_by, max_record_size) \
#define sf_binlog_writer_init_thread(thread, name, writer, max_record_size) \
sf_binlog_writer_init_thread_ex(thread, name, writer, \
SF_BINLOG_THREAD_ORDER_MODE_FIXED, \
order_by, max_record_size, 1, true)
max_record_size, 1, true)
static inline int sf_binlog_writer_init(SFBinlogWriterContext *context,
const char *data_path, const char *subdir_name,
@ -127,9 +127,35 @@ static inline int sf_binlog_writer_init(SFBinlogWriterContext *context,
return result;
}
return sf_binlog_writer_init_thread(&context->thread, subdir_name,
&context->writer, SF_BINLOG_THREAD_TYPE_ORDER_BY_NONE,
max_record_size);
return sf_binlog_writer_init_thread(&context->thread,
subdir_name, &context->writer, max_record_size);
}
void sf_binlog_writer_finish(SFBinlogWriterInfo *writer);
static inline void sf_binlog_writer_destroy_writer(
SFBinlogWriterInfo *writer)
{
sf_file_writer_destroy(&writer->fw);
if (writer->version_ctx.ring.slots != NULL) {
free(writer->version_ctx.ring.slots);
writer->version_ctx.ring.slots = NULL;
}
}
static inline void sf_binlog_writer_destroy_thread(
SFBinlogWriterThread *thread)
{
fast_mblock_destroy(&thread->mblock);
fc_queue_destroy(&thread->queue);
}
static inline void sf_binlog_writer_destroy(
SFBinlogWriterContext *context)
{
sf_binlog_writer_finish(&context->writer);
sf_binlog_writer_destroy_writer(&context->writer);
sf_binlog_writer_destroy_thread(&context->thread);
}
int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer,
@ -138,14 +164,14 @@ int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer,
int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer,
const int64_t next_version);
int sf_binlog_writer_notify_exit(SFBinlogWriterInfo *writer);
#define sf_binlog_writer_set_flags(writer, flags) \
sf_file_writer_set_flags(&(writer)->fw, flags)
#define sf_binlog_writer_get_last_version(writer) \
sf_file_writer_get_last_version(&(writer)->fw)
void sf_binlog_writer_finish(SFBinlogWriterInfo *writer);
#define sf_binlog_get_current_write_index(writer) \
sf_file_writer_get_current_index(&(writer)->fw)
@ -196,6 +222,11 @@ static inline SFBinlogWriterBuffer *sf_binlog_writer_alloc_versioned_buffer_ex(
sf_file_writer_get_index_filename(data_path, \
subdir_name, filename, size)
#define sf_binlog_writer_get_binlog_index(data_path, \
subdir_name, write_index) \
sf_file_writer_get_binlog_index(data_path, \
subdir_name, write_index)
#define sf_binlog_writer_set_binlog_index(writer, binlog_index) \
sf_file_writer_set_binlog_index(&(writer)->fw, binlog_index)

View File

@ -19,6 +19,7 @@
#define _SF_BUFFERED_WRITER_H_
#include "sf_types.h"
#include "sf_func.h"
typedef struct {
int fd;

View File

@ -25,10 +25,10 @@
#include "fastcommon/logger.h"
#include "sf_configs.h"
#define DEFAULT_RETRY_MAX_INTERVAL_MS 5000
#define DEFAULT_CONNECT_RETRY_TIMES 10
#define DEFAULT_RETRY_MAX_INTERVAL_MS 3000
#define DEFAULT_CONNECT_RETRY_TIMES 200
#define DEFAULT_CONNECT_RETRY_INTERVAL_MS 100
#define DEFAULT_NETWORK_RETRY_TIMES 10
#define DEFAULT_NETWORK_RETRY_TIMES 200
#define DEFAULT_NETWORK_RETRY_INTERVAL_MS 100
int sf_load_net_retry_config(SFNetRetryConfig *net_retry_cfg,
@ -94,13 +94,14 @@ void sf_net_retry_config_to_string(SFNetRetryConfig *net_retry_cfg,
net_retry_cfg->network.interval_ms);
}
void sf_load_read_rule_config_ex(SFDataReadRule *rule,
int sf_load_read_rule_config_ex(SFDataReadRule *rule,
IniFullContext *ini_ctx, const SFDataReadRule def_rule)
{
char *read_rule;
read_rule = iniGetStrValueEx(ini_ctx->section_name,
"read_rule", ini_ctx->context, true);
if (read_rule == NULL || *read_rule == '\0') {
if (read_rule == NULL) {
*rule = def_rule;
} else if (strncasecmp(read_rule, "any", 3) == 0) {
*rule = sf_data_read_rule_any_available;
@ -110,8 +111,35 @@ void sf_load_read_rule_config_ex(SFDataReadRule *rule,
*rule = sf_data_read_rule_master_only;
} else {
logError("file: "__FILE__", line: %d, "
"config file: %s, unkown read_rule: %s, set to any",
"config file: %s, unkown read_rule: %s",
__LINE__, ini_ctx->filename, read_rule);
*rule = sf_data_read_rule_any_available;
return EINVAL;
}
return 0;
}
int sf_load_quorum_config_ex(SFElectionQuorum *quorum,
IniFullContext *ini_ctx, const SFElectionQuorum def_quorum)
{
char *str;
str = iniGetStrValue(ini_ctx->section_name,
"quorum", ini_ctx->context);
if (str == NULL) {
*quorum = def_quorum;
} else if (strncasecmp(str, "auto", 4) == 0) {
*quorum = sf_election_quorum_auto;
} else if (strncasecmp(str, "any", 3) == 0) {
*quorum = sf_election_quorum_any;
} else if (strncasecmp(str, "majority", 8) == 0) {
*quorum = sf_election_quorum_majority;
} else {
logError("file: "__FILE__", line: %d, "
"config file: %s, unkown quorum: %s",
__LINE__, ini_ctx->filename, str);
return EINVAL;
}
return 0;
}

View File

@ -67,7 +67,7 @@ static inline int sf_calc_next_retry_interval(SFNetRetryIntervalContext *ctx)
return ctx->interval_ms;
}
void sf_load_read_rule_config_ex(SFDataReadRule *rule,
int sf_load_read_rule_config_ex(SFDataReadRule *rule,
IniFullContext *ini_ctx, const SFDataReadRule def_rule);
static inline const char *sf_get_read_rule_caption(
@ -85,9 +85,50 @@ static inline const char *sf_get_read_rule_caption(
}
}
int sf_load_quorum_config_ex(SFElectionQuorum *quorum,
IniFullContext *ini_ctx, const SFElectionQuorum def_quorum);
static inline const char *sf_get_quorum_caption(
const SFElectionQuorum quorum)
{
switch (quorum) {
case sf_election_quorum_auto:
return "auto";
case sf_election_quorum_any:
return "any";
case sf_election_quorum_majority:
return "majority";
default:
return "unknown";
}
}
static inline bool sf_election_quorum_check(const SFElectionQuorum quorum,
const int total_count, const int active_count)
{
switch (quorum) {
case sf_election_quorum_any:
return active_count > 0;
case sf_election_quorum_auto:
if (total_count % 2 == 0) { //same as sf_election_quorum_any
return active_count > 0;
}
//continue
case sf_election_quorum_majority:
if (active_count == total_count) {
return true;
} else {
return active_count > total_count / 2;
}
}
}
#define sf_load_read_rule_config(rule, ini_ctx) \
sf_load_read_rule_config_ex(rule, ini_ctx, sf_data_read_rule_master_only)
#define sf_load_quorum_config(quorum, ini_ctx) \
sf_load_quorum_config_ex(quorum, ini_ctx, sf_election_quorum_auto)
#define SF_NET_RETRY_FINISHED(retry_times, counter, result) \
!((SF_IS_RETRIABLE_ERROR(result) && ((retry_times > 0 && \
counter <= retry_times) || (retry_times < 0))))

View File

@ -144,33 +144,6 @@ static inline void set_connection_params(ConnectionInfo *conn,
cparam->cm.old_alives = old_alives;
}
static inline ConnectionInfo *make_master_connection(SFConnectionManager *cm,
SFCMConnGroupEntry *group, int *err_no)
{
SFCMServerEntry *master;
ConnectionInfo *conn;
master = (SFCMServerEntry *)FC_ATOMIC_GET(group->master);
if (master != NULL) {
if ((conn=make_connection(cm, master->addr_array,
err_no)) != NULL)
{
if (cm->common_cfg->read_rule == sf_data_read_rule_master_only) {
set_connection_params(conn, master, NULL);
} else {
SFCMServerPtrArray *alives;
alives = (SFCMServerPtrArray *)FC_ATOMIC_GET(group->alives);
set_connection_params(conn, master, alives);
}
return conn;
}
__sync_bool_compare_and_swap(&group->master, master, NULL);
}
return NULL;
}
static inline int push_to_detect_queue(SFConnectionManager *cm,
SFCMConnGroupEntry *group, SFCMServerPtrArray *alives)
{
@ -250,6 +223,34 @@ static int remove_from_alives(SFConnectionManager *cm,
return 0;
}
static inline ConnectionInfo *make_master_connection(SFConnectionManager *cm,
SFCMConnGroupEntry *group, int *err_no)
{
SFCMServerEntry *master;
ConnectionInfo *conn;
SFCMServerPtrArray *alives;
master = (SFCMServerEntry *)FC_ATOMIC_GET(group->master);
if (master != NULL) {
if ((conn=make_connection(cm, master->addr_array,
err_no)) != NULL)
{
alives = (SFCMServerPtrArray *)FC_ATOMIC_GET(group->alives);
set_connection_params(conn, master, alives);
return conn;
} else {
alives = (SFCMServerPtrArray *)FC_ATOMIC_GET(group->alives);
if (alives != NULL) {
remove_from_alives(cm, group, alives, master);
}
__sync_bool_compare_and_swap(&group->master, master, NULL);
}
}
*err_no = SF_RETRIABLE_ERROR_NO_SERVER;
return NULL;
}
static inline ConnectionInfo *make_readable_connection(SFConnectionManager *cm,
SFCMConnGroupEntry *group, SFCMServerPtrArray *alives,
const int index, int *err_no)
@ -273,30 +274,39 @@ static ConnectionInfo *get_master_connection(SFConnectionManager *cm,
SFCMConnGroupEntry *group;
ConnectionInfo *conn;
SFNetRetryIntervalContext net_retry_ctx;
int i;
int retry_count;
group = cm->groups.entries + group_index;
sf_init_net_retry_interval_context(&net_retry_ctx,
&cm->common_cfg->net_retry_cfg.interval_mm,
&cm->common_cfg->net_retry_cfg.connect);
i = 0;
retry_count = 0;
while (1) {
if ((conn=make_master_connection(cm, group, err_no)) != NULL) {
return conn;
}
/*
logInfo("file: "__FILE__", line: %d, "
"retry_count: %d, interval_ms: %d, data group id: %d, "
"master: %p, alive count: %d, all count: %d", __LINE__,
retry_count, net_retry_ctx.interval_ms, group->id,
FC_ATOMIC_GET(group->master), ((SFCMServerPtrArray *)
FC_ATOMIC_GET(group->alives))->count, group->all.count);
*/
*err_no = get_group_servers(cm, group);
if (*err_no == 0) {
*err_no = SF_RETRIABLE_ERROR_NO_SERVER; //for try again
}
SF_NET_RETRY_CHECK_AND_SLEEP(net_retry_ctx,
cm->common_cfg->net_retry_cfg.
connect.times, ++i, *err_no);
connect.times, ++retry_count, *err_no);
}
logError("file: "__FILE__", line: %d, "
"get_master_connection fail, errno: %d",
__LINE__, *err_no);
"get_master_connection fail, group id: %d, retry count: %d, "
"errno: %d", __LINE__, group->id, retry_count, *err_no);
return NULL;
}
@ -308,7 +318,7 @@ static ConnectionInfo *get_readable_connection(SFConnectionManager *cm,
ConnectionInfo *conn;
SFNetRetryIntervalContext net_retry_ctx;
uint32_t index;
int i;
int retry_count;
group = cm->groups.entries + group_index;
if ((cm->common_cfg->read_rule == sf_data_read_rule_master_only) ||
@ -320,7 +330,7 @@ static ConnectionInfo *get_readable_connection(SFConnectionManager *cm,
sf_init_net_retry_interval_context(&net_retry_ctx,
&cm->common_cfg->net_retry_cfg.interval_mm,
&cm->common_cfg->net_retry_cfg.connect);
i = 0;
retry_count = 0;
while (1) {
alives = (SFCMServerPtrArray *)FC_ATOMIC_GET(group->alives);
if (alives->count > 0) {
@ -344,12 +354,12 @@ static ConnectionInfo *get_readable_connection(SFConnectionManager *cm,
}
SF_NET_RETRY_CHECK_AND_SLEEP(net_retry_ctx,
cm->common_cfg->net_retry_cfg.
connect.times, ++i, *err_no);
connect.times, ++retry_count, *err_no);
}
logError("file: "__FILE__", line: %d, "
"get_readable_connection fail, errno: %d",
__LINE__, *err_no);
"get_readable_connection fail, retry count: %d, errno: %d",
__LINE__, retry_count, *err_no);
return NULL;
}
@ -376,12 +386,11 @@ static void close_connection(SFConnectionManager *cm, ConnectionInfo *conn)
if (cparam->cm.sentry != NULL) {
server = cparam->cm.sentry;
group = cm->groups.entries + server->group_index;
if (cparam->cm.old_alives == NULL) {
__sync_bool_compare_and_swap(&group->master, server, NULL);
} else {
if (cparam->cm.old_alives != NULL) {
remove_from_alives(cm, group, cparam->cm.old_alives, server);
cparam->cm.old_alives = NULL;
}
__sync_bool_compare_and_swap(&group->master, server, NULL);
cparam->cm.sentry = NULL;
}

View File

@ -82,20 +82,18 @@ static int write_to_binlog_index_file(SFFileWriterInfo *writer)
return result;
}
static int get_binlog_index_from_file(SFFileWriterInfo *writer)
static int get_binlog_info_from_file(const char *data_path,
const char *subdir_name, int *write_index,
int *compress_index)
{
char full_filename[PATH_MAX];
IniContext ini_context;
int result;
snprintf(full_filename, sizeof(full_filename), "%s/%s/%s",
writer->cfg.data_path, writer->cfg.subdir_name,
BINLOG_INDEX_FILENAME);
data_path, subdir_name, BINLOG_INDEX_FILENAME);
if (access(full_filename, F_OK) != 0) {
if (errno == ENOENT) {
writer->binlog.index = 0;
return write_to_binlog_index_file(writer);
}
return errno != 0 ? errno : EPERM;
}
if ((result=iniLoadFromFile(full_filename, &ini_context)) != 0) {
@ -105,15 +103,40 @@ static int get_binlog_index_from_file(SFFileWriterInfo *writer)
return result;
}
writer->binlog.index = iniGetIntValue(NULL,
BINLOG_INDEX_ITEM_CURRENT_WRITE, &ini_context, 0);
writer->binlog.compress_index = iniGetIntValue(NULL,
BINLOG_INDEX_ITEM_CURRENT_COMPRESS, &ini_context, 0);
*write_index = iniGetIntValue(NULL,
BINLOG_INDEX_ITEM_CURRENT_WRITE,
&ini_context, 0);
*compress_index = iniGetIntValue(NULL,
BINLOG_INDEX_ITEM_CURRENT_COMPRESS,
&ini_context, 0);
iniFreeContext(&ini_context);
return 0;
}
int sf_file_writer_get_binlog_index(const char *data_path,
const char *subdir_name, int *write_index)
{
int compress_index;
return get_binlog_info_from_file(data_path, subdir_name,
write_index, &compress_index);
}
static inline int get_binlog_index_from_file(SFFileWriterInfo *writer)
{
int result;
result = get_binlog_info_from_file(writer->cfg.data_path,
writer->cfg.subdir_name, &writer->binlog.index,
&writer->binlog.compress_index);
if (result == ENOENT) {
writer->binlog.index = 0;
writer->binlog.compress_index = 0;
return write_to_binlog_index_file(writer);
}
return result;
}
static int open_writable_binlog(SFFileWriterInfo *writer)
{
if (writer->file.fd >= 0) {
@ -253,7 +276,7 @@ int sf_file_writer_get_current_index(SFFileWriterInfo *writer)
return writer->binlog.index;
}
int sf_file_writer_deal_buffer(SFFileWriterInfo *writer,
int sf_file_writer_deal_versioned_buffer(SFFileWriterInfo *writer,
BufferInfo *buffer, const int64_t version)
{
int result;
@ -298,7 +321,7 @@ int sf_file_writer_deal_buffer(SFFileWriterInfo *writer,
return 0;
}
int sf_file_writer_init_normal(SFFileWriterInfo *writer,
int sf_file_writer_init(SFFileWriterInfo *writer,
const char *data_path, const char *subdir_name,
const int buffer_size)
{
@ -347,6 +370,19 @@ int sf_file_writer_init_normal(SFFileWriterInfo *writer,
return 0;
}
void sf_file_writer_destroy(SFFileWriterInfo *writer)
{
if (writer->file.fd >= 0) {
close(writer->file.fd);
writer->file.fd = -1;
}
if (writer->file.name != NULL) {
free(writer->file.name);
writer->file.name = NULL;
}
sf_binlog_buffer_destroy(&writer->binlog_buffer);
}
int sf_file_writer_set_binlog_index(SFFileWriterInfo *writer,
const int binlog_index)
{

View File

@ -39,7 +39,7 @@ typedef struct sf_file_writer_info {
} cfg;
struct {
int index;
int index; //current write index
int compress_index;
} binlog;
@ -63,13 +63,18 @@ typedef struct sf_file_writer_info {
extern "C" {
#endif
int sf_file_writer_init_normal(SFFileWriterInfo *writer,
int sf_file_writer_init(SFFileWriterInfo *writer,
const char *data_path, const char *subdir_name,
const int buffer_size);
int sf_file_writer_deal_buffer(SFFileWriterInfo *writer,
void sf_file_writer_destroy(SFFileWriterInfo *writer);
int sf_file_writer_deal_versioned_buffer(SFFileWriterInfo *writer,
BufferInfo *buffer, const int64_t version);
#define sf_file_writer_deal_buffer(writer, buffer) \
sf_file_writer_deal_versioned_buffer(writer, buffer, 0)
int sf_file_writer_flush(SFFileWriterInfo *writer);
static inline void sf_file_writer_set_flags(
@ -91,6 +96,9 @@ static inline int64_t sf_file_writer_get_last_version(
}
}
int sf_file_writer_get_binlog_index(const char *data_path,
const char *subdir_name, int *write_index);
int sf_file_writer_get_current_index(SFFileWriterInfo *writer);
static inline void sf_file_writer_get_current_position(

View File

@ -35,7 +35,7 @@
#include "sf_ordered_writer.h"
#define deal_binlog_one_record(writer, wb) \
sf_file_writer_deal_buffer(&(writer)->fw, &wb->bf, wb->version)
sf_file_writer_deal_versioned_buffer(&(writer)->fw, &wb->bf, wb->version)
static inline int flush_writer_files(SFOrderedWriterInfo *writer)
{
@ -239,7 +239,7 @@ int sf_ordered_writer_init(SFOrderedWriterContext *context,
const int buffer_size, const int max_record_size)
{
int result;
if ((result=sf_file_writer_init_normal(&context->writer.fw,
if ((result=sf_file_writer_init(&context->writer.fw,
data_path, subdir_name, buffer_size)) != 0)
{
return result;

View File

@ -244,4 +244,10 @@ typedef struct sf_synchronize_context {
};
} SFSynchronizeContext;
typedef enum sf_election_quorum {
sf_election_quorum_auto,
sf_election_quorum_any,
sf_election_quorum_majority,
} SFElectionQuorum;
#endif