From 012b2038ee9f10bc4f670519b51deee3d420d90f Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Thu, 25 Sep 2025 14:49:37 +0800 Subject: [PATCH] add functions uring_prep_xxx --- src/fast_task_queue.h | 7 ++++ src/fast_timer.h | 3 ++ src/ioevent.c | 3 +- src/ioevent.h | 65 +++++++++++++++++++++++++++++++ src/ioevent_loop.c | 6 +-- src/ioevent_loop.h | 89 ++++++++++++++++++++++++++++++++++++++++++- 6 files changed, 167 insertions(+), 6 deletions(-) diff --git a/src/fast_task_queue.h b/src/fast_task_queue.h index 9902003..f5b5281 100644 --- a/src/fast_task_queue.h +++ b/src/fast_task_queue.h @@ -48,10 +48,17 @@ typedef int (*TaskContinueCallback)(struct fast_task_info *task); struct sf_network_handler; struct fast_task_info; +#if IOEVENT_USE_URING +#define FC_URING_OP_TYPE(task) (task)->event.timer.op_type +#endif + typedef struct ioevent_entry { FastTimerEntry timer; //must first int fd; +#if IOEVENT_USE_URING + int res; +#endif IOEventCallback callback; } IOEventEntry; diff --git a/src/fast_timer.h b/src/fast_timer.h index e7ee7c5..8727878 100644 --- a/src/fast_timer.h +++ b/src/fast_timer.h @@ -27,6 +27,9 @@ typedef struct fast_timer_entry { struct fast_timer_entry *next; int slot_index; bool rehash; +#if IOEVENT_USE_URING + short op_type; +#endif } FastTimerEntry; typedef struct fast_timer_slot { diff --git a/src/ioevent.c b/src/ioevent.c index aca7e53..5057368 100644 --- a/src/ioevent.c +++ b/src/ioevent.c @@ -217,7 +217,8 @@ int ioevent_detach(IOEventPoller *ioevent, const int fd) sqe->flags |= IOSQE_CQE_SKIP_SUCCESS; sqe->user_data = 0; io_uring_prep_cancel_fd(sqe, fd, 0); - return ioevent_uring_submit(ioevent); + ioevent->submmit_count++; + return 0; #elif IOEVENT_USE_KQUEUE struct kevent ev[1]; int r, w; diff --git a/src/ioevent.h b/src/ioevent.h index 9a0fef8..458be8e 100644 --- a/src/ioevent.h +++ b/src/ioevent.h @@ -20,6 +20,7 @@ #include #include #include "_os_define.h" +#include "logger.h" #define IOEVENT_TIMEOUT 0x8000 @@ -207,6 +208,8 @@ static inline int ioevent_uring_prep_recv(IOEventPoller *ioevent, { struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); if (sqe == NULL) { + logError("file: "__FILE__", line: %d, " + "io_uring_get_sqe fail", __LINE__); return ENOSPC; } sqe->user_data = (long)user_data; @@ -214,6 +217,68 @@ static inline int ioevent_uring_prep_recv(IOEventPoller *ioevent, ioevent->submmit_count++; return 0; } + +static inline int ioevent_uring_prep_send(IOEventPoller *ioevent, + int sockfd, void *buf, size_t len, void *user_data) +{ + struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); + if (sqe == NULL) { + logError("file: "__FILE__", line: %d, " + "io_uring_get_sqe fail", __LINE__); + return ENOSPC; + } + sqe->user_data = (long)user_data; + io_uring_prep_send(sqe, sockfd, buf, len, 0); + ioevent->submmit_count++; + return 0; +} + +static inline int ioevent_uring_prep_writev(IOEventPoller *ioevent, + int sockfd, const struct iovec *iovecs, unsigned nr_vecs, + void *user_data) +{ + struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); + if (sqe == NULL) { + logError("file: "__FILE__", line: %d, " + "io_uring_get_sqe fail", __LINE__); + return ENOSPC; + } + + sqe->user_data = (long)user_data; + io_uring_prep_writev(sqe, sockfd, iovecs, nr_vecs, 0); + ioevent->submmit_count++; + return 0; +} + +static inline int ioevent_uring_prep_send_zc(IOEventPoller *ioevent, + int sockfd, void *buf, size_t len, void *user_data) +{ + struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); + if (sqe == NULL) { + logError("file: "__FILE__", line: %d, " + "io_uring_get_sqe fail", __LINE__); + return ENOSPC; + } + sqe->user_data = (long)user_data; + io_uring_prep_send_zc(sqe, sockfd, buf, len, 0, + IORING_SEND_ZC_REPORT_USAGE); + ioevent->submmit_count++; + return 0; +} + +static inline int ioevent_uring_prep_close(IOEventPoller *ioevent, int fd) +{ + struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); + if (sqe == NULL) { + logError("file: "__FILE__", line: %d, " + "io_uring_get_sqe fail", __LINE__); + return ENOSPC; + } + sqe->user_data = 0; + io_uring_prep_close(sqe, fd); + ioevent->submmit_count++; + return 0; +} #endif #ifdef __cplusplus diff --git a/src/ioevent_loop.c b/src/ioevent_loop.c index 030f577..0a91797 100644 --- a/src/ioevent_loop.c +++ b/src/ioevent_loop.c @@ -31,7 +31,6 @@ static int ioevent_process(IOEventPoller *ioevent) case 0: break; case -ETIME: - case -EAGAIN: case -EINTR: return 0; default: @@ -49,7 +48,8 @@ static int ioevent_process(IOEventPoller *ioevent) if (ioevent->cqe->flags & IORING_CQE_F_NOTIF) { //TODO } else { - pEntry->callback(pEntry->fd, ioevent->cqe->res, pEntry); + pEntry->res = ioevent->cqe->res; + pEntry->callback(pEntry->fd, 0, pEntry); } } } @@ -262,7 +262,7 @@ int ioevent_set(struct fast_task_info *task, struct nio_thread_data *pThread, task->event.callback = callback; if (use_iouring) { #if IOEVENT_USE_URING - if ((result=uring_prep_recv_by_task(task)) != 0) { + if ((result=uring_prep_first_recv(task)) != 0) { logError("file: "__FILE__", line: %d, " "uring_prep_recv fail, fd: %d, " "errno: %d, error info: %s", diff --git a/src/ioevent_loop.h b/src/ioevent_loop.h index aee74fc..4c76c94 100644 --- a/src/ioevent_loop.h +++ b/src/ioevent_loop.h @@ -72,10 +72,95 @@ static inline int ioevent_notify_thread(struct nio_thread_data *thread_data) } #if IOEVENT_USE_URING -static inline int uring_prep_recv_by_task(struct fast_task_info *task) +static inline int uring_prep_recv_data(struct fast_task_info *task, + char *buff, const int len) { + FC_URING_OP_TYPE(task) = IORING_OP_RECV; return ioevent_uring_prep_recv(&task->thread_data->ev_puller, - task->event.fd, task->recv.ptr->data, task->recv.ptr->size, task); + task->event.fd, buff, len, task); +} + +static inline int uring_prep_first_recv(struct fast_task_info *task) +{ + FC_URING_OP_TYPE(task) = IORING_OP_RECV; + return ioevent_uring_prep_recv(&task->thread_data->ev_puller, + task->event.fd, task->recv.ptr->data, + task->recv.ptr->size, task); +} + +static inline int uring_prep_next_recv(struct fast_task_info *task) +{ + FC_URING_OP_TYPE(task) = IORING_OP_RECV; + return ioevent_uring_prep_recv(&task->thread_data->ev_puller, + task->event.fd, task->recv.ptr->data + task->recv.ptr->offset, + task->recv.ptr->length - task->recv.ptr->offset, task); +} + +static inline int uring_prep_first_send(struct fast_task_info *task) +{ + FC_URING_OP_TYPE(task) = IORING_OP_SEND; + if (task->iovec_array.iovs != NULL) { + return ioevent_uring_prep_writev(&task->thread_data->ev_puller, + task->event.fd, task->iovec_array.iovs, + FC_MIN(task->iovec_array.count, IOV_MAX), + task); + } else { + return ioevent_uring_prep_send(&task->thread_data->ev_puller, + task->event.fd, task->send.ptr->data, + task->send.ptr->length, task); + } +} + +static inline int uring_prep_next_send(struct fast_task_info *task) +{ + FC_URING_OP_TYPE(task) = IORING_OP_SEND; + if (task->iovec_array.iovs != NULL) { + return ioevent_uring_prep_writev(&task->thread_data->ev_puller, + task->event.fd, task->iovec_array.iovs, + FC_MIN(task->iovec_array.count, IOV_MAX), + task); + } else { + return ioevent_uring_prep_send(&task->thread_data->ev_puller, + task->event.fd, task->send.ptr->data + task->send.ptr->offset, + task->send.ptr->length - task->send.ptr->offset, task); + } +} + +static inline int uring_prep_first_send_zc(struct fast_task_info *task) +{ + FC_URING_OP_TYPE(task) = IORING_OP_SEND; + if (task->iovec_array.iovs != NULL) { + return ioevent_uring_prep_writev(&task->thread_data->ev_puller, + task->event.fd, task->iovec_array.iovs, + FC_MIN(task->iovec_array.count, IOV_MAX), + task); + } else { + return ioevent_uring_prep_send_zc(&task->thread_data->ev_puller, + task->event.fd, task->send.ptr->data, + task->send.ptr->length, task); + } +} + +static inline int uring_prep_next_send_zc(struct fast_task_info *task) +{ + FC_URING_OP_TYPE(task) = IORING_OP_SEND; + if (task->iovec_array.iovs != NULL) { + return ioevent_uring_prep_writev(&task->thread_data->ev_puller, + task->event.fd, task->iovec_array.iovs, + FC_MIN(task->iovec_array.count, IOV_MAX), + task); + } else { + return ioevent_uring_prep_send_zc(&task->thread_data->ev_puller, + task->event.fd, task->send.ptr->data + task->send.ptr->offset, + task->send.ptr->length - task->send.ptr->offset, task); + } +} + +static inline int uring_prep_close_fd(struct fast_task_info *task) +{ + FC_URING_OP_TYPE(task) = IORING_OP_CLOSE; + return ioevent_uring_prep_close(&task->thread_data-> + ev_puller, task->event.fd); } #endif