sf_file_writer.[hc] support start_index
parent
077a68a974
commit
c611b9b30c
|
|
@ -197,6 +197,14 @@ static int deal_binlog_records(SFBinlogWriterThread *thread,
|
||||||
fast_mblock_free_object(¤t->writer->
|
fast_mblock_free_object(¤t->writer->
|
||||||
thread->mblock, current);
|
thread->mblock, current);
|
||||||
break;
|
break;
|
||||||
|
case SF_BINLOG_BUFFER_TYPE_ROTATE_FILE:
|
||||||
|
if ((result=sf_file_writer_set_binlog_last_index(¤t->
|
||||||
|
writer->fw, current->writer->fw.binlog.
|
||||||
|
last_index + 1)) != 0)
|
||||||
|
{
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
break;
|
||||||
case SF_BINLOG_BUFFER_TYPE_NOTIFY_EXIT:
|
case SF_BINLOG_BUFFER_TYPE_NOTIFY_EXIT:
|
||||||
flush_writer_files(thread);
|
flush_writer_files(thread);
|
||||||
return ERRNO_THREAD_EXIT;
|
return ERRNO_THREAD_EXIT;
|
||||||
|
|
@ -517,6 +525,12 @@ int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer,
|
||||||
next_version);
|
next_version);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int sf_binlog_writer_rotate_file(SFBinlogWriterInfo *writer)
|
||||||
|
{
|
||||||
|
return sf_binlog_writer_push_directive(writer,
|
||||||
|
SF_BINLOG_BUFFER_TYPE_ROTATE_FILE, 0);
|
||||||
|
}
|
||||||
|
|
||||||
int sf_binlog_writer_notify_exit(SFBinlogWriterInfo *writer)
|
int sf_binlog_writer_notify_exit(SFBinlogWriterInfo *writer)
|
||||||
{
|
{
|
||||||
return sf_binlog_writer_push_directive(writer,
|
return sf_binlog_writer_push_directive(writer,
|
||||||
|
|
|
||||||
|
|
@ -31,7 +31,8 @@
|
||||||
#define SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE 0 //default type, must be 0
|
#define SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE 0 //default type, must be 0
|
||||||
#define SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION 1
|
#define SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION 1
|
||||||
#define SF_BINLOG_BUFFER_TYPE_CHANGE_ORDER_TYPE 2
|
#define SF_BINLOG_BUFFER_TYPE_CHANGE_ORDER_TYPE 2
|
||||||
#define SF_BINLOG_BUFFER_TYPE_NOTIFY_EXIT 3
|
#define SF_BINLOG_BUFFER_TYPE_ROTATE_FILE 3
|
||||||
|
#define SF_BINLOG_BUFFER_TYPE_NOTIFY_EXIT 4
|
||||||
|
|
||||||
#define SF_BINLOG_BUFFER_SET_VERSION(buffer, ver) \
|
#define SF_BINLOG_BUFFER_SET_VERSION(buffer, ver) \
|
||||||
(buffer)->version.first = (buffer)->version.last = ver
|
(buffer)->version.first = (buffer)->version.last = ver
|
||||||
|
|
@ -164,6 +165,8 @@ int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer,
|
||||||
int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer,
|
int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer,
|
||||||
const int64_t next_version);
|
const int64_t next_version);
|
||||||
|
|
||||||
|
int sf_binlog_writer_rotate_file(SFBinlogWriterInfo *writer);
|
||||||
|
|
||||||
int sf_binlog_writer_notify_exit(SFBinlogWriterInfo *writer);
|
int sf_binlog_writer_notify_exit(SFBinlogWriterInfo *writer);
|
||||||
|
|
||||||
#define sf_binlog_writer_set_flags(writer, flags) \
|
#define sf_binlog_writer_set_flags(writer, flags) \
|
||||||
|
|
@ -228,7 +231,13 @@ static inline SFBinlogWriterBuffer *sf_binlog_writer_alloc_versioned_buffer_ex(
|
||||||
subdir_name, write_index)
|
subdir_name, write_index)
|
||||||
|
|
||||||
#define sf_binlog_writer_set_binlog_index(writer, binlog_index) \
|
#define sf_binlog_writer_set_binlog_index(writer, binlog_index) \
|
||||||
sf_file_writer_set_binlog_index(&(writer)->fw, binlog_index)
|
sf_file_writer_set_binlog_last_index(&(writer)->fw, binlog_index)
|
||||||
|
|
||||||
|
#define sf_binlog_writer_set_binlog_start_index(writer, start_index) \
|
||||||
|
sf_file_writer_set_binlog_start_index(&(writer)->fw, start_index)
|
||||||
|
|
||||||
|
#define sf_binlog_writer_set_binlog_last_index(writer, last_index) \
|
||||||
|
sf_file_writer_set_binlog_last_index(&(writer)->fw, last_index)
|
||||||
|
|
||||||
#define sf_push_to_binlog_thread_queue(thread, buffer) \
|
#define sf_push_to_binlog_thread_queue(thread, buffer) \
|
||||||
fc_queue_push(&(thread)->queue, buffer)
|
fc_queue_push(&(thread)->queue, buffer)
|
||||||
|
|
|
||||||
|
|
@ -36,13 +36,14 @@
|
||||||
|
|
||||||
#define BINLOG_INDEX_FILENAME SF_BINLOG_FILE_PREFIX"_index.dat"
|
#define BINLOG_INDEX_FILENAME SF_BINLOG_FILE_PREFIX"_index.dat"
|
||||||
|
|
||||||
|
#define BINLOG_INDEX_ITEM_START_INDEX "start_index"
|
||||||
#define BINLOG_INDEX_ITEM_CURRENT_WRITE "current_write"
|
#define BINLOG_INDEX_ITEM_CURRENT_WRITE "current_write"
|
||||||
#define BINLOG_INDEX_ITEM_CURRENT_COMPRESS "current_compress"
|
#define BINLOG_INDEX_ITEM_CURRENT_COMPRESS "current_compress"
|
||||||
|
|
||||||
#define GET_BINLOG_FILENAME(writer) \
|
#define GET_BINLOG_FILENAME(writer) \
|
||||||
sprintf(writer->file.name, "%s/%s/%s"SF_BINLOG_FILE_EXT_FMT, \
|
sprintf(writer->file.name, "%s/%s/%s"SF_BINLOG_FILE_EXT_FMT, \
|
||||||
writer->cfg.data_path, writer->cfg.subdir_name, \
|
writer->cfg.data_path, writer->cfg.subdir_name, \
|
||||||
SF_BINLOG_FILE_PREFIX, writer->binlog.index)
|
SF_BINLOG_FILE_PREFIX, writer->binlog.last_index)
|
||||||
|
|
||||||
#define GET_BINLOG_INDEX_FILENAME_EX(data_path, subdir_name, filename, size) \
|
#define GET_BINLOG_INDEX_FILENAME_EX(data_path, subdir_name, filename, size) \
|
||||||
snprintf(filename, size, "%s/%s/%s", data_path, \
|
snprintf(filename, size, "%s/%s/%s", data_path, \
|
||||||
|
|
@ -68,9 +69,12 @@ static int write_to_binlog_index_file(SFFileWriterInfo *writer)
|
||||||
|
|
||||||
GET_BINLOG_INDEX_FILENAME(writer, filename, sizeof(filename));
|
GET_BINLOG_INDEX_FILENAME(writer, filename, sizeof(filename));
|
||||||
len = sprintf(buff, "%s=%d\n"
|
len = sprintf(buff, "%s=%d\n"
|
||||||
|
"%s=%d\n"
|
||||||
"%s=%d\n",
|
"%s=%d\n",
|
||||||
|
BINLOG_INDEX_ITEM_START_INDEX,
|
||||||
|
writer->binlog.start_index,
|
||||||
BINLOG_INDEX_ITEM_CURRENT_WRITE,
|
BINLOG_INDEX_ITEM_CURRENT_WRITE,
|
||||||
writer->binlog.index,
|
writer->binlog.last_index,
|
||||||
BINLOG_INDEX_ITEM_CURRENT_COMPRESS,
|
BINLOG_INDEX_ITEM_CURRENT_COMPRESS,
|
||||||
writer->binlog.compress_index);
|
writer->binlog.compress_index);
|
||||||
if ((result=safeWriteToFile(filename, buff, len)) != 0) {
|
if ((result=safeWriteToFile(filename, buff, len)) != 0) {
|
||||||
|
|
@ -83,8 +87,8 @@ static int write_to_binlog_index_file(SFFileWriterInfo *writer)
|
||||||
}
|
}
|
||||||
|
|
||||||
static int get_binlog_info_from_file(const char *data_path,
|
static int get_binlog_info_from_file(const char *data_path,
|
||||||
const char *subdir_name, int *write_index,
|
const char *subdir_name, int *start_index,
|
||||||
int *compress_index)
|
int *last_index, int *compress_index)
|
||||||
{
|
{
|
||||||
char full_filename[PATH_MAX];
|
char full_filename[PATH_MAX];
|
||||||
IniContext ini_context;
|
IniContext ini_context;
|
||||||
|
|
@ -103,7 +107,10 @@ static int get_binlog_info_from_file(const char *data_path,
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
*write_index = iniGetIntValue(NULL,
|
*start_index = iniGetIntValue(NULL,
|
||||||
|
BINLOG_INDEX_ITEM_START_INDEX,
|
||||||
|
&ini_context, 0);
|
||||||
|
*last_index = iniGetIntValue(NULL,
|
||||||
BINLOG_INDEX_ITEM_CURRENT_WRITE,
|
BINLOG_INDEX_ITEM_CURRENT_WRITE,
|
||||||
&ini_context, 0);
|
&ini_context, 0);
|
||||||
*compress_index = iniGetIntValue(NULL,
|
*compress_index = iniGetIntValue(NULL,
|
||||||
|
|
@ -114,12 +121,12 @@ static int get_binlog_info_from_file(const char *data_path,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int sf_file_writer_get_binlog_index(const char *data_path,
|
int sf_file_writer_get_binlog_indexes(const char *data_path,
|
||||||
const char *subdir_name, int *write_index)
|
const char *subdir_name, int *start_index, int *last_index)
|
||||||
{
|
{
|
||||||
int compress_index;
|
int compress_index;
|
||||||
return get_binlog_info_from_file(data_path, subdir_name,
|
return get_binlog_info_from_file(data_path, subdir_name,
|
||||||
write_index, &compress_index);
|
start_index, last_index, &compress_index);
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline int get_binlog_index_from_file(SFFileWriterInfo *writer)
|
static inline int get_binlog_index_from_file(SFFileWriterInfo *writer)
|
||||||
|
|
@ -127,10 +134,11 @@ static inline int get_binlog_index_from_file(SFFileWriterInfo *writer)
|
||||||
int result;
|
int result;
|
||||||
|
|
||||||
result = get_binlog_info_from_file(writer->cfg.data_path,
|
result = get_binlog_info_from_file(writer->cfg.data_path,
|
||||||
writer->cfg.subdir_name, &writer->binlog.index,
|
writer->cfg.subdir_name, &writer->binlog.start_index,
|
||||||
&writer->binlog.compress_index);
|
&writer->binlog.last_index, &writer->binlog.compress_index);
|
||||||
if (result == ENOENT) {
|
if (result == ENOENT) {
|
||||||
writer->binlog.index = 0;
|
writer->binlog.start_index = 0;
|
||||||
|
writer->binlog.last_index = 0;
|
||||||
writer->binlog.compress_index = 0;
|
writer->binlog.compress_index = 0;
|
||||||
return write_to_binlog_index_file(writer);
|
return write_to_binlog_index_file(writer);
|
||||||
}
|
}
|
||||||
|
|
@ -233,7 +241,7 @@ static int check_write_to_file(SFFileWriterInfo *writer,
|
||||||
return do_write_to_file(writer, buff, len);
|
return do_write_to_file(writer, buff, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
writer->binlog.index++; //binlog rotate
|
writer->binlog.last_index++; //binlog rotate
|
||||||
if ((result=write_to_binlog_index_file(writer)) == 0) {
|
if ((result=write_to_binlog_index_file(writer)) == 0) {
|
||||||
result = open_next_binlog(writer);
|
result = open_next_binlog(writer);
|
||||||
}
|
}
|
||||||
|
|
@ -269,11 +277,11 @@ int sf_file_writer_get_current_index(SFFileWriterInfo *writer)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (writer->binlog.index < 0) {
|
if (writer->binlog.last_index < 0) {
|
||||||
get_binlog_index_from_file(writer);
|
get_binlog_index_from_file(writer);
|
||||||
}
|
}
|
||||||
|
|
||||||
return writer->binlog.index;
|
return writer->binlog.last_index;
|
||||||
}
|
}
|
||||||
|
|
||||||
int sf_file_writer_deal_versioned_buffer(SFFileWriterInfo *writer,
|
int sf_file_writer_deal_versioned_buffer(SFFileWriterInfo *writer,
|
||||||
|
|
@ -383,13 +391,28 @@ void sf_file_writer_destroy(SFFileWriterInfo *writer)
|
||||||
sf_binlog_buffer_destroy(&writer->binlog_buffer);
|
sf_binlog_buffer_destroy(&writer->binlog_buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
int sf_file_writer_set_binlog_index(SFFileWriterInfo *writer,
|
int sf_file_writer_set_binlog_start_index(SFFileWriterInfo *writer,
|
||||||
const int binlog_index)
|
const int start_index)
|
||||||
{
|
{
|
||||||
int result;
|
int result;
|
||||||
|
|
||||||
if (writer->binlog.index != binlog_index) {
|
if (writer->binlog.start_index != start_index) {
|
||||||
writer->binlog.index = binlog_index;
|
writer->binlog.start_index = start_index;
|
||||||
|
if ((result=write_to_binlog_index_file(writer)) != 0) {
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int sf_file_writer_set_binlog_last_index(SFFileWriterInfo *writer,
|
||||||
|
const int last_index)
|
||||||
|
{
|
||||||
|
int result;
|
||||||
|
|
||||||
|
if (writer->binlog.last_index != last_index) {
|
||||||
|
writer->binlog.last_index = last_index;
|
||||||
if ((result=write_to_binlog_index_file(writer)) != 0) {
|
if ((result=write_to_binlog_index_file(writer)) != 0) {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -39,7 +39,8 @@ typedef struct sf_file_writer_info {
|
||||||
} cfg;
|
} cfg;
|
||||||
|
|
||||||
struct {
|
struct {
|
||||||
int index; //current write index
|
int start_index; //for read only
|
||||||
|
int last_index; //for write
|
||||||
int compress_index;
|
int compress_index;
|
||||||
} binlog;
|
} binlog;
|
||||||
|
|
||||||
|
|
@ -96,15 +97,23 @@ static inline int64_t sf_file_writer_get_last_version(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int sf_file_writer_get_binlog_index(const char *data_path,
|
int sf_file_writer_get_binlog_indexes(const char *data_path,
|
||||||
const char *subdir_name, int *write_index);
|
const char *subdir_name, int *start_index, int *last_index);
|
||||||
|
|
||||||
|
static inline int sf_file_writer_get_binlog_index(const char *data_path,
|
||||||
|
const char *subdir_name, int *last_index)
|
||||||
|
{
|
||||||
|
int start_index;
|
||||||
|
return sf_file_writer_get_binlog_indexes(data_path,
|
||||||
|
subdir_name, &start_index, last_index);
|
||||||
|
}
|
||||||
|
|
||||||
int sf_file_writer_get_current_index(SFFileWriterInfo *writer);
|
int sf_file_writer_get_current_index(SFFileWriterInfo *writer);
|
||||||
|
|
||||||
static inline void sf_file_writer_get_current_position(
|
static inline void sf_file_writer_get_current_position(
|
||||||
SFFileWriterInfo *writer, SFBinlogFilePosition *position)
|
SFFileWriterInfo *writer, SFBinlogFilePosition *position)
|
||||||
{
|
{
|
||||||
position->index = writer->binlog.index;
|
position->index = writer->binlog.last_index;
|
||||||
position->offset = writer->file.size;
|
position->offset = writer->file.size;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -128,8 +137,11 @@ static inline const char *sf_file_writer_get_filename(
|
||||||
const char *sf_file_writer_get_index_filename(const char *data_path,
|
const char *sf_file_writer_get_index_filename(const char *data_path,
|
||||||
const char *subdir_name, char *filename, const int size);
|
const char *subdir_name, char *filename, const int size);
|
||||||
|
|
||||||
int sf_file_writer_set_binlog_index(SFFileWriterInfo *writer,
|
int sf_file_writer_set_binlog_start_index(SFFileWriterInfo *writer,
|
||||||
const int binlog_index);
|
const int start_index);
|
||||||
|
|
||||||
|
int sf_file_writer_set_binlog_last_index(SFFileWriterInfo *writer,
|
||||||
|
const int last_index);
|
||||||
|
|
||||||
int sf_file_writer_get_last_lines(const char *data_path,
|
int sf_file_writer_get_last_lines(const char *data_path,
|
||||||
const char *subdir_name, const int current_write_index,
|
const char *subdir_name, const int current_write_index,
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue