diff --git a/src/fast_task_queue.h b/src/fast_task_queue.h index 592beaf..1f58f62 100644 --- a/src/fast_task_queue.h +++ b/src/fast_task_queue.h @@ -73,6 +73,7 @@ struct nio_thread_data volatile int64_t counter; } notify; //for thread notify + int timeout_ms; //for restore struct fc_list_head polling_queue; //for RDMA busy polling }; diff --git a/src/ioevent_loop.c b/src/ioevent_loop.c index ef6f499..1c18fe5 100644 --- a/src/ioevent_loop.c +++ b/src/ioevent_loop.c @@ -91,7 +91,7 @@ static void deal_timeouts(FastTimerEntry *head) } } -int ioevent_loop(struct nio_thread_data *pThreadData, +int ioevent_loop(struct nio_thread_data *thread_data, IOEventCallback recv_notify_callback, TaskCleanUpCallback clean_up_callback, volatile bool *continue_flag) { @@ -102,15 +102,17 @@ int ioevent_loop(struct nio_thread_data *pThreadData, time_t last_check_time; int save_extra_events; int count; + uint32_t sched_counter; + bool sched_pull; memset(&ev_notify, 0, sizeof(ev_notify)); - ev_notify.event.fd = FC_NOTIFY_READ_FD(pThreadData); + ev_notify.event.fd = FC_NOTIFY_READ_FD(thread_data); ev_notify.event.callback = recv_notify_callback; - ev_notify.thread_data = pThreadData; + ev_notify.thread_data = thread_data; - save_extra_events = pThreadData->ev_puller.extra_events; - pThreadData->ev_puller.extra_events = 0; //disable edge trigger temporarily - if (ioevent_attach(&pThreadData->ev_puller, ev_notify. + save_extra_events = thread_data->ev_puller.extra_events; + thread_data->ev_puller.extra_events = 0; //disable edge trigger temporarily + if (ioevent_attach(&thread_data->ev_puller, ev_notify. event.fd, IOEVENT_READ, &ev_notify) != 0) { result = errno != 0 ? errno : ENOMEM; @@ -119,43 +121,63 @@ int ioevent_loop(struct nio_thread_data *pThreadData, __LINE__, result, STRERROR(result)); return result; } - pThreadData->ev_puller.extra_events = save_extra_events; //restore + thread_data->ev_puller.extra_events = save_extra_events; //restore - pThreadData->deleted_list = NULL; + sched_counter = 0; + thread_data->deleted_list = NULL; last_check_time = g_current_time; while (*continue_flag) { - pThreadData->ev_puller.iterator.count = ioevent_poll( - &pThreadData->ev_puller); - if (pThreadData->ev_puller.iterator.count > 0) - { - deal_ioevents(&pThreadData->ev_puller); - } - else if (pThreadData->ev_puller.iterator.count < 0) - { - result = errno != 0 ? errno : EINVAL; - if (result != EINTR) - { - logError("file: "__FILE__", line: %d, " \ - "ioevent_poll fail, " \ - "errno: %d, error info: %s", \ - __LINE__, result, STRERROR(result)); - return result; - } - } + if (thread_data->ev_puller.timeout == 0) + { + sched_pull = (sched_counter++ & 8) != 0; + } else { + sched_pull = true; + } - if (pThreadData->deleted_list != NULL) + if (sched_pull) + { + thread_data->ev_puller.iterator.count = ioevent_poll( + &thread_data->ev_puller); + if (thread_data->ev_puller.iterator.count > 0) + { + deal_ioevents(&thread_data->ev_puller); + } + else if (thread_data->ev_puller.iterator.count < 0) + { + result = errno != 0 ? errno : EINVAL; + if (result != EINTR) + { + logError("file: "__FILE__", line: %d, " \ + "ioevent_poll fail, " \ + "errno: %d, error info: %s", \ + __LINE__, result, STRERROR(result)); + return result; + } + } + } + + if (thread_data->busy_polling_callback != NULL) + { + thread_data->busy_polling_callback(thread_data); + } + + if (thread_data->deleted_list != NULL) { count = 0; - while (pThreadData->deleted_list != NULL) + while (thread_data->deleted_list != NULL) { - task = pThreadData->deleted_list; - pThreadData->deleted_list = task->next; + task = thread_data->deleted_list; + thread_data->deleted_list = task->next; if (task->polling.in_queue) { fc_list_del_init(&task->polling.dlink); task->polling.in_queue = false; + if (fc_list_empty(&task->thread_data->polling_queue)) { + ioevent_set_timeout(&task->thread_data->ev_puller, + task->thread_data->timeout_ms); + } } clean_up_callback(task); count++; @@ -167,35 +189,31 @@ int ioevent_loop(struct nio_thread_data *pThreadData, { last_check_time = g_current_time; count = fast_timer_timeouts_get( - &pThreadData->timer, g_current_time, &head); + &thread_data->timer, g_current_time, &head); if (count > 0) { deal_timeouts(&head); } } - if (pThreadData->notify.enabled) + if (thread_data->notify.enabled) { int64_t n; - if ((n=__sync_fetch_and_add(&pThreadData->notify.counter, 0)) != 0) + if ((n=__sync_fetch_and_add(&thread_data->notify.counter, 0)) != 0) { - __sync_fetch_and_sub(&pThreadData->notify.counter, n); + __sync_fetch_and_sub(&thread_data->notify.counter, n); /* logInfo("file: "__FILE__", line: %d, " "n ==== %"PRId64", now: %"PRId64, __LINE__, n, __sync_fetch_and_add( - &pThreadData->notify.counter, 0)); + &thread_data->notify.counter, 0)); */ } } - if (pThreadData->thread_loop_callback != NULL) + if (thread_data->thread_loop_callback != NULL) { - pThreadData->thread_loop_callback(pThreadData); - } - if (pThreadData->busy_polling_callback != NULL) - { - pThreadData->busy_polling_callback(pThreadData); + thread_data->thread_loop_callback(thread_data); } } diff --git a/src/ioevent_loop.h b/src/ioevent_loop.h index c176126..cce2cb6 100644 --- a/src/ioevent_loop.h +++ b/src/ioevent_loop.h @@ -22,7 +22,7 @@ extern "C" { #endif -int ioevent_loop(struct nio_thread_data *pThreadData, +int ioevent_loop(struct nio_thread_data *thread_data, IOEventCallback recv_notify_callback, TaskCleanUpCallback clean_up_callback, volatile bool *continue_flag);