bugfixed: fastdfs issue #620

set notify.stage to SF_NIO_STAGE_NONE before deal_notified_task
fstore_storage_engine
YuQing 2023-02-23 10:24:24 +08:00
parent ac923ebaf8
commit 92fbcab0f4
2 changed files with 55 additions and 27 deletions

View File

@ -95,7 +95,8 @@ static inline int64_t sf_file_writer_get_last_version(
return writer->last_versions.done; return writer->last_versions.done;
} else { } else {
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"should set writer flags to %d!", __LINE__, "writer: %s, should set writer flags to %d!",
__LINE__, writer->cfg.subdir_name,
SF_FILE_WRITER_FLAGS_WANT_DONE_VERSION); SF_FILE_WRITER_FLAGS_WANT_DONE_VERSION);
return -1; return -1;
} }

View File

@ -355,7 +355,6 @@ int sf_nio_notify(struct fast_task_info *task, const int stage)
"current stage: %d equals to the target, skip set", "current stage: %d equals to the target, skip set",
__LINE__, stage); __LINE__, stage);
return 0; return 0;
} else if (old_stage != SF_NIO_STAGE_NONE) { } else if (old_stage != SF_NIO_STAGE_NONE) {
logWarning("file: "__FILE__", line: %d, " logWarning("file: "__FILE__", line: %d, "
"current stage: %d != %d, skip set stage to %d", "current stage: %d != %d, skip set stage to %d",
@ -393,6 +392,24 @@ int sf_nio_notify(struct fast_task_info *task, const int stage)
return 0; return 0;
} }
static inline void deal_notified_task(struct fast_task_info *task,
const int stage)
{
if (!task->canceled) {
sf_nio_deal_task(task, stage);
} else {
if (stage == SF_NIO_STAGE_CONTINUE) {
if (task->continue_callback != NULL) {
task->continue_callback(task);
} else {
logWarning("file: "__FILE__", line: %d, "
"task %p, continue_callback is NULL",
__LINE__, task);
}
}
}
}
void sf_recv_notify_read(int sock, short event, void *arg) void sf_recv_notify_read(int sock, short event, void *arg)
{ {
int64_t n; int64_t n;
@ -408,9 +425,34 @@ void sf_recv_notify_read(int sock, short event, void *arg)
__LINE__, sock, errno, STRERROR(errno)); __LINE__, sock, errno, STRERROR(errno));
} }
if (SF_G_EPOLL_EDGE_TRIGGER) {
while (1) {
PTHREAD_MUTEX_LOCK(&thread_data->waiting_queue.lock);
if (thread_data->waiting_queue.head != NULL) {
task = thread_data->waiting_queue.head;
thread_data->waiting_queue.head = task->notify_next;
if (thread_data->waiting_queue.head == NULL) {
thread_data->waiting_queue.tail = NULL;
}
} else {
task = NULL;
}
PTHREAD_MUTEX_UNLOCK(&thread_data->waiting_queue.lock);
if (task != NULL) {
stage = FC_ATOMIC_GET(task->nio_stages.notify);
__sync_bool_compare_and_swap(&task->nio_stages.notify,
stage, SF_NIO_STAGE_NONE);
deal_notified_task(task, stage);
} else {
break;
}
}
} else {
PTHREAD_MUTEX_LOCK(&thread_data->waiting_queue.lock); PTHREAD_MUTEX_LOCK(&thread_data->waiting_queue.lock);
current = thread_data->waiting_queue.head; current = thread_data->waiting_queue.head;
thread_data->waiting_queue.head = thread_data->waiting_queue.tail = NULL; thread_data->waiting_queue.head = NULL;
thread_data->waiting_queue.tail = NULL;
PTHREAD_MUTEX_UNLOCK(&thread_data->waiting_queue.lock); PTHREAD_MUTEX_UNLOCK(&thread_data->waiting_queue.lock);
while (current != NULL) { while (current != NULL) {
@ -418,28 +460,13 @@ void sf_recv_notify_read(int sock, short event, void *arg)
current = current->notify_next; current = current->notify_next;
stage = FC_ATOMIC_GET(task->nio_stages.notify); stage = FC_ATOMIC_GET(task->nio_stages.notify);
if (!task->canceled) {
if (stage == SF_NIO_STAGE_CONTINUE) { if (stage == SF_NIO_STAGE_CONTINUE) {
/* MUST set to SF_NIO_STAGE_NONE first for re-entry */ /* MUST set to SF_NIO_STAGE_NONE first for re-entry */
__sync_bool_compare_and_swap(&task->nio_stages.notify, __sync_bool_compare_and_swap(&task->nio_stages.notify,
stage, SF_NIO_STAGE_NONE); stage, SF_NIO_STAGE_NONE);
sf_nio_deal_task(task, stage); deal_notified_task(task, stage);
} else { } else {
sf_nio_deal_task(task, stage); deal_notified_task(task, stage);
__sync_bool_compare_and_swap(&task->nio_stages.notify,
stage, SF_NIO_STAGE_NONE);
}
} else {
if (stage != SF_NIO_STAGE_NONE) {
if (stage == SF_NIO_STAGE_CONTINUE) {
if (task->continue_callback != NULL) {
task->continue_callback(task);
} else {
logWarning("file: "__FILE__", line: %d, "
"task %p, continue_callback is NULL",
__LINE__, task);
}
}
__sync_bool_compare_and_swap(&task->nio_stages.notify, __sync_bool_compare_and_swap(&task->nio_stages.notify,
stage, SF_NIO_STAGE_NONE); stage, SF_NIO_STAGE_NONE);
} }