diff --git a/HISTORY b/HISTORY index a9c28ef..16f2f0b 100644 --- a/HISTORY +++ b/HISTORY @@ -1,8 +1,12 @@ -Version 6.04 2019-12-01 +Version 6.04 2019-12-04 * storage_report_ip_changed ignore result EEXIST * use get_gzip_command_filename from libfastcommon v1.42 * support compress error log and access log + * disk recovery support multi-threads to speed up + + NOTE: you MUST upgrade libfastcommon to V1.42 or later + Version 6.03 2019-11-20 * dual IPs support two different types of inner (intranet) IPs @@ -21,6 +25,7 @@ Version 6.03 2019-11-20 NOTE: the tracker and storage server must upgrade together + Version 6.02 2019-11-12 * get_file_info calculate CRC32 for appender file type * disk recovery download file to local temp file then rename it @@ -28,12 +33,14 @@ Version 6.02 2019-11-12 * support regenerate filename for appender file NOTE: the regenerated file will be a normal file! + Version 6.01 2019-10-25 * compress and uncompress binlog file by gzip when need, config items in storage.conf: compress_binlog and compress_binlog_time * bugfix: must check and create data path before write_to_pid_file in fdfs_storaged.c + Version 6.00 2019-10-16 * tracker and storage server support dual IPs 1. you can config dual tracker IPs in storage.conf and client.conf, @@ -49,6 +56,7 @@ Version 6.00 2019-10-16 * tracker server check tracker list when storage server join * use socketCreateExAuto and socketClientExAuto exported by libfastcommon + Version 5.12 2018-06-07 * code refine for rare case * replace print format OFF_PRINTF_FORMAT to PRId64 diff --git a/conf/storage.conf b/conf/storage.conf index f2233ed..c27059f 100644 --- a/conf/storage.conf +++ b/conf/storage.conf @@ -109,8 +109,13 @@ sync_end_time=23:59 # default value is 500 write_mark_file_freq=500 +# disk recovery thread count +# default value is 1 +# since V6.04 +disk_recovery_threads = 1 + # store path (disk or mount point) count, default value is 1 -store_path_count=1 +store_path_count = 1 # store_path#, based on 0, to configure the store paths to store file # if store_path0 not exists, it's value is base_path (NOT recommended) @@ -262,6 +267,7 @@ compress_old_access_log = false # compress the access log days before # default value is 1 +# since V6.04 compress_access_log_days_before = 1 # if rotate the error log every day @@ -282,6 +288,7 @@ compress_old_error_log = false # compress the error log days before # default value is 1 +# since V6.04 compress_error_log_days_before = 1 # rotate access log when the log file exceeds this size @@ -309,7 +316,7 @@ file_sync_skip_invalid_record=false # if use connection pool # default value is false # since V4.05 -use_connection_pool = false +use_connection_pool = true # connections whose the idle time exceeds this time will be closed # unit: second diff --git a/conf/tracker.conf b/conf/tracker.conf index 78331fe..f93b6c0 100644 --- a/conf/tracker.conf +++ b/conf/tracker.conf @@ -249,6 +249,7 @@ compress_old_error_log = false # compress the error log days before # default value is 1 +# since V6.04 compress_error_log_days_before = 1 # rotate error log when the log file exceeds this size @@ -265,7 +266,7 @@ log_file_keep_days = 0 # if use connection pool # default value is false # since V4.05 -use_connection_pool = false +use_connection_pool = true # connections whose the idle time exceeds this time will be closed # unit: second diff --git a/storage/fdfs_storaged.c b/storage/fdfs_storaged.c index 3e57b54..874809f 100644 --- a/storage/fdfs_storaged.c +++ b/storage/fdfs_storaged.c @@ -49,8 +49,12 @@ #include "storage_dump.h" #endif +#define ACCEPT_STAGE_NONE 0 +#define ACCEPT_STAGE_DOING 1 +#define ACCEPT_STAGE_DONE 2 + static bool bTerminateFlag = false; -static bool bAcceptEndFlag = false; +static char accept_stage = ACCEPT_STAGE_NONE; static void sigQuitHandler(int sig); static void sigHupHandler(int sig); @@ -58,6 +62,7 @@ static void sigUsrHandler(int sig); static void sigAlarmHandler(int sig); static int setupSchedules(pthread_t *schedule_tid); +static int setupSignalHandlers(); #if defined(DEBUG_FLAG) @@ -83,7 +88,6 @@ int main(int argc, char *argv[]) int sock; int wait_count; pthread_t schedule_tid; - struct sigaction act; char pidFilename[MAX_PATH_SIZE]; bool stop; @@ -148,6 +152,13 @@ int main(int argc, char *argv[]) return result; } + if ((result=setupSignalHandlers()) != 0) + { + logCrit("exit abnormally!\n"); + log_destroy(); + return result; + } + memset(g_bind_addr, 0, sizeof(g_bind_addr)); if ((result=storage_func_init(conf_filename, \ g_bind_addr, sizeof(g_bind_addr))) != 0) @@ -207,84 +218,6 @@ int main(int argc, char *argv[]) return result; } - memset(&act, 0, sizeof(act)); - sigemptyset(&act.sa_mask); - - act.sa_handler = sigUsrHandler; - if(sigaction(SIGUSR1, &act, NULL) < 0 || \ - sigaction(SIGUSR2, &act, NULL) < 0) - { - logCrit("file: "__FILE__", line: %d, " \ - "call sigaction fail, errno: %d, error info: %s", \ - __LINE__, errno, STRERROR(errno)); - logCrit("exit abnormally!\n"); - return errno; - } - - act.sa_handler = sigHupHandler; - if(sigaction(SIGHUP, &act, NULL) < 0) - { - logCrit("file: "__FILE__", line: %d, " \ - "call sigaction fail, errno: %d, error info: %s", \ - __LINE__, errno, STRERROR(errno)); - logCrit("exit abnormally!\n"); - return errno; - } - - act.sa_handler = SIG_IGN; - if(sigaction(SIGPIPE, &act, NULL) < 0) - { - logCrit("file: "__FILE__", line: %d, " \ - "call sigaction fail, errno: %d, error info: %s", \ - __LINE__, errno, STRERROR(errno)); - logCrit("exit abnormally!\n"); - return errno; - } - - act.sa_handler = sigQuitHandler; - if(sigaction(SIGINT, &act, NULL) < 0 || \ - sigaction(SIGTERM, &act, NULL) < 0 || \ - sigaction(SIGQUIT, &act, NULL) < 0) - { - logCrit("file: "__FILE__", line: %d, " \ - "call sigaction fail, errno: %d, error info: %s", \ - __LINE__, errno, STRERROR(errno)); - logCrit("exit abnormally!\n"); - return errno; - } - -#if defined(DEBUG_FLAG) - -/* -#if defined(OS_LINUX) - memset(&act, 0, sizeof(act)); - act.sa_sigaction = sigSegvHandler; - act.sa_flags = SA_SIGINFO; - if (sigaction(SIGSEGV, &act, NULL) < 0 || \ - sigaction(SIGABRT, &act, NULL) < 0) - { - logCrit("file: "__FILE__", line: %d, " \ - "call sigaction fail, errno: %d, error info: %s", \ - __LINE__, errno, STRERROR(errno)); - logCrit("exit abnormally!\n"); - return errno; - } -#endif -*/ - - memset(&act, 0, sizeof(act)); - sigemptyset(&act.sa_mask); - act.sa_handler = sigDumpHandler; - if(sigaction(SIGUSR1, &act, NULL) < 0 || \ - sigaction(SIGUSR2, &act, NULL) < 0) - { - logCrit("file: "__FILE__", line: %d, " \ - "call sigaction fail, errno: %d, error info: %s", \ - __LINE__, errno, STRERROR(errno)); - logCrit("exit abnormally!\n"); - return errno; - } -#endif #ifdef WITH_HTTPD if (!g_http_params.disabled) @@ -333,10 +266,10 @@ int main(int argc, char *argv[]) log_set_cache(true); bTerminateFlag = false; - bAcceptEndFlag = false; + accept_stage = ACCEPT_STAGE_DOING; storage_accept_loop(sock); - bAcceptEndFlag = true; + accept_stage = ACCEPT_STAGE_DONE; fdfs_binlog_sync_func(NULL); //binlog fsync @@ -412,7 +345,7 @@ static void sigAlarmHandler(int sig) { ConnectionInfo server; - if (bAcceptEndFlag) + if (accept_stage != ACCEPT_STAGE_DOING) { return; } @@ -583,3 +516,82 @@ static int setupSchedules(pthread_t *schedule_tid) return 0; } +static int setupSignalHandlers() +{ + struct sigaction act; + + memset(&act, 0, sizeof(act)); + sigemptyset(&act.sa_mask); + + act.sa_handler = sigUsrHandler; + if(sigaction(SIGUSR1, &act, NULL) < 0 || \ + sigaction(SIGUSR2, &act, NULL) < 0) + { + logCrit("file: "__FILE__", line: %d, " \ + "call sigaction fail, errno: %d, error info: %s", \ + __LINE__, errno, STRERROR(errno)); + return errno != 0 ? errno : EFAULT; + } + + act.sa_handler = sigHupHandler; + if(sigaction(SIGHUP, &act, NULL) < 0) + { + logCrit("file: "__FILE__", line: %d, " \ + "call sigaction fail, errno: %d, error info: %s", \ + __LINE__, errno, STRERROR(errno)); + return errno != 0 ? errno : EFAULT; + } + + act.sa_handler = SIG_IGN; + if(sigaction(SIGPIPE, &act, NULL) < 0) + { + logCrit("file: "__FILE__", line: %d, " \ + "call sigaction fail, errno: %d, error info: %s", \ + __LINE__, errno, STRERROR(errno)); + return errno != 0 ? errno : EFAULT; + } + + act.sa_handler = sigQuitHandler; + if(sigaction(SIGINT, &act, NULL) < 0 || \ + sigaction(SIGTERM, &act, NULL) < 0 || \ + sigaction(SIGQUIT, &act, NULL) < 0) + { + logCrit("file: "__FILE__", line: %d, " \ + "call sigaction fail, errno: %d, error info: %s", \ + __LINE__, errno, STRERROR(errno)); + return errno != 0 ? errno : EFAULT; + } + +#if defined(DEBUG_FLAG) + +/* +#if defined(OS_LINUX) + memset(&act, 0, sizeof(act)); + act.sa_sigaction = sigSegvHandler; + act.sa_flags = SA_SIGINFO; + if (sigaction(SIGSEGV, &act, NULL) < 0 || \ + sigaction(SIGABRT, &act, NULL) < 0) + { + logCrit("file: "__FILE__", line: %d, " \ + "call sigaction fail, errno: %d, error info: %s", \ + __LINE__, errno, STRERROR(errno)); + return errno != 0 ? errno : EFAULT; + } +#endif +*/ + + memset(&act, 0, sizeof(act)); + sigemptyset(&act.sa_mask); + act.sa_handler = sigDumpHandler; + if(sigaction(SIGUSR1, &act, NULL) < 0 || \ + sigaction(SIGUSR2, &act, NULL) < 0) + { + logCrit("file: "__FILE__", line: %d, " \ + "call sigaction fail, errno: %d, error info: %s", \ + __LINE__, errno, STRERROR(errno)); + return errno != 0 ? errno : EFAULT; + } +#endif + + return 0; +} diff --git a/storage/storage_disk_recovery.c b/storage/storage_disk_recovery.c index dc0b59a..5f95cbd 100644 --- a/storage/storage_disk_recovery.c +++ b/storage/storage_disk_recovery.c @@ -43,17 +43,74 @@ typedef struct { int id; //trunk file id } FDFSTrunkFileIdInfo; +typedef struct { + int thread_index; //-1 for global + int result; + bool done; + const char *base_path; +} RecoveryThreadData; + #define RECOVERY_BINLOG_FILENAME ".binlog.recovery" +#define RECOVERY_FLAG_FILENAME ".recovery.flag" #define RECOVERY_MARK_FILENAME ".recovery.mark" -#define MARK_ITEM_BINLOG_OFFSET "binlog_offset" -#define MARK_ITEM_FETCH_BINLOG_DONE "fetch_binlog_done" -#define MARK_ITEM_SAVED_STORAGE_STATUS "saved_storage_status" +#define FLAG_ITEM_RECOVERY_THREADS "recovery_threads" +#define FLAG_ITEM_SAVED_STORAGE_STATUS "saved_storage_status" +#define FLAG_ITEM_FETCH_BINLOG_DONE "fetch_binlog_done" +#define MARK_ITEM_BINLOG_OFFSET "binlog_offset" + +static int last_recovery_threads = -1; //for rebalance binlog data +static volatile int current_recovery_thread_count = 0; static int saved_storage_status = FDFS_STORAGE_STATUS_NONE; static char *recovery_get_binlog_filename(const void *pArg, - char *full_filename); + char *full_filename); + +static int disk_recovery_write_to_binlog(FILE *fp, + const char *binlog_filename, StorageBinLogRecord *pRecord); + +static char *recovery_get_full_filename_ex(const char *pBasePath, + const int thread_index, const char *filename, char *full_filename) +{ + static char buff[MAX_PATH_SIZE]; + int len; + + if (full_filename == NULL) + { + full_filename = buff; + } + + len = snprintf(full_filename, MAX_PATH_SIZE, + "%s/data/%s", pBasePath, filename); + if (thread_index >= 0) + { + snprintf(full_filename + len, MAX_PATH_SIZE - len, + ".%d", thread_index); + } + return full_filename; +} + +static inline char *recovery_get_full_filename(const RecoveryThreadData + *pThreadData, const char *filename, char *full_filename) +{ + return recovery_get_full_filename_ex(pThreadData->base_path, + pThreadData->thread_index, filename, full_filename); +} + +static inline char *recovery_get_global_full_filename(const char *pBasePath, + const char *filename, char *full_filename) +{ + return recovery_get_full_filename_ex(pBasePath, -1, + RECOVERY_FLAG_FILENAME, full_filename); +} + +static inline char *recovery_get_global_binlog_filename(const char *pBasePath, + char *full_filename) +{ + return recovery_get_global_full_filename(pBasePath, + RECOVERY_BINLOG_FILENAME, full_filename); +} static int storage_do_fetch_binlog(ConnectionInfo *pSrcStorage, \ const int store_path_index) @@ -68,7 +125,8 @@ static int storage_do_fetch_binlog(ConnectionInfo *pSrcStorage, \ int network_timeout; pBasePath = g_fdfs_store_paths.paths[store_path_index].path; - recovery_get_binlog_filename(pBasePath, full_binlog_filename); + recovery_get_full_filename_ex(pBasePath, 0, + RECOVERY_BINLOG_FILENAME, full_binlog_filename); memset(out_buff, 0, sizeof(out_buff)); pHeader = (TrackerHeader *)out_buff; @@ -129,12 +187,14 @@ static int recovery_get_src_storage_server(ConnectionInfo *pSrcStorage) { int result; int storage_count; + int i; + static unsigned int current_index = 0; TrackerServerInfo trackerServer; ConnectionInfo *pTrackerConn; FDFSGroupStat groupStat; FDFSStorageInfo storageStats[FDFS_MAX_SERVERS_EACH_GROUP]; FDFSStorageInfo *pStorageStat; - FDFSStorageInfo *pStorageEnd; + bool found; memset(pSrcStorage, 0, sizeof(ConnectionInfo)); pSrcStorage->sock = -1; @@ -185,6 +245,7 @@ static int recovery_get_src_storage_server(ConnectionInfo *pSrcStorage) sleep(1); } + found = false; while (g_continue_flag) { if ((pTrackerConn=tracker_get_connection_r(&trackerServer, \ @@ -243,8 +304,8 @@ static int recovery_get_src_storage_server(ConnectionInfo *pSrcStorage) continue; } - result = tracker_list_servers(pTrackerConn, \ - g_group_name, NULL, storageStats, \ + result = tracker_list_servers(pTrackerConn, + g_group_name, NULL, storageStats, FDFS_MAX_SERVERS_EACH_GROUP, &storage_count); tracker_close_connection_ex(pTrackerConn, result != 0); if (result != 0) @@ -263,10 +324,9 @@ static int recovery_get_src_storage_server(ConnectionInfo *pSrcStorage) continue; } - pStorageEnd = storageStats + storage_count; - for (pStorageStat=storageStats; pStorageStatid, g_my_server_id_str) == 0) { continue; @@ -274,13 +334,14 @@ static int recovery_get_src_storage_server(ConnectionInfo *pSrcStorage) if (pStorageStat->status == FDFS_STORAGE_STATUS_ACTIVE) { + found = true; strcpy(pSrcStorage->ip_addr, pStorageStat->ip_addr); pSrcStorage->port = pStorageStat->storage_port; break; } } - if (pStorageStat < pStorageEnd) //found src storage server + if (found) //found src storage server { break; } @@ -299,117 +360,205 @@ static int recovery_get_src_storage_server(ConnectionInfo *pSrcStorage) return 0; } -static char *recovery_get_full_filename(const char *pBasePath, - const char *filename, char *full_filename) -{ - static char buff[MAX_PATH_SIZE]; - - if (full_filename == NULL) - { - full_filename = buff; - } - - snprintf(full_filename, MAX_PATH_SIZE, - "%s/data/%s", pBasePath, filename); - return full_filename; -} - static char *recovery_get_binlog_filename(const void *pArg, char *full_filename) { - return recovery_get_full_filename((const char *)pArg, + return recovery_get_full_filename((const RecoveryThreadData *)pArg, RECOVERY_BINLOG_FILENAME, full_filename); } -static char *recovery_get_mark_filename(const char *pBasePath, +static char *recovery_get_flag_filename(const char *pBasePath, char *full_filename) { - return recovery_get_full_filename(pBasePath, + return recovery_get_global_full_filename(pBasePath, + RECOVERY_FLAG_FILENAME, full_filename); +} + +static char *recovery_get_mark_filename(const RecoveryThreadData *pThreadData, + char *full_filename) +{ + return recovery_get_full_filename(pThreadData, RECOVERY_MARK_FILENAME, full_filename); } +static int storage_disk_recovery_delete_thread_files(const char *pBasePath, + const int index_start, const int index_end) +{ + int i; + char mark_filename[MAX_PATH_SIZE]; + char binlog_filename[MAX_PATH_SIZE]; + + for (i=index_start; ibinlog_offset, - MARK_ITEM_FETCH_BINLOG_DONE); + "%s=%"PRId64"\n", + MARK_ITEM_BINLOG_OFFSET, binlog_offset); - return safeWriteToFile(pReader->mark_filename, buff, len); + return safeWriteToFile(mark_filename, buff, len); } -static int recovery_init_binlog_file(const char *pBasePath) +static inline int recovery_write_to_mark_file(StorageBinLogReader *pReader) +{ + return do_write_to_mark_file(pReader->mark_filename, + pReader->binlog_offset); +} + +static int recovery_init_global_binlog_file(const char *pBasePath) { char full_binlog_filename[MAX_PATH_SIZE]; char buff[1]; *buff = '\0'; - recovery_get_binlog_filename(pBasePath, full_binlog_filename); + recovery_get_full_filename_ex(pBasePath, 0, + RECOVERY_BINLOG_FILENAME, full_binlog_filename); return writeToFile(full_binlog_filename, buff, 0); } -static int recovery_init_mark_file(const char *pBasePath, \ - const bool fetch_binlog_done) +static int recovery_init_flag_file_ex(const char *pBasePath, + const bool fetch_binlog_done, const int recovery_threads) { char full_filename[MAX_PATH_SIZE]; - char buff[128]; - int len; - recovery_get_mark_filename(pBasePath, full_filename); - - len = sprintf(buff, \ - "%s=%d\n" \ - "%s=0\n" \ - "%s=%d\n", \ - MARK_ITEM_SAVED_STORAGE_STATUS, saved_storage_status, \ - MARK_ITEM_BINLOG_OFFSET, \ - MARK_ITEM_FETCH_BINLOG_DONE, fetch_binlog_done); - return writeToFile(full_filename, buff, len); + recovery_get_flag_filename(pBasePath, full_filename); + return do_write_to_flag_file(full_filename, + fetch_binlog_done, recovery_threads); } -static int recovery_reader_init(const char *pBasePath, \ +static inline int recovery_init_flag_file(const char *pBasePath, + const bool fetch_binlog_done, const int recovery_threads) +{ + return recovery_init_flag_file_ex(pBasePath, + fetch_binlog_done, recovery_threads); +} + +static int recovery_load_params_from_flag_file(const char *full_flag_filename) +{ + IniContext iniContext; + int result; + + memset(&iniContext, 0, sizeof(IniContext)); + if ((result=iniLoadFromFile(full_flag_filename, + &iniContext)) != 0) + { + logError("file: "__FILE__", line: %d, " + "load from flag file \"%s\" fail, " + "error code: %d", __LINE__, + full_flag_filename, result); + return result; + } + + if (!iniGetBoolValue(NULL, FLAG_ITEM_FETCH_BINLOG_DONE, + &iniContext, false)) + { + iniFreeContext(&iniContext); + + logInfo("file: "__FILE__", line: %d, " + "flag file \"%s\", %s=0, " + "need to fetch binlog again", __LINE__, + full_flag_filename, FLAG_ITEM_FETCH_BINLOG_DONE); + return EAGAIN; + } + + saved_storage_status = iniGetIntValue(NULL, + FLAG_ITEM_SAVED_STORAGE_STATUS, &iniContext, -1); + if (saved_storage_status < 0) + { + iniFreeContext(&iniContext); + + logError("file: "__FILE__", line: %d, " + "in flag file \"%s\", %s: %d < 0", __LINE__, + full_flag_filename, FLAG_ITEM_SAVED_STORAGE_STATUS, + saved_storage_status); + return EINVAL; + } + + last_recovery_threads = iniGetIntValue(NULL, + FLAG_ITEM_RECOVERY_THREADS, &iniContext, -1); + + iniFreeContext(&iniContext); + return 0; +} + +static int recovery_reader_init(const RecoveryThreadData *pThreadData, StorageBinLogReader *pReader) { IniContext iniContext; @@ -419,57 +568,32 @@ static int recovery_reader_init(const char *pBasePath, \ pReader->binlog_fd = -1; pReader->binlog_index = g_binlog_index + 1; - pReader->binlog_buff.buffer = (char *)malloc( \ + pReader->binlog_buff.buffer = (char *)malloc( STORAGE_BINLOG_BUFFER_SIZE); if (pReader->binlog_buff.buffer == NULL) { - logError("file: "__FILE__", line: %d, " \ - "malloc %d bytes fail, " \ - "errno: %d, error info: %s", \ - __LINE__, STORAGE_BINLOG_BUFFER_SIZE, \ + logError("file: "__FILE__", line: %d, " + "malloc %d bytes fail, " + "errno: %d, error info: %s", + __LINE__, STORAGE_BINLOG_BUFFER_SIZE, errno, STRERROR(errno)); return errno != 0 ? errno : ENOMEM; } pReader->binlog_buff.current = pReader->binlog_buff.buffer; - recovery_get_mark_filename(pBasePath, pReader->mark_filename); + recovery_get_mark_filename(pThreadData, pReader->mark_filename); memset(&iniContext, 0, sizeof(IniContext)); if ((result=iniLoadFromFile(pReader->mark_filename, &iniContext)) != 0) { - logError("file: "__FILE__", line: %d, " \ - "load from mark file \"%s\" fail, " \ - "error code: %d", __LINE__, \ + logError("file: "__FILE__", line: %d, " + "load from mark file \"%s\" fail, " + "error code: %d", __LINE__, pReader->mark_filename, result); return result; } - if (!iniGetBoolValue(NULL, MARK_ITEM_FETCH_BINLOG_DONE, \ - &iniContext, false)) - { - iniFreeContext(&iniContext); - - logInfo("file: "__FILE__", line: %d, " \ - "mark file \"%s\", %s=0, " \ - "need to fetch binlog again", __LINE__, \ - pReader->mark_filename, MARK_ITEM_FETCH_BINLOG_DONE); - return EAGAIN; - } - - saved_storage_status = iniGetIntValue(NULL, \ - MARK_ITEM_SAVED_STORAGE_STATUS, &iniContext, -1); - if (saved_storage_status < 0) - { - iniFreeContext(&iniContext); - - logError("file: "__FILE__", line: %d, " \ - "in mark file \"%s\", %s: %d < 0", __LINE__, \ - pReader->mark_filename, MARK_ITEM_SAVED_STORAGE_STATUS, \ - saved_storage_status); - return EINVAL; - } - - pReader->binlog_offset = iniGetInt64Value(NULL, \ + pReader->binlog_offset = iniGetInt64Value(NULL, MARK_ITEM_BINLOG_OFFSET, &iniContext, -1); if (pReader->binlog_offset < 0) { @@ -485,8 +609,8 @@ static int recovery_reader_init(const char *pBasePath, \ iniFreeContext(&iniContext); - if ((result=storage_open_readable_binlog(pReader, \ - recovery_get_binlog_filename, pBasePath)) != 0) + if ((result=storage_open_readable_binlog(pReader, + recovery_get_binlog_filename, pThreadData)) != 0) { return result; } @@ -494,15 +618,15 @@ static int recovery_reader_init(const char *pBasePath, \ return 0; } -static int recovery_reader_check_init(const char *pBasePath, \ - StorageBinLogReader *pReader) +static int recovery_reader_check_init(const RecoveryThreadData *pThreadData, + StorageBinLogReader *pReader) { if (pReader->binlog_fd >= 0 && pReader->binlog_buff.buffer != NULL) { return 0; } - return recovery_reader_init(pBasePath, pReader); + return recovery_reader_init(pThreadData, pReader); } static int recovery_download_file_to_local(StorageBinLogRecord *pRecord, @@ -584,8 +708,8 @@ static int recovery_download_file_to_local(StorageBinLogRecord *pRecord, return result; } -static int storage_do_recovery(const char *pBasePath, StorageBinLogReader *pReader, \ - ConnectionInfo *pSrcStorage) +static int storage_do_recovery(RecoveryThreadData *pThreadData, + StorageBinLogReader *pReader, ConnectionInfo *pSrcStorage) { TrackerServerInfo trackerServer; ConnectionInfo *pTrackerServer; @@ -619,262 +743,670 @@ static int storage_do_recovery(const char *pBasePath, StorageBinLogReader *pRead result = 0; logInfo("file: "__FILE__", line: %d, " - "disk recovery: recovering files of data path: %s ...", - __LINE__, pBasePath); + "disk recovery thread #%d, src storage server %s:%d, " + "recovering files of data path: %s ...", __LINE__, + pThreadData->thread_index, pSrcStorage->ip_addr, + pSrcStorage->port, pThreadData->base_path); bContinueFlag = true; while (bContinueFlag) - { - if ((result=recovery_reader_check_init(pBasePath, pReader)) != 0) { - break; - } - if ((pStorageConn=tracker_make_connection(pSrcStorage, &result)) == NULL) - { - sleep(5); - continue; - } + if ((result=recovery_reader_check_init(pThreadData, pReader)) != 0) + { + break; + } + if ((pStorageConn=tracker_make_connection(pSrcStorage, + &result)) == NULL) + { + sleep(5); + continue; + } - while (g_continue_flag) - { - result=storage_binlog_read(pReader, &record, &record_length); - if (result != 0) - { - if (result == ENOENT) - { - result = 0; - } - bContinueFlag = false; - break; - } - - total_count++; - if (record.op_type == STORAGE_OP_TYPE_SOURCE_CREATE_FILE - || record.op_type == STORAGE_OP_TYPE_REPLICA_CREATE_FILE) - { - result = recovery_download_file_to_local(&record, - pTrackerServer, pStorageConn); - if (result == 0) - { - success_count++; - } - else if (result == -EINVAL) - { - result = 0; - } - else if (result == ENOENT) - { - result = 0; - noent_count++; - } - else + while (g_continue_flag) + { + result = storage_binlog_read(pReader, &record, &record_length); + if (result != 0) { - break; - } - } - else if (record.op_type == STORAGE_OP_TYPE_SOURCE_CREATE_LINK - || record.op_type == STORAGE_OP_TYPE_REPLICA_CREATE_LINK) - { - if (record.src_filename_len == 0) - { - logError("file: "__FILE__", line: %d, " \ - "invalid binlog line, filename: %s, " \ - "expect src filename", __LINE__, \ - record.filename); - result = EINVAL; - bContinueFlag = false; - break; - } + if (result == ENOENT) + { + pThreadData->done = true; + result = 0; + } + bContinueFlag = false; + break; + } - if ((result=storage_split_filename_ex(record.filename, \ - &record.filename_len, record.true_filename, \ - &store_path_index)) != 0) - { - bContinueFlag = false; - break; - } - sprintf(local_filename, "%s/data/%s", \ - g_fdfs_store_paths.paths[store_path_index].path, \ - record.true_filename); - - if ((result=storage_split_filename_ex( \ - record.src_filename, &record.src_filename_len,\ - record.true_filename, &store_path_index)) != 0) - { - bContinueFlag = false; - break; - } - sprintf(src_filename, "%s/data/%s", \ - g_fdfs_store_paths.paths[store_path_index].path, \ - record.true_filename); - if (symlink(src_filename, local_filename) == 0) - { - success_count++; - } - else - { - result = errno != 0 ? errno : ENOENT; - if (result == ENOENT || result == EEXIST) - { - log_level = LOG_DEBUG; - } - else - { - log_level = LOG_ERR; - } - - log_it_ex(&g_log_context, log_level, \ - "file: "__FILE__", line: %d, " \ - "link file %s to %s fail, " \ - "errno: %d, error info: %s", __LINE__,\ - src_filename, local_filename, \ - result, STRERROR(result)); - - if (result != ENOENT && result != EEXIST) - { - bContinueFlag = false; - break; - } - else + total_count++; + if (record.op_type == STORAGE_OP_TYPE_SOURCE_CREATE_FILE + || record.op_type == STORAGE_OP_TYPE_REPLICA_CREATE_FILE) + { + result = recovery_download_file_to_local(&record, + pTrackerServer, pStorageConn); + if (result == 0) + { + success_count++; + } + else if (result == -EINVAL) { result = 0; } - } - } - else - { - logError("file: "__FILE__", line: %d, " \ - "invalid file op type: %d", \ - __LINE__, record.op_type); - result = EINVAL; - bContinueFlag = false; - break; - } + else if (result == ENOENT) + { + result = 0; + noent_count++; + } + else + { + break; + } + } + else if (record.op_type == STORAGE_OP_TYPE_SOURCE_CREATE_LINK + || record.op_type == STORAGE_OP_TYPE_REPLICA_CREATE_LINK) + { + if (record.src_filename_len == 0) + { + logError("file: "__FILE__", line: %d, " \ + "invalid binlog line, filename: %s, " \ + "expect src filename", __LINE__, \ + record.filename); + result = EINVAL; + bContinueFlag = false; + break; + } - pReader->binlog_offset += record_length; - count++; - if (count == 1000) - { - logDebug("file: "__FILE__", line: %d, " \ - "disk recovery: recover path: %s, " \ - "file count: %"PRId64 \ - ", success count: %"PRId64", noent_count: %"PRId64, \ - __LINE__, pBasePath, total_count, \ - success_count, noent_count); - recovery_write_to_mark_file(pBasePath, pReader); - count = 0; - } - } + if ((result=storage_split_filename_ex(record.filename, \ + &record.filename_len, record.true_filename, \ + &store_path_index)) != 0) + { + bContinueFlag = false; + break; + } + sprintf(local_filename, "%s/data/%s", \ + g_fdfs_store_paths.paths[store_path_index].path, \ + record.true_filename); - tracker_close_connection_ex(pStorageConn, result != 0); - recovery_write_to_mark_file(pBasePath, pReader); - if (bContinueFlag) - { - storage_reader_destroy(pReader); + if ((result=storage_split_filename_ex( \ + record.src_filename, &record.src_filename_len,\ + record.true_filename, &store_path_index)) != 0) + { + bContinueFlag = false; + break; + } + sprintf(src_filename, "%s/data/%s", \ + g_fdfs_store_paths.paths[store_path_index].path, \ + record.true_filename); + if (symlink(src_filename, local_filename) == 0) + { + success_count++; + } + else + { + result = errno != 0 ? errno : ENOENT; + if (result == ENOENT || result == EEXIST) + { + log_level = LOG_DEBUG; + } + else + { + log_level = LOG_ERR; + } + + log_it_ex(&g_log_context, log_level, \ + "file: "__FILE__", line: %d, " \ + "link file %s to %s fail, " \ + "errno: %d, error info: %s", __LINE__,\ + src_filename, local_filename, \ + result, STRERROR(result)); + + if (result != ENOENT && result != EEXIST) + { + bContinueFlag = false; + break; + } + else + { + result = 0; + } + } + } + else + { + logError("file: "__FILE__", line: %d, " \ + "invalid file op type: %d", \ + __LINE__, record.op_type); + result = EINVAL; + bContinueFlag = false; + break; + } + + pReader->binlog_offset += record_length; + count++; + if (count == 1000) + { + logDebug("file: "__FILE__", line: %d, " + "disk recovery thread #%d recover path: %s, " + "file count: %"PRId64", success count: %"PRId64 + ", noent_count: %"PRId64, __LINE__, + pThreadData->thread_index, + pThreadData->base_path, total_count, + success_count, noent_count); + recovery_write_to_mark_file(pReader); + count = 0; + } + } + + tracker_close_connection_ex(pStorageConn, result != 0); + recovery_write_to_mark_file(pReader); + + if (!g_continue_flag) + { + bContinueFlag = false; + } + else if (bContinueFlag) + { + storage_reader_destroy(pReader); + } + + if (count > 0) + { + logInfo("file: "__FILE__", line: %d, " + "disk recovery thread #%d, recover path: %s, " + "file count: %"PRId64", success count: " + "%"PRId64", noent_count: %"PRId64, + __LINE__, pThreadData->thread_index, + pThreadData->base_path, total_count, + success_count, noent_count); + count = 0; + } + + if (bContinueFlag) + { + sleep(5); + } } - if (count > 0) - { - count = 0; - - logInfo("file: "__FILE__", line: %d, " \ - "disk recovery: recover path: %s, " \ - "file count: %"PRId64 \ - ", success count: %"PRId64", noent_count: %"PRId64, \ - __LINE__, pBasePath, total_count, success_count, noent_count); - } - else - { - sleep(5); - } - } - tracker_close_connection_ex(pTrackerServer, true); - if (result == 0) + if (pThreadData->done) { - logInfo("file: "__FILE__", line: %d, " \ - "disk recovery: recover files of data path: %s done", \ - __LINE__, pBasePath); + logInfo("file: "__FILE__", line: %d, " + "disk recovery thread #%d, src storage server %s:%d, " + "recover files of data path: %s done", __LINE__, + pThreadData->thread_index, pSrcStorage->ip_addr, + pSrcStorage->port, pThreadData->base_path); } - return result; + return g_continue_flag ? result :EINTR; } -int storage_disk_recovery_restore(const char *pBasePath) +static void *storage_disk_recovery_restore_entrance(void *arg) { - char full_binlog_filename[MAX_PATH_SIZE]; - char full_mark_filename[MAX_PATH_SIZE]; - ConnectionInfo srcStorage; - int result; StorageBinLogReader reader; + RecoveryThreadData *pThreadData; + ConnectionInfo srcStorage; - recovery_get_binlog_filename(pBasePath, full_binlog_filename); - recovery_get_mark_filename(pBasePath, full_mark_filename); + pThreadData = (RecoveryThreadData *)arg; + __sync_add_and_fetch(¤t_recovery_thread_count, 1); - if (!(fileExists(full_mark_filename) && \ - fileExists(full_binlog_filename))) + do + { + if ((pThreadData->result=recovery_get_src_storage_server(&srcStorage)) != 0) + { + if (pThreadData->result == ENOENT) + { + logWarning("file: "__FILE__", line: %d, " + "no source storage server, " + "disk recovery finished!", __LINE__); + pThreadData->result = 0; + } + break; + } + + if ((pThreadData->result=recovery_reader_init(pThreadData, &reader)) != 0) + { + storage_reader_destroy(&reader); + break; + } + + pThreadData->result = storage_do_recovery(pThreadData, &reader, &srcStorage); + + recovery_write_to_mark_file(&reader); + storage_reader_destroy(&reader); + + } while (0); + + __sync_sub_and_fetch(¤t_recovery_thread_count, 1); + + return NULL; +} + +static int storage_disk_recovery_old_version_migrate(const char *pBasePath) +{ + char old_binlog_filename[MAX_PATH_SIZE]; + char old_mark_filename[MAX_PATH_SIZE]; + char new_binlog_filename[MAX_PATH_SIZE]; + char new_mark_filename[MAX_PATH_SIZE]; + int result; + + recovery_get_global_binlog_filename(pBasePath, old_binlog_filename); + recovery_get_global_full_filename(pBasePath, + RECOVERY_MARK_FILENAME, old_mark_filename); + + if (!(fileExists(old_mark_filename) && + fileExists(old_binlog_filename))) { - return 0; + return ENOENT; } - logInfo("file: "__FILE__", line: %d, " \ - "disk recovery: begin recovery data path: %s ...", \ - __LINE__, pBasePath); + logInfo("file: "__FILE__", line: %d, " + "try to migrate data from old version ...", __LINE__); - if ((result=recovery_get_src_storage_server(&srcStorage)) != 0) + result = recovery_load_params_from_flag_file(old_mark_filename); + if (result != 0) + { + if (result == EAGAIN) + { + unlink(old_mark_filename); + } + return result; + } + + if ((result=recovery_init_flag_file_ex(pBasePath, true, 1)) != 0) + { + return result; + } + + recovery_get_full_filename_ex(pBasePath, 0, + RECOVERY_MARK_FILENAME, new_mark_filename); + if (rename(old_mark_filename, new_mark_filename) != 0) { - if (result == ENOENT) - { - logWarning("file: "__FILE__", line: %d, " \ - "no source storage server, " \ - "disk recovery finished!", __LINE__); - return storage_disk_recovery_finish(pBasePath); - } - else - { - return result; - } + logError("file: "__FILE__", line: %d, " + "rename file %s to %s fail, " + "errno: %d, error info: %s", __LINE__, + old_mark_filename, new_mark_filename, + errno, STRERROR(errno)); + return errno != 0 ? errno : EPERM; } - if ((result=recovery_reader_init(pBasePath, &reader)) != 0) + recovery_get_full_filename_ex(pBasePath, 0, + RECOVERY_BINLOG_FILENAME, new_binlog_filename); + if (rename(old_binlog_filename, new_binlog_filename) != 0) { - storage_reader_destroy(&reader); - return result; + logError("file: "__FILE__", line: %d, " + "rename file %s to %s fail, " + "errno: %d, error info: %s", __LINE__, + old_binlog_filename, new_binlog_filename, + errno, STRERROR(errno)); + return errno != 0 ? errno : EPERM; } - result = storage_do_recovery(pBasePath, &reader, &srcStorage); + logInfo("file: "__FILE__", line: %d, " + "migrate data from old version successfully.", __LINE__); + return 0; +} - recovery_write_to_mark_file(pBasePath, &reader); - storage_reader_destroy(&reader); +static int do_dispatch_binlog_for_threads(const char *pBasePath) +{ + typedef struct { + FILE *fp; + int64_t count; + char binlog_filename[MAX_PATH_SIZE]; + char temp_filename[MAX_PATH_SIZE]; + } RecoveryDispatchInfo; - if (result != 0) - { - return result; - } + char mark_filename[MAX_PATH_SIZE]; + string_t log_buff; + char buff[2 * 1024]; + struct stat file_stat; + RecoveryThreadData thread_data; + StorageBinLogReader reader; + StorageBinLogRecord record; + int record_length; + RecoveryDispatchInfo *dispatchs; + RecoveryDispatchInfo *disp; + int64_t total_count; + int bytes; + int result; + int i; - while (g_continue_flag) - { - if (storage_report_storage_status(g_my_server_id_str, \ - g_tracker_client_ip.ips[0].address, - saved_storage_status) == 0) - { - break; - } + bytes = sizeof(RecoveryDispatchInfo) * g_disk_recovery_threads; + dispatchs = (RecoveryDispatchInfo *)malloc(bytes); + if (dispatchs == NULL) + { + logError("file: "__FILE__", line: %d, " + "malloc %d bytes fail", + __LINE__, bytes); + return ENOMEM; + } + memset(dispatchs, 0, bytes); - sleep(5); - } + result = 0; + for (i=0; ifp, + disp->temp_filename, &record)) != 0) + { + break; + } + disp->count++; + } + + storage_reader_destroy(&reader); + if (result != 0) + { + break; + } + } + + total_count = 0; + *buff = '\0'; + log_buff.str = buff; + log_buff.len = 0; + for (i=0; i total lines: %"PRId64"%s", + __LINE__, total_count, log_buff.str); + return result; +} + +static int storage_disk_recovery_dispatch_binlog_for_threads( + const char *pBasePath) +{ + int result; + int i; + char binlog_filename[MAX_PATH_SIZE]; + + if (last_recovery_threads <= 0) + { + logError("file: "__FILE__", line: %d, " + "invalid last recovery threads: %d, " + "retry restore data for %s again ...", + __LINE__, last_recovery_threads, pBasePath); + return EAGAIN; + } + + for (i=0; i 0); + + if (__sync_fetch_and_add(¤t_recovery_thread_count, 0) > 0) + { + for (i=0; i<60; i++) + { + if ((thread_count=__sync_fetch_and_add( + ¤t_recovery_thread_count, 0)) == 0) + { + break; + } + + logInfo("file: "__FILE__", line: %d, " + "waiting for recovery threads exit, " + "waiting count: %d, current thread count: %d", + __LINE__, i+1, thread_count); + sleep(1); + } + } + + free(thread_data); + free(args); + free(recovery_tids); + + if (!g_continue_flag) + { + return EINTR; + } + + while (g_continue_flag) + { + if (storage_report_storage_status(g_my_server_id_str, \ + g_tracker_client_ip.ips[0].address, + saved_storage_status) == 0) + { + break; + } + + sleep(5); + } + + if (!g_continue_flag) + { + return EINTR; + } + + for (i=0; iline) > 0) { return 0; @@ -903,14 +1435,54 @@ static int tree_write_file_walk_callback(void *data, void *args) else { result = errno != 0 ? errno : EIO; - logError("file: "__FILE__", line: %d, " \ - "write to binlog file fail, " \ - "errno: %d, error info: %s.", \ + logError("file: "__FILE__", line: %d, " + "write to binlog file fail, " + "errno: %d, error info: %s.", __LINE__, result, STRERROR(result)); return EIO; } } +static int disk_recovery_write_to_binlog(FILE *fp, + const char *binlog_filename, StorageBinLogRecord *pRecord) +{ + int result; + if (pRecord->op_type == STORAGE_OP_TYPE_SOURCE_CREATE_FILE + || pRecord->op_type == STORAGE_OP_TYPE_REPLICA_CREATE_FILE) + { + if (fprintf(fp, "%d %c %s\n", + (int)pRecord->timestamp, + pRecord->op_type, pRecord->filename) < 0) + { + result = errno != 0 ? errno : EIO; + logError("file: "__FILE__", line: %d, " + "write to file: %s fail, " + "errno: %d, error info: %s.", + __LINE__, binlog_filename, + result, STRERROR(result)); + return result; + } + } + else + { + if (fprintf(fp, "%d %c %s %s\n", + (int)pRecord->timestamp, + pRecord->op_type, pRecord->filename, + pRecord->src_filename) < 0) + { + result = errno != 0 ? errno : EIO; + logError("file: "__FILE__", line: %d, " + "write to file: %s fail, " + "errno: %d, error info: %s.", + __LINE__, binlog_filename, + result, STRERROR(result)); + return result; + } + } + + return 0; +} + static int storage_do_split_trunk_binlog(const int store_path_index, StorageBinLogReader *pReader) { @@ -927,7 +1499,7 @@ static int storage_do_split_trunk_binlog(const int store_path_index, int result; pBasePath = g_fdfs_store_paths.paths[store_path_index].path; - recovery_get_full_filename(pBasePath, \ + recovery_get_full_filename_ex(pBasePath, -1, RECOVERY_BINLOG_FILENAME".tmp", tmpFullFilename); fp = fopen(tmpFullFilename, "w"); if (fp == NULL) @@ -1014,38 +1586,11 @@ static int storage_do_split_trunk_binlog(const int store_path_index, } else { - if (record.op_type == STORAGE_OP_TYPE_SOURCE_CREATE_FILE - || record.op_type == STORAGE_OP_TYPE_REPLICA_CREATE_FILE) - { - if (fprintf(fp, "%d %c %s\n", \ - (int)record.timestamp, \ - record.op_type, record.filename) < 0) - { - result = errno != 0 ? errno : EIO; - logError("file: "__FILE__", line: %d, " \ - "write to file: %s fail, " \ - "errno: %d, error info: %s.", \ - __LINE__, tmpFullFilename, - result, STRERROR(result)); - break; - } - } - else - { - if (fprintf(fp, "%d %c %s %s\n", \ - (int)record.timestamp, \ - record.op_type, record.filename, \ - record.src_filename) < 0) - { - result = errno != 0 ? errno : EIO; - logError("file: "__FILE__", line: %d, " \ - "write to file: %s fail, " \ - "errno: %d, error info: %s.", \ - __LINE__, tmpFullFilename, - result, STRERROR(result)); - break; - } - } + if ((result=disk_recovery_write_to_binlog(fp, + tmpFullFilename, &record)) != 0) + { + break; + } } } @@ -1076,7 +1621,7 @@ static int storage_do_split_trunk_binlog(const int store_path_index, return result; } - recovery_get_full_filename(pBasePath, \ + recovery_get_full_filename_ex(pBasePath, 0, RECOVERY_BINLOG_FILENAME, binlogFullFilename); if (rename(tmpFullFilename, binlogFullFilename) != 0) { @@ -1093,24 +1638,32 @@ static int storage_do_split_trunk_binlog(const int store_path_index, static int storage_disk_recovery_split_trunk_binlog(const int store_path_index) { - char *pBasePath; + char mark_filename[MAX_PATH_SIZE]; + RecoveryThreadData thread_data; StorageBinLogReader reader; int result; - pBasePath = g_fdfs_store_paths.paths[store_path_index].path; - if ((result=recovery_reader_init(pBasePath, &reader)) != 0) + thread_data.base_path = g_fdfs_store_paths.paths[store_path_index].path; + thread_data.thread_index = 0; + + recovery_get_mark_filename(&thread_data, mark_filename); + if ((result=do_write_to_mark_file(mark_filename, 0)) != 0) + { + return result; + } + + if ((result=recovery_reader_init(&thread_data, &reader)) != 0) { storage_reader_destroy(&reader); return result; } result = storage_do_split_trunk_binlog(store_path_index, &reader); - storage_reader_destroy(&reader); return result; } -int storage_disk_recovery_start(const int store_path_index) +int storage_disk_recovery_prepare(const int store_path_index) { ConnectionInfo srcStorage; ConnectionInfo *pStorageConn; @@ -1118,12 +1671,12 @@ int storage_disk_recovery_start(const int store_path_index) char *pBasePath; pBasePath = g_fdfs_store_paths.paths[store_path_index].path; - if ((result=recovery_init_mark_file(pBasePath, false)) != 0) + if ((result=recovery_init_flag_file(pBasePath, false, -1)) != 0) { return result; } - if ((result=recovery_init_binlog_file(pBasePath)) != 0) + if ((result=recovery_init_global_binlog_file(pBasePath)) != 0) { return result; } @@ -1160,6 +1713,10 @@ int storage_disk_recovery_start(const int store_path_index) return result; } + logInfo("file: "__FILE__", line: %d, " + "try to fetch binlog from %s:%d ...", __LINE__, + pStorageConn->ip_addr, pStorageConn->port); + result = storage_do_fetch_binlog(pStorageConn, store_path_index); tracker_close_connection_ex(pStorageConn, true); if (result != 0) @@ -1167,20 +1724,23 @@ int storage_disk_recovery_start(const int store_path_index) return result; } - //set fetch binlog done - if ((result=recovery_init_mark_file(pBasePath, true)) != 0) + logInfo("file: "__FILE__", line: %d, " + "fetch binlog from %s:%d successfully.", __LINE__, + pStorageConn->ip_addr, pStorageConn->port); + + if ((result=storage_disk_recovery_split_trunk_binlog( + store_path_index)) != 0) { + char flagFullFilename[MAX_PATH_SIZE]; + unlink(recovery_get_flag_filename(pBasePath, flagFullFilename)); return result; } - if ((result=storage_disk_recovery_split_trunk_binlog( \ - store_path_index)) != 0) + //set fetch binlog done + if ((result=recovery_init_flag_file(pBasePath, true, 1)) != 0) { - char markFullFilename[MAX_PATH_SIZE]; - unlink(recovery_get_mark_filename(pBasePath, markFullFilename)); return result; } return 0; } - diff --git a/storage/storage_disk_recovery.h b/storage/storage_disk_recovery.h index 9a8a06c..b07ab63 100644 --- a/storage/storage_disk_recovery.h +++ b/storage/storage_disk_recovery.h @@ -18,8 +18,8 @@ extern "C" { #endif -int storage_disk_recovery_start(const int store_path_index); -int storage_disk_recovery_restore(const char *pBasePath); +int storage_disk_recovery_prepare(const int store_path_index); +int storage_disk_recovery_check_restore(const char *pBasePath); #ifdef __cplusplus } diff --git a/storage/storage_func.c b/storage/storage_func.c index 4d9bab2..7d167c6 100644 --- a/storage/storage_func.c +++ b/storage/storage_func.c @@ -1138,21 +1138,23 @@ static int storage_check_and_make_data_dirs() if (g_sync_old_done && pathCreated) //repair damaged disk { - if ((result=storage_disk_recovery_start(i)) != 0) + if ((result=storage_disk_recovery_prepare(i)) != 0) { return result; } } - result = storage_disk_recovery_restore(g_fdfs_store_paths.paths[i].path); + result = storage_disk_recovery_check_restore( + g_fdfs_store_paths.paths[i].path); if (result == EAGAIN) //need to re-fetch binlog { - if ((result=storage_disk_recovery_start(i)) != 0) + if ((result=storage_disk_recovery_prepare(i)) != 0) { return result; } - result=storage_disk_recovery_restore(g_fdfs_store_paths.paths[i].path); + result = storage_disk_recovery_check_restore( + g_fdfs_store_paths.paths[i].path); } if (result != 0) @@ -1712,6 +1714,18 @@ int storage_func_init(const char *filename, \ break; } + g_disk_recovery_threads = iniGetIntValue(NULL, + "disk_recovery_threads", &iniContext, 1); + if (g_disk_recovery_threads <= 0) + { + logError("file: "__FILE__", line: %d, " + "item \"disk_recovery_threads\" is invalid, " + "value: %d <= 0!", __LINE__, + g_disk_recovery_threads); + result = EINVAL; + break; + } + /* g_disk_rw_direct = iniGetBoolValue(NULL, \ "disk_rw_direct", &iniContext, false); @@ -2127,7 +2141,7 @@ int storage_func_init(const char *filename, \ "max_connections=%d, accept_threads=%d, " \ "work_threads=%d, " \ "disk_rw_separated=%d, disk_reader_threads=%d, " \ - "disk_writer_threads=%d, " \ + "disk_writer_threads=%d, disk_recovery_threads=%d, " \ "buff_size=%d KB, heart_beat_interval=%ds, " \ "stat_report_interval=%ds, tracker_server_count=%d, " \ "sync_wait_msec=%dms, sync_interval=%dms, " \ @@ -2172,7 +2186,7 @@ int storage_func_init(const char *filename, \ g_client_bind_addr, g_max_connections, \ g_accept_threads, g_work_threads, g_disk_rw_separated, \ g_disk_reader_threads, g_disk_writer_threads, \ - g_buff_size / 1024, \ + g_disk_recovery_threads, g_buff_size / 1024, \ g_heart_beat_interval, g_stat_report_interval, \ g_tracker_group.server_count, g_sync_wait_usec / 1000, \ g_sync_interval / 1000, \ diff --git a/storage/storage_global.c b/storage/storage_global.c index f8a1a48..74e2232 100644 --- a/storage/storage_global.c +++ b/storage/storage_global.c @@ -31,6 +31,7 @@ bool g_disk_rw_direct = false; bool g_disk_rw_separated = true; int g_disk_reader_threads = DEFAULT_DISK_READER_THREADS; int g_disk_writer_threads = DEFAULT_DISK_WRITER_THREADS; +int g_disk_recovery_threads = 1; int g_extra_open_file_flags = 0; int g_file_distribute_path_mode = FDFS_FILE_DIST_PATH_ROUND_ROBIN; diff --git a/storage/storage_global.h b/storage/storage_global.h index a1849e4..a79c649 100644 --- a/storage/storage_global.h +++ b/storage/storage_global.h @@ -78,6 +78,7 @@ extern bool g_disk_rw_direct; //if file read / write directly extern bool g_disk_rw_separated; //if disk read / write separated extern int g_disk_reader_threads; //disk reader thread count per store base path extern int g_disk_writer_threads; //disk writer thread count per store base path +extern int g_disk_recovery_threads; //disk recovery thread count extern int g_extra_open_file_flags; //extra open file flags extern int g_file_distribute_path_mode; diff --git a/storage/storage_service.c b/storage/storage_service.c index b5c92a0..a94be37 100644 --- a/storage/storage_service.c +++ b/storage/storage_service.c @@ -4359,7 +4359,15 @@ static int storage_server_fetch_one_path_binlog_dealer( { break; } - } while(1); + } while (g_continue_flag); + + if (!g_continue_flag) + { + if (result == 0) + { + result = EINTR; + } + } if (result != 0) //error occurs { @@ -4386,6 +4394,7 @@ static int storage_server_fetch_one_path_binlog_dealer( static void fetch_one_path_binlog_finish_clean_up(struct fast_task_info *pTask) { StorageClientInfo *pClientInfo; + StorageFileContext *pFileContext; StorageBinLogReader *pReader; pClientInfo = (StorageClientInfo *)pTask->arg; @@ -4404,6 +4413,12 @@ static void fetch_one_path_binlog_finish_clean_up(struct fast_task_info *pTask) unlink(pReader->mark_filename); } + pFileContext = &(pClientInfo->file_context); + logInfo("file: "__FILE__", line: %d, " + "client ip: %s, fetch binlog of store path #%d done", + __LINE__, pTask->client_ip, pFileContext->extra_info. + upload.trunk_info.path.store_path_index); + storage_reader_destroy(pReader); free(pReader); } @@ -4427,6 +4442,10 @@ static int storage_server_do_fetch_one_path_binlog( return errno != 0 ? errno : ENOMEM; } + logInfo("file: "__FILE__", line: %d, " + "client ip: %s, fetch binlog of store path #%d ...", + __LINE__, pTask->client_ip, store_path_index); + pClientInfo = (StorageClientInfo *)pTask->arg; pFileContext = &(pClientInfo->file_context); diff --git a/storage/storage_sync.c b/storage/storage_sync.c index b1f41a2..08b0645 100644 --- a/storage/storage_sync.c +++ b/storage/storage_sync.c @@ -2372,7 +2372,7 @@ int storage_reader_init(FDFSStorageBrief *pStorage, StorageBinLogReader *pReader pReader->last_scan_rows = pReader->scan_row_count; pReader->last_sync_rows = pReader->sync_row_count; - if ((result=storage_open_readable_binlog(pReader, \ + if ((result=storage_open_readable_binlog(pReader, get_binlog_readable_filename, pReader)) != 0) { return result;