/*
 * NOTE(review): the physical layout of this file is damaged. Line breaks
 * have been collapsed, so every physical line holds many declarations, each
 * function straddles a line boundary, and a "//" comment swallows the rest
 * of its physical line. In several places the text that should follow a
 * less-than comparison is missing (the 14 bare "#include" directives on the
 * next line and the "for (i=..." loop headers pointed out below). The code
 * is left untouched here; restore it from a clean copy before building.
 * The notes placed in front of each physical line describe what is visible
 * on that line.
 *
 * Next physical line:
 *  - license header and include list;
 *  - FDFSTrunkFileIdInfo: one binlog text line plus the trunk path and the
 *    trunk file id it refers to;
 *  - RecoveryThreadData: per-thread state (thread_index, with -1 meaning the
 *    global files; result; alive counter; done flag; base_path; tid);
 *  - names of the recovery binlog, flag and mark files, and the item names
 *    stored in the flag and mark files;
 *  - file-scope state: last_recovery_threads, current_recovery_thread_count
 *    and saved_storage_status;
 *  - forward declarations and the start of recovery_get_full_filename_ex().
 */
/** * Copyright (C) 2008 Happy Fish / YuQing * * FastDFS may be copied only under the terms of the GNU General * Public License V3, which may be found in the FastDFS source kit. * Please visit the FastDFS Home Page http://www.fastken.com/ for more detail. **/ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "fdfs_define.h" #include "fastcommon/logger.h" #include "fastcommon/sockopt.h" #include "fastcommon/avl_tree.h" #include "fastcommon/shared_func.h" #include "fdfs_global.h" #include "tracker_types.h" #include "tracker_proto.h" #include "storage_global.h" #include "trunk_mgr/trunk_shared.h" #include "storage_func.h" #include "storage_sync.h" #include "tracker_client.h" #include "storage_disk_recovery.h" #include "storage_client.h" typedef struct { char line[128]; FDFSTrunkPathInfo path; //trunk file path int id; //trunk file id } FDFSTrunkFileIdInfo; typedef struct recovery_thread_data { int thread_index; //-1 for global int result; volatile int alive; bool done; const char *base_path; pthread_t tid; } RecoveryThreadData; #define RECOVERY_BINLOG_FILENAME ".binlog.recovery" #define RECOVERY_FLAG_FILENAME ".recovery.flag" #define RECOVERY_MARK_FILENAME ".recovery.mark" #define FLAG_ITEM_RECOVERY_THREADS "recovery_threads" #define FLAG_ITEM_SAVED_STORAGE_STATUS "saved_storage_status" #define FLAG_ITEM_FETCH_BINLOG_DONE "fetch_binlog_done" #define MARK_ITEM_BINLOG_OFFSET "binlog_offset" static int last_recovery_threads = -1; //for rebalance binlog data static volatile int current_recovery_thread_count = 0; static int saved_storage_status = FDFS_STORAGE_STATUS_NONE; static char *recovery_get_binlog_filename(const void *pArg, char *full_filename); static int disk_recovery_write_to_binlog(FILE *fp, const char *binlog_filename, StorageBinLogRecord *pRecord); static char *recovery_get_full_filename_ex(const char *pBasePath, const int thread_index, const char *filename, char 
/*
 * recovery_get_full_filename_ex() (continued from the previous line):
 * formats "[pBasePath]/data/[filename]" into full_filename and, when
 * thread_index >= 0, appends ".[thread_index]". If full_filename is NULL a
 * function-static buffer is used, so that result is overwritten by the next
 * call that also passes NULL. Returns the buffer that was written.
 * NOTE(review): the snprintf() return value is used as an offset without
 * being compared to MAX_PATH_SIZE; on truncation snprintf() returns the
 * untruncated length, which makes "full_filename + len" point past the
 * buffer and "MAX_PATH_SIZE - len" non-positive.
 *
 * recovery_get_full_filename(): same, taking base path and thread index
 * from a RecoveryThreadData.
 * recovery_get_global_full_filename(): same with thread_index -1, i.e.
 * without a numeric suffix.
 * recovery_get_global_binlog_filename(): unsuffixed name of the recovery
 * binlog file.
 *
 * storage_do_fetch_binlog(): asks the connected source storage for the
 * binlog of one store path and stores the reply in the index-0 recovery
 * binlog file of that store path. The request is a TrackerHeader followed
 * by the group name and, in the last byte, store_path_index. Returns 0 on
 * success, otherwise the error code of the failing send or receive call.
 */
*full_filename) { static char buff[MAX_PATH_SIZE]; int len; if (full_filename == NULL) { full_filename = buff; } len = snprintf(full_filename, MAX_PATH_SIZE, "%s/data/%s", pBasePath, filename); if (thread_index >= 0) { snprintf(full_filename + len, MAX_PATH_SIZE - len, ".%d", thread_index); } return full_filename; } static inline char *recovery_get_full_filename(const RecoveryThreadData *pThreadData, const char *filename, char *full_filename) { return recovery_get_full_filename_ex(pThreadData->base_path, pThreadData->thread_index, filename, full_filename); } static inline char *recovery_get_global_full_filename(const char *pBasePath, const char *filename, char *full_filename) { return recovery_get_full_filename_ex(pBasePath, -1, filename, full_filename); } static inline char *recovery_get_global_binlog_filename(const char *pBasePath, char *full_filename) { return recovery_get_global_full_filename(pBasePath, RECOVERY_BINLOG_FILENAME, full_filename); } static int storage_do_fetch_binlog(ConnectionInfo *pSrcStorage, \ const int store_path_index) { char out_buff[sizeof(TrackerHeader) + FDFS_GROUP_NAME_MAX_LEN + 1]; char full_binlog_filename[MAX_PATH_SIZE]; TrackerHeader *pHeader; char *pBasePath; int64_t in_bytes; int64_t file_bytes; int result; int network_timeout; pBasePath = g_fdfs_store_paths.paths[store_path_index].path; recovery_get_full_filename_ex(pBasePath, 0, RECOVERY_BINLOG_FILENAME, full_binlog_filename); memset(out_buff, 0, sizeof(out_buff)); pHeader = (TrackerHeader *)out_buff; long2buff(FDFS_GROUP_NAME_MAX_LEN + 1, pHeader->pkg_len); pHeader->cmd = STORAGE_PROTO_CMD_FETCH_ONE_PATH_BINLOG; strcpy(out_buff + sizeof(TrackerHeader), g_group_name); *(out_buff + sizeof(TrackerHeader) + FDFS_GROUP_NAME_MAX_LEN) = store_path_index; if((result=tcpsenddata_nb(pSrcStorage->sock, out_buff, sizeof(out_buff), SF_G_NETWORK_TIMEOUT)) != 0) { logError("file: "__FILE__", line: %d, " "storage server %s:%u, send data fail, " "errno: %d, error info: %s.", __LINE__, 
/*
 * storage_do_fetch_binlog() (continued): the receive timeout is
 * SF_G_NETWORK_TIMEOUT but never less than 600 (presumably seconds; confirm
 * against fdfs_recv_header_ex and tcprecvfile).
 *
 * recovery_get_src_storage_server(): fills pSrcStorage with the address of
 * another storage server of this group to recover from.
 * Step 1: call tracker_get_storage_max_status() once per second until it
 * succeeds. ENOENT from the tracker, or a saved status of INIT, IP_CHANGED
 * or DELETED, ends the function with ENOENT ("does not need recovery").
 */
pSrcStorage->ip_addr, pSrcStorage->port, result, STRERROR(result)); return result; } if (SF_G_NETWORK_TIMEOUT >= 600) { network_timeout = SF_G_NETWORK_TIMEOUT; } else { network_timeout = 600; } if ((result=fdfs_recv_header_ex(pSrcStorage, network_timeout, &in_bytes)) != 0) { logError("file: "__FILE__", line: %d, " "fdfs_recv_header fail, result: %d", __LINE__, result); return result; } if ((result=tcprecvfile(pSrcStorage->sock, full_binlog_filename, in_bytes, 0, network_timeout, &file_bytes)) != 0) { logError("file: "__FILE__", line: %d, " "storage server %s:%u, tcprecvfile fail, " "errno: %d, error info: %s.", __LINE__, pSrcStorage->ip_addr, pSrcStorage->port, result, STRERROR(result)); return result; } logInfo("file: "__FILE__", line: %d, " "recovery binlog from %s:%u, file size: %"PRId64, __LINE__, pSrcStorage->ip_addr, pSrcStorage->port, file_bytes); return 0; } static int recovery_get_src_storage_server(ConnectionInfo *pSrcStorage) { int result; int storage_count; int i; static unsigned int current_index = 0; TrackerServerInfo trackerServer; ConnectionInfo *pTrackerConn; FDFSGroupStat groupStat; FDFSStorageInfo storageStats[FDFS_MAX_SERVERS_EACH_GROUP]; FDFSStorageInfo *pStorageStat; bool found; memset(pSrcStorage, 0, sizeof(ConnectionInfo)); pSrcStorage->sock = -1; logDebug("file: "__FILE__", line: %d, " \ "disk recovery: get source storage server", \ __LINE__); while (SF_G_CONTINUE_FLAG) { result = tracker_get_storage_max_status(&g_tracker_group, g_group_name, g_tracker_client_ip.ips[0].address, g_my_server_id_str, &saved_storage_status); if (result == ENOENT) { logWarning("file: "__FILE__", line: %d, " \ "current storage: %s does not exist " \ "in tracker server", __LINE__, \ g_tracker_client_ip.ips[0].address); return ENOENT; } if (result == 0) { if (saved_storage_status == FDFS_STORAGE_STATUS_INIT) { logInfo("file: "__FILE__", line: %d, " \ "current storage: %s 's status is %d" \ ", does not need recovery", __LINE__, \ g_tracker_client_ip.ips[0].address, 
/*
 * recovery_get_src_storage_server() (continued), step 2: repeatedly query a
 * tracker for the group and its server list, sleeping 1 or 5 seconds before
 * retrying after a failure. Returns ENOENT when the group has a single
 * server or when this server has more store paths than the group. Otherwise
 * the server list is scanned for a server other than this one whose status
 * is ACTIVE.
 * NOTE(review): the header of that scan loop is damaged
 * ("for (i=0; iid, ..."): the loop bound and the assignment of
 * pStorageStat are not visible.
 */
\ saved_storage_status); return ENOENT; } if (saved_storage_status == FDFS_STORAGE_STATUS_IP_CHANGED || \ saved_storage_status == FDFS_STORAGE_STATUS_DELETED) { logWarning("file: "__FILE__", line: %d, " \ "current storage: %s 's status is %d" \ ", does not need recovery", __LINE__, \ g_tracker_client_ip.ips[0].address, saved_storage_status); return ENOENT; } break; } sleep(1); } found = false; while (SF_G_CONTINUE_FLAG) { if ((pTrackerConn=tracker_get_connection_r(&trackerServer, \ &result)) == NULL) { sleep(5); continue; } result = tracker_list_one_group(pTrackerConn, \ g_group_name, &groupStat); if (result != 0) { tracker_close_connection_ex(pTrackerConn, true); sleep(1); continue; } if (groupStat.count <= 0) { logWarning("file: "__FILE__", line: %d, " \ "storage server count: %d in the group <= 0!",\ __LINE__, groupStat.count); tracker_close_connection(pTrackerConn); sleep(1); continue; } if (groupStat.count == 1) { logInfo("file: "__FILE__", line: %d, " \ "storage server count in the group = 1, " \ "does not need recovery", __LINE__); tracker_close_connection(pTrackerConn); return ENOENT; } if (g_fdfs_store_paths.count > groupStat.store_path_count) { logInfo("file: "__FILE__", line: %d, " \ "storage store path count: %d > " \ "which of the group: %d, " \ "does not need recovery", __LINE__, \ g_fdfs_store_paths.count, groupStat.store_path_count); tracker_close_connection(pTrackerConn); return ENOENT; } if (groupStat.active_count <= 0) { tracker_close_connection(pTrackerConn); sleep(5); continue; } result = tracker_list_servers(pTrackerConn, g_group_name, NULL, storageStats, FDFS_MAX_SERVERS_EACH_GROUP, &storage_count); tracker_close_connection_ex(pTrackerConn, result != 0); if (result != 0) { sleep(5); continue; } if (storage_count <= 1) { logWarning("file: "__FILE__", line: %d, " \ "storage server count: %d in the group <= 1!",\ __LINE__, storage_count); sleep(5); continue; } for (i=0; iid, g_my_server_id_str) == 0) { continue; } if (pStorageStat->status == 
/*
 * recovery_get_src_storage_server() (end): copies ip address and port of
 * the server found; returns EINTR when SF_G_CONTINUE_FLAG was cleared,
 * otherwise 0.
 *
 * recovery_get_binlog_filename(): callback form (pArg is a
 * RecoveryThreadData) returning that thread's recovery binlog file name.
 * recovery_get_flag_filename(): unsuffixed flag file name for a base path.
 * recovery_get_mark_filename(): that thread's mark file name.
 *
 * storage_disk_recovery_delete_thread_files(): NOTE(review): only the
 * declarations and the start of the loop survive. The text jumps from
 * "for (i=index_start; i" to the tail of a call that passes
 * pReader->mark_filename and pReader->binlog_offset. The definitions of
 * do_write_to_flag_file(), do_write_to_mark_file(),
 * recovery_write_to_mark_file() and storage_disk_recovery_finish() are used
 * in this file but not visible; presumably they were in the missing span.
 *
 * recovery_init_global_binlog_file(): writes zero bytes to the index-0
 * recovery binlog file of pBasePath via writeToFile().
 * recovery_init_flag_file(): writes the flag file of pBasePath with the
 * given fetch_binlog_done and recovery_threads values.
 * recovery_load_params_from_flag_file(): starts at the end of this line.
 */
FDFS_STORAGE_STATUS_ACTIVE) { found = true; strcpy(pSrcStorage->ip_addr, pStorageStat->ip_addr); pSrcStorage->port = pStorageStat->storage_port; break; } } if (found) //found src storage server { break; } sleep(5); } if (!SF_G_CONTINUE_FLAG) { return EINTR; } logDebug("file: "__FILE__", line: %d, " "disk recovery: get source storage server %s:%u", __LINE__, pSrcStorage->ip_addr, pSrcStorage->port); return 0; } static char *recovery_get_binlog_filename(const void *pArg, char *full_filename) { return recovery_get_full_filename((const RecoveryThreadData *)pArg, RECOVERY_BINLOG_FILENAME, full_filename); } static char *recovery_get_flag_filename(const char *pBasePath, char *full_filename) { return recovery_get_global_full_filename(pBasePath, RECOVERY_FLAG_FILENAME, full_filename); } static char *recovery_get_mark_filename(const RecoveryThreadData *pThreadData, char *full_filename) { return recovery_get_full_filename(pThreadData, RECOVERY_MARK_FILENAME, full_filename); } static int storage_disk_recovery_delete_thread_files(const char *pBasePath, const int index_start, const int index_end) { int i; char mark_filename[MAX_PATH_SIZE]; char binlog_filename[MAX_PATH_SIZE]; for (i=index_start; imark_filename, pReader->binlog_offset); } static int recovery_init_global_binlog_file(const char *pBasePath) { char full_binlog_filename[MAX_PATH_SIZE]; char buff[1]; *buff = '\0'; recovery_get_full_filename_ex(pBasePath, 0, RECOVERY_BINLOG_FILENAME, full_binlog_filename); return writeToFile(full_binlog_filename, buff, 0); } static int recovery_init_flag_file(const char *pBasePath, const bool fetch_binlog_done, const int recovery_threads) { char full_filename[MAX_PATH_SIZE]; recovery_get_flag_filename(pBasePath, full_filename); return do_write_to_flag_file(full_filename, fetch_binlog_done, recovery_threads); } static int recovery_load_params_from_flag_file(const char *full_flag_filename) { IniContext iniContext; int result; memset(&iniContext, 0, sizeof(IniContext)); if 
/*
 * recovery_load_params_from_flag_file() (continued): loads the ini file and
 * sets saved_storage_status and last_recovery_threads from it. Returns the
 * iniLoadFromFile() error on load failure, EAGAIN when fetch_binlog_done is
 * false, EINVAL when the saved status is absent or negative, otherwise 0.
 *
 * recovery_reader_init(): clears *pReader, sets binlog_fd to -1 and
 * binlog_index to g_binlog_index + 1, and allocates the read buffer of
 * STORAGE_BINLOG_BUFFER_SIZE bytes (errno or ENOMEM on failure).
 */
((result=iniLoadFromFile(full_flag_filename, &iniContext)) != 0) { logError("file: "__FILE__", line: %d, " "load from flag file \"%s\" fail, " "error code: %d", __LINE__, full_flag_filename, result); return result; } if (!iniGetBoolValue(NULL, FLAG_ITEM_FETCH_BINLOG_DONE, &iniContext, false)) { iniFreeContext(&iniContext); logInfo("file: "__FILE__", line: %d, " "flag file \"%s\", %s=0, " "need to fetch binlog again", __LINE__, full_flag_filename, FLAG_ITEM_FETCH_BINLOG_DONE); return EAGAIN; } saved_storage_status = iniGetIntValue(NULL, FLAG_ITEM_SAVED_STORAGE_STATUS, &iniContext, -1); if (saved_storage_status < 0) { iniFreeContext(&iniContext); logError("file: "__FILE__", line: %d, " "in flag file \"%s\", %s: %d < 0", __LINE__, full_flag_filename, FLAG_ITEM_SAVED_STORAGE_STATUS, saved_storage_status); return EINVAL; } last_recovery_threads = iniGetIntValue(NULL, FLAG_ITEM_RECOVERY_THREADS, &iniContext, -1); iniFreeContext(&iniContext); return 0; } static int recovery_reader_init(const RecoveryThreadData *pThreadData, StorageBinLogReader *pReader) { IniContext iniContext; int result; memset(pReader, 0, sizeof(StorageBinLogReader)); pReader->binlog_fd = -1; pReader->binlog_index = g_binlog_index + 1; pReader->binlog_buff.buffer = (char *)malloc( STORAGE_BINLOG_BUFFER_SIZE); if (pReader->binlog_buff.buffer == NULL) { logError("file: "__FILE__", line: %d, " "malloc %d bytes fail, " "errno: %d, error info: %s", __LINE__, STORAGE_BINLOG_BUFFER_SIZE, errno, STRERROR(errno)); return errno != 0 ? 
/*
 * recovery_reader_init() (continued): reads binlog_offset from the thread's
 * mark file (EINVAL if absent or negative) and opens the thread's recovery
 * binlog through storage_open_readable_binlog(). On the error returns the
 * buffer allocated earlier stays attached to pReader; the callers visible
 * in this file call storage_reader_destroy() afterwards.
 *
 * recovery_reader_check_init(): returns 0 at once when the reader already
 * has an open fd and a buffer, otherwise runs recovery_reader_init().
 *
 * recovery_download_file_to_local(): downloads the file named by pRecord
 * from the source storage to its local path. For a trunk file the local
 * path comes from the decoded trunk info, and the base name inside
 * pRecord->filename is overwritten in place with the base name of the trunk
 * file; a decode failure or a missing '/' returns -EINVAL.
 */
errno : ENOMEM; } pReader->binlog_buff.current = pReader->binlog_buff.buffer; recovery_get_mark_filename(pThreadData, pReader->mark_filename); memset(&iniContext, 0, sizeof(IniContext)); if ((result=iniLoadFromFile(pReader->mark_filename, &iniContext)) != 0) { logError("file: "__FILE__", line: %d, " "load from mark file \"%s\" fail, " "error code: %d", __LINE__, pReader->mark_filename, result); return result; } pReader->binlog_offset = iniGetInt64Value(NULL, MARK_ITEM_BINLOG_OFFSET, &iniContext, -1); if (pReader->binlog_offset < 0) { iniFreeContext(&iniContext); logError("file: "__FILE__", line: %d, " \ "in mark file \"%s\", %s: "\ "%"PRId64" < 0", __LINE__, \ pReader->mark_filename, MARK_ITEM_BINLOG_OFFSET, \ pReader->binlog_offset); return EINVAL; } iniFreeContext(&iniContext); if ((result=storage_open_readable_binlog(pReader, recovery_get_binlog_filename, pThreadData)) != 0) { return result; } return 0; } static int recovery_reader_check_init(const RecoveryThreadData *pThreadData, StorageBinLogReader *pReader) { if (pReader->binlog_fd >= 0 && pReader->binlog_buff.buffer != NULL) { return 0; } return recovery_reader_init(pThreadData, pReader); } static int recovery_download_file_to_local(StorageBinLogRecord *pRecord, ConnectionInfo *pTrackerServer, ConnectionInfo *pStorageConn) { int result; bool bTrunkFile; char local_filename[MAX_PATH_SIZE]; char tmp_filename[MAX_PATH_SIZE + 32]; char *download_filename; int64_t file_size; if (fdfs_is_trunk_file(pRecord->filename, pRecord->filename_len)) { FDFSTrunkFullInfo trunk_info; char *pTrunkPathEnd; char *pLocalFilename; bTrunkFile = true; if (fdfs_decode_trunk_info(pRecord->store_path_index, pRecord->true_filename, pRecord->true_filename_len, &trunk_info) != 0) { return -EINVAL; } trunk_get_full_filename(&trunk_info, local_filename, sizeof(local_filename)); pTrunkPathEnd = strrchr(pRecord->filename, '/'); pLocalFilename = strrchr(local_filename, '/'); if (pTrunkPathEnd == NULL || pLocalFilename == NULL) { return 
/*
 * recovery_download_file_to_local() (continued): a regular file goes to
 * "[store path]/data/[true_filename]". If the local file already exists the
 * download is written to "[local].recovery.tmp" and renamed over it on
 * success. For non-trunk files the file times are then set from
 * pRecord->timestamp. Returns the download result, or the rename error.
 * NOTE(review): in the rename failure path errno is read again after
 * logError(); the logging call may have changed it.
 *
 * storage_do_recovery(): replays the thread's recovery binlog against the
 * source storage pSrcStorage. Obtains a tracker connection first and
 * returns its error when none is available.
 */
-EINVAL; } sprintf(pTrunkPathEnd + 1, "%s", pLocalFilename + 1); } else { bTrunkFile = false; sprintf(local_filename, "%s/data/%s", g_fdfs_store_paths.paths[pRecord->store_path_index].path, pRecord->true_filename); } if (access(local_filename, F_OK) == 0) { sprintf(tmp_filename, "%s.recovery.tmp", local_filename); download_filename = tmp_filename; } else { download_filename = local_filename; } result = storage_download_file_to_file(pTrackerServer, pStorageConn, g_group_name, pRecord->filename, download_filename, &file_size); if (result == 0) { if (download_filename != local_filename) { if (rename(download_filename, local_filename) != 0) { logError("file: "__FILE__", line: %d, " "rename file %s to %s fail, " "errno: %d, error info: %s", __LINE__, download_filename, local_filename, errno, STRERROR(errno)); return errno != 0 ? errno : EPERM; } } if (!bTrunkFile) { set_file_utimes(local_filename, pRecord->timestamp); } } return result; } static int storage_do_recovery(RecoveryThreadData *pThreadData, StorageBinLogReader *pReader, ConnectionInfo *pSrcStorage) { TrackerServerInfo trackerServer; ConnectionInfo *pTrackerServer; ConnectionInfo *pStorageConn; StorageBinLogRecord record; int record_length; int result; int log_level; int count; int store_path_index; int64_t total_count; int64_t success_count; int64_t noent_count; bool bContinueFlag; char local_filename[MAX_PATH_SIZE]; char src_filename[MAX_PATH_SIZE]; pTrackerServer = tracker_get_connection_r(&trackerServer, &result); if (pTrackerServer == NULL) { logError("file: "__FILE__", line: %d, " "get tracker connection fail, result: %d", __LINE__, result); return result; } count = 0; total_count = 0; success_count = 0; noent_count = 0; result = 0; logInfo("file: "__FILE__", line: %d, " "disk recovery thread #%d, src storage server %s:%u, " "recovering files of data path: %s ...", __LINE__, pThreadData->thread_index, pSrcStorage->ip_addr, pSrcStorage->port, pThreadData->base_path); bContinueFlag = true; while 
/*
 * storage_do_recovery() (continued). Outer loop: make sure the reader is
 * initialized, connect to the source storage, then process records until
 * the binlog ends (ENOENT from storage_binlog_read() sets done and clears
 * result), an error occurs, or SF_G_CONTINUE_FLAG is cleared.
 *  - create-file records: download the file; 0 counts as success, ENOENT
 *    is counted in noent_count, -EINVAL is ignored, any other error leaves
 *    the inner loop with bContinueFlag still set, so the outer loop
 *    reconnects and the record is read again (binlog_offset was not
 *    advanced for it).
 *  - create-link records: rebuild the local and source paths and call
 *    symlink().
 *  - any other op type: EINVAL and stop.
 * NOTE(review): when tracker_make_connection() fails the code sleeps 5
 * seconds and continues the outer loop, whose condition is only
 * bContinueFlag; SF_G_CONTINUE_FLAG is not checked on that path.
 */
(bContinueFlag) { if ((result=recovery_reader_check_init(pThreadData, pReader)) != 0) { break; } if ((pStorageConn=tracker_make_connection(pSrcStorage, &result)) == NULL) { sleep(5); continue; } while (SF_G_CONTINUE_FLAG) { result = storage_binlog_read(pReader, &record, &record_length); if (result != 0) { if (result == ENOENT) { pThreadData->done = true; result = 0; } bContinueFlag = false; break; } total_count++; if (record.op_type == STORAGE_OP_TYPE_SOURCE_CREATE_FILE || record.op_type == STORAGE_OP_TYPE_REPLICA_CREATE_FILE) { result = recovery_download_file_to_local(&record, pTrackerServer, pStorageConn); if (result == 0) { success_count++; } else if (result == -EINVAL) { result = 0; } else if (result == ENOENT) { result = 0; noent_count++; } else { break; } } else if (record.op_type == STORAGE_OP_TYPE_SOURCE_CREATE_LINK || record.op_type == STORAGE_OP_TYPE_REPLICA_CREATE_LINK) { if (record.src_filename_len == 0) { logError("file: "__FILE__", line: %d, " \ "invalid binlog line, filename: %s, " \ "expect src filename", __LINE__, \ record.filename); result = EINVAL; bContinueFlag = false; break; } if ((result=storage_split_filename_ex(record.filename, \ &record.filename_len, record.true_filename, \ &store_path_index)) != 0) { bContinueFlag = false; break; } sprintf(local_filename, "%s/data/%s", \ g_fdfs_store_paths.paths[store_path_index].path, \ record.true_filename); if ((result=storage_split_filename_ex( \ record.src_filename, &record.src_filename_len,\ record.true_filename, &store_path_index)) != 0) { bContinueFlag = false; break; } sprintf(src_filename, "%s/data/%s", \ g_fdfs_store_paths.paths[store_path_index].path, \ record.true_filename); if (symlink(src_filename, local_filename) == 0) { success_count++; } else { result = errno != 0 ? 
/*
 * storage_do_recovery() (continued): symlink() failures with ENOENT or
 * EEXIST are logged at debug level and skipped; other errors stop the
 * recovery. After each handled record binlog_offset advances by
 * record_length; the mark file is rewritten every 1000 records and again
 * after the inner loop ends. When another outer round follows, the reader
 * is destroyed (presumably so that recovery_reader_check_init() reopens it
 * from the mark file; confirm against storage_reader_destroy) and the
 * thread sleeps 5 seconds.
 */
errno : ENOENT; if (result == ENOENT || result == EEXIST) { log_level = LOG_DEBUG; } else { log_level = LOG_ERR; } log_it_ex(&g_log_context, log_level, \ "file: "__FILE__", line: %d, " \ "link file %s to %s fail, " \ "errno: %d, error info: %s", __LINE__,\ src_filename, local_filename, \ result, STRERROR(result)); if (result != ENOENT && result != EEXIST) { bContinueFlag = false; break; } else { result = 0; } } } else { logError("file: "__FILE__", line: %d, " \ "invalid file op type: %d", \ __LINE__, record.op_type); result = EINVAL; bContinueFlag = false; break; } pReader->binlog_offset += record_length; count++; if (count == 1000) { logDebug("file: "__FILE__", line: %d, " "disk recovery thread #%d recover path: %s, " "file count: %"PRId64", success count: %"PRId64 ", noent_count: %"PRId64, __LINE__, pThreadData->thread_index, pThreadData->base_path, total_count, success_count, noent_count); recovery_write_to_mark_file(pReader); count = 0; } } tracker_close_connection_ex(pStorageConn, result != 0); recovery_write_to_mark_file(pReader); if (!SF_G_CONTINUE_FLAG) { bContinueFlag = false; } else if (bContinueFlag) { storage_reader_destroy(pReader); } if (count > 0) { logInfo("file: "__FILE__", line: %d, " "disk recovery thread #%d, recover path: %s, " "file count: %"PRId64", success count: " "%"PRId64", noent_count: %"PRId64, __LINE__, pThreadData->thread_index, pThreadData->base_path, total_count, success_count, noent_count); count = 0; } if (bContinueFlag) { sleep(5); } } tracker_close_connection_ex(pTrackerServer, true); if (pThreadData->done) { logInfo("file: "__FILE__", line: %d, " "disk recovery thread #%d, src storage server %s:%u, " "recover files of data path: %s done", __LINE__, pThreadData->thread_index, pSrcStorage->ip_addr, pSrcStorage->port, pThreadData->base_path); } return SF_G_CONTINUE_FLAG ? 
/*
 * storage_do_recovery() (end): returns result, or EINTR when
 * SF_G_CONTINUE_FLAG is cleared.
 *
 * storage_disk_recovery_restore_entrance(): thread start routine; arg is a
 * RecoveryThreadData. Records the thread id, increments the thread's alive
 * counter and the global thread counter, looks up a source server (ENOENT
 * is logged and treated as success), initializes the reader and runs
 * storage_do_recovery(), storing the outcome in pThreadData->result. It
 * then decrements both counters, sleeps one second and returns NULL.
 * NOTE(review): "¤t_recovery_thread_count" looks like a mis-decoded
 * "&current_recovery_thread_count" (the variable declared at file scope);
 * confirm against a clean copy.
 *
 * storage_disk_recovery_old_version_migrate(): returns ENOENT unless both
 * the unsuffixed mark file and the unsuffixed binlog file exist. Otherwise
 * loads the recovery parameters from the old mark file (removing that file
 * when the load returns EAGAIN), writes a flag file with fetch_binlog_done
 * true and one recovery thread, and renames the old mark and binlog files
 * to their index-0 names.
 */
result :EINTR; } static void *storage_disk_recovery_restore_entrance(void *arg) { StorageBinLogReader reader; RecoveryThreadData *pThreadData; ConnectionInfo srcStorage; pThreadData = (RecoveryThreadData *)arg; pThreadData->tid = pthread_self(); __sync_add_and_fetch(&pThreadData->alive, 1); __sync_add_and_fetch(¤t_recovery_thread_count, 1); do { if ((pThreadData->result=recovery_get_src_storage_server(&srcStorage)) != 0) { if (pThreadData->result == ENOENT) { logWarning("file: "__FILE__", line: %d, " "no source storage server, " "disk recovery finished!", __LINE__); pThreadData->result = 0; } break; } if ((pThreadData->result=recovery_reader_init(pThreadData, &reader)) != 0) { storage_reader_destroy(&reader); break; } pThreadData->result = storage_do_recovery(pThreadData, &reader, &srcStorage); recovery_write_to_mark_file(&reader); storage_reader_destroy(&reader); } while (0); __sync_sub_and_fetch(¤t_recovery_thread_count, 1); __sync_sub_and_fetch(&pThreadData->alive, 1); sleep(1); return NULL; } static int storage_disk_recovery_old_version_migrate(const char *pBasePath) { char old_binlog_filename[MAX_PATH_SIZE]; char old_mark_filename[MAX_PATH_SIZE]; char new_binlog_filename[MAX_PATH_SIZE]; char new_mark_filename[MAX_PATH_SIZE]; int result; recovery_get_global_binlog_filename(pBasePath, old_binlog_filename); recovery_get_global_full_filename(pBasePath, RECOVERY_MARK_FILENAME, old_mark_filename); if (!(fileExists(old_mark_filename) && fileExists(old_binlog_filename))) { return ENOENT; } logInfo("file: "__FILE__", line: %d, " "try to migrate data from old version ...", __LINE__); result = recovery_load_params_from_flag_file(old_mark_filename); if (result != 0) { if (result == EAGAIN) { unlink(old_mark_filename); } return result; } if ((result=recovery_init_flag_file(pBasePath, true, 1)) != 0) { return result; } recovery_get_full_filename_ex(pBasePath, 0, RECOVERY_MARK_FILENAME, new_mark_filename); if (rename(old_mark_filename, new_mark_filename) != 0) { 
/*
 * storage_disk_recovery_old_version_migrate() (end): a failed rename is
 * logged and its errno (EPERM if errno is 0) returned; 0 on success.
 *
 * do_dispatch_binlog_for_threads(): allocates and zeroes one
 * RecoveryDispatchInfo per configured recovery thread
 * (g_disk_recovery_threads); returns ENOMEM when the allocation fails.
 * NOTE(review): two spans of this function are damaged ("for (i=0; i 0)"
 * and the "for (i=0; i" that ends this line). The surviving middle part
 * hashes record.src_filename or record.filename with Time33Hash(), selects
 * the dispatch slot by the hash modulo g_disk_recovery_threads, and appends
 * the record to that slot's file with disk_recovery_write_to_binlog().
 */
logError("file: "__FILE__", line: %d, " "rename file %s to %s fail, " "errno: %d, error info: %s", __LINE__, old_mark_filename, new_mark_filename, errno, STRERROR(errno)); return errno != 0 ? errno : EPERM; } recovery_get_full_filename_ex(pBasePath, 0, RECOVERY_BINLOG_FILENAME, new_binlog_filename); if (rename(old_binlog_filename, new_binlog_filename) != 0) { logError("file: "__FILE__", line: %d, " "rename file %s to %s fail, " "errno: %d, error info: %s", __LINE__, old_binlog_filename, new_binlog_filename, errno, STRERROR(errno)); return errno != 0 ? errno : EPERM; } logInfo("file: "__FILE__", line: %d, " "migrate data from old version successfully.", __LINE__); return 0; } static int do_dispatch_binlog_for_threads(const char *pBasePath) { typedef struct { FILE *fp; int64_t count; char binlog_filename[MAX_PATH_SIZE]; char temp_filename[MAX_PATH_SIZE]; } RecoveryDispatchInfo; char mark_filename[MAX_PATH_SIZE]; string_t log_buff; char buff[2 * 1024]; struct stat file_stat; RecoveryThreadData thread_data; StorageBinLogReader reader; StorageBinLogRecord record; int record_length; RecoveryDispatchInfo *dispatchs; RecoveryDispatchInfo *disp; int64_t total_count; int hash_code; int bytes; int result; int i; bytes = sizeof(RecoveryDispatchInfo) * g_disk_recovery_threads; dispatchs = (RecoveryDispatchInfo *)malloc(bytes); if (dispatchs == NULL) { logError("file: "__FILE__", line: %d, " "malloc %d bytes fail", __LINE__, bytes); return ENOMEM; } memset(dispatchs, 0, bytes); result = 0; for (i=0; i 0) { hash_code = Time33Hash(record.src_filename, record.src_filename_len); } else { hash_code = Time33Hash(record.filename, record.filename_len); } disp = dispatchs + ((unsigned int)hash_code) % g_disk_recovery_threads; if ((result=disk_recovery_write_to_binlog(disp->fp, disp->temp_filename, &record)) != 0) { break; } disp->count++; } storage_reader_destroy(&reader); if (result != 0) { break; } } total_count = 0; *buff = '\0'; log_buff.str = buff; log_buff.len = 0; for (i=0; i 
/*
 * do_dispatch_binlog_for_threads() (end): logs the total record count and
 * returns result.
 *
 * storage_disk_recovery_dispatch_binlog_for_threads(): returns EAGAIN when
 * last_recovery_threads is not positive. NOTE(review): the rest of this
 * function is missing; the text resumes inside another function
 * (presumably the routine that starts the recovery threads and waits for
 * them; confirm). The visible part of that routine: while the global
 * thread counter is still positive it signals every live thread with
 * pthread_kill(), using SIGINT first and SIGTERM from half of
 * MAX_WAIT_COUNT (30) rounds on, sleeping one second per round; it then
 * frees thread_data, args and recovery_tids, returns EINTR when
 * SF_G_CONTINUE_FLAG is cleared, and reports saved_storage_status to the
 * tracker every 5 seconds until that succeeds.
 * After one more damaged span comes the tail of a comparator over
 * FDFSTrunkFileIdInfo (size-bounded comparison of the path members, then
 * the difference of the ids); presumably storage_compare_trunk_id_info(),
 * the name passed to avl_tree_init() further down.
 *
 * tree_write_file_walk_callback(): tree walk callback; writes the stored
 * line of one FDFSTrunkFileIdInfo plus a newline to the FILE given in args
 * and returns 0 when fprintf() reports a positive count.
 */
record count: %"PRId64"%s", __LINE__, total_count, log_buff.str); } return result; } static int storage_disk_recovery_dispatch_binlog_for_threads( const char *pBasePath) { int result; int i; char binlog_filename[MAX_PATH_SIZE]; if (last_recovery_threads <= 0) { logError("file: "__FILE__", line: %d, " "invalid last recovery threads: %d, " "retry restore data for %s again ...", __LINE__, last_recovery_threads, pBasePath); return EAGAIN; } for (i=0; i 0); if (__sync_fetch_and_add(¤t_recovery_thread_count, 0) > 0) { #define MAX_WAIT_COUNT 30 sig = SIGINT; for (i=0; i= MAX_WAIT_COUNT / 2) { sig = SIGTERM; } for (k=0; k 0) { pthread_kill(thread_data[k].tid, sig); } } logInfo("file: "__FILE__", line: %d, " "waiting for recovery threads exit, " "waiting count: %d, current thread count: %d", __LINE__, i+1, thread_count); sleep(1); } } sleep(1); //wait for thread exit free(thread_data); free(args); free(recovery_tids); if (!SF_G_CONTINUE_FLAG) { return EINTR; } while (SF_G_CONTINUE_FLAG) { if (storage_report_storage_status(g_my_server_id_str, g_tracker_client_ip.ips[0].address, saved_storage_status) == 0) { break; } sleep(5); } if (!SF_G_CONTINUE_FLAG) { return EINTR; } for (i=0; ipath), \ &(((FDFSTrunkFileIdInfo *)p2)->path), \ sizeof(FDFSTrunkPathInfo)); if (result != 0) { return result; } return ((FDFSTrunkFileIdInfo *)p1)->id - ((FDFSTrunkFileIdInfo *)p2)->id; } static int tree_write_file_walk_callback(void *data, void *args) { int result; if (fprintf((FILE *)args, "%s\n", ((FDFSTrunkFileIdInfo *)data)->line) > 0) { return 0; } else { result = errno != 0 ? 
/*
 * tree_write_file_walk_callback() (end): on a failed write the errno value
 * is logged, but the function returns EIO regardless of it.
 *
 * disk_recovery_write_to_binlog(): appends one record to fp, formatted as
 * "timestamp op_type filename" for the two create-file op types and with a
 * trailing src_filename for every other op type. Returns 0, or errno (EIO
 * if errno is 0) when fprintf() fails; binlog_filename is used only in the
 * error message.
 *
 * storage_do_split_trunk_binlog(): rewrites the fetched binlog of one store
 * path so that each trunk file appears once. Starts by opening
 * "[base path]/data/.binlog.recovery.tmp" for writing.
 */
errno : EIO; logError("file: "__FILE__", line: %d, " "write to binlog file fail, " "errno: %d, error info: %s.", __LINE__, result, STRERROR(result)); return EIO; } } static int disk_recovery_write_to_binlog(FILE *fp, const char *binlog_filename, StorageBinLogRecord *pRecord) { int result; if (pRecord->op_type == STORAGE_OP_TYPE_SOURCE_CREATE_FILE || pRecord->op_type == STORAGE_OP_TYPE_REPLICA_CREATE_FILE) { if (fprintf(fp, "%d %c %s\n", (int)pRecord->timestamp, pRecord->op_type, pRecord->filename) < 0) { result = errno != 0 ? errno : EIO; logError("file: "__FILE__", line: %d, " "write to file: %s fail, " "errno: %d, error info: %s.", __LINE__, binlog_filename, result, STRERROR(result)); return result; } } else { if (fprintf(fp, "%d %c %s %s\n", (int)pRecord->timestamp, pRecord->op_type, pRecord->filename, pRecord->src_filename) < 0) { result = errno != 0 ? errno : EIO; logError("file: "__FILE__", line: %d, " "write to file: %s fail, " "errno: %d, error info: %s.", __LINE__, binlog_filename, result, STRERROR(result)); return result; } } return 0; } static int storage_do_split_trunk_binlog(const int store_path_index, StorageBinLogReader *pReader) { FILE *fp; char *pBasePath; FDFSTrunkFileIdInfo *pFound; char binlogFullFilename[MAX_PATH_SIZE]; char tmpFullFilename[MAX_PATH_SIZE]; FDFSTrunkFullInfo trunk_info; FDFSTrunkFileIdInfo trunkFileId; StorageBinLogRecord record; AVLTreeInfo tree_unique_trunks; int record_length; int result; pBasePath = g_fdfs_store_paths.paths[store_path_index].path; recovery_get_full_filename_ex(pBasePath, -1, RECOVERY_BINLOG_FILENAME".tmp", tmpFullFilename); fp = fopen(tmpFullFilename, "w"); if (fp == NULL) { result = errno != 0 ? 
/*
 * storage_do_split_trunk_binlog() (continued): the AVL tree is created with
 * free as its element release function and storage_compare_trunk_id_info as
 * its comparator. Records are then read until the binlog ends. Non-trunk
 * records are copied to the temp file as they are read (see the next line).
 * For a trunk record the trunk info is decoded (records that fail to decode
 * are skipped) and the pair (path, id) is looked up in the tree; only the
 * first record of each trunk file is kept, as a formatted line inside a
 * heap-allocated FDFSTrunkFileIdInfo that is inserted into the tree.
 * NOTE(review): the line is built with sprintf() into the 128-byte "line"
 * member without a length check.
 */
errno : EPERM; logError("file: "__FILE__", line: %d, " \ "open file: %s fail, " \ "errno: %d, error info: %s.", \ __LINE__, tmpFullFilename, result, STRERROR(result)); return result; } if ((result=avl_tree_init(&tree_unique_trunks, free, \ storage_compare_trunk_id_info)) != 0) { logError("file: "__FILE__", line: %d, " \ "avl_tree_init fail, " \ "errno: %d, error info: %s", \ __LINE__, result, STRERROR(result)); fclose(fp); return result; } memset(&trunk_info, 0, sizeof(trunk_info)); memset(&trunkFileId, 0, sizeof(trunkFileId)); result = 0; while (SF_G_CONTINUE_FLAG) { result=storage_binlog_read(pReader, &record, &record_length); if (result != 0) { if (result == ENOENT) { result = 0; } break; } if (fdfs_is_trunk_file(record.filename, record.filename_len)) { if (fdfs_decode_trunk_info(store_path_index, \ record.true_filename, record.true_filename_len,\ &trunk_info) != 0) { continue; } trunkFileId.path = trunk_info.path; trunkFileId.id = trunk_info.file.id; pFound = (FDFSTrunkFileIdInfo *)avl_tree_find( \ &tree_unique_trunks, &trunkFileId); if (pFound != NULL) { continue; } pFound = (FDFSTrunkFileIdInfo *)malloc( \ sizeof(FDFSTrunkFileIdInfo)); if (pFound == NULL) { result = errno != 0 ? errno : ENOMEM; logError("file: "__FILE__", line: %d, " \ "malloc %d bytes fail, " \ "errno: %d, error info: %s", __LINE__,\ (int)sizeof(FDFSTrunkFileIdInfo), \ result, STRERROR(result)); break; } sprintf(trunkFileId.line, "%d %c %s", \ (int)record.timestamp, \ record.op_type, record.filename); memcpy(pFound, &trunkFileId, sizeof(FDFSTrunkFileIdInfo)); if (avl_tree_insert(&tree_unique_trunks, pFound) != 1) { result = errno != 0 ? 
/*
 * storage_do_split_trunk_binlog() (continued):
 * NOTE(review): when avl_tree_insert() does not return 1, pFound is not
 * freed on this path.
 * After a clean read the kept trunk lines are appended to the temp file by
 * walking the tree. The tree is destroyed and the file closed (the fclose()
 * result is not checked). Returns EINTR when SF_G_CONTINUE_FLAG is cleared,
 * or the first error; on success the temp file is renamed to the index-0
 * recovery binlog of the store path.
 */
errno : ENOMEM; logError("file: "__FILE__", line: %d, " \ "avl_tree_insert fail, " \ "errno: %d, error info: %s", \ __LINE__, result, STRERROR(result)); break; } } else { if ((result=disk_recovery_write_to_binlog(fp, tmpFullFilename, &record)) != 0) { break; } } } if (result == 0) { int tree_node_count; tree_node_count = avl_tree_count(&tree_unique_trunks); if (tree_node_count > 0) { logInfo("file: "__FILE__", line: %d, " \ "recovering trunk file count: %d", __LINE__, \ tree_node_count); result = avl_tree_walk(&tree_unique_trunks, \ tree_write_file_walk_callback, fp); } } avl_tree_destroy(&tree_unique_trunks); fclose(fp); if (!SF_G_CONTINUE_FLAG) { return EINTR; } if (result != 0) { return result; } recovery_get_full_filename_ex(pBasePath, 0, RECOVERY_BINLOG_FILENAME, binlogFullFilename); if (rename(tmpFullFilename, binlogFullFilename) != 0) { logError("file: "__FILE__", line: %d, " \ "rename file %s to %s fail, " \ "errno: %d, error info: %s", __LINE__, \ tmpFullFilename, binlogFullFilename, \ errno, STRERROR(errno)); return errno != 0 ? 
/*
 * storage_do_split_trunk_binlog() (end).
 *
 * storage_disk_recovery_split_trunk_binlog(): writes an index-0 mark file
 * with offset 0, opens a reader on the index-0 recovery binlog and runs
 * storage_do_split_trunk_binlog(); the reader is destroyed on every path.
 * Only base_path and thread_index of the local RecoveryThreadData are set.
 *
 * storage_disk_recovery_prepare(): non-static entry point that prepares the
 * recovery of one store path. Steps: write the flag file (fetch not done,
 * recovery_threads -1); create an empty index-0 recovery binlog; look up a
 * source server (on ENOENT the result of storage_disk_recovery_finish() is
 * returned); report FDFS_STORAGE_STATUS_RECOVERY to the tracker until that
 * succeeds; connect to the source server and fetch its binlog.
 * NOTE(review): the status report retry loop has no sleep between attempts.
 * NOTE(review): pStorageConn->ip_addr and pStorageConn->port are read for
 * the success log after tracker_close_connection_ex(pStorageConn, true);
 * confirm the connection object is still valid at that point (srcStorage
 * was the address passed to tracker_make_connection()).
 */
errno : EPERM; } return 0; } static int storage_disk_recovery_split_trunk_binlog(const int store_path_index) { char mark_filename[MAX_PATH_SIZE]; RecoveryThreadData thread_data; StorageBinLogReader reader; int result; thread_data.base_path = g_fdfs_store_paths.paths[store_path_index].path; thread_data.thread_index = 0; recovery_get_mark_filename(&thread_data, mark_filename); if ((result=do_write_to_mark_file(mark_filename, 0)) != 0) { return result; } if ((result=recovery_reader_init(&thread_data, &reader)) != 0) { storage_reader_destroy(&reader); return result; } result = storage_do_split_trunk_binlog(store_path_index, &reader); storage_reader_destroy(&reader); return result; } int storage_disk_recovery_prepare(const int store_path_index) { ConnectionInfo srcStorage; ConnectionInfo *pStorageConn; int result; char *pBasePath; pBasePath = g_fdfs_store_paths.paths[store_path_index].path; if ((result=recovery_init_flag_file(pBasePath, false, -1)) != 0) { return result; } if ((result=recovery_init_global_binlog_file(pBasePath)) != 0) { return result; } if ((result=recovery_get_src_storage_server(&srcStorage)) != 0) { if (result == ENOENT) { return storage_disk_recovery_finish(pBasePath); } else { return result; } } while (SF_G_CONTINUE_FLAG) { if (storage_report_storage_status(g_my_server_id_str, \ g_tracker_client_ip.ips[0].address, FDFS_STORAGE_STATUS_RECOVERY) == 0) { break; } } if (!SF_G_CONTINUE_FLAG) { return EINTR; } if ((pStorageConn=tracker_make_connection(&srcStorage, &result)) == NULL) { return result; } logInfo("file: "__FILE__", line: %d, " "try to fetch binlog from %s:%u ...", __LINE__, pStorageConn->ip_addr, pStorageConn->port); result = storage_do_fetch_binlog(pStorageConn, store_path_index); tracker_close_connection_ex(pStorageConn, true); if (result != 0) { return result; } logInfo("file: "__FILE__", line: %d, " "fetch binlog from %s:%u successfully.", __LINE__, pStorageConn->ip_addr, pStorageConn->port); if 
/*
 * storage_disk_recovery_prepare() (end): splits the trunk records of the
 * fetched binlog; on failure the flag file is removed and the error
 * returned. On success the flag file is rewritten with fetch_binlog_done
 * true and recovery_threads 1, and 0 is returned.
 */
((result=storage_disk_recovery_split_trunk_binlog( store_path_index)) != 0) { char flagFullFilename[MAX_PATH_SIZE]; unlink(recovery_get_flag_filename(pBasePath, flagFullFilename)); return result; } //set fetch binlog done if ((result=recovery_init_flag_file(pBasePath, true, 1)) != 0) { return result; } return 0; }