diff --git a/src/sf_nio.c b/src/sf_nio.c index d87bfb2..8a820b6 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -143,7 +143,6 @@ static inline int set_read_event(struct fast_task_info *task) { int result; - task->nio_stages.current = SF_NIO_STAGE_RECV; if (task->event.callback == (IOEventCallback)sf_client_sock_read) { return 0; } @@ -169,6 +168,7 @@ int sf_set_read_event(struct fast_task_info *task) { task->recv.ptr->offset = 0; task->recv.ptr->length = 0; + task->nio_stages.current = SF_NIO_STAGE_RECV; return set_read_event(task); } @@ -321,6 +321,7 @@ static int sf_nio_deal_task(struct fast_task_info *task, const int stage) result = sf_async_connect_server(task); break; case SF_NIO_STAGE_RECV: + task->nio_stages.current = SF_NIO_STAGE_RECV; if ((result=set_read_event(task)) == 0) { if (sf_client_sock_read(task->event.fd, IOEVENT_READ, task) < 0) @@ -544,10 +545,10 @@ static inline int check_task(struct fast_task_info *task, } } -ssize_t sf_socket_send_data(struct fast_task_info *task, SFCommAction *action) +ssize_t sf_socket_send_data(struct fast_task_info *task, + SFCommAction *action, bool *send_done) { int bytes; - int length; if (task->iovec_array.iovs != NULL) { bytes = writev(task->event.fd, task->iovec_array.iovs, @@ -590,19 +591,15 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, SFCommAction *action) task->send.ptr->offset += bytes; if (task->send.ptr->offset >= task->send.ptr->length) { - length = task->send.ptr->length; if (task->send.ptr != task->recv.ptr) { //double buffers task->send.ptr->offset = 0; task->send.ptr->length = 0; } - if (SF_CTX->callbacks.send_done != NULL) { - if (SF_CTX->callbacks.send_done(task, length) != 0) { - return -1; - } - } *action = sf_comm_action_finish; + *send_done = true; } else { *action = sf_comm_action_continue; + *send_done = false; /* set next writev iovec array */ if (task->iovec_array.iovs != NULL) { @@ -990,7 +987,10 @@ int sf_client_sock_write(int sock, short event, void *arg) int result; int bytes; int total_write; + int length; + int next_stage; SFCommAction action; + bool send_done; struct fast_task_info *task; task = (struct fast_task_info *)arg; @@ -1010,13 +1010,14 @@ int sf_client_sock_write(int sock, short event, void *arg) } total_write = 0; + length = task->send.ptr->length; action = sf_comm_action_continue; while (1) { fast_timer_modify(&task->thread_data->timer, &task->event.timer, g_current_time + task->network_timeout); - if ((bytes=task->handler->send_data(task, &action)) < 0) { + if ((bytes=task->handler->send_data(task, &action, &send_done)) < 0) { ioevent_add_to_deleted_list(task); return -1; } @@ -1024,10 +1025,26 @@ int sf_client_sock_write(int sock, short event, void *arg) total_write += bytes; if (action == sf_comm_action_finish) { release_iovec_buffer(task); - if (sf_set_read_event(task) != 0) { + task->recv.ptr->offset = 0; + task->recv.ptr->length = 0; + if (set_read_event(task) != 0) { return -1; } + if (SF_CTX->callbacks.send_done == NULL || !send_done) { + task->nio_stages.current = SF_NIO_STAGE_RECV; + } else { + if (SF_CTX->callbacks.send_done(task, + length, &next_stage) != 0) + { + return -1; + } + + if (task->nio_stages.current != next_stage) { + task->nio_stages.current = next_stage; + } + } + break; } else if (action == sf_comm_action_break) { break; diff --git a/src/sf_nio.h b/src/sf_nio.h index 14a80f5..1841429 100644 --- a/src/sf_nio.h +++ b/src/sf_nio.h @@ -143,7 +143,8 @@ static inline int sf_set_body_length(struct fast_task_info *task) int sf_socket_async_connect_server(struct fast_task_info *task); int sf_socket_async_connect_check(struct fast_task_info *task); -ssize_t sf_socket_send_data(struct fast_task_info *task, SFCommAction *action); +ssize_t sf_socket_send_data(struct fast_task_info *task, + SFCommAction *action, bool *send_done); ssize_t sf_socket_recv_data(struct fast_task_info *task, const bool call_post_recv, SFCommAction *action); diff --git a/src/sf_types.h b/src/sf_types.h index ef52625..f3c77d2 100644 --- a/src/sf_types.h +++ b/src/sf_types.h @@ -46,7 +46,7 @@ typedef char *(*sf_alloc_recv_buffer_callback)(struct fast_task_info *task, typedef int (*sf_deal_task_callback)(struct fast_task_info *task, const int stage); typedef int (*sf_recv_timeout_callback)(struct fast_task_info *task); typedef int (*sf_send_done_callback)(struct fast_task_info *task, - const int length); + const int length, int *next_stage); typedef void (*sf_connect_done_callback)(struct fast_task_info *task, const int err_no); @@ -79,7 +79,7 @@ typedef int (*sf_async_connect_check_callback)(struct fast_task_info *task); typedef void (*sf_close_connection_callback)(struct fast_task_info *task); typedef ssize_t (*sf_send_data_callback)(struct fast_task_info *task, - SFCommAction *action); + SFCommAction *action, bool *send_done); typedef ssize_t (*sf_recv_data_callback)(struct fast_task_info *task, const bool call_post_recv, SFCommAction *action); typedef int (*sf_post_recv_callback)(struct fast_task_info *task);