use field notify_next for notify queue of nio thread

fstore_storage_engine
YuQing 2023-02-12 19:47:31 +08:00
parent 7f758fd293
commit 294ad5e636
1 changed files with 47 additions and 16 deletions

View File

@ -95,6 +95,8 @@ static inline void release_iovec_buffer(struct fast_task_info *task)
void sf_task_finish_clean_up(struct fast_task_info *task) void sf_task_finish_clean_up(struct fast_task_info *task)
{ {
//int next_stage;
/* /*
assert(task->event.fd >= 0); assert(task->event.fd >= 0);
if (task->event.fd < 0) { if (task->event.fd < 0) {
@ -110,6 +112,18 @@ void sf_task_finish_clean_up(struct fast_task_info *task)
task->finish_callback = NULL; task->finish_callback = NULL;
} }
/*
if (SF_G_EPOLL_EDGE_TRIGGER && (next_stage=FC_ATOMIC_GET(task->
nio_stages.next)) != SF_NIO_STAGE_NONE)
{
logWarning("file: "__FILE__", line: %d, "
"task: %p, nio_stages.next: %d != 0",
__LINE__, task, FC_ATOMIC_GET(task->nio_stages.next));
__sync_bool_compare_and_swap(&task->nio_stages.next,
next_stage, SF_NIO_STAGE_NONE);
}
*/
release_iovec_buffer(task); release_iovec_buffer(task);
sf_task_detach_thread(task); sf_task_detach_thread(task);
@ -177,14 +191,20 @@ int sf_set_read_event(struct fast_task_info *task)
if ((result=setup_read_event(task)) != 0) { if ((result=setup_read_event(task)) != 0) {
return result; return result;
} }
return 0;
if (SF_G_EPOLL_EDGE_TRIGGER) { if (SF_G_EPOLL_EDGE_TRIGGER) {
if (FC_ATOMIC_GET(task->nio_stages.notify) == SF_NIO_STAGE_SEND) { if (FC_ATOMIC_GET(task->nio_stages.notify) == SF_NIO_STAGE_SEND) {
__sync_bool_compare_and_swap(&task->nio_stages.next, if (__sync_bool_compare_and_swap(&task->nio_stages.next,
SF_NIO_STAGE_NONE, SF_NIO_STAGE_RECV); SF_NIO_STAGE_NONE, SF_NIO_STAGE_RECV))
return 0; {
return 0;
} else {
return EAGAIN;
}
} else { } else {
return sf_nio_notify(task, SF_NIO_STAGE_RECV); return sf_nio_notify(task, SF_NIO_STAGE_RECV);
return 0;
} }
} else { } else {
return 0; return 0;
@ -299,8 +319,11 @@ static int sf_nio_deal_task(struct fast_task_info *task, const int stage)
break; break;
case SF_NIO_STAGE_RECV: case SF_NIO_STAGE_RECV:
if ((result=setup_read_event(task)) == 0) { if ((result=setup_read_event(task)) == 0) {
sf_client_sock_read(task->event.fd, if (sf_client_sock_read(task->event.fd,
IOEVENT_READ, task); IOEVENT_READ, task) < 0)
{
result = errno != 0 ? errno : EIO;
}
} }
break; break;
case SF_NIO_STAGE_SEND: case SF_NIO_STAGE_SEND:
@ -322,14 +345,18 @@ static int sf_nio_deal_task(struct fast_task_info *task, const int stage)
break; break;
default: default:
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"client ip: %s, invalid notify stage: %d", "client ip: %s, task: %p, sock: %d, invalid notify stage: %d",
__LINE__, task->client_ip, stage); __LINE__, task->client_ip, task, task->event.fd, stage);
result = -EINVAL; result = -EINVAL;
break; break;
} }
if (result < 0) { if (result < 0) {
ioevent_add_to_deleted_list(task); ioevent_add_to_deleted_list(task);
} else if (result > 0) {
if (stage == SF_NIO_STAGE_RECV || stage == SF_NIO_STAGE_SEND) {
return -1 * result;
}
} }
return result; return result;
@ -370,6 +397,9 @@ int sf_nio_notify(struct fast_task_info *task, const int stage)
__LINE__, stage); __LINE__, stage);
return 0; return 0;
} else if (old_stage == SF_NIO_STAGE_NONE) {
continue;
/*
} else if (SF_G_EPOLL_EDGE_TRIGGER && ( } else if (SF_G_EPOLL_EDGE_TRIGGER && (
(old_stage == SF_NIO_STAGE_RECV && (old_stage == SF_NIO_STAGE_RECV &&
stage == SF_NIO_STAGE_SEND) || stage == SF_NIO_STAGE_SEND) ||
@ -379,7 +409,8 @@ int sf_nio_notify(struct fast_task_info *task, const int stage)
__sync_bool_compare_and_swap(&task->nio_stages. __sync_bool_compare_and_swap(&task->nio_stages.
next, SF_NIO_STAGE_NONE, stage); next, SF_NIO_STAGE_NONE, stage);
return 0; return 0;
} else if (old_stage != SF_NIO_STAGE_NONE) { */
} else {
logWarning("file: "__FILE__", line: %d, " logWarning("file: "__FILE__", line: %d, "
"current stage: %d != %d, skip set stage to %d", "current stage: %d != %d, skip set stage to %d",
__LINE__, old_stage, SF_NIO_STAGE_NONE, stage); __LINE__, old_stage, SF_NIO_STAGE_NONE, stage);
@ -388,13 +419,12 @@ int sf_nio_notify(struct fast_task_info *task, const int stage)
} }
PTHREAD_MUTEX_LOCK(&task->thread_data->waiting_queue.lock); PTHREAD_MUTEX_LOCK(&task->thread_data->waiting_queue.lock);
task->next = NULL; task->notify_next = NULL;
if (task->thread_data->waiting_queue.tail == NULL) { if (task->thread_data->waiting_queue.tail == NULL) {
task->thread_data->waiting_queue.head = task; task->thread_data->waiting_queue.head = task;
notify = true; notify = true;
} else { } else {
task->thread_data->waiting_queue.tail->next = task; task->thread_data->waiting_queue.tail->notify_next = task;
notify = false; notify = false;
} }
task->thread_data->waiting_queue.tail = task; task->thread_data->waiting_queue.tail = task;
@ -422,6 +452,7 @@ void sf_recv_notify_read(int sock, short event, void *arg)
int64_t n; int64_t n;
int stage; int stage;
int next; int next;
int result;
struct nio_thread_data *thread_data; struct nio_thread_data *thread_data;
struct fast_task_info *task; struct fast_task_info *task;
struct fast_task_info *current; struct fast_task_info *current;
@ -440,7 +471,7 @@ void sf_recv_notify_read(int sock, short event, void *arg)
while (current != NULL) { while (current != NULL) {
task = current; task = current;
current = current->next; current = current->notify_next;
stage = FC_ATOMIC_GET(task->nio_stages.notify); stage = FC_ATOMIC_GET(task->nio_stages.notify);
if (!task->canceled) { if (!task->canceled) {
@ -448,17 +479,17 @@ void sf_recv_notify_read(int sock, short event, void *arg)
/* MUST set to SF_NIO_STAGE_NONE first for re-entry */ /* MUST set to SF_NIO_STAGE_NONE first for re-entry */
__sync_bool_compare_and_swap(&task->nio_stages.notify, __sync_bool_compare_and_swap(&task->nio_stages.notify,
stage, SF_NIO_STAGE_NONE); stage, SF_NIO_STAGE_NONE);
sf_nio_deal_task(task, stage); result = sf_nio_deal_task(task, stage);
} else { } else {
sf_nio_deal_task(task, stage); result = sf_nio_deal_task(task, stage);
__sync_bool_compare_and_swap(&task->nio_stages.notify, __sync_bool_compare_and_swap(&task->nio_stages.notify,
stage, SF_NIO_STAGE_NONE); stage, SF_NIO_STAGE_NONE);
} }
if (SF_G_EPOLL_EDGE_TRIGGER) { if (SF_G_EPOLL_EDGE_TRIGGER && result >= 0) {
next = FC_ATOMIC_GET(task->nio_stages.next); next = FC_ATOMIC_GET(task->nio_stages.next);
if (next != SF_NIO_STAGE_NONE) { if (next != SF_NIO_STAGE_NONE) {
sf_nio_notify(task, next); //sf_nio_notify(task, next);
__sync_bool_compare_and_swap(&task->nio_stages.next, __sync_bool_compare_and_swap(&task->nio_stages.next,
next, SF_NIO_STAGE_NONE); next, SF_NIO_STAGE_NONE);
} }