diff --git a/make.sh b/make.sh index 0fe7bed..179b656 100755 --- a/make.sh +++ b/make.sh @@ -112,7 +112,22 @@ HAVE_VMMETER_H=0 HAVE_USER_H=0 if [ "$uname" = "Linux" ]; then OS_NAME=OS_LINUX - IOEVENT_USE=IOEVENT_USE_EPOLL + + major_version=$(uname -r | awk -F . '{print $1;}') + minor_version=$(uname -r | awk -F . '{print $2;}') + if [ $major_version -eq 5 ] && [ $minor_version -ge 14 ]; then + out=$(grep -F IORING_OP_SEND_ZC /usr/include/liburing/io_uring.h) + if [ -z $out ]; then + IOEVENT_USE=IOEVENT_USE_EPOLL + else + IOEVENT_USE=IOEVENT_USE_URING + fi + elif [ $major_version -gt 5 ]; then + IOEVENT_USE=IOEVENT_USE_URING + else + IOEVENT_USE=IOEVENT_USE_EPOLL + fi + if [ $glibc_minor -lt 17 ]; then LIBS="$LIBS -lrt" fi diff --git a/src/ioevent.c b/src/ioevent.c index f849f84..8507c1a 100644 --- a/src/ioevent.c +++ b/src/ioevent.c @@ -48,48 +48,65 @@ int kqueue_ev_convert(int16_t event, uint16_t flags) int ioevent_init(IOEventPoller *ioevent, const int size, const int timeout_ms, const int extra_events) { - int bytes; +#if IOEVENT_USE_URING + int result; +#else + int bytes; - ioevent->size = size; - ioevent->extra_events = extra_events; - ioevent->iterator.index = 0; - ioevent->iterator.count = 0; - -#if IOEVENT_USE_EPOLL - ioevent->poll_fd = epoll_create(ioevent->size); - if (ioevent->poll_fd < 0) { - return errno != 0 ? errno : ENOMEM; - } - bytes = sizeof(struct epoll_event) * size; - ioevent->events = (struct epoll_event *)fc_malloc(bytes); -#elif IOEVENT_USE_KQUEUE - ioevent->poll_fd = kqueue(); - if (ioevent->poll_fd < 0) { - return errno != 0 ? errno : ENOMEM; - } - bytes = sizeof(struct kevent) * size; - ioevent->events = (struct kevent *)fc_malloc(bytes); -#elif IOEVENT_USE_PORT - ioevent->poll_fd = port_create(); - if (ioevent->poll_fd < 0) { - return errno != 0 ? errno : ENOMEM; - } - bytes = sizeof(port_event_t) * size; - ioevent->events = (port_event_t *)fc_malloc(bytes); + ioevent->iterator.index = 0; + ioevent->iterator.count = 0; #endif - if (ioevent->events == NULL) { - close(ioevent->poll_fd); - ioevent->poll_fd = -1; - return ENOMEM; - } - ioevent_set_timeout(ioevent, timeout_ms); + ioevent->size = size; + ioevent->extra_events = extra_events; - return 0; +#if IOEVENT_USE_EPOLL + ioevent->poll_fd = epoll_create(ioevent->size); + if (ioevent->poll_fd < 0) { + return errno != 0 ? errno : ENOMEM; + } + bytes = sizeof(struct epoll_event) * size; + ioevent->events = (struct epoll_event *)fc_malloc(bytes); +#elif IOEVENT_USE_URING + if ((result=io_uring_queue_init(size, &ioevent->ring, 0)) < 0) { + return -result; + } + ioevent->cqe = NULL; +#elif IOEVENT_USE_KQUEUE + ioevent->poll_fd = kqueue(); + if (ioevent->poll_fd < 0) { + return errno != 0 ? errno : ENOMEM; + } + bytes = sizeof(struct kevent) * size; + ioevent->events = (struct kevent *)fc_malloc(bytes); +#elif IOEVENT_USE_PORT + ioevent->poll_fd = port_create(); + if (ioevent->poll_fd < 0) { + return errno != 0 ? errno : ENOMEM; + } + bytes = sizeof(port_event_t) * size; + ioevent->events = (port_event_t *)fc_malloc(bytes); +#endif + +#if IOEVENT_USE_URING + +#else + if (ioevent->events == NULL) { + close(ioevent->poll_fd); + ioevent->poll_fd = -1; + return ENOMEM; + } +#endif + + ioevent_set_timeout(ioevent, timeout_ms); + return 0; } void ioevent_destroy(IOEventPoller *ioevent) { +#if IOEVENT_USE_URING + io_uring_queue_exit(&ioevent->ring); +#else if (ioevent->events != NULL) { free(ioevent->events); ioevent->events = NULL; @@ -99,10 +116,11 @@ void ioevent_destroy(IOEventPoller *ioevent) close(ioevent->poll_fd); ioevent->poll_fd = -1; } +#endif } -int ioevent_attach(IOEventPoller *ioevent, const int fd, const int e, - void *data) +int ioevent_attach(IOEventPoller *ioevent, const int fd, + const int e, void *data) { #if IOEVENT_USE_EPOLL struct epoll_event ev; @@ -110,6 +128,14 @@ int ioevent_attach(IOEventPoller *ioevent, const int fd, const int e, ev.events = e | ioevent->extra_events; ev.data.ptr = data; return epoll_ctl(ioevent->poll_fd, EPOLL_CTL_ADD, fd, &ev); +#elif IOEVENT_USE_URING + struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); + if (sqe == NULL) { + return ENOSPC; + } + sqe->user_data = (long)data; + io_uring_prep_poll_multishot(sqe, fd, e | ioevent->extra_events); + return ioevent_uring_submit(ioevent); #elif IOEVENT_USE_KQUEUE struct kevent ev[2]; int n = 0; @@ -128,8 +154,8 @@ int ioevent_attach(IOEventPoller *ioevent, const int fd, const int e, #endif } -int ioevent_modify(IOEventPoller *ioevent, const int fd, const int e, - void *data) +int ioevent_modify(IOEventPoller *ioevent, const int fd, + const int e, void *data) { #if IOEVENT_USE_EPOLL struct epoll_event ev; @@ -137,6 +163,15 @@ int ioevent_modify(IOEventPoller *ioevent, const int fd, const int e, ev.events = e | ioevent->extra_events; ev.data.ptr = data; return epoll_ctl(ioevent->poll_fd, EPOLL_CTL_MOD, fd, &ev); +#elif IOEVENT_USE_URING + struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); + if (sqe == NULL) { + return ENOSPC; + } + sqe->user_data = (long)data; + io_uring_prep_poll_update(sqe, sqe->user_data, sqe->user_data, + e | ioevent->extra_events, IORING_POLL_UPDATE_EVENTS); + return ioevent_uring_submit(ioevent); #elif IOEVENT_USE_KQUEUE struct kevent ev[2]; int result; @@ -173,6 +208,15 @@ int ioevent_detach(IOEventPoller *ioevent, const int fd) { #if IOEVENT_USE_EPOLL return epoll_ctl(ioevent->poll_fd, EPOLL_CTL_DEL, fd, NULL); +#elif IOEVENT_USE_URING + struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); + if (sqe == NULL) { + return ENOSPC; + } + sqe->flags |= IOSQE_CQE_SKIP_SUCCESS; + sqe->user_data = 0; + io_uring_prep_cancel_fd(sqe, fd, 0); + return ioevent_uring_submit(ioevent); #elif IOEVENT_USE_KQUEUE struct kevent ev[1]; int r, w; @@ -192,9 +236,20 @@ int ioevent_detach(IOEventPoller *ioevent, const int fd) int ioevent_poll(IOEventPoller *ioevent) { #if IOEVENT_USE_EPOLL - return epoll_wait(ioevent->poll_fd, ioevent->events, ioevent->size, ioevent->timeout); + return epoll_wait(ioevent->poll_fd, ioevent->events, + ioevent->size, ioevent->timeout); +#elif IOEVENT_USE_URING + int result; + result = io_uring_wait_cqe_timeout(&ioevent->ring, + &ioevent->cqe, &ioevent->timeout); + if (result < 0) { + errno = -result; + return -1; + } + return 0; #elif IOEVENT_USE_KQUEUE - return kevent(ioevent->poll_fd, NULL, 0, ioevent->events, ioevent->size, &ioevent->timeout); + return kevent(ioevent->poll_fd, NULL, 0, ioevent->events, + ioevent->size, &ioevent->timeout); #elif IOEVENT_USE_PORT int result; int retval; diff --git a/src/ioevent.h b/src/ioevent.h index c31b7bb..e1b4e66 100644 --- a/src/ioevent.h +++ b/src/ioevent.h @@ -31,6 +31,12 @@ #define IOEVENT_WRITE EPOLLOUT #define IOEVENT_ERROR (EPOLLERR | EPOLLPRI | EPOLLHUP) +#elif IOEVENT_USE_URING +#include +#define IOEVENT_READ POLLIN +#define IOEVENT_WRITE POLLOUT +#define IOEVENT_ERROR (POLLERR | POLLPRI | POLLHUP) + #elif IOEVENT_USE_KQUEUE #include #include @@ -67,16 +73,22 @@ int kqueue_ev_convert(int16_t event, uint16_t flags); typedef struct ioevent_puller { int size; //max events (fd) int extra_events; +#if IOEVENT_USE_URING + struct io_uring ring; +#else int poll_fd; - struct { int index; int count; } iterator; //for deal event loop +#endif #if IOEVENT_USE_EPOLL struct epoll_event *events; - int timeout; + int timeout; //in milliseconds +#elif IOEVENT_USE_URING + struct io_uring_cqe *cqe; + struct __kernel_timespec timeout; #elif IOEVENT_USE_KQUEUE struct kevent *events; struct timespec timeout; @@ -84,11 +96,18 @@ typedef struct ioevent_puller { port_event_t *events; timespec_t timeout; #endif + +#ifdef OS_LINUX + bool zero_timeout; +#endif + } IOEventPoller; #if IOEVENT_USE_EPOLL #define IOEVENT_GET_EVENTS(ioevent, index) \ (ioevent)->events[index].events +#elif IOEVENT_USE_URING + #elif IOEVENT_USE_KQUEUE #define IOEVENT_GET_EVENTS(ioevent, index) kqueue_ev_convert( \ (ioevent)->events[index].filter, (ioevent)->events[index].flags) @@ -102,6 +121,8 @@ typedef struct ioevent_puller { #if IOEVENT_USE_EPOLL #define IOEVENT_GET_DATA(ioevent, index) \ (ioevent)->events[index].data.ptr +#elif IOEVENT_USE_URING + #elif IOEVENT_USE_KQUEUE #define IOEVENT_GET_DATA(ioevent, index) \ (ioevent)->events[index].udata @@ -115,6 +136,8 @@ typedef struct ioevent_puller { #if IOEVENT_USE_EPOLL #define IOEVENT_CLEAR_DATA(ioevent, index) \ (ioevent)->events[index].data.ptr = NULL +#elif IOEVENT_USE_URING + #elif IOEVENT_USE_KQUEUE #define IOEVENT_CLEAR_DATA(ioevent, index) \ (ioevent)->events[index].udata = NULL @@ -133,14 +156,15 @@ int ioevent_init(IOEventPoller *ioevent, const int size, const int timeout_ms, const int extra_events); void ioevent_destroy(IOEventPoller *ioevent); -int ioevent_attach(IOEventPoller *ioevent, const int fd, const int e, - void *data); -int ioevent_modify(IOEventPoller *ioevent, const int fd, const int e, - void *data); +int ioevent_attach(IOEventPoller *ioevent, const int fd, + const int e, void *data); +int ioevent_modify(IOEventPoller *ioevent, const int fd, + const int e, void *data); int ioevent_detach(IOEventPoller *ioevent, const int fd); int ioevent_poll(IOEventPoller *ioevent); -static inline void ioevent_set_timeout(IOEventPoller *ioevent, const int timeout_ms) +static inline void ioevent_set_timeout(IOEventPoller *ioevent, + const int timeout_ms) { #if IOEVENT_USE_EPOLL ioevent->timeout = timeout_ms; @@ -148,6 +172,11 @@ static inline void ioevent_set_timeout(IOEventPoller *ioevent, const int timeout ioevent->timeout.tv_sec = timeout_ms / 1000; ioevent->timeout.tv_nsec = 1000000 * (timeout_ms % 1000); #endif + +#ifdef OS_LINUX + ioevent->zero_timeout = (timeout_ms == 0); +#endif + } static inline int ioevent_poll_ex(IOEventPoller *ioevent, const int timeout_ms) @@ -156,6 +185,23 @@ static inline int ioevent_poll_ex(IOEventPoller *ioevent, const int timeout_ms) return ioevent_poll(ioevent); } +#if IOEVENT_USE_URING +static inline int ioevent_uring_submit(IOEventPoller *ioevent) +{ + int result; + while (1) { + result = io_uring_submit(&ioevent->ring); + if (result < 0) { + if (result != -EINTR) { + return -result; + } + } else { + return 0; + } + } +} +#endif + #ifdef __cplusplus } #endif diff --git a/src/ioevent_loop.c b/src/ioevent_loop.c index f1ba16e..200e13a 100644 --- a/src/ioevent_loop.c +++ b/src/ioevent_loop.c @@ -17,6 +17,45 @@ #include "logger.h" #include "ioevent_loop.h" +#if IOEVENT_USE_URING +static int ioevent_process(IOEventPoller *ioevent) +{ + int result; + unsigned head; + unsigned count = 0; + IOEventEntry *pEntry; + + result = io_uring_wait_cqe_timeout(&ioevent->ring, + &ioevent->cqe, &ioevent->timeout); + switch (result) { + case 0: + break; + case -ETIME: + case -EAGAIN: + case -EINTR: + return 0; + default: + result *= -1; + logError("file: "__FILE__", line: %d, " + "io_uring_wait_cqe fail, errno: %d, error info: %s", + __LINE__, result, STRERROR(result)); + return result; + } + + io_uring_for_each_cqe(&ioevent->ring, head, ioevent->cqe) { + count++; + pEntry = (IOEventEntry *)ioevent->cqe->user_data; + if (pEntry != NULL) { + pEntry->callback(pEntry->fd, ioevent->cqe->res, pEntry); + } + } + + io_uring_cq_advance(&ioevent->ring, count); + return 0; +} + +#else + static void deal_ioevents(IOEventPoller *ioevent) { int event; @@ -39,37 +78,29 @@ static void deal_ioevents(IOEventPoller *ioevent) } } -int ioevent_remove(IOEventPoller *ioevent, void *data) +static int ioevent_process(IOEventPoller *ioevent) { - IOEventEntry *pEntry; - int index; + int result; - if (ioevent->iterator.index >= ioevent->iterator.count) - { - return ENOENT; + ioevent->iterator.count = ioevent_poll(ioevent); + if (ioevent->iterator.count > 0) { + deal_ioevents(ioevent); } - - pEntry = (IOEventEntry *)IOEVENT_GET_DATA(ioevent, - ioevent->iterator.index); - if (pEntry != NULL && (void *)pEntry == data) { - return 0; //do NOT clear current entry - } - - for (index=ioevent->iterator.index + 1; index < ioevent->iterator.count; - index++) - { - pEntry = (IOEventEntry *)IOEVENT_GET_DATA(ioevent, index); - if (pEntry != NULL && (void *)pEntry == data) { - logDebug("file: "__FILE__", line: %d, " - "clear ioevent data: %p", __LINE__, data); - IOEVENT_CLEAR_DATA(ioevent, index); - return 0; + else if (ioevent->iterator.count < 0) { + result = errno != 0 ? errno : EINVAL; + if (result != EINTR) { + logError("file: "__FILE__", line: %d, " + "ioevent_poll fail, errno: %d, error info: %s", + __LINE__, result, STRERROR(result)); + return result; } } - return ENOENT; + return 0; } +#endif + static void deal_timeouts(FastTimerEntry *head) { FastTimerEntry *entry; @@ -131,10 +162,9 @@ int ioevent_loop(struct nio_thread_data *thread_data, thread_data->deleted_list = NULL; last_check_time = g_current_time; - while (*continue_flag) - { + while (*continue_flag) { #ifdef OS_LINUX - if (thread_data->ev_puller.timeout == 0) { + if (thread_data->ev_puller.zero_timeout) { sched_pull = (sched_counter++ & 8) != 0; } else { sched_pull = true; @@ -143,43 +173,23 @@ int ioevent_loop(struct nio_thread_data *thread_data, sched_pull = true; #endif - if (sched_pull) - { - thread_data->ev_puller.iterator.count = ioevent_poll( - &thread_data->ev_puller); - if (thread_data->ev_puller.iterator.count > 0) - { - deal_ioevents(&thread_data->ev_puller); - } - else if (thread_data->ev_puller.iterator.count < 0) - { - result = errno != 0 ? errno : EINVAL; - if (result != EINTR) - { - logError("file: "__FILE__", line: %d, " \ - "ioevent_poll fail, " \ - "errno: %d, error info: %s", \ - __LINE__, result, STRERROR(result)); - return result; - } + if (sched_pull) { + if ((result=ioevent_process(&thread_data->ev_puller)) != 0) { + return result; } } - if (thread_data->busy_polling_callback != NULL) - { + if (thread_data->busy_polling_callback != NULL) { thread_data->busy_polling_callback(thread_data); } - if (thread_data->deleted_list != NULL) - { + if (thread_data->deleted_list != NULL) { count = 0; - while (thread_data->deleted_list != NULL) - { + while (thread_data->deleted_list != NULL) { task = thread_data->deleted_list; thread_data->deleted_list = task->next; - if (task->polling.in_queue) - { + if (task->polling.in_queue) { fc_list_del_init(&task->polling.dlink); task->polling.in_queue = false; if (fc_list_empty(&task->thread_data->polling_queue)) { @@ -193,8 +203,7 @@ int ioevent_loop(struct nio_thread_data *thread_data, //logInfo("cleanup task count: %d", count); } - if (g_current_time - last_check_time > 0) - { + if (g_current_time - last_check_time > 0) { last_check_time = g_current_time; count = fast_timer_timeouts_get( &thread_data->timer, g_current_time, &head); @@ -204,8 +213,7 @@ int ioevent_loop(struct nio_thread_data *thread_data, } } - if (thread_data->notify.enabled) - { + if (thread_data->notify.enabled) { int64_t n; if ((n=__sync_fetch_and_add(&thread_data->notify.counter, 0)) != 0) { @@ -219,8 +227,7 @@ int ioevent_loop(struct nio_thread_data *thread_data, } } - if (thread_data->thread_loop_callback != NULL) - { + if (thread_data->thread_loop_callback != NULL) { thread_data->thread_loop_callback(thread_data); } } @@ -250,19 +257,3 @@ int ioevent_set(struct fast_task_info *task, struct nio_thread_data *pThread, fast_timer_add(&pThread->timer, &task->event.timer); return 0; } - -int ioevent_reset(struct fast_task_info *task, int new_fd, short event) -{ - if (task->event.fd == new_fd) - { - return 0; - } - - if (task->event.fd >= 0) - { - ioevent_detach(&task->thread_data->ev_puller, task->event.fd); - } - - task->event.fd = new_fd; - return ioevent_attach(&task->thread_data->ev_puller, new_fd, event, task); -} diff --git a/src/ioevent_loop.h b/src/ioevent_loop.h index cce2cb6..44efd4f 100644 --- a/src/ioevent_loop.h +++ b/src/ioevent_loop.h @@ -26,14 +26,9 @@ int ioevent_loop(struct nio_thread_data *thread_data, IOEventCallback recv_notify_callback, TaskCleanUpCallback clean_up_callback, volatile bool *continue_flag); -//remove entry from ready list -int ioevent_remove(IOEventPoller *ioevent, void *data); - int ioevent_set(struct fast_task_info *pTask, struct nio_thread_data *pThread, int sock, short event, IOEventCallback callback, const int timeout); -int ioevent_reset(struct fast_task_info *task, int new_fd, short event); - static inline bool ioevent_is_canceled(struct fast_task_info *task) { return __sync_fetch_and_add(&task->canceled, 0) != 0; diff --git a/src/multi_socket_client.c b/src/multi_socket_client.c index 428207c..e122278 100644 --- a/src/multi_socket_client.c +++ b/src/multi_socket_client.c @@ -316,9 +316,13 @@ static int fast_multi_sock_client_do_recv(FastMultiSockClient *client, static int fast_multi_sock_client_deal_io(FastMultiSockClient *client) { int result; - int event; int count; +#if IOEVENT_USE_URING + unsigned head; +#else + int event; int index; +#endif int remain_timeout; FastMultiSockEntry *entry; char formatted_ip[FORMATTED_IP_SIZE]; @@ -330,6 +334,37 @@ static int fast_multi_sock_client_deal_io(FastMultiSockClient *client) break; } +#if IOEVENT_USE_URING + result = io_uring_wait_cqe_timeout(&client->ioevent.ring, + &client->ioevent.cqe, &client->ioevent.timeout); + switch (result) { + case 0: + break; + case -ETIME: + case -EAGAIN: + case -EINTR: + continue; + default: + result *= -1; + logError("file: "__FILE__", line: %d, " + "io_uring_wait_cqe fail, errno: %d, error info: %s", + __LINE__, result, STRERROR(result)); + return result; + } + + count = 0; + io_uring_for_each_cqe(&client->ioevent.ring, head, client->ioevent.cqe) { + count++; + entry = (FastMultiSockEntry *)client->ioevent.cqe->user_data; + //logInfo("sock: %d, event: %d", entry->conn->sock, event); + result = entry->io_callback(client, entry); + if (result != 0 || entry->remain == 0) { + fast_multi_sock_client_finish(client, entry, result); + } + } + io_uring_cq_advance(&client->ioevent.ring, count); + +#else count = ioevent_poll_ex(&client->ioevent, remain_timeout); //logInfo("poll count: %d\n", count); for (index=0; index