diff --git a/src/sf_binlog_writer.c b/src/sf_binlog_writer.c index 2977752..57f8cf7 100644 --- a/src/sf_binlog_writer.c +++ b/src/sf_binlog_writer.c @@ -469,12 +469,12 @@ static int deal_binlog_records(SFBinlogWriterThread *thread, thread->flush_writers.count = 0; wbuffer = wb_head; - if (thread->order_by == SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION) { + if (thread->order_by == SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION) { do { current = wbuffer; wbuffer = wbuffer->next; - if (current->type == SF_BINLOG_BUFFER_TYPESET_NEXT_VERSION) { + if (current->type == SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION) { if (current->writer->version_ctx.ring.start != current->writer->version_ctx.ring.end) { @@ -662,9 +662,9 @@ int sf_binlog_writer_init_by_version(SFBinlogWriterInfo *writer, } int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, - SFBinlogWriterInfo *writer, const int order_by, - const int max_record_size, const int writer_count, - const bool use_fixed_buffer_size) + SFBinlogWriterInfo *writer, const short order_mode, + const short order_by, const int max_record_size, + const int writer_count, const bool use_fixed_buffer_size) { const int alloc_elements_once = 1024; int element_size; @@ -672,6 +672,7 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, int result; int bytes; + thread->order_mode = order_mode; thread->order_by = order_by; thread->use_fixed_buffer_size = use_fixed_buffer_size; writer->cfg.max_record_size = max_record_size; @@ -706,12 +707,40 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, SF_G_THREAD_STACK_SIZE); } +int sf_binlog_writer_change_order_by(SFBinlogWriterThread *thread, + const short order_by) +{ + if (thread->order_by == order_by) { + return 0; + } + + if (thread->order_mode != SF_BINLOG_THREAD_ORDER_MODE_VARY) { + logError("file: "__FILE__", line: %d, " + "unexpected order mode: %d, can't set " + "order by to %d!", __LINE__, + thread->order_mode, order_by); + return EINVAL; + } + + thread->order_by = order_by; + return 0; +} + int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer, const int64_t next_version) { SFBinlogWriterBuffer *buffer; + + if (writer->thread->order_by != SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION) { + logError("file: "__FILE__", line: %d, " + "unexpected order by type: %d, can't set " + "next version to %"PRId64"!", __LINE__, + writer->thread->order_by, next_version); + return EINVAL; + } + if ((buffer=sf_binlog_writer_alloc_versioned_buffer_ex(writer, next_version, - SF_BINLOG_BUFFER_TYPESET_NEXT_VERSION)) == NULL) + SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION)) == NULL) { return ENOMEM; } diff --git a/src/sf_binlog_writer.h b/src/sf_binlog_writer.h index 20f297e..5bb1189 100644 --- a/src/sf_binlog_writer.h +++ b/src/sf_binlog_writer.h @@ -6,11 +6,14 @@ #include "fastcommon/fc_queue.h" #include "sf_types.h" -#define SF_BINLOG_WRITER_TYPE_ORDER_BY_NONE 0 -#define SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION 1 +#define SF_BINLOG_THREAD_ORDER_MODE_FIXED 0 +#define SF_BINLOG_THREAD_ORDER_MODE_VARY 1 -#define SF_BINLOG_BUFFER_TYPEWRITE_TO_FILE 0 //default type, must be 0 -#define SF_BINLOG_BUFFER_TYPESET_NEXT_VERSION 1 +#define SF_BINLOG_THREAD_TYPE_ORDER_BY_NONE 0 +#define SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION 1 + +#define SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE 0 //default type, must be 0 +#define SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION 1 #define SF_BINLOG_SUBDIR_NAME_SIZE 128 #define SF_BINLOG_FILE_MAX_SIZE (1024 * 1024 * 1024) //for binlog rotating by size @@ -50,7 +53,8 @@ typedef struct binlog_writer_thread { struct fc_queue queue; volatile bool running; bool use_fixed_buffer_size; - int order_by; + short order_mode; + short order_by; SFBinlogWriterPtrArray flush_writers; } SFBinlogWriterThread; @@ -98,13 +102,14 @@ int sf_binlog_writer_init_by_version(SFBinlogWriterInfo *writer, const int buffer_size, const int ring_size); int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, - SFBinlogWriterInfo *writer, const int order_by, - const int max_record_size, const int writer_count, - const bool use_fixed_buffer_size); + SFBinlogWriterInfo *writer, const short order_mode, + const short order_by, const int max_record_size, + const int writer_count, const bool use_fixed_buffer_size); #define sf_binlog_writer_init_thread(thread, \ writer, order_by, max_record_size) \ sf_binlog_writer_init_thread_ex(thread, writer, \ + SF_BINLOG_THREAD_ORDER_MODE_FIXED, \ order_by, max_record_size, 1, true) static inline int sf_binlog_writer_init(SFBinlogWriterContext *context, @@ -119,9 +124,12 @@ static inline int sf_binlog_writer_init(SFBinlogWriterContext *context, } return sf_binlog_writer_init_thread(&context->thread, &context->writer, - SF_BINLOG_WRITER_TYPE_ORDER_BY_NONE, max_record_size); + SF_BINLOG_THREAD_TYPE_ORDER_BY_NONE, max_record_size); } +int sf_binlog_writer_change_order_by(SFBinlogWriterThread *thread, + const short order_by); + int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer, const int64_t next_version); @@ -140,7 +148,7 @@ static inline SFBinlogWriterBuffer *sf_binlog_writer_alloc_buffer( #define sf_binlog_writer_alloc_versioned_buffer(writer, version) \ sf_binlog_writer_alloc_versioned_buffer_ex(writer, version, \ - SF_BINLOG_BUFFER_TYPEWRITE_TO_FILE) + SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE) static inline SFBinlogWriterBuffer *sf_binlog_writer_alloc_versioned_buffer_ex( SFBinlogWriterInfo *writer, const int64_t version, const int type) @@ -181,7 +189,7 @@ int sf_binlog_writer_set_binlog_index(SFBinlogWriterInfo *writer, static inline void sf_push_to_binlog_write_queue(SFBinlogWriterInfo *writer, SFBinlogWriterBuffer *buffer) { - buffer->type = SF_BINLOG_BUFFER_TYPEWRITE_TO_FILE; + buffer->type = SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE; fc_queue_push(&writer->thread->queue, buffer); }