support regenerate filename for appender file

pull/348/head
YuQing 2019-11-10 20:38:36 +08:00
parent 9f0a914c93
commit 9c0bbce9df
13 changed files with 652 additions and 230 deletions

View File

@ -1,8 +1,10 @@
Version 6.02 2019-11-07
Version 6.02 2019-11-10
* get_file_info calculate CRC32 for appender file type
* disk recovery download file to local temp file then rename it
when the local file exists
* support regenerate filename for appender file
NOTE: the regenerated file will be a normal file!
Version 6.01 2019-10-25
* compress and uncompress binlog file by gzip when need,

View File

@ -39,7 +39,7 @@ ALL_OBJS = $(STATIC_OBJS) $(FDFS_SHARED_OBJS)
ALL_PRGS = fdfs_monitor fdfs_test fdfs_test1 fdfs_crc32 fdfs_upload_file \
fdfs_download_file fdfs_delete_file fdfs_file_info \
fdfs_appender_test fdfs_appender_test1 fdfs_append_file \
fdfs_upload_appender
fdfs_upload_appender fdfs_regenerate_filename
STATIC_LIBS = libfdfsclient.a

View File

@ -16,6 +16,8 @@
#define _CLIENT_FUNC_H_
typedef struct {
short file_type;
bool get_from_server;
time_t create_timestamp;
int crc32;
int source_id; //source storage id

View File

@ -19,6 +19,7 @@
int main(int argc, char *argv[])
{
char *conf_filename;
const char *file_type_str;
char file_id[128];
int result;
FDFSFileInfo file_info;
@ -44,7 +45,7 @@ int main(int argc, char *argv[])
result = fdfs_get_file_info_ex1(file_id, true, &file_info);
if (result != 0)
{
printf("query file info fail, " \
fprintf(stderr, "query file info fail, " \
"error no: %d, error info: %s\n", \
result, STRERROR(result));
}
@ -52,6 +53,25 @@ int main(int argc, char *argv[])
{
char szDatetime[32];
switch (file_info.file_type)
{
case FDFS_FILE_TYPE_NORMAL:
file_type_str = "normal";
break;
case FDFS_FILE_TYPE_SLAVE:
file_type_str = "slave";
break;
case FDFS_FILE_TYPE_APPENDER:
file_type_str = "appender";
break;
default:
file_type_str = "unkown";
break;
}
printf("GET FROM SERVER: %s\n\n",
file_info.get_from_server ? "true" : "false");
printf("file type: %s\n", file_type_str);
printf("source storage id: %d\n", file_info.source_id);
printf("source ip address: %s\n", file_info.source_ip_addr);
printf("file create timestamp: %s\n", formatDatetime(

View File

@ -0,0 +1,68 @@
/**
* Copyright (C) 2008 Happy Fish / YuQing
*
* FastDFS may be copied only under the terms of the GNU General
* Public License V3, which may be found in the FastDFS source kit.
* Please visit the FastDFS Home Page http://www.csource.org/ for more detail.
**/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <string.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>
#include "fdfs_client.h"
#include "fastcommon/logger.h"
int main(int argc, char *argv[])
{
char *conf_filename;
ConnectionInfo *pTrackerServer;
int result;
char appender_file_id[128];
char new_file_id[128];
if (argc < 3)
{
fprintf(stderr, "regenerate filename for the appender file.\n"
"NOTE: the regenerated file will be a normal file!\n"
"Usage: %s <config_file> <appender_file_id>\n",
argv[0]);
return 1;
}
log_init();
g_log_context.log_level = LOG_ERR;
conf_filename = argv[1];
if ((result=fdfs_client_init(conf_filename)) != 0)
{
return result;
}
pTrackerServer = tracker_get_connection();
if (pTrackerServer == NULL)
{
fdfs_client_destroy();
return errno != 0 ? errno : ECONNREFUSED;
}
snprintf(appender_file_id, sizeof(appender_file_id), "%s", argv[2]);
if ((result=storage_regenerate_appender_filename1(pTrackerServer,
NULL, appender_file_id, new_file_id)) != 0)
{
fprintf(stderr, "regenerate file %s fail, "
"error no: %d, error info: %s\n",
appender_file_id, result, STRERROR(result));
return result;
}
printf("%s\n", new_file_id);
tracker_close_connection_ex(pTrackerServer, true);
fdfs_client_destroy();
return result;
}

View File

@ -35,20 +35,20 @@ static struct base64_context the_base64_context;
static int the_base64_context_inited = 0;
#define FDFS_SPLIT_GROUP_NAME_AND_FILENAME(file_id) \
char new_file_id[FDFS_GROUP_NAME_MAX_LEN + 128]; \
char in_file_id[FDFS_GROUP_NAME_MAX_LEN + 128]; \
char *group_name; \
char *filename; \
char *pSeperator; \
\
snprintf(new_file_id, sizeof(new_file_id), "%s", file_id); \
pSeperator = strchr(new_file_id, FDFS_FILE_ID_SEPERATOR); \
snprintf(in_file_id, sizeof(in_file_id), "%s", file_id); \
pSeperator = strchr(in_file_id, FDFS_FILE_ID_SEPERATOR); \
if (pSeperator == NULL) \
{ \
return EINVAL; \
} \
\
*pSeperator = '\0'; \
group_name = new_file_id; \
group_name = in_file_id; \
filename = pSeperator + 1; \
#define storage_get_read_connection(pTrackerServer, \
@ -2203,8 +2203,21 @@ int fdfs_get_file_info_ex(const char *group_name, const char *remote_filename, \
pFileInfo->create_timestamp = buff2int(buff + sizeof(int));
pFileInfo->file_size = buff2long(buff + sizeof(int) * 2);
if (IS_SLAVE_FILE(filename_len, pFileInfo->file_size) || \
IS_APPENDER_FILE(pFileInfo->file_size) || \
if (IS_APPENDER_FILE(pFileInfo->file_size))
{
pFileInfo->file_type = FDFS_FILE_TYPE_APPENDER;
}
else if (IS_SLAVE_FILE(filename_len, pFileInfo->file_size))
{
pFileInfo->file_type = FDFS_FILE_TYPE_SLAVE;
}
else
{
pFileInfo->file_type = FDFS_FILE_TYPE_NORMAL;
}
if (pFileInfo->file_type == FDFS_FILE_TYPE_SLAVE ||
pFileInfo->file_type == FDFS_FILE_TYPE_APPENDER ||
(*(pFileInfo->source_ip_addr) == '\0' && get_from_server))
{ //slave file or appender file
if (get_from_server)
@ -2218,21 +2231,24 @@ int fdfs_get_file_info_ex(const char *group_name, const char *remote_filename, \
return result;
}
result = storage_query_file_info(conn, \
result = storage_query_file_info(conn,
NULL, group_name, remote_filename, pFileInfo);
tracker_close_connection_ex(conn, result != 0 && \
tracker_close_connection_ex(conn, result != 0 &&
result != ENOENT);
pFileInfo->get_from_server = true;
return result;
}
else
{
pFileInfo->get_from_server = false;
pFileInfo->file_size = -1;
return 0;
}
}
else //master file (normal file)
{
pFileInfo->get_from_server = false;
if ((pFileInfo->file_size >> 63) != 0)
{
pFileInfo->file_size &= 0xFFFFFFFF; //low 32 bits is file size
@ -2352,3 +2368,118 @@ int storage_truncate_file(ConnectionInfo *pTrackerServer, \
return result;
}
int storage_regenerate_appender_filename(ConnectionInfo *pTrackerServer,
ConnectionInfo *pStorageServer, const char *group_name,
const char *appender_filename, char *new_group_name,
char *new_remote_filename)
{
TrackerHeader *pHeader;
int result;
char out_buff[512];
char in_buff[256];
char *p;
char *pInBuff;
int64_t in_bytes;
ConnectionInfo storageServer;
bool new_connection;
int appender_filename_len;
appender_filename_len = strlen(appender_filename);
if ((result=storage_get_update_connection(pTrackerServer,
&pStorageServer, group_name, appender_filename,
&storageServer, &new_connection)) != 0)
{
return result;
}
do
{
pHeader = (TrackerHeader *)out_buff;
p = out_buff + sizeof(TrackerHeader);
snprintf(p, sizeof(out_buff) - sizeof(TrackerHeader),
"%s", group_name);
p += FDFS_GROUP_NAME_MAX_LEN;
memcpy(p, appender_filename, appender_filename_len);
p += appender_filename_len;
long2buff((p - out_buff) - sizeof(TrackerHeader),
pHeader->pkg_len);
pHeader->cmd = STORAGE_PROTO_CMD_REGENERATE_APPENDER_FILENAME;
pHeader->status = 0;
if ((result=tcpsenddata_nb(pStorageServer->sock, out_buff,
p - out_buff, g_fdfs_network_timeout)) != 0)
{
logError("file: "__FILE__", line: %d, "
"send data to storage server %s:%d fail, "
"errno: %d, error info: %s", __LINE__,
pStorageServer->ip_addr, pStorageServer->port,
result, STRERROR(result));
break;
}
pInBuff = in_buff;
if ((result=fdfs_recv_response(pStorageServer,
&pInBuff, sizeof(in_buff), &in_bytes)) != 0)
{
logError("file: "__FILE__", line: %d, "
"fdfs_recv_response fail, result: %d",
__LINE__, result);
break;
}
if (in_bytes <= FDFS_GROUP_NAME_MAX_LEN)
{
logError("file: "__FILE__", line: %d, "
"storage server %s:%d response data "
"length: %"PRId64" is invalid, "
"should > %d", __LINE__,
pStorageServer->ip_addr, pStorageServer->port,
in_bytes, FDFS_GROUP_NAME_MAX_LEN);
result = EINVAL;
break;
}
in_buff[in_bytes] = '\0';
memcpy(new_group_name, in_buff, FDFS_GROUP_NAME_MAX_LEN);
new_group_name[FDFS_GROUP_NAME_MAX_LEN] = '\0';
memcpy(new_remote_filename, in_buff + FDFS_GROUP_NAME_MAX_LEN,
in_bytes - FDFS_GROUP_NAME_MAX_LEN + 1);
} while (0);
if (new_connection)
{
tracker_close_connection_ex(pStorageServer, result != 0);
}
return result;
}
int storage_regenerate_appender_filename1(ConnectionInfo *pTrackerServer,
ConnectionInfo *pStorageServer, const char *appender_file_id,
char *new_file_id)
{
int result;
char new_group_name[FDFS_GROUP_NAME_MAX_LEN + 1];
char new_remote_filename[128];
FDFS_SPLIT_GROUP_NAME_AND_FILENAME(appender_file_id)
result = storage_regenerate_appender_filename(pTrackerServer,
pStorageServer, group_name, filename,
new_group_name, new_remote_filename);
if (result == 0)
{
sprintf(new_file_id, "%s%c%s", new_group_name,
FDFS_FILE_ID_SEPERATOR, new_remote_filename);
}
else
{
new_file_id[0] = '\0';
}
return result;
}

View File

@ -563,6 +563,23 @@ int fdfs_get_file_info_ex(const char *group_name, const char *remote_filename, \
const bool get_from_server, FDFSFileInfo *pFileInfo);
/**
* regenerate normal filename for appender file
* Note: the appender file will change to normal file
* params:
* pTrackerServer: the tracker server
* pStorageServer: the storage server
* group_name: the group name
* appender_filename: the appender filename
* new_group_name: return the new group name
* new_remote_filename: return the new filename
* return: 0 success, !=0 fail, return the error code
**/
int storage_regenerate_appender_filename(ConnectionInfo *pTrackerServer,
ConnectionInfo *pStorageServer, const char *group_name,
const char *appender_filename, char *new_group_name,
char *new_remote_filename);
#ifdef __cplusplus
}
#endif

View File

@ -524,6 +524,22 @@ int fdfs_get_file_info_ex1(const char *file_id, const bool get_from_server, \
int storage_file_exist1(ConnectionInfo *pTrackerServer, \
ConnectionInfo *pStorageServer, \
const char *file_id);
/**
* regenerate normal filename for appender file
* Note: the appender file will change to normal file
* params:
* pTrackerServer: the tracker server
* pStorageServer: the storage server
* group_name: the group name
* appender_file_id: the appender file id
* file_id: regenerated file id return by storage server
* return: 0 success, !=0 fail, return the error code
**/
int storage_regenerate_appender_filename1(ConnectionInfo *pTrackerServer,
ConnectionInfo *pStorageServer, const char *appender_file_id,
char *new_file_id);
#ifdef __cplusplus
}
#endif

View File

@ -23,7 +23,7 @@
int g_fdfs_connect_timeout = DEFAULT_CONNECT_TIMEOUT;
int g_fdfs_network_timeout = DEFAULT_NETWORK_TIMEOUT;
char g_fdfs_base_path[MAX_PATH_SIZE] = {'/', 't', 'm', 'p', '\0'};
Version g_fdfs_version = {6, 1};
Version g_fdfs_version = {6, 2};
bool g_use_connection_pool = false;
ConnectionPool g_connection_pool;
int g_connection_pool_max_idle_time = 3600;

View File

@ -59,6 +59,7 @@
#define ACCESS_LOG_ACTION_APPEND_FILE "append"
#define ACCESS_LOG_ACTION_TRUNCATE_FILE "truncate"
#define ACCESS_LOG_ACTION_QUERY_FILE "status"
#define ACCESS_LOG_ACTION_RENAME_FILE "rename"
typedef struct
{
@ -2111,9 +2112,9 @@ void storage_get_store_path(const char *filename, const int filename_len, \
} while (0)
static int storage_gen_filename(StorageClientInfo *pClientInfo, \
const int64_t file_size, const int crc32, \
const char *szFormattedExt, const int ext_name_len, \
static int storage_gen_filename(StorageClientInfo *pClientInfo,
const int64_t file_size, const int crc32,
const char *szFormattedExt, const int ext_name_len,
const time_t timestamp, char *filename, int *filename_len)
{
char buff[sizeof(int) * 5];
@ -2143,18 +2144,18 @@ static int storage_gen_filename(StorageClientInfo *pClientInfo, \
{
int sub_path_high;
int sub_path_low;
storage_get_store_path(encoded, *filename_len, \
storage_get_store_path(encoded, *filename_len,
&sub_path_high, &sub_path_low);
pTrunkInfo->path.sub_path_high = sub_path_high;
pTrunkInfo->path.sub_path_low = sub_path_low;
pClientInfo->file_context.extra_info.upload. \
pClientInfo->file_context.extra_info.upload.
if_sub_path_alloced = true;
}
len = sprintf(filename, FDFS_STORAGE_DATA_DIR_FORMAT"/" \
FDFS_STORAGE_DATA_DIR_FORMAT"/", \
len = sprintf(filename, FDFS_STORAGE_DATA_DIR_FORMAT"/"
FDFS_STORAGE_DATA_DIR_FORMAT"/",
pTrunkInfo->path.sub_path_high,
pTrunkInfo->path.sub_path_low);
memcpy(filename+len, encoded, *filename_len);
@ -2187,7 +2188,7 @@ static int storage_sort_metadata_buff(char *meta_buff, const int meta_size)
return 0;
}
static void storage_format_ext_name(const char *file_ext_name, \
static void storage_format_ext_name(const char *file_ext_name,
char *szFormattedExt)
{
int i;
@ -2220,9 +2221,9 @@ static void storage_format_ext_name(const char *file_ext_name, \
*p = '\0';
}
static int storage_get_filename(StorageClientInfo *pClientInfo, \
const int start_time, const int64_t file_size, const int crc32, \
const char *szFormattedExt, char *filename, \
static int storage_get_filename(StorageClientInfo *pClientInfo,
const int start_time, const int64_t file_size, const int crc32,
const char *szFormattedExt, char *filename,
int *filename_len, char *full_filename)
{
int i;
@ -2233,14 +2234,14 @@ static int storage_get_filename(StorageClientInfo *pClientInfo, \
trunk_info.path.store_path_index;
for (i=0; i<10; i++)
{
if ((result=storage_gen_filename(pClientInfo, file_size, \
crc32, szFormattedExt, FDFS_FILE_EXT_NAME_MAX_LEN+1, \
if ((result=storage_gen_filename(pClientInfo, file_size,
crc32, szFormattedExt, FDFS_FILE_EXT_NAME_MAX_LEN + 1,
start_time, filename, filename_len)) != 0)
{
return result;
}
sprintf(full_filename, "%s/data/%s", \
sprintf(full_filename, "%s/data/%s",
g_fdfs_store_paths.paths[store_path_index], filename);
if (!fileExists(full_filename))
{
@ -2252,7 +2253,7 @@ static int storage_get_filename(StorageClientInfo *pClientInfo, \
if (*full_filename == '\0')
{
logError("file: "__FILE__", line: %d, " \
logError("file: "__FILE__", line: %d, "
"Can't generate uniq filename", __LINE__);
*filename = '\0';
*filename_len = 0;
@ -3491,8 +3492,8 @@ static int query_file_info_response(struct fast_task_info *pTask,
return 0;
}
static void calc_crc32_done_callback(struct fast_task_info *pTask,
const int err_no)
static void calc_crc32_done_callback_for_query_finfo(
struct fast_task_info *pTask, const int err_no)
{
StorageClientInfo *pClientInfo;
StorageFileInfoForCRC32 *crc32_file_info;
@ -3525,6 +3526,7 @@ static void calc_crc32_done_callback(struct fast_task_info *pTask,
pHeader->cmd = STORAGE_PROTO_CMD_RESP;
long2buff(pTask->length - sizeof(TrackerHeader), pHeader->pkg_len);
STORAGE_ACCESS_LOG(pTask, ACCESS_LOG_ACTION_QUERY_FILE, result);
storage_nio_notify(pTask);
}
@ -3534,6 +3536,39 @@ static int calc_crc32_continue_callback(struct fast_task_info *pTask)
return storage_dio_queue_push(pTask);
}
static int push_calc_crc32_to_dio_queue(struct fast_task_info *pTask,
FileDealDoneCallback done_callback, const int store_path_index,
const struct stat *file_stat, const int storage_id)
{
StorageClientInfo *pClientInfo;
StorageFileContext *pFileContext;
StorageFileInfoForCRC32 *crc32_file_info;
pClientInfo = (StorageClientInfo *)pTask->arg;
pFileContext = &(pClientInfo->file_context);
crc32_file_info = (StorageFileInfoForCRC32 *)fast_mblock_alloc_object(
&finfo_for_crc32_allocator);
if (crc32_file_info == NULL)
{
logError("file: "__FILE__", line: %d, "
"finfo_for_crc32_allocator %d bytes object fail",
__LINE__, (int)sizeof(StorageFileInfoForCRC32));
return errno != 0 ? errno : ENOMEM;
}
crc32_file_info->storage_id = storage_id;
crc32_file_info->fsize = file_stat->st_size;
crc32_file_info->mtime = file_stat->st_mtime;
pClientInfo->extra_arg = crc32_file_info;
pFileContext->fd = -1;
pFileContext->calc_crc32 = true;
pFileContext->continue_callback = calc_crc32_continue_callback;
return storage_read_from_file(pTask, 0, file_stat->st_size,
done_callback, store_path_index);
}
static int query_file_info_deal_response(struct fast_task_info *pTask,
const char *filename, const char *true_filename,
struct stat *file_stat, const int store_path_index)
@ -3544,7 +3579,6 @@ static int query_file_info_deal_response(struct fast_task_info *pTask,
int crc32;
int64_t file_size;
StorageFileInfoForCRC32 finfo;
StorageFileInfoForCRC32 *crc32_file_info;
memset(decode_buff, 0, sizeof(decode_buff));
base64_decode_auto(&g_fdfs_base64_context, filename +
@ -3560,31 +3594,15 @@ static int query_file_info_deal_response(struct fast_task_info *pTask,
pClientInfo = (StorageClientInfo *)pTask->arg;
pFileContext = &(pClientInfo->file_context);
crc32_file_info = (StorageFileInfoForCRC32 *)fast_mblock_alloc_object(
&finfo_for_crc32_allocator);
if (crc32_file_info == NULL)
{
logError("file: "__FILE__", line: %d, "
"finfo_for_crc32_allocator %d bytes object fail",
__LINE__, (int)sizeof(StorageFileInfoForCRC32));
return errno != 0 ? errno : ENOMEM;
}
crc32_file_info->storage_id = storage_id;
crc32_file_info->fsize = file_stat->st_size;
crc32_file_info->mtime = file_stat->st_mtime;
snprintf(pFileContext->fname2log, sizeof(pFileContext->fname2log),
"%s", filename);
snprintf(pFileContext->filename, sizeof(pFileContext->filename),
"%s/data/%s", g_fdfs_store_paths.paths[store_path_index],
true_filename);
pClientInfo->extra_arg = crc32_file_info;
pFileContext->fd = -1;
pFileContext->calc_crc32 = true;
pFileContext->continue_callback = calc_crc32_continue_callback;
return storage_read_from_file(pTask, 0, file_stat->st_size,
calc_crc32_done_callback, store_path_index);
return push_calc_crc32_to_dio_queue(pTask,
calc_crc32_done_callback_for_query_finfo,
store_path_index, file_stat, storage_id);
}
finfo.storage_id = storage_id;
@ -4708,6 +4726,274 @@ static int storage_deal_active_test(struct fast_task_info *pTask)
return 0;
}
static int storage_server_check_appender_file(struct fast_task_info *pTask,
char *appender_filename, const int appender_filename_len,
char *true_filename, struct stat *stat_buf, int *store_path_index)
{
StorageFileContext *pFileContext;
char decode_buff[64];
int result;
int filename_len;
int buff_len;
int64_t appender_file_size;
pFileContext = &(((StorageClientInfo *)pTask->arg)->file_context);
filename_len = appender_filename_len;
if ((result=storage_split_filename_ex(appender_filename,
&filename_len, true_filename, store_path_index)) != 0)
{
return result;
}
if ((result=fdfs_check_data_filename(true_filename, filename_len)) != 0)
{
return result;
}
snprintf(pFileContext->filename, sizeof(pFileContext->filename),
"%s/data/%s", g_fdfs_store_paths.paths[*store_path_index],
true_filename);
if (lstat(pFileContext->filename, stat_buf) == 0)
{
if (!S_ISREG(stat_buf->st_mode))
{
logError("file: "__FILE__", line: %d, "
"client ip: %s, appender file: %s "
"is not a regular file", __LINE__,
pTask->client_ip, pFileContext->filename);
return EINVAL;
}
}
else
{
result = errno != 0 ? errno : ENOENT;
if (result == ENOENT)
{
logWarning("file: "__FILE__", line: %d, "
"client ip: %s, appender file: %s "
"not exist", __LINE__, pTask->client_ip,
pFileContext->filename);
}
else
{
logError("file: "__FILE__", line: %d, "
"client ip: %s, stat appednder file %s fail, "
"errno: %d, error info: %s",
__LINE__, pTask->client_ip,
pFileContext->filename, result, STRERROR(result));
}
return result;
}
strcpy(pFileContext->fname2log, appender_filename);
memset(decode_buff, 0, sizeof(decode_buff));
base64_decode_auto(&g_fdfs_base64_context, pFileContext->fname2log +
FDFS_LOGIC_FILE_PATH_LEN, FDFS_FILENAME_BASE64_LENGTH,
decode_buff, &buff_len);
appender_file_size = buff2long(decode_buff + sizeof(int) * 2);
if (!IS_APPENDER_FILE(appender_file_size))
{
logError("file: "__FILE__", line: %d, "
"client ip: %s, file: %s is not a valid "
"appender file, file size: %"PRId64,
__LINE__, pTask->client_ip, appender_filename,
appender_file_size);
return EINVAL;
}
return 0;
}
static void calc_crc32_done_callback_for_regenerate(
struct fast_task_info *pTask, const int err_no)
{
StorageClientInfo *pClientInfo;
StorageFileInfoForCRC32 *crc32_file_info;
StorageFileContext *pFileContext;
TrackerHeader *pHeader;
char full_filename[MAX_PATH_SIZE + 128];
int result;
pClientInfo = (StorageClientInfo *)pTask->arg;
pFileContext = &(pClientInfo->file_context);
crc32_file_info = (StorageFileInfoForCRC32 *)pClientInfo->extra_arg;
pHeader = (TrackerHeader *)pTask->data;
if (err_no == 0)
{
char *formatted_ext_name;
char return_filename[128];
char filename[128];
int filename_len;
*filename = '\0';
filename_len = 0;
formatted_ext_name = pFileContext->filename +
(strlen(pFileContext->filename) -
(FDFS_FILE_EXT_NAME_MAX_LEN + 1));
pFileContext->timestamp2log = g_current_time;
if ((result=storage_get_filename(pClientInfo,
pFileContext->timestamp2log,
crc32_file_info->fsize, pFileContext->crc32,
formatted_ext_name, filename, &filename_len,
full_filename)) == 0)
{
if (*full_filename == '\0')
{
result = EBUSY;
logWarning("file: "__FILE__", line: %d, "
"Can't generate uniq filename", __LINE__);
}
else
{
if (rename(pFileContext->filename, full_filename) != 0)
{
result = errno != 0 ? errno : EPERM;
logWarning("file: "__FILE__", line: %d, "
"rename %s to %s fail, "
"errno: %d, error info: %s", __LINE__,
pFileContext->filename, full_filename,
result, STRERROR(result));
}
else
{
char *p;
char binlog_msg[256];
filename_len = snprintf(return_filename,
sizeof(return_filename),
"%c"FDFS_STORAGE_DATA_DIR_FORMAT"/%s",
FDFS_STORAGE_STORE_PATH_PREFIX_CHAR,
pFileContext->extra_info.upload.trunk_info.
path.store_path_index, filename);
sprintf(binlog_msg, "%s %s",
pFileContext->fname2log, return_filename);
result = storage_binlog_write(
pFileContext->timestamp2log,
STORAGE_OP_TYPE_SOURCE_RENAME_FILE,
binlog_msg);
pClientInfo->total_length = sizeof(TrackerHeader) +
FDFS_GROUP_NAME_MAX_LEN + filename_len;
p = pTask->data + sizeof(TrackerHeader);
memcpy(p, g_group_name, FDFS_GROUP_NAME_MAX_LEN);
p += FDFS_GROUP_NAME_MAX_LEN;
memcpy(p, return_filename, filename_len);
if (g_use_access_log)
{
snprintf(pFileContext->fname2log +
strlen(pFileContext->fname2log),
sizeof(pFileContext->fname2log),
"=>%s", return_filename);
}
}
}
}
}
else
{
result = err_no;
}
fast_mblock_free_object(&finfo_for_crc32_allocator, crc32_file_info);
if (result != 0)
{
pClientInfo->total_length = sizeof(TrackerHeader);
}
pClientInfo->total_offset = 0;
pTask->length = pClientInfo->total_length;
pHeader->status = result;
pHeader->cmd = STORAGE_PROTO_CMD_RESP;
long2buff(pTask->length - sizeof(TrackerHeader), pHeader->pkg_len);
STORAGE_ACCESS_LOG(pTask, ACCESS_LOG_ACTION_RENAME_FILE, result);
storage_nio_notify(pTask);
}
/**
FDFS_GROUP_NAME_MAX_LEN: group name
body length - FDFS_GROUP_NAME_MAX_LEN: appender filename
**/
static int storage_server_regenerate_appender_filename(struct fast_task_info *pTask)
{
StorageClientInfo *pClientInfo;
StorageFileContext *pFileContext;
FDFSTrunkFullInfo *pTrunkInfo;
char *p;
char group_name[FDFS_GROUP_NAME_MAX_LEN + 1];
char appender_filename[128];
char true_filename[128];
struct stat stat_buf;
int appender_filename_len;
int64_t nInPackLen;
int result;
int store_path_index;
pClientInfo = (StorageClientInfo *)pTask->arg;
pFileContext = &(pClientInfo->file_context);
nInPackLen = pClientInfo->total_length - sizeof(TrackerHeader);
appender_filename_len = nInPackLen - FDFS_GROUP_NAME_MAX_LEN;
if ((nInPackLen < FDFS_GROUP_NAME_MAX_LEN +
FDFS_LOGIC_FILE_PATH_LEN + FDFS_FILENAME_BASE64_LENGTH +
FDFS_FILE_EXT_NAME_MAX_LEN + 1)
|| (appender_filename_len >= sizeof(appender_filename)))
{
logError("file: "__FILE__", line: %d, "
"client ip: %s, invalid package length: %"PRId64,
__LINE__, pTask->client_ip, nInPackLen);
return EINVAL;
}
p = pTask->data + sizeof(TrackerHeader);
memcpy(group_name, p, FDFS_GROUP_NAME_MAX_LEN);
*(group_name + FDFS_GROUP_NAME_MAX_LEN) = '\0';
if (strcmp(group_name, g_group_name) != 0)
{
logError("file: "__FILE__", line: %d, "
"client ip:%s, group_name: %s "
"not correct, should be: %s",
__LINE__, pTask->client_ip,
group_name, g_group_name);
return EINVAL;
}
p += FDFS_GROUP_NAME_MAX_LEN;
memcpy(appender_filename, p, appender_filename_len);
*(appender_filename + appender_filename_len) = '\0';
STORAGE_ACCESS_STRCPY_FNAME2LOG(appender_filename,
appender_filename_len, pClientInfo);
if ((result=storage_server_check_appender_file(pTask,
appender_filename, appender_filename_len,
true_filename, &stat_buf, &store_path_index)) != 0)
{
return result;
}
pTrunkInfo = &(pClientInfo->file_context.extra_info.upload.trunk_info);
pClientInfo->file_context.extra_info.upload.if_sub_path_alloced = true;
pFileContext->extra_info.upload.trunk_info.path.store_path_index =
store_path_index;
pTrunkInfo->path.sub_path_high = strtol(true_filename, NULL, 16);
pTrunkInfo->path.sub_path_low = strtol(true_filename + 3, NULL, 16);
return push_calc_crc32_to_dio_queue(pTask,
calc_crc32_done_callback_for_regenerate,
store_path_index, &stat_buf,
g_server_id_in_filename);
}
/**
8 bytes: appender filename length
8 bytes: file size
@ -4721,16 +5007,12 @@ static int storage_append_file(struct fast_task_info *pTask)
char *p;
char appender_filename[128];
char true_filename[128];
char decode_buff[64];
struct stat stat_buf;
int appender_filename_len;
int64_t nInPackLen;
int64_t file_bytes;
int64_t appender_file_size;
int result;
int store_path_index;
int filename_len;
int buff_len;
pClientInfo = (StorageClientInfo *)pTask->arg;
pFileContext = &(pClientInfo->file_context);
@ -4778,76 +5060,16 @@ static int storage_append_file(struct fast_task_info *pTask)
memcpy(appender_filename, p, appender_filename_len);
*(appender_filename + appender_filename_len) = '\0';
p += appender_filename_len;
filename_len = appender_filename_len;
STORAGE_ACCESS_STRCPY_FNAME2LOG(appender_filename, \
STORAGE_ACCESS_STRCPY_FNAME2LOG(appender_filename,
appender_filename_len, pClientInfo);
if ((result=storage_split_filename_ex(appender_filename, \
&filename_len, true_filename, &store_path_index)) != 0)
{
return result;
}
if ((result=fdfs_check_data_filename(true_filename, filename_len)) != 0)
{
return result;
}
snprintf(pFileContext->filename, sizeof(pFileContext->filename), \
"%s/data/%s", g_fdfs_store_paths.paths[store_path_index], true_filename);
if (lstat(pFileContext->filename, &stat_buf) == 0)
{
if (!S_ISREG(stat_buf.st_mode))
{
logError("file: "__FILE__", line: %d, " \
"client ip: %s, appender file: %s " \
"is not a regular file", __LINE__, \
pTask->client_ip, pFileContext->filename);
return EINVAL;
}
}
else
{
result = errno != 0 ? errno : ENOENT;
if (result == ENOENT)
{
logWarning("file: "__FILE__", line: %d, " \
"client ip: %s, appender file: %s " \
"not exist", __LINE__, \
pTask->client_ip, pFileContext->filename);
}
else
{
logError("file: "__FILE__", line: %d, " \
"client ip: %s, stat appednder file %s fail" \
", errno: %d, error info: %s.", \
__LINE__, pTask->client_ip, \
pFileContext->filename, \
result, STRERROR(result));
}
return result;
}
strcpy(pFileContext->fname2log, appender_filename);
memset(decode_buff, 0, sizeof(decode_buff));
base64_decode_auto(&g_fdfs_base64_context, pFileContext->fname2log + \
FDFS_LOGIC_FILE_PATH_LEN, FDFS_FILENAME_BASE64_LENGTH, \
decode_buff, &buff_len);
appender_file_size = buff2long(decode_buff + sizeof(int) * 2);
if (!IS_APPENDER_FILE(appender_file_size))
{
logError("file: "__FILE__", line: %d, " \
"client ip: %s, file: %s is not a valid " \
"appender file, file size: %"PRId64, \
__LINE__, pTask->client_ip, appender_filename, \
appender_file_size);
return EINVAL;
}
if ((result=storage_server_check_appender_file(pTask,
appender_filename, appender_filename_len,
true_filename, &stat_buf, &store_path_index)) != 0)
{
return result;
}
if (file_bytes == 0)
{
@ -4860,24 +5082,20 @@ static int storage_append_file(struct fast_task_info *pTask)
pFileContext->calc_crc32 = false;
pFileContext->calc_file_hash = false;
snprintf(pFileContext->filename, sizeof(pFileContext->filename), \
"%s/data/%s", g_fdfs_store_paths.paths[store_path_index], \
true_filename);
pFileContext->sync_flag = STORAGE_OP_TYPE_SOURCE_APPEND_FILE;
pFileContext->timestamp2log = pFileContext->extra_info.upload.start_time;
pFileContext->extra_info.upload.file_type = _FILE_TYPE_APPENDER;
pFileContext->extra_info.upload.before_open_callback = NULL;
pFileContext->extra_info.upload.before_close_callback = NULL;
pFileContext->extra_info.upload.trunk_info.path.store_path_index = \
pFileContext->extra_info.upload.trunk_info.path.store_path_index =
store_path_index;
pFileContext->op = FDFS_STORAGE_FILE_OP_APPEND;
pFileContext->open_flags = O_WRONLY | O_APPEND | g_extra_open_file_flags;
pFileContext->continue_callback = storage_nio_notify;
return storage_write_to_file(pTask, stat_buf.st_size, file_bytes, \
p - pTask->data, dio_write_file, \
storage_append_file_done_callback, \
return storage_write_to_file(pTask, stat_buf.st_size, file_bytes,
p - pTask->data, dio_write_file,
storage_append_file_done_callback,
dio_append_finish_clean_up, store_path_index);
}
@ -4895,17 +5113,13 @@ static int storage_modify_file(struct fast_task_info *pTask)
char *p;
char appender_filename[128];
char true_filename[128];
char decode_buff[64];
struct stat stat_buf;
int appender_filename_len;
int64_t nInPackLen;
int64_t file_offset;
int64_t file_bytes;
int64_t appender_file_size;
int result;
int store_path_index;
int filename_len;
int buff_len;
pClientInfo = (StorageClientInfo *)pTask->arg;
pFileContext = &(pClientInfo->file_context);
@ -4964,83 +5178,23 @@ static int storage_modify_file(struct fast_task_info *pTask)
memcpy(appender_filename, p, appender_filename_len);
*(appender_filename + appender_filename_len) = '\0';
p += appender_filename_len;
filename_len = appender_filename_len;
STORAGE_ACCESS_STRCPY_FNAME2LOG(appender_filename, \
appender_filename_len, pClientInfo);
if ((result=storage_split_filename_ex(appender_filename, \
&filename_len, true_filename, &store_path_index)) != 0)
{
return result;
}
if ((result=fdfs_check_data_filename(true_filename, filename_len)) != 0)
{
return result;
}
snprintf(pFileContext->filename, sizeof(pFileContext->filename), \
"%s/data/%s", g_fdfs_store_paths.paths[store_path_index], true_filename);
if (lstat(pFileContext->filename, &stat_buf) == 0)
{
if (!S_ISREG(stat_buf.st_mode))
{
logError("file: "__FILE__", line: %d, " \
"client ip: %s, appender file: %s " \
"is not a regular file", __LINE__, \
pTask->client_ip, pFileContext->filename);
return EINVAL;
}
}
else
{
result = errno != 0 ? errno : ENOENT;
if (result == ENOENT)
{
logWarning("file: "__FILE__", line: %d, " \
"client ip: %s, appender file: %s " \
"not exist", __LINE__, \
pTask->client_ip, pFileContext->filename);
}
else
{
logError("file: "__FILE__", line: %d, " \
"client ip: %s, stat appednder file %s fail" \
", errno: %d, error info: %s.", \
__LINE__, pTask->client_ip, \
pFileContext->filename, \
result, STRERROR(result));
}
return result;
}
strcpy(pFileContext->fname2log, appender_filename);
memset(decode_buff, 0, sizeof(decode_buff));
base64_decode_auto(&g_fdfs_base64_context, pFileContext->fname2log + \
FDFS_LOGIC_FILE_PATH_LEN, FDFS_FILENAME_BASE64_LENGTH, \
decode_buff, &buff_len);
appender_file_size = buff2long(decode_buff + sizeof(int) * 2);
if (!IS_APPENDER_FILE(appender_file_size))
{
logError("file: "__FILE__", line: %d, " \
"client ip: %s, file: %s is not a valid " \
"appender file, file size: %"PRId64, \
__LINE__, pTask->client_ip, appender_filename, \
appender_file_size);
return EINVAL;
}
if ((result=storage_server_check_appender_file(pTask,
appender_filename, appender_filename_len,
true_filename, &stat_buf, &store_path_index)) != 0)
{
return result;
}
if (file_offset > stat_buf.st_size)
{
logError("file: "__FILE__", line: %d, " \
"client ip: %s, file offset: %"PRId64 \
" is invalid, which > appender file size: %" \
PRId64, __LINE__, pTask->client_ip, \
logError("file: "__FILE__", line: %d, "
"client ip: %s, file offset: %"PRId64
" is invalid, which > appender file size: %"
PRId64, __LINE__, pTask->client_ip,
file_offset, (int64_t)stat_buf.st_size);
return EINVAL;
@ -5057,23 +5211,20 @@ static int storage_modify_file(struct fast_task_info *pTask)
pFileContext->calc_crc32 = false;
pFileContext->calc_file_hash = false;
snprintf(pFileContext->filename, sizeof(pFileContext->filename), \
"%s/data/%s", g_fdfs_store_paths.paths[store_path_index], true_filename);
pFileContext->sync_flag = STORAGE_OP_TYPE_SOURCE_MODIFY_FILE;
pFileContext->timestamp2log = pFileContext->extra_info.upload.start_time;
pFileContext->extra_info.upload.file_type = _FILE_TYPE_APPENDER;
pFileContext->extra_info.upload.before_open_callback = NULL;
pFileContext->extra_info.upload.before_close_callback = NULL;
pFileContext->extra_info.upload.trunk_info.path.store_path_index = \
pFileContext->extra_info.upload.trunk_info.path.store_path_index =
store_path_index;
pFileContext->op = FDFS_STORAGE_FILE_OP_WRITE;
pFileContext->open_flags = O_WRONLY | g_extra_open_file_flags;
pFileContext->continue_callback = storage_nio_notify;
return storage_write_to_file(pTask, file_offset, file_bytes, \
p - pTask->data, dio_write_file, \
storage_modify_file_done_callback, \
return storage_write_to_file(pTask, file_offset, file_bytes,
p - pTask->data, dio_write_file,
storage_modify_file_done_callback,
dio_modify_finish_clean_up, store_path_index);
}
@ -8137,8 +8288,8 @@ int storage_deal_task(struct fast_task_info *pTask)
case STORAGE_PROTO_CMD_QUERY_FILE_INFO:
ACCESS_LOG_INIT_FIELDS();
result = storage_server_query_file_info(pTask);
STORAGE_ACCESS_LOG(pTask, \
ACCESS_LOG_ACTION_QUERY_FILE, \
STORAGE_ACCESS_LOG(pTask,
ACCESS_LOG_ACTION_QUERY_FILE,
result);
break;
case STORAGE_PROTO_CMD_CREATE_LINK:
@ -8198,6 +8349,13 @@ int storage_deal_task(struct fast_task_info *pTask)
case STORAGE_PROTO_CMD_TRUNK_TRUNCATE_BINLOG_FILE:
result = storage_server_trunk_truncate_binlog_file(pTask);
break;
case STORAGE_PROTO_CMD_REGENERATE_APPENDER_FILENAME:
ACCESS_LOG_INIT_FIELDS();
result = storage_server_regenerate_appender_filename(pTask);
STORAGE_ACCESS_LOG(pTask,
ACCESS_LOG_ACTION_RENAME_FILE,
result);
break;
default:
logError("file: "__FILE__", line: %d, " \
"client ip: %s, unkown cmd: %d", \

View File

@ -14,20 +14,22 @@
#include "fastcommon/fc_list.h"
#include "storage_func.h"
#define STORAGE_OP_TYPE_SOURCE_CREATE_FILE 'C' //upload file
#define STORAGE_OP_TYPE_SOURCE_APPEND_FILE 'A' //append file
#define STORAGE_OP_TYPE_SOURCE_DELETE_FILE 'D' //delete file
#define STORAGE_OP_TYPE_SOURCE_UPDATE_FILE 'U' //for whole file update such as metadata file
#define STORAGE_OP_TYPE_SOURCE_MODIFY_FILE 'M' //for part modify
#define STORAGE_OP_TYPE_SOURCE_TRUNCATE_FILE 'T' //truncate file
#define STORAGE_OP_TYPE_SOURCE_CREATE_LINK 'L' //create symbol link
#define STORAGE_OP_TYPE_REPLICA_CREATE_FILE 'c'
#define STORAGE_OP_TYPE_REPLICA_APPEND_FILE 'a'
#define STORAGE_OP_TYPE_REPLICA_DELETE_FILE 'd'
#define STORAGE_OP_TYPE_REPLICA_UPDATE_FILE 'u'
#define STORAGE_OP_TYPE_REPLICA_MODIFY_FILE 'm'
#define STORAGE_OP_TYPE_REPLICA_TRUNCATE_FILE 't'
#define STORAGE_OP_TYPE_REPLICA_CREATE_LINK 'l'
#define STORAGE_OP_TYPE_SOURCE_CREATE_FILE 'C' //upload file
#define STORAGE_OP_TYPE_SOURCE_APPEND_FILE 'A' //append file
#define STORAGE_OP_TYPE_SOURCE_DELETE_FILE 'D' //delete file
#define STORAGE_OP_TYPE_SOURCE_UPDATE_FILE 'U' //for whole file update such as metadata file
#define STORAGE_OP_TYPE_SOURCE_MODIFY_FILE 'M' //for part modify
#define STORAGE_OP_TYPE_SOURCE_TRUNCATE_FILE 'T' //truncate file
#define STORAGE_OP_TYPE_SOURCE_CREATE_LINK 'L' //create symbol link
#define STORAGE_OP_TYPE_SOURCE_RENAME_FILE 'R' //rename appender file to normal file
#define STORAGE_OP_TYPE_REPLICA_CREATE_FILE 'c'
#define STORAGE_OP_TYPE_REPLICA_APPEND_FILE 'a'
#define STORAGE_OP_TYPE_REPLICA_DELETE_FILE 'd'
#define STORAGE_OP_TYPE_REPLICA_UPDATE_FILE 'u'
#define STORAGE_OP_TYPE_REPLICA_MODIFY_FILE 'm'
#define STORAGE_OP_TYPE_REPLICA_TRUNCATE_FILE 't'
#define STORAGE_OP_TYPE_REPLICA_CREATE_LINK 'l'
#define STORAGE_OP_TYPE_REPLICA_RENAME_FILE 'r'
#define STORAGE_BINLOG_BUFFER_SIZE 64 * 1024
#define STORAGE_BINLOG_LINE_SIZE 256

View File

@ -78,7 +78,7 @@
#define STORAGE_PROTO_CMD_UPLOAD_SLAVE_FILE 21
#define STORAGE_PROTO_CMD_QUERY_FILE_INFO 22
#define STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE 23 //create appender file
#define STORAGE_PROTO_CMD_APPEND_FILE 24 //append file
#define STORAGE_PROTO_CMD_APPEND_FILE 24 //append file
#define STORAGE_PROTO_CMD_SYNC_APPEND_FILE 25
#define STORAGE_PROTO_CMD_FETCH_ONE_PATH_BINLOG 26 //fetch binlog of one store path
#define STORAGE_PROTO_CMD_RESP TRACKER_PROTO_CMD_RESP
@ -92,10 +92,12 @@
#define STORAGE_PROTO_CMD_TRUNK_DELETE_BINLOG_MARKS 32 //since V3.07, tracker to storage
#define STORAGE_PROTO_CMD_TRUNK_TRUNCATE_BINLOG_FILE 33 //since V3.07, trunk storage to storage
#define STORAGE_PROTO_CMD_MODIFY_FILE 34 //since V3.08
#define STORAGE_PROTO_CMD_SYNC_MODIFY_FILE 35 //since V3.08
#define STORAGE_PROTO_CMD_TRUNCATE_FILE 36 //since V3.08
#define STORAGE_PROTO_CMD_SYNC_TRUNCATE_FILE 37 //since V3.08
#define STORAGE_PROTO_CMD_MODIFY_FILE 34 //since V3.08
#define STORAGE_PROTO_CMD_SYNC_MODIFY_FILE 35 //since V3.08
#define STORAGE_PROTO_CMD_TRUNCATE_FILE 36 //since V3.08
#define STORAGE_PROTO_CMD_SYNC_TRUNCATE_FILE 37 //since V3.08
#define STORAGE_PROTO_CMD_REGENERATE_APPENDER_FILENAME 38 //since V6.02, rename appender file to normal file
#define STORAGE_PROTO_CMD_SYNC_RENAME_FILE 40 //since V6.02
//for overwrite all old metadata
#define STORAGE_SET_METADATA_FLAG_OVERWRITE 'O'

View File

@ -117,6 +117,10 @@
#define FDFS_TRUNK_FILE_TRUE_SIZE(file_size) \
(file_size & 0xFFFFFFFF)
#define FDFS_FILE_TYPE_NORMAL 1 //normal file
#define FDFS_FILE_TYPE_APPENDER 2 //appender file
#define FDFS_FILE_TYPE_SLAVE 4 //slave file
#define FDFS_STORAGE_ID_MAX_SIZE 16
#define TRACKER_STORAGE_RESERVED_SPACE_FLAG_MB 0