From 294ad5e636e37aa1b5fd26461ebe91f059d98ee6 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Sun, 12 Feb 2023 19:47:31 +0800 Subject: [PATCH] use field notify_next for notify queue of nio thread --- src/sf_nio.c | 63 +++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 47 insertions(+), 16 deletions(-) diff --git a/src/sf_nio.c b/src/sf_nio.c index 659d4c7..1800cb7 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -95,6 +95,8 @@ static inline void release_iovec_buffer(struct fast_task_info *task) void sf_task_finish_clean_up(struct fast_task_info *task) { + //int next_stage; + /* assert(task->event.fd >= 0); if (task->event.fd < 0) { @@ -110,6 +112,18 @@ void sf_task_finish_clean_up(struct fast_task_info *task) task->finish_callback = NULL; } + /* + if (SF_G_EPOLL_EDGE_TRIGGER && (next_stage=FC_ATOMIC_GET(task-> + nio_stages.next)) != SF_NIO_STAGE_NONE) + { + logWarning("file: "__FILE__", line: %d, " + "task: %p, nio_stages.next: %d != 0", + __LINE__, task, FC_ATOMIC_GET(task->nio_stages.next)); + __sync_bool_compare_and_swap(&task->nio_stages.next, + next_stage, SF_NIO_STAGE_NONE); + } + */ + release_iovec_buffer(task); sf_task_detach_thread(task); @@ -177,14 +191,20 @@ int sf_set_read_event(struct fast_task_info *task) if ((result=setup_read_event(task)) != 0) { return result; } + return 0; if (SF_G_EPOLL_EDGE_TRIGGER) { if (FC_ATOMIC_GET(task->nio_stages.notify) == SF_NIO_STAGE_SEND) { - __sync_bool_compare_and_swap(&task->nio_stages.next, - SF_NIO_STAGE_NONE, SF_NIO_STAGE_RECV); - return 0; + if (__sync_bool_compare_and_swap(&task->nio_stages.next, + SF_NIO_STAGE_NONE, SF_NIO_STAGE_RECV)) + { + return 0; + } else { + return EAGAIN; + } } else { return sf_nio_notify(task, SF_NIO_STAGE_RECV); + return 0; } } else { return 0; @@ -299,8 +319,11 @@ static int sf_nio_deal_task(struct fast_task_info *task, const int stage) break; case SF_NIO_STAGE_RECV: if ((result=setup_read_event(task)) == 0) { - sf_client_sock_read(task->event.fd, - IOEVENT_READ, task); + if (sf_client_sock_read(task->event.fd, + IOEVENT_READ, task) < 0) + { + result = errno != 0 ? errno : EIO; + } } break; case SF_NIO_STAGE_SEND: @@ -322,14 +345,18 @@ static int sf_nio_deal_task(struct fast_task_info *task, const int stage) break; default: logError("file: "__FILE__", line: %d, " - "client ip: %s, invalid notify stage: %d", - __LINE__, task->client_ip, stage); + "client ip: %s, task: %p, sock: %d, invalid notify stage: %d", + __LINE__, task->client_ip, task, task->event.fd, stage); result = -EINVAL; break; } if (result < 0) { ioevent_add_to_deleted_list(task); + } else if (result > 0) { + if (stage == SF_NIO_STAGE_RECV || stage == SF_NIO_STAGE_SEND) { + return -1 * result; + } } return result; @@ -370,6 +397,9 @@ int sf_nio_notify(struct fast_task_info *task, const int stage) __LINE__, stage); return 0; + } else if (old_stage == SF_NIO_STAGE_NONE) { + continue; + /* } else if (SF_G_EPOLL_EDGE_TRIGGER && ( (old_stage == SF_NIO_STAGE_RECV && stage == SF_NIO_STAGE_SEND) || @@ -379,7 +409,8 @@ int sf_nio_notify(struct fast_task_info *task, const int stage) __sync_bool_compare_and_swap(&task->nio_stages. next, SF_NIO_STAGE_NONE, stage); return 0; - } else if (old_stage != SF_NIO_STAGE_NONE) { + */ + } else { logWarning("file: "__FILE__", line: %d, " "current stage: %d != %d, skip set stage to %d", __LINE__, old_stage, SF_NIO_STAGE_NONE, stage); @@ -388,13 +419,12 @@ int sf_nio_notify(struct fast_task_info *task, const int stage) } PTHREAD_MUTEX_LOCK(&task->thread_data->waiting_queue.lock); - task->next = NULL; - + task->notify_next = NULL; if (task->thread_data->waiting_queue.tail == NULL) { task->thread_data->waiting_queue.head = task; notify = true; } else { - task->thread_data->waiting_queue.tail->next = task; + task->thread_data->waiting_queue.tail->notify_next = task; notify = false; } task->thread_data->waiting_queue.tail = task; @@ -422,6 +452,7 @@ void sf_recv_notify_read(int sock, short event, void *arg) int64_t n; int stage; int next; + int result; struct nio_thread_data *thread_data; struct fast_task_info *task; struct fast_task_info *current; @@ -440,7 +471,7 @@ void sf_recv_notify_read(int sock, short event, void *arg) while (current != NULL) { task = current; - current = current->next; + current = current->notify_next; stage = FC_ATOMIC_GET(task->nio_stages.notify); if (!task->canceled) { @@ -448,17 +479,17 @@ void sf_recv_notify_read(int sock, short event, void *arg) /* MUST set to SF_NIO_STAGE_NONE first for re-entry */ __sync_bool_compare_and_swap(&task->nio_stages.notify, stage, SF_NIO_STAGE_NONE); - sf_nio_deal_task(task, stage); + result = sf_nio_deal_task(task, stage); } else { - sf_nio_deal_task(task, stage); + result = sf_nio_deal_task(task, stage); __sync_bool_compare_and_swap(&task->nio_stages.notify, stage, SF_NIO_STAGE_NONE); } - if (SF_G_EPOLL_EDGE_TRIGGER) { + if (SF_G_EPOLL_EDGE_TRIGGER && result >= 0) { next = FC_ATOMIC_GET(task->nio_stages.next); if (next != SF_NIO_STAGE_NONE) { - sf_nio_notify(task, next); + //sf_nio_notify(task, next); __sync_bool_compare_and_swap(&task->nio_stages.next, next, SF_NIO_STAGE_NONE); }