sf_binlog_writer.[hc] add parameter write_interval_ms for high performance

use_iouring
YuQing 2024-10-07 09:21:19 +08:00
parent be4bad4ee1
commit 75e8aacfd9
2 changed files with 31 additions and 18 deletions

View File

@ -342,6 +342,7 @@ static void *binlog_writer_func(void *arg)
{
SFBinlogWriterThread *thread;
SFBinlogWriterBuffer *wb_head;
uint32_t last_record_time;
uint32_t current_timestamp;
uint32_t last_timestamp;
int result;
@ -357,7 +358,7 @@ static void *binlog_writer_func(void *arg)
}
#endif
current_timestamp = last_timestamp = 0;
last_record_time = current_timestamp = last_timestamp = 0;
thread->running = true;
while (SF_G_CONTINUE_FLAG) {
wb_head = (SFBinlogWriterBuffer *)fc_queue_pop_all(&thread->queue);
@ -366,7 +367,7 @@ static void *binlog_writer_func(void *arg)
}
if ((result=deal_binlog_records(thread, wb_head,
&current_timestamp)) != 0)
&last_record_time)) != 0)
{
if (result != ERRNO_THREAD_EXIT) {
logCrit("file: "__FILE__", line: %d, "
@ -379,6 +380,8 @@ static void *binlog_writer_func(void *arg)
if (fc_queue_empty(&thread->queue)) {
current_timestamp = 0;
} else {
current_timestamp = last_record_time;
}
if ((current_timestamp == 0 && last_timestamp != 0) ||
(current_timestamp > last_timestamp))
@ -393,6 +396,12 @@ static void *binlog_writer_func(void *arg)
}
PTHREAD_MUTEX_UNLOCK(&thread->flow_ctrol.lcp.lock);
}
if (thread->write_interval_ms > 0 &&
last_record_time == g_current_time)
{
fc_sleep_ms(thread->write_interval_ms);
}
}
thread->running = false;
@ -471,8 +480,9 @@ int sf_binlog_writer_init_by_version_ex(SFBinlogWriterInfo *writer,
int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread,
const char *name, SFBinlogWriterInfo *writer, const short order_mode,
const int max_delay, const int max_record_size, const bool
use_fixed_buffer_size, const bool passive_write)
const int write_interval_ms, const int max_delay,
const int max_record_size, const bool use_fixed_buffer_size,
const bool passive_write)
{
const int alloc_elements_once = 1024;
const int64_t alloc_elements_limit = 0;
@ -486,6 +496,7 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread,
thread->order_mode = order_mode;
thread->use_fixed_buffer_size = use_fixed_buffer_size;
thread->passive_write = passive_write;
thread->write_interval_ms = write_interval_ms;
thread->flow_ctrol.max_delay = max_delay;
writer->fw.cfg.max_record_size = max_record_size;
writer->thread = thread;

View File

@ -72,6 +72,7 @@ typedef struct binlog_writer_thread {
bool use_fixed_buffer_size;
bool passive_write;
char order_mode;
int write_interval_ms;
struct {
int max_delay; //in seconds
volatile uint32_t last_timestamp;
@ -125,8 +126,9 @@ int sf_binlog_writer_init_by_version_ex(SFBinlogWriterInfo *writer,
int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread,
const char *name, SFBinlogWriterInfo *writer, const short order_mode,
const int max_delay, const int max_record_size, const bool
use_fixed_buffer_size, const bool passive_write);
const int write_interval_ms, const int max_delay,
const int max_record_size, const bool use_fixed_buffer_size,
const bool passive_write);
#define sf_binlog_writer_init_normal(writer, data_path, \
subdir_name, max_record_size, buffer_size) \
@ -141,16 +143,16 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread,
buffer_size, ring_size, SF_BINLOG_DEFAULT_ROTATE_SIZE, true)
#define sf_binlog_writer_init_thread(thread, name, \
writer, max_delay, max_record_size) \
sf_binlog_writer_init_thread_ex(thread, name, writer, \
SF_BINLOG_THREAD_ORDER_MODE_FIXED, max_delay, \
max_record_size, true, false)
writer, write_interval_ms, max_delay, max_record_size) \
sf_binlog_writer_init_thread_ex(thread, name, writer, \
SF_BINLOG_THREAD_ORDER_MODE_FIXED, write_interval_ms, \
max_delay, max_record_size, true, false)
static inline int sf_binlog_writer_init_ex(SFBinlogWriterContext *context,
const char *data_path, const char *subdir_name,
const char *file_prefix, const int buffer_size,
const int max_delay, const int max_record_size,
const bool call_fsync)
const int write_interval_ms, const int max_delay,
const int max_record_size, const bool call_fsync)
{
int result;
if ((result=sf_binlog_writer_init_normal_ex(&context->writer, data_path,
@ -161,14 +163,14 @@ static inline int sf_binlog_writer_init_ex(SFBinlogWriterContext *context,
}
return sf_binlog_writer_init_thread(&context->thread, subdir_name,
&context->writer, max_delay, max_record_size);
&context->writer, write_interval_ms, max_delay, max_record_size);
}
#define sf_binlog_writer_init(context, data_path, subdir_name, \
buffer_size, max_delay, max_record_size) \
sf_binlog_writer_init_ex(context, data_path, subdir_name, \
SF_BINLOG_FILE_PREFIX, buffer_size, max_delay, \
max_record_size, true)
#define sf_binlog_writer_init(context, data_path, subdir_name, \
buffer_size, write_interval_ms, max_delay, max_record_size) \
sf_binlog_writer_init_ex(context, data_path, subdir_name, \
SF_BINLOG_FILE_PREFIX, buffer_size, write_interval_ms, \
max_delay, max_record_size, true)
void sf_binlog_writer_finish(SFBinlogWriterInfo *writer);