diff --git a/src/Makefile.in b/src/Makefile.in
index 02772c0..21b9bd7 100644
--- a/src/Makefile.in
+++ b/src/Makefile.in
@@ -17,7 +17,8 @@ IDEMP_SERVER_HEADER = idempotency/server/server_types.h \
idempotency/server/server_channel.h \
idempotency/server/request_htable.h \
idempotency/server/channel_htable.h \
- idempotency/server/server_handler.h
+ idempotency/server/server_handler.h \
+ idempotency/server/request_metadata.h
IDEMP_CLIENT_HEADER = idempotency/client/client_types.h \
idempotency/client/receipt_handler.h \
@@ -32,11 +33,12 @@ SHARED_OBJS = sf_nio.lo sf_iov.lo sf_service.lo sf_global.lo \
sf_connection_manager.lo sf_serializer.lo \
sf_binlog_index.lo sf_file_writer.lo \
sf_binlog_writer.lo sf_ordered_writer.lo \
- idempotency/server/server_channel.lo \
- idempotency/server/request_htable.lo \
- idempotency/server/channel_htable.lo \
- idempotency/server/server_handler.lo \
- idempotency/client/receipt_handler.lo \
+ idempotency/server/server_channel.lo \
+ idempotency/server/request_htable.lo \
+ idempotency/server/channel_htable.lo \
+ idempotency/server/server_handler.lo \
+ idempotency/server/request_metadata.lo \
+ idempotency/client/receipt_handler.lo \
idempotency/client/client_channel.lo
ALL_OBJS = $(SHARED_OBJS)
diff --git a/src/idempotency/server/request_metadata.c b/src/idempotency/server/request_metadata.c
new file mode 100644
index 0000000..779298c
--- /dev/null
+++ b/src/idempotency/server/request_metadata.c
@@ -0,0 +1,135 @@
+/*
+ * Copyright (c) 2020 YuQing <384681@qq.com>
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#include
+#include
+#include "fastcommon/shared_func.h"
+#include "fastcommon/logger.h"
+#include "fastcommon/fc_atomic.h"
+#include "sf/sf_global.h"
+#include "request_metadata.h"
+
+static struct {
+ IdempotencyRequestMetadataContext *head;
+ IdempotencyRequestMetadataContext *tail;
+} ctx_list = {NULL, NULL};
+
+static void *thread_run(void *arg)
+{
+ IdempotencyRequestMetadataContext *ctx;
+ int64_t data_version;
+
+ ctx = ctx_list.head;
+ while (ctx != NULL) {
+ if (ctx->is_master_callback.func(ctx->is_master_callback.
+ arg, &data_version))
+ {
+ } else {
+ }
+
+ ctx = ctx->next;
+ }
+
+ return NULL;
+}
+
+int idempotency_request_metadata_init(
+ IdempotencyRequestMetadataContext *ctx,
+ sf_is_master_callback is_master_callback, void *arg)
+{
+ int result;
+
+ if ((result=fast_mblock_init_ex1(&ctx->allocator, "req-metadata-info",
+ sizeof(IdempotencyRequestMetadata), 8192, 0,
+ NULL, NULL, true)) != 0)
+ {
+ return result;
+ }
+
+ if ((result=init_pthread_lock(&ctx->lock)) != 0) {
+ return result;
+ }
+
+ ctx->is_master_callback.func = is_master_callback;
+ ctx->is_master_callback.arg = arg;
+ ctx->list.head = ctx->list.tail = NULL;
+
+ if (ctx_list.head == NULL) {
+ ctx_list.head = ctx;
+ } else {
+ ctx_list.tail->next = ctx;
+ }
+ ctx_list.tail = ctx;
+
+ return 0;
+}
+
+int idempotency_request_metadata_start()
+{
+ pthread_t tid;
+
+ return fc_create_thread(&tid, thread_run, NULL,
+ SF_G_THREAD_STACK_SIZE);
+}
+
+IdempotencyRequestMetadata *idempotency_request_metadata_add(
+ IdempotencyRequestMetadataContext *ctx,
+ SFRequestMetadata *metadata)
+{
+ IdempotencyRequestMetadata *idemp_meta;
+
+ if ((idemp_meta=fast_mblock_alloc_object(&ctx->allocator)) == NULL) {
+ return NULL;
+ }
+
+ idemp_meta->req_id = metadata->req_id;
+ idemp_meta->data_version = metadata->data_version;
+ idemp_meta->result = -1;
+ idemp_meta->next = NULL;
+ FC_ATOMIC_INC_EX(idemp_meta->reffer_count, 2);
+
+ PTHREAD_MUTEX_LOCK(&ctx->lock);
+ if (ctx->list.head == NULL) {
+ ctx->list.head = idemp_meta;
+ } else {
+ ctx->list.tail->next = idemp_meta;
+ }
+ ctx->list.tail = idemp_meta;
+ PTHREAD_MUTEX_UNLOCK(&ctx->lock);
+
+ return idemp_meta;
+}
+
+int idempotency_request_metadata_get(
+ IdempotencyRequestMetadataContext *ctx,
+ const int64_t req_id, int *err_no)
+{
+ int result;
+ IdempotencyRequestMetadata *meta;
+
+ result = ENOENT;
+ PTHREAD_MUTEX_LOCK(&ctx->lock);
+ meta = ctx->list.head;
+ while (meta != NULL) {
+ if (req_id == meta->req_id) {
+ result = 0;
+ *err_no = meta->result;
+ }
+ meta = meta->next;
+ }
+ PTHREAD_MUTEX_UNLOCK(&ctx->lock);
+
+ return result;
+}
diff --git a/src/idempotency/server/request_metadata.h b/src/idempotency/server/request_metadata.h
new file mode 100644
index 0000000..aee0438
--- /dev/null
+++ b/src/idempotency/server/request_metadata.h
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2020 YuQing <384681@qq.com>
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+
+#ifndef _SF_IDEMPOTENCY_REQUEST_METADATA_H
+#define _SF_IDEMPOTENCY_REQUEST_METADATA_H
+
+#include "server_types.h"
+
+typedef bool (*sf_is_master_callback)(void *arg, int64_t *data_version);
+
+typedef struct idempotency_request_metadata {
+ int64_t req_id;
+ int64_t data_version;
+ volatile int result;
+ volatile int reffer_count;
+ struct idempotency_request_metadata *next;
+} IdempotencyRequestMetadata;
+
+typedef struct idempotency_request_metadata_context {
+ struct {
+ sf_is_master_callback func;
+ void *arg;
+ } is_master_callback;
+ struct fast_mblock_man allocator; //element: IdempotencyRequestMetadata
+ pthread_mutex_t lock;
+ struct {
+ IdempotencyRequestMetadata *head;
+ IdempotencyRequestMetadata *tail;
+ } list;
+ struct idempotency_request_metadata_context *next;
+} IdempotencyRequestMetadataContext;
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+ int idempotency_request_metadata_init(
+ IdempotencyRequestMetadataContext *ctx,
+ sf_is_master_callback is_master_callback, void *arg);
+
+ int idempotency_request_metadata_start();
+
+ IdempotencyRequestMetadata *idempotency_request_metadata_add(
+ IdempotencyRequestMetadataContext *ctx,
+ SFRequestMetadata *metadata);
+
+ int idempotency_request_metadata_get(
+ IdempotencyRequestMetadataContext *ctx,
+ const int64_t req_id, int *err_no);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/src/idempotency/server/server_types.h b/src/idempotency/server/server_types.h
index 8786a54..38b63f5 100644
--- a/src/idempotency/server/server_types.h
+++ b/src/idempotency/server/server_types.h
@@ -57,6 +57,17 @@ typedef struct idempotency_channel {
struct idempotency_channel *next;
} IdempotencyChannel;
+typedef struct sf_request_metadata {
+ int64_t req_id;
+ int64_t data_version;
+} SFRequestMetadata;
+
+typedef struct sf_request_metadata_array {
+ SFRequestMetadata *elts;
+ int count;
+ int alloc;
+} SFRequestMetadataArray;
+
#ifdef __cplusplus
extern "C" {
#endif