diff --git a/src/sf_binlog_writer.c b/src/sf_binlog_writer.c index fca9dd4..d9d8ee6 100644 --- a/src/sf_binlog_writer.c +++ b/src/sf_binlog_writer.c @@ -194,6 +194,16 @@ static int deal_binlog_records(SFBinlogWriterThread *thread, fast_mblock_free_object(¤t->writer-> thread->mblock, current); break; + case SF_BINLOG_BUFFER_TYPE_CHANGE_PASSIVE_WRITE: + thread->passive_write = current->version.first; + fast_mblock_free_object(¤t->writer-> + thread->mblock, current); + break; + case SF_BINLOG_BUFFER_TYPE_CHANGE_CALL_FSYNC: + current->writer->fw.cfg.call_fsync = current->version.first; + fast_mblock_free_object(¤t->writer-> + thread->mblock, current); + break; case SF_BINLOG_BUFFER_TYPE_ROTATE_FILE: if ((result=sf_file_writer_set_binlog_write_index(¤t-> writer->fw, current->writer->fw.binlog. @@ -201,11 +211,15 @@ static int deal_binlog_records(SFBinlogWriterThread *thread, { return result; } + fast_mblock_free_object(¤t->writer-> + thread->mblock, current); break; case SF_BINLOG_BUFFER_TYPE_FLUSH_FILE: if ((result=flush_writer_files(thread)) != 0) { return result; } + fast_mblock_free_object(¤t->writer-> + thread->mblock, current); break; case SF_BINLOG_BUFFER_TYPE_SET_WRITE_INDEX: if ((result=sf_file_writer_set_binlog_write_index(¤t-> @@ -213,9 +227,13 @@ static int deal_binlog_records(SFBinlogWriterThread *thread, { return result; } + fast_mblock_free_object(¤t->writer-> + thread->mblock, current); break; case SF_BINLOG_BUFFER_TYPE_NOTIFY_EXIT: flush_writer_files(thread); + fast_mblock_free_object(¤t->writer-> + thread->mblock, current); return ERRNO_THREAD_EXIT; case SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION: if (current->writer->order_by != @@ -255,7 +273,6 @@ static int deal_binlog_records(SFBinlogWriterThread *thread, fast_mblock_free_object(¤t->writer-> thread->mblock, current); break; - default: current->writer->fw.total_count++; add_to_flush_writer_queue(thread, current->writer); @@ -539,6 +556,22 @@ static inline int sf_binlog_writer_push_directive(SFBinlogWriterInfo *writer, return 0; } +int sf_binlog_writer_change_passive_write(SFBinlogWriterInfo *writer, + const bool passive_write) +{ + return sf_binlog_writer_push_directive(writer, + SF_BINLOG_BUFFER_TYPE_CHANGE_PASSIVE_WRITE, + passive_write); +} + +int sf_binlog_writer_change_call_fsync(SFBinlogWriterInfo *writer, + const bool call_fsync) +{ + return sf_binlog_writer_push_directive(writer, + SF_BINLOG_BUFFER_TYPE_CHANGE_CALL_FSYNC, + call_fsync); +} + int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer, const int64_t next_version) { diff --git a/src/sf_binlog_writer.h b/src/sf_binlog_writer.h index 8910852..38fcabd 100644 --- a/src/sf_binlog_writer.h +++ b/src/sf_binlog_writer.h @@ -28,13 +28,15 @@ #define SF_BINLOG_WRITER_TYPE_ORDER_BY_NONE 0 #define SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION 1 -#define SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE 0 //default type, must be 0 -#define SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION 1 -#define SF_BINLOG_BUFFER_TYPE_CHANGE_ORDER_TYPE 2 -#define SF_BINLOG_BUFFER_TYPE_SET_WRITE_INDEX 3 -#define SF_BINLOG_BUFFER_TYPE_ROTATE_FILE 4 -#define SF_BINLOG_BUFFER_TYPE_NOTIFY_EXIT 5 -#define SF_BINLOG_BUFFER_TYPE_FLUSH_FILE 6 +#define SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE 0 //default type, must be 0 +#define SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION 1 +#define SF_BINLOG_BUFFER_TYPE_CHANGE_ORDER_TYPE 2 +#define SF_BINLOG_BUFFER_TYPE_CHANGE_PASSIVE_WRITE 3 +#define SF_BINLOG_BUFFER_TYPE_CHANGE_CALL_FSYNC 4 +#define SF_BINLOG_BUFFER_TYPE_SET_WRITE_INDEX 5 +#define SF_BINLOG_BUFFER_TYPE_ROTATE_FILE 6 +#define SF_BINLOG_BUFFER_TYPE_NOTIFY_EXIT 7 +#define SF_BINLOG_BUFFER_TYPE_FLUSH_FILE 8 #define SF_BINLOG_BUFFER_SET_VERSION(buffer, ver) \ (buffer)->version.first = (buffer)->version.last = ver @@ -184,6 +186,12 @@ static inline void sf_binlog_writer_destroy( int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer, const short order_by); +int sf_binlog_writer_change_passive_write(SFBinlogWriterInfo *writer, + const bool passive_write); + +int sf_binlog_writer_change_call_fsync(SFBinlogWriterInfo *writer, + const bool call_fsync); + int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer, const int64_t next_version); diff --git a/src/sf_file_writer.c b/src/sf_file_writer.c index 6af8f37..01272c3 100644 --- a/src/sf_file_writer.c +++ b/src/sf_file_writer.c @@ -232,14 +232,16 @@ static int do_write_to_file(SFFileWriterInfo *writer, return result; } - if (fsync(writer->file.fd) != 0) { - result = errno != 0 ? errno : EIO; - logError("file: "__FILE__", line: %d, " - "fsync to binlog file \"%s\" fail, " - "errno: %d, error info: %s", - __LINE__, writer->file.name, - result, STRERROR(result)); - return result; + if (writer->cfg.call_fsync) { + if (fsync(writer->file.fd) != 0) { + result = errno != 0 ? errno : EIO; + logError("file: "__FILE__", line: %d, " + "fsync to binlog file \"%s\" fail, " + "errno: %d, error info: %s", + __LINE__, writer->file.name, + result, STRERROR(result)); + return result; + } } writer->file.size += len; @@ -382,6 +384,7 @@ int sf_file_writer_init(SFFileWriterInfo *writer, const char *data_path, return result; } + writer->cfg.call_fsync = true; writer->cfg.file_rotate_size = file_rotate_size; writer->cfg.data_path = data_path; path_len = snprintf(filepath, sizeof(filepath), diff --git a/src/sf_file_writer.h b/src/sf_file_writer.h index d5193af..07950c7 100644 --- a/src/sf_file_writer.h +++ b/src/sf_file_writer.h @@ -40,6 +40,7 @@ typedef struct sf_file_writer_info { char file_prefix[SF_BINLOG_FILE_PREFIX_SIZE]; int64_t file_rotate_size; int max_record_size; + bool call_fsync; } cfg; struct { diff --git a/src/sf_nio.c b/src/sf_nio.c index a93ba76..c143c94 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -608,9 +608,9 @@ int sf_client_sock_read(int sock, short event, void *arg) TCP_SET_QUICK_ACK(sock); total_read += bytes; task->offset += bytes; - if (task->length == 0) { //header + if (task->length == 0) { //pkg header if (task->offset < SF_CTX->header_size) { - break; + continue; } if (SF_CTX->set_body_length(task) != 0) {