diff --git a/storage/trunk_mgr/trunk_mem.c b/storage/trunk_mgr/trunk_mem.c index a32137d..23481ae 100644 --- a/storage/trunk_mgr/trunk_mem.c +++ b/storage/trunk_mgr/trunk_mem.c @@ -518,6 +518,8 @@ static int storage_trunk_do_save() static int storage_trunk_compress() { + static int last_write_version = 0; + int current_write_version; int result; if (g_current_time - g_up_time < 600) @@ -526,7 +528,16 @@ static int storage_trunk_compress() "too little time lapse: %ds afer startup, " "skip trunk binlog compress", __LINE__, (int)(g_current_time - g_up_time)); - return EBUSY; + return EAGAIN; + } + + current_write_version = trunk_binlog_get_write_version(); + if (current_write_version == last_write_version) + { + logInfo("file: "__FILE__", line: %d, " + "binlog NOT changed, do NOT need compress", + __LINE__); + return EALREADY; } if (__sync_add_and_fetch(&g_trunk_binlog_compress_in_progress, 1) != 1) @@ -563,6 +574,7 @@ static int storage_trunk_compress() } g_trunk_last_compress_time = g_current_time; + last_write_version = current_write_version; } while (0); __sync_sub_and_fetch(&g_trunk_binlog_compress_in_progress, 1); @@ -609,7 +621,8 @@ static int storage_trunk_save() return trunk_unlink_all_mark_files(); //because the binlog file be compressed } - return result; + return (result == EAGAIN || result == EALREADY || + result == EINPROGRESS) ? 0 : result; } int trunk_binlog_compress_func(void *args) @@ -818,7 +831,7 @@ static int storage_trunk_restore(const int64_t restore_offset) memset(&record, 0, sizeof(record)); memset(&reader, 0, sizeof(reader)); reader.binlog_offset = restore_offset; - if ((result=trunk_reader_init(NULL, &reader)) != 0) + if ((result=trunk_reader_init(NULL, &reader, false)) != 0) { return result; } diff --git a/storage/trunk_mgr/trunk_sync.c b/storage/trunk_mgr/trunk_sync.c index 65909a0..b94a2bf 100644 --- a/storage/trunk_mgr/trunk_sync.c +++ b/storage/trunk_mgr/trunk_sync.c @@ -988,7 +988,7 @@ static char *trunk_get_mark_filename_by_id(const char *storage_id, } int trunk_reader_init(const FDFSStorageBrief *pStorage, - TrunkBinLogReader *pReader) + TrunkBinLogReader *pReader, const bool reset_binlog_offset) { char full_filename[MAX_PATH_SIZE]; IniContext iniContext; @@ -1116,7 +1116,13 @@ int trunk_reader_init(const FDFSStorageBrief *pStorage, } } - if ((result=trunk_open_readable_binlog(pReader, \ + if (reset_binlog_offset && pReader->binlog_offset > 0) + { + pReader->binlog_offset = 0; + trunk_write_to_mark_file(pReader); + } + + if ((result=trunk_open_readable_binlog(pReader, get_binlog_readable_filename, pReader)) != 0) { return result; @@ -1178,7 +1184,7 @@ static int trunk_binlog_preread(TrunkBinLogReader *pReader) int bytes_read; int saved_trunk_binlog_write_version; - if (pReader->binlog_buff.version == trunk_binlog_write_version && \ + if (pReader->binlog_buff.version == trunk_binlog_write_version && pReader->binlog_buff.length == 0) { return ENOENT; @@ -1550,12 +1556,12 @@ static void *trunk_sync_thread_entrance(void* arg) break; } - if ((result=trunk_reader_init(pStorage, &reader)) != 0) + if ((result=trunk_reader_init(pStorage, &reader, + thread_data->reset_binlog_offset)) != 0) { - logCrit("file: "__FILE__", line: %d, " \ - "trunk_reader_init fail, errno=%d, " \ - "program exit!", \ - __LINE__, result); + logCrit("file: "__FILE__", line: %d, " + "trunk_reader_init fail, errno=%d, " + "program exit!", __LINE__, result); g_continue_flag = false; break; } @@ -1733,6 +1739,7 @@ TrunkSyncThreadInfo *trunk_sync_alloc_thread_data() { TrunkSyncThreadInfo **thread_info; TrunkSyncThreadInfo **info_end; + TrunkSyncThreadInfo **old_thread_data; TrunkSyncThreadInfo **new_thread_data; TrunkSyncThreadInfo **new_data_start; int alloc_count; @@ -1802,12 +1809,13 @@ TrunkSyncThreadInfo *trunk_sync_alloc_thread_data() memset(*thread_info, 0, sizeof(TrunkSyncThreadInfo)); } - if (sync_thread_info_array.thread_data != NULL) - { - free(sync_thread_info_array.thread_data); - } + old_thread_data = sync_thread_info_array.thread_data; sync_thread_info_array.thread_data = new_thread_data; sync_thread_info_array.alloc_count = alloc_count; + if (old_thread_data != NULL) + { + free(old_thread_data); + } return *new_data_start; } @@ -1947,3 +1955,7 @@ int trunk_unlink_all_mark_files() return 0; } +int trunk_binlog_get_write_version() +{ + return trunk_binlog_write_version; +} diff --git a/storage/trunk_mgr/trunk_sync.h b/storage/trunk_mgr/trunk_sync.h index e94c471..1bb7309 100644 --- a/storage/trunk_mgr/trunk_sync.h +++ b/storage/trunk_mgr/trunk_sync.h @@ -74,7 +74,7 @@ int trunk_open_readable_binlog(TrunkBinLogReader *pReader, \ get_filename_func filename_func, const void *pArg); int trunk_reader_init(const FDFSStorageBrief *pStorage, - TrunkBinLogReader *pReader); + TrunkBinLogReader *pReader, const bool reset_binlog_offset); void trunk_reader_destroy(TrunkBinLogReader *pReader); //trunk binlog compress @@ -83,6 +83,7 @@ int trunk_binlog_compress_commit(); int trunk_binlog_compress_rollback(); int trunk_sync_notify_thread_reset_offset(); +int trunk_binlog_get_write_version(); #ifdef __cplusplus }