diff --git a/src/sf_binlog_writer.c b/src/sf_binlog_writer.c index 8e767af..cb65528 100644 --- a/src/sf_binlog_writer.c +++ b/src/sf_binlog_writer.c @@ -197,6 +197,14 @@ static int deal_binlog_records(SFBinlogWriterThread *thread, fast_mblock_free_object(¤t->writer-> thread->mblock, current); break; + case SF_BINLOG_BUFFER_TYPE_ROTATE_FILE: + if ((result=sf_file_writer_set_binlog_last_index(¤t-> + writer->fw, current->writer->fw.binlog. + last_index + 1)) != 0) + { + return result; + } + break; case SF_BINLOG_BUFFER_TYPE_NOTIFY_EXIT: flush_writer_files(thread); return ERRNO_THREAD_EXIT; @@ -517,6 +525,12 @@ int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer, next_version); } +int sf_binlog_writer_rotate_file(SFBinlogWriterInfo *writer) +{ + return sf_binlog_writer_push_directive(writer, + SF_BINLOG_BUFFER_TYPE_ROTATE_FILE, 0); +} + int sf_binlog_writer_notify_exit(SFBinlogWriterInfo *writer) { return sf_binlog_writer_push_directive(writer, diff --git a/src/sf_binlog_writer.h b/src/sf_binlog_writer.h index 5d92b1c..d31c3cc 100644 --- a/src/sf_binlog_writer.h +++ b/src/sf_binlog_writer.h @@ -31,7 +31,8 @@ #define SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE 0 //default type, must be 0 #define SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION 1 #define SF_BINLOG_BUFFER_TYPE_CHANGE_ORDER_TYPE 2 -#define SF_BINLOG_BUFFER_TYPE_NOTIFY_EXIT 3 +#define SF_BINLOG_BUFFER_TYPE_ROTATE_FILE 3 +#define SF_BINLOG_BUFFER_TYPE_NOTIFY_EXIT 4 #define SF_BINLOG_BUFFER_SET_VERSION(buffer, ver) \ (buffer)->version.first = (buffer)->version.last = ver @@ -164,6 +165,8 @@ int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer, int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer, const int64_t next_version); +int sf_binlog_writer_rotate_file(SFBinlogWriterInfo *writer); + int sf_binlog_writer_notify_exit(SFBinlogWriterInfo *writer); #define sf_binlog_writer_set_flags(writer, flags) \ @@ -228,7 +231,13 @@ static inline SFBinlogWriterBuffer *sf_binlog_writer_alloc_versioned_buffer_ex( subdir_name, write_index) #define sf_binlog_writer_set_binlog_index(writer, binlog_index) \ - sf_file_writer_set_binlog_index(&(writer)->fw, binlog_index) + sf_file_writer_set_binlog_last_index(&(writer)->fw, binlog_index) + +#define sf_binlog_writer_set_binlog_start_index(writer, start_index) \ + sf_file_writer_set_binlog_start_index(&(writer)->fw, start_index) + +#define sf_binlog_writer_set_binlog_last_index(writer, last_index) \ + sf_file_writer_set_binlog_last_index(&(writer)->fw, last_index) #define sf_push_to_binlog_thread_queue(thread, buffer) \ fc_queue_push(&(thread)->queue, buffer) diff --git a/src/sf_file_writer.c b/src/sf_file_writer.c index ecaafd0..1a25f20 100644 --- a/src/sf_file_writer.c +++ b/src/sf_file_writer.c @@ -36,13 +36,14 @@ #define BINLOG_INDEX_FILENAME SF_BINLOG_FILE_PREFIX"_index.dat" +#define BINLOG_INDEX_ITEM_START_INDEX "start_index" #define BINLOG_INDEX_ITEM_CURRENT_WRITE "current_write" #define BINLOG_INDEX_ITEM_CURRENT_COMPRESS "current_compress" #define GET_BINLOG_FILENAME(writer) \ sprintf(writer->file.name, "%s/%s/%s"SF_BINLOG_FILE_EXT_FMT, \ writer->cfg.data_path, writer->cfg.subdir_name, \ - SF_BINLOG_FILE_PREFIX, writer->binlog.index) + SF_BINLOG_FILE_PREFIX, writer->binlog.last_index) #define GET_BINLOG_INDEX_FILENAME_EX(data_path, subdir_name, filename, size) \ snprintf(filename, size, "%s/%s/%s", data_path, \ @@ -68,9 +69,12 @@ static int write_to_binlog_index_file(SFFileWriterInfo *writer) GET_BINLOG_INDEX_FILENAME(writer, filename, sizeof(filename)); len = sprintf(buff, "%s=%d\n" + "%s=%d\n" "%s=%d\n", + BINLOG_INDEX_ITEM_START_INDEX, + writer->binlog.start_index, BINLOG_INDEX_ITEM_CURRENT_WRITE, - writer->binlog.index, + writer->binlog.last_index, BINLOG_INDEX_ITEM_CURRENT_COMPRESS, writer->binlog.compress_index); if ((result=safeWriteToFile(filename, buff, len)) != 0) { @@ -83,8 +87,8 @@ static int write_to_binlog_index_file(SFFileWriterInfo *writer) } static int get_binlog_info_from_file(const char *data_path, - const char *subdir_name, int *write_index, - int *compress_index) + const char *subdir_name, int *start_index, + int *last_index, int *compress_index) { char full_filename[PATH_MAX]; IniContext ini_context; @@ -103,7 +107,10 @@ static int get_binlog_info_from_file(const char *data_path, return result; } - *write_index = iniGetIntValue(NULL, + *start_index = iniGetIntValue(NULL, + BINLOG_INDEX_ITEM_START_INDEX, + &ini_context, 0); + *last_index = iniGetIntValue(NULL, BINLOG_INDEX_ITEM_CURRENT_WRITE, &ini_context, 0); *compress_index = iniGetIntValue(NULL, @@ -114,12 +121,12 @@ static int get_binlog_info_from_file(const char *data_path, return 0; } -int sf_file_writer_get_binlog_index(const char *data_path, - const char *subdir_name, int *write_index) +int sf_file_writer_get_binlog_indexes(const char *data_path, + const char *subdir_name, int *start_index, int *last_index) { int compress_index; return get_binlog_info_from_file(data_path, subdir_name, - write_index, &compress_index); + start_index, last_index, &compress_index); } static inline int get_binlog_index_from_file(SFFileWriterInfo *writer) @@ -127,10 +134,11 @@ static inline int get_binlog_index_from_file(SFFileWriterInfo *writer) int result; result = get_binlog_info_from_file(writer->cfg.data_path, - writer->cfg.subdir_name, &writer->binlog.index, - &writer->binlog.compress_index); + writer->cfg.subdir_name, &writer->binlog.start_index, + &writer->binlog.last_index, &writer->binlog.compress_index); if (result == ENOENT) { - writer->binlog.index = 0; + writer->binlog.start_index = 0; + writer->binlog.last_index = 0; writer->binlog.compress_index = 0; return write_to_binlog_index_file(writer); } @@ -233,7 +241,7 @@ static int check_write_to_file(SFFileWriterInfo *writer, return do_write_to_file(writer, buff, len); } - writer->binlog.index++; //binlog rotate + writer->binlog.last_index++; //binlog rotate if ((result=write_to_binlog_index_file(writer)) == 0) { result = open_next_binlog(writer); } @@ -269,11 +277,11 @@ int sf_file_writer_get_current_index(SFFileWriterInfo *writer) return 0; } - if (writer->binlog.index < 0) { + if (writer->binlog.last_index < 0) { get_binlog_index_from_file(writer); } - return writer->binlog.index; + return writer->binlog.last_index; } int sf_file_writer_deal_versioned_buffer(SFFileWriterInfo *writer, @@ -383,13 +391,28 @@ void sf_file_writer_destroy(SFFileWriterInfo *writer) sf_binlog_buffer_destroy(&writer->binlog_buffer); } -int sf_file_writer_set_binlog_index(SFFileWriterInfo *writer, - const int binlog_index) +int sf_file_writer_set_binlog_start_index(SFFileWriterInfo *writer, + const int start_index) { int result; - if (writer->binlog.index != binlog_index) { - writer->binlog.index = binlog_index; + if (writer->binlog.start_index != start_index) { + writer->binlog.start_index = start_index; + if ((result=write_to_binlog_index_file(writer)) != 0) { + return result; + } + } + + return 0; +} + +int sf_file_writer_set_binlog_last_index(SFFileWriterInfo *writer, + const int last_index) +{ + int result; + + if (writer->binlog.last_index != last_index) { + writer->binlog.last_index = last_index; if ((result=write_to_binlog_index_file(writer)) != 0) { return result; } diff --git a/src/sf_file_writer.h b/src/sf_file_writer.h index ec1de31..498ee31 100644 --- a/src/sf_file_writer.h +++ b/src/sf_file_writer.h @@ -39,7 +39,8 @@ typedef struct sf_file_writer_info { } cfg; struct { - int index; //current write index + int start_index; //for read only + int last_index; //for write int compress_index; } binlog; @@ -96,15 +97,23 @@ static inline int64_t sf_file_writer_get_last_version( } } -int sf_file_writer_get_binlog_index(const char *data_path, - const char *subdir_name, int *write_index); +int sf_file_writer_get_binlog_indexes(const char *data_path, + const char *subdir_name, int *start_index, int *last_index); + +static inline int sf_file_writer_get_binlog_index(const char *data_path, + const char *subdir_name, int *last_index) +{ + int start_index; + return sf_file_writer_get_binlog_indexes(data_path, + subdir_name, &start_index, last_index); +} int sf_file_writer_get_current_index(SFFileWriterInfo *writer); static inline void sf_file_writer_get_current_position( SFFileWriterInfo *writer, SFBinlogFilePosition *position) { - position->index = writer->binlog.index; + position->index = writer->binlog.last_index; position->offset = writer->file.size; } @@ -128,8 +137,11 @@ static inline const char *sf_file_writer_get_filename( const char *sf_file_writer_get_index_filename(const char *data_path, const char *subdir_name, char *filename, const int size); -int sf_file_writer_set_binlog_index(SFFileWriterInfo *writer, - const int binlog_index); +int sf_file_writer_set_binlog_start_index(SFFileWriterInfo *writer, + const int start_index); + +int sf_file_writer_set_binlog_last_index(SFFileWriterInfo *writer, + const int last_index); int sf_file_writer_get_last_lines(const char *data_path, const char *subdir_name, const int current_write_index,