diff --git a/.gitignore b/.gitignore
index d0a20d0..e2a0726 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,6 @@
 # Makefile.in
 src/Makefile
+src/tests/Makefile
 
 # Prerequisites
 *.d
diff --git a/HISTORY b/HISTORY
index 8834474..c2f6a4f 100644
--- a/HISTORY
+++ b/HISTORY
@@ -1,4 +1,9 @@
+Version 1.81 2025-10-05
+ * support Linux io_uring
+ * free_queue support parameter: need_shrink and set task->shrinked
+ * IOEventCallback: change event type from short to int
+
 Version 1.80 2025-09-10
  * getIpaddrByNameEx: IPv4 has priority over IPv6
  * shared_func.[hc]: add function fc_ftoa
diff --git a/libfastcommon.spec b/libfastcommon.spec
index f6305eb..34b5719 100644
--- a/libfastcommon.spec
+++ b/libfastcommon.spec
@@ -17,6 +17,14 @@ BuildRequires: libcurl-devel
 Requires: libcurl
 Requires: %__cp %__mv %__chmod %__grep %__mkdir %__install %__id
 
+%define kernel_major %(uname -r | cut -d'.' -f1)
+%define kernel_minor %(uname -r | cut -d'.' -f2)
+%define kernel_ver_int %(expr %{kernel_major} \* 100 + %{kernel_minor})
+%if %{kernel_ver_int} >= 514
+BuildRequires: liburing-devel >= 2.5
+Requires: liburing >= 2.5
+%endif
+
 %description
 c common functions library extracted from my open source projects FastDFS.
 this library is very simple and stable. functions including: string, logger,
diff --git a/make.sh b/make.sh
index 0fe7bed..d65518c 100755
--- a/make.sh
+++ b/make.sh
@@ -112,7 +112,23 @@ HAVE_VMMETER_H=0
 HAVE_USER_H=0
 if [ "$uname" = "Linux" ]; then
   OS_NAME=OS_LINUX
-  IOEVENT_USE=IOEVENT_USE_EPOLL
+
+  major_version=$(uname -r | awk -F . '{print $1;}')
+  minor_version=$(uname -r | awk -F . '{print $2;}')
+  if [ $major_version -eq 5 ] && [ $minor_version -ge 14 ]; then
+    out=$(grep -F IORING_OP_SEND_ZC /usr/include/liburing/io_uring.h)
+    if [ -n "$out" ]; then
+      IOEVENT_USE=IOEVENT_USE_URING
+      LIBS="$LIBS -luring"
+    else
+      IOEVENT_USE=IOEVENT_USE_EPOLL
+    fi
+  elif [ $major_version -gt 5 ]; then
+    IOEVENT_USE=IOEVENT_USE_URING
+  else
+    IOEVENT_USE=IOEVENT_USE_EPOLL
+  fi
+
   if [ $glibc_minor -lt 17 ]; then
     LIBS="$LIBS -lrt"
   fi
@@ -261,3 +277,12 @@ make $1 $2 $3
 if [ "$1" = "clean" ]; then
   /bin/rm -f Makefile _os_define.h
 fi
+
+cd tests
+cp Makefile.in Makefile
+sed_replace "s#\\\$(CC)#gcc#g" Makefile
+sed_replace "s#\\\$(INCS)#$INCS#g" Makefile
+sed_replace "s#\\\$(LIBS)#$LIBS#g" Makefile
+if [ "$1" = "clean" ]; then
+  /bin/rm -f Makefile
+fi
diff --git a/src/common_define.h b/src/common_define.h
index 0913390..7dea6ed 100644
--- a/src/common_define.h
+++ b/src/common_define.h
@@ -125,7 +125,8 @@ extern int pthread_mutexattr_settype(pthread_mutexattr_t *attr, int kind);
 #define FC_IOV_BATCH_SIZE IOV_MAX
 #endif
 
-// 由于要支持IPv6,所以将IP_ADDRESS_SIZE的值由16修改为46
+#define IPV4_ADDRESS_SIZE INET_ADDRSTRLEN   //16
+#define IPV6_ADDRESS_SIZE INET6_ADDRSTRLEN  //46
 #define IP_ADDRESS_SIZE INET6_ADDRSTRLEN    //46
 #define FORMATTED_IP_SIZE (IP_ADDRESS_SIZE + 2)
 #define INFINITE_FILE_SIZE (256 * 1024LL * 1024 * 1024 * 1024 * 1024LL)
@@ -454,6 +455,14 @@ static inline int fc_string_compare(const string_t *s1, const string_t *s2)
     }
 }
 
+static inline bool fc_string_equal_ex(const char *str1,
+        const int len1, const char *str2, const int len2)
+{
+    return (len1 == len2) && (memcmp(str1, str2, len1) == 0);
+}
+#define fc_string_equals_ex(str1, len1, str2, len2) \
+    fc_string_equal_ex(str1, len1, str2, len2)
+
 static inline bool fc_string_equal(const string_t *s1, const string_t *s2)
 {
     return (s1->len == s2->len) && (memcmp(s1->str, s2->str, s1->len) == 0);
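Reviewer note: the common_define.h hunk above adds fc_string_equal_ex() for comparing raw character buffers without wrapping them in string_t. A minimal usage sketch, assuming the usual libfastcommon include path; the sample strings are made up for illustration:

/* reviewer sketch, not part of the patch */
#include <stdio.h>
#include <string.h>
#include "common_define.h"

int main()
{
    const char *name = "io_uring";

    /* compares raw buffers directly, no string_t wrapper needed */
    printf("equal: %d\n", fc_string_equal_ex(name, (int)strlen(name),
            "io_uring", 8));
    return 0;
}

diff --git a/src/connection_pool.c b/src/connection_pool.c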
index 26b256f..29ee4d7 100644
--- a/src/connection_pool.c
+++ b/src/connection_pool.c
@@ -25,10 +25,20 @@
 #include "server_id_func.h"
 #include "connection_pool.h"
 
+static void conn_pool_disconnect_server_cb(ConnectionInfo *conn)
+{
+    conn_pool_disconnect_server(conn);
+}
+
+static bool conn_pool_is_connected_cb(ConnectionInfo *conn)
+{
+    return conn_pool_is_connected(conn);
+}
+
 ConnectionCallbacks g_connection_callbacks = {
     false,
     {{conn_pool_connect_server_ex1,
-      conn_pool_disconnect_server,
-      conn_pool_is_connected},
+      conn_pool_disconnect_server_cb,
+      conn_pool_is_connected_cb},
     {NULL, NULL, NULL}},
     {NULL}
 };
@@ -411,20 +421,6 @@ void conn_pool_destroy(ConnectionPool *cp)
     fast_mblock_destroy(&cp->node_allocator);
 }
 
-void conn_pool_disconnect_server(ConnectionInfo *conn)
-{
-    if (conn->sock >= 0)
-    {
-        close(conn->sock);
-        conn->sock = -1;
-    }
-}
-
-bool conn_pool_is_connected(ConnectionInfo *conn)
-{
-    return (conn->sock >= 0);
-}
-
 int conn_pool_connect_server_ex1(ConnectionInfo *conn, const char *service_name,
         const int connect_timeout_ms, const char *bind_ipaddr,
         const bool log_connect_error)
diff --git a/src/connection_pool.h b/src/connection_pool.h
index 8f44f20..a667aa7 100644
--- a/src/connection_pool.h
+++ b/src/connection_pool.h
@@ -331,9 +331,19 @@ int conn_pool_close_connection_ex(ConnectionPool *cp,
  * conn: the connection
  * return 0 for success, != 0 for error
  */
-void conn_pool_disconnect_server(ConnectionInfo *conn);
+static inline void conn_pool_disconnect_server(ConnectionInfo *conn)
+{
+    if (conn->sock >= 0)
+    {
+        close(conn->sock);
+        conn->sock = -1;
+    }
+}
 
-bool conn_pool_is_connected(ConnectionInfo *conn);
+static inline bool conn_pool_is_connected(ConnectionInfo *conn)
+{
+    return (conn->sock >= 0);
+}
 
 /**
  * connect to the server
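Reviewer note on the connection_pool change above: conn_pool_disconnect_server() and conn_pool_is_connected() become static inline in the header, so the .c file adds small out-of-line *_cb wrappers that keep addressable functions for the g_connection_callbacks table. A self-contained sketch of that pattern, with placeholder names instead of the real ConnectionInfo/ConnectionCallbacks types:

/* reviewer sketch, placeholder names only */
#include <stdbool.h>

typedef struct { int sock; } conn_t;                 /* stand-in for ConnectionInfo */
typedef bool (*is_connected_func)(conn_t *conn);     /* callback slot type */

static inline bool pool_is_connected(conn_t *conn)   /* header-style inline */
{
    return conn->sock >= 0;
}

static bool pool_is_connected_cb(conn_t *conn)       /* addressable wrapper */
{
    return pool_is_connected(conn);
}

/* the wrapper, not the inline, is stored in the callback table */
static is_connected_func g_is_connected = pool_is_connected_cb;
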
diff --git a/src/fast_task_queue.c b/src/fast_task_queue.c
index 101ad59..719f7e6 100644
--- a/src/fast_task_queue.c
+++ b/src/fast_task_queue.c
@@ -59,11 +59,11 @@ static int task_alloc_init(struct fast_task_info *task,
 }
 
 int free_queue_init_ex2(struct fast_task_queue *queue, const char *name,
-        const bool double_buffers, const int max_connections,
-        const int alloc_task_once, const int min_buff_size,
-        const int max_buff_size, const int padding_size,
-        const int arg_size, TaskInitCallback init_callback,
-        void *init_arg)
+        const bool double_buffers, const bool need_shrink,
+        const int max_connections, const int alloc_task_once,
+        const int min_buff_size, const int max_buff_size,
+        const int padding_size, const int arg_size,
+        TaskInitCallback init_callback, void *init_arg)
 {
 #define MAX_DATA_SIZE (256 * 1024 * 1024)
     int alloc_once;
@@ -123,6 +123,7 @@ int free_queue_init_ex2(struct fast_task_queue *queue, const char *name,
     }
 
     queue->double_buffers = double_buffers;
+    queue->need_shrink = need_shrink;
     queue->min_buff_size = aligned_min_size;
     queue->max_buff_size = aligned_max_size;
     queue->padding_size = aligned_padding_size;
@@ -183,16 +184,23 @@ void free_queue_push(struct fast_task_info *task)
     task->send.ptr->length = 0;
     task->send.ptr->offset = 0;
     task->req_count = 0;
-    if (task->send.ptr->size > task->free_queue->min_buff_size) {//need thrink
-        _realloc_buffer(task->send.ptr, task->free_queue->min_buff_size, false);
+    if (task->free_queue->need_shrink && task->send.
+            ptr->size > task->free_queue->min_buff_size)
+    { //need shrink
+        _realloc_buffer(task->send.ptr, task->free_queue->
+                min_buff_size, false);
+        task->shrinked = true;
     }
 
     if (task->free_queue->double_buffers) {
         task->recv.ptr->length = 0;
         task->recv.ptr->offset = 0;
-        if (task->recv.ptr->size > task->free_queue->min_buff_size) {
+        if (task->free_queue->need_shrink && task->recv.
+                ptr->size > task->free_queue->min_buff_size)
+        {
             _realloc_buffer(task->recv.ptr, task->free_queue->
                     min_buff_size, false);
+            task->shrinked = true;
         }
     }
 
diff --git a/src/fast_task_queue.h b/src/fast_task_queue.h
index 9902003..d211304 100644
--- a/src/fast_task_queue.h
+++ b/src/fast_task_queue.h
@@ -42,16 +42,23 @@ typedef void (*TaskCleanUpCallback) (struct fast_task_info *task);
 typedef int (*TaskInitCallback)(struct fast_task_info *task, void *arg);
 typedef void (*TaskReleaseCallback)(struct fast_task_info *task);
 
-typedef void (*IOEventCallback) (int sock, short event, void *arg);
+typedef void (*IOEventCallback) (int sock, const int event, void *arg);
 typedef int (*TaskContinueCallback)(struct fast_task_info *task);
 
 struct sf_network_handler;
 struct fast_task_info;
 
+#if IOEVENT_USE_URING
+#define FC_URING_OP_TYPE(task)    (task)->uring.op_type
+#define FC_URING_IS_CLIENT(task)  (task)->uring.is_client
+#define FC_URING_IS_SEND_ZC(task) ((task)->uring.op_type == IORING_OP_SEND_ZC)
+#endif
+
 typedef struct ioevent_entry
 {
     FastTimerEntry timer;  //must first
     int fd;
+    int res;   //just for io_uring, since v1.0.81
     IOEventCallback callback;
 } IOEventEntry;
 
@@ -119,12 +126,19 @@ struct fast_task_info
     struct fast_net_buffer_wrapper recv;  //recv buffer
 
     uint16_t port;  //peer port
+
+    struct {
+        int8_t is_client;
+        uint8_t op_type;
+    } uring;  //just for io_uring, since v1.0.81
+
     struct {
         uint8_t current;
         volatile uint8_t notify;
     } nio_stages;  //stages for network IO
-    volatile int8_t reffer_count;
     volatile int8_t canceled;  //if task canceled
+    volatile int8_t shrinked;  //if task shrinked, since V1.0.81
+    volatile int reffer_count;
     int pending_send_count;
     int64_t req_count;  //request count
     struct {
@@ -153,6 +167,7 @@ struct fast_task_queue
     int block_size;
     bool malloc_whole_block;
     bool double_buffers;  //if send buffer and recv buffer are independent
+    bool need_shrink;
     struct fast_mblock_man allocator;
     TaskInitCallback init_callback;
     void *init_arg;
@@ -164,22 +179,22 @@
 extern "C" {
 #endif
 
 int free_queue_init_ex2(struct fast_task_queue *queue, const char *name,
-        const bool double_buffers, const int max_connections,
-        const int alloc_task_once, const int min_buff_size,
-        const int max_buff_size, const int padding_size,
-        const int arg_size, TaskInitCallback init_callback,
-        void *init_arg);
+        const bool double_buffers, const bool need_shrink,
+        const int max_connections, const int alloc_task_once,
+        const int min_buff_size, const int max_buff_size,
+        const int padding_size, const int arg_size,
+        TaskInitCallback init_callback, void *init_arg);
 
 static inline int free_queue_init_ex(struct fast_task_queue *queue,
         const char *name, const bool double_buffers,
-        const int max_connections, const int alloc_task_once,
-        const int min_buff_size, const int max_buff_size,
-        const int arg_size)
+        const bool need_shrink, const int max_connections,
+        const int alloc_task_once, const int min_buff_size,
+        const int max_buff_size, const int arg_size)
 {
     const int padding_size = 0;
-    return free_queue_init_ex2(queue, name, double_buffers, max_connections,
-            alloc_task_once, min_buff_size, max_buff_size, padding_size,
-            arg_size, NULL, NULL);
+    return free_queue_init_ex2(queue, name, double_buffers, need_shrink,
+            max_connections, alloc_task_once, min_buff_size, max_buff_size,
+            padding_size, arg_size, NULL, NULL);
 }
 
 static inline int free_queue_init(struct fast_task_queue *queue,
@@ -188,9 +203,11 @@ static inline int free_queue_init(struct fast_task_queue *queue,
 {
     const char *name = "";
     const bool double_buffers = false;
+    const bool need_shrink = true;
     const int arg_size = 0;
-    return free_queue_init_ex(queue, name, double_buffers, max_connections,
-            alloc_task_once, min_buff_size, max_buff_size, arg_size);
+    return free_queue_init_ex(queue, name, double_buffers,
+            need_shrink, max_connections, alloc_task_once,
+            min_buff_size, max_buff_size, arg_size);
 }
 
 static inline void free_queue_set_release_callback(
diff --git a/src/fast_timer.c b/src/fast_timer.c
index 73bfb5b..6ab5717 100644
--- a/src/fast_timer.c
+++ b/src/fast_timer.c
@@ -105,6 +105,7 @@ int fast_timer_modify(FastTimer *timer, FastTimerEntry *entry,
         if ((result=fast_timer_remove(timer, entry)) == 0) {
             fast_timer_add_ex(timer, entry, new_expires, true);
         }
+        return result;
     }
 
     return 0;
@@ -185,6 +186,7 @@ int fast_timer_timeouts_get(FastTimer *timer, const int64_t current_time,
             } else {
                 last->rehash = false;
             }
+            continue;
         }
     } else {  //expired
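Reviewer note: with the new need_shrink flag threaded through free_queue_init_ex()/free_queue_init_ex2() above, a caller can now keep large task buffers instead of shrinking them back to min_buff_size when a task is released. A hypothetical caller, with arbitrary queue name and sizes, just to show the new argument order:

/* reviewer sketch, not part of the patch */
#include <stdbool.h>
#include "fast_task_queue.h"

static struct fast_task_queue my_queue;

int init_my_queue(void)
{
    const bool double_buffers = false;
    const bool need_shrink = false;   /* keep buffers large after release */

    return free_queue_init_ex(&my_queue, "my-service", double_buffers,
            need_shrink,
            1024,         /* max_connections */
            256,          /* alloc_task_once */
            8 * 1024,     /* min_buff_size */
            256 * 1024,   /* max_buff_size */
            0);           /* arg_size */
}
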
diff --git a/src/ioevent.c b/src/ioevent.c
index f849f84..7e0f531 100644
--- a/src/ioevent.c
+++ b/src/ioevent.c
@@ -45,51 +45,72 @@ int kqueue_ev_convert(int16_t event, uint16_t flags)
 }
 #endif
 
-int ioevent_init(IOEventPoller *ioevent, const int size,
-        const int timeout_ms, const int extra_events)
+int ioevent_init(IOEventPoller *ioevent, const char *service_name,
+        const int size, const int timeout_ms, const int extra_events)
 {
-    int bytes;
+#if IOEVENT_USE_URING
+    int result;
+#else
+    int bytes;
 
-    ioevent->size = size;
-    ioevent->extra_events = extra_events;
-    ioevent->iterator.index = 0;
-    ioevent->iterator.count = 0;
-
-#if IOEVENT_USE_EPOLL
-    ioevent->poll_fd = epoll_create(ioevent->size);
-    if (ioevent->poll_fd < 0) {
-        return errno != 0 ? errno : ENOMEM;
-    }
-    bytes = sizeof(struct epoll_event) * size;
-    ioevent->events = (struct epoll_event *)fc_malloc(bytes);
-#elif IOEVENT_USE_KQUEUE
-    ioevent->poll_fd = kqueue();
-    if (ioevent->poll_fd < 0) {
-        return errno != 0 ? errno : ENOMEM;
-    }
-    bytes = sizeof(struct kevent) * size;
-    ioevent->events = (struct kevent *)fc_malloc(bytes);
-#elif IOEVENT_USE_PORT
-    ioevent->poll_fd = port_create();
-    if (ioevent->poll_fd < 0) {
-        return errno != 0 ? errno : ENOMEM;
-    }
-    bytes = sizeof(port_event_t) * size;
-    ioevent->events = (port_event_t *)fc_malloc(bytes);
+    ioevent->iterator.index = 0;
+    ioevent->iterator.count = 0;
 #endif
-    if (ioevent->events == NULL) {
-        close(ioevent->poll_fd);
-        ioevent->poll_fd = -1;
-        return ENOMEM;
-    }
-    ioevent_set_timeout(ioevent, timeout_ms);
+    ioevent->service_name = service_name;
+    ioevent->size = size;
+    ioevent->extra_events = extra_events;
 
-    return 0;
+#if IOEVENT_USE_EPOLL
+    ioevent->poll_fd = epoll_create(ioevent->size);
+    if (ioevent->poll_fd < 0) {
+        return errno != 0 ? errno : ENOMEM;
+    }
+    bytes = sizeof(struct epoll_event) * size;
+    ioevent->events = (struct epoll_event *)fc_malloc(bytes);
+#elif IOEVENT_USE_URING
+    if ((result=io_uring_queue_init(size, &ioevent->ring, 0)) < 0) {
+        return -result;
+    }
+    ioevent->cqe = NULL;
+    ioevent->submit_count = 0;
+    ioevent->send_zc_logged = false;
+    ioevent->send_zc_done_notify = false;
+#elif IOEVENT_USE_KQUEUE
+    ioevent->poll_fd = kqueue();
+    if (ioevent->poll_fd < 0) {
+        return errno != 0 ? errno : ENOMEM;
+    }
+    bytes = sizeof(struct kevent) * size;
+    ioevent->events = (struct kevent *)fc_malloc(bytes);
+#elif IOEVENT_USE_PORT
+    ioevent->poll_fd = port_create();
+    if (ioevent->poll_fd < 0) {
+        return errno != 0 ? errno : ENOMEM;
+    }
+    bytes = sizeof(port_event_t) * size;
+    ioevent->events = (port_event_t *)fc_malloc(bytes);
+#endif
+
+#if IOEVENT_USE_URING
+
+#else
+    if (ioevent->events == NULL) {
+        close(ioevent->poll_fd);
+        ioevent->poll_fd = -1;
+        return ENOMEM;
+    }
+#endif
+
+    ioevent_set_timeout(ioevent, timeout_ms);
+    return 0;
 }
 
 void ioevent_destroy(IOEventPoller *ioevent)
 {
+#if IOEVENT_USE_URING
+    io_uring_queue_exit(&ioevent->ring);
+#else
     if (ioevent->events != NULL) {
         free(ioevent->events);
         ioevent->events = NULL;
@@ -99,10 +120,11 @@ void ioevent_destroy(IOEventPoller *ioevent)
         close(ioevent->poll_fd);
         ioevent->poll_fd = -1;
     }
+#endif
 }
 
-int ioevent_attach(IOEventPoller *ioevent, const int fd, const int e,
-        void *data)
+int ioevent_attach(IOEventPoller *ioevent, const int fd,
+        const int e, void *data)
 {
 #if IOEVENT_USE_EPOLL
     struct epoll_event ev;
@@ -110,6 +132,15 @@ int ioevent_attach(IOEventPoller *ioevent, const int fd, const int e,
     ev.events = e | ioevent->extra_events;
     ev.data.ptr = data;
     return epoll_ctl(ioevent->poll_fd, EPOLL_CTL_ADD, fd, &ev);
+#elif IOEVENT_USE_URING
+    struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring);
+    if (sqe == NULL) {
+        return ENOSPC;
+    }
+    io_uring_prep_poll_multishot(sqe, fd, e | ioevent->extra_events);
+    sqe->user_data = (long)data;
+    ioevent->submit_count++;
+    return 0;
 #elif IOEVENT_USE_KQUEUE
     struct kevent ev[2];
     int n = 0;
@@ -128,8 +159,8 @@ int ioevent_attach(IOEventPoller *ioevent, const int fd, const int e,
 #endif
 }
 
-int ioevent_modify(IOEventPoller *ioevent, const int fd, const int e,
-        void *data)
+int ioevent_modify(IOEventPoller *ioevent, const int fd,
+        const int e, void *data)
 {
 #if IOEVENT_USE_EPOLL
     struct epoll_event ev;
@@ -137,6 +168,16 @@ int ioevent_modify(IOEventPoller *ioevent, const int fd, const int e,
     ev.events = e | ioevent->extra_events;
     ev.data.ptr = data;
     return epoll_ctl(ioevent->poll_fd, EPOLL_CTL_MOD, fd, &ev);
+#elif IOEVENT_USE_URING
+    struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring);
+    if (sqe == NULL) {
+        return ENOSPC;
+    }
+    io_uring_prep_poll_update(sqe, sqe->user_data, sqe->user_data,
+            e | ioevent->extra_events, IORING_POLL_UPDATE_EVENTS);
+    sqe->user_data = (long)data;
+    ioevent->submit_count++;
+    return 0;
 #elif IOEVENT_USE_KQUEUE
     struct kevent ev[2];
     int result;
@@ -173,6 +214,16 @@ int ioevent_detach(IOEventPoller *ioevent, const int fd)
 {
 #if IOEVENT_USE_EPOLL
     return epoll_ctl(ioevent->poll_fd, EPOLL_CTL_DEL, fd, NULL);
+#elif IOEVENT_USE_URING
+    struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring);
+    if (sqe == NULL) {
+        return ENOSPC;
+    }
+    io_uring_prep_cancel_fd(sqe, fd, 0);
+    /* set sqe->flags MUST after io_uring_prep_xxx */
+    sqe->flags = IOSQE_CQE_SKIP_SUCCESS;
+    ioevent->submit_count++;
+    return 0;
 #elif IOEVENT_USE_KQUEUE
     struct kevent ev[1];
     int r, w;
@@ -192,14 +243,25 @@ int ioevent_detach(IOEventPoller *ioevent, const int fd)
 int ioevent_poll(IOEventPoller *ioevent)
 {
 #if IOEVENT_USE_EPOLL
-    return epoll_wait(ioevent->poll_fd, ioevent->events, ioevent->size, ioevent->timeout);
+    return epoll_wait(ioevent->poll_fd, ioevent->events,
+            ioevent->size, ioevent->timeout);
+#elif IOEVENT_USE_URING
+    int result;
+    result = io_uring_wait_cqe_timeout(&ioevent->ring,
+            &ioevent->cqe, &ioevent->timeout);
+    if (result < 0) {
+        errno = -result;
+        return -1;
+    }
+    return 0;
 #elif IOEVENT_USE_KQUEUE
-    return kevent(ioevent->poll_fd, NULL, 0, ioevent->events, ioevent->size, &ioevent->timeout);
+    return kevent(ioevent->poll_fd, NULL, 0, ioevent->events,
+            ioevent->size, &ioevent->timeout);
#elif IOEVENT_USE_PORT
     int result;
     int retval;
     unsigned int nget = 1;
-    if((retval = port_getn(ioevent->poll_fd, ioevent->events,
+    if((retval=port_getn(ioevent->poll_fd, ioevent->events,
         ioevent->size, &nget, &ioevent->timeout)) == 0)
     {
         result = (int)nget;
@@ -225,4 +287,3 @@ int ioevent_poll(IOEventPoller *ioevent)
 #error port me
 #endif
 }
-
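Reviewer note: ioevent_init() now takes a service_name as its second argument (this patch already updates the multi_socket_client.c caller with "client"); the name is used in the io_uring send_zc log messages. A hypothetical caller updated for the new signature, with illustrative poller name and sizes:

/* reviewer sketch, not part of the patch */
#include "ioevent.h"

static IOEventPoller poller;

int init_poller(void)
{
    const int max_events = 1024;
    const int timeout_ms = 100;
    const int extra_events = 0;

    /* second argument is the new service_name parameter */
    return ioevent_init(&poller, "my-service", max_events,
            timeout_ms, extra_events);
}
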
diff --git a/src/ioevent.h b/src/ioevent.h
index c31b7bb..fe4ba96 100644
--- a/src/ioevent.h
+++ b/src/ioevent.h
@@ -20,8 +20,10 @@
 #include
 #include
 #include "_os_define.h"
+#include "logger.h"
 
-#define IOEVENT_TIMEOUT 0x8000
+#define IOEVENT_TIMEOUT (1 << 20)
+#define IOEVENT_NOTIFY  (1 << 21)  //for io_uring send_zc done callback
 
 #if IOEVENT_USE_EPOLL
 #include
@@ -31,6 +33,13 @@
 #define IOEVENT_WRITE EPOLLOUT
 #define IOEVENT_ERROR (EPOLLERR | EPOLLPRI | EPOLLHUP)
 
+#elif IOEVENT_USE_URING
+#include <poll.h>
+#include <liburing.h>
+#define IOEVENT_READ  POLLIN
+#define IOEVENT_WRITE POLLOUT
+#define IOEVENT_ERROR (POLLERR | POLLPRI | POLLHUP)
+
 #elif IOEVENT_USE_KQUEUE
 #include
 #include
@@ -65,18 +74,28 @@ int kqueue_ev_convert(int16_t event, uint16_t flags);
 #endif
 
 typedef struct ioevent_puller {
+    const char *service_name;
     int size;   //max events (fd)
     int extra_events;
+#if IOEVENT_USE_URING
+    struct io_uring ring;
+    int submit_count;
+    bool send_zc_logged;
+    bool send_zc_done_notify;  //if callback when send_zc done
+#else
     int poll_fd;
-
     struct {
         int index;
         int count;
     } iterator;  //for deal event loop
+#endif
 
 #if IOEVENT_USE_EPOLL
     struct epoll_event *events;
-    int timeout;
+    int timeout;   //in milliseconds
+#elif IOEVENT_USE_URING
+    struct io_uring_cqe *cqe;
+    struct __kernel_timespec timeout;
 #elif IOEVENT_USE_KQUEUE
     struct kevent *events;
     struct timespec timeout;
@@ -84,11 +103,18 @@ typedef struct ioevent_puller {
     port_event_t *events;
     timespec_t timeout;
 #endif
+
+#ifdef OS_LINUX
+    bool zero_timeout;
+#endif
+
 } IOEventPoller;
 
 #if IOEVENT_USE_EPOLL
 #define IOEVENT_GET_EVENTS(ioevent, index) \
     (ioevent)->events[index].events
+#elif IOEVENT_USE_URING
+
 #elif IOEVENT_USE_KQUEUE
 #define IOEVENT_GET_EVENTS(ioevent, index) kqueue_ev_convert( \
     (ioevent)->events[index].filter, (ioevent)->events[index].flags)
@@ -102,6 +128,8 @@ typedef struct ioevent_puller {
 #if IOEVENT_USE_EPOLL
 #define IOEVENT_GET_DATA(ioevent, index) \
     (ioevent)->events[index].data.ptr
+#elif IOEVENT_USE_URING
+
 #elif IOEVENT_USE_KQUEUE
 #define IOEVENT_GET_DATA(ioevent, index) \
     (ioevent)->events[index].udata
@@ -115,6 +143,8 @@ typedef struct ioevent_puller {
 #if IOEVENT_USE_EPOLL
 #define IOEVENT_CLEAR_DATA(ioevent, index) \
     (ioevent)->events[index].data.ptr = NULL
+#elif IOEVENT_USE_URING
+
 #elif IOEVENT_USE_KQUEUE
 #define IOEVENT_CLEAR_DATA(ioevent, index) \
     (ioevent)->events[index].udata = NULL
@@ -129,18 +159,19 @@
 extern "C" {
 #endif
 
-int ioevent_init(IOEventPoller *ioevent, const int size,
-        const int timeout_ms, const int extra_events);
+int ioevent_init(IOEventPoller *ioevent, const char *service_name,
+        const int size, const int timeout_ms, const int extra_events);
 void ioevent_destroy(IOEventPoller *ioevent);
 
-int ioevent_attach(IOEventPoller *ioevent, const int fd, const int e,
-        void *data);
-int ioevent_modify(IOEventPoller *ioevent, const int fd, const int e,
-        void *data);
+int ioevent_attach(IOEventPoller *ioevent, const int fd,
+        const int e, void *data);
+int ioevent_modify(IOEventPoller *ioevent, const int fd,
+        const int e, void *data);
 int ioevent_detach(IOEventPoller *ioevent, const int fd);
 int ioevent_poll(IOEventPoller *ioevent);
 
-static inline void ioevent_set_timeout(IOEventPoller *ioevent, const int timeout_ms)
+static inline void ioevent_set_timeout(IOEventPoller *ioevent,
+        const int timeout_ms)
 {
 #if IOEVENT_USE_EPOLL
     ioevent->timeout = timeout_ms;
@@ -148,6 +179,11 @@ static inline void ioevent_set_timeout(IOEventPoller *ioevent, const int timeout
     ioevent->timeout.tv_sec = timeout_ms / 1000;
     ioevent->timeout.tv_nsec = 1000000 * (timeout_ms % 1000);
 #endif
+
+#ifdef OS_LINUX
+    ioevent->zero_timeout = (timeout_ms == 0);
+#endif
+
 }
 
 static inline int ioevent_poll_ex(IOEventPoller *ioevent, const int timeout_ms)
@@ -156,6 +192,114 @@ static inline int ioevent_poll_ex(IOEventPoller *ioevent, const int timeout_ms)
     return ioevent_poll(ioevent);
 }
 
+#if IOEVENT_USE_URING
+static inline void ioevent_set_send_zc_done_notify(
+        IOEventPoller *ioevent, const bool need_notify)
+{
+    ioevent->send_zc_done_notify = need_notify;
+}
+
+static inline int ioevent_uring_submit(IOEventPoller *ioevent)
+{
+    int result;
+
+    ioevent->submit_count = 0;
+    while (1) {
+        result = io_uring_submit(&ioevent->ring);
+        if (result < 0) {
+            if (result != -EINTR) {
+                return -result;
+            }
+        } else {
+            return 0;
+        }
+    }
+}
+
+static inline struct io_uring_sqe *ioevent_uring_get_sqe(IOEventPoller *ioevent)
+{
+    struct io_uring_sqe *sqe = io_uring_get_sqe(&ioevent->ring);
+    if (sqe == NULL) {
+        logError("file: "__FILE__", line: %d, "
+                "io_uring_get_sqe fail", __LINE__);
+    }
+    return sqe;
+}
+
+static inline void ioevent_uring_prep_recv(IOEventPoller *ioevent,
+        struct io_uring_sqe *sqe, int sockfd,
+        void *buf, size_t size, void *user_data)
+{
+    io_uring_prep_recv(sqe, sockfd, buf, size, 0);
+    sqe->user_data = (long)user_data;
+    ioevent->submit_count++;
+}
+
+static inline void ioevent_uring_prep_send(IOEventPoller *ioevent,
+        struct io_uring_sqe *sqe, int sockfd,
+        void *buf, size_t len, void *user_data)
+{
+    io_uring_prep_send(sqe, sockfd, buf, len, 0);
+    sqe->user_data = (long)user_data;
+    ioevent->submit_count++;
+}
+
+static inline void ioevent_uring_prep_writev(IOEventPoller *ioevent,
+        struct io_uring_sqe *sqe, int sockfd, const struct iovec *iovecs,
+        unsigned nr_vecs, void *user_data)
+{
+    io_uring_prep_writev(sqe, sockfd, iovecs, nr_vecs, 0);
+    sqe->user_data = (long)user_data;
+    ioevent->submit_count++;
+}
+
+static inline void ioevent_uring_prep_send_zc(IOEventPoller *ioevent,
+        struct io_uring_sqe *sqe, int sockfd,
+        void *buf, size_t len, void *user_data)
+{
+    io_uring_prep_send_zc(sqe, sockfd, buf, len, 0,
+#ifdef IORING_SEND_ZC_REPORT_USAGE
+            IORING_SEND_ZC_REPORT_USAGE
+#else
+            0
+#endif
+            );
+    sqe->user_data = (long)user_data;
+    ioevent->submit_count++;
+}
+
+static inline void ioevent_uring_prep_close(IOEventPoller *ioevent,
+        struct io_uring_sqe *sqe, int fd, void *user_data)
+{
+    io_uring_prep_close(sqe, fd);
+    if (user_data == NULL) {
+        /* set sqe->flags MUST after io_uring_prep_xxx */
+        sqe->flags = IOSQE_CQE_SKIP_SUCCESS;
+    } else {
+        sqe->user_data = (long)user_data;
+    }
+    ioevent->submit_count++;
+}
+
+static inline void ioevent_uring_prep_cancel(IOEventPoller *ioevent,
+        struct io_uring_sqe *sqe, void *user_data)
+{
+    io_uring_prep_cancel(sqe, user_data, 0);
+    sqe->user_data = (long)user_data;
+    ioevent->submit_count++;
+}
+
+static inline void ioevent_uring_prep_connect(IOEventPoller *ioevent,
+        struct io_uring_sqe *sqe, int fd, const struct sockaddr *addr,
+        socklen_t addrlen, void *user_data)
+{
+    io_uring_prep_connect(sqe, fd, addr, addrlen);
+    sqe->user_data = (long)user_data;
+    ioevent->submit_count++;
+}
+
+#endif
+
 #ifdef __cplusplus
 }
 #endif
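Reviewer note: the ioevent_uring_* helpers above follow a queue-then-flush pattern: each prep call fills an SQE, stores the user_data pointer, and bumps submit_count; ioevent_uring_submit() later flushes everything with a single io_uring_submit(). A minimal sketch under that assumption (io_uring builds only; the fd, buffer, and task names are made up, and in the library the event loop normally performs the submit):

/* reviewer sketch, not part of the patch */
#if IOEVENT_USE_URING
#include <errno.h>
#include <stddef.h>
#include "ioevent.h"

static int queue_one_send(IOEventPoller *poller, int fd,
        char *buff, size_t len, void *task)
{
    struct io_uring_sqe *sqe;

    if ((sqe=ioevent_uring_get_sqe(poller)) == NULL) {
        return ENOSPC;   /* SQ ring is full */
    }

    /* queues the send and increments poller->submit_count */
    ioevent_uring_prep_send(poller, sqe, fd, buff, len, task);

    /* flush immediately here; the event loop usually batches this */
    return ioevent_uring_submit(poller);
}
#endif
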
diff --git a/src/ioevent_loop.c b/src/ioevent_loop.c
index f1ba16e..1587cff 100644
--- a/src/ioevent_loop.c
+++ b/src/ioevent_loop.c
@@ -17,6 +17,76 @@
 #include "logger.h"
 #include "ioevent_loop.h"
 
+#if IOEVENT_USE_URING
+static int ioevent_process(IOEventPoller *ioevent)
+{
+    int result;
+    unsigned head;
+    unsigned count = 0;
+    IOEventEntry *pEntry;
+
+    result = io_uring_wait_cqe_timeout(&ioevent->ring,
+            &ioevent->cqe, &ioevent->timeout);
+    switch (result) {
+        case 0:
+            break;
+        case -ETIME:
+        case -EINTR:
+            return 0;
+        default:
+            result *= -1;
+            logError("file: "__FILE__", line: %d, "
+                    "io_uring_wait_cqe fail, errno: %d, error info: %s",
+                    __LINE__, result, STRERROR(result));
+            return result;
+    }
+
+    io_uring_for_each_cqe(&ioevent->ring, head, ioevent->cqe) {
+        count++;
+        pEntry = (IOEventEntry *)ioevent->cqe->user_data;
+        if (pEntry != NULL) {
+            if (ioevent->cqe->flags & IORING_CQE_F_NOTIF) {
+                if (ioevent->send_zc_done_notify) {
+                    pEntry->callback(pEntry->fd, IOEVENT_NOTIFY, pEntry);
+                }
+
+#ifdef IORING_NOTIF_USAGE_ZC_COPIED
+                if (!ioevent->send_zc_logged) {
+                    struct fast_task_info *task;
+
+                    task = (struct fast_task_info *)pEntry;
+                    ioevent->send_zc_logged = true;
+                    if (ioevent->cqe->res & IORING_NOTIF_USAGE_ZC_COPIED) {
+                        logWarning("file: "__FILE__", line: %d, %s "
+                                "client %s:%u, io_uring send_zc: memory "
+                                "copy instead of zero copy!", __LINE__,
+                                ioevent->service_name, task->client_ip,
+                                task->port);
+                    } else {
+                        logInfo("file: "__FILE__", line: %d, %s "
+                                "client %s:%u, io_uring send_zc: zero "
+                                "copy OK.", __LINE__, ioevent->service_name,
+                                task->client_ip, task->port);
+                    }
+                }
+#endif
+            } else {
+                pEntry->res = ioevent->cqe->res;
+                pEntry->callback(pEntry->fd, 0, pEntry);
+            }
+        } else {
+            logWarning("file: "__FILE__", line: %d, "
+                    "io_uring unexpected flags: %d, result: %d", __LINE__,
+                    ioevent->cqe->flags, ioevent->cqe->res);
+        }
+    }
+
+    io_uring_cq_advance(&ioevent->ring, count);
+    return 0;
+}
+
+#else
+
 static void deal_ioevents(IOEventPoller *ioevent)
 {
     int event;
@@ -39,37 +109,29 @@ static void deal_ioevents(IOEventPoller *ioevent)
     }
 }
 
-int ioevent_remove(IOEventPoller *ioevent, void *data)
+static int ioevent_process(IOEventPoller *ioevent)
 {
-    IOEventEntry *pEntry;
-    int index;
+    int result;
 
-    if (ioevent->iterator.index >= ioevent->iterator.count)
-    {
-        return ENOENT;
+    ioevent->iterator.count = ioevent_poll(ioevent);
+    if (ioevent->iterator.count > 0) {
+        deal_ioevents(ioevent);
     }
-
-    pEntry = (IOEventEntry *)IOEVENT_GET_DATA(ioevent,
-            ioevent->iterator.index);
-    if (pEntry != NULL && (void *)pEntry == data) {
-        return 0;  //do NOT clear current entry
-    }
-
-    for (index=ioevent->iterator.index + 1; index < ioevent->iterator.count;
-            index++)
-    {
-        pEntry = (IOEventEntry *)IOEVENT_GET_DATA(ioevent, index);
-        if (pEntry != NULL && (void *)pEntry == data) {
-            logDebug("file: "__FILE__", line: %d, "
-                    "clear ioevent data: %p", __LINE__, data);
-            IOEVENT_CLEAR_DATA(ioevent, index);
-            return 0;
+    else if (ioevent->iterator.count < 0) {
+        result = errno != 0 ? errno : EINVAL;
+        if (result != EINTR) {
+            logError("file: "__FILE__", line: %d, "
+                    "ioevent_poll fail, errno: %d, error info: %s",
+                    __LINE__, result, STRERROR(result));
+            return result;
         }
     }
 
-    return ENOENT;
+    return 0;
 }
 
+#endif
+
 static void deal_timeouts(FastTimerEntry *head)
 {
     FastTimerEntry *entry;
@@ -82,7 +144,8 @@ static void deal_timeouts(FastTimerEntry *head)
         current = entry;
         entry = entry->next;
 
-        current->prev = current->next = NULL;  //must set NULL because NOT in time wheel
+        /* must set NULL because NOT in time wheel */
+        current->prev = current->next = NULL;
 
         pEventEntry = (IOEventEntry *)current;
         if (pEventEntry != NULL) {
@@ -131,10 +194,9 @@ int ioevent_loop(struct nio_thread_data *thread_data,
     thread_data->deleted_list = NULL;
     last_check_time = g_current_time;
 
-    while (*continue_flag)
-    {
+    while (*continue_flag) {
 #ifdef OS_LINUX
-        if (thread_data->ev_puller.timeout == 0) {
+        if (thread_data->ev_puller.zero_timeout) {
             sched_pull = (sched_counter++ & 8) != 0;
         } else {
             sched_pull = true;
@@ -143,43 +205,34 @@
         sched_pull = true;
 #endif
 
-        if (sched_pull)
-        {
-            thread_data->ev_puller.iterator.count = ioevent_poll(
-                    &thread_data->ev_puller);
-            if (thread_data->ev_puller.iterator.count > 0)
-            {
-                deal_ioevents(&thread_data->ev_puller);
+#if IOEVENT_USE_URING
+        if (thread_data->ev_puller.submit_count > 0) {
+            if ((result=ioevent_uring_submit(&thread_data->ev_puller)) != 0) {
+                logError("file: "__FILE__", line: %d, "
+                        "io_uring_submit fail, errno: %d, error info: %s",
+                        __LINE__, result, STRERROR(result));
+                return result;
             }
-            else if (thread_data->ev_puller.iterator.count < 0)
-            {
-                result = errno != 0 ? errno : EINVAL;
-                if (result != EINTR)
-                {
-                    logError("file: "__FILE__", line: %d, " \
-                            "ioevent_poll fail, " \
-                            "errno: %d, error info: %s", \
-                            __LINE__, result, STRERROR(result));
-                    return result;
-                }
+        }
+#endif
+
+        if (sched_pull) {
+            if ((result=ioevent_process(&thread_data->ev_puller)) != 0) {
+                return result;
             }
         }
 
-        if (thread_data->busy_polling_callback != NULL)
-        {
+        if (thread_data->busy_polling_callback != NULL) {
             thread_data->busy_polling_callback(thread_data);
         }
 
-        if (thread_data->deleted_list != NULL)
-        {
+        if (thread_data->deleted_list != NULL) {
             count = 0;
-            while (thread_data->deleted_list != NULL)
-            {
+            while (thread_data->deleted_list != NULL) {
                 task = thread_data->deleted_list;
                 thread_data->deleted_list = task->next;
 
-                if (task->polling.in_queue)
-                {
+                if (task->polling.in_queue) {
                     fc_list_del_init(&task->polling.dlink);
                     task->polling.in_queue = false;
                     if (fc_list_empty(&task->thread_data->polling_queue)) {
@@ -193,8 +246,7 @@
             //logInfo("cleanup task count: %d", count);
         }
 
-        if (g_current_time - last_check_time > 0)
-        {
+        if (g_current_time - last_check_time > 0) {
             last_check_time = g_current_time;
             count = fast_timer_timeouts_get(
                     &thread_data->timer, g_current_time, &head);
@@ -204,8 +256,7 @@
             }
         }
 
-        if (thread_data->notify.enabled)
-        {
+        if (thread_data->notify.enabled) {
             int64_t n;
             if ((n=__sync_fetch_and_add(&thread_data->notify.counter, 0)) != 0)
             {
@@ -219,8 +270,7 @@
             }
         }
 
-        if (thread_data->thread_loop_callback != NULL)
-        {
+        if (thread_data->thread_loop_callback != NULL) {
             thread_data->thread_loop_callback(thread_data);
         }
     }
@@ -229,40 +279,48 @@
 }
 
 int ioevent_set(struct fast_task_info *task, struct nio_thread_data *pThread,
-        int sock, short event, IOEventCallback callback, const int timeout)
+        int sock, short event, IOEventCallback callback,
+        const int timeout, const bool use_iouring)
 {
     int result;
 
     task->thread_data = pThread;
     task->event.fd = sock;
     task->event.callback = callback;
-    if (ioevent_attach(&pThread->ev_puller, sock, event, task) < 0)
-    {
-        result = errno != 0 ? errno : ENOENT;
-        logError("file: "__FILE__", line: %d, "
-                "ioevent_attach fail, fd: %d, "
-                "errno: %d, error info: %s",
-                __LINE__, sock, result, STRERROR(result));
-        return result;
-    }
+#if IOEVENT_USE_URING
+    if (use_iouring) {
+        if (FC_URING_OP_TYPE(task) == IORING_OP_NOP) {
+            if ((result=uring_prep_first_recv(task)) != 0) {
+                logError("file: "__FILE__", line: %d, "
+                        "uring_prep_recv fail, fd: %d, "
+                        "errno: %d, error info: %s",
+                        __LINE__, sock, result, STRERROR(result));
+                return result;
+            }
+        } else {
+            /*
+            logWarning("file: "__FILE__", line: %d, "
+                    "skip uring_prep_recv, fd: %d, port: %d, "
+                    "in progress op type: %d, timeout: %"PRId64,
+                    __LINE__, sock, task->port, FC_URING_OP_TYPE(task),
+                    task->event.timer.expires);
+            */
+        }
+    } else {
+#endif
+    if (ioevent_attach(&pThread->ev_puller, sock, event, task) < 0) {
+        result = errno != 0 ? errno : ENOENT;
+        logError("file: "__FILE__", line: %d, "
+                "ioevent_attach fail, fd: %d, "
+                "errno: %d, error info: %s",
+                __LINE__, sock, result, STRERROR(result));
+        return result;
+    }
+#if IOEVENT_USE_URING
+    }
+#endif
 
     task->event.timer.expires = g_current_time + timeout;
     fast_timer_add(&pThread->timer, &task->event.timer);
     return 0;
 }
-
-int ioevent_reset(struct fast_task_info *task, int new_fd, short event)
-{
-    if (task->event.fd == new_fd)
-    {
-        return 0;
-    }
-
-    if (task->event.fd >= 0)
-    {
-        ioevent_detach(&task->thread_data->ev_puller, task->event.fd);
-    }
-
-    task->event.fd = new_fd;
-    return ioevent_attach(&task->thread_data->ev_puller, new_fd, event, task);
-}
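Reviewer note: ioevent_set() gains a use_iouring flag; on epoll/kqueue builds the flag is compiled out and the fd is attached to the poller exactly as before, while on io_uring builds it triggers the first recv submission. A hypothetical caller updated for the new signature (the timeout is added to g_current_time, so it is in seconds; names other than the library API are illustrative):

/* reviewer sketch, not part of the patch */
#include <stdbool.h>
#include "ioevent_loop.h"

static int attach_task(struct fast_task_info *task,
        struct nio_thread_data *thread_data,
        int sock, IOEventCallback callback)
{
    const int timeout = 60;          /* seconds, arbitrary */
    const bool use_iouring = false;  /* plain epoll/kqueue path */

    return ioevent_set(task, thread_data, sock,
            IOEVENT_READ, callback, timeout, use_iouring);
}
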
diff --git a/src/ioevent_loop.h b/src/ioevent_loop.h
index cce2cb6..a12f33d 100644
--- a/src/ioevent_loop.h
+++ b/src/ioevent_loop.h
@@ -17,6 +17,14 @@
 #define _IOEVENT_LOOP_H
 
 #include "fast_task_queue.h"
+#if IOEVENT_USE_URING
+#include "sockopt.h"
+#endif
+
+#define fc_hold_task_ex(task, inc_count) __sync_add_and_fetch( \
+        &task->reffer_count, inc_count)
+
+#define fc_hold_task(task) fc_hold_task_ex(task, 1)
 
 #ifdef __cplusplus
 extern "C" {
@@ -26,13 +34,9 @@ int ioevent_loop(struct nio_thread_data *thread_data,
         IOEventCallback recv_notify_callback, TaskCleanUpCallback
         clean_up_callback, volatile bool *continue_flag);
 
-//remove entry from ready list
-int ioevent_remove(IOEventPoller *ioevent, void *data);
-
-int ioevent_set(struct fast_task_info *pTask, struct nio_thread_data *pThread,
-        int sock, short event, IOEventCallback callback, const int timeout);
-
-int ioevent_reset(struct fast_task_info *task, int new_fd, short event);
+int ioevent_set(struct fast_task_info *task, struct nio_thread_data *pThread,
+        int sock, short event, IOEventCallback callback,
+        const int timeout, const bool use_iouring);
 
 static inline bool ioevent_is_canceled(struct fast_task_info *task)
 {
@@ -75,9 +79,170 @@ static inline int ioevent_notify_thread(struct nio_thread_data *thread_data)
     return 0;
 }
 
+#if IOEVENT_USE_URING
+
+#define SET_OP_TYPE_AND_HOLD_TASK(task, _op_type) \
+    struct io_uring_sqe *sqe;  \
+    if ((sqe=ioevent_uring_get_sqe(&task->thread_data->ev_puller)) == NULL) { \
+        return ENOSPC; \
+    } \
+    FC_URING_OP_TYPE(task) = _op_type; \
+    fc_hold_task(task)
+
+static inline int uring_prep_recv_data(struct fast_task_info *task,
+        char *buff, const int len)
+{
+    SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_RECV);
+    ioevent_uring_prep_recv(&task->thread_data->ev_puller,
+            sqe, task->event.fd, buff, len, task);
+    return 0;
+}
+
+static inline int uring_prep_first_recv(struct fast_task_info *task)
+{
+    SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_RECV);
+    ioevent_uring_prep_recv(&task->thread_data->ev_puller,
+            sqe, task->event.fd, task->recv.ptr->data,
+            task->recv.ptr->size, task);
+    return 0;
+}
+
+static inline int uring_prep_next_recv(struct fast_task_info *task)
+{
+    SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_RECV);
+    ioevent_uring_prep_recv(&task->thread_data->ev_puller, sqe,
+            task->event.fd, task->recv.ptr->data + task->recv.ptr->offset,
+            task->recv.ptr->length - task->recv.ptr->offset, task);
+    return 0;
+}
+
+static inline int uring_prep_first_send(struct fast_task_info *task)
+{
+    if (task->iovec_array.iovs != NULL) {
+        SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_WRITEV);
+        ioevent_uring_prep_writev(&task->thread_data->ev_puller,
+                sqe, task->event.fd, task->iovec_array.iovs,
+                FC_MIN(task->iovec_array.count, IOV_MAX),
+                task);
+    } else {
+        SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND);
+        ioevent_uring_prep_send(&task->thread_data->ev_puller,
+                sqe, task->event.fd, task->send.ptr->data,
+                task->send.ptr->length, task);
+    }
+    return 0;
+}
+
+static inline int uring_prep_next_send(struct fast_task_info *task)
+{
+    if (task->iovec_array.iovs != NULL) {
+        SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_WRITEV);
+        ioevent_uring_prep_writev(&task->thread_data->ev_puller,
+                sqe, task->event.fd, task->iovec_array.iovs,
+                FC_MIN(task->iovec_array.count, IOV_MAX),
+                task);
+    } else {
+        SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND);
+        ioevent_uring_prep_send(&task->thread_data->ev_puller, sqe,
+                task->event.fd, task->send.ptr->data + task->send.ptr->offset,
+                task->send.ptr->length - task->send.ptr->offset, task);
+    }
+    return 0;
+}
+
+static inline int uring_prep_first_send_zc(struct fast_task_info *task)
+{
+    if (task->iovec_array.iovs != NULL) {
+        SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_WRITEV);
+        ioevent_uring_prep_writev(&task->thread_data->ev_puller,
+                sqe, task->event.fd, task->iovec_array.iovs,
+                FC_MIN(task->iovec_array.count, IOV_MAX),
+                task);
+    } else if (task->send.ptr->length < 4096) {
+        SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND);
+        ioevent_uring_prep_send(&task->thread_data->ev_puller,
+                sqe, task->event.fd, task->send.ptr->data,
+                task->send.ptr->length, task);
+    } else {
+        SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND_ZC);
+        ioevent_uring_prep_send_zc(&task->thread_data->ev_puller,
+                sqe, task->event.fd, task->send.ptr->data,
+                task->send.ptr->length, task);
+    }
+    return 0;
+}
+
+static inline int uring_prep_next_send_zc(struct fast_task_info *task)
+{
+    if (task->iovec_array.iovs != NULL) {
+        SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_WRITEV);
+        ioevent_uring_prep_writev(&task->thread_data->ev_puller,
+                sqe, task->event.fd, task->iovec_array.iovs,
+                FC_MIN(task->iovec_array.count, IOV_MAX),
+                task);
+    } else if (task->send.ptr->length - task->send.ptr->offset < 4096) {
+        SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND);
+        ioevent_uring_prep_send(&task->thread_data->ev_puller, sqe,
+                task->event.fd, task->send.ptr->data + task->send.ptr->offset,
+                task->send.ptr->length - task->send.ptr->offset, task);
+    } else {
+        SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_SEND_ZC);
+        ioevent_uring_prep_send_zc(&task->thread_data->ev_puller, sqe,
+                task->event.fd, task->send.ptr->data + task->send.ptr->offset,
+                task->send.ptr->length - task->send.ptr->offset, task);
+    }
+    return 0;
+}
+
+static inline int uring_prep_close_fd(struct fast_task_info *task)
+{
+    struct io_uring_sqe *sqe;
+
+    if ((sqe=ioevent_uring_get_sqe(&task->thread_data->ev_puller)) == NULL) {
+        return ENOSPC;
+    }
+
+    /* do NOT need callback */
+    ioevent_uring_prep_close(&task->thread_data->
+            ev_puller, sqe, task->event.fd, NULL);
+    return 0;
+}
+
+static inline int uring_prep_cancel(struct fast_task_info *task)
+{
+    SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_ASYNC_CANCEL);
+    ioevent_uring_prep_cancel(&task->thread_data->ev_puller, sqe, task);
+    return 0;
+}
+
+static inline int uring_prep_connect(struct fast_task_info *task)
+{
+    int result;
+    sockaddr_convert_t *convert;
+
+    if ((task->event.fd=socketCreateEx2(AF_UNSPEC, task->server_ip,
+                    O_NONBLOCK, NULL, &result)) < 0)
+    {
+        return result;
+    }
+
+    convert = (sockaddr_convert_t *)(task->send.ptr->data +
+            task->send.ptr->size - 2 * sizeof(sockaddr_convert_t));
+    if ((result=setsockaddrbyip(task->server_ip, task->port, convert)) != 0) {
+        return result;
+    }
+
+    do {
+        SET_OP_TYPE_AND_HOLD_TASK(task, IORING_OP_CONNECT);
+        ioevent_uring_prep_connect(&task->thread_data->ev_puller, sqe,
+                task->event.fd, &convert->sa.addr, convert->len, task);
+    } while (0);
+    return 0;
+}
+#endif
+
 #ifdef __cplusplus
 }
 #endif
 
 #endif
-
diff --git a/src/multi_socket_client.c b/src/multi_socket_client.c
index 428207c..b365f58 100644
--- a/src/multi_socket_client.c
+++ b/src/multi_socket_client.c
@@ -65,8 +65,8 @@ int fast_multi_sock_client_init_ex(FastMultiSockClient *client,
         return EINVAL;
     }
 
-    if ((result=ioevent_init(&client->ioevent, entry_count,
-                    timeout_ms, 0)) != 0)
+    if ((result=ioevent_init(&client->ioevent, "client",
+                    entry_count, timeout_ms, 0)) != 0)
     {
         logError("file: "__FILE__", line: %d, "
                 "ioevent_init fail, errno: %d, error info: %s",
@@ -316,9 +316,13 @@ static int fast_multi_sock_client_do_recv(FastMultiSockClient *client,
 static int fast_multi_sock_client_deal_io(FastMultiSockClient *client)
 {
     int result;
-    int event;
     int count;
+#if IOEVENT_USE_URING
+    unsigned head;
+#else
+    int event;
     int index;
+#endif
     int remain_timeout;
     FastMultiSockEntry *entry;
     char formatted_ip[FORMATTED_IP_SIZE];
@@ -330,6 +334,37 @@ static int fast_multi_sock_client_deal_io(FastMultiSockClient *client)
             break;
         }
 
+#if IOEVENT_USE_URING
+        result = io_uring_wait_cqe_timeout(&client->ioevent.ring,
+                &client->ioevent.cqe, &client->ioevent.timeout);
+        switch (result) {
+            case 0:
+                break;
+            case -ETIME:
+            case -EAGAIN:
+            case -EINTR:
+                continue;
+            default:
+                result *= -1;
+                logError("file: "__FILE__", line: %d, "
+                        "io_uring_wait_cqe fail, errno: %d, error info: %s",
+                        __LINE__, result, STRERROR(result));
+                return result;
+        }
+
+        count = 0;
+        io_uring_for_each_cqe(&client->ioevent.ring, head, client->ioevent.cqe) {
+            count++;
+            entry = (FastMultiSockEntry *)client->ioevent.cqe->user_data;
+            //logInfo("sock: %d, event: %d", entry->conn->sock, event);
+            result = entry->io_callback(client, entry);
+            if (result != 0 || entry->remain == 0) {
+                fast_multi_sock_client_finish(client, entry, result);
+            }
+        }
+        io_uring_cq_advance(&client->ioevent.ring, count);
+
+#else
        count = ioevent_poll_ex(&client->ioevent, remain_timeout);
        //logInfo("poll count: %d\n", count);
        for (index=0; index