From 56eda633d81428083465dcd2e6c01457579adcf4 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Thu, 10 Dec 2020 14:13:40 +0800 Subject: [PATCH] add src/sf_sharding_htable.[hc] --- src/Makefile.in | 5 +- src/sf_sharding_htable.c | 363 +++++++++++++++++++++++++++++++++++++++ src/sf_sharding_htable.h | 125 ++++++++++++++ 3 files changed, 491 insertions(+), 2 deletions(-) create mode 100644 src/sf_sharding_htable.c create mode 100644 src/sf_sharding_htable.h diff --git a/src/Makefile.in b/src/Makefile.in index 1da9162..af7edae 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -6,7 +6,8 @@ LIB_PATH = $(LIBS) -lfastcommon TARGET_LIB = $(TARGET_PREFIX)/$(LIB_VERSION) TOP_HEADERS = sf_types.h sf_global.h sf_define.h sf_nio.h sf_service.h \ - sf_func.h sf_util.h sf_configs.h sf_proto.h sf_binlog_writer.h + sf_func.h sf_util.h sf_configs.h sf_proto.h sf_binlog_writer.h \ + sf_sharding_htable.h IDEMP_SERVER_HEADER = idempotency/server/server_types.h \ idempotency/server/server_channel.h \ @@ -23,7 +24,7 @@ ALL_HEADERS = $(TOP_HEADERS) $(IDEMP_SERVER_HEADER) $(IDEMP_CLIENT_HEADER) SHARED_OBJS = sf_nio.lo sf_service.lo sf_global.lo \ sf_func.lo sf_util.lo sf_configs.lo sf_proto.lo \ - sf_binlog_writer.lo \ + sf_binlog_writer.lo sf_sharding_htable.lo \ idempotency/server/server_channel.lo \ idempotency/server/request_htable.lo \ idempotency/server/channel_htable.lo \ diff --git a/src/sf_sharding_htable.c b/src/sf_sharding_htable.c new file mode 100644 index 0000000..3f11975 --- /dev/null +++ b/src/sf_sharding_htable.c @@ -0,0 +1,363 @@ +/* + * Copyright (c) 2020 YuQing <384681@qq.com> + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. 
+ * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + /* NOTE(review): the first #include below has lost its header name in this copy. */ +#include +#include "fastcommon/shared_func.h" +#include "sf_sharding_htable.h" + /* Allocates an array of allocator_count fast_mblock_man slots in sharding_ctx and computes the per-allocator sizing. Returns 0, or ENOMEM when the array cannot be allocated. */ +static int init_allocators(SFHtableShardingContext *sharding_ctx, + const int allocator_count, const int element_size, + const int64_t element_limit) +{ + int result; + int bytes; + int alloc_elts_once; + int64_t max_elts_per_allocator; + struct fast_mblock_man *pa; + struct fast_mblock_man *end; + + bytes = sizeof(struct fast_mblock_man) * allocator_count; + sharding_ctx->allocators.elts = (struct fast_mblock_man *)fc_malloc(bytes); + if (sharding_ctx->allocators.elts == NULL) { + return ENOMEM; + } + /* NOTE(review): (allocator_count - 1) / allocator_count is an integer division that yields 0 for any positive count, so this value equals element_limit. If a rounded-up per-allocator share was intended (the way per_elt_limit is computed in sf_sharding_htable_init) the parentheses are misplaced. Confirm before changing, because it alters the allocator cap. The batch size below is capped at 8 * 1024 elements. */ + max_elts_per_allocator = element_limit + + (allocator_count - 1) / allocator_count; + if (max_elts_per_allocator < 8 * 1024) { + alloc_elts_once = max_elts_per_allocator; + } else { + alloc_elts_once = 8 * 1024; + } + /* NOTE(review): the for statement below is incomplete in this copy. Its condition, its body and the start of the following assignment are missing, so the per-allocator initialisation cannot be reviewed here. */ + end = sharding_ctx->allocators.elts + allocator_count; + for (pa=sharding_ctx->allocators.elts; paallocators.count = allocator_count; + return 0; +} + /* Prepares one sharding: initialises its lock, allocates per_capacity bucket heads, then resets the element counter, the reclaim timestamp and the LRU list. Returns 0, the lock initialisation error, or ENOMEM. */ +static int init_sharding(SFHtableSharding *sharding, + const int64_t per_capacity) +{ + int result; + int bytes; + struct fc_list_head *ph; + struct fc_list_head *end; + + if ((result=init_pthread_lock(&sharding->lock)) != 0) { + return result; + } + /* NOTE(review): bytes is an int, so the product may not fit for a large per_capacity. The lock initialised above is not released on the ENOMEM path. The for statement over the buckets is truncated in this copy in the same way as in init_allocators. */ + bytes = sizeof(struct fc_list_head) * per_capacity; + sharding->hashtable.buckets = (struct fc_list_head *)fc_malloc(bytes); + if (sharding->hashtable.buckets == NULL) { + return ENOMEM; + } + end = sharding->hashtable.buckets + per_capacity; + for (ph=sharding->hashtable.buckets; phhashtable.capacity = per_capacity; + sharding->element_count = 0; + sharding->last_reclaim_time_sec = get_current_time(); + FC_INIT_LIST_HEAD(&sharding->lru); + return 0; 
+} + /* Allocates sharding_count shardings. The visible part of the loop gives each one an allocator chosen by its index modulo allocators.count, the per-sharding element limit and a back pointer to the context, then calls init_sharding. Returns 0, ENOMEM, or the first init_sharding error. NOTE(review): the for header is truncated in this copy, and shardings initialised earlier are not torn down when a later init_sharding fails. */ +static int init_sharding_array(SFHtableShardingContext *sharding_ctx, + const int sharding_count, const int64_t per_elt_limit, + const int64_t per_capacity) +{ + int result; + int bytes; + SFHtableSharding *ps; + SFHtableSharding *end; + + bytes = sizeof(SFHtableSharding) * sharding_count; + sharding_ctx->sharding_array.entries = (SFHtableSharding *)fc_malloc(bytes); + if (sharding_ctx->sharding_array.entries == NULL) { + return ENOMEM; + } + + end = sharding_ctx->sharding_array.entries + sharding_count; + for (ps=sharding_ctx->sharding_array.entries; psallocator = sharding_ctx->allocators.elts + + (ps - sharding_ctx->sharding_array.entries) % + sharding_ctx->allocators.count; + ps->element_limit = per_elt_limit; + ps->ctx = sharding_ctx; + if ((result=init_sharding(ps, per_capacity)) != 0) { + return result; + } + } + + sharding_ctx->sharding_array.count = sharding_count; + return 0; +} + /* Public initialiser. Sets up the allocators and the shardings, stores the three callbacks and derives the reclaim parameters. An element_limit of zero or less is replaced by 1000 * 1000. Returns 0 on success or the first error reported by a helper. NOTE(review): sharding_count and allocator_count are used as divisors without validation, and nothing allocated earlier is freed when a later step fails. */ +int sf_sharding_htable_init(SFHtableShardingContext *sharding_ctx, + sf_sharding_htable_insert_callback insert_callback, + sf_sharding_htable_find_callback find_callback, + sf_sharding_htable_accept_reclaim_callback reclaim_callback, + const int sharding_count, const int64_t htable_capacity, + const int allocator_count, const int element_size, + int64_t element_limit, const int64_t min_ttl_sec, + const int64_t max_ttl_sec) +{ + int result; + int64_t per_elt_limit; + int64_t per_capacity; + + if (element_limit <= 0) { + element_limit = 1000 * 1000; + } + if ((result=init_allocators(sharding_ctx, allocator_count, + element_size, element_limit)) != 0) + { + return result; + } + /* per_elt_limit is element_limit divided by sharding_count, rounded up. per_capacity is passed through fc_ceil_prime (presumably rounding up to a prime; confirm in fastcommon). */ + per_elt_limit = (element_limit + sharding_count - 1) / sharding_count; + per_capacity = fc_ceil_prime(htable_capacity / sharding_count); + if ((result=init_sharding_array(sharding_ctx, sharding_count, + per_elt_limit, per_capacity)) != 0) + { + return result; + } + + sharding_ctx->insert_callback = insert_callback; + sharding_ctx->find_callback = find_callback; + sharding_ctx->accept_reclaim_callback 
= reclaim_callback; + sharding_ctx->sharding_reclaim.elt_water_mark = per_elt_limit * 0.10; + sharding_ctx->sharding_reclaim.min_ttl_sec = min_ttl_sec; + sharding_ctx->sharding_reclaim.max_ttl_sec = max_ttl_sec; + sharding_ctx->sharding_reclaim.elt_ttl_sec = (double)(sharding_ctx-> + sharding_reclaim.max_ttl_sec - sharding_ctx-> + sharding_reclaim.min_ttl_sec) / per_elt_limit; + /* Above: the water mark is 10 percent of per_elt_limit, and elt_ttl_sec is the span between max and min TTL divided by per_elt_limit, i.e. the TTL reduction applied per stored element during reclaim. */ + /* + logInfo("per_elt_limit: %"PRId64", elt_water_mark: %d, " + "elt_ttl_sec: %.2f", per_elt_limit, (int)sharding_ctx-> + sharding_reclaim.elt_water_mark, sharding_ctx-> + sharding_reclaim.elt_ttl_sec); + */ + return 0; +} + /* Orders two keys by id1 and then by id2, using fc_compare_int64 for each field. NOTE(review): the key fields are declared uint64_t; check how fc_compare_int64 treats values above INT64_MAX. */ +static inline int compare_key(const SFTwoIdsHashKey *key1, + const SFTwoIdsHashKey *key2) +{ + int sub; + if ((sub=fc_compare_int64(key1->id1, key2->id1)) != 0) { + return sub; + } + + return fc_compare_int64(key1->id2, key2->id2); +} + /* Looks key up in one bucket. htable_insert keeps each bucket in ascending key order, so the scan stops as soon as key sorts before the current entry. Returns the matching entry or NULL. */ +static inline SFShardingHashEntry *htable_find(const SFTwoIdsHashKey *key, + struct fc_list_head *bucket) +{ + int r; + SFShardingHashEntry *current; + /* NOTE(review): the second argument of compare_key below contains a stray non-ASCII character in this copy; it is presumably the address of current->key. */ + fc_list_for_each_entry(current, bucket, dlinks.htable) { + r = compare_key(key, ¤t->key); + if (r < 0) { + return NULL; + } else if (r == 0) { + return current; + } + } + + return NULL; +} + /* Links entry into bucket directly in front of the first element whose key sorts after it, which keeps the bucket in ascending key order. */ +static inline void htable_insert(SFShardingHashEntry *entry, + struct fc_list_head *bucket) +{ + struct fc_list_head *previous; + struct fc_list_head *current; + SFShardingHashEntry *pe; + + previous = bucket; + fc_list_for_each(current, bucket) { + pe = fc_list_entry(current, SFShardingHashEntry, dlinks.htable); + if (compare_key(&entry->key, &pe->key) < 0) { + break; + } + + previous = current; + } + + fc_list_add_internal(&entry->dlinks.htable, previous, previous->next); +} + /* Reclaims expired entries starting at the head of the sharding LRU list. The first reclaimed entry is unlinked and returned so the caller can reuse it; further ones are handed back to the allocator. Returns NULL when nothing was reclaimed. Reached through htable_entry_alloc, which sf_sharding_htable_insert calls while holding the sharding lock. */ +static SFShardingHashEntry *otid_entry_reclaim(SFHtableSharding *sharding) +{ + int64_t reclaim_ttl_sec; + int64_t delta; + int64_t reclaim_count; + int64_t reclaim_limit; + SFShardingHashEntry *first; + SFShardingHashEntry *entry; + SFShardingHashEntry *tmp; + /* delta is the element count capped at element_limit; reclaim_limit is the water mark plus any overshoot above element_limit. */ + if (sharding->element_count <= sharding->element_limit) 
{ + delta = sharding->element_count; + reclaim_limit = sharding->ctx->sharding_reclaim.elt_water_mark; + } else { + delta = sharding->element_limit; + reclaim_limit = (sharding->element_count - sharding->element_limit) + + sharding->ctx->sharding_reclaim.elt_water_mark; + } + /* The TTL threshold starts at max_ttl_sec and shrinks by elt_ttl_sec for every element counted in delta. The scan stops at the first entry that is not older than the threshold. */ + first = NULL; + reclaim_count = 0; + reclaim_ttl_sec = (int64_t)(sharding->ctx->sharding_reclaim.max_ttl_sec - + sharding->ctx->sharding_reclaim.elt_ttl_sec * delta); + fc_list_for_each_entry_safe(entry, tmp, &sharding->lru, dlinks.lru) { + if (get_current_time() - entry->last_update_time_sec <= + reclaim_ttl_sec) + { + break; + } + + if (sharding->ctx->accept_reclaim_callback != NULL && + !sharding->ctx->accept_reclaim_callback(entry)) + { + continue; + } + /* Unlink from both the bucket and the LRU list. The kept first entry stays counted in element_count because the caller reuses it. */ + fc_list_del_init(&entry->dlinks.htable); + fc_list_del_init(&entry->dlinks.lru); + if (first == NULL) { + first = entry; //keep the first + } else { + fast_mblock_free_object(sharding->allocator, entry); + sharding->element_count--; + } + + if (++reclaim_count > reclaim_limit) { + break; + } + } + /* NOTE(review): the message below labels reclaim_ttl_sec with ms although the variable name says seconds. */ + if (reclaim_count > 0) { + logInfo("sharding index: %d, element_count: %"PRId64", " + "reclaim_ttl_sec: %"PRId64" ms, reclaim_count: %"PRId64", " + "reclaim_limit: %"PRId64, (int)(sharding - sharding->ctx-> + sharding_array.entries), sharding->element_count, + reclaim_ttl_sec, reclaim_count, reclaim_limit); + } + + return first; +} + /* Returns an entry for a new key: a reclaimed one when the sharding is above the water mark and a reclaim pass yields one, otherwise a fresh object from the allocator (NULL when that fails). NOTE(review): the reclaim pass is throttled by comparing a difference of get_current_time values against 1000 while the field is named _sec; confirm the intended unit. */ +static inline SFShardingHashEntry *htable_entry_alloc( + SFHtableSharding *sharding) +{ + SFShardingHashEntry *entry; + + if (sharding->element_count > sharding->ctx->sharding_reclaim. 
+ elt_water_mark && get_current_time() - sharding-> + last_reclaim_time_sec > 1000) + { + sharding->last_reclaim_time_sec = get_current_time(); + if ((entry=otid_entry_reclaim(sharding)) != NULL) { + return entry; + } + } + /* Only freshly allocated objects bump element_count and get their sharding pointer set here. */ + entry = (SFShardingHashEntry *)fast_mblock_alloc_object( + sharding->allocator); + if (entry != NULL) { + sharding->element_count++; + entry->sharding = sharding; + } + + return entry; +} + /* Declares the locals sharding, bucket, entry and hash_code in the calling function. The sharding is selected by (id1 + id2) modulo the sharding count and the bucket by id1 modulo the bucket capacity. The arguments are expanded without parentheses, so pass plain identifiers only. */ +#define SET_SHARDING_AND_BUCKET(sharding_ctx, key) \ + SFHtableSharding *sharding; \ + struct fc_list_head *bucket; \ + SFShardingHashEntry *entry; \ + uint64_t hash_code; \ + \ + hash_code = key->id1 + key->id2; \ + sharding = sharding_ctx->sharding_array.entries + \ + hash_code % sharding_ctx->sharding_array.count; \ + bucket = sharding->hashtable.buckets + \ + key->id1 % sharding->hashtable.capacity + + /* Looks key up under the sharding lock. When the entry exists and find_callback is set, the callback result is returned; otherwise the entry itself is returned, or NULL when the key is absent. The lock is released before returning. */ +void *sf_sharding_htable_find(SFHtableShardingContext + *sharding_ctx, const SFTwoIdsHashKey *key, void *arg) +{ + void *data; + SET_SHARDING_AND_BUCKET(sharding_ctx, key); + + PTHREAD_MUTEX_LOCK(&sharding->lock); + entry = htable_find(key, bucket); + if (entry != NULL && sharding_ctx->find_callback != NULL) { + data = sharding_ctx->find_callback(entry, arg); + } else { + data = entry; + } + PTHREAD_MUTEX_UNLOCK(&sharding->lock); + + return data; +} + /* Finds or creates the entry for key under the sharding lock, places it at the LRU tail, refreshes last_update_time_sec and invokes insert_callback with new_create telling which case applied. Returns the callback result, or ENOMEM when no entry could be obtained. */ +int sf_sharding_htable_insert(SFHtableShardingContext + *sharding_ctx, const SFTwoIdsHashKey *key, void *arg) +{ + bool new_create; + int result; + SET_SHARDING_AND_BUCKET(sharding_ctx, key); + + PTHREAD_MUTEX_LOCK(&sharding->lock); + do { + if ((entry=htable_find(key, bucket)) == NULL) { + if ((entry=htable_entry_alloc(sharding)) == NULL) { + result = ENOMEM; + break; + } + + new_create = true; + entry->key = *key; + htable_insert(entry, bucket); + fc_list_add_tail(&entry->dlinks.lru, &sharding->lru); + } else { + new_create = false; + fc_list_move_tail(&entry->dlinks.lru, &sharding->lru); + } + /* NOTE(review): insert_callback is called without a NULL check, unlike find_callback in sf_sharding_htable_find. */ + entry->last_update_time_sec = get_current_time(); + result = sharding_ctx->insert_callback( + entry, arg, 
new_create); + } while (0); + PTHREAD_MUTEX_UNLOCK(&sharding->lock); + + return result; +} diff --git a/src/sf_sharding_htable.h b/src/sf_sharding_htable.h new file mode 100644 index 0000000..c073b99 --- /dev/null +++ b/src/sf_sharding_htable.h @@ -0,0 +1,125 @@ +/* + * Copyright (c) 2020 YuQing <384681@qq.com> + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + /* NOTE(review): the three bare #include lines below have lost their header names in this copy. */ +#ifndef _SF_SHARDING_HTABLE_H +#define _SF_SHARDING_HTABLE_H + +#include +#include +#include +#include "fastcommon/common_define.h" +#include "fastcommon/fc_list.h" +#include "fastcommon/pthread_func.h" + +struct sf_sharding_hash_entry; +struct sf_htable_sharding; + /* Callback types. The insert callback runs on every sf_sharding_htable_insert call, with new_create true for a newly created entry. The find callback maps an entry to the value returned by sf_sharding_htable_find. The accept_reclaim callback can veto reclaiming an entry by returning false. */ +typedef int (*sf_sharding_htable_insert_callback) + (struct sf_sharding_hash_entry *entry, void *arg, const bool new_create); + +typedef void *(*sf_sharding_htable_find_callback) + (struct sf_sharding_hash_entry *entry, void *arg); + +typedef bool (*sf_sharding_htable_accept_reclaim_callback) + (struct sf_sharding_hash_entry *entry); + /* Hash key made of two 64 bit ids; the members of each union are aliases for the same storage. */ +typedef struct sf_two_ids_hash_key { + union { + uint64_t id1; + uint64_t oid; //object id such as inode + }; + + union { + uint64_t id2; + uint64_t tid; //thread id + uint64_t bid; //file block id + }; +} SFTwoIdsHashKey; + /* Entry header: the key, list links for the bucket and for the LRU chain, the last update time and the owning sharding. */ +typedef struct sf_sharding_hash_entry { + SFTwoIdsHashKey key; + struct { + struct fc_list_head htable; //for hashtable + struct fc_list_head lru; //for LRU chain + } dlinks; + int64_t last_update_time_sec; + struct sf_htable_sharding *sharding; //hold for lock +} SFShardingHashEntry; + +typedef 
struct sf_dlink_hashtable { + struct fc_list_head *buckets; + int64_t capacity; +} SFDlinkHashtable; + /* One sharding: its own lock, allocator pointer, LRU list, bucket array and element accounting, plus a back pointer to the context. */ +struct sf_htable_sharding_context; +typedef struct sf_htable_sharding { + pthread_mutex_t lock; + struct fast_mblock_man *allocator; + struct fc_list_head lru; + SFDlinkHashtable hashtable; + int64_t element_count; + int64_t element_limit; + int64_t last_reclaim_time_sec; + struct sf_htable_sharding_context *ctx; +} SFHtableSharding; + +typedef struct sf_htable_sharding_array { + SFHtableSharding *entries; + int count; +} SFHtableShardingArray; + /* Shared context: reclaim parameters, the allocator pool, the three callbacks and the sharding array. */ +typedef struct sf_htable_sharding_context { + struct { + int64_t min_ttl_sec; + int64_t max_ttl_sec; + double elt_ttl_sec; + int elt_water_mark; //trigger reclaim when elements exceeds water mark + } sharding_reclaim; + + struct { + int count; + struct fast_mblock_man *elts; + } allocators; + + sf_sharding_htable_insert_callback insert_callback; + sf_sharding_htable_find_callback find_callback; + sf_sharding_htable_accept_reclaim_callback accept_reclaim_callback; + SFHtableShardingArray sharding_array; +} SFHtableShardingContext; + +#ifdef __cplusplus +extern "C" { +#endif + /* Initialises sharding_ctx; returns 0 on success or an errno style code such as ENOMEM. */ + int sf_sharding_htable_init(SFHtableShardingContext *sharding_ctx, + sf_sharding_htable_insert_callback insert_callback, + sf_sharding_htable_find_callback find_callback, + sf_sharding_htable_accept_reclaim_callback reclaim_callback, + const int sharding_count, const int64_t htable_capacity, + const int allocator_count, const int element_size, + int64_t element_limit, const int64_t min_ttl_sec, + const int64_t max_ttl_sec); + /* Finds or creates the entry for key and passes it to the insert callback; returns the callback result or ENOMEM. */ + int sf_sharding_htable_insert(SFHtableShardingContext + *sharding_ctx, const SFTwoIdsHashKey *key, void *arg); + /* Looks key up; returns the find callback result when a callback is set and the entry exists, otherwise the entry pointer or NULL. */ + void *sf_sharding_htable_find(SFHtableShardingContext + *sharding_ctx, const SFTwoIdsHashKey *key, void *arg); + +#ifdef __cplusplus +} +#endif + +#endif