From 4576f22e245e9e48cf874987a5cdce675921f5a6 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Tue, 30 Sep 2025 10:12:41 +0800 Subject: [PATCH] add function uring_prep_connect --- src/ioevent.h | 83 ++++++++++++++++------------------------- src/ioevent_loop.c | 17 +++++---- src/ioevent_loop.h | 92 +++++++++++++++++++++++++++++++++++----------- 3 files changed, 111 insertions(+), 81 deletions(-) diff --git a/src/ioevent.h b/src/ioevent.h index 1d26a7e..3c70629 100644 --- a/src/ioevent.h +++ b/src/ioevent.h @@ -206,62 +206,47 @@ static inline int ioevent_uring_submit(IOEventPoller *ioevent) } } -static inline int ioevent_uring_prep_recv(IOEventPoller *ioevent, - int sockfd, void *buf, size_t size, void *user_data) +static inline struct io_uring_sqe *ioevent_uring_get_sqe(IOEventPoller *ioevent) { struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); if (sqe == NULL) { logError("file: "__FILE__", line: %d, " "io_uring_get_sqe fail", __LINE__); - return ENOSPC; } + return sqe; +} + +static inline void ioevent_uring_prep_recv(IOEventPoller *ioevent, + struct io_uring_sqe *sqe, int sockfd, + void *buf, size_t size, void *user_data) +{ io_uring_prep_recv(sqe, sockfd, buf, size, 0); sqe->user_data = (long)user_data; ioevent->submit_count++; - return 0; } -static inline int ioevent_uring_prep_send(IOEventPoller *ioevent, - int sockfd, void *buf, size_t len, void *user_data) +static inline void ioevent_uring_prep_send(IOEventPoller *ioevent, + struct io_uring_sqe *sqe, int sockfd, + void *buf, size_t len, void *user_data) { - struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); - if (sqe == NULL) { - logError("file: "__FILE__", line: %d, " - "io_uring_get_sqe fail", __LINE__); - return ENOSPC; - } io_uring_prep_send(sqe, sockfd, buf, len, 0); sqe->user_data = (long)user_data; ioevent->submit_count++; - return 0; } -static inline int ioevent_uring_prep_writev(IOEventPoller *ioevent, - int sockfd, const struct iovec *iovecs, unsigned nr_vecs, - void *user_data) +static inline void ioevent_uring_prep_writev(IOEventPoller *ioevent, + struct io_uring_sqe *sqe, int sockfd, const struct iovec *iovecs, + unsigned nr_vecs, void *user_data) { - struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); - if (sqe == NULL) { - logError("file: "__FILE__", line: %d, " - "io_uring_get_sqe fail", __LINE__); - return ENOSPC; - } - io_uring_prep_writev(sqe, sockfd, iovecs, nr_vecs, 0); sqe->user_data = (long)user_data; ioevent->submit_count++; - return 0; } -static inline int ioevent_uring_prep_send_zc(IOEventPoller *ioevent, - int sockfd, void *buf, size_t len, void *user_data) +static inline void ioevent_uring_prep_send_zc(IOEventPoller *ioevent, + struct io_uring_sqe *sqe, int sockfd, + void *buf, size_t len, void *user_data) { - struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); - if (sqe == NULL) { - logError("file: "__FILE__", line: %d, " - "io_uring_get_sqe fail", __LINE__); - return ENOSPC; - } io_uring_prep_send_zc(sqe, sockfd, buf, len, 0, #ifdef IORING_SEND_ZC_REPORT_USAGE IORING_SEND_ZC_REPORT_USAGE @@ -271,18 +256,11 @@ static inline int ioevent_uring_prep_send_zc(IOEventPoller *ioevent, ); sqe->user_data = (long)user_data; ioevent->submit_count++; - return 0; } -static inline int ioevent_uring_prep_close(IOEventPoller *ioevent, - int fd, void *user_data) +static inline void ioevent_uring_prep_close(IOEventPoller *ioevent, + struct io_uring_sqe *sqe, int fd, void *user_data) { - struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); - if (sqe == NULL) { - logError("file: "__FILE__", line: %d, " - "io_uring_get_sqe fail", __LINE__); - return ENOSPC; - } io_uring_prep_close(sqe, fd); if (user_data == NULL) { /* set sqe->flags MUST after io_uring_prep_xxx */ @@ -291,24 +269,25 @@ static inline int ioevent_uring_prep_close(IOEventPoller *ioevent, sqe->user_data = (long)user_data; } ioevent->submit_count++; - return 0; } -static inline int ioevent_uring_prep_cancel(IOEventPoller *ioevent, - void *user_data) +static inline void ioevent_uring_prep_cancel(IOEventPoller *ioevent, + struct io_uring_sqe *sqe, void *user_data) { - struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); - if (sqe == NULL) { - logError("file: "__FILE__", line: %d, " - "io_uring_get_sqe fail", __LINE__); - return ENOSPC; - } - io_uring_prep_cancel(sqe, user_data, 0); sqe->user_data = (long)user_data; ioevent->submit_count++; - return 0; } + +static inline void ioevent_uring_prep_connect(IOEventPoller *ioevent, + struct io_uring_sqe *sqe, int fd, const struct sockaddr *addr, + socklen_t addrlen, void *user_data) +{ + io_uring_prep_connect(sqe, fd, addr, addrlen); + sqe->user_data = (long)user_data; + ioevent->submit_count++; +} + #endif #ifdef __cplusplus diff --git a/src/ioevent_loop.c b/src/ioevent_loop.c index ab5dc04..654c03c 100644 --- a/src/ioevent_loop.c +++ b/src/ioevent_loop.c @@ -133,7 +133,8 @@ static void deal_timeouts(FastTimerEntry *head) current = entry; entry = entry->next; - current->prev = current->next = NULL; //must set NULL because NOT in time wheel + /* must set NULL because NOT in time wheel */ + current->prev = current->next = NULL; pEventEntry = (IOEventEntry *)current; if (pEventEntry != NULL) { @@ -277,12 +278,14 @@ int ioevent_set(struct fast_task_info *task, struct nio_thread_data *pThread, task->event.callback = callback; #if IOEVENT_USE_URING if (use_iouring) { - if ((result=uring_prep_first_recv(task)) != 0) { - logError("file: "__FILE__", line: %d, " - "uring_prep_recv fail, fd: %d, " - "errno: %d, error info: %s", - __LINE__, sock, result, STRERROR(result)); - return result; + if (FC_URING_OP_TYPE(task) == IORING_OP_NOP) { + if ((result=uring_prep_first_recv(task)) != 0) { + logError("file: "__FILE__", line: %d, " + "uring_prep_recv fail, fd: %d, " + "errno: %d, error info: %s", + __LINE__, sock, result, STRERROR(result)); + return result; + } } } else { #endif diff --git a/src/ioevent_loop.h b/src/ioevent_loop.h index 3d3c78c..79b6a8a 100644 --- a/src/ioevent_loop.h +++ b/src/ioevent_loop.h @@ -17,6 +17,9 @@ #define _IOEVENT_LOOP_H #include "fast_task_queue.h" +#if IOEVENT_USE_URING +#include "sockopt.h" +#endif #define fc_hold_task_ex(task, inc_count) __sync_add_and_fetch( \ &task->reffer_count, inc_count) @@ -78,6 +81,10 @@ static inline int ioevent_notify_thread(struct nio_thread_data *thread_data) #if IOEVENT_USE_URING #define SET_OP_TYPE_AND_HOLD_TASK(task, _op_type) \ + struct io_uring_sqe *sqe; \ + if ((sqe=ioevent_uring_get_sqe(&task->thread_data->ev_puller)) == NULL) { \ + return ENOSPC; \ + } \ FC_URING_OP_TYPE(task) = _op_type; \ fc_hold_task(task) @@ -85,100 +92,141 @@ static inline int uring_prep_recv_data(struct fast_task_info *task, char *buff, const int len) { SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_RECV); - return ioevent_uring_prep_recv(&task->thread_data->ev_puller, - task->event.fd, buff, len, task); + ioevent_uring_prep_recv(&task->thread_data->ev_puller, + sqe, task->event.fd, buff, len, task); + return 0; } static inline int uring_prep_first_recv(struct fast_task_info *task) { SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_RECV); - return ioevent_uring_prep_recv(&task->thread_data->ev_puller, - task->event.fd, task->recv.ptr->data, + ioevent_uring_prep_recv(&task->thread_data->ev_puller, + sqe, task->event.fd, task->recv.ptr->data, task->recv.ptr->size, task); + return 0; } static inline int uring_prep_next_recv(struct fast_task_info *task) { SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_RECV); - return ioevent_uring_prep_recv(&task->thread_data->ev_puller, + ioevent_uring_prep_recv(&task->thread_data->ev_puller, sqe, task->event.fd, task->recv.ptr->data + task->recv.ptr->offset, task->recv.ptr->length - task->recv.ptr->offset, task); + return 0; } static inline int uring_prep_first_send(struct fast_task_info *task) { if (task->iovec_array.iovs != NULL) { SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_WRITEV); - return ioevent_uring_prep_writev(&task->thread_data->ev_puller, - task->event.fd, task->iovec_array.iovs, + ioevent_uring_prep_writev(&task->thread_data->ev_puller, + sqe, task->event.fd, task->iovec_array.iovs, FC_MIN(task->iovec_array.count, IOV_MAX), task); } else { SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND); - return ioevent_uring_prep_send(&task->thread_data->ev_puller, - task->event.fd, task->send.ptr->data, + ioevent_uring_prep_send(&task->thread_data->ev_puller, + sqe, task->event.fd, task->send.ptr->data, task->send.ptr->length, task); } + return 0; } static inline int uring_prep_next_send(struct fast_task_info *task) { if (task->iovec_array.iovs != NULL) { SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_WRITEV); - return ioevent_uring_prep_writev(&task->thread_data->ev_puller, - task->event.fd, task->iovec_array.iovs, + ioevent_uring_prep_writev(&task->thread_data->ev_puller, + sqe, task->event.fd, task->iovec_array.iovs, FC_MIN(task->iovec_array.count, IOV_MAX), task); } else { SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND); - return ioevent_uring_prep_send(&task->thread_data->ev_puller, + ioevent_uring_prep_send(&task->thread_data->ev_puller, sqe, task->event.fd, task->send.ptr->data + task->send.ptr->offset, task->send.ptr->length - task->send.ptr->offset, task); } + return 0; } static inline int uring_prep_first_send_zc(struct fast_task_info *task) { if (task->iovec_array.iovs != NULL) { SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_WRITEV); - return ioevent_uring_prep_writev(&task->thread_data->ev_puller, - task->event.fd, task->iovec_array.iovs, + ioevent_uring_prep_writev(&task->thread_data->ev_puller, + sqe, task->event.fd, task->iovec_array.iovs, FC_MIN(task->iovec_array.count, IOV_MAX), task); } else { SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND_ZC); - return ioevent_uring_prep_send_zc(&task->thread_data->ev_puller, - task->event.fd, task->send.ptr->data, + ioevent_uring_prep_send_zc(&task->thread_data->ev_puller, + sqe, task->event.fd, task->send.ptr->data, task->send.ptr->length, task); } + return 0; } static inline int uring_prep_next_send_zc(struct fast_task_info *task) { if (task->iovec_array.iovs != NULL) { SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_WRITEV); - return ioevent_uring_prep_writev(&task->thread_data->ev_puller, - task->event.fd, task->iovec_array.iovs, + ioevent_uring_prep_writev(&task->thread_data->ev_puller, + sqe, task->event.fd, task->iovec_array.iovs, FC_MIN(task->iovec_array.count, IOV_MAX), task); } else { SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND_ZC); - return ioevent_uring_prep_send_zc(&task->thread_data->ev_puller, + ioevent_uring_prep_send_zc(&task->thread_data->ev_puller, sqe, task->event.fd, task->send.ptr->data + task->send.ptr->offset, task->send.ptr->length - task->send.ptr->offset, task); } + return 0; } static inline int uring_prep_close_fd(struct fast_task_info *task) { - return ioevent_uring_prep_close(&task->thread_data-> - ev_puller, task->event.fd, NULL); + struct io_uring_sqe *sqe; + + if ((sqe=ioevent_uring_get_sqe(&task->thread_data->ev_puller)) == NULL) { + return ENOSPC; + } + + /* do NOT need callback */ + ioevent_uring_prep_close(&task->thread_data-> + ev_puller, sqe, task->event.fd, NULL); + return 0; } static inline int uring_prep_cancel(struct fast_task_info *task) { SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_ASYNC_CANCEL); - return ioevent_uring_prep_cancel(&task->thread_data->ev_puller, task); + ioevent_uring_prep_cancel(&task->thread_data->ev_puller, sqe, task); + return 0; +} + +static inline int uring_prep_connect(struct fast_task_info *task) +{ + int result; + sockaddr_convert_t *convert; + + if ((task->event.fd=socketCreateEx2(AF_UNSPEC, task->server_ip, + O_NONBLOCK, NULL, &result)) < 0) + { + return result; + } + + convert = (sockaddr_convert_t *)(task->send.ptr->data + + task->send.ptr->size - 2 * sizeof(sockaddr_convert_t)); + if ((result=setsockaddrbyip(task->server_ip, task->port, convert)) != 0) { + return result; + } + + do { + SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_CONNECT); + ioevent_uring_prep_connect(&task->thread_data->ev_puller, sqe, + task->event.fd, &convert->sa.addr, convert->len, task); + } while (0); + return 0; } #endif