sf_binlog_writer.[hc]: support passive write

fstore_storage_engine
YuQing 2022-09-29 11:44:02 +08:00
parent 230250d2f3
commit d4676e9d71
3 changed files with 39 additions and 9 deletions

View File

@ -168,9 +168,6 @@ static inline int flush_writer_files(SFBinlogWriterThread *thread)
return result; return result;
} }
if (writer->fw.flags & SF_FILE_WRITER_FLAGS_WANT_DONE_VERSION) {
writer->fw.last_versions.done = writer->fw.last_versions.pending;
}
writer->flush.in_queue = false; writer->flush.in_queue = false;
writer = writer->flush.next; writer = writer->flush.next;
} }
@ -205,6 +202,11 @@ static int deal_binlog_records(SFBinlogWriterThread *thread,
return result; return result;
} }
break; break;
case SF_BINLOG_BUFFER_TYPE_FLUSH_FILE:
if ((result=flush_writer_files(thread)) != 0) {
return result;
}
break;
case SF_BINLOG_BUFFER_TYPE_SET_WRITE_INDEX: case SF_BINLOG_BUFFER_TYPE_SET_WRITE_INDEX:
if ((result=sf_file_writer_set_binlog_write_index(&current-> if ((result=sf_file_writer_set_binlog_write_index(&current->
writer->fw, current->version.first)) != 0) writer->fw, current->version.first)) != 0)
@ -277,7 +279,11 @@ static int deal_binlog_records(SFBinlogWriterThread *thread,
} }
} while (wbuffer != NULL); } while (wbuffer != NULL);
if (thread->passive_write) {
return 0;
} else {
return flush_writer_files(thread); return flush_writer_files(thread);
}
} }
void sf_binlog_writer_finish(SFBinlogWriterInfo *writer) void sf_binlog_writer_finish(SFBinlogWriterInfo *writer)
@ -430,7 +436,7 @@ int sf_binlog_writer_init_by_version_ex(SFBinlogWriterInfo *writer,
int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread,
const char *name, SFBinlogWriterInfo *writer, const short order_mode, const char *name, SFBinlogWriterInfo *writer, const short order_mode,
const int max_record_size, const int writer_count, const int max_record_size, const int writer_count,
const bool use_fixed_buffer_size) const bool use_fixed_buffer_size, const bool passive_write)
{ {
const int alloc_elements_once = 1024; const int alloc_elements_once = 1024;
int result; int result;
@ -441,6 +447,7 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread,
snprintf(thread->name, sizeof(thread->name), "%s", name); snprintf(thread->name, sizeof(thread->name), "%s", name);
thread->order_mode = order_mode; thread->order_mode = order_mode;
thread->use_fixed_buffer_size = use_fixed_buffer_size; thread->use_fixed_buffer_size = use_fixed_buffer_size;
thread->passive_write = passive_write;
writer->fw.cfg.max_record_size = max_record_size; writer->fw.cfg.max_record_size = max_record_size;
writer->thread = thread; writer->thread = thread;
@ -551,6 +558,12 @@ int sf_binlog_writer_rotate_file(SFBinlogWriterInfo *writer)
SF_BINLOG_BUFFER_TYPE_ROTATE_FILE, 0); SF_BINLOG_BUFFER_TYPE_ROTATE_FILE, 0);
} }
int sf_binlog_writer_flush_file(SFBinlogWriterInfo *writer)
{
return sf_binlog_writer_push_directive(writer,
SF_BINLOG_BUFFER_TYPE_FLUSH_FILE, 0);
}
int sf_binlog_writer_notify_exit(SFBinlogWriterInfo *writer) int sf_binlog_writer_notify_exit(SFBinlogWriterInfo *writer)
{ {
return sf_binlog_writer_push_directive(writer, return sf_binlog_writer_push_directive(writer,

View File

@ -34,6 +34,7 @@
#define SF_BINLOG_BUFFER_TYPE_SET_WRITE_INDEX 3 #define SF_BINLOG_BUFFER_TYPE_SET_WRITE_INDEX 3
#define SF_BINLOG_BUFFER_TYPE_ROTATE_FILE 4 #define SF_BINLOG_BUFFER_TYPE_ROTATE_FILE 4
#define SF_BINLOG_BUFFER_TYPE_NOTIFY_EXIT 5 #define SF_BINLOG_BUFFER_TYPE_NOTIFY_EXIT 5
#define SF_BINLOG_BUFFER_TYPE_FLUSH_FILE 6
#define SF_BINLOG_BUFFER_SET_VERSION(buffer, ver) \ #define SF_BINLOG_BUFFER_SET_VERSION(buffer, ver) \
(buffer)->version.first = (buffer)->version.last = ver (buffer)->version.first = (buffer)->version.last = ver
@ -66,7 +67,8 @@ typedef struct binlog_writer_thread {
char name[64]; char name[64];
volatile bool running; volatile bool running;
bool use_fixed_buffer_size; bool use_fixed_buffer_size;
short order_mode; bool passive_write;
char order_mode;
struct { struct {
struct sf_binlog_writer_info *head; struct sf_binlog_writer_info *head;
struct sf_binlog_writer_info *tail; struct sf_binlog_writer_info *tail;
@ -113,7 +115,7 @@ int sf_binlog_writer_init_by_version_ex(SFBinlogWriterInfo *writer,
int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread,
const char *name, SFBinlogWriterInfo *writer, const short order_mode, const char *name, SFBinlogWriterInfo *writer, const short order_mode,
const int max_record_size, const int writer_count, const int max_record_size, const int writer_count,
const bool use_fixed_buffer_size); const bool use_fixed_buffer_size, const bool passive_write);
#define sf_binlog_writer_init_normal(writer, \ #define sf_binlog_writer_init_normal(writer, \
data_path, subdir_name, buffer_size) \ data_path, subdir_name, buffer_size) \
@ -129,7 +131,7 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread,
#define sf_binlog_writer_init_thread(thread, name, writer, max_record_size) \ #define sf_binlog_writer_init_thread(thread, name, writer, max_record_size) \
sf_binlog_writer_init_thread_ex(thread, name, writer, \ sf_binlog_writer_init_thread_ex(thread, name, writer, \
SF_BINLOG_THREAD_ORDER_MODE_FIXED, \ SF_BINLOG_THREAD_ORDER_MODE_FIXED, \
max_record_size, 1, true) max_record_size, 1, true, false)
static inline int sf_binlog_writer_init_ex(SFBinlogWriterContext *context, static inline int sf_binlog_writer_init_ex(SFBinlogWriterContext *context,
const char *data_path, const char *subdir_name, const char *data_path, const char *subdir_name,
@ -186,8 +188,16 @@ int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer,
int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer, int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer,
const int64_t next_version); const int64_t next_version);
static inline int64_t sf_binlog_writer_get_next_version(
SFBinlogWriterInfo *writer)
{
return writer->version_ctx.next;
}
int sf_binlog_writer_rotate_file(SFBinlogWriterInfo *writer); int sf_binlog_writer_rotate_file(SFBinlogWriterInfo *writer);
int sf_binlog_writer_flush_file(SFBinlogWriterInfo *writer);
int sf_binlog_writer_change_write_index(SFBinlogWriterInfo *writer, int sf_binlog_writer_change_write_index(SFBinlogWriterInfo *writer,
const int write_index); const int write_index);

View File

@ -282,7 +282,14 @@ int sf_file_writer_flush(SFFileWriterInfo *writer)
return 0; return 0;
} }
result = check_write_to_file(writer, writer->binlog_buffer.buff, len); if ((result=check_write_to_file(writer, writer->
binlog_buffer.buff, len)) == 0)
{
if (writer->flags & SF_FILE_WRITER_FLAGS_WANT_DONE_VERSION) {
writer->last_versions.done = writer->last_versions.pending;
}
}
writer->binlog_buffer.end = writer->binlog_buffer.buff; writer->binlog_buffer.end = writer->binlog_buffer.buff;
return result; return result;
} }