diff --git a/src/sf_binlog_writer.c b/src/sf_binlog_writer.c index 139612f..b03cee0 100644 --- a/src/sf_binlog_writer.c +++ b/src/sf_binlog_writer.c @@ -34,274 +34,16 @@ #include "sf_func.h" #include "sf_binlog_writer.h" -#define BINLOG_INDEX_FILENAME SF_BINLOG_FILE_PREFIX"_index.dat" - -#define BINLOG_INDEX_ITEM_CURRENT_WRITE "current_write" -#define BINLOG_INDEX_ITEM_CURRENT_COMPRESS "current_compress" - -#define GET_BINLOG_FILENAME(writer) \ - sprintf(writer->file.name, "%s/%s/%s"SF_BINLOG_FILE_EXT_FMT, \ - g_sf_binlog_data_path, writer->cfg.subdir_name, \ - SF_BINLOG_FILE_PREFIX, writer->binlog.index) - char *g_sf_binlog_data_path = NULL; -static int write_to_binlog_index_file(SFBinlogWriterInfo *writer) -{ - char full_filename[PATH_MAX]; - char buff[256]; - int result; - int len; - - snprintf(full_filename, sizeof(full_filename), "%s/%s/%s", - g_sf_binlog_data_path, writer->cfg.subdir_name, - BINLOG_INDEX_FILENAME); - - len = sprintf(buff, "%s=%d\n" - "%s=%d\n", - BINLOG_INDEX_ITEM_CURRENT_WRITE, - writer->binlog.index, - BINLOG_INDEX_ITEM_CURRENT_COMPRESS, - writer->binlog.compress_index); - if ((result=safeWriteToFile(full_filename, buff, len)) != 0) { - logError("file: "__FILE__", line: %d, " - "write to file \"%s\" fail, " - "errno: %d, error info: %s", - __LINE__, full_filename, - result, STRERROR(result)); - } - - return result; -} - -static int get_binlog_index_from_file(SFBinlogWriterInfo *writer) -{ - char full_filename[PATH_MAX]; - IniContext ini_context; - int result; - - snprintf(full_filename, sizeof(full_filename), "%s/%s/%s", - g_sf_binlog_data_path, writer->cfg.subdir_name, - BINLOG_INDEX_FILENAME); - if (access(full_filename, F_OK) != 0) { - if (errno == ENOENT) { - writer->binlog.index = 0; - return write_to_binlog_index_file(writer); - } - } - - if ((result=iniLoadFromFile(full_filename, &ini_context)) != 0) { - logError("file: "__FILE__", line: %d, " - "load from file \"%s\" fail, error code: %d", - __LINE__, full_filename, result); - return result; - } - - writer->binlog.index = iniGetIntValue(NULL, - BINLOG_INDEX_ITEM_CURRENT_WRITE, &ini_context, 0); - writer->binlog.compress_index = iniGetIntValue(NULL, - BINLOG_INDEX_ITEM_CURRENT_COMPRESS, &ini_context, 0); - - iniFreeContext(&ini_context); - return 0; -} - -static int open_writable_binlog(SFBinlogWriterInfo *writer) -{ - if (writer->file.fd >= 0) { - close(writer->file.fd); - } - - GET_BINLOG_FILENAME(writer); - writer->file.fd = open(writer->file.name, - O_WRONLY | O_CREAT | O_APPEND, 0644); - if (writer->file.fd < 0) { - logError("file: "__FILE__", line: %d, " - "open file \"%s\" fail, " - "errno: %d, error info: %s", - __LINE__, writer->file.name, - errno, STRERROR(errno)); - return errno != 0 ? errno : EACCES; - } - - writer->file.size = lseek(writer->file.fd, 0, SEEK_END); - if (writer->file.size < 0) { - logError("file: "__FILE__", line: %d, " - "lseek file \"%s\" fail, " - "errno: %d, error info: %s", - __LINE__, writer->file.name, - errno, STRERROR(errno)); - return errno != 0 ? errno : EIO; - } - - return 0; -} - -static int open_next_binlog(SFBinlogWriterInfo *writer) -{ - GET_BINLOG_FILENAME(writer); - if (access(writer->file.name, F_OK) == 0) { - char bak_filename[PATH_MAX]; - char date_str[32]; - - snprintf(bak_filename, sizeof(bak_filename), "%s.%s", - writer->file.name, formatDatetime(g_current_time, - "%Y%m%d%H%M%S", date_str, sizeof(date_str))); - if (rename(writer->file.name, bak_filename) == 0) { - logWarning("file: "__FILE__", line: %d, " - "binlog file %s exist, rename to %s", - __LINE__, writer->file.name, bak_filename); - } else { - logError("file: "__FILE__", line: %d, " - "rename binlog %s to backup %s fail, " - "errno: %d, error info: %s", - __LINE__, writer->file.name, bak_filename, - errno, STRERROR(errno)); - return errno != 0 ? errno : EPERM; - } - } - - return open_writable_binlog(writer); -} - -static int do_write_to_file(SFBinlogWriterInfo *writer, - char *buff, const int len) -{ - int result; - - if (fc_safe_write(writer->file.fd, buff, len) != len) { - result = errno != 0 ? errno : EIO; - logError("file: "__FILE__", line: %d, " - "write to binlog file \"%s\" fail, " - "errno: %d, error info: %s", - __LINE__, writer->file.name, - result, STRERROR(result)); - return result; - } - - if (fsync(writer->file.fd) != 0) { - result = errno != 0 ? errno : EIO; - logError("file: "__FILE__", line: %d, " - "fsync to binlog file \"%s\" fail, " - "errno: %d, error info: %s", - __LINE__, writer->file.name, - result, STRERROR(result)); - return result; - } - - writer->file.size += len; - return 0; -} - -static int check_write_to_file(SFBinlogWriterInfo *writer, - char *buff, const int len) -{ - int result; - - if (writer->file.size + len <= SF_BINLOG_FILE_MAX_SIZE) { - return do_write_to_file(writer, buff, len); - } - - writer->binlog.index++; //binlog rotate - if ((result=write_to_binlog_index_file(writer)) == 0) { - result = open_next_binlog(writer); - } - - if (result != 0) { - logError("file: "__FILE__", line: %d, " - "open binlog file \"%s\" fail", - __LINE__, writer->file.name); - return result; - } - - return do_write_to_file(writer, buff, len); -} - -static int binlog_write_to_file(SFBinlogWriterInfo *writer) -{ - int result; - int len; - - len = SF_BINLOG_BUFFER_LENGTH(writer->binlog_buffer); - if (len == 0) { - return 0; - } - - result = check_write_to_file(writer, writer->binlog_buffer.buff, len); - writer->binlog_buffer.end = writer->binlog_buffer.buff; - return result; -} - -int sf_binlog_get_current_write_index(SFBinlogWriterInfo *writer) -{ - if (writer == NULL) { //for data recovery - return 0; - } - - if (writer->binlog.index < 0) { - get_binlog_index_from_file(writer); - } - - return writer->binlog.index; -} - -void sf_binlog_get_current_write_position(SFBinlogWriterInfo *writer, - SFBinlogFilePosition *position) -{ - position->index = writer->binlog.index; - position->offset = writer->file.size; -} - static inline void binlog_writer_set_next_version(SFBinlogWriterInfo *writer, const uint64_t next_version) { writer->version_ctx.next = next_version; } -static int deal_binlog_one_record(SFBinlogWriterBuffer *wb) -{ - int result; - - if (wb->bf.length >= wb->writer->binlog_buffer.size / 4) { - if (SF_BINLOG_BUFFER_LENGTH(wb->writer->binlog_buffer) > 0) { - if ((result=binlog_write_to_file(wb->writer)) != 0) { - return result; - } - } - - if ((result=check_write_to_file(wb->writer, wb->bf.buff, - wb->bf.length)) == 0) - { - if (wb->writer->flags & SF_BINLOG_WRITER_FLAGS_WANT_DONE_VERSION) { - wb->writer->last_versions.pending = wb->version.last; - } - } - return result; - } - - if (wb->writer->file.size + SF_BINLOG_BUFFER_LENGTH(wb->writer-> - binlog_buffer) + wb->bf.length > SF_BINLOG_FILE_MAX_SIZE) - { - if ((result=binlog_write_to_file(wb->writer)) != 0) { - return result; - } - } else if (wb->writer->binlog_buffer.size - SF_BINLOG_BUFFER_LENGTH( - wb->writer->binlog_buffer) < wb->bf.length) - { - if ((result=binlog_write_to_file(wb->writer)) != 0) { - return result; - } - } - - if (wb->writer->flags & SF_BINLOG_WRITER_FLAGS_WANT_DONE_VERSION) { - wb->writer->last_versions.pending = wb->version.last; - } - memcpy(wb->writer->binlog_buffer.end, - wb->bf.buff, wb->bf.length); - wb->writer->binlog_buffer.end += wb->bf.length; - - return 0; -} +#define deal_binlog_one_record(wb) \ + sf_file_writer_deal_buffer(&wb->writer->fw, &wb->bf, wb->version.last) #define GET_WBUFFER_VERSION_COUNT(wb) \ (((wb)->version.last - (wb)->version.first) + 1) @@ -328,7 +70,7 @@ static int deal_record_by_version(SFBinlogWriterBuffer *wb) logError("file: "__FILE__", line: %d, subdir_name: %s, " "current version: %"PRId64" is too small which " "less than %"PRId64", tag: %"PRId64", buffer(%d): %.*s", - __LINE__, writer->cfg.subdir_name, wb->version.first, + __LINE__, writer->fw.cfg.subdir_name, wb->version.first, writer->version_ctx.next, wb->tag, wb->bf.length, wb->bf.length, wb->bf.buff); fast_mblock_free_object(&writer->thread->mblock, wb); @@ -337,7 +79,7 @@ static int deal_record_by_version(SFBinlogWriterBuffer *wb) /* logInfo("%s wb version===== %"PRId64", next: %"PRId64", writer: %p", - writer->cfg.subdir_name, wb->version.first, + writer->fw.cfg.subdir_name, wb->version.first, writer->version_ctx.next, writer); */ @@ -418,12 +160,12 @@ static inline int flush_writer_files(SFBinlogWriterThread *thread) writer = thread->flush_writers.head; while (writer != NULL) { - if ((result=binlog_write_to_file(writer)) != 0) { + if ((result=sf_file_writer_flush(&writer->fw)) != 0) { return result; } - if (writer->flags & SF_BINLOG_WRITER_FLAGS_WANT_DONE_VERSION) { - writer->last_versions.done = writer->last_versions.pending; + if (writer->fw.flags & SF_FILE_WRITER_FLAGS_WANT_DONE_VERSION) { + writer->fw.last_versions.done = writer->fw.last_versions.pending; } writer->flush.in_queue = false; writer = writer->flush.next; @@ -457,7 +199,7 @@ static int deal_binlog_records(SFBinlogWriterThread *thread, logWarning("file: "__FILE__", line: %d, " "subdir_name: %s, invalid order by: %d != %d, " "maybe some mistake happen", __LINE__, - current->writer->cfg.subdir_name, thread->order_by, + current->writer->fw.cfg.subdir_name, thread->order_by, SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION); } @@ -465,12 +207,12 @@ static int deal_binlog_records(SFBinlogWriterThread *thread, logWarning("file: "__FILE__", line: %d, " "subdir_name: %s, ring not empty, " "maybe some mistake happen", __LINE__, - current->writer->cfg.subdir_name); + current->writer->fw.cfg.subdir_name); } logDebug("file: "__FILE__", line: %d, " "subdir_name: %s, set next version to %"PRId64, - __LINE__, current->writer->cfg.subdir_name, + __LINE__, current->writer->fw.cfg.subdir_name, current->version.first); if (current->writer->version_ctx.next != @@ -485,7 +227,7 @@ static int deal_binlog_records(SFBinlogWriterThread *thread, break; default: - current->writer->total_count++; + current->writer->fw.total_count++; add_to_flush_writer_queue(thread, current->writer); if (thread->order_by == SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION) { @@ -513,7 +255,7 @@ void sf_binlog_writer_finish(SFBinlogWriterInfo *writer) SFBinlogWriterBuffer *wb_head; int count; - if (writer->file.name != NULL) { + if (writer->fw.file.name != NULL) { fc_queue_terminate(&writer->thread->queue); count = 0; @@ -524,7 +266,7 @@ void sf_binlog_writer_finish(SFBinlogWriterInfo *writer) if (writer->thread->running) { logWarning("file: "__FILE__", line: %d, " "%s binlog write thread still running, " - "exit anyway!", __LINE__, writer->cfg.subdir_name); + "exit anyway!", __LINE__, writer->fw.cfg.subdir_name); } wb_head = (SFBinlogWriterBuffer *)fc_queue_try_pop_all( @@ -533,13 +275,13 @@ void sf_binlog_writer_finish(SFBinlogWriterInfo *writer) deal_binlog_records(writer->thread, wb_head); } - free(writer->file.name); - writer->file.name = NULL; + free(writer->fw.file.name); + writer->fw.file.name = NULL; } - if (writer->file.fd >= 0) { - close(writer->file.fd); - writer->file.fd = -1; + if (writer->fw.file.fd >= 0) { + close(writer->fw.file.fd); + writer->fw.file.fd = -1; } } @@ -586,11 +328,11 @@ static int binlog_wbuffer_alloc_init(void *element, void *args) wbuffer = (SFBinlogWriterBuffer *)element; writer = (SFBinlogWriterInfo *)args; wbuffer->writer = writer; - wbuffer->bf.alloc_size = writer->cfg.max_record_size; + wbuffer->bf.alloc_size = writer->fw.cfg.max_record_size; if (writer->thread->use_fixed_buffer_size) { wbuffer->bf.buff = (char *)(wbuffer + 1); } else { - wbuffer->bf.buff = (char *)fc_malloc(writer->cfg.max_record_size); + wbuffer->bf.buff = (char *)fc_malloc(writer->fw.cfg.max_record_size); if (wbuffer->bf.buff == NULL) { return ENOMEM; } @@ -601,49 +343,9 @@ static int binlog_wbuffer_alloc_init(void *element, void *args) int sf_binlog_writer_init_normal(SFBinlogWriterInfo *writer, const char *subdir_name, const int buffer_size) { - int result; - int path_len; - bool create; - char filepath[PATH_MAX]; - - writer->total_count = 0; writer->flush.in_queue = false; - writer->last_versions.pending = 0; - writer->last_versions.done = 0; - writer->flags = 0; - if ((result=sf_binlog_buffer_init(&writer->binlog_buffer, - buffer_size)) != 0) - { - return result; - } - - path_len = snprintf(filepath, sizeof(filepath), "%s/%s", - g_sf_binlog_data_path, subdir_name); - if ((result=fc_check_mkdir_ex(filepath, 0775, &create)) != 0) { - return result; - } - if (create) { - SF_CHOWN_TO_RUNBY_RETURN_ON_ERROR(filepath); - } - - writer->file.fd = -1; - snprintf(writer->cfg.subdir_name, - sizeof(writer->cfg.subdir_name), - "%s", subdir_name); - writer->file.name = (char *)fc_malloc(path_len + 32); - if (writer->file.name == NULL) { - return ENOMEM; - } - - if ((result=get_binlog_index_from_file(writer)) != 0) { - return result; - } - - if ((result=open_writable_binlog(writer)) != 0) { - return result; - } - - return 0; + return sf_file_writer_init_normal(&writer->fw, + g_sf_binlog_data_path, subdir_name, buffer_size); } int sf_binlog_writer_init_by_version(SFBinlogWriterInfo *writer, @@ -681,7 +383,7 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, thread->order_mode = order_mode; thread->order_by = order_by; thread->use_fixed_buffer_size = use_fixed_buffer_size; - writer->cfg.max_record_size = max_record_size; + writer->fw.cfg.max_record_size = max_record_size; writer->thread = thread; element_size = sizeof(SFBinlogWriterBuffer); @@ -755,61 +457,3 @@ int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer, fc_queue_push(&writer->thread->queue, buffer); return 0; } - -int sf_binlog_writer_set_binlog_index(SFBinlogWriterInfo *writer, - const int binlog_index) -{ - int result; - - if (writer->binlog.index != binlog_index) { - writer->binlog.index = binlog_index; - if ((result=write_to_binlog_index_file(writer)) != 0) { - return result; - } - } - - return open_writable_binlog(writer); -} - -int sf_binlog_writer_get_last_lines(const char *subdir_name, - const int current_write_index, char *buff, - const int buff_size, int *count, int *length) -{ - int result; - int remain_count; - int current_count; - int current_index; - int i; - char filename[PATH_MAX]; - string_t lines; - - current_index = current_write_index; - *length = 0; - remain_count = *count; - for (i=0; i<2; i++) { - current_count = remain_count; - sf_binlog_writer_get_filename(subdir_name, - current_index, filename, sizeof(filename)); - result = fc_get_last_lines(filename, buff + *length, - buff_size - *length, &lines, ¤t_count); - if (result == 0) { - memmove(buff + *length, lines.str, lines.len); - *length += lines.len; - remain_count -= current_count; - if (remain_count == 0) { - break; - } - } else if (result != ENOENT) { - *count = 0; - return result; - } - if (current_index == 0) { - break; - } - - --current_index; //try previous binlog file - } - - *count -= remain_count; - return 0; -} diff --git a/src/sf_binlog_writer.h b/src/sf_binlog_writer.h index e11d7e2..4095b6a 100644 --- a/src/sf_binlog_writer.h +++ b/src/sf_binlog_writer.h @@ -20,6 +20,7 @@ #include "fastcommon/fc_queue.h" #include "sf_types.h" +#include "sf_file_writer.h" #define SF_BINLOG_THREAD_ORDER_MODE_FIXED 0 #define SF_BINLOG_THREAD_ORDER_MODE_VARY 1 @@ -27,20 +28,10 @@ #define SF_BINLOG_THREAD_TYPE_ORDER_BY_NONE 0 #define SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION 1 -#define SF_BINLOG_WRITER_FLAGS_WANT_DONE_VERSION 1 - #define SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE 0 //default type, must be 0 #define SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION 1 #define SF_BINLOG_BUFFER_TYPE_CHANGE_ORDER_TYPE 2 -#define SF_BINLOG_SUBDIR_NAME_SIZE 128 -#define SF_BINLOG_FILE_MAX_SIZE (1024 * 1024 * 1024) //for binlog rotating by size -#define SF_BINLOG_FILE_PREFIX "binlog" -#define SF_BINLOG_FILE_EXT_FMT ".%06d" - -#define SF_BINLOG_BUFFER_LENGTH(buffer) ((buffer).end - (buffer).buff) -#define SF_BINLOG_BUFFER_REMAIN(buffer) ((buffer).end - (buffer).current) - #define SF_BINLOG_BUFFER_SET_VERSION(buffer, ver) \ (buffer)->version.first = (buffer)->version.last = ver @@ -81,37 +72,15 @@ typedef struct binlog_writer_thread { } SFBinlogWriterThread; typedef struct sf_binlog_writer_info { - struct { - char subdir_name[SF_BINLOG_SUBDIR_NAME_SIZE]; - int max_record_size; - } cfg; + SFFileWriterInfo fw; - struct { - int index; - int compress_index; - } binlog; - - struct { - int fd; - int64_t size; - char *name; - } file; - - int64_t total_count; struct { SFBinlogWriterBufferRing ring; int64_t next; int64_t change_count; //version change count } version_ctx; - SFBinlogBuffer binlog_buffer; SFBinlogWriterThread *thread; - short flags; - struct { - int64_t pending; - volatile int64_t done; - } last_versions; - struct { bool in_queue; struct sf_binlog_writer_info *next; @@ -169,31 +138,19 @@ int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer, int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer, const int64_t next_version); -static inline void sf_binlog_writer_set_flags( - SFBinlogWriterInfo *writer, const short flags) -{ - writer->flags = flags; -} +#define sf_binlog_writer_set_flags(writer, flags) \ + sf_file_writer_set_flags(&writer->fw, flags) -static inline int64_t sf_binlog_writer_get_last_version( - SFBinlogWriterInfo *writer) -{ - if (writer->flags & SF_BINLOG_WRITER_FLAGS_WANT_DONE_VERSION) { - return writer->last_versions.done; - } else { - logError("file: "__FILE__", line: %d, " - "should set writer flags to %d!", __LINE__, - SF_BINLOG_WRITER_FLAGS_WANT_DONE_VERSION); - return -1; - } -} +#define sf_binlog_writer_get_last_version(writer) \ + sf_file_writer_get_last_version(&writer->fw) void sf_binlog_writer_finish(SFBinlogWriterInfo *writer); -int sf_binlog_get_current_write_index(SFBinlogWriterInfo *writer); +#define sf_binlog_get_current_write_index(writer) \ + sf_file_writer_get_current_index(&writer->fw) -void sf_binlog_get_current_write_position(SFBinlogWriterInfo *writer, - SFBinlogFilePosition *position); +#define sf_binlog_get_current_write_position(writer, position) \ + sf_file_writer_get_current_position(&writer->fw, position) static inline SFBinlogWriterBuffer *sf_binlog_writer_alloc_buffer( SFBinlogWriterThread *thread) @@ -226,24 +183,17 @@ static inline SFBinlogWriterBuffer *sf_binlog_writer_alloc_versioned_buffer_ex( return buffer; } -static inline const char *sf_binlog_writer_get_filepath(const char *subdir_name, - char *filename, const int size) -{ - snprintf(filename, size, "%s/%s", g_sf_binlog_data_path, subdir_name); - return filename; -} +#define sf_binlog_writer_get_filepath(subdir_name, filename, size) \ + sf_file_writer_get_filepath(g_sf_binlog_data_path, \ + subdir_name, filename, size) -static inline const char *sf_binlog_writer_get_filename(const char *subdir_name, - const int binlog_index, char *filename, const int size) -{ - snprintf(filename, size, "%s/%s/%s"SF_BINLOG_FILE_EXT_FMT, - g_sf_binlog_data_path, subdir_name, - SF_BINLOG_FILE_PREFIX, binlog_index); - return filename; -} +#define sf_binlog_writer_get_filename(subdir_name, \ + binlog_index, filename, size) \ + sf_file_writer_get_filename(g_sf_binlog_data_path, \ + subdir_name, binlog_index, filename, size) -int sf_binlog_writer_set_binlog_index(SFBinlogWriterInfo *writer, - const int binlog_index); +#define sf_binlog_writer_set_binlog_index(writer, binlog_index) \ + sf_file_writer_set_binlog_index(writer, binlog_index) #define sf_push_to_binlog_thread_queue(thread, buffer) \ fc_queue_push(&(thread)->queue, buffer) @@ -255,9 +205,10 @@ static inline void sf_push_to_binlog_write_queue(SFBinlogWriterInfo *writer, fc_queue_push(&writer->thread->queue, buffer); } -int sf_binlog_writer_get_last_lines(const char *subdir_name, - const int current_write_index, char *buff, - const int buff_size, int *count, int *length); +#define sf_binlog_writer_get_last_lines(subdir_name, current_write_index, \ + buff, buff_size, count, length) \ + sf_file_writer_get_last_lines(g_sf_binlog_data_path, subdir_name, \ + current_write_index, buff, buff_size, count, length) #ifdef __cplusplus }