From a265bbbbea32725256c717bb62bf0f38f5637749 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Thu, 17 Mar 2022 20:52:41 +0800 Subject: [PATCH 01/12] add function sf_binlog_writer_destroy --- src/sf_binlog_writer.c | 36 ++++++++++++++++++++++++++++-------- src/sf_binlog_writer.h | 29 +++++++++++++++++++++++++++-- src/sf_file_writer.c | 15 ++++++++++++++- src/sf_file_writer.h | 4 +++- src/sf_ordered_writer.c | 2 +- 5 files changed, 73 insertions(+), 13 deletions(-) diff --git a/src/sf_binlog_writer.c b/src/sf_binlog_writer.c index fa5a2ba..bedda07 100644 --- a/src/sf_binlog_writer.c +++ b/src/sf_binlog_writer.c @@ -257,6 +257,10 @@ void sf_binlog_writer_finish(SFBinlogWriterInfo *writer) int count; if (writer->fw.file.name != NULL) { + while (!fc_queue_empty(&writer->thread->queue)) { + fc_sleep_ms(10); + } + fc_queue_terminate(&writer->thread->queue); count = 0; @@ -341,13 +345,22 @@ static int binlog_wbuffer_alloc_init(void *element, void *args) return 0; } +static void binlog_wbuffer_destroy_func(void *element, void *args) +{ + SFBinlogWriterBuffer *wbuffer; + wbuffer = (SFBinlogWriterBuffer *)element; + if (wbuffer->bf.buff != NULL) { + free(wbuffer->bf.buff); + } +} + int sf_binlog_writer_init_normal(SFBinlogWriterInfo *writer, const char *data_path, const char *subdir_name, const int buffer_size) { - writer->flush.in_queue = false; - return sf_file_writer_init_normal(&writer->fw, - data_path, subdir_name, buffer_size); + memset(writer, 0, sizeof(*writer)); + return sf_file_writer_init(&writer->fw, data_path, + subdir_name, buffer_size); } int sf_binlog_writer_init_by_version(SFBinlogWriterInfo *writer, @@ -369,8 +382,9 @@ int sf_binlog_writer_init_by_version(SFBinlogWriterInfo *writer, writer->version_ctx.change_count = 0; binlog_writer_set_next_version(writer, next_version); - return sf_binlog_writer_init_normal(writer, - data_path, subdir_name, buffer_size); + writer->flush.in_queue = false; + return sf_file_writer_init(&writer->fw, data_path, + subdir_name, buffer_size); } int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, @@ -379,9 +393,10 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, const int writer_count, const bool use_fixed_buffer_size) { const int alloc_elements_once = 1024; + int result; int element_size; pthread_t tid; - int result; + struct fast_mblock_object_callbacks callbacks; snprintf(thread->name, sizeof(thread->name), "%s", name); thread->order_mode = order_mode; @@ -390,13 +405,18 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, writer->fw.cfg.max_record_size = max_record_size; writer->thread = thread; + callbacks.init_func = binlog_wbuffer_alloc_init; + callbacks.args = writer; element_size = sizeof(SFBinlogWriterBuffer); if (use_fixed_buffer_size) { element_size += max_record_size; + callbacks.destroy_func = NULL; + } else { + callbacks.destroy_func = binlog_wbuffer_destroy_func; } - if ((result=fast_mblock_init_ex1(&thread->mblock, "binlog-wbuffer", + if ((result=fast_mblock_init_ex2(&thread->mblock, "binlog-wbuffer", element_size, alloc_elements_once, 0, - binlog_wbuffer_alloc_init, writer, true)) != 0) + &callbacks, true, NULL)) != 0) { return result; } diff --git a/src/sf_binlog_writer.h b/src/sf_binlog_writer.h index 4834118..03c9de2 100644 --- a/src/sf_binlog_writer.h +++ b/src/sf_binlog_writer.h @@ -132,6 +132,33 @@ static inline int sf_binlog_writer_init(SFBinlogWriterContext *context, max_record_size); } +void sf_binlog_writer_finish(SFBinlogWriterInfo *writer); + +static inline void sf_binlog_writer_destroy_writer( + SFBinlogWriterInfo *writer) +{ + sf_file_writer_destroy(&writer->fw); + if (writer->version_ctx.ring.slots != NULL) { + free(writer->version_ctx.ring.slots); + writer->version_ctx.ring.slots = NULL; + } +} + +static inline void sf_binlog_writer_destroy_thread( + SFBinlogWriterThread *thread) +{ + fast_mblock_destroy(&thread->mblock); + fc_queue_destroy(&thread->queue); +} + +static inline void sf_binlog_writer_destroy( + SFBinlogWriterContext *context) +{ + sf_binlog_writer_finish(&context->writer); + sf_binlog_writer_destroy_writer(&context->writer); + sf_binlog_writer_destroy_thread(&context->thread); +} + int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer, const short order_by); @@ -144,8 +171,6 @@ int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer, #define sf_binlog_writer_get_last_version(writer) \ sf_file_writer_get_last_version(&(writer)->fw) -void sf_binlog_writer_finish(SFBinlogWriterInfo *writer); - #define sf_binlog_get_current_write_index(writer) \ sf_file_writer_get_current_index(&(writer)->fw) diff --git a/src/sf_file_writer.c b/src/sf_file_writer.c index 20a07b7..071faf0 100644 --- a/src/sf_file_writer.c +++ b/src/sf_file_writer.c @@ -298,7 +298,7 @@ int sf_file_writer_deal_buffer(SFFileWriterInfo *writer, return 0; } -int sf_file_writer_init_normal(SFFileWriterInfo *writer, +int sf_file_writer_init(SFFileWriterInfo *writer, const char *data_path, const char *subdir_name, const int buffer_size) { @@ -347,6 +347,19 @@ int sf_file_writer_init_normal(SFFileWriterInfo *writer, return 0; } +void sf_file_writer_destroy(SFFileWriterInfo *writer) +{ + if (writer->file.fd >= 0) { + close(writer->file.fd); + writer->file.fd = -1; + } + if (writer->file.name != NULL) { + free(writer->file.name); + writer->file.name = NULL; + } + sf_binlog_buffer_destroy(&writer->binlog_buffer); +} + int sf_file_writer_set_binlog_index(SFFileWriterInfo *writer, const int binlog_index) { diff --git a/src/sf_file_writer.h b/src/sf_file_writer.h index e0b0634..51b5324 100644 --- a/src/sf_file_writer.h +++ b/src/sf_file_writer.h @@ -63,10 +63,12 @@ typedef struct sf_file_writer_info { extern "C" { #endif -int sf_file_writer_init_normal(SFFileWriterInfo *writer, +int sf_file_writer_init(SFFileWriterInfo *writer, const char *data_path, const char *subdir_name, const int buffer_size); +void sf_file_writer_destroy(SFFileWriterInfo *writer); + int sf_file_writer_deal_buffer(SFFileWriterInfo *writer, BufferInfo *buffer, const int64_t version); diff --git a/src/sf_ordered_writer.c b/src/sf_ordered_writer.c index 0fbe1d7..417c16d 100644 --- a/src/sf_ordered_writer.c +++ b/src/sf_ordered_writer.c @@ -239,7 +239,7 @@ int sf_ordered_writer_init(SFOrderedWriterContext *context, const int buffer_size, const int max_record_size) { int result; - if ((result=sf_file_writer_init_normal(&context->writer.fw, + if ((result=sf_file_writer_init(&context->writer.fw, data_path, subdir_name, buffer_size)) != 0) { return result; From a727f382bc59790db9e92594515ac039bed4d1e8 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Fri, 18 Mar 2022 16:48:26 +0800 Subject: [PATCH 02/12] add function: sf_binlog_writer_notify_exit --- src/sf_binlog_writer.c | 45 +++++++++++++++++++++++++++++++----------- src/sf_binlog_writer.h | 3 +++ src/sf_file_writer.c | 1 + 3 files changed, 37 insertions(+), 12 deletions(-) diff --git a/src/sf_binlog_writer.c b/src/sf_binlog_writer.c index bedda07..d7c8ac2 100644 --- a/src/sf_binlog_writer.c +++ b/src/sf_binlog_writer.c @@ -34,6 +34,8 @@ #include "sf_func.h" #include "sf_binlog_writer.h" +#define ERRNO_THREAD_EXIT -1000 + static inline void binlog_writer_set_next_version(SFBinlogWriterInfo *writer, const uint64_t next_version) { @@ -194,7 +196,9 @@ static int deal_binlog_records(SFBinlogWriterThread *thread, fast_mblock_free_object(¤t->writer-> thread->mblock, current); break; - + case SF_BINLOG_BUFFER_TYPE_NOTIFY_EXIT: + flush_writer_files(thread); + return ERRNO_THREAD_EXIT; case SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION: if (thread->order_by != SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION) { logWarning("file: "__FILE__", line: %d, " @@ -260,8 +264,7 @@ void sf_binlog_writer_finish(SFBinlogWriterInfo *writer) while (!fc_queue_empty(&writer->thread->queue)) { fc_sleep_ms(10); } - - fc_queue_terminate(&writer->thread->queue); + sf_binlog_writer_notify_exit(writer); count = 0; while (writer->thread->running && ++count < 300) { @@ -294,6 +297,7 @@ static void *binlog_writer_func(void *arg) { SFBinlogWriterThread *thread; SFBinlogWriterBuffer *wb_head; + int result; thread = (SFBinlogWriterThread *)arg; @@ -313,11 +317,14 @@ static void *binlog_writer_func(void *arg) continue; } - if (deal_binlog_records(thread, wb_head) != 0) { - logCrit("file: "__FILE__", line: %d, " - "deal_binlog_records fail, " - "program exit!", __LINE__); - sf_terminate_myself(); + if ((result=deal_binlog_records(thread, wb_head)) != 0) { + if (result != ERRNO_THREAD_EXIT) { + logCrit("file: "__FILE__", line: %d, " + "deal_binlog_records fail, " + "program exit!", __LINE__); + sf_terminate_myself(); + } + break; } } @@ -467,13 +474,13 @@ int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer, return 0; } -int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer, - const int64_t next_version) +static inline int sf_binlog_writer_push_directive(SFBinlogWriterInfo *writer, + const int buffer_type, const int64_t version) { SFBinlogWriterBuffer *buffer; - if ((buffer=sf_binlog_writer_alloc_versioned_buffer_ex(writer, next_version, - next_version, SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION)) == NULL) + if ((buffer=sf_binlog_writer_alloc_versioned_buffer_ex(writer, + version, version, buffer_type)) == NULL) { return ENOMEM; } @@ -481,3 +488,17 @@ int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer, fc_queue_push(&writer->thread->queue, buffer); return 0; } + +int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer, + const int64_t next_version) +{ + return sf_binlog_writer_push_directive(writer, + SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION, + next_version); +} + +int sf_binlog_writer_notify_exit(SFBinlogWriterInfo *writer) +{ + return sf_binlog_writer_push_directive(writer, + SF_BINLOG_BUFFER_TYPE_NOTIFY_EXIT, 0); +} diff --git a/src/sf_binlog_writer.h b/src/sf_binlog_writer.h index 03c9de2..5c79e4d 100644 --- a/src/sf_binlog_writer.h +++ b/src/sf_binlog_writer.h @@ -31,6 +31,7 @@ #define SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE 0 //default type, must be 0 #define SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION 1 #define SF_BINLOG_BUFFER_TYPE_CHANGE_ORDER_TYPE 2 +#define SF_BINLOG_BUFFER_TYPE_NOTIFY_EXIT 3 #define SF_BINLOG_BUFFER_SET_VERSION(buffer, ver) \ (buffer)->version.first = (buffer)->version.last = ver @@ -165,6 +166,8 @@ int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer, int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer, const int64_t next_version); +int sf_binlog_writer_notify_exit(SFBinlogWriterInfo *writer); + #define sf_binlog_writer_set_flags(writer, flags) \ sf_file_writer_set_flags(&(writer)->fw, flags) diff --git a/src/sf_file_writer.c b/src/sf_file_writer.c index 071faf0..794697a 100644 --- a/src/sf_file_writer.c +++ b/src/sf_file_writer.c @@ -94,6 +94,7 @@ static int get_binlog_index_from_file(SFFileWriterInfo *writer) if (access(full_filename, F_OK) != 0) { if (errno == ENOENT) { writer->binlog.index = 0; + writer->binlog.compress_index = 0; return write_to_binlog_index_file(writer); } } From 1a03fec1f61fa68a55507c463ce76552f75ddacc Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Sat, 19 Mar 2022 16:36:11 +0800 Subject: [PATCH 03/12] add function sf_file_writer_get_binlog_index --- src/sf_binlog_writer.h | 5 +++++ src/sf_file_writer.c | 46 +++++++++++++++++++++++++++++++----------- src/sf_file_writer.h | 5 ++++- 3 files changed, 43 insertions(+), 13 deletions(-) diff --git a/src/sf_binlog_writer.h b/src/sf_binlog_writer.h index 5c79e4d..4381191 100644 --- a/src/sf_binlog_writer.h +++ b/src/sf_binlog_writer.h @@ -224,6 +224,11 @@ static inline SFBinlogWriterBuffer *sf_binlog_writer_alloc_versioned_buffer_ex( sf_file_writer_get_index_filename(data_path, \ subdir_name, filename, size) +#define sf_binlog_writer_get_binlog_index(data_path, \ + subdir_name, write_index) \ + sf_file_writer_get_binlog_index(data_path, \ + subdir_name, write_index) + #define sf_binlog_writer_set_binlog_index(writer, binlog_index) \ sf_file_writer_set_binlog_index(&(writer)->fw, binlog_index) diff --git a/src/sf_file_writer.c b/src/sf_file_writer.c index 794697a..03a1b44 100644 --- a/src/sf_file_writer.c +++ b/src/sf_file_writer.c @@ -82,21 +82,18 @@ static int write_to_binlog_index_file(SFFileWriterInfo *writer) return result; } -static int get_binlog_index_from_file(SFFileWriterInfo *writer) +static int get_binlog_info_from_file(const char *data_path, + const char *subdir_name, int *write_index, + int *compress_index) { char full_filename[PATH_MAX]; IniContext ini_context; int result; snprintf(full_filename, sizeof(full_filename), "%s/%s/%s", - writer->cfg.data_path, writer->cfg.subdir_name, - BINLOG_INDEX_FILENAME); + data_path, subdir_name, BINLOG_INDEX_FILENAME); if (access(full_filename, F_OK) != 0) { - if (errno == ENOENT) { - writer->binlog.index = 0; - writer->binlog.compress_index = 0; - return write_to_binlog_index_file(writer); - } + return errno != 0 ? errno : EPERM; } if ((result=iniLoadFromFile(full_filename, &ini_context)) != 0) { @@ -106,15 +103,40 @@ static int get_binlog_index_from_file(SFFileWriterInfo *writer) return result; } - writer->binlog.index = iniGetIntValue(NULL, - BINLOG_INDEX_ITEM_CURRENT_WRITE, &ini_context, 0); - writer->binlog.compress_index = iniGetIntValue(NULL, - BINLOG_INDEX_ITEM_CURRENT_COMPRESS, &ini_context, 0); + *write_index = iniGetIntValue(NULL, + BINLOG_INDEX_ITEM_CURRENT_WRITE, + &ini_context, 0); + *compress_index = iniGetIntValue(NULL, + BINLOG_INDEX_ITEM_CURRENT_COMPRESS, + &ini_context, 0); iniFreeContext(&ini_context); return 0; } +int sf_file_writer_get_binlog_index(const char *data_path, + const char *subdir_name, int *write_index) +{ + int compress_index; + return get_binlog_info_from_file(data_path, subdir_name, + write_index, &compress_index); +} + +static inline int get_binlog_index_from_file(SFFileWriterInfo *writer) +{ + int result; + + result = get_binlog_info_from_file(writer->cfg.data_path, + writer->cfg.subdir_name, &writer->binlog.index, + &writer->binlog.compress_index); + if (result == ENOENT) { + writer->binlog.index = 0; + writer->binlog.compress_index = 0; + return write_to_binlog_index_file(writer); + } + return result; +} + static int open_writable_binlog(SFFileWriterInfo *writer) { if (writer->file.fd >= 0) { diff --git a/src/sf_file_writer.h b/src/sf_file_writer.h index 51b5324..bdc4fd9 100644 --- a/src/sf_file_writer.h +++ b/src/sf_file_writer.h @@ -39,7 +39,7 @@ typedef struct sf_file_writer_info { } cfg; struct { - int index; + int index; //current write index int compress_index; } binlog; @@ -93,6 +93,9 @@ static inline int64_t sf_file_writer_get_last_version( } } +int sf_file_writer_get_binlog_index(const char *data_path, + const char *subdir_name, int *write_index); + int sf_file_writer_get_current_index(SFFileWriterInfo *writer); static inline void sf_file_writer_get_current_position( From 68d41aa690b794cfbdd73834a4d8cda340751d3f Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Tue, 22 Mar 2022 08:23:03 +0800 Subject: [PATCH 04/12] rename to sf_file_writer_deal_versioned_buffer --- src/sf_binlog_writer.c | 3 ++- src/sf_file_writer.c | 2 +- src/sf_file_writer.h | 5 ++++- src/sf_ordered_writer.c | 2 +- 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/sf_binlog_writer.c b/src/sf_binlog_writer.c index d7c8ac2..a4e569f 100644 --- a/src/sf_binlog_writer.c +++ b/src/sf_binlog_writer.c @@ -46,7 +46,8 @@ static inline void binlog_writer_set_next_version(SFBinlogWriterInfo *writer, } #define deal_binlog_one_record(wb) \ - sf_file_writer_deal_buffer(&wb->writer->fw, &wb->bf, wb->version.last) + sf_file_writer_deal_versioned_buffer(&wb->writer->fw, \ + &wb->bf, wb->version.last) #define GET_WBUFFER_VERSION_COUNT(wb) \ (((wb)->version.last - (wb)->version.first) + 1) diff --git a/src/sf_file_writer.c b/src/sf_file_writer.c index 03a1b44..ecaafd0 100644 --- a/src/sf_file_writer.c +++ b/src/sf_file_writer.c @@ -276,7 +276,7 @@ int sf_file_writer_get_current_index(SFFileWriterInfo *writer) return writer->binlog.index; } -int sf_file_writer_deal_buffer(SFFileWriterInfo *writer, +int sf_file_writer_deal_versioned_buffer(SFFileWriterInfo *writer, BufferInfo *buffer, const int64_t version) { int result; diff --git a/src/sf_file_writer.h b/src/sf_file_writer.h index bdc4fd9..ec1de31 100644 --- a/src/sf_file_writer.h +++ b/src/sf_file_writer.h @@ -69,9 +69,12 @@ int sf_file_writer_init(SFFileWriterInfo *writer, void sf_file_writer_destroy(SFFileWriterInfo *writer); -int sf_file_writer_deal_buffer(SFFileWriterInfo *writer, +int sf_file_writer_deal_versioned_buffer(SFFileWriterInfo *writer, BufferInfo *buffer, const int64_t version); +#define sf_file_writer_deal_buffer(writer, buffer) \ + sf_file_writer_deal_versioned_buffer(writer, buffer, 0) + int sf_file_writer_flush(SFFileWriterInfo *writer); static inline void sf_file_writer_set_flags( diff --git a/src/sf_ordered_writer.c b/src/sf_ordered_writer.c index 417c16d..fac0a62 100644 --- a/src/sf_ordered_writer.c +++ b/src/sf_ordered_writer.c @@ -35,7 +35,7 @@ #include "sf_ordered_writer.h" #define deal_binlog_one_record(writer, wb) \ - sf_file_writer_deal_buffer(&(writer)->fw, &wb->bf, wb->version) + sf_file_writer_deal_versioned_buffer(&(writer)->fw, &wb->bf, wb->version) static inline int flush_writer_files(SFOrderedWriterInfo *writer) { From b4aaf69962fea6e2d7bd2bde3a78354e8c4ff6f1 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Fri, 25 Mar 2022 15:30:14 +0800 Subject: [PATCH 05/12] sf_buffered_writer.h: compile OK. --- src/sf_buffered_writer.h | 1 + 1 file changed, 1 insertion(+) diff --git a/src/sf_buffered_writer.h b/src/sf_buffered_writer.h index 37d70a2..86ad71b 100644 --- a/src/sf_buffered_writer.h +++ b/src/sf_buffered_writer.h @@ -19,6 +19,7 @@ #define _SF_BUFFERED_WRITER_H_ #include "sf_types.h" +#include "sf_func.h" typedef struct { int fd; From de943f684ad5a43b78e19f51973316a4b84f80ae Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Wed, 30 Mar 2022 21:22:34 +0800 Subject: [PATCH 06/12] add function sf_load_quorum_config --- src/sf_configs.c | 34 ++++++++++++++++++++++++++++++---- src/sf_configs.h | 21 ++++++++++++++++++++- src/sf_types.h | 5 +++++ 3 files changed, 55 insertions(+), 5 deletions(-) diff --git a/src/sf_configs.c b/src/sf_configs.c index 20f4cce..dce8439 100644 --- a/src/sf_configs.c +++ b/src/sf_configs.c @@ -94,13 +94,14 @@ void sf_net_retry_config_to_string(SFNetRetryConfig *net_retry_cfg, net_retry_cfg->network.interval_ms); } -void sf_load_read_rule_config_ex(SFDataReadRule *rule, +int sf_load_read_rule_config_ex(SFDataReadRule *rule, IniFullContext *ini_ctx, const SFDataReadRule def_rule) { char *read_rule; + read_rule = iniGetStrValueEx(ini_ctx->section_name, "read_rule", ini_ctx->context, true); - if (read_rule == NULL || *read_rule == '\0') { + if (read_rule == NULL) { *rule = def_rule; } else if (strncasecmp(read_rule, "any", 3) == 0) { *rule = sf_data_read_rule_any_available; @@ -110,8 +111,33 @@ void sf_load_read_rule_config_ex(SFDataReadRule *rule, *rule = sf_data_read_rule_master_only; } else { logError("file: "__FILE__", line: %d, " - "config file: %s, unkown read_rule: %s, set to any", + "config file: %s, unkown read_rule: %s", __LINE__, ini_ctx->filename, read_rule); - *rule = sf_data_read_rule_any_available; + return EINVAL; } + + return 0; +} + +int sf_load_quorum_config_ex(SFElectionQuorum *quorum, + IniFullContext *ini_ctx, const SFElectionQuorum def_quorum) +{ + char *str; + + str = iniGetStrValue(ini_ctx->section_name, + "quorum", ini_ctx->context); + if (str == NULL) { + *quorum = def_quorum; + } else if (strncasecmp(str, "any", 3) == 0) { + *quorum = sf_election_quorum_any; + } else if (strncasecmp(str, "majority", 8) == 0) { + *quorum = sf_election_quorum_majority; + } else { + logError("file: "__FILE__", line: %d, " + "config file: %s, unkown quorum: %s", + __LINE__, ini_ctx->filename, str); + return EINVAL; + } + + return 0; } diff --git a/src/sf_configs.h b/src/sf_configs.h index 76ac295..b8887ac 100644 --- a/src/sf_configs.h +++ b/src/sf_configs.h @@ -67,7 +67,7 @@ static inline int sf_calc_next_retry_interval(SFNetRetryIntervalContext *ctx) return ctx->interval_ms; } -void sf_load_read_rule_config_ex(SFDataReadRule *rule, +int sf_load_read_rule_config_ex(SFDataReadRule *rule, IniFullContext *ini_ctx, const SFDataReadRule def_rule); static inline const char *sf_get_read_rule_caption( @@ -85,9 +85,28 @@ static inline const char *sf_get_read_rule_caption( } } +int sf_load_quorum_config_ex(SFElectionQuorum *quorum, + IniFullContext *ini_ctx, const SFElectionQuorum def_quorum); + +static inline const char *sf_get_quorum_caption( + const SFElectionQuorum quorum) +{ + switch (quorum) { + case sf_election_quorum_any: + return "any"; + case sf_election_quorum_majority: + return "majority"; + default: + return "unknown"; + } +} + #define sf_load_read_rule_config(rule, ini_ctx) \ sf_load_read_rule_config_ex(rule, ini_ctx, sf_data_read_rule_master_only) +#define sf_load_quorum_config(quorum, ini_ctx) \ + sf_load_quorum_config_ex(quorum, ini_ctx, sf_election_quorum_majority) + #define SF_NET_RETRY_FINISHED(retry_times, counter, result) \ !((SF_IS_RETRIABLE_ERROR(result) && ((retry_times > 0 && \ counter <= retry_times) || (retry_times < 0)))) diff --git a/src/sf_types.h b/src/sf_types.h index 9b935c5..ec3f3ba 100644 --- a/src/sf_types.h +++ b/src/sf_types.h @@ -244,4 +244,9 @@ typedef struct sf_synchronize_context { }; } SFSynchronizeContext; +typedef enum sf_election_quorum { + sf_election_quorum_any = 1, + sf_election_quorum_majority +} SFElectionQuorum; + #endif From 7259eaf6acf66123bf179f623a921b841296d836 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Mon, 11 Apr 2022 10:24:21 +0800 Subject: [PATCH 07/12] log retry count when get connection fail --- src/sf_connection_manager.c | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/src/sf_connection_manager.c b/src/sf_connection_manager.c index 4d9e50b..71e4b68 100644 --- a/src/sf_connection_manager.c +++ b/src/sf_connection_manager.c @@ -168,6 +168,7 @@ static inline ConnectionInfo *make_master_connection(SFConnectionManager *cm, __sync_bool_compare_and_swap(&group->master, master, NULL); } + *err_no = SF_RETRIABLE_ERROR_NO_SERVER; return NULL; } @@ -273,13 +274,13 @@ static ConnectionInfo *get_master_connection(SFConnectionManager *cm, SFCMConnGroupEntry *group; ConnectionInfo *conn; SFNetRetryIntervalContext net_retry_ctx; - int i; + int retry_count; group = cm->groups.entries + group_index; sf_init_net_retry_interval_context(&net_retry_ctx, &cm->common_cfg->net_retry_cfg.interval_mm, &cm->common_cfg->net_retry_cfg.connect); - i = 0; + retry_count = 0; while (1) { if ((conn=make_master_connection(cm, group, err_no)) != NULL) { return conn; @@ -291,12 +292,12 @@ static ConnectionInfo *get_master_connection(SFConnectionManager *cm, } SF_NET_RETRY_CHECK_AND_SLEEP(net_retry_ctx, cm->common_cfg->net_retry_cfg. - connect.times, ++i, *err_no); + connect.times, ++retry_count, *err_no); } logError("file: "__FILE__", line: %d, " - "get_master_connection fail, errno: %d", - __LINE__, *err_no); + "get_master_connection fail, retry count: %d, errno: %d", + __LINE__, retry_count, *err_no); return NULL; } @@ -308,7 +309,7 @@ static ConnectionInfo *get_readable_connection(SFConnectionManager *cm, ConnectionInfo *conn; SFNetRetryIntervalContext net_retry_ctx; uint32_t index; - int i; + int retry_count; group = cm->groups.entries + group_index; if ((cm->common_cfg->read_rule == sf_data_read_rule_master_only) || @@ -320,7 +321,7 @@ static ConnectionInfo *get_readable_connection(SFConnectionManager *cm, sf_init_net_retry_interval_context(&net_retry_ctx, &cm->common_cfg->net_retry_cfg.interval_mm, &cm->common_cfg->net_retry_cfg.connect); - i = 0; + retry_count = 0; while (1) { alives = (SFCMServerPtrArray *)FC_ATOMIC_GET(group->alives); if (alives->count > 0) { @@ -344,12 +345,12 @@ static ConnectionInfo *get_readable_connection(SFConnectionManager *cm, } SF_NET_RETRY_CHECK_AND_SLEEP(net_retry_ctx, cm->common_cfg->net_retry_cfg. - connect.times, ++i, *err_no); + connect.times, ++retry_count, *err_no); } logError("file: "__FILE__", line: %d, " - "get_readable_connection fail, errno: %d", - __LINE__, *err_no); + "get_readable_connection fail, retry count: %d, errno: %d", + __LINE__, retry_count, *err_no); return NULL; } From a57709de937dd4bbda82432bd5574660499c2bf9 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Fri, 15 Apr 2022 16:58:42 +0800 Subject: [PATCH 08/12] sf_connection_manager.c: make_master_connection refined --- src/idempotency/client/client_channel.h | 14 ++++- src/idempotency/client/receipt_handler.c | 24 +++++--- src/sf_configs.c | 6 +- src/sf_connection_manager.c | 76 +++++++++++++----------- 4 files changed, 74 insertions(+), 46 deletions(-) diff --git a/src/idempotency/client/client_channel.h b/src/idempotency/client/client_channel.h index 59bc3ed..bc58cc4 100644 --- a/src/idempotency/client/client_channel.h +++ b/src/idempotency/client/client_channel.h @@ -80,7 +80,19 @@ static inline int idempotency_client_channel_check_wait_ex( idempotency_client_channel_check_reconnect(channel); lcp_timedwait_sec(&channel->lc_pair, timeout); - return __sync_add_and_fetch(&channel->established, 0) ? 0 : ETIMEDOUT; + if (__sync_add_and_fetch(&channel->established, 0)) { + return 0; + } else { + /* + logInfo("file: "__FILE__", line: %d, " + "channel_check fail, server %s:%u, in_ioevent: %d, " + "canceled: %d, req count: %"PRId64, __LINE__, channel->task->server_ip, + channel->task->port, __sync_add_and_fetch(&channel-> + in_ioevent, 0), __sync_add_and_fetch(&channel-> + task->canceled, 0), channel->task->req_count); + */ + return ETIMEDOUT; + } } #ifdef __cplusplus diff --git a/src/idempotency/client/receipt_handler.c b/src/idempotency/client/receipt_handler.c index 61c8d8c..0ce09d5 100644 --- a/src/idempotency/client/receipt_handler.c +++ b/src/idempotency/client/receipt_handler.c @@ -68,8 +68,10 @@ static int receipt_recv_timeout_callback(struct fast_task_info *task) __LINE__, task->server_ip, task->port); } else { logError("file: "__FILE__", line: %d, " - "communication with server %s:%u timeout", - __LINE__, task->server_ip, task->port); + "communication with server %s:%u timeout, " + "channel established: %d", __LINE__, + task->server_ip, task->port, + FC_ATOMIC_GET(channel->established)); } return ETIMEDOUT; @@ -85,8 +87,11 @@ static void receipt_task_finish_cleanup(struct fast_task_info *task) task->event.fd = -1; } - channel = (IdempotencyClientChannel *)task->arg; + task->length = 0; + task->offset = 0; + task->req_count = 0; + channel = (IdempotencyClientChannel *)task->arg; fc_list_del_init(&channel->dlink); __sync_bool_compare_and_swap(&channel->established, 1, 0); __sync_bool_compare_and_swap(&channel->in_ioevent, 1, 0); @@ -334,12 +339,15 @@ static int receipt_deal_task(struct fast_task_info *task, const int stage) setup_channel_request(task); result = 0; break; - } else if (stage == SF_NIO_STAGE_CONTINUE && task->length == 0) { - if (((IdempotencyClientChannel *)task->arg)->established) { - report_req_receipt_request(task, true); - } else { - sf_set_read_event(task); //trigger read event + } else if (stage == SF_NIO_STAGE_CONTINUE) { + if (task->length == 0 && task->offset == 0) { + if (((IdempotencyClientChannel *)task->arg)->established) { + report_req_receipt_request(task, true); + } else if (task->req_count > 0) { + sf_set_read_event(task); //trigger read event + } } + result = 0; break; } diff --git a/src/sf_configs.c b/src/sf_configs.c index dce8439..f088515 100644 --- a/src/sf_configs.c +++ b/src/sf_configs.c @@ -25,10 +25,10 @@ #include "fastcommon/logger.h" #include "sf_configs.h" -#define DEFAULT_RETRY_MAX_INTERVAL_MS 5000 -#define DEFAULT_CONNECT_RETRY_TIMES 10 +#define DEFAULT_RETRY_MAX_INTERVAL_MS 3000 +#define DEFAULT_CONNECT_RETRY_TIMES 200 #define DEFAULT_CONNECT_RETRY_INTERVAL_MS 100 -#define DEFAULT_NETWORK_RETRY_TIMES 10 +#define DEFAULT_NETWORK_RETRY_TIMES 200 #define DEFAULT_NETWORK_RETRY_INTERVAL_MS 100 int sf_load_net_retry_config(SFNetRetryConfig *net_retry_cfg, diff --git a/src/sf_connection_manager.c b/src/sf_connection_manager.c index 71e4b68..d85577e 100644 --- a/src/sf_connection_manager.c +++ b/src/sf_connection_manager.c @@ -144,34 +144,6 @@ static inline void set_connection_params(ConnectionInfo *conn, cparam->cm.old_alives = old_alives; } -static inline ConnectionInfo *make_master_connection(SFConnectionManager *cm, - SFCMConnGroupEntry *group, int *err_no) -{ - SFCMServerEntry *master; - ConnectionInfo *conn; - - master = (SFCMServerEntry *)FC_ATOMIC_GET(group->master); - if (master != NULL) { - if ((conn=make_connection(cm, master->addr_array, - err_no)) != NULL) - { - if (cm->common_cfg->read_rule == sf_data_read_rule_master_only) { - set_connection_params(conn, master, NULL); - } else { - SFCMServerPtrArray *alives; - alives = (SFCMServerPtrArray *)FC_ATOMIC_GET(group->alives); - set_connection_params(conn, master, alives); - } - return conn; - } - - __sync_bool_compare_and_swap(&group->master, master, NULL); - } - - *err_no = SF_RETRIABLE_ERROR_NO_SERVER; - return NULL; -} - static inline int push_to_detect_queue(SFConnectionManager *cm, SFCMConnGroupEntry *group, SFCMServerPtrArray *alives) { @@ -251,6 +223,34 @@ static int remove_from_alives(SFConnectionManager *cm, return 0; } +static inline ConnectionInfo *make_master_connection(SFConnectionManager *cm, + SFCMConnGroupEntry *group, int *err_no) +{ + SFCMServerEntry *master; + ConnectionInfo *conn; + SFCMServerPtrArray *alives; + + master = (SFCMServerEntry *)FC_ATOMIC_GET(group->master); + if (master != NULL) { + if ((conn=make_connection(cm, master->addr_array, + err_no)) != NULL) + { + alives = (SFCMServerPtrArray *)FC_ATOMIC_GET(group->alives); + set_connection_params(conn, master, alives); + return conn; + } else { + alives = (SFCMServerPtrArray *)FC_ATOMIC_GET(group->alives); + if (alives != NULL) { + remove_from_alives(cm, group, alives, master); + } + __sync_bool_compare_and_swap(&group->master, master, NULL); + } + } + + *err_no = SF_RETRIABLE_ERROR_NO_SERVER; + return NULL; +} + static inline ConnectionInfo *make_readable_connection(SFConnectionManager *cm, SFCMConnGroupEntry *group, SFCMServerPtrArray *alives, const int index, int *err_no) @@ -286,6 +286,15 @@ static ConnectionInfo *get_master_connection(SFConnectionManager *cm, return conn; } + /* + logInfo("file: "__FILE__", line: %d, " + "retry_count: %d, interval_ms: %d, data group id: %d, " + "master: %p, alive count: %d, all count: %d", __LINE__, + retry_count, net_retry_ctx.interval_ms, group->id, + FC_ATOMIC_GET(group->master), ((SFCMServerPtrArray *) + FC_ATOMIC_GET(group->alives))->count, group->all.count); + */ + *err_no = get_group_servers(cm, group); if (*err_no == 0) { *err_no = SF_RETRIABLE_ERROR_NO_SERVER; //for try again @@ -296,8 +305,8 @@ static ConnectionInfo *get_master_connection(SFConnectionManager *cm, } logError("file: "__FILE__", line: %d, " - "get_master_connection fail, retry count: %d, errno: %d", - __LINE__, retry_count, *err_no); + "get_master_connection fail, group id: %d, retry count: %d, " + "errno: %d", __LINE__, group->id, retry_count, *err_no); return NULL; } @@ -377,12 +386,11 @@ static void close_connection(SFConnectionManager *cm, ConnectionInfo *conn) if (cparam->cm.sentry != NULL) { server = cparam->cm.sentry; group = cm->groups.entries + server->group_index; - if (cparam->cm.old_alives == NULL) { - __sync_bool_compare_and_swap(&group->master, server, NULL); - } else { + if (cparam->cm.old_alives != NULL) { remove_from_alives(cm, group, cparam->cm.old_alives, server); cparam->cm.old_alives = NULL; } + __sync_bool_compare_and_swap(&group->master, server, NULL); cparam->cm.sentry = NULL; } @@ -422,7 +430,7 @@ static ConnectionInfo *get_leader_connection(SFConnectionManager *cm, return conn; } release_connection(cm, conn); - if ((conn=get_spec_connection(cm,&leader.conn, + if ((conn=get_spec_connection(cm, &leader.conn, err_no)) == NULL) { break; From 952647cbc9568f58a4efe7c386a8671bee8c56d8 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Sun, 17 Apr 2022 18:18:18 +0800 Subject: [PATCH 09/12] order_by feature belongs to writer instead of thread --- src/sf_binlog_writer.c | 28 +++++++++++++++++----------- src/sf_binlog_writer.h | 22 ++++++++++------------ 2 files changed, 27 insertions(+), 23 deletions(-) diff --git a/src/sf_binlog_writer.c b/src/sf_binlog_writer.c index a4e569f..a0d0bed 100644 --- a/src/sf_binlog_writer.c +++ b/src/sf_binlog_writer.c @@ -193,7 +193,7 @@ static int deal_binlog_records(SFBinlogWriterThread *thread, switch (current->type) { case SF_BINLOG_BUFFER_TYPE_CHANGE_ORDER_TYPE: - thread->order_by = current->version.first; + current->writer->order_by = current->version.first; fast_mblock_free_object(¤t->writer-> thread->mblock, current); break; @@ -201,12 +201,15 @@ static int deal_binlog_records(SFBinlogWriterThread *thread, flush_writer_files(thread); return ERRNO_THREAD_EXIT; case SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION: - if (thread->order_by != SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION) { + if (current->writer->order_by != + SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION) + { logWarning("file: "__FILE__", line: %d, " "subdir_name: %s, invalid order by: %d != %d, " "maybe some mistake happen", __LINE__, - current->writer->fw.cfg.subdir_name, thread->order_by, - SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION); + current->writer->fw.cfg.subdir_name, + current->writer->order_by, + SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION); } if (current->writer->version_ctx.ring.waiting_count != 0) { @@ -236,7 +239,9 @@ static int deal_binlog_records(SFBinlogWriterThread *thread, current->writer->fw.total_count++; add_to_flush_writer_queue(thread, current->writer); - if (thread->order_by == SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION) { + if (current->writer->order_by == + SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION) + { /* NOTE: current maybe be released in the deal function */ if ((result=deal_record_by_version(current)) != 0) { return result; @@ -367,6 +372,7 @@ int sf_binlog_writer_init_normal(SFBinlogWriterInfo *writer, const int buffer_size) { memset(writer, 0, sizeof(*writer)); + writer->order_by = SF_BINLOG_WRITER_TYPE_ORDER_BY_NONE; return sf_file_writer_init(&writer->fw, data_path, subdir_name, buffer_size); } @@ -388,6 +394,7 @@ int sf_binlog_writer_init_by_version(SFBinlogWriterInfo *writer, writer->version_ctx.ring.waiting_count = 0; writer->version_ctx.ring.max_waitings = 0; writer->version_ctx.change_count = 0; + writer->order_by = SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION; binlog_writer_set_next_version(writer, next_version); writer->flush.in_queue = false; @@ -397,8 +404,8 @@ int sf_binlog_writer_init_by_version(SFBinlogWriterInfo *writer, int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, const char *name, SFBinlogWriterInfo *writer, const short order_mode, - const short order_by, const int max_record_size, - const int writer_count, const bool use_fixed_buffer_size) + const int max_record_size, const int writer_count, + const bool use_fixed_buffer_size) { const int alloc_elements_once = 1024; int result; @@ -408,7 +415,6 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, snprintf(thread->name, sizeof(thread->name), "%s", name); thread->order_mode = order_mode; - thread->order_by = order_by; thread->use_fixed_buffer_size = use_fixed_buffer_size; writer->fw.cfg.max_record_size = max_record_size; writer->thread = thread; @@ -445,12 +451,12 @@ int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer, { SFBinlogWriterBuffer *buffer; - if (writer->thread->order_by == order_by) { + if (writer->order_by == order_by) { return 0; } - if (!(order_by == SF_BINLOG_THREAD_TYPE_ORDER_BY_NONE || - order_by == SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION)) + if (!(order_by == SF_BINLOG_WRITER_TYPE_ORDER_BY_NONE || + order_by == SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION)) { logError("file: "__FILE__", line: %d, " "invalid order by: %d!", __LINE__, order_by); diff --git a/src/sf_binlog_writer.h b/src/sf_binlog_writer.h index 4381191..5d92b1c 100644 --- a/src/sf_binlog_writer.h +++ b/src/sf_binlog_writer.h @@ -25,8 +25,8 @@ #define SF_BINLOG_THREAD_ORDER_MODE_FIXED 0 #define SF_BINLOG_THREAD_ORDER_MODE_VARY 1 -#define SF_BINLOG_THREAD_TYPE_ORDER_BY_NONE 0 -#define SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION 1 +#define SF_BINLOG_WRITER_TYPE_ORDER_BY_NONE 0 +#define SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION 1 #define SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE 0 //default type, must be 0 #define SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION 1 @@ -65,7 +65,6 @@ typedef struct binlog_writer_thread { volatile bool running; bool use_fixed_buffer_size; short order_mode; - short order_by; struct { struct sf_binlog_writer_info *head; struct sf_binlog_writer_info *tail; @@ -82,6 +81,7 @@ typedef struct sf_binlog_writer_info { } version_ctx; SFBinlogWriterThread *thread; + short order_by; struct { bool in_queue; struct sf_binlog_writer_info *next; @@ -108,14 +108,13 @@ int sf_binlog_writer_init_by_version(SFBinlogWriterInfo *writer, int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, const char *name, SFBinlogWriterInfo *writer, const short order_mode, - const short order_by, const int max_record_size, - const int writer_count, const bool use_fixed_buffer_size); + const int max_record_size, const int writer_count, + const bool use_fixed_buffer_size); -#define sf_binlog_writer_init_thread(thread, name, \ - writer, order_by, max_record_size) \ +#define sf_binlog_writer_init_thread(thread, name, writer, max_record_size) \ sf_binlog_writer_init_thread_ex(thread, name, writer, \ - SF_BINLOG_THREAD_ORDER_MODE_FIXED, \ - order_by, max_record_size, 1, true) + SF_BINLOG_THREAD_ORDER_MODE_FIXED, \ + max_record_size, 1, true) static inline int sf_binlog_writer_init(SFBinlogWriterContext *context, const char *data_path, const char *subdir_name, @@ -128,9 +127,8 @@ static inline int sf_binlog_writer_init(SFBinlogWriterContext *context, return result; } - return sf_binlog_writer_init_thread(&context->thread, subdir_name, - &context->writer, SF_BINLOG_THREAD_TYPE_ORDER_BY_NONE, - max_record_size); + return sf_binlog_writer_init_thread(&context->thread, + subdir_name, &context->writer, max_record_size); } void sf_binlog_writer_finish(SFBinlogWriterInfo *writer); From 613c31fcf3866dcdb996727cea29e68ac20121e1 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Mon, 18 Apr 2022 08:59:07 +0800 Subject: [PATCH 10/12] sf_binlog_writer_change_order_by check if versioned writer --- src/sf_binlog_writer.c | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/sf_binlog_writer.c b/src/sf_binlog_writer.c index a0d0bed..bd25949 100644 --- a/src/sf_binlog_writer.c +++ b/src/sf_binlog_writer.c @@ -471,6 +471,15 @@ int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer, return EINVAL; } + if (order_by == SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION) { + if (writer->version_ctx.ring.slots == NULL) { + logError("file: "__FILE__", line: %d, " + "the writer is NOT versioned writer, can't " + "set order by to %d!", __LINE__, order_by); + return EINVAL; + } + } + if ((buffer=sf_binlog_writer_alloc_versioned_buffer_ex(writer, order_by, order_by, SF_BINLOG_BUFFER_TYPE_CHANGE_ORDER_TYPE)) == NULL) { From 78e321f4ad797e55faa5b635bac3e94240800602 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Thu, 21 Apr 2022 11:29:43 +0800 Subject: [PATCH 11/12] election quorum support sf_election_quorum_auto --- src/sf_configs.c | 2 ++ src/sf_configs.h | 24 +++++++++++++++++++++++- src/sf_types.h | 5 +++-- 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/src/sf_configs.c b/src/sf_configs.c index f088515..4f02ee2 100644 --- a/src/sf_configs.c +++ b/src/sf_configs.c @@ -128,6 +128,8 @@ int sf_load_quorum_config_ex(SFElectionQuorum *quorum, "quorum", ini_ctx->context); if (str == NULL) { *quorum = def_quorum; + } else if (strncasecmp(str, "auto", 4) == 0) { + *quorum = sf_election_quorum_auto; } else if (strncasecmp(str, "any", 3) == 0) { *quorum = sf_election_quorum_any; } else if (strncasecmp(str, "majority", 8) == 0) { diff --git a/src/sf_configs.h b/src/sf_configs.h index b8887ac..a0a71ee 100644 --- a/src/sf_configs.h +++ b/src/sf_configs.h @@ -92,6 +92,8 @@ static inline const char *sf_get_quorum_caption( const SFElectionQuorum quorum) { switch (quorum) { + case sf_election_quorum_auto: + return "auto"; case sf_election_quorum_any: return "any"; case sf_election_quorum_majority: @@ -101,11 +103,31 @@ static inline const char *sf_get_quorum_caption( } } +static inline bool sf_election_quorum_check(const SFElectionQuorum quorum, + const int total_count, const int active_count) +{ + switch (quorum) { + case sf_election_quorum_any: + return active_count > 0; + case sf_election_quorum_auto: + if (total_count % 2 == 0) { //same as sf_election_quorum_any + return active_count > 0; + } + //continue + case sf_election_quorum_majority: + if (active_count == total_count) { + return true; + } else { + return active_count > total_count / 2; + } + } +} + #define sf_load_read_rule_config(rule, ini_ctx) \ sf_load_read_rule_config_ex(rule, ini_ctx, sf_data_read_rule_master_only) #define sf_load_quorum_config(quorum, ini_ctx) \ - sf_load_quorum_config_ex(quorum, ini_ctx, sf_election_quorum_majority) + sf_load_quorum_config_ex(quorum, ini_ctx, sf_election_quorum_auto) #define SF_NET_RETRY_FINISHED(retry_times, counter, result) \ !((SF_IS_RETRIABLE_ERROR(result) && ((retry_times > 0 && \ diff --git a/src/sf_types.h b/src/sf_types.h index ec3f3ba..2062e93 100644 --- a/src/sf_types.h +++ b/src/sf_types.h @@ -245,8 +245,9 @@ typedef struct sf_synchronize_context { } SFSynchronizeContext; typedef enum sf_election_quorum { - sf_election_quorum_any = 1, - sf_election_quorum_majority + sf_election_quorum_auto, + sf_election_quorum_any, + sf_election_quorum_majority, } SFElectionQuorum; #endif From a29ac30f6721cf9d4d0bfbe047dfdb4241ccf6f7 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Fri, 22 Apr 2022 14:57:26 +0800 Subject: [PATCH 12/12] upgrade version to V1.1.14 --- libserverframe.spec | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/libserverframe.spec b/libserverframe.spec index bd977be..0418b8c 100644 --- a/libserverframe.spec +++ b/libserverframe.spec @@ -2,7 +2,7 @@ %define CommitVersion %(echo $COMMIT_VERSION) Name: libserverframe -Version: 1.1.13 +Version: 1.1.14 Release: 1%{?dist} Summary: network framework library License: AGPL v3.0 @@ -12,9 +12,9 @@ Source: http://github.com/happyfish100/libserverframe/%{name}-%{version}.tar.gz BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root-%(%{__id_u} -n) -BuildRequires: libfastcommon-devel >= 1.0.56 +BuildRequires: libfastcommon-devel >= 1.0.57 Requires: %__cp %__mv %__chmod %__grep %__mkdir %__install %__id -Requires: libfastcommon >= 1.0.56 +Requires: libfastcommon >= 1.0.57 %description common framework library