storage server write to data_init_flag and mark file safely

v6.03_dev
YuQing 2019-11-16 10:53:19 +08:00
parent 9dc6742b1e
commit cb24cd82e1
8 changed files with 118 additions and 176 deletions

View File

@ -5,6 +5,8 @@ Version 6.03 2019-11-16
to that of tracker leader when the storage server found
it's status inconsistence
* bugfix: fdfs_monitor fix get index of the specified tracker server
* storage server write to data_init_flag and mark file safely
(write to temp file then rename)
NOTE: the tracker and storage server must upgrade together

View File

@ -107,7 +107,7 @@ int main(int argc, char *argv[])
return result;
}
if ((result=storage_check_and_make_data_path()) != 0)
if ((result=storage_check_and_make_global_data_path()) != 0)
{
log_destroy();
return result;

View File

@ -51,7 +51,7 @@ typedef struct {
static int saved_storage_status = FDFS_STORAGE_STATUS_NONE;
static char *recovery_get_binlog_filename(const void *pArg, \
static char *recovery_get_binlog_filename(const void *pArg,
char *full_filename);
static int storage_do_fetch_binlog(ConnectionInfo *pSrcStorage, \
@ -289,35 +289,32 @@ static int recovery_get_src_storage_server(ConnectionInfo *pSrcStorage)
return 0;
}
static char *recovery_get_full_filename(const void *pArg, \
static char *recovery_get_full_filename(const char *pBasePath,
const char *filename, char *full_filename)
{
const char *pBasePath;
static char buff[MAX_PATH_SIZE];
pBasePath = (const char *)pArg;
if (full_filename == NULL)
{
full_filename = buff;
}
snprintf(full_filename, MAX_PATH_SIZE, \
snprintf(full_filename, MAX_PATH_SIZE,
"%s/data/%s", pBasePath, filename);
return full_filename;
}
static char *recovery_get_binlog_filename(const void *pArg, \
static char *recovery_get_binlog_filename(const void *pArg,
char *full_filename)
{
return recovery_get_full_filename(pArg, \
return recovery_get_full_filename((const char *)pArg,
RECOVERY_BINLOG_FILENAME, full_filename);
}
static char *recovery_get_mark_filename(const void *pArg, \
static char *recovery_get_mark_filename(const char *pBasePath,
char *full_filename)
{
return recovery_get_full_filename(pArg, \
return recovery_get_full_filename(pBasePath,
RECOVERY_MARK_FILENAME, full_filename);
}
@ -362,16 +359,15 @@ static int recovery_write_to_mark_file(const char *pBasePath, \
char buff[128];
int len;
len = sprintf(buff, \
"%s=%d\n" \
"%s=%"PRId64"\n" \
"%s=1\n", \
MARK_ITEM_SAVED_STORAGE_STATUS, saved_storage_status, \
MARK_ITEM_BINLOG_OFFSET, pReader->binlog_offset, \
len = sprintf(buff,
"%s=%d\n"
"%s=%"PRId64"\n"
"%s=1\n",
MARK_ITEM_SAVED_STORAGE_STATUS, saved_storage_status,
MARK_ITEM_BINLOG_OFFSET, pReader->binlog_offset,
MARK_ITEM_FETCH_BINLOG_DONE);
return storage_write_to_fd(pReader->mark_fd, \
recovery_get_mark_filename, pBasePath, buff, len);
return safeWriteToFile(pReader->mark_filename, buff, len);
}
static int recovery_init_binlog_file(const char *pBasePath)
@ -406,12 +402,10 @@ static int recovery_init_mark_file(const char *pBasePath, \
static int recovery_reader_init(const char *pBasePath, \
StorageBinLogReader *pReader)
{
char full_mark_filename[MAX_PATH_SIZE];
IniContext iniContext;
int result;
memset(pReader, 0, sizeof(StorageBinLogReader));
pReader->mark_fd = -1;
pReader->binlog_fd = -1;
pReader->binlog_index = g_binlog_index + 1;
@ -428,14 +422,15 @@ static int recovery_reader_init(const char *pBasePath, \
}
pReader->binlog_buff.current = pReader->binlog_buff.buffer;
recovery_get_mark_filename(pBasePath, full_mark_filename);
recovery_get_mark_filename(pBasePath, pReader->mark_filename);
memset(&iniContext, 0, sizeof(IniContext));
if ((result=iniLoadFromFile(full_mark_filename, &iniContext)) != 0)
if ((result=iniLoadFromFile(pReader->mark_filename,
&iniContext)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"load from mark file \"%s\" fail, " \
"error code: %d", __LINE__, \
full_mark_filename, result);
pReader->mark_filename, result);
return result;
}
@ -447,7 +442,7 @@ static int recovery_reader_init(const char *pBasePath, \
logInfo("file: "__FILE__", line: %d, " \
"mark file \"%s\", %s=0, " \
"need to fetch binlog again", __LINE__, \
full_mark_filename, MARK_ITEM_FETCH_BINLOG_DONE);
pReader->mark_filename, MARK_ITEM_FETCH_BINLOG_DONE);
return EAGAIN;
}
@ -459,7 +454,7 @@ static int recovery_reader_init(const char *pBasePath, \
logError("file: "__FILE__", line: %d, " \
"in mark file \"%s\", %s: %d < 0", __LINE__, \
full_mark_filename, MARK_ITEM_SAVED_STORAGE_STATUS, \
pReader->mark_filename, MARK_ITEM_SAVED_STORAGE_STATUS, \
saved_storage_status);
return EINVAL;
}
@ -473,24 +468,13 @@ static int recovery_reader_init(const char *pBasePath, \
logError("file: "__FILE__", line: %d, " \
"in mark file \"%s\", %s: "\
"%"PRId64" < 0", __LINE__, \
full_mark_filename, MARK_ITEM_BINLOG_OFFSET, \
pReader->mark_filename, MARK_ITEM_BINLOG_OFFSET, \
pReader->binlog_offset);
return EINVAL;
}
iniFreeContext(&iniContext);
pReader->mark_fd = open(full_mark_filename, O_WRONLY | O_CREAT, 0644);
if (pReader->mark_fd < 0)
{
logError("file: "__FILE__", line: %d, " \
"open mark file \"%s\" fail, " \
"error no: %d, error info: %s", \
__LINE__, full_mark_filename, \
errno, STRERROR(errno));
return errno != 0 ? errno : ENOENT;
}
if ((result=storage_open_readable_binlog(pReader, \
recovery_get_binlog_filename, pBasePath)) != 0)
{

View File

@ -628,63 +628,47 @@ int storage_write_to_stat_file()
int storage_write_to_sync_ini_file()
{
char full_filename[MAX_PATH_SIZE];
char buff[512];
char buff[4 * 1024];
char ip_str[256];
int fd;
int len;
int result;
snprintf(full_filename, sizeof(full_filename), \
snprintf(full_filename, sizeof(full_filename),
"%s/data/%s", g_fdfs_base_path, DATA_DIR_INITED_FILENAME);
if ((fd=open(full_filename, O_WRONLY | O_CREAT | O_TRUNC, 0644)) < 0)
{
logError("file: "__FILE__", line: %d, " \
"open file \"%s\" fail, " \
"errno: %d, error info: %s", \
__LINE__, full_filename, \
errno, STRERROR(errno));
return errno != 0 ? errno : ENOENT;
}
fdfs_multi_ips_to_string(&g_tracker_client_ip,
ip_str, sizeof(ip_str));
len = sprintf(buff, "%s=%d\n" \
"%s=%d\n" \
"%s=%s\n" \
"%s=%d\n" \
"%s=%s\n" \
"%s=%d\n" \
"%s=%d\n" \
"%s=%d\n" \
"%s=%d\n", \
INIT_ITEM_STORAGE_JOIN_TIME, g_storage_join_time, \
INIT_ITEM_SYNC_OLD_DONE, g_sync_old_done, \
INIT_ITEM_SYNC_SRC_SERVER, g_sync_src_id, \
INIT_ITEM_SYNC_UNTIL_TIMESTAMP, g_sync_until_timestamp, \
INIT_ITEM_LAST_IP_ADDRESS, ip_str, \
INIT_ITEM_LAST_SERVER_PORT, g_last_server_port, \
len = sprintf(buff, "%s=%d\n"
"%s=%d\n"
"%s=%s\n"
"%s=%d\n"
"%s=%s\n"
"%s=%d\n"
"%s=%d\n"
"%s=%d\n"
"%s=%d\n",
INIT_ITEM_STORAGE_JOIN_TIME, g_storage_join_time,
INIT_ITEM_SYNC_OLD_DONE, g_sync_old_done,
INIT_ITEM_SYNC_SRC_SERVER, g_sync_src_id,
INIT_ITEM_SYNC_UNTIL_TIMESTAMP, g_sync_until_timestamp,
INIT_ITEM_LAST_IP_ADDRESS, ip_str,
INIT_ITEM_LAST_SERVER_PORT, g_last_server_port,
INIT_ITEM_LAST_HTTP_PORT, g_last_http_port,
INIT_ITEM_CURRENT_TRUNK_FILE_ID, g_current_trunk_file_id, \
INIT_ITEM_CURRENT_TRUNK_FILE_ID, g_current_trunk_file_id,
INIT_ITEM_TRUNK_LAST_COMPRESS_TIME, (int)g_trunk_last_compress_time
);
if (fc_safe_write(fd, buff, len) != len)
{
logError("file: "__FILE__", line: %d, " \
"write to file \"%s\" fail, " \
"errno: %d, error info: %s", \
__LINE__, full_filename, \
errno, STRERROR(errno));
close(fd);
return errno != 0 ? errno : EIO;
}
close(fd);
if ((result=safeWriteToFile(full_filename, buff, len)) != 0)
{
return result;
}
STORAGE_CHOWN(full_filename, geteuid(), getegid())
return 0;
}
int storage_check_and_make_data_path()
int storage_check_and_make_global_data_path()
{
char data_path[MAX_PATH_SIZE];
snprintf(data_path, sizeof(data_path), "%s/data",
@ -843,7 +827,7 @@ static int storage_check_and_make_data_dirs()
}
else
{
if ((result=storage_check_and_make_data_path()) != 0)
if ((result=storage_check_and_make_global_data_path()) != 0)
{
return result;
}

View File

@ -37,7 +37,7 @@ bool storage_id_is_myself(const char *storage_id);
int storage_set_tracker_client_ips(ConnectionInfo *conn,
const int tracker_index);
int storage_check_and_make_data_path();
int storage_check_and_make_global_data_path();
int storage_logic_to_local_full_filename(const char *logic_filename,
const int logic_filename_len, int *store_path_index,

View File

@ -4386,7 +4386,6 @@ static void fetch_one_path_binlog_finish_clean_up(struct fast_task_info *pTask)
{
StorageClientInfo *pClientInfo;
StorageBinLogReader *pReader;
char full_filename[MAX_PATH_SIZE];
pClientInfo = (StorageClientInfo *)pTask->arg;
pReader = (StorageBinLogReader *)pClientInfo->extra_arg;
@ -4398,13 +4397,13 @@ static void fetch_one_path_binlog_finish_clean_up(struct fast_task_info *pTask)
pClientInfo->extra_arg = NULL;
storage_reader_remove_from_list(pReader);
storage_reader_destroy(pReader);
get_mark_filename_by_reader(pReader, full_filename);
if (fileExists(full_filename))
get_mark_filename_by_reader(pReader);
if (fileExists(pReader->mark_filename))
{
unlink(full_filename);
unlink(pReader->mark_filename);
}
storage_reader_destroy(pReader);
free(pReader);
}

View File

@ -1944,53 +1944,45 @@ int storage_open_readable_binlog(StorageBinLogReader *pReader, \
return 0;
}
static char *get_mark_filename_by_id_and_port(const char *storage_id, \
static char *get_mark_filename_by_id_and_port(const char *storage_id,
const int port, char *full_filename, const int filename_size)
{
if (g_use_storage_id)
{
snprintf(full_filename, filename_size, \
"%s/data/"SYNC_DIR_NAME"/%s%s", g_fdfs_base_path, \
snprintf(full_filename, filename_size,
"%s/data/"SYNC_DIR_NAME"/%s%s", g_fdfs_base_path,
storage_id, SYNC_MARK_FILE_EXT);
}
else
{
snprintf(full_filename, filename_size, \
"%s/data/"SYNC_DIR_NAME"/%s_%d%s", g_fdfs_base_path, \
snprintf(full_filename, filename_size,
"%s/data/"SYNC_DIR_NAME"/%s_%d%s", g_fdfs_base_path,
storage_id, port, SYNC_MARK_FILE_EXT);
}
return full_filename;
}
static char *get_mark_filename_by_ip_and_port(const char *ip_addr, \
static char *get_mark_filename_by_ip_and_port(const char *ip_addr,
const int port, char *full_filename, const int filename_size)
{
snprintf(full_filename, filename_size, \
"%s/data/"SYNC_DIR_NAME"/%s_%d%s", g_fdfs_base_path, \
snprintf(full_filename, filename_size,
"%s/data/"SYNC_DIR_NAME"/%s_%d%s", g_fdfs_base_path,
ip_addr, port, SYNC_MARK_FILE_EXT);
return full_filename;
}
char *get_mark_filename_by_reader(const void *pArg, char *full_filename)
char *get_mark_filename_by_reader(StorageBinLogReader *pReader)
{
const StorageBinLogReader *pReader;
static char buff[MAX_PATH_SIZE];
pReader = (const StorageBinLogReader *)pArg;
if (full_filename == NULL)
{
full_filename = buff;
return get_mark_filename_by_id_and_port(pReader->storage_id,
g_server_port, pReader->mark_filename,
sizeof(pReader->mark_filename));
}
return get_mark_filename_by_id_and_port(pReader->storage_id, \
g_server_port, full_filename, MAX_PATH_SIZE);
}
static char *get_mark_filename_by_id(const char *storage_id, \
static char *get_mark_filename_by_id(const char *storage_id,
char *full_filename, const int filename_size)
{
return get_mark_filename_by_id_and_port(storage_id, g_server_port, \
full_filename, filename_size);
return get_mark_filename_by_id_and_port(storage_id,
g_server_port, full_filename, filename_size);
}
int storage_report_storage_status(const char *storage_id, \
@ -2199,7 +2191,6 @@ static int storage_reader_sync_init_req(StorageBinLogReader *pReader)
int storage_reader_init(FDFSStorageBrief *pStorage, StorageBinLogReader *pReader)
{
char full_filename[MAX_PATH_SIZE];
IniContext iniContext;
int result;
bool bFileExist;
@ -2213,7 +2204,6 @@ int storage_reader_init(FDFSStorageBrief *pStorage, StorageBinLogReader *pReader
pReader->scan_row_count = 0;
pReader->sync_row_count = 0;
pReader->last_file_exist = 0;
pReader->mark_fd = -1;
pReader->binlog_fd = -1;
pReader->binlog_buff.buffer = (char *)malloc( \
@ -2237,7 +2227,7 @@ int storage_reader_init(FDFSStorageBrief *pStorage, StorageBinLogReader *pReader
{
strcpy(pReader->storage_id, pStorage->id);
}
get_mark_filename_by_reader(pReader, full_filename);
get_mark_filename_by_reader(pReader);
if (pStorage == NULL)
{
@ -2249,22 +2239,23 @@ int storage_reader_init(FDFSStorageBrief *pStorage, StorageBinLogReader *pReader
}
else
{
bFileExist = fileExists(full_filename);
bFileExist = fileExists(pReader->mark_filename);
if (!bFileExist && (g_use_storage_id && pStorage != NULL))
{
char old_mark_filename[MAX_PATH_SIZE];
get_mark_filename_by_ip_and_port(pStorage->ip_addr, \
g_server_port, old_mark_filename, \
get_mark_filename_by_ip_and_port(pStorage->ip_addr,
g_server_port, old_mark_filename,
sizeof(old_mark_filename));
if (fileExists(old_mark_filename))
{
if (rename(old_mark_filename, full_filename)!=0)
if (rename(old_mark_filename,
pReader->mark_filename) !=0 )
{
logError("file: "__FILE__", line: %d, "\
"rename file %s to %s fail" \
", errno: %d, error info: %s", \
__LINE__, old_mark_filename, \
full_filename, errno, \
logError("file: "__FILE__", line: %d, "
"rename file %s to %s fail"
", errno: %d, error info: %s",
__LINE__, old_mark_filename,
pReader->mark_filename, errno,
STRERROR(errno));
return errno != 0 ? errno : EACCES;
}
@ -2284,29 +2275,30 @@ int storage_reader_init(FDFSStorageBrief *pStorage, StorageBinLogReader *pReader
if (bFileExist)
{
memset(&iniContext, 0, sizeof(IniContext));
if ((result=iniLoadFromFile(full_filename, &iniContext)) \
!= 0)
if ((result=iniLoadFromFile(pReader->mark_filename,
&iniContext)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"load from mark file \"%s\" fail, " \
"error code: %d", \
__LINE__, full_filename, result);
logError("file: "__FILE__", line: %d, "
"load from mark file \"%s\" fail, "
"error code: %d", __LINE__,
pReader->mark_filename, result);
return result;
}
if (iniContext.global.count < 7)
{
iniFreeContext(&iniContext);
logError("file: "__FILE__", line: %d, " \
"in mark file \"%s\", item count: %d < 7", \
__LINE__, full_filename, iniContext.global.count);
logError("file: "__FILE__", line: %d, "
"in mark file \"%s\", item count: %d < 7",
__LINE__, pReader->mark_filename,
iniContext.global.count);
return ENOENT;
}
bNeedSyncOld = iniGetBoolValue(NULL, \
MARK_ITEM_NEED_SYNC_OLD, \
bNeedSyncOld = iniGetBoolValue(NULL,
MARK_ITEM_NEED_SYNC_OLD,
&iniContext, false);
if (pStorage != NULL && pStorage->status == \
if (pStorage != NULL && pStorage->status ==
FDFS_STORAGE_STATUS_SYNCING)
{
if ((result=storage_reader_sync_init_req(pReader)) != 0)
@ -2356,17 +2348,17 @@ int storage_reader_init(FDFSStorageBrief *pStorage, StorageBinLogReader *pReader
logError("file: "__FILE__", line: %d, " \
"in mark file \"%s\", " \
"binlog_index: %d < 0", \
__LINE__, full_filename, \
__LINE__, pReader->mark_filename, \
pReader->binlog_index);
return EINVAL;
}
if (pReader->binlog_offset < 0)
{
iniFreeContext(&iniContext);
logError("file: "__FILE__", line: %d, " \
"in mark file \"%s\", binlog_offset: "\
"%"PRId64" < 0", \
__LINE__, full_filename, \
logError("file: "__FILE__", line: %d, "
"in mark file \"%s\", binlog_offset: "
"%"PRId64" < 0", __LINE__,
pReader->mark_filename,
pReader->binlog_offset);
return EINVAL;
}
@ -2378,18 +2370,6 @@ int storage_reader_init(FDFSStorageBrief *pStorage, StorageBinLogReader *pReader
pReader->last_scan_rows = pReader->scan_row_count;
pReader->last_sync_rows = pReader->sync_row_count;
pReader->mark_fd = open(full_filename, O_WRONLY | O_CREAT, 0644);
if (pReader->mark_fd < 0)
{
logError("file: "__FILE__", line: %d, " \
"open mark file \"%s\" fail, " \
"error no: %d, error info: %s", \
__LINE__, full_filename, \
errno, STRERROR(errno));
return errno != 0 ? errno : ENOENT;
}
STORAGE_FCHOWN(pReader->mark_fd, full_filename, geteuid(), getegid())
if ((result=storage_open_readable_binlog(pReader, \
get_binlog_readable_filename, pReader)) != 0)
{
@ -2423,12 +2403,6 @@ int storage_reader_init(FDFSStorageBrief *pStorage, StorageBinLogReader *pReader
void storage_reader_destroy(StorageBinLogReader *pReader)
{
if (pReader->mark_fd >= 0)
{
close(pReader->mark_fd);
pReader->mark_fd = -1;
}
if (pReader->binlog_fd >= 0)
{
close(pReader->binlog_fd);
@ -2450,25 +2424,25 @@ static int storage_write_to_mark_file(StorageBinLogReader *pReader)
int len;
int result;
len = sprintf(buff, \
"%s=%d\n" \
"%s=%"PRId64"\n" \
"%s=%d\n" \
"%s=%d\n" \
"%s=%d\n" \
"%s=%"PRId64"\n" \
"%s=%"PRId64"\n", \
MARK_ITEM_BINLOG_FILE_INDEX, pReader->binlog_index, \
MARK_ITEM_BINLOG_FILE_OFFSET, pReader->binlog_offset, \
MARK_ITEM_NEED_SYNC_OLD, pReader->need_sync_old, \
MARK_ITEM_SYNC_OLD_DONE, pReader->sync_old_done, \
MARK_ITEM_UNTIL_TIMESTAMP, (int)pReader->until_timestamp, \
MARK_ITEM_SCAN_ROW_COUNT, pReader->scan_row_count, \
len = sprintf(buff,
"%s=%d\n"
"%s=%"PRId64"\n"
"%s=%d\n"
"%s=%d\n"
"%s=%d\n"
"%s=%"PRId64"\n"
"%s=%"PRId64"\n",
MARK_ITEM_BINLOG_FILE_INDEX, pReader->binlog_index,
MARK_ITEM_BINLOG_FILE_OFFSET, pReader->binlog_offset,
MARK_ITEM_NEED_SYNC_OLD, pReader->need_sync_old,
MARK_ITEM_SYNC_OLD_DONE, pReader->sync_old_done,
MARK_ITEM_UNTIL_TIMESTAMP, (int)pReader->until_timestamp,
MARK_ITEM_SCAN_ROW_COUNT, pReader->scan_row_count,
MARK_ITEM_SYNC_ROW_COUNT, pReader->sync_row_count);
if ((result=storage_write_to_fd(pReader->mark_fd, \
get_mark_filename_by_reader, pReader, buff, len)) == 0)
if ((result=safeWriteToFile(pReader->mark_filename, buff, len)) == 0)
{
STORAGE_CHOWN(pReader->mark_filename, geteuid(), getegid())
pReader->last_scan_rows = pReader->scan_row_count;
pReader->last_sync_rows = pReader->sync_row_count;
}
@ -2959,7 +2933,6 @@ static void* storage_sync_thread_entrance(void* arg)
}
memset(pReader, 0, sizeof(StorageBinLogReader));
pReader->mark_fd = -1;
pReader->binlog_fd = -1;
storage_reader_add_to_list(pReader);

View File

@ -42,12 +42,12 @@ typedef struct
{
struct fc_list_head link;
char storage_id[FDFS_STORAGE_ID_MAX_SIZE];
char mark_filename[MAX_PATH_SIZE];
bool need_sync_old;
bool sync_old_done;
bool last_file_exist; //if the last file exist on the dest server
BinLogBuffer binlog_buff;
time_t until_timestamp;
int mark_fd;
int binlog_index;
int binlog_fd;
int64_t binlog_offset;
@ -92,7 +92,7 @@ int storage_sync_thread_start(const FDFSStorageBrief *pStorage);
int kill_storage_sync_threads();
int fdfs_binlog_sync_func(void *args);
char *get_mark_filename_by_reader(const void *pArg, char *full_filename);
char *get_mark_filename_by_reader(StorageBinLogReader *pReader);
int storage_unlink_mark_file(const char *storage_id);
int storage_rename_mark_file(const char *old_ip_addr, const int old_port, \
const char *new_ip_addr, const int new_port);