From 92613c765f5fcb0e81a1834da8f63e465b418a31 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Mon, 27 Jun 2022 17:17:57 +0800 Subject: [PATCH] request_metadata.[hc] first verson finished --- src/idempotency/server/request_metadata.c | 169 ++++++++++++++++++---- src/idempotency/server/request_metadata.h | 14 +- 2 files changed, 154 insertions(+), 29 deletions(-) diff --git a/src/idempotency/server/request_metadata.c b/src/idempotency/server/request_metadata.c index 779298c..980c838 100644 --- a/src/idempotency/server/request_metadata.c +++ b/src/idempotency/server/request_metadata.c @@ -22,24 +22,110 @@ #include "request_metadata.h" static struct { - IdempotencyRequestMetadataContext *head; - IdempotencyRequestMetadataContext *tail; -} ctx_list = {NULL, NULL}; + int process_interval_ms; + int master_side_timeout; //in seconds + struct { + IdempotencyRequestMetadataContext *head; + IdempotencyRequestMetadataContext *tail; + } list; +} g_request_metadata = {1000, 300, {NULL, NULL}}; + + +#define CHECK_MASTER_METADATA(meta) \ + (meta != NULL && g_current_time - meta->enqueue_time > \ + g_request_metadata.master_side_timeout && \ + FC_ATOMIC_GET(meta->reffer_count) == 0) + +static void process_master_side(IdempotencyRequestMetadataContext *ctx) +{ + struct fast_mblock_chain chain; + struct fast_mblock_node *node; + + chain.head = chain.tail = NULL; + PTHREAD_MUTEX_LOCK(&ctx->lock); + if (CHECK_MASTER_METADATA(ctx->list.head)) { + do { + node = fast_mblock_to_node_ptr(ctx->list.head); + if (chain.head == NULL) { + chain.head = node; + } else { + chain.tail->next = node; + } + chain.tail = node; + + ctx->list.head = ctx->list.head->next; + } while (CHECK_MASTER_METADATA(ctx->list.head)); + + if (ctx->list.head == NULL) { + ctx->list.tail = NULL; + } + chain.tail->next = NULL; + } + + if (chain.head != NULL) { + fast_mblock_batch_free(&ctx->allocator, &chain); + } + PTHREAD_MUTEX_UNLOCK(&ctx->lock); +} + +#define CHECK_SLAVE_METADATA(meta, dv) \ + (meta != NULL && meta->data_version <= dv && \ + FC_ATOMIC_GET(meta->reffer_count) == 0) + +static void process_slave_side(IdempotencyRequestMetadataContext *ctx, + const int64_t data_version) +{ + struct fast_mblock_chain chain; + struct fast_mblock_node *node; + + chain.head = chain.tail = NULL; + PTHREAD_MUTEX_LOCK(&ctx->lock); + if (CHECK_SLAVE_METADATA(ctx->list.head, data_version)) { + do { + node = fast_mblock_to_node_ptr(ctx->list.head); + if (chain.head == NULL) { + chain.head = node; + } else { + chain.tail->next = node; + } + chain.tail = node; + + ctx->list.head = ctx->list.head->next; + } while (CHECK_SLAVE_METADATA(ctx->list.head, data_version)); + + if (ctx->list.head == NULL) { + ctx->list.tail = NULL; + } + chain.tail->next = NULL; + } + + if (chain.head != NULL) { + fast_mblock_batch_free(&ctx->allocator, &chain); + } + PTHREAD_MUTEX_UNLOCK(&ctx->lock); +} static void *thread_run(void *arg) { IdempotencyRequestMetadataContext *ctx; int64_t data_version; - ctx = ctx_list.head; - while (ctx != NULL) { + ctx = g_request_metadata.list.head; + while (SF_G_CONTINUE_FLAG) { + fc_sleep_ms(g_request_metadata.process_interval_ms); + if (ctx->is_master_callback.func(ctx->is_master_callback. arg, &data_version)) { + process_master_side(ctx); } else { + process_slave_side(ctx, data_version); } ctx = ctx->next; + if (ctx == NULL) { + ctx = g_request_metadata.list.head; + } } return NULL; @@ -53,7 +139,7 @@ int idempotency_request_metadata_init( if ((result=fast_mblock_init_ex1(&ctx->allocator, "req-metadata-info", sizeof(IdempotencyRequestMetadata), 8192, 0, - NULL, NULL, true)) != 0) + NULL, NULL, false)) != 0) { return result; } @@ -66,20 +152,43 @@ int idempotency_request_metadata_init( ctx->is_master_callback.arg = arg; ctx->list.head = ctx->list.tail = NULL; - if (ctx_list.head == NULL) { - ctx_list.head = ctx; + if (g_request_metadata.list.head == NULL) { + g_request_metadata.list.head = ctx; } else { - ctx_list.tail->next = ctx; + g_request_metadata.list.tail->next = ctx; } - ctx_list.tail = ctx; + g_request_metadata.list.tail = ctx; return 0; } -int idempotency_request_metadata_start() +int idempotency_request_metadata_start(const int process_interval_ms, + const int master_side_timeout) { pthread_t tid; + if (g_request_metadata.list.head == NULL) { + logError("file: "__FILE__", line: %d, " + "list is empty!", __LINE__); + return ENOENT; + } + + if (process_interval_ms <= 0) { + logError("file: "__FILE__", line: %d, " + "invalid process interval: %d!", + __LINE__, process_interval_ms); + return EINVAL; + } + + if (master_side_timeout <= 0) { + logError("file: "__FILE__", line: %d, " + "invalid master side timeout: %d!", + __LINE__, master_side_timeout); + return EINVAL; + } + + g_request_metadata.process_interval_ms = process_interval_ms; + g_request_metadata.master_side_timeout = master_side_timeout; return fc_create_thread(&tid, thread_run, NULL, SF_G_THREAD_STACK_SIZE); } @@ -90,23 +199,26 @@ IdempotencyRequestMetadata *idempotency_request_metadata_add( { IdempotencyRequestMetadata *idemp_meta; - if ((idemp_meta=fast_mblock_alloc_object(&ctx->allocator)) == NULL) { - return NULL; - } - - idemp_meta->req_id = metadata->req_id; - idemp_meta->data_version = metadata->data_version; - idemp_meta->result = -1; - idemp_meta->next = NULL; - FC_ATOMIC_INC_EX(idemp_meta->reffer_count, 2); - PTHREAD_MUTEX_LOCK(&ctx->lock); - if (ctx->list.head == NULL) { - ctx->list.head = idemp_meta; - } else { - ctx->list.tail->next = idemp_meta; - } - ctx->list.tail = idemp_meta; + do { + if ((idemp_meta=fast_mblock_alloc_object(&ctx->allocator)) == NULL) { + break; + } + + idemp_meta->req_id = metadata->req_id; + idemp_meta->data_version = metadata->data_version; + idemp_meta->enqueue_time = g_current_time; + idemp_meta->result = SF_IDEMPOTENCY_REQUEST_PENDING_RESULT; + idemp_meta->next = NULL; + FC_ATOMIC_INC(idemp_meta->reffer_count); + + if (ctx->list.head == NULL) { + ctx->list.head = idemp_meta; + } else { + ctx->list.tail->next = idemp_meta; + } + ctx->list.tail = idemp_meta; + } while (0); PTHREAD_MUTEX_UNLOCK(&ctx->lock); return idemp_meta; @@ -125,7 +237,8 @@ int idempotency_request_metadata_get( while (meta != NULL) { if (req_id == meta->req_id) { result = 0; - *err_no = meta->result; + *err_no = FC_ATOMIC_GET(meta->result); + break; } meta = meta->next; } diff --git a/src/idempotency/server/request_metadata.h b/src/idempotency/server/request_metadata.h index aee0438..e83fe8c 100644 --- a/src/idempotency/server/request_metadata.h +++ b/src/idempotency/server/request_metadata.h @@ -19,11 +19,14 @@ #include "server_types.h" +#define SF_IDEMPOTENCY_REQUEST_PENDING_RESULT -1 + typedef bool (*sf_is_master_callback)(void *arg, int64_t *data_version); typedef struct idempotency_request_metadata { int64_t req_id; int64_t data_version; + time_t enqueue_time; volatile int result; volatile int reffer_count; struct idempotency_request_metadata *next; @@ -51,12 +54,21 @@ extern "C" { IdempotencyRequestMetadataContext *ctx, sf_is_master_callback is_master_callback, void *arg); - int idempotency_request_metadata_start(); + int idempotency_request_metadata_start(const int process_interval_ms, + const int master_side_timeout); IdempotencyRequestMetadata *idempotency_request_metadata_add( IdempotencyRequestMetadataContext *ctx, SFRequestMetadata *metadata); + static inline void idempotency_request_metadata_set_result( + IdempotencyRequestMetadata *meta, const int result) + { + __sync_bool_compare_and_swap(&meta->result, + SF_IDEMPOTENCY_REQUEST_PENDING_RESULT, result); + FC_ATOMIC_DEC(meta->reffer_count); + } + int idempotency_request_metadata_get( IdempotencyRequestMetadataContext *ctx, const int64_t req_id, int *err_no);