add files idempotency/server/request_metadata.[hc]

replication_quorum
YuQing 2022-06-27 11:30:01 +08:00
parent c6300318c8
commit b364a875c2
4 changed files with 222 additions and 6 deletions

View File

@ -17,7 +17,8 @@ IDEMP_SERVER_HEADER = idempotency/server/server_types.h \
idempotency/server/server_channel.h \
idempotency/server/request_htable.h \
idempotency/server/channel_htable.h \
idempotency/server/server_handler.h
idempotency/server/server_handler.h \
idempotency/server/request_metadata.h
IDEMP_CLIENT_HEADER = idempotency/client/client_types.h \
idempotency/client/receipt_handler.h \
@ -36,6 +37,7 @@ SHARED_OBJS = sf_nio.lo sf_iov.lo sf_service.lo sf_global.lo \
idempotency/server/request_htable.lo \
idempotency/server/channel_htable.lo \
idempotency/server/server_handler.lo \
idempotency/server/request_metadata.lo \
idempotency/client/receipt_handler.lo \
idempotency/client/client_channel.lo

View File

@ -0,0 +1,135 @@
/*
* Copyright (c) 2020 YuQing <384681@qq.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#include <limits.h>
#include <sys/stat.h>
#include "fastcommon/shared_func.h"
#include "fastcommon/logger.h"
#include "fastcommon/fc_atomic.h"
#include "sf/sf_global.h"
#include "request_metadata.h"
static struct {
IdempotencyRequestMetadataContext *head;
IdempotencyRequestMetadataContext *tail;
} ctx_list = {NULL, NULL};
static void *thread_run(void *arg)
{
IdempotencyRequestMetadataContext *ctx;
int64_t data_version;
ctx = ctx_list.head;
while (ctx != NULL) {
if (ctx->is_master_callback.func(ctx->is_master_callback.
arg, &data_version))
{
} else {
}
ctx = ctx->next;
}
return NULL;
}
int idempotency_request_metadata_init(
IdempotencyRequestMetadataContext *ctx,
sf_is_master_callback is_master_callback, void *arg)
{
int result;
if ((result=fast_mblock_init_ex1(&ctx->allocator, "req-metadata-info",
sizeof(IdempotencyRequestMetadata), 8192, 0,
NULL, NULL, true)) != 0)
{
return result;
}
if ((result=init_pthread_lock(&ctx->lock)) != 0) {
return result;
}
ctx->is_master_callback.func = is_master_callback;
ctx->is_master_callback.arg = arg;
ctx->list.head = ctx->list.tail = NULL;
if (ctx_list.head == NULL) {
ctx_list.head = ctx;
} else {
ctx_list.tail->next = ctx;
}
ctx_list.tail = ctx;
return 0;
}
int idempotency_request_metadata_start()
{
pthread_t tid;
return fc_create_thread(&tid, thread_run, NULL,
SF_G_THREAD_STACK_SIZE);
}
IdempotencyRequestMetadata *idempotency_request_metadata_add(
IdempotencyRequestMetadataContext *ctx,
SFRequestMetadata *metadata)
{
IdempotencyRequestMetadata *idemp_meta;
if ((idemp_meta=fast_mblock_alloc_object(&ctx->allocator)) == NULL) {
return NULL;
}
idemp_meta->req_id = metadata->req_id;
idemp_meta->data_version = metadata->data_version;
idemp_meta->result = -1;
idemp_meta->next = NULL;
FC_ATOMIC_INC_EX(idemp_meta->reffer_count, 2);
PTHREAD_MUTEX_LOCK(&ctx->lock);
if (ctx->list.head == NULL) {
ctx->list.head = idemp_meta;
} else {
ctx->list.tail->next = idemp_meta;
}
ctx->list.tail = idemp_meta;
PTHREAD_MUTEX_UNLOCK(&ctx->lock);
return idemp_meta;
}
int idempotency_request_metadata_get(
IdempotencyRequestMetadataContext *ctx,
const int64_t req_id, int *err_no)
{
int result;
IdempotencyRequestMetadata *meta;
result = ENOENT;
PTHREAD_MUTEX_LOCK(&ctx->lock);
meta = ctx->list.head;
while (meta != NULL) {
if (req_id == meta->req_id) {
result = 0;
*err_no = meta->result;
}
meta = meta->next;
}
PTHREAD_MUTEX_UNLOCK(&ctx->lock);
return result;
}

View File

@ -0,0 +1,68 @@
/*
* Copyright (c) 2020 YuQing <384681@qq.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#ifndef _SF_IDEMPOTENCY_REQUEST_METADATA_H
#define _SF_IDEMPOTENCY_REQUEST_METADATA_H
#include "server_types.h"
typedef bool (*sf_is_master_callback)(void *arg, int64_t *data_version);
typedef struct idempotency_request_metadata {
int64_t req_id;
int64_t data_version;
volatile int result;
volatile int reffer_count;
struct idempotency_request_metadata *next;
} IdempotencyRequestMetadata;
typedef struct idempotency_request_metadata_context {
struct {
sf_is_master_callback func;
void *arg;
} is_master_callback;
struct fast_mblock_man allocator; //element: IdempotencyRequestMetadata
pthread_mutex_t lock;
struct {
IdempotencyRequestMetadata *head;
IdempotencyRequestMetadata *tail;
} list;
struct idempotency_request_metadata_context *next;
} IdempotencyRequestMetadataContext;
#ifdef __cplusplus
extern "C" {
#endif
int idempotency_request_metadata_init(
IdempotencyRequestMetadataContext *ctx,
sf_is_master_callback is_master_callback, void *arg);
int idempotency_request_metadata_start();
IdempotencyRequestMetadata *idempotency_request_metadata_add(
IdempotencyRequestMetadataContext *ctx,
SFRequestMetadata *metadata);
int idempotency_request_metadata_get(
IdempotencyRequestMetadataContext *ctx,
const int64_t req_id, int *err_no);
#ifdef __cplusplus
}
#endif
#endif

View File

@ -57,6 +57,17 @@ typedef struct idempotency_channel {
struct idempotency_channel *next;
} IdempotencyChannel;
typedef struct sf_request_metadata {
int64_t req_id;
int64_t data_version;
} SFRequestMetadata;
typedef struct sf_request_metadata_array {
SFRequestMetadata *elts;
int count;
int alloc;
} SFRequestMetadataArray;
#ifdef __cplusplus
extern "C" {
#endif