get_file_info calculate CRC32 for appender file type

pull/348/head
YuQing 2019-11-06 19:33:19 +08:00
parent 57d2d815c6
commit cdb180ae32
5 changed files with 212 additions and 62 deletions

View File

@ -1,4 +1,7 @@
Version 6.02 2019-11-06
* get_file_info calculate CRC32 for appender file type
Version 6.01 2019-10-25
* compress and uncompress binlog file by gzip when need,
config items in storage.conf: compress_binlog and compress_binlog_time

View File

@ -144,12 +144,12 @@ void storage_dio_terminate()
int storage_dio_queue_push(struct fast_task_info *pTask)
{
StorageClientInfo *pClientInfo;
StorageClientInfo *pClientInfo;
StorageFileContext *pFileContext;
struct storage_dio_context *pContext;
int result;
pClientInfo = (StorageClientInfo *)pTask->arg;
pClientInfo = (StorageClientInfo *)pTask->arg;
pFileContext = &(pClientInfo->file_context);
pContext = g_dio_contexts + pFileContext->dio_thread_index;
@ -245,7 +245,7 @@ int dio_discard_file(struct fast_task_info *pTask)
else
{
pFileContext->buff_offset = 0;
storage_nio_notify(pTask); //notify nio to deal
pFileContext->continue_callback(pTask);
}
return 0;
@ -353,6 +353,12 @@ int dio_read_file(struct fast_task_info *pTask)
break;
}
if (pFileContext->calc_crc32)
{
pFileContext->crc32 = CRC32_ex(pTask->data + pTask->length,
read_bytes, pFileContext->crc32);
}
pTask->length += read_bytes;
pFileContext->offset += read_bytes;
@ -363,7 +369,7 @@ int dio_read_file(struct fast_task_info *pTask)
if (pFileContext->offset < pFileContext->end)
{
storage_nio_notify(pTask); //notify nio to deal
pFileContext->continue_callback(pTask);
}
else
{
@ -371,6 +377,11 @@ int dio_read_file(struct fast_task_info *pTask)
close(pFileContext->fd);
pFileContext->fd = -1;
if (pFileContext->calc_crc32)
{
pFileContext->crc32 = CRC32_FINAL( \
pFileContext->crc32);
}
pFileContext->done_callback(pTask, result);
}
@ -475,7 +486,7 @@ int dio_write_file(struct fast_task_info *pTask)
if (pFileContext->offset < pFileContext->end)
{
pFileContext->buff_offset = 0;
storage_nio_notify(pTask); //notify nio to deal
pFileContext->continue_callback(pTask);
}
else
{

View File

@ -46,6 +46,8 @@ typedef void (*DeleteFileLogCallback)(struct fast_task_info *pTask, \
typedef void (*FileDealDoneCallback)(struct fast_task_info *pTask, \
const int err_no);
typedef int (*FileDealContinueCallback)(struct fast_task_info *pTask);
typedef int (*FileBeforeOpenCallback)(struct fast_task_info *pTask);
typedef int (*FileBeforeCloseCallback)(struct fast_task_info *pTask);
@ -109,6 +111,7 @@ typedef struct
int64_t start; //the start offset of file
int64_t end; //the end offset of file
int64_t offset; //the current offset of file
FileDealContinueCallback continue_callback;
FileDealDoneCallback done_callback;
DeleteFileLogCallback log_callback;

View File

@ -28,6 +28,7 @@
#include "fastcommon/shared_func.h"
#include "fastcommon/pthread_func.h"
#include "fastcommon/sched_thread.h"
#include "fastcommon/fast_mblock.h"
#include "tracker_types.h"
#include "tracker_proto.h"
#include "storage_service.h"
@ -59,6 +60,12 @@
#define ACCESS_LOG_ACTION_TRUNCATE_FILE "truncate"
#define ACCESS_LOG_ACTION_QUERY_FILE "status"
typedef struct
{
int storage_id;
time_t mtime;
int64_t fsize;
} StorageFileInfoForCRC32;
pthread_mutex_t g_storage_thread_lock;
int g_storage_thread_count = 0;
@ -69,6 +76,8 @@ static int64_t temp_file_sequence = 0;
static pthread_mutex_t path_index_thread_lock;
static pthread_mutex_t stat_count_thread_lock;
static struct fast_mblock_man finfo_for_crc32_allocator;
static void *work_thread_entrance(void* arg);
extern int storage_client_create_link(ConnectionInfo *pTrackerServer, \
@ -1708,6 +1717,9 @@ int storage_service_init()
return result;
}
result = fast_mblock_init(&finfo_for_crc32_allocator,
sizeof(StorageFileInfoForCRC32), 1024);
return result;
}
@ -1901,7 +1913,7 @@ void storage_accept_loop(int server_sock)
accept_thread_entrance((void *)(long)server_sock);
}
void storage_nio_notify(struct fast_task_info *pTask)
int storage_nio_notify(struct fast_task_info *pTask)
{
StorageClientInfo *pClientInfo;
struct storage_nio_thread_data *pThreadData;
@ -1922,6 +1934,8 @@ void storage_nio_notify(struct fast_task_info *pTask)
__LINE__, result, STRERROR(result));
abort();
}
return 0;
}
static void *work_thread_entrance(void* arg)
@ -2775,6 +2789,7 @@ static int storage_trunk_do_create_link(struct fast_task_info *pTask, \
pTask, pFileContext->extra_info.upload.trunk_info.path. \
store_path_index, pFileContext->op);
pFileContext->continue_callback = storage_nio_notify;
pFileContext->done_callback = done_callback;
pClientInfo->clean_func = dio_trunk_write_finish_clean_up;
@ -3432,6 +3447,153 @@ static int storage_server_trunk_sync_binlog(struct fast_task_info *pTask)
return trunk_binlog_write_buffer(binlog_buff, nInPackLen);
}
static int query_file_info_response(struct fast_task_info *pTask,
const StorageFileInfoForCRC32 *finfo, const int crc32)
{
char *p;
p = pTask->data + sizeof(TrackerHeader);
long2buff(finfo->fsize, p);
p += FDFS_PROTO_PKG_LEN_SIZE;
long2buff(finfo->mtime, p);
p += FDFS_PROTO_PKG_LEN_SIZE;
long2buff(crc32, p);
p += FDFS_PROTO_PKG_LEN_SIZE;
memset(p, 0, IP_ADDRESS_SIZE);
if (fdfs_get_server_id_type(finfo->storage_id) == FDFS_ID_TYPE_SERVER_ID)
{
if (g_use_storage_id)
{
FDFSStorageIdInfo *pStorageIdInfo;
char id[16];
sprintf(id, "%d", finfo->storage_id);
pStorageIdInfo = fdfs_get_storage_by_id(id);
if (pStorageIdInfo != NULL)
{
strcpy(p, fdfs_get_ipaddr_by_peer_ip(
&pStorageIdInfo->ip_addrs,
pTask->client_ip));
}
}
}
else
{
struct in_addr ip_addr;
memset(&ip_addr, 0, sizeof(ip_addr));
ip_addr.s_addr = finfo->storage_id;
inet_ntop(AF_INET, &ip_addr, p, IP_ADDRESS_SIZE);
}
p += IP_ADDRESS_SIZE;
((StorageClientInfo *)pTask->arg)->total_length = p - pTask->data;
return 0;
}
static void calc_crc32_done_callback(struct fast_task_info *pTask,
const int err_no)
{
StorageClientInfo *pClientInfo;
StorageFileInfoForCRC32 *crc32_file_info;
StorageFileContext *pFileContext;
TrackerHeader *pHeader;
int result;
pClientInfo = (StorageClientInfo *)pTask->arg;
pFileContext = &(pClientInfo->file_context);
crc32_file_info = (StorageFileInfoForCRC32 *)pClientInfo->extra_arg;
if (err_no == 0)
{
result = query_file_info_response(pTask, crc32_file_info,
pFileContext->crc32);
}
else
{
result = err_no;
pClientInfo->total_length = sizeof(TrackerHeader);
}
fast_mblock_free_object(&finfo_for_crc32_allocator, crc32_file_info);
pClientInfo->total_offset = 0;
pTask->length = pClientInfo->total_length;
pHeader = (TrackerHeader *)pTask->data;
pHeader->status = result;
pHeader->cmd = STORAGE_PROTO_CMD_RESP;
long2buff(pTask->length - sizeof(TrackerHeader), pHeader->pkg_len);
storage_nio_notify(pTask);
}
static int calc_crc32_continue_callback(struct fast_task_info *pTask)
{
pTask->length = 0;
return storage_dio_queue_push(pTask);
}
static int query_file_info_deal_response(struct fast_task_info *pTask,
const char *filename, const char *true_filename,
struct stat *file_stat, const int store_path_index)
{
char decode_buff[64];
int buff_len;
int storage_id;
int crc32;
int64_t file_size;
StorageFileInfoForCRC32 finfo;
StorageFileInfoForCRC32 *crc32_file_info;
memset(decode_buff, 0, sizeof(decode_buff));
base64_decode_auto(&g_fdfs_base64_context, filename +
FDFS_LOGIC_FILE_PATH_LEN, FDFS_FILENAME_BASE64_LENGTH,
decode_buff, &buff_len);
storage_id = ntohl(buff2int(decode_buff));
file_size = buff2long(decode_buff + sizeof(int) * 2);
if (IS_APPENDER_FILE(file_size))
{
StorageClientInfo *pClientInfo;
StorageFileContext *pFileContext;
pClientInfo = (StorageClientInfo *)pTask->arg;
pFileContext = &(pClientInfo->file_context);
crc32_file_info = (StorageFileInfoForCRC32 *)fast_mblock_alloc_object(
&finfo_for_crc32_allocator);
if (crc32_file_info == NULL)
{
logError("file: "__FILE__", line: %d, "
"finfo_for_crc32_allocator %d bytes object fail",
__LINE__, (int)sizeof(StorageFileInfoForCRC32));
return errno != 0 ? errno : ENOMEM;
}
crc32_file_info->storage_id = storage_id;
crc32_file_info->fsize = file_stat->st_size;
crc32_file_info->mtime = file_stat->st_mtime;
snprintf(pFileContext->fname2log, sizeof(pFileContext->fname2log),
"%s", filename);
snprintf(pFileContext->filename, sizeof(pFileContext->filename),
"%s/data/%s", g_fdfs_store_paths.paths[store_path_index],
true_filename);
pClientInfo->extra_arg = crc32_file_info;
pFileContext->fd = -1;
pFileContext->calc_crc32 = true;
pFileContext->continue_callback = calc_crc32_continue_callback;
return storage_read_from_file(pTask, 0, file_stat->st_size,
calc_crc32_done_callback, store_path_index);
}
finfo.storage_id = storage_id;
finfo.fsize = file_stat->st_size;
finfo.mtime = file_stat->st_mtime;
crc32 = buff2int(decode_buff + sizeof(int) * 4);
return query_file_info_response(pTask, &finfo, crc32);
}
/**
FDFS_GROUP_NAME_MAX_LEN bytes: group_name
filename
@ -3441,11 +3603,9 @@ static int storage_server_query_file_info(struct fast_task_info *pTask)
StorageClientInfo *pClientInfo;
char *in_buff;
char *filename;
char *p;
char group_name[FDFS_GROUP_NAME_MAX_LEN + 1];
char true_filename[128];
char src_filename[MAX_PATH_SIZE + 128];
char decode_buff[64];
struct stat file_lstat;
struct stat file_stat;
FDFSTrunkFullInfo trunkInfo;
@ -3454,11 +3614,8 @@ static int storage_server_query_file_info(struct fast_task_info *pTask)
int store_path_index;
int filename_len;
int true_filename_len;
int crc32;
int storage_id;
int result;
int len;
int buff_len;
bool bSilence;
pClientInfo = (StorageClientInfo *)pTask->arg;
@ -3597,10 +3754,9 @@ static int storage_server_query_file_info(struct fast_task_info *pTask)
}
else
{
char full_filename[MAX_PATH_SIZE + 128];
sprintf(full_filename, "%s/data/%s", \
g_fdfs_store_paths.paths[store_path_index], \
char full_filename[MAX_PATH_SIZE + 128];
sprintf(full_filename, "%s/data/%s",
g_fdfs_store_paths.paths[store_path_index],
true_filename);
if ((len=readlink(full_filename, src_filename, \
sizeof(src_filename))) < 0)
@ -3624,6 +3780,8 @@ static int storage_server_query_file_info(struct fast_task_info *pTask)
return result;
}
}
file_stat.st_mtime = file_lstat.st_mtime;
}
else
{
@ -3641,50 +3799,8 @@ static int storage_server_query_file_info(struct fast_task_info *pTask)
return EINVAL;
}
memset(decode_buff, 0, sizeof(decode_buff));
base64_decode_auto(&g_fdfs_base64_context, filename + \
FDFS_LOGIC_FILE_PATH_LEN, FDFS_FILENAME_BASE64_LENGTH, \
decode_buff, &buff_len);
storage_id = ntohl(buff2int(decode_buff));
crc32 = buff2int(decode_buff + sizeof(int) * 4);
p = pTask->data + sizeof(TrackerHeader);
long2buff(file_stat.st_size, p);
p += FDFS_PROTO_PKG_LEN_SIZE;
long2buff(file_lstat.st_mtime, p);
p += FDFS_PROTO_PKG_LEN_SIZE;
long2buff(crc32, p);
p += FDFS_PROTO_PKG_LEN_SIZE;
memset(p, 0, IP_ADDRESS_SIZE);
if (fdfs_get_server_id_type(storage_id) == FDFS_ID_TYPE_SERVER_ID)
{
if (g_use_storage_id)
{
FDFSStorageIdInfo *pStorageIdInfo;
char id[16];
sprintf(id, "%d", storage_id);
pStorageIdInfo = fdfs_get_storage_by_id(id);
if (pStorageIdInfo != NULL)
{
strcpy(p, fdfs_get_ipaddr_by_peer_ip(
&pStorageIdInfo->ip_addrs,
pTask->client_ip));
}
}
}
else
{
struct in_addr ip_addr;
memset(&ip_addr, 0, sizeof(ip_addr));
ip_addr.s_addr = storage_id;
inet_ntop(AF_INET, &ip_addr, p, IP_ADDRESS_SIZE);
}
p += IP_ADDRESS_SIZE;
pClientInfo->total_length = p - pTask->data;
return 0;
return query_file_info_deal_response(pTask, filename,
true_filename, &file_stat, store_path_index);
}
#define CHECK_TRUNK_SERVER(pTask) \
@ -4563,8 +4679,9 @@ static int storage_upload_file(struct fast_task_info *pTask, bool bAppenderFile)
| g_extra_open_file_flags;
}
return storage_write_to_file(pTask, file_offset, file_bytes, \
p - pTask->data, dio_write_file, \
pFileContext->continue_callback = storage_nio_notify;
return storage_write_to_file(pTask, file_offset, file_bytes, \
p - pTask->data, dio_write_file, \
storage_upload_file_done_callback, \
clean_func, store_path_index);
}
@ -4756,6 +4873,7 @@ static int storage_append_file(struct fast_task_info *pTask)
store_path_index;
pFileContext->op = FDFS_STORAGE_FILE_OP_APPEND;
pFileContext->open_flags = O_WRONLY | O_APPEND | g_extra_open_file_flags;
pFileContext->continue_callback = storage_nio_notify;
return storage_write_to_file(pTask, stat_buf.st_size, file_bytes, \
p - pTask->data, dio_write_file, \
@ -4951,6 +5069,7 @@ static int storage_modify_file(struct fast_task_info *pTask)
store_path_index;
pFileContext->op = FDFS_STORAGE_FILE_OP_WRITE;
pFileContext->open_flags = O_WRONLY | g_extra_open_file_flags;
pFileContext->continue_callback = storage_nio_notify;
return storage_write_to_file(pTask, file_offset, file_bytes, \
p - pTask->data, dio_write_file, \
@ -5134,6 +5253,7 @@ static int storage_do_truncate_file(struct fast_task_info *pTask)
store_path_index;
pFileContext->op = FDFS_STORAGE_FILE_OP_WRITE;
pFileContext->open_flags = O_WRONLY | g_extra_open_file_flags;
pFileContext->continue_callback = storage_nio_notify;
return storage_write_to_file(pTask, remain_bytes, \
stat_buf.st_size, 0, dio_truncate_file, \
@ -5368,6 +5488,7 @@ static int storage_upload_slave_file(struct fast_task_info *pTask)
pFileContext->op = FDFS_STORAGE_FILE_OP_WRITE;
pFileContext->open_flags = O_WRONLY | O_CREAT | O_TRUNC \
| g_extra_open_file_flags;
pFileContext->continue_callback = storage_nio_notify;
return storage_write_to_file(pTask, 0, file_bytes, p - pTask->data, \
dio_write_file, storage_upload_file_done_callback, \
@ -5680,6 +5801,7 @@ static int storage_sync_copy_file(struct fast_task_info *pTask, \
pFileContext->calc_crc32 = false;
pFileContext->calc_file_hash = false;
strcpy(pFileContext->fname2log, filename);
pFileContext->continue_callback = storage_nio_notify;
if (have_file_content)
{
@ -5915,6 +6037,7 @@ static int storage_sync_append_file(struct fast_task_info *pTask)
pFileContext->calc_file_hash = false;
pFileContext->extra_info.upload.before_open_callback = NULL;
pFileContext->extra_info.upload.before_close_callback = NULL;
pFileContext->continue_callback = storage_nio_notify;
return storage_write_to_file(pTask, start_offset, append_bytes, \
p - pTask->data, deal_func, \
@ -6116,6 +6239,7 @@ static int storage_sync_modify_file(struct fast_task_info *pTask)
pFileContext->calc_file_hash = false;
pFileContext->extra_info.upload.before_open_callback = NULL;
pFileContext->extra_info.upload.before_close_callback = NULL;
pFileContext->continue_callback = storage_nio_notify;
return storage_write_to_file(pTask, start_offset, modify_bytes, \
p - pTask->data, deal_func, \
@ -6301,6 +6425,7 @@ static int storage_sync_truncate_file(struct fast_task_info *pTask)
pFileContext->calc_file_hash = false;
pFileContext->extra_info.upload.before_open_callback = NULL;
pFileContext->extra_info.upload.before_close_callback = NULL;
pFileContext->continue_callback = storage_nio_notify;
return storage_write_to_file(pTask, new_file_size, \
old_file_size, 0, dio_truncate_file, \
@ -6761,6 +6886,8 @@ static int storage_server_get_metadata(struct fast_task_info *pTask)
}
pFileContext->fd = -1;
pFileContext->calc_crc32 = false;
pFileContext->continue_callback = storage_nio_notify;
return storage_read_from_file(pTask, 0, file_bytes, \
storage_get_metadata_done_callback, store_path_index);
}
@ -6941,6 +7068,8 @@ static int storage_server_download_file(struct fast_task_info *pTask)
g_fdfs_store_paths.paths[store_path_index], true_filename);
}
pFileContext->calc_crc32 = false;
pFileContext->continue_callback = storage_nio_notify;
return storage_read_from_file(pTask, file_offset, download_bytes, \
storage_download_file_done_callback, store_path_index);
}
@ -7006,6 +7135,10 @@ static int storage_read_from_file(struct fast_task_info *pTask, \
pHeader->cmd = STORAGE_PROTO_CMD_RESP;
long2buff(download_bytes, pHeader->pkg_len);
if (pFileContext->calc_crc32)
{
pFileContext->crc32 = CRC32_XINIT;
}
if ((result=storage_dio_queue_push(pTask)) != 0)
{
if (pFileContext->fd >= 0)

View File

@ -36,7 +36,7 @@ void storage_service_destroy();
int fdfs_stat_file_sync_func(void *args);
int storage_deal_task(struct fast_task_info *pTask);
void storage_nio_notify(struct fast_task_info *pTask);
int storage_nio_notify(struct fast_task_info *pTask);
void storage_accept_loop(int server_sock);
int storage_terminate_threads();