request_metadata.[hc] v2 impl.

replication_quorum
YuQing 2022-06-27 22:17:17 +08:00
parent 92613c765f
commit 990ef2d173
2 changed files with 12 additions and 24 deletions

View File

@ -33,8 +33,7 @@ static struct {
#define CHECK_MASTER_METADATA(meta) \ #define CHECK_MASTER_METADATA(meta) \
(meta != NULL && g_current_time - meta->enqueue_time > \ (meta != NULL && g_current_time - meta->enqueue_time > \
g_request_metadata.master_side_timeout && \ g_request_metadata.master_side_timeout)
FC_ATOMIC_GET(meta->reffer_count) == 0)
static void process_master_side(IdempotencyRequestMetadataContext *ctx) static void process_master_side(IdempotencyRequestMetadataContext *ctx)
{ {
@ -69,8 +68,7 @@ static void process_master_side(IdempotencyRequestMetadataContext *ctx)
} }
#define CHECK_SLAVE_METADATA(meta, dv) \ #define CHECK_SLAVE_METADATA(meta, dv) \
(meta != NULL && meta->data_version <= dv && \ (meta != NULL && meta->data_version <= dv)
FC_ATOMIC_GET(meta->reffer_count) == 0)
static void process_slave_side(IdempotencyRequestMetadataContext *ctx, static void process_slave_side(IdempotencyRequestMetadataContext *ctx,
const int64_t data_version) const int64_t data_version)
@ -152,6 +150,7 @@ int idempotency_request_metadata_init(
ctx->is_master_callback.arg = arg; ctx->is_master_callback.arg = arg;
ctx->list.head = ctx->list.tail = NULL; ctx->list.head = ctx->list.tail = NULL;
ctx->next = NULL;
if (g_request_metadata.list.head == NULL) { if (g_request_metadata.list.head == NULL) {
g_request_metadata.list.head = ctx; g_request_metadata.list.head = ctx;
} else { } else {
@ -193,7 +192,7 @@ int idempotency_request_metadata_start(const int process_interval_ms,
SF_G_THREAD_STACK_SIZE); SF_G_THREAD_STACK_SIZE);
} }
IdempotencyRequestMetadata *idempotency_request_metadata_add( int idempotency_request_metadata_add(
IdempotencyRequestMetadataContext *ctx, IdempotencyRequestMetadataContext *ctx,
SFRequestMetadata *metadata) SFRequestMetadata *metadata)
{ {
@ -208,9 +207,7 @@ IdempotencyRequestMetadata *idempotency_request_metadata_add(
idemp_meta->req_id = metadata->req_id; idemp_meta->req_id = metadata->req_id;
idemp_meta->data_version = metadata->data_version; idemp_meta->data_version = metadata->data_version;
idemp_meta->enqueue_time = g_current_time; idemp_meta->enqueue_time = g_current_time;
idemp_meta->result = SF_IDEMPOTENCY_REQUEST_PENDING_RESULT;
idemp_meta->next = NULL; idemp_meta->next = NULL;
FC_ATOMIC_INC(idemp_meta->reffer_count);
if (ctx->list.head == NULL) { if (ctx->list.head == NULL) {
ctx->list.head = idemp_meta; ctx->list.head = idemp_meta;
@ -221,12 +218,15 @@ IdempotencyRequestMetadata *idempotency_request_metadata_add(
} while (0); } while (0);
PTHREAD_MUTEX_UNLOCK(&ctx->lock); PTHREAD_MUTEX_UNLOCK(&ctx->lock);
return idemp_meta; logInfo("add req_id: %"PRId64", data_version: %"PRId64,
metadata->req_id, metadata->data_version);
return (idemp_meta != NULL ? 0 : ENOMEM);
} }
int idempotency_request_metadata_get( int idempotency_request_metadata_get(
IdempotencyRequestMetadataContext *ctx, IdempotencyRequestMetadataContext *ctx,
const int64_t req_id, int *err_no) const int64_t req_id, int64_t *data_version)
{ {
int result; int result;
IdempotencyRequestMetadata *meta; IdempotencyRequestMetadata *meta;
@ -237,7 +237,7 @@ int idempotency_request_metadata_get(
while (meta != NULL) { while (meta != NULL) {
if (req_id == meta->req_id) { if (req_id == meta->req_id) {
result = 0; result = 0;
*err_no = FC_ATOMIC_GET(meta->result); *data_version = meta->data_version;
break; break;
} }
meta = meta->next; meta = meta->next;

View File

@ -19,16 +19,12 @@
#include "server_types.h" #include "server_types.h"
#define SF_IDEMPOTENCY_REQUEST_PENDING_RESULT -1
typedef bool (*sf_is_master_callback)(void *arg, int64_t *data_version); typedef bool (*sf_is_master_callback)(void *arg, int64_t *data_version);
typedef struct idempotency_request_metadata { typedef struct idempotency_request_metadata {
int64_t req_id; int64_t req_id;
int64_t data_version; int64_t data_version;
time_t enqueue_time; time_t enqueue_time;
volatile int result;
volatile int reffer_count;
struct idempotency_request_metadata *next; struct idempotency_request_metadata *next;
} IdempotencyRequestMetadata; } IdempotencyRequestMetadata;
@ -57,21 +53,13 @@ extern "C" {
int idempotency_request_metadata_start(const int process_interval_ms, int idempotency_request_metadata_start(const int process_interval_ms,
const int master_side_timeout); const int master_side_timeout);
IdempotencyRequestMetadata *idempotency_request_metadata_add( int idempotency_request_metadata_add(
IdempotencyRequestMetadataContext *ctx, IdempotencyRequestMetadataContext *ctx,
SFRequestMetadata *metadata); SFRequestMetadata *metadata);
static inline void idempotency_request_metadata_set_result(
IdempotencyRequestMetadata *meta, const int result)
{
__sync_bool_compare_and_swap(&meta->result,
SF_IDEMPOTENCY_REQUEST_PENDING_RESULT, result);
FC_ATOMIC_DEC(meta->reffer_count);
}
int idempotency_request_metadata_get( int idempotency_request_metadata_get(
IdempotencyRequestMetadataContext *ctx, IdempotencyRequestMetadataContext *ctx,
const int64_t req_id, int *err_no); const int64_t req_id, int64_t *data_version);
#ifdef __cplusplus #ifdef __cplusplus
} }