support send_done_callback for FastDFS

fstore_storage_engine
YuQing 2022-09-14 10:38:38 +08:00
parent 78337ec4a3
commit cf4856e04b
6 changed files with 46 additions and 22 deletions

View File

@ -493,7 +493,7 @@ static int do_init()
return sf_service_init_ex2(&g_sf_context, "idemp-receipt", return sf_service_init_ex2(&g_sf_context, "idemp-receipt",
receipt_alloc_thread_extra_data, receipt_thread_loop_callback, receipt_alloc_thread_extra_data, receipt_thread_loop_callback,
NULL, sf_proto_set_body_length, NULL, receipt_deal_task, NULL, sf_proto_set_body_length, NULL, NULL, receipt_deal_task,
receipt_task_finish_cleanup, receipt_recv_timeout_callback, receipt_task_finish_cleanup, receipt_recv_timeout_callback,
1000, sizeof(SFCommonProtoHeader), 0, receipt_init_task, NULL); 1000, sizeof(SFCommonProtoHeader), 0, receipt_init_task, NULL);
} }

View File

@ -44,6 +44,7 @@
void sf_set_parameters_ex(SFContext *sf_context, const int header_size, void sf_set_parameters_ex(SFContext *sf_context, const int header_size,
sf_set_body_length_callback set_body_length_func, sf_set_body_length_callback set_body_length_func,
sf_alloc_recv_buffer_callback alloc_recv_buffer_func, sf_alloc_recv_buffer_callback alloc_recv_buffer_func,
sf_send_done_callback send_done_callback,
sf_deal_task_func deal_func, TaskCleanUpCallback cleanup_func, sf_deal_task_func deal_func, TaskCleanUpCallback cleanup_func,
sf_recv_timeout_callback timeout_callback, sf_release_buffer_callback sf_recv_timeout_callback timeout_callback, sf_release_buffer_callback
release_buffer_callback) release_buffer_callback)
@ -51,6 +52,7 @@ void sf_set_parameters_ex(SFContext *sf_context, const int header_size,
sf_context->header_size = header_size; sf_context->header_size = header_size;
sf_context->set_body_length = set_body_length_func; sf_context->set_body_length = set_body_length_func;
sf_context->alloc_recv_buffer = alloc_recv_buffer_func; sf_context->alloc_recv_buffer = alloc_recv_buffer_func;
sf_context->send_done_callback = send_done_callback;
sf_context->deal_task = deal_func; sf_context->deal_task = deal_func;
sf_context->task_cleanup_func = cleanup_func; sf_context->task_cleanup_func = cleanup_func;
sf_context->timeout_callback = timeout_callback; sf_context->timeout_callback = timeout_callback;
@ -680,6 +682,7 @@ int sf_client_sock_write(int sock, short event, void *arg)
int result; int result;
int bytes; int bytes;
int total_write; int total_write;
int length;
struct fast_task_info *task; struct fast_task_info *task;
task = (struct fast_task_info *)arg; task = (struct fast_task_info *)arg;
@ -724,10 +727,9 @@ int sf_client_sock_write(int sock, short event, void *arg)
continue; continue;
} else { } else {
logWarning("file: "__FILE__", line: %d, " logWarning("file: "__FILE__", line: %d, "
"client ip: %s, send fail, " "client ip: %s, send fail, task offset: %d, length: %d, "
"errno: %d, error info: %s", "errno: %d, error info: %s", __LINE__, task->client_ip,
__LINE__, task->client_ip, task->offset, task->length, errno, strerror(errno));
errno, strerror(errno));
ioevent_add_to_deleted_list(task); ioevent_add_to_deleted_list(task);
return -1; return -1;
@ -747,11 +749,20 @@ int sf_client_sock_write(int sock, short event, void *arg)
if (task->offset >= task->length) { if (task->offset >= task->length) {
release_iovec_buffer(task); release_iovec_buffer(task);
length = task->length;
task->offset = 0; task->offset = 0;
task->length = 0; task->length = 0;
if (sf_set_read_event(task) != 0) { if (sf_set_read_event(task) != 0) {
return -1; return -1;
} }
if (SF_CTX->send_done_callback != NULL) {
if (SF_CTX->send_done_callback(task, length) != 0) {
ioevent_add_to_deleted_list(task);
return -1;
}
}
break; break;
} }

View File

@ -35,6 +35,7 @@ extern "C" {
void sf_set_parameters_ex(SFContext *sf_context, const int header_size, void sf_set_parameters_ex(SFContext *sf_context, const int header_size,
sf_set_body_length_callback set_body_length_func, sf_set_body_length_callback set_body_length_func,
sf_alloc_recv_buffer_callback alloc_recv_buffer_func, sf_alloc_recv_buffer_callback alloc_recv_buffer_func,
sf_send_done_callback send_done_callback,
sf_deal_task_func deal_func, TaskCleanUpCallback cleanup_func, sf_deal_task_func deal_func, TaskCleanUpCallback cleanup_func,
sf_recv_timeout_callback timeout_callback, sf_release_buffer_callback sf_recv_timeout_callback timeout_callback, sf_release_buffer_callback
release_buffer_callback); release_buffer_callback);

View File

@ -122,6 +122,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
sf_accept_done_callback accept_done_callback, sf_accept_done_callback accept_done_callback,
sf_set_body_length_callback set_body_length_func, sf_set_body_length_callback set_body_length_func,
sf_alloc_recv_buffer_callback alloc_recv_buffer_func, sf_alloc_recv_buffer_callback alloc_recv_buffer_func,
sf_send_done_callback send_done_callback,
sf_deal_task_func deal_func, TaskCleanUpCallback task_cleanup_func, sf_deal_task_func deal_func, TaskCleanUpCallback task_cleanup_func,
sf_recv_timeout_callback timeout_callback, const int net_timeout_ms, sf_recv_timeout_callback timeout_callback, const int net_timeout_ms,
const int proto_header_size, const int task_arg_size, const int proto_header_size, const int task_arg_size,
@ -143,8 +144,8 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
sf_context->accept_done_func = accept_done_callback; sf_context->accept_done_func = accept_done_callback;
sf_set_parameters_ex(sf_context, proto_header_size, sf_set_parameters_ex(sf_context, proto_header_size,
set_body_length_func, alloc_recv_buffer_func, set_body_length_func, alloc_recv_buffer_func,
deal_func, task_cleanup_func, timeout_callback, send_done_callback, deal_func, task_cleanup_func,
release_buffer_callback); timeout_callback, release_buffer_callback);
if ((result=sf_init_free_queues(task_arg_size, init_callback)) != 0) { if ((result=sf_init_free_queues(task_arg_size, init_callback)) != 0) {
return result; return result;
@ -427,9 +428,15 @@ static void accept_run(struct accept_thread_context *accept_context)
task->thread_data = accept_context->sf_context->thread_data + task->thread_data = accept_context->sf_context->thread_data +
incomesock % accept_context->sf_context->work_threads; incomesock % accept_context->sf_context->work_threads;
if (accept_context->sf_context->accept_done_func != NULL) { if (accept_context->sf_context->accept_done_func != NULL) {
accept_context->sf_context->accept_done_func(task, if (accept_context->sf_context->accept_done_func(task,
inaddr.sin_addr.s_addr,
accept_context->server_sock == accept_context->server_sock ==
accept_context->sf_context->inner_sock); accept_context->sf_context->inner_sock) != 0)
{
close(incomesock);
sf_release_task(task);
continue;
}
} }
if (sf_nio_notify(task, SF_NIO_STAGE_INIT) != 0) { if (sf_nio_notify(task, SF_NIO_STAGE_INIT) != 0) {

View File

@ -40,6 +40,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
sf_accept_done_callback accept_done_callback, sf_accept_done_callback accept_done_callback,
sf_set_body_length_callback set_body_length_func, sf_set_body_length_callback set_body_length_func,
sf_alloc_recv_buffer_callback alloc_recv_buffer_func, sf_alloc_recv_buffer_callback alloc_recv_buffer_func,
sf_send_done_callback send_done_callback,
sf_deal_task_func deal_func, TaskCleanUpCallback task_cleanup_func, sf_deal_task_func deal_func, TaskCleanUpCallback task_cleanup_func,
sf_recv_timeout_callback timeout_callback, const int net_timeout_ms, sf_recv_timeout_callback timeout_callback, const int net_timeout_ms,
const int proto_header_size, const int task_arg_size, const int proto_header_size, const int task_arg_size,
@ -48,21 +49,22 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
#define sf_service_init_ex(sf_context, name, alloc_thread_extra_data_callback,\ #define sf_service_init_ex(sf_context, name, alloc_thread_extra_data_callback,\
thread_loop_callback, accept_done_callback, set_body_length_func, \ thread_loop_callback, accept_done_callback, set_body_length_func, \
deal_func, task_cleanup_func, timeout_callback, net_timeout_ms, \ send_done_callback, deal_func, task_cleanup_func, timeout_callback, \
proto_header_size, task_arg_size) \ net_timeout_ms, proto_header_size, task_arg_size) \
sf_service_init_ex2(sf_context, name, alloc_thread_extra_data_callback, \ sf_service_init_ex2(sf_context, name, alloc_thread_extra_data_callback, \
thread_loop_callback, accept_done_callback, set_body_length_func, \ thread_loop_callback, accept_done_callback, set_body_length_func, \
NULL, deal_func, task_cleanup_func, timeout_callback, net_timeout_ms, \ NULL, send_done_callback, deal_func, task_cleanup_func, \
proto_header_size, task_arg_size, NULL, NULL) timeout_callback, net_timeout_ms, proto_header_size, \
task_arg_size, NULL, NULL)
#define sf_service_init(name, alloc_thread_extra_data_callback, \ #define sf_service_init(name, alloc_thread_extra_data_callback, \
thread_loop_callback, accept_done_callback, set_body_length_func, \ thread_loop_callback, accept_done_callback, set_body_length_func, \
deal_func, task_cleanup_func, timeout_callback, net_timeout_ms, \ send_done_callback, deal_func, task_cleanup_func, timeout_callback, \
proto_header_size, task_arg_size) \ net_timeout_ms, proto_header_size, task_arg_size) \
sf_service_init_ex2(&g_sf_context, name, alloc_thread_extra_data_callback, \ sf_service_init_ex2(&g_sf_context, name, alloc_thread_extra_data_callback, \
thread_loop_callback, accept_done_callback, set_body_length_func, NULL, \ thread_loop_callback, accept_done_callback, set_body_length_func, NULL, \
deal_func, task_cleanup_func, timeout_callback, net_timeout_ms, \ send_done_callback, deal_func, task_cleanup_func, timeout_callback, \
proto_header_size, task_arg_size, NULL, NULL) net_timeout_ms, proto_header_size, task_arg_size, NULL, NULL)
int sf_service_destroy_ex(SFContext *sf_context); int sf_service_destroy_ex(SFContext *sf_context);

View File

@ -34,13 +34,15 @@
#define SF_SERVER_TASK_TYPE_CHANNEL_HOLDER 101 //for request idempotency #define SF_SERVER_TASK_TYPE_CHANNEL_HOLDER 101 //for request idempotency
#define SF_SERVER_TASK_TYPE_CHANNEL_USER 102 //for request idempotency #define SF_SERVER_TASK_TYPE_CHANNEL_USER 102 //for request idempotency
typedef void (*sf_accept_done_callback)(struct fast_task_info *task, typedef int (*sf_accept_done_callback)(struct fast_task_info *task,
const bool bInnerPort); const in_addr_t client_addr, const bool bInnerPort);
typedef int (*sf_set_body_length_callback)(struct fast_task_info *task); typedef int (*sf_set_body_length_callback)(struct fast_task_info *task);
typedef char *(*sf_alloc_recv_buffer_callback)(struct fast_task_info *task, typedef char *(*sf_alloc_recv_buffer_callback)(struct fast_task_info *task,
const int buff_size, bool *new_alloc); const int buff_size, bool *new_alloc);
typedef int (*sf_deal_task_func)(struct fast_task_info *task, const int stage); typedef int (*sf_deal_task_func)(struct fast_task_info *task, const int stage);
typedef int (*sf_recv_timeout_callback)(struct fast_task_info *task); typedef int (*sf_recv_timeout_callback)(struct fast_task_info *task);
typedef int (*sf_send_done_callback)(struct fast_task_info *task,
const int length);
/* calback for release iovec buffer */ /* calback for release iovec buffer */
typedef void (*sf_release_buffer_callback)(struct fast_task_info *task); typedef void (*sf_release_buffer_callback)(struct fast_task_info *task);
@ -69,6 +71,7 @@ typedef struct sf_context {
sf_set_body_length_callback set_body_length; sf_set_body_length_callback set_body_length;
sf_alloc_recv_buffer_callback alloc_recv_buffer; sf_alloc_recv_buffer_callback alloc_recv_buffer;
sf_accept_done_callback accept_done_func; sf_accept_done_callback accept_done_func;
sf_send_done_callback send_done_callback;
TaskCleanUpCallback task_cleanup_func; TaskCleanUpCallback task_cleanup_func;
sf_recv_timeout_callback timeout_callback; sf_recv_timeout_callback timeout_callback;
sf_release_buffer_callback release_buffer_callback; sf_release_buffer_callback release_buffer_callback;