compress and uncompress binlog file by gzip when need

pull/348/head
YuQing 2019-10-23 14:56:28 +08:00
parent 9d2db48f31
commit 77da832e05
12 changed files with 577 additions and 185 deletions

View File

@ -1,4 +1,8 @@
Version 6.01 2019-10-23
* compress and uncompress binlog file by gzip when need,
config items in storage.conf: compress_binlog and compress_binlog_time
Version 6.00 2019-10-16
* tracker and storage server support dual IPs
1. you can config dual tracker IPs in storage.conf and client.conf,

View File

@ -4,7 +4,7 @@ FastDFS may be copied only under the terms of the GNU General
Public License V3, which may be found in the FastDFS source kit.
Please visit the FastDFS Home Page for more detail.
English language: http://english.csource.org/
Chinese language: http://www.csource.org/
Chinese language: http://www.fastken.com/
FastDFS is an open source high performance distributed file system. It's major

View File

@ -23,7 +23,7 @@
int g_fdfs_connect_timeout = DEFAULT_CONNECT_TIMEOUT;
int g_fdfs_network_timeout = DEFAULT_NETWORK_TIMEOUT;
char g_fdfs_base_path[MAX_PATH_SIZE] = {'/', 't', 'm', 'p', '\0'};
Version g_fdfs_version = {5, 12};
Version g_fdfs_version = {6, 1};
bool g_use_connection_pool = false;
ConnectionPool g_connection_pool;
int g_connection_pool_max_idle_time = 3600;

View File

@ -283,6 +283,17 @@ use_connection_pool = false
# since V4.05
connection_pool_max_idle_time = 3600
# if compress the binlog files by gzip
# default value is false
# since V6.01
compress_binlog = false
# try to compress binlog time, time format: Hour:Minute
# Hour from 0 to 23, Minute from 0 to 59
# default value is 01:30
# since V6.01
compress_binlog_time=01:30
# use the ip address of this storage server if domain_name is empty,
# else this domain name will ocur in the url redirected by the tracker server
http.domain_name=

View File

@ -57,6 +57,8 @@ static void sigHupHandler(int sig);
static void sigUsrHandler(int sig);
static void sigAlarmHandler(int sig);
static int setupSchedules(pthread_t *schedule_tid);
#if defined(DEBUG_FLAG)
/*
@ -68,8 +70,6 @@ static void sigSegvHandler(int signum, siginfo_t *info, void *ptr);
static void sigDumpHandler(int sig);
#endif
#define SCHEDULE_ENTRIES_MAX_COUNT 9
static void usage(const char *program)
{
fprintf(stderr, "Usage: %s <config_file> [start | stop | restart]\n",
@ -84,8 +84,6 @@ int main(int argc, char *argv[])
int wait_count;
pthread_t schedule_tid;
struct sigaction act;
ScheduleEntry scheduleEntries[SCHEDULE_ENTRIES_MAX_COUNT];
ScheduleArray scheduleArray;
char pidFilename[MAX_PATH_SIZE];
bool stop;
@ -307,80 +305,7 @@ int main(int argc, char *argv[])
return result;
}
scheduleArray.entries = scheduleEntries;
scheduleArray.count = 0;
memset(scheduleEntries, 0, sizeof(scheduleEntries));
INIT_SCHEDULE_ENTRY(scheduleEntries[scheduleArray.count],
scheduleArray.count + 1, TIME_NONE, TIME_NONE, TIME_NONE,
g_sync_log_buff_interval, log_sync_func, &g_log_context);
scheduleArray.count++;
INIT_SCHEDULE_ENTRY(scheduleEntries[scheduleArray.count],
scheduleArray.count + 1, TIME_NONE, TIME_NONE, TIME_NONE,
g_sync_binlog_buff_interval, fdfs_binlog_sync_func, NULL);
scheduleArray.count++;
INIT_SCHEDULE_ENTRY(scheduleEntries[scheduleArray.count],
scheduleArray.count + 1, TIME_NONE, TIME_NONE, TIME_NONE,
g_sync_stat_file_interval, fdfs_stat_file_sync_func, NULL);
scheduleArray.count++;
if (g_if_use_trunk_file)
{
INIT_SCHEDULE_ENTRY(scheduleEntries[scheduleArray.count],
scheduleArray.count + 1, TIME_NONE, TIME_NONE, TIME_NONE,
1, trunk_binlog_sync_func, NULL);
scheduleArray.count++;
}
if (g_use_access_log)
{
INIT_SCHEDULE_ENTRY(scheduleEntries[scheduleArray.count],
scheduleArray.count + 1, TIME_NONE, TIME_NONE, TIME_NONE,
g_sync_log_buff_interval, log_sync_func, &g_access_log_context);
scheduleArray.count++;
if (g_rotate_access_log)
{
INIT_SCHEDULE_ENTRY_EX(scheduleEntries[scheduleArray.count],
scheduleArray.count + 1, g_access_log_rotate_time,
24 * 3600, log_notify_rotate, &g_access_log_context);
scheduleArray.count++;
if (g_log_file_keep_days > 0)
{
log_set_keep_days(&g_access_log_context,
g_log_file_keep_days);
INIT_SCHEDULE_ENTRY(scheduleEntries[scheduleArray.count],
scheduleArray.count + 1, 1, 0, 0, 24 * 3600,
log_delete_old_files, &g_access_log_context);
scheduleArray.count++;
}
}
}
if (g_rotate_error_log)
{
INIT_SCHEDULE_ENTRY_EX(scheduleEntries[scheduleArray.count],
scheduleArray.count + 1, g_error_log_rotate_time,
24 * 3600, log_notify_rotate, &g_log_context);
scheduleArray.count++;
if (g_log_file_keep_days > 0)
{
log_set_keep_days(&g_log_context, g_log_file_keep_days);
INIT_SCHEDULE_ENTRY(scheduleEntries[scheduleArray.count],
scheduleArray.count + 1, 1, 0, 0, 24 * 3600,
log_delete_old_files, &g_log_context);
scheduleArray.count++;
}
}
if ((result=sched_start(&scheduleArray, &schedule_tid, \
g_thread_stack_size, (bool * volatile)&g_continue_flag)) != 0)
if ((result=setupSchedules(&schedule_tid)) != 0)
{
logCrit("exit abnormally!\n");
log_destroy();
@ -556,3 +481,100 @@ static void sigDumpHandler(int sig)
}
#endif
static int setupSchedules(pthread_t *schedule_tid)
{
#define SCHEDULE_ENTRIES_MAX_COUNT 10
ScheduleEntry scheduleEntries[SCHEDULE_ENTRIES_MAX_COUNT];
ScheduleArray scheduleArray;
int result;
scheduleArray.entries = scheduleEntries;
scheduleArray.count = 0;
memset(scheduleEntries, 0, sizeof(scheduleEntries));
INIT_SCHEDULE_ENTRY(scheduleEntries[scheduleArray.count],
scheduleArray.count + 1, TIME_NONE, TIME_NONE, TIME_NONE,
g_sync_log_buff_interval, log_sync_func, &g_log_context);
scheduleArray.count++;
INIT_SCHEDULE_ENTRY(scheduleEntries[scheduleArray.count],
scheduleArray.count + 1, TIME_NONE, TIME_NONE, TIME_NONE,
g_sync_binlog_buff_interval, fdfs_binlog_sync_func, NULL);
scheduleArray.count++;
INIT_SCHEDULE_ENTRY(scheduleEntries[scheduleArray.count],
scheduleArray.count + 1, TIME_NONE, TIME_NONE, TIME_NONE,
g_sync_stat_file_interval, fdfs_stat_file_sync_func, NULL);
scheduleArray.count++;
if (g_if_use_trunk_file)
{
INIT_SCHEDULE_ENTRY(scheduleEntries[scheduleArray.count],
scheduleArray.count + 1, TIME_NONE, TIME_NONE, TIME_NONE,
1, trunk_binlog_sync_func, NULL);
scheduleArray.count++;
}
if (g_use_access_log)
{
INIT_SCHEDULE_ENTRY(scheduleEntries[scheduleArray.count],
scheduleArray.count + 1, TIME_NONE, TIME_NONE, TIME_NONE,
g_sync_log_buff_interval, log_sync_func, &g_access_log_context);
scheduleArray.count++;
if (g_rotate_access_log)
{
INIT_SCHEDULE_ENTRY_EX(scheduleEntries[scheduleArray.count],
scheduleArray.count + 1, g_access_log_rotate_time,
24 * 3600, log_notify_rotate, &g_access_log_context);
scheduleArray.count++;
if (g_log_file_keep_days > 0)
{
log_set_keep_days(&g_access_log_context,
g_log_file_keep_days);
INIT_SCHEDULE_ENTRY(scheduleEntries[scheduleArray.count],
scheduleArray.count + 1, 1, 0, 0, 24 * 3600,
log_delete_old_files, &g_access_log_context);
scheduleArray.count++;
}
}
}
if (g_rotate_error_log)
{
INIT_SCHEDULE_ENTRY_EX(scheduleEntries[scheduleArray.count],
scheduleArray.count + 1, g_error_log_rotate_time,
24 * 3600, log_notify_rotate, &g_log_context);
scheduleArray.count++;
if (g_log_file_keep_days > 0)
{
log_set_keep_days(&g_log_context, g_log_file_keep_days);
INIT_SCHEDULE_ENTRY(scheduleEntries[scheduleArray.count],
scheduleArray.count + 1, 1, 0, 0, 24 * 3600,
log_delete_old_files, &g_log_context);
scheduleArray.count++;
}
}
if (g_compress_binlog)
{
INIT_SCHEDULE_ENTRY_EX(scheduleEntries[scheduleArray.count],
scheduleArray.count + 1, g_compress_binlog_time,
60, fdfs_binlog_compress_func, NULL);
scheduleArray.count++;
}
if ((result=sched_start(&scheduleArray, schedule_tid,
g_thread_stack_size, (bool * volatile)&g_continue_flag)) != 0)
{
return result;
}
return 0;
}

View File

@ -1785,6 +1785,14 @@ int storage_func_init(const char *filename, \
g_file_sync_skip_invalid_record = iniGetBoolValue(NULL, \
"file_sync_skip_invalid_record", &iniContext, false);
g_compress_binlog = iniGetBoolValue(NULL,
"compress_binlog", &iniContext, false);
if ((result=get_time_item_from_conf(&iniContext,
"compress_binlog_time", &g_compress_binlog_time, 1, 30)) != 0)
{
break;
}
if ((result=fdfs_connection_pool_init(filename, &iniContext)) != 0)
{
break;
@ -1853,7 +1861,9 @@ int storage_func_init(const char *filename, \
"log_file_keep_days=%d, " \
"file_sync_skip_invalid_record=%d, " \
"use_connection_pool=%d, " \
"g_connection_pool_max_idle_time=%ds", \
"g_connection_pool_max_idle_time=%ds, " \
"compress_binlog=%d, " \
"compress_binlog_time=%02d:%02d", \
g_fdfs_version.major, g_fdfs_version.minor, \
g_fdfs_base_path, g_fdfs_store_paths.count, \
g_subdir_count_per_path, \
@ -1888,7 +1898,9 @@ int storage_func_init(const char *filename, \
g_access_log_context.rotate_size, \
g_log_context.rotate_size, g_log_file_keep_days, \
g_file_sync_skip_invalid_record, \
g_use_connection_pool, g_connection_pool_max_idle_time);
g_use_connection_pool, g_connection_pool_max_idle_time, \
g_compress_binlog, g_compress_binlog_time.hour, \
g_compress_binlog_time.minute);
#ifdef WITH_HTTPD
if (!g_http_params.disabled)

View File

@ -107,6 +107,9 @@ bool g_storage_ip_changed_auto_adjust = false;
bool g_thread_kill_done = false;
bool g_file_sync_skip_invalid_record = false;
bool g_compress_binlog = false;
TimeInfo g_compress_binlog_time = {0, 0};
int g_thread_stack_size = 512 * 1024;
int g_upload_priority = DEFAULT_UPLOAD_PRIORITY;
time_t g_up_time = 0;

View File

@ -162,6 +162,9 @@ extern bool g_thread_kill_done;
extern bool g_file_sync_skip_invalid_record;
extern bool g_compress_binlog;
extern TimeInfo g_compress_binlog_time; //compress binlog time base
extern int g_thread_stack_size;
extern int g_upload_priority;
extern time_t g_up_time;

View File

@ -4254,6 +4254,7 @@ static void fetch_one_path_binlog_finish_clean_up(struct fast_task_info *pTask)
pClientInfo->extra_arg = NULL;
storage_reader_remove_from_list(pReader);
storage_reader_destroy(pReader);
get_mark_filename_by_reader(pReader, full_filename);
if (fileExists(full_filename))
@ -4292,6 +4293,7 @@ static int storage_server_do_fetch_one_path_binlog( \
free(pReader);
return result;
}
storage_reader_add_to_list(pReader);
pClientInfo->deal_func = storage_server_fetch_one_path_binlog_dealer;
pClientInfo->clean_func = fetch_one_path_binlog_finish_clean_up;

View File

@ -40,7 +40,8 @@
#define SYNC_BINLOG_FILE_MAX_SIZE 1024 * 1024 * 1024
#define SYNC_BINLOG_FILE_PREFIX "binlog"
#define SYNC_BINLOG_INDEX_FILENAME SYNC_BINLOG_FILE_PREFIX".index"
#define SYNC_BINLOG_INDEX_FILENAME_OLD SYNC_BINLOG_FILE_PREFIX".index"
#define SYNC_BINLOG_INDEX_FILENAME SYNC_BINLOG_FILE_PREFIX"_index.dat"
#define SYNC_MARK_FILE_EXT ".mark"
#define SYNC_BINLOG_FILE_EXT_FMT ".%03d"
#define SYNC_DIR_NAME "sync"
@ -53,9 +54,13 @@
#define MARK_ITEM_SYNC_ROW_COUNT "sync_row_count"
#define SYNC_BINLOG_WRITE_BUFF_SIZE (16 * 1024)
#define BINLOG_INDEX_ITEM_CURRENT_WRITE "current_write"
#define BINLOG_INDEX_ITEM_CURRENT_COMPRESS "current_compress"
int g_binlog_fd = -1;
int g_binlog_index = 0;
static int64_t binlog_file_size = 0;
static int binlog_compress_index = 0;
int g_storage_sync_thread_count = 0;
static pthread_mutex_t sync_thread_lock;
@ -66,6 +71,8 @@ static int binlog_write_version = 1;
/* save sync thread ids */
static pthread_t *sync_tids = NULL;
static struct fc_list_head reader_head;
static int storage_write_to_mark_file(StorageBinLogReader *pReader);
static int storage_binlog_reader_skip(StorageBinLogReader *pReader);
static int storage_binlog_fsync(const bool bNeedLock);
@ -1078,30 +1085,33 @@ static int storage_sync_data(StorageBinLogReader *pReader, \
static int write_to_binlog_index(const int binlog_index)
{
char full_filename[MAX_PATH_SIZE];
char buff[16];
char buff[256];
int fd;
int len;
snprintf(full_filename, sizeof(full_filename), \
"%s/data/"SYNC_DIR_NAME"/%s", g_fdfs_base_path, \
snprintf(full_filename, sizeof(full_filename),
"%s/data/"SYNC_DIR_NAME"/%s", g_fdfs_base_path,
SYNC_BINLOG_INDEX_FILENAME);
if ((fd=open(full_filename, O_WRONLY | O_CREAT | O_TRUNC, 0644)) < 0)
{
logError("file: "__FILE__", line: %d, " \
"open file \"%s\" fail, " \
"errno: %d, error info: %s", \
__LINE__, full_filename, \
logError("file: "__FILE__", line: %d, "
"open file \"%s\" fail, "
"errno: %d, error info: %s",
__LINE__, full_filename,
errno, STRERROR(errno));
return errno != 0 ? errno : ENOENT;
}
len = sprintf(buff, "%d", binlog_index);
len = sprintf(buff, "%s=%d\n"
"%s=%d\n",
BINLOG_INDEX_ITEM_CURRENT_WRITE, binlog_index,
BINLOG_INDEX_ITEM_CURRENT_COMPRESS, binlog_compress_index);
if (fc_safe_write(fd, buff, len) != len)
{
logError("file: "__FILE__", line: %d, " \
"write to file \"%s\" fail, " \
"errno: %d, error info: %s", \
__LINE__, full_filename, \
logError("file: "__FILE__", line: %d, "
"write to file \"%s\" fail, "
"errno: %d, error info: %s",
__LINE__, full_filename,
errno, STRERROR(errno));
close(fd);
return errno != 0 ? errno : EIO;
@ -1114,6 +1124,92 @@ static int write_to_binlog_index(const int binlog_index)
return 0;
}
static int get_binlog_index_from_file_old()
{
char full_filename[MAX_PATH_SIZE];
char file_buff[64];
int fd;
int bytes;
snprintf(full_filename, sizeof(full_filename),
"%s/data/"SYNC_DIR_NAME"/%s", g_fdfs_base_path,
SYNC_BINLOG_INDEX_FILENAME_OLD);
if ((fd=open(full_filename, O_RDONLY)) >= 0)
{
bytes = fc_safe_read(fd, file_buff, sizeof(file_buff) - 1);
close(fd);
if (bytes <= 0)
{
logError("file: "__FILE__", line: %d, " \
"read file \"%s\" fail, bytes read: %d", \
__LINE__, full_filename, bytes);
return errno != 0 ? errno : EIO;
}
file_buff[bytes] = '\0';
g_binlog_index = atoi(file_buff);
if (g_binlog_index < 0)
{
logError("file: "__FILE__", line: %d, " \
"in file \"%s\", binlog_index: %d < 0", \
__LINE__, full_filename, g_binlog_index);
return EINVAL;
}
}
else
{
g_binlog_index = 0;
}
return 0;
}
static int get_binlog_index_from_file()
{
char full_filename[MAX_PATH_SIZE];
IniContext iniContext;
int result;
snprintf(full_filename, sizeof(full_filename),
"%s/data/"SYNC_DIR_NAME"/%s", g_fdfs_base_path,
SYNC_BINLOG_INDEX_FILENAME);
if (access(full_filename, F_OK) != 0)
{
if (errno == ENOENT)
{
if ((result=get_binlog_index_from_file_old()) == 0)
{
if ((result=write_to_binlog_index(g_binlog_index)) != 0)
{
return result;
}
}
return result;
}
}
memset(&iniContext, 0, sizeof(IniContext));
if ((result=iniLoadFromFile(full_filename, &iniContext)) != 0)
{
logError("file: "__FILE__", line: %d, "
"load from file \"%s\" fail, "
"error code: %d",
__LINE__, full_filename, result);
return result;
}
g_binlog_index = iniGetIntValue(NULL,
BINLOG_INDEX_ITEM_CURRENT_WRITE,
&iniContext, 0);
binlog_compress_index = iniGetIntValue(NULL,
BINLOG_INDEX_ITEM_CURRENT_COMPRESS,
&iniContext, 0);
iniFreeContext(&iniContext);
return 0;
}
static char *get_writable_binlog_filename(char *full_filename)
{
static char buff[MAX_PATH_SIZE];
@ -1189,10 +1285,7 @@ int storage_sync_init()
char data_path[MAX_PATH_SIZE];
char sync_path[MAX_PATH_SIZE];
char full_filename[MAX_PATH_SIZE];
char file_buff[64];
int bytes;
int result;
int fd;
snprintf(data_path, sizeof(data_path), "%s/data", g_fdfs_base_path);
if (!fileExists(data_path))
@ -1238,38 +1331,10 @@ int storage_sync_init()
return errno != 0 ? errno : ENOMEM;
}
snprintf(full_filename, sizeof(full_filename), \
"%s/%s", sync_path, SYNC_BINLOG_INDEX_FILENAME);
if ((fd=open(full_filename, O_RDONLY)) >= 0)
{
bytes = fc_safe_read(fd, file_buff, sizeof(file_buff) - 1);
close(fd);
if (bytes <= 0)
{
logError("file: "__FILE__", line: %d, " \
"read file \"%s\" fail, bytes read: %d", \
__LINE__, full_filename, bytes);
return errno != 0 ? errno : EIO;
}
file_buff[bytes] = '\0';
g_binlog_index = atoi(file_buff);
if (g_binlog_index < 0)
{
logError("file: "__FILE__", line: %d, " \
"in file \"%s\", binlog_index: %d < 0", \
__LINE__, full_filename, g_binlog_index);
return EINVAL;
}
}
else
{
g_binlog_index = 0;
if ((result=write_to_binlog_index(g_binlog_index)) != 0)
if ((result=get_binlog_index_from_file()) != 0)
{
return result;
}
}
get_writable_binlog_filename(full_filename);
g_binlog_fd = open(full_filename, O_WRONLY | O_CREAT | O_APPEND, 0644);
@ -1309,6 +1374,8 @@ int storage_sync_init()
load_local_host_ip_addrs();
FC_INIT_LIST_HEAD(&reader_head);
return 0;
}
@ -1514,25 +1581,181 @@ int storage_binlog_write_ex(const int timestamp, const char op_type, \
return write_ret;
}
static char *get_binlog_readable_filename(const void *pArg, \
char *full_filename)
static char *get_binlog_readable_filename_ex(
const int binlog_index, char *full_filename)
{
const StorageBinLogReader *pReader;
static char buff[MAX_PATH_SIZE];
pReader = (const StorageBinLogReader *)pArg;
if (full_filename == NULL)
{
full_filename = buff;
}
snprintf(full_filename, MAX_PATH_SIZE, \
"%s/data/"SYNC_DIR_NAME"/"SYNC_BINLOG_FILE_PREFIX"" \
SYNC_BINLOG_FILE_EXT_FMT, \
g_fdfs_base_path, pReader->binlog_index);
snprintf(full_filename, MAX_PATH_SIZE,
"%s/data/"SYNC_DIR_NAME"/"SYNC_BINLOG_FILE_PREFIX""
SYNC_BINLOG_FILE_EXT_FMT,
g_fdfs_base_path, binlog_index);
return full_filename;
}
static inline char *get_binlog_readable_filename(const void *pArg,
char *full_filename)
{
return get_binlog_readable_filename_ex(
((const StorageBinLogReader *)pArg)->binlog_index,
full_filename);
}
static int uncompress_binlog_file(StorageBinLogReader *pReader,
const char *filename)
{
char gzip_filename[MAX_PATH_SIZE];
char flag_filename[MAX_PATH_SIZE];
char command[MAX_PATH_SIZE];
char output[256];
struct stat flag_stat;
int result;
snprintf(gzip_filename, sizeof(gzip_filename),
"%s.gz", filename);
if (access(gzip_filename, F_OK) != 0)
{
return errno != 0 ? errno : ENOENT;
}
snprintf(flag_filename, sizeof(flag_filename),
"%s.flag", filename);
if (stat(flag_filename, &flag_stat) == 0)
{
if (g_current_time - flag_stat.st_mtime > 3600)
{
logInfo("file: "__FILE__", line: %d, "
"flag file %s expired, continue to uncompress",
__LINE__, flag_filename);
}
else
{
logWarning("file: "__FILE__", line: %d, "
"uncompress %s already in progress",
__LINE__, gzip_filename);
return EINPROGRESS;
}
}
if ((result=writeToFile(flag_filename, "unzip", 5)) != 0)
{
return result;
}
logInfo("file: "__FILE__", line: %d, "
"try to uncompress binlog %s",
__LINE__, gzip_filename);
snprintf(command, sizeof(command), "gzip -d %s 2>&1", gzip_filename);
result = getExecResult(command, output, sizeof(output));
unlink(flag_filename);
if (result != 0)
{
logError("file: "__FILE__", line: %d, "
"exec command \"%s\" fail, "
"errno: %d, error info: %s",
__LINE__, command, result, STRERROR(result));
return result;
}
if (*output != '\0')
{
logWarning("file: "__FILE__", line: %d, "
"exec command \"%s\", output: %s",
__LINE__, command, output);
}
if (access(filename, F_OK) == 0)
{
if (pReader->binlog_index < binlog_compress_index)
{
binlog_compress_index = pReader->binlog_index;
write_to_binlog_index(g_binlog_index);
}
}
logInfo("file: "__FILE__", line: %d, "
"uncompress binlog %s done",
__LINE__, gzip_filename);
return 0;
}
static int compress_binlog_file(const char *filename)
{
char gzip_filename[MAX_PATH_SIZE];
char flag_filename[MAX_PATH_SIZE];
char command[MAX_PATH_SIZE];
char output[256];
struct stat flag_stat;
int result;
snprintf(gzip_filename, sizeof(gzip_filename),
"%s.gz", filename);
if (access(gzip_filename, F_OK) == 0)
{
return 0;
}
if (access(filename, F_OK) != 0)
{
return errno != 0 ? errno : ENOENT;
}
snprintf(flag_filename, sizeof(flag_filename),
"%s.flag", filename);
if (stat(flag_filename, &flag_stat) == 0)
{
if (g_current_time - flag_stat.st_mtime > 3600)
{
logInfo("file: "__FILE__", line: %d, "
"flag file %s expired, continue to compress",
__LINE__, flag_filename);
}
else
{
logWarning("file: "__FILE__", line: %d, "
"compress %s already in progress",
__LINE__, filename);
return EINPROGRESS;
}
}
if ((result=writeToFile(flag_filename, "zip", 3)) != 0)
{
return result;
}
logInfo("file: "__FILE__", line: %d, "
"try to compress binlog %s",
__LINE__, filename);
snprintf(command, sizeof(command), "gzip %s 2>&1", filename);
result = getExecResult(command, output, sizeof(output));
unlink(flag_filename);
if (result != 0)
{
logError("file: "__FILE__", line: %d, "
"exec command \"%s\" fail, "
"errno: %d, error info: %s",
__LINE__, command, result, STRERROR(result));
return result;
}
if (*output != '\0')
{
logWarning("file: "__FILE__", line: %d, "
"exec command \"%s\", output: %s",
__LINE__, command, output);
}
logInfo("file: "__FILE__", line: %d, "
"compress binlog %s done",
__LINE__, filename);
return 0;
}
int storage_open_readable_binlog(StorageBinLogReader *pReader, \
get_filename_func filename_func, const void *pArg)
{
@ -1544,6 +1767,14 @@ int storage_open_readable_binlog(StorageBinLogReader *pReader, \
}
filename_func(pArg, full_filename);
if (access(full_filename, F_OK) != 0)
{
if (errno == ENOENT)
{
uncompress_binlog_file(pReader, full_filename);
}
}
pReader->binlog_fd = open(full_filename, O_RDONLY);
if (pReader->binlog_fd < 0)
{
@ -1833,7 +2064,14 @@ int storage_reader_init(FDFSStorageBrief *pStorage, StorageBinLogReader *pReader
bool bFileExist;
bool bNeedSyncOld;
memset(pReader, 0, sizeof(StorageBinLogReader));
pReader->binlog_index = 0;
pReader->binlog_offset = 0;
pReader->need_sync_old = 0;
pReader->sync_old_done = 0;
pReader->until_timestamp = 0;
pReader->scan_row_count = 0;
pReader->sync_row_count = 0;
pReader->last_file_exist = 0;
pReader->mark_fd = -1;
pReader->binlog_fd = -1;
@ -2546,7 +2784,7 @@ static void storage_sync_thread_exit(ConnectionInfo *pStorage)
static void* storage_sync_thread_entrance(void* arg)
{
FDFSStorageBrief *pStorage;
StorageBinLogReader reader;
StorageBinLogReader *pReader;
StorageBinLogRecord record;
ConnectionInfo storage_server;
char local_ip_addr[IP_ADDRESS_SIZE];
@ -2559,22 +2797,35 @@ static void* storage_sync_thread_entrance(void* arg)
time_t end_time;
time_t last_keep_alive_time;
pStorage = (FDFSStorageBrief *)arg;
strcpy(storage_server.ip_addr, pStorage->ip_addr);
storage_server.port = g_server_port;
storage_server.sock = -1;
memset(local_ip_addr, 0, sizeof(local_ip_addr));
memset(&reader, 0, sizeof(reader));
reader.mark_fd = -1;
reader.binlog_fd = -1;
pReader = (StorageBinLogReader *)malloc(sizeof(StorageBinLogReader));
if (pReader == NULL)
{
logCrit("file: "__FILE__", line: %d, "
"malloc %d bytes fail, "
"fail, program exit!",
__LINE__, (int)sizeof(StorageBinLogReader));
g_continue_flag = false;
storage_sync_thread_exit(&storage_server);
return NULL;
}
memset(pReader, 0, sizeof(StorageBinLogReader));
pReader->mark_fd = -1;
pReader->binlog_fd = -1;
storage_reader_add_to_list(pReader);
current_time = g_current_time;
last_keep_alive_time = 0;
start_time = 0;
end_time = 0;
pStorage = (FDFSStorageBrief *)arg;
strcpy(storage_server.ip_addr, pStorage->ip_addr);
storage_server.port = g_server_port;
storage_server.sock = -1;
logDebug("file: "__FILE__", line: %d, " \
"sync thread to storage server %s:%d started", \
__LINE__, storage_server.ip_addr, storage_server.port);
@ -2635,7 +2886,7 @@ static void* storage_sync_thread_entrance(void* arg)
continue;
}
if ((result=storage_reader_init(pStorage, &reader)) != 0)
if ((result=storage_reader_init(pStorage, pReader)) != 0)
{
logCrit("file: "__FILE__", line: %d, " \
"storage_reader_init fail, errno=%d, " \
@ -2645,7 +2896,7 @@ static void* storage_sync_thread_entrance(void* arg)
break;
}
if (!reader.need_sync_old)
if (!pReader->need_sync_old)
{
while (g_continue_flag && \
(pStorage->status != FDFS_STORAGE_STATUS_ACTIVE && \
@ -2659,7 +2910,7 @@ static void* storage_sync_thread_entrance(void* arg)
if (pStorage->status != FDFS_STORAGE_STATUS_ACTIVE)
{
close(storage_server.sock);
storage_reader_destroy(&reader);
storage_reader_destroy(pReader);
continue;
}
}
@ -2689,7 +2940,7 @@ static void* storage_sync_thread_entrance(void* arg)
if (storage_report_my_server_id(&storage_server) != 0)
{
close(storage_server.sock);
storage_reader_destroy(&reader);
storage_reader_destroy(pReader);
sleep(1);
continue;
}
@ -2703,7 +2954,7 @@ static void* storage_sync_thread_entrance(void* arg)
if (pStorage->status == FDFS_STORAGE_STATUS_SYNCING)
{
if (reader.need_sync_old && reader.sync_old_done)
if (pReader->need_sync_old && pReader->sync_old_done)
{
pStorage->status = FDFS_STORAGE_STATUS_OFFLINE;
storage_report_storage_status(pStorage->id, \
@ -2727,15 +2978,15 @@ static void* storage_sync_thread_entrance(void* arg)
(pStorage->status == FDFS_STORAGE_STATUS_ACTIVE || \
pStorage->status == FDFS_STORAGE_STATUS_SYNCING))
{
read_result = storage_binlog_read(&reader, \
read_result = storage_binlog_read(pReader, \
&record, &record_len);
if (read_result == ENOENT)
{
if (reader.need_sync_old && \
!reader.sync_old_done)
if (pReader->need_sync_old && \
!pReader->sync_old_done)
{
reader.sync_old_done = true;
if (storage_write_to_mark_file(&reader) != 0)
pReader->sync_old_done = true;
if (storage_write_to_mark_file(pReader) != 0)
{
logCrit("file: "__FILE__", line: %d, " \
"storage_write_to_mark_file " \
@ -2758,9 +3009,9 @@ static void* storage_sync_thread_entrance(void* arg)
}
if (reader.last_scan_rows!=reader.scan_row_count)
if (pReader->last_scan_rows!=pReader->scan_row_count)
{
if (storage_write_to_mark_file(&reader)!=0)
if (storage_write_to_mark_file(pReader)!=0)
{
logCrit("file: "__FILE__", line: %d, " \
"storage_write_to_mark_file fail, " \
@ -2799,8 +3050,8 @@ static void* storage_sync_thread_entrance(void* arg)
logWarning("file: "__FILE__", line: %d, " \
"skip invalid record, binlog index: " \
"%d, offset: %"PRId64, \
__LINE__, reader.binlog_index, \
reader.binlog_offset);
__LINE__, pReader->binlog_index, \
pReader->binlog_offset);
}
else
{
@ -2808,17 +3059,17 @@ static void* storage_sync_thread_entrance(void* arg)
break;
}
}
else if ((sync_result=storage_sync_data(&reader, \
else if ((sync_result=storage_sync_data(pReader, \
&storage_server, &record)) != 0)
{
logDebug("file: "__FILE__", line: %d, " \
"binlog index: %d, current record " \
"offset: %"PRId64", next " \
"record offset: %"PRId64, \
__LINE__, reader.binlog_index, \
reader.binlog_offset, \
reader.binlog_offset + record_len);
if (rewind_to_prev_rec_end(&reader) != 0)
__LINE__, pReader->binlog_index, \
pReader->binlog_offset, \
pReader->binlog_offset + record_len);
if (rewind_to_prev_rec_end(pReader) != 0)
{
logCrit("file: "__FILE__", line: %d, " \
"rewind_to_prev_rec_end fail, "\
@ -2829,8 +3080,8 @@ static void* storage_sync_thread_entrance(void* arg)
break;
}
reader.binlog_offset += record_len;
reader.scan_row_count++;
pReader->binlog_offset += record_len;
pReader->scan_row_count++;
if (g_sync_interval > 0)
{
@ -2838,9 +3089,9 @@ static void* storage_sync_thread_entrance(void* arg)
}
}
if (reader.last_scan_rows != reader.scan_row_count)
if (pReader->last_scan_rows != pReader->scan_row_count)
{
if (storage_write_to_mark_file(&reader) != 0)
if (storage_write_to_mark_file(pReader) != 0)
{
logCrit("file: "__FILE__", line: %d, " \
"storage_write_to_mark_file fail, " \
@ -2852,7 +3103,7 @@ static void* storage_sync_thread_entrance(void* arg)
close(storage_server.sock);
storage_server.sock = -1;
storage_reader_destroy(&reader);
storage_reader_destroy(pReader);
if (!g_continue_flag)
{
@ -2869,7 +3120,8 @@ static void* storage_sync_thread_entrance(void* arg)
{
close(storage_server.sock);
}
storage_reader_destroy(&reader);
storage_reader_remove_from_list(pReader);
storage_reader_destroy(pReader);
if (pStorage->status == FDFS_STORAGE_STATUS_DELETED
|| pStorage->status == FDFS_STORAGE_STATUS_IP_CHANGED)
@ -2880,7 +3132,6 @@ static void* storage_sync_thread_entrance(void* arg)
}
storage_sync_thread_exit(&storage_server);
return NULL;
}
@ -2970,3 +3221,76 @@ int storage_sync_thread_start(const FDFSStorageBrief *pStorage)
return 0;
}
void storage_reader_add_to_list(StorageBinLogReader *pReader)
{
pthread_mutex_lock(&sync_thread_lock);
fc_list_add_tail(&pReader->link, &reader_head);
pthread_mutex_unlock(&sync_thread_lock);
}
void storage_reader_remove_from_list(StorageBinLogReader *pReader)
{
pthread_mutex_lock(&sync_thread_lock);
fc_list_del_init(&pReader->link);
pthread_mutex_unlock(&sync_thread_lock);
}
static int calc_compress_until_binlog_index()
{
StorageBinLogReader *pReader;
int min_index;
pthread_mutex_lock(&sync_thread_lock);
logInfo("g_storage_sync_thread_count: %d, reader count: %d", g_storage_sync_thread_count, fc_list_count(&reader_head));
min_index = g_binlog_index;
fc_list_for_each_entry(pReader, &reader_head, link)
{
if (pReader->binlog_fd >= 0)
{
logInfo("storage_id: %s, binlog_fd: %d, binlog_index: %d, binlog_offset: %"PRId64,
pReader->storage_id, pReader->binlog_fd,
pReader->binlog_index, pReader->binlog_offset);
}
if (pReader->binlog_fd >= 0 && pReader->binlog_index >= 0 &&
pReader->binlog_index < min_index)
{
min_index = pReader->binlog_index;
}
}
pthread_mutex_unlock(&sync_thread_lock);
return min_index;
}
int fdfs_binlog_compress_func(void *args)
{
char full_filename[MAX_PATH_SIZE];
int until_index;
int bindex;
int result;
if (binlog_compress_index >= g_binlog_index)
{
return 0;
}
until_index = calc_compress_until_binlog_index();
for (bindex = binlog_compress_index; bindex < until_index;
bindex++)
{
get_binlog_readable_filename_ex(bindex, full_filename);
result = compress_binlog_file(full_filename);
if (!(result == 0 || result == ENOENT))
{
break;
}
binlog_compress_index = bindex + 1;
write_to_binlog_index(g_binlog_index);
}
return 0;
}

View File

@ -11,6 +11,7 @@
#ifndef _STORAGE_SYNC_H_
#define _STORAGE_SYNC_H_
#include "fastcommon/fc_list.h"
#include "storage_func.h"
#define STORAGE_OP_TYPE_SOURCE_CREATE_FILE 'C' //upload file
@ -37,6 +38,7 @@ extern "C" {
typedef struct
{
struct fc_list_head link;
char storage_id[FDFS_STORAGE_ID_MAX_SIZE];
bool need_sync_old;
bool sync_old_done;
@ -99,9 +101,15 @@ int storage_open_readable_binlog(StorageBinLogReader *pReader, \
int storage_reader_init(FDFSStorageBrief *pStorage, StorageBinLogReader *pReader);
void storage_reader_destroy(StorageBinLogReader *pReader);
int storage_report_storage_status(const char *storage_id, \
int storage_report_storage_status(const char *storage_id,
const char *ip_addr, const char status);
int fdfs_binlog_compress_func(void *args);
void storage_reader_add_to_list(StorageBinLogReader *pReader);
void storage_reader_remove_from_list(StorageBinLogReader *pReader);
#ifdef __cplusplus
}
#endif

View File

@ -416,11 +416,11 @@ static int relationship_select_leader()
{
if (trackerStatus.if_leader)
{
g_tracker_servers.leader_index = \
trackerStatus.pTrackerServer - \
g_tracker_servers.leader_index =
trackerStatus.pTrackerServer -
g_tracker_servers.servers;
if (g_tracker_servers.leader_index < 0 || \
g_tracker_servers.leader_index >= \
if (g_tracker_servers.leader_index < 0 ||
g_tracker_servers.leader_index >=
g_tracker_servers.server_count)
{
logError("file: "__FILE__", line: %d, "
@ -429,7 +429,10 @@ static int relationship_select_leader()
g_tracker_servers.leader_index = -1;
return EINVAL;
}
}
if (g_tracker_servers.leader_index >= 0)
{
logInfo("file: "__FILE__", line: %d, "
"the tracker leader %s:%d", __LINE__,
conn->ip_addr, conn->port);