request_metadata.[hc] first verson finished

replication_quorum
YuQing 2022-06-27 17:17:57 +08:00
parent b364a875c2
commit 92613c765f
2 changed files with 154 additions and 29 deletions

View File

@ -22,24 +22,110 @@
#include "request_metadata.h" #include "request_metadata.h"
static struct { static struct {
int process_interval_ms;
int master_side_timeout; //in seconds
struct {
IdempotencyRequestMetadataContext *head; IdempotencyRequestMetadataContext *head;
IdempotencyRequestMetadataContext *tail; IdempotencyRequestMetadataContext *tail;
} ctx_list = {NULL, NULL}; } list;
} g_request_metadata = {1000, 300, {NULL, NULL}};
#define CHECK_MASTER_METADATA(meta) \
(meta != NULL && g_current_time - meta->enqueue_time > \
g_request_metadata.master_side_timeout && \
FC_ATOMIC_GET(meta->reffer_count) == 0)
static void process_master_side(IdempotencyRequestMetadataContext *ctx)
{
struct fast_mblock_chain chain;
struct fast_mblock_node *node;
chain.head = chain.tail = NULL;
PTHREAD_MUTEX_LOCK(&ctx->lock);
if (CHECK_MASTER_METADATA(ctx->list.head)) {
do {
node = fast_mblock_to_node_ptr(ctx->list.head);
if (chain.head == NULL) {
chain.head = node;
} else {
chain.tail->next = node;
}
chain.tail = node;
ctx->list.head = ctx->list.head->next;
} while (CHECK_MASTER_METADATA(ctx->list.head));
if (ctx->list.head == NULL) {
ctx->list.tail = NULL;
}
chain.tail->next = NULL;
}
if (chain.head != NULL) {
fast_mblock_batch_free(&ctx->allocator, &chain);
}
PTHREAD_MUTEX_UNLOCK(&ctx->lock);
}
#define CHECK_SLAVE_METADATA(meta, dv) \
(meta != NULL && meta->data_version <= dv && \
FC_ATOMIC_GET(meta->reffer_count) == 0)
static void process_slave_side(IdempotencyRequestMetadataContext *ctx,
const int64_t data_version)
{
struct fast_mblock_chain chain;
struct fast_mblock_node *node;
chain.head = chain.tail = NULL;
PTHREAD_MUTEX_LOCK(&ctx->lock);
if (CHECK_SLAVE_METADATA(ctx->list.head, data_version)) {
do {
node = fast_mblock_to_node_ptr(ctx->list.head);
if (chain.head == NULL) {
chain.head = node;
} else {
chain.tail->next = node;
}
chain.tail = node;
ctx->list.head = ctx->list.head->next;
} while (CHECK_SLAVE_METADATA(ctx->list.head, data_version));
if (ctx->list.head == NULL) {
ctx->list.tail = NULL;
}
chain.tail->next = NULL;
}
if (chain.head != NULL) {
fast_mblock_batch_free(&ctx->allocator, &chain);
}
PTHREAD_MUTEX_UNLOCK(&ctx->lock);
}
static void *thread_run(void *arg) static void *thread_run(void *arg)
{ {
IdempotencyRequestMetadataContext *ctx; IdempotencyRequestMetadataContext *ctx;
int64_t data_version; int64_t data_version;
ctx = ctx_list.head; ctx = g_request_metadata.list.head;
while (ctx != NULL) { while (SF_G_CONTINUE_FLAG) {
fc_sleep_ms(g_request_metadata.process_interval_ms);
if (ctx->is_master_callback.func(ctx->is_master_callback. if (ctx->is_master_callback.func(ctx->is_master_callback.
arg, &data_version)) arg, &data_version))
{ {
process_master_side(ctx);
} else { } else {
process_slave_side(ctx, data_version);
} }
ctx = ctx->next; ctx = ctx->next;
if (ctx == NULL) {
ctx = g_request_metadata.list.head;
}
} }
return NULL; return NULL;
@ -53,7 +139,7 @@ int idempotency_request_metadata_init(
if ((result=fast_mblock_init_ex1(&ctx->allocator, "req-metadata-info", if ((result=fast_mblock_init_ex1(&ctx->allocator, "req-metadata-info",
sizeof(IdempotencyRequestMetadata), 8192, 0, sizeof(IdempotencyRequestMetadata), 8192, 0,
NULL, NULL, true)) != 0) NULL, NULL, false)) != 0)
{ {
return result; return result;
} }
@ -66,20 +152,43 @@ int idempotency_request_metadata_init(
ctx->is_master_callback.arg = arg; ctx->is_master_callback.arg = arg;
ctx->list.head = ctx->list.tail = NULL; ctx->list.head = ctx->list.tail = NULL;
if (ctx_list.head == NULL) { if (g_request_metadata.list.head == NULL) {
ctx_list.head = ctx; g_request_metadata.list.head = ctx;
} else { } else {
ctx_list.tail->next = ctx; g_request_metadata.list.tail->next = ctx;
} }
ctx_list.tail = ctx; g_request_metadata.list.tail = ctx;
return 0; return 0;
} }
int idempotency_request_metadata_start() int idempotency_request_metadata_start(const int process_interval_ms,
const int master_side_timeout)
{ {
pthread_t tid; pthread_t tid;
if (g_request_metadata.list.head == NULL) {
logError("file: "__FILE__", line: %d, "
"list is empty!", __LINE__);
return ENOENT;
}
if (process_interval_ms <= 0) {
logError("file: "__FILE__", line: %d, "
"invalid process interval: %d!",
__LINE__, process_interval_ms);
return EINVAL;
}
if (master_side_timeout <= 0) {
logError("file: "__FILE__", line: %d, "
"invalid master side timeout: %d!",
__LINE__, master_side_timeout);
return EINVAL;
}
g_request_metadata.process_interval_ms = process_interval_ms;
g_request_metadata.master_side_timeout = master_side_timeout;
return fc_create_thread(&tid, thread_run, NULL, return fc_create_thread(&tid, thread_run, NULL,
SF_G_THREAD_STACK_SIZE); SF_G_THREAD_STACK_SIZE);
} }
@ -90,23 +199,26 @@ IdempotencyRequestMetadata *idempotency_request_metadata_add(
{ {
IdempotencyRequestMetadata *idemp_meta; IdempotencyRequestMetadata *idemp_meta;
PTHREAD_MUTEX_LOCK(&ctx->lock);
do {
if ((idemp_meta=fast_mblock_alloc_object(&ctx->allocator)) == NULL) { if ((idemp_meta=fast_mblock_alloc_object(&ctx->allocator)) == NULL) {
return NULL; break;
} }
idemp_meta->req_id = metadata->req_id; idemp_meta->req_id = metadata->req_id;
idemp_meta->data_version = metadata->data_version; idemp_meta->data_version = metadata->data_version;
idemp_meta->result = -1; idemp_meta->enqueue_time = g_current_time;
idemp_meta->result = SF_IDEMPOTENCY_REQUEST_PENDING_RESULT;
idemp_meta->next = NULL; idemp_meta->next = NULL;
FC_ATOMIC_INC_EX(idemp_meta->reffer_count, 2); FC_ATOMIC_INC(idemp_meta->reffer_count);
PTHREAD_MUTEX_LOCK(&ctx->lock);
if (ctx->list.head == NULL) { if (ctx->list.head == NULL) {
ctx->list.head = idemp_meta; ctx->list.head = idemp_meta;
} else { } else {
ctx->list.tail->next = idemp_meta; ctx->list.tail->next = idemp_meta;
} }
ctx->list.tail = idemp_meta; ctx->list.tail = idemp_meta;
} while (0);
PTHREAD_MUTEX_UNLOCK(&ctx->lock); PTHREAD_MUTEX_UNLOCK(&ctx->lock);
return idemp_meta; return idemp_meta;
@ -125,7 +237,8 @@ int idempotency_request_metadata_get(
while (meta != NULL) { while (meta != NULL) {
if (req_id == meta->req_id) { if (req_id == meta->req_id) {
result = 0; result = 0;
*err_no = meta->result; *err_no = FC_ATOMIC_GET(meta->result);
break;
} }
meta = meta->next; meta = meta->next;
} }

View File

@ -19,11 +19,14 @@
#include "server_types.h" #include "server_types.h"
#define SF_IDEMPOTENCY_REQUEST_PENDING_RESULT -1
typedef bool (*sf_is_master_callback)(void *arg, int64_t *data_version); typedef bool (*sf_is_master_callback)(void *arg, int64_t *data_version);
typedef struct idempotency_request_metadata { typedef struct idempotency_request_metadata {
int64_t req_id; int64_t req_id;
int64_t data_version; int64_t data_version;
time_t enqueue_time;
volatile int result; volatile int result;
volatile int reffer_count; volatile int reffer_count;
struct idempotency_request_metadata *next; struct idempotency_request_metadata *next;
@ -51,12 +54,21 @@ extern "C" {
IdempotencyRequestMetadataContext *ctx, IdempotencyRequestMetadataContext *ctx,
sf_is_master_callback is_master_callback, void *arg); sf_is_master_callback is_master_callback, void *arg);
int idempotency_request_metadata_start(); int idempotency_request_metadata_start(const int process_interval_ms,
const int master_side_timeout);
IdempotencyRequestMetadata *idempotency_request_metadata_add( IdempotencyRequestMetadata *idempotency_request_metadata_add(
IdempotencyRequestMetadataContext *ctx, IdempotencyRequestMetadataContext *ctx,
SFRequestMetadata *metadata); SFRequestMetadata *metadata);
static inline void idempotency_request_metadata_set_result(
IdempotencyRequestMetadata *meta, const int result)
{
__sync_bool_compare_and_swap(&meta->result,
SF_IDEMPOTENCY_REQUEST_PENDING_RESULT, result);
FC_ATOMIC_DEC(meta->reffer_count);
}
int idempotency_request_metadata_get( int idempotency_request_metadata_get(
IdempotencyRequestMetadataContext *ctx, IdempotencyRequestMetadataContext *ctx,
const int64_t req_id, int *err_no); const int64_t req_id, int *err_no);