From 75e8aacfd923dfbc1cc8916b62eae0a862b38bb0 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Mon, 7 Oct 2024 09:21:19 +0800 Subject: [PATCH] sf_binlog_writer.[hc] add parameter write_interval_ms for high performance --- src/sf_binlog_writer.c | 19 +++++++++++++++---- src/sf_binlog_writer.h | 30 ++++++++++++++++-------------- 2 files changed, 31 insertions(+), 18 deletions(-) diff --git a/src/sf_binlog_writer.c b/src/sf_binlog_writer.c index aa294f5..11e6334 100644 --- a/src/sf_binlog_writer.c +++ b/src/sf_binlog_writer.c @@ -342,6 +342,7 @@ static void *binlog_writer_func(void *arg) { SFBinlogWriterThread *thread; SFBinlogWriterBuffer *wb_head; + uint32_t last_record_time; uint32_t current_timestamp; uint32_t last_timestamp; int result; @@ -357,7 +358,7 @@ static void *binlog_writer_func(void *arg) } #endif - current_timestamp = last_timestamp = 0; + last_record_time = current_timestamp = last_timestamp = 0; thread->running = true; while (SF_G_CONTINUE_FLAG) { wb_head = (SFBinlogWriterBuffer *)fc_queue_pop_all(&thread->queue); @@ -366,7 +367,7 @@ static void *binlog_writer_func(void *arg) } if ((result=deal_binlog_records(thread, wb_head, - ¤t_timestamp)) != 0) + &last_record_time)) != 0) { if (result != ERRNO_THREAD_EXIT) { logCrit("file: "__FILE__", line: %d, " @@ -379,6 +380,8 @@ static void *binlog_writer_func(void *arg) if (fc_queue_empty(&thread->queue)) { current_timestamp = 0; + } else { + current_timestamp = last_record_time; } if ((current_timestamp == 0 && last_timestamp != 0) || (current_timestamp > last_timestamp)) @@ -393,6 +396,12 @@ static void *binlog_writer_func(void *arg) } PTHREAD_MUTEX_UNLOCK(&thread->flow_ctrol.lcp.lock); } + + if (thread->write_interval_ms > 0 && + last_record_time == g_current_time) + { + fc_sleep_ms(thread->write_interval_ms); + } } thread->running = false; @@ -471,8 +480,9 @@ int sf_binlog_writer_init_by_version_ex(SFBinlogWriterInfo *writer, int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, const char *name, SFBinlogWriterInfo *writer, const short order_mode, - const int max_delay, const int max_record_size, const bool - use_fixed_buffer_size, const bool passive_write) + const int write_interval_ms, const int max_delay, + const int max_record_size, const bool use_fixed_buffer_size, + const bool passive_write) { const int alloc_elements_once = 1024; const int64_t alloc_elements_limit = 0; @@ -486,6 +496,7 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, thread->order_mode = order_mode; thread->use_fixed_buffer_size = use_fixed_buffer_size; thread->passive_write = passive_write; + thread->write_interval_ms = write_interval_ms; thread->flow_ctrol.max_delay = max_delay; writer->fw.cfg.max_record_size = max_record_size; writer->thread = thread; diff --git a/src/sf_binlog_writer.h b/src/sf_binlog_writer.h index 2ea8091..fed5d8f 100644 --- a/src/sf_binlog_writer.h +++ b/src/sf_binlog_writer.h @@ -72,6 +72,7 @@ typedef struct binlog_writer_thread { bool use_fixed_buffer_size; bool passive_write; char order_mode; + int write_interval_ms; struct { int max_delay; //in seconds volatile uint32_t last_timestamp; @@ -125,8 +126,9 @@ int sf_binlog_writer_init_by_version_ex(SFBinlogWriterInfo *writer, int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, const char *name, SFBinlogWriterInfo *writer, const short order_mode, - const int max_delay, const int max_record_size, const bool - use_fixed_buffer_size, const bool passive_write); + const int write_interval_ms, const int max_delay, + const int max_record_size, const bool use_fixed_buffer_size, + const bool passive_write); #define sf_binlog_writer_init_normal(writer, data_path, \ subdir_name, max_record_size, buffer_size) \ @@ -141,16 +143,16 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, buffer_size, ring_size, SF_BINLOG_DEFAULT_ROTATE_SIZE, true) #define sf_binlog_writer_init_thread(thread, name, \ - writer, max_delay, max_record_size) \ - sf_binlog_writer_init_thread_ex(thread, name, writer, \ - SF_BINLOG_THREAD_ORDER_MODE_FIXED, max_delay, \ - max_record_size, true, false) + writer, write_interval_ms, max_delay, max_record_size) \ + sf_binlog_writer_init_thread_ex(thread, name, writer, \ + SF_BINLOG_THREAD_ORDER_MODE_FIXED, write_interval_ms, \ + max_delay, max_record_size, true, false) static inline int sf_binlog_writer_init_ex(SFBinlogWriterContext *context, const char *data_path, const char *subdir_name, const char *file_prefix, const int buffer_size, - const int max_delay, const int max_record_size, - const bool call_fsync) + const int write_interval_ms, const int max_delay, + const int max_record_size, const bool call_fsync) { int result; if ((result=sf_binlog_writer_init_normal_ex(&context->writer, data_path, @@ -161,14 +163,14 @@ static inline int sf_binlog_writer_init_ex(SFBinlogWriterContext *context, } return sf_binlog_writer_init_thread(&context->thread, subdir_name, - &context->writer, max_delay, max_record_size); + &context->writer, write_interval_ms, max_delay, max_record_size); } -#define sf_binlog_writer_init(context, data_path, subdir_name, \ - buffer_size, max_delay, max_record_size) \ - sf_binlog_writer_init_ex(context, data_path, subdir_name, \ - SF_BINLOG_FILE_PREFIX, buffer_size, max_delay, \ - max_record_size, true) +#define sf_binlog_writer_init(context, data_path, subdir_name, \ + buffer_size, write_interval_ms, max_delay, max_record_size) \ + sf_binlog_writer_init_ex(context, data_path, subdir_name, \ + SF_BINLOG_FILE_PREFIX, buffer_size, write_interval_ms, \ + max_delay, max_record_size, true) void sf_binlog_writer_finish(SFBinlogWriterInfo *writer);