support Linux io_uring OK

use_iouring
YuQing 2025-09-27 15:37:37 +08:00
parent 012b2038ee
commit cb6f6f13f3
5 changed files with 103 additions and 40 deletions

View File

@ -1,4 +1,7 @@
Version 1.81 2025-09-27
* support Linux io_uring
Version 1.80 2025-09-10 Version 1.80 2025-09-10
* getIpaddrByNameEx: IPv4 has priority over IPv6 * getIpaddrByNameEx: IPv4 has priority over IPv6
* shared_func.[hc]: add function fc_ftoa * shared_func.[hc]: add function fc_ftoa

View File

@ -72,7 +72,8 @@ int ioevent_init(IOEventPoller *ioevent, const int size,
return -result; return -result;
} }
ioevent->cqe = NULL; ioevent->cqe = NULL;
ioevent->submmit_count = 0; ioevent->submit_count = 0;
ioevent->send_zc_logged = false;
#elif IOEVENT_USE_KQUEUE #elif IOEVENT_USE_KQUEUE
ioevent->poll_fd = kqueue(); ioevent->poll_fd = kqueue();
if (ioevent->poll_fd < 0) { if (ioevent->poll_fd < 0) {
@ -134,8 +135,8 @@ int ioevent_attach(IOEventPoller *ioevent, const int fd,
if (sqe == NULL) { if (sqe == NULL) {
return ENOSPC; return ENOSPC;
} }
sqe->user_data = (long)data;
io_uring_prep_poll_multishot(sqe, fd, e | ioevent->extra_events); io_uring_prep_poll_multishot(sqe, fd, e | ioevent->extra_events);
sqe->user_data = (long)data;
return ioevent_uring_submit(ioevent); return ioevent_uring_submit(ioevent);
#elif IOEVENT_USE_KQUEUE #elif IOEVENT_USE_KQUEUE
struct kevent ev[2]; struct kevent ev[2];
@ -169,9 +170,9 @@ int ioevent_modify(IOEventPoller *ioevent, const int fd,
if (sqe == NULL) { if (sqe == NULL) {
return ENOSPC; return ENOSPC;
} }
sqe->user_data = (long)data;
io_uring_prep_poll_update(sqe, sqe->user_data, sqe->user_data, io_uring_prep_poll_update(sqe, sqe->user_data, sqe->user_data,
e | ioevent->extra_events, IORING_POLL_UPDATE_EVENTS); e | ioevent->extra_events, IORING_POLL_UPDATE_EVENTS);
sqe->user_data = (long)data;
return ioevent_uring_submit(ioevent); return ioevent_uring_submit(ioevent);
#elif IOEVENT_USE_KQUEUE #elif IOEVENT_USE_KQUEUE
struct kevent ev[2]; struct kevent ev[2];
@ -214,11 +215,10 @@ int ioevent_detach(IOEventPoller *ioevent, const int fd)
if (sqe == NULL) { if (sqe == NULL) {
return ENOSPC; return ENOSPC;
} }
sqe->flags |= IOSQE_CQE_SKIP_SUCCESS;
sqe->user_data = 0;
io_uring_prep_cancel_fd(sqe, fd, 0); io_uring_prep_cancel_fd(sqe, fd, 0);
ioevent->submmit_count++; /* set sqe->flags MUST after io_uring_prep_xxx */
return 0; sqe->flags = IOSQE_CQE_SKIP_SUCCESS;
return ioevent_uring_submit(ioevent);
#elif IOEVENT_USE_KQUEUE #elif IOEVENT_USE_KQUEUE
struct kevent ev[1]; struct kevent ev[1];
int r, w; int r, w;
@ -282,4 +282,3 @@ int ioevent_poll(IOEventPoller *ioevent)
#error port me #error port me
#endif #endif
} }

View File

@ -76,7 +76,8 @@ typedef struct ioevent_puller {
int extra_events; int extra_events;
#if IOEVENT_USE_URING #if IOEVENT_USE_URING
struct io_uring ring; struct io_uring ring;
int submmit_count; int submit_count;
bool send_zc_logged;
#else #else
int poll_fd; int poll_fd;
struct { struct {
@ -191,6 +192,8 @@ static inline int ioevent_poll_ex(IOEventPoller *ioevent, const int timeout_ms)
static inline int ioevent_uring_submit(IOEventPoller *ioevent) static inline int ioevent_uring_submit(IOEventPoller *ioevent)
{ {
int result; int result;
ioevent->submit_count = 0;
while (1) { while (1) {
result = io_uring_submit(&ioevent->ring); result = io_uring_submit(&ioevent->ring);
if (result < 0) { if (result < 0) {
@ -212,9 +215,9 @@ static inline int ioevent_uring_prep_recv(IOEventPoller *ioevent,
"io_uring_get_sqe fail", __LINE__); "io_uring_get_sqe fail", __LINE__);
return ENOSPC; return ENOSPC;
} }
sqe->user_data = (long)user_data;
io_uring_prep_recv(sqe, sockfd, buf, size, 0); io_uring_prep_recv(sqe, sockfd, buf, size, 0);
ioevent->submmit_count++; sqe->user_data = (long)user_data;
ioevent->submit_count++;
return 0; return 0;
} }
@ -227,9 +230,9 @@ static inline int ioevent_uring_prep_send(IOEventPoller *ioevent,
"io_uring_get_sqe fail", __LINE__); "io_uring_get_sqe fail", __LINE__);
return ENOSPC; return ENOSPC;
} }
sqe->user_data = (long)user_data;
io_uring_prep_send(sqe, sockfd, buf, len, 0); io_uring_prep_send(sqe, sockfd, buf, len, 0);
ioevent->submmit_count++; sqe->user_data = (long)user_data;
ioevent->submit_count++;
return 0; return 0;
} }
@ -244,9 +247,9 @@ static inline int ioevent_uring_prep_writev(IOEventPoller *ioevent,
return ENOSPC; return ENOSPC;
} }
sqe->user_data = (long)user_data;
io_uring_prep_writev(sqe, sockfd, iovecs, nr_vecs, 0); io_uring_prep_writev(sqe, sockfd, iovecs, nr_vecs, 0);
ioevent->submmit_count++; sqe->user_data = (long)user_data;
ioevent->submit_count++;
return 0; return 0;
} }
@ -259,14 +262,20 @@ static inline int ioevent_uring_prep_send_zc(IOEventPoller *ioevent,
"io_uring_get_sqe fail", __LINE__); "io_uring_get_sqe fail", __LINE__);
return ENOSPC; return ENOSPC;
} }
sqe->user_data = (long)user_data;
io_uring_prep_send_zc(sqe, sockfd, buf, len, 0, io_uring_prep_send_zc(sqe, sockfd, buf, len, 0,
IORING_SEND_ZC_REPORT_USAGE); #ifdef IORING_SEND_ZC_REPORT_USAGE
ioevent->submmit_count++; IORING_SEND_ZC_REPORT_USAGE
#else
0
#endif
);
sqe->user_data = (long)user_data;
ioevent->submit_count++;
return 0; return 0;
} }
static inline int ioevent_uring_prep_close(IOEventPoller *ioevent, int fd) static inline int ioevent_uring_prep_close(IOEventPoller *ioevent,
int fd, void *user_data)
{ {
struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring); struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring);
if (sqe == NULL) { if (sqe == NULL) {
@ -274,9 +283,30 @@ static inline int ioevent_uring_prep_close(IOEventPoller *ioevent, int fd)
"io_uring_get_sqe fail", __LINE__); "io_uring_get_sqe fail", __LINE__);
return ENOSPC; return ENOSPC;
} }
sqe->user_data = 0;
io_uring_prep_close(sqe, fd); io_uring_prep_close(sqe, fd);
ioevent->submmit_count++; if (user_data == NULL) {
/* set sqe->flags MUST after io_uring_prep_xxx */
sqe->flags = IOSQE_CQE_SKIP_SUCCESS;
} else {
sqe->user_data = (long)user_data;
}
ioevent->submit_count++;
return 0;
}
/**
 * Queue (but do not submit) a cancel request on the poller's ring.
 *
 * @param ioevent    the poller whose io_uring ring receives the SQE
 * @param user_data  passed to io_uring_prep_cancel() as the value that
 *                   identifies the request to cancel; the same value is
 *                   stored as the user_data of the cancel SQE itself
 * @return 0 on success, ENOSPC when no SQE is available
 *
 * On success only ioevent->submit_count is incremented; nothing in this
 * function calls io_uring_submit().
 */
static inline int ioevent_uring_prep_cancel(IOEventPoller *ioevent,
        void *user_data)
{
    struct io_uring_sqe *cancel_sqe;

    cancel_sqe = io_uring_get_sqe(&ioevent->ring);
    if (cancel_sqe != NULL) {
        io_uring_prep_cancel(cancel_sqe, user_data, 0);
        /* user_data is assigned after the io_uring_prep_xxx call, as in
         * the other prep helpers of this file */
        cancel_sqe->user_data = (long)user_data;
        ioevent->submit_count += 1;
        return 0;
    }

    logError("file: "__FILE__", line: %d, "
            "io_uring_get_sqe fail", __LINE__);
    return ENOSPC;
}
#endif #endif

View File

@ -46,11 +46,27 @@ static int ioevent_process(IOEventPoller *ioevent)
pEntry = (IOEventEntry *)ioevent->cqe->user_data; pEntry = (IOEventEntry *)ioevent->cqe->user_data;
if (pEntry != NULL) { if (pEntry != NULL) {
if (ioevent->cqe->flags & IORING_CQE_F_NOTIF) { if (ioevent->cqe->flags & IORING_CQE_F_NOTIF) {
//TODO #ifdef IORING_NOTIF_USAGE_ZC_COPIED
if (!ioevent->send_zc_logged) {
ioevent->send_zc_logged = true;
if (ioevent->cqe->res & IORING_NOTIF_USAGE_ZC_COPIED) {
logWarning("file: "__FILE__", line: %d, "
"io_uring send_zc: memory copy "
"instead of zero copy!", __LINE__);
} else {
logInfo("file: "__FILE__", line: %d, "
"io_uring send_zc: zero copy OK.", __LINE__);
}
}
#endif
} else { } else {
pEntry->res = ioevent->cqe->res; pEntry->res = ioevent->cqe->res;
pEntry->callback(pEntry->fd, 0, pEntry); pEntry->callback(pEntry->fd, 0, pEntry);
} }
} else {
logWarning("file: "__FILE__", line: %d, "
"unexpected flags: %d, result: %u", __LINE__,
ioevent->cqe->flags, ioevent->cqe->res);
} }
} }
@ -236,8 +252,7 @@ int ioevent_loop(struct nio_thread_data *thread_data,
} }
#if IOEVENT_USE_URING #if IOEVENT_USE_URING
if (thread_data->ev_puller.submmit_count > 0) { if (thread_data->ev_puller.submit_count > 0) {
thread_data->ev_puller.submmit_count = 0;
if ((result=ioevent_uring_submit(&thread_data->ev_puller)) != 0) { if ((result=ioevent_uring_submit(&thread_data->ev_puller)) != 0) {
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"io_uring_submit fail, errno: %d, error info: %s", "io_uring_submit fail, errno: %d, error info: %s",
@ -260,8 +275,8 @@ int ioevent_set(struct fast_task_info *task, struct nio_thread_data *pThread,
task->thread_data = pThread; task->thread_data = pThread;
task->event.fd = sock; task->event.fd = sock;
task->event.callback = callback; task->event.callback = callback;
if (use_iouring) {
#if IOEVENT_USE_URING #if IOEVENT_USE_URING
if (use_iouring) {
if ((result=uring_prep_first_recv(task)) != 0) { if ((result=uring_prep_first_recv(task)) != 0) {
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"uring_prep_recv fail, fd: %d, " "uring_prep_recv fail, fd: %d, "
@ -269,12 +284,8 @@ int ioevent_set(struct fast_task_info *task, struct nio_thread_data *pThread,
__LINE__, sock, result, STRERROR(result)); __LINE__, sock, result, STRERROR(result));
return result; return result;
} }
#else
logError("file: "__FILE__", line: %d, "
"some mistakes happen!", __LINE__);
return EBUSY;
#endif
} else { } else {
#endif
if (ioevent_attach(&pThread->ev_puller, sock, event, task) < 0) { if (ioevent_attach(&pThread->ev_puller, sock, event, task) < 0) {
result = errno != 0 ? errno : ENOENT; result = errno != 0 ? errno : ENOENT;
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
@ -283,7 +294,9 @@ int ioevent_set(struct fast_task_info *task, struct nio_thread_data *pThread,
__LINE__, sock, result, STRERROR(result)); __LINE__, sock, result, STRERROR(result));
return result; return result;
} }
#if IOEVENT_USE_URING
} }
#endif
task->event.timer.expires = g_current_time + timeout; task->event.timer.expires = g_current_time + timeout;
fast_timer_add(&pThread->timer, &task->event.timer); fast_timer_add(&pThread->timer, &task->event.timer);

View File

@ -18,6 +18,10 @@
#include "fast_task_queue.h" #include "fast_task_queue.h"
/**
 * Atomically add inc_count to task->reffer_count using the
 * __sync_add_and_fetch builtin; the expression evaluates to the
 * updated count.
 *
 * Both parameters are parenthesized so that any pointer-valued
 * expression (for example *pp or a conditional expression) can be
 * passed as task without changing how the expansion parses.
 */
#define fc_hold_task_ex(task, inc_count) __sync_add_and_fetch( \
        &(task)->reffer_count, (inc_count))

/* Increment task->reffer_count by one; evaluates to the updated count. */
#define fc_hold_task(task) fc_hold_task_ex(task, 1)
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
@ -72,17 +76,22 @@ static inline int ioevent_notify_thread(struct nio_thread_data *thread_data)
} }
#if IOEVENT_USE_URING #if IOEVENT_USE_URING
/**
 * Store _op_type through FC_URING_OP_TYPE(task), then call
 * fc_hold_task(task).
 *
 * The two statements are wrapped in do { ... } while (0) so the macro
 * expands to exactly one statement; it therefore stays correct when used
 * as the unbraced body of an if / else / loop.
 */
#define SET_OP_TYPE_AND_HOLD_TASK(task, _op_type) \
    do { \
        FC_URING_OP_TYPE(task) = (_op_type); \
        fc_hold_task(task); \
    } while (0)
static inline int uring_prep_recv_data(struct fast_task_info *task, static inline int uring_prep_recv_data(struct fast_task_info *task,
char *buff, const int len) char *buff, const int len)
{ {
FC_URING_OP_TYPE(task) = IORING_OP_RECV; SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_RECV);
return ioevent_uring_prep_recv(&task->thread_data->ev_puller, return ioevent_uring_prep_recv(&task->thread_data->ev_puller,
task->event.fd, buff, len, task); task->event.fd, buff, len, task);
} }
static inline int uring_prep_first_recv(struct fast_task_info *task) static inline int uring_prep_first_recv(struct fast_task_info *task)
{ {
FC_URING_OP_TYPE(task) = IORING_OP_RECV; SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_RECV);
return ioevent_uring_prep_recv(&task->thread_data->ev_puller, return ioevent_uring_prep_recv(&task->thread_data->ev_puller,
task->event.fd, task->recv.ptr->data, task->event.fd, task->recv.ptr->data,
task->recv.ptr->size, task); task->recv.ptr->size, task);
@ -90,7 +99,7 @@ static inline int uring_prep_first_recv(struct fast_task_info *task)
static inline int uring_prep_next_recv(struct fast_task_info *task) static inline int uring_prep_next_recv(struct fast_task_info *task)
{ {
FC_URING_OP_TYPE(task) = IORING_OP_RECV; SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_RECV);
return ioevent_uring_prep_recv(&task->thread_data->ev_puller, return ioevent_uring_prep_recv(&task->thread_data->ev_puller,
task->event.fd, task->recv.ptr->data + task->recv.ptr->offset, task->event.fd, task->recv.ptr->data + task->recv.ptr->offset,
task->recv.ptr->length - task->recv.ptr->offset, task); task->recv.ptr->length - task->recv.ptr->offset, task);
@ -98,13 +107,14 @@ static inline int uring_prep_next_recv(struct fast_task_info *task)
static inline int uring_prep_first_send(struct fast_task_info *task) static inline int uring_prep_first_send(struct fast_task_info *task)
{ {
FC_URING_OP_TYPE(task) = IORING_OP_SEND;
if (task->iovec_array.iovs != NULL) { if (task->iovec_array.iovs != NULL) {
SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_WRITEV);
return ioevent_uring_prep_writev(&task->thread_data->ev_puller, return ioevent_uring_prep_writev(&task->thread_data->ev_puller,
task->event.fd, task->iovec_array.iovs, task->event.fd, task->iovec_array.iovs,
FC_MIN(task->iovec_array.count, IOV_MAX), FC_MIN(task->iovec_array.count, IOV_MAX),
task); task);
} else { } else {
SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND);
return ioevent_uring_prep_send(&task->thread_data->ev_puller, return ioevent_uring_prep_send(&task->thread_data->ev_puller,
task->event.fd, task->send.ptr->data, task->event.fd, task->send.ptr->data,
task->send.ptr->length, task); task->send.ptr->length, task);
@ -113,13 +123,14 @@ static inline int uring_prep_first_send(struct fast_task_info *task)
static inline int uring_prep_next_send(struct fast_task_info *task) static inline int uring_prep_next_send(struct fast_task_info *task)
{ {
FC_URING_OP_TYPE(task) = IORING_OP_SEND;
if (task->iovec_array.iovs != NULL) { if (task->iovec_array.iovs != NULL) {
SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_WRITEV);
return ioevent_uring_prep_writev(&task->thread_data->ev_puller, return ioevent_uring_prep_writev(&task->thread_data->ev_puller,
task->event.fd, task->iovec_array.iovs, task->event.fd, task->iovec_array.iovs,
FC_MIN(task->iovec_array.count, IOV_MAX), FC_MIN(task->iovec_array.count, IOV_MAX),
task); task);
} else { } else {
SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND);
return ioevent_uring_prep_send(&task->thread_data->ev_puller, return ioevent_uring_prep_send(&task->thread_data->ev_puller,
task->event.fd, task->send.ptr->data + task->send.ptr->offset, task->event.fd, task->send.ptr->data + task->send.ptr->offset,
task->send.ptr->length - task->send.ptr->offset, task); task->send.ptr->length - task->send.ptr->offset, task);
@ -128,13 +139,14 @@ static inline int uring_prep_next_send(struct fast_task_info *task)
static inline int uring_prep_first_send_zc(struct fast_task_info *task) static inline int uring_prep_first_send_zc(struct fast_task_info *task)
{ {
FC_URING_OP_TYPE(task) = IORING_OP_SEND;
if (task->iovec_array.iovs != NULL) { if (task->iovec_array.iovs != NULL) {
SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_WRITEV);
return ioevent_uring_prep_writev(&task->thread_data->ev_puller, return ioevent_uring_prep_writev(&task->thread_data->ev_puller,
task->event.fd, task->iovec_array.iovs, task->event.fd, task->iovec_array.iovs,
FC_MIN(task->iovec_array.count, IOV_MAX), FC_MIN(task->iovec_array.count, IOV_MAX),
task); task);
} else { } else {
SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND_ZC);
return ioevent_uring_prep_send_zc(&task->thread_data->ev_puller, return ioevent_uring_prep_send_zc(&task->thread_data->ev_puller,
task->event.fd, task->send.ptr->data, task->event.fd, task->send.ptr->data,
task->send.ptr->length, task); task->send.ptr->length, task);
@ -143,13 +155,14 @@ static inline int uring_prep_first_send_zc(struct fast_task_info *task)
static inline int uring_prep_next_send_zc(struct fast_task_info *task) static inline int uring_prep_next_send_zc(struct fast_task_info *task)
{ {
FC_URING_OP_TYPE(task) = IORING_OP_SEND;
if (task->iovec_array.iovs != NULL) { if (task->iovec_array.iovs != NULL) {
SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_WRITEV);
return ioevent_uring_prep_writev(&task->thread_data->ev_puller, return ioevent_uring_prep_writev(&task->thread_data->ev_puller,
task->event.fd, task->iovec_array.iovs, task->event.fd, task->iovec_array.iovs,
FC_MIN(task->iovec_array.count, IOV_MAX), FC_MIN(task->iovec_array.count, IOV_MAX),
task); task);
} else { } else {
SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND_ZC);
return ioevent_uring_prep_send_zc(&task->thread_data->ev_puller, return ioevent_uring_prep_send_zc(&task->thread_data->ev_puller,
task->event.fd, task->send.ptr->data + task->send.ptr->offset, task->event.fd, task->send.ptr->data + task->send.ptr->offset,
task->send.ptr->length - task->send.ptr->offset, task); task->send.ptr->length - task->send.ptr->offset, task);
@ -158,9 +171,14 @@ static inline int uring_prep_next_send_zc(struct fast_task_info *task)
static inline int uring_prep_close_fd(struct fast_task_info *task) static inline int uring_prep_close_fd(struct fast_task_info *task)
{ {
FC_URING_OP_TYPE(task) = IORING_OP_CLOSE;
return ioevent_uring_prep_close(&task->thread_data-> return ioevent_uring_prep_close(&task->thread_data->
ev_puller, task->event.fd); ev_puller, task->event.fd, NULL);
}
static inline int uring_prep_cancel(struct fast_task_info *task)
{
SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_ASYNC_CANCEL);
return ioevent_uring_prep_cancel(&task->thread_data->ev_puller, task);
} }
#endif #endif