use task->pending_send_count to prevent re-entry

support_rdma
YuQing 2023-11-06 10:53:25 +08:00
parent a8867a19c4
commit cf9088fb0c
3 changed files with 17 additions and 8 deletions

View File

@ -97,6 +97,7 @@ static void receipt_task_finish_cleanup(struct fast_task_info *task)
sf_nio_reset_task_length(task);
task->req_count = 0;
task->pending_send_count = 0;
channel = (IdempotencyClientChannel *)task->arg;
fc_list_del_init(&channel->dlink);
@ -238,6 +239,7 @@ static void report_req_receipt_request(struct fast_task_info *task,
if (update_lru) {
update_lru_chain(task);
}
task->pending_send_count++;
}
}
@ -325,9 +327,10 @@ static inline int deal_report_req_receipt_response(struct fast_task_info *task)
"response from server %s:%u, unexpect cmd: "
"REPORT_REQ_RECEIPT_RESP", __LINE__,
task->server_ip, task->port);
return 0;
return EINVAL;
}
task->pending_send_count--;
current = channel->waiting_resp_qinfo.head;
do {
deleted = current;
@ -352,7 +355,7 @@ static int receipt_deal_task(struct fast_task_info *task, const int stage)
result = 0;
break;
} else if (stage == SF_NIO_STAGE_CONTINUE) {
if (sf_nio_task_send_done(task)) {
if (task->pending_send_count == 0) {
if (((IdempotencyClientChannel *)task->arg)->established) {
report_req_receipt_request(task, true);
} else if (task->req_count > 0) {
@ -389,6 +392,7 @@ static int receipt_deal_task(struct fast_task_info *task, const int stage)
result = deal_report_req_receipt_response(task);
break;
case SF_PROTO_ACTIVE_TEST_RESP:
task->pending_send_count--;
result = 0;
break;
case SF_SERVICE_PROTO_CLOSE_CHANNEL_RESP:
@ -410,7 +414,9 @@ static int receipt_deal_task(struct fast_task_info *task, const int stage)
update_lru_chain(task);
task->recv.ptr->length = 0;
task->recv.ptr->offset = 0;
report_req_receipt_request(task, false);
if (task->pending_send_count == 0) {
report_req_receipt_request(task, false);
}
}
} while (0);
@ -430,9 +436,10 @@ static void receipt_thread_check_heartbeat(
break;
}
if (sf_nio_task_send_done(channel->task)) {
if (channel->task->pending_send_count == 0) {
channel->last_pkg_time = g_current_time;
active_test_request(channel->task);
channel->task->pending_send_count++;
}
}
}
@ -444,7 +451,7 @@ static void receipt_thread_close_idle_channel(
IdempotencyClientChannel *tmp;
fc_list_for_each_entry_safe(channel, tmp, &thread_ctx->head, dlink) {
if (!sf_nio_task_send_done(channel->task)) {
if (channel->task->pending_send_count > 0) {
continue;
}

View File

@ -131,6 +131,7 @@ int sf_server_deal_report_req_receipt(struct fast_task_info *task,
SFProtoReportReqReceiptBody *body_part;
SFProtoReportReqReceiptBody *body_end;
response->header.cmd = SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP;
if ((result=check_holder_channel(task_type, channel, response)) != 0) {
return result;
}
@ -164,7 +165,6 @@ int sf_server_deal_report_req_receipt(struct fast_task_info *task,
}
//logInfo("receipt count: %d, success: %d", count, success);
response->header.cmd = SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP;
return 0;
}

View File

@ -956,10 +956,12 @@ int sf_client_sock_read(int sock, short event, void *arg)
if (action == sf_comm_action_finish) {
task->req_count++;
task->nio_stages.current = SF_NIO_STAGE_SEND;
if (SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_SEND) < 0) { //fatal error
if (SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_SEND) < 0) {
ioevent_add_to_deleted_list(task);
return -1;
} else if (task->handler->explicit_post_recv) {
}
if (task->handler->explicit_post_recv) {
if (task->handler->post_recv(task) != 0) {
ioevent_add_to_deleted_list(task);
return -1;