sync regenerated appender file

pull/348/head
YuQing 2019-11-12 19:07:49 +08:00
parent 6fb8fe206b
commit 9bc762bffb
5 changed files with 353 additions and 86 deletions

View File

@ -1,5 +1,5 @@
Version 6.02 2019-11-11
Version 6.02 2019-11-12
* get_file_info calculate CRC32 for appender file type
* disk recovery download file to local temp file then rename it
when the local file exists

View File

@ -2180,6 +2180,29 @@ int storage_set_tracker_client_ips(ConnectionInfo *conn,
return 0;
}
int storage_logic_to_local_full_filename(const char *logic_filename,
const int logic_filename_len, int *store_path_index,
char *full_filename, const int filename_size)
{
int result;
int filename_len;
char true_filename[128];
filename_len = logic_filename_len;
if ((result=storage_split_filename_ex(logic_filename,
&filename_len, true_filename, store_path_index)) != 0)
{
return result;
}
if ((result=fdfs_check_data_filename(true_filename, filename_len)) != 0)
{
return result;
}
snprintf(full_filename, filename_size, "%s/data/%s",
g_fdfs_store_paths.paths[*store_path_index], true_filename);
return 0;
}
/*
int write_serialized(int fd, const char *buff, size_t count, const bool bSync)

View File

@ -39,6 +39,10 @@ int storage_set_tracker_client_ips(ConnectionInfo *conn,
int storage_check_and_make_data_path();
int storage_logic_to_local_full_filename(const char *logic_filename,
const int logic_filename_len, int *store_path_index,
char *full_filename, const int filename_size);
#define STORAGE_CHOWN(path, current_uid, current_gid) \
if (!(g_run_by_gid == current_gid && g_run_by_uid == current_uid)) \
{ \

View File

@ -489,32 +489,30 @@ static void storage_sync_truncate_file_done_callback( \
storage_nio_notify(pTask);
}
static int storage_sync_copy_file_rename_filename( \
static int storage_sync_copy_file_rename_filename(
StorageFileContext *pFileContext)
{
char full_filename[MAX_PATH_SIZE + 256];
char true_filename[128];
int filename_len;
int result;
int store_path_index;
filename_len = strlen(pFileContext->fname2log);
if ((result=storage_split_filename_ex(pFileContext->fname2log, \
&filename_len, true_filename, &store_path_index)) != 0)
if ((result=storage_logic_to_local_full_filename(
pFileContext->fname2log, filename_len,
&store_path_index, full_filename,
sizeof(full_filename))) != 0)
{
return result;
}
snprintf(full_filename, sizeof(full_filename), \
"%s/data/%s", g_fdfs_store_paths.paths[store_path_index], \
true_filename);
if (rename(pFileContext->filename, full_filename) != 0)
{
result = errno != 0 ? errno : EPERM;
logWarning("file: "__FILE__", line: %d, " \
"rename %s to %s fail, " \
"errno: %d, error info: %s", __LINE__, \
pFileContext->filename, full_filename, \
logWarning("file: "__FILE__", line: %d, "
"rename %s to %s fail, "
"errno: %d, error info: %s", __LINE__,
pFileContext->filename, full_filename,
result, STRERROR(result));
return result;
}
@ -4212,7 +4210,9 @@ static int storage_server_fetch_one_path_binlog_dealer( \
if (!(record.op_type == STORAGE_OP_TYPE_SOURCE_CREATE_FILE
|| record.op_type == STORAGE_OP_TYPE_REPLICA_CREATE_FILE
|| record.op_type == STORAGE_OP_TYPE_SOURCE_CREATE_LINK
|| record.op_type == STORAGE_OP_TYPE_REPLICA_CREATE_LINK))
|| record.op_type == STORAGE_OP_TYPE_REPLICA_CREATE_LINK
|| record.op_type == STORAGE_OP_TYPE_SOURCE_RENAME_FILE
|| record.op_type == STORAGE_OP_TYPE_REPLICA_RENAME_FILE))
{
continue;
}
@ -4230,8 +4230,8 @@ static int storage_server_fetch_one_path_binlog_dealer( \
}
else
{
snprintf(full_filename, sizeof(full_filename), "%s/data/%s", \
g_fdfs_store_paths.paths[record.store_path_index], \
snprintf(full_filename, sizeof(full_filename), "%s/data/%s",
g_fdfs_store_paths.paths[record.store_path_index],
record.true_filename);
if (lstat(full_filename, &stat_buf) != 0)
{
@ -4241,10 +4241,10 @@ static int storage_server_fetch_one_path_binlog_dealer( \
}
else
{
logError("file: "__FILE__", line: %d, " \
"call stat fail, file: %s, "\
"error no: %d, error info: %s", \
__LINE__, full_filename, \
logError("file: "__FILE__", line: %d, "
"call stat fail, file: %s, "
"error no: %d, error info: %s",
__LINE__, full_filename,
errno, STRERROR(errno));
result = errno != 0 ? errno : EPERM;
break;
@ -4291,11 +4291,20 @@ static int storage_server_fetch_one_path_binlog_dealer( \
}
}
if (record.op_type == STORAGE_OP_TYPE_SOURCE_RENAME_FILE)
{
record.op_type = STORAGE_OP_TYPE_SOURCE_CREATE_FILE;
}
else if (record.op_type == STORAGE_OP_TYPE_REPLICA_RENAME_FILE)
{
record.op_type = STORAGE_OP_TYPE_REPLICA_CREATE_FILE;
}
if (record.op_type == STORAGE_OP_TYPE_SOURCE_CREATE_FILE
|| record.op_type == STORAGE_OP_TYPE_REPLICA_CREATE_FILE)
{
pOutBuff += sprintf(pOutBuff, "%d %c %s\n", \
(int)record.timestamp, \
pOutBuff += sprintf(pOutBuff, "%d %c %s\n",
(int)record.timestamp,
record.op_type, record.filename);
}
else
@ -4344,7 +4353,7 @@ static int storage_server_fetch_one_path_binlog_dealer( \
src_filename + base_path_len + 6);
}
if (pTask->size - (pOutBuff - pTask->data) < \
if (pTask->size - (pOutBuff - pTask->data) <
STORAGE_BINLOG_LINE_SIZE + FDFS_PROTO_PKG_LEN_SIZE)
{
break;
@ -4873,7 +4882,7 @@ static void calc_crc32_done_callback_for_regenerate(
path.store_path_index, filename);
sprintf(binlog_msg, "%s %s",
pFileContext->fname2log, return_filename);
return_filename, pFileContext->fname2log);
result = storage_binlog_write(
pFileContext->timestamp2log,
STORAGE_OP_TYPE_SOURCE_RENAME_FILE,
@ -5223,7 +5232,6 @@ static int storage_do_truncate_file(struct fast_task_info *pTask)
StorageFileContext *pFileContext;
char *p;
char appender_filename[128];
char true_filename[128];
char decode_buff[64];
struct stat stat_buf;
int appender_filename_len;
@ -5285,18 +5293,14 @@ static int storage_do_truncate_file(struct fast_task_info *pTask)
STORAGE_ACCESS_STRCPY_FNAME2LOG(appender_filename, \
appender_filename_len, pClientInfo);
if ((result=storage_split_filename_ex(appender_filename, \
&filename_len, true_filename, &store_path_index)) != 0)
{
return result;
}
if ((result=fdfs_check_data_filename(true_filename, filename_len)) != 0)
if ((result=storage_logic_to_local_full_filename(
appender_filename, filename_len,
&store_path_index, pFileContext->filename,
sizeof(pFileContext->filename))) != 0)
{
return result;
}
snprintf(pFileContext->filename, sizeof(pFileContext->filename), \
"%s/data/%s", g_fdfs_store_paths.paths[store_path_index], true_filename);
if (lstat(pFileContext->filename, &stat_buf) == 0)
{
if (!S_ISREG(stat_buf.st_mode))
@ -5376,9 +5380,6 @@ static int storage_do_truncate_file(struct fast_task_info *pTask)
pFileContext->calc_crc32 = false;
pFileContext->calc_file_hash = false;
snprintf(pFileContext->filename, sizeof(pFileContext->filename), \
"%s/data/%s", g_fdfs_store_paths.paths[store_path_index], true_filename);
pFileContext->sync_flag = STORAGE_OP_TYPE_SOURCE_TRUNCATE_FILE;
pFileContext->timestamp2log = pFileContext->extra_info.upload.start_time;
pFileContext->extra_info.upload.file_type = _FILE_TYPE_APPENDER;
@ -5968,7 +5969,6 @@ static int storage_sync_append_file(struct fast_task_info *pTask)
TaskDealFunc deal_func;
char *p;
char group_name[FDFS_GROUP_NAME_MAX_LEN + 1];
char true_filename[128];
char filename[128];
bool need_write_file;
int filename_len;
@ -6064,19 +6064,13 @@ static int storage_sync_append_file(struct fast_task_info *pTask)
*(filename + filename_len) = '\0';
p += filename_len;
if ((result=storage_split_filename_ex(filename, \
&filename_len, true_filename, &store_path_index)) != 0)
if ((result=storage_logic_to_local_full_filename(
filename, filename_len,
&store_path_index, pFileContext->filename,
sizeof(pFileContext->filename))) != 0)
{
return result;
}
if ((result=fdfs_check_data_filename(true_filename, filename_len)) != 0)
{
return result;
}
snprintf(pFileContext->filename, sizeof(pFileContext->filename), \
"%s/data/%s", g_fdfs_store_paths.paths[store_path_index], \
true_filename);
if (lstat(pFileContext->filename, &stat_buf) != 0)
{
@ -6196,7 +6190,6 @@ static int storage_sync_modify_file(struct fast_task_info *pTask)
TaskDealFunc deal_func;
char *p;
char group_name[FDFS_GROUP_NAME_MAX_LEN + 1];
char true_filename[128];
char filename[128];
bool need_write_file;
int filename_len;
@ -6293,19 +6286,13 @@ static int storage_sync_modify_file(struct fast_task_info *pTask)
*(filename + filename_len) = '\0';
p += filename_len;
if ((result=storage_split_filename_ex(filename, \
&filename_len, true_filename, &store_path_index)) != 0)
if ((result=storage_logic_to_local_full_filename(
filename, filename_len,
&store_path_index, pFileContext->filename,
sizeof(pFileContext->filename))) != 0)
{
return result;
}
if ((result=fdfs_check_data_filename(true_filename, filename_len)) != 0)
{
return result;
}
snprintf(pFileContext->filename, sizeof(pFileContext->filename), \
"%s/data/%s", g_fdfs_store_paths.paths[store_path_index], \
true_filename);
if (lstat(pFileContext->filename, &stat_buf) != 0)
{
@ -6396,7 +6383,6 @@ static int storage_sync_truncate_file(struct fast_task_info *pTask)
StorageFileContext *pFileContext;
char *p;
char group_name[FDFS_GROUP_NAME_MAX_LEN + 1];
char true_filename[128];
char filename[128];
int filename_len;
int64_t nInPackLen;
@ -6494,19 +6480,13 @@ static int storage_sync_truncate_file(struct fast_task_info *pTask)
*(filename + filename_len) = '\0';
p += filename_len;
if ((result=storage_split_filename_ex(filename, \
&filename_len, true_filename, &store_path_index)) != 0)
if ((result=storage_logic_to_local_full_filename(
filename, filename_len,
&store_path_index, pFileContext->filename,
sizeof(pFileContext->filename))) != 0)
{
return result;
}
if ((result=fdfs_check_data_filename(true_filename, filename_len)) != 0)
{
return result;
}
snprintf(pFileContext->filename, sizeof(pFileContext->filename), \
"%s/data/%s", g_fdfs_store_paths.paths[store_path_index], \
true_filename);
if (lstat(pFileContext->filename, &stat_buf) != 0)
{
@ -6905,6 +6885,135 @@ static int storage_sync_link_file(struct fast_task_info *pTask)
return STORAGE_STATUE_DEAL_FILE;
}
static int storage_sync_rename_file(struct fast_task_info *pTask)
{
StorageClientInfo *pClientInfo;
StorageFileContext *pFileContext;
char *p;
char group_name[FDFS_GROUP_NAME_MAX_LEN + 1];
char dest_filename[128];
char dest_full_filename[MAX_PATH_SIZE];
char src_full_filename[MAX_PATH_SIZE];
char src_filename[128];
int64_t nInPackLen;
int dest_filename_len;
int src_filename_len;
int result;
int dest_store_path_index;
int src_store_path_index;
pClientInfo = (StorageClientInfo *)pTask->arg;
pFileContext = &(pClientInfo->file_context);
nInPackLen = pClientInfo->total_length - sizeof(TrackerHeader);
if (nInPackLen <= 2 * FDFS_PROTO_PKG_LEN_SIZE +
4 + FDFS_GROUP_NAME_MAX_LEN)
{
logError("file: "__FILE__", line: %d, "
"client ip: %s, package size "
"%"PRId64" is not correct, "
"expect length > %d", __LINE__,
pTask->client_ip, nInPackLen,
2 * FDFS_PROTO_PKG_LEN_SIZE +
4 + FDFS_GROUP_NAME_MAX_LEN);
return EINVAL;
}
p = pTask->data + sizeof(TrackerHeader);
dest_filename_len = buff2long(p);
p += FDFS_PROTO_PKG_LEN_SIZE;
src_filename_len = buff2long(p);
p += FDFS_PROTO_PKG_LEN_SIZE;
if (dest_filename_len < 0 || dest_filename_len >= sizeof(dest_filename))
{
logError("file: "__FILE__", line: %d, "
"client ip: %s, in request pkg, "
"filename length: %d is invalid, "
"which < 0 or >= %d", __LINE__, pTask->client_ip,
dest_filename_len, (int)sizeof(dest_filename));
return EINVAL;
}
pFileContext->timestamp2log = buff2int(p);
p += 4;
memcpy(group_name, p, FDFS_GROUP_NAME_MAX_LEN);
*(group_name + FDFS_GROUP_NAME_MAX_LEN) = '\0';
p += FDFS_GROUP_NAME_MAX_LEN;
if (strcmp(group_name, g_group_name) != 0)
{
logError("file: "__FILE__", line: %d, "
"client ip:%s, group_name: %s "
"not correct, should be: %s",
__LINE__, pTask->client_ip,
group_name, g_group_name);
return EINVAL;
}
if (nInPackLen != 2 * FDFS_PROTO_PKG_LEN_SIZE + 4 +
FDFS_GROUP_NAME_MAX_LEN + dest_filename_len + src_filename_len)
{
logError("file: "__FILE__", line: %d, "
"client ip: %s, in request pkg, "
"pgk length: %"PRId64" != bytes: %d",
__LINE__, pTask->client_ip,
nInPackLen, 2 * FDFS_PROTO_PKG_LEN_SIZE +
FDFS_GROUP_NAME_MAX_LEN + dest_filename_len +
src_filename_len);
return EINVAL;
}
memcpy(dest_filename, p, dest_filename_len);
*(dest_filename + dest_filename_len) = '\0';
p += dest_filename_len;
memcpy(src_filename, p, src_filename_len);
*(src_filename + src_filename_len) = '\0';
if ((result=storage_logic_to_local_full_filename(
dest_filename, dest_filename_len,
&dest_store_path_index, dest_full_filename,
sizeof(dest_full_filename))) != 0)
{
return result;
}
if (access(dest_full_filename, F_OK) == 0)
{
logDebug("file: "__FILE__", line: %d, "
"client ip: %s, dest file: %s "
"already exist", __LINE__,
pTask->client_ip, dest_full_filename);
return EEXIST;
}
if ((result=storage_logic_to_local_full_filename(
src_filename, src_filename_len,
&src_store_path_index, src_full_filename,
sizeof(src_full_filename))) != 0)
{
return result;
}
if (rename(src_full_filename, dest_full_filename) != 0)
{
result = errno != 0 ? errno : EPERM;
logWarning("file: "__FILE__", line: %d, "
"client ip: %s, rename %s to %s fail, "
"errno: %d, error info: %s", __LINE__,
pTask->client_ip, src_full_filename,
dest_full_filename, result, STRERROR(result));
return result;
}
return storage_binlog_write_ex(pFileContext->timestamp2log,
STORAGE_OP_TYPE_REPLICA_RENAME_FILE,
dest_filename, src_filename);
}
/**
pkg format:
Header
@ -8297,6 +8406,9 @@ int storage_deal_task(struct fast_task_info *pTask)
case STORAGE_PROTO_CMD_SYNC_TRUNCATE_FILE:
result = storage_sync_truncate_file(pTask);
break;
case STORAGE_PROTO_CMD_SYNC_RENAME_FILE:
result = storage_sync_rename_file(pTask);
break;
case STORAGE_PROTO_CMD_SYNC_CREATE_LINK:
result = storage_sync_link_file(pTask);
break;

View File

@ -105,8 +105,8 @@ static int storage_sync_copy_file(ConnectionInfo *pStorageServer, \
int result;
bool need_sync_file;
if ((result=trunk_file_stat(pRecord->store_path_index, \
pRecord->true_filename, pRecord->true_filename_len, \
if ((result=trunk_file_stat(pRecord->store_path_index,
pRecord->true_filename, pRecord->true_filename_len,
&stat_buf, &trunkInfo, &trunkHeader)) != 0)
{
if (result == ENOENT)
@ -138,7 +138,7 @@ static int storage_sync_copy_file(ConnectionInfo *pStorageServer, \
{
FDFSFileInfo file_info;
result = storage_query_file_info_ex(NULL, \
pStorageServer, g_group_name, \
pStorageServer, g_group_name, \
pRecord->filename, &file_info, true);
if (result == 0)
{
@ -312,11 +312,11 @@ static int storage_sync_modify_file(ConnectionInfo *pStorageServer, \
StorageBinLogReader *pReader, StorageBinLogRecord *pRecord, \
const char cmd)
{
#define FIELD_COUNT 3
#define SYNC_MODIFY_FIELD_COUNT 3
TrackerHeader *pHeader;
char *p;
char *pBuff;
char *fields[FIELD_COUNT];
char *fields[SYNC_MODIFY_FIELD_COUNT];
char full_filename[MAX_PATH_SIZE];
char out_buff[sizeof(TrackerHeader)+FDFS_GROUP_NAME_MAX_LEN+256];
char in_buff[1];
@ -328,8 +328,8 @@ static int storage_sync_modify_file(ConnectionInfo *pStorageServer, \
int result;
int count;
if ((count=splitEx(pRecord->filename, ' ', fields, FIELD_COUNT)) \
!= FIELD_COUNT)
if ((count=splitEx(pRecord->filename, ' ', fields, SYNC_MODIFY_FIELD_COUNT))
!= SYNC_MODIFY_FIELD_COUNT)
{
logError("file: "__FILE__", line: %d, " \
"the format of binlog not correct, filename: %s", \
@ -483,11 +483,11 @@ filename bytes : filename
static int storage_sync_truncate_file(ConnectionInfo *pStorageServer, \
StorageBinLogReader *pReader, StorageBinLogRecord *pRecord)
{
#define FIELD_COUNT 3
#define SYNC_TRUNCATE_FIELD_COUNT 3
TrackerHeader *pHeader;
char *p;
char *pBuff;
char *fields[FIELD_COUNT];
char *fields[SYNC_TRUNCATE_FIELD_COUNT];
char full_filename[MAX_PATH_SIZE];
char out_buff[sizeof(TrackerHeader)+FDFS_GROUP_NAME_MAX_LEN+256];
char in_buff[1];
@ -498,8 +498,8 @@ static int storage_sync_truncate_file(ConnectionInfo *pStorageServer, \
int result;
int count;
if ((count=splitEx(pRecord->filename, ' ', fields, FIELD_COUNT)) \
!= FIELD_COUNT)
if ((count=splitEx(pRecord->filename, ' ', fields,
SYNC_TRUNCATE_FIELD_COUNT)) != SYNC_TRUNCATE_FIELD_COUNT)
{
logError("file: "__FILE__", line: %d, " \
"the format of binlog not correct, filename: %s", \
@ -968,6 +968,123 @@ static int storage_sync_link_file(ConnectionInfo *pStorageServer, \
return result;
}
/**
8 bytes: dest filename length
8 bytes: source filename length
4 bytes: source op timestamp
FDFS_GROUP_NAME_MAX_LEN bytes: group_name
dest filename length: dest filename
source filename length: source filename
**/
static int storage_sync_rename_file(ConnectionInfo *pStorageServer,
StorageBinLogReader *pReader, StorageBinLogRecord *pRecord)
{
TrackerHeader *pHeader;
int result;
char out_buff[sizeof(TrackerHeader) + 2 * FDFS_PROTO_PKG_LEN_SIZE +
4 + FDFS_GROUP_NAME_MAX_LEN + 256];
char in_buff[1];
int out_body_len;
int64_t in_bytes;
char *pBuff;
char full_filename[MAX_PATH_SIZE];
struct stat stat_buf;
if (pRecord->op_type == STORAGE_OP_TYPE_REPLICA_RENAME_FILE)
{
return storage_sync_copy_file(pStorageServer,
pReader, pRecord,
STORAGE_PROTO_CMD_SYNC_CREATE_FILE);
}
snprintf(full_filename, sizeof(full_filename), "%s/data/%s",
g_fdfs_store_paths.paths[pRecord->store_path_index],
pRecord->true_filename);
if (lstat(full_filename, &stat_buf) != 0)
{
if (errno == ENOENT)
{
logWarning("file: "__FILE__", line: %d, "
"sync file rename, file: %s not exists, "
"maybe deleted later?", __LINE__, full_filename);
return 0;
}
else
{
result = errno != 0 ? errno : EPERM;
logError("file: "__FILE__", line: %d, "
"call stat fail, file: %s, "
"error no: %d, error info: %s",
__LINE__, full_filename,
result, STRERROR(result));
return result;
}
}
pHeader = (TrackerHeader *)out_buff;
memset(out_buff, 0, sizeof(out_buff));
long2buff(pRecord->filename_len, out_buff + sizeof(TrackerHeader));
long2buff(pRecord->src_filename_len, out_buff + sizeof(TrackerHeader) +
FDFS_PROTO_PKG_LEN_SIZE);
int2buff(pRecord->timestamp, out_buff + sizeof(TrackerHeader) +
2 * FDFS_PROTO_PKG_LEN_SIZE);
sprintf(out_buff + sizeof(TrackerHeader) + 2 * FDFS_PROTO_PKG_LEN_SIZE
+ 4, "%s", g_group_name);
memcpy(out_buff + sizeof(TrackerHeader) + 2 * FDFS_PROTO_PKG_LEN_SIZE
+ 4 + FDFS_GROUP_NAME_MAX_LEN,
pRecord->filename, pRecord->filename_len);
memcpy(out_buff + sizeof(TrackerHeader) + 2 * FDFS_PROTO_PKG_LEN_SIZE
+ 4 + FDFS_GROUP_NAME_MAX_LEN + pRecord->filename_len,
pRecord->src_filename, pRecord->src_filename_len);
out_body_len = 2 * FDFS_PROTO_PKG_LEN_SIZE + 4 +
FDFS_GROUP_NAME_MAX_LEN + pRecord->filename_len +
pRecord->src_filename_len;
long2buff(out_body_len, pHeader->pkg_len);
pHeader->cmd = STORAGE_PROTO_CMD_SYNC_RENAME_FILE;
if ((result=tcpsenddata_nb(pStorageServer->sock, out_buff,
sizeof(TrackerHeader) + out_body_len,
g_fdfs_network_timeout)) != 0)
{
logError("FILE: "__FILE__", line: %d, "
"send data to storage server %s:%d fail, "
"errno: %d, error info: %s",
__LINE__, pStorageServer->ip_addr,
pStorageServer->port, result, STRERROR(result));
return result;
}
pBuff = in_buff;
result = fdfs_recv_response(pStorageServer, &pBuff, 0, &in_bytes);
if (result != 0)
{
if (result == ENOENT)
{
return storage_sync_copy_file(pStorageServer,
pReader, pRecord,
STORAGE_PROTO_CMD_SYNC_CREATE_FILE);
}
else if (result == EEXIST)
{
logDebug("file: "__FILE__", line: %d, "
"storage server ip: %s:%d, data file: %s "
"already exists", __LINE__, pStorageServer->ip_addr,
pStorageServer->port, pRecord->filename);
return 0;
}
else
{
logError("file: "__FILE__", line: %d, "
"fdfs_recv_response fail, result: %d",
__LINE__, result);
}
}
return result;
}
#define STARAGE_CHECK_IF_NEED_SYNC_OLD(pReader, pRecord) \
if ((!pReader->need_sync_old) || pReader->sync_old_done || \
(pRecord->timestamp > pReader->until_timestamp)) \
@ -1022,6 +1139,10 @@ static int storage_sync_data(StorageBinLogReader *pReader, \
result = storage_sync_truncate_file(pStorageServer, \
pReader, pRecord);
break;
case STORAGE_OP_TYPE_SOURCE_RENAME_FILE:
result = storage_sync_rename_file(pStorageServer,
pReader, pRecord);
break;
case STORAGE_OP_TYPE_SOURCE_CREATE_LINK:
result = storage_sync_link_file(pStorageServer, \
pRecord);
@ -1048,6 +1169,11 @@ static int storage_sync_data(StorageBinLogReader *pReader, \
result = storage_sync_link_file(pStorageServer, \
pRecord);
break;
case STORAGE_OP_TYPE_REPLICA_RENAME_FILE:
STARAGE_CHECK_IF_NEED_SYNC_OLD(pReader, pRecord)
result = storage_sync_rename_file(pStorageServer,
pReader, pRecord);
break;
case STORAGE_OP_TYPE_REPLICA_APPEND_FILE:
return 0;
case STORAGE_OP_TYPE_REPLICA_MODIFY_FILE:
@ -2560,8 +2686,10 @@ int storage_binlog_read(StorageBinLogReader *pReader, \
memcpy(pRecord->filename, cols[2], pRecord->filename_len);
*(pRecord->filename + pRecord->filename_len) = '\0';
if (pRecord->op_type == STORAGE_OP_TYPE_SOURCE_CREATE_LINK || \
pRecord->op_type == STORAGE_OP_TYPE_REPLICA_CREATE_LINK)
if (pRecord->op_type == STORAGE_OP_TYPE_SOURCE_CREATE_LINK ||
pRecord->op_type == STORAGE_OP_TYPE_REPLICA_CREATE_LINK ||
pRecord->op_type == STORAGE_OP_TYPE_SOURCE_RENAME_FILE ||
pRecord->op_type == STORAGE_OP_TYPE_REPLICA_RENAME_FILE)
{
char *p;
@ -2573,14 +2701,14 @@ int storage_binlog_read(StorageBinLogReader *pReader, \
}
else
{
pRecord->src_filename_len = pRecord->filename_len - \
pRecord->src_filename_len = pRecord->filename_len -
(p - pRecord->filename) - 1;
pRecord->filename_len = p - pRecord->filename;
*p = '\0';
memcpy(pRecord->src_filename, p + 1, \
memcpy(pRecord->src_filename, p + 1,
pRecord->src_filename_len);
*(pRecord->src_filename + \
*(pRecord->src_filename +
pRecord->src_filename_len) = '\0';
}
}
@ -2591,8 +2719,8 @@ int storage_binlog_read(StorageBinLogReader *pReader, \
}
pRecord->true_filename_len = pRecord->filename_len;
if ((result=storage_split_filename_ex(pRecord->filename, \
&pRecord->true_filename_len, pRecord->true_filename, \
if ((result=storage_split_filename_ex(pRecord->filename,
&pRecord->true_filename_len, pRecord->true_filename,
&pRecord->store_path_index)) != 0)
{
return result;