diff --git a/src/sf_binlog_writer.c b/src/sf_binlog_writer.c index 4f3c70a..1cfb777 100644 --- a/src/sf_binlog_writer.c +++ b/src/sf_binlog_writer.c @@ -427,12 +427,26 @@ static int deal_binlog_records(SFBinlogWriterThread *thread, SFBinlogWriterBuffer *current; wbuffer = wb_head; - if (thread->order_by == SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION) { - do { - current = wbuffer; - wbuffer = wbuffer->next; + do { + current = wbuffer; + wbuffer = wbuffer->next; + + switch (current->type) { + case SF_BINLOG_BUFFER_TYPE_CHANGE_ORDER_TYPE: + thread->order_by = current->version.first; + fast_mblock_free_object(¤t->writer-> + thread->mblock, current); + break; + + case SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION: + if (thread->order_by != SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION) { + logWarning("file: "__FILE__", line: %d, " + "subdir_name: %s, invalid order by: %d != %d, " + "maybe some mistake happen", __LINE__, + current->writer->cfg.subdir_name, thread->order_by, + SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION); + } - if (current->type == SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION) { if (current->writer->version_ctx.ring.waiting_count != 0) { logWarning("file: "__FILE__", line: %d, " "subdir_name: %s, ring not empty, " @@ -445,35 +459,37 @@ static int deal_binlog_records(SFBinlogWriterThread *thread, __LINE__, current->writer->cfg.subdir_name, current->version.first); - binlog_writer_set_next_version(current->writer, - current->version.first); + if (current->writer->version_ctx.next != + current->version.first) + { + binlog_writer_set_next_version(current->writer, + current->version.first); + current->writer->version_ctx.change_count++; + } fast_mblock_free_object(¤t->writer-> thread->mblock, current); - } else { + break; + + default: current->writer->total_count++; add_to_flush_writer_queue(thread, current->writer); - /* NOTE: current maybe be released in the deal function */ - if ((result=deal_record_by_version(current)) != 0) { - return result; + if (thread->order_by == SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION) { + /* NOTE: current maybe be released in the deal function */ + if ((result=deal_record_by_version(current)) != 0) { + return result; + } + } else { + if ((result=deal_binlog_one_record(current)) != 0) { + return result; + } + + fast_mblock_free_object(¤t->writer-> + thread->mblock, current); } - } - } while (wbuffer != NULL); - } else { - do { - wbuffer->writer->total_count++; - if ((result=deal_binlog_one_record(wbuffer)) != 0) { - return result; - } - - current = wbuffer; - wbuffer = wbuffer->next; - - add_to_flush_writer_queue(thread, current->writer); - fast_mblock_free_object(¤t->writer-> - thread->mblock, current); - } while (wbuffer != NULL); - } + break; + } + } while (wbuffer != NULL); return flush_writer_files(thread); } @@ -618,6 +634,7 @@ int sf_binlog_writer_init_by_version(SFBinlogWriterInfo *writer, writer->version_ctx.ring.size = ring_size; writer->version_ctx.ring.waiting_count = 0; writer->version_ctx.ring.max_waitings = 0; + writer->version_ctx.change_count = 0; binlog_writer_set_next_version(writer, next_version); return sf_binlog_writer_init_normal(writer, subdir_name, buffer_size); @@ -661,22 +678,38 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, SF_G_THREAD_STACK_SIZE); } -int sf_binlog_writer_change_order_by(SFBinlogWriterThread *thread, +int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer, const short order_by) { - if (thread->order_by == order_by) { + SFBinlogWriterBuffer *buffer; + + if (writer->thread->order_by == order_by) { return 0; } - if (thread->order_mode != SF_BINLOG_THREAD_ORDER_MODE_VARY) { + if (!(order_by == SF_BINLOG_THREAD_TYPE_ORDER_BY_NONE || + order_by == SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION)) + { logError("file: "__FILE__", line: %d, " - "unexpected order mode: %d, can't set " - "order by to %d!", __LINE__, - thread->order_mode, order_by); + "invalid order by: %d!", __LINE__, order_by); return EINVAL; } - thread->order_by = order_by; + if (writer->thread->order_mode != SF_BINLOG_THREAD_ORDER_MODE_VARY) { + logError("file: "__FILE__", line: %d, " + "unexpected order mode: %d, can't set " + "order by to %d!", __LINE__, + writer->thread->order_mode, order_by); + return EINVAL; + } + + if ((buffer=sf_binlog_writer_alloc_versioned_buffer_ex(writer, order_by, + order_by, SF_BINLOG_BUFFER_TYPE_CHANGE_ORDER_TYPE)) == NULL) + { + return ENOMEM; + } + + fc_queue_push(&writer->thread->queue, buffer); return 0; } diff --git a/src/sf_binlog_writer.h b/src/sf_binlog_writer.h index a38d635..f5a8092 100644 --- a/src/sf_binlog_writer.h +++ b/src/sf_binlog_writer.h @@ -29,6 +29,7 @@ #define SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE 0 //default type, must be 0 #define SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION 1 +#define SF_BINLOG_BUFFER_TYPE_CHANGE_ORDER_TYPE 2 #define SF_BINLOG_SUBDIR_NAME_SIZE 128 #define SF_BINLOG_FILE_MAX_SIZE (1024 * 1024 * 1024) //for binlog rotating by size @@ -97,6 +98,7 @@ typedef struct sf_binlog_writer_info { struct { SFBinlogWriterBufferRing ring; int64_t next; + int64_t change_count; //version change count } version_ctx; SFBinlogBuffer binlog_buffer; SFBinlogWriterThread *thread; @@ -150,7 +152,7 @@ static inline int sf_binlog_writer_init(SFBinlogWriterContext *context, SF_BINLOG_THREAD_TYPE_ORDER_BY_NONE, max_record_size); } -int sf_binlog_writer_change_order_by(SFBinlogWriterThread *thread, +int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer, const short order_by); int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer,