binlog_writer support dynamic buffer size

connection_manager
YuQing 2020-09-29 11:42:17 +08:00
parent c0d28ef560
commit 10f4e77262
2 changed files with 37 additions and 12 deletions

View File

@ -582,9 +582,16 @@ static int binlog_wbuffer_alloc_init(void *element, void *args)
wbuffer = (SFBinlogWriterBuffer *)element; wbuffer = (SFBinlogWriterBuffer *)element;
writer = (SFBinlogWriterInfo *)args; writer = (SFBinlogWriterInfo *)args;
wbuffer->bf.alloc_size = writer->cfg.max_record_size;
wbuffer->bf.buff = (char *)(wbuffer + 1);
wbuffer->writer = writer; wbuffer->writer = writer;
wbuffer->bf.alloc_size = writer->cfg.max_record_size;
if (writer->thread->use_fixed_buffer_size) {
wbuffer->bf.buff = (char *)(wbuffer + 1);
} else {
wbuffer->bf.buff = (char *)fc_malloc(writer->cfg.max_record_size);
if (wbuffer->bf.buff == NULL) {
return ENOMEM;
}
}
return 0; return 0;
} }
@ -656,20 +663,27 @@ int sf_binlog_writer_init_by_version(SFBinlogWriterInfo *writer,
int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread,
SFBinlogWriterInfo *writer, const int order_by, SFBinlogWriterInfo *writer, const int order_by,
const int max_record_size, const int writer_count) const int max_record_size, const int writer_count,
const bool use_fixed_buffer_size)
{ {
const int alloc_elements_once = 1024; const int alloc_elements_once = 1024;
int element_size;
pthread_t tid; pthread_t tid;
int result; int result;
int bytes; int bytes;
thread->order_by = order_by; thread->order_by = order_by;
thread->use_fixed_buffer_size = use_fixed_buffer_size;
writer->cfg.max_record_size = max_record_size; writer->cfg.max_record_size = max_record_size;
writer->thread = thread; writer->thread = thread;
element_size = sizeof(SFBinlogWriterBuffer);
if (use_fixed_buffer_size) {
element_size += max_record_size;
}
if ((result=fast_mblock_init_ex1(&thread->mblock, "binlog_wbuffer", if ((result=fast_mblock_init_ex1(&thread->mblock, "binlog_wbuffer",
sizeof(SFBinlogWriterBuffer) + max_record_size, element_size, alloc_elements_once, 0,
alloc_elements_once, 0, binlog_wbuffer_alloc_init, binlog_wbuffer_alloc_init, writer, true)) != 0)
writer, true)) != 0)
{ {
return result; return result;
} }
@ -702,7 +716,7 @@ int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer,
return ENOMEM; return ENOMEM;
} }
sf_push_to_binlog_write_queue(writer->thread, buffer); fc_queue_push(&writer->thread->queue, buffer);
return 0; return 0;
} }

View File

@ -49,6 +49,7 @@ typedef struct binlog_writer_thread {
struct fast_mblock_man mblock; struct fast_mblock_man mblock;
struct fc_queue queue; struct fc_queue queue;
volatile bool running; volatile bool running;
bool use_fixed_buffer_size;
int order_by; int order_by;
SFBinlogWriterPtrArray flush_writers; SFBinlogWriterPtrArray flush_writers;
} SFBinlogWriterThread; } SFBinlogWriterThread;
@ -98,10 +99,13 @@ int sf_binlog_writer_init_by_version(SFBinlogWriterInfo *writer,
int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread,
SFBinlogWriterInfo *writer, const int order_by, SFBinlogWriterInfo *writer, const int order_by,
const int max_record_size, const int writer_count); const int max_record_size, const int writer_count,
const bool use_fixed_buffer_size);
#define sf_binlog_writer_init_thread(thread, writer, order_by, max_record_size) \ #define sf_binlog_writer_init_thread(thread, \
sf_binlog_writer_init_thread_ex(thread, writer, order_by, max_record_size, 1) writer, order_by, max_record_size) \
sf_binlog_writer_init_thread_ex(thread, writer, \
order_by, max_record_size, 1, true)
static inline int sf_binlog_writer_init(SFBinlogWriterContext *context, static inline int sf_binlog_writer_init(SFBinlogWriterContext *context,
const char *subdir_name, const int buffer_size, const char *subdir_name, const int buffer_size,
@ -171,9 +175,16 @@ static inline const char *sf_binlog_writer_get_filename(const char *subdir_name,
int sf_binlog_writer_set_binlog_index(SFBinlogWriterInfo *writer, int sf_binlog_writer_set_binlog_index(SFBinlogWriterInfo *writer,
const int binlog_index); const int binlog_index);
#define sf_push_to_binlog_write_queue(thread, buffer) \ #define sf_push_to_binlog_thread_queue(thread, buffer) \
fc_queue_push(&(thread)->queue, buffer) fc_queue_push(&(thread)->queue, buffer)
static inline void sf_push_to_binlog_write_queue(SFBinlogWriterInfo *writer,
SFBinlogWriterBuffer *buffer)
{
buffer->type = SF_BINLOG_BUFFER_TYPEWRITE_TO_FILE;
fc_queue_push(&writer->thread->queue, buffer);
}
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif