From b364a875c288562bbdacbfa1354d19969422d5b1 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Mon, 27 Jun 2022 11:30:01 +0800 Subject: [PATCH] add files idempotency/server/request_metadata.[hc] --- src/Makefile.in | 14 ++- src/idempotency/server/request_metadata.c | 135 ++++++++++++++++++++++ src/idempotency/server/request_metadata.h | 68 +++++++++++ src/idempotency/server/server_types.h | 11 ++ 4 files changed, 222 insertions(+), 6 deletions(-) create mode 100644 src/idempotency/server/request_metadata.c create mode 100644 src/idempotency/server/request_metadata.h diff --git a/src/Makefile.in b/src/Makefile.in index 02772c0..21b9bd7 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -17,7 +17,8 @@ IDEMP_SERVER_HEADER = idempotency/server/server_types.h \ idempotency/server/server_channel.h \ idempotency/server/request_htable.h \ idempotency/server/channel_htable.h \ - idempotency/server/server_handler.h + idempotency/server/server_handler.h \ + idempotency/server/request_metadata.h IDEMP_CLIENT_HEADER = idempotency/client/client_types.h \ idempotency/client/receipt_handler.h \ @@ -32,11 +33,12 @@ SHARED_OBJS = sf_nio.lo sf_iov.lo sf_service.lo sf_global.lo \ sf_connection_manager.lo sf_serializer.lo \ sf_binlog_index.lo sf_file_writer.lo \ sf_binlog_writer.lo sf_ordered_writer.lo \ - idempotency/server/server_channel.lo \ - idempotency/server/request_htable.lo \ - idempotency/server/channel_htable.lo \ - idempotency/server/server_handler.lo \ - idempotency/client/receipt_handler.lo \ + idempotency/server/server_channel.lo \ + idempotency/server/request_htable.lo \ + idempotency/server/channel_htable.lo \ + idempotency/server/server_handler.lo \ + idempotency/server/request_metadata.lo \ + idempotency/client/receipt_handler.lo \ idempotency/client/client_channel.lo ALL_OBJS = $(SHARED_OBJS) diff --git a/src/idempotency/server/request_metadata.c b/src/idempotency/server/request_metadata.c new file mode 100644 index 0000000..779298c --- /dev/null +++ b/src/idempotency/server/request_metadata.c @@ -0,0 +1,135 @@ +/* + * Copyright (c) 2020 YuQing <384681@qq.com> + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include "fastcommon/shared_func.h" +#include "fastcommon/logger.h" +#include "fastcommon/fc_atomic.h" +#include "sf/sf_global.h" +#include "request_metadata.h" + +static struct { + IdempotencyRequestMetadataContext *head; + IdempotencyRequestMetadataContext *tail; +} ctx_list = {NULL, NULL}; + +static void *thread_run(void *arg) +{ + IdempotencyRequestMetadataContext *ctx; + int64_t data_version; + + ctx = ctx_list.head; + while (ctx != NULL) { + if (ctx->is_master_callback.func(ctx->is_master_callback. + arg, &data_version)) + { + } else { + } + + ctx = ctx->next; + } + + return NULL; +} + +int idempotency_request_metadata_init( + IdempotencyRequestMetadataContext *ctx, + sf_is_master_callback is_master_callback, void *arg) +{ + int result; + + if ((result=fast_mblock_init_ex1(&ctx->allocator, "req-metadata-info", + sizeof(IdempotencyRequestMetadata), 8192, 0, + NULL, NULL, true)) != 0) + { + return result; + } + + if ((result=init_pthread_lock(&ctx->lock)) != 0) { + return result; + } + + ctx->is_master_callback.func = is_master_callback; + ctx->is_master_callback.arg = arg; + ctx->list.head = ctx->list.tail = NULL; + + if (ctx_list.head == NULL) { + ctx_list.head = ctx; + } else { + ctx_list.tail->next = ctx; + } + ctx_list.tail = ctx; + + return 0; +} + +int idempotency_request_metadata_start() +{ + pthread_t tid; + + return fc_create_thread(&tid, thread_run, NULL, + SF_G_THREAD_STACK_SIZE); +} + +IdempotencyRequestMetadata *idempotency_request_metadata_add( + IdempotencyRequestMetadataContext *ctx, + SFRequestMetadata *metadata) +{ + IdempotencyRequestMetadata *idemp_meta; + + if ((idemp_meta=fast_mblock_alloc_object(&ctx->allocator)) == NULL) { + return NULL; + } + + idemp_meta->req_id = metadata->req_id; + idemp_meta->data_version = metadata->data_version; + idemp_meta->result = -1; + idemp_meta->next = NULL; + FC_ATOMIC_INC_EX(idemp_meta->reffer_count, 2); + + PTHREAD_MUTEX_LOCK(&ctx->lock); + if (ctx->list.head == NULL) { + ctx->list.head = idemp_meta; + } else { + ctx->list.tail->next = idemp_meta; + } + ctx->list.tail = idemp_meta; + PTHREAD_MUTEX_UNLOCK(&ctx->lock); + + return idemp_meta; +} + +int idempotency_request_metadata_get( + IdempotencyRequestMetadataContext *ctx, + const int64_t req_id, int *err_no) +{ + int result; + IdempotencyRequestMetadata *meta; + + result = ENOENT; + PTHREAD_MUTEX_LOCK(&ctx->lock); + meta = ctx->list.head; + while (meta != NULL) { + if (req_id == meta->req_id) { + result = 0; + *err_no = meta->result; + } + meta = meta->next; + } + PTHREAD_MUTEX_UNLOCK(&ctx->lock); + + return result; +} diff --git a/src/idempotency/server/request_metadata.h b/src/idempotency/server/request_metadata.h new file mode 100644 index 0000000..aee0438 --- /dev/null +++ b/src/idempotency/server/request_metadata.h @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2020 YuQing <384681@qq.com> + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + + +#ifndef _SF_IDEMPOTENCY_REQUEST_METADATA_H +#define _SF_IDEMPOTENCY_REQUEST_METADATA_H + +#include "server_types.h" + +typedef bool (*sf_is_master_callback)(void *arg, int64_t *data_version); + +typedef struct idempotency_request_metadata { + int64_t req_id; + int64_t data_version; + volatile int result; + volatile int reffer_count; + struct idempotency_request_metadata *next; +} IdempotencyRequestMetadata; + +typedef struct idempotency_request_metadata_context { + struct { + sf_is_master_callback func; + void *arg; + } is_master_callback; + struct fast_mblock_man allocator; //element: IdempotencyRequestMetadata + pthread_mutex_t lock; + struct { + IdempotencyRequestMetadata *head; + IdempotencyRequestMetadata *tail; + } list; + struct idempotency_request_metadata_context *next; +} IdempotencyRequestMetadataContext; + +#ifdef __cplusplus +extern "C" { +#endif + + int idempotency_request_metadata_init( + IdempotencyRequestMetadataContext *ctx, + sf_is_master_callback is_master_callback, void *arg); + + int idempotency_request_metadata_start(); + + IdempotencyRequestMetadata *idempotency_request_metadata_add( + IdempotencyRequestMetadataContext *ctx, + SFRequestMetadata *metadata); + + int idempotency_request_metadata_get( + IdempotencyRequestMetadataContext *ctx, + const int64_t req_id, int *err_no); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/idempotency/server/server_types.h b/src/idempotency/server/server_types.h index 8786a54..38b63f5 100644 --- a/src/idempotency/server/server_types.h +++ b/src/idempotency/server/server_types.h @@ -57,6 +57,17 @@ typedef struct idempotency_channel { struct idempotency_channel *next; } IdempotencyChannel; +typedef struct sf_request_metadata { + int64_t req_id; + int64_t data_version; +} SFRequestMetadata; + +typedef struct sf_request_metadata_array { + SFRequestMetadata *elts; + int count; + int alloc; +} SFRequestMetadataArray; + #ifdef __cplusplus extern "C" { #endif