restore epoll timeout when polling_queue is empty

support_rdma
YuQing 2023-09-19 09:30:11 +08:00
parent 70c44ea490
commit 1c1cb6d5e7
3 changed files with 61 additions and 42 deletions

View File

@ -73,6 +73,7 @@ struct nio_thread_data
volatile int64_t counter;
} notify; //for thread notify
int timeout_ms; //for restore
struct fc_list_head polling_queue; //for RDMA busy polling
};

View File

@ -91,7 +91,7 @@ static void deal_timeouts(FastTimerEntry *head)
}
}
int ioevent_loop(struct nio_thread_data *pThreadData,
int ioevent_loop(struct nio_thread_data *thread_data,
IOEventCallback recv_notify_callback, TaskCleanUpCallback
clean_up_callback, volatile bool *continue_flag)
{
@ -102,15 +102,17 @@ int ioevent_loop(struct nio_thread_data *pThreadData,
time_t last_check_time;
int save_extra_events;
int count;
uint32_t sched_counter;
bool sched_pull;
memset(&ev_notify, 0, sizeof(ev_notify));
ev_notify.event.fd = FC_NOTIFY_READ_FD(pThreadData);
ev_notify.event.fd = FC_NOTIFY_READ_FD(thread_data);
ev_notify.event.callback = recv_notify_callback;
ev_notify.thread_data = pThreadData;
ev_notify.thread_data = thread_data;
save_extra_events = pThreadData->ev_puller.extra_events;
pThreadData->ev_puller.extra_events = 0; //disable edge trigger temporarily
if (ioevent_attach(&pThreadData->ev_puller, ev_notify.
save_extra_events = thread_data->ev_puller.extra_events;
thread_data->ev_puller.extra_events = 0; //disable edge trigger temporarily
if (ioevent_attach(&thread_data->ev_puller, ev_notify.
event.fd, IOEVENT_READ, &ev_notify) != 0)
{
result = errno != 0 ? errno : ENOMEM;
@ -119,43 +121,63 @@ int ioevent_loop(struct nio_thread_data *pThreadData,
__LINE__, result, STRERROR(result));
return result;
}
pThreadData->ev_puller.extra_events = save_extra_events; //restore
thread_data->ev_puller.extra_events = save_extra_events; //restore
pThreadData->deleted_list = NULL;
sched_counter = 0;
thread_data->deleted_list = NULL;
last_check_time = g_current_time;
while (*continue_flag)
{
pThreadData->ev_puller.iterator.count = ioevent_poll(
&pThreadData->ev_puller);
if (pThreadData->ev_puller.iterator.count > 0)
{
deal_ioevents(&pThreadData->ev_puller);
}
else if (pThreadData->ev_puller.iterator.count < 0)
{
result = errno != 0 ? errno : EINVAL;
if (result != EINTR)
{
logError("file: "__FILE__", line: %d, " \
"ioevent_poll fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
return result;
}
}
if (thread_data->ev_puller.timeout == 0)
{
sched_pull = (sched_counter++ & 8) != 0;
} else {
sched_pull = true;
}
if (pThreadData->deleted_list != NULL)
if (sched_pull)
{
thread_data->ev_puller.iterator.count = ioevent_poll(
&thread_data->ev_puller);
if (thread_data->ev_puller.iterator.count > 0)
{
deal_ioevents(&thread_data->ev_puller);
}
else if (thread_data->ev_puller.iterator.count < 0)
{
result = errno != 0 ? errno : EINVAL;
if (result != EINTR)
{
logError("file: "__FILE__", line: %d, " \
"ioevent_poll fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
return result;
}
}
}
if (thread_data->busy_polling_callback != NULL)
{
thread_data->busy_polling_callback(thread_data);
}
if (thread_data->deleted_list != NULL)
{
count = 0;
while (pThreadData->deleted_list != NULL)
while (thread_data->deleted_list != NULL)
{
task = pThreadData->deleted_list;
pThreadData->deleted_list = task->next;
task = thread_data->deleted_list;
thread_data->deleted_list = task->next;
if (task->polling.in_queue)
{
fc_list_del_init(&task->polling.dlink);
task->polling.in_queue = false;
if (fc_list_empty(&task->thread_data->polling_queue)) {
ioevent_set_timeout(&task->thread_data->ev_puller,
task->thread_data->timeout_ms);
}
}
clean_up_callback(task);
count++;
@ -167,35 +189,31 @@ int ioevent_loop(struct nio_thread_data *pThreadData,
{
last_check_time = g_current_time;
count = fast_timer_timeouts_get(
&pThreadData->timer, g_current_time, &head);
&thread_data->timer, g_current_time, &head);
if (count > 0)
{
deal_timeouts(&head);
}
}
if (pThreadData->notify.enabled)
if (thread_data->notify.enabled)
{
int64_t n;
if ((n=__sync_fetch_and_add(&pThreadData->notify.counter, 0)) != 0)
if ((n=__sync_fetch_and_add(&thread_data->notify.counter, 0)) != 0)
{
__sync_fetch_and_sub(&pThreadData->notify.counter, n);
__sync_fetch_and_sub(&thread_data->notify.counter, n);
/*
logInfo("file: "__FILE__", line: %d, "
"n ==== %"PRId64", now: %"PRId64,
__LINE__, n, __sync_fetch_and_add(
&pThreadData->notify.counter, 0));
&thread_data->notify.counter, 0));
*/
}
}
if (pThreadData->thread_loop_callback != NULL)
if (thread_data->thread_loop_callback != NULL)
{
pThreadData->thread_loop_callback(pThreadData);
}
if (pThreadData->busy_polling_callback != NULL)
{
pThreadData->busy_polling_callback(pThreadData);
thread_data->thread_loop_callback(thread_data);
}
}

View File

@ -22,7 +22,7 @@
extern "C" {
#endif
int ioevent_loop(struct nio_thread_data *pThreadData,
int ioevent_loop(struct nio_thread_data *thread_data,
IOEventCallback recv_notify_callback, TaskCleanUpCallback
clean_up_callback, volatile bool *continue_flag);