diff --git a/src/idempotency/client/receipt_handler.c b/src/idempotency/client/receipt_handler.c index 2859614..bd47897 100644 --- a/src/idempotency/client/receipt_handler.c +++ b/src/idempotency/client/receipt_handler.c @@ -81,7 +81,7 @@ static void receipt_task_finish_cleanup(struct fast_task_info *task) __LINE__, task->server_ip, task->port); } -static int setup_channel_request(struct fast_task_info *task) +static void setup_channel_request(struct fast_task_info *task) { IdempotencyClientChannel *channel; SFCommonProtoHeader *header; @@ -96,11 +96,10 @@ static int setup_channel_request(struct fast_task_info *task) SF_PROTO_SET_HEADER(header, SF_SERVICE_PROTO_SETUP_CHANNEL_REQ, sizeof(SFProtoSetupChannelReq)); task->length = sizeof(SFCommonProtoHeader) + sizeof(SFProtoSetupChannelReq); - return sf_send_add_event(task); + sf_send_add_event(task); } -static int check_report_req_receipt(struct fast_task_info *task, - int *count) +static int check_report_req_receipt(struct fast_task_info *task) { IdempotencyClientChannel *channel; SFCommonProtoHeader *header; @@ -110,9 +109,9 @@ static int check_report_req_receipt(struct fast_task_info *task, IdempotencyClientReceipt *last; IdempotencyClientReceipt *receipt; char *buff_end; + int count; if (task->length > 0) { - *count = 0; logWarning("file: "__FILE__", line: %d, " "server %s:%d, task length: %d != 0, skip check " "and report receipt request!", __LINE__, @@ -122,13 +121,11 @@ static int check_report_req_receipt(struct fast_task_info *task, channel = (IdempotencyClientChannel *)task->arg; if (channel->waiting_resp_qinfo.head != NULL) { - *count = 0; return 0; } fc_queue_pop_to_queue(&channel->queue, &channel->waiting_resp_qinfo); if (channel->waiting_resp_qinfo.head == NULL) { - *count = 0; return 0; } @@ -163,15 +160,16 @@ static int check_report_req_receipt(struct fast_task_info *task, channel->waiting_resp_qinfo.tail = last; } - *count = rbody - rstart; - int2buff(*count, rheader->count); + count = rbody - rstart; + int2buff(count, rheader->count); task->length = (char *)rbody - task->data; int2buff(task->length - sizeof(SFCommonProtoHeader), header->body_len); header->cmd = SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ; - return sf_send_add_event(task); + sf_send_add_event(task); + return count; } -static int close_channel_request(struct fast_task_info *task) +static void close_channel_request(struct fast_task_info *task) { IdempotencyClientChannel *channel; SFCommonProtoHeader *header; @@ -182,16 +180,16 @@ static int close_channel_request(struct fast_task_info *task) header = (SFCommonProtoHeader *)task->data; SF_PROTO_SET_HEADER(header, SF_SERVICE_PROTO_CLOSE_CHANNEL_REQ, 0); task->length = sizeof(SFCommonProtoHeader); - return sf_send_add_event(task); + sf_send_add_event(task); } -static int active_test_request(struct fast_task_info *task) +static void active_test_request(struct fast_task_info *task) { SFCommonProtoHeader *header; header = (SFCommonProtoHeader *)task->data; SF_PROTO_SET_HEADER(header, SF_PROTO_ACTIVE_TEST_REQ, 0); task->length = sizeof(SFCommonProtoHeader); - return sf_send_add_event(task); + sf_send_add_event(task); } static inline void update_lru_chain(struct fast_task_info *task) @@ -205,18 +203,13 @@ static inline void update_lru_chain(struct fast_task_info *task) fc_list_move_tail(&channel->dlink, &thread_ctx->head); } -static int report_req_receipt_request(struct fast_task_info *task, +static void report_req_receipt_request(struct fast_task_info *task, const bool update_lru) { - int result; int count; - if ((result=check_report_req_receipt(task, &count)) != 0) { - return result; - } - - if (count == 0) { - result = sf_set_read_event(task); + if ((count=check_report_req_receipt(task)) == 0) { + sf_set_read_event(task); //trigger read event } else { ((IdempotencyClientChannel *)task->arg)-> last_report_time = g_current_time; @@ -224,8 +217,6 @@ static int report_req_receipt_request(struct fast_task_info *task, update_lru_chain(task); } } - - return 0; } static inline int receipt_expect_body_length(struct fast_task_info *task, @@ -334,14 +325,16 @@ static int receipt_deal_task(struct fast_task_info *task) do { stage = SF_NIO_TASK_STAGE_FETCH(task); if (stage == SF_NIO_STAGE_HANDSHAKE) { - result = setup_channel_request(task); + setup_channel_request(task); + result = 0; break; } else if (stage == SF_NIO_STAGE_CONTINUE) { if (((IdempotencyClientChannel *)task->arg)->established) { - result = report_req_receipt_request(task, true); + report_req_receipt_request(task, true); } else { - result = 0; //just ignore + sf_set_read_event(task); //trigger read event } + result = 0; break; } @@ -391,7 +384,7 @@ static int receipt_deal_task(struct fast_task_info *task) if (result == 0) { update_lru_chain(task); task->offset = task->length = 0; - result = report_req_receipt_request(task, false); + report_req_receipt_request(task, false); } } while (0);