From 10f4e77262c3bbae7568186429c6c4faba06ee3b Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Tue, 29 Sep 2020 11:42:17 +0800 Subject: [PATCH] binlog_writer support dynamic buffer size --- src/sf_binlog_writer.c | 30 ++++++++++++++++++++++-------- src/sf_binlog_writer.h | 19 +++++++++++++++---- 2 files changed, 37 insertions(+), 12 deletions(-) diff --git a/src/sf_binlog_writer.c b/src/sf_binlog_writer.c index 9a09c1e..2977752 100644 --- a/src/sf_binlog_writer.c +++ b/src/sf_binlog_writer.c @@ -582,9 +582,16 @@ static int binlog_wbuffer_alloc_init(void *element, void *args) wbuffer = (SFBinlogWriterBuffer *)element; writer = (SFBinlogWriterInfo *)args; - wbuffer->bf.alloc_size = writer->cfg.max_record_size; - wbuffer->bf.buff = (char *)(wbuffer + 1); wbuffer->writer = writer; + wbuffer->bf.alloc_size = writer->cfg.max_record_size; + if (writer->thread->use_fixed_buffer_size) { + wbuffer->bf.buff = (char *)(wbuffer + 1); + } else { + wbuffer->bf.buff = (char *)fc_malloc(writer->cfg.max_record_size); + if (wbuffer->bf.buff == NULL) { + return ENOMEM; + } + } return 0; } @@ -656,20 +663,27 @@ int sf_binlog_writer_init_by_version(SFBinlogWriterInfo *writer, int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, SFBinlogWriterInfo *writer, const int order_by, - const int max_record_size, const int writer_count) + const int max_record_size, const int writer_count, + const bool use_fixed_buffer_size) { const int alloc_elements_once = 1024; + int element_size; pthread_t tid; int result; int bytes; thread->order_by = order_by; + thread->use_fixed_buffer_size = use_fixed_buffer_size; writer->cfg.max_record_size = max_record_size; writer->thread = thread; + + element_size = sizeof(SFBinlogWriterBuffer); + if (use_fixed_buffer_size) { + element_size += max_record_size; + } if ((result=fast_mblock_init_ex1(&thread->mblock, "binlog_wbuffer", - sizeof(SFBinlogWriterBuffer) + max_record_size, - alloc_elements_once, 0, binlog_wbuffer_alloc_init, - writer, true)) != 0) + element_size, alloc_elements_once, 0, + binlog_wbuffer_alloc_init, writer, true)) != 0) { return result; } @@ -697,12 +711,12 @@ int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer, { SFBinlogWriterBuffer *buffer; if ((buffer=sf_binlog_writer_alloc_versioned_buffer_ex(writer, next_version, - SF_BINLOG_BUFFER_TYPESET_NEXT_VERSION)) == NULL) + SF_BINLOG_BUFFER_TYPESET_NEXT_VERSION)) == NULL) { return ENOMEM; } - sf_push_to_binlog_write_queue(writer->thread, buffer); + fc_queue_push(&writer->thread->queue, buffer); return 0; } diff --git a/src/sf_binlog_writer.h b/src/sf_binlog_writer.h index b2155e0..20f297e 100644 --- a/src/sf_binlog_writer.h +++ b/src/sf_binlog_writer.h @@ -49,6 +49,7 @@ typedef struct binlog_writer_thread { struct fast_mblock_man mblock; struct fc_queue queue; volatile bool running; + bool use_fixed_buffer_size; int order_by; SFBinlogWriterPtrArray flush_writers; } SFBinlogWriterThread; @@ -98,10 +99,13 @@ int sf_binlog_writer_init_by_version(SFBinlogWriterInfo *writer, int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, SFBinlogWriterInfo *writer, const int order_by, - const int max_record_size, const int writer_count); + const int max_record_size, const int writer_count, + const bool use_fixed_buffer_size); -#define sf_binlog_writer_init_thread(thread, writer, order_by, max_record_size) \ - sf_binlog_writer_init_thread_ex(thread, writer, order_by, max_record_size, 1) +#define sf_binlog_writer_init_thread(thread, \ + writer, order_by, max_record_size) \ + sf_binlog_writer_init_thread_ex(thread, writer, \ + order_by, max_record_size, 1, true) static inline int sf_binlog_writer_init(SFBinlogWriterContext *context, const char *subdir_name, const int buffer_size, @@ -171,9 +175,16 @@ static inline const char *sf_binlog_writer_get_filename(const char *subdir_name, int sf_binlog_writer_set_binlog_index(SFBinlogWriterInfo *writer, const int binlog_index); -#define sf_push_to_binlog_write_queue(thread, buffer) \ +#define sf_push_to_binlog_thread_queue(thread, buffer) \ fc_queue_push(&(thread)->queue, buffer) +static inline void sf_push_to_binlog_write_queue(SFBinlogWriterInfo *writer, + SFBinlogWriterBuffer *buffer) +{ + buffer->type = SF_BINLOG_BUFFER_TYPEWRITE_TO_FILE; + fc_queue_push(&writer->thread->queue, buffer); +} + #ifdef __cplusplus } #endif