From e5c48a413235136b01fc4db61e12c950ed9a2059 Mon Sep 17 00:00:00 2001
From: YuQing <384681@qq.com>
Date: Mon, 12 Feb 2024 20:17:02 +0800
Subject: [PATCH] check filename duplicate by hashtable instead of file system
access
---
HISTORY | 3 +-
storage/Makefile.in | 1 +
storage/fdfs_storaged.c | 8 +
storage/file_id_hashtable.c | 326 ++++++++++++++++++++++++++++++++
storage/file_id_hashtable.h | 35 ++++
storage/storage_service.c | 82 ++++----
storage/storage_types.h | 4 +
storage/tracker_client_thread.c | 11 +-
tracker/tracker_types.h | 2 +-
9 files changed, 430 insertions(+), 42 deletions(-)
create mode 100644 storage/file_id_hashtable.c
create mode 100644 storage/file_id_hashtable.h
diff --git a/HISTORY b/HISTORY
index 4ee941c..c793c7c 100644
--- a/HISTORY
+++ b/HISTORY
@@ -1,7 +1,8 @@
-Version 6.12.0 2024-01-11
+Version 6.12.0 2024-02-12
* bugfixed: parse ip and port use parseAddress instead of splitEx
* bugfixed: fdfs_server_info_to_string support IPv6 correctly
+ * check filename duplicate by hashtable instead of file system access
Version 6.11.0 2023-12-10
* support IPv6, config item: address_family in tracker.conf and storage.conf
diff --git a/storage/Makefile.in b/storage/Makefile.in
index ac8dd20..c66c987 100644
--- a/storage/Makefile.in
+++ b/storage/Makefile.in
@@ -12,6 +12,7 @@ SHARED_OBJS = ../common/fdfs_global.o ../tracker/fdfs_shared_func.o \
storage_sync_func.o storage_service.o storage_sync.o \
storage_dio.o storage_ip_changed_dealer.o \
storage_param_getter.o storage_disk_recovery.o \
+ file_id_hashtable.o \
trunk_mgr/trunk_mem.o trunk_mgr/trunk_shared.o \
trunk_mgr/trunk_sync.o trunk_mgr/trunk_client.o \
trunk_mgr/trunk_free_block_checker.o \
diff --git a/storage/fdfs_storaged.c b/storage/fdfs_storaged.c
index 248c408..837fe8d 100644
--- a/storage/fdfs_storaged.c
+++ b/storage/fdfs_storaged.c
@@ -42,6 +42,7 @@
#include "trunk_mem.h"
#include "trunk_sync.h"
#include "trunk_shared.h"
+#include "file_id_hashtable.h"
#ifdef WITH_HTTPD
#include "storage_httpd.h"
@@ -243,6 +244,13 @@ int main(int argc, char *argv[])
}
if ((result=setup_schedule_tasks()) != 0)
+ {
+ logCrit("exit abnormally!\n");
+ log_destroy();
+ return result;
+ }
+
+ if ((result=file_id_hashtable_init()) != 0)
{
logCrit("exit abnormally!\n");
log_destroy();
diff --git a/storage/file_id_hashtable.c b/storage/file_id_hashtable.c
new file mode 100644
index 0000000..9875a0c
--- /dev/null
+++ b/storage/file_id_hashtable.c
@@ -0,0 +1,326 @@
+/*
+ * Copyright (c) 2020 YuQing <384681@qq.com>
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#include "fastcommon/pthread_func.h"
+#include "fastcommon/logger.h"
+#include "fastcommon/fc_atomic.h"
+#include "fastcommon/fast_allocator.h"
+#include "file_id_hashtable.h"
+
+typedef struct file_id_info {
+ string_t file_id;
+ uint32_t hash_code;
+ uint32_t expires;
+ struct {
+ struct file_id_info *htable;
+ struct file_id_info *list;
+ } nexts;
+} FileIdInfo;
+
+typedef struct {
+ FileIdInfo **buckets;
+ uint32_t capacity;
+ volatile uint32_t count;
+} FileIdHashtable;
+
+typedef struct {
+ pthread_mutex_t *locks;
+ int count;
+} FileIdSharedLockArray;
+
+typedef struct {
+ struct {
+ struct file_id_info *head;
+ struct file_id_info *tail;
+ pthread_mutex_t lock;
+ } list;
+ FileIdHashtable htable;
+ FileIdSharedLockArray lock_array;
+ struct fast_mblock_man allocator; //element: FileIdInfo
+ struct fast_allocator_context acontext; //for string allocator
+} FileIdHTableContext;
+
+static FileIdHTableContext file_id_ctx = {
+ {NULL, NULL}, {NULL, 0, 0}, {NULL, 0}
+};
+
+static int clear_expired_file_id_func(void *args);
+
+int file_id_hashtable_init()
+{
+ const int obj_size = 0;
+ int result;
+ int bytes;
+ struct fast_region_info regions[2];
+ pthread_mutex_t *lock;
+ pthread_mutex_t *end;
+ ScheduleArray scheduleArray;
+ ScheduleEntry entry;
+
+ file_id_ctx.htable.capacity = 1403641;
+ bytes = sizeof(FileIdInfo *) * file_id_ctx.htable.capacity;
+ file_id_ctx.htable.buckets = fc_malloc(bytes);
+ if (file_id_ctx.htable.buckets == NULL) {
+ return ENOMEM;
+ }
+ memset(file_id_ctx.htable.buckets, 0, bytes);
+
+ file_id_ctx.lock_array.count = 163;
+ bytes = sizeof(pthread_mutex_t) * file_id_ctx.lock_array.count;
+ file_id_ctx.lock_array.locks = fc_malloc(bytes);
+ if (file_id_ctx.lock_array.locks == NULL) {
+ return ENOMEM;
+ }
+
+ end = file_id_ctx.lock_array.locks + file_id_ctx.lock_array.count;
+ for (lock=file_id_ctx.lock_array.locks; lockstr, file_id->len);
+ FILE_ID_HASHTABLE_SET_BUCKET_AND_LOCK(hash_code);
+
+ result = 0;
+ PTHREAD_MUTEX_LOCK(lock);
+ previous = NULL;
+ current = *bucket;
+ while (current != NULL) {
+ if (hash_code < current->hash_code) {
+ break;
+ } else if (hash_code == current->hash_code && fc_string_equal(
+ file_id, ¤t->file_id))
+ {
+ result = EEXIST;
+ break;
+ }
+
+ previous = current;
+ current = current->nexts.htable;
+ }
+
+ if (result == 0) {
+ do {
+ if ((finfo=fast_mblock_alloc_object(&file_id_ctx.
+ allocator)) == NULL)
+ {
+ result = ENOMEM;
+ break;
+ }
+ if ((finfo->file_id.str=fast_allocator_alloc(&file_id_ctx.
+ acontext, file_id->len)) == NULL)
+ {
+ fast_mblock_free_object(&file_id_ctx.allocator, finfo);
+ result = ENOMEM;
+ break;
+ }
+
+ memcpy(finfo->file_id.str, file_id->str, file_id->len);
+ finfo->file_id.len = file_id->len;
+ finfo->hash_code = hash_code;
+ finfo->expires = g_current_time + 3;
+ if (previous == NULL) {
+ finfo->nexts.htable = *bucket;
+ *bucket = finfo;
+ } else {
+ finfo->nexts.htable = current;
+ previous->nexts.htable = finfo;
+ }
+ FC_ATOMIC_INC(file_id_ctx.htable.count);
+
+ } while (0);
+ } else {
+ finfo = NULL;
+ }
+ PTHREAD_MUTEX_UNLOCK(lock);
+
+ if (result == 0) {
+ PTHREAD_MUTEX_LOCK(&file_id_ctx.list.lock);
+ finfo->nexts.list = NULL;
+ if (file_id_ctx.list.tail == NULL) {
+ file_id_ctx.list.head = finfo;
+ } else {
+ file_id_ctx.list.tail->nexts.list = finfo;
+ }
+ file_id_ctx.list.tail = finfo;
+ PTHREAD_MUTEX_UNLOCK(&file_id_ctx.list.lock);
+ }
+ return result;
+}
+
+static int file_id_hashtable_del(FileIdInfo *finfo)
+{
+ int result;
+ FileIdInfo *current;
+ FileIdInfo *previous;
+ FILE_ID_HASHTABLE_DECLARE_VARS();
+
+ FILE_ID_HASHTABLE_SET_BUCKET_AND_LOCK(finfo->hash_code);
+ PTHREAD_MUTEX_LOCK(lock);
+ if (*bucket == NULL) {
+ result = ENOENT;
+ } else if (finfo->hash_code == (*bucket)->hash_code &&
+ fc_string_equal(&finfo->file_id, &(*bucket)->file_id))
+ {
+ *bucket = (*bucket)->nexts.htable;
+ FC_ATOMIC_DEC(file_id_ctx.htable.count);
+ result = 0;
+ } else {
+ result = ENOENT;
+ previous = *bucket;
+ while ((current=previous->nexts.htable) != NULL) {
+ if (finfo->hash_code < current->hash_code) {
+ break;
+ } else if (finfo->hash_code == current->hash_code &&
+ fc_string_equal(&finfo->file_id, ¤t->file_id))
+ {
+ previous->nexts.htable = current->nexts.htable;
+ FC_ATOMIC_DEC(file_id_ctx.htable.count);
+ result = 0;
+ break;
+ }
+
+ previous = current;
+ }
+ }
+ PTHREAD_MUTEX_UNLOCK(lock);
+
+ return result;
+}
+
+static int clear_expired_file_id_func(void *args)
+{
+ struct file_id_info *head;
+ struct file_id_info *tail;
+ struct fast_mblock_chain chain;
+ struct fast_mblock_node *node;
+
+ head = tail = NULL;
+ PTHREAD_MUTEX_LOCK(&file_id_ctx.list.lock);
+ if (file_id_ctx.list.head != NULL && file_id_ctx.
+ list.head->expires < g_current_time)
+ {
+ head = tail = file_id_ctx.list.head;
+ file_id_ctx.list.head = file_id_ctx.list.head->nexts.list;
+ while (file_id_ctx.list.head != NULL && file_id_ctx.
+ list.head->expires < g_current_time)
+ {
+ tail = file_id_ctx.list.head;
+ file_id_ctx.list.head = file_id_ctx.list.head->nexts.list;
+ }
+
+ if (file_id_ctx.list.head == NULL) {
+ file_id_ctx.list.tail = NULL;
+ } else {
+ tail->nexts.list = NULL;
+ }
+ }
+ PTHREAD_MUTEX_UNLOCK(&file_id_ctx.list.lock);
+
+ if (head == NULL) {
+ return 0;
+ }
+
+ chain.head = chain.tail = NULL;
+ do {
+ node = fast_mblock_to_node_ptr(head);
+ if (chain.head == NULL) {
+ chain.head = node;
+ } else {
+ chain.tail->next = node;
+ }
+ chain.tail = node;
+
+ file_id_hashtable_del(head);
+ fast_allocator_free(&file_id_ctx.acontext, head->file_id.str);
+ } while ((head=head->nexts.list) != NULL);
+
+ chain.tail->next = NULL;
+ fast_mblock_batch_free(&file_id_ctx.allocator, &chain);
+ return 0;
+}
diff --git a/storage/file_id_hashtable.h b/storage/file_id_hashtable.h
new file mode 100644
index 0000000..2be59dd
--- /dev/null
+++ b/storage/file_id_hashtable.h
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2020 YuQing <384681@qq.com>
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#ifndef _FILE_ID_HASHTABLE_H
+#define _FILE_ID_HASHTABLE_H
+
+#include "storage_types.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+ int file_id_hashtable_init();
+
+ void file_id_hashtable_destroy();
+
+ int file_id_hashtable_add(const string_t *file_id);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/storage/storage_service.c b/storage/storage_service.c
index 8b5ed34..721ef2a 100644
--- a/storage/storage_service.c
+++ b/storage/storage_service.c
@@ -40,6 +40,7 @@
#include "storage_global.h"
#include "fastcommon/base64.h"
#include "fastcommon/hash.h"
+#include "fastcommon/ioevent_loop.h"
#include "fdht_client.h"
#include "fdfs_global.h"
#include "tracker_client.h"
@@ -49,7 +50,7 @@
#include "trunk_mem.h"
#include "trunk_sync.h"
#include "trunk_client.h"
-#include "fastcommon/ioevent_loop.h"
+#include "file_id_hashtable.h"
//storage access log actions
#define ACCESS_LOG_ACTION_UPLOAD_FILE "upload"
@@ -1801,18 +1802,30 @@ static int storage_gen_filename(StorageClientInfo *pClientInfo,
int2buff(htonl(g_server_id_in_filename), buff);
int2buff(timestamp, buff+sizeof(int));
if ((file_size >> 32) != 0)
- {
- masked_file_size = file_size;
- }
+ {
+ if (IS_TRUNK_FILE(file_size))
+ {
+ COMBINE_RAND_FILE_SIZE(file_size & 0xFFFFFFFF, masked_file_size);
+ masked_file_size |= FDFS_TRUNK_FILE_MARK_SIZE;
+ }
+ else if (IS_APPENDER_FILE(file_size))
+ {
+ COMBINE_RAND_FILE_SIZE(0, masked_file_size);
+ masked_file_size |= FDFS_APPENDER_FILE_SIZE;
+ }
+ else
+ {
+ masked_file_size = file_size;
+ }
+ }
else
{
COMBINE_RAND_FILE_SIZE(file_size, masked_file_size);
}
long2buff(masked_file_size, buff+sizeof(int)*2);
int2buff(crc32, buff+sizeof(int)*4);
-
- base64_encode_ex(&g_fdfs_base64_context, buff, sizeof(int) * 5, encoded, \
- filename_len, false);
+ base64_encode_ex(&g_fdfs_base64_context, buff, sizeof(int) * 5,
+ encoded, filename_len, false);
if (!pClientInfo->file_context.extra_info.upload.if_sub_path_alloced)
{
@@ -1903,9 +1916,12 @@ static int storage_get_filename(StorageClientInfo *pClientInfo,
int i;
int result;
int store_path_index;
+ string_t file_id;
+ char buff[128];
store_path_index = pClientInfo->file_context.extra_info.upload.
trunk_info.path.store_path_index;
+ file_id.str = buff;
for (i=0; i<10; i++)
{
if ((result=storage_gen_filename(pClientInfo, file_size,
@@ -1915,26 +1931,28 @@ static int storage_get_filename(StorageClientInfo *pClientInfo,
return result;
}
- sprintf(full_filename, "%s/data/%s",
- g_fdfs_store_paths.paths[store_path_index].path, filename);
- if (!fileExists(full_filename))
- {
- break;
- }
-
- *full_filename = '\0';
+ file_id.len = snprintf(buff, sizeof(buff),
+ "%c"FDFS_STORAGE_DATA_DIR_FORMAT"/%s",
+ FDFS_STORAGE_STORE_PATH_PREFIX_CHAR,
+ store_path_index, filename);
+ if ((result=file_id_hashtable_add(&file_id)) == 0) //check duplicate
+ {
+ sprintf(full_filename, "%s/data/%s", g_fdfs_store_paths.
+ paths[store_path_index].path, filename);
+ break;
+ }
}
- if (*full_filename == '\0')
- {
- logError("file: "__FILE__", line: %d, "
- "Can't generate uniq filename", __LINE__);
- *filename = '\0';
- *filename_len = 0;
- return ENOENT;
- }
+ if (result != 0)
+ {
+ logError("file: "__FILE__", line: %d, "
+ "Can't generate uniq filename", __LINE__);
+ *filename = '\0';
+ *filename_len = 0;
+ *full_filename = '\0';
+ }
- return 0;
+ return result;
}
static int storage_client_create_link_wrapper(struct fast_task_info *pTask, \
@@ -2072,11 +2090,10 @@ static int storage_service_upload_file_done(struct fast_task_info *pTask)
*new_filename = '\0';
new_filename_len = 0;
if (pFileContext->extra_info.upload.file_type & _FILE_TYPE_TRUNK)
- {
- end_time = pFileContext->extra_info.upload.start_time;
- COMBINE_RAND_FILE_SIZE(file_size, file_size_in_name);
- file_size_in_name |= FDFS_TRUNK_FILE_MARK_SIZE;
- }
+ {
+ end_time = pFileContext->extra_info.upload.start_time;
+ file_size_in_name = FDFS_TRUNK_FILE_MARK_SIZE | file_size;
+ }
else
{
struct stat stat_buf;
@@ -2093,10 +2110,9 @@ static int storage_service_upload_file_done(struct fast_task_info *pTask)
}
if (pFileContext->extra_info.upload.file_type & _FILE_TYPE_APPENDER)
- {
- COMBINE_RAND_FILE_SIZE(0, file_size_in_name);
- file_size_in_name |= FDFS_APPENDER_FILE_SIZE;
- }
+ {
+ file_size_in_name = FDFS_APPENDER_FILE_SIZE;
+ }
else
{
file_size_in_name = file_size;
diff --git a/storage/storage_types.h b/storage/storage_types.h
index 7b20d26..6356f46 100644
--- a/storage/storage_types.h
+++ b/storage/storage_types.h
@@ -27,6 +27,10 @@
#define FDFS_STORAGE_FILE_OP_DELETE 'D'
#define FDFS_STORAGE_FILE_OP_DISCARD 'd'
+#define FDFS_TRUNK_FILE_CREATOR_TASK_ID 88
+#define FDFS_TRUNK_BINLOG_COMPRESS_TASK_ID 89
+#define FDFS_CLEAR_EXPIRED_FILE_ID_TASK_ID 90
+
typedef int (*TaskDealFunc)(struct fast_task_info *pTask);
/* this clean func will be called when connection disconnected */
diff --git a/storage/tracker_client_thread.c b/storage/tracker_client_thread.c
index 1074fad..e62d2f3 100644
--- a/storage/tracker_client_thread.c
+++ b/storage/tracker_client_thread.c
@@ -37,9 +37,6 @@
#include "trunk_sync.h"
#include "storage_param_getter.h"
-#define TRUNK_FILE_CREATOR_TASK_ID 88
-#define TRUNK_BINLOG_COMPRESS_TASK_ID 89
-
static pthread_mutex_t reporter_thread_lock;
/* save report thread ids */
@@ -1236,7 +1233,7 @@ static int do_set_trunk_server_myself(ConnectionInfo *pTrackerServer)
if (g_trunk_create_file_advance &&
g_trunk_create_file_interval > 0)
{
- INIT_SCHEDULE_ENTRY_EX1(*entry, TRUNK_FILE_CREATOR_TASK_ID,
+ INIT_SCHEDULE_ENTRY_EX1(*entry, FDFS_TRUNK_FILE_CREATOR_TASK_ID,
g_trunk_create_file_time_base,
g_trunk_create_file_interval,
trunk_create_trunk_file_advance, NULL, true);
@@ -1245,7 +1242,7 @@ static int do_set_trunk_server_myself(ConnectionInfo *pTrackerServer)
if (g_trunk_compress_binlog_interval > 0)
{
- INIT_SCHEDULE_ENTRY_EX1(*entry, TRUNK_BINLOG_COMPRESS_TASK_ID,
+ INIT_SCHEDULE_ENTRY_EX1(*entry, FDFS_TRUNK_BINLOG_COMPRESS_TASK_ID,
g_trunk_compress_binlog_time_base,
g_trunk_compress_binlog_interval,
trunk_binlog_compress_func, NULL, true);
@@ -1273,12 +1270,12 @@ static void do_unset_trunk_server_myself(ConnectionInfo *pTrackerServer)
if (g_trunk_create_file_advance &&
g_trunk_create_file_interval > 0)
{
- sched_del_entry(TRUNK_FILE_CREATOR_TASK_ID);
+ sched_del_entry(FDFS_TRUNK_FILE_CREATOR_TASK_ID);
}
if (g_trunk_compress_binlog_interval > 0)
{
- sched_del_entry(TRUNK_BINLOG_COMPRESS_TASK_ID);
+ sched_del_entry(FDFS_TRUNK_BINLOG_COMPRESS_TASK_ID);
}
}
diff --git a/tracker/tracker_types.h b/tracker/tracker_types.h
index ec1dd88..26e9a28 100644
--- a/tracker/tracker_types.h
+++ b/tracker/tracker_types.h
@@ -48,7 +48,7 @@
#define FDFS_TRUNK_LOGIC_FILENAME_LENGTH (FDFS_TRUNK_FILENAME_LENGTH + \
(FDFS_LOGIC_FILE_PATH_LEN - FDFS_TRUE_FILE_PATH_LEN))
-#define FDFS_VERSION_SIZE 6
+#define FDFS_VERSION_SIZE 8
//status order is important!
#define FDFS_STORAGE_STATUS_INIT 0