diff --git a/src/sf_binlog_writer.c b/src/sf_binlog_writer.c index 6f3a3cb..aa294f5 100644 --- a/src/sf_binlog_writer.c +++ b/src/sf_binlog_writer.c @@ -430,20 +430,23 @@ static void binlog_wbuffer_destroy_func(void *element, void *args) int sf_binlog_writer_init_normal_ex(SFBinlogWriterInfo *writer, const char *data_path, const char *subdir_name, - const char *file_prefix, const int buffer_size, - const int64_t file_rotate_size, const bool call_fsync) + const char *file_prefix, const int max_record_size, + const int buffer_size, const int64_t file_rotate_size, + const bool call_fsync) { memset(writer, 0, sizeof(*writer)); writer->order_by = SF_BINLOG_WRITER_TYPE_ORDER_BY_NONE; return sf_file_writer_init(&writer->fw, data_path, subdir_name, - file_prefix, buffer_size, file_rotate_size, call_fsync); + file_prefix, max_record_size, buffer_size, + file_rotate_size, call_fsync); } int sf_binlog_writer_init_by_version_ex(SFBinlogWriterInfo *writer, const char *data_path, const char *subdir_name, - const char *file_prefix, const uint64_t next_version, - const int buffer_size, const int ring_size, - const int64_t file_rotate_size, const bool call_fsync) + const char *file_prefix, const int max_record_size, + const uint64_t next_version, const int buffer_size, + const int ring_size, const int64_t file_rotate_size, + const bool call_fsync) { int bytes; @@ -462,7 +465,8 @@ int sf_binlog_writer_init_by_version_ex(SFBinlogWriterInfo *writer, binlog_writer_set_next_version(writer, next_version); writer->flush.in_queue = false; return sf_file_writer_init(&writer->fw, data_path, subdir_name, - file_prefix, buffer_size, file_rotate_size, call_fsync); + file_prefix, max_record_size, buffer_size, + file_rotate_size, call_fsync); } int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, diff --git a/src/sf_binlog_writer.h b/src/sf_binlog_writer.h index c01f13c..2ea8091 100644 --- a/src/sf_binlog_writer.h +++ b/src/sf_binlog_writer.h @@ -112,31 +112,33 @@ extern "C" { int sf_binlog_writer_init_normal_ex(SFBinlogWriterInfo *writer, const char *data_path, const char *subdir_name, - const char *file_prefix, const int buffer_size, - const int64_t file_rotate_size, const bool call_fsync); + const char *file_prefix, const int max_record_size, + const int buffer_size, const int64_t file_rotate_size, + const bool call_fsync); int sf_binlog_writer_init_by_version_ex(SFBinlogWriterInfo *writer, const char *data_path, const char *subdir_name, - const char *file_prefix, const uint64_t next_version, - const int buffer_size, const int ring_size, - const int64_t file_rotate_size, const bool call_fsync); + const char *file_prefix, const int max_record_size, + const uint64_t next_version, const int buffer_size, + const int ring_size, const int64_t file_rotate_size, + const bool call_fsync); int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, const char *name, SFBinlogWriterInfo *writer, const short order_mode, const int max_delay, const int max_record_size, const bool use_fixed_buffer_size, const bool passive_write); -#define sf_binlog_writer_init_normal(writer, \ - data_path, subdir_name, buffer_size) \ +#define sf_binlog_writer_init_normal(writer, data_path, \ + subdir_name, max_record_size, buffer_size) \ sf_binlog_writer_init_normal_ex(writer, data_path, subdir_name, \ - SF_BINLOG_FILE_PREFIX, buffer_size, \ + SF_BINLOG_FILE_PREFIX, max_record_size, buffer_size, \ SF_BINLOG_DEFAULT_ROTATE_SIZE, true) -#define sf_binlog_writer_init_by_version(writer, data_path, \ - subdir_name, next_version, buffer_size, ring_size) \ - sf_binlog_writer_init_by_version_ex(writer, data_path, subdir_name, \ - SF_BINLOG_FILE_PREFIX, next_version, buffer_size, \ - ring_size, SF_BINLOG_DEFAULT_ROTATE_SIZE, true) +#define sf_binlog_writer_init_by_version(writer, data_path, subdir_name, \ + max_record_size, next_version, buffer_size, ring_size) \ + sf_binlog_writer_init_by_version_ex(writer, data_path, subdir_name, \ + SF_BINLOG_FILE_PREFIX, max_record_size, next_version, \ + buffer_size, ring_size, SF_BINLOG_DEFAULT_ROTATE_SIZE, true) #define sf_binlog_writer_init_thread(thread, name, \ writer, max_delay, max_record_size) \ @@ -151,8 +153,8 @@ static inline int sf_binlog_writer_init_ex(SFBinlogWriterContext *context, const bool call_fsync) { int result; - if ((result=sf_binlog_writer_init_normal_ex(&context->writer, - data_path, subdir_name, file_prefix, buffer_size, + if ((result=sf_binlog_writer_init_normal_ex(&context->writer, data_path, + subdir_name, file_prefix, max_record_size, buffer_size, SF_BINLOG_DEFAULT_ROTATE_SIZE, call_fsync)) != 0) { return result; diff --git a/src/sf_file_writer.c b/src/sf_file_writer.c index cf4255b..ad0b850 100644 --- a/src/sf_file_writer.c +++ b/src/sf_file_writer.c @@ -236,9 +236,8 @@ static int do_write_to_file(SFFileWriterInfo *writer, if (fsync(writer->file.fd) != 0) { result = errno != 0 ? errno : EIO; logError("file: "__FILE__", line: %d, " - "fsync to binlog file \"%s\" fail, " - "errno: %d, error info: %s", - __LINE__, writer->file.name, + "fsync to binlog file \"%s\" fail, errno: %d, " + "error info: %s", __LINE__, writer->file.name, result, STRERROR(result)); return result; } @@ -296,6 +295,26 @@ int sf_file_writer_flush(SFFileWriterInfo *writer) return result; } +int sf_file_writer_fsync(SFFileWriterInfo *writer) +{ + int result; + + if ((result=sf_file_writer_flush(writer)) != 0) { + return result; + } + + if (fsync(writer->file.fd) == 0) { + return 0; + } else { + result = errno != 0 ? errno : EIO; + logError("file: "__FILE__", line: %d, " + "fsync to binlog file \"%s\" fail, errno: %d, " + "error info: %s", __LINE__, writer->file.name, + result, STRERROR(result)); + return result; + } +} + int sf_file_writer_get_indexes(SFFileWriterInfo *writer, int *start_index, int *last_index) { @@ -365,10 +384,34 @@ int sf_file_writer_deal_versioned_buffer(SFFileWriterInfo *writer, return 0; } +int sf_file_writer_save_buffer(SFFileWriterInfo *writer, const int length) +{ + int result; + + if (writer->cfg.file_rotate_size > 0 && writer->file.size + + SF_BINLOG_BUFFER_PRODUCER_DATA_LENGTH(writer->binlog_buffer) + + length > writer->cfg.file_rotate_size) + { + if ((result=sf_file_writer_flush(writer)) != 0) { + return result; + } + } + + writer->binlog_buffer.data_end += length; + + if (SF_BINLOG_BUFFER_PRODUCER_BUFF_REMAIN(writer->binlog_buffer) < + writer->cfg.max_record_size) + { + return sf_file_writer_flush(writer); + } else { + return 0; + } +} + int sf_file_writer_init(SFFileWriterInfo *writer, const char *data_path, const char *subdir_name, const char *file_prefix, - const int buffer_size, const int64_t file_rotate_size, - const bool call_fsync) + const int max_record_size, const int buffer_size, + const int64_t file_rotate_size, const bool call_fsync) { int result; int path_len; @@ -385,6 +428,7 @@ int sf_file_writer_init(SFFileWriterInfo *writer, const char *data_path, return result; } + writer->cfg.max_record_size = max_record_size; writer->cfg.call_fsync = call_fsync; writer->cfg.file_rotate_size = file_rotate_size; writer->cfg.data_path = data_path; diff --git a/src/sf_file_writer.h b/src/sf_file_writer.h index b9b9d09..8567e24 100644 --- a/src/sf_file_writer.h +++ b/src/sf_file_writer.h @@ -68,8 +68,8 @@ extern "C" { int sf_file_writer_init(SFFileWriterInfo *writer, const char *data_path, const char *subdir_name, const char *file_prefix, - const int buffer_size, const int64_t file_rotate_size, - const bool call_fsync); + const int max_record_size, const int buffer_size, + const int64_t file_rotate_size, const bool call_fsync); void sf_file_writer_destroy(SFFileWriterInfo *writer); @@ -81,12 +81,28 @@ int sf_file_writer_deal_versioned_buffer(SFFileWriterInfo *writer, int sf_file_writer_flush(SFFileWriterInfo *writer); +int sf_file_writer_fsync(SFFileWriterInfo *writer); + +#define SF_FILE_WRITER_DATA_END_BUFF(writer) (writer)->binlog_buffer.data_end +#define SF_FILE_WRITER_CURRENT_DATA_VERSION(writer) \ + (writer)->last_versions.pending +#define SF_FILE_WRITER_NEXT_DATA_VERSION(writer) \ + ++((writer)->last_versions.pending) + +int sf_file_writer_save_buffer(SFFileWriterInfo *writer, const int length); + static inline void sf_file_writer_set_flags( SFFileWriterInfo *writer, const short flags) { writer->flags = flags; } +static inline void sf_file_writer_set_call_fsync( + SFFileWriterInfo *writer, const bool call_fsync) +{ + writer->cfg.call_fsync = call_fsync; +} + static inline int64_t sf_file_writer_get_last_version_ex( SFFileWriterInfo *writer, const int log_level) { diff --git a/src/sf_ordered_writer.c b/src/sf_ordered_writer.c index 4635c5b..bf0755c 100644 --- a/src/sf_ordered_writer.c +++ b/src/sf_ordered_writer.c @@ -250,8 +250,8 @@ int sf_ordered_writer_init_ex(SFOrderedWriterContext *context, { int result; if ((result=sf_file_writer_init(&context->writer.fw, data_path, - subdir_name, file_prefix, buffer_size, - file_rotate_size, call_fsync)) != 0) + subdir_name, file_prefix, max_record_size, + buffer_size, file_rotate_size, call_fsync)) != 0) { return result; }