From 39e5dd419e010c2c747a832079e044cda9b37fb6 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Mon, 30 May 2022 11:24:05 +0800 Subject: [PATCH] custom define binlog rotate file size --- src/sf_binlog_writer.c | 12 ++++++------ src/sf_binlog_writer.h | 23 +++++++++++++++++------ src/sf_file_writer.c | 12 ++++++++---- src/sf_file_writer.h | 12 +++++++----- src/sf_func.h | 5 +++++ src/sf_ordered_writer.c | 8 +++++--- src/sf_ordered_writer.h | 11 +++++++++-- 7 files changed, 57 insertions(+), 26 deletions(-) diff --git a/src/sf_binlog_writer.c b/src/sf_binlog_writer.c index a7423c2..8948879 100644 --- a/src/sf_binlog_writer.c +++ b/src/sf_binlog_writer.c @@ -386,20 +386,20 @@ static void binlog_wbuffer_destroy_func(void *element, void *args) } } -int sf_binlog_writer_init_normal(SFBinlogWriterInfo *writer, +int sf_binlog_writer_init_normal_ex(SFBinlogWriterInfo *writer, const char *data_path, const char *subdir_name, - const int buffer_size) + const int buffer_size, const int64_t file_rotate_size) { memset(writer, 0, sizeof(*writer)); writer->order_by = SF_BINLOG_WRITER_TYPE_ORDER_BY_NONE; return sf_file_writer_init(&writer->fw, data_path, - subdir_name, buffer_size); + subdir_name, buffer_size, file_rotate_size); } -int sf_binlog_writer_init_by_version(SFBinlogWriterInfo *writer, +int sf_binlog_writer_init_by_version_ex(SFBinlogWriterInfo *writer, const char *data_path, const char *subdir_name, const uint64_t next_version, const int buffer_size, - const int ring_size) + const int ring_size, const int64_t file_rotate_size) { int bytes; @@ -418,7 +418,7 @@ int sf_binlog_writer_init_by_version(SFBinlogWriterInfo *writer, binlog_writer_set_next_version(writer, next_version); writer->flush.in_queue = false; return sf_file_writer_init(&writer->fw, data_path, - subdir_name, buffer_size); + subdir_name, buffer_size, file_rotate_size); } int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, diff --git a/src/sf_binlog_writer.h b/src/sf_binlog_writer.h index f3d319c..305a165 100644 --- a/src/sf_binlog_writer.h +++ b/src/sf_binlog_writer.h @@ -99,20 +99,31 @@ typedef struct sf_binlog_writer_context { extern "C" { #endif -int sf_binlog_writer_init_normal(SFBinlogWriterInfo *writer, +int sf_binlog_writer_init_normal_ex(SFBinlogWriterInfo *writer, const char *data_path, const char *subdir_name, - const int buffer_size); + const int buffer_size, const int64_t file_rotate_size); -int sf_binlog_writer_init_by_version(SFBinlogWriterInfo *writer, +int sf_binlog_writer_init_by_version_ex(SFBinlogWriterInfo *writer, const char *data_path, const char *subdir_name, const uint64_t next_version, const int buffer_size, - const int ring_size); + const int ring_size, const int64_t file_rotate_size); int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, const char *name, SFBinlogWriterInfo *writer, const short order_mode, const int max_record_size, const int writer_count, const bool use_fixed_buffer_size); +#define sf_binlog_writer_init_normal(writer, \ + data_path, subdir_name, buffer_size) \ + sf_binlog_writer_init_normal_ex(writer, data_path, subdir_name, \ + buffer_size, SF_BINLOG_DEFAULT_ROTATE_SIZE) + +#define sf_binlog_writer_init_by_version(writer, data_path, \ + subdir_name, next_version, buffer_size, ring_size) \ + sf_binlog_writer_init_by_version_ex(writer, data_path, \ + subdir_name, next_version, buffer_size, \ + ring_size, SF_BINLOG_DEFAULT_ROTATE_SIZE) + #define sf_binlog_writer_init_thread(thread, name, writer, max_record_size) \ sf_binlog_writer_init_thread_ex(thread, name, writer, \ SF_BINLOG_THREAD_ORDER_MODE_FIXED, \ @@ -225,8 +236,8 @@ static inline SFBinlogWriterBuffer *sf_binlog_writer_alloc_versioned_buffer_ex( return buffer; } -#define sf_binlog_writer_get_filepath(data_path, subdir_name, filename, size) \ - sf_file_writer_get_filepath(data_path, subdir_name, filename, size) +#define sf_binlog_writer_get_filepath(data_path, subdir_name, filepath, size) \ + sf_file_writer_get_filepath(data_path, subdir_name, filepath, size) #define sf_binlog_writer_get_filename(data_path, \ subdir_name, binlog_index, filename, size) \ diff --git a/src/sf_file_writer.c b/src/sf_file_writer.c index 7cba794..e1b31dd 100644 --- a/src/sf_file_writer.c +++ b/src/sf_file_writer.c @@ -237,7 +237,9 @@ static int check_write_to_file(SFFileWriterInfo *writer, { int result; - if (writer->file.size + len <= SF_BINLOG_FILE_MAX_SIZE) { + if ((writer->cfg.file_rotate_size > 0) && (writer->file.size + + len <= writer->cfg.file_rotate_size)) + { return do_write_to_file(writer, buff, len); } @@ -315,8 +317,9 @@ int sf_file_writer_deal_versioned_buffer(SFFileWriterInfo *writer, return result; } - if (writer->file.size + SF_BINLOG_BUFFER_LENGTH(writer-> - binlog_buffer) + buffer->length > SF_BINLOG_FILE_MAX_SIZE) + if (writer->cfg.file_rotate_size > 0 && writer->file.size + + SF_BINLOG_BUFFER_LENGTH(writer->binlog_buffer) + + buffer->length > writer->cfg.file_rotate_size) { if ((result=sf_file_writer_flush(writer)) != 0) { return result; @@ -340,7 +343,7 @@ int sf_file_writer_deal_versioned_buffer(SFFileWriterInfo *writer, int sf_file_writer_init(SFFileWriterInfo *writer, const char *data_path, const char *subdir_name, - const int buffer_size) + const int buffer_size, const int64_t file_rotate_size) { int result; int path_len; @@ -357,6 +360,7 @@ int sf_file_writer_init(SFFileWriterInfo *writer, return result; } + writer->cfg.file_rotate_size = file_rotate_size; writer->cfg.data_path = data_path; path_len = snprintf(filepath, sizeof(filepath), "%s/%s", data_path, subdir_name); diff --git a/src/sf_file_writer.h b/src/sf_file_writer.h index df4e635..3d857bb 100644 --- a/src/sf_file_writer.h +++ b/src/sf_file_writer.h @@ -24,7 +24,8 @@ #define SF_FILE_WRITER_FLAGS_WANT_DONE_VERSION 1 #define SF_BINLOG_SUBDIR_NAME_SIZE 128 -#define SF_BINLOG_FILE_MAX_SIZE (1024 * 1024 * 1024) //for binlog rotating by size +#define SF_BINLOG_DEFAULT_ROTATE_SIZE (1024 * 1024 * 1024) +#define SF_BINLOG_NEVER_ROTATE_FILE 0 #define SF_BINLOG_FILE_PREFIX "binlog" #define SF_BINLOG_FILE_EXT_FMT ".%06d" @@ -35,6 +36,7 @@ typedef struct sf_file_writer_info { struct { const char *data_path; char subdir_name[SF_BINLOG_SUBDIR_NAME_SIZE]; + int64_t file_rotate_size; int max_record_size; } cfg; @@ -66,7 +68,7 @@ extern "C" { int sf_file_writer_init(SFFileWriterInfo *writer, const char *data_path, const char *subdir_name, - const int buffer_size); + const int buffer_size, const int64_t file_rotate_size); void sf_file_writer_destroy(SFFileWriterInfo *writer); @@ -154,10 +156,10 @@ static inline void sf_file_writer_get_current_position( static inline const char *sf_file_writer_get_filepath( const char *data_path, const char *subdir_name, - char *filename, const int size) + char *filepath, const int size) { - snprintf(filename, size, "%s/%s", data_path, subdir_name); - return filename; + snprintf(filepath, size, "%s/%s", data_path, subdir_name); + return filepath; } static inline const char *sf_file_writer_get_filename( diff --git a/src/sf_func.h b/src/sf_func.h index 2feae5c..62f231c 100644 --- a/src/sf_func.h +++ b/src/sf_func.h @@ -76,6 +76,11 @@ static inline int sf_synchronize_ctx_init(SFSynchronizeContext *sctx) return init_pthread_lock_cond_pair(&sctx->lcp); } +static inline void sf_synchronize_ctx_destroy(SFSynchronizeContext *sctx) +{ + destroy_pthread_lock_cond_pair(&sctx->lcp); +} + static inline void sf_synchronize_counter_add( SFSynchronizeContext *sctx, const int count) { diff --git a/src/sf_ordered_writer.c b/src/sf_ordered_writer.c index fac0a62..b54aa8e 100644 --- a/src/sf_ordered_writer.c +++ b/src/sf_ordered_writer.c @@ -234,13 +234,15 @@ static int sf_ordered_writer_init_thread(SFOrderedWriterContext *context, context, SF_G_THREAD_STACK_SIZE); } -int sf_ordered_writer_init(SFOrderedWriterContext *context, +int sf_ordered_writer_init_ex(SFOrderedWriterContext *context, const char *data_path, const char *subdir_name, - const int buffer_size, const int max_record_size) + const int buffer_size, const int max_record_size, + const int64_t file_rotate_size) { int result; if ((result=sf_file_writer_init(&context->writer.fw, - data_path, subdir_name, buffer_size)) != 0) + data_path, subdir_name, buffer_size, + file_rotate_size)) != 0) { return result; } diff --git a/src/sf_ordered_writer.h b/src/sf_ordered_writer.h index 8e84f82..9872704 100644 --- a/src/sf_ordered_writer.h +++ b/src/sf_ordered_writer.h @@ -62,9 +62,16 @@ typedef struct sf_ordered_writer_context { extern "C" { #endif -int sf_ordered_writer_init(SFOrderedWriterContext *context, +int sf_ordered_writer_init_ex(SFOrderedWriterContext *context, const char *data_path, const char *subdir_name, - const int buffer_size, const int max_record_size); + const int buffer_size, const int max_record_size, + const int64_t file_rotate_size); + +#define sf_ordered_writer_init(context, data_path, \ + subdir_name, buffer_size, max_record_size) \ + sf_ordered_writer_init_ex(context, data_path, \ + subdir_name, buffer_size, max_record_size, \ + SF_BINLOG_DEFAULT_ROTATE_SIZE) #define sf_ordered_writer_set_flags(ctx, flags) \ sf_file_writer_set_flags(&(ctx)->writer.fw, flags)