bugfixed: do NOT return result of sf_send_add_event

connection_manager
YuQing 2020-09-22 22:25:45 +08:00
parent 62a35f03d5
commit ee949f15f1
1 changed files with 21 additions and 28 deletions

View File

@ -81,7 +81,7 @@ static void receipt_task_finish_cleanup(struct fast_task_info *task)
__LINE__, task->server_ip, task->port); __LINE__, task->server_ip, task->port);
} }
static int setup_channel_request(struct fast_task_info *task) static void setup_channel_request(struct fast_task_info *task)
{ {
IdempotencyClientChannel *channel; IdempotencyClientChannel *channel;
SFCommonProtoHeader *header; SFCommonProtoHeader *header;
@ -96,11 +96,10 @@ static int setup_channel_request(struct fast_task_info *task)
SF_PROTO_SET_HEADER(header, SF_SERVICE_PROTO_SETUP_CHANNEL_REQ, SF_PROTO_SET_HEADER(header, SF_SERVICE_PROTO_SETUP_CHANNEL_REQ,
sizeof(SFProtoSetupChannelReq)); sizeof(SFProtoSetupChannelReq));
task->length = sizeof(SFCommonProtoHeader) + sizeof(SFProtoSetupChannelReq); task->length = sizeof(SFCommonProtoHeader) + sizeof(SFProtoSetupChannelReq);
return sf_send_add_event(task); sf_send_add_event(task);
} }
static int check_report_req_receipt(struct fast_task_info *task, static int check_report_req_receipt(struct fast_task_info *task)
int *count)
{ {
IdempotencyClientChannel *channel; IdempotencyClientChannel *channel;
SFCommonProtoHeader *header; SFCommonProtoHeader *header;
@ -110,9 +109,9 @@ static int check_report_req_receipt(struct fast_task_info *task,
IdempotencyClientReceipt *last; IdempotencyClientReceipt *last;
IdempotencyClientReceipt *receipt; IdempotencyClientReceipt *receipt;
char *buff_end; char *buff_end;
int count;
if (task->length > 0) { if (task->length > 0) {
*count = 0;
logWarning("file: "__FILE__", line: %d, " logWarning("file: "__FILE__", line: %d, "
"server %s:%d, task length: %d != 0, skip check " "server %s:%d, task length: %d != 0, skip check "
"and report receipt request!", __LINE__, "and report receipt request!", __LINE__,
@ -122,13 +121,11 @@ static int check_report_req_receipt(struct fast_task_info *task,
channel = (IdempotencyClientChannel *)task->arg; channel = (IdempotencyClientChannel *)task->arg;
if (channel->waiting_resp_qinfo.head != NULL) { if (channel->waiting_resp_qinfo.head != NULL) {
*count = 0;
return 0; return 0;
} }
fc_queue_pop_to_queue(&channel->queue, &channel->waiting_resp_qinfo); fc_queue_pop_to_queue(&channel->queue, &channel->waiting_resp_qinfo);
if (channel->waiting_resp_qinfo.head == NULL) { if (channel->waiting_resp_qinfo.head == NULL) {
*count = 0;
return 0; return 0;
} }
@ -163,15 +160,16 @@ static int check_report_req_receipt(struct fast_task_info *task,
channel->waiting_resp_qinfo.tail = last; channel->waiting_resp_qinfo.tail = last;
} }
*count = rbody - rstart; count = rbody - rstart;
int2buff(*count, rheader->count); int2buff(count, rheader->count);
task->length = (char *)rbody - task->data; task->length = (char *)rbody - task->data;
int2buff(task->length - sizeof(SFCommonProtoHeader), header->body_len); int2buff(task->length - sizeof(SFCommonProtoHeader), header->body_len);
header->cmd = SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ; header->cmd = SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ;
return sf_send_add_event(task); sf_send_add_event(task);
return count;
} }
static int close_channel_request(struct fast_task_info *task) static void close_channel_request(struct fast_task_info *task)
{ {
IdempotencyClientChannel *channel; IdempotencyClientChannel *channel;
SFCommonProtoHeader *header; SFCommonProtoHeader *header;
@ -182,16 +180,16 @@ static int close_channel_request(struct fast_task_info *task)
header = (SFCommonProtoHeader *)task->data; header = (SFCommonProtoHeader *)task->data;
SF_PROTO_SET_HEADER(header, SF_SERVICE_PROTO_CLOSE_CHANNEL_REQ, 0); SF_PROTO_SET_HEADER(header, SF_SERVICE_PROTO_CLOSE_CHANNEL_REQ, 0);
task->length = sizeof(SFCommonProtoHeader); task->length = sizeof(SFCommonProtoHeader);
return sf_send_add_event(task); sf_send_add_event(task);
} }
static int active_test_request(struct fast_task_info *task) static void active_test_request(struct fast_task_info *task)
{ {
SFCommonProtoHeader *header; SFCommonProtoHeader *header;
header = (SFCommonProtoHeader *)task->data; header = (SFCommonProtoHeader *)task->data;
SF_PROTO_SET_HEADER(header, SF_PROTO_ACTIVE_TEST_REQ, 0); SF_PROTO_SET_HEADER(header, SF_PROTO_ACTIVE_TEST_REQ, 0);
task->length = sizeof(SFCommonProtoHeader); task->length = sizeof(SFCommonProtoHeader);
return sf_send_add_event(task); sf_send_add_event(task);
} }
static inline void update_lru_chain(struct fast_task_info *task) static inline void update_lru_chain(struct fast_task_info *task)
@ -205,18 +203,13 @@ static inline void update_lru_chain(struct fast_task_info *task)
fc_list_move_tail(&channel->dlink, &thread_ctx->head); fc_list_move_tail(&channel->dlink, &thread_ctx->head);
} }
static int report_req_receipt_request(struct fast_task_info *task, static void report_req_receipt_request(struct fast_task_info *task,
const bool update_lru) const bool update_lru)
{ {
int result;
int count; int count;
if ((result=check_report_req_receipt(task, &count)) != 0) { if ((count=check_report_req_receipt(task)) == 0) {
return result; sf_set_read_event(task); //trigger read event
}
if (count == 0) {
result = sf_set_read_event(task);
} else { } else {
((IdempotencyClientChannel *)task->arg)-> ((IdempotencyClientChannel *)task->arg)->
last_report_time = g_current_time; last_report_time = g_current_time;
@ -224,8 +217,6 @@ static int report_req_receipt_request(struct fast_task_info *task,
update_lru_chain(task); update_lru_chain(task);
} }
} }
return 0;
} }
static inline int receipt_expect_body_length(struct fast_task_info *task, static inline int receipt_expect_body_length(struct fast_task_info *task,
@ -334,14 +325,16 @@ static int receipt_deal_task(struct fast_task_info *task)
do { do {
stage = SF_NIO_TASK_STAGE_FETCH(task); stage = SF_NIO_TASK_STAGE_FETCH(task);
if (stage == SF_NIO_STAGE_HANDSHAKE) { if (stage == SF_NIO_STAGE_HANDSHAKE) {
result = setup_channel_request(task); setup_channel_request(task);
result = 0;
break; break;
} else if (stage == SF_NIO_STAGE_CONTINUE) { } else if (stage == SF_NIO_STAGE_CONTINUE) {
if (((IdempotencyClientChannel *)task->arg)->established) { if (((IdempotencyClientChannel *)task->arg)->established) {
result = report_req_receipt_request(task, true); report_req_receipt_request(task, true);
} else { } else {
result = 0; //just ignore sf_set_read_event(task); //trigger read event
} }
result = 0;
break; break;
} }
@ -391,7 +384,7 @@ static int receipt_deal_task(struct fast_task_info *task)
if (result == 0) { if (result == 0) {
update_lru_chain(task); update_lru_chain(task);
task->offset = task->length = 0; task->offset = task->length = 0;
result = report_req_receipt_request(task, false); report_req_receipt_request(task, false);
} }
} while (0); } while (0);