From 56ccde45ba8f05597349730338fb6bd68d2ce96d Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Fri, 24 Jun 2022 10:37:10 +0800 Subject: [PATCH] idempotency seq_id includes server id and channel id for global unique --- src/idempotency/client/client_channel.h | 3 +- src/idempotency/client/client_types.h | 4 +- src/idempotency/client/receipt_handler.c | 4 +- src/idempotency/common/idempotency_types.h | 46 ++++++++++++++++++++++ src/idempotency/server/server_handler.c | 6 +-- src/idempotency/server/server_handler.h | 4 +- src/idempotency/server/server_types.h | 6 +-- src/sf_proto.h | 2 +- 8 files changed, 61 insertions(+), 14 deletions(-) create mode 100644 src/idempotency/common/idempotency_types.h diff --git a/src/idempotency/client/client_channel.h b/src/idempotency/client/client_channel.h index bc58cc4..4647845 100644 --- a/src/idempotency/client/client_channel.h +++ b/src/idempotency/client/client_channel.h @@ -46,7 +46,8 @@ struct idempotency_client_channel *idempotency_client_channel_get( static inline uint64_t idempotency_client_channel_next_seq_id( struct idempotency_client_channel *channel) { - return __sync_add_and_fetch(&channel->next_req_id, 1); + return SF_IDEMPOTENCY_NEXT_REQ_ID(channel->server_id, + channel->id, FC_ATOMIC_INC(channel->next_seq)); } int idempotency_client_channel_push(struct idempotency_client_channel *channel, diff --git a/src/idempotency/client/client_types.h b/src/idempotency/client/client_types.h index 3bf815f..8e992bc 100644 --- a/src/idempotency/client/client_types.h +++ b/src/idempotency/client/client_types.h @@ -21,6 +21,7 @@ #include "fastcommon/fast_mblock.h" #include "fastcommon/fc_list.h" #include "fastcommon/fc_queue.h" +#include "sf/idempotency/common/idempotency_types.h" typedef struct idempotency_client_config { bool enabled; @@ -40,11 +41,12 @@ typedef struct idempotency_client_channel { volatile char in_ioevent; volatile char established; int buffer_size; //the min task size of the server and mine + uint32_t server_id; + volatile uint32_t next_seq; time_t last_connect_time; //for connect frequency control time_t last_pkg_time; //last communication time time_t last_report_time; //last report time for rpc receipt pthread_lock_cond_pair_t lc_pair; //for channel valid check and notify - volatile uint64_t next_req_id; struct fast_mblock_man receipt_allocator; struct fast_task_info *task; struct fc_queue queue; diff --git a/src/idempotency/client/receipt_handler.c b/src/idempotency/client/receipt_handler.c index 0ce09d5..4424ba9 100644 --- a/src/idempotency/client/receipt_handler.c +++ b/src/idempotency/client/receipt_handler.c @@ -271,10 +271,12 @@ static int deal_setup_channel_response(struct fast_task_info *task) return 0; } - resp = (SFProtoSetupChannelResp *)(task->data + sizeof(SFCommonProtoHeader)); + resp = (SFProtoSetupChannelResp *)(task->data + + sizeof(SFCommonProtoHeader)); channel_id = buff2int(resp->channel_id); channel_key = buff2int(resp->key); buffer_size = buff2int(resp->buffer_size); + channel->server_id = buff2int(resp->server_id); idempotency_client_channel_set_id_key(channel, channel_id, channel_key); if (__sync_bool_compare_and_swap(&channel->established, 0, 1)) { thread_ctx = (IdempotencyReceiptThreadContext *)task->thread_data->arg; diff --git a/src/idempotency/common/idempotency_types.h b/src/idempotency/common/idempotency_types.h new file mode 100644 index 0000000..16420b4 --- /dev/null +++ b/src/idempotency/common/idempotency_types.h @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2020 YuQing <384681@qq.com> + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + + +#ifndef _IDEMPOTENCY_COMMON_TYPES_H +#define _IDEMPOTENCY_COMMON_TYPES_H + +#include "fastcommon/common_define.h" + +#define SF_IDEMPOTENCY_CHANNEL_ID_BITS 16 +#define SF_IDEMPOTENCY_REQUEST_ID_BITS (64 - SF_IDEMPOTENCY_CHANNEL_ID_BITS) +#define SF_IDEMPOTENCY_MAX_CHANNEL_COUNT ((1 << SF_IDEMPOTENCY_CHANNEL_ID_BITS) - 1) +#define SF_IDEMPOTENCY_MAX_CHANNEL_ID SF_IDEMPOTENCY_MAX_CHANNEL_COUNT + +#define SF_IDEMPOTENCY_SERVER_ID_OFFSET 48 +#define SF_IDEMPOTENCY_CHANNEL_ID_OFFSET 32 + +#define SF_IDEMPOTENCY_NEXT_REQ_ID(server_id, channel_id, seq) \ + (((int64_t)server_id) << SF_IDEMPOTENCY_SERVER_ID_OFFSET) | \ + (((int64_t)channel_id) << SF_IDEMPOTENCY_CHANNEL_ID_OFFSET) | \ + (int64_t)seq + +#define SF_IDEMPOTENCY_EXTRACT_SERVER_ID(req_id) \ + (int)((req_id >> SF_IDEMPOTENCY_SERVER_ID_OFFSET) & 0xFFFF) + +#ifdef __cplusplus +extern "C" { +#endif + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/idempotency/server/server_handler.c b/src/idempotency/server/server_handler.c index 0214697..b3cfbaa 100644 --- a/src/idempotency/server/server_handler.c +++ b/src/idempotency/server/server_handler.c @@ -41,8 +41,8 @@ (task->length - sizeof(SFCommonProtoHeader)) int sf_server_deal_setup_channel(struct fast_task_info *task, - int *task_type, IdempotencyChannel **channel, - SFResponseInfo *response) + int *task_type, const int server_id, IdempotencyChannel + **channel, SFResponseInfo *response) { int result; SFProtoSetupChannelReq *req; @@ -74,13 +74,13 @@ int sf_server_deal_setup_channel(struct fast_task_info *task, "alloc channel fail, hint channel id: %d", channel_id); return ENOMEM; } - *task_type = SF_SERVER_TASK_TYPE_CHANNEL_HOLDER; resp = (SFProtoSetupChannelResp *)(task->data + sizeof(SFCommonProtoHeader)); int2buff((*channel)->id, resp->channel_id); int2buff((*channel)->key, resp->key); + int2buff(server_id, resp->server_id); int2buff(task->size, resp->buffer_size); response->header.body_len = sizeof(SFProtoSetupChannelResp); return 0; diff --git a/src/idempotency/server/server_handler.h b/src/idempotency/server/server_handler.h index 74bc478..d7f016d 100644 --- a/src/idempotency/server/server_handler.h +++ b/src/idempotency/server/server_handler.h @@ -25,8 +25,8 @@ extern "C" { #endif int sf_server_deal_setup_channel(struct fast_task_info *task, - int *task_type, IdempotencyChannel **channel, - SFResponseInfo *response); + int *task_type, const int server_id, IdempotencyChannel + **channel, SFResponseInfo *response); int sf_server_deal_close_channel(struct fast_task_info *task, int *task_type, IdempotencyChannel **channel, diff --git a/src/idempotency/server/server_types.h b/src/idempotency/server/server_types.h index 89ebffa..8786a54 100644 --- a/src/idempotency/server/server_types.h +++ b/src/idempotency/server/server_types.h @@ -19,11 +19,7 @@ #include "fastcommon/fast_mblock.h" #include "fastcommon/fast_timer.h" - -#define SF_IDEMPOTENCY_CHANNEL_ID_BITS 16 -#define SF_IDEMPOTENCY_REQUEST_ID_BITS (64 - SF_IDEMPOTENCY_CHANNEL_ID_BITS) -#define SF_IDEMPOTENCY_MAX_CHANNEL_COUNT ((1 << SF_IDEMPOTENCY_CHANNEL_ID_BITS) - 1) -#define SF_IDEMPOTENCY_MAX_CHANNEL_ID SF_IDEMPOTENCY_MAX_CHANNEL_COUNT +#include "sf/idempotency/common/idempotency_types.h" #define SF_IDEMPOTENCY_DEFAULT_REQUEST_HINT_CAPACITY 1023 #define SF_IDEMPOTENCY_DEFAULT_CHANNEL_RESERVE_INTERVAL 600 diff --git a/src/sf_proto.h b/src/sf_proto.h index be31a1a..ac0dcb4 100644 --- a/src/sf_proto.h +++ b/src/sf_proto.h @@ -176,8 +176,8 @@ typedef struct sf_proto_setup_channel_req { typedef struct sf_proto_setup_channel_resp { char channel_id[4]; char key[4]; + char server_id[4]; char buffer_size[4]; - char padding[4]; } SFProtoSetupChannelResp; typedef struct sf_proto_rebind_channel_req {