disk recovery support multi-threads to speed up

pull/484/head
YuQing 2019-12-04 10:47:32 +08:00
parent 46171f2c64
commit 33b539eac6
11 changed files with 1109 additions and 486 deletions

10
HISTORY
View File

@ -1,8 +1,12 @@
Version 6.04 2019-12-01
Version 6.04 2019-12-04
* storage_report_ip_changed ignore result EEXIST
* use get_gzip_command_filename from libfastcommon v1.42
* support compress error log and access log
* disk recovery support multi-threads to speed up
NOTE: you MUST upgrade libfastcommon to V1.42 or later
Version 6.03 2019-11-20
* dual IPs support two different types of inner (intranet) IPs
@ -21,6 +25,7 @@ Version 6.03 2019-11-20
NOTE: the tracker and storage server must upgrade together
Version 6.02 2019-11-12
* get_file_info calculate CRC32 for appender file type
* disk recovery download file to local temp file then rename it
@ -28,12 +33,14 @@ Version 6.02 2019-11-12
* support regenerate filename for appender file
NOTE: the regenerated file will be a normal file!
Version 6.01 2019-10-25
* compress and uncompress binlog file by gzip when need,
config items in storage.conf: compress_binlog and compress_binlog_time
* bugfix: must check and create data path before write_to_pid_file
in fdfs_storaged.c
Version 6.00 2019-10-16
* tracker and storage server support dual IPs
1. you can config dual tracker IPs in storage.conf and client.conf,
@ -49,6 +56,7 @@ Version 6.00 2019-10-16
* tracker server check tracker list when storage server join
* use socketCreateExAuto and socketClientExAuto exported by libfastcommon
Version 5.12 2018-06-07
* code refine for rare case
* replace print format OFF_PRINTF_FORMAT to PRId64

View File

@ -109,6 +109,11 @@ sync_end_time=23:59
# default value is 500
write_mark_file_freq=500
# disk recovery thread count
# default value is 1
# since V6.04
disk_recovery_threads = 1
# store path (disk or mount point) count, default value is 1
store_path_count = 1
@ -262,6 +267,7 @@ compress_old_access_log = false
# compress the access log days before
# default value is 1
# since V6.04
compress_access_log_days_before = 1
# if rotate the error log every day
@ -282,6 +288,7 @@ compress_old_error_log = false
# compress the error log days before
# default value is 1
# since V6.04
compress_error_log_days_before = 1
# rotate access log when the log file exceeds this size
@ -309,7 +316,7 @@ file_sync_skip_invalid_record=false
# if use connection pool
# default value is false
# since V4.05
use_connection_pool = false
use_connection_pool = true
# connections whose the idle time exceeds this time will be closed
# unit: second

View File

@ -249,6 +249,7 @@ compress_old_error_log = false
# compress the error log days before
# default value is 1
# since V6.04
compress_error_log_days_before = 1
# rotate error log when the log file exceeds this size
@ -265,7 +266,7 @@ log_file_keep_days = 0
# if use connection pool
# default value is false
# since V4.05
use_connection_pool = false
use_connection_pool = true
# connections whose the idle time exceeds this time will be closed
# unit: second

View File

@ -49,8 +49,12 @@
#include "storage_dump.h"
#endif
#define ACCEPT_STAGE_NONE 0
#define ACCEPT_STAGE_DOING 1
#define ACCEPT_STAGE_DONE 2
static bool bTerminateFlag = false;
static bool bAcceptEndFlag = false;
static char accept_stage = ACCEPT_STAGE_NONE;
static void sigQuitHandler(int sig);
static void sigHupHandler(int sig);
@ -58,6 +62,7 @@ static void sigUsrHandler(int sig);
static void sigAlarmHandler(int sig);
static int setupSchedules(pthread_t *schedule_tid);
static int setupSignalHandlers();
#if defined(DEBUG_FLAG)
@ -83,7 +88,6 @@ int main(int argc, char *argv[])
int sock;
int wait_count;
pthread_t schedule_tid;
struct sigaction act;
char pidFilename[MAX_PATH_SIZE];
bool stop;
@ -148,6 +152,13 @@ int main(int argc, char *argv[])
return result;
}
if ((result=setupSignalHandlers()) != 0)
{
logCrit("exit abnormally!\n");
log_destroy();
return result;
}
memset(g_bind_addr, 0, sizeof(g_bind_addr));
if ((result=storage_func_init(conf_filename, \
g_bind_addr, sizeof(g_bind_addr))) != 0)
@ -207,84 +218,6 @@ int main(int argc, char *argv[])
return result;
}
memset(&act, 0, sizeof(act));
sigemptyset(&act.sa_mask);
act.sa_handler = sigUsrHandler;
if(sigaction(SIGUSR1, &act, NULL) < 0 || \
sigaction(SIGUSR2, &act, NULL) < 0)
{
logCrit("file: "__FILE__", line: %d, " \
"call sigaction fail, errno: %d, error info: %s", \
__LINE__, errno, STRERROR(errno));
logCrit("exit abnormally!\n");
return errno;
}
act.sa_handler = sigHupHandler;
if(sigaction(SIGHUP, &act, NULL) < 0)
{
logCrit("file: "__FILE__", line: %d, " \
"call sigaction fail, errno: %d, error info: %s", \
__LINE__, errno, STRERROR(errno));
logCrit("exit abnormally!\n");
return errno;
}
act.sa_handler = SIG_IGN;
if(sigaction(SIGPIPE, &act, NULL) < 0)
{
logCrit("file: "__FILE__", line: %d, " \
"call sigaction fail, errno: %d, error info: %s", \
__LINE__, errno, STRERROR(errno));
logCrit("exit abnormally!\n");
return errno;
}
act.sa_handler = sigQuitHandler;
if(sigaction(SIGINT, &act, NULL) < 0 || \
sigaction(SIGTERM, &act, NULL) < 0 || \
sigaction(SIGQUIT, &act, NULL) < 0)
{
logCrit("file: "__FILE__", line: %d, " \
"call sigaction fail, errno: %d, error info: %s", \
__LINE__, errno, STRERROR(errno));
logCrit("exit abnormally!\n");
return errno;
}
#if defined(DEBUG_FLAG)
/*
#if defined(OS_LINUX)
memset(&act, 0, sizeof(act));
act.sa_sigaction = sigSegvHandler;
act.sa_flags = SA_SIGINFO;
if (sigaction(SIGSEGV, &act, NULL) < 0 || \
sigaction(SIGABRT, &act, NULL) < 0)
{
logCrit("file: "__FILE__", line: %d, " \
"call sigaction fail, errno: %d, error info: %s", \
__LINE__, errno, STRERROR(errno));
logCrit("exit abnormally!\n");
return errno;
}
#endif
*/
memset(&act, 0, sizeof(act));
sigemptyset(&act.sa_mask);
act.sa_handler = sigDumpHandler;
if(sigaction(SIGUSR1, &act, NULL) < 0 || \
sigaction(SIGUSR2, &act, NULL) < 0)
{
logCrit("file: "__FILE__", line: %d, " \
"call sigaction fail, errno: %d, error info: %s", \
__LINE__, errno, STRERROR(errno));
logCrit("exit abnormally!\n");
return errno;
}
#endif
#ifdef WITH_HTTPD
if (!g_http_params.disabled)
@ -333,10 +266,10 @@ int main(int argc, char *argv[])
log_set_cache(true);
bTerminateFlag = false;
bAcceptEndFlag = false;
accept_stage = ACCEPT_STAGE_DOING;
storage_accept_loop(sock);
bAcceptEndFlag = true;
accept_stage = ACCEPT_STAGE_DONE;
fdfs_binlog_sync_func(NULL); //binlog fsync
@ -412,7 +345,7 @@ static void sigAlarmHandler(int sig)
{
ConnectionInfo server;
if (bAcceptEndFlag)
if (accept_stage != ACCEPT_STAGE_DOING)
{
return;
}
@ -583,3 +516,82 @@ static int setupSchedules(pthread_t *schedule_tid)
return 0;
}
static int setupSignalHandlers()
{
struct sigaction act;
memset(&act, 0, sizeof(act));
sigemptyset(&act.sa_mask);
act.sa_handler = sigUsrHandler;
if(sigaction(SIGUSR1, &act, NULL) < 0 || \
sigaction(SIGUSR2, &act, NULL) < 0)
{
logCrit("file: "__FILE__", line: %d, " \
"call sigaction fail, errno: %d, error info: %s", \
__LINE__, errno, STRERROR(errno));
return errno != 0 ? errno : EFAULT;
}
act.sa_handler = sigHupHandler;
if(sigaction(SIGHUP, &act, NULL) < 0)
{
logCrit("file: "__FILE__", line: %d, " \
"call sigaction fail, errno: %d, error info: %s", \
__LINE__, errno, STRERROR(errno));
return errno != 0 ? errno : EFAULT;
}
act.sa_handler = SIG_IGN;
if(sigaction(SIGPIPE, &act, NULL) < 0)
{
logCrit("file: "__FILE__", line: %d, " \
"call sigaction fail, errno: %d, error info: %s", \
__LINE__, errno, STRERROR(errno));
return errno != 0 ? errno : EFAULT;
}
act.sa_handler = sigQuitHandler;
if(sigaction(SIGINT, &act, NULL) < 0 || \
sigaction(SIGTERM, &act, NULL) < 0 || \
sigaction(SIGQUIT, &act, NULL) < 0)
{
logCrit("file: "__FILE__", line: %d, " \
"call sigaction fail, errno: %d, error info: %s", \
__LINE__, errno, STRERROR(errno));
return errno != 0 ? errno : EFAULT;
}
#if defined(DEBUG_FLAG)
/*
#if defined(OS_LINUX)
memset(&act, 0, sizeof(act));
act.sa_sigaction = sigSegvHandler;
act.sa_flags = SA_SIGINFO;
if (sigaction(SIGSEGV, &act, NULL) < 0 || \
sigaction(SIGABRT, &act, NULL) < 0)
{
logCrit("file: "__FILE__", line: %d, " \
"call sigaction fail, errno: %d, error info: %s", \
__LINE__, errno, STRERROR(errno));
return errno != 0 ? errno : EFAULT;
}
#endif
*/
memset(&act, 0, sizeof(act));
sigemptyset(&act.sa_mask);
act.sa_handler = sigDumpHandler;
if(sigaction(SIGUSR1, &act, NULL) < 0 || \
sigaction(SIGUSR2, &act, NULL) < 0)
{
logCrit("file: "__FILE__", line: %d, " \
"call sigaction fail, errno: %d, error info: %s", \
__LINE__, errno, STRERROR(errno));
return errno != 0 ? errno : EFAULT;
}
#endif
return 0;
}

File diff suppressed because it is too large Load Diff

View File

@ -18,8 +18,8 @@
extern "C" {
#endif
int storage_disk_recovery_start(const int store_path_index);
int storage_disk_recovery_restore(const char *pBasePath);
int storage_disk_recovery_prepare(const int store_path_index);
int storage_disk_recovery_check_restore(const char *pBasePath);
#ifdef __cplusplus
}

View File

@ -1138,21 +1138,23 @@ static int storage_check_and_make_data_dirs()
if (g_sync_old_done && pathCreated) //repair damaged disk
{
if ((result=storage_disk_recovery_start(i)) != 0)
if ((result=storage_disk_recovery_prepare(i)) != 0)
{
return result;
}
}
result = storage_disk_recovery_restore(g_fdfs_store_paths.paths[i].path);
result = storage_disk_recovery_check_restore(
g_fdfs_store_paths.paths[i].path);
if (result == EAGAIN) //need to re-fetch binlog
{
if ((result=storage_disk_recovery_start(i)) != 0)
if ((result=storage_disk_recovery_prepare(i)) != 0)
{
return result;
}
result=storage_disk_recovery_restore(g_fdfs_store_paths.paths[i].path);
result = storage_disk_recovery_check_restore(
g_fdfs_store_paths.paths[i].path);
}
if (result != 0)
@ -1712,6 +1714,18 @@ int storage_func_init(const char *filename, \
break;
}
g_disk_recovery_threads = iniGetIntValue(NULL,
"disk_recovery_threads", &iniContext, 1);
if (g_disk_recovery_threads <= 0)
{
logError("file: "__FILE__", line: %d, "
"item \"disk_recovery_threads\" is invalid, "
"value: %d <= 0!", __LINE__,
g_disk_recovery_threads);
result = EINVAL;
break;
}
/*
g_disk_rw_direct = iniGetBoolValue(NULL, \
"disk_rw_direct", &iniContext, false);
@ -2127,7 +2141,7 @@ int storage_func_init(const char *filename, \
"max_connections=%d, accept_threads=%d, " \
"work_threads=%d, " \
"disk_rw_separated=%d, disk_reader_threads=%d, " \
"disk_writer_threads=%d, " \
"disk_writer_threads=%d, disk_recovery_threads=%d, " \
"buff_size=%d KB, heart_beat_interval=%ds, " \
"stat_report_interval=%ds, tracker_server_count=%d, " \
"sync_wait_msec=%dms, sync_interval=%dms, " \
@ -2172,7 +2186,7 @@ int storage_func_init(const char *filename, \
g_client_bind_addr, g_max_connections, \
g_accept_threads, g_work_threads, g_disk_rw_separated, \
g_disk_reader_threads, g_disk_writer_threads, \
g_buff_size / 1024, \
g_disk_recovery_threads, g_buff_size / 1024, \
g_heart_beat_interval, g_stat_report_interval, \
g_tracker_group.server_count, g_sync_wait_usec / 1000, \
g_sync_interval / 1000, \

View File

@ -31,6 +31,7 @@ bool g_disk_rw_direct = false;
bool g_disk_rw_separated = true;
int g_disk_reader_threads = DEFAULT_DISK_READER_THREADS;
int g_disk_writer_threads = DEFAULT_DISK_WRITER_THREADS;
int g_disk_recovery_threads = 1;
int g_extra_open_file_flags = 0;
int g_file_distribute_path_mode = FDFS_FILE_DIST_PATH_ROUND_ROBIN;

View File

@ -78,6 +78,7 @@ extern bool g_disk_rw_direct; //if file read / write directly
extern bool g_disk_rw_separated; //if disk read / write separated
extern int g_disk_reader_threads; //disk reader thread count per store base path
extern int g_disk_writer_threads; //disk writer thread count per store base path
extern int g_disk_recovery_threads; //disk recovery thread count
extern int g_extra_open_file_flags; //extra open file flags
extern int g_file_distribute_path_mode;

View File

@ -4359,7 +4359,15 @@ static int storage_server_fetch_one_path_binlog_dealer(
{
break;
}
} while(1);
} while (g_continue_flag);
if (!g_continue_flag)
{
if (result == 0)
{
result = EINTR;
}
}
if (result != 0) //error occurs
{
@ -4386,6 +4394,7 @@ static int storage_server_fetch_one_path_binlog_dealer(
static void fetch_one_path_binlog_finish_clean_up(struct fast_task_info *pTask)
{
StorageClientInfo *pClientInfo;
StorageFileContext *pFileContext;
StorageBinLogReader *pReader;
pClientInfo = (StorageClientInfo *)pTask->arg;
@ -4404,6 +4413,12 @@ static void fetch_one_path_binlog_finish_clean_up(struct fast_task_info *pTask)
unlink(pReader->mark_filename);
}
pFileContext = &(pClientInfo->file_context);
logInfo("file: "__FILE__", line: %d, "
"client ip: %s, fetch binlog of store path #%d done",
__LINE__, pTask->client_ip, pFileContext->extra_info.
upload.trunk_info.path.store_path_index);
storage_reader_destroy(pReader);
free(pReader);
}
@ -4427,6 +4442,10 @@ static int storage_server_do_fetch_one_path_binlog(
return errno != 0 ? errno : ENOMEM;
}
logInfo("file: "__FILE__", line: %d, "
"client ip: %s, fetch binlog of store path #%d ...",
__LINE__, pTask->client_ip, store_path_index);
pClientInfo = (StorageClientInfo *)pTask->arg;
pFileContext = &(pClientInfo->file_context);

View File

@ -2372,7 +2372,7 @@ int storage_reader_init(FDFSStorageBrief *pStorage, StorageBinLogReader *pReader
pReader->last_scan_rows = pReader->scan_row_count;
pReader->last_sync_rows = pReader->sync_row_count;
if ((result=storage_open_readable_binlog(pReader, \
if ((result=storage_open_readable_binlog(pReader,
get_binlog_readable_filename, pReader)) != 0)
{
return result;