diff --git a/HISTORY b/HISTORY index 4ee941c..c793c7c 100644 --- a/HISTORY +++ b/HISTORY @@ -1,7 +1,8 @@ -Version 6.12.0 2024-01-11 +Version 6.12.0 2024-02-12 * bugfixed: parse ip and port use parseAddress instead of splitEx * bugfixed: fdfs_server_info_to_string support IPv6 correctly + * check filename duplicate by hashtable instead of file system access Version 6.11.0 2023-12-10 * support IPv6, config item: address_family in tracker.conf and storage.conf diff --git a/storage/Makefile.in b/storage/Makefile.in index ac8dd20..c66c987 100644 --- a/storage/Makefile.in +++ b/storage/Makefile.in @@ -12,6 +12,7 @@ SHARED_OBJS = ../common/fdfs_global.o ../tracker/fdfs_shared_func.o \ storage_sync_func.o storage_service.o storage_sync.o \ storage_dio.o storage_ip_changed_dealer.o \ storage_param_getter.o storage_disk_recovery.o \ + file_id_hashtable.o \ trunk_mgr/trunk_mem.o trunk_mgr/trunk_shared.o \ trunk_mgr/trunk_sync.o trunk_mgr/trunk_client.o \ trunk_mgr/trunk_free_block_checker.o \ diff --git a/storage/fdfs_storaged.c b/storage/fdfs_storaged.c index 248c408..837fe8d 100644 --- a/storage/fdfs_storaged.c +++ b/storage/fdfs_storaged.c @@ -42,6 +42,7 @@ #include "trunk_mem.h" #include "trunk_sync.h" #include "trunk_shared.h" +#include "file_id_hashtable.h" #ifdef WITH_HTTPD #include "storage_httpd.h" @@ -243,6 +244,13 @@ int main(int argc, char *argv[]) } if ((result=setup_schedule_tasks()) != 0) + { + logCrit("exit abnormally!\n"); + log_destroy(); + return result; + } + + if ((result=file_id_hashtable_init()) != 0) { logCrit("exit abnormally!\n"); log_destroy(); diff --git a/storage/file_id_hashtable.c b/storage/file_id_hashtable.c new file mode 100644 index 0000000..9875a0c --- /dev/null +++ b/storage/file_id_hashtable.c @@ -0,0 +1,326 @@ +/* + * Copyright (c) 2020 YuQing <384681@qq.com> + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as 
published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#include "fastcommon/pthread_func.h" +#include "fastcommon/logger.h" +#include "fastcommon/fc_atomic.h" +#include "fastcommon/fast_allocator.h" +#include "file_id_hashtable.h" + +typedef struct file_id_info { + string_t file_id; + uint32_t hash_code; + uint32_t expires; + struct { + struct file_id_info *htable; + struct file_id_info *list; + } nexts; +} FileIdInfo; + +typedef struct { + FileIdInfo **buckets; + uint32_t capacity; + volatile uint32_t count; +} FileIdHashtable; + +typedef struct { + pthread_mutex_t *locks; + int count; +} FileIdSharedLockArray; + +typedef struct { + struct { + struct file_id_info *head; + struct file_id_info *tail; + pthread_mutex_t lock; + } list; + FileIdHashtable htable; + FileIdSharedLockArray lock_array; + struct fast_mblock_man allocator; //element: FileIdInfo + struct fast_allocator_context acontext; //for string allocator +} FileIdHTableContext; + +static FileIdHTableContext file_id_ctx = { + {NULL, NULL}, {NULL, 0, 0}, {NULL, 0} +}; + +static int clear_expired_file_id_func(void *args); + +int file_id_hashtable_init() +{ + const int obj_size = 0; + int result; + int bytes; + struct fast_region_info regions[2]; + pthread_mutex_t *lock; + pthread_mutex_t *end; + ScheduleArray scheduleArray; + ScheduleEntry entry; + + file_id_ctx.htable.capacity = 1403641; + bytes = sizeof(FileIdInfo *) * file_id_ctx.htable.capacity; + file_id_ctx.htable.buckets = fc_malloc(bytes); + if (file_id_ctx.htable.buckets == NULL) { + return ENOMEM; + } + memset(file_id_ctx.htable.buckets, 0, bytes); + + file_id_ctx.lock_array.count = 163; + bytes = sizeof(pthread_mutex_t) * 
file_id_ctx.lock_array.count; + file_id_ctx.lock_array.locks = fc_malloc(bytes); + if (file_id_ctx.lock_array.locks == NULL) { + return ENOMEM; + } + + end = file_id_ctx.lock_array.locks + file_id_ctx.lock_array.count; + for (lock=file_id_ctx.lock_array.locks; lockstr, file_id->len); + FILE_ID_HASHTABLE_SET_BUCKET_AND_LOCK(hash_code); + + result = 0; + PTHREAD_MUTEX_LOCK(lock); + previous = NULL; + current = *bucket; + while (current != NULL) { + if (hash_code < current->hash_code) { + break; + } else if (hash_code == current->hash_code && fc_string_equal( + file_id, &current->file_id)) + { + result = EEXIST; + break; + } + + previous = current; + current = current->nexts.htable; + } + + if (result == 0) { + do { + if ((finfo=fast_mblock_alloc_object(&file_id_ctx. + allocator)) == NULL) + { + result = ENOMEM; + break; + } + if ((finfo->file_id.str=fast_allocator_alloc(&file_id_ctx. + acontext, file_id->len)) == NULL) + { + fast_mblock_free_object(&file_id_ctx.allocator, finfo); + result = ENOMEM; + break; + } + + memcpy(finfo->file_id.str, file_id->str, file_id->len); + finfo->file_id.len = file_id->len; + finfo->hash_code = hash_code; + finfo->expires = g_current_time + 3; + if (previous == NULL) { + finfo->nexts.htable = *bucket; + *bucket = finfo; + } else { + finfo->nexts.htable = current; + previous->nexts.htable = finfo; + } + FC_ATOMIC_INC(file_id_ctx.htable.count); + + } while (0); + } else { + finfo = NULL; + } + PTHREAD_MUTEX_UNLOCK(lock); + + if (result == 0) { + PTHREAD_MUTEX_LOCK(&file_id_ctx.list.lock); + finfo->nexts.list = NULL; + if (file_id_ctx.list.tail == NULL) { + file_id_ctx.list.head = finfo; + } else { + file_id_ctx.list.tail->nexts.list = finfo; + } + file_id_ctx.list.tail = finfo; + PTHREAD_MUTEX_UNLOCK(&file_id_ctx.list.lock); + } + return result; +} + +static int file_id_hashtable_del(FileIdInfo *finfo) +{ + int result; + FileIdInfo *current; + FileIdInfo *previous; + FILE_ID_HASHTABLE_DECLARE_VARS(); + + 
FILE_ID_HASHTABLE_SET_BUCKET_AND_LOCK(finfo->hash_code); + PTHREAD_MUTEX_LOCK(lock); + if (*bucket == NULL) { + result = ENOENT; + } else if (finfo->hash_code == (*bucket)->hash_code && + fc_string_equal(&finfo->file_id, &(*bucket)->file_id)) + { + *bucket = (*bucket)->nexts.htable; + FC_ATOMIC_DEC(file_id_ctx.htable.count); + result = 0; + } else { + result = ENOENT; + previous = *bucket; + while ((current=previous->nexts.htable) != NULL) { + if (finfo->hash_code < current->hash_code) { + break; + } else if (finfo->hash_code == current->hash_code && + fc_string_equal(&finfo->file_id, &current->file_id)) + { + previous->nexts.htable = current->nexts.htable; + FC_ATOMIC_DEC(file_id_ctx.htable.count); + result = 0; + break; + } + + previous = current; + } + } + PTHREAD_MUTEX_UNLOCK(lock); + + return result; +} + +static int clear_expired_file_id_func(void *args) +{ + struct file_id_info *head; + struct file_id_info *tail; + struct fast_mblock_chain chain; + struct fast_mblock_node *node; + + head = tail = NULL; + PTHREAD_MUTEX_LOCK(&file_id_ctx.list.lock); + if (file_id_ctx.list.head != NULL && file_id_ctx. + list.head->expires < g_current_time) + { + head = tail = file_id_ctx.list.head; + file_id_ctx.list.head = file_id_ctx.list.head->nexts.list; + while (file_id_ctx.list.head != NULL && file_id_ctx. 
+ list.head->expires < g_current_time) + { + tail = file_id_ctx.list.head; + file_id_ctx.list.head = file_id_ctx.list.head->nexts.list; + } + + if (file_id_ctx.list.head == NULL) { + file_id_ctx.list.tail = NULL; + } else { + tail->nexts.list = NULL; + } + } + PTHREAD_MUTEX_UNLOCK(&file_id_ctx.list.lock); + + if (head == NULL) { + return 0; + } + + chain.head = chain.tail = NULL; + do { + node = fast_mblock_to_node_ptr(head); + if (chain.head == NULL) { + chain.head = node; + } else { + chain.tail->next = node; + } + chain.tail = node; + + file_id_hashtable_del(head); + fast_allocator_free(&file_id_ctx.acontext, head->file_id.str); + } while ((head=head->nexts.list) != NULL); + + chain.tail->next = NULL; + fast_mblock_batch_free(&file_id_ctx.allocator, &chain); + return 0; +} diff --git a/storage/file_id_hashtable.h b/storage/file_id_hashtable.h new file mode 100644 index 0000000..2be59dd --- /dev/null +++ b/storage/file_id_hashtable.h @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2020 YuQing <384681@qq.com> + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. 
+ */ + +#ifndef _FILE_ID_HASHTABLE_H +#define _FILE_ID_HASHTABLE_H + +#include "storage_types.h" + +#ifdef __cplusplus +extern "C" { +#endif + + int file_id_hashtable_init(); + + void file_id_hashtable_destroy(); + + int file_id_hashtable_add(const string_t *file_id); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/storage/storage_service.c b/storage/storage_service.c index 8b5ed34..721ef2a 100644 --- a/storage/storage_service.c +++ b/storage/storage_service.c @@ -40,6 +40,7 @@ #include "storage_global.h" #include "fastcommon/base64.h" #include "fastcommon/hash.h" +#include "fastcommon/ioevent_loop.h" #include "fdht_client.h" #include "fdfs_global.h" #include "tracker_client.h" @@ -49,7 +50,7 @@ #include "trunk_mem.h" #include "trunk_sync.h" #include "trunk_client.h" -#include "fastcommon/ioevent_loop.h" +#include "file_id_hashtable.h" //storage access log actions #define ACCESS_LOG_ACTION_UPLOAD_FILE "upload" @@ -1801,18 +1802,30 @@ static int storage_gen_filename(StorageClientInfo *pClientInfo, int2buff(htonl(g_server_id_in_filename), buff); int2buff(timestamp, buff+sizeof(int)); if ((file_size >> 32) != 0) - { - masked_file_size = file_size; - } + { + if (IS_TRUNK_FILE(file_size)) + { + COMBINE_RAND_FILE_SIZE(file_size & 0xFFFFFFFF, masked_file_size); + masked_file_size |= FDFS_TRUNK_FILE_MARK_SIZE; + } + else if (IS_APPENDER_FILE(file_size)) + { + COMBINE_RAND_FILE_SIZE(0, masked_file_size); + masked_file_size |= FDFS_APPENDER_FILE_SIZE; + } + else + { + masked_file_size = file_size; + } + } else { COMBINE_RAND_FILE_SIZE(file_size, masked_file_size); } long2buff(masked_file_size, buff+sizeof(int)*2); int2buff(crc32, buff+sizeof(int)*4); - - base64_encode_ex(&g_fdfs_base64_context, buff, sizeof(int) * 5, encoded, \ - filename_len, false); + base64_encode_ex(&g_fdfs_base64_context, buff, sizeof(int) * 5, + encoded, filename_len, false); if (!pClientInfo->file_context.extra_info.upload.if_sub_path_alloced) { @@ -1903,9 +1916,12 @@ static int 
storage_get_filename(StorageClientInfo *pClientInfo, int i; int result; int store_path_index; + string_t file_id; + char buff[128]; store_path_index = pClientInfo->file_context.extra_info.upload. trunk_info.path.store_path_index; + file_id.str = buff; for (i=0; i<10; i++) { if ((result=storage_gen_filename(pClientInfo, file_size, @@ -1915,26 +1931,28 @@ static int storage_get_filename(StorageClientInfo *pClientInfo, return result; } - sprintf(full_filename, "%s/data/%s", - g_fdfs_store_paths.paths[store_path_index].path, filename); - if (!fileExists(full_filename)) - { - break; - } - - *full_filename = '\0'; + file_id.len = snprintf(buff, sizeof(buff), + "%c"FDFS_STORAGE_DATA_DIR_FORMAT"/%s", + FDFS_STORAGE_STORE_PATH_PREFIX_CHAR, + store_path_index, filename); + if ((result=file_id_hashtable_add(&file_id)) == 0) //check duplicate + { + sprintf(full_filename, "%s/data/%s", g_fdfs_store_paths. + paths[store_path_index].path, filename); + break; + } } - if (*full_filename == '\0') - { - logError("file: "__FILE__", line: %d, " - "Can't generate uniq filename", __LINE__); - *filename = '\0'; - *filename_len = 0; - return ENOENT; - } + if (result != 0) + { + logError("file: "__FILE__", line: %d, " + "Can't generate uniq filename", __LINE__); + *filename = '\0'; + *filename_len = 0; + *full_filename = '\0'; + } - return 0; + return result; } static int storage_client_create_link_wrapper(struct fast_task_info *pTask, \ @@ -2072,11 +2090,10 @@ static int storage_service_upload_file_done(struct fast_task_info *pTask) *new_filename = '\0'; new_filename_len = 0; if (pFileContext->extra_info.upload.file_type & _FILE_TYPE_TRUNK) - { - end_time = pFileContext->extra_info.upload.start_time; - COMBINE_RAND_FILE_SIZE(file_size, file_size_in_name); - file_size_in_name |= FDFS_TRUNK_FILE_MARK_SIZE; - } + { + end_time = pFileContext->extra_info.upload.start_time; + file_size_in_name = FDFS_TRUNK_FILE_MARK_SIZE | file_size; + } else { struct stat stat_buf; @@ -2093,10 +2110,9 @@ static 
int storage_service_upload_file_done(struct fast_task_info *pTask) } if (pFileContext->extra_info.upload.file_type & _FILE_TYPE_APPENDER) - { - COMBINE_RAND_FILE_SIZE(0, file_size_in_name); - file_size_in_name |= FDFS_APPENDER_FILE_SIZE; - } + { + file_size_in_name = FDFS_APPENDER_FILE_SIZE; + } else { file_size_in_name = file_size; diff --git a/storage/storage_types.h b/storage/storage_types.h index 7b20d26..6356f46 100644 --- a/storage/storage_types.h +++ b/storage/storage_types.h @@ -27,6 +27,10 @@ #define FDFS_STORAGE_FILE_OP_DELETE 'D' #define FDFS_STORAGE_FILE_OP_DISCARD 'd' +#define FDFS_TRUNK_FILE_CREATOR_TASK_ID 88 +#define FDFS_TRUNK_BINLOG_COMPRESS_TASK_ID 89 +#define FDFS_CLEAR_EXPIRED_FILE_ID_TASK_ID 90 + typedef int (*TaskDealFunc)(struct fast_task_info *pTask); /* this clean func will be called when connection disconnected */ diff --git a/storage/tracker_client_thread.c b/storage/tracker_client_thread.c index 1074fad..e62d2f3 100644 --- a/storage/tracker_client_thread.c +++ b/storage/tracker_client_thread.c @@ -37,9 +37,6 @@ #include "trunk_sync.h" #include "storage_param_getter.h" -#define TRUNK_FILE_CREATOR_TASK_ID 88 -#define TRUNK_BINLOG_COMPRESS_TASK_ID 89 - static pthread_mutex_t reporter_thread_lock; /* save report thread ids */ @@ -1236,7 +1233,7 @@ static int do_set_trunk_server_myself(ConnectionInfo *pTrackerServer) if (g_trunk_create_file_advance && g_trunk_create_file_interval > 0) { - INIT_SCHEDULE_ENTRY_EX1(*entry, TRUNK_FILE_CREATOR_TASK_ID, + INIT_SCHEDULE_ENTRY_EX1(*entry, FDFS_TRUNK_FILE_CREATOR_TASK_ID, g_trunk_create_file_time_base, g_trunk_create_file_interval, trunk_create_trunk_file_advance, NULL, true); @@ -1245,7 +1242,7 @@ static int do_set_trunk_server_myself(ConnectionInfo *pTrackerServer) if (g_trunk_compress_binlog_interval > 0) { - INIT_SCHEDULE_ENTRY_EX1(*entry, TRUNK_BINLOG_COMPRESS_TASK_ID, + INIT_SCHEDULE_ENTRY_EX1(*entry, FDFS_TRUNK_BINLOG_COMPRESS_TASK_ID, g_trunk_compress_binlog_time_base, 
g_trunk_compress_binlog_interval, trunk_binlog_compress_func, NULL, true); @@ -1273,12 +1270,12 @@ static void do_unset_trunk_server_myself(ConnectionInfo *pTrackerServer) if (g_trunk_create_file_advance && g_trunk_create_file_interval > 0) { - sched_del_entry(TRUNK_FILE_CREATOR_TASK_ID); + sched_del_entry(FDFS_TRUNK_FILE_CREATOR_TASK_ID); } if (g_trunk_compress_binlog_interval > 0) { - sched_del_entry(TRUNK_BINLOG_COMPRESS_TASK_ID); + sched_del_entry(FDFS_TRUNK_BINLOG_COMPRESS_TASK_ID); } } diff --git a/tracker/tracker_types.h b/tracker/tracker_types.h index ec1dd88..26e9a28 100644 --- a/tracker/tracker_types.h +++ b/tracker/tracker_types.h @@ -48,7 +48,7 @@ #define FDFS_TRUNK_LOGIC_FILENAME_LENGTH (FDFS_TRUNK_FILENAME_LENGTH + \ (FDFS_LOGIC_FILE_PATH_LEN - FDFS_TRUE_FILE_PATH_LEN)) -#define FDFS_VERSION_SIZE 6 +#define FDFS_VERSION_SIZE 8 //status order is important! #define FDFS_STORAGE_STATUS_INIT 0