nio support callback connect_done for client

support_rdma
YuQing 2023-09-22 18:27:12 +08:00
parent 3a413408ad
commit be9b71422f
6 changed files with 137 additions and 88 deletions

View File

@ -47,9 +47,8 @@ SFGlobalVariables g_sf_global_vars = {
SFContext g_sf_context = {{'\0'}, NULL, 0, SFContext g_sf_context = {{'\0'}, NULL, 0,
{{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}},
1, DEFAULT_WORK_THREADS, {'\0'}, {'\0'}, 0, true, true, 1, DEFAULT_WORK_THREADS, {'\0'}, {'\0'}, 0, true, true, true,
{false, 0, 0}, NULL, NULL, NULL, NULL, NULL, {false, 0, 0}, {sf_task_finish_clean_up}
sf_task_finish_clean_up, NULL
}; };
static inline void set_config_str_value(const char *value, static inline void set_config_str_value(const char *value,
@ -459,7 +458,7 @@ static int load_rdma_apis(SFNetworkHandler *handler)
LOAD_API_EX(handler, , close_server); LOAD_API_EX(handler, , close_server);
LOAD_API(handler, accept_connection); LOAD_API(handler, accept_connection);
LOAD_API_EX(handler, , async_connect_server); LOAD_API_EX(handler, , async_connect_server);
LOAD_API_EX(handler, , connect_server_done); LOAD_API_EX(handler, , async_connect_check);
LOAD_API(handler, close_connection); LOAD_API(handler, close_connection);
LOAD_API(handler, send_data); LOAD_API(handler, send_data);
LOAD_API(handler, recv_data); LOAD_API(handler, recv_data);
@ -483,7 +482,7 @@ static int init_network_handler(SFNetworkHandler *handler,
handler->close_server = sf_socket_close_server; handler->close_server = sf_socket_close_server;
handler->accept_connection = sf_socket_accept_connection; handler->accept_connection = sf_socket_accept_connection;
handler->async_connect_server = sf_socket_async_connect_server; handler->async_connect_server = sf_socket_async_connect_server;
handler->connect_server_done = sf_socket_connect_server_done; handler->async_connect_check = sf_socket_async_connect_check;
handler->close_connection = sf_socket_close_connection; handler->close_connection = sf_socket_close_connection;
handler->send_data = sf_socket_send_data; handler->send_data = sf_socket_send_data;
handler->recv_data = sf_socket_recv_data; handler->recv_data = sf_socket_recv_data;

View File

@ -45,18 +45,18 @@ void sf_set_parameters_ex(SFContext *sf_context, const int header_size,
sf_set_body_length_callback set_body_length_func, sf_set_body_length_callback set_body_length_func,
sf_alloc_recv_buffer_callback alloc_recv_buffer_func, sf_alloc_recv_buffer_callback alloc_recv_buffer_func,
sf_send_done_callback send_done_callback, sf_send_done_callback send_done_callback,
sf_deal_task_func deal_func, TaskCleanUpCallback cleanup_func, sf_deal_task_callback deal_func, TaskCleanUpCallback cleanup_func,
sf_recv_timeout_callback timeout_callback, sf_release_buffer_callback sf_recv_timeout_callback timeout_callback, sf_release_buffer_callback
release_buffer_callback) release_buffer_callback)
{ {
sf_context->header_size = header_size; sf_context->header_size = header_size;
sf_context->set_body_length = set_body_length_func; sf_context->callbacks.set_body_length = set_body_length_func;
sf_context->alloc_recv_buffer = alloc_recv_buffer_func; sf_context->callbacks.alloc_recv_buffer = alloc_recv_buffer_func;
sf_context->send_done_callback = send_done_callback; sf_context->callbacks.send_done = send_done_callback;
sf_context->deal_task = deal_func; sf_context->callbacks.deal_task = deal_func;
sf_context->task_cleanup_func = cleanup_func; sf_context->callbacks.task_cleanup = cleanup_func;
sf_context->timeout_callback = timeout_callback; sf_context->callbacks.task_timeout = timeout_callback;
sf_context->release_buffer_callback = release_buffer_callback; sf_context->callbacks.release_buffer = release_buffer_callback;
} }
void sf_task_detach_thread(struct fast_task_info *task) void sf_task_detach_thread(struct fast_task_info *task)
@ -84,8 +84,8 @@ void sf_task_switch_thread(struct fast_task_info *task,
static inline void release_iovec_buffer(struct fast_task_info *task) static inline void release_iovec_buffer(struct fast_task_info *task)
{ {
if (task->iovec_array.iovs != NULL) { if (task->iovec_array.iovs != NULL) {
if (SF_CTX->release_buffer_callback != NULL) { if (SF_CTX->callbacks.release_buffer != NULL) {
SF_CTX->release_buffer_callback(task); SF_CTX->callbacks.release_buffer(task);
} }
task->iovec_array.iovs = NULL; task->iovec_array.iovs = NULL;
task->iovec_array.count = 0; task->iovec_array.count = 0;
@ -189,7 +189,7 @@ static inline int sf_nio_init(struct fast_task_info *task)
task->network_timeout); task->network_timeout);
} }
int sf_socket_connect_server_done(struct fast_task_info *task) int sf_socket_async_connect_check(struct fast_task_info *task)
{ {
int result; int result;
socklen_t len; socklen_t len;
@ -201,34 +201,46 @@ int sf_socket_connect_server_done(struct fast_task_info *task)
return result; return result;
} }
static int sf_client_sock_connect(int sock, short event, void *arg) static int sf_client_connect_done(int sock, short event, void *arg)
{ {
int result; int result;
struct fast_task_info *task; struct fast_task_info *task;
task = (struct fast_task_info *)arg; task = (struct fast_task_info *)arg;
if (task->canceled) {
return ENOTCONN;
}
if (event & IOEVENT_TIMEOUT) { if (event & IOEVENT_TIMEOUT) {
result = ETIMEDOUT; result = ETIMEDOUT;
} else { } else {
result = task->handler->connect_server_done(task); result = task->handler->async_connect_check(task);
if (result == EINPROGRESS) { if (result == EINPROGRESS) {
return 0; return 0;
} }
} }
if (SF_CTX->callbacks.connect_done != NULL) {
SF_CTX->callbacks.connect_done(task, result);
}
if (result != 0) { if (result != 0) {
if (SF_CTX->connect_need_log) {
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"connect to server %s:%u fail, errno: %d, " "connect to server %s:%u fail, errno: %d, "
"error info: %s", __LINE__, task->server_ip, "error info: %s", __LINE__, task->server_ip,
task->port, result, STRERROR(result)); task->port, result, STRERROR(result));
}
ioevent_add_to_deleted_list(task); ioevent_add_to_deleted_list(task);
return -1; return -1;
} }
if (SF_CTX->connect_need_log) {
logInfo("file: "__FILE__", line: %d, " logInfo("file: "__FILE__", line: %d, "
"connect to server %s:%u successfully", "connect to server %s:%u successfully",
__LINE__, task->server_ip, task->port); __LINE__, task->server_ip, task->port);
return SF_CTX->deal_task(task, SF_NIO_STAGE_HANDSHAKE); }
return SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_HANDSHAKE);
} }
int sf_socket_async_connect_server(struct fast_task_info *task) int sf_socket_async_connect_server(struct fast_task_info *task)
@ -248,7 +260,16 @@ static int sf_async_connect_server(struct fast_task_info *task)
{ {
int result; int result;
result = task->handler->async_connect_server(task); if ((result=task->handler->async_connect_server(task)) == EINPROGRESS) {
result = ioevent_set(task, task->thread_data, task->event.fd,
IOEVENT_READ | IOEVENT_WRITE, (IOEventCallback)
sf_client_connect_done, task->connect_timeout);
return result > 0 ? -1 * result : result;
} else {
if (SF_CTX->callbacks.connect_done != NULL) {
SF_CTX->callbacks.connect_done(task, result);
}
if (result == 0) { if (result == 0) {
if ((result=sf_ioevent_add(task, (IOEventCallback) if ((result=sf_ioevent_add(task, (IOEventCallback)
sf_client_sock_read, task->network_timeout)) != 0) sf_client_sock_read, task->network_timeout)) != 0)
@ -256,23 +277,23 @@ static int sf_async_connect_server(struct fast_task_info *task)
return result; return result;
} }
if (SF_CTX->connect_need_log) {
logInfo("file: "__FILE__", line: %d, " logInfo("file: "__FILE__", line: %d, "
"connect to server %s:%u successfully", "connect to server %s:%u successfully",
__LINE__, task->server_ip, task->port); __LINE__, task->server_ip, task->port);
return SF_CTX->deal_task(task, SF_NIO_STAGE_HANDSHAKE); }
} else if (result == EINPROGRESS) { return SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_HANDSHAKE);
result = ioevent_set(task, task->thread_data, task->event.fd,
IOEVENT_READ | IOEVENT_WRITE, (IOEventCallback)
sf_client_sock_connect, task->connect_timeout);
return result > 0 ? -1 * result : result;
} else { } else {
task->handler->close_connection(task); task->handler->close_connection(task);
if (SF_CTX->connect_need_log) {
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"connect to server %s:%u fail, errno: %d, " "connect to server %s:%u fail, errno: %d, "
"error info: %s", __LINE__, task->server_ip, "error info: %s", __LINE__, task->server_ip,
task->port, result, STRERROR(result)); task->port, result, STRERROR(result));
}
return result > 0 ? -1 * result : result; return result > 0 ? -1 * result : result;
} }
}
} }
static int sf_nio_deal_task(struct fast_task_info *task, const int stage) static int sf_nio_deal_task(struct fast_task_info *task, const int stage)
@ -300,14 +321,14 @@ static int sf_nio_deal_task(struct fast_task_info *task, const int stage)
result = sf_send_add_event(task); result = sf_send_add_event(task);
break; break;
case SF_NIO_STAGE_CONTINUE: //continue deal case SF_NIO_STAGE_CONTINUE: //continue deal
result = SF_CTX->deal_task(task, SF_NIO_STAGE_CONTINUE); result = SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_CONTINUE);
break; break;
case SF_NIO_STAGE_FORWARDED: //forward by other thread case SF_NIO_STAGE_FORWARDED: //forward by other thread
if ((result=sf_ioevent_add(task, (IOEventCallback) if ((result=sf_ioevent_add(task, (IOEventCallback)
sf_client_sock_read, sf_client_sock_read,
task->network_timeout)) == 0) task->network_timeout)) == 0)
{ {
result = SF_CTX->deal_task(task, SF_NIO_STAGE_SEND); result = SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_SEND);
} }
break; break;
case SF_NIO_STAGE_CLOSE: case SF_NIO_STAGE_CLOSE:
@ -665,8 +686,8 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, SFCommAction *action)
return -1; return -1;
} }
if (SF_CTX->alloc_recv_buffer != NULL) { if (SF_CTX->callbacks.alloc_recv_buffer != NULL) {
task->recv_body = SF_CTX->alloc_recv_buffer(task, task->recv_body = SF_CTX->callbacks.alloc_recv_buffer(task,
task->length - SF_CTX->header_size, &new_alloc); task->length - SF_CTX->header_size, &new_alloc);
if (new_alloc && task->recv_body == NULL) { if (new_alloc && task->recv_body == NULL) {
return -1; return -1;
@ -823,7 +844,7 @@ int sf_rdma_busy_polling_callback(struct nio_thread_data *thread_data)
if (action == sf_comm_action_finish) { if (action == sf_comm_action_finish) {
task->req_count++; task->req_count++;
task->nio_stages.current = SF_NIO_STAGE_SEND; task->nio_stages.current = SF_NIO_STAGE_SEND;
if (SF_CTX->deal_task(task, SF_NIO_STAGE_SEND) < 0) { //fatal error if (SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_SEND) < 0) { //fatal error
ioevent_add_to_deleted_list(task); ioevent_add_to_deleted_list(task);
} }
} else { } else {
@ -851,8 +872,8 @@ int sf_client_sock_read(int sock, short event, void *arg)
if (event & IOEVENT_TIMEOUT) { if (event & IOEVENT_TIMEOUT) {
if (task->offset == 0 && task->req_count > 0) { if (task->offset == 0 && task->req_count > 0) {
if (SF_CTX->timeout_callback != NULL) { if (SF_CTX->callbacks.task_timeout != NULL) {
if (SF_CTX->timeout_callback(task) != 0) { if (SF_CTX->callbacks.task_timeout(task) != 0) {
ioevent_add_to_deleted_list(task); ioevent_add_to_deleted_list(task);
return -1; return -1;
} }
@ -898,7 +919,7 @@ int sf_client_sock_read(int sock, short event, void *arg)
if (action == sf_comm_action_finish) { if (action == sf_comm_action_finish) {
task->req_count++; task->req_count++;
task->nio_stages.current = SF_NIO_STAGE_SEND; task->nio_stages.current = SF_NIO_STAGE_SEND;
if (SF_CTX->deal_task(task, SF_NIO_STAGE_SEND) < 0) { //fatal error if (SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_SEND) < 0) { //fatal error
ioevent_add_to_deleted_list(task); ioevent_add_to_deleted_list(task);
return -1; return -1;
} }
@ -966,8 +987,8 @@ int sf_client_sock_write(int sock, short event, void *arg)
return -1; return -1;
} }
if (SF_CTX->send_done_callback != NULL) { if (SF_CTX->callbacks.send_done != NULL) {
if (SF_CTX->send_done_callback(task, length) != 0) { if (SF_CTX->callbacks.send_done(task, length) != 0) {
ioevent_add_to_deleted_list(task); ioevent_add_to_deleted_list(task);
return -1; return -1;
} }

View File

@ -37,7 +37,7 @@ void sf_set_parameters_ex(SFContext *sf_context, const int header_size,
sf_set_body_length_callback set_body_length_func, sf_set_body_length_callback set_body_length_func,
sf_alloc_recv_buffer_callback alloc_recv_buffer_func, sf_alloc_recv_buffer_callback alloc_recv_buffer_func,
sf_send_done_callback send_done_callback, sf_send_done_callback send_done_callback,
sf_deal_task_func deal_func, TaskCleanUpCallback cleanup_func, sf_deal_task_callback deal_func, TaskCleanUpCallback cleanup_func,
sf_recv_timeout_callback timeout_callback, sf_release_buffer_callback sf_recv_timeout_callback timeout_callback, sf_release_buffer_callback
release_buffer_callback); release_buffer_callback);
@ -47,17 +47,28 @@ void sf_set_parameters_ex(SFContext *sf_context, const int header_size,
set_body_length_func, alloc_recv_buffer_func, \ set_body_length_func, alloc_recv_buffer_func, \
deal_func, cleanup_func, timeout_callback, NULL) deal_func, cleanup_func, timeout_callback, NULL)
static inline void sf_set_deal_task_func_ex(SFContext *sf_context, static inline void sf_set_deal_task_callback_ex(SFContext *sf_context,
sf_deal_task_func deal_func) sf_deal_task_callback deal_func)
{ {
sf_context->deal_task = deal_func; sf_context->callbacks.deal_task = deal_func;
} }
#define sf_set_deal_task_func(deal_func) \ #define sf_set_deal_task_callback(deal_func) \
sf_set_deal_task_func_ex(&g_sf_context, deal_func) sf_set_deal_task_callback_ex(&g_sf_context, deal_func)
static inline void sf_set_remove_from_ready_list_ex(SFContext *sf_context,
const bool enabled) static inline void sf_set_connect_done_callback_ex(SFContext *sf_context,
sf_connect_done_callback done_callback)
{
sf_context->callbacks.connect_done = done_callback;
}
#define sf_set_connect_done_callback(done_callback) \
sf_set_connect_done_callback_ex(&g_sf_context, done_callback)
static inline void sf_set_remove_from_ready_list_ex(
SFContext *sf_context, const bool enabled)
{ {
sf_context->remove_from_ready_list = enabled; sf_context->remove_from_ready_list = enabled;
} }
@ -65,14 +76,14 @@ static inline void sf_set_remove_from_ready_list_ex(SFContext *sf_context,
#define sf_set_remove_from_ready_list(enabled) \ #define sf_set_remove_from_ready_list(enabled) \
sf_set_remove_from_ready_list_ex(&g_sf_context, enabled); sf_set_remove_from_ready_list_ex(&g_sf_context, enabled);
static inline TaskCleanUpCallback sf_get_task_cleanup_func_ex( static inline TaskCleanUpCallback sf_get_task_cleanup_callback_ex(
SFContext *sf_context) SFContext *sf_context)
{ {
return sf_context->task_cleanup_func; return sf_context->callbacks.task_cleanup;
} }
#define sf_get_task_cleanup_func() \ #define sf_get_task_cleanup_callback() \
sf_get_task_cleanup_func_ex(&g_sf_context) sf_get_task_cleanup_callback_ex(&g_sf_context)
#define sf_nio_task_is_idle(task) \ #define sf_nio_task_is_idle(task) \
(task->offset == 0 && task->length == 0) (task->offset == 0 && task->length == 0)
@ -95,7 +106,7 @@ void sf_task_detach_thread(struct fast_task_info *task);
static inline int sf_set_body_length(struct fast_task_info *task) static inline int sf_set_body_length(struct fast_task_info *task)
{ {
if (SF_CTX->set_body_length(task) != 0) { if (SF_CTX->callbacks.set_body_length(task) != 0) {
return -1; return -1;
} }
if (task->length < 0) { if (task->length < 0) {
@ -120,7 +131,7 @@ static inline int sf_set_body_length(struct fast_task_info *task)
} }
int sf_socket_async_connect_server(struct fast_task_info *task); int sf_socket_async_connect_server(struct fast_task_info *task);
int sf_socket_connect_server_done(struct fast_task_info *task); int sf_socket_async_connect_check(struct fast_task_info *task);
ssize_t sf_socket_send_data(struct fast_task_info *task, SFCommAction *action); ssize_t sf_socket_send_data(struct fast_task_info *task, SFCommAction *action);
ssize_t sf_socket_recv_data(struct fast_task_info *task, SFCommAction *action); ssize_t sf_socket_recv_data(struct fast_task_info *task, SFCommAction *action);

View File

@ -117,7 +117,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
sf_set_body_length_callback set_body_length_func, sf_set_body_length_callback set_body_length_func,
sf_alloc_recv_buffer_callback alloc_recv_buffer_func, sf_alloc_recv_buffer_callback alloc_recv_buffer_func,
sf_send_done_callback send_done_callback, sf_send_done_callback send_done_callback,
sf_deal_task_func deal_func, TaskCleanUpCallback task_cleanup_func, sf_deal_task_callback deal_func, TaskCleanUpCallback task_cleanup_func,
sf_recv_timeout_callback timeout_callback, const int net_timeout_ms, sf_recv_timeout_callback timeout_callback, const int net_timeout_ms,
const int proto_header_size, const int task_padding_size, const int proto_header_size, const int task_padding_size,
const int task_arg_size, TaskInitCallback init_callback, const int task_arg_size, TaskInitCallback init_callback,
@ -134,9 +134,10 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
pthread_attr_t thread_attr; pthread_attr_t thread_attr;
snprintf(sf_context->name, sizeof(sf_context->name), "%s", name); snprintf(sf_context->name, sizeof(sf_context->name), "%s", name);
sf_context->connect_need_log = true;
sf_context->realloc_task_buffer = g_sf_global_vars. sf_context->realloc_task_buffer = g_sf_global_vars.
min_buff_size < g_sf_global_vars.max_buff_size; min_buff_size < g_sf_global_vars.max_buff_size;
sf_context->accept_done_func = accept_done_callback; sf_context->callbacks.accept_done = accept_done_callback;
sf_set_parameters_ex(sf_context, proto_header_size, sf_set_parameters_ex(sf_context, proto_header_size,
set_body_length_func, alloc_recv_buffer_func, set_body_length_func, alloc_recv_buffer_func,
send_done_callback, deal_func, task_cleanup_func, send_done_callback, deal_func, task_cleanup_func,
@ -335,7 +336,7 @@ static void *worker_thread_entrance(void *arg)
ioevent_loop(thread_ctx->thread_data, ioevent_loop(thread_ctx->thread_data,
sf_recv_notify_read, sf_recv_notify_read,
thread_ctx->sf_context->task_cleanup_func, thread_ctx->sf_context->callbacks.task_cleanup,
&g_sf_global_vars.continue_flag); &g_sf_global_vars.continue_flag);
ioevent_destroy(&thread_ctx->thread_data->ev_puller); ioevent_destroy(&thread_ctx->thread_data->ev_puller);
@ -530,8 +531,8 @@ static void accept_run(SFListener *listener)
task->thread_data = listener->handler->ctx->thread_data + task->thread_data = listener->handler->ctx->thread_data +
task->event.fd % listener->handler->ctx->work_threads; task->event.fd % listener->handler->ctx->work_threads;
if (listener->handler->ctx->accept_done_func != NULL) { if (listener->handler->ctx->callbacks.accept_done != NULL) {
if (listener->handler->ctx->accept_done_func(task, if (listener->handler->ctx->callbacks.accept_done(task,
listener->inaddr.sin_addr.s_addr, listener->inaddr.sin_addr.s_addr,
listener->is_inner) != 0) listener->is_inner) != 0)
{ {

View File

@ -41,7 +41,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
sf_set_body_length_callback set_body_length_func, sf_set_body_length_callback set_body_length_func,
sf_alloc_recv_buffer_callback alloc_recv_buffer_func, sf_alloc_recv_buffer_callback alloc_recv_buffer_func,
sf_send_done_callback send_done_callback, sf_send_done_callback send_done_callback,
sf_deal_task_func deal_func, TaskCleanUpCallback task_cleanup_func, sf_deal_task_callback deal_func, TaskCleanUpCallback task_cleanup_func,
sf_recv_timeout_callback timeout_callback, const int net_timeout_ms, sf_recv_timeout_callback timeout_callback, const int net_timeout_ms,
const int proto_header_size, const int task_padding_size, const int proto_header_size, const int task_padding_size,
const int task_arg_size, TaskInitCallback init_callback, const int task_arg_size, TaskInitCallback init_callback,
@ -81,10 +81,18 @@ static inline void sf_service_set_smart_polling_ex(SFContext *sf_context,
{ {
sf_context->smart_polling = *smart_polling; sf_context->smart_polling = *smart_polling;
} }
#define sf_service_set_smart_polling(smart_polling) \ #define sf_service_set_smart_polling(smart_polling) \
sf_service_set_smart_polling_ex(&g_sf_context, smart_polling) sf_service_set_smart_polling_ex(&g_sf_context, smart_polling)
static inline void sf_service_set_connect_need_log_ex(
SFContext *sf_context, const bool need_log)
{
sf_context->connect_need_log = need_log;
}
#define sf_service_set_connect_need_log(need_log) \
sf_service_set_connect_need_log_ex(&g_sf_context, need_log)
int sf_setup_signal_handler(); int sf_setup_signal_handler();
int sf_startup_schedule(pthread_t *schedule_tid); int sf_startup_schedule(pthread_t *schedule_tid);

View File

@ -43,10 +43,12 @@ typedef int (*sf_accept_done_callback)(struct fast_task_info *task,
typedef int (*sf_set_body_length_callback)(struct fast_task_info *task); typedef int (*sf_set_body_length_callback)(struct fast_task_info *task);
typedef char *(*sf_alloc_recv_buffer_callback)(struct fast_task_info *task, typedef char *(*sf_alloc_recv_buffer_callback)(struct fast_task_info *task,
const int buff_size, bool *new_alloc); const int buff_size, bool *new_alloc);
typedef int (*sf_deal_task_func)(struct fast_task_info *task, const int stage); typedef int (*sf_deal_task_callback)(struct fast_task_info *task, const int stage);
typedef int (*sf_recv_timeout_callback)(struct fast_task_info *task); typedef int (*sf_recv_timeout_callback)(struct fast_task_info *task);
typedef int (*sf_send_done_callback)(struct fast_task_info *task, typedef int (*sf_send_done_callback)(struct fast_task_info *task,
const int length); const int length);
typedef void (*sf_connect_done_callback)(struct fast_task_info *task,
const int err_no);
/* calback for release iovec buffer */ /* calback for release iovec buffer */
typedef void (*sf_release_buffer_callback)(struct fast_task_info *task); typedef void (*sf_release_buffer_callback)(struct fast_task_info *task);
@ -73,7 +75,7 @@ typedef void (*sf_close_server_callback)(struct sf_listener *listener);
typedef struct fast_task_info * (*sf_accept_connection_callback)( typedef struct fast_task_info * (*sf_accept_connection_callback)(
struct sf_listener *listener); struct sf_listener *listener);
typedef int (*sf_async_connect_server_callback)(struct fast_task_info *task); typedef int (*sf_async_connect_server_callback)(struct fast_task_info *task);
typedef int (*sf_connect_server_done_callback)(struct fast_task_info *task); typedef int (*sf_async_connect_check_callback)(struct fast_task_info *task);
typedef void (*sf_close_connection_callback)(struct fast_task_info *task); typedef void (*sf_close_connection_callback)(struct fast_task_info *task);
typedef ssize_t (*sf_send_data_callback)(struct fast_task_info *task, typedef ssize_t (*sf_send_data_callback)(struct fast_task_info *task,
@ -114,7 +116,7 @@ typedef struct sf_network_handler {
/* for client side */ /* for client side */
sf_async_connect_server_callback async_connect_server; sf_async_connect_server_callback async_connect_server;
sf_connect_server_done_callback connect_server_done; sf_async_connect_check_callback async_connect_check;
/* server and client both */ /* server and client both */
sf_close_connection_callback close_connection; sf_close_connection_callback close_connection;
@ -123,6 +125,18 @@ typedef struct sf_network_handler {
sf_recv_data_callback recv_data; sf_recv_data_callback recv_data;
} SFNetworkHandler; } SFNetworkHandler;
typedef struct sf_nio_callbacks {
TaskCleanUpCallback task_cleanup;
sf_deal_task_callback deal_task;
sf_set_body_length_callback set_body_length;
sf_alloc_recv_buffer_callback alloc_recv_buffer;
sf_accept_done_callback accept_done;
sf_connect_done_callback connect_done;
sf_send_done_callback send_done;
sf_recv_timeout_callback task_timeout;
sf_release_buffer_callback release_buffer;
} SFNIOCallbacks;
typedef struct sf_context { typedef struct sf_context {
char name[64]; char name[64];
struct nio_thread_data *thread_data; struct nio_thread_data *thread_data;
@ -140,15 +154,10 @@ typedef struct sf_context {
int header_size; int header_size;
bool remove_from_ready_list; bool remove_from_ready_list;
bool realloc_task_buffer; bool realloc_task_buffer;
bool connect_need_log; //for client connect
FCSmartPollingConfig smart_polling; FCSmartPollingConfig smart_polling;
sf_deal_task_func deal_task;
sf_set_body_length_callback set_body_length; SFNIOCallbacks callbacks;
sf_alloc_recv_buffer_callback alloc_recv_buffer;
sf_accept_done_callback accept_done_func;
sf_send_done_callback send_done_callback;
TaskCleanUpCallback task_cleanup_func;
sf_recv_timeout_callback timeout_callback;
sf_release_buffer_callback release_buffer_callback;
} SFContext; } SFContext;
typedef struct { typedef struct {