add function sf_binlog_writer_get_last_version

storage_engine
YuQing 2021-09-01 21:13:57 +08:00
parent 024c148700
commit 88a0f0a267
3 changed files with 57 additions and 1 deletions

View File

@ -30,6 +30,17 @@
} \
} while (0)
#define SF_BINLOG_PARSE_INT_SILENCE2(var, caption, index, echr1, echr2, min_val) \
do { \
var = strtol(cols[index].str, &endptr, 10); \
if (!(*endptr == echr1 || *endptr == echr2) || (var < min_val)) { \
sprintf(error_info, "invalid %s: %.*s", \
caption, cols[index].len, cols[index].str); \
return EINVAL; \
} \
} while (0)
typedef int (*pack_record_func)(char *buff, void *record);
typedef int (*unpack_record_func)(const string_t *line,
void *record, char *error_info);

View File

@ -269,7 +269,14 @@ static inline int deal_binlog_one_record(SFBinlogWriterBuffer *wb)
}
}
return check_write_to_file(wb->writer, wb->bf.buff, wb->bf.length);
if ((result=check_write_to_file(wb->writer, wb->bf.buff,
wb->bf.length)) == 0)
{
if (wb->writer->flags & SF_BINLOG_WRITER_FLAGS_WANT_DONE_VERSION) {
wb->writer->last_versions.pending = wb->version.last;
}
}
return result;
}
if (wb->writer->file.size + SF_BINLOG_BUFFER_LENGTH(wb->writer->
@ -286,9 +293,13 @@ static inline int deal_binlog_one_record(SFBinlogWriterBuffer *wb)
}
}
if (wb->writer->flags & SF_BINLOG_WRITER_FLAGS_WANT_DONE_VERSION) {
wb->writer->last_versions.pending = wb->version.last;
}
memcpy(wb->writer->binlog_buffer.end,
wb->bf.buff, wb->bf.length);
wb->writer->binlog_buffer.end += wb->bf.length;
return 0;
}
@ -411,6 +422,9 @@ static inline int flush_writer_files(SFBinlogWriterThread *thread)
return result;
}
if (writer->flags & SF_BINLOG_WRITER_FLAGS_WANT_DONE_VERSION) {
writer->last_versions.done = writer->last_versions.pending;
}
writer->flush.in_queue = false;
writer = writer->flush.next;
}
@ -594,6 +608,9 @@ int sf_binlog_writer_init_normal(SFBinlogWriterInfo *writer,
writer->total_count = 0;
writer->flush.in_queue = false;
writer->last_versions.pending = 0;
writer->last_versions.done = 0;
writer->flags = 0;
if ((result=sf_binlog_buffer_init(&writer->binlog_buffer,
buffer_size)) != 0)
{

View File

@ -27,6 +27,8 @@
#define SF_BINLOG_THREAD_TYPE_ORDER_BY_NONE 0
#define SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION 1
#define SF_BINLOG_WRITER_FLAGS_WANT_DONE_VERSION 1
#define SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE 0 //default type, must be 0
#define SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION 1
#define SF_BINLOG_BUFFER_TYPE_CHANGE_ORDER_TYPE 2
@ -103,6 +105,13 @@ typedef struct sf_binlog_writer_info {
} version_ctx;
SFBinlogBuffer binlog_buffer;
SFBinlogWriterThread *thread;
short flags;
struct {
int64_t pending;
volatile int64_t done;
} last_versions;
struct {
bool in_queue;
struct sf_binlog_writer_info *next;
@ -160,6 +169,25 @@ int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer,
int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer,
const int64_t next_version);
static inline void sf_binlog_writer_set_flags(
SFBinlogWriterInfo *writer, const short flags)
{
writer->flags = flags;
}
static inline int64_t sf_binlog_writer_get_last_version(
SFBinlogWriterInfo *writer)
{
if (writer->flags & SF_BINLOG_WRITER_FLAGS_WANT_DONE_VERSION) {
return writer->last_versions.done;
} else {
logError("file: "__FILE__", line: %d, "
"should set writer flags to %d!", __LINE__,
SF_BINLOG_WRITER_FLAGS_WANT_DONE_VERSION);
return -1;
}
}
void sf_binlog_writer_finish(SFBinlogWriterInfo *writer);
int sf_binlog_get_current_write_index(SFBinlogWriterInfo *writer);