sf_binlog_writer: change order_by gracefully

connection_manager
YuQing 2021-01-24 22:34:19 +08:00
parent c8899102be
commit 3659542eba
2 changed files with 71 additions and 36 deletions

View File

@ -427,12 +427,26 @@ static int deal_binlog_records(SFBinlogWriterThread *thread,
SFBinlogWriterBuffer *current;
wbuffer = wb_head;
if (thread->order_by == SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION) {
do {
current = wbuffer;
wbuffer = wbuffer->next;
do {
current = wbuffer;
wbuffer = wbuffer->next;
switch (current->type) {
case SF_BINLOG_BUFFER_TYPE_CHANGE_ORDER_TYPE:
thread->order_by = current->version.first;
fast_mblock_free_object(&current->writer->
thread->mblock, current);
break;
case SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION:
if (thread->order_by != SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION) {
logWarning("file: "__FILE__", line: %d, "
"subdir_name: %s, invalid order by: %d != %d, "
"maybe some mistake happen", __LINE__,
current->writer->cfg.subdir_name, thread->order_by,
SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION);
}
if (current->type == SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION) {
if (current->writer->version_ctx.ring.waiting_count != 0) {
logWarning("file: "__FILE__", line: %d, "
"subdir_name: %s, ring not empty, "
@ -445,35 +459,37 @@ static int deal_binlog_records(SFBinlogWriterThread *thread,
__LINE__, current->writer->cfg.subdir_name,
current->version.first);
binlog_writer_set_next_version(current->writer,
current->version.first);
if (current->writer->version_ctx.next !=
current->version.first)
{
binlog_writer_set_next_version(current->writer,
current->version.first);
current->writer->version_ctx.change_count++;
}
fast_mblock_free_object(&current->writer->
thread->mblock, current);
} else {
break;
default:
current->writer->total_count++;
add_to_flush_writer_queue(thread, current->writer);
/* NOTE: current maybe be released in the deal function */
if ((result=deal_record_by_version(current)) != 0) {
return result;
if (thread->order_by == SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION) {
/* NOTE: current maybe be released in the deal function */
if ((result=deal_record_by_version(current)) != 0) {
return result;
}
} else {
if ((result=deal_binlog_one_record(current)) != 0) {
return result;
}
fast_mblock_free_object(&current->writer->
thread->mblock, current);
}
}
} while (wbuffer != NULL);
} else {
do {
wbuffer->writer->total_count++;
if ((result=deal_binlog_one_record(wbuffer)) != 0) {
return result;
}
current = wbuffer;
wbuffer = wbuffer->next;
add_to_flush_writer_queue(thread, current->writer);
fast_mblock_free_object(&current->writer->
thread->mblock, current);
} while (wbuffer != NULL);
}
break;
}
} while (wbuffer != NULL);
return flush_writer_files(thread);
}
@ -618,6 +634,7 @@ int sf_binlog_writer_init_by_version(SFBinlogWriterInfo *writer,
writer->version_ctx.ring.size = ring_size;
writer->version_ctx.ring.waiting_count = 0;
writer->version_ctx.ring.max_waitings = 0;
writer->version_ctx.change_count = 0;
binlog_writer_set_next_version(writer, next_version);
return sf_binlog_writer_init_normal(writer, subdir_name, buffer_size);
@ -661,22 +678,38 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread,
SF_G_THREAD_STACK_SIZE);
}
int sf_binlog_writer_change_order_by(SFBinlogWriterThread *thread,
int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer,
const short order_by)
{
if (thread->order_by == order_by) {
SFBinlogWriterBuffer *buffer;
if (writer->thread->order_by == order_by) {
return 0;
}
if (thread->order_mode != SF_BINLOG_THREAD_ORDER_MODE_VARY) {
if (!(order_by == SF_BINLOG_THREAD_TYPE_ORDER_BY_NONE ||
order_by == SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION))
{
logError("file: "__FILE__", line: %d, "
"unexpected order mode: %d, can't set "
"order by to %d!", __LINE__,
thread->order_mode, order_by);
"invalid order by: %d!", __LINE__, order_by);
return EINVAL;
}
thread->order_by = order_by;
if (writer->thread->order_mode != SF_BINLOG_THREAD_ORDER_MODE_VARY) {
logError("file: "__FILE__", line: %d, "
"unexpected order mode: %d, can't set "
"order by to %d!", __LINE__,
writer->thread->order_mode, order_by);
return EINVAL;
}
if ((buffer=sf_binlog_writer_alloc_versioned_buffer_ex(writer, order_by,
order_by, SF_BINLOG_BUFFER_TYPE_CHANGE_ORDER_TYPE)) == NULL)
{
return ENOMEM;
}
fc_queue_push(&writer->thread->queue, buffer);
return 0;
}

View File

@ -29,6 +29,7 @@
#define SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE 0 //default type, must be 0
#define SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION 1
#define SF_BINLOG_BUFFER_TYPE_CHANGE_ORDER_TYPE 2
#define SF_BINLOG_SUBDIR_NAME_SIZE 128
#define SF_BINLOG_FILE_MAX_SIZE (1024 * 1024 * 1024) //for binlog rotating by size
@ -97,6 +98,7 @@ typedef struct sf_binlog_writer_info {
struct {
SFBinlogWriterBufferRing ring;
int64_t next;
int64_t change_count; //version change count
} version_ctx;
SFBinlogBuffer binlog_buffer;
SFBinlogWriterThread *thread;
@ -150,7 +152,7 @@ static inline int sf_binlog_writer_init(SFBinlogWriterContext *context,
SF_BINLOG_THREAD_TYPE_ORDER_BY_NONE, max_record_size);
}
int sf_binlog_writer_change_order_by(SFBinlogWriterThread *thread,
int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer,
const short order_by);
int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer,