diff --git a/src/sf_binlog_index.h b/src/sf_binlog_index.h index 5fd0cbe..0fc8832 100644 --- a/src/sf_binlog_index.h +++ b/src/sf_binlog_index.h @@ -30,6 +30,17 @@ } \ } while (0) +#define SF_BINLOG_PARSE_INT_SILENCE2(var, caption, index, echr1, echr2, min_val) \ + do { \ + var = strtol(cols[index].str, &endptr, 10); \ + if (!(*endptr == echr1 || *endptr == echr2) || (var < min_val)) { \ + sprintf(error_info, "invalid %s: %.*s", \ + caption, cols[index].len, cols[index].str); \ + return EINVAL; \ + } \ + } while (0) + + typedef int (*pack_record_func)(char *buff, void *record); typedef int (*unpack_record_func)(const string_t *line, void *record, char *error_info); diff --git a/src/sf_binlog_writer.c b/src/sf_binlog_writer.c index 87d569f..5610d03 100644 --- a/src/sf_binlog_writer.c +++ b/src/sf_binlog_writer.c @@ -269,7 +269,14 @@ static inline int deal_binlog_one_record(SFBinlogWriterBuffer *wb) } } - return check_write_to_file(wb->writer, wb->bf.buff, wb->bf.length); + if ((result=check_write_to_file(wb->writer, wb->bf.buff, + wb->bf.length)) == 0) + { + if (wb->writer->flags & SF_BINLOG_WRITER_FLAGS_WANT_DONE_VERSION) { + wb->writer->last_versions.pending = wb->version.last; + } + } + return result; } if (wb->writer->file.size + SF_BINLOG_BUFFER_LENGTH(wb->writer-> @@ -286,9 +293,13 @@ static inline int deal_binlog_one_record(SFBinlogWriterBuffer *wb) } } + if (wb->writer->flags & SF_BINLOG_WRITER_FLAGS_WANT_DONE_VERSION) { + wb->writer->last_versions.pending = wb->version.last; + } memcpy(wb->writer->binlog_buffer.end, wb->bf.buff, wb->bf.length); wb->writer->binlog_buffer.end += wb->bf.length; + return 0; } @@ -411,6 +422,9 @@ static inline int flush_writer_files(SFBinlogWriterThread *thread) return result; } + if (writer->flags & SF_BINLOG_WRITER_FLAGS_WANT_DONE_VERSION) { + writer->last_versions.done = writer->last_versions.pending; + } writer->flush.in_queue = false; writer = writer->flush.next; } @@ -594,6 +608,9 @@ int sf_binlog_writer_init_normal(SFBinlogWriterInfo *writer, writer->total_count = 0; writer->flush.in_queue = false; + writer->last_versions.pending = 0; + writer->last_versions.done = 0; + writer->flags = 0; if ((result=sf_binlog_buffer_init(&writer->binlog_buffer, buffer_size)) != 0) { diff --git a/src/sf_binlog_writer.h b/src/sf_binlog_writer.h index c287066..e11d7e2 100644 --- a/src/sf_binlog_writer.h +++ b/src/sf_binlog_writer.h @@ -27,6 +27,8 @@ #define SF_BINLOG_THREAD_TYPE_ORDER_BY_NONE 0 #define SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION 1 +#define SF_BINLOG_WRITER_FLAGS_WANT_DONE_VERSION 1 + #define SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE 0 //default type, must be 0 #define SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION 1 #define SF_BINLOG_BUFFER_TYPE_CHANGE_ORDER_TYPE 2 @@ -103,6 +105,13 @@ typedef struct sf_binlog_writer_info { } version_ctx; SFBinlogBuffer binlog_buffer; SFBinlogWriterThread *thread; + + short flags; + struct { + int64_t pending; + volatile int64_t done; + } last_versions; + struct { bool in_queue; struct sf_binlog_writer_info *next; @@ -160,6 +169,25 @@ int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer, int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer, const int64_t next_version); +static inline void sf_binlog_writer_set_flags( + SFBinlogWriterInfo *writer, const short flags) +{ + writer->flags = flags; +} + +static inline int64_t sf_binlog_writer_get_last_version( + SFBinlogWriterInfo *writer) +{ + if (writer->flags & SF_BINLOG_WRITER_FLAGS_WANT_DONE_VERSION) { + return writer->last_versions.done; + } else { + logError("file: "__FILE__", line: %d, " + "should set writer flags to %d!", __LINE__, + SF_BINLOG_WRITER_FLAGS_WANT_DONE_VERSION); + return -1; + } +} + void sf_binlog_writer_finish(SFBinlogWriterInfo *writer); int sf_binlog_get_current_write_index(SFBinlogWriterInfo *writer);