sf_sharding_htable.[hc]: use ms instead of second
parent
5e1444ef71
commit
4480a4a39c
|
|
@ -15,6 +15,7 @@
|
||||||
|
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include "fastcommon/shared_func.h"
|
#include "fastcommon/shared_func.h"
|
||||||
|
#include "fastcommon/fc_atomic.h"
|
||||||
#include "sf_sharding_htable.h"
|
#include "sf_sharding_htable.h"
|
||||||
|
|
||||||
static int init_allocators(SFHtableShardingContext *sharding_ctx,
|
static int init_allocators(SFHtableShardingContext *sharding_ctx,
|
||||||
|
|
@ -78,7 +79,7 @@ static int init_sharding(SFHtableSharding *sharding,
|
||||||
|
|
||||||
sharding->hashtable.capacity = per_capacity;
|
sharding->hashtable.capacity = per_capacity;
|
||||||
sharding->element_count = 0;
|
sharding->element_count = 0;
|
||||||
sharding->last_reclaim_time_sec = get_current_time();
|
sharding->last_reclaim_time_ms = 1000LL * get_current_time();
|
||||||
FC_INIT_LIST_HEAD(&sharding->lru);
|
FC_INIT_LIST_HEAD(&sharding->lru);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
@ -114,15 +115,15 @@ static int init_sharding_array(SFHtableShardingContext *sharding_ctx,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int sf_sharding_htable_init(SFHtableShardingContext *sharding_ctx,
|
int sf_sharding_htable_init_ex(SFHtableShardingContext *sharding_ctx,
|
||||||
const SFShardingHtableKeyType key_type,
|
const SFShardingHtableKeyType key_type,
|
||||||
sf_sharding_htable_insert_callback insert_callback,
|
sf_sharding_htable_insert_callback insert_callback,
|
||||||
sf_sharding_htable_find_callback find_callback,
|
sf_sharding_htable_find_callback find_callback,
|
||||||
sf_sharding_htable_accept_reclaim_callback reclaim_callback,
|
sf_sharding_htable_accept_reclaim_callback reclaim_callback,
|
||||||
const int sharding_count, const int64_t htable_capacity,
|
const int sharding_count, const int64_t htable_capacity,
|
||||||
const int allocator_count, const int element_size,
|
const int allocator_count, const int element_size,
|
||||||
int64_t element_limit, const int64_t min_ttl_sec,
|
int64_t element_limit, const int64_t min_ttl_ms,
|
||||||
const int64_t max_ttl_sec)
|
const int64_t max_ttl_ms, const double low_water_mark_ratio)
|
||||||
{
|
{
|
||||||
int result;
|
int result;
|
||||||
int64_t per_elt_limit;
|
int64_t per_elt_limit;
|
||||||
|
|
@ -149,18 +150,19 @@ int sf_sharding_htable_init(SFHtableShardingContext *sharding_ctx,
|
||||||
sharding_ctx->insert_callback = insert_callback;
|
sharding_ctx->insert_callback = insert_callback;
|
||||||
sharding_ctx->find_callback = find_callback;
|
sharding_ctx->find_callback = find_callback;
|
||||||
sharding_ctx->accept_reclaim_callback = reclaim_callback;
|
sharding_ctx->accept_reclaim_callback = reclaim_callback;
|
||||||
sharding_ctx->sharding_reclaim.elt_water_mark = per_elt_limit * 0.10;
|
sharding_ctx->sharding_reclaim.elt_water_mark =
|
||||||
sharding_ctx->sharding_reclaim.min_ttl_sec = min_ttl_sec;
|
per_elt_limit * low_water_mark_ratio;
|
||||||
sharding_ctx->sharding_reclaim.max_ttl_sec = max_ttl_sec;
|
sharding_ctx->sharding_reclaim.min_ttl_ms = min_ttl_ms;
|
||||||
sharding_ctx->sharding_reclaim.elt_ttl_sec = (double)(sharding_ctx->
|
sharding_ctx->sharding_reclaim.max_ttl_ms = max_ttl_ms;
|
||||||
sharding_reclaim.max_ttl_sec - sharding_ctx->
|
sharding_ctx->sharding_reclaim.elt_ttl_ms = (double)(sharding_ctx->
|
||||||
sharding_reclaim.min_ttl_sec) / per_elt_limit;
|
sharding_reclaim.max_ttl_ms - sharding_ctx->
|
||||||
|
sharding_reclaim.min_ttl_ms) / per_elt_limit;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
logInfo("per_elt_limit: %"PRId64", elt_water_mark: %d, "
|
logInfo("per_elt_limit: %"PRId64", elt_water_mark: %d, "
|
||||||
"elt_ttl_sec: %.2f", per_elt_limit, (int)sharding_ctx->
|
"elt_ttl_ms: %.2f", per_elt_limit, (int)sharding_ctx->
|
||||||
sharding_reclaim.elt_water_mark, sharding_ctx->
|
sharding_reclaim.elt_water_mark, sharding_ctx->
|
||||||
sharding_reclaim.elt_ttl_sec);
|
sharding_reclaim.elt_ttl_ms);
|
||||||
*/
|
*/
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
@ -222,7 +224,8 @@ static inline void htable_insert(SFHtableShardingContext *sharding_ctx,
|
||||||
|
|
||||||
static SFShardingHashEntry *otid_entry_reclaim(SFHtableSharding *sharding)
|
static SFShardingHashEntry *otid_entry_reclaim(SFHtableSharding *sharding)
|
||||||
{
|
{
|
||||||
int64_t reclaim_ttl_sec;
|
int64_t current_time_ms;
|
||||||
|
int64_t reclaim_ttl_ms;
|
||||||
int64_t delta;
|
int64_t delta;
|
||||||
int64_t reclaim_count;
|
int64_t reclaim_count;
|
||||||
int64_t reclaim_limit;
|
int64_t reclaim_limit;
|
||||||
|
|
@ -232,7 +235,14 @@ static SFShardingHashEntry *otid_entry_reclaim(SFHtableSharding *sharding)
|
||||||
|
|
||||||
if (sharding->element_count <= sharding->element_limit) {
|
if (sharding->element_count <= sharding->element_limit) {
|
||||||
delta = sharding->element_count;
|
delta = sharding->element_count;
|
||||||
reclaim_limit = sharding->ctx->sharding_reclaim.elt_water_mark;
|
if (sharding->ctx->sharding_reclaim.elt_water_mark > 0) {
|
||||||
|
reclaim_count = sharding->element_count - sharding->ctx->
|
||||||
|
sharding_reclaim.elt_water_mark;
|
||||||
|
reclaim_limit = FC_MIN(reclaim_count, sharding->ctx->
|
||||||
|
sharding_reclaim.elt_water_mark);
|
||||||
|
} else {
|
||||||
|
reclaim_limit = sharding->element_count;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
delta = sharding->element_limit;
|
delta = sharding->element_limit;
|
||||||
reclaim_limit = (sharding->element_count - sharding->element_limit) +
|
reclaim_limit = (sharding->element_count - sharding->element_limit) +
|
||||||
|
|
@ -241,12 +251,11 @@ static SFShardingHashEntry *otid_entry_reclaim(SFHtableSharding *sharding)
|
||||||
|
|
||||||
first = NULL;
|
first = NULL;
|
||||||
reclaim_count = 0;
|
reclaim_count = 0;
|
||||||
reclaim_ttl_sec = (int64_t)(sharding->ctx->sharding_reclaim.max_ttl_sec -
|
current_time_ms = 1000LL * get_current_time();
|
||||||
sharding->ctx->sharding_reclaim.elt_ttl_sec * delta);
|
reclaim_ttl_ms = (int64_t)(sharding->ctx->sharding_reclaim.max_ttl_ms -
|
||||||
|
sharding->ctx->sharding_reclaim.elt_ttl_ms * delta);
|
||||||
fc_list_for_each_entry_safe(entry, tmp, &sharding->lru, dlinks.lru) {
|
fc_list_for_each_entry_safe(entry, tmp, &sharding->lru, dlinks.lru) {
|
||||||
if (get_current_time() - entry->last_update_time_sec <=
|
if (current_time_ms - entry->last_update_time_ms <= reclaim_ttl_ms) {
|
||||||
reclaim_ttl_sec)
|
|
||||||
{
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -272,10 +281,10 @@ static SFShardingHashEntry *otid_entry_reclaim(SFHtableSharding *sharding)
|
||||||
|
|
||||||
if (reclaim_count > 0) {
|
if (reclaim_count > 0) {
|
||||||
logInfo("sharding index: %d, element_count: %"PRId64", "
|
logInfo("sharding index: %d, element_count: %"PRId64", "
|
||||||
"reclaim_ttl_sec: %"PRId64" ms, reclaim_count: %"PRId64", "
|
"reclaim_ttl_ms: %"PRId64" ms, reclaim_count: %"PRId64", "
|
||||||
"reclaim_limit: %"PRId64, (int)(sharding - sharding->ctx->
|
"reclaim_limit: %"PRId64, (int)(sharding - sharding->ctx->
|
||||||
sharding_array.entries), sharding->element_count,
|
sharding_array.entries), sharding->element_count,
|
||||||
reclaim_ttl_sec, reclaim_count, reclaim_limit);
|
reclaim_ttl_ms, reclaim_count, reclaim_limit);
|
||||||
}
|
}
|
||||||
|
|
||||||
return first;
|
return first;
|
||||||
|
|
@ -285,16 +294,23 @@ static inline SFShardingHashEntry *htable_entry_alloc(
|
||||||
SFHtableSharding *sharding)
|
SFHtableSharding *sharding)
|
||||||
{
|
{
|
||||||
SFShardingHashEntry *entry;
|
SFShardingHashEntry *entry;
|
||||||
|
int64_t current_time_ms;
|
||||||
|
int64_t last_reclaim_time_ms;
|
||||||
|
|
||||||
if (sharding->element_count > sharding->ctx->sharding_reclaim.
|
if (sharding->element_count > sharding->ctx->
|
||||||
elt_water_mark && get_current_time() - sharding->
|
sharding_reclaim.elt_water_mark)
|
||||||
last_reclaim_time_sec > 1000)
|
{
|
||||||
|
current_time_ms = 1000LL * get_current_time();
|
||||||
|
last_reclaim_time_ms = FC_ATOMIC_GET(sharding->last_reclaim_time_ms);
|
||||||
|
if (current_time_ms - last_reclaim_time_ms > 100 &&
|
||||||
|
__sync_bool_compare_and_swap(&sharding->last_reclaim_time_ms,
|
||||||
|
last_reclaim_time_ms, current_time_ms))
|
||||||
{
|
{
|
||||||
sharding->last_reclaim_time_sec = get_current_time();
|
|
||||||
if ((entry=otid_entry_reclaim(sharding)) != NULL) {
|
if ((entry=otid_entry_reclaim(sharding)) != NULL) {
|
||||||
return entry;
|
return entry;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
entry = (SFShardingHashEntry *)fast_mblock_alloc_object(
|
entry = (SFShardingHashEntry *)fast_mblock_alloc_object(
|
||||||
sharding->allocator);
|
sharding->allocator);
|
||||||
|
|
@ -362,7 +378,7 @@ int sf_sharding_htable_insert(SFHtableShardingContext
|
||||||
fc_list_move_tail(&entry->dlinks.lru, &sharding->lru);
|
fc_list_move_tail(&entry->dlinks.lru, &sharding->lru);
|
||||||
}
|
}
|
||||||
|
|
||||||
entry->last_update_time_sec = get_current_time();
|
entry->last_update_time_ms = 1000LL * get_current_time();
|
||||||
result = sharding_ctx->insert_callback(
|
result = sharding_ctx->insert_callback(
|
||||||
entry, arg, new_create);
|
entry, arg, new_create);
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
|
||||||
|
|
@ -59,7 +59,7 @@ typedef struct sf_sharding_hash_entry {
|
||||||
struct fc_list_head htable; //for hashtable
|
struct fc_list_head htable; //for hashtable
|
||||||
struct fc_list_head lru; //for LRU chain
|
struct fc_list_head lru; //for LRU chain
|
||||||
} dlinks;
|
} dlinks;
|
||||||
int64_t last_update_time_sec;
|
int64_t last_update_time_ms;
|
||||||
struct sf_htable_sharding *sharding; //hold for lock
|
struct sf_htable_sharding *sharding; //hold for lock
|
||||||
} SFShardingHashEntry;
|
} SFShardingHashEntry;
|
||||||
|
|
||||||
|
|
@ -76,7 +76,7 @@ typedef struct sf_htable_sharding {
|
||||||
SFDlinkHashtable hashtable;
|
SFDlinkHashtable hashtable;
|
||||||
int64_t element_count;
|
int64_t element_count;
|
||||||
int64_t element_limit;
|
int64_t element_limit;
|
||||||
int64_t last_reclaim_time_sec;
|
volatile int64_t last_reclaim_time_ms;
|
||||||
struct sf_htable_sharding_context *ctx;
|
struct sf_htable_sharding_context *ctx;
|
||||||
} SFHtableSharding;
|
} SFHtableSharding;
|
||||||
|
|
||||||
|
|
@ -87,9 +87,9 @@ typedef struct sf_htable_sharding_array {
|
||||||
|
|
||||||
typedef struct sf_htable_sharding_context {
|
typedef struct sf_htable_sharding_context {
|
||||||
struct {
|
struct {
|
||||||
int64_t min_ttl_sec;
|
int64_t min_ttl_ms;
|
||||||
int64_t max_ttl_sec;
|
int64_t max_ttl_ms;
|
||||||
double elt_ttl_sec;
|
double elt_ttl_ms;
|
||||||
int elt_water_mark; //trigger reclaim when elements exceeds water mark
|
int elt_water_mark; //trigger reclaim when elements exceeds water mark
|
||||||
} sharding_reclaim;
|
} sharding_reclaim;
|
||||||
|
|
||||||
|
|
@ -109,15 +109,33 @@ typedef struct sf_htable_sharding_context {
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int sf_sharding_htable_init(SFHtableShardingContext *sharding_ctx,
|
int sf_sharding_htable_init_ex(SFHtableShardingContext *sharding_ctx,
|
||||||
const SFShardingHtableKeyType key_type,
|
const SFShardingHtableKeyType key_type,
|
||||||
sf_sharding_htable_insert_callback insert_callback,
|
sf_sharding_htable_insert_callback insert_callback,
|
||||||
sf_sharding_htable_find_callback find_callback,
|
sf_sharding_htable_find_callback find_callback,
|
||||||
sf_sharding_htable_accept_reclaim_callback reclaim_callback,
|
sf_sharding_htable_accept_reclaim_callback reclaim_callback,
|
||||||
const int sharding_count, const int64_t htable_capacity,
|
const int sharding_count, const int64_t htable_capacity,
|
||||||
const int allocator_count, const int element_size,
|
const int allocator_count, const int element_size,
|
||||||
int64_t element_limit, const int64_t min_ttl_sec,
|
int64_t element_limit, const int64_t min_ttl_ms,
|
||||||
const int64_t max_ttl_sec);
|
const int64_t max_ttl_ms, const double low_water_mark_ratio);
|
||||||
|
|
||||||
|
static inline int sf_sharding_htable_init(SFHtableShardingContext
|
||||||
|
*sharding_ctx, const SFShardingHtableKeyType key_type,
|
||||||
|
sf_sharding_htable_insert_callback insert_callback,
|
||||||
|
sf_sharding_htable_find_callback find_callback,
|
||||||
|
sf_sharding_htable_accept_reclaim_callback reclaim_callback,
|
||||||
|
const int sharding_count, const int64_t htable_capacity,
|
||||||
|
const int allocator_count, const int element_size,
|
||||||
|
int64_t element_limit, const int64_t min_ttl_ms,
|
||||||
|
const int64_t max_ttl_ms)
|
||||||
|
{
|
||||||
|
const double low_water_mark_ratio = 0.10;
|
||||||
|
return sf_sharding_htable_init_ex(sharding_ctx, key_type,
|
||||||
|
insert_callback, find_callback, reclaim_callback,
|
||||||
|
sharding_count, htable_capacity, allocator_count,
|
||||||
|
element_size, element_limit, min_ttl_ms, max_ttl_ms,
|
||||||
|
low_water_mark_ratio);
|
||||||
|
}
|
||||||
|
|
||||||
int sf_sharding_htable_insert(SFHtableShardingContext
|
int sf_sharding_htable_insert(SFHtableShardingContext
|
||||||
*sharding_ctx, const SFTwoIdsHashKey *key, void *arg);
|
*sharding_ctx, const SFTwoIdsHashKey *key, void *arg);
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue