diff --git a/src/sf_binlog_writer.c b/src/sf_binlog_writer.c index 57f8cf7..b19463d 100644 --- a/src/sf_binlog_writer.c +++ b/src/sf_binlog_writer.c @@ -101,23 +101,21 @@ static int open_writable_binlog(SFBinlogWriterInfo *writer) writer->file.fd = open(writer->file.name, O_WRONLY | O_CREAT | O_APPEND, 0644); if (writer->file.fd < 0) { - logCrit("file: "__FILE__", line: %d, " + logError("file: "__FILE__", line: %d, " "open file \"%s\" fail, " - "errno: %d, error info: %s, exiting ...", + "errno: %d, error info: %s", __LINE__, writer->file.name, errno, STRERROR(errno)); - SF_G_CONTINUE_FLAG = false; return errno != 0 ? errno : EACCES; } writer->file.size = lseek(writer->file.fd, 0, SEEK_END); if (writer->file.size < 0) { - logCrit("file: "__FILE__", line: %d, " + logError("file: "__FILE__", line: %d, " "lseek file \"%s\" fail, " - "errno: %d, error info: %s, exiting ...", + "errno: %d, error info: %s", __LINE__, writer->file.name, errno, STRERROR(errno)); - SF_G_CONTINUE_FLAG = false; return errno != 0 ? errno : EIO; } @@ -139,12 +137,11 @@ static int open_next_binlog(SFBinlogWriterInfo *writer) "binlog file %s exist, rename to %s", __LINE__, writer->file.name, bak_filename); } else { - logCrit("file: "__FILE__", line: %d, " + logError("file: "__FILE__", line: %d, " "rename binlog %s to backup %s fail, " - "errno: %d, error info: %s, exiting ...", + "errno: %d, error info: %s", __LINE__, writer->file.name, bak_filename, errno, STRERROR(errno)); - SF_G_CONTINUE_FLAG = false; return errno != 0 ? errno : EPERM; } } @@ -159,23 +156,21 @@ static int do_write_to_file(SFBinlogWriterInfo *writer, if (fc_safe_write(writer->file.fd, buff, len) != len) { result = errno != 0 ? errno : EIO; - logCrit("file: "__FILE__", line: %d, " - "write to binlog file \"%s\" fail, fd: %d, " - "errno: %d, error info: %s, exiting ...", + logError("file: "__FILE__", line: %d, " + "write to binlog file \"%s\" fail, " + "errno: %d, error info: %s", __LINE__, writer->file.name, - writer->file.fd, result, STRERROR(result)); - SF_G_CONTINUE_FLAG = false; + result, STRERROR(result)); return result; } if (fsync(writer->file.fd) != 0) { result = errno != 0 ? errno : EIO; - logCrit("file: "__FILE__", line: %d, " + logError("file: "__FILE__", line: %d, " "fsync to binlog file \"%s\" fail, " - "errno: %d, error info: %s, exiting ...", + "errno: %d, error info: %s", __LINE__, writer->file.name, result, STRERROR(result)); - SF_G_CONTINUE_FLAG = false; return result; } @@ -321,17 +316,20 @@ static void repush_to_queue(SFBinlogWriterThread *thread, SFBinlogWriterBuffer * } #define DEAL_CURRENT_VERSION_WBUFFER(writer, wb) \ - do { \ - deal_binlog_one_record(wb); \ + do { \ + if ((result=deal_binlog_one_record(wb)) != 0) { \ + return result; \ + } \ fast_mblock_free_object(&writer->thread->mblock, wb); \ ++writer->version_ctx.next; \ } while (0) -static void deal_record_by_version(SFBinlogWriterBuffer *wb) +static int deal_record_by_version(SFBinlogWriterBuffer *wb) { SFBinlogWriterInfo *writer; SFBinlogWriterBuffer **current; int64_t distance; + int result; int index; bool expand; @@ -344,7 +342,8 @@ static void deal_record_by_version(SFBinlogWriterBuffer *wb) wb->version, writer->version_ctx.next, writer->version_ctx.ring.size - 1); repush_to_queue(writer->thread, wb); - return; + fc_sleep_ms(10); + return 0; } /* @@ -363,7 +362,7 @@ static void deal_record_by_version(SFBinlogWriterBuffer *wb) writer->version_ctx.ring.start = writer->version_ctx.ring.end = writer->version_ctx.ring.entries + (++index) % writer->version_ctx.ring.size; - return; + return 0; } writer->version_ctx.ring.start = writer->version_ctx.ring.entries + @@ -378,7 +377,7 @@ static void deal_record_by_version(SFBinlogWriterBuffer *wb) (++index) % writer->version_ctx.ring.size; writer->version_ctx.ring.count--; } - return; + return 0; } *current = wb; @@ -404,6 +403,8 @@ static void deal_record_by_version(SFBinlogWriterBuffer *wb) writer->version_ctx.ring.end = writer->version_ctx.ring.entries + (wb->version + 1) % writer->version_ctx.ring.size; } + + return 0; } static inline void add_to_flush_writer_array(SFBinlogWriterThread *thread, @@ -495,7 +496,9 @@ static int deal_binlog_records(SFBinlogWriterThread *thread, thread->mblock, current); } else { add_to_flush_writer_array(thread, current->writer); - deal_record_by_version(current); + if ((result=deal_record_by_version(current)) != 0) { + return result; + } } } while (wbuffer != NULL); } else { @@ -566,7 +569,8 @@ static void *binlog_writer_func(void *arg) if (deal_binlog_records(thread, wb_head) != 0) { logCrit("file: "__FILE__", line: %d, " - "deal_binlog_records fail, program exit!", __LINE__); + "deal_binlog_records fail, " + "program exit!", __LINE__); SF_G_CONTINUE_FLAG = false; } }