idempotency seq_id includes server id and channel id for global unique

replication_quorum
YuQing 2022-06-24 10:37:10 +08:00
parent 7f7ba8d835
commit 56ccde45ba
8 changed files with 61 additions and 14 deletions

View File

@ -46,7 +46,8 @@ struct idempotency_client_channel *idempotency_client_channel_get(
static inline uint64_t idempotency_client_channel_next_seq_id( static inline uint64_t idempotency_client_channel_next_seq_id(
struct idempotency_client_channel *channel) struct idempotency_client_channel *channel)
{ {
return __sync_add_and_fetch(&channel->next_req_id, 1); return SF_IDEMPOTENCY_NEXT_REQ_ID(channel->server_id,
channel->id, FC_ATOMIC_INC(channel->next_seq));
} }
int idempotency_client_channel_push(struct idempotency_client_channel *channel, int idempotency_client_channel_push(struct idempotency_client_channel *channel,

View File

@ -21,6 +21,7 @@
#include "fastcommon/fast_mblock.h" #include "fastcommon/fast_mblock.h"
#include "fastcommon/fc_list.h" #include "fastcommon/fc_list.h"
#include "fastcommon/fc_queue.h" #include "fastcommon/fc_queue.h"
#include "sf/idempotency/common/idempotency_types.h"
typedef struct idempotency_client_config { typedef struct idempotency_client_config {
bool enabled; bool enabled;
@ -40,11 +41,12 @@ typedef struct idempotency_client_channel {
volatile char in_ioevent; volatile char in_ioevent;
volatile char established; volatile char established;
int buffer_size; //the min task size of the server and mine int buffer_size; //the min task size of the server and mine
uint32_t server_id;
volatile uint32_t next_seq;
time_t last_connect_time; //for connect frequency control time_t last_connect_time; //for connect frequency control
time_t last_pkg_time; //last communication time time_t last_pkg_time; //last communication time
time_t last_report_time; //last report time for rpc receipt time_t last_report_time; //last report time for rpc receipt
pthread_lock_cond_pair_t lc_pair; //for channel valid check and notify pthread_lock_cond_pair_t lc_pair; //for channel valid check and notify
volatile uint64_t next_req_id;
struct fast_mblock_man receipt_allocator; struct fast_mblock_man receipt_allocator;
struct fast_task_info *task; struct fast_task_info *task;
struct fc_queue queue; struct fc_queue queue;

View File

@ -271,10 +271,12 @@ static int deal_setup_channel_response(struct fast_task_info *task)
return 0; return 0;
} }
resp = (SFProtoSetupChannelResp *)(task->data + sizeof(SFCommonProtoHeader)); resp = (SFProtoSetupChannelResp *)(task->data +
sizeof(SFCommonProtoHeader));
channel_id = buff2int(resp->channel_id); channel_id = buff2int(resp->channel_id);
channel_key = buff2int(resp->key); channel_key = buff2int(resp->key);
buffer_size = buff2int(resp->buffer_size); buffer_size = buff2int(resp->buffer_size);
channel->server_id = buff2int(resp->server_id);
idempotency_client_channel_set_id_key(channel, channel_id, channel_key); idempotency_client_channel_set_id_key(channel, channel_id, channel_key);
if (__sync_bool_compare_and_swap(&channel->established, 0, 1)) { if (__sync_bool_compare_and_swap(&channel->established, 0, 1)) {
thread_ctx = (IdempotencyReceiptThreadContext *)task->thread_data->arg; thread_ctx = (IdempotencyReceiptThreadContext *)task->thread_data->arg;

View File

@ -0,0 +1,46 @@
/*
* Copyright (c) 2020 YuQing <384681@qq.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#ifndef _IDEMPOTENCY_COMMON_TYPES_H
#define _IDEMPOTENCY_COMMON_TYPES_H
#include "fastcommon/common_define.h"
#define SF_IDEMPOTENCY_CHANNEL_ID_BITS 16
#define SF_IDEMPOTENCY_REQUEST_ID_BITS (64 - SF_IDEMPOTENCY_CHANNEL_ID_BITS)
#define SF_IDEMPOTENCY_MAX_CHANNEL_COUNT ((1 << SF_IDEMPOTENCY_CHANNEL_ID_BITS) - 1)
#define SF_IDEMPOTENCY_MAX_CHANNEL_ID SF_IDEMPOTENCY_MAX_CHANNEL_COUNT
#define SF_IDEMPOTENCY_SERVER_ID_OFFSET 48
#define SF_IDEMPOTENCY_CHANNEL_ID_OFFSET 32
#define SF_IDEMPOTENCY_NEXT_REQ_ID(server_id, channel_id, seq) \
(((int64_t)server_id) << SF_IDEMPOTENCY_SERVER_ID_OFFSET) | \
(((int64_t)channel_id) << SF_IDEMPOTENCY_CHANNEL_ID_OFFSET) | \
(int64_t)seq
#define SF_IDEMPOTENCY_EXTRACT_SERVER_ID(req_id) \
(int)((req_id >> SF_IDEMPOTENCY_SERVER_ID_OFFSET) & 0xFFFF)
#ifdef __cplusplus
extern "C" {
#endif
#ifdef __cplusplus
}
#endif
#endif

View File

@ -41,8 +41,8 @@
(task->length - sizeof(SFCommonProtoHeader)) (task->length - sizeof(SFCommonProtoHeader))
int sf_server_deal_setup_channel(struct fast_task_info *task, int sf_server_deal_setup_channel(struct fast_task_info *task,
int *task_type, IdempotencyChannel **channel, int *task_type, const int server_id, IdempotencyChannel
SFResponseInfo *response) **channel, SFResponseInfo *response)
{ {
int result; int result;
SFProtoSetupChannelReq *req; SFProtoSetupChannelReq *req;
@ -74,13 +74,13 @@ int sf_server_deal_setup_channel(struct fast_task_info *task,
"alloc channel fail, hint channel id: %d", channel_id); "alloc channel fail, hint channel id: %d", channel_id);
return ENOMEM; return ENOMEM;
} }
*task_type = SF_SERVER_TASK_TYPE_CHANNEL_HOLDER; *task_type = SF_SERVER_TASK_TYPE_CHANNEL_HOLDER;
resp = (SFProtoSetupChannelResp *)(task->data + resp = (SFProtoSetupChannelResp *)(task->data +
sizeof(SFCommonProtoHeader)); sizeof(SFCommonProtoHeader));
int2buff((*channel)->id, resp->channel_id); int2buff((*channel)->id, resp->channel_id);
int2buff((*channel)->key, resp->key); int2buff((*channel)->key, resp->key);
int2buff(server_id, resp->server_id);
int2buff(task->size, resp->buffer_size); int2buff(task->size, resp->buffer_size);
response->header.body_len = sizeof(SFProtoSetupChannelResp); response->header.body_len = sizeof(SFProtoSetupChannelResp);
return 0; return 0;

View File

@ -25,8 +25,8 @@ extern "C" {
#endif #endif
int sf_server_deal_setup_channel(struct fast_task_info *task, int sf_server_deal_setup_channel(struct fast_task_info *task,
int *task_type, IdempotencyChannel **channel, int *task_type, const int server_id, IdempotencyChannel
SFResponseInfo *response); **channel, SFResponseInfo *response);
int sf_server_deal_close_channel(struct fast_task_info *task, int sf_server_deal_close_channel(struct fast_task_info *task,
int *task_type, IdempotencyChannel **channel, int *task_type, IdempotencyChannel **channel,

View File

@ -19,11 +19,7 @@
#include "fastcommon/fast_mblock.h" #include "fastcommon/fast_mblock.h"
#include "fastcommon/fast_timer.h" #include "fastcommon/fast_timer.h"
#include "sf/idempotency/common/idempotency_types.h"
#define SF_IDEMPOTENCY_CHANNEL_ID_BITS 16
#define SF_IDEMPOTENCY_REQUEST_ID_BITS (64 - SF_IDEMPOTENCY_CHANNEL_ID_BITS)
#define SF_IDEMPOTENCY_MAX_CHANNEL_COUNT ((1 << SF_IDEMPOTENCY_CHANNEL_ID_BITS) - 1)
#define SF_IDEMPOTENCY_MAX_CHANNEL_ID SF_IDEMPOTENCY_MAX_CHANNEL_COUNT
#define SF_IDEMPOTENCY_DEFAULT_REQUEST_HINT_CAPACITY 1023 #define SF_IDEMPOTENCY_DEFAULT_REQUEST_HINT_CAPACITY 1023
#define SF_IDEMPOTENCY_DEFAULT_CHANNEL_RESERVE_INTERVAL 600 #define SF_IDEMPOTENCY_DEFAULT_CHANNEL_RESERVE_INTERVAL 600

View File

@ -176,8 +176,8 @@ typedef struct sf_proto_setup_channel_req {
typedef struct sf_proto_setup_channel_resp { typedef struct sf_proto_setup_channel_resp {
char channel_id[4]; char channel_id[4];
char key[4]; char key[4];
char server_id[4];
char buffer_size[4]; char buffer_size[4];
char padding[4];
} SFProtoSetupChannelResp; } SFProtoSetupChannelResp;
typedef struct sf_proto_rebind_channel_req { typedef struct sf_proto_rebind_channel_req {