binlog writer simplify versioned buffer queue

connection_manager
YuQing 2021-01-21 10:13:21 +08:00
parent 8040af4743
commit 1ece8a3389
2 changed files with 69 additions and 142 deletions

View File

@ -256,9 +256,6 @@ static inline void binlog_writer_set_next_version(SFBinlogWriterInfo *writer,
const uint64_t next_version) const uint64_t next_version)
{ {
writer->version_ctx.next = next_version; writer->version_ctx.next = next_version;
writer->version_ctx.ring.start = writer->version_ctx.ring.end =
writer->version_ctx.ring.entries + next_version %
writer->version_ctx.ring.size;
} }
static inline int deal_binlog_one_record(SFBinlogWriterBuffer *wb) static inline int deal_binlog_one_record(SFBinlogWriterBuffer *wb)
@ -295,83 +292,36 @@ static inline int deal_binlog_one_record(SFBinlogWriterBuffer *wb)
return 0; return 0;
} }
static void repush_to_queue(SFBinlogWriterThread *thread, SFBinlogWriterBuffer *wb) #define GET_WBUFFER_VERSION_COUNT(wb) \
{ (((wb)->version.last - (wb)->version.first) + 1)
SFBinlogWriterBuffer *previous;
SFBinlogWriterBuffer *current;
PTHREAD_MUTEX_LOCK(&thread->queue.lc_pair.lock); #define DEAL_CURRENT_VERSION_WBUFFER(writer, wb) \
if (thread->queue.head == NULL) {
wb->next = NULL;
thread->queue.head = thread->queue.tail = wb;
} else if (wb->version.first <= ((SFBinlogWriterBuffer *)
thread->queue.head)->version.first)
{
wb->next = thread->queue.head;
thread->queue.head = wb;
} else if (wb->version.first > ((SFBinlogWriterBuffer *)
thread->queue.tail)->version.last)
{
wb->next = NULL;
((SFBinlogWriterBuffer *)thread->queue.tail)->next = wb;
thread->queue.tail = wb;
} else {
previous = thread->queue.head;
current = ((SFBinlogWriterBuffer *)thread->queue.head)->next;
while (current != NULL && wb->version.first > current->version.last) {
previous = current;
current = current->next;
}
wb->next = previous->next;
previous->next = wb;
}
PTHREAD_MUTEX_UNLOCK(&thread->queue.lc_pair.lock);
}
#define DEAL_CURRENT_VERSION_WBUFFER(writer, wb, version_count) \
do { \ do { \
if ((result=deal_binlog_one_record(wb)) != 0) { \ if ((result=deal_binlog_one_record(wb)) != 0) { \
return result; \ return result; \
} \ } \
writer->version_ctx.next += GET_WBUFFER_VERSION_COUNT(wb); \
fast_mblock_free_object(&writer->thread->mblock, wb); \ fast_mblock_free_object(&writer->thread->mblock, wb); \
writer->version_ctx.next += version_count; \
} while (0) } while (0)
#define GET_WBUFFER_VERSION_COUNT(wb) \
(((wb)->version.last - (wb)->version.first) + 1)
static int deal_record_by_version(SFBinlogWriterBuffer *wb) static int deal_record_by_version(SFBinlogWriterBuffer *wb)
{ {
SFBinlogWriterInfo *writer; SFBinlogWriterInfo *writer;
SFBinlogWriterBuffer **current; SFBinlogWriterBuffer *current;
int64_t distance; SFBinlogWriterBuffer *previous;
int version_count; SFBinlogWriterSlot *slot;
int result; int result;
int next_index;
bool expand;
writer = wb->writer; writer = wb->writer;
distance = (int64_t)wb->version.first - (int64_t)writer->version_ctx.next; if (wb->version.first < writer->version_ctx.next) {
if (distance >= (writer->version_ctx.ring.size - 1)) {
logWarning("file: "__FILE__", line: %d, subdir_name: %s, "
"current version: %"PRId64" is too large which "
"exceeds %"PRId64" + %d", __LINE__,
writer->cfg.subdir_name, wb->version.first,
writer->version_ctx.next,
writer->version_ctx.ring.size - 1);
repush_to_queue(writer->thread, wb);
fc_sleep_ms(10);
return EAGAIN;
} else if (distance < 0) {
logError("file: "__FILE__", line: %d, subdir_name: %s, " logError("file: "__FILE__", line: %d, subdir_name: %s, "
"current version: %"PRId64" is too small which " "current version: %"PRId64" is too small which "
"less than %"PRId64", tag: %d, buffer(%d): %.*s", __LINE__, "less than %"PRId64", tag: %"PRId64", buffer(%d): %.*s",
writer->cfg.subdir_name, wb->version.first, __LINE__, writer->cfg.subdir_name, wb->version.first,
writer->version_ctx.next, wb->tag, wb->bf.length, writer->version_ctx.next, wb->tag, wb->bf.length,
wb->bf.length, wb->bf.buff); wb->bf.length, wb->bf.buff);
fast_mblock_free_object(&writer->thread->mblock, wb); fast_mblock_free_object(&writer->thread->mblock, wb);
return EINVAL; return 0;
} }
/* /*
@ -380,81 +330,57 @@ static int deal_record_by_version(SFBinlogWriterBuffer *wb)
writer->version_ctx.next, writer); writer->version_ctx.next, writer);
*/ */
current = writer->version_ctx.ring.entries + wb->version.first %
writer->version_ctx.ring.size;
if (current == writer->version_ctx.ring.start) {
version_count = GET_WBUFFER_VERSION_COUNT(wb);
DEAL_CURRENT_VERSION_WBUFFER(writer, wb, version_count);
next_index = (writer->version_ctx.ring.start - if (wb->version.first == writer->version_ctx.next) {
writer->version_ctx.ring.entries) + version_count; DEAL_CURRENT_VERSION_WBUFFER(writer, wb);
if (writer->version_ctx.ring.start == writer->version_ctx.ring.end) {
writer->version_ctx.ring.start = writer->version_ctx.ring.end =
writer->version_ctx.ring.entries + next_index %
writer->version_ctx.ring.size;
return 0;
}
writer->version_ctx.ring.start = writer->version_ctx.ring.entries + slot = writer->version_ctx.ring.slots +
next_index % writer->version_ctx.ring.size; writer->version_ctx.next % writer->version_ctx.ring.size;
while (writer->version_ctx.ring.start != writer->version_ctx.ring.end && while (slot->head.next != NULL && slot->head.next->
*(writer->version_ctx.ring.start) != NULL) version.first == writer->version_ctx.next)
{ {
current = writer->version_ctx.ring.start; current = slot->head.next;
version_count = GET_WBUFFER_VERSION_COUNT(*current); slot->head.next = current->next;
DEAL_CURRENT_VERSION_WBUFFER(writer, *current, version_count);
*current = NULL;
next_index += version_count; DEAL_CURRENT_VERSION_WBUFFER(writer, current);
writer->version_ctx.ring.start = writer->version_ctx.ring.entries + writer->version_ctx.ring.waiting_count--;
next_index % writer->version_ctx.ring.size;
writer->version_ctx.ring.count--; slot = writer->version_ctx.ring.slots + writer->
version_ctx.next % writer->version_ctx.ring.size;
} }
return 0; return 0;
} }
version_count = GET_WBUFFER_VERSION_COUNT(wb); slot = writer->version_ctx.ring.slots + wb->version.first %
distance = (int64_t)wb->version.last - (int64_t)writer->version_ctx.next;
if (distance >= (writer->version_ctx.ring.size - 1)) {
logWarning("file: "__FILE__", line: %d, subdir_name: %s, "
"current version: %"PRId64" is too large which "
"exceeds %"PRId64" + %d", __LINE__,
writer->cfg.subdir_name, wb->version.last,
writer->version_ctx.next,
writer->version_ctx.ring.size - 1);
repush_to_queue(writer->thread, wb);
fc_sleep_ms(10);
return EAGAIN;
}
*current = wb;
writer->version_ctx.ring.count++;
if (writer->version_ctx.ring.count > writer->version_ctx.ring.max_count) {
writer->version_ctx.ring.max_count = writer->version_ctx.ring.count;
logDebug("%s max ring.count ==== %d", writer->cfg.subdir_name,
writer->version_ctx.ring.count);
}
if (writer->version_ctx.ring.start == writer->version_ctx.ring.end) { //empty
expand = true;
} else if (writer->version_ctx.ring.end > writer->version_ctx.ring.start) {
SFBinlogWriterBuffer **last;
last = writer->version_ctx.ring.entries + wb->version.last %
writer->version_ctx.ring.size; writer->version_ctx.ring.size;
expand = !(current > writer->version_ctx.ring.start && if (slot->head.next == NULL) {
last < writer->version_ctx.ring.end); wb->next = NULL;
slot->head.next = wb;
} else if (wb->version.first < slot->head.next->version.first) {
wb->next = slot->head.next;
slot->head.next = wb;
} else { } else {
expand = (current >= writer->version_ctx.ring.end && previous = slot->head.next;
current < writer->version_ctx.ring.start); while (previous->next != NULL && wb->version.first >
previous->next->version.first)
{
previous = previous->next;
} }
if (expand) { wb->next = previous->next;
writer->version_ctx.ring.end = writer->version_ctx.ring.entries + previous->next = wb;
(wb->version.last + 1) % writer->version_ctx.ring.size;
} }
return EAGAIN; writer->version_ctx.ring.waiting_count++;
if (writer->version_ctx.ring.waiting_count >
writer->version_ctx.ring.max_waitings)
{
writer->version_ctx.ring.max_waitings =
writer->version_ctx.ring.waiting_count;
}
return 0;
} }
static inline void add_to_flush_writer_queue(SFBinlogWriterThread *thread, static inline void add_to_flush_writer_queue(SFBinlogWriterThread *thread,
@ -507,9 +433,7 @@ static int deal_binlog_records(SFBinlogWriterThread *thread,
wbuffer = wbuffer->next; wbuffer = wbuffer->next;
if (current->type == SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION) { if (current->type == SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION) {
if (current->writer->version_ctx.ring.start != if (current->writer->version_ctx.ring.waiting_count != 0) {
current->writer->version_ctx.ring.end)
{
logWarning("file: "__FILE__", line: %d, " logWarning("file: "__FILE__", line: %d, "
"subdir_name: %s, ring not empty, " "subdir_name: %s, ring not empty, "
"maybe some mistake happen", __LINE__, "maybe some mistake happen", __LINE__,
@ -526,15 +450,17 @@ static int deal_binlog_records(SFBinlogWriterThread *thread,
fast_mblock_free_object(&current->writer-> fast_mblock_free_object(&current->writer->
thread->mblock, current); thread->mblock, current);
} else { } else {
current->writer->total_count++;
if ((result=deal_record_by_version(current)) == 0) { if ((result=deal_record_by_version(current)) == 0) {
add_to_flush_writer_queue(thread, current->writer); add_to_flush_writer_queue(thread, current->writer);
} else if (!(result == EAGAIN || result == EINVAL)) { } else {
return result; return result;
} }
} }
} while (wbuffer != NULL); } while (wbuffer != NULL);
} else { } else {
do { do {
wbuffer->writer->total_count++;
if ((result=deal_binlog_one_record(wbuffer)) != 0) { if ((result=deal_binlog_one_record(wbuffer)) != 0) {
return result; return result;
} }
@ -639,6 +565,7 @@ int sf_binlog_writer_init_normal(SFBinlogWriterInfo *writer,
bool create; bool create;
char filepath[PATH_MAX]; char filepath[PATH_MAX];
writer->total_count = 0;
writer->flush.in_queue = false; writer->flush.in_queue = false;
if ((result=sf_binlog_buffer_init(&writer->binlog_buffer, if ((result=sf_binlog_buffer_init(&writer->binlog_buffer,
buffer_size)) != 0) buffer_size)) != 0)
@ -681,18 +608,15 @@ int sf_binlog_writer_init_by_version(SFBinlogWriterInfo *writer,
{ {
int bytes; int bytes;
logDebug("init writer %s ===== next version: %"PRId64", writer: %p", bytes = sizeof(SFBinlogWriterSlot) * ring_size;
subdir_name, next_version, writer); writer->version_ctx.ring.slots = (SFBinlogWriterSlot *)fc_malloc(bytes);
if (writer->version_ctx.ring.slots == NULL) {
bytes = sizeof(SFBinlogWriterBuffer *) * ring_size;
writer->version_ctx.ring.entries = (SFBinlogWriterBuffer **)fc_malloc(bytes);
if (writer->version_ctx.ring.entries == NULL) {
return ENOMEM; return ENOMEM;
} }
memset(writer->version_ctx.ring.entries, 0, bytes); memset(writer->version_ctx.ring.slots, 0, bytes);
writer->version_ctx.ring.size = ring_size; writer->version_ctx.ring.size = ring_size;
writer->version_ctx.ring.count = 0; writer->version_ctx.ring.waiting_count = 0;
writer->version_ctx.ring.max_count = 0; writer->version_ctx.ring.max_waitings = 0;
binlog_writer_set_next_version(writer, next_version); binlog_writer_set_next_version(writer, next_version);
return sf_binlog_writer_init_normal(writer, subdir_name, buffer_size); return sf_binlog_writer_init_normal(writer, subdir_name, buffer_size);

View File

@ -46,18 +46,20 @@ struct sf_binlog_writer_info;
typedef struct sf_binlog_writer_buffer { typedef struct sf_binlog_writer_buffer {
SFVersionRange version; SFVersionRange version;
BufferInfo bf; BufferInfo bf;
int tag; int64_t tag;
int type; //for versioned writer int type; //for versioned writer
struct sf_binlog_writer_info *writer; struct sf_binlog_writer_info *writer;
struct sf_binlog_writer_buffer *next; struct sf_binlog_writer_buffer *next;
} SFBinlogWriterBuffer; } SFBinlogWriterBuffer;
typedef struct sf_binlog_writer_slot {
SFBinlogWriterBuffer head;
} SFBinlogWriterSlot;
typedef struct sf_binlog_writer_buffer_ring { typedef struct sf_binlog_writer_buffer_ring {
SFBinlogWriterBuffer **entries; SFBinlogWriterSlot *slots;
SFBinlogWriterBuffer **start; //for consumer int waiting_count;
SFBinlogWriterBuffer **end; //for producer int max_waitings;
int count;
int max_count;
int size; int size;
} SFBinlogWriterBufferRing; } SFBinlogWriterBufferRing;
@ -91,6 +93,7 @@ typedef struct sf_binlog_writer_info {
char *name; char *name;
} file; } file;
int64_t total_count;
struct { struct {
SFBinlogWriterBufferRing ring; SFBinlogWriterBufferRing ring;
int64_t next; int64_t next;