set SF_G_CONTINUE_FLAG only once :)

connection_manager
YuQing 2020-09-29 18:18:38 +08:00
parent 58e35f7457
commit ceedb33e93
1 changed files with 29 additions and 25 deletions

View File

@ -101,23 +101,21 @@ static int open_writable_binlog(SFBinlogWriterInfo *writer)
writer->file.fd = open(writer->file.name,
O_WRONLY | O_CREAT | O_APPEND, 0644);
if (writer->file.fd < 0) {
logCrit("file: "__FILE__", line: %d, "
logError("file: "__FILE__", line: %d, "
"open file \"%s\" fail, "
"errno: %d, error info: %s, exiting ...",
"errno: %d, error info: %s",
__LINE__, writer->file.name,
errno, STRERROR(errno));
SF_G_CONTINUE_FLAG = false;
return errno != 0 ? errno : EACCES;
}
writer->file.size = lseek(writer->file.fd, 0, SEEK_END);
if (writer->file.size < 0) {
logCrit("file: "__FILE__", line: %d, "
logError("file: "__FILE__", line: %d, "
"lseek file \"%s\" fail, "
"errno: %d, error info: %s, exiting ...",
"errno: %d, error info: %s",
__LINE__, writer->file.name,
errno, STRERROR(errno));
SF_G_CONTINUE_FLAG = false;
return errno != 0 ? errno : EIO;
}
@ -139,12 +137,11 @@ static int open_next_binlog(SFBinlogWriterInfo *writer)
"binlog file %s exist, rename to %s",
__LINE__, writer->file.name, bak_filename);
} else {
logCrit("file: "__FILE__", line: %d, "
logError("file: "__FILE__", line: %d, "
"rename binlog %s to backup %s fail, "
"errno: %d, error info: %s, exiting ...",
"errno: %d, error info: %s",
__LINE__, writer->file.name, bak_filename,
errno, STRERROR(errno));
SF_G_CONTINUE_FLAG = false;
return errno != 0 ? errno : EPERM;
}
}
@ -159,23 +156,21 @@ static int do_write_to_file(SFBinlogWriterInfo *writer,
if (fc_safe_write(writer->file.fd, buff, len) != len) {
result = errno != 0 ? errno : EIO;
logCrit("file: "__FILE__", line: %d, "
"write to binlog file \"%s\" fail, fd: %d, "
"errno: %d, error info: %s, exiting ...",
logError("file: "__FILE__", line: %d, "
"write to binlog file \"%s\" fail, "
"errno: %d, error info: %s",
__LINE__, writer->file.name,
writer->file.fd, result, STRERROR(result));
SF_G_CONTINUE_FLAG = false;
result, STRERROR(result));
return result;
}
if (fsync(writer->file.fd) != 0) {
result = errno != 0 ? errno : EIO;
logCrit("file: "__FILE__", line: %d, "
logError("file: "__FILE__", line: %d, "
"fsync to binlog file \"%s\" fail, "
"errno: %d, error info: %s, exiting ...",
"errno: %d, error info: %s",
__LINE__, writer->file.name,
result, STRERROR(result));
SF_G_CONTINUE_FLAG = false;
return result;
}
@ -321,17 +316,20 @@ static void repush_to_queue(SFBinlogWriterThread *thread, SFBinlogWriterBuffer *
}
#define DEAL_CURRENT_VERSION_WBUFFER(writer, wb) \
do { \
deal_binlog_one_record(wb); \
do { \
if ((result=deal_binlog_one_record(wb)) != 0) { \
return result; \
} \
fast_mblock_free_object(&writer->thread->mblock, wb); \
++writer->version_ctx.next; \
} while (0)
static void deal_record_by_version(SFBinlogWriterBuffer *wb)
static int deal_record_by_version(SFBinlogWriterBuffer *wb)
{
SFBinlogWriterInfo *writer;
SFBinlogWriterBuffer **current;
int64_t distance;
int result;
int index;
bool expand;
@ -344,7 +342,8 @@ static void deal_record_by_version(SFBinlogWriterBuffer *wb)
wb->version, writer->version_ctx.next,
writer->version_ctx.ring.size - 1);
repush_to_queue(writer->thread, wb);
return;
fc_sleep_ms(10);
return 0;
}
/*
@ -363,7 +362,7 @@ static void deal_record_by_version(SFBinlogWriterBuffer *wb)
writer->version_ctx.ring.start = writer->version_ctx.ring.end =
writer->version_ctx.ring.entries +
(++index) % writer->version_ctx.ring.size;
return;
return 0;
}
writer->version_ctx.ring.start = writer->version_ctx.ring.entries +
@ -378,7 +377,7 @@ static void deal_record_by_version(SFBinlogWriterBuffer *wb)
(++index) % writer->version_ctx.ring.size;
writer->version_ctx.ring.count--;
}
return;
return 0;
}
*current = wb;
@ -404,6 +403,8 @@ static void deal_record_by_version(SFBinlogWriterBuffer *wb)
writer->version_ctx.ring.end = writer->version_ctx.ring.entries +
(wb->version + 1) % writer->version_ctx.ring.size;
}
return 0;
}
static inline void add_to_flush_writer_array(SFBinlogWriterThread *thread,
@ -495,7 +496,9 @@ static int deal_binlog_records(SFBinlogWriterThread *thread,
thread->mblock, current);
} else {
add_to_flush_writer_array(thread, current->writer);
deal_record_by_version(current);
if ((result=deal_record_by_version(current)) != 0) {
return result;
}
}
} while (wbuffer != NULL);
} else {
@ -566,7 +569,8 @@ static void *binlog_writer_func(void *arg)
if (deal_binlog_records(thread, wb_head) != 0) {
logCrit("file: "__FILE__", line: %d, "
"deal_binlog_records fail, program exit!", __LINE__);
"deal_binlog_records fail, "
"program exit!", __LINE__);
SF_G_CONTINUE_FLAG = false;
}
}