ioevent.[hc] and ioevent_loop.[hc] support io_uring

use_iouring
YuQing 2025-09-17 03:49:21 +08:00
parent ec8e47f831
commit 47fa7f99df
6 changed files with 267 additions and 129 deletions

17
make.sh
View File

@ -112,7 +112,22 @@ HAVE_VMMETER_H=0
HAVE_USER_H=0 HAVE_USER_H=0
if [ "$uname" = "Linux" ]; then if [ "$uname" = "Linux" ]; then
OS_NAME=OS_LINUX OS_NAME=OS_LINUX
IOEVENT_USE=IOEVENT_USE_EPOLL
major_version=$(uname -r | awk -F . '{print $1;}')
minor_version=$(uname -r | awk -F . '{print $2;}')
if [ $major_version -eq 5 ] && [ $minor_version -ge 14 ]; then
out=$(grep -F IORING_OP_SEND_ZC /usr/include/liburing/io_uring.h)
if [ -z $out ]; then
IOEVENT_USE=IOEVENT_USE_EPOLL
else
IOEVENT_USE=IOEVENT_USE_URING
fi
elif [ $major_version -gt 5 ]; then
IOEVENT_USE=IOEVENT_USE_URING
else
IOEVENT_USE=IOEVENT_USE_EPOLL
fi
if [ $glibc_minor -lt 17 ]; then if [ $glibc_minor -lt 17 ]; then
LIBS="$LIBS -lrt" LIBS="$LIBS -lrt"
fi fi

View File

@ -48,48 +48,65 @@ int kqueue_ev_convert(int16_t event, uint16_t flags)
int ioevent_init(IOEventPoller *ioevent, const int size, int ioevent_init(IOEventPoller *ioevent, const int size,
const int timeout_ms, const int extra_events) const int timeout_ms, const int extra_events)
{ {
int bytes; #if IOEVENT_USE_URING
int result;
#else
int bytes;
ioevent->size = size; ioevent->iterator.index = 0;
ioevent->extra_events = extra_events; ioevent->iterator.count = 0;
ioevent->iterator.index = 0;
ioevent->iterator.count = 0;
#if IOEVENT_USE_EPOLL
ioevent->poll_fd = epoll_create(ioevent->size);
if (ioevent->poll_fd < 0) {
return errno != 0 ? errno : ENOMEM;
}
bytes = sizeof(struct epoll_event) * size;
ioevent->events = (struct epoll_event *)fc_malloc(bytes);
#elif IOEVENT_USE_KQUEUE
ioevent->poll_fd = kqueue();
if (ioevent->poll_fd < 0) {
return errno != 0 ? errno : ENOMEM;
}
bytes = sizeof(struct kevent) * size;
ioevent->events = (struct kevent *)fc_malloc(bytes);
#elif IOEVENT_USE_PORT
ioevent->poll_fd = port_create();
if (ioevent->poll_fd < 0) {
return errno != 0 ? errno : ENOMEM;
}
bytes = sizeof(port_event_t) * size;
ioevent->events = (port_event_t *)fc_malloc(bytes);
#endif #endif
if (ioevent->events == NULL) { ioevent->size = size;
close(ioevent->poll_fd); ioevent->extra_events = extra_events;
ioevent->poll_fd = -1;
return ENOMEM;
}
ioevent_set_timeout(ioevent, timeout_ms);
return 0; #if IOEVENT_USE_EPOLL
ioevent->poll_fd = epoll_create(ioevent->size);
if (ioevent->poll_fd < 0) {
return errno != 0 ? errno : ENOMEM;
}
bytes = sizeof(struct epoll_event) * size;
ioevent->events = (struct epoll_event *)fc_malloc(bytes);
#elif IOEVENT_USE_URING
if ((result=io_uring_queue_init(size, &ioevent->ring, 0)) < 0) {
return -result;
}
ioevent->cqe = NULL;
#elif IOEVENT_USE_KQUEUE
ioevent->poll_fd = kqueue();
if (ioevent->poll_fd < 0) {
return errno != 0 ? errno : ENOMEM;
}
bytes = sizeof(struct kevent) * size;
ioevent->events = (struct kevent *)fc_malloc(bytes);
#elif IOEVENT_USE_PORT
ioevent->poll_fd = port_create();
if (ioevent->poll_fd < 0) {
return errno != 0 ? errno : ENOMEM;
}
bytes = sizeof(port_event_t) * size;
ioevent->events = (port_event_t *)fc_malloc(bytes);
#endif
#if IOEVENT_USE_URING
#else
if (ioevent->events == NULL) {
close(ioevent->poll_fd);
ioevent->poll_fd = -1;
return ENOMEM;
}
#endif
ioevent_set_timeout(ioevent, timeout_ms);
return 0;
} }
void ioevent_destroy(IOEventPoller *ioevent) void ioevent_destroy(IOEventPoller *ioevent)
{ {
#if IOEVENT_USE_URING
io_uring_queue_exit(&ioevent->ring);
#else
if (ioevent->events != NULL) { if (ioevent->events != NULL) {
free(ioevent->events); free(ioevent->events);
ioevent->events = NULL; ioevent->events = NULL;
@ -99,10 +116,11 @@ void ioevent_destroy(IOEventPoller *ioevent)
close(ioevent->poll_fd); close(ioevent->poll_fd);
ioevent->poll_fd = -1; ioevent->poll_fd = -1;
} }
#endif
} }
int ioevent_attach(IOEventPoller *ioevent, const int fd, const int e, int ioevent_attach(IOEventPoller *ioevent, const int fd,
void *data) const int e, void *data)
{ {
#if IOEVENT_USE_EPOLL #if IOEVENT_USE_EPOLL
struct epoll_event ev; struct epoll_event ev;
@ -110,6 +128,14 @@ int ioevent_attach(IOEventPoller *ioevent, const int fd, const int e,
ev.events = e | ioevent->extra_events; ev.events = e | ioevent->extra_events;
ev.data.ptr = data; ev.data.ptr = data;
return epoll_ctl(ioevent->poll_fd, EPOLL_CTL_ADD, fd, &ev); return epoll_ctl(ioevent->poll_fd, EPOLL_CTL_ADD, fd, &ev);
#elif IOEVENT_USE_URING
struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring);
if (sqe == NULL) {
return ENOSPC;
}
sqe->user_data = (long)data;
io_uring_prep_poll_multishot(sqe, fd, e | ioevent->extra_events);
return ioevent_uring_submit(ioevent);
#elif IOEVENT_USE_KQUEUE #elif IOEVENT_USE_KQUEUE
struct kevent ev[2]; struct kevent ev[2];
int n = 0; int n = 0;
@ -128,8 +154,8 @@ int ioevent_attach(IOEventPoller *ioevent, const int fd, const int e,
#endif #endif
} }
int ioevent_modify(IOEventPoller *ioevent, const int fd, const int e, int ioevent_modify(IOEventPoller *ioevent, const int fd,
void *data) const int e, void *data)
{ {
#if IOEVENT_USE_EPOLL #if IOEVENT_USE_EPOLL
struct epoll_event ev; struct epoll_event ev;
@ -137,6 +163,15 @@ int ioevent_modify(IOEventPoller *ioevent, const int fd, const int e,
ev.events = e | ioevent->extra_events; ev.events = e | ioevent->extra_events;
ev.data.ptr = data; ev.data.ptr = data;
return epoll_ctl(ioevent->poll_fd, EPOLL_CTL_MOD, fd, &ev); return epoll_ctl(ioevent->poll_fd, EPOLL_CTL_MOD, fd, &ev);
#elif IOEVENT_USE_URING
struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring);
if (sqe == NULL) {
return ENOSPC;
}
sqe->user_data = (long)data;
io_uring_prep_poll_update(sqe, sqe->user_data, sqe->user_data,
e | ioevent->extra_events, IORING_POLL_UPDATE_EVENTS);
return ioevent_uring_submit(ioevent);
#elif IOEVENT_USE_KQUEUE #elif IOEVENT_USE_KQUEUE
struct kevent ev[2]; struct kevent ev[2];
int result; int result;
@ -173,6 +208,15 @@ int ioevent_detach(IOEventPoller *ioevent, const int fd)
{ {
#if IOEVENT_USE_EPOLL #if IOEVENT_USE_EPOLL
return epoll_ctl(ioevent->poll_fd, EPOLL_CTL_DEL, fd, NULL); return epoll_ctl(ioevent->poll_fd, EPOLL_CTL_DEL, fd, NULL);
#elif IOEVENT_USE_URING
struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring);
if (sqe == NULL) {
return ENOSPC;
}
sqe->flags |= IOSQE_CQE_SKIP_SUCCESS;
sqe->user_data = 0;
io_uring_prep_cancel_fd(sqe, fd, 0);
return ioevent_uring_submit(ioevent);
#elif IOEVENT_USE_KQUEUE #elif IOEVENT_USE_KQUEUE
struct kevent ev[1]; struct kevent ev[1];
int r, w; int r, w;
@ -192,9 +236,20 @@ int ioevent_detach(IOEventPoller *ioevent, const int fd)
int ioevent_poll(IOEventPoller *ioevent) int ioevent_poll(IOEventPoller *ioevent)
{ {
#if IOEVENT_USE_EPOLL #if IOEVENT_USE_EPOLL
return epoll_wait(ioevent->poll_fd, ioevent->events, ioevent->size, ioevent->timeout); return epoll_wait(ioevent->poll_fd, ioevent->events,
ioevent->size, ioevent->timeout);
#elif IOEVENT_USE_URING
int result;
result = io_uring_wait_cqe_timeout(&ioevent->ring,
&ioevent->cqe, &ioevent->timeout);
if (result < 0) {
errno = -result;
return -1;
}
return 0;
#elif IOEVENT_USE_KQUEUE #elif IOEVENT_USE_KQUEUE
return kevent(ioevent->poll_fd, NULL, 0, ioevent->events, ioevent->size, &ioevent->timeout); return kevent(ioevent->poll_fd, NULL, 0, ioevent->events,
ioevent->size, &ioevent->timeout);
#elif IOEVENT_USE_PORT #elif IOEVENT_USE_PORT
int result; int result;
int retval; int retval;

View File

@ -31,6 +31,12 @@
#define IOEVENT_WRITE EPOLLOUT #define IOEVENT_WRITE EPOLLOUT
#define IOEVENT_ERROR (EPOLLERR | EPOLLPRI | EPOLLHUP) #define IOEVENT_ERROR (EPOLLERR | EPOLLPRI | EPOLLHUP)
#elif IOEVENT_USE_URING
#include <liburing.h>
#define IOEVENT_READ POLLIN
#define IOEVENT_WRITE POLLOUT
#define IOEVENT_ERROR (POLLERR | POLLPRI | POLLHUP)
#elif IOEVENT_USE_KQUEUE #elif IOEVENT_USE_KQUEUE
#include <sys/event.h> #include <sys/event.h>
#include <sys/poll.h> #include <sys/poll.h>
@ -67,16 +73,22 @@ int kqueue_ev_convert(int16_t event, uint16_t flags);
typedef struct ioevent_puller { typedef struct ioevent_puller {
int size; //max events (fd) int size; //max events (fd)
int extra_events; int extra_events;
#if IOEVENT_USE_URING
struct io_uring ring;
#else
int poll_fd; int poll_fd;
struct { struct {
int index; int index;
int count; int count;
} iterator; //for deal event loop } iterator; //for deal event loop
#endif
#if IOEVENT_USE_EPOLL #if IOEVENT_USE_EPOLL
struct epoll_event *events; struct epoll_event *events;
int timeout; int timeout; //in milliseconds
#elif IOEVENT_USE_URING
struct io_uring_cqe *cqe;
struct __kernel_timespec timeout;
#elif IOEVENT_USE_KQUEUE #elif IOEVENT_USE_KQUEUE
struct kevent *events; struct kevent *events;
struct timespec timeout; struct timespec timeout;
@ -84,11 +96,18 @@ typedef struct ioevent_puller {
port_event_t *events; port_event_t *events;
timespec_t timeout; timespec_t timeout;
#endif #endif
#ifdef OS_LINUX
bool zero_timeout;
#endif
} IOEventPoller; } IOEventPoller;
#if IOEVENT_USE_EPOLL #if IOEVENT_USE_EPOLL
#define IOEVENT_GET_EVENTS(ioevent, index) \ #define IOEVENT_GET_EVENTS(ioevent, index) \
(ioevent)->events[index].events (ioevent)->events[index].events
#elif IOEVENT_USE_URING
#elif IOEVENT_USE_KQUEUE #elif IOEVENT_USE_KQUEUE
#define IOEVENT_GET_EVENTS(ioevent, index) kqueue_ev_convert( \ #define IOEVENT_GET_EVENTS(ioevent, index) kqueue_ev_convert( \
(ioevent)->events[index].filter, (ioevent)->events[index].flags) (ioevent)->events[index].filter, (ioevent)->events[index].flags)
@ -102,6 +121,8 @@ typedef struct ioevent_puller {
#if IOEVENT_USE_EPOLL #if IOEVENT_USE_EPOLL
#define IOEVENT_GET_DATA(ioevent, index) \ #define IOEVENT_GET_DATA(ioevent, index) \
(ioevent)->events[index].data.ptr (ioevent)->events[index].data.ptr
#elif IOEVENT_USE_URING
#elif IOEVENT_USE_KQUEUE #elif IOEVENT_USE_KQUEUE
#define IOEVENT_GET_DATA(ioevent, index) \ #define IOEVENT_GET_DATA(ioevent, index) \
(ioevent)->events[index].udata (ioevent)->events[index].udata
@ -115,6 +136,8 @@ typedef struct ioevent_puller {
#if IOEVENT_USE_EPOLL #if IOEVENT_USE_EPOLL
#define IOEVENT_CLEAR_DATA(ioevent, index) \ #define IOEVENT_CLEAR_DATA(ioevent, index) \
(ioevent)->events[index].data.ptr = NULL (ioevent)->events[index].data.ptr = NULL
#elif IOEVENT_USE_URING
#elif IOEVENT_USE_KQUEUE #elif IOEVENT_USE_KQUEUE
#define IOEVENT_CLEAR_DATA(ioevent, index) \ #define IOEVENT_CLEAR_DATA(ioevent, index) \
(ioevent)->events[index].udata = NULL (ioevent)->events[index].udata = NULL
@ -133,14 +156,15 @@ int ioevent_init(IOEventPoller *ioevent, const int size,
const int timeout_ms, const int extra_events); const int timeout_ms, const int extra_events);
void ioevent_destroy(IOEventPoller *ioevent); void ioevent_destroy(IOEventPoller *ioevent);
int ioevent_attach(IOEventPoller *ioevent, const int fd, const int e, int ioevent_attach(IOEventPoller *ioevent, const int fd,
void *data); const int e, void *data);
int ioevent_modify(IOEventPoller *ioevent, const int fd, const int e, int ioevent_modify(IOEventPoller *ioevent, const int fd,
void *data); const int e, void *data);
int ioevent_detach(IOEventPoller *ioevent, const int fd); int ioevent_detach(IOEventPoller *ioevent, const int fd);
int ioevent_poll(IOEventPoller *ioevent); int ioevent_poll(IOEventPoller *ioevent);
static inline void ioevent_set_timeout(IOEventPoller *ioevent, const int timeout_ms) static inline void ioevent_set_timeout(IOEventPoller *ioevent,
const int timeout_ms)
{ {
#if IOEVENT_USE_EPOLL #if IOEVENT_USE_EPOLL
ioevent->timeout = timeout_ms; ioevent->timeout = timeout_ms;
@ -148,6 +172,11 @@ static inline void ioevent_set_timeout(IOEventPoller *ioevent, const int timeout
ioevent->timeout.tv_sec = timeout_ms / 1000; ioevent->timeout.tv_sec = timeout_ms / 1000;
ioevent->timeout.tv_nsec = 1000000 * (timeout_ms % 1000); ioevent->timeout.tv_nsec = 1000000 * (timeout_ms % 1000);
#endif #endif
#ifdef OS_LINUX
ioevent->zero_timeout = (timeout_ms == 0);
#endif
} }
static inline int ioevent_poll_ex(IOEventPoller *ioevent, const int timeout_ms) static inline int ioevent_poll_ex(IOEventPoller *ioevent, const int timeout_ms)
@ -156,6 +185,23 @@ static inline int ioevent_poll_ex(IOEventPoller *ioevent, const int timeout_ms)
return ioevent_poll(ioevent); return ioevent_poll(ioevent);
} }
#if IOEVENT_USE_URING
static inline int ioevent_uring_submit(IOEventPoller *ioevent)
{
int result;
while (1) {
result = io_uring_submit(&ioevent->ring);
if (result < 0) {
if (result != -EINTR) {
return -result;
}
} else {
return 0;
}
}
}
#endif
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -17,6 +17,45 @@
#include "logger.h" #include "logger.h"
#include "ioevent_loop.h" #include "ioevent_loop.h"
#if IOEVENT_USE_URING
static int ioevent_process(IOEventPoller *ioevent)
{
int result;
unsigned head;
unsigned count = 0;
IOEventEntry *pEntry;
result = io_uring_wait_cqe_timeout(&ioevent->ring,
&ioevent->cqe, &ioevent->timeout);
switch (result) {
case 0:
break;
case -ETIME:
case -EAGAIN:
case -EINTR:
return 0;
default:
result *= -1;
logError("file: "__FILE__", line: %d, "
"io_uring_wait_cqe fail, errno: %d, error info: %s",
__LINE__, result, STRERROR(result));
return result;
}
io_uring_for_each_cqe(&ioevent->ring, head, ioevent->cqe) {
count++;
pEntry = (IOEventEntry *)ioevent->cqe->user_data;
if (pEntry != NULL) {
pEntry->callback(pEntry->fd, ioevent->cqe->res, pEntry);
}
}
io_uring_cq_advance(&ioevent->ring, count);
return 0;
}
#else
static void deal_ioevents(IOEventPoller *ioevent) static void deal_ioevents(IOEventPoller *ioevent)
{ {
int event; int event;
@ -39,37 +78,29 @@ static void deal_ioevents(IOEventPoller *ioevent)
} }
} }
int ioevent_remove(IOEventPoller *ioevent, void *data) static int ioevent_process(IOEventPoller *ioevent)
{ {
IOEventEntry *pEntry; int result;
int index;
if (ioevent->iterator.index >= ioevent->iterator.count) ioevent->iterator.count = ioevent_poll(ioevent);
{ if (ioevent->iterator.count > 0) {
return ENOENT; deal_ioevents(ioevent);
} }
else if (ioevent->iterator.count < 0) {
pEntry = (IOEventEntry *)IOEVENT_GET_DATA(ioevent, result = errno != 0 ? errno : EINVAL;
ioevent->iterator.index); if (result != EINTR) {
if (pEntry != NULL && (void *)pEntry == data) { logError("file: "__FILE__", line: %d, "
return 0; //do NOT clear current entry "ioevent_poll fail, errno: %d, error info: %s",
} __LINE__, result, STRERROR(result));
return result;
for (index=ioevent->iterator.index + 1; index < ioevent->iterator.count;
index++)
{
pEntry = (IOEventEntry *)IOEVENT_GET_DATA(ioevent, index);
if (pEntry != NULL && (void *)pEntry == data) {
logDebug("file: "__FILE__", line: %d, "
"clear ioevent data: %p", __LINE__, data);
IOEVENT_CLEAR_DATA(ioevent, index);
return 0;
} }
} }
return ENOENT; return 0;
} }
#endif
static void deal_timeouts(FastTimerEntry *head) static void deal_timeouts(FastTimerEntry *head)
{ {
FastTimerEntry *entry; FastTimerEntry *entry;
@ -131,10 +162,9 @@ int ioevent_loop(struct nio_thread_data *thread_data,
thread_data->deleted_list = NULL; thread_data->deleted_list = NULL;
last_check_time = g_current_time; last_check_time = g_current_time;
while (*continue_flag) while (*continue_flag) {
{
#ifdef OS_LINUX #ifdef OS_LINUX
if (thread_data->ev_puller.timeout == 0) { if (thread_data->ev_puller.zero_timeout) {
sched_pull = (sched_counter++ & 8) != 0; sched_pull = (sched_counter++ & 8) != 0;
} else { } else {
sched_pull = true; sched_pull = true;
@ -143,43 +173,23 @@ int ioevent_loop(struct nio_thread_data *thread_data,
sched_pull = true; sched_pull = true;
#endif #endif
if (sched_pull) if (sched_pull) {
{ if ((result=ioevent_process(&thread_data->ev_puller)) != 0) {
thread_data->ev_puller.iterator.count = ioevent_poll( return result;
&thread_data->ev_puller);
if (thread_data->ev_puller.iterator.count > 0)
{
deal_ioevents(&thread_data->ev_puller);
}
else if (thread_data->ev_puller.iterator.count < 0)
{
result = errno != 0 ? errno : EINVAL;
if (result != EINTR)
{
logError("file: "__FILE__", line: %d, " \
"ioevent_poll fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
return result;
}
} }
} }
if (thread_data->busy_polling_callback != NULL) if (thread_data->busy_polling_callback != NULL) {
{
thread_data->busy_polling_callback(thread_data); thread_data->busy_polling_callback(thread_data);
} }
if (thread_data->deleted_list != NULL) if (thread_data->deleted_list != NULL) {
{
count = 0; count = 0;
while (thread_data->deleted_list != NULL) while (thread_data->deleted_list != NULL) {
{
task = thread_data->deleted_list; task = thread_data->deleted_list;
thread_data->deleted_list = task->next; thread_data->deleted_list = task->next;
if (task->polling.in_queue) if (task->polling.in_queue) {
{
fc_list_del_init(&task->polling.dlink); fc_list_del_init(&task->polling.dlink);
task->polling.in_queue = false; task->polling.in_queue = false;
if (fc_list_empty(&task->thread_data->polling_queue)) { if (fc_list_empty(&task->thread_data->polling_queue)) {
@ -193,8 +203,7 @@ int ioevent_loop(struct nio_thread_data *thread_data,
//logInfo("cleanup task count: %d", count); //logInfo("cleanup task count: %d", count);
} }
if (g_current_time - last_check_time > 0) if (g_current_time - last_check_time > 0) {
{
last_check_time = g_current_time; last_check_time = g_current_time;
count = fast_timer_timeouts_get( count = fast_timer_timeouts_get(
&thread_data->timer, g_current_time, &head); &thread_data->timer, g_current_time, &head);
@ -204,8 +213,7 @@ int ioevent_loop(struct nio_thread_data *thread_data,
} }
} }
if (thread_data->notify.enabled) if (thread_data->notify.enabled) {
{
int64_t n; int64_t n;
if ((n=__sync_fetch_and_add(&thread_data->notify.counter, 0)) != 0) if ((n=__sync_fetch_and_add(&thread_data->notify.counter, 0)) != 0)
{ {
@ -219,8 +227,7 @@ int ioevent_loop(struct nio_thread_data *thread_data,
} }
} }
if (thread_data->thread_loop_callback != NULL) if (thread_data->thread_loop_callback != NULL) {
{
thread_data->thread_loop_callback(thread_data); thread_data->thread_loop_callback(thread_data);
} }
} }
@ -250,19 +257,3 @@ int ioevent_set(struct fast_task_info *task, struct nio_thread_data *pThread,
fast_timer_add(&pThread->timer, &task->event.timer); fast_timer_add(&pThread->timer, &task->event.timer);
return 0; return 0;
} }
int ioevent_reset(struct fast_task_info *task, int new_fd, short event)
{
if (task->event.fd == new_fd)
{
return 0;
}
if (task->event.fd >= 0)
{
ioevent_detach(&task->thread_data->ev_puller, task->event.fd);
}
task->event.fd = new_fd;
return ioevent_attach(&task->thread_data->ev_puller, new_fd, event, task);
}

View File

@ -26,14 +26,9 @@ int ioevent_loop(struct nio_thread_data *thread_data,
IOEventCallback recv_notify_callback, TaskCleanUpCallback IOEventCallback recv_notify_callback, TaskCleanUpCallback
clean_up_callback, volatile bool *continue_flag); clean_up_callback, volatile bool *continue_flag);
//remove entry from ready list
int ioevent_remove(IOEventPoller *ioevent, void *data);
int ioevent_set(struct fast_task_info *pTask, struct nio_thread_data *pThread, int ioevent_set(struct fast_task_info *pTask, struct nio_thread_data *pThread,
int sock, short event, IOEventCallback callback, const int timeout); int sock, short event, IOEventCallback callback, const int timeout);
int ioevent_reset(struct fast_task_info *task, int new_fd, short event);
static inline bool ioevent_is_canceled(struct fast_task_info *task) static inline bool ioevent_is_canceled(struct fast_task_info *task)
{ {
return __sync_fetch_and_add(&task->canceled, 0) != 0; return __sync_fetch_and_add(&task->canceled, 0) != 0;

View File

@ -316,9 +316,13 @@ static int fast_multi_sock_client_do_recv(FastMultiSockClient *client,
static int fast_multi_sock_client_deal_io(FastMultiSockClient *client) static int fast_multi_sock_client_deal_io(FastMultiSockClient *client)
{ {
int result; int result;
int event;
int count; int count;
#if IOEVENT_USE_URING
unsigned head;
#else
int event;
int index; int index;
#endif
int remain_timeout; int remain_timeout;
FastMultiSockEntry *entry; FastMultiSockEntry *entry;
char formatted_ip[FORMATTED_IP_SIZE]; char formatted_ip[FORMATTED_IP_SIZE];
@ -330,6 +334,37 @@ static int fast_multi_sock_client_deal_io(FastMultiSockClient *client)
break; break;
} }
#if IOEVENT_USE_URING
result = io_uring_wait_cqe_timeout(&client->ioevent.ring,
&client->ioevent.cqe, &client->ioevent.timeout);
switch (result) {
case 0:
break;
case -ETIME:
case -EAGAIN:
case -EINTR:
continue;
default:
result *= -1;
logError("file: "__FILE__", line: %d, "
"io_uring_wait_cqe fail, errno: %d, error info: %s",
__LINE__, result, STRERROR(result));
return result;
}
count = 0;
io_uring_for_each_cqe(&client->ioevent.ring, head, client->ioevent.cqe) {
count++;
entry = (FastMultiSockEntry *)client->ioevent.cqe->user_data;
//logInfo("sock: %d, event: %d", entry->conn->sock, event);
result = entry->io_callback(client, entry);
if (result != 0 || entry->remain == 0) {
fast_multi_sock_client_finish(client, entry, result);
}
}
io_uring_cq_advance(&client->ioevent.ring, count);
#else
count = ioevent_poll_ex(&client->ioevent, remain_timeout); count = ioevent_poll_ex(&client->ioevent, remain_timeout);
//logInfo("poll count: %d\n", count); //logInfo("poll count: %d\n", count);
for (index=0; index<count; index++) { for (index=0; index<count; index++) {
@ -354,6 +389,7 @@ static int fast_multi_sock_client_deal_io(FastMultiSockClient *client)
fast_multi_sock_client_finish(client, entry, result); fast_multi_sock_client_finish(client, entry, result);
} }
} }
#endif
} }
/* /*