From cf4856e04b7ea7aff1763e55ad33fb1aa408b1c9 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Wed, 14 Sep 2022 10:38:38 +0800 Subject: [PATCH] support send_done_callback for FastDFS --- src/idempotency/client/receipt_handler.c | 2 +- src/sf_nio.c | 19 +++++++++++++++---- src/sf_nio.h | 1 + src/sf_service.c | 17 ++++++++++++----- src/sf_service.h | 22 ++++++++++++---------- src/sf_types.h | 7 +++++-- 6 files changed, 46 insertions(+), 22 deletions(-) diff --git a/src/idempotency/client/receipt_handler.c b/src/idempotency/client/receipt_handler.c index 3fcc4bb..b4e8c51 100644 --- a/src/idempotency/client/receipt_handler.c +++ b/src/idempotency/client/receipt_handler.c @@ -493,7 +493,7 @@ static int do_init() return sf_service_init_ex2(&g_sf_context, "idemp-receipt", receipt_alloc_thread_extra_data, receipt_thread_loop_callback, - NULL, sf_proto_set_body_length, NULL, receipt_deal_task, + NULL, sf_proto_set_body_length, NULL, NULL, receipt_deal_task, receipt_task_finish_cleanup, receipt_recv_timeout_callback, 1000, sizeof(SFCommonProtoHeader), 0, receipt_init_task, NULL); } diff --git a/src/sf_nio.c b/src/sf_nio.c index 0f115ff..498a5d6 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -44,6 +44,7 @@ void sf_set_parameters_ex(SFContext *sf_context, const int header_size, sf_set_body_length_callback set_body_length_func, sf_alloc_recv_buffer_callback alloc_recv_buffer_func, + sf_send_done_callback send_done_callback, sf_deal_task_func deal_func, TaskCleanUpCallback cleanup_func, sf_recv_timeout_callback timeout_callback, sf_release_buffer_callback release_buffer_callback) @@ -51,6 +52,7 @@ void sf_set_parameters_ex(SFContext *sf_context, const int header_size, sf_context->header_size = header_size; sf_context->set_body_length = set_body_length_func; sf_context->alloc_recv_buffer = alloc_recv_buffer_func; + sf_context->send_done_callback = send_done_callback; sf_context->deal_task = deal_func; sf_context->task_cleanup_func = cleanup_func; sf_context->timeout_callback = timeout_callback; @@ -680,6 +682,7 @@ int sf_client_sock_write(int sock, short event, void *arg) int result; int bytes; int total_write; + int length; struct fast_task_info *task; task = (struct fast_task_info *)arg; @@ -724,10 +727,9 @@ int sf_client_sock_write(int sock, short event, void *arg) continue; } else { logWarning("file: "__FILE__", line: %d, " - "client ip: %s, send fail, " - "errno: %d, error info: %s", - __LINE__, task->client_ip, - errno, strerror(errno)); + "client ip: %s, send fail, task offset: %d, length: %d, " + "errno: %d, error info: %s", __LINE__, task->client_ip, + task->offset, task->length, errno, strerror(errno)); ioevent_add_to_deleted_list(task); return -1; @@ -747,11 +749,20 @@ int sf_client_sock_write(int sock, short event, void *arg) if (task->offset >= task->length) { release_iovec_buffer(task); + length = task->length; task->offset = 0; task->length = 0; if (sf_set_read_event(task) != 0) { return -1; } + + if (SF_CTX->send_done_callback != NULL) { + if (SF_CTX->send_done_callback(task, length) != 0) { + ioevent_add_to_deleted_list(task); + return -1; + } + } + break; } diff --git a/src/sf_nio.h b/src/sf_nio.h index fb4b06e..cbdefb4 100644 --- a/src/sf_nio.h +++ b/src/sf_nio.h @@ -35,6 +35,7 @@ extern "C" { void sf_set_parameters_ex(SFContext *sf_context, const int header_size, sf_set_body_length_callback set_body_length_func, sf_alloc_recv_buffer_callback alloc_recv_buffer_func, + sf_send_done_callback send_done_callback, sf_deal_task_func deal_func, TaskCleanUpCallback cleanup_func, sf_recv_timeout_callback timeout_callback, sf_release_buffer_callback release_buffer_callback); diff --git a/src/sf_service.c b/src/sf_service.c index 6ac2d5e..819b817 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -122,6 +122,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, sf_accept_done_callback accept_done_callback, sf_set_body_length_callback set_body_length_func, sf_alloc_recv_buffer_callback alloc_recv_buffer_func, + sf_send_done_callback send_done_callback, sf_deal_task_func deal_func, TaskCleanUpCallback task_cleanup_func, sf_recv_timeout_callback timeout_callback, const int net_timeout_ms, const int proto_header_size, const int task_arg_size, @@ -143,8 +144,8 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, sf_context->accept_done_func = accept_done_callback; sf_set_parameters_ex(sf_context, proto_header_size, set_body_length_func, alloc_recv_buffer_func, - deal_func, task_cleanup_func, timeout_callback, - release_buffer_callback); + send_done_callback, deal_func, task_cleanup_func, + timeout_callback, release_buffer_callback); if ((result=sf_init_free_queues(task_arg_size, init_callback)) != 0) { return result; @@ -427,9 +428,15 @@ static void accept_run(struct accept_thread_context *accept_context) task->thread_data = accept_context->sf_context->thread_data + incomesock % accept_context->sf_context->work_threads; if (accept_context->sf_context->accept_done_func != NULL) { - accept_context->sf_context->accept_done_func(task, - accept_context->server_sock == - accept_context->sf_context->inner_sock); + if (accept_context->sf_context->accept_done_func(task, + inaddr.sin_addr.s_addr, + accept_context->server_sock == + accept_context->sf_context->inner_sock) != 0) + { + close(incomesock); + sf_release_task(task); + continue; + } } if (sf_nio_notify(task, SF_NIO_STAGE_INIT) != 0) { diff --git a/src/sf_service.h b/src/sf_service.h index 892edc3..a49bd97 100644 --- a/src/sf_service.h +++ b/src/sf_service.h @@ -40,6 +40,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, sf_accept_done_callback accept_done_callback, sf_set_body_length_callback set_body_length_func, sf_alloc_recv_buffer_callback alloc_recv_buffer_func, + sf_send_done_callback send_done_callback, sf_deal_task_func deal_func, TaskCleanUpCallback task_cleanup_func, sf_recv_timeout_callback timeout_callback, const int net_timeout_ms, const int proto_header_size, const int task_arg_size, @@ -47,22 +48,23 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, release_buffer_callback); #define sf_service_init_ex(sf_context, name, alloc_thread_extra_data_callback,\ - thread_loop_callback, accept_done_callback, set_body_length_func, \ - deal_func, task_cleanup_func, timeout_callback, net_timeout_ms, \ - proto_header_size, task_arg_size) \ + thread_loop_callback, accept_done_callback, set_body_length_func, \ + send_done_callback, deal_func, task_cleanup_func, timeout_callback, \ + net_timeout_ms, proto_header_size, task_arg_size) \ sf_service_init_ex2(sf_context, name, alloc_thread_extra_data_callback, \ thread_loop_callback, accept_done_callback, set_body_length_func, \ - NULL, deal_func, task_cleanup_func, timeout_callback, net_timeout_ms, \ - proto_header_size, task_arg_size, NULL, NULL) + NULL, send_done_callback, deal_func, task_cleanup_func, \ + timeout_callback, net_timeout_ms, proto_header_size, \ + task_arg_size, NULL, NULL) #define sf_service_init(name, alloc_thread_extra_data_callback, \ - thread_loop_callback, accept_done_callback, set_body_length_func, \ - deal_func, task_cleanup_func, timeout_callback, net_timeout_ms, \ - proto_header_size, task_arg_size) \ + thread_loop_callback, accept_done_callback, set_body_length_func, \ + send_done_callback, deal_func, task_cleanup_func, timeout_callback, \ + net_timeout_ms, proto_header_size, task_arg_size) \ sf_service_init_ex2(&g_sf_context, name, alloc_thread_extra_data_callback, \ thread_loop_callback, accept_done_callback, set_body_length_func, NULL, \ - deal_func, task_cleanup_func, timeout_callback, net_timeout_ms, \ - proto_header_size, task_arg_size, NULL, NULL) + send_done_callback, deal_func, task_cleanup_func, timeout_callback, \ + net_timeout_ms, proto_header_size, task_arg_size, NULL, NULL) int sf_service_destroy_ex(SFContext *sf_context); diff --git a/src/sf_types.h b/src/sf_types.h index cbaa06b..be5bee4 100644 --- a/src/sf_types.h +++ b/src/sf_types.h @@ -34,13 +34,15 @@ #define SF_SERVER_TASK_TYPE_CHANNEL_HOLDER 101 //for request idempotency #define SF_SERVER_TASK_TYPE_CHANNEL_USER 102 //for request idempotency -typedef void (*sf_accept_done_callback)(struct fast_task_info *task, - const bool bInnerPort); +typedef int (*sf_accept_done_callback)(struct fast_task_info *task, + const in_addr_t client_addr, const bool bInnerPort); typedef int (*sf_set_body_length_callback)(struct fast_task_info *task); typedef char *(*sf_alloc_recv_buffer_callback)(struct fast_task_info *task, const int buff_size, bool *new_alloc); typedef int (*sf_deal_task_func)(struct fast_task_info *task, const int stage); typedef int (*sf_recv_timeout_callback)(struct fast_task_info *task); +typedef int (*sf_send_done_callback)(struct fast_task_info *task, + const int length); /* calback for release iovec buffer */ typedef void (*sf_release_buffer_callback)(struct fast_task_info *task); @@ -69,6 +71,7 @@ typedef struct sf_context { sf_set_body_length_callback set_body_length; sf_alloc_recv_buffer_callback alloc_recv_buffer; sf_accept_done_callback accept_done_func; + sf_send_done_callback send_done_callback; TaskCleanUpCallback task_cleanup_func; sf_recv_timeout_callback timeout_callback; sf_release_buffer_callback release_buffer_callback;