diff --git a/HISTORY b/HISTORY index 4919bc3..5bf5b05 100644 --- a/HISTORY +++ b/HISTORY @@ -1,11 +1,12 @@ -Version 6.05 2019-12-15 +Version 6.05 2019-12-18 * fdfs_trackerd and fdfs_storaged print the server version in usage. you can execute fdfs_trackerd or fdfs_storaged without parameters to show the server version * trunk server support compress the trunk binlog periodically, config items in tracker.conf: trunk_compress_binlog_interval and trunk_compress_binlog_time_base + * trunk binlog compression support transaction Version 6.04 2019-12-05 * storage_report_ip_changed ignore result EEXIST diff --git a/storage/storage_func.c b/storage/storage_func.c index 44acd99..fae7b1e 100644 --- a/storage/storage_func.c +++ b/storage/storage_func.c @@ -73,8 +73,8 @@ typedef struct #define INIT_ITEM_LAST_HTTP_PORT "last_http_port" #define INIT_ITEM_CURRENT_TRUNK_FILE_ID "current_trunk_file_id" #define INIT_ITEM_TRUNK_LAST_COMPRESS_TIME "trunk_last_compress_time" -#define INIT_ITEM_TRUNK_BINLOG_COMPRESS_IN_PROGRESS \ - "trunk_binlog_compress_in_progress" +#define INIT_ITEM_TRUNK_BINLOG_COMPRESS_STAGE \ + "trunk_binlog_compress_stage" #define INIT_ITEM_STORE_PATH_MARK_PREFIX "store_path_mark" #define STAT_ITEM_TOTAL_UPLOAD "total_upload_count" @@ -652,28 +652,28 @@ int storage_write_to_sync_ini_file() fdfs_multi_ips_to_string(&g_tracker_client_ip, ip_str, sizeof(ip_str)); - len = sprintf(buff, "%s=%d\n" - "%s=%d\n" - "%s=%s\n" - "%s=%d\n" - "%s=%s\n" - "%s=%d\n" - "%s=%d\n" - "%s=%d\n" - "%s=%d\n" - "%s=%d\n", - INIT_ITEM_STORAGE_JOIN_TIME, g_storage_join_time, - INIT_ITEM_SYNC_OLD_DONE, g_sync_old_done, - INIT_ITEM_SYNC_SRC_SERVER, g_sync_src_id, - INIT_ITEM_SYNC_UNTIL_TIMESTAMP, g_sync_until_timestamp, - INIT_ITEM_LAST_IP_ADDRESS, ip_str, - INIT_ITEM_LAST_SERVER_PORT, g_last_server_port, - INIT_ITEM_LAST_HTTP_PORT, g_last_http_port, - INIT_ITEM_CURRENT_TRUNK_FILE_ID, g_current_trunk_file_id, - INIT_ITEM_TRUNK_LAST_COMPRESS_TIME, (int)g_trunk_last_compress_time, - INIT_ITEM_TRUNK_BINLOG_COMPRESS_IN_PROGRESS, - g_trunk_binlog_compress_in_progress - ); + len = sprintf(buff, "%s=%d\n" + "%s=%d\n" + "%s=%s\n" + "%s=%d\n" + "%s=%s\n" + "%s=%d\n" + "%s=%d\n" + "%s=%d\n" + "%s=%d\n" + "%s=%d\n", + INIT_ITEM_STORAGE_JOIN_TIME, g_storage_join_time, + INIT_ITEM_SYNC_OLD_DONE, g_sync_old_done, + INIT_ITEM_SYNC_SRC_SERVER, g_sync_src_id, + INIT_ITEM_SYNC_UNTIL_TIMESTAMP, g_sync_until_timestamp, + INIT_ITEM_LAST_IP_ADDRESS, ip_str, + INIT_ITEM_LAST_SERVER_PORT, g_last_server_port, + INIT_ITEM_LAST_HTTP_PORT, g_last_http_port, + INIT_ITEM_CURRENT_TRUNK_FILE_ID, g_current_trunk_file_id, + INIT_ITEM_TRUNK_LAST_COMPRESS_TIME, + (int)g_trunk_last_compress_time, + INIT_ITEM_TRUNK_BINLOG_COMPRESS_STAGE, + g_trunk_binlog_compress_stage); if (g_check_store_path_mark) { @@ -1070,9 +1070,9 @@ static int storage_check_and_make_data_dirs() INIT_ITEM_CURRENT_TRUNK_FILE_ID, &iniContext, 0); g_trunk_last_compress_time = iniGetIntValue(NULL, INIT_ITEM_TRUNK_LAST_COMPRESS_TIME , &iniContext, 0); - g_trunk_binlog_compress_in_progress = iniGetIntValue(NULL, - INIT_ITEM_TRUNK_BINLOG_COMPRESS_IN_PROGRESS, - &iniContext, 0); + g_trunk_binlog_compress_stage = iniGetIntValue(NULL, + INIT_ITEM_TRUNK_BINLOG_COMPRESS_STAGE, + &iniContext, STORAGE_TRUNK_COMPRESS_STAGE_NONE); if ((result=storage_load_store_path_marks(&iniContext)) != 0) { @@ -2302,6 +2302,11 @@ int storage_func_init(const char *filename, \ return result; } + if ((result=storage_trunk_binlog_compress_check_recovery()) != 0) + { + return result; + } + if ((result=init_pthread_lock(&sync_stat_file_lock)) != 0) { return result; diff --git a/storage/storage_service.c b/storage/storage_service.c index a94be37..6f2f45a 100644 --- a/storage/storage_service.c +++ b/storage/storage_service.c @@ -4064,7 +4064,7 @@ static int storage_server_trunk_delete_binlog_marks(struct fast_task_info *pTask return result; } - return trunk_unlink_all_mark_files(); + return trunk_unlink_all_mark_files(false); } /** diff --git a/storage/tracker_client_thread.c b/storage/tracker_client_thread.c index cd48015..68abe42 100644 --- a/storage/tracker_client_thread.c +++ b/storage/tracker_client_thread.c @@ -1262,7 +1262,7 @@ static void do_unset_trunk_server_myself(ConnectionInfo *pTrackerServer) trunk_waiting_sync_thread_exit(); - storage_trunk_destroy_ex(true); + storage_trunk_destroy_ex(true, true); if (g_trunk_create_file_advance && g_trunk_create_file_interval > 0) { diff --git a/storage/trunk_mgr/trunk_mem.c b/storage/trunk_mgr/trunk_mem.c index 23481ae..1dff485 100644 --- a/storage/trunk_mgr/trunk_mem.c +++ b/storage/trunk_mgr/trunk_mem.c @@ -62,13 +62,14 @@ bool g_if_trunker_self = false; bool g_trunk_create_file_advance = false; bool g_trunk_init_check_occupying = false; bool g_trunk_init_reload_from_binlog = false; -volatile int g_trunk_binlog_compress_in_progress = 0; -volatile int g_trunk_data_save_in_progress = 0; -static byte trunk_init_flag = STORAGE_TRUNK_INIT_FLAG_NONE; +int g_trunk_binlog_compress_stage = STORAGE_TRUNK_COMPRESS_STAGE_NONE; int64_t g_trunk_total_free_space = 0; int64_t g_trunk_create_file_space_threshold = 0; time_t g_trunk_last_compress_time = 0; +static byte trunk_init_flag = STORAGE_TRUNK_INIT_FLAG_NONE; +static volatile int trunk_binlog_compress_in_progress = 0; +static volatile int trunk_data_save_in_progress = 0; static pthread_mutex_t trunk_file_lock; static pthread_mutex_t trunk_mem_lock; static struct fast_mblock_man free_blocks_man; @@ -272,7 +273,8 @@ int storage_trunk_init() return 0; } -int storage_trunk_destroy_ex(const bool bNeedSleep) +int storage_trunk_destroy_ex(const bool bNeedSleep, + const bool bSaveData) { int result; int i; @@ -292,7 +294,14 @@ int storage_trunk_destroy_ex(const bool bNeedSleep) logDebug("file: "__FILE__", line: %d, " \ "storage trunk destroy", __LINE__); - result = storage_trunk_save(); + if (bSaveData) + { + result = storage_trunk_save(); + } + else + { + result = 0; + } for (i=0; i g_trunk_compress_binlog_min_interval)) { - if (__sync_add_and_fetch(&g_trunk_binlog_compress_in_progress, 0) == 0) + if (__sync_add_and_fetch(&trunk_binlog_compress_in_progress, 0) == 0) { return storage_trunk_do_save(); } @@ -610,15 +743,16 @@ static int storage_trunk_save() { logWarning("file: "__FILE__", line: %d, " "trunk binlog compress already in progress, " - "g_trunk_binlog_compress_in_progress=%d", - __LINE__, g_trunk_binlog_compress_in_progress); + "trunk_binlog_compress_in_progress=%d", + __LINE__, trunk_binlog_compress_in_progress); return 0; } } if ((result=storage_trunk_compress()) == 0) { - return trunk_unlink_all_mark_files(); //because the binlog file be compressed + g_trunk_binlog_compress_stage = STORAGE_TRUNK_COMPRESS_STAGE_FINISHED; + return storage_write_to_sync_ini_file(); } return (result == EAGAIN || result == EALREADY || @@ -634,19 +768,20 @@ int trunk_binlog_compress_func(void *args) return 0; } - result = storage_trunk_compress(); - if (result != 0) + if ((result=storage_trunk_compress()) != 0) { return result; } if (!g_if_trunker_self) { - return 0; + g_trunk_binlog_compress_stage = STORAGE_TRUNK_COMPRESS_STAGE_FINISHED; + return storage_write_to_sync_ini_file(); } trunk_sync_notify_thread_reset_offset(); - return 0; + g_trunk_binlog_compress_stage = STORAGE_TRUNK_COMPRESS_STAGE_FINISHED; + return storage_write_to_sync_ini_file(); } static bool storage_trunk_is_space_occupied(const FDFSTrunkFullInfo *pTrunkInfo) @@ -980,10 +1115,13 @@ static int storage_trunk_restore(const int64_t restore_offset) trunk_mark_filename_by_reader(&reader, trunk_mark_filename); if (unlink(trunk_mark_filename) != 0) { - logError("file: "__FILE__", line: %d, " \ - "unlink file %s fail, " \ - "errno: %d, error info: %s", __LINE__, \ - trunk_mark_filename, errno, STRERROR(errno)); + if (errno != ENOENT) + { + logError("file: "__FILE__", line: %d, " + "unlink file %s fail, " + "errno: %d, error info: %s", __LINE__, + trunk_mark_filename, errno, STRERROR(errno)); + } } if (result != 0) @@ -1039,10 +1177,10 @@ int storage_delete_trunk_data_file() result = errno != 0 ? errno : ENOENT; if (result != ENOENT) { - logError("file: "__FILE__", line: %d, " \ - "unlink trunk data file: %s fail, " \ - "errno: %d, error info: %s", \ - __LINE__, trunk_data_filename, \ + logError("file: "__FILE__", line: %d, " + "unlink trunk data file: %s fail, " + "errno: %d, error info: %s", + __LINE__, trunk_data_filename, result, STRERROR(result)); } diff --git a/storage/trunk_mgr/trunk_mem.h b/storage/trunk_mgr/trunk_mem.h index 7cd1f85..b7162b5 100644 --- a/storage/trunk_mgr/trunk_mem.h +++ b/storage/trunk_mgr/trunk_mem.h @@ -22,6 +22,17 @@ #include "trunk_shared.h" #include "fdfs_shared_func.h" +#define STORAGE_TRUNK_COMPRESS_STAGE_NONE 0 +#define STORAGE_TRUNK_COMPRESS_STAGE_COMPRESS_BEGIN 1 +#define STORAGE_TRUNK_COMPRESS_STAGE_APPLY_DONE 2 +#define STORAGE_TRUNK_COMPRESS_STAGE_SAVE_DONE 3 +#define STORAGE_TRUNK_COMPRESS_STAGE_COMMIT_MERGING 4 +#define STORAGE_TRUNK_COMPRESS_STAGE_COMMIT_MERGE_DONE 5 +#define STORAGE_TRUNK_COMPRESS_STAGE_COMPRESS_SUCCESS 6 +#define STORAGE_TRUNK_COMPRESS_STAGE_ROLLBACK_MERGING 7 +#define STORAGE_TRUNK_COMPRESS_STAGE_ROLLBACK_MERGE_DONE 8 +#define STORAGE_TRUNK_COMPRESS_STAGE_FINISHED 9 + #ifdef __cplusplus extern "C" { #endif @@ -44,7 +55,7 @@ extern bool g_if_use_trunk_file; //if use trunk file extern bool g_trunk_create_file_advance; extern bool g_trunk_init_check_occupying; extern bool g_trunk_init_reload_from_binlog; -extern volatile int g_trunk_binlog_compress_in_progress; +extern int g_trunk_binlog_compress_stage; extern bool g_if_trunker_self; //if am i trunk server extern int64_t g_trunk_create_file_space_threshold; extern int64_t g_trunk_total_free_space; //trunk total free space in bytes @@ -63,9 +74,10 @@ typedef struct { } FDFSTrunkSlot; int storage_trunk_init(); -int storage_trunk_destroy_ex(const bool bNeedSleep); +int storage_trunk_destroy_ex(const bool bNeedSleep, + const bool bSaveData); -#define storage_trunk_destroy() storage_trunk_destroy_ex(false) +#define storage_trunk_destroy() storage_trunk_destroy_ex(false, true) int trunk_alloc_space(const int size, FDFSTrunkFullInfo *pResult); int trunk_alloc_confirm(const FDFSTrunkFullInfo *pTrunkInfo, const int status); @@ -92,6 +104,8 @@ int trunk_create_trunk_file_advance(void *args); int trunk_binlog_compress_func(void *args); +int storage_trunk_binlog_compress_check_recovery(); + int storage_delete_trunk_data_file(); char *storage_trunk_get_data_filename(char *full_filename); diff --git a/storage/trunk_mgr/trunk_sync.c b/storage/trunk_mgr/trunk_sync.c index b94a2bf..a0ffacb 100644 --- a/storage/trunk_mgr/trunk_sync.c +++ b/storage/trunk_mgr/trunk_sync.c @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -38,10 +39,11 @@ #include "storage_sync_func.h" #include "trunk_sync.h" -#define TRUNK_SYNC_BINLOG_FILENAME "binlog" +#define TRUNK_SYNC_BINLOG_FILENAME "binlog" #define TRUNK_SYNC_BINLOG_ROLLBACK_EXT ".rollback" -#define TRUNK_SYNC_MARK_FILE_EXT ".mark" -#define TRUNK_DIR_NAME "trunk" +#define TRUNK_SYNC_MARK_FILE_EXT_STR ".mark" +#define TRUNK_SYNC_MARK_FILE_EXT_LEN (sizeof(TRUNK_SYNC_MARK_FILE_EXT_STR) - 1) +#define TRUNK_DIR_NAME "trunk" #define MARK_ITEM_BINLOG_FILE_OFFSET "binlog_offset" static int trunk_binlog_fd = -1; @@ -85,7 +87,7 @@ char *get_trunk_binlog_filename(char *full_filename) return full_filename; } -static char *get_trunk_rollback_filename(char *full_filename) +static char *get_trunk_binlog_rollback_filename(char *full_filename) { get_trunk_binlog_filename(full_filename); if (strlen(full_filename) + sizeof(TRUNK_SYNC_BINLOG_ROLLBACK_EXT) > @@ -97,6 +99,38 @@ static char *get_trunk_rollback_filename(char *full_filename) return full_filename; } +static char *get_trunk_data_rollback_filename(char *full_filename) +{ + storage_trunk_get_data_filename(full_filename); + if (strlen(full_filename) + sizeof(TRUNK_SYNC_BINLOG_ROLLBACK_EXT) > + MAX_PATH_SIZE) + { + return NULL; + } + strcat(full_filename, TRUNK_SYNC_BINLOG_ROLLBACK_EXT); + return full_filename; +} + +char *get_trunk_binlog_tmp_filename_ex(const char *binlog_filename, + char *tmp_filename) +{ + const char *true_binlog_filename; + char filename[MAX_PATH_SIZE]; + + if (binlog_filename == NULL) + { + get_trunk_binlog_filename(filename); + true_binlog_filename = filename; + } + else + { + true_binlog_filename = binlog_filename; + } + + sprintf(tmp_filename, "%s.tmp", true_binlog_filename); + return tmp_filename; +} + static int trunk_binlog_open_writer(const char *binlog_filename) { trunk_binlog_fd = open(binlog_filename, O_WRONLY | O_CREAT | @@ -264,12 +298,15 @@ int kill_trunk_sync_threads() int trunk_sync_notify_thread_reset_offset() { int result; + int i; + int count; + bool done; TrunkSyncThreadInfo **thread_info; TrunkSyncThreadInfo **info_end; if (sync_thread_info_array.thread_data == NULL) { - return 0; + return EINVAL; } if ((result=pthread_mutex_lock(&trunk_sync_thread_lock)) != 0) @@ -280,6 +317,7 @@ int trunk_sync_notify_thread_reset_offset() __LINE__, result, STRERROR(result)); } + count = 0; info_end = sync_thread_info_array.thread_data + sync_thread_info_array.alloc_count; for (thread_info=sync_thread_info_array.thread_data; @@ -288,6 +326,7 @@ int trunk_sync_notify_thread_reset_offset() if ((*thread_info)->running) { (*thread_info)->reset_binlog_offset = true; + count++; } } @@ -299,7 +338,59 @@ int trunk_sync_notify_thread_reset_offset() __LINE__, result, STRERROR(result)); } - return result; + logInfo("file: "__FILE__", line: %d, " + "notify %d trunk sync threads to reset offset.", + __LINE__, count); + + done = false; + for (i=0; i<300 && g_continue_flag; i++) + { + info_end = sync_thread_info_array.thread_data + + sync_thread_info_array.alloc_count; + for (thread_info=sync_thread_info_array.thread_data; + thread_inforunning && (*thread_info)->reset_binlog_offset) + { + break; + } + } + + if (thread_info == info_end) + { + done = true; + break; + } + + sleep(1); + } + + if (done) + { + logInfo("file: "__FILE__", line: %d, " + "trunk sync threads reset binlog offset done.", + __LINE__); + return 0; + } + else + { + count = 0; + info_end = sync_thread_info_array.thread_data + + sync_thread_info_array.alloc_count; + for (thread_info=sync_thread_info_array.thread_data; + thread_inforunning && (*thread_info)->reset_binlog_offset) + { + count++; + } + } + + logError("file: "__FILE__", line: %d, " + "%d trunk sync threads reset binlog offset timeout.", + __LINE__, count); + return EBUSY; + } } int trunk_binlog_sync_func(void *args) @@ -339,80 +430,120 @@ int trunk_binlog_truncate() return 0; } -int trunk_binlog_compress_apply() +static int trunk_binlog_delete_rollback_file(const char *filename, + const bool silence) { int result; - char binlog_filename[MAX_PATH_SIZE]; - char rollback_filename[MAX_PATH_SIZE]; + if (access(filename, F_OK) == 0) + { + if (!silence) + { + logWarning("file: "__FILE__", line: %d, " + "rollback file %s exist, delete it!", + __LINE__, filename); + } + if (unlink(filename) != 0) + { + result = errno != 0 ? errno : EPERM; + if (result != ENOENT) + { + logError("file: "__FILE__", line: %d, " + "unlink file %s fail, errno: %d, error info: %s", + __LINE__, filename, result, STRERROR(result)); + return result; + } + } + } + else + { + result = errno != 0 ? errno : EPERM; + if (result != ENOENT) + { + logError("file: "__FILE__", line: %d, " + "access file %s fail, errno: %d, error info: %s", + __LINE__, filename, result, STRERROR(result)); + return result; + } + } - get_trunk_binlog_filename(binlog_filename); - if (get_trunk_rollback_filename(rollback_filename) == NULL) + return 0; +} + +int trunk_binlog_compress_delete_binlog_rollback_file(const bool silence) +{ + char binlog_rollback_filename[MAX_PATH_SIZE]; + + if (get_trunk_binlog_rollback_filename(binlog_rollback_filename) == NULL) { - logError("file: "__FILE__", line: %d, " \ - "filename: %s is too long", - __LINE__, binlog_filename); + logError("file: "__FILE__", line: %d, " + "binlog rollback filename is too long", __LINE__); return ENAMETOOLONG; } - if (trunk_binlog_fd < 0) - { - if (access(binlog_filename, F_OK) == 0) - { - if (rename(binlog_filename, rollback_filename) != 0) - { - result = errno != 0 ? errno : EIO; - logError("file: "__FILE__", line: %d, " \ - "rename %s to %s fail, " \ - "errno: %d, error info: %s", - __LINE__, binlog_filename, - rollback_filename, result, - STRERROR(result)); - return result; - } - } - else if (errno != ENOENT) - { - result = errno != 0 ? errno : EIO; - logError("file: "__FILE__", line: %d, " \ - "call access %s fail, " \ - "errno: %d, error info: %s", - __LINE__, binlog_filename, - result, STRERROR(result)); - return result; - } + return trunk_binlog_delete_rollback_file(binlog_rollback_filename, silence); +} - return 0; - } +int trunk_binlog_compress_delete_rollback_files(const bool silence) +{ + int result; + char data_rollback_filename[MAX_PATH_SIZE]; - pthread_mutex_lock(&trunk_sync_thread_lock); - - do + if ((result=trunk_binlog_compress_delete_binlog_rollback_file( + silence)) != 0) { - if ((result=trunk_binlog_close_writer(false)) != 0) - { - break; - } + return result; + } - if (rename(binlog_filename, rollback_filename) != 0) + if (get_trunk_data_rollback_filename(data_rollback_filename) == NULL) + { + logError("file: "__FILE__", line: %d, " + "data rollback filename is too long", __LINE__); + return ENAMETOOLONG; + } + + if ((result=trunk_binlog_delete_rollback_file(data_rollback_filename, + silence)) != 0) + { + return result; + } + + return 0; +} + +static int trunk_binlog_rename_file(const char *src_filename, + const char *dest_filename, const int log_ignore_errno) +{ + int result; + if (access(src_filename, F_OK) == 0) + { + if (rename(src_filename, dest_filename) != 0) { result = errno != 0 ? errno : EIO; - logError("file: "__FILE__", line: %d, " \ - "rename %s to %s fail, " \ + logError("file: "__FILE__", line: %d, " + "rename %s to %s fail, " "errno: %d, error info: %s", - __LINE__, binlog_filename, rollback_filename, - result, STRERROR(result)); - break; + __LINE__, src_filename, + dest_filename, result, + STRERROR(result)); + return result; } - - if ((result=trunk_binlog_open_writer(binlog_filename)) != 0) + } + else + { + result = errno != 0 ? errno : EIO; + if (result - log_ignore_errno != 0) { - rename(rollback_filename, binlog_filename); //rollback - break; + logError("file: "__FILE__", line: %d, " + "call access %s fail, " + "errno: %d, error info: %s", + __LINE__, src_filename, + result, STRERROR(result)); } - } while (0); - pthread_mutex_unlock(&trunk_sync_thread_lock); - return result; + return result; + } + + return 0; } static int trunk_binlog_open_read(const char *filename, @@ -445,7 +576,7 @@ static int trunk_binlog_open_read(const char *filename, return fd; } -static int trunk_binlog_merge_file(int old_fd) +static int trunk_binlog_merge_file(int old_fd, const int stage) { int result; int tmp_fd; @@ -454,19 +585,19 @@ static int trunk_binlog_merge_file(int old_fd) char tmp_filename[MAX_PATH_SIZE]; char buff[64 * 1024]; - get_trunk_binlog_filename(binlog_filename); - sprintf(tmp_filename, "%s.tmp", binlog_filename); + get_trunk_binlog_filename(binlog_filename); + get_trunk_binlog_tmp_filename_ex(binlog_filename, tmp_filename); tmp_fd = open(tmp_filename, O_WRONLY | O_CREAT | O_TRUNC, 0644); if (tmp_fd < 0) { result = errno != 0 ? errno : EACCES; - logError("file: "__FILE__", line: %d, " \ - "open file \"%s\" fail, " \ - "errno: %d, error info: %s", \ + logError("file: "__FILE__", line: %d, " + "open file \"%s\" fail, " + "errno: %d, error info: %s", __LINE__, tmp_filename, result, STRERROR(result)); return result; } - + while ((bytes=fc_safe_read(old_fd, buff, sizeof(buff))) > 0) { if (fc_safe_write(tmp_fd, buff, bytes) != bytes) @@ -523,6 +654,9 @@ static int trunk_binlog_merge_file(int old_fd) } close(tmp_fd); + g_trunk_binlog_compress_stage = stage; + storage_write_to_sync_ini_file(); + if (rename(tmp_filename, binlog_filename) != 0) { result = errno != 0 ? errno : EPERM; @@ -537,6 +671,269 @@ static int trunk_binlog_merge_file(int old_fd) return 0; } +static int trunk_compress_rollback_data_file() +{ + int result; + char data_filename[MAX_PATH_SIZE]; + char data_rollback_filename[MAX_PATH_SIZE]; + struct stat fs; + + storage_trunk_get_data_filename(data_filename); + get_trunk_data_rollback_filename(data_rollback_filename); + + if (stat(data_rollback_filename, &fs) != 0) + { + result = errno != 0 ? errno : EPERM; + if (result == ENOENT) + { + return 0; + } + + logError("file: "__FILE__", line: %d, " + "stat file %s fail, errno: %d, error info: %s", + __LINE__, data_rollback_filename, + result, STRERROR(result)); + return result; + } + + if (unlink(data_filename) != 0) + { + result = errno != 0 ? errno : EPERM; + if (result != ENOENT) + { + logError("file: "__FILE__", line: %d, " + "unlink %s fail, errno: %d, error info: %s", + __LINE__, data_filename, result, STRERROR(result)); + return result; + } + } + + if (fs.st_size == 0) + { + unlink(data_rollback_filename); //delete zero file directly + return 0; + } + + if (rename(data_rollback_filename, data_filename) != 0) + { + result = errno != 0 ? errno : EPERM; + if (result == ENOENT) + { + return 0; + } + + logError("file: "__FILE__", line: %d, " + "rename file %s to %s fail, " + "errno: %d, error info: %s", + __LINE__, data_rollback_filename, + data_filename, result, STRERROR(result)); + return result; + } + + return 0; +} + +static int trunk_compress_rollback_binlog_file(const char *binlog_filename) +{ + int result; + int rollback_fd; + char binlog_rollback_filename[MAX_PATH_SIZE]; + struct stat fs; + + get_trunk_binlog_rollback_filename(binlog_rollback_filename); + if (stat(binlog_rollback_filename, &fs) != 0) + { + result = errno != 0 ? errno : ENOENT; + if (result == ENOENT) + { + return 0; + } + logError("file: "__FILE__", line: %d, " + "stat file %s fail, errno: %d, error info: %s", + __LINE__, binlog_rollback_filename, + result, STRERROR(result)); + return result; + } + + if (fs.st_size == 0) + { + unlink(binlog_rollback_filename); //delete zero file directly + return 0; + } + + if (access(binlog_filename, F_OK) != 0) + { + result = errno != 0 ? errno : EPERM; + if (result == ENOENT) + { + if (rename(binlog_rollback_filename, binlog_filename) != 0) + { + result = errno != 0 ? errno : EPERM; + if (result != ENOENT) + { + logError("file: "__FILE__", line: %d, " + "rename file %s to %s fail, " + "errno: %d, error info: %s", + __LINE__, binlog_rollback_filename, + binlog_filename, errno, STRERROR(errno)); + return result; + } + } + + return 0; + } + else + { + logError("file: "__FILE__", line: %d, " + "access file %s fail, errno: %d, error info: %s", + __LINE__, binlog_filename, errno, STRERROR(errno)); + return result; + } + } + + if ((rollback_fd=trunk_binlog_open_read(binlog_rollback_filename, + false)) < 0) + { + result = errno != 0 ? errno : EPERM; + if (result == ENOENT) + { + return 0; + } + + return result; + } + + result = trunk_binlog_merge_file(rollback_fd, + STORAGE_TRUNK_COMPRESS_STAGE_ROLLBACK_MERGING); + close(rollback_fd); + + g_trunk_binlog_compress_stage = + STORAGE_TRUNK_COMPRESS_STAGE_ROLLBACK_MERGE_DONE; + storage_write_to_sync_ini_file(); + + if (unlink(binlog_rollback_filename) != 0) + { + logWarning("file: "__FILE__", line: %d, " + "unlink %s fail, errno: %d, error info: %s", + __LINE__, binlog_rollback_filename, + errno, STRERROR(errno)); + } + + return result; +} + +int trunk_binlog_compress_delete_temp_files_after_commit() +{ + int result; + char data_filename[MAX_PATH_SIZE]; + + storage_trunk_get_data_filename(data_filename); + if (unlink(data_filename) != 0) + { + result = errno != 0 ? errno : ENOENT; + logError("file: "__FILE__", line: %d, " + "unlink %s fail, errno: %d, error info: %s", + __LINE__, data_filename, + result, STRERROR(result)); + if (result != ENOENT) + { + return result; + } + } + + return trunk_binlog_compress_delete_rollback_files(true); +} + +int trunk_binlog_compress_apply() +{ + int result; + int open_res; + bool need_open_binlog; + char binlog_filename[MAX_PATH_SIZE]; + char data_filename[MAX_PATH_SIZE]; + char binlog_rollback_filename[MAX_PATH_SIZE]; + char data_rollback_filename[MAX_PATH_SIZE]; + + get_trunk_binlog_filename(binlog_filename); + if (get_trunk_binlog_rollback_filename(binlog_rollback_filename) == NULL) + { + logError("file: "__FILE__", line: %d, " + "filename: %s is too long", + __LINE__, binlog_filename); + return ENAMETOOLONG; + } + + storage_trunk_get_data_filename(data_filename); + if (get_trunk_data_rollback_filename(data_rollback_filename) == NULL) + { + logError("file: "__FILE__", line: %d, " + "data rollback filename is too long", __LINE__); + return ENAMETOOLONG; + } + + if (access(binlog_filename, F_OK) != 0) + { + result = errno != 0 ? errno : EPERM; + logError("file: "__FILE__", line: %d, " + "access file: %s is fail, " + "errno: %d, error info: %s", + __LINE__, binlog_filename, + result, STRERROR(result)); + return result; + } + + need_open_binlog = trunk_binlog_fd >= 0; + + pthread_mutex_lock(&trunk_sync_thread_lock); + if (need_open_binlog) + { + trunk_binlog_close_writer(false); + } + + do + { + result = trunk_binlog_rename_file(data_filename, + data_rollback_filename, ENOENT); + if (result != 0) + { + if (result == ENOENT) + { + result = writeToFile(data_rollback_filename, "", 0); + } + + if (result != 0) + { + break; + } + } + + if ((result=trunk_binlog_rename_file(binlog_filename, + binlog_rollback_filename, 0)) != 0) + { + trunk_compress_rollback_data_file(); + break; + } + } while (0); + + if (need_open_binlog) + { + if ((open_res=trunk_binlog_open_writer(binlog_filename)) != 0) + { + trunk_binlog_rename_file(binlog_rollback_filename, + binlog_filename, 0); //rollback + trunk_compress_rollback_data_file(); + + if (result == 0) + { + result = open_res; + } + } + } + + pthread_mutex_unlock(&trunk_sync_thread_lock); + return result; +} + int trunk_binlog_compress_commit() { int result; @@ -544,7 +941,6 @@ int trunk_binlog_compress_commit() bool need_open_binlog; char binlog_filename[MAX_PATH_SIZE]; char data_filename[MAX_PATH_SIZE]; - char rollback_filename[MAX_PATH_SIZE]; need_open_binlog = trunk_binlog_fd >= 0; get_trunk_binlog_filename(binlog_filename); @@ -563,35 +959,26 @@ int trunk_binlog_compress_commit() do { - result = trunk_binlog_merge_file(data_fd); + result = trunk_binlog_merge_file(data_fd, + STORAGE_TRUNK_COMPRESS_STAGE_COMMIT_MERGING); close(data_fd); if (result != 0) { break; } - if (unlink(data_filename) != 0) + + g_trunk_binlog_compress_stage = + STORAGE_TRUNK_COMPRESS_STAGE_COMMIT_MERGE_DONE; + storage_write_to_sync_ini_file(); + + if ((result=trunk_binlog_compress_delete_temp_files_after_commit()) != 0) { - result = errno != 0 ? errno : EPERM; - logError("file: "__FILE__", line: %d, " - "unlink %s fail, errno: %d, error info: %s", - __LINE__, data_filename, - result, STRERROR(result)); break; } - get_trunk_rollback_filename(rollback_filename); - if (access(rollback_filename, F_OK) == 0) - { - if (unlink(rollback_filename) != 0) - { - result = errno != 0 ? errno : EPERM; - logWarning("file: "__FILE__", line: %d, " - "unlink %s fail, errno: %d, error info: %s", - __LINE__, rollback_filename, - result, STRERROR(result)); - break; - } - } + g_trunk_binlog_compress_stage = + STORAGE_TRUNK_COMPRESS_STAGE_COMPRESS_SUCCESS; + storage_write_to_sync_ini_file(); if (need_open_binlog) { @@ -604,89 +991,34 @@ int trunk_binlog_compress_commit() return result; } -int trunk_binlog_compress_rollback() +static int do_compress_rollback() { int result; - int rollback_fd; + bool need_open_binlog; char binlog_filename[MAX_PATH_SIZE]; - char rollback_filename[MAX_PATH_SIZE]; - struct stat fs; + need_open_binlog = trunk_binlog_fd >= 0; get_trunk_binlog_filename(binlog_filename); - get_trunk_rollback_filename(rollback_filename); - if (trunk_binlog_fd < 0) - { - if (access(rollback_filename, F_OK) == 0) - { - if (rename(rollback_filename, binlog_filename) != 0) - { - result = errno != 0 ? errno : EPERM; - logError("file: "__FILE__", line: %d, "\ - "rename %s to %s fail, " \ - "errno: %d, error info: %s", - __LINE__, rollback_filename, - binlog_filename, result, - STRERROR(result)); - return result; - } - } - - return 0; - } - - if (stat(rollback_filename, &fs) != 0) - { - result = errno != 0 ? errno : ENOENT; - if (result == ENOENT) - { - return 0; - } - logError("file: "__FILE__", line: %d, " - "stat file %s fail, errno: %d, error info: %s", - __LINE__, rollback_filename, - result, STRERROR(result)); - return result; - } - - if (fs.st_size == 0) - { - unlink(rollback_filename); //delete zero file directly - return 0; - } pthread_mutex_lock(&trunk_sync_thread_lock); + if (need_open_binlog) + { + trunk_binlog_close_writer(false); + } + do { - if ((result=trunk_binlog_close_writer(false)) != 0) + if ((result=trunk_compress_rollback_binlog_file(binlog_filename)) != 0) { break; } - if ((rollback_fd=trunk_binlog_open_read(rollback_filename, - false)) < 0) + if ((result=trunk_compress_rollback_data_file()) != 0) { - result = errno != 0 ? errno : ENOENT; break; } - result = trunk_binlog_merge_file(rollback_fd); - close(rollback_fd); - if (result == 0) - { - if (unlink(rollback_filename) != 0) - { - result = errno != 0 ? errno : EPERM; - logWarning("file: "__FILE__", line: %d, " \ - "unlink %s fail, " \ - "errno: %d, error info: %s", - __LINE__, rollback_filename, - result, STRERROR(result)); - break; - } - - result = trunk_binlog_open_writer(binlog_filename); - } - else + if (need_open_binlog) { result = trunk_binlog_open_writer(binlog_filename); } @@ -696,6 +1028,20 @@ int trunk_binlog_compress_rollback() return result; } +int trunk_binlog_compress_rollback() +{ + int result; + + if ((result=do_compress_rollback()) == 0) + { + g_trunk_binlog_compress_stage = + STORAGE_TRUNK_COMPRESS_STAGE_FINISHED; + storage_write_to_sync_ini_file(); + } + + return result; +} + static int trunk_binlog_fsync_ex(const bool bNeedLock, \ const char *buff, int *length) { @@ -943,13 +1289,13 @@ static char *trunk_get_mark_filename_by_id_and_port(const char *storage_id, \ { snprintf(full_filename, filename_size, \ "%s/data/"TRUNK_DIR_NAME"/%s%s", g_fdfs_base_path, \ - storage_id, TRUNK_SYNC_MARK_FILE_EXT); + storage_id, TRUNK_SYNC_MARK_FILE_EXT_STR); } else { snprintf(full_filename, filename_size, \ "%s/data/"TRUNK_DIR_NAME"/%s_%d%s", g_fdfs_base_path, \ - storage_id, port, TRUNK_SYNC_MARK_FILE_EXT); + storage_id, port, TRUNK_SYNC_MARK_FILE_EXT_STR); } return full_filename; @@ -960,7 +1306,7 @@ static char *trunk_get_mark_filename_by_ip_and_port(const char *ip_addr, \ { snprintf(full_filename, filename_size, \ "%s/data/"TRUNK_DIR_NAME"/%s_%d%s", g_fdfs_base_path, \ - ip_addr, port, TRUNK_SYNC_MARK_FILE_EXT); + ip_addr, port, TRUNK_SYNC_MARK_FILE_EXT_STR); return full_filename; } @@ -990,7 +1336,6 @@ static char *trunk_get_mark_filename_by_id(const char *storage_id, int trunk_reader_init(const FDFSStorageBrief *pStorage, TrunkBinLogReader *pReader, const bool reset_binlog_offset) { - char full_filename[MAX_PATH_SIZE]; IniContext iniContext; int result; int64_t saved_binlog_offset; @@ -999,7 +1344,6 @@ int trunk_reader_init(const FDFSStorageBrief *pStorage, saved_binlog_offset = pReader->binlog_offset; memset(pReader, 0, sizeof(TrunkBinLogReader)); - pReader->mark_fd = -1; pReader->binlog_fd = -1; pReader->binlog_buff.buffer = (char *)malloc( \ @@ -1023,7 +1367,7 @@ int trunk_reader_init(const FDFSStorageBrief *pStorage, { strcpy(pReader->storage_id, pStorage->id); } - trunk_mark_filename_by_reader(pReader, full_filename); + trunk_mark_filename_by_reader(pReader, pReader->mark_filename); if (pStorage == NULL) { @@ -1032,22 +1376,23 @@ int trunk_reader_init(const FDFSStorageBrief *pStorage, } else { - bFileExist = fileExists(full_filename); + bFileExist = fileExists(pReader->mark_filename); if (!bFileExist && (g_use_storage_id && pStorage != NULL)) { char old_mark_filename[MAX_PATH_SIZE]; - trunk_get_mark_filename_by_ip_and_port( \ - pStorage->ip_addr, g_server_port, \ + trunk_get_mark_filename_by_ip_and_port( + pStorage->ip_addr, g_server_port, old_mark_filename, sizeof(old_mark_filename)); if (fileExists(old_mark_filename)) { - if (rename(old_mark_filename, full_filename)!=0) + if (rename(old_mark_filename, + pReader->mark_filename) != 0) { - logError("file: "__FILE__", line: %d, "\ - "rename file %s to %s fail" \ - ", errno: %d, error info: %s", \ - __LINE__, old_mark_filename, \ - full_filename, errno, \ + logError("file: "__FILE__", line: %d, " + "rename file %s to %s fail, " + "errno: %d, error info: %s", + __LINE__, old_mark_filename, + pReader->mark_filename, errno, STRERROR(errno)); return errno != 0 ? errno : EACCES; } @@ -1059,35 +1404,36 @@ int trunk_reader_init(const FDFSStorageBrief *pStorage, if (bFileExist) { memset(&iniContext, 0, sizeof(IniContext)); - if ((result=iniLoadFromFile(full_filename, &iniContext)) \ - != 0) + if ((result=iniLoadFromFile(pReader->mark_filename, + &iniContext)) != 0) { - logError("file: "__FILE__", line: %d, " \ - "load from mark file \"%s\" fail, " \ - "error code: %d", \ - __LINE__, full_filename, result); + logError("file: "__FILE__", line: %d, " + "load from mark file \"%s\" fail, " + "error code: %d", __LINE__, + pReader->mark_filename, result); return result; } if (iniContext.global.count < 1) { iniFreeContext(&iniContext); - logError("file: "__FILE__", line: %d, " \ - "in mark file \"%s\", item count: %d < 7", \ - __LINE__, full_filename, iniContext.global.count); + logError("file: "__FILE__", line: %d, " + "in mark file \"%s\", item count: %d < 1", + __LINE__, pReader->mark_filename, + iniContext.global.count); return ENOENT; } - pReader->binlog_offset = iniGetInt64Value(NULL, \ - MARK_ITEM_BINLOG_FILE_OFFSET, \ + pReader->binlog_offset = iniGetInt64Value(NULL, + MARK_ITEM_BINLOG_FILE_OFFSET, &iniContext, -1); if (pReader->binlog_offset < 0) { iniFreeContext(&iniContext); - logError("file: "__FILE__", line: %d, " \ - "in mark file \"%s\", binlog_offset: "\ - "%"PRId64" < 0", \ - __LINE__, full_filename, \ + logError("file: "__FILE__", line: %d, " + "in mark file \"%s\", binlog_offset: " + "%"PRId64" < 0", __LINE__, + pReader->mark_filename, pReader->binlog_offset); return EINVAL; } @@ -1096,18 +1442,6 @@ int trunk_reader_init(const FDFSStorageBrief *pStorage, } pReader->last_binlog_offset = pReader->binlog_offset; - - pReader->mark_fd = open(full_filename, O_WRONLY | O_CREAT, 0644); - if (pReader->mark_fd < 0) - { - logError("file: "__FILE__", line: %d, " \ - "open mark file \"%s\" fail, " \ - "error no: %d, error info: %s", \ - __LINE__, full_filename, \ - errno, STRERROR(errno)); - return errno != 0 ? errno : ENOENT; - } - if (!bFileExist && pStorage != NULL) { if ((result=trunk_write_to_mark_file(pReader)) != 0) @@ -1139,12 +1473,6 @@ int trunk_reader_init(const FDFSStorageBrief *pStorage, void trunk_reader_destroy(TrunkBinLogReader *pReader) { - if (pReader->mark_fd >= 0) - { - close(pReader->mark_fd); - pReader->mark_fd = -1; - } - if (pReader->binlog_fd >= 0) { close(pReader->binlog_fd); @@ -1170,11 +1498,11 @@ static int trunk_write_to_mark_file(TrunkBinLogReader *pReader) "%s=%"PRId64"\n", MARK_ITEM_BINLOG_FILE_OFFSET, pReader->binlog_offset); - if ((result=storage_write_to_fd(pReader->mark_fd, - trunk_mark_filename_by_reader, pReader, buff, len)) == 0) - { + if ((result=safeWriteToFile(pReader->mark_filename, buff, len)) == 0) + { + STORAGE_CHOWN(pReader->mark_filename, geteuid(), getegid()) pReader->last_binlog_offset = pReader->binlog_offset; - } + } return result; } @@ -1518,7 +1846,6 @@ static void *trunk_sync_thread_entrance(void* arg) memset(local_ip_addr, 0, sizeof(local_ip_addr)); memset(&reader, 0, sizeof(reader)); - reader.mark_fd = -1; reader.binlog_fd = -1; current_time = g_current_time; @@ -1624,13 +1951,13 @@ static void *trunk_sync_thread_entrance(void* arg) read_result = trunk_binlog_preread(&reader); if (read_result == ENOENT) { - if (reader.last_binlog_offset != \ + if (reader.last_binlog_offset != reader.binlog_offset) { if (trunk_write_to_mark_file(&reader)!=0) { - logCrit("file: "__FILE__", line: %d, " \ - "trunk_write_to_mark_file fail, " \ + logCrit("file: "__FILE__", line: %d, " + "trunk_write_to_mark_file fail, " "program exit!", __LINE__); g_continue_flag = false; break; @@ -1905,7 +2232,7 @@ void trunk_waiting_sync_thread_exit() } count = 0; - while (g_trunk_sync_thread_count > 0 && count < 20) + while (g_trunk_sync_thread_count > 0 && count < 60) { usleep(50000); count++; @@ -1927,32 +2254,98 @@ void trunk_waiting_sync_thread_exit() } } -int trunk_unlink_all_mark_files() +int trunk_unlink_all_mark_files(const bool force_delete) { - FDFSStorageServer *pStorageServer; - FDFSStorageServer *pServerEnd; + char file_path[MAX_PATH_SIZE]; + char old_filename[MAX_PATH_SIZE]; + char new_filename[MAX_PATH_SIZE]; + DIR *dir; + struct dirent *ent; int result; + int name_len; + time_t t; + struct tm tm; - pServerEnd = g_storage_servers + g_storage_count; - for (pStorageServer=g_storage_servers; pStorageServerserver))) - { - continue; - } + t = g_current_time; + localtime_r(&t, &tm); - if ((result=trunk_unlink_mark_file( - pStorageServer->server.id)) != 0) - { - if (result != ENOENT) - { - return result; - } - } - } + snprintf(file_path, sizeof(file_path), + "%s/data/%s", g_fdfs_base_path, TRUNK_DIR_NAME); - return 0; + if ((dir=opendir(file_path)) == NULL) + { + result = errno != 0 ? errno : EPERM; + logError("file: "__FILE__", line: %d, " + "call opendir %s fail, errno: %d, error info: %s", + __LINE__, file_path, result, STRERROR(result)); + return result; + } + + result = 0; + while ((ent=readdir(dir)) != NULL) + { + name_len = strlen(ent->d_name); + if (name_len <= TRUNK_SYNC_MARK_FILE_EXT_LEN) + { + continue; + } + if (memcmp(ent->d_name + (name_len - + TRUNK_SYNC_MARK_FILE_EXT_LEN), + TRUNK_SYNC_MARK_FILE_EXT_STR, + TRUNK_SYNC_MARK_FILE_EXT_LEN) != 0) + { + continue; + } + + snprintf(old_filename, sizeof(old_filename), "%s/%s", + file_path, ent->d_name); + if (force_delete) + { + if (unlink(old_filename) != 0) + { + result = errno != 0 ? errno : EPERM; + if (result == ENOENT) + { + result = 0; + } + else + { + logError("file: "__FILE__", line: %d, " + "unlink %s fail, errno: %d, error info: %s", + __LINE__, old_filename, + result, STRERROR(result)); + break; + } + } + } + else + { + snprintf(new_filename, sizeof(new_filename), + "%s.%04d%02d%02d%02d%02d%02d", old_filename, + tm.tm_year+1900, tm.tm_mon+1, tm.tm_mday, + tm.tm_hour, tm.tm_min, tm.tm_sec); + if (rename(old_filename, new_filename) != 0) + { + result = errno != 0 ? errno : EPERM; + if (result == ENOENT) + { + result = 0; + } + else + { + logError("file: "__FILE__", line: %d, " + "rename file %s to %s fail, " + "errno: %d, error info: %s", + __LINE__, old_filename, new_filename, + result, STRERROR(result)); + break; + } + } + } + } + + closedir(dir); + return result; } int trunk_binlog_get_write_version() diff --git a/storage/trunk_mgr/trunk_sync.h b/storage/trunk_mgr/trunk_sync.h index 1bb7309..c853e28 100644 --- a/storage/trunk_mgr/trunk_sync.h +++ b/storage/trunk_mgr/trunk_sync.h @@ -28,8 +28,8 @@ extern "C" { typedef struct { char storage_id[FDFS_STORAGE_ID_MAX_SIZE]; + char mark_filename[MAX_PATH_SIZE]; BinLogBuffer binlog_buff; - int mark_fd; int binlog_fd; int64_t binlog_offset; int64_t last_binlog_offset; //for write to mark file @@ -65,7 +65,7 @@ void trunk_waiting_sync_thread_exit(); char *get_trunk_binlog_filename(char *full_filename); char *trunk_mark_filename_by_reader(const void *pArg, char *full_filename); -int trunk_unlink_all_mark_files(); +int trunk_unlink_all_mark_files(const bool force_delete); int trunk_unlink_mark_file(const char *storage_id); int trunk_rename_mark_file(const char *old_ip_addr, const int old_port, \ const char *new_ip_addr, const int new_port); @@ -78,6 +78,9 @@ int trunk_reader_init(const FDFSStorageBrief *pStorage, void trunk_reader_destroy(TrunkBinLogReader *pReader); //trunk binlog compress +int trunk_binlog_compress_delete_binlog_rollback_file(const bool silence); +int trunk_binlog_compress_delete_rollback_files(const bool silence); +int trunk_binlog_compress_delete_temp_files_after_commit(); int trunk_binlog_compress_apply(); int trunk_binlog_compress_commit(); int trunk_binlog_compress_rollback(); @@ -85,6 +88,14 @@ int trunk_binlog_compress_rollback(); int trunk_sync_notify_thread_reset_offset(); int trunk_binlog_get_write_version(); +char *get_trunk_binlog_tmp_filename_ex(const char *binlog_filename, + char *tmp_filename); + +static inline char *get_trunk_binlog_tmp_filename(char *tmp_filename) +{ + return get_trunk_binlog_tmp_filename_ex(NULL, tmp_filename); +} + #ifdef __cplusplus } #endif