sf_binlog_writer support version range

connection_manager
YuQing 2020-12-09 11:46:12 +08:00
parent a6b9d8d08d
commit d7309da368
3 changed files with 82 additions and 36 deletions

View File

@ -305,13 +305,13 @@ static void repush_to_queue(SFBinlogWriterThread *thread, SFBinlogWriterBuffer *
if (thread->queue.head == NULL) { if (thread->queue.head == NULL) {
wb->next = NULL; wb->next = NULL;
thread->queue.head = thread->queue.tail = wb; thread->queue.head = thread->queue.tail = wb;
} else if (wb->version <= ((SFBinlogWriterBuffer *) } else if (wb->version.first <= ((SFBinlogWriterBuffer *)
thread->queue.head)->version) thread->queue.head)->version.first)
{ {
wb->next = thread->queue.head; wb->next = thread->queue.head;
thread->queue.head = wb; thread->queue.head = wb;
} else if (wb->version > ((SFBinlogWriterBuffer *) } else if (wb->version.first > ((SFBinlogWriterBuffer *)
thread->queue.tail)->version) thread->queue.tail)->version.last)
{ {
wb->next = NULL; wb->next = NULL;
((SFBinlogWriterBuffer *)thread->queue.tail)->next = wb; ((SFBinlogWriterBuffer *)thread->queue.tail)->next = wb;
@ -319,7 +319,7 @@ static void repush_to_queue(SFBinlogWriterThread *thread, SFBinlogWriterBuffer *
} else { } else {
previous = thread->queue.head; previous = thread->queue.head;
current = ((SFBinlogWriterBuffer *)thread->queue.head)->next; current = ((SFBinlogWriterBuffer *)thread->queue.head)->next;
while (current != NULL && wb->version > current->version) { while (current != NULL && wb->version.first > current->version.last) {
previous = current; previous = current;
current = current->next; current = current->next;
} }
@ -330,75 +330,105 @@ static void repush_to_queue(SFBinlogWriterThread *thread, SFBinlogWriterBuffer *
PTHREAD_MUTEX_UNLOCK(&thread->queue.lc_pair.lock); PTHREAD_MUTEX_UNLOCK(&thread->queue.lc_pair.lock);
} }
#define DEAL_CURRENT_VERSION_WBUFFER(writer, wb) \ #define DEAL_CURRENT_VERSION_WBUFFER(writer, wb, version_count) \
do { \ do { \
if ((result=deal_binlog_one_record(wb)) != 0) { \ if ((result=deal_binlog_one_record(wb)) != 0) { \
return result; \ return result; \
} \ } \
fast_mblock_free_object(&writer->thread->mblock, wb); \ fast_mblock_free_object(&writer->thread->mblock, wb); \
++writer->version_ctx.next; \ writer->version_ctx.next += version_count; \
} while (0) } while (0)
#define GET_WBUFFER_VERSION_COUNT(wb) \
(((wb)->version.last - (wb)->version.first) + 1)
static int deal_record_by_version(SFBinlogWriterBuffer *wb) static int deal_record_by_version(SFBinlogWriterBuffer *wb)
{ {
SFBinlogWriterInfo *writer; SFBinlogWriterInfo *writer;
SFBinlogWriterBuffer **current; SFBinlogWriterBuffer **current;
int64_t distance; int64_t distance;
int version_count;
int result; int result;
int index; int next_index;
bool expand; bool expand;
writer = wb->writer; writer = wb->writer;
distance = wb->version - writer->version_ctx.next; distance = (int64_t)wb->version.first - (int64_t)writer->version_ctx.next;
if (distance >= (writer->version_ctx.ring.size - 1)) { if (distance >= (writer->version_ctx.ring.size - 1)) {
logWarning("file: "__FILE__", line: %d, subdir_name: %s, " logWarning("file: "__FILE__", line: %d, subdir_name: %s, "
"current version: %"PRId64" is too large, " "current version: %"PRId64" is too large which "
"exceeds %"PRId64" + %d", __LINE__, "exceeds %"PRId64" + %d", __LINE__,
writer->cfg.subdir_name, wb->version, writer->cfg.subdir_name, wb->version.first,
writer->version_ctx.next, writer->version_ctx.next,
writer->version_ctx.ring.size - 1); writer->version_ctx.ring.size - 1);
repush_to_queue(writer->thread, wb); repush_to_queue(writer->thread, wb);
fc_sleep_ms(10); fc_sleep_ms(10);
return 0; return EAGAIN;
} else if (distance < 0) {
logError("file: "__FILE__", line: %d, subdir_name: %s, "
"current version: %"PRId64" is too small which "
"less than %"PRId64, __LINE__,
writer->cfg.subdir_name, wb->version.first,
writer->version_ctx.next);
return EINVAL;
} }
/* /*
logInfo("%s wb version===== %"PRId64", next: %"PRId64", writer: %p", logInfo("%s wb version===== %"PRId64", next: %"PRId64", writer: %p",
writer->cfg.subdir_name, wb->version, writer->cfg.subdir_name, wb->version.first,
writer->version_ctx.next, writer); writer->version_ctx.next, writer);
*/ */
current = writer->version_ctx.ring.entries + wb->version % current = writer->version_ctx.ring.entries + wb->version.first %
writer->version_ctx.ring.size; writer->version_ctx.ring.size;
if (current == writer->version_ctx.ring.start) { if (current == writer->version_ctx.ring.start) {
DEAL_CURRENT_VERSION_WBUFFER(writer, wb); version_count = GET_WBUFFER_VERSION_COUNT(wb);
DEAL_CURRENT_VERSION_WBUFFER(writer, wb, version_count);
index = writer->version_ctx.ring.start - writer->version_ctx.ring.entries; next_index = (writer->version_ctx.ring.start -
writer->version_ctx.ring.entries) + version_count;
if (writer->version_ctx.ring.start == writer->version_ctx.ring.end) { if (writer->version_ctx.ring.start == writer->version_ctx.ring.end) {
writer->version_ctx.ring.start = writer->version_ctx.ring.end = writer->version_ctx.ring.start = writer->version_ctx.ring.end =
writer->version_ctx.ring.entries + writer->version_ctx.ring.entries + next_index %
(++index) % writer->version_ctx.ring.size; writer->version_ctx.ring.size;
return 0; return 0;
} }
writer->version_ctx.ring.start = writer->version_ctx.ring.entries + writer->version_ctx.ring.start = writer->version_ctx.ring.entries +
(++index) % writer->version_ctx.ring.size; next_index % writer->version_ctx.ring.size;
while (writer->version_ctx.ring.start != writer->version_ctx.ring.end && while (writer->version_ctx.ring.start != writer->version_ctx.ring.end &&
*(writer->version_ctx.ring.start) != NULL) *(writer->version_ctx.ring.start) != NULL)
{ {
DEAL_CURRENT_VERSION_WBUFFER(writer, *(writer->version_ctx.ring.start)); current = writer->version_ctx.ring.start;
*(writer->version_ctx.ring.start) = NULL; version_count = GET_WBUFFER_VERSION_COUNT(*current);
DEAL_CURRENT_VERSION_WBUFFER(writer, *current, version_count);
*current = NULL;
next_index += version_count;
writer->version_ctx.ring.start = writer->version_ctx.ring.entries + writer->version_ctx.ring.start = writer->version_ctx.ring.entries +
(++index) % writer->version_ctx.ring.size; next_index % writer->version_ctx.ring.size;
writer->version_ctx.ring.count--; writer->version_ctx.ring.count--;
} }
return 0; return 0;
} }
version_count = GET_WBUFFER_VERSION_COUNT(wb);
distance = (int64_t)wb->version.last - (int64_t)writer->version_ctx.next;
if (distance >= (writer->version_ctx.ring.size - 1)) {
logWarning("file: "__FILE__", line: %d, subdir_name: %s, "
"current version: %"PRId64" is too large which "
"exceeds %"PRId64" + %d", __LINE__,
writer->cfg.subdir_name, wb->version.last,
writer->version_ctx.next,
writer->version_ctx.ring.size - 1);
repush_to_queue(writer->thread, wb);
fc_sleep_ms(10);
return EAGAIN;
}
*current = wb; *current = wb;
writer->version_ctx.ring.count++; writer->version_ctx.ring.count++;
if (writer->version_ctx.ring.count > writer->version_ctx.ring.max_count) { if (writer->version_ctx.ring.count > writer->version_ctx.ring.max_count) {
writer->version_ctx.ring.max_count = writer->version_ctx.ring.count; writer->version_ctx.ring.max_count = writer->version_ctx.ring.count;
logDebug("%s max ring.count ==== %d", writer->cfg.subdir_name, logDebug("%s max ring.count ==== %d", writer->cfg.subdir_name,
@ -408,8 +438,11 @@ static int deal_record_by_version(SFBinlogWriterBuffer *wb)
if (writer->version_ctx.ring.start == writer->version_ctx.ring.end) { //empty if (writer->version_ctx.ring.start == writer->version_ctx.ring.end) { //empty
expand = true; expand = true;
} else if (writer->version_ctx.ring.end > writer->version_ctx.ring.start) { } else if (writer->version_ctx.ring.end > writer->version_ctx.ring.start) {
SFBinlogWriterBuffer **last;
last = writer->version_ctx.ring.entries + wb->version.last %
writer->version_ctx.ring.size;
expand = !(current > writer->version_ctx.ring.start && expand = !(current > writer->version_ctx.ring.start &&
current < writer->version_ctx.ring.end); last < writer->version_ctx.ring.end);
} else { } else {
expand = (current >= writer->version_ctx.ring.end && expand = (current >= writer->version_ctx.ring.end &&
current < writer->version_ctx.ring.start); current < writer->version_ctx.ring.start);
@ -417,10 +450,10 @@ static int deal_record_by_version(SFBinlogWriterBuffer *wb)
if (expand) { if (expand) {
writer->version_ctx.ring.end = writer->version_ctx.ring.entries + writer->version_ctx.ring.end = writer->version_ctx.ring.entries +
(wb->version + 1) % writer->version_ctx.ring.size; (wb->version.last + 1) % writer->version_ctx.ring.size;
} }
return 0; return EAGAIN;
} }
static inline void add_to_flush_writer_queue(SFBinlogWriterThread *thread, static inline void add_to_flush_writer_queue(SFBinlogWriterThread *thread,
@ -485,15 +518,16 @@ static int deal_binlog_records(SFBinlogWriterThread *thread,
logDebug("file: "__FILE__", line: %d, " logDebug("file: "__FILE__", line: %d, "
"subdir_name: %s, set next version to %"PRId64, "subdir_name: %s, set next version to %"PRId64,
__LINE__, current->writer->cfg.subdir_name, __LINE__, current->writer->cfg.subdir_name,
current->version); current->version.first);
binlog_writer_set_next_version(current->writer, binlog_writer_set_next_version(current->writer,
current->version); current->version.first);
fast_mblock_free_object(&current->writer-> fast_mblock_free_object(&current->writer->
thread->mblock, current); thread->mblock, current);
} else { } else {
add_to_flush_writer_queue(thread, current->writer); if ((result=deal_record_by_version(current)) == 0) {
if ((result=deal_record_by_version(current)) != 0) { add_to_flush_writer_queue(thread, current->writer);
} else if (!(result == EAGAIN || result == EINVAL)) {
return result; return result;
} }
} }
@ -734,7 +768,7 @@ int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer,
} }
if ((buffer=sf_binlog_writer_alloc_versioned_buffer_ex(writer, next_version, if ((buffer=sf_binlog_writer_alloc_versioned_buffer_ex(writer, next_version,
SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION)) == NULL) next_version, SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION)) == NULL)
{ {
return ENOMEM; return ENOMEM;
} }

View File

@ -41,7 +41,7 @@
struct sf_binlog_writer_info; struct sf_binlog_writer_info;
typedef struct sf_binlog_writer_buffer { typedef struct sf_binlog_writer_buffer {
int64_t version; SFVersionRange version;
BufferInfo bf; BufferInfo bf;
int type; //for versioned writer int type; //for versioned writer
struct sf_binlog_writer_info *writer; struct sf_binlog_writer_info *writer;
@ -162,12 +162,18 @@ static inline SFBinlogWriterBuffer *sf_binlog_writer_alloc_buffer(
return (SFBinlogWriterBuffer *)fast_mblock_alloc_object(&thread->mblock); return (SFBinlogWriterBuffer *)fast_mblock_alloc_object(&thread->mblock);
} }
#define sf_binlog_writer_alloc_versioned_buffer(writer, version) \ #define sf_binlog_writer_alloc_one_version_buffer(writer, version) \
sf_binlog_writer_alloc_versioned_buffer_ex(writer, version, \ sf_binlog_writer_alloc_versioned_buffer_ex(writer, version, \
SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE) version, SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE)
#define sf_binlog_writer_alloc_multi_version_buffer(writer, \
first_version, last_version) \
sf_binlog_writer_alloc_versioned_buffer_ex(writer, first_version, \
last_version, SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE)
static inline SFBinlogWriterBuffer *sf_binlog_writer_alloc_versioned_buffer_ex( static inline SFBinlogWriterBuffer *sf_binlog_writer_alloc_versioned_buffer_ex(
SFBinlogWriterInfo *writer, const int64_t version, const int type) SFBinlogWriterInfo *writer, const int64_t first_version,
const int64_t last_version, const int type)
{ {
SFBinlogWriterBuffer *buffer; SFBinlogWriterBuffer *buffer;
buffer = (SFBinlogWriterBuffer *)fast_mblock_alloc_object( buffer = (SFBinlogWriterBuffer *)fast_mblock_alloc_object(
@ -175,7 +181,8 @@ static inline SFBinlogWriterBuffer *sf_binlog_writer_alloc_versioned_buffer_ex(
if (buffer != NULL) { if (buffer != NULL) {
buffer->type = type; buffer->type = type;
buffer->writer = writer; buffer->writer = writer;
buffer->version = version; buffer->version.first = first_version;
buffer->version.last = last_version;
} }
return buffer; return buffer;
} }

View File

@ -102,4 +102,9 @@ typedef struct sf_space_stat {
int64_t used; int64_t used;
} SFSpaceStat; } SFSpaceStat;
typedef struct sf_version_range {
int64_t first; //including
int64_t last; //including
} SFVersionRange;
#endif #endif