diff --git a/src/Makefile.in b/src/Makefile.in
index 1da9162..af7edae 100644
--- a/src/Makefile.in
+++ b/src/Makefile.in
@@ -6,7 +6,8 @@ LIB_PATH = $(LIBS) -lfastcommon
TARGET_LIB = $(TARGET_PREFIX)/$(LIB_VERSION)
TOP_HEADERS = sf_types.h sf_global.h sf_define.h sf_nio.h sf_service.h \
- sf_func.h sf_util.h sf_configs.h sf_proto.h sf_binlog_writer.h
+ sf_func.h sf_util.h sf_configs.h sf_proto.h sf_binlog_writer.h \
+ sf_sharding_htable.h
IDEMP_SERVER_HEADER = idempotency/server/server_types.h \
idempotency/server/server_channel.h \
@@ -23,7 +24,7 @@ ALL_HEADERS = $(TOP_HEADERS) $(IDEMP_SERVER_HEADER) $(IDEMP_CLIENT_HEADER)
SHARED_OBJS = sf_nio.lo sf_service.lo sf_global.lo \
sf_func.lo sf_util.lo sf_configs.lo sf_proto.lo \
- sf_binlog_writer.lo \
+ sf_binlog_writer.lo sf_sharding_htable.lo \
idempotency/server/server_channel.lo \
idempotency/server/request_htable.lo \
idempotency/server/channel_htable.lo \
diff --git a/src/sf_sharding_htable.c b/src/sf_sharding_htable.c
new file mode 100644
index 0000000..3f11975
--- /dev/null
+++ b/src/sf_sharding_htable.c
@@ -0,0 +1,363 @@
+/*
+ * Copyright (c) 2020 YuQing <384681@qq.com>
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include <errno.h>
+#include "fastcommon/logger.h"
+#include "fastcommon/fast_mblock.h"
+#include "fastcommon/sched_thread.h"
+#include "fastcommon/shared_func.h"
+#include "sf_sharding_htable.h"
+
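+/* sharded hashtable: keys are spread over sharding_count shardings, each
+ * with its own mutex, bucket array and LRU chain; entry memory comes from
+ * a pool of fast_mblock allocators assigned to the shardings round-robin */
+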
+static int init_allocators(SFHtableShardingContext *sharding_ctx,
+ const int allocator_count, const int element_size,
+ const int64_t element_limit)
+{
+ int result;
+ int bytes;
+ int alloc_elts_once;
+ int64_t max_elts_per_allocator;
+ struct fast_mblock_man *pa;
+ struct fast_mblock_man *end;
+
+ bytes = sizeof(struct fast_mblock_man) * allocator_count;
+ sharding_ctx->allocators.elts = (struct fast_mblock_man *)fc_malloc(bytes);
+ if (sharding_ctx->allocators.elts == NULL) {
+ return ENOMEM;
+ }
+
+    max_elts_per_allocator = (element_limit +
+            allocator_count - 1) / allocator_count;
+ if (max_elts_per_allocator < 8 * 1024) {
+ alloc_elts_once = max_elts_per_allocator;
+ } else {
+ alloc_elts_once = 8 * 1024;
+ }
+
+ end = sharding_ctx->allocators.elts + allocator_count;
+    for (pa=sharding_ctx->allocators.elts; pa<end; pa++) {
+        if ((result=fast_mblock_init_ex1(pa, "sharding_htable_entry",
+                        element_size, alloc_elts_once, 0, NULL,
+                        NULL, true)) != 0)
+        {
+            return result;
+        }
+    }
+
+    sharding_ctx->allocators.count = allocator_count;
+ return 0;
+}
+
+static int init_sharding(SFHtableSharding *sharding,
+ const int64_t per_capacity)
+{
+ int result;
+ int bytes;
+ struct fc_list_head *ph;
+ struct fc_list_head *end;
+
+ if ((result=init_pthread_lock(&sharding->lock)) != 0) {
+ return result;
+ }
+
+ bytes = sizeof(struct fc_list_head) * per_capacity;
+ sharding->hashtable.buckets = (struct fc_list_head *)fc_malloc(bytes);
+ if (sharding->hashtable.buckets == NULL) {
+ return ENOMEM;
+ }
+ end = sharding->hashtable.buckets + per_capacity;
+    for (ph=sharding->hashtable.buckets; ph<end; ph++) {
+        FC_INIT_LIST_HEAD(ph);
+    }
+
+    sharding->hashtable.capacity = per_capacity;
+ sharding->element_count = 0;
+ sharding->last_reclaim_time_sec = get_current_time();
+ FC_INIT_LIST_HEAD(&sharding->lru);
+ return 0;
+}
+
+static int init_sharding_array(SFHtableShardingContext *sharding_ctx,
+ const int sharding_count, const int64_t per_elt_limit,
+ const int64_t per_capacity)
+{
+ int result;
+ int bytes;
+ SFHtableSharding *ps;
+ SFHtableSharding *end;
+
+ bytes = sizeof(SFHtableSharding) * sharding_count;
+ sharding_ctx->sharding_array.entries = (SFHtableSharding *)fc_malloc(bytes);
+ if (sharding_ctx->sharding_array.entries == NULL) {
+ return ENOMEM;
+ }
+
+ end = sharding_ctx->sharding_array.entries + sharding_count;
+    for (ps=sharding_ctx->sharding_array.entries; ps<end; ps++) {
+        ps->allocator = sharding_ctx->allocators.elts +
+ (ps - sharding_ctx->sharding_array.entries) %
+ sharding_ctx->allocators.count;
+ ps->element_limit = per_elt_limit;
+ ps->ctx = sharding_ctx;
+ if ((result=init_sharding(ps, per_capacity)) != 0) {
+ return result;
+ }
+ }
+
+ sharding_ctx->sharding_array.count = sharding_count;
+ return 0;
+}
+
+int sf_sharding_htable_init(SFHtableShardingContext *sharding_ctx,
+ sf_sharding_htable_insert_callback insert_callback,
+ sf_sharding_htable_find_callback find_callback,
+ sf_sharding_htable_accept_reclaim_callback reclaim_callback,
+ const int sharding_count, const int64_t htable_capacity,
+ const int allocator_count, const int element_size,
+ int64_t element_limit, const int64_t min_ttl_sec,
+ const int64_t max_ttl_sec)
+{
+ int result;
+ int64_t per_elt_limit;
+ int64_t per_capacity;
+
+ if (element_limit <= 0) {
+ element_limit = 1000 * 1000;
+ }
+ if ((result=init_allocators(sharding_ctx, allocator_count,
+ element_size, element_limit)) != 0)
+ {
+ return result;
+ }
+
+ per_elt_limit = (element_limit + sharding_count - 1) / sharding_count;
+ per_capacity = fc_ceil_prime(htable_capacity / sharding_count);
+ if ((result=init_sharding_array(sharding_ctx, sharding_count,
+ per_elt_limit, per_capacity)) != 0)
+ {
+ return result;
+ }
+
+ sharding_ctx->insert_callback = insert_callback;
+ sharding_ctx->find_callback = find_callback;
+ sharding_ctx->accept_reclaim_callback = reclaim_callback;
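+
+    /* reclaim tuning: allocation starts trying to reclaim once a sharding
+     * holds more than 10% of its per-sharding element limit, and the entry
+     * TTL shrinks linearly from max_ttl_sec (sharding nearly empty) down
+     * to min_ttl_sec (sharding at its element limit) */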
+ sharding_ctx->sharding_reclaim.elt_water_mark = per_elt_limit * 0.10;
+ sharding_ctx->sharding_reclaim.min_ttl_sec = min_ttl_sec;
+ sharding_ctx->sharding_reclaim.max_ttl_sec = max_ttl_sec;
+ sharding_ctx->sharding_reclaim.elt_ttl_sec = (double)(sharding_ctx->
+ sharding_reclaim.max_ttl_sec - sharding_ctx->
+ sharding_reclaim.min_ttl_sec) / per_elt_limit;
+
+ /*
+ logInfo("per_elt_limit: %"PRId64", elt_water_mark: %d, "
+ "elt_ttl_sec: %.2f", per_elt_limit, (int)sharding_ctx->
+ sharding_reclaim.elt_water_mark, sharding_ctx->
+ sharding_reclaim.elt_ttl_sec);
+ */
+ return 0;
+}
+
+static inline int compare_key(const SFTwoIdsHashKey *key1,
+ const SFTwoIdsHashKey *key2)
+{
+ int sub;
+ if ((sub=fc_compare_int64(key1->id1, key2->id1)) != 0) {
+ return sub;
+ }
+
+ return fc_compare_int64(key1->id2, key2->id2);
+}
+
+static inline SFShardingHashEntry *htable_find(const SFTwoIdsHashKey *key,
+ struct fc_list_head *bucket)
+{
+ int r;
+ SFShardingHashEntry *current;
+
+ fc_list_for_each_entry(current, bucket, dlinks.htable) {
+        r = compare_key(key, &current->key);
+ if (r < 0) {
+ return NULL;
+ } else if (r == 0) {
+ return current;
+ }
+ }
+
+ return NULL;
+}
+
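+/* keep each bucket sorted by key so that htable_find can stop scanning at
+ * the first entry that compares greater than the search key */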
+static inline void htable_insert(SFShardingHashEntry *entry,
+ struct fc_list_head *bucket)
+{
+ struct fc_list_head *previous;
+ struct fc_list_head *current;
+ SFShardingHashEntry *pe;
+
+ previous = bucket;
+ fc_list_for_each(current, bucket) {
+ pe = fc_list_entry(current, SFShardingHashEntry, dlinks.htable);
+ if (compare_key(&entry->key, &pe->key) < 0) {
+ break;
+ }
+
+ previous = current;
+ }
+
+ fc_list_add_internal(&entry->dlinks.htable, previous, previous->next);
+}
+
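+/* walk the LRU chain and reclaim entries whose TTL has expired; the first
+ * reclaimed entry is kept and returned for reuse by the caller instead of
+ * being freed. must be called with sharding->lock held. */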
+static SFShardingHashEntry *otid_entry_reclaim(SFHtableSharding *sharding)
+{
+ int64_t reclaim_ttl_sec;
+ int64_t delta;
+ int64_t reclaim_count;
+ int64_t reclaim_limit;
+ SFShardingHashEntry *first;
+ SFShardingHashEntry *entry;
+ SFShardingHashEntry *tmp;
+
+ if (sharding->element_count <= sharding->element_limit) {
+ delta = sharding->element_count;
+ reclaim_limit = sharding->ctx->sharding_reclaim.elt_water_mark;
+ } else {
+ delta = sharding->element_limit;
+ reclaim_limit = (sharding->element_count - sharding->element_limit) +
+ sharding->ctx->sharding_reclaim.elt_water_mark;
+ }
+
+ first = NULL;
+ reclaim_count = 0;
+ reclaim_ttl_sec = (int64_t)(sharding->ctx->sharding_reclaim.max_ttl_sec -
+ sharding->ctx->sharding_reclaim.elt_ttl_sec * delta);
+ fc_list_for_each_entry_safe(entry, tmp, &sharding->lru, dlinks.lru) {
+ if (get_current_time() - entry->last_update_time_sec <=
+ reclaim_ttl_sec)
+ {
+ break;
+ }
+
+ if (sharding->ctx->accept_reclaim_callback != NULL &&
+ !sharding->ctx->accept_reclaim_callback(entry))
+ {
+ continue;
+ }
+
+ fc_list_del_init(&entry->dlinks.htable);
+ fc_list_del_init(&entry->dlinks.lru);
+ if (first == NULL) {
+            first = entry;  //keep the first reclaimed entry for reuse
+ } else {
+ fast_mblock_free_object(sharding->allocator, entry);
+ sharding->element_count--;
+ }
+
+ if (++reclaim_count > reclaim_limit) {
+ break;
+ }
+ }
+
+ if (reclaim_count > 0) {
+ logInfo("sharding index: %d, element_count: %"PRId64", "
+ "reclaim_ttl_sec: %"PRId64" ms, reclaim_count: %"PRId64", "
+ "reclaim_limit: %"PRId64, (int)(sharding - sharding->ctx->
+ sharding_array.entries), sharding->element_count,
+ reclaim_ttl_sec, reclaim_count, reclaim_limit);
+ }
+
+ return first;
+}
+
+static inline SFShardingHashEntry *htable_entry_alloc(
+ SFHtableSharding *sharding)
+{
+ SFShardingHashEntry *entry;
+
+ if (sharding->element_count > sharding->ctx->sharding_reclaim.
+ elt_water_mark && get_current_time() - sharding->
+ last_reclaim_time_sec > 1000)
+ {
+ sharding->last_reclaim_time_sec = get_current_time();
+ if ((entry=otid_entry_reclaim(sharding)) != NULL) {
+ return entry;
+ }
+ }
+
+ entry = (SFShardingHashEntry *)fast_mblock_alloc_object(
+ sharding->allocator);
+ if (entry != NULL) {
+ sharding->element_count++;
+ entry->sharding = sharding;
+ }
+
+ return entry;
+}
+
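+/* map a key to its sharding by (id1 + id2) and to a bucket within that
+ * sharding by id1; also declares the sharding, bucket, entry and hash_code
+ * locals used by the public functions below */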
+#define SET_SHARDING_AND_BUCKET(sharding_ctx, key) \
+ SFHtableSharding *sharding; \
+ struct fc_list_head *bucket; \
+ SFShardingHashEntry *entry; \
+ uint64_t hash_code; \
+ \
+ hash_code = key->id1 + key->id2; \
+ sharding = sharding_ctx->sharding_array.entries + \
+ hash_code % sharding_ctx->sharding_array.count; \
+ bucket = sharding->hashtable.buckets + \
+ key->id1 % sharding->hashtable.capacity
+
+
+void *sf_sharding_htable_find(SFHtableShardingContext
+ *sharding_ctx, const SFTwoIdsHashKey *key, void *arg)
+{
+ void *data;
+ SET_SHARDING_AND_BUCKET(sharding_ctx, key);
+
+ PTHREAD_MUTEX_LOCK(&sharding->lock);
+ entry = htable_find(key, bucket);
+ if (entry != NULL && sharding_ctx->find_callback != NULL) {
+ data = sharding_ctx->find_callback(entry, arg);
+ } else {
+ data = entry;
+ }
+ PTHREAD_MUTEX_UNLOCK(&sharding->lock);
+
+ return data;
+}
+
+int sf_sharding_htable_insert(SFHtableShardingContext
+ *sharding_ctx, const SFTwoIdsHashKey *key, void *arg)
+{
+ bool new_create;
+ int result;
+ SET_SHARDING_AND_BUCKET(sharding_ctx, key);
+
+ PTHREAD_MUTEX_LOCK(&sharding->lock);
+ do {
+ if ((entry=htable_find(key, bucket)) == NULL) {
+ if ((entry=htable_entry_alloc(sharding)) == NULL) {
+ result = ENOMEM;
+ break;
+ }
+
+ new_create = true;
+ entry->key = *key;
+ htable_insert(entry, bucket);
+ fc_list_add_tail(&entry->dlinks.lru, &sharding->lru);
+ } else {
+ new_create = false;
+ fc_list_move_tail(&entry->dlinks.lru, &sharding->lru);
+ }
+
+ entry->last_update_time_sec = get_current_time();
+ result = sharding_ctx->insert_callback(
+ entry, arg, new_create);
+ } while (0);
+ PTHREAD_MUTEX_UNLOCK(&sharding->lock);
+
+ return result;
+}
diff --git a/src/sf_sharding_htable.h b/src/sf_sharding_htable.h
new file mode 100644
index 0000000..c073b99
--- /dev/null
+++ b/src/sf_sharding_htable.h
@@ -0,0 +1,125 @@
+/*
+ * Copyright (c) 2020 YuQing <384681@qq.com>
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#ifndef _SF_SHARDING_HTABLE_H
+#define _SF_SHARDING_HTABLE_H
+
+#include <stdint.h>
+#include <stdbool.h>
+#include <pthread.h>
+#include "fastcommon/common_define.h"
+#include "fastcommon/fc_list.h"
+#include "fastcommon/pthread_func.h"
+
+struct sf_sharding_hash_entry;
+struct sf_htable_sharding;
+
+typedef int (*sf_sharding_htable_insert_callback)
+ (struct sf_sharding_hash_entry *entry, void *arg, const bool new_create);
+
+typedef void *(*sf_sharding_htable_find_callback)
+ (struct sf_sharding_hash_entry *entry, void *arg);
+
+typedef bool (*sf_sharding_htable_accept_reclaim_callback)
+ (struct sf_sharding_hash_entry *entry);
+
+typedef struct sf_two_ids_hash_key {
+ union {
+ uint64_t id1;
+ uint64_t oid; //object id such as inode
+ };
+
+ union {
+ uint64_t id2;
+ uint64_t tid; //thread id
+ uint64_t bid; //file block id
+ };
+} SFTwoIdsHashKey;
+
+typedef struct sf_sharding_hash_entry {
+ SFTwoIdsHashKey key;
+ struct {
+ struct fc_list_head htable; //for hashtable
+ struct fc_list_head lru; //for LRU chain
+ } dlinks;
+ int64_t last_update_time_sec;
+    struct sf_htable_sharding *sharding;  //the owner sharding, kept for locking
+} SFShardingHashEntry;
+
+typedef struct sf_dlink_hashtable {
+ struct fc_list_head *buckets;
+ int64_t capacity;
+} SFDlinkHashtable;
+
+struct sf_htable_sharding_context;
+typedef struct sf_htable_sharding {
+ pthread_mutex_t lock;
+ struct fast_mblock_man *allocator;
+ struct fc_list_head lru;
+ SFDlinkHashtable hashtable;
+ int64_t element_count;
+ int64_t element_limit;
+ int64_t last_reclaim_time_sec;
+ struct sf_htable_sharding_context *ctx;
+} SFHtableSharding;
+
+typedef struct sf_htable_sharding_array {
+ SFHtableSharding *entries;
+ int count;
+} SFHtableShardingArray;
+
+typedef struct sf_htable_sharding_context {
+ struct {
+ int64_t min_ttl_sec;
+ int64_t max_ttl_sec;
+ double elt_ttl_sec;
+        int elt_water_mark;  //trigger reclaim when element count exceeds this water mark
+ } sharding_reclaim;
+
+ struct {
+ int count;
+ struct fast_mblock_man *elts;
+ } allocators;
+
+ sf_sharding_htable_insert_callback insert_callback;
+ sf_sharding_htable_find_callback find_callback;
+ sf_sharding_htable_accept_reclaim_callback accept_reclaim_callback;
+ SFHtableShardingArray sharding_array;
+} SFHtableShardingContext;
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+ int sf_sharding_htable_init(SFHtableShardingContext *sharding_ctx,
+ sf_sharding_htable_insert_callback insert_callback,
+ sf_sharding_htable_find_callback find_callback,
+ sf_sharding_htable_accept_reclaim_callback reclaim_callback,
+ const int sharding_count, const int64_t htable_capacity,
+ const int allocator_count, const int element_size,
+ int64_t element_limit, const int64_t min_ttl_sec,
+ const int64_t max_ttl_sec);
+
+ int sf_sharding_htable_insert(SFHtableShardingContext
+ *sharding_ctx, const SFTwoIdsHashKey *key, void *arg);
+
+ void *sf_sharding_htable_find(SFHtableShardingContext
+ *sharding_ctx, const SFTwoIdsHashKey *key, void *arg);
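+
+    /* Usage sketch (illustrative only; the callback name and the numbers
+     * below are hypothetical examples, not part of this API):
+     *
+     *   static int my_insert_cb(struct sf_sharding_hash_entry *entry,
+     *           void *arg, const bool new_create)
+     *   {
+     *       //update the fields of the caller's entry type here
+     *       return 0;
+     *   }
+     *
+     *   SFHtableShardingContext ctx;
+     *   SFTwoIdsHashKey key;
+     *
+     *   //element_size must be at least sizeof(SFShardingHashEntry);
+     *   //callers typically embed it as the first member of their own
+     *   //entry struct and pass that struct's size instead
+     *   sf_sharding_htable_init(&ctx, my_insert_cb, NULL, NULL,
+     *           163, 1403641, 16, sizeof(SFShardingHashEntry),
+     *           0, 600, 86400);   //element_limit 0 => default 1,000,000
+     *
+     *   key.oid = 100;
+     *   key.tid = 1;
+     *   sf_sharding_htable_insert(&ctx, &key, NULL);
+     */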
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif