From 4c0dde69e52721ce930a162db94d9dec5f43e059 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Wed, 16 Sep 2020 21:52:43 +0800 Subject: [PATCH] channel buffer_size for receipt report --- src/idempotency/client/client_types.h | 2 +- src/idempotency/client/receipt_handler.c | 10 +++++++++- src/idempotency/server/server_handler.c | 2 +- src/sf_proto.h | 2 ++ 4 files changed, 13 insertions(+), 3 deletions(-) diff --git a/src/idempotency/client/client_types.h b/src/idempotency/client/client_types.h index 930a358..a65322b 100644 --- a/src/idempotency/client/client_types.h +++ b/src/idempotency/client/client_types.h @@ -23,8 +23,8 @@ typedef struct idempotency_client_channel { volatile uint32_t id; //channel id, 0 for invalid volatile int key; //channel key volatile char in_ioevent; - //volatile char in_heartbeat; volatile char established; + int buffer_size; //the min task size of the server and mine time_t last_connect_time; //for connect frequency control time_t last_pkg_time; //last communication time time_t last_report_time; //last report time for rpc receipt diff --git a/src/idempotency/client/receipt_handler.c b/src/idempotency/client/receipt_handler.c index 746ae9e..2607c48 100644 --- a/src/idempotency/client/receipt_handler.c +++ b/src/idempotency/client/receipt_handler.c @@ -135,7 +135,7 @@ static int check_report_req_receipt(struct fast_task_info *task, header = (SFCommonProtoHeader *)task->data; rheader = (SFProtoReportReqReceiptHeader *)(header + 1); rbody = rstart = (SFProtoReportReqReceiptBody *)(rheader + 1); - buff_end = task->data + task->size; + buff_end = task->data + channel->buffer_size; last = NULL; receipt = channel->waiting_resp_qinfo.head; do { @@ -250,6 +250,7 @@ static int deal_setup_channel_response(struct fast_task_info *task) IdempotencyClientChannel *channel; int channel_id; int channel_key; + int buffer_size; if ((result=receipt_expect_body_length(task, sizeof(SFProtoSetupChannelResp))) != 0) @@ -269,11 +270,18 @@ static int deal_setup_channel_response(struct fast_task_info *task) resp = (SFProtoSetupChannelResp *)(task->data + sizeof(SFCommonProtoHeader)); channel_id = buff2int(resp->channel_id); channel_key = buff2int(resp->key); + buffer_size = buff2int(resp->buffer_size); idempotency_client_channel_set_id_key(channel, channel_id, channel_key); if (__sync_bool_compare_and_swap(&channel->established, 0, 1)) { thread_ctx = (IdempotencyReceiptThreadContext *)task->thread_data->arg; fc_list_add_tail(&channel->dlink, &thread_ctx->head); } + channel->buffer_size = FC_MIN(buffer_size, task->size); + + logInfo("file: "__FILE__", line: %d, " + "peer buffer size: %d, mine buffer size: %d, " + "min buffer size: %d", __LINE__, buffer_size, + task->size, channel->buffer_size); PTHREAD_MUTEX_LOCK(&channel->lc_pair.lock); pthread_cond_broadcast(&channel->lc_pair.cond); diff --git a/src/idempotency/server/server_handler.c b/src/idempotency/server/server_handler.c index 616ea80..027c76a 100644 --- a/src/idempotency/server/server_handler.c +++ b/src/idempotency/server/server_handler.c @@ -65,8 +65,8 @@ int sf_server_deal_setup_channel(struct fast_task_info *task, sizeof(SFCommonProtoHeader)); int2buff((*channel)->id, resp->channel_id); int2buff((*channel)->key, resp->key); + int2buff(task->size, resp->buffer_size); response->header.body_len = sizeof(SFProtoSetupChannelResp); - //TASK_ARG->context.response_done = true; return 0; } diff --git a/src/sf_proto.h b/src/sf_proto.h index ee739d0..faa1b6d 100644 --- a/src/sf_proto.h +++ b/src/sf_proto.h @@ -72,6 +72,8 @@ typedef struct sf_proto_setup_channel_req { typedef struct sf_proto_setup_channel_resp { char channel_id[4]; char key[4]; + char buffer_size[4]; + char padding[4]; } SFProtoSetupChannelResp; typedef struct sf_proto_report_req_receipt_header {