sf_send_data_callback and sf_send_done_callback changed

support_rdma
YuQing 2023-10-26 10:48:22 +08:00
parent 89a451b8ce
commit a8867a19c4
3 changed files with 32 additions and 14 deletions

View File

@ -143,7 +143,6 @@ static inline int set_read_event(struct fast_task_info *task)
{ {
int result; int result;
task->nio_stages.current = SF_NIO_STAGE_RECV;
if (task->event.callback == (IOEventCallback)sf_client_sock_read) { if (task->event.callback == (IOEventCallback)sf_client_sock_read) {
return 0; return 0;
} }
@ -169,6 +168,7 @@ int sf_set_read_event(struct fast_task_info *task)
{ {
task->recv.ptr->offset = 0; task->recv.ptr->offset = 0;
task->recv.ptr->length = 0; task->recv.ptr->length = 0;
task->nio_stages.current = SF_NIO_STAGE_RECV;
return set_read_event(task); return set_read_event(task);
} }
@ -321,6 +321,7 @@ static int sf_nio_deal_task(struct fast_task_info *task, const int stage)
result = sf_async_connect_server(task); result = sf_async_connect_server(task);
break; break;
case SF_NIO_STAGE_RECV: case SF_NIO_STAGE_RECV:
task->nio_stages.current = SF_NIO_STAGE_RECV;
if ((result=set_read_event(task)) == 0) { if ((result=set_read_event(task)) == 0) {
if (sf_client_sock_read(task->event.fd, if (sf_client_sock_read(task->event.fd,
IOEVENT_READ, task) < 0) IOEVENT_READ, task) < 0)
@ -544,10 +545,10 @@ static inline int check_task(struct fast_task_info *task,
} }
} }
ssize_t sf_socket_send_data(struct fast_task_info *task, SFCommAction *action) ssize_t sf_socket_send_data(struct fast_task_info *task,
SFCommAction *action, bool *send_done)
{ {
int bytes; int bytes;
int length;
if (task->iovec_array.iovs != NULL) { if (task->iovec_array.iovs != NULL) {
bytes = writev(task->event.fd, task->iovec_array.iovs, bytes = writev(task->event.fd, task->iovec_array.iovs,
@ -590,19 +591,15 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, SFCommAction *action)
task->send.ptr->offset += bytes; task->send.ptr->offset += bytes;
if (task->send.ptr->offset >= task->send.ptr->length) { if (task->send.ptr->offset >= task->send.ptr->length) {
length = task->send.ptr->length;
if (task->send.ptr != task->recv.ptr) { //double buffers if (task->send.ptr != task->recv.ptr) { //double buffers
task->send.ptr->offset = 0; task->send.ptr->offset = 0;
task->send.ptr->length = 0; task->send.ptr->length = 0;
} }
if (SF_CTX->callbacks.send_done != NULL) {
if (SF_CTX->callbacks.send_done(task, length) != 0) {
return -1;
}
}
*action = sf_comm_action_finish; *action = sf_comm_action_finish;
*send_done = true;
} else { } else {
*action = sf_comm_action_continue; *action = sf_comm_action_continue;
*send_done = false;
/* set next writev iovec array */ /* set next writev iovec array */
if (task->iovec_array.iovs != NULL) { if (task->iovec_array.iovs != NULL) {
@ -990,7 +987,10 @@ int sf_client_sock_write(int sock, short event, void *arg)
int result; int result;
int bytes; int bytes;
int total_write; int total_write;
int length;
int next_stage;
SFCommAction action; SFCommAction action;
bool send_done;
struct fast_task_info *task; struct fast_task_info *task;
task = (struct fast_task_info *)arg; task = (struct fast_task_info *)arg;
@ -1010,13 +1010,14 @@ int sf_client_sock_write(int sock, short event, void *arg)
} }
total_write = 0; total_write = 0;
length = task->send.ptr->length;
action = sf_comm_action_continue; action = sf_comm_action_continue;
while (1) { while (1) {
fast_timer_modify(&task->thread_data->timer, fast_timer_modify(&task->thread_data->timer,
&task->event.timer, g_current_time + &task->event.timer, g_current_time +
task->network_timeout); task->network_timeout);
if ((bytes=task->handler->send_data(task, &action)) < 0) { if ((bytes=task->handler->send_data(task, &action, &send_done)) < 0) {
ioevent_add_to_deleted_list(task); ioevent_add_to_deleted_list(task);
return -1; return -1;
} }
@ -1024,10 +1025,26 @@ int sf_client_sock_write(int sock, short event, void *arg)
total_write += bytes; total_write += bytes;
if (action == sf_comm_action_finish) { if (action == sf_comm_action_finish) {
release_iovec_buffer(task); release_iovec_buffer(task);
if (sf_set_read_event(task) != 0) { task->recv.ptr->offset = 0;
task->recv.ptr->length = 0;
if (set_read_event(task) != 0) {
return -1; return -1;
} }
if (SF_CTX->callbacks.send_done == NULL || !send_done) {
task->nio_stages.current = SF_NIO_STAGE_RECV;
} else {
if (SF_CTX->callbacks.send_done(task,
length, &next_stage) != 0)
{
return -1;
}
if (task->nio_stages.current != next_stage) {
task->nio_stages.current = next_stage;
}
}
break; break;
} else if (action == sf_comm_action_break) { } else if (action == sf_comm_action_break) {
break; break;

View File

@ -143,7 +143,8 @@ static inline int sf_set_body_length(struct fast_task_info *task)
int sf_socket_async_connect_server(struct fast_task_info *task); int sf_socket_async_connect_server(struct fast_task_info *task);
int sf_socket_async_connect_check(struct fast_task_info *task); int sf_socket_async_connect_check(struct fast_task_info *task);
ssize_t sf_socket_send_data(struct fast_task_info *task, SFCommAction *action); ssize_t sf_socket_send_data(struct fast_task_info *task,
SFCommAction *action, bool *send_done);
ssize_t sf_socket_recv_data(struct fast_task_info *task, ssize_t sf_socket_recv_data(struct fast_task_info *task,
const bool call_post_recv, SFCommAction *action); const bool call_post_recv, SFCommAction *action);

View File

@ -46,7 +46,7 @@ typedef char *(*sf_alloc_recv_buffer_callback)(struct fast_task_info *task,
typedef int (*sf_deal_task_callback)(struct fast_task_info *task, const int stage); typedef int (*sf_deal_task_callback)(struct fast_task_info *task, const int stage);
typedef int (*sf_recv_timeout_callback)(struct fast_task_info *task); typedef int (*sf_recv_timeout_callback)(struct fast_task_info *task);
typedef int (*sf_send_done_callback)(struct fast_task_info *task, typedef int (*sf_send_done_callback)(struct fast_task_info *task,
const int length); const int length, int *next_stage);
typedef void (*sf_connect_done_callback)(struct fast_task_info *task, typedef void (*sf_connect_done_callback)(struct fast_task_info *task,
const int err_no); const int err_no);
@ -79,7 +79,7 @@ typedef int (*sf_async_connect_check_callback)(struct fast_task_info *task);
typedef void (*sf_close_connection_callback)(struct fast_task_info *task); typedef void (*sf_close_connection_callback)(struct fast_task_info *task);
typedef ssize_t (*sf_send_data_callback)(struct fast_task_info *task, typedef ssize_t (*sf_send_data_callback)(struct fast_task_info *task,
SFCommAction *action); SFCommAction *action, bool *send_done);
typedef ssize_t (*sf_recv_data_callback)(struct fast_task_info *task, typedef ssize_t (*sf_recv_data_callback)(struct fast_task_info *task,
const bool call_post_recv, SFCommAction *action); const bool call_post_recv, SFCommAction *action);
typedef int (*sf_post_recv_callback)(struct fast_task_info *task); typedef int (*sf_post_recv_callback)(struct fast_task_info *task);