add function uring_prep_connect
parent
cb6f6f13f3
commit
4576f22e24
|
|
@ -206,62 +206,47 @@ static inline int ioevent_uring_submit(IOEventPoller *ioevent)
|
|||
}
|
||||
}
|
||||
|
||||
static inline int ioevent_uring_prep_recv(IOEventPoller *ioevent,
|
||||
int sockfd, void *buf, size_t size, void *user_data)
|
||||
static inline struct io_uring_sqe *ioevent_uring_get_sqe(IOEventPoller *ioevent)
|
||||
{
|
||||
struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring);
|
||||
if (sqe == NULL) {
|
||||
logError("file: "__FILE__", line: %d, "
|
||||
"io_uring_get_sqe fail", __LINE__);
|
||||
return ENOSPC;
|
||||
}
|
||||
return sqe;
|
||||
}
|
||||
|
||||
static inline void ioevent_uring_prep_recv(IOEventPoller *ioevent,
|
||||
struct io_uring_sqe *sqe, int sockfd,
|
||||
void *buf, size_t size, void *user_data)
|
||||
{
|
||||
io_uring_prep_recv(sqe, sockfd, buf, size, 0);
|
||||
sqe->user_data = (long)user_data;
|
||||
ioevent->submit_count++;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static inline int ioevent_uring_prep_send(IOEventPoller *ioevent,
|
||||
int sockfd, void *buf, size_t len, void *user_data)
|
||||
static inline void ioevent_uring_prep_send(IOEventPoller *ioevent,
|
||||
struct io_uring_sqe *sqe, int sockfd,
|
||||
void *buf, size_t len, void *user_data)
|
||||
{
|
||||
struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring);
|
||||
if (sqe == NULL) {
|
||||
logError("file: "__FILE__", line: %d, "
|
||||
"io_uring_get_sqe fail", __LINE__);
|
||||
return ENOSPC;
|
||||
}
|
||||
io_uring_prep_send(sqe, sockfd, buf, len, 0);
|
||||
sqe->user_data = (long)user_data;
|
||||
ioevent->submit_count++;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static inline int ioevent_uring_prep_writev(IOEventPoller *ioevent,
|
||||
int sockfd, const struct iovec *iovecs, unsigned nr_vecs,
|
||||
void *user_data)
|
||||
static inline void ioevent_uring_prep_writev(IOEventPoller *ioevent,
|
||||
struct io_uring_sqe *sqe, int sockfd, const struct iovec *iovecs,
|
||||
unsigned nr_vecs, void *user_data)
|
||||
{
|
||||
struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring);
|
||||
if (sqe == NULL) {
|
||||
logError("file: "__FILE__", line: %d, "
|
||||
"io_uring_get_sqe fail", __LINE__);
|
||||
return ENOSPC;
|
||||
}
|
||||
|
||||
io_uring_prep_writev(sqe, sockfd, iovecs, nr_vecs, 0);
|
||||
sqe->user_data = (long)user_data;
|
||||
ioevent->submit_count++;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static inline int ioevent_uring_prep_send_zc(IOEventPoller *ioevent,
|
||||
int sockfd, void *buf, size_t len, void *user_data)
|
||||
static inline void ioevent_uring_prep_send_zc(IOEventPoller *ioevent,
|
||||
struct io_uring_sqe *sqe, int sockfd,
|
||||
void *buf, size_t len, void *user_data)
|
||||
{
|
||||
struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring);
|
||||
if (sqe == NULL) {
|
||||
logError("file: "__FILE__", line: %d, "
|
||||
"io_uring_get_sqe fail", __LINE__);
|
||||
return ENOSPC;
|
||||
}
|
||||
io_uring_prep_send_zc(sqe, sockfd, buf, len, 0,
|
||||
#ifdef IORING_SEND_ZC_REPORT_USAGE
|
||||
IORING_SEND_ZC_REPORT_USAGE
|
||||
|
|
@ -271,18 +256,11 @@ static inline int ioevent_uring_prep_send_zc(IOEventPoller *ioevent,
|
|||
);
|
||||
sqe->user_data = (long)user_data;
|
||||
ioevent->submit_count++;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static inline int ioevent_uring_prep_close(IOEventPoller *ioevent,
|
||||
int fd, void *user_data)
|
||||
static inline void ioevent_uring_prep_close(IOEventPoller *ioevent,
|
||||
struct io_uring_sqe *sqe, int fd, void *user_data)
|
||||
{
|
||||
struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring);
|
||||
if (sqe == NULL) {
|
||||
logError("file: "__FILE__", line: %d, "
|
||||
"io_uring_get_sqe fail", __LINE__);
|
||||
return ENOSPC;
|
||||
}
|
||||
io_uring_prep_close(sqe, fd);
|
||||
if (user_data == NULL) {
|
||||
/* set sqe->flags MUST after io_uring_prep_xxx */
|
||||
|
|
@ -291,24 +269,25 @@ static inline int ioevent_uring_prep_close(IOEventPoller *ioevent,
|
|||
sqe->user_data = (long)user_data;
|
||||
}
|
||||
ioevent->submit_count++;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static inline int ioevent_uring_prep_cancel(IOEventPoller *ioevent,
|
||||
void *user_data)
|
||||
static inline void ioevent_uring_prep_cancel(IOEventPoller *ioevent,
|
||||
struct io_uring_sqe *sqe, void *user_data)
|
||||
{
|
||||
struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring);
|
||||
if (sqe == NULL) {
|
||||
logError("file: "__FILE__", line: %d, "
|
||||
"io_uring_get_sqe fail", __LINE__);
|
||||
return ENOSPC;
|
||||
}
|
||||
|
||||
io_uring_prep_cancel(sqe, user_data, 0);
|
||||
sqe->user_data = (long)user_data;
|
||||
ioevent->submit_count++;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static inline void ioevent_uring_prep_connect(IOEventPoller *ioevent,
|
||||
struct io_uring_sqe *sqe, int fd, const struct sockaddr *addr,
|
||||
socklen_t addrlen, void *user_data)
|
||||
{
|
||||
io_uring_prep_connect(sqe, fd, addr, addrlen);
|
||||
sqe->user_data = (long)user_data;
|
||||
ioevent->submit_count++;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
|||
|
|
@ -133,7 +133,8 @@ static void deal_timeouts(FastTimerEntry *head)
|
|||
current = entry;
|
||||
entry = entry->next;
|
||||
|
||||
current->prev = current->next = NULL; //must set NULL because NOT in time wheel
|
||||
/* must set NULL because NOT in time wheel */
|
||||
current->prev = current->next = NULL;
|
||||
pEventEntry = (IOEventEntry *)current;
|
||||
if (pEventEntry != NULL)
|
||||
{
|
||||
|
|
@ -277,12 +278,14 @@ int ioevent_set(struct fast_task_info *task, struct nio_thread_data *pThread,
|
|||
task->event.callback = callback;
|
||||
#if IOEVENT_USE_URING
|
||||
if (use_iouring) {
|
||||
if ((result=uring_prep_first_recv(task)) != 0) {
|
||||
logError("file: "__FILE__", line: %d, "
|
||||
"uring_prep_recv fail, fd: %d, "
|
||||
"errno: %d, error info: %s",
|
||||
__LINE__, sock, result, STRERROR(result));
|
||||
return result;
|
||||
if (FC_URING_OP_TYPE(task) == IORING_OP_NOP) {
|
||||
if ((result=uring_prep_first_recv(task)) != 0) {
|
||||
logError("file: "__FILE__", line: %d, "
|
||||
"uring_prep_recv fail, fd: %d, "
|
||||
"errno: %d, error info: %s",
|
||||
__LINE__, sock, result, STRERROR(result));
|
||||
return result;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
#endif
|
||||
|
|
|
|||
|
|
@ -17,6 +17,9 @@
|
|||
#define _IOEVENT_LOOP_H
|
||||
|
||||
#include "fast_task_queue.h"
|
||||
#if IOEVENT_USE_URING
|
||||
#include "sockopt.h"
|
||||
#endif
|
||||
|
||||
#define fc_hold_task_ex(task, inc_count) __sync_add_and_fetch( \
|
||||
&task->reffer_count, inc_count)
|
||||
|
|
@ -78,6 +81,10 @@ static inline int ioevent_notify_thread(struct nio_thread_data *thread_data)
|
|||
#if IOEVENT_USE_URING
|
||||
|
||||
#define SET_OP_TYPE_AND_HOLD_TASK(task, _op_type) \
|
||||
struct io_uring_sqe *sqe; \
|
||||
if ((sqe=ioevent_uring_get_sqe(&task->thread_data->ev_puller)) == NULL) { \
|
||||
return ENOSPC; \
|
||||
} \
|
||||
FC_URING_OP_TYPE(task) = _op_type; \
|
||||
fc_hold_task(task)
|
||||
|
||||
|
|
@ -85,100 +92,141 @@ static inline int uring_prep_recv_data(struct fast_task_info *task,
|
|||
char *buff, const int len)
|
||||
{
|
||||
SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_RECV);
|
||||
return ioevent_uring_prep_recv(&task->thread_data->ev_puller,
|
||||
task->event.fd, buff, len, task);
|
||||
ioevent_uring_prep_recv(&task->thread_data->ev_puller,
|
||||
sqe, task->event.fd, buff, len, task);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static inline int uring_prep_first_recv(struct fast_task_info *task)
|
||||
{
|
||||
SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_RECV);
|
||||
return ioevent_uring_prep_recv(&task->thread_data->ev_puller,
|
||||
task->event.fd, task->recv.ptr->data,
|
||||
ioevent_uring_prep_recv(&task->thread_data->ev_puller,
|
||||
sqe, task->event.fd, task->recv.ptr->data,
|
||||
task->recv.ptr->size, task);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static inline int uring_prep_next_recv(struct fast_task_info *task)
|
||||
{
|
||||
SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_RECV);
|
||||
return ioevent_uring_prep_recv(&task->thread_data->ev_puller,
|
||||
ioevent_uring_prep_recv(&task->thread_data->ev_puller, sqe,
|
||||
task->event.fd, task->recv.ptr->data + task->recv.ptr->offset,
|
||||
task->recv.ptr->length - task->recv.ptr->offset, task);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static inline int uring_prep_first_send(struct fast_task_info *task)
|
||||
{
|
||||
if (task->iovec_array.iovs != NULL) {
|
||||
SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_WRITEV);
|
||||
return ioevent_uring_prep_writev(&task->thread_data->ev_puller,
|
||||
task->event.fd, task->iovec_array.iovs,
|
||||
ioevent_uring_prep_writev(&task->thread_data->ev_puller,
|
||||
sqe, task->event.fd, task->iovec_array.iovs,
|
||||
FC_MIN(task->iovec_array.count, IOV_MAX),
|
||||
task);
|
||||
} else {
|
||||
SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND);
|
||||
return ioevent_uring_prep_send(&task->thread_data->ev_puller,
|
||||
task->event.fd, task->send.ptr->data,
|
||||
ioevent_uring_prep_send(&task->thread_data->ev_puller,
|
||||
sqe, task->event.fd, task->send.ptr->data,
|
||||
task->send.ptr->length, task);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static inline int uring_prep_next_send(struct fast_task_info *task)
|
||||
{
|
||||
if (task->iovec_array.iovs != NULL) {
|
||||
SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_WRITEV);
|
||||
return ioevent_uring_prep_writev(&task->thread_data->ev_puller,
|
||||
task->event.fd, task->iovec_array.iovs,
|
||||
ioevent_uring_prep_writev(&task->thread_data->ev_puller,
|
||||
sqe, task->event.fd, task->iovec_array.iovs,
|
||||
FC_MIN(task->iovec_array.count, IOV_MAX),
|
||||
task);
|
||||
} else {
|
||||
SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND);
|
||||
return ioevent_uring_prep_send(&task->thread_data->ev_puller,
|
||||
ioevent_uring_prep_send(&task->thread_data->ev_puller, sqe,
|
||||
task->event.fd, task->send.ptr->data + task->send.ptr->offset,
|
||||
task->send.ptr->length - task->send.ptr->offset, task);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static inline int uring_prep_first_send_zc(struct fast_task_info *task)
|
||||
{
|
||||
if (task->iovec_array.iovs != NULL) {
|
||||
SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_WRITEV);
|
||||
return ioevent_uring_prep_writev(&task->thread_data->ev_puller,
|
||||
task->event.fd, task->iovec_array.iovs,
|
||||
ioevent_uring_prep_writev(&task->thread_data->ev_puller,
|
||||
sqe, task->event.fd, task->iovec_array.iovs,
|
||||
FC_MIN(task->iovec_array.count, IOV_MAX),
|
||||
task);
|
||||
} else {
|
||||
SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND_ZC);
|
||||
return ioevent_uring_prep_send_zc(&task->thread_data->ev_puller,
|
||||
task->event.fd, task->send.ptr->data,
|
||||
ioevent_uring_prep_send_zc(&task->thread_data->ev_puller,
|
||||
sqe, task->event.fd, task->send.ptr->data,
|
||||
task->send.ptr->length, task);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static inline int uring_prep_next_send_zc(struct fast_task_info *task)
|
||||
{
|
||||
if (task->iovec_array.iovs != NULL) {
|
||||
SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_WRITEV);
|
||||
return ioevent_uring_prep_writev(&task->thread_data->ev_puller,
|
||||
task->event.fd, task->iovec_array.iovs,
|
||||
ioevent_uring_prep_writev(&task->thread_data->ev_puller,
|
||||
sqe, task->event.fd, task->iovec_array.iovs,
|
||||
FC_MIN(task->iovec_array.count, IOV_MAX),
|
||||
task);
|
||||
} else {
|
||||
SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND_ZC);
|
||||
return ioevent_uring_prep_send_zc(&task->thread_data->ev_puller,
|
||||
ioevent_uring_prep_send_zc(&task->thread_data->ev_puller, sqe,
|
||||
task->event.fd, task->send.ptr->data + task->send.ptr->offset,
|
||||
task->send.ptr->length - task->send.ptr->offset, task);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static inline int uring_prep_close_fd(struct fast_task_info *task)
|
||||
{
|
||||
return ioevent_uring_prep_close(&task->thread_data->
|
||||
ev_puller, task->event.fd, NULL);
|
||||
struct io_uring_sqe *sqe;
|
||||
|
||||
if ((sqe=ioevent_uring_get_sqe(&task->thread_data->ev_puller)) == NULL) {
|
||||
return ENOSPC;
|
||||
}
|
||||
|
||||
/* do NOT need callback */
|
||||
ioevent_uring_prep_close(&task->thread_data->
|
||||
ev_puller, sqe, task->event.fd, NULL);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static inline int uring_prep_cancel(struct fast_task_info *task)
|
||||
{
|
||||
SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_ASYNC_CANCEL);
|
||||
return ioevent_uring_prep_cancel(&task->thread_data->ev_puller, task);
|
||||
ioevent_uring_prep_cancel(&task->thread_data->ev_puller, sqe, task);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static inline int uring_prep_connect(struct fast_task_info *task)
|
||||
{
|
||||
int result;
|
||||
sockaddr_convert_t *convert;
|
||||
|
||||
if ((task->event.fd=socketCreateEx2(AF_UNSPEC, task->server_ip,
|
||||
O_NONBLOCK, NULL, &result)) < 0)
|
||||
{
|
||||
return result;
|
||||
}
|
||||
|
||||
convert = (sockaddr_convert_t *)(task->send.ptr->data +
|
||||
task->send.ptr->size - 2 * sizeof(sockaddr_convert_t));
|
||||
if ((result=setsockaddrbyip(task->server_ip, task->port, convert)) != 0) {
|
||||
return result;
|
||||
}
|
||||
|
||||
do {
|
||||
SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_CONNECT);
|
||||
ioevent_uring_prep_connect(&task->thread_data->ev_puller, sqe,
|
||||
task->event.fd, &convert->sa.addr, convert->len, task);
|
||||
} while (0);
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue