From d7309da368164485b8ddc03580e3785e68c96b53 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Wed, 9 Dec 2020 11:46:12 +0800 Subject: [PATCH] sf_binlog_writer support version range --- src/sf_binlog_writer.c | 96 ++++++++++++++++++++++++++++-------------- src/sf_binlog_writer.h | 17 +++++--- src/sf_types.h | 5 +++ 3 files changed, 82 insertions(+), 36 deletions(-) diff --git a/src/sf_binlog_writer.c b/src/sf_binlog_writer.c index 2e4ce4d..62963ff 100644 --- a/src/sf_binlog_writer.c +++ b/src/sf_binlog_writer.c @@ -305,13 +305,13 @@ static void repush_to_queue(SFBinlogWriterThread *thread, SFBinlogWriterBuffer * if (thread->queue.head == NULL) { wb->next = NULL; thread->queue.head = thread->queue.tail = wb; - } else if (wb->version <= ((SFBinlogWriterBuffer *) - thread->queue.head)->version) + } else if (wb->version.first <= ((SFBinlogWriterBuffer *) + thread->queue.head)->version.first) { wb->next = thread->queue.head; thread->queue.head = wb; - } else if (wb->version > ((SFBinlogWriterBuffer *) - thread->queue.tail)->version) + } else if (wb->version.first > ((SFBinlogWriterBuffer *) + thread->queue.tail)->version.last) { wb->next = NULL; ((SFBinlogWriterBuffer *)thread->queue.tail)->next = wb; @@ -319,7 +319,7 @@ static void repush_to_queue(SFBinlogWriterThread *thread, SFBinlogWriterBuffer * } else { previous = thread->queue.head; current = ((SFBinlogWriterBuffer *)thread->queue.head)->next; - while (current != NULL && wb->version > current->version) { + while (current != NULL && wb->version.first > current->version.last) { previous = current; current = current->next; } @@ -330,75 +330,105 @@ static void repush_to_queue(SFBinlogWriterThread *thread, SFBinlogWriterBuffer * PTHREAD_MUTEX_UNLOCK(&thread->queue.lc_pair.lock); } -#define DEAL_CURRENT_VERSION_WBUFFER(writer, wb) \ +#define DEAL_CURRENT_VERSION_WBUFFER(writer, wb, version_count) \ do { \ if ((result=deal_binlog_one_record(wb)) != 0) { \ return result; \ } \ fast_mblock_free_object(&writer->thread->mblock, wb); \ - ++writer->version_ctx.next; \ + writer->version_ctx.next += version_count; \ } while (0) +#define GET_WBUFFER_VERSION_COUNT(wb) \ + (((wb)->version.last - (wb)->version.first) + 1) + static int deal_record_by_version(SFBinlogWriterBuffer *wb) { SFBinlogWriterInfo *writer; SFBinlogWriterBuffer **current; int64_t distance; + int version_count; int result; - int index; + int next_index; bool expand; writer = wb->writer; - distance = wb->version - writer->version_ctx.next; + distance = (int64_t)wb->version.first - (int64_t)writer->version_ctx.next; if (distance >= (writer->version_ctx.ring.size - 1)) { logWarning("file: "__FILE__", line: %d, subdir_name: %s, " - "current version: %"PRId64" is too large, " + "current version: %"PRId64" is too large which " "exceeds %"PRId64" + %d", __LINE__, - writer->cfg.subdir_name, wb->version, + writer->cfg.subdir_name, wb->version.first, writer->version_ctx.next, writer->version_ctx.ring.size - 1); repush_to_queue(writer->thread, wb); fc_sleep_ms(10); - return 0; + return EAGAIN; + } else if (distance < 0) { + logError("file: "__FILE__", line: %d, subdir_name: %s, " + "current version: %"PRId64" is too small which " + "less than %"PRId64, __LINE__, + writer->cfg.subdir_name, wb->version.first, + writer->version_ctx.next); + return EINVAL; } /* logInfo("%s wb version===== %"PRId64", next: %"PRId64", writer: %p", - writer->cfg.subdir_name, wb->version, + writer->cfg.subdir_name, wb->version.first, writer->version_ctx.next, writer); */ - current = writer->version_ctx.ring.entries + wb->version % + current = writer->version_ctx.ring.entries + wb->version.first % writer->version_ctx.ring.size; if (current == writer->version_ctx.ring.start) { - DEAL_CURRENT_VERSION_WBUFFER(writer, wb); + version_count = GET_WBUFFER_VERSION_COUNT(wb); + DEAL_CURRENT_VERSION_WBUFFER(writer, wb, version_count); - index = writer->version_ctx.ring.start - writer->version_ctx.ring.entries; + next_index = (writer->version_ctx.ring.start - + writer->version_ctx.ring.entries) + version_count; if (writer->version_ctx.ring.start == writer->version_ctx.ring.end) { writer->version_ctx.ring.start = writer->version_ctx.ring.end = - writer->version_ctx.ring.entries + - (++index) % writer->version_ctx.ring.size; + writer->version_ctx.ring.entries + next_index % + writer->version_ctx.ring.size; return 0; } writer->version_ctx.ring.start = writer->version_ctx.ring.entries + - (++index) % writer->version_ctx.ring.size; + next_index % writer->version_ctx.ring.size; while (writer->version_ctx.ring.start != writer->version_ctx.ring.end && *(writer->version_ctx.ring.start) != NULL) { - DEAL_CURRENT_VERSION_WBUFFER(writer, *(writer->version_ctx.ring.start)); - *(writer->version_ctx.ring.start) = NULL; + current = writer->version_ctx.ring.start; + version_count = GET_WBUFFER_VERSION_COUNT(*current); + DEAL_CURRENT_VERSION_WBUFFER(writer, *current, version_count); + *current = NULL; + next_index += version_count; writer->version_ctx.ring.start = writer->version_ctx.ring.entries + - (++index) % writer->version_ctx.ring.size; + next_index % writer->version_ctx.ring.size; writer->version_ctx.ring.count--; } + return 0; } + version_count = GET_WBUFFER_VERSION_COUNT(wb); + distance = (int64_t)wb->version.last - (int64_t)writer->version_ctx.next; + if (distance >= (writer->version_ctx.ring.size - 1)) { + logWarning("file: "__FILE__", line: %d, subdir_name: %s, " + "current version: %"PRId64" is too large which " + "exceeds %"PRId64" + %d", __LINE__, + writer->cfg.subdir_name, wb->version.last, + writer->version_ctx.next, + writer->version_ctx.ring.size - 1); + repush_to_queue(writer->thread, wb); + fc_sleep_ms(10); + return EAGAIN; + } + *current = wb; writer->version_ctx.ring.count++; - if (writer->version_ctx.ring.count > writer->version_ctx.ring.max_count) { writer->version_ctx.ring.max_count = writer->version_ctx.ring.count; logDebug("%s max ring.count ==== %d", writer->cfg.subdir_name, @@ -408,8 +438,11 @@ static int deal_record_by_version(SFBinlogWriterBuffer *wb) if (writer->version_ctx.ring.start == writer->version_ctx.ring.end) { //empty expand = true; } else if (writer->version_ctx.ring.end > writer->version_ctx.ring.start) { + SFBinlogWriterBuffer **last; + last = writer->version_ctx.ring.entries + wb->version.last % + writer->version_ctx.ring.size; expand = !(current > writer->version_ctx.ring.start && - current < writer->version_ctx.ring.end); + last < writer->version_ctx.ring.end); } else { expand = (current >= writer->version_ctx.ring.end && current < writer->version_ctx.ring.start); @@ -417,10 +450,10 @@ static int deal_record_by_version(SFBinlogWriterBuffer *wb) if (expand) { writer->version_ctx.ring.end = writer->version_ctx.ring.entries + - (wb->version + 1) % writer->version_ctx.ring.size; + (wb->version.last + 1) % writer->version_ctx.ring.size; } - return 0; + return EAGAIN; } static inline void add_to_flush_writer_queue(SFBinlogWriterThread *thread, @@ -485,15 +518,16 @@ static int deal_binlog_records(SFBinlogWriterThread *thread, logDebug("file: "__FILE__", line: %d, " "subdir_name: %s, set next version to %"PRId64, __LINE__, current->writer->cfg.subdir_name, - current->version); + current->version.first); binlog_writer_set_next_version(current->writer, - current->version); + current->version.first); fast_mblock_free_object(¤t->writer-> thread->mblock, current); } else { - add_to_flush_writer_queue(thread, current->writer); - if ((result=deal_record_by_version(current)) != 0) { + if ((result=deal_record_by_version(current)) == 0) { + add_to_flush_writer_queue(thread, current->writer); + } else if (!(result == EAGAIN || result == EINVAL)) { return result; } } @@ -734,7 +768,7 @@ int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer, } if ((buffer=sf_binlog_writer_alloc_versioned_buffer_ex(writer, next_version, - SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION)) == NULL) + next_version, SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION)) == NULL) { return ENOMEM; } diff --git a/src/sf_binlog_writer.h b/src/sf_binlog_writer.h index faf1671..62f39d2 100644 --- a/src/sf_binlog_writer.h +++ b/src/sf_binlog_writer.h @@ -41,7 +41,7 @@ struct sf_binlog_writer_info; typedef struct sf_binlog_writer_buffer { - int64_t version; + SFVersionRange version; BufferInfo bf; int type; //for versioned writer struct sf_binlog_writer_info *writer; @@ -162,12 +162,18 @@ static inline SFBinlogWriterBuffer *sf_binlog_writer_alloc_buffer( return (SFBinlogWriterBuffer *)fast_mblock_alloc_object(&thread->mblock); } -#define sf_binlog_writer_alloc_versioned_buffer(writer, version) \ +#define sf_binlog_writer_alloc_one_version_buffer(writer, version) \ sf_binlog_writer_alloc_versioned_buffer_ex(writer, version, \ - SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE) + version, SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE) + +#define sf_binlog_writer_alloc_multi_version_buffer(writer, \ + first_version, last_version) \ + sf_binlog_writer_alloc_versioned_buffer_ex(writer, first_version, \ + last_version, SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE) static inline SFBinlogWriterBuffer *sf_binlog_writer_alloc_versioned_buffer_ex( - SFBinlogWriterInfo *writer, const int64_t version, const int type) + SFBinlogWriterInfo *writer, const int64_t first_version, + const int64_t last_version, const int type) { SFBinlogWriterBuffer *buffer; buffer = (SFBinlogWriterBuffer *)fast_mblock_alloc_object( @@ -175,7 +181,8 @@ static inline SFBinlogWriterBuffer *sf_binlog_writer_alloc_versioned_buffer_ex( if (buffer != NULL) { buffer->type = type; buffer->writer = writer; - buffer->version = version; + buffer->version.first = first_version; + buffer->version.last = last_version; } return buffer; } diff --git a/src/sf_types.h b/src/sf_types.h index e8d9296..a24603d 100644 --- a/src/sf_types.h +++ b/src/sf_types.h @@ -102,4 +102,9 @@ typedef struct sf_space_stat { int64_t used; } SFSpaceStat; +typedef struct sf_version_range { + int64_t first; //including + int64_t last; //including +} SFVersionRange; + #endif