order_by feature belongs to writer instead of thread

recovery_and_balance
YuQing 2022-04-17 18:18:18 +08:00
parent a57709de93
commit 952647cbc9
2 changed files with 27 additions and 23 deletions

View File

@ -193,7 +193,7 @@ static int deal_binlog_records(SFBinlogWriterThread *thread,
switch (current->type) { switch (current->type) {
case SF_BINLOG_BUFFER_TYPE_CHANGE_ORDER_TYPE: case SF_BINLOG_BUFFER_TYPE_CHANGE_ORDER_TYPE:
thread->order_by = current->version.first; current->writer->order_by = current->version.first;
fast_mblock_free_object(&current->writer-> fast_mblock_free_object(&current->writer->
thread->mblock, current); thread->mblock, current);
break; break;
@ -201,12 +201,15 @@ static int deal_binlog_records(SFBinlogWriterThread *thread,
flush_writer_files(thread); flush_writer_files(thread);
return ERRNO_THREAD_EXIT; return ERRNO_THREAD_EXIT;
case SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION: case SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION:
if (thread->order_by != SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION) { if (current->writer->order_by !=
SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION)
{
logWarning("file: "__FILE__", line: %d, " logWarning("file: "__FILE__", line: %d, "
"subdir_name: %s, invalid order by: %d != %d, " "subdir_name: %s, invalid order by: %d != %d, "
"maybe some mistake happen", __LINE__, "maybe some mistake happen", __LINE__,
current->writer->fw.cfg.subdir_name, thread->order_by, current->writer->fw.cfg.subdir_name,
SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION); current->writer->order_by,
SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION);
} }
if (current->writer->version_ctx.ring.waiting_count != 0) { if (current->writer->version_ctx.ring.waiting_count != 0) {
@ -236,7 +239,9 @@ static int deal_binlog_records(SFBinlogWriterThread *thread,
current->writer->fw.total_count++; current->writer->fw.total_count++;
add_to_flush_writer_queue(thread, current->writer); add_to_flush_writer_queue(thread, current->writer);
if (thread->order_by == SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION) { if (current->writer->order_by ==
SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION)
{
/* NOTE: current maybe be released in the deal function */ /* NOTE: current maybe be released in the deal function */
if ((result=deal_record_by_version(current)) != 0) { if ((result=deal_record_by_version(current)) != 0) {
return result; return result;
@ -367,6 +372,7 @@ int sf_binlog_writer_init_normal(SFBinlogWriterInfo *writer,
const int buffer_size) const int buffer_size)
{ {
memset(writer, 0, sizeof(*writer)); memset(writer, 0, sizeof(*writer));
writer->order_by = SF_BINLOG_WRITER_TYPE_ORDER_BY_NONE;
return sf_file_writer_init(&writer->fw, data_path, return sf_file_writer_init(&writer->fw, data_path,
subdir_name, buffer_size); subdir_name, buffer_size);
} }
@ -388,6 +394,7 @@ int sf_binlog_writer_init_by_version(SFBinlogWriterInfo *writer,
writer->version_ctx.ring.waiting_count = 0; writer->version_ctx.ring.waiting_count = 0;
writer->version_ctx.ring.max_waitings = 0; writer->version_ctx.ring.max_waitings = 0;
writer->version_ctx.change_count = 0; writer->version_ctx.change_count = 0;
writer->order_by = SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION;
binlog_writer_set_next_version(writer, next_version); binlog_writer_set_next_version(writer, next_version);
writer->flush.in_queue = false; writer->flush.in_queue = false;
@ -397,8 +404,8 @@ int sf_binlog_writer_init_by_version(SFBinlogWriterInfo *writer,
int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread,
const char *name, SFBinlogWriterInfo *writer, const short order_mode, const char *name, SFBinlogWriterInfo *writer, const short order_mode,
const short order_by, const int max_record_size, const int max_record_size, const int writer_count,
const int writer_count, const bool use_fixed_buffer_size) const bool use_fixed_buffer_size)
{ {
const int alloc_elements_once = 1024; const int alloc_elements_once = 1024;
int result; int result;
@ -408,7 +415,6 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread,
snprintf(thread->name, sizeof(thread->name), "%s", name); snprintf(thread->name, sizeof(thread->name), "%s", name);
thread->order_mode = order_mode; thread->order_mode = order_mode;
thread->order_by = order_by;
thread->use_fixed_buffer_size = use_fixed_buffer_size; thread->use_fixed_buffer_size = use_fixed_buffer_size;
writer->fw.cfg.max_record_size = max_record_size; writer->fw.cfg.max_record_size = max_record_size;
writer->thread = thread; writer->thread = thread;
@ -445,12 +451,12 @@ int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer,
{ {
SFBinlogWriterBuffer *buffer; SFBinlogWriterBuffer *buffer;
if (writer->thread->order_by == order_by) { if (writer->order_by == order_by) {
return 0; return 0;
} }
if (!(order_by == SF_BINLOG_THREAD_TYPE_ORDER_BY_NONE || if (!(order_by == SF_BINLOG_WRITER_TYPE_ORDER_BY_NONE ||
order_by == SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION)) order_by == SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION))
{ {
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"invalid order by: %d!", __LINE__, order_by); "invalid order by: %d!", __LINE__, order_by);

View File

@ -25,8 +25,8 @@
#define SF_BINLOG_THREAD_ORDER_MODE_FIXED 0 #define SF_BINLOG_THREAD_ORDER_MODE_FIXED 0
#define SF_BINLOG_THREAD_ORDER_MODE_VARY 1 #define SF_BINLOG_THREAD_ORDER_MODE_VARY 1
#define SF_BINLOG_THREAD_TYPE_ORDER_BY_NONE 0 #define SF_BINLOG_WRITER_TYPE_ORDER_BY_NONE 0
#define SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION 1 #define SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION 1
#define SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE 0 //default type, must be 0 #define SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE 0 //default type, must be 0
#define SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION 1 #define SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION 1
@ -65,7 +65,6 @@ typedef struct binlog_writer_thread {
volatile bool running; volatile bool running;
bool use_fixed_buffer_size; bool use_fixed_buffer_size;
short order_mode; short order_mode;
short order_by;
struct { struct {
struct sf_binlog_writer_info *head; struct sf_binlog_writer_info *head;
struct sf_binlog_writer_info *tail; struct sf_binlog_writer_info *tail;
@ -82,6 +81,7 @@ typedef struct sf_binlog_writer_info {
} version_ctx; } version_ctx;
SFBinlogWriterThread *thread; SFBinlogWriterThread *thread;
short order_by;
struct { struct {
bool in_queue; bool in_queue;
struct sf_binlog_writer_info *next; struct sf_binlog_writer_info *next;
@ -108,14 +108,13 @@ int sf_binlog_writer_init_by_version(SFBinlogWriterInfo *writer,
int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread,
const char *name, SFBinlogWriterInfo *writer, const short order_mode, const char *name, SFBinlogWriterInfo *writer, const short order_mode,
const short order_by, const int max_record_size, const int max_record_size, const int writer_count,
const int writer_count, const bool use_fixed_buffer_size); const bool use_fixed_buffer_size);
#define sf_binlog_writer_init_thread(thread, name, \ #define sf_binlog_writer_init_thread(thread, name, writer, max_record_size) \
writer, order_by, max_record_size) \
sf_binlog_writer_init_thread_ex(thread, name, writer, \ sf_binlog_writer_init_thread_ex(thread, name, writer, \
SF_BINLOG_THREAD_ORDER_MODE_FIXED, \ SF_BINLOG_THREAD_ORDER_MODE_FIXED, \
order_by, max_record_size, 1, true) max_record_size, 1, true)
static inline int sf_binlog_writer_init(SFBinlogWriterContext *context, static inline int sf_binlog_writer_init(SFBinlogWriterContext *context,
const char *data_path, const char *subdir_name, const char *data_path, const char *subdir_name,
@ -128,9 +127,8 @@ static inline int sf_binlog_writer_init(SFBinlogWriterContext *context,
return result; return result;
} }
return sf_binlog_writer_init_thread(&context->thread, subdir_name, return sf_binlog_writer_init_thread(&context->thread,
&context->writer, SF_BINLOG_THREAD_TYPE_ORDER_BY_NONE, subdir_name, &context->writer, max_record_size);
max_record_size);
} }
void sf_binlog_writer_finish(SFBinlogWriterInfo *writer); void sf_binlog_writer_finish(SFBinlogWriterInfo *writer);