diff --git a/src/sf_binlog_writer.c b/src/sf_binlog_writer.c index d3dd1fb..1fafd37 100644 --- a/src/sf_binlog_writer.c +++ b/src/sf_binlog_writer.c @@ -168,9 +168,6 @@ static inline int flush_writer_files(SFBinlogWriterThread *thread) return result; } - if (writer->fw.flags & SF_FILE_WRITER_FLAGS_WANT_DONE_VERSION) { - writer->fw.last_versions.done = writer->fw.last_versions.pending; - } writer->flush.in_queue = false; writer = writer->flush.next; } @@ -205,6 +202,11 @@ static int deal_binlog_records(SFBinlogWriterThread *thread, return result; } break; + case SF_BINLOG_BUFFER_TYPE_FLUSH_FILE: + if ((result=flush_writer_files(thread)) != 0) { + return result; + } + break; case SF_BINLOG_BUFFER_TYPE_SET_WRITE_INDEX: if ((result=sf_file_writer_set_binlog_write_index(¤t-> writer->fw, current->version.first)) != 0) @@ -277,7 +279,11 @@ static int deal_binlog_records(SFBinlogWriterThread *thread, } } while (wbuffer != NULL); - return flush_writer_files(thread); + if (thread->passive_write) { + return 0; + } else { + return flush_writer_files(thread); + } } void sf_binlog_writer_finish(SFBinlogWriterInfo *writer) @@ -430,7 +436,7 @@ int sf_binlog_writer_init_by_version_ex(SFBinlogWriterInfo *writer, int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, const char *name, SFBinlogWriterInfo *writer, const short order_mode, const int max_record_size, const int writer_count, - const bool use_fixed_buffer_size) + const bool use_fixed_buffer_size, const bool passive_write) { const int alloc_elements_once = 1024; int result; @@ -441,6 +447,7 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, snprintf(thread->name, sizeof(thread->name), "%s", name); thread->order_mode = order_mode; thread->use_fixed_buffer_size = use_fixed_buffer_size; + thread->passive_write = passive_write; writer->fw.cfg.max_record_size = max_record_size; writer->thread = thread; @@ -551,6 +558,12 @@ int sf_binlog_writer_rotate_file(SFBinlogWriterInfo *writer) SF_BINLOG_BUFFER_TYPE_ROTATE_FILE, 0); } +int sf_binlog_writer_flush_file(SFBinlogWriterInfo *writer) +{ + return sf_binlog_writer_push_directive(writer, + SF_BINLOG_BUFFER_TYPE_FLUSH_FILE, 0); +} + int sf_binlog_writer_notify_exit(SFBinlogWriterInfo *writer) { return sf_binlog_writer_push_directive(writer, diff --git a/src/sf_binlog_writer.h b/src/sf_binlog_writer.h index 66258fe..eb71305 100644 --- a/src/sf_binlog_writer.h +++ b/src/sf_binlog_writer.h @@ -34,6 +34,7 @@ #define SF_BINLOG_BUFFER_TYPE_SET_WRITE_INDEX 3 #define SF_BINLOG_BUFFER_TYPE_ROTATE_FILE 4 #define SF_BINLOG_BUFFER_TYPE_NOTIFY_EXIT 5 +#define SF_BINLOG_BUFFER_TYPE_FLUSH_FILE 6 #define SF_BINLOG_BUFFER_SET_VERSION(buffer, ver) \ (buffer)->version.first = (buffer)->version.last = ver @@ -66,7 +67,8 @@ typedef struct binlog_writer_thread { char name[64]; volatile bool running; bool use_fixed_buffer_size; - short order_mode; + bool passive_write; + char order_mode; struct { struct sf_binlog_writer_info *head; struct sf_binlog_writer_info *tail; @@ -113,7 +115,7 @@ int sf_binlog_writer_init_by_version_ex(SFBinlogWriterInfo *writer, int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, const char *name, SFBinlogWriterInfo *writer, const short order_mode, const int max_record_size, const int writer_count, - const bool use_fixed_buffer_size); + const bool use_fixed_buffer_size, const bool passive_write); #define sf_binlog_writer_init_normal(writer, \ data_path, subdir_name, buffer_size) \ @@ -129,7 +131,7 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, #define sf_binlog_writer_init_thread(thread, name, writer, max_record_size) \ sf_binlog_writer_init_thread_ex(thread, name, writer, \ SF_BINLOG_THREAD_ORDER_MODE_FIXED, \ - max_record_size, 1, true) + max_record_size, 1, true, false) static inline int sf_binlog_writer_init_ex(SFBinlogWriterContext *context, const char *data_path, const char *subdir_name, @@ -186,8 +188,16 @@ int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer, int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer, const int64_t next_version); +static inline int64_t sf_binlog_writer_get_next_version( + SFBinlogWriterInfo *writer) +{ + return writer->version_ctx.next; +} + int sf_binlog_writer_rotate_file(SFBinlogWriterInfo *writer); +int sf_binlog_writer_flush_file(SFBinlogWriterInfo *writer); + int sf_binlog_writer_change_write_index(SFBinlogWriterInfo *writer, const int write_index); diff --git a/src/sf_file_writer.c b/src/sf_file_writer.c index da4ea4c..d97f415 100644 --- a/src/sf_file_writer.c +++ b/src/sf_file_writer.c @@ -282,7 +282,14 @@ int sf_file_writer_flush(SFFileWriterInfo *writer) return 0; } - result = check_write_to_file(writer, writer->binlog_buffer.buff, len); + if ((result=check_write_to_file(writer, writer-> + binlog_buffer.buff, len)) == 0) + { + if (writer->flags & SF_FILE_WRITER_FLAGS_WANT_DONE_VERSION) { + writer->last_versions.done = writer->last_versions.pending; + } + } + writer->binlog_buffer.end = writer->binlog_buffer.buff; return result; }