From cb6f6f13f38fe9d200f8c5f9d03e878b2c69b48e Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Sat, 27 Sep 2025 15:37:37 +0800 Subject: [PATCH] support Linux io_uring OK --- HISTORY | 3 +++ src/ioevent.c | 17 +++++++------- src/ioevent.h | 56 +++++++++++++++++++++++++++++++++++----------- src/ioevent_loop.c | 31 +++++++++++++++++-------- src/ioevent_loop.h | 36 +++++++++++++++++++++-------- 5 files changed, 103 insertions(+), 40 deletions(-) diff --git a/HISTORY b/HISTORY index 8834474..a723a5d 100644 --- a/HISTORY +++ b/HISTORY @@ -1,4 +1,7 @@ +Version 1.81 2025-09-27 + * support Linux io_uring + Version 1.80 2025-09-10 * getIpaddrByNameEx: IPv4 has priority over IPv6 * shared_func.[hc]: add function fc_ftoa diff --git a/src/ioevent.c b/src/ioevent.c index 5057368..4b11e18 100644 --- a/src/ioevent.c +++ b/src/ioevent.c @@ -72,7 +72,8 @@ int ioevent_init(IOEventPoller *ioevent, const int size, return -result; } ioevent->cqe = NULL; - ioevent->submmit_count = 0; + ioevent->submit_count = 0; + ioevent->send_zc_logged = false; #elif IOEVENT_USE_KQUEUE ioevent->poll_fd = kqueue(); if (ioevent->poll_fd < 0) { @@ -134,8 +135,8 @@ int ioevent_attach(IOEventPoller *ioevent, const int fd, if (sqe == NULL) { return ENOSPC; } - sqe->user_data = (long)data; io_uring_prep_poll_multishot(sqe, fd, e | ioevent->extra_events); + sqe->user_data = (long)data; return ioevent_uring_submit(ioevent); #elif IOEVENT_USE_KQUEUE struct kevent ev[2]; @@ -169,9 +170,9 @@ int ioevent_modify(IOEventPoller *ioevent, const int fd, if (sqe == NULL) { return ENOSPC; } - sqe->user_data = (long)data; io_uring_prep_poll_update(sqe, sqe->user_data, sqe->user_data, e | ioevent->extra_events, IORING_POLL_UPDATE_EVENTS); + sqe->user_data = (long)data; return ioevent_uring_submit(ioevent); #elif IOEVENT_USE_KQUEUE struct kevent ev[2]; @@ -214,11 +215,10 @@ int ioevent_detach(IOEventPoller *ioevent, const int fd) if (sqe == NULL) { return ENOSPC; } - sqe->flags |= IOSQE_CQE_SKIP_SUCCESS; - sqe->user_data = 0; io_uring_prep_cancel_fd(sqe, fd, 0); - ioevent->submmit_count++; - return 0; + /* set sqe->flags MUST after io_uring_prep_xxx */ + sqe->flags = IOSQE_CQE_SKIP_SUCCESS; + return ioevent_uring_submit(ioevent); #elif IOEVENT_USE_KQUEUE struct kevent ev[1]; int r, w; @@ -256,7 +256,7 @@ int ioevent_poll(IOEventPoller *ioevent) int result; int retval; unsigned int nget = 1; - if((retval = port_getn(ioevent->poll_fd, ioevent->events, + if((retval=port_getn(ioevent->poll_fd, ioevent->events, ioevent->size, &nget, &ioevent->timeout)) == 0) { result = (int)nget; @@ -282,4 +282,3 @@ int ioevent_poll(IOEventPoller *ioevent) #error port me #endif } - diff --git a/src/ioevent.h b/src/ioevent.h index 458be8e..1d26a7e 100644 --- a/src/ioevent.h +++ b/src/ioevent.h @@ -76,7 +76,8 @@ typedef struct ioevent_puller { int extra_events; #if IOEVENT_USE_URING struct io_uring ring; - int submmit_count; + int submit_count; + bool send_zc_logged; #else int poll_fd; struct { @@ -191,6 +192,8 @@ static inline int ioevent_poll_ex(IOEventPoller *ioevent, const int timeout_ms) static inline int ioevent_uring_submit(IOEventPoller *ioevent) { int result; + + ioevent->submit_count = 0; while (1) { result = io_uring_submit(&ioevent->ring); if (result < 0) { @@ -212,9 +215,9 @@ static inline int ioevent_uring_prep_recv(IOEventPoller *ioevent, "io_uring_get_sqe fail", __LINE__); return ENOSPC; } - sqe->user_data = (long)user_data; io_uring_prep_recv(sqe, sockfd, buf, size, 0); - ioevent->submmit_count++; + sqe->user_data = (long)user_data; + ioevent->submit_count++; return 0; } @@ -227,9 +230,9 @@ static inline int ioevent_uring_prep_send(IOEventPoller *ioevent, "io_uring_get_sqe fail", __LINE__); return ENOSPC; } - sqe->user_data = (long)user_data; io_uring_prep_send(sqe, sockfd, buf, len, 0); - ioevent->submmit_count++; + sqe->user_data = (long)user_data; + ioevent->submit_count++; return 0; } @@ -244,9 +247,9 @@ static inline int ioevent_uring_prep_writev(IOEventPoller *ioevent, return ENOSPC; } - sqe->user_data = (long)user_data; io_uring_prep_writev(sqe, sockfd, iovecs, nr_vecs, 0); - ioevent->submmit_count++; + sqe->user_data = (long)user_data; + ioevent->submit_count++; return 0; } @@ -259,14 +262,20 @@ static inline int ioevent_uring_prep_send_zc(IOEventPoller *ioevent, "io_uring_get_sqe fail", __LINE__); return ENOSPC; } - sqe->user_data = (long)user_data; io_uring_prep_send_zc(sqe, sockfd, buf, len, 0, - IORING_SEND_ZC_REPORT_USAGE); - ioevent->submmit_count++; +#ifdef IORING_SEND_ZC_REPORT_USAGE + IORING_SEND_ZC_REPORT_USAGE +#else + 0 +#endif + ); + sqe->user_data = (long)user_data; + ioevent->submit_count++; return 0; } -static inline int ioevent_uring_prep_close(IOEventPoller *ioevent, int fd) +static inline int ioevent_uring_prep_close(IOEventPoller *ioevent, + int fd, void *user_data) { struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); if (sqe == NULL) { @@ -274,9 +283,30 @@ static inline int ioevent_uring_prep_close(IOEventPoller *ioevent, int fd) "io_uring_get_sqe fail", __LINE__); return ENOSPC; } - sqe->user_data = 0; io_uring_prep_close(sqe, fd); - ioevent->submmit_count++; + if (user_data == NULL) { + /* set sqe->flags MUST after io_uring_prep_xxx */ + sqe->flags = IOSQE_CQE_SKIP_SUCCESS; + } else { + sqe->user_data = (long)user_data; + } + ioevent->submit_count++; + return 0; +} + +static inline int ioevent_uring_prep_cancel(IOEventPoller *ioevent, + void *user_data) +{ + struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); + if (sqe == NULL) { + logError("file: "__FILE__", line: %d, " + "io_uring_get_sqe fail", __LINE__); + return ENOSPC; + } + + io_uring_prep_cancel(sqe, user_data, 0); + sqe->user_data = (long)user_data; + ioevent->submit_count++; return 0; } #endif diff --git a/src/ioevent_loop.c b/src/ioevent_loop.c index 0a91797..ab5dc04 100644 --- a/src/ioevent_loop.c +++ b/src/ioevent_loop.c @@ -46,11 +46,27 @@ static int ioevent_process(IOEventPoller *ioevent) pEntry = (IOEventEntry *)ioevent->cqe->user_data; if (pEntry != NULL) { if (ioevent->cqe->flags & IORING_CQE_F_NOTIF) { - //TODO +#ifdef IORING_NOTIF_USAGE_ZC_COPIED + if (!ioevent->send_zc_logged) { + ioevent->send_zc_logged = true; + if (ioevent->cqe->res & IORING_NOTIF_USAGE_ZC_COPIED) { + logWarning("file: "__FILE__", line: %d, " + "io_uring send_zc: memory copy " + "instead of zero copy!", __LINE__); + } else { + logInfo("file: "__FILE__", line: %d, " + "io_uring send_zc: zero copy OK.", __LINE__); + } + } +#endif } else { pEntry->res = ioevent->cqe->res; pEntry->callback(pEntry->fd, 0, pEntry); } + } else { + logWarning("file: "__FILE__", line: %d, " + "unexpected flags: %d, result: %u", __LINE__, + ioevent->cqe->flags, ioevent->cqe->res); } } @@ -236,8 +252,7 @@ int ioevent_loop(struct nio_thread_data *thread_data, } #if IOEVENT_USE_URING - if (thread_data->ev_puller.submmit_count > 0) { - thread_data->ev_puller.submmit_count = 0; + if (thread_data->ev_puller.submit_count > 0) { if ((result=ioevent_uring_submit(&thread_data->ev_puller)) != 0) { logError("file: "__FILE__", line: %d, " "io_uring_submit fail, errno: %d, error info: %s", @@ -260,8 +275,8 @@ int ioevent_set(struct fast_task_info *task, struct nio_thread_data *pThread, task->thread_data = pThread; task->event.fd = sock; task->event.callback = callback; - if (use_iouring) { #if IOEVENT_USE_URING + if (use_iouring) { if ((result=uring_prep_first_recv(task)) != 0) { logError("file: "__FILE__", line: %d, " "uring_prep_recv fail, fd: %d, " @@ -269,12 +284,8 @@ int ioevent_set(struct fast_task_info *task, struct nio_thread_data *pThread, __LINE__, sock, result, STRERROR(result)); return result; } -#else - logError("file: "__FILE__", line: %d, " - "some mistakes happen!", __LINE__); - return EBUSY; -#endif } else { +#endif if (ioevent_attach(&pThread->ev_puller, sock, event, task) < 0) { result = errno != 0 ? errno : ENOENT; logError("file: "__FILE__", line: %d, " @@ -283,7 +294,9 @@ int ioevent_set(struct fast_task_info *task, struct nio_thread_data *pThread, __LINE__, sock, result, STRERROR(result)); return result; } +#if IOEVENT_USE_URING } +#endif task->event.timer.expires = g_current_time + timeout; fast_timer_add(&pThread->timer, &task->event.timer); diff --git a/src/ioevent_loop.h b/src/ioevent_loop.h index 4c76c94..3d3c78c 100644 --- a/src/ioevent_loop.h +++ b/src/ioevent_loop.h @@ -18,6 +18,10 @@ #include "fast_task_queue.h" +#define fc_hold_task_ex(task, inc_count) __sync_add_and_fetch( \ + &task->reffer_count, inc_count) +#define fc_hold_task(task) fc_hold_task_ex(task, 1) + #ifdef __cplusplus extern "C" { #endif @@ -72,17 +76,22 @@ static inline int ioevent_notify_thread(struct nio_thread_data *thread_data) } #if IOEVENT_USE_URING + +#define SET_OP_TYPE_AND_HOLD_TASK(task, _op_type) \ + FC_URING_OP_TYPE(task) = _op_type; \ + fc_hold_task(task) + static inline int uring_prep_recv_data(struct fast_task_info *task, char *buff, const int len) { - FC_URING_OP_TYPE(task) = IORING_OP_RECV; + SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_RECV); return ioevent_uring_prep_recv(&task->thread_data->ev_puller, task->event.fd, buff, len, task); } static inline int uring_prep_first_recv(struct fast_task_info *task) { - FC_URING_OP_TYPE(task) = IORING_OP_RECV; + SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_RECV); return ioevent_uring_prep_recv(&task->thread_data->ev_puller, task->event.fd, task->recv.ptr->data, task->recv.ptr->size, task); @@ -90,7 +99,7 @@ static inline int uring_prep_first_recv(struct fast_task_info *task) static inline int uring_prep_next_recv(struct fast_task_info *task) { - FC_URING_OP_TYPE(task) = IORING_OP_RECV; + SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_RECV); return ioevent_uring_prep_recv(&task->thread_data->ev_puller, task->event.fd, task->recv.ptr->data + task->recv.ptr->offset, task->recv.ptr->length - task->recv.ptr->offset, task); @@ -98,13 +107,14 @@ static inline int uring_prep_next_recv(struct fast_task_info *task) static inline int uring_prep_first_send(struct fast_task_info *task) { - FC_URING_OP_TYPE(task) = IORING_OP_SEND; if (task->iovec_array.iovs != NULL) { + SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_WRITEV); return ioevent_uring_prep_writev(&task->thread_data->ev_puller, task->event.fd, task->iovec_array.iovs, FC_MIN(task->iovec_array.count, IOV_MAX), task); } else { + SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND); return ioevent_uring_prep_send(&task->thread_data->ev_puller, task->event.fd, task->send.ptr->data, task->send.ptr->length, task); @@ -113,13 +123,14 @@ static inline int uring_prep_first_send(struct fast_task_info *task) static inline int uring_prep_next_send(struct fast_task_info *task) { - FC_URING_OP_TYPE(task) = IORING_OP_SEND; if (task->iovec_array.iovs != NULL) { + SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_WRITEV); return ioevent_uring_prep_writev(&task->thread_data->ev_puller, task->event.fd, task->iovec_array.iovs, FC_MIN(task->iovec_array.count, IOV_MAX), task); } else { + SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND); return ioevent_uring_prep_send(&task->thread_data->ev_puller, task->event.fd, task->send.ptr->data + task->send.ptr->offset, task->send.ptr->length - task->send.ptr->offset, task); @@ -128,13 +139,14 @@ static inline int uring_prep_next_send(struct fast_task_info *task) static inline int uring_prep_first_send_zc(struct fast_task_info *task) { - FC_URING_OP_TYPE(task) = IORING_OP_SEND; if (task->iovec_array.iovs != NULL) { + SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_WRITEV); return ioevent_uring_prep_writev(&task->thread_data->ev_puller, task->event.fd, task->iovec_array.iovs, FC_MIN(task->iovec_array.count, IOV_MAX), task); } else { + SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND_ZC); return ioevent_uring_prep_send_zc(&task->thread_data->ev_puller, task->event.fd, task->send.ptr->data, task->send.ptr->length, task); @@ -143,13 +155,14 @@ static inline int uring_prep_first_send_zc(struct fast_task_info *task) static inline int uring_prep_next_send_zc(struct fast_task_info *task) { - FC_URING_OP_TYPE(task) = IORING_OP_SEND; if (task->iovec_array.iovs != NULL) { + SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_WRITEV); return ioevent_uring_prep_writev(&task->thread_data->ev_puller, task->event.fd, task->iovec_array.iovs, FC_MIN(task->iovec_array.count, IOV_MAX), task); } else { + SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND_ZC); return ioevent_uring_prep_send_zc(&task->thread_data->ev_puller, task->event.fd, task->send.ptr->data + task->send.ptr->offset, task->send.ptr->length - task->send.ptr->offset, task); @@ -158,9 +171,14 @@ static inline int uring_prep_next_send_zc(struct fast_task_info *task) static inline int uring_prep_close_fd(struct fast_task_info *task) { - FC_URING_OP_TYPE(task) = IORING_OP_CLOSE; return ioevent_uring_prep_close(&task->thread_data-> - ev_puller, task->event.fd); + ev_puller, task->event.fd, NULL); +} + +static inline int uring_prep_cancel(struct fast_task_info *task) +{ + SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_ASYNC_CANCEL); + return ioevent_uring_prep_cancel(&task->thread_data->ev_puller, task); } #endif