diff --git a/src/idempotency/client/client_channel.c b/src/idempotency/client/client_channel.c index 118c937..c5f9225 100644 --- a/src/idempotency/client/client_channel.c +++ b/src/idempotency/client/client_channel.c @@ -174,7 +174,6 @@ static struct fast_task_info *alloc_channel_task(IdempotencyClientChannel *channel, const uint32_t hash_code, const FCCommunicationType comm_type, const char *server_ip, const uint16_t port, int *err_no) { - int len; struct fast_task_info *task; SFAddressFamilyHandler *fh; SFNetworkHandler *handler; @@ -195,12 +194,7 @@ static struct fast_task_info *alloc_channel_task(IdempotencyClientChannel return NULL; } - len = strlen(server_ip); - if (len >= sizeof(task->server_ip)) { - len = sizeof(task->server_ip) - 1; - } - memcpy(task->server_ip, server_ip, len); - *(task->server_ip + len) = '\0'; + fc_safe_strcpy(task->server_ip, server_ip); task->port = port; task->arg = channel; task->thread_data = g_sf_context.thread_data + @@ -209,7 +203,8 @@ static struct fast_task_info *alloc_channel_task(IdempotencyClientChannel channel->last_connect_time = g_current_time; if ((*err_no=sf_nio_notify(task, SF_NIO_STAGE_CONNECT)) != 0) { channel->in_ioevent = 0; //rollback - sf_release_task(task); + __sync_sub_and_fetch(&task->reffer_count, 1); + free_queue_push(task); return NULL; } return task; @@ -221,6 +216,12 @@ int idempotency_client_channel_check_reconnect( int result; char formatted_ip[FORMATTED_IP_SIZE]; +#if IOEVENT_USE_URING + if (FC_ATOMIC_GET(channel->task->reffer_count) > 1) { + return 0; + } +#endif + if (!__sync_bool_compare_and_swap(&channel->in_ioevent, 0, 1)) { return 0; } @@ -237,6 +238,9 @@ int idempotency_client_channel_check_reconnect( formatted_ip, channel->task->port); } + if (channel->task->event.fd >= 0) { + channel->task->handler->close_connection(channel->task); + } __sync_bool_compare_and_swap(&channel->task->canceled, 1, 0); if ((result=sf_nio_notify(channel->task, SF_NIO_STAGE_CONNECT)) == 0) { channel->last_connect_time = g_current_time; @@ -348,8 +352,8 @@ int idempotency_client_channel_push(struct idempotency_client_channel *channel, receipt->req_id = req_id; fc_queue_push_ex(&channel->queue, receipt, ¬ify); if (notify) { - if (__sync_add_and_fetch(&channel->in_ioevent, 0)) { - if (__sync_add_and_fetch(&channel->established, 0)) { + if (FC_ATOMIC_GET(channel->in_ioevent)) { + if (FC_ATOMIC_GET(channel->established)) { sf_nio_notify(channel->task, SF_NIO_STAGE_CONTINUE); } } else { diff --git a/src/idempotency/client/client_channel.h b/src/idempotency/client/client_channel.h index 8fa82be..44de026 100644 --- a/src/idempotency/client/client_channel.h +++ b/src/idempotency/client/client_channel.h @@ -76,13 +76,13 @@ static inline void idempotency_client_channel_set_id_key( static inline int idempotency_client_channel_check_wait_ex( struct idempotency_client_channel *channel, const int timeout) { - if (__sync_add_and_fetch(&channel->established, 0)) { + if (FC_ATOMIC_GET(channel->established)) { return 0; } idempotency_client_channel_check_reconnect(channel); lcp_timedwait_sec(&channel->lcp, timeout); - if (__sync_add_and_fetch(&channel->established, 0)) { + if (FC_ATOMIC_GET(channel->established)) { return 0; } else { /* diff --git a/src/idempotency/client/receipt_handler.c b/src/idempotency/client/receipt_handler.c index e0c1d06..b6db0bf 100644 --- a/src/idempotency/client/receipt_handler.c +++ b/src/idempotency/client/receipt_handler.c @@ -49,6 +49,10 @@ static IdempotencyReceiptGlobalVars receipt_global_vars; static int receipt_init_task(struct fast_task_info *task, void *arg) { +#if IOEVENT_USE_URING + FC_URING_IS_CLIENT(task) = true; +#endif + if (RDMA_INIT_CONNECTION != NULL) { return RDMA_INIT_CONNECTION(task, arg); } else { @@ -92,7 +96,6 @@ static void receipt_task_finish_cleanup(struct fast_task_info *task) if (task->event.fd >= 0) { sf_task_detach_thread(task); - task->handler->close_connection(task); } sf_nio_reset_task_length(task); @@ -282,7 +285,7 @@ static int deal_setup_channel_response(struct fast_task_info *task) } channel = (IdempotencyClientChannel *)task->arg; - if (__sync_add_and_fetch(&channel->established, 0)) { + if (FC_ATOMIC_GET(channel->established)) { format_ip_address(task->server_ip, formatted_ip); logWarning("file: "__FILE__", line: %d, " "response from server %s:%u, unexpected cmd: " diff --git a/src/sf_nio.c b/src/sf_nio.c index e733ac3..44cbc58 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -269,6 +269,12 @@ static inline int set_read_event(struct fast_task_info *task) int sf_set_read_event(struct fast_task_info *task) { +#if IOEVENT_USE_URING + if (task->handler->use_io_uring) { + return 0; + } +#endif + task->recv.ptr->offset = 0; task->recv.ptr->length = 0; task->nio_stages.current = SF_NIO_STAGE_RECV; @@ -561,7 +567,7 @@ int sf_nio_notify(struct fast_task_info *task, const int stage) { result = errno != 0 ? errno : EIO; logError("file: "__FILE__", line: %d, " - "write eventfd %d fail, errno: %d, error info: %s", + "write to fd %d fail, errno: %d, error info: %s", __LINE__, FC_NOTIFY_WRITE_FD(task->thread_data), result, STRERROR(result)); return result; @@ -589,26 +595,32 @@ static inline void deal_notified_task(struct fast_task_info *task, } } -void sf_recv_notify_read(int sock, short event, void *arg) +void sf_recv_notify_read(int fd, short event, void *arg) { int64_t n; int stage; - struct nio_thread_data *thread_data; + struct ioevent_notify_entry *notify_entry; struct fast_task_info *task; struct fast_task_info *current; - thread_data = ((struct ioevent_notify_entry *)arg)->thread_data; - if (read(sock, &n, sizeof(n)) < 0) { + notify_entry = (struct ioevent_notify_entry *)arg; + if (read(fd, &n, sizeof(n)) < 0) { +#if IOEVENT_USE_URING + if (errno == EAGAIN) { + return; + } +#endif + logWarning("file: "__FILE__", line: %d, " - "read from eventfd %d fail, errno: %d, error info: %s", - __LINE__, sock, errno, STRERROR(errno)); + "read from fd %d fail, errno: %d, error info: %s", + __LINE__, fd, errno, STRERROR(errno)); } - PTHREAD_MUTEX_LOCK(&thread_data->waiting_queue.lock); - current = thread_data->waiting_queue.head; - thread_data->waiting_queue.head = NULL; - thread_data->waiting_queue.tail = NULL; - PTHREAD_MUTEX_UNLOCK(&thread_data->waiting_queue.lock); + PTHREAD_MUTEX_LOCK(¬ify_entry->thread_data->waiting_queue.lock); + current = notify_entry->thread_data->waiting_queue.head; + notify_entry->thread_data->waiting_queue.head = NULL; + notify_entry->thread_data->waiting_queue.tail = NULL; + PTHREAD_MUTEX_UNLOCK(¬ify_entry->thread_data->waiting_queue.lock); while (current != NULL) { task = current; @@ -772,9 +784,9 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, } } else if (bytes == 0) { logWarning("file: "__FILE__", line: %d, " - "client ip: %s, sock: %d, task length: %d, offset: %d, " + "client ip: %s, task length: %d, offset: %d, " "send failed, connection disconnected", __LINE__, - task->client_ip, task->event.fd, task->send.ptr->length, + task->client_ip, task->send.ptr->length, task->send.ptr->offset); return -1; } diff --git a/src/sf_service.c b/src/sf_service.c index efd3389..a9adfee 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -490,7 +490,9 @@ struct fast_task_info *sf_socket_accept_connection(SFListener *listener) } FC_SET_CLOEXEC(incomesock); - if ((task=sf_alloc_init_task(listener->handler, incomesock)) == NULL) { + if ((task=sf_alloc_init_server_task(listener->handler, + incomesock)) == NULL) + { close(incomesock); return NULL; } diff --git a/src/sf_service.h b/src/sf_service.h index 813e8f2..eba2c7f 100644 --- a/src/sf_service.h +++ b/src/sf_service.h @@ -169,12 +169,42 @@ static inline struct fast_task_info *sf_alloc_init_task_ex( return task; } -#define sf_hold_task_ex(task, inc_count) \ - fc_hold_task_ex(task, inc_count) +#define sf_hold_task_ex(task, inc_count) fc_hold_task_ex(task, inc_count) #define sf_hold_task(task) fc_hold_task(task) #define sf_alloc_init_task(handler, fd) sf_alloc_init_task_ex(handler, fd, 1) +static inline struct fast_task_info *sf_alloc_init_server_task( + SFNetworkHandler *handler, const int fd) +{ + const int reffer_count = 1; + struct fast_task_info *task; + + if ((task=sf_alloc_init_task_ex(handler, fd, reffer_count)) != NULL) { +#if IOEVENT_USE_URING + FC_URING_IS_CLIENT(task) = false; +#endif + } + + return task; +} + +static inline struct fast_task_info *sf_alloc_init_client_task( + SFNetworkHandler *handler) +{ + const int fd = -1; + const int reffer_count = 1; + struct fast_task_info *task; + + if ((task=sf_alloc_init_task_ex(handler, fd, reffer_count)) != NULL) { +#if IOEVENT_USE_URING + FC_URING_IS_CLIENT(task) = true; +#endif + } + + return task; +} + static inline void sf_release_task(struct fast_task_info *task) { if (__sync_sub_and_fetch(&task->reffer_count, 1) == 0) { @@ -194,6 +224,7 @@ static inline void sf_release_task(struct fast_task_info *task) connection_stat.current_count, 1); } #endif + free_queue_push(task); } } @@ -269,6 +300,19 @@ static inline SFNetworkHandler *sf_get_rdma_network_handler3( return sf_get_rdma_network_handler(sf_context3); } +static inline bool sf_get_double_buffers_flag(FCServerGroupInfo *server_group) +{ + if (server_group->comm_type == fc_comm_type_sock) { +#if IOEVENT_USE_URING + return true; +#else + return false; +#endif + } else { //RDMA + return true; + } +} + #ifdef __cplusplus } #endif