call writev for iovec array

iovec_array
YuQing 2021-06-08 14:19:11 +08:00
parent ba70c63e80
commit d11243964b
6 changed files with 72 additions and 13 deletions

View File

@ -485,7 +485,7 @@ static int do_init()
receipt_alloc_thread_extra_data, receipt_thread_loop_callback,
NULL, sf_proto_set_body_length, receipt_deal_task,
receipt_task_finish_cleanup, receipt_recv_timeout_callback,
1000, sizeof(SFCommonProtoHeader), 0, receipt_init_task);
1000, sizeof(SFCommonProtoHeader), 0, receipt_init_task, NULL);
}
int receipt_handler_init()

View File

@ -20,6 +20,7 @@
#include <unistd.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/uio.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>
@ -45,13 +46,15 @@
void sf_set_parameters_ex(SFContext *sf_context, const int header_size,
sf_set_body_length_callback set_body_length_func,
sf_deal_task_func deal_func, TaskCleanUpCallback cleanup_func,
sf_recv_timeout_callback timeout_callback)
sf_recv_timeout_callback timeout_callback, sf_release_buffer_callback
release_buffer_callback)
{
sf_context->header_size = header_size;
sf_context->set_body_length = set_body_length_func;
sf_context->deal_task = deal_func;
sf_context->task_cleanup_func = cleanup_func;
sf_context->timeout_callback = timeout_callback;
sf_context->release_buffer_callback = release_buffer_callback;
}
void sf_task_detach_thread(struct fast_task_info *task)
@ -76,6 +79,17 @@ void sf_task_switch_thread(struct fast_task_info *task,
task->thread_data = SF_CTX->thread_data + new_thread_index;
}
static inline void release_iovec_buffer(struct fast_task_info *task)
{
if (task->iovec_array.iovs != NULL) {
if (SF_CTX->release_buffer_callback != NULL) {
SF_CTX->release_buffer_callback(task);
}
task->iovec_array.iovs = NULL;
task->iovec_array.count = 0;
}
}
void sf_task_finish_clean_up(struct fast_task_info *task)
{
/*
@ -93,6 +107,8 @@ void sf_task_finish_clean_up(struct fast_task_info *task)
task->finish_callback = NULL;
}
release_iovec_buffer(task);
sf_task_detach_thread(task);
close(task->event.fd);
task->event.fd = -1;
@ -667,8 +683,13 @@ int sf_client_sock_write(int sock, short event, void *arg)
&task->event.timer, g_current_time +
task->network_timeout);
bytes = write(sock, task->data + task->offset,
task->length - task->offset);
if (task->iovec_array.iovs != NULL) {
bytes = writev(sock, task->iovec_array.iovs,
task->iovec_array.count);
} else {
bytes = write(sock, task->data + task->offset,
task->length - task->offset);
}
if (bytes < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
@ -704,6 +725,8 @@ int sf_client_sock_write(int sock, short event, void *arg)
total_write += bytes;
task->offset += bytes;
if (task->offset >= task->length) {
release_iovec_buffer(task);
task->offset = 0;
task->length = 0;
if (sf_set_read_event(task) != 0) {
@ -711,6 +734,35 @@ int sf_client_sock_write(int sock, short event, void *arg)
}
break;
}
/* set next writev iovec array */
if (task->iovec_array.iovs != NULL) {
struct iovec *iov;
struct iovec *end;
int iov_sum;
int iov_remain;
iov = task->iovec_array.iovs;
end = task->iovec_array.iovs + task->iovec_array.count;
iov_sum = 0;
do {
iov_sum += iov->iov_len;
iov_remain = iov_sum - bytes;
if (iov_remain == 0) {
iov++;
break;
} else if (iov_remain > 0) {
iov->iov_base += (iov->iov_len - iov_remain);
iov->iov_len = iov_remain;
break;
}
iov++;
} while (iov < end);
task->iovec_array.iovs = iov;
task->iovec_array.count = end - iov;
}
}
return total_write;

View File

@ -32,13 +32,14 @@ extern "C" {
void sf_set_parameters_ex(SFContext *sf_context, const int header_size,
sf_set_body_length_callback set_body_length_func,
sf_deal_task_func deal_func, TaskCleanUpCallback cleanup_func,
sf_recv_timeout_callback timeout_callback);
sf_recv_timeout_callback timeout_callback, sf_release_buffer_callback
release_buffer_callback);
#define sf_set_parameters(header_size, set_body_length_func, \
deal_func, cleanup_func, timeout_callback) \
sf_set_parameters_ex(&g_sf_context, header_size, \
set_body_length_func, deal_func, \
cleanup_func, timeout_callback)
cleanup_func, timeout_callback, NULL)
static inline void sf_set_deal_task_func_ex(SFContext *sf_context,
sf_deal_task_func deal_func)

View File

@ -124,7 +124,8 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
sf_deal_task_func deal_func, TaskCleanUpCallback task_cleanup_func,
sf_recv_timeout_callback timeout_callback, const int net_timeout_ms,
const int proto_header_size, const int task_arg_size,
TaskInitCallback init_callback)
TaskInitCallback init_callback, sf_release_buffer_callback
release_buffer_callback)
{
int result;
int bytes;
@ -139,8 +140,9 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
sf_context->realloc_task_buffer = g_sf_global_vars.
min_buff_size < g_sf_global_vars.max_buff_size;
sf_context->accept_done_func = accept_done_callback;
sf_set_parameters_ex(sf_context, proto_header_size, set_body_length_func,
deal_func, task_cleanup_func, timeout_callback);
sf_set_parameters_ex(sf_context, proto_header_size,
set_body_length_func, deal_func, task_cleanup_func,
timeout_callback, release_buffer_callback);
if ((result=sf_init_free_queues(task_arg_size, init_callback)) != 0) {
return result;

View File

@ -42,7 +42,8 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
sf_deal_task_func deal_func, TaskCleanUpCallback task_cleanup_func,
sf_recv_timeout_callback timeout_callback, const int net_timeout_ms,
const int proto_header_size, const int task_arg_size,
TaskInitCallback init_callback);
TaskInitCallback init_callback, sf_release_buffer_callback
release_buffer_callback);
#define sf_service_init_ex(sf_context, name, alloc_thread_extra_data_callback,\
thread_loop_callback, accept_done_callback, set_body_length_func, \
@ -51,7 +52,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
sf_service_init_ex2(sf_context, name, alloc_thread_extra_data_callback, \
thread_loop_callback, accept_done_callback, set_body_length_func, \
deal_func, task_cleanup_func, timeout_callback, net_timeout_ms, \
proto_header_size, task_arg_size, NULL)
proto_header_size, task_arg_size, NULL, NULL)
#define sf_service_init(name, alloc_thread_extra_data_callback, \
thread_loop_callback, accept_done_callback, set_body_length_func, \
@ -60,7 +61,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
sf_service_init_ex2(&g_sf_context, name, alloc_thread_extra_data_callback, \
thread_loop_callback, accept_done_callback, set_body_length_func, \
deal_func, task_cleanup_func, timeout_callback, net_timeout_ms, \
proto_header_size, task_arg_size, NULL)
proto_header_size, task_arg_size, NULL, NULL)
int sf_service_destroy_ex(SFContext *sf_context);
@ -151,7 +152,6 @@ static inline void sf_release_task(struct fast_task_info *task)
"used: %d, freed: %d", __LINE__, task,
alloc_count, alloc_count - free_count, free_count);
*/
free_queue_push(task);
} else {
/*

View File

@ -40,6 +40,9 @@ typedef int (*sf_set_body_length_callback)(struct fast_task_info *task);
typedef int (*sf_deal_task_func)(struct fast_task_info *task, const int stage);
typedef int (*sf_recv_timeout_callback)(struct fast_task_info *task);
/* calback for release iovec buffer */
typedef void (*sf_release_buffer_callback)(struct fast_task_info *task);
typedef struct sf_context {
char name[64];
struct nio_thread_data *thread_data;
@ -63,6 +66,7 @@ typedef struct sf_context {
sf_accept_done_callback accept_done_func;
TaskCleanUpCallback task_cleanup_func;
sf_recv_timeout_callback timeout_callback;
sf_release_buffer_callback release_buffer_callback;
} SFContext;
typedef struct {