From bcd1120617b0c8650b0e1f1936788f7d21e99a65 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Tue, 31 May 2022 21:19:15 +0800 Subject: [PATCH] sf_file_writer support specifying file prefix --- src/Makefile.in | 14 +++++++------- src/sf_binlog_writer.c | 16 +++++++++------- src/sf_binlog_writer.h | 38 ++++++++++++++++++++++++++------------ src/sf_file_writer.c | 36 ++++++++++++++++++++++-------------- src/sf_file_writer.h | 26 +++++++++++++++++--------- src/sf_ordered_writer.c | 8 ++++---- src/sf_ordered_writer.h | 8 ++++---- 7 files changed, 89 insertions(+), 57 deletions(-) diff --git a/src/Makefile.in b/src/Makefile.in index accf6c0..ca4979e 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -6,9 +6,9 @@ LIB_PATH = $(LIBS) -lfastcommon TARGET_LIB = $(TARGET_PREFIX)/$(LIB_VERSION) TOP_HEADERS = sf_types.h sf_global.h sf_define.h sf_nio.h sf_service.h \ - sf_func.h sf_util.h sf_configs.h sf_proto.h sf_binlog_writer.h \ - sf_cluster_cfg.h sf_sharding_htable.h sf_connection_manager.h \ - sf_serializer.h sf_binlog_index.h sf_file_writer.h \ + sf_func.h sf_util.h sf_configs.h sf_proto.h sf_cluster_cfg.h \ + sf_sharding_htable.h sf_connection_manager.h sf_serializer.h \ + sf_binlog_index.h sf_file_writer.h sf_binlog_writer.h \ sf_ordered_writer.h sf_buffered_writer.h sf_iov.h IDEMP_SERVER_HEADER = idempotency/server/server_types.h \ @@ -26,10 +26,10 @@ ALL_HEADERS = $(TOP_HEADERS) $(IDEMP_SERVER_HEADER) $(IDEMP_CLIENT_HEADER) SHARED_OBJS = sf_nio.lo sf_iov.lo sf_service.lo sf_global.lo \ sf_func.lo sf_util.lo sf_configs.lo sf_proto.lo \ - sf_binlog_writer.lo sf_sharding_htable.lo \ - sf_cluster_cfg.lo sf_connection_manager.lo \ - sf_serializer.lo sf_binlog_index.lo \ - sf_file_writer.lo sf_ordered_writer.lo \ + sf_sharding_htable.lo sf_cluster_cfg.lo \ + sf_connection_manager.lo sf_serializer.lo \ + sf_binlog_index.lo sf_file_writer.lo \ + sf_binlog_writer.lo sf_ordered_writer.lo \ idempotency/server/server_channel.lo \ idempotency/server/request_htable.lo \ idempotency/server/channel_htable.lo \ diff --git a/src/sf_binlog_writer.c b/src/sf_binlog_writer.c index 8948879..7c21c21 100644 --- a/src/sf_binlog_writer.c +++ b/src/sf_binlog_writer.c @@ -388,18 +388,20 @@ static void binlog_wbuffer_destroy_func(void *element, void *args) int sf_binlog_writer_init_normal_ex(SFBinlogWriterInfo *writer, const char *data_path, const char *subdir_name, - const int buffer_size, const int64_t file_rotate_size) + const char *file_prefix, const int buffer_size, + const int64_t file_rotate_size) { memset(writer, 0, sizeof(*writer)); writer->order_by = SF_BINLOG_WRITER_TYPE_ORDER_BY_NONE; - return sf_file_writer_init(&writer->fw, data_path, - subdir_name, buffer_size, file_rotate_size); + return sf_file_writer_init(&writer->fw, data_path, subdir_name, + file_prefix, buffer_size, file_rotate_size); } int sf_binlog_writer_init_by_version_ex(SFBinlogWriterInfo *writer, const char *data_path, const char *subdir_name, - const uint64_t next_version, const int buffer_size, - const int ring_size, const int64_t file_rotate_size) + const char *file_prefix, const uint64_t next_version, + const int buffer_size, const int ring_size, + const int64_t file_rotate_size) { int bytes; @@ -417,8 +419,8 @@ int sf_binlog_writer_init_by_version_ex(SFBinlogWriterInfo *writer, binlog_writer_set_next_version(writer, next_version); writer->flush.in_queue = false; - return sf_file_writer_init(&writer->fw, data_path, - subdir_name, buffer_size, file_rotate_size); + return sf_file_writer_init(&writer->fw, data_path, subdir_name, + file_prefix, buffer_size, file_rotate_size); } int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, diff --git a/src/sf_binlog_writer.h b/src/sf_binlog_writer.h index 305a165..f2b8a11 100644 --- a/src/sf_binlog_writer.h +++ b/src/sf_binlog_writer.h @@ -101,12 +101,14 @@ extern "C" { int sf_binlog_writer_init_normal_ex(SFBinlogWriterInfo *writer, const char *data_path, const char *subdir_name, - const int buffer_size, const int64_t file_rotate_size); + const char *file_prefix, const int buffer_size, + const int64_t file_rotate_size); int sf_binlog_writer_init_by_version_ex(SFBinlogWriterInfo *writer, const char *data_path, const char *subdir_name, - const uint64_t next_version, const int buffer_size, - const int ring_size, const int64_t file_rotate_size); + const char *file_prefix, const uint64_t next_version, + const int buffer_size, const int ring_size, + const int64_t file_rotate_size); int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, const char *name, SFBinlogWriterInfo *writer, const short order_mode, @@ -116,12 +118,12 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, #define sf_binlog_writer_init_normal(writer, \ data_path, subdir_name, buffer_size) \ sf_binlog_writer_init_normal_ex(writer, data_path, subdir_name, \ - buffer_size, SF_BINLOG_DEFAULT_ROTATE_SIZE) + SF_BINLOG_FILE_PREFIX, buffer_size, SF_BINLOG_DEFAULT_ROTATE_SIZE) -#define sf_binlog_writer_init_by_version(writer, data_path, \ - subdir_name, next_version, buffer_size, ring_size) \ - sf_binlog_writer_init_by_version_ex(writer, data_path, \ - subdir_name, next_version, buffer_size, \ +#define sf_binlog_writer_init_by_version(writer, data_path, \ + subdir_name, next_version, buffer_size, ring_size) \ + sf_binlog_writer_init_by_version_ex(writer, data_path, subdir_name, \ + SF_BINLOG_FILE_PREFIX, next_version, buffer_size, \ ring_size, SF_BINLOG_DEFAULT_ROTATE_SIZE) #define sf_binlog_writer_init_thread(thread, name, writer, max_record_size) \ @@ -129,13 +131,15 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, SF_BINLOG_THREAD_ORDER_MODE_FIXED, \ max_record_size, 1, true) -static inline int sf_binlog_writer_init(SFBinlogWriterContext *context, +static inline int sf_binlog_writer_init_ex(SFBinlogWriterContext *context, const char *data_path, const char *subdir_name, - const int buffer_size, const int max_record_size) + const char *file_prefix, const int buffer_size, + const int max_record_size) { int result; - if ((result=sf_binlog_writer_init_normal(&context->writer, - data_path, subdir_name, buffer_size)) != 0) + if ((result=sf_binlog_writer_init_normal_ex(&context->writer, + data_path, subdir_name, file_prefix, buffer_size, + SF_BINLOG_DEFAULT_ROTATE_SIZE)) != 0) { return result; } @@ -144,6 +148,11 @@ static inline int sf_binlog_writer_init(SFBinlogWriterContext *context, subdir_name, &context->writer, max_record_size); } +#define sf_binlog_writer_init(context, data_path, \ + subdir_name, buffer_size, max_record_size) \ + sf_binlog_writer_init_ex(context, data_path, subdir_name, \ + SF_BINLOG_FILE_PREFIX, buffer_size, max_record_size) + void sf_binlog_writer_finish(SFBinlogWriterInfo *writer); static inline void sf_binlog_writer_destroy_writer( @@ -239,6 +248,11 @@ static inline SFBinlogWriterBuffer *sf_binlog_writer_alloc_versioned_buffer_ex( #define sf_binlog_writer_get_filepath(data_path, subdir_name, filepath, size) \ sf_file_writer_get_filepath(data_path, subdir_name, filepath, size) +#define sf_binlog_writer_get_filename_ex(data_path, subdir_name, \ + file_prefix, binlog_index, filename, size) \ + sf_file_writer_get_filename_ex(data_path, subdir_name, \ + file_prefix, binlog_index, filename, size) + #define sf_binlog_writer_get_filename(data_path, \ subdir_name, binlog_index, filename, size) \ sf_file_writer_get_filename(data_path, subdir_name, \ diff --git a/src/sf_file_writer.c b/src/sf_file_writer.c index e1b31dd..006bacf 100644 --- a/src/sf_file_writer.c +++ b/src/sf_file_writer.c @@ -34,8 +34,6 @@ #include "sf_func.h" #include "sf_file_writer.h" -#define BINLOG_INDEX_FILENAME SF_BINLOG_FILE_PREFIX"_index.dat" - #define BINLOG_INDEX_ITEM_START_INDEX "start_index" #define BINLOG_INDEX_ITEM_CURRENT_WRITE "current_write" #define BINLOG_INDEX_ITEM_CURRENT_COMPRESS "current_compress" @@ -43,20 +41,22 @@ #define GET_BINLOG_FILENAME(writer) \ sprintf(writer->file.name, "%s/%s/%s"SF_BINLOG_FILE_EXT_FMT, \ writer->cfg.data_path, writer->cfg.subdir_name, \ - SF_BINLOG_FILE_PREFIX, writer->binlog.last_index) + writer->cfg.file_prefix, writer->binlog.last_index) -#define GET_BINLOG_INDEX_FILENAME_EX(data_path, subdir_name, filename, size) \ - snprintf(filename, size, "%s/%s/%s", data_path, \ - subdir_name, BINLOG_INDEX_FILENAME) +#define GET_BINLOG_INDEX_FILENAME_EX(data_path, \ + subdir_name, file_prefix, filename, size) \ + snprintf(filename, size, "%s/%s/%s_index.dat", \ + data_path, subdir_name, file_prefix) #define GET_BINLOG_INDEX_FILENAME(writer, filename, size) \ GET_BINLOG_INDEX_FILENAME_EX(writer->cfg.data_path, \ - writer->cfg.subdir_name, filename, size) + writer->cfg.subdir_name, writer->cfg.file_prefix, filename, size) const char *sf_file_writer_get_index_filename(const char *data_path, const char *subdir_name, char *filename, const int size) { - GET_BINLOG_INDEX_FILENAME_EX(data_path, subdir_name, filename, size); + GET_BINLOG_INDEX_FILENAME_EX(data_path, subdir_name, + SF_BINLOG_FILE_PREFIX, filename, size); return filename; } @@ -94,8 +94,9 @@ static int get_binlog_info_from_file(const char *data_path, IniContext ini_context; int result; - snprintf(full_filename, sizeof(full_filename), "%s/%s/%s", - data_path, subdir_name, BINLOG_INDEX_FILENAME); + GET_BINLOG_INDEX_FILENAME_EX(data_path, + subdir_name, SF_BINLOG_FILE_PREFIX, + full_filename, sizeof(full_filename)); if (access(full_filename, F_OK) != 0) { return errno != 0 ? errno : EPERM; } @@ -140,7 +141,11 @@ static inline int get_binlog_index_from_file(SFFileWriterInfo *writer) writer->binlog.start_index = 0; writer->binlog.last_index = 0; writer->binlog.compress_index = 0; - return write_to_binlog_index_file(writer); + if (writer->cfg.file_rotate_size > 0) { + return write_to_binlog_index_file(writer); + } else { + return 0; + } } return result; } @@ -237,7 +242,7 @@ static int check_write_to_file(SFFileWriterInfo *writer, { int result; - if ((writer->cfg.file_rotate_size > 0) && (writer->file.size + if ((writer->cfg.file_rotate_size <= 0) || (writer->file.size + len <= writer->cfg.file_rotate_size)) { return do_write_to_file(writer, buff, len); @@ -341,8 +346,8 @@ int sf_file_writer_deal_versioned_buffer(SFFileWriterInfo *writer, return 0; } -int sf_file_writer_init(SFFileWriterInfo *writer, - const char *data_path, const char *subdir_name, +int sf_file_writer_init(SFFileWriterInfo *writer, const char *data_path, + const char *subdir_name, const char *file_prefix, const int buffer_size, const int64_t file_rotate_size) { int result; @@ -375,6 +380,9 @@ int sf_file_writer_init(SFFileWriterInfo *writer, snprintf(writer->cfg.subdir_name, sizeof(writer->cfg.subdir_name), "%s", subdir_name); + snprintf(writer->cfg.file_prefix, + sizeof(writer->cfg.file_prefix), + "%s", file_prefix); writer->file.name = (char *)fc_malloc(path_len + 32); if (writer->file.name == NULL) { return ENOMEM; diff --git a/src/sf_file_writer.h b/src/sf_file_writer.h index 3d857bb..5832465 100644 --- a/src/sf_file_writer.h +++ b/src/sf_file_writer.h @@ -23,11 +23,12 @@ #define SF_FILE_WRITER_FLAGS_WANT_DONE_VERSION 1 -#define SF_BINLOG_SUBDIR_NAME_SIZE 128 +#define SF_BINLOG_SUBDIR_NAME_SIZE 128 +#define SF_BINLOG_FILE_PREFIX_SIZE 64 #define SF_BINLOG_DEFAULT_ROTATE_SIZE (1024 * 1024 * 1024) #define SF_BINLOG_NEVER_ROTATE_FILE 0 -#define SF_BINLOG_FILE_PREFIX "binlog" -#define SF_BINLOG_FILE_EXT_FMT ".%06d" +#define SF_BINLOG_FILE_PREFIX "binlog" +#define SF_BINLOG_FILE_EXT_FMT ".%06d" #define SF_BINLOG_BUFFER_LENGTH(buffer) ((buffer).end - (buffer).buff) #define SF_BINLOG_BUFFER_REMAIN(buffer) ((buffer).end - (buffer).current) @@ -36,6 +37,7 @@ typedef struct sf_file_writer_info { struct { const char *data_path; char subdir_name[SF_BINLOG_SUBDIR_NAME_SIZE]; + char file_prefix[SF_BINLOG_FILE_PREFIX_SIZE]; int64_t file_rotate_size; int max_record_size; } cfg; @@ -66,8 +68,8 @@ typedef struct sf_file_writer_info { extern "C" { #endif -int sf_file_writer_init(SFFileWriterInfo *writer, - const char *data_path, const char *subdir_name, +int sf_file_writer_init(SFFileWriterInfo *writer, const char *data_path, + const char *subdir_name, const char *file_prefix, const int buffer_size, const int64_t file_rotate_size); void sf_file_writer_destroy(SFFileWriterInfo *writer); @@ -162,15 +164,21 @@ static inline const char *sf_file_writer_get_filepath( return filepath; } -static inline const char *sf_file_writer_get_filename( +static inline const char *sf_file_writer_get_filename_ex( const char *data_path, const char *subdir_name, - const int binlog_index, char *filename, const int size) + const char *file_prefix, const int binlog_index, + char *filename, const int size) { - snprintf(filename, size, "%s/%s/%s"SF_BINLOG_FILE_EXT_FMT, data_path, - subdir_name, SF_BINLOG_FILE_PREFIX, binlog_index); + snprintf(filename, size, "%s/%s/%s"SF_BINLOG_FILE_EXT_FMT, + data_path, subdir_name, file_prefix, binlog_index); return filename; } +#define sf_file_writer_get_filename(data_path, subdir_name, \ + binlog_index, filename, size) \ + sf_file_writer_get_filename_ex(data_path, subdir_name, \ + SF_BINLOG_FILE_PREFIX, binlog_index, filename, size) + const char *sf_file_writer_get_index_filename(const char *data_path, const char *subdir_name, char *filename, const int size); diff --git a/src/sf_ordered_writer.c b/src/sf_ordered_writer.c index b54aa8e..49c653f 100644 --- a/src/sf_ordered_writer.c +++ b/src/sf_ordered_writer.c @@ -236,13 +236,13 @@ static int sf_ordered_writer_init_thread(SFOrderedWriterContext *context, int sf_ordered_writer_init_ex(SFOrderedWriterContext *context, const char *data_path, const char *subdir_name, - const int buffer_size, const int max_record_size, - const int64_t file_rotate_size) + const char *file_prefix, const int buffer_size, + const int max_record_size, const int64_t file_rotate_size) { int result; if ((result=sf_file_writer_init(&context->writer.fw, - data_path, subdir_name, buffer_size, - file_rotate_size)) != 0) + data_path, subdir_name, file_prefix, + buffer_size, file_rotate_size)) != 0) { return result; } diff --git a/src/sf_ordered_writer.h b/src/sf_ordered_writer.h index 9872704..2ca1c43 100644 --- a/src/sf_ordered_writer.h +++ b/src/sf_ordered_writer.h @@ -64,13 +64,13 @@ extern "C" { int sf_ordered_writer_init_ex(SFOrderedWriterContext *context, const char *data_path, const char *subdir_name, - const int buffer_size, const int max_record_size, - const int64_t file_rotate_size); + const char *file_prefix, const int buffer_size, + const int max_record_size, const int64_t file_rotate_size); #define sf_ordered_writer_init(context, data_path, \ subdir_name, buffer_size, max_record_size) \ - sf_ordered_writer_init_ex(context, data_path, \ - subdir_name, buffer_size, max_record_size, \ + sf_ordered_writer_init_ex(context, data_path, subdir_name, \ + SF_BINLOG_FILE_PREFIX, buffer_size, max_record_size, \ SF_BINLOG_DEFAULT_ROTATE_SIZE) #define sf_ordered_writer_set_flags(ctx, flags) \