diff --git a/src/sf_file_writer.h b/src/sf_file_writer.h index 5832465..2f672a1 100644 --- a/src/sf_file_writer.h +++ b/src/sf_file_writer.h @@ -95,7 +95,8 @@ static inline int64_t sf_file_writer_get_last_version( return writer->last_versions.done; } else { logError("file: "__FILE__", line: %d, " - "should set writer flags to %d!", __LINE__, + "writer: %s, should set writer flags to %d!", + __LINE__, writer->cfg.subdir_name, SF_FILE_WRITER_FLAGS_WANT_DONE_VERSION); return -1; } diff --git a/src/sf_nio.c b/src/sf_nio.c index 67fd9d4..e5fc842 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -355,7 +355,6 @@ int sf_nio_notify(struct fast_task_info *task, const int stage) "current stage: %d equals to the target, skip set", __LINE__, stage); return 0; - } else if (old_stage != SF_NIO_STAGE_NONE) { logWarning("file: "__FILE__", line: %d, " "current stage: %d != %d, skip set stage to %d", @@ -393,6 +392,24 @@ int sf_nio_notify(struct fast_task_info *task, const int stage) return 0; } +static inline void deal_notified_task(struct fast_task_info *task, + const int stage) +{ + if (!task->canceled) { + sf_nio_deal_task(task, stage); + } else { + if (stage == SF_NIO_STAGE_CONTINUE) { + if (task->continue_callback != NULL) { + task->continue_callback(task); + } else { + logWarning("file: "__FILE__", line: %d, " + "task %p, continue_callback is NULL", + __LINE__, task); + } + } + } +} + void sf_recv_notify_read(int sock, short event, void *arg) { int64_t n; @@ -408,38 +425,48 @@ void sf_recv_notify_read(int sock, short event, void *arg) __LINE__, sock, errno, STRERROR(errno)); } - PTHREAD_MUTEX_LOCK(&thread_data->waiting_queue.lock); - current = thread_data->waiting_queue.head; - thread_data->waiting_queue.head = thread_data->waiting_queue.tail = NULL; - PTHREAD_MUTEX_UNLOCK(&thread_data->waiting_queue.lock); + if (SF_G_EPOLL_EDGE_TRIGGER) { + while (1) { + PTHREAD_MUTEX_LOCK(&thread_data->waiting_queue.lock); + if (thread_data->waiting_queue.head != NULL) { + task = thread_data->waiting_queue.head; + thread_data->waiting_queue.head = task->notify_next; + if (thread_data->waiting_queue.head == NULL) { + thread_data->waiting_queue.tail = NULL; + } + } else { + task = NULL; + } + PTHREAD_MUTEX_UNLOCK(&thread_data->waiting_queue.lock); - while (current != NULL) { - task = current; - current = current->notify_next; + if (task != NULL) { + stage = FC_ATOMIC_GET(task->nio_stages.notify); + __sync_bool_compare_and_swap(&task->nio_stages.notify, + stage, SF_NIO_STAGE_NONE); + deal_notified_task(task, stage); + } else { + break; + } + } + } else { + PTHREAD_MUTEX_LOCK(&thread_data->waiting_queue.lock); + current = thread_data->waiting_queue.head; + thread_data->waiting_queue.head = NULL; + thread_data->waiting_queue.tail = NULL; + PTHREAD_MUTEX_UNLOCK(&thread_data->waiting_queue.lock); - stage = FC_ATOMIC_GET(task->nio_stages.notify); - if (!task->canceled) { + while (current != NULL) { + task = current; + current = current->notify_next; + + stage = FC_ATOMIC_GET(task->nio_stages.notify); if (stage == SF_NIO_STAGE_CONTINUE) { /* MUST set to SF_NIO_STAGE_NONE first for re-entry */ __sync_bool_compare_and_swap(&task->nio_stages.notify, stage, SF_NIO_STAGE_NONE); - sf_nio_deal_task(task, stage); + deal_notified_task(task, stage); } else { - sf_nio_deal_task(task, stage); - __sync_bool_compare_and_swap(&task->nio_stages.notify, - stage, SF_NIO_STAGE_NONE); - } - } else { - if (stage != SF_NIO_STAGE_NONE) { - if (stage == SF_NIO_STAGE_CONTINUE) { - if (task->continue_callback != NULL) { - task->continue_callback(task); - } else { - logWarning("file: "__FILE__", line: %d, " - "task %p, continue_callback is NULL", - __LINE__, task); - } - } + deal_notified_task(task, stage); __sync_bool_compare_and_swap(&task->nio_stages.notify, stage, SF_NIO_STAGE_NONE); }