From 47fa7f99df4ed7540ea733c10b0795140b87c4ec Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Wed, 17 Sep 2025 03:49:21 +0800 Subject: [PATCH 01/13] ioevent.[hc] and ioevent_loop.[hc] support io_uring --- make.sh | 17 ++++- src/ioevent.c | 135 +++++++++++++++++++++++++----------- src/ioevent.h | 60 ++++++++++++++-- src/ioevent_loop.c | 141 ++++++++++++++++++-------------------- src/ioevent_loop.h | 5 -- src/multi_socket_client.c | 38 +++++++++- 6 files changed, 267 insertions(+), 129 deletions(-) diff --git a/make.sh b/make.sh index 0fe7bed..179b656 100755 --- a/make.sh +++ b/make.sh @@ -112,7 +112,22 @@ HAVE_VMMETER_H=0 HAVE_USER_H=0 if [ "$uname" = "Linux" ]; then OS_NAME=OS_LINUX - IOEVENT_USE=IOEVENT_USE_EPOLL + + major_version=$(uname -r | awk -F . '{print $1;}') + minor_version=$(uname -r | awk -F . '{print $2;}') + if [ $major_version -eq 5 ] && [ $minor_version -ge 14 ]; then + out=$(grep -F IORING_OP_SEND_ZC /usr/include/liburing/io_uring.h) + if [ -z $out ]; then + IOEVENT_USE=IOEVENT_USE_EPOLL + else + IOEVENT_USE=IOEVENT_USE_URING + fi + elif [ $major_version -gt 5 ]; then + IOEVENT_USE=IOEVENT_USE_URING + else + IOEVENT_USE=IOEVENT_USE_EPOLL + fi + if [ $glibc_minor -lt 17 ]; then LIBS="$LIBS -lrt" fi diff --git a/src/ioevent.c b/src/ioevent.c index f849f84..8507c1a 100644 --- a/src/ioevent.c +++ b/src/ioevent.c @@ -48,48 +48,65 @@ int kqueue_ev_convert(int16_t event, uint16_t flags) int ioevent_init(IOEventPoller *ioevent, const int size, const int timeout_ms, const int extra_events) { - int bytes; +#if IOEVENT_USE_URING + int result; +#else + int bytes; - ioevent->size = size; - ioevent->extra_events = extra_events; - ioevent->iterator.index = 0; - ioevent->iterator.count = 0; - -#if IOEVENT_USE_EPOLL - ioevent->poll_fd = epoll_create(ioevent->size); - if (ioevent->poll_fd < 0) { - return errno != 0 ? errno : ENOMEM; - } - bytes = sizeof(struct epoll_event) * size; - ioevent->events = (struct epoll_event *)fc_malloc(bytes); -#elif IOEVENT_USE_KQUEUE - ioevent->poll_fd = kqueue(); - if (ioevent->poll_fd < 0) { - return errno != 0 ? errno : ENOMEM; - } - bytes = sizeof(struct kevent) * size; - ioevent->events = (struct kevent *)fc_malloc(bytes); -#elif IOEVENT_USE_PORT - ioevent->poll_fd = port_create(); - if (ioevent->poll_fd < 0) { - return errno != 0 ? errno : ENOMEM; - } - bytes = sizeof(port_event_t) * size; - ioevent->events = (port_event_t *)fc_malloc(bytes); + ioevent->iterator.index = 0; + ioevent->iterator.count = 0; #endif - if (ioevent->events == NULL) { - close(ioevent->poll_fd); - ioevent->poll_fd = -1; - return ENOMEM; - } - ioevent_set_timeout(ioevent, timeout_ms); + ioevent->size = size; + ioevent->extra_events = extra_events; - return 0; +#if IOEVENT_USE_EPOLL + ioevent->poll_fd = epoll_create(ioevent->size); + if (ioevent->poll_fd < 0) { + return errno != 0 ? errno : ENOMEM; + } + bytes = sizeof(struct epoll_event) * size; + ioevent->events = (struct epoll_event *)fc_malloc(bytes); +#elif IOEVENT_USE_URING + if ((result=io_uring_queue_init(size, &ioevent->ring, 0)) < 0) { + return -result; + } + ioevent->cqe = NULL; +#elif IOEVENT_USE_KQUEUE + ioevent->poll_fd = kqueue(); + if (ioevent->poll_fd < 0) { + return errno != 0 ? errno : ENOMEM; + } + bytes = sizeof(struct kevent) * size; + ioevent->events = (struct kevent *)fc_malloc(bytes); +#elif IOEVENT_USE_PORT + ioevent->poll_fd = port_create(); + if (ioevent->poll_fd < 0) { + return errno != 0 ? errno : ENOMEM; + } + bytes = sizeof(port_event_t) * size; + ioevent->events = (port_event_t *)fc_malloc(bytes); +#endif + +#if IOEVENT_USE_URING + +#else + if (ioevent->events == NULL) { + close(ioevent->poll_fd); + ioevent->poll_fd = -1; + return ENOMEM; + } +#endif + + ioevent_set_timeout(ioevent, timeout_ms); + return 0; } void ioevent_destroy(IOEventPoller *ioevent) { +#if IOEVENT_USE_URING + io_uring_queue_exit(&ioevent->ring); +#else if (ioevent->events != NULL) { free(ioevent->events); ioevent->events = NULL; @@ -99,10 +116,11 @@ void ioevent_destroy(IOEventPoller *ioevent) close(ioevent->poll_fd); ioevent->poll_fd = -1; } +#endif } -int ioevent_attach(IOEventPoller *ioevent, const int fd, const int e, - void *data) +int ioevent_attach(IOEventPoller *ioevent, const int fd, + const int e, void *data) { #if IOEVENT_USE_EPOLL struct epoll_event ev; @@ -110,6 +128,14 @@ int ioevent_attach(IOEventPoller *ioevent, const int fd, const int e, ev.events = e | ioevent->extra_events; ev.data.ptr = data; return epoll_ctl(ioevent->poll_fd, EPOLL_CTL_ADD, fd, &ev); +#elif IOEVENT_USE_URING + struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); + if (sqe == NULL) { + return ENOSPC; + } + sqe->user_data = (long)data; + io_uring_prep_poll_multishot(sqe, fd, e | ioevent->extra_events); + return ioevent_uring_submit(ioevent); #elif IOEVENT_USE_KQUEUE struct kevent ev[2]; int n = 0; @@ -128,8 +154,8 @@ int ioevent_attach(IOEventPoller *ioevent, const int fd, const int e, #endif } -int ioevent_modify(IOEventPoller *ioevent, const int fd, const int e, - void *data) +int ioevent_modify(IOEventPoller *ioevent, const int fd, + const int e, void *data) { #if IOEVENT_USE_EPOLL struct epoll_event ev; @@ -137,6 +163,15 @@ int ioevent_modify(IOEventPoller *ioevent, const int fd, const int e, ev.events = e | ioevent->extra_events; ev.data.ptr = data; return epoll_ctl(ioevent->poll_fd, EPOLL_CTL_MOD, fd, &ev); +#elif IOEVENT_USE_URING + struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); + if (sqe == NULL) { + return ENOSPC; + } + sqe->user_data = (long)data; + io_uring_prep_poll_update(sqe, sqe->user_data, sqe->user_data, + e | ioevent->extra_events, IORING_POLL_UPDATE_EVENTS); + return ioevent_uring_submit(ioevent); #elif IOEVENT_USE_KQUEUE struct kevent ev[2]; int result; @@ -173,6 +208,15 @@ int ioevent_detach(IOEventPoller *ioevent, const int fd) { #if IOEVENT_USE_EPOLL return epoll_ctl(ioevent->poll_fd, EPOLL_CTL_DEL, fd, NULL); +#elif IOEVENT_USE_URING + struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); + if (sqe == NULL) { + return ENOSPC; + } + sqe->flags |= IOSQE_CQE_SKIP_SUCCESS; + sqe->user_data = 0; + io_uring_prep_cancel_fd(sqe, fd, 0); + return ioevent_uring_submit(ioevent); #elif IOEVENT_USE_KQUEUE struct kevent ev[1]; int r, w; @@ -192,9 +236,20 @@ int ioevent_detach(IOEventPoller *ioevent, const int fd) int ioevent_poll(IOEventPoller *ioevent) { #if IOEVENT_USE_EPOLL - return epoll_wait(ioevent->poll_fd, ioevent->events, ioevent->size, ioevent->timeout); + return epoll_wait(ioevent->poll_fd, ioevent->events, + ioevent->size, ioevent->timeout); +#elif IOEVENT_USE_URING + int result; + result = io_uring_wait_cqe_timeout(&ioevent->ring, + &ioevent->cqe, &ioevent->timeout); + if (result < 0) { + errno = -result; + return -1; + } + return 0; #elif IOEVENT_USE_KQUEUE - return kevent(ioevent->poll_fd, NULL, 0, ioevent->events, ioevent->size, &ioevent->timeout); + return kevent(ioevent->poll_fd, NULL, 0, ioevent->events, + ioevent->size, &ioevent->timeout); #elif IOEVENT_USE_PORT int result; int retval; diff --git a/src/ioevent.h b/src/ioevent.h index c31b7bb..e1b4e66 100644 --- a/src/ioevent.h +++ b/src/ioevent.h @@ -31,6 +31,12 @@ #define IOEVENT_WRITE EPOLLOUT #define IOEVENT_ERROR (EPOLLERR | EPOLLPRI | EPOLLHUP) +#elif IOEVENT_USE_URING +#include +#define IOEVENT_READ POLLIN +#define IOEVENT_WRITE POLLOUT +#define IOEVENT_ERROR (POLLERR | POLLPRI | POLLHUP) + #elif IOEVENT_USE_KQUEUE #include #include @@ -67,16 +73,22 @@ int kqueue_ev_convert(int16_t event, uint16_t flags); typedef struct ioevent_puller { int size; //max events (fd) int extra_events; +#if IOEVENT_USE_URING + struct io_uring ring; +#else int poll_fd; - struct { int index; int count; } iterator; //for deal event loop +#endif #if IOEVENT_USE_EPOLL struct epoll_event *events; - int timeout; + int timeout; //in milliseconds +#elif IOEVENT_USE_URING + struct io_uring_cqe *cqe; + struct __kernel_timespec timeout; #elif IOEVENT_USE_KQUEUE struct kevent *events; struct timespec timeout; @@ -84,11 +96,18 @@ typedef struct ioevent_puller { port_event_t *events; timespec_t timeout; #endif + +#ifdef OS_LINUX + bool zero_timeout; +#endif + } IOEventPoller; #if IOEVENT_USE_EPOLL #define IOEVENT_GET_EVENTS(ioevent, index) \ (ioevent)->events[index].events +#elif IOEVENT_USE_URING + #elif IOEVENT_USE_KQUEUE #define IOEVENT_GET_EVENTS(ioevent, index) kqueue_ev_convert( \ (ioevent)->events[index].filter, (ioevent)->events[index].flags) @@ -102,6 +121,8 @@ typedef struct ioevent_puller { #if IOEVENT_USE_EPOLL #define IOEVENT_GET_DATA(ioevent, index) \ (ioevent)->events[index].data.ptr +#elif IOEVENT_USE_URING + #elif IOEVENT_USE_KQUEUE #define IOEVENT_GET_DATA(ioevent, index) \ (ioevent)->events[index].udata @@ -115,6 +136,8 @@ typedef struct ioevent_puller { #if IOEVENT_USE_EPOLL #define IOEVENT_CLEAR_DATA(ioevent, index) \ (ioevent)->events[index].data.ptr = NULL +#elif IOEVENT_USE_URING + #elif IOEVENT_USE_KQUEUE #define IOEVENT_CLEAR_DATA(ioevent, index) \ (ioevent)->events[index].udata = NULL @@ -133,14 +156,15 @@ int ioevent_init(IOEventPoller *ioevent, const int size, const int timeout_ms, const int extra_events); void ioevent_destroy(IOEventPoller *ioevent); -int ioevent_attach(IOEventPoller *ioevent, const int fd, const int e, - void *data); -int ioevent_modify(IOEventPoller *ioevent, const int fd, const int e, - void *data); +int ioevent_attach(IOEventPoller *ioevent, const int fd, + const int e, void *data); +int ioevent_modify(IOEventPoller *ioevent, const int fd, + const int e, void *data); int ioevent_detach(IOEventPoller *ioevent, const int fd); int ioevent_poll(IOEventPoller *ioevent); -static inline void ioevent_set_timeout(IOEventPoller *ioevent, const int timeout_ms) +static inline void ioevent_set_timeout(IOEventPoller *ioevent, + const int timeout_ms) { #if IOEVENT_USE_EPOLL ioevent->timeout = timeout_ms; @@ -148,6 +172,11 @@ static inline void ioevent_set_timeout(IOEventPoller *ioevent, const int timeout ioevent->timeout.tv_sec = timeout_ms / 1000; ioevent->timeout.tv_nsec = 1000000 * (timeout_ms % 1000); #endif + +#ifdef OS_LINUX + ioevent->zero_timeout = (timeout_ms == 0); +#endif + } static inline int ioevent_poll_ex(IOEventPoller *ioevent, const int timeout_ms) @@ -156,6 +185,23 @@ static inline int ioevent_poll_ex(IOEventPoller *ioevent, const int timeout_ms) return ioevent_poll(ioevent); } +#if IOEVENT_USE_URING +static inline int ioevent_uring_submit(IOEventPoller *ioevent) +{ + int result; + while (1) { + result = io_uring_submit(&ioevent->ring); + if (result < 0) { + if (result != -EINTR) { + return -result; + } + } else { + return 0; + } + } +} +#endif + #ifdef __cplusplus } #endif diff --git a/src/ioevent_loop.c b/src/ioevent_loop.c index f1ba16e..200e13a 100644 --- a/src/ioevent_loop.c +++ b/src/ioevent_loop.c @@ -17,6 +17,45 @@ #include "logger.h" #include "ioevent_loop.h" +#if IOEVENT_USE_URING +static int ioevent_process(IOEventPoller *ioevent) +{ + int result; + unsigned head; + unsigned count = 0; + IOEventEntry *pEntry; + + result = io_uring_wait_cqe_timeout(&ioevent->ring, + &ioevent->cqe, &ioevent->timeout); + switch (result) { + case 0: + break; + case -ETIME: + case -EAGAIN: + case -EINTR: + return 0; + default: + result *= -1; + logError("file: "__FILE__", line: %d, " + "io_uring_wait_cqe fail, errno: %d, error info: %s", + __LINE__, result, STRERROR(result)); + return result; + } + + io_uring_for_each_cqe(&ioevent->ring, head, ioevent->cqe) { + count++; + pEntry = (IOEventEntry *)ioevent->cqe->user_data; + if (pEntry != NULL) { + pEntry->callback(pEntry->fd, ioevent->cqe->res, pEntry); + } + } + + io_uring_cq_advance(&ioevent->ring, count); + return 0; +} + +#else + static void deal_ioevents(IOEventPoller *ioevent) { int event; @@ -39,37 +78,29 @@ static void deal_ioevents(IOEventPoller *ioevent) } } -int ioevent_remove(IOEventPoller *ioevent, void *data) +static int ioevent_process(IOEventPoller *ioevent) { - IOEventEntry *pEntry; - int index; + int result; - if (ioevent->iterator.index >= ioevent->iterator.count) - { - return ENOENT; + ioevent->iterator.count = ioevent_poll(ioevent); + if (ioevent->iterator.count > 0) { + deal_ioevents(ioevent); } - - pEntry = (IOEventEntry *)IOEVENT_GET_DATA(ioevent, - ioevent->iterator.index); - if (pEntry != NULL && (void *)pEntry == data) { - return 0; //do NOT clear current entry - } - - for (index=ioevent->iterator.index + 1; index < ioevent->iterator.count; - index++) - { - pEntry = (IOEventEntry *)IOEVENT_GET_DATA(ioevent, index); - if (pEntry != NULL && (void *)pEntry == data) { - logDebug("file: "__FILE__", line: %d, " - "clear ioevent data: %p", __LINE__, data); - IOEVENT_CLEAR_DATA(ioevent, index); - return 0; + else if (ioevent->iterator.count < 0) { + result = errno != 0 ? errno : EINVAL; + if (result != EINTR) { + logError("file: "__FILE__", line: %d, " + "ioevent_poll fail, errno: %d, error info: %s", + __LINE__, result, STRERROR(result)); + return result; } } - return ENOENT; + return 0; } +#endif + static void deal_timeouts(FastTimerEntry *head) { FastTimerEntry *entry; @@ -131,10 +162,9 @@ int ioevent_loop(struct nio_thread_data *thread_data, thread_data->deleted_list = NULL; last_check_time = g_current_time; - while (*continue_flag) - { + while (*continue_flag) { #ifdef OS_LINUX - if (thread_data->ev_puller.timeout == 0) { + if (thread_data->ev_puller.zero_timeout) { sched_pull = (sched_counter++ & 8) != 0; } else { sched_pull = true; @@ -143,43 +173,23 @@ int ioevent_loop(struct nio_thread_data *thread_data, sched_pull = true; #endif - if (sched_pull) - { - thread_data->ev_puller.iterator.count = ioevent_poll( - &thread_data->ev_puller); - if (thread_data->ev_puller.iterator.count > 0) - { - deal_ioevents(&thread_data->ev_puller); - } - else if (thread_data->ev_puller.iterator.count < 0) - { - result = errno != 0 ? errno : EINVAL; - if (result != EINTR) - { - logError("file: "__FILE__", line: %d, " \ - "ioevent_poll fail, " \ - "errno: %d, error info: %s", \ - __LINE__, result, STRERROR(result)); - return result; - } + if (sched_pull) { + if ((result=ioevent_process(&thread_data->ev_puller)) != 0) { + return result; } } - if (thread_data->busy_polling_callback != NULL) - { + if (thread_data->busy_polling_callback != NULL) { thread_data->busy_polling_callback(thread_data); } - if (thread_data->deleted_list != NULL) - { + if (thread_data->deleted_list != NULL) { count = 0; - while (thread_data->deleted_list != NULL) - { + while (thread_data->deleted_list != NULL) { task = thread_data->deleted_list; thread_data->deleted_list = task->next; - if (task->polling.in_queue) - { + if (task->polling.in_queue) { fc_list_del_init(&task->polling.dlink); task->polling.in_queue = false; if (fc_list_empty(&task->thread_data->polling_queue)) { @@ -193,8 +203,7 @@ int ioevent_loop(struct nio_thread_data *thread_data, //logInfo("cleanup task count: %d", count); } - if (g_current_time - last_check_time > 0) - { + if (g_current_time - last_check_time > 0) { last_check_time = g_current_time; count = fast_timer_timeouts_get( &thread_data->timer, g_current_time, &head); @@ -204,8 +213,7 @@ int ioevent_loop(struct nio_thread_data *thread_data, } } - if (thread_data->notify.enabled) - { + if (thread_data->notify.enabled) { int64_t n; if ((n=__sync_fetch_and_add(&thread_data->notify.counter, 0)) != 0) { @@ -219,8 +227,7 @@ int ioevent_loop(struct nio_thread_data *thread_data, } } - if (thread_data->thread_loop_callback != NULL) - { + if (thread_data->thread_loop_callback != NULL) { thread_data->thread_loop_callback(thread_data); } } @@ -250,19 +257,3 @@ int ioevent_set(struct fast_task_info *task, struct nio_thread_data *pThread, fast_timer_add(&pThread->timer, &task->event.timer); return 0; } - -int ioevent_reset(struct fast_task_info *task, int new_fd, short event) -{ - if (task->event.fd == new_fd) - { - return 0; - } - - if (task->event.fd >= 0) - { - ioevent_detach(&task->thread_data->ev_puller, task->event.fd); - } - - task->event.fd = new_fd; - return ioevent_attach(&task->thread_data->ev_puller, new_fd, event, task); -} diff --git a/src/ioevent_loop.h b/src/ioevent_loop.h index cce2cb6..44efd4f 100644 --- a/src/ioevent_loop.h +++ b/src/ioevent_loop.h @@ -26,14 +26,9 @@ int ioevent_loop(struct nio_thread_data *thread_data, IOEventCallback recv_notify_callback, TaskCleanUpCallback clean_up_callback, volatile bool *continue_flag); -//remove entry from ready list -int ioevent_remove(IOEventPoller *ioevent, void *data); - int ioevent_set(struct fast_task_info *pTask, struct nio_thread_data *pThread, int sock, short event, IOEventCallback callback, const int timeout); -int ioevent_reset(struct fast_task_info *task, int new_fd, short event); - static inline bool ioevent_is_canceled(struct fast_task_info *task) { return __sync_fetch_and_add(&task->canceled, 0) != 0; diff --git a/src/multi_socket_client.c b/src/multi_socket_client.c index 428207c..e122278 100644 --- a/src/multi_socket_client.c +++ b/src/multi_socket_client.c @@ -316,9 +316,13 @@ static int fast_multi_sock_client_do_recv(FastMultiSockClient *client, static int fast_multi_sock_client_deal_io(FastMultiSockClient *client) { int result; - int event; int count; +#if IOEVENT_USE_URING + unsigned head; +#else + int event; int index; +#endif int remain_timeout; FastMultiSockEntry *entry; char formatted_ip[FORMATTED_IP_SIZE]; @@ -330,6 +334,37 @@ static int fast_multi_sock_client_deal_io(FastMultiSockClient *client) break; } +#if IOEVENT_USE_URING + result = io_uring_wait_cqe_timeout(&client->ioevent.ring, + &client->ioevent.cqe, &client->ioevent.timeout); + switch (result) { + case 0: + break; + case -ETIME: + case -EAGAIN: + case -EINTR: + continue; + default: + result *= -1; + logError("file: "__FILE__", line: %d, " + "io_uring_wait_cqe fail, errno: %d, error info: %s", + __LINE__, result, STRERROR(result)); + return result; + } + + count = 0; + io_uring_for_each_cqe(&client->ioevent.ring, head, client->ioevent.cqe) { + count++; + entry = (FastMultiSockEntry *)client->ioevent.cqe->user_data; + //logInfo("sock: %d, event: %d", entry->conn->sock, event); + result = entry->io_callback(client, entry); + if (result != 0 || entry->remain == 0) { + fast_multi_sock_client_finish(client, entry, result); + } + } + io_uring_cq_advance(&client->ioevent.ring, count); + +#else count = ioevent_poll_ex(&client->ioevent, remain_timeout); //logInfo("poll count: %d\n", count); for (index=0; index Date: Wed, 24 Sep 2025 15:54:03 +0800 Subject: [PATCH 02/13] ioevent_set support io_uring --- libfastcommon.spec | 8 +++++++ make.sh | 6 +++--- src/ioevent.c | 1 + src/ioevent.h | 14 ++++++++++++ src/ioevent_loop.c | 54 ++++++++++++++++++++++++++++++++++++---------- src/ioevent_loop.h | 14 +++++++++--- 6 files changed, 80 insertions(+), 17 deletions(-) diff --git a/libfastcommon.spec b/libfastcommon.spec index 98df5f9..605266c 100644 --- a/libfastcommon.spec +++ b/libfastcommon.spec @@ -17,6 +17,14 @@ BuildRequires: libcurl-devel Requires: libcurl Requires: %__cp %__mv %__chmod %__grep %__mkdir %__install %__id +%define kernel_major %(uname -r | cut -d'.' -f1) +%define kernel_minor %(uname -r | cut -d'.' -f2) +%define kernel_ver_int %(expr %{kernel_major} \* 100 + %{kernel_minor}) +%if %{kernel_ver_int} >= 514 +BuildRequires: liburing-devel >= 2.5 +Requires: liburing >= 2.5 +%endif + %description c common functions library extracted from my open source projects FastDFS. this library is very simple and stable. functions including: string, logger, diff --git a/make.sh b/make.sh index 179b656..47e1553 100755 --- a/make.sh +++ b/make.sh @@ -117,10 +117,10 @@ if [ "$uname" = "Linux" ]; then minor_version=$(uname -r | awk -F . '{print $2;}') if [ $major_version -eq 5 ] && [ $minor_version -ge 14 ]; then out=$(grep -F IORING_OP_SEND_ZC /usr/include/liburing/io_uring.h) - if [ -z $out ]; then - IOEVENT_USE=IOEVENT_USE_EPOLL - else + if [ -n "$out" ]; then IOEVENT_USE=IOEVENT_USE_URING + else + IOEVENT_USE=IOEVENT_USE_EPOLL fi elif [ $major_version -gt 5 ]; then IOEVENT_USE=IOEVENT_USE_URING diff --git a/src/ioevent.c b/src/ioevent.c index 8507c1a..aca7e53 100644 --- a/src/ioevent.c +++ b/src/ioevent.c @@ -72,6 +72,7 @@ int ioevent_init(IOEventPoller *ioevent, const int size, return -result; } ioevent->cqe = NULL; + ioevent->submmit_count = 0; #elif IOEVENT_USE_KQUEUE ioevent->poll_fd = kqueue(); if (ioevent->poll_fd < 0) { diff --git a/src/ioevent.h b/src/ioevent.h index e1b4e66..9a0fef8 100644 --- a/src/ioevent.h +++ b/src/ioevent.h @@ -75,6 +75,7 @@ typedef struct ioevent_puller { int extra_events; #if IOEVENT_USE_URING struct io_uring ring; + int submmit_count; #else int poll_fd; struct { @@ -200,6 +201,19 @@ static inline int ioevent_uring_submit(IOEventPoller *ioevent) } } } + +static inline int ioevent_uring_prep_recv(IOEventPoller *ioevent, + int sockfd, void *buf, size_t size, void *user_data) +{ + struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); + if (sqe == NULL) { + return ENOSPC; + } + sqe->user_data = (long)user_data; + io_uring_prep_recv(sqe, sockfd, buf, size, 0); + ioevent->submmit_count++; + return 0; +} #endif #ifdef __cplusplus diff --git a/src/ioevent_loop.c b/src/ioevent_loop.c index 200e13a..030f577 100644 --- a/src/ioevent_loop.c +++ b/src/ioevent_loop.c @@ -46,7 +46,11 @@ static int ioevent_process(IOEventPoller *ioevent) count++; pEntry = (IOEventEntry *)ioevent->cqe->user_data; if (pEntry != NULL) { - pEntry->callback(pEntry->fd, ioevent->cqe->res, pEntry); + if (ioevent->cqe->flags & IORING_CQE_F_NOTIF) { + //TODO + } else { + pEntry->callback(pEntry->fd, ioevent->cqe->res, pEntry); + } } } @@ -230,28 +234,56 @@ int ioevent_loop(struct nio_thread_data *thread_data, if (thread_data->thread_loop_callback != NULL) { thread_data->thread_loop_callback(thread_data); } + +#if IOEVENT_USE_URING + if (thread_data->ev_puller.submmit_count > 0) { + thread_data->ev_puller.submmit_count = 0; + if ((result=ioevent_uring_submit(&thread_data->ev_puller)) != 0) { + logError("file: "__FILE__", line: %d, " + "io_uring_submit fail, errno: %d, error info: %s", + __LINE__, result, STRERROR(result)); + return result; + } + } +#endif } return 0; } int ioevent_set(struct fast_task_info *task, struct nio_thread_data *pThread, - int sock, short event, IOEventCallback callback, const int timeout) + int sock, short event, IOEventCallback callback, + const int timeout, const bool use_iouring) { int result; task->thread_data = pThread; task->event.fd = sock; task->event.callback = callback; - if (ioevent_attach(&pThread->ev_puller, sock, event, task) < 0) - { - result = errno != 0 ? errno : ENOENT; - logError("file: "__FILE__", line: %d, " - "ioevent_attach fail, fd: %d, " - "errno: %d, error info: %s", - __LINE__, sock, result, STRERROR(result)); - return result; - } + if (use_iouring) { +#if IOEVENT_USE_URING + if ((result=uring_prep_recv_by_task(task)) != 0) { + logError("file: "__FILE__", line: %d, " + "uring_prep_recv fail, fd: %d, " + "errno: %d, error info: %s", + __LINE__, sock, result, STRERROR(result)); + return result; + } +#else + logError("file: "__FILE__", line: %d, " + "some mistakes happen!", __LINE__); + return EBUSY; +#endif + } else { + if (ioevent_attach(&pThread->ev_puller, sock, event, task) < 0) { + result = errno != 0 ? errno : ENOENT; + logError("file: "__FILE__", line: %d, " + "ioevent_attach fail, fd: %d, " + "errno: %d, error info: %s", + __LINE__, sock, result, STRERROR(result)); + return result; + } + } task->event.timer.expires = g_current_time + timeout; fast_timer_add(&pThread->timer, &task->event.timer); diff --git a/src/ioevent_loop.h b/src/ioevent_loop.h index 44efd4f..aee74fc 100644 --- a/src/ioevent_loop.h +++ b/src/ioevent_loop.h @@ -26,8 +26,9 @@ int ioevent_loop(struct nio_thread_data *thread_data, IOEventCallback recv_notify_callback, TaskCleanUpCallback clean_up_callback, volatile bool *continue_flag); -int ioevent_set(struct fast_task_info *pTask, struct nio_thread_data *pThread, - int sock, short event, IOEventCallback callback, const int timeout); +int ioevent_set(struct fast_task_info *task, struct nio_thread_data *pThread, + int sock, short event, IOEventCallback callback, + const int timeout, const bool use_iouring); static inline bool ioevent_is_canceled(struct fast_task_info *task) { @@ -70,9 +71,16 @@ static inline int ioevent_notify_thread(struct nio_thread_data *thread_data) return 0; } +#if IOEVENT_USE_URING +static inline int uring_prep_recv_by_task(struct fast_task_info *task) +{ + return ioevent_uring_prep_recv(&task->thread_data->ev_puller, + task->event.fd, task->recv.ptr->data, task->recv.ptr->size, task); +} +#endif + #ifdef __cplusplus } #endif #endif - From 012b2038ee9f10bc4f670519b51deee3d420d90f Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Thu, 25 Sep 2025 14:49:37 +0800 Subject: [PATCH 03/13] add functions uring_prep_xxx --- src/fast_task_queue.h | 7 ++++ src/fast_timer.h | 3 ++ src/ioevent.c | 3 +- src/ioevent.h | 65 +++++++++++++++++++++++++++++++ src/ioevent_loop.c | 6 +-- src/ioevent_loop.h | 89 ++++++++++++++++++++++++++++++++++++++++++- 6 files changed, 167 insertions(+), 6 deletions(-) diff --git a/src/fast_task_queue.h b/src/fast_task_queue.h index 9902003..f5b5281 100644 --- a/src/fast_task_queue.h +++ b/src/fast_task_queue.h @@ -48,10 +48,17 @@ typedef int (*TaskContinueCallback)(struct fast_task_info *task); struct sf_network_handler; struct fast_task_info; +#if IOEVENT_USE_URING +#define FC_URING_OP_TYPE(task) (task)->event.timer.op_type +#endif + typedef struct ioevent_entry { FastTimerEntry timer; //must first int fd; +#if IOEVENT_USE_URING + int res; +#endif IOEventCallback callback; } IOEventEntry; diff --git a/src/fast_timer.h b/src/fast_timer.h index e7ee7c5..8727878 100644 --- a/src/fast_timer.h +++ b/src/fast_timer.h @@ -27,6 +27,9 @@ typedef struct fast_timer_entry { struct fast_timer_entry *next; int slot_index; bool rehash; +#if IOEVENT_USE_URING + short op_type; +#endif } FastTimerEntry; typedef struct fast_timer_slot { diff --git a/src/ioevent.c b/src/ioevent.c index aca7e53..5057368 100644 --- a/src/ioevent.c +++ b/src/ioevent.c @@ -217,7 +217,8 @@ int ioevent_detach(IOEventPoller *ioevent, const int fd) sqe->flags |= IOSQE_CQE_SKIP_SUCCESS; sqe->user_data = 0; io_uring_prep_cancel_fd(sqe, fd, 0); - return ioevent_uring_submit(ioevent); + ioevent->submmit_count++; + return 0; #elif IOEVENT_USE_KQUEUE struct kevent ev[1]; int r, w; diff --git a/src/ioevent.h b/src/ioevent.h index 9a0fef8..458be8e 100644 --- a/src/ioevent.h +++ b/src/ioevent.h @@ -20,6 +20,7 @@ #include #include #include "_os_define.h" +#include "logger.h" #define IOEVENT_TIMEOUT 0x8000 @@ -207,6 +208,8 @@ static inline int ioevent_uring_prep_recv(IOEventPoller *ioevent, { struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); if (sqe == NULL) { + logError("file: "__FILE__", line: %d, " + "io_uring_get_sqe fail", __LINE__); return ENOSPC; } sqe->user_data = (long)user_data; @@ -214,6 +217,68 @@ static inline int ioevent_uring_prep_recv(IOEventPoller *ioevent, ioevent->submmit_count++; return 0; } + +static inline int ioevent_uring_prep_send(IOEventPoller *ioevent, + int sockfd, void *buf, size_t len, void *user_data) +{ + struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); + if (sqe == NULL) { + logError("file: "__FILE__", line: %d, " + "io_uring_get_sqe fail", __LINE__); + return ENOSPC; + } + sqe->user_data = (long)user_data; + io_uring_prep_send(sqe, sockfd, buf, len, 0); + ioevent->submmit_count++; + return 0; +} + +static inline int ioevent_uring_prep_writev(IOEventPoller *ioevent, + int sockfd, const struct iovec *iovecs, unsigned nr_vecs, + void *user_data) +{ + struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); + if (sqe == NULL) { + logError("file: "__FILE__", line: %d, " + "io_uring_get_sqe fail", __LINE__); + return ENOSPC; + } + + sqe->user_data = (long)user_data; + io_uring_prep_writev(sqe, sockfd, iovecs, nr_vecs, 0); + ioevent->submmit_count++; + return 0; +} + +static inline int ioevent_uring_prep_send_zc(IOEventPoller *ioevent, + int sockfd, void *buf, size_t len, void *user_data) +{ + struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); + if (sqe == NULL) { + logError("file: "__FILE__", line: %d, " + "io_uring_get_sqe fail", __LINE__); + return ENOSPC; + } + sqe->user_data = (long)user_data; + io_uring_prep_send_zc(sqe, sockfd, buf, len, 0, + IORING_SEND_ZC_REPORT_USAGE); + ioevent->submmit_count++; + return 0; +} + +static inline int ioevent_uring_prep_close(IOEventPoller *ioevent, int fd) +{ + struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); + if (sqe == NULL) { + logError("file: "__FILE__", line: %d, " + "io_uring_get_sqe fail", __LINE__); + return ENOSPC; + } + sqe->user_data = 0; + io_uring_prep_close(sqe, fd); + ioevent->submmit_count++; + return 0; +} #endif #ifdef __cplusplus diff --git a/src/ioevent_loop.c b/src/ioevent_loop.c index 030f577..0a91797 100644 --- a/src/ioevent_loop.c +++ b/src/ioevent_loop.c @@ -31,7 +31,6 @@ static int ioevent_process(IOEventPoller *ioevent) case 0: break; case -ETIME: - case -EAGAIN: case -EINTR: return 0; default: @@ -49,7 +48,8 @@ static int ioevent_process(IOEventPoller *ioevent) if (ioevent->cqe->flags & IORING_CQE_F_NOTIF) { //TODO } else { - pEntry->callback(pEntry->fd, ioevent->cqe->res, pEntry); + pEntry->res = ioevent->cqe->res; + pEntry->callback(pEntry->fd, 0, pEntry); } } } @@ -262,7 +262,7 @@ int ioevent_set(struct fast_task_info *task, struct nio_thread_data *pThread, task->event.callback = callback; if (use_iouring) { #if IOEVENT_USE_URING - if ((result=uring_prep_recv_by_task(task)) != 0) { + if ((result=uring_prep_first_recv(task)) != 0) { logError("file: "__FILE__", line: %d, " "uring_prep_recv fail, fd: %d, " "errno: %d, error info: %s", diff --git a/src/ioevent_loop.h b/src/ioevent_loop.h index aee74fc..4c76c94 100644 --- a/src/ioevent_loop.h +++ b/src/ioevent_loop.h @@ -72,10 +72,95 @@ static inline int ioevent_notify_thread(struct nio_thread_data *thread_data) } #if IOEVENT_USE_URING -static inline int uring_prep_recv_by_task(struct fast_task_info *task) +static inline int uring_prep_recv_data(struct fast_task_info *task, + char *buff, const int len) { + FC_URING_OP_TYPE(task) = IORING_OP_RECV; return ioevent_uring_prep_recv(&task->thread_data->ev_puller, - task->event.fd, task->recv.ptr->data, task->recv.ptr->size, task); + task->event.fd, buff, len, task); +} + +static inline int uring_prep_first_recv(struct fast_task_info *task) +{ + FC_URING_OP_TYPE(task) = IORING_OP_RECV; + return ioevent_uring_prep_recv(&task->thread_data->ev_puller, + task->event.fd, task->recv.ptr->data, + task->recv.ptr->size, task); +} + +static inline int uring_prep_next_recv(struct fast_task_info *task) +{ + FC_URING_OP_TYPE(task) = IORING_OP_RECV; + return ioevent_uring_prep_recv(&task->thread_data->ev_puller, + task->event.fd, task->recv.ptr->data + task->recv.ptr->offset, + task->recv.ptr->length - task->recv.ptr->offset, task); +} + +static inline int uring_prep_first_send(struct fast_task_info *task) +{ + FC_URING_OP_TYPE(task) = IORING_OP_SEND; + if (task->iovec_array.iovs != NULL) { + return ioevent_uring_prep_writev(&task->thread_data->ev_puller, + task->event.fd, task->iovec_array.iovs, + FC_MIN(task->iovec_array.count, IOV_MAX), + task); + } else { + return ioevent_uring_prep_send(&task->thread_data->ev_puller, + task->event.fd, task->send.ptr->data, + task->send.ptr->length, task); + } +} + +static inline int uring_prep_next_send(struct fast_task_info *task) +{ + FC_URING_OP_TYPE(task) = IORING_OP_SEND; + if (task->iovec_array.iovs != NULL) { + return ioevent_uring_prep_writev(&task->thread_data->ev_puller, + task->event.fd, task->iovec_array.iovs, + FC_MIN(task->iovec_array.count, IOV_MAX), + task); + } else { + return ioevent_uring_prep_send(&task->thread_data->ev_puller, + task->event.fd, task->send.ptr->data + task->send.ptr->offset, + task->send.ptr->length - task->send.ptr->offset, task); + } +} + +static inline int uring_prep_first_send_zc(struct fast_task_info *task) +{ + FC_URING_OP_TYPE(task) = IORING_OP_SEND; + if (task->iovec_array.iovs != NULL) { + return ioevent_uring_prep_writev(&task->thread_data->ev_puller, + task->event.fd, task->iovec_array.iovs, + FC_MIN(task->iovec_array.count, IOV_MAX), + task); + } else { + return ioevent_uring_prep_send_zc(&task->thread_data->ev_puller, + task->event.fd, task->send.ptr->data, + task->send.ptr->length, task); + } +} + +static inline int uring_prep_next_send_zc(struct fast_task_info *task) +{ + FC_URING_OP_TYPE(task) = IORING_OP_SEND; + if (task->iovec_array.iovs != NULL) { + return ioevent_uring_prep_writev(&task->thread_data->ev_puller, + task->event.fd, task->iovec_array.iovs, + FC_MIN(task->iovec_array.count, IOV_MAX), + task); + } else { + return ioevent_uring_prep_send_zc(&task->thread_data->ev_puller, + task->event.fd, task->send.ptr->data + task->send.ptr->offset, + task->send.ptr->length - task->send.ptr->offset, task); + } +} + +static inline int uring_prep_close_fd(struct fast_task_info *task) +{ + FC_URING_OP_TYPE(task) = IORING_OP_CLOSE; + return ioevent_uring_prep_close(&task->thread_data-> + ev_puller, task->event.fd); } #endif From cb6f6f13f38fe9d200f8c5f9d03e878b2c69b48e Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Sat, 27 Sep 2025 15:37:37 +0800 Subject: [PATCH 04/13] support Linux io_uring OK --- HISTORY | 3 +++ src/ioevent.c | 17 +++++++------- src/ioevent.h | 56 +++++++++++++++++++++++++++++++++++----------- src/ioevent_loop.c | 31 +++++++++++++++++-------- src/ioevent_loop.h | 36 +++++++++++++++++++++-------- 5 files changed, 103 insertions(+), 40 deletions(-) diff --git a/HISTORY b/HISTORY index 8834474..a723a5d 100644 --- a/HISTORY +++ b/HISTORY @@ -1,4 +1,7 @@ +Version 1.81 2025-09-27 + * support Linux io_uring + Version 1.80 2025-09-10 * getIpaddrByNameEx: IPv4 has priority over IPv6 * shared_func.[hc]: add function fc_ftoa diff --git a/src/ioevent.c b/src/ioevent.c index 5057368..4b11e18 100644 --- a/src/ioevent.c +++ b/src/ioevent.c @@ -72,7 +72,8 @@ int ioevent_init(IOEventPoller *ioevent, const int size, return -result; } ioevent->cqe = NULL; - ioevent->submmit_count = 0; + ioevent->submit_count = 0; + ioevent->send_zc_logged = false; #elif IOEVENT_USE_KQUEUE ioevent->poll_fd = kqueue(); if (ioevent->poll_fd < 0) { @@ -134,8 +135,8 @@ int ioevent_attach(IOEventPoller *ioevent, const int fd, if (sqe == NULL) { return ENOSPC; } - sqe->user_data = (long)data; io_uring_prep_poll_multishot(sqe, fd, e | ioevent->extra_events); + sqe->user_data = (long)data; return ioevent_uring_submit(ioevent); #elif IOEVENT_USE_KQUEUE struct kevent ev[2]; @@ -169,9 +170,9 @@ int ioevent_modify(IOEventPoller *ioevent, const int fd, if (sqe == NULL) { return ENOSPC; } - sqe->user_data = (long)data; io_uring_prep_poll_update(sqe, sqe->user_data, sqe->user_data, e | ioevent->extra_events, IORING_POLL_UPDATE_EVENTS); + sqe->user_data = (long)data; return ioevent_uring_submit(ioevent); #elif IOEVENT_USE_KQUEUE struct kevent ev[2]; @@ -214,11 +215,10 @@ int ioevent_detach(IOEventPoller *ioevent, const int fd) if (sqe == NULL) { return ENOSPC; } - sqe->flags |= IOSQE_CQE_SKIP_SUCCESS; - sqe->user_data = 0; io_uring_prep_cancel_fd(sqe, fd, 0); - ioevent->submmit_count++; - return 0; + /* set sqe->flags MUST after io_uring_prep_xxx */ + sqe->flags = IOSQE_CQE_SKIP_SUCCESS; + return ioevent_uring_submit(ioevent); #elif IOEVENT_USE_KQUEUE struct kevent ev[1]; int r, w; @@ -256,7 +256,7 @@ int ioevent_poll(IOEventPoller *ioevent) int result; int retval; unsigned int nget = 1; - if((retval = port_getn(ioevent->poll_fd, ioevent->events, + if((retval=port_getn(ioevent->poll_fd, ioevent->events, ioevent->size, &nget, &ioevent->timeout)) == 0) { result = (int)nget; @@ -282,4 +282,3 @@ int ioevent_poll(IOEventPoller *ioevent) #error port me #endif } - diff --git a/src/ioevent.h b/src/ioevent.h index 458be8e..1d26a7e 100644 --- a/src/ioevent.h +++ b/src/ioevent.h @@ -76,7 +76,8 @@ typedef struct ioevent_puller { int extra_events; #if IOEVENT_USE_URING struct io_uring ring; - int submmit_count; + int submit_count; + bool send_zc_logged; #else int poll_fd; struct { @@ -191,6 +192,8 @@ static inline int ioevent_poll_ex(IOEventPoller *ioevent, const int timeout_ms) static inline int ioevent_uring_submit(IOEventPoller *ioevent) { int result; + + ioevent->submit_count = 0; while (1) { result = io_uring_submit(&ioevent->ring); if (result < 0) { @@ -212,9 +215,9 @@ static inline int ioevent_uring_prep_recv(IOEventPoller *ioevent, "io_uring_get_sqe fail", __LINE__); return ENOSPC; } - sqe->user_data = (long)user_data; io_uring_prep_recv(sqe, sockfd, buf, size, 0); - ioevent->submmit_count++; + sqe->user_data = (long)user_data; + ioevent->submit_count++; return 0; } @@ -227,9 +230,9 @@ static inline int ioevent_uring_prep_send(IOEventPoller *ioevent, "io_uring_get_sqe fail", __LINE__); return ENOSPC; } - sqe->user_data = (long)user_data; io_uring_prep_send(sqe, sockfd, buf, len, 0); - ioevent->submmit_count++; + sqe->user_data = (long)user_data; + ioevent->submit_count++; return 0; } @@ -244,9 +247,9 @@ static inline int ioevent_uring_prep_writev(IOEventPoller *ioevent, return ENOSPC; } - sqe->user_data = (long)user_data; io_uring_prep_writev(sqe, sockfd, iovecs, nr_vecs, 0); - ioevent->submmit_count++; + sqe->user_data = (long)user_data; + ioevent->submit_count++; return 0; } @@ -259,14 +262,20 @@ static inline int ioevent_uring_prep_send_zc(IOEventPoller *ioevent, "io_uring_get_sqe fail", __LINE__); return ENOSPC; } - sqe->user_data = (long)user_data; io_uring_prep_send_zc(sqe, sockfd, buf, len, 0, - IORING_SEND_ZC_REPORT_USAGE); - ioevent->submmit_count++; +#ifdef IORING_SEND_ZC_REPORT_USAGE + IORING_SEND_ZC_REPORT_USAGE +#else + 0 +#endif + ); + sqe->user_data = (long)user_data; + ioevent->submit_count++; return 0; } -static inline int ioevent_uring_prep_close(IOEventPoller *ioevent, int fd) +static inline int ioevent_uring_prep_close(IOEventPoller *ioevent, + int fd, void *user_data) { struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); if (sqe == NULL) { @@ -274,9 +283,30 @@ static inline int ioevent_uring_prep_close(IOEventPoller *ioevent, int fd) "io_uring_get_sqe fail", __LINE__); return ENOSPC; } - sqe->user_data = 0; io_uring_prep_close(sqe, fd); - ioevent->submmit_count++; + if (user_data == NULL) { + /* set sqe->flags MUST after io_uring_prep_xxx */ + sqe->flags = IOSQE_CQE_SKIP_SUCCESS; + } else { + sqe->user_data = (long)user_data; + } + ioevent->submit_count++; + return 0; +} + +static inline int ioevent_uring_prep_cancel(IOEventPoller *ioevent, + void *user_data) +{ + struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); + if (sqe == NULL) { + logError("file: "__FILE__", line: %d, " + "io_uring_get_sqe fail", __LINE__); + return ENOSPC; + } + + io_uring_prep_cancel(sqe, user_data, 0); + sqe->user_data = (long)user_data; + ioevent->submit_count++; return 0; } #endif diff --git a/src/ioevent_loop.c b/src/ioevent_loop.c index 0a91797..ab5dc04 100644 --- a/src/ioevent_loop.c +++ b/src/ioevent_loop.c @@ -46,11 +46,27 @@ static int ioevent_process(IOEventPoller *ioevent) pEntry = (IOEventEntry *)ioevent->cqe->user_data; if (pEntry != NULL) { if (ioevent->cqe->flags & IORING_CQE_F_NOTIF) { - //TODO +#ifdef IORING_NOTIF_USAGE_ZC_COPIED + if (!ioevent->send_zc_logged) { + ioevent->send_zc_logged = true; + if (ioevent->cqe->res & IORING_NOTIF_USAGE_ZC_COPIED) { + logWarning("file: "__FILE__", line: %d, " + "io_uring send_zc: memory copy " + "instead of zero copy!", __LINE__); + } else { + logInfo("file: "__FILE__", line: %d, " + "io_uring send_zc: zero copy OK.", __LINE__); + } + } +#endif } else { pEntry->res = ioevent->cqe->res; pEntry->callback(pEntry->fd, 0, pEntry); } + } else { + logWarning("file: "__FILE__", line: %d, " + "unexpected flags: %d, result: %u", __LINE__, + ioevent->cqe->flags, ioevent->cqe->res); } } @@ -236,8 +252,7 @@ int ioevent_loop(struct nio_thread_data *thread_data, } #if IOEVENT_USE_URING - if (thread_data->ev_puller.submmit_count > 0) { - thread_data->ev_puller.submmit_count = 0; + if (thread_data->ev_puller.submit_count > 0) { if ((result=ioevent_uring_submit(&thread_data->ev_puller)) != 0) { logError("file: "__FILE__", line: %d, " "io_uring_submit fail, errno: %d, error info: %s", @@ -260,8 +275,8 @@ int ioevent_set(struct fast_task_info *task, struct nio_thread_data *pThread, task->thread_data = pThread; task->event.fd = sock; task->event.callback = callback; - if (use_iouring) { #if IOEVENT_USE_URING + if (use_iouring) { if ((result=uring_prep_first_recv(task)) != 0) { logError("file: "__FILE__", line: %d, " "uring_prep_recv fail, fd: %d, " @@ -269,12 +284,8 @@ int ioevent_set(struct fast_task_info *task, struct nio_thread_data *pThread, __LINE__, sock, result, STRERROR(result)); return result; } -#else - logError("file: "__FILE__", line: %d, " - "some mistakes happen!", __LINE__); - return EBUSY; -#endif } else { +#endif if (ioevent_attach(&pThread->ev_puller, sock, event, task) < 0) { result = errno != 0 ? errno : ENOENT; logError("file: "__FILE__", line: %d, " @@ -283,7 +294,9 @@ int ioevent_set(struct fast_task_info *task, struct nio_thread_data *pThread, __LINE__, sock, result, STRERROR(result)); return result; } +#if IOEVENT_USE_URING } +#endif task->event.timer.expires = g_current_time + timeout; fast_timer_add(&pThread->timer, &task->event.timer); diff --git a/src/ioevent_loop.h b/src/ioevent_loop.h index 4c76c94..3d3c78c 100644 --- a/src/ioevent_loop.h +++ b/src/ioevent_loop.h @@ -18,6 +18,10 @@ #include "fast_task_queue.h" +#define fc_hold_task_ex(task, inc_count) __sync_add_and_fetch( \ + &task->reffer_count, inc_count) +#define fc_hold_task(task) fc_hold_task_ex(task, 1) + #ifdef __cplusplus extern "C" { #endif @@ -72,17 +76,22 @@ static inline int ioevent_notify_thread(struct nio_thread_data *thread_data) } #if IOEVENT_USE_URING + +#define SET_OP_TYPE_AND_HOLD_TASK(task, _op_type) \ + FC_URING_OP_TYPE(task) = _op_type; \ + fc_hold_task(task) + static inline int uring_prep_recv_data(struct fast_task_info *task, char *buff, const int len) { - FC_URING_OP_TYPE(task) = IORING_OP_RECV; + SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_RECV); return ioevent_uring_prep_recv(&task->thread_data->ev_puller, task->event.fd, buff, len, task); } static inline int uring_prep_first_recv(struct fast_task_info *task) { - FC_URING_OP_TYPE(task) = IORING_OP_RECV; + SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_RECV); return ioevent_uring_prep_recv(&task->thread_data->ev_puller, task->event.fd, task->recv.ptr->data, task->recv.ptr->size, task); @@ -90,7 +99,7 @@ static inline int uring_prep_first_recv(struct fast_task_info *task) static inline int uring_prep_next_recv(struct fast_task_info *task) { - FC_URING_OP_TYPE(task) = IORING_OP_RECV; + SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_RECV); return ioevent_uring_prep_recv(&task->thread_data->ev_puller, task->event.fd, task->recv.ptr->data + task->recv.ptr->offset, task->recv.ptr->length - task->recv.ptr->offset, task); @@ -98,13 +107,14 @@ static inline int uring_prep_next_recv(struct fast_task_info *task) static inline int uring_prep_first_send(struct fast_task_info *task) { - FC_URING_OP_TYPE(task) = IORING_OP_SEND; if (task->iovec_array.iovs != NULL) { + SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_WRITEV); return ioevent_uring_prep_writev(&task->thread_data->ev_puller, task->event.fd, task->iovec_array.iovs, FC_MIN(task->iovec_array.count, IOV_MAX), task); } else { + SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND); return ioevent_uring_prep_send(&task->thread_data->ev_puller, task->event.fd, task->send.ptr->data, task->send.ptr->length, task); @@ -113,13 +123,14 @@ static inline int uring_prep_first_send(struct fast_task_info *task) static inline int uring_prep_next_send(struct fast_task_info *task) { - FC_URING_OP_TYPE(task) = IORING_OP_SEND; if (task->iovec_array.iovs != NULL) { + SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_WRITEV); return ioevent_uring_prep_writev(&task->thread_data->ev_puller, task->event.fd, task->iovec_array.iovs, FC_MIN(task->iovec_array.count, IOV_MAX), task); } else { + SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND); return ioevent_uring_prep_send(&task->thread_data->ev_puller, task->event.fd, task->send.ptr->data + task->send.ptr->offset, task->send.ptr->length - task->send.ptr->offset, task); @@ -128,13 +139,14 @@ static inline int uring_prep_next_send(struct fast_task_info *task) static inline int uring_prep_first_send_zc(struct fast_task_info *task) { - FC_URING_OP_TYPE(task) = IORING_OP_SEND; if (task->iovec_array.iovs != NULL) { + SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_WRITEV); return ioevent_uring_prep_writev(&task->thread_data->ev_puller, task->event.fd, task->iovec_array.iovs, FC_MIN(task->iovec_array.count, IOV_MAX), task); } else { + SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND_ZC); return ioevent_uring_prep_send_zc(&task->thread_data->ev_puller, task->event.fd, task->send.ptr->data, task->send.ptr->length, task); @@ -143,13 +155,14 @@ static inline int uring_prep_first_send_zc(struct fast_task_info *task) static inline int uring_prep_next_send_zc(struct fast_task_info *task) { - FC_URING_OP_TYPE(task) = IORING_OP_SEND; if (task->iovec_array.iovs != NULL) { + SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_WRITEV); return ioevent_uring_prep_writev(&task->thread_data->ev_puller, task->event.fd, task->iovec_array.iovs, FC_MIN(task->iovec_array.count, IOV_MAX), task); } else { + SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND_ZC); return ioevent_uring_prep_send_zc(&task->thread_data->ev_puller, task->event.fd, task->send.ptr->data + task->send.ptr->offset, task->send.ptr->length - task->send.ptr->offset, task); @@ -158,9 +171,14 @@ static inline int uring_prep_next_send_zc(struct fast_task_info *task) static inline int uring_prep_close_fd(struct fast_task_info *task) { - FC_URING_OP_TYPE(task) = IORING_OP_CLOSE; return ioevent_uring_prep_close(&task->thread_data-> - ev_puller, task->event.fd); + ev_puller, task->event.fd, NULL); +} + +static inline int uring_prep_cancel(struct fast_task_info *task) +{ + SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_ASYNC_CANCEL); + return ioevent_uring_prep_cancel(&task->thread_data->ev_puller, task); } #endif From 4576f22e245e9e48cf874987a5cdce675921f5a6 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Tue, 30 Sep 2025 10:12:41 +0800 Subject: [PATCH 05/13] add function uring_prep_connect --- src/ioevent.h | 83 ++++++++++++++++------------------------- src/ioevent_loop.c | 17 +++++---- src/ioevent_loop.h | 92 +++++++++++++++++++++++++++++++++++----------- 3 files changed, 111 insertions(+), 81 deletions(-) diff --git a/src/ioevent.h b/src/ioevent.h index 1d26a7e..3c70629 100644 --- a/src/ioevent.h +++ b/src/ioevent.h @@ -206,62 +206,47 @@ static inline int ioevent_uring_submit(IOEventPoller *ioevent) } } -static inline int ioevent_uring_prep_recv(IOEventPoller *ioevent, - int sockfd, void *buf, size_t size, void *user_data) +static inline struct io_uring_sqe *ioevent_uring_get_sqe(IOEventPoller *ioevent) { struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); if (sqe == NULL) { logError("file: "__FILE__", line: %d, " "io_uring_get_sqe fail", __LINE__); - return ENOSPC; } + return sqe; +} + +static inline void ioevent_uring_prep_recv(IOEventPoller *ioevent, + struct io_uring_sqe *sqe, int sockfd, + void *buf, size_t size, void *user_data) +{ io_uring_prep_recv(sqe, sockfd, buf, size, 0); sqe->user_data = (long)user_data; ioevent->submit_count++; - return 0; } -static inline int ioevent_uring_prep_send(IOEventPoller *ioevent, - int sockfd, void *buf, size_t len, void *user_data) +static inline void ioevent_uring_prep_send(IOEventPoller *ioevent, + struct io_uring_sqe *sqe, int sockfd, + void *buf, size_t len, void *user_data) { - struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); - if (sqe == NULL) { - logError("file: "__FILE__", line: %d, " - "io_uring_get_sqe fail", __LINE__); - return ENOSPC; - } io_uring_prep_send(sqe, sockfd, buf, len, 0); sqe->user_data = (long)user_data; ioevent->submit_count++; - return 0; } -static inline int ioevent_uring_prep_writev(IOEventPoller *ioevent, - int sockfd, const struct iovec *iovecs, unsigned nr_vecs, - void *user_data) +static inline void ioevent_uring_prep_writev(IOEventPoller *ioevent, + struct io_uring_sqe *sqe, int sockfd, const struct iovec *iovecs, + unsigned nr_vecs, void *user_data) { - struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); - if (sqe == NULL) { - logError("file: "__FILE__", line: %d, " - "io_uring_get_sqe fail", __LINE__); - return ENOSPC; - } - io_uring_prep_writev(sqe, sockfd, iovecs, nr_vecs, 0); sqe->user_data = (long)user_data; ioevent->submit_count++; - return 0; } -static inline int ioevent_uring_prep_send_zc(IOEventPoller *ioevent, - int sockfd, void *buf, size_t len, void *user_data) +static inline void ioevent_uring_prep_send_zc(IOEventPoller *ioevent, + struct io_uring_sqe *sqe, int sockfd, + void *buf, size_t len, void *user_data) { - struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); - if (sqe == NULL) { - logError("file: "__FILE__", line: %d, " - "io_uring_get_sqe fail", __LINE__); - return ENOSPC; - } io_uring_prep_send_zc(sqe, sockfd, buf, len, 0, #ifdef IORING_SEND_ZC_REPORT_USAGE IORING_SEND_ZC_REPORT_USAGE @@ -271,18 +256,11 @@ static inline int ioevent_uring_prep_send_zc(IOEventPoller *ioevent, ); sqe->user_data = (long)user_data; ioevent->submit_count++; - return 0; } -static inline int ioevent_uring_prep_close(IOEventPoller *ioevent, - int fd, void *user_data) +static inline void ioevent_uring_prep_close(IOEventPoller *ioevent, + struct io_uring_sqe *sqe, int fd, void *user_data) { - struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); - if (sqe == NULL) { - logError("file: "__FILE__", line: %d, " - "io_uring_get_sqe fail", __LINE__); - return ENOSPC; - } io_uring_prep_close(sqe, fd); if (user_data == NULL) { /* set sqe->flags MUST after io_uring_prep_xxx */ @@ -291,24 +269,25 @@ static inline int ioevent_uring_prep_close(IOEventPoller *ioevent, sqe->user_data = (long)user_data; } ioevent->submit_count++; - return 0; } -static inline int ioevent_uring_prep_cancel(IOEventPoller *ioevent, - void *user_data) +static inline void ioevent_uring_prep_cancel(IOEventPoller *ioevent, + struct io_uring_sqe *sqe, void *user_data) { - struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); - if (sqe == NULL) { - logError("file: "__FILE__", line: %d, " - "io_uring_get_sqe fail", __LINE__); - return ENOSPC; - } - io_uring_prep_cancel(sqe, user_data, 0); sqe->user_data = (long)user_data; ioevent->submit_count++; - return 0; } + +static inline void ioevent_uring_prep_connect(IOEventPoller *ioevent, + struct io_uring_sqe *sqe, int fd, const struct sockaddr *addr, + socklen_t addrlen, void *user_data) +{ + io_uring_prep_connect(sqe, fd, addr, addrlen); + sqe->user_data = (long)user_data; + ioevent->submit_count++; +} + #endif #ifdef __cplusplus diff --git a/src/ioevent_loop.c b/src/ioevent_loop.c index ab5dc04..654c03c 100644 --- a/src/ioevent_loop.c +++ b/src/ioevent_loop.c @@ -133,7 +133,8 @@ static void deal_timeouts(FastTimerEntry *head) current = entry; entry = entry->next; - current->prev = current->next = NULL; //must set NULL because NOT in time wheel + /* must set NULL because NOT in time wheel */ + current->prev = current->next = NULL; pEventEntry = (IOEventEntry *)current; if (pEventEntry != NULL) { @@ -277,12 +278,14 @@ int ioevent_set(struct fast_task_info *task, struct nio_thread_data *pThread, task->event.callback = callback; #if IOEVENT_USE_URING if (use_iouring) { - if ((result=uring_prep_first_recv(task)) != 0) { - logError("file: "__FILE__", line: %d, " - "uring_prep_recv fail, fd: %d, " - "errno: %d, error info: %s", - __LINE__, sock, result, STRERROR(result)); - return result; + if (FC_URING_OP_TYPE(task) == IORING_OP_NOP) { + if ((result=uring_prep_first_recv(task)) != 0) { + logError("file: "__FILE__", line: %d, " + "uring_prep_recv fail, fd: %d, " + "errno: %d, error info: %s", + __LINE__, sock, result, STRERROR(result)); + return result; + } } } else { #endif diff --git a/src/ioevent_loop.h b/src/ioevent_loop.h index 3d3c78c..79b6a8a 100644 --- a/src/ioevent_loop.h +++ b/src/ioevent_loop.h @@ -17,6 +17,9 @@ #define _IOEVENT_LOOP_H #include "fast_task_queue.h" +#if IOEVENT_USE_URING +#include "sockopt.h" +#endif #define fc_hold_task_ex(task, inc_count) __sync_add_and_fetch( \ &task->reffer_count, inc_count) @@ -78,6 +81,10 @@ static inline int ioevent_notify_thread(struct nio_thread_data *thread_data) #if IOEVENT_USE_URING #define SET_OP_TYPE_AND_HOLD_TASK(task, _op_type) \ + struct io_uring_sqe *sqe; \ + if ((sqe=ioevent_uring_get_sqe(&task->thread_data->ev_puller)) == NULL) { \ + return ENOSPC; \ + } \ FC_URING_OP_TYPE(task) = _op_type; \ fc_hold_task(task) @@ -85,100 +92,141 @@ static inline int uring_prep_recv_data(struct fast_task_info *task, char *buff, const int len) { SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_RECV); - return ioevent_uring_prep_recv(&task->thread_data->ev_puller, - task->event.fd, buff, len, task); + ioevent_uring_prep_recv(&task->thread_data->ev_puller, + sqe, task->event.fd, buff, len, task); + return 0; } static inline int uring_prep_first_recv(struct fast_task_info *task) { SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_RECV); - return ioevent_uring_prep_recv(&task->thread_data->ev_puller, - task->event.fd, task->recv.ptr->data, + ioevent_uring_prep_recv(&task->thread_data->ev_puller, + sqe, task->event.fd, task->recv.ptr->data, task->recv.ptr->size, task); + return 0; } static inline int uring_prep_next_recv(struct fast_task_info *task) { SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_RECV); - return ioevent_uring_prep_recv(&task->thread_data->ev_puller, + ioevent_uring_prep_recv(&task->thread_data->ev_puller, sqe, task->event.fd, task->recv.ptr->data + task->recv.ptr->offset, task->recv.ptr->length - task->recv.ptr->offset, task); + return 0; } static inline int uring_prep_first_send(struct fast_task_info *task) { if (task->iovec_array.iovs != NULL) { SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_WRITEV); - return ioevent_uring_prep_writev(&task->thread_data->ev_puller, - task->event.fd, task->iovec_array.iovs, + ioevent_uring_prep_writev(&task->thread_data->ev_puller, + sqe, task->event.fd, task->iovec_array.iovs, FC_MIN(task->iovec_array.count, IOV_MAX), task); } else { SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND); - return ioevent_uring_prep_send(&task->thread_data->ev_puller, - task->event.fd, task->send.ptr->data, + ioevent_uring_prep_send(&task->thread_data->ev_puller, + sqe, task->event.fd, task->send.ptr->data, task->send.ptr->length, task); } + return 0; } static inline int uring_prep_next_send(struct fast_task_info *task) { if (task->iovec_array.iovs != NULL) { SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_WRITEV); - return ioevent_uring_prep_writev(&task->thread_data->ev_puller, - task->event.fd, task->iovec_array.iovs, + ioevent_uring_prep_writev(&task->thread_data->ev_puller, + sqe, task->event.fd, task->iovec_array.iovs, FC_MIN(task->iovec_array.count, IOV_MAX), task); } else { SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND); - return ioevent_uring_prep_send(&task->thread_data->ev_puller, + ioevent_uring_prep_send(&task->thread_data->ev_puller, sqe, task->event.fd, task->send.ptr->data + task->send.ptr->offset, task->send.ptr->length - task->send.ptr->offset, task); } + return 0; } static inline int uring_prep_first_send_zc(struct fast_task_info *task) { if (task->iovec_array.iovs != NULL) { SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_WRITEV); - return ioevent_uring_prep_writev(&task->thread_data->ev_puller, - task->event.fd, task->iovec_array.iovs, + ioevent_uring_prep_writev(&task->thread_data->ev_puller, + sqe, task->event.fd, task->iovec_array.iovs, FC_MIN(task->iovec_array.count, IOV_MAX), task); } else { SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND_ZC); - return ioevent_uring_prep_send_zc(&task->thread_data->ev_puller, - task->event.fd, task->send.ptr->data, + ioevent_uring_prep_send_zc(&task->thread_data->ev_puller, + sqe, task->event.fd, task->send.ptr->data, task->send.ptr->length, task); } + return 0; } static inline int uring_prep_next_send_zc(struct fast_task_info *task) { if (task->iovec_array.iovs != NULL) { SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_WRITEV); - return ioevent_uring_prep_writev(&task->thread_data->ev_puller, - task->event.fd, task->iovec_array.iovs, + ioevent_uring_prep_writev(&task->thread_data->ev_puller, + sqe, task->event.fd, task->iovec_array.iovs, FC_MIN(task->iovec_array.count, IOV_MAX), task); } else { SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND_ZC); - return ioevent_uring_prep_send_zc(&task->thread_data->ev_puller, + ioevent_uring_prep_send_zc(&task->thread_data->ev_puller, sqe, task->event.fd, task->send.ptr->data + task->send.ptr->offset, task->send.ptr->length - task->send.ptr->offset, task); } + return 0; } static inline int uring_prep_close_fd(struct fast_task_info *task) { - return ioevent_uring_prep_close(&task->thread_data-> - ev_puller, task->event.fd, NULL); + struct io_uring_sqe *sqe; + + if ((sqe=ioevent_uring_get_sqe(&task->thread_data->ev_puller)) == NULL) { + return ENOSPC; + } + + /* do NOT need callback */ + ioevent_uring_prep_close(&task->thread_data-> + ev_puller, sqe, task->event.fd, NULL); + return 0; } static inline int uring_prep_cancel(struct fast_task_info *task) { SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_ASYNC_CANCEL); - return ioevent_uring_prep_cancel(&task->thread_data->ev_puller, task); + ioevent_uring_prep_cancel(&task->thread_data->ev_puller, sqe, task); + return 0; +} + +static inline int uring_prep_connect(struct fast_task_info *task) +{ + int result; + sockaddr_convert_t *convert; + + if ((task->event.fd=socketCreateEx2(AF_UNSPEC, task->server_ip, + O_NONBLOCK, NULL, &result)) < 0) + { + return result; + } + + convert = (sockaddr_convert_t *)(task->send.ptr->data + + task->send.ptr->size - 2 * sizeof(sockaddr_convert_t)); + if ((result=setsockaddrbyip(task->server_ip, task->port, convert)) != 0) { + return result; + } + + do { + SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_CONNECT); + ioevent_uring_prep_connect(&task->thread_data->ev_puller, sqe, + task->event.fd, &convert->sa.addr, convert->len, task); + } while (0); + return 0; } #endif From 7973d81b694bac7845354f6041f4e48ab30f2ee1 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Fri, 3 Oct 2025 10:18:25 +0800 Subject: [PATCH 06/13] struct fast_task_info add fields: is_client and op_type for io_uring --- .gitignore | 1 + make.sh | 10 ++++++++++ src/fast_task_queue.h | 13 +++++++++++-- src/fast_timer.h | 3 --- src/ioevent.h | 1 + src/ioevent_loop.c | 4 ++++ src/ioevent_loop.h | 1 + src/tests/{Makefile => Makefile.in} | 4 ++-- 8 files changed, 30 insertions(+), 7 deletions(-) rename src/tests/{Makefile => Makefile.in} (93%) diff --git a/.gitignore b/.gitignore index d0a20d0..e2a0726 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ # Makefile.in src/Makefile +src/tests/Makefile # Prerequisites *.d diff --git a/make.sh b/make.sh index 47e1553..d65518c 100755 --- a/make.sh +++ b/make.sh @@ -119,6 +119,7 @@ if [ "$uname" = "Linux" ]; then out=$(grep -F IORING_OP_SEND_ZC /usr/include/liburing/io_uring.h) if [ -n "$out" ]; then IOEVENT_USE=IOEVENT_USE_URING + LIBS="$LIBS -luring" else IOEVENT_USE=IOEVENT_USE_EPOLL fi @@ -276,3 +277,12 @@ make $1 $2 $3 if [ "$1" = "clean" ]; then /bin/rm -f Makefile _os_define.h fi + +cd tests +cp Makefile.in Makefile +sed_replace "s#\\\$(CC)#gcc#g" Makefile +sed_replace "s#\\\$(INCS)#$INCS#g" Makefile +sed_replace "s#\\\$(LIBS)#$LIBS#g" Makefile +if [ "$1" = "clean" ]; then + /bin/rm -f Makefile +fi diff --git a/src/fast_task_queue.h b/src/fast_task_queue.h index f5b5281..f9e58c2 100644 --- a/src/fast_task_queue.h +++ b/src/fast_task_queue.h @@ -49,7 +49,8 @@ struct sf_network_handler; struct fast_task_info; #if IOEVENT_USE_URING -#define FC_URING_OP_TYPE(task) (task)->event.timer.op_type +#define FC_URING_OP_TYPE(task) (task)->uring.op_type +#define FC_URING_IS_CLIENT(task) (task)->uring.is_client #endif typedef struct ioevent_entry @@ -126,12 +127,20 @@ struct fast_task_info struct fast_net_buffer_wrapper recv; //recv buffer uint16_t port; //peer port + +#if IOEVENT_USE_URING + struct { + int8_t is_client; + uint8_t op_type; + } uring; //since v1.0.81 +#endif + struct { uint8_t current; volatile uint8_t notify; } nio_stages; //stages for network IO - volatile int8_t reffer_count; volatile int8_t canceled; //if task canceled + volatile int reffer_count; int pending_send_count; int64_t req_count; //request count struct { diff --git a/src/fast_timer.h b/src/fast_timer.h index 8727878..e7ee7c5 100644 --- a/src/fast_timer.h +++ b/src/fast_timer.h @@ -27,9 +27,6 @@ typedef struct fast_timer_entry { struct fast_timer_entry *next; int slot_index; bool rehash; -#if IOEVENT_USE_URING - short op_type; -#endif } FastTimerEntry; typedef struct fast_timer_slot { diff --git a/src/ioevent.h b/src/ioevent.h index 3c70629..37741f8 100644 --- a/src/ioevent.h +++ b/src/ioevent.h @@ -33,6 +33,7 @@ #define IOEVENT_ERROR (EPOLLERR | EPOLLPRI | EPOLLHUP) #elif IOEVENT_USE_URING +#include #include #define IOEVENT_READ POLLIN #define IOEVENT_WRITE POLLOUT diff --git a/src/ioevent_loop.c b/src/ioevent_loop.c index 654c03c..3695cad 100644 --- a/src/ioevent_loop.c +++ b/src/ioevent_loop.c @@ -286,6 +286,10 @@ int ioevent_set(struct fast_task_info *task, struct nio_thread_data *pThread, __LINE__, sock, result, STRERROR(result)); return result; } + } else { + logInfo("file: "__FILE__", line: %d, " + "skip uring_prep_recv, fd: %d, port: %d, in progress op type: %d, timeout: %"PRId64, + __LINE__, sock, task->port, FC_URING_OP_TYPE(task), task->event.timer.expires); } } else { #endif diff --git a/src/ioevent_loop.h b/src/ioevent_loop.h index 79b6a8a..c8377d1 100644 --- a/src/ioevent_loop.h +++ b/src/ioevent_loop.h @@ -23,6 +23,7 @@ #define fc_hold_task_ex(task, inc_count) __sync_add_and_fetch( \ &task->reffer_count, inc_count) + #define fc_hold_task(task) fc_hold_task_ex(task, 1) #ifdef __cplusplus diff --git a/src/tests/Makefile b/src/tests/Makefile.in similarity index 93% rename from src/tests/Makefile rename to src/tests/Makefile.in index 3ea0d10..669659d 100644 --- a/src/tests/Makefile +++ b/src/tests/Makefile.in @@ -1,8 +1,8 @@ .SUFFIXES: .c .o COMPILE = $(CC) -g -O3 -Wall -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -g -DDEBUG_FLAG -INC_PATH = -I/usr/local/include -LIB_PATH = -lfastcommon -lpthread +INC_PATH = $(INCS) +LIB_PATH = -lfastcommon $(LIBS) ALL_PRGS = test_allocator test_skiplist test_multi_skiplist test_mblock test_blocked_queue \ test_id_generator test_ini_parser test_char_convert test_char_convert_loader \ From d5dbe3d030226d32e31e765fc86471888120631b Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Fri, 3 Oct 2025 21:03:31 +0800 Subject: [PATCH 07/13] free_queue support parameter: need_shrink and set task->shrinked --- HISTORY | 3 ++- src/fast_task_queue.c | 24 ++++++++++++++++-------- src/fast_task_queue.h | 30 +++++++++++++++++------------- 3 files changed, 35 insertions(+), 22 deletions(-) diff --git a/HISTORY b/HISTORY index a723a5d..59444cf 100644 --- a/HISTORY +++ b/HISTORY @@ -1,6 +1,7 @@ -Version 1.81 2025-09-27 +Version 1.81 2025-10-03 * support Linux io_uring + * free_queue support parameter: need_shrink and set task->shrinked Version 1.80 2025-09-10 * getIpaddrByNameEx: IPv4 has priority over IPv6 diff --git a/src/fast_task_queue.c b/src/fast_task_queue.c index 101ad59..719f7e6 100644 --- a/src/fast_task_queue.c +++ b/src/fast_task_queue.c @@ -59,11 +59,11 @@ static int task_alloc_init(struct fast_task_info *task, } int free_queue_init_ex2(struct fast_task_queue *queue, const char *name, - const bool double_buffers, const int max_connections, - const int alloc_task_once, const int min_buff_size, - const int max_buff_size, const int padding_size, - const int arg_size, TaskInitCallback init_callback, - void *init_arg) + const bool double_buffers, const bool need_shrink, + const int max_connections, const int alloc_task_once, + const int min_buff_size, const int max_buff_size, + const int padding_size, const int arg_size, + TaskInitCallback init_callback, void *init_arg) { #define MAX_DATA_SIZE (256 * 1024 * 1024) int alloc_once; @@ -123,6 +123,7 @@ int free_queue_init_ex2(struct fast_task_queue *queue, const char *name, } queue->double_buffers = double_buffers; + queue->need_shrink = need_shrink; queue->min_buff_size = aligned_min_size; queue->max_buff_size = aligned_max_size; queue->padding_size = aligned_padding_size; @@ -183,16 +184,23 @@ void free_queue_push(struct fast_task_info *task) task->send.ptr->length = 0; task->send.ptr->offset = 0; task->req_count = 0; - if (task->send.ptr->size > task->free_queue->min_buff_size) {//need thrink - _realloc_buffer(task->send.ptr, task->free_queue->min_buff_size, false); + if (task->free_queue->need_shrink && task->send. + ptr->size > task->free_queue->min_buff_size) + { //need thrink + _realloc_buffer(task->send.ptr, task->free_queue-> + min_buff_size, false); + task->shrinked = true; } if (task->free_queue->double_buffers) { task->recv.ptr->length = 0; task->recv.ptr->offset = 0; - if (task->recv.ptr->size > task->free_queue->min_buff_size) { + if (task->free_queue->need_shrink && task->recv. + ptr->size > task->free_queue->min_buff_size) + { _realloc_buffer(task->recv.ptr, task->free_queue-> min_buff_size, false); + task->shrinked = true; } } diff --git a/src/fast_task_queue.h b/src/fast_task_queue.h index f9e58c2..5f5f6ff 100644 --- a/src/fast_task_queue.h +++ b/src/fast_task_queue.h @@ -140,6 +140,7 @@ struct fast_task_info volatile uint8_t notify; } nio_stages; //stages for network IO volatile int8_t canceled; //if task canceled + volatile int8_t shrinked; //if task shrinked, since V1.0.81 volatile int reffer_count; int pending_send_count; int64_t req_count; //request count @@ -169,6 +170,7 @@ struct fast_task_queue int block_size; bool malloc_whole_block; bool double_buffers; //if send buffer and recv buffer are independent + bool need_shrink; struct fast_mblock_man allocator; TaskInitCallback init_callback; void *init_arg; @@ -180,22 +182,22 @@ extern "C" { #endif int free_queue_init_ex2(struct fast_task_queue *queue, const char *name, - const bool double_buffers, const int max_connections, - const int alloc_task_once, const int min_buff_size, - const int max_buff_size, const int padding_size, - const int arg_size, TaskInitCallback init_callback, - void *init_arg); + const bool double_buffers, const bool need_shrink, + const int max_connections, const int alloc_task_once, + const int min_buff_size, const int max_buff_size, + const int padding_size, const int arg_size, + TaskInitCallback init_callback, void *init_arg); static inline int free_queue_init_ex(struct fast_task_queue *queue, const char *name, const bool double_buffers, - const int max_connections, const int alloc_task_once, - const int min_buff_size, const int max_buff_size, - const int arg_size) + const bool need_shrink, const int max_connections, + const int alloc_task_once, const int min_buff_size, + const int max_buff_size, const int arg_size) { const int padding_size = 0; - return free_queue_init_ex2(queue, name, double_buffers, max_connections, - alloc_task_once, min_buff_size, max_buff_size, padding_size, - arg_size, NULL, NULL); + return free_queue_init_ex2(queue, name, double_buffers, need_shrink, + max_connections, alloc_task_once, min_buff_size, max_buff_size, + padding_size, arg_size, NULL, NULL); } static inline int free_queue_init(struct fast_task_queue *queue, @@ -204,9 +206,11 @@ static inline int free_queue_init(struct fast_task_queue *queue, { const char *name = ""; const bool double_buffers = false; + const bool need_shrink = true; const int arg_size = 0; - return free_queue_init_ex(queue, name, double_buffers, max_connections, - alloc_task_once, min_buff_size, max_buff_size, arg_size); + return free_queue_init_ex(queue, name, double_buffers, + need_shrink, max_connections, alloc_task_once, + min_buff_size, max_buff_size, arg_size); } static inline void free_queue_set_release_callback( From dac653d6940824eadf2ccc5d09d26697ca4f61e9 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Sun, 5 Oct 2025 09:44:24 +0800 Subject: [PATCH 08/13] IOEventCallback: change event type from short to int --- HISTORY | 3 ++- src/fast_task_queue.h | 10 +++------- src/fast_timer.c | 1 + src/ioevent.h | 2 +- src/ioevent_loop.c | 12 ++++++++---- 5 files changed, 15 insertions(+), 13 deletions(-) diff --git a/HISTORY b/HISTORY index 59444cf..c2f6a4f 100644 --- a/HISTORY +++ b/HISTORY @@ -1,7 +1,8 @@ -Version 1.81 2025-10-03 +Version 1.81 2025-10-05 * support Linux io_uring * free_queue support parameter: need_shrink and set task->shrinked + * IOEventCallback: change event type from short to int Version 1.80 2025-09-10 * getIpaddrByNameEx: IPv4 has priority over IPv6 diff --git a/src/fast_task_queue.h b/src/fast_task_queue.h index 5f5f6ff..3234358 100644 --- a/src/fast_task_queue.h +++ b/src/fast_task_queue.h @@ -42,7 +42,7 @@ typedef void (*TaskCleanUpCallback) (struct fast_task_info *task); typedef int (*TaskInitCallback)(struct fast_task_info *task, void *arg); typedef void (*TaskReleaseCallback)(struct fast_task_info *task); -typedef void (*IOEventCallback) (int sock, short event, void *arg); +typedef void (*IOEventCallback) (int sock, const int event, void *arg); typedef int (*TaskContinueCallback)(struct fast_task_info *task); struct sf_network_handler; @@ -57,9 +57,7 @@ typedef struct ioevent_entry { FastTimerEntry timer; //must first int fd; -#if IOEVENT_USE_URING - int res; -#endif + int res; //just for io_uring, since v1.0.81 IOEventCallback callback; } IOEventEntry; @@ -128,12 +126,10 @@ struct fast_task_info uint16_t port; //peer port -#if IOEVENT_USE_URING struct { int8_t is_client; uint8_t op_type; - } uring; //since v1.0.81 -#endif + } uring; //just for io_uring, since v1.0.81 struct { uint8_t current; diff --git a/src/fast_timer.c b/src/fast_timer.c index 73bfb5b..53cb666 100644 --- a/src/fast_timer.c +++ b/src/fast_timer.c @@ -185,6 +185,7 @@ int fast_timer_timeouts_get(FastTimer *timer, const int64_t current_time, } else { last->rehash = false; } + continue; } } else { //expired diff --git a/src/ioevent.h b/src/ioevent.h index 37741f8..b82efae 100644 --- a/src/ioevent.h +++ b/src/ioevent.h @@ -22,7 +22,7 @@ #include "_os_define.h" #include "logger.h" -#define IOEVENT_TIMEOUT 0x8000 +#define IOEVENT_TIMEOUT (1 << 20) #if IOEVENT_USE_EPOLL #include diff --git a/src/ioevent_loop.c b/src/ioevent_loop.c index 3695cad..60a0a9d 100644 --- a/src/ioevent_loop.c +++ b/src/ioevent_loop.c @@ -65,7 +65,7 @@ static int ioevent_process(IOEventPoller *ioevent) } } else { logWarning("file: "__FILE__", line: %d, " - "unexpected flags: %d, result: %u", __LINE__, + "io_uring unexpected flags: %d, result: %d", __LINE__, ioevent->cqe->flags, ioevent->cqe->res); } } @@ -287,9 +287,13 @@ int ioevent_set(struct fast_task_info *task, struct nio_thread_data *pThread, return result; } } else { - logInfo("file: "__FILE__", line: %d, " - "skip uring_prep_recv, fd: %d, port: %d, in progress op type: %d, timeout: %"PRId64, - __LINE__, sock, task->port, FC_URING_OP_TYPE(task), task->event.timer.expires); + /* + logWarning("file: "__FILE__", line: %d, " + "skip uring_prep_recv, fd: %d, port: %d, " + "in progress op type: %d, timeout: %"PRId64, + __LINE__, sock, task->port, FC_URING_OP_TYPE(task), + task->event.timer.expires); + */ } } else { #endif From de80dc19dc1e5bf4fd76f23c4fe877108c949212 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Mon, 6 Oct 2025 20:53:14 +0800 Subject: [PATCH 09/13] struct ioevent_puller add field service_name --- src/fast_timer.c | 1 + src/ioevent.c | 5 +++-- src/ioevent.h | 5 +++-- src/ioevent_loop.c | 17 ++++++++++++----- src/multi_socket_client.c | 4 ++-- 5 files changed, 21 insertions(+), 11 deletions(-) diff --git a/src/fast_timer.c b/src/fast_timer.c index 53cb666..6ab5717 100644 --- a/src/fast_timer.c +++ b/src/fast_timer.c @@ -105,6 +105,7 @@ int fast_timer_modify(FastTimer *timer, FastTimerEntry *entry, if ((result=fast_timer_remove(timer, entry)) == 0) { fast_timer_add_ex(timer, entry, new_expires, true); } + return result; } return 0; diff --git a/src/ioevent.c b/src/ioevent.c index 4b11e18..44d5c04 100644 --- a/src/ioevent.c +++ b/src/ioevent.c @@ -45,8 +45,8 @@ int kqueue_ev_convert(int16_t event, uint16_t flags) } #endif -int ioevent_init(IOEventPoller *ioevent, const int size, - const int timeout_ms, const int extra_events) +int ioevent_init(IOEventPoller *ioevent, const char *service_name, + const int size, const int timeout_ms, const int extra_events) { #if IOEVENT_USE_URING int result; @@ -57,6 +57,7 @@ int ioevent_init(IOEventPoller *ioevent, const int size, ioevent->iterator.count = 0; #endif + ioevent->service_name = service_name; ioevent->size = size; ioevent->extra_events = extra_events; diff --git a/src/ioevent.h b/src/ioevent.h index b82efae..19a517f 100644 --- a/src/ioevent.h +++ b/src/ioevent.h @@ -73,6 +73,7 @@ int kqueue_ev_convert(int16_t event, uint16_t flags); #endif typedef struct ioevent_puller { + const char *service_name; int size; //max events (fd) int extra_events; #if IOEVENT_USE_URING @@ -156,8 +157,8 @@ typedef struct ioevent_puller { extern "C" { #endif -int ioevent_init(IOEventPoller *ioevent, const int size, - const int timeout_ms, const int extra_events); +int ioevent_init(IOEventPoller *ioevent, const char *service_name, + const int size, const int timeout_ms, const int extra_events); void ioevent_destroy(IOEventPoller *ioevent); int ioevent_attach(IOEventPoller *ioevent, const int fd, diff --git a/src/ioevent_loop.c b/src/ioevent_loop.c index 60a0a9d..e062029 100644 --- a/src/ioevent_loop.c +++ b/src/ioevent_loop.c @@ -48,14 +48,21 @@ static int ioevent_process(IOEventPoller *ioevent) if (ioevent->cqe->flags & IORING_CQE_F_NOTIF) { #ifdef IORING_NOTIF_USAGE_ZC_COPIED if (!ioevent->send_zc_logged) { + struct fast_task_info *task; + + task = (struct fast_task_info *)pEntry; ioevent->send_zc_logged = true; if (ioevent->cqe->res & IORING_NOTIF_USAGE_ZC_COPIED) { - logWarning("file: "__FILE__", line: %d, " - "io_uring send_zc: memory copy " - "instead of zero copy!", __LINE__); + logWarning("file: "__FILE__", line: %d, %s " + "client %s:%u, io_uring send_zc: memory " + "copy instead of zero copy!", __LINE__, + ioevent->service_name, task->client_ip, + task->port); } else { - logInfo("file: "__FILE__", line: %d, " - "io_uring send_zc: zero copy OK.", __LINE__); + logInfo("file: "__FILE__", line: %d, %s " + "client %s:%u, io_uring send_zc: zero " + "copy OK.", __LINE__, ioevent->service_name, + task->client_ip, task->port); } } #endif diff --git a/src/multi_socket_client.c b/src/multi_socket_client.c index e122278..b365f58 100644 --- a/src/multi_socket_client.c +++ b/src/multi_socket_client.c @@ -65,8 +65,8 @@ int fast_multi_sock_client_init_ex(FastMultiSockClient *client, return EINVAL; } - if ((result=ioevent_init(&client->ioevent, entry_count, - timeout_ms, 0)) != 0) + if ((result=ioevent_init(&client->ioevent, "client", + entry_count, timeout_ms, 0)) != 0) { logError("file: "__FILE__", line: %d, " "ioevent_init fail, errno: %d, error info: %s", From 065184a2030620332103622978b7840b95cfcf9b Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Tue, 7 Oct 2025 19:52:48 +0800 Subject: [PATCH 10/13] batch ioevent_uring_submit for RDMA network --- src/ioevent.c | 9 ++++++--- src/ioevent_loop.c | 22 +++++++++++----------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/src/ioevent.c b/src/ioevent.c index 44d5c04..c6552b5 100644 --- a/src/ioevent.c +++ b/src/ioevent.c @@ -138,7 +138,8 @@ int ioevent_attach(IOEventPoller *ioevent, const int fd, } io_uring_prep_poll_multishot(sqe, fd, e | ioevent->extra_events); sqe->user_data = (long)data; - return ioevent_uring_submit(ioevent); + ioevent->submit_count++; + return 0; #elif IOEVENT_USE_KQUEUE struct kevent ev[2]; int n = 0; @@ -174,7 +175,8 @@ int ioevent_modify(IOEventPoller *ioevent, const int fd, io_uring_prep_poll_update(sqe, sqe->user_data, sqe->user_data, e | ioevent->extra_events, IORING_POLL_UPDATE_EVENTS); sqe->user_data = (long)data; - return ioevent_uring_submit(ioevent); + ioevent->submit_count++; + return 0; #elif IOEVENT_USE_KQUEUE struct kevent ev[2]; int result; @@ -219,7 +221,8 @@ int ioevent_detach(IOEventPoller *ioevent, const int fd) io_uring_prep_cancel_fd(sqe, fd, 0); /* set sqe->flags MUST after io_uring_prep_xxx */ sqe->flags = IOSQE_CQE_SKIP_SUCCESS; - return ioevent_uring_submit(ioevent); + ioevent->submit_count++; + return 0; #elif IOEVENT_USE_KQUEUE struct kevent ev[1]; int r, w; diff --git a/src/ioevent_loop.c b/src/ioevent_loop.c index e062029..d34e682 100644 --- a/src/ioevent_loop.c +++ b/src/ioevent_loop.c @@ -201,6 +201,17 @@ int ioevent_loop(struct nio_thread_data *thread_data, sched_pull = true; #endif +#if IOEVENT_USE_URING + if (thread_data->ev_puller.submit_count > 0) { + if ((result=ioevent_uring_submit(&thread_data->ev_puller)) != 0) { + logError("file: "__FILE__", line: %d, " + "io_uring_submit fail, errno: %d, error info: %s", + __LINE__, result, STRERROR(result)); + return result; + } + } +#endif + if (sched_pull) { if ((result=ioevent_process(&thread_data->ev_puller)) != 0) { return result; @@ -258,17 +269,6 @@ int ioevent_loop(struct nio_thread_data *thread_data, if (thread_data->thread_loop_callback != NULL) { thread_data->thread_loop_callback(thread_data); } - -#if IOEVENT_USE_URING - if (thread_data->ev_puller.submit_count > 0) { - if ((result=ioevent_uring_submit(&thread_data->ev_puller)) != 0) { - logError("file: "__FILE__", line: %d, " - "io_uring_submit fail, errno: %d, error info: %s", - __LINE__, result, STRERROR(result)); - return result; - } - } -#endif } return 0; From 23cd03bc76fa826986001497e639fb48873c8634 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Sat, 18 Oct 2025 15:48:31 +0800 Subject: [PATCH 11/13] add macro IPV4_ADDRESS_SIZE and IPV6_ADDRESS_SIZE --- src/common_define.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/common_define.h b/src/common_define.h index 0913390..4c96473 100644 --- a/src/common_define.h +++ b/src/common_define.h @@ -125,7 +125,8 @@ extern int pthread_mutexattr_settype(pthread_mutexattr_t *attr, int kind); #define FC_IOV_BATCH_SIZE IOV_MAX #endif -// 由于要支持IPv6,所以将IP_ADDRESS_SIZE的值由16修改为46 +#define IPV4_ADDRESS_SIZE INET_ADDRSTRLEN //16 +#define IPV6_ADDRESS_SIZE INET6_ADDRSTRLEN //46 #define IP_ADDRESS_SIZE INET6_ADDRSTRLEN //46 #define FORMATTED_IP_SIZE (IP_ADDRESS_SIZE + 2) #define INFINITE_FILE_SIZE (256 * 1024LL * 1024 * 1024 * 1024 * 1024LL) From ddf6b5dfe9c21505a962d60604a2920e45b9c0a2 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Sun, 19 Oct 2025 20:10:12 +0800 Subject: [PATCH 12/13] send zc done notify callback for recycling buffer --- src/fast_task_queue.h | 5 +++-- src/ioevent.c | 1 + src/ioevent.h | 8 ++++++++ src/ioevent_loop.c | 4 ++++ src/ioevent_loop.h | 10 ++++++++++ 5 files changed, 26 insertions(+), 2 deletions(-) diff --git a/src/fast_task_queue.h b/src/fast_task_queue.h index 3234358..d211304 100644 --- a/src/fast_task_queue.h +++ b/src/fast_task_queue.h @@ -49,8 +49,9 @@ struct sf_network_handler; struct fast_task_info; #if IOEVENT_USE_URING -#define FC_URING_OP_TYPE(task) (task)->uring.op_type -#define FC_URING_IS_CLIENT(task) (task)->uring.is_client +#define FC_URING_OP_TYPE(task) (task)->uring.op_type +#define FC_URING_IS_CLIENT(task) (task)->uring.is_client +#define FC_URING_IS_SEND_ZC(task) ((task)->uring.op_type == IORING_OP_SEND_ZC) #endif typedef struct ioevent_entry diff --git a/src/ioevent.c b/src/ioevent.c index c6552b5..7e0f531 100644 --- a/src/ioevent.c +++ b/src/ioevent.c @@ -75,6 +75,7 @@ int ioevent_init(IOEventPoller *ioevent, const char *service_name, ioevent->cqe = NULL; ioevent->submit_count = 0; ioevent->send_zc_logged = false; + ioevent->send_zc_done_notify = false; #elif IOEVENT_USE_KQUEUE ioevent->poll_fd = kqueue(); if (ioevent->poll_fd < 0) { diff --git a/src/ioevent.h b/src/ioevent.h index 19a517f..fe4ba96 100644 --- a/src/ioevent.h +++ b/src/ioevent.h @@ -23,6 +23,7 @@ #include "logger.h" #define IOEVENT_TIMEOUT (1 << 20) +#define IOEVENT_NOTIFY (1 << 21) //for io_uring send_zc done callback #if IOEVENT_USE_EPOLL #include @@ -80,6 +81,7 @@ typedef struct ioevent_puller { struct io_uring ring; int submit_count; bool send_zc_logged; + bool send_zc_done_notify; //if callback when send_zc done #else int poll_fd; struct { @@ -191,6 +193,12 @@ static inline int ioevent_poll_ex(IOEventPoller *ioevent, const int timeout_ms) } #if IOEVENT_USE_URING +static inline void ioevent_set_send_zc_done_notify( + IOEventPoller *ioevent, const bool need_notify) +{ + ioevent->send_zc_done_notify = need_notify; +} + static inline int ioevent_uring_submit(IOEventPoller *ioevent) { int result; diff --git a/src/ioevent_loop.c b/src/ioevent_loop.c index d34e682..1587cff 100644 --- a/src/ioevent_loop.c +++ b/src/ioevent_loop.c @@ -46,6 +46,10 @@ static int ioevent_process(IOEventPoller *ioevent) pEntry = (IOEventEntry *)ioevent->cqe->user_data; if (pEntry != NULL) { if (ioevent->cqe->flags & IORING_CQE_F_NOTIF) { + if (ioevent->send_zc_done_notify) { + pEntry->callback(pEntry->fd, IOEVENT_NOTIFY, pEntry); + } + #ifdef IORING_NOTIF_USAGE_ZC_COPIED if (!ioevent->send_zc_logged) { struct fast_task_info *task; diff --git a/src/ioevent_loop.h b/src/ioevent_loop.h index c8377d1..a12f33d 100644 --- a/src/ioevent_loop.h +++ b/src/ioevent_loop.h @@ -158,6 +158,11 @@ static inline int uring_prep_first_send_zc(struct fast_task_info *task) sqe, task->event.fd, task->iovec_array.iovs, FC_MIN(task->iovec_array.count, IOV_MAX), task); + } else if (task->send.ptr->length < 4096) { + SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND); + ioevent_uring_prep_send(&task->thread_data->ev_puller, + sqe, task->event.fd, task->send.ptr->data, + task->send.ptr->length, task); } else { SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND_ZC); ioevent_uring_prep_send_zc(&task->thread_data->ev_puller, @@ -175,6 +180,11 @@ static inline int uring_prep_next_send_zc(struct fast_task_info *task) sqe, task->event.fd, task->iovec_array.iovs, FC_MIN(task->iovec_array.count, IOV_MAX), task); + } else if (task->send.ptr->length - task->send.ptr->offset < 4096) { + SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND); + ioevent_uring_prep_send(&task->thread_data->ev_puller, sqe, + task->event.fd, task->send.ptr->data + task->send.ptr->offset, + task->send.ptr->length - task->send.ptr->offset, task); } else { SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND_ZC); ioevent_uring_prep_send_zc(&task->thread_data->ev_puller, sqe, From 511b1066c42f85ccf4d9f4c1887b531389e7eb69 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Fri, 24 Oct 2025 11:59:43 +0800 Subject: [PATCH 13/13] add macro function fc_string_equals_ex --- src/common_define.h | 8 ++++++++ src/connection_pool.c | 28 ++++++++++++---------------- src/connection_pool.h | 14 ++++++++++++-- 3 files changed, 32 insertions(+), 18 deletions(-) diff --git a/src/common_define.h b/src/common_define.h index 4c96473..7dea6ed 100644 --- a/src/common_define.h +++ b/src/common_define.h @@ -455,6 +455,14 @@ static inline int fc_string_compare(const string_t *s1, const string_t *s2) } } +static inline bool fc_string_equal_ex(const char *str1, + const int len1, const char *str2, const int len2) +{ + return (len1 == len2) && (memcmp(str1, str2, len1) == 0); +} +#define fc_string_equals_ex(str1, len1, str2, len2) \ + fc_string_equal_ex(str1, len1, str2, len2) + static inline bool fc_string_equal(const string_t *s1, const string_t *s2) { return (s1->len == s2->len) && (memcmp(s1->str, s2->str, s1->len) == 0); diff --git a/src/connection_pool.c b/src/connection_pool.c index 26b256f..29ee4d7 100644 --- a/src/connection_pool.c +++ b/src/connection_pool.c @@ -25,10 +25,20 @@ #include "server_id_func.h" #include "connection_pool.h" +static void conn_pool_disconnect_server_cb(ConnectionInfo *conn) +{ + conn_pool_disconnect_server(conn); +} + +static bool conn_pool_is_connected_cb(ConnectionInfo *conn) +{ + return conn_pool_is_connected(conn); +} + ConnectionCallbacks g_connection_callbacks = { false, {{conn_pool_connect_server_ex1, - conn_pool_disconnect_server, - conn_pool_is_connected}, + conn_pool_disconnect_server_cb, + conn_pool_is_connected_cb}, {NULL, NULL, NULL}}, {NULL} }; @@ -411,20 +421,6 @@ void conn_pool_destroy(ConnectionPool *cp) fast_mblock_destroy(&cp->node_allocator); } -void conn_pool_disconnect_server(ConnectionInfo *conn) -{ - if (conn->sock >= 0) - { - close(conn->sock); - conn->sock = -1; - } -} - -bool conn_pool_is_connected(ConnectionInfo *conn) -{ - return (conn->sock >= 0); -} - int conn_pool_connect_server_ex1(ConnectionInfo *conn, const char *service_name, const int connect_timeout_ms, const char *bind_ipaddr, const bool log_connect_error) diff --git a/src/connection_pool.h b/src/connection_pool.h index 8f44f20..a667aa7 100644 --- a/src/connection_pool.h +++ b/src/connection_pool.h @@ -331,9 +331,19 @@ int conn_pool_close_connection_ex(ConnectionPool *cp, * conn: the connection * return 0 for success, != 0 for error */ -void conn_pool_disconnect_server(ConnectionInfo *conn); +static inline void conn_pool_disconnect_server(ConnectionInfo *conn) +{ + if (conn->sock >= 0) + { + close(conn->sock); + conn->sock = -1; + } +} -bool conn_pool_is_connected(ConnectionInfo *conn); +static inline bool conn_pool_is_connected(ConnectionInfo *conn) +{ + return (conn->sock >= 0); +} /** * connect to the server