From d11243964b0842f50187c0d12ef8c1a81600ac79 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Tue, 8 Jun 2021 14:19:11 +0800 Subject: [PATCH] call writev for iovec array --- src/idempotency/client/receipt_handler.c | 2 +- src/sf_nio.c | 58 ++++++++++++++++++++++-- src/sf_nio.h | 5 +- src/sf_service.c | 8 ++-- src/sf_service.h | 8 ++-- src/sf_types.h | 4 ++ 6 files changed, 72 insertions(+), 13 deletions(-) diff --git a/src/idempotency/client/receipt_handler.c b/src/idempotency/client/receipt_handler.c index 0889713..61c8d8c 100644 --- a/src/idempotency/client/receipt_handler.c +++ b/src/idempotency/client/receipt_handler.c @@ -485,7 +485,7 @@ static int do_init() receipt_alloc_thread_extra_data, receipt_thread_loop_callback, NULL, sf_proto_set_body_length, receipt_deal_task, receipt_task_finish_cleanup, receipt_recv_timeout_callback, - 1000, sizeof(SFCommonProtoHeader), 0, receipt_init_task); + 1000, sizeof(SFCommonProtoHeader), 0, receipt_init_task, NULL); } int receipt_handler_init() diff --git a/src/sf_nio.c b/src/sf_nio.c index c2b2f67..9f5c7c0 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -45,13 +46,15 @@ void sf_set_parameters_ex(SFContext *sf_context, const int header_size, sf_set_body_length_callback set_body_length_func, sf_deal_task_func deal_func, TaskCleanUpCallback cleanup_func, - sf_recv_timeout_callback timeout_callback) + sf_recv_timeout_callback timeout_callback, sf_release_buffer_callback + release_buffer_callback) { sf_context->header_size = header_size; sf_context->set_body_length = set_body_length_func; sf_context->deal_task = deal_func; sf_context->task_cleanup_func = cleanup_func; sf_context->timeout_callback = timeout_callback; + sf_context->release_buffer_callback = release_buffer_callback; } void sf_task_detach_thread(struct fast_task_info *task) @@ -76,6 +79,17 @@ void sf_task_switch_thread(struct fast_task_info *task, task->thread_data = SF_CTX->thread_data + new_thread_index; } +static inline void release_iovec_buffer(struct fast_task_info *task) +{ + if (task->iovec_array.iovs != NULL) { + if (SF_CTX->release_buffer_callback != NULL) { + SF_CTX->release_buffer_callback(task); + } + task->iovec_array.iovs = NULL; + task->iovec_array.count = 0; + } +} + void sf_task_finish_clean_up(struct fast_task_info *task) { /* @@ -93,6 +107,8 @@ void sf_task_finish_clean_up(struct fast_task_info *task) task->finish_callback = NULL; } + release_iovec_buffer(task); + sf_task_detach_thread(task); close(task->event.fd); task->event.fd = -1; @@ -667,8 +683,13 @@ int sf_client_sock_write(int sock, short event, void *arg) &task->event.timer, g_current_time + task->network_timeout); - bytes = write(sock, task->data + task->offset, - task->length - task->offset); + if (task->iovec_array.iovs != NULL) { + bytes = writev(sock, task->iovec_array.iovs, + task->iovec_array.count); + } else { + bytes = write(sock, task->data + task->offset, + task->length - task->offset); + } if (bytes < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { @@ -704,6 +725,8 @@ int sf_client_sock_write(int sock, short event, void *arg) total_write += bytes; task->offset += bytes; if (task->offset >= task->length) { + release_iovec_buffer(task); + task->offset = 0; task->length = 0; if (sf_set_read_event(task) != 0) { @@ -711,6 +734,35 @@ int sf_client_sock_write(int sock, short event, void *arg) } break; } + + /* set next writev iovec array */ + if (task->iovec_array.iovs != NULL) { + struct iovec *iov; + struct iovec *end; + int iov_sum; + int iov_remain; + + iov = task->iovec_array.iovs; + end = task->iovec_array.iovs + task->iovec_array.count; + iov_sum = 0; + do { + iov_sum += iov->iov_len; + iov_remain = iov_sum - bytes; + if (iov_remain == 0) { + iov++; + break; + } else if (iov_remain > 0) { + iov->iov_base += (iov->iov_len - iov_remain); + iov->iov_len = iov_remain; + break; + } + + iov++; + } while (iov < end); + + task->iovec_array.iovs = iov; + task->iovec_array.count = end - iov; + } } return total_write; diff --git a/src/sf_nio.h b/src/sf_nio.h index 1dbf6b3..8a08448 100644 --- a/src/sf_nio.h +++ b/src/sf_nio.h @@ -32,13 +32,14 @@ extern "C" { void sf_set_parameters_ex(SFContext *sf_context, const int header_size, sf_set_body_length_callback set_body_length_func, sf_deal_task_func deal_func, TaskCleanUpCallback cleanup_func, - sf_recv_timeout_callback timeout_callback); + sf_recv_timeout_callback timeout_callback, sf_release_buffer_callback + release_buffer_callback); #define sf_set_parameters(header_size, set_body_length_func, \ deal_func, cleanup_func, timeout_callback) \ sf_set_parameters_ex(&g_sf_context, header_size, \ set_body_length_func, deal_func, \ - cleanup_func, timeout_callback) + cleanup_func, timeout_callback, NULL) static inline void sf_set_deal_task_func_ex(SFContext *sf_context, sf_deal_task_func deal_func) diff --git a/src/sf_service.c b/src/sf_service.c index da70a26..212e91a 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -124,7 +124,8 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, sf_deal_task_func deal_func, TaskCleanUpCallback task_cleanup_func, sf_recv_timeout_callback timeout_callback, const int net_timeout_ms, const int proto_header_size, const int task_arg_size, - TaskInitCallback init_callback) + TaskInitCallback init_callback, sf_release_buffer_callback + release_buffer_callback) { int result; int bytes; @@ -139,8 +140,9 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, sf_context->realloc_task_buffer = g_sf_global_vars. min_buff_size < g_sf_global_vars.max_buff_size; sf_context->accept_done_func = accept_done_callback; - sf_set_parameters_ex(sf_context, proto_header_size, set_body_length_func, - deal_func, task_cleanup_func, timeout_callback); + sf_set_parameters_ex(sf_context, proto_header_size, + set_body_length_func, deal_func, task_cleanup_func, + timeout_callback, release_buffer_callback); if ((result=sf_init_free_queues(task_arg_size, init_callback)) != 0) { return result; diff --git a/src/sf_service.h b/src/sf_service.h index 12a9c22..9e6f844 100644 --- a/src/sf_service.h +++ b/src/sf_service.h @@ -42,7 +42,8 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, sf_deal_task_func deal_func, TaskCleanUpCallback task_cleanup_func, sf_recv_timeout_callback timeout_callback, const int net_timeout_ms, const int proto_header_size, const int task_arg_size, - TaskInitCallback init_callback); + TaskInitCallback init_callback, sf_release_buffer_callback + release_buffer_callback); #define sf_service_init_ex(sf_context, name, alloc_thread_extra_data_callback,\ thread_loop_callback, accept_done_callback, set_body_length_func, \ @@ -51,7 +52,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, sf_service_init_ex2(sf_context, name, alloc_thread_extra_data_callback, \ thread_loop_callback, accept_done_callback, set_body_length_func, \ deal_func, task_cleanup_func, timeout_callback, net_timeout_ms, \ - proto_header_size, task_arg_size, NULL) + proto_header_size, task_arg_size, NULL, NULL) #define sf_service_init(name, alloc_thread_extra_data_callback, \ thread_loop_callback, accept_done_callback, set_body_length_func, \ @@ -60,7 +61,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, sf_service_init_ex2(&g_sf_context, name, alloc_thread_extra_data_callback, \ thread_loop_callback, accept_done_callback, set_body_length_func, \ deal_func, task_cleanup_func, timeout_callback, net_timeout_ms, \ - proto_header_size, task_arg_size, NULL) + proto_header_size, task_arg_size, NULL, NULL) int sf_service_destroy_ex(SFContext *sf_context); @@ -151,7 +152,6 @@ static inline void sf_release_task(struct fast_task_info *task) "used: %d, freed: %d", __LINE__, task, alloc_count, alloc_count - free_count, free_count); */ - free_queue_push(task); } else { /* diff --git a/src/sf_types.h b/src/sf_types.h index afb9bd1..7b16c1e 100644 --- a/src/sf_types.h +++ b/src/sf_types.h @@ -40,6 +40,9 @@ typedef int (*sf_set_body_length_callback)(struct fast_task_info *task); typedef int (*sf_deal_task_func)(struct fast_task_info *task, const int stage); typedef int (*sf_recv_timeout_callback)(struct fast_task_info *task); +/* calback for release iovec buffer */ +typedef void (*sf_release_buffer_callback)(struct fast_task_info *task); + typedef struct sf_context { char name[64]; struct nio_thread_data *thread_data; @@ -63,6 +66,7 @@ typedef struct sf_context { sf_accept_done_callback accept_done_func; TaskCleanUpCallback task_cleanup_func; sf_recv_timeout_callback timeout_callback; + sf_release_buffer_callback release_buffer_callback; } SFContext; typedef struct {