diff --git a/src/idempotency/client/receipt_handler.c b/src/idempotency/client/receipt_handler.c index b98a8fa..b487da2 100644 --- a/src/idempotency/client/receipt_handler.c +++ b/src/idempotency/client/receipt_handler.c @@ -97,6 +97,7 @@ static void receipt_task_finish_cleanup(struct fast_task_info *task) sf_nio_reset_task_length(task); task->req_count = 0; + task->pending_send_count = 0; channel = (IdempotencyClientChannel *)task->arg; fc_list_del_init(&channel->dlink); @@ -238,6 +239,7 @@ static void report_req_receipt_request(struct fast_task_info *task, if (update_lru) { update_lru_chain(task); } + task->pending_send_count++; } } @@ -325,9 +327,10 @@ static inline int deal_report_req_receipt_response(struct fast_task_info *task) "response from server %s:%u, unexpect cmd: " "REPORT_REQ_RECEIPT_RESP", __LINE__, task->server_ip, task->port); - return 0; + return EINVAL; } + task->pending_send_count--; current = channel->waiting_resp_qinfo.head; do { deleted = current; @@ -352,7 +355,7 @@ static int receipt_deal_task(struct fast_task_info *task, const int stage) result = 0; break; } else if (stage == SF_NIO_STAGE_CONTINUE) { - if (sf_nio_task_send_done(task)) { + if (task->pending_send_count == 0) { if (((IdempotencyClientChannel *)task->arg)->established) { report_req_receipt_request(task, true); } else if (task->req_count > 0) { @@ -389,6 +392,7 @@ static int receipt_deal_task(struct fast_task_info *task, const int stage) result = deal_report_req_receipt_response(task); break; case SF_PROTO_ACTIVE_TEST_RESP: + task->pending_send_count--; result = 0; break; case SF_SERVICE_PROTO_CLOSE_CHANNEL_RESP: @@ -410,7 +414,9 @@ static int receipt_deal_task(struct fast_task_info *task, const int stage) update_lru_chain(task); task->recv.ptr->length = 0; task->recv.ptr->offset = 0; - report_req_receipt_request(task, false); + if (task->pending_send_count == 0) { + report_req_receipt_request(task, false); + } } } while (0); @@ -430,9 +436,10 @@ static void receipt_thread_check_heartbeat( break; } - if (sf_nio_task_send_done(channel->task)) { + if (channel->task->pending_send_count == 0) { channel->last_pkg_time = g_current_time; active_test_request(channel->task); + channel->task->pending_send_count++; } } } @@ -444,7 +451,7 @@ static void receipt_thread_close_idle_channel( IdempotencyClientChannel *tmp; fc_list_for_each_entry_safe(channel, tmp, &thread_ctx->head, dlink) { - if (!sf_nio_task_send_done(channel->task)) { + if (channel->task->pending_send_count > 0) { continue; } diff --git a/src/idempotency/server/server_handler.c b/src/idempotency/server/server_handler.c index d5a9e13..1a7cfa3 100644 --- a/src/idempotency/server/server_handler.c +++ b/src/idempotency/server/server_handler.c @@ -131,6 +131,7 @@ int sf_server_deal_report_req_receipt(struct fast_task_info *task, SFProtoReportReqReceiptBody *body_part; SFProtoReportReqReceiptBody *body_end; + response->header.cmd = SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP; if ((result=check_holder_channel(task_type, channel, response)) != 0) { return result; } @@ -164,7 +165,6 @@ int sf_server_deal_report_req_receipt(struct fast_task_info *task, } //logInfo("receipt count: %d, success: %d", count, success); - response->header.cmd = SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP; return 0; } diff --git a/src/sf_nio.c b/src/sf_nio.c index 8a820b6..c4d7350 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -956,10 +956,12 @@ int sf_client_sock_read(int sock, short event, void *arg) if (action == sf_comm_action_finish) { task->req_count++; task->nio_stages.current = SF_NIO_STAGE_SEND; - if (SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_SEND) < 0) { //fatal error + if (SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_SEND) < 0) { ioevent_add_to_deleted_list(task); return -1; - } else if (task->handler->explicit_post_recv) { + } + + if (task->handler->explicit_post_recv) { if (task->handler->post_recv(task) != 0) { ioevent_add_to_deleted_list(task); return -1;