diff --git a/src/sf_binlog_writer.c b/src/sf_binlog_writer.c index d9d8ee6..2ad7397 100644 --- a/src/sf_binlog_writer.c +++ b/src/sf_binlog_writer.c @@ -177,7 +177,7 @@ static inline int flush_writer_files(SFBinlogWriterThread *thread) } static int deal_binlog_records(SFBinlogWriterThread *thread, - SFBinlogWriterBuffer *wb_head) + SFBinlogWriterBuffer *wb_head, uint32_t *last_timestamp) { int result; SFBinlogWriterBuffer *wbuffer; @@ -187,6 +187,9 @@ static int deal_binlog_records(SFBinlogWriterThread *thread, do { current = wbuffer; wbuffer = wbuffer->next; + if (wbuffer == NULL) { + *last_timestamp = current->timestamp; + } switch (current->type) { case SF_BINLOG_BUFFER_TYPE_CHANGE_ORDER_TYPE: @@ -306,6 +309,7 @@ static int deal_binlog_records(SFBinlogWriterThread *thread, void sf_binlog_writer_finish(SFBinlogWriterInfo *writer) { SFBinlogWriterBuffer *wb_head; + uint32_t last_timestamp; int count; if (writer->fw.file.name != NULL) { @@ -332,7 +336,8 @@ void sf_binlog_writer_finish(SFBinlogWriterInfo *writer) wb_head = (SFBinlogWriterBuffer *)fc_queue_try_pop_all( &writer->thread->queue); if (wb_head != NULL) { - deal_binlog_records(writer->thread, wb_head); + last_timestamp = 0; + deal_binlog_records(writer->thread, wb_head, &last_timestamp); } free(writer->fw.file.name); @@ -349,6 +354,8 @@ static void *binlog_writer_func(void *arg) { SFBinlogWriterThread *thread; SFBinlogWriterBuffer *wb_head; + uint32_t current_timestamp; + uint32_t last_timestamp; int result; thread = (SFBinlogWriterThread *)arg; @@ -362,6 +369,7 @@ static void *binlog_writer_func(void *arg) } #endif + current_timestamp = last_timestamp = 0; thread->running = true; while (SF_G_CONTINUE_FLAG) { wb_head = (SFBinlogWriterBuffer *)fc_queue_pop_all(&thread->queue); @@ -369,7 +377,9 @@ static void *binlog_writer_func(void *arg) continue; } - if ((result=deal_binlog_records(thread, wb_head)) != 0) { + if ((result=deal_binlog_records(thread, wb_head, + ¤t_timestamp)) != 0) + { if (result != ERRNO_THREAD_EXIT) { logCrit("file: "__FILE__", line: %d, " "deal_binlog_records fail, " @@ -378,6 +388,23 @@ static void *binlog_writer_func(void *arg) } break; } + + if (fc_queue_empty(&thread->queue)) { + current_timestamp = 0; + } + if (current_timestamp == 0 || current_timestamp > last_timestamp) { + if (current_timestamp != last_timestamp) { + last_timestamp = current_timestamp; + FC_ATOMIC_SET(thread->flow_ctrol.last_timestamp, + current_timestamp); + } + + PTHREAD_MUTEX_LOCK(&thread->flow_ctrol.lcp.lock); + if (thread->flow_ctrol.waiting_count > 0) { + pthread_cond_broadcast(&thread->flow_ctrol.lcp.cond); + } + PTHREAD_MUTEX_UNLOCK(&thread->flow_ctrol.lcp.lock); + } } thread->running = false; @@ -452,8 +479,8 @@ int sf_binlog_writer_init_by_version_ex(SFBinlogWriterInfo *writer, int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, const char *name, SFBinlogWriterInfo *writer, const short order_mode, - const int max_record_size, const bool use_fixed_buffer_size, - const bool passive_write) + const int max_delay, const int max_record_size, const bool + use_fixed_buffer_size, const bool passive_write) { const int alloc_elements_once = 1024; const int64_t alloc_elements_limit = 0; @@ -467,9 +494,9 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, thread->order_mode = order_mode; thread->use_fixed_buffer_size = use_fixed_buffer_size; thread->passive_write = passive_write; + thread->flow_ctrol.max_delay = max_delay; writer->fw.cfg.max_record_size = max_record_size; writer->thread = thread; - callbacks.init_func = binlog_wbuffer_alloc_init; callbacks.args = writer; element_size = sizeof(SFBinlogWriterBuffer); @@ -492,6 +519,12 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, return result; } + thread->flow_ctrol.last_timestamp = 0; + thread->flow_ctrol.waiting_count = 0; + if ((result=init_pthread_lock_cond_pair(&thread->flow_ctrol.lcp)) != 0) { + return result; + } + thread->flush_writers.head = thread->flush_writers.tail = NULL; return fc_create_thread(&tid, binlog_writer_func, thread, SF_G_THREAD_STACK_SIZE); @@ -537,7 +570,7 @@ int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer, return ENOMEM; } - fc_queue_push(&writer->thread->queue, buffer); + sf_push_to_binlog_write_queue(writer, buffer); return 0; } @@ -552,7 +585,7 @@ static inline int sf_binlog_writer_push_directive(SFBinlogWriterInfo *writer, return ENOMEM; } - fc_queue_push(&writer->thread->queue, buffer); + sf_push_to_binlog_write_queue(writer, buffer); return 0; } diff --git a/src/sf_binlog_writer.h b/src/sf_binlog_writer.h index 38fcabd..662fd92 100644 --- a/src/sf_binlog_writer.h +++ b/src/sf_binlog_writer.h @@ -19,6 +19,7 @@ #define _SF_BINLOG_WRITER_H_ #include "fastcommon/fc_queue.h" +#include "fastcommon/fc_atomic.h" #include "sf_types.h" #include "sf_file_writer.h" @@ -46,7 +47,8 @@ struct sf_binlog_writer_info; typedef struct sf_binlog_writer_buffer { SFVersionRange version; BufferInfo bf; - int type; //for versioned writer + int type; + uint32_t timestamp; //for flow ctrol struct sf_binlog_writer_info *writer; struct sf_binlog_writer_buffer *next; } SFBinlogWriterBuffer; @@ -70,6 +72,12 @@ typedef struct binlog_writer_thread { bool use_fixed_buffer_size; bool passive_write; char order_mode; + struct { + int max_delay; //in seconds + volatile uint32_t last_timestamp; + int waiting_count; + pthread_lock_cond_pair_t lcp; + } flow_ctrol; struct { struct sf_binlog_writer_info *head; struct sf_binlog_writer_info *tail; @@ -115,8 +123,8 @@ int sf_binlog_writer_init_by_version_ex(SFBinlogWriterInfo *writer, int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, const char *name, SFBinlogWriterInfo *writer, const short order_mode, - const int max_record_size, const bool use_fixed_buffer_size, - const bool passive_write); + const int max_delay, const int max_record_size, const bool + use_fixed_buffer_size, const bool passive_write); #define sf_binlog_writer_init_normal(writer, \ data_path, subdir_name, buffer_size) \ @@ -129,15 +137,16 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, SF_BINLOG_FILE_PREFIX, next_version, buffer_size, \ ring_size, SF_BINLOG_DEFAULT_ROTATE_SIZE) -#define sf_binlog_writer_init_thread(thread, name, writer, max_record_size) \ +#define sf_binlog_writer_init_thread(thread, name, \ + writer, max_delay, max_record_size) \ sf_binlog_writer_init_thread_ex(thread, name, writer, \ - SF_BINLOG_THREAD_ORDER_MODE_FIXED, \ + SF_BINLOG_THREAD_ORDER_MODE_FIXED, max_delay, \ max_record_size, true, false) static inline int sf_binlog_writer_init_ex(SFBinlogWriterContext *context, const char *data_path, const char *subdir_name, const char *file_prefix, const int buffer_size, - const int max_record_size) + const int max_delay, const int max_record_size) { int result; if ((result=sf_binlog_writer_init_normal_ex(&context->writer, @@ -147,14 +156,14 @@ static inline int sf_binlog_writer_init_ex(SFBinlogWriterContext *context, return result; } - return sf_binlog_writer_init_thread(&context->thread, - subdir_name, &context->writer, max_record_size); + return sf_binlog_writer_init_thread(&context->thread, subdir_name, + &context->writer, max_delay, max_record_size); } -#define sf_binlog_writer_init(context, data_path, \ - subdir_name, buffer_size, max_record_size) \ +#define sf_binlog_writer_init(context, data_path, subdir_name, \ + buffer_size, max_delay, max_record_size) \ sf_binlog_writer_init_ex(context, data_path, subdir_name, \ - SF_BINLOG_FILE_PREFIX, buffer_size, max_record_size) + SF_BINLOG_FILE_PREFIX, buffer_size, max_delay, max_record_size) void sf_binlog_writer_finish(SFBinlogWriterInfo *writer); @@ -240,7 +249,12 @@ int sf_binlog_writer_notify_exit(SFBinlogWriterInfo *writer); static inline SFBinlogWriterBuffer *sf_binlog_writer_alloc_buffer( SFBinlogWriterThread *thread) { - return (SFBinlogWriterBuffer *)fast_mblock_alloc_object(&thread->mblock); + SFBinlogWriterBuffer *buffer; + + if ((buffer=fast_mblock_alloc_object(&thread->mblock)) != NULL) { + buffer->type = SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE; + } + return buffer; } #define sf_binlog_writer_alloc_one_version_buffer(writer, version) \ @@ -257,6 +271,7 @@ static inline SFBinlogWriterBuffer *sf_binlog_writer_alloc_versioned_buffer_ex( const int64_t last_version, const int type) { SFBinlogWriterBuffer *buffer; + buffer = (SFBinlogWriterBuffer *)fast_mblock_alloc_object( &writer->thread->mblock); if (buffer != NULL) { @@ -310,13 +325,32 @@ static inline SFBinlogWriterBuffer *sf_binlog_writer_alloc_versioned_buffer_ex( #define sf_binlog_writer_set_binlog_write_index(writer, last_index) \ sf_file_writer_set_binlog_write_index(&(writer)->fw, last_index) -#define sf_push_to_binlog_thread_queue(thread, buffer) \ - fc_queue_push(&(thread)->queue, buffer) - static inline void sf_push_to_binlog_write_queue(SFBinlogWriterInfo *writer, SFBinlogWriterBuffer *buffer) { - buffer->type = SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE; + int64_t last_timestamp; + + last_timestamp = FC_ATOMIC_GET(writer->thread->flow_ctrol.last_timestamp); + if (last_timestamp > 0 && g_current_time - last_timestamp > + writer->thread->flow_ctrol.max_delay) + { + PTHREAD_MUTEX_LOCK(&writer->thread->flow_ctrol.lcp.lock); + writer->thread->flow_ctrol.waiting_count++; + last_timestamp = FC_ATOMIC_GET(writer->thread-> + flow_ctrol.last_timestamp); + while (last_timestamp > 0 && g_current_time - last_timestamp > + writer->thread->flow_ctrol.max_delay) + { + pthread_cond_wait(&writer->thread->flow_ctrol.lcp.cond, + &writer->thread->flow_ctrol.lcp.lock); + last_timestamp = FC_ATOMIC_GET(writer->thread-> + flow_ctrol.last_timestamp); + } + writer->thread->flow_ctrol.waiting_count--; + PTHREAD_MUTEX_UNLOCK(&writer->thread->flow_ctrol.lcp.lock); + } + + buffer->timestamp = g_current_time; fc_queue_push(&writer->thread->queue, buffer); } diff --git a/src/sf_ordered_writer.h b/src/sf_ordered_writer.h index 7f85b41..f3b2f80 100644 --- a/src/sf_ordered_writer.h +++ b/src/sf_ordered_writer.h @@ -124,10 +124,11 @@ static inline SFOrderedWriterBuffer *sf_ordered_writer_alloc_buffer( #define sf_ordered_writer_set_binlog_index(ctx, binlog_index) \ sf_file_writer_set_binlog_index(&(ctx)->writer.fw, binlog_index) -#define sf_push_to_binlog_thread_queue(ctx, buffer) \ +#define sf_ordered_writer_push_to_thread_queue(ctx, buffer) \ sorted_queue_push(&(ctx)->thread.queues.buffer, buffer) -static inline void sf_push_to_binlog_write_queue(SFOrderedWriterContext *ctx, +static inline void sf_ordered_writer_push_to_queue( + SFOrderedWriterContext *ctx, SFOrderedWriterBuffer *buffer) { sorted_queue_push(&ctx->thread.queues.buffer, buffer);