add functions uring_prep_xxx

use_iouring
YuQing 2025-09-25 14:49:37 +08:00
parent 48a0ea2e30
commit 012b2038ee
6 changed files with 167 additions and 6 deletions

View File

@ -48,10 +48,17 @@ typedef int (*TaskContinueCallback)(struct fast_task_info *task);
struct sf_network_handler;
struct fast_task_info;
#if IOEVENT_USE_URING
#define FC_URING_OP_TYPE(task) (task)->event.timer.op_type
#endif
typedef struct ioevent_entry
{
FastTimerEntry timer; //must first
int fd;
#if IOEVENT_USE_URING
int res;
#endif
IOEventCallback callback;
} IOEventEntry;

View File

@ -27,6 +27,9 @@ typedef struct fast_timer_entry {
struct fast_timer_entry *next;
int slot_index;
bool rehash;
#if IOEVENT_USE_URING
short op_type;
#endif
} FastTimerEntry;
typedef struct fast_timer_slot {

View File

@ -217,7 +217,8 @@ int ioevent_detach(IOEventPoller *ioevent, const int fd)
sqe->flags |= IOSQE_CQE_SKIP_SUCCESS;
sqe->user_data = 0;
io_uring_prep_cancel_fd(sqe, fd, 0);
return ioevent_uring_submit(ioevent);
ioevent->submmit_count++;
return 0;
#elif IOEVENT_USE_KQUEUE
struct kevent ev[1];
int r, w;

View File

@ -20,6 +20,7 @@
#include <poll.h>
#include <sys/time.h>
#include "_os_define.h"
#include "logger.h"
#define IOEVENT_TIMEOUT 0x8000
@ -207,6 +208,8 @@ static inline int ioevent_uring_prep_recv(IOEventPoller *ioevent,
{
struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring);
if (sqe == NULL) {
logError("file: "__FILE__", line: %d, "
"io_uring_get_sqe fail", __LINE__);
return ENOSPC;
}
sqe->user_data = (long)user_data;
@ -214,6 +217,68 @@ static inline int ioevent_uring_prep_recv(IOEventPoller *ioevent,
ioevent->submmit_count++;
return 0;
}
static inline int ioevent_uring_prep_send(IOEventPoller *ioevent,
int sockfd, void *buf, size_t len, void *user_data)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring);
if (sqe == NULL) {
logError("file: "__FILE__", line: %d, "
"io_uring_get_sqe fail", __LINE__);
return ENOSPC;
}
sqe->user_data = (long)user_data;
io_uring_prep_send(sqe, sockfd, buf, len, 0);
ioevent->submmit_count++;
return 0;
}
static inline int ioevent_uring_prep_writev(IOEventPoller *ioevent,
int sockfd, const struct iovec *iovecs, unsigned nr_vecs,
void *user_data)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring);
if (sqe == NULL) {
logError("file: "__FILE__", line: %d, "
"io_uring_get_sqe fail", __LINE__);
return ENOSPC;
}
sqe->user_data = (long)user_data;
io_uring_prep_writev(sqe, sockfd, iovecs, nr_vecs, 0);
ioevent->submmit_count++;
return 0;
}
static inline int ioevent_uring_prep_send_zc(IOEventPoller *ioevent,
int sockfd, void *buf, size_t len, void *user_data)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring);
if (sqe == NULL) {
logError("file: "__FILE__", line: %d, "
"io_uring_get_sqe fail", __LINE__);
return ENOSPC;
}
sqe->user_data = (long)user_data;
io_uring_prep_send_zc(sqe, sockfd, buf, len, 0,
IORING_SEND_ZC_REPORT_USAGE);
ioevent->submmit_count++;
return 0;
}
static inline int ioevent_uring_prep_close(IOEventPoller *ioevent, int fd)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring);
if (sqe == NULL) {
logError("file: "__FILE__", line: %d, "
"io_uring_get_sqe fail", __LINE__);
return ENOSPC;
}
sqe->user_data = 0;
io_uring_prep_close(sqe, fd);
ioevent->submmit_count++;
return 0;
}
#endif
#ifdef __cplusplus

View File

@ -31,7 +31,6 @@ static int ioevent_process(IOEventPoller *ioevent)
case 0:
break;
case -ETIME:
case -EAGAIN:
case -EINTR:
return 0;
default:
@ -49,7 +48,8 @@ static int ioevent_process(IOEventPoller *ioevent)
if (ioevent->cqe->flags & IORING_CQE_F_NOTIF) {
//TODO
} else {
pEntry->callback(pEntry->fd, ioevent->cqe->res, pEntry);
pEntry->res = ioevent->cqe->res;
pEntry->callback(pEntry->fd, 0, pEntry);
}
}
}
@ -262,7 +262,7 @@ int ioevent_set(struct fast_task_info *task, struct nio_thread_data *pThread,
task->event.callback = callback;
if (use_iouring) {
#if IOEVENT_USE_URING
if ((result=uring_prep_recv_by_task(task)) != 0) {
if ((result=uring_prep_first_recv(task)) != 0) {
logError("file: "__FILE__", line: %d, "
"uring_prep_recv fail, fd: %d, "
"errno: %d, error info: %s",

View File

@ -72,10 +72,95 @@ static inline int ioevent_notify_thread(struct nio_thread_data *thread_data)
}
#if IOEVENT_USE_URING
static inline int uring_prep_recv_by_task(struct fast_task_info *task)
static inline int uring_prep_recv_data(struct fast_task_info *task,
char *buff, const int len)
{
FC_URING_OP_TYPE(task) = IORING_OP_RECV;
return ioevent_uring_prep_recv(&task->thread_data->ev_puller,
task->event.fd, task->recv.ptr->data, task->recv.ptr->size, task);
task->event.fd, buff, len, task);
}
static inline int uring_prep_first_recv(struct fast_task_info *task)
{
FC_URING_OP_TYPE(task) = IORING_OP_RECV;
return ioevent_uring_prep_recv(&task->thread_data->ev_puller,
task->event.fd, task->recv.ptr->data,
task->recv.ptr->size, task);
}
static inline int uring_prep_next_recv(struct fast_task_info *task)
{
FC_URING_OP_TYPE(task) = IORING_OP_RECV;
return ioevent_uring_prep_recv(&task->thread_data->ev_puller,
task->event.fd, task->recv.ptr->data + task->recv.ptr->offset,
task->recv.ptr->length - task->recv.ptr->offset, task);
}
static inline int uring_prep_first_send(struct fast_task_info *task)
{
FC_URING_OP_TYPE(task) = IORING_OP_SEND;
if (task->iovec_array.iovs != NULL) {
return ioevent_uring_prep_writev(&task->thread_data->ev_puller,
task->event.fd, task->iovec_array.iovs,
FC_MIN(task->iovec_array.count, IOV_MAX),
task);
} else {
return ioevent_uring_prep_send(&task->thread_data->ev_puller,
task->event.fd, task->send.ptr->data,
task->send.ptr->length, task);
}
}
static inline int uring_prep_next_send(struct fast_task_info *task)
{
FC_URING_OP_TYPE(task) = IORING_OP_SEND;
if (task->iovec_array.iovs != NULL) {
return ioevent_uring_prep_writev(&task->thread_data->ev_puller,
task->event.fd, task->iovec_array.iovs,
FC_MIN(task->iovec_array.count, IOV_MAX),
task);
} else {
return ioevent_uring_prep_send(&task->thread_data->ev_puller,
task->event.fd, task->send.ptr->data + task->send.ptr->offset,
task->send.ptr->length - task->send.ptr->offset, task);
}
}
static inline int uring_prep_first_send_zc(struct fast_task_info *task)
{
FC_URING_OP_TYPE(task) = IORING_OP_SEND;
if (task->iovec_array.iovs != NULL) {
return ioevent_uring_prep_writev(&task->thread_data->ev_puller,
task->event.fd, task->iovec_array.iovs,
FC_MIN(task->iovec_array.count, IOV_MAX),
task);
} else {
return ioevent_uring_prep_send_zc(&task->thread_data->ev_puller,
task->event.fd, task->send.ptr->data,
task->send.ptr->length, task);
}
}
static inline int uring_prep_next_send_zc(struct fast_task_info *task)
{
FC_URING_OP_TYPE(task) = IORING_OP_SEND;
if (task->iovec_array.iovs != NULL) {
return ioevent_uring_prep_writev(&task->thread_data->ev_puller,
task->event.fd, task->iovec_array.iovs,
FC_MIN(task->iovec_array.count, IOV_MAX),
task);
} else {
return ioevent_uring_prep_send_zc(&task->thread_data->ev_puller,
task->event.fd, task->send.ptr->data + task->send.ptr->offset,
task->send.ptr->length - task->send.ptr->offset, task);
}
}
static inline int uring_prep_close_fd(struct fast_task_info *task)
{
FC_URING_OP_TYPE(task) = IORING_OP_CLOSE;
return ioevent_uring_prep_close(&task->thread_data->
ev_puller, task->event.fd);
}
#endif