support function sf_sharding_htable_delete

posix_api
YuQing 2022-02-27 15:57:32 +08:00
parent 3e4ddce4a2
commit 6549172c67
2 changed files with 65 additions and 10 deletions

View File

@ -119,6 +119,7 @@ int sf_sharding_htable_init_ex(SFHtableShardingContext *sharding_ctx,
const SFShardingHtableKeyType key_type, const SFShardingHtableKeyType key_type,
sf_sharding_htable_insert_callback insert_callback, sf_sharding_htable_insert_callback insert_callback,
sf_sharding_htable_find_callback find_callback, sf_sharding_htable_find_callback find_callback,
sf_sharding_htable_delete_callback delete_callback,
sf_sharding_htable_accept_reclaim_callback reclaim_callback, sf_sharding_htable_accept_reclaim_callback reclaim_callback,
const int sharding_count, const int64_t htable_capacity, const int sharding_count, const int64_t htable_capacity,
const int allocator_count, const int element_size, const int allocator_count, const int element_size,
@ -132,6 +133,7 @@ int sf_sharding_htable_init_ex(SFHtableShardingContext *sharding_ctx,
if (element_limit <= 0) { if (element_limit <= 0) {
element_limit = 1000 * 1000; element_limit = 1000 * 1000;
} }
if ((result=init_allocators(sharding_ctx, allocator_count, if ((result=init_allocators(sharding_ctx, allocator_count,
element_size, element_limit)) != 0) element_size, element_limit)) != 0)
{ {
@ -149,7 +151,10 @@ int sf_sharding_htable_init_ex(SFHtableShardingContext *sharding_ctx,
sharding_ctx->key_type = key_type; sharding_ctx->key_type = key_type;
sharding_ctx->insert_callback = insert_callback; sharding_ctx->insert_callback = insert_callback;
sharding_ctx->find_callback = find_callback; sharding_ctx->find_callback = find_callback;
sharding_ctx->delete_callback = delete_callback;
sharding_ctx->accept_reclaim_callback = reclaim_callback; sharding_ctx->accept_reclaim_callback = reclaim_callback;
sharding_ctx->sharding_reclaim.enabled = (delete_callback == NULL);
sharding_ctx->sharding_reclaim.elt_water_mark = sharding_ctx->sharding_reclaim.elt_water_mark =
per_elt_limit * low_water_mark_ratio; per_elt_limit * low_water_mark_ratio;
sharding_ctx->sharding_reclaim.min_ttl_ms = min_ttl_ms; sharding_ctx->sharding_reclaim.min_ttl_ms = min_ttl_ms;
@ -291,14 +296,16 @@ static SFShardingHashEntry *hash_entry_reclaim(SFHtableSharding *sharding)
} }
static inline SFShardingHashEntry *htable_entry_alloc( static inline SFShardingHashEntry *htable_entry_alloc(
SFHtableShardingContext *sharding_ctx,
SFHtableSharding *sharding) SFHtableSharding *sharding)
{ {
SFShardingHashEntry *entry; SFShardingHashEntry *entry;
int64_t current_time_ms; int64_t current_time_ms;
int64_t last_reclaim_time_ms; int64_t last_reclaim_time_ms;
if (sharding->element_count > sharding->ctx-> if (sharding_ctx->sharding_reclaim.enabled &&
sharding_reclaim.elt_water_mark) (sharding->element_count > sharding->ctx->
sharding_reclaim.elt_water_mark))
{ {
current_time_ms = 1000LL * (int64_t)get_current_time(); current_time_ms = 1000LL * (int64_t)get_current_time();
last_reclaim_time_ms = FC_ATOMIC_GET(sharding->last_reclaim_time_ms); last_reclaim_time_ms = FC_ATOMIC_GET(sharding->last_reclaim_time_ms);
@ -325,7 +332,6 @@ static inline SFShardingHashEntry *htable_entry_alloc(
#define SET_SHARDING_AND_BUCKET(sharding_ctx, key) \ #define SET_SHARDING_AND_BUCKET(sharding_ctx, key) \
SFHtableSharding *sharding; \ SFHtableSharding *sharding; \
struct fc_list_head *bucket; \ struct fc_list_head *bucket; \
SFShardingHashEntry *entry; \
uint64_t hash_code; \ uint64_t hash_code; \
\ \
hash_code = sf_sharding_htable_key_ids_one == sharding_ctx-> \ hash_code = sf_sharding_htable_key_ids_one == sharding_ctx-> \
@ -340,6 +346,7 @@ void *sf_sharding_htable_find(SFHtableShardingContext
*sharding_ctx, const SFTwoIdsHashKey *key, void *arg) *sharding_ctx, const SFTwoIdsHashKey *key, void *arg)
{ {
void *data; void *data;
SFShardingHashEntry *entry;
SET_SHARDING_AND_BUCKET(sharding_ctx, key); SET_SHARDING_AND_BUCKET(sharding_ctx, key);
PTHREAD_MUTEX_LOCK(&sharding->lock); PTHREAD_MUTEX_LOCK(&sharding->lock);
@ -354,9 +361,43 @@ void *sf_sharding_htable_find(SFHtableShardingContext
return data; return data;
} }
int sf_sharding_htable_delete(SFHtableShardingContext
*sharding_ctx, const SFTwoIdsHashKey *key, void *arg)
{
int result;
SFShardingHashEntry *entry;
if (sharding_ctx->delete_callback != NULL) {
SET_SHARDING_AND_BUCKET(sharding_ctx, key);
PTHREAD_MUTEX_LOCK(&sharding->lock);
entry = htable_find(sharding_ctx, key, bucket);
if (entry != NULL) {
if (sharding_ctx->delete_callback(entry, arg)) {
fc_list_del_init(&entry->dlinks.htable);
if (sharding_ctx->sharding_reclaim.enabled) {
fc_list_del_init(&entry->dlinks.lru);
}
fast_mblock_free_object(sharding->allocator, entry);
sharding->element_count--;
}
result = 0;
} else {
result = ENOENT;
}
PTHREAD_MUTEX_UNLOCK(&sharding->lock);
} else {
logError("file: "__FILE__", line: %d, "
"delete callback is NULL!", __LINE__);
result = EINVAL;
}
return result;
}
int sf_sharding_htable_insert(SFHtableShardingContext int sf_sharding_htable_insert(SFHtableShardingContext
*sharding_ctx, const SFTwoIdsHashKey *key, void *arg) *sharding_ctx, const SFTwoIdsHashKey *key, void *arg)
{ {
SFShardingHashEntry *entry;
bool new_create; bool new_create;
int result; int result;
SET_SHARDING_AND_BUCKET(sharding_ctx, key); SET_SHARDING_AND_BUCKET(sharding_ctx, key);
@ -364,7 +405,7 @@ int sf_sharding_htable_insert(SFHtableShardingContext
PTHREAD_MUTEX_LOCK(&sharding->lock); PTHREAD_MUTEX_LOCK(&sharding->lock);
do { do {
if ((entry=htable_find(sharding_ctx, key, bucket)) == NULL) { if ((entry=htable_find(sharding_ctx, key, bucket)) == NULL) {
if ((entry=htable_entry_alloc(sharding)) == NULL) { if ((entry=htable_entry_alloc(sharding_ctx, sharding)) == NULL) {
result = ENOMEM; result = ENOMEM;
break; break;
} }
@ -372,10 +413,14 @@ int sf_sharding_htable_insert(SFHtableShardingContext
new_create = true; new_create = true;
entry->key = *key; entry->key = *key;
htable_insert(sharding_ctx, entry, bucket); htable_insert(sharding_ctx, entry, bucket);
fc_list_add_tail(&entry->dlinks.lru, &sharding->lru); if (sharding_ctx->sharding_reclaim.enabled) {
fc_list_add_tail(&entry->dlinks.lru, &sharding->lru);
}
} else { } else {
new_create = false; new_create = false;
fc_list_move_tail(&entry->dlinks.lru, &sharding->lru); if (sharding_ctx->sharding_reclaim.enabled) {
fc_list_move_tail(&entry->dlinks.lru, &sharding->lru);
}
} }
entry->last_update_time_ms = 1000LL * (int64_t)get_current_time(); entry->last_update_time_ms = 1000LL * (int64_t)get_current_time();

View File

@ -36,6 +36,9 @@ typedef int (*sf_sharding_htable_insert_callback)
typedef void *(*sf_sharding_htable_find_callback) typedef void *(*sf_sharding_htable_find_callback)
(struct sf_sharding_hash_entry *entry, void *arg); (struct sf_sharding_hash_entry *entry, void *arg);
typedef bool (*sf_sharding_htable_delete_callback)
(struct sf_sharding_hash_entry *entry, void *arg);
typedef bool (*sf_sharding_htable_accept_reclaim_callback) typedef bool (*sf_sharding_htable_accept_reclaim_callback)
(struct sf_sharding_hash_entry *entry); (struct sf_sharding_hash_entry *entry);
@ -90,6 +93,7 @@ typedef struct sf_htable_sharding_context {
int64_t max_ttl_ms; int64_t max_ttl_ms;
double elt_ttl_ms; double elt_ttl_ms;
int elt_water_mark; //trigger reclaim when elements exceeds water mark int elt_water_mark; //trigger reclaim when elements exceeds water mark
bool enabled;
} sharding_reclaim; } sharding_reclaim;
struct { struct {
@ -100,6 +104,7 @@ typedef struct sf_htable_sharding_context {
SFShardingHtableKeyType key_type; //id count in the hash entry SFShardingHtableKeyType key_type; //id count in the hash entry
sf_sharding_htable_insert_callback insert_callback; sf_sharding_htable_insert_callback insert_callback;
sf_sharding_htable_find_callback find_callback; sf_sharding_htable_find_callback find_callback;
sf_sharding_htable_delete_callback delete_callback;
sf_sharding_htable_accept_reclaim_callback accept_reclaim_callback; sf_sharding_htable_accept_reclaim_callback accept_reclaim_callback;
SFHtableShardingArray sharding_array; SFHtableShardingArray sharding_array;
} SFHtableShardingContext; } SFHtableShardingContext;
@ -112,6 +117,7 @@ extern "C" {
const SFShardingHtableKeyType key_type, const SFShardingHtableKeyType key_type,
sf_sharding_htable_insert_callback insert_callback, sf_sharding_htable_insert_callback insert_callback,
sf_sharding_htable_find_callback find_callback, sf_sharding_htable_find_callback find_callback,
sf_sharding_htable_delete_callback delete_callback,
sf_sharding_htable_accept_reclaim_callback reclaim_callback, sf_sharding_htable_accept_reclaim_callback reclaim_callback,
const int sharding_count, const int64_t htable_capacity, const int sharding_count, const int64_t htable_capacity,
const int allocator_count, const int element_size, const int allocator_count, const int element_size,
@ -122,6 +128,7 @@ extern "C" {
*sharding_ctx, const SFShardingHtableKeyType key_type, *sharding_ctx, const SFShardingHtableKeyType key_type,
sf_sharding_htable_insert_callback insert_callback, sf_sharding_htable_insert_callback insert_callback,
sf_sharding_htable_find_callback find_callback, sf_sharding_htable_find_callback find_callback,
sf_sharding_htable_delete_callback delete_callback,
sf_sharding_htable_accept_reclaim_callback reclaim_callback, sf_sharding_htable_accept_reclaim_callback reclaim_callback,
const int sharding_count, const int64_t htable_capacity, const int sharding_count, const int64_t htable_capacity,
const int allocator_count, const int element_size, const int allocator_count, const int element_size,
@ -130,10 +137,10 @@ extern "C" {
{ {
const double low_water_mark_ratio = 0.10; const double low_water_mark_ratio = 0.10;
return sf_sharding_htable_init_ex(sharding_ctx, key_type, return sf_sharding_htable_init_ex(sharding_ctx, key_type,
insert_callback, find_callback, reclaim_callback, insert_callback, find_callback, delete_callback,
sharding_count, htable_capacity, allocator_count, reclaim_callback, sharding_count, htable_capacity,
element_size, element_limit, min_ttl_ms, max_ttl_ms, allocator_count, element_size, element_limit,
low_water_mark_ratio); min_ttl_ms, max_ttl_ms, low_water_mark_ratio);
} }
int sf_sharding_htable_insert(SFHtableShardingContext int sf_sharding_htable_insert(SFHtableShardingContext
@ -142,6 +149,9 @@ extern "C" {
void *sf_sharding_htable_find(SFHtableShardingContext void *sf_sharding_htable_find(SFHtableShardingContext
*sharding_ctx, const SFTwoIdsHashKey *key, void *arg); *sharding_ctx, const SFTwoIdsHashKey *key, void *arg);
int sf_sharding_htable_delete(SFHtableShardingContext
*sharding_ctx, const SFTwoIdsHashKey *key, void *arg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif