adapt to the newest struct fast_task_info from libfastcommon

support_rdma
YuQing 2023-09-25 18:37:53 +08:00
parent cd1920872a
commit f8e3fcdc55
9 changed files with 176 additions and 145 deletions

View File

@ -95,8 +95,7 @@ static void receipt_task_finish_cleanup(struct fast_task_info *task)
task->handler->close_connection(task); task->handler->close_connection(task);
} }
task->length = 0; sf_nio_reset_task_length(task);
task->offset = 0;
task->req_count = 0; task->req_count = 0;
channel = (IdempotencyClientChannel *)task->arg; channel = (IdempotencyClientChannel *)task->arg;
@ -116,14 +115,15 @@ static void setup_channel_request(struct fast_task_info *task)
SFProtoSetupChannelReq *req; SFProtoSetupChannelReq *req;
channel = (IdempotencyClientChannel *)task->arg; channel = (IdempotencyClientChannel *)task->arg;
header = (SFCommonProtoHeader *)task->data; header = (SFCommonProtoHeader *)task->send.ptr->data;
req = (SFProtoSetupChannelReq *)(header + 1); req = (SFProtoSetupChannelReq *)(header + 1);
int2buff(__sync_add_and_fetch(&channel->id, 0), req->channel_id); int2buff(__sync_add_and_fetch(&channel->id, 0), req->channel_id);
int2buff(__sync_add_and_fetch(&channel->key, 0), req->key); int2buff(__sync_add_and_fetch(&channel->key, 0), req->key);
SF_PROTO_SET_HEADER(header, SF_SERVICE_PROTO_SETUP_CHANNEL_REQ, SF_PROTO_SET_HEADER(header, SF_SERVICE_PROTO_SETUP_CHANNEL_REQ,
sizeof(SFProtoSetupChannelReq)); sizeof(SFProtoSetupChannelReq));
task->length = sizeof(SFCommonProtoHeader) + sizeof(SFProtoSetupChannelReq); task->send.ptr->length = sizeof(SFCommonProtoHeader) +
sizeof(SFProtoSetupChannelReq);
sf_send_add_event(task); sf_send_add_event(task);
} }
@ -150,10 +150,10 @@ static int check_report_req_receipt(struct fast_task_info *task)
return 0; return 0;
} }
header = (SFCommonProtoHeader *)task->data; header = (SFCommonProtoHeader *)task->send.ptr->data;
rheader = (SFProtoReportReqReceiptHeader *)(header + 1); rheader = (SFProtoReportReqReceiptHeader *)(header + 1);
rbody = rstart = (SFProtoReportReqReceiptBody *)(rheader + 1); rbody = rstart = (SFProtoReportReqReceiptBody *)(rheader + 1);
buff_end = task->data + channel->buffer_size; buff_end = task->send.ptr->data + channel->buffer_size;
last = NULL; last = NULL;
receipt = channel->waiting_resp_qinfo.head; receipt = channel->waiting_resp_qinfo.head;
do { do {
@ -183,8 +183,9 @@ static int check_report_req_receipt(struct fast_task_info *task)
count = rbody - rstart; count = rbody - rstart;
int2buff(count, rheader->count); int2buff(count, rheader->count);
task->length = (char *)rbody - task->data; task->send.ptr->length = (char *)rbody - task->send.ptr->data;
int2buff(task->length - sizeof(SFCommonProtoHeader), header->body_len); int2buff(task->send.ptr->length - sizeof(SFCommonProtoHeader),
header->body_len);
header->cmd = SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ; header->cmd = SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ;
sf_send_add_event(task); sf_send_add_event(task);
return count; return count;
@ -198,18 +199,18 @@ static void close_channel_request(struct fast_task_info *task)
channel = (IdempotencyClientChannel *)task->arg; channel = (IdempotencyClientChannel *)task->arg;
idempotency_client_channel_set_id_key(channel, 0, 0); idempotency_client_channel_set_id_key(channel, 0, 0);
header = (SFCommonProtoHeader *)task->data; header = (SFCommonProtoHeader *)task->send.ptr->data;
SF_PROTO_SET_HEADER(header, SF_SERVICE_PROTO_CLOSE_CHANNEL_REQ, 0); SF_PROTO_SET_HEADER(header, SF_SERVICE_PROTO_CLOSE_CHANNEL_REQ, 0);
task->length = sizeof(SFCommonProtoHeader); task->send.ptr->length = sizeof(SFCommonProtoHeader);
sf_send_add_event(task); sf_send_add_event(task);
} }
static void active_test_request(struct fast_task_info *task) static void active_test_request(struct fast_task_info *task)
{ {
SFCommonProtoHeader *header; SFCommonProtoHeader *header;
header = (SFCommonProtoHeader *)task->data; header = (SFCommonProtoHeader *)task->send.ptr->data;
SF_PROTO_SET_HEADER(header, SF_PROTO_ACTIVE_TEST_REQ, 0); SF_PROTO_SET_HEADER(header, SF_PROTO_ACTIVE_TEST_REQ, 0);
task->length = sizeof(SFCommonProtoHeader); task->send.ptr->length = sizeof(SFCommonProtoHeader);
sf_send_add_event(task); sf_send_add_event(task);
} }
@ -243,11 +244,12 @@ static void report_req_receipt_request(struct fast_task_info *task,
static inline int receipt_expect_body_length(struct fast_task_info *task, static inline int receipt_expect_body_length(struct fast_task_info *task,
const int expect_body_len) const int expect_body_len)
{ {
if ((int)(task->length - sizeof(SFCommonProtoHeader)) != expect_body_len) { int body_len;
body_len = task->recv.ptr->length - sizeof(SFCommonProtoHeader);
if (body_len != expect_body_len) {
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"server %s:%u, response body length: %d != %d", "server %s:%u, response body length: %d != %d", __LINE__,
__LINE__, task->server_ip, task->port, (int)(task->length - task->server_ip, task->port, body_len, expect_body_len);
sizeof(SFCommonProtoHeader)), expect_body_len);
return EINVAL; return EINVAL;
} }
@ -279,8 +281,7 @@ static int deal_setup_channel_response(struct fast_task_info *task)
return 0; return 0;
} }
resp = (SFProtoSetupChannelResp *)(task->data + resp = (SFProtoSetupChannelResp *)SF_PROTO_RECV_BODY(task);
sizeof(SFCommonProtoHeader));
channel_id = buff2int(resp->channel_id); channel_id = buff2int(resp->channel_id);
channel_key = buff2int(resp->key); channel_key = buff2int(resp->key);
buffer_size = buff2int(resp->buffer_size); buffer_size = buff2int(resp->buffer_size);
@ -290,7 +291,7 @@ static int deal_setup_channel_response(struct fast_task_info *task)
thread_ctx = (IdempotencyReceiptThreadContext *)task->thread_data->arg; thread_ctx = (IdempotencyReceiptThreadContext *)task->thread_data->arg;
fc_list_add_tail(&channel->dlink, &thread_ctx->head); fc_list_add_tail(&channel->dlink, &thread_ctx->head);
} }
channel->buffer_size = FC_MIN(buffer_size, task->size); channel->buffer_size = FC_MIN(buffer_size, task->send.ptr->size);
PTHREAD_MUTEX_LOCK(&channel->lcp.lock); PTHREAD_MUTEX_LOCK(&channel->lcp.lock);
pthread_cond_broadcast(&channel->lcp.cond); pthread_cond_broadcast(&channel->lcp.cond);
@ -343,6 +344,7 @@ static inline int deal_report_req_receipt_response(struct fast_task_info *task)
static int receipt_deal_task(struct fast_task_info *task, const int stage) static int receipt_deal_task(struct fast_task_info *task, const int stage)
{ {
int result; int result;
SFCommonProtoHeader *header;
do { do {
if (stage == SF_NIO_STAGE_HANDSHAKE) { if (stage == SF_NIO_STAGE_HANDSHAKE) {
@ -350,7 +352,7 @@ static int receipt_deal_task(struct fast_task_info *task, const int stage)
result = 0; result = 0;
break; break;
} else if (stage == SF_NIO_STAGE_CONTINUE) { } else if (stage == SF_NIO_STAGE_CONTINUE) {
if (task->length == 0 && task->offset == 0) { if (sf_nio_task_is_idle(task)) {
if (((IdempotencyClientChannel *)task->arg)->established) { if (((IdempotencyClientChannel *)task->arg)->established) {
report_req_receipt_request(task, true); report_req_receipt_request(task, true);
} else if (task->req_count > 0) { } else if (task->req_count > 0) {
@ -362,24 +364,24 @@ static int receipt_deal_task(struct fast_task_info *task, const int stage)
break; break;
} }
result = buff2short(((SFCommonProtoHeader *)task->data)->status); header = (SFCommonProtoHeader *)task->recv.ptr->data;
result = buff2short(header->status);
if (result != 0) { if (result != 0) {
int msg_len; int msg_len;
char *message; char *message;
msg_len = task->length - sizeof(SFCommonProtoHeader); msg_len = SF_RECV_BODY_LENGTH(task);
message = task->data + sizeof(SFCommonProtoHeader); message = SF_PROTO_RECV_BODY(task);
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"response from server %s:%u, cmd: %d (%s), " "response from server %s:%u, cmd: %d (%s), "
"status: %d, error info: %.*s", "status: %d, error info: %.*s", __LINE__,
__LINE__, task->server_ip, task->port, task->server_ip, task->port, header->cmd,
((SFCommonProtoHeader *)task->data)->cmd, sf_get_cmd_caption(header->cmd),
sf_get_cmd_caption(((SFCommonProtoHeader *)task->data)->cmd),
result, msg_len, message); result, msg_len, message);
break; break;
} }
switch (((SFCommonProtoHeader *)task->data)->cmd) { switch (header->cmd) {
case SF_SERVICE_PROTO_SETUP_CHANNEL_RESP: case SF_SERVICE_PROTO_SETUP_CHANNEL_RESP:
result = deal_setup_channel_response(task); result = deal_setup_channel_response(task);
break; break;
@ -398,16 +400,15 @@ static int receipt_deal_task(struct fast_task_info *task, const int stage)
default: default:
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"response from server %s:%u, unexpect cmd: %d (%s)", "response from server %s:%u, unexpect cmd: %d (%s)",
__LINE__, task->server_ip, task->port, __LINE__, task->server_ip, task->port, header->cmd,
((SFCommonProtoHeader *)task->data)->cmd, sf_get_cmd_caption(header->cmd));
sf_get_cmd_caption(((SFCommonProtoHeader *)task->data)->cmd));
result = EINVAL; result = EINVAL;
break; break;
} }
if (result == 0) { if (result == 0) {
update_lru_chain(task); update_lru_chain(task);
task->offset = task->length = 0; sf_nio_reset_task_length(task);
report_req_receipt_request(task, false); report_req_receipt_request(task, false);
} }
} while (0); } while (0);
@ -489,6 +490,8 @@ static void *receipt_alloc_thread_extra_data(const int thread_index)
static int do_init(FCAddressPtrArray *address_array) static int do_init(FCAddressPtrArray *address_array)
{ {
const int task_arg_size = 0;
const bool double_buffers = false;
int result; int result;
int bytes; int bytes;
SFNetworkHandler *rdma_handler; SFNetworkHandler *rdma_handler;
@ -518,8 +521,8 @@ static int do_init(FCAddressPtrArray *address_array)
receipt_alloc_thread_extra_data, receipt_thread_loop_callback, receipt_alloc_thread_extra_data, receipt_thread_loop_callback,
NULL, sf_proto_set_body_length, NULL, NULL, receipt_deal_task, NULL, sf_proto_set_body_length, NULL, NULL, receipt_deal_task,
receipt_task_finish_cleanup, receipt_recv_timeout_callback, receipt_task_finish_cleanup, receipt_recv_timeout_callback,
1000, sizeof(SFCommonProtoHeader), TASK_PADDING_SIZE, 0, 1000, sizeof(SFCommonProtoHeader), TASK_PADDING_SIZE,
receipt_init_task, NULL); task_arg_size, double_buffers, receipt_init_task, NULL);
} }
int receipt_handler_init(FCAddressPtrArray *address_array) int receipt_handler_init(FCAddressPtrArray *address_array)

View File

@ -37,9 +37,6 @@
#include "server_channel.h" #include "server_channel.h"
#include "server_handler.h" #include "server_handler.h"
#define SF_TASK_BODY_LENGTH(task) \
(task->length - sizeof(SFCommonProtoHeader))
int sf_server_deal_setup_channel(struct fast_task_info *task, int sf_server_deal_setup_channel(struct fast_task_info *task,
int *task_type, const int server_id, IdempotencyChannel int *task_type, const int server_id, IdempotencyChannel
**channel, SFResponseInfo *response) **channel, SFResponseInfo *response)
@ -52,13 +49,13 @@ int sf_server_deal_setup_channel(struct fast_task_info *task,
response->header.cmd = SF_SERVICE_PROTO_SETUP_CHANNEL_RESP; response->header.cmd = SF_SERVICE_PROTO_SETUP_CHANNEL_RESP;
if ((result=sf_server_expect_body_length(response, if ((result=sf_server_expect_body_length(response,
SF_TASK_BODY_LENGTH(task), SF_RECV_BODY_LENGTH(task),
sizeof(SFProtoSetupChannelReq))) != 0) sizeof(SFProtoSetupChannelReq))) != 0)
{ {
return result; return result;
} }
req = (SFProtoSetupChannelReq *)(task->data + sizeof(SFCommonProtoHeader)); req = (SFProtoSetupChannelReq *)SF_PROTO_RECV_BODY(task);
channel_id = buff2int(req->channel_id); channel_id = buff2int(req->channel_id);
key = buff2int(req->key); key = buff2int(req->key);
if (*channel != NULL) { if (*channel != NULL) {
@ -76,12 +73,11 @@ int sf_server_deal_setup_channel(struct fast_task_info *task,
} }
*task_type = SF_SERVER_TASK_TYPE_CHANNEL_HOLDER; *task_type = SF_SERVER_TASK_TYPE_CHANNEL_HOLDER;
resp = (SFProtoSetupChannelResp *)(task->data + resp = (SFProtoSetupChannelResp *)SF_PROTO_SEND_BODY(task);
sizeof(SFCommonProtoHeader));
int2buff((*channel)->id, resp->channel_id); int2buff((*channel)->id, resp->channel_id);
int2buff((*channel)->key, resp->key); int2buff((*channel)->key, resp->key);
int2buff(server_id, resp->server_id); int2buff(server_id, resp->server_id);
int2buff(task->size, resp->buffer_size); int2buff(task->send.ptr->size, resp->buffer_size);
response->header.body_len = sizeof(SFProtoSetupChannelResp); response->header.body_len = sizeof(SFProtoSetupChannelResp);
return 0; return 0;
} }
@ -139,15 +135,14 @@ int sf_server_deal_report_req_receipt(struct fast_task_info *task,
return result; return result;
} }
body_len = SF_TASK_BODY_LENGTH(task); body_len = SF_RECV_BODY_LENGTH(task);
if ((result=sf_server_check_min_body_length(response, body_len, if ((result=sf_server_check_min_body_length(response, body_len,
sizeof(SFProtoReportReqReceiptHeader))) != 0) sizeof(SFProtoReportReqReceiptHeader))) != 0)
{ {
return result; return result;
} }
body_header = (SFProtoReportReqReceiptHeader *) body_header = (SFProtoReportReqReceiptHeader *)SF_PROTO_RECV_BODY(task);
(task->data + sizeof(SFCommonProtoHeader));
count = buff2int(body_header->count); count = buff2int(body_header->count);
calc_body_len = sizeof(SFProtoReportReqReceiptHeader) + calc_body_len = sizeof(SFProtoReportReqReceiptHeader) +
sizeof(SFProtoReportReqReceiptBody) * count; sizeof(SFProtoReportReqReceiptBody) * count;
@ -220,7 +215,7 @@ int sf_server_deal_rebind_channel(struct fast_task_info *task,
SFProtoRebindChannelReq *req; SFProtoRebindChannelReq *req;
if ((result=sf_server_expect_body_length(response, if ((result=sf_server_expect_body_length(response,
SF_TASK_BODY_LENGTH(task), SF_RECV_BODY_LENGTH(task),
sizeof(SFProtoRebindChannelReq))) != 0) sizeof(SFProtoRebindChannelReq))) != 0)
{ {
return result; return result;
@ -240,7 +235,7 @@ int sf_server_deal_rebind_channel(struct fast_task_info *task,
} }
idempotency_channel_release(*channel, false); idempotency_channel_release(*channel, false);
req = (SFProtoRebindChannelReq *)(task->data + sizeof(SFCommonProtoHeader)); req = (SFProtoRebindChannelReq *)SF_PROTO_RECV_BODY(task);
channel_id = buff2int(req->channel_id); channel_id = buff2int(req->channel_id);
key = buff2int(req->key); key = buff2int(req->key);
*channel = idempotency_channel_find_and_hold(channel_id, key, &result); *channel = idempotency_channel_find_and_hold(channel_id, key, &result);

View File

@ -143,6 +143,8 @@ int sf_set_read_event(struct fast_task_info *task)
{ {
int result; int result;
task->recv.ptr->offset = 0;
task->recv.ptr->length = 0;
task->nio_stages.current = SF_NIO_STAGE_RECV; task->nio_stages.current = SF_NIO_STAGE_RECV;
if (task->event.callback == (IOEventCallback)sf_client_sock_read) { if (task->event.callback == (IOEventCallback)sf_client_sock_read) {
return 0; return 0;
@ -488,8 +490,8 @@ void sf_recv_notify_read(int sock, short event, void *arg)
int sf_send_add_event(struct fast_task_info *task) int sf_send_add_event(struct fast_task_info *task)
{ {
task->offset = 0; task->send.ptr->offset = 0;
if (task->length > 0) { if (task->send.ptr->length > 0) {
/* direct send */ /* direct send */
task->nio_stages.current = SF_NIO_STAGE_SEND; task->nio_stages.current = SF_NIO_STAGE_SEND;
if (sf_client_sock_write(task->event.fd, IOEVENT_WRITE, task) < 0) { if (sf_client_sock_write(task->event.fd, IOEVENT_WRITE, task) < 0) {
@ -533,8 +535,7 @@ static inline int check_task(struct fast_task_info *task,
return -1; return -1;
} }
} else { } else {
//TODO: for streaming should return EAGAIN return EAGAIN;
return 0;
} }
} }
@ -546,8 +547,9 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, SFCommAction *action)
bytes = writev(task->event.fd, task->iovec_array.iovs, bytes = writev(task->event.fd, task->iovec_array.iovs,
FC_MIN(task->iovec_array.count, IOV_MAX)); FC_MIN(task->iovec_array.count, IOV_MAX));
} else { } else {
bytes = write(task->event.fd, task->data + task->offset, bytes = write(task->event.fd, task->send.ptr->data +
task->length - task->offset); task->send.ptr->offset, task->send.ptr->length -
task->send.ptr->offset);
} }
if (bytes < 0) { if (bytes < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) if (errno == EAGAIN || errno == EWOULDBLOCK)
@ -567,19 +569,19 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, SFCommAction *action)
logWarning("file: "__FILE__", line: %d, " logWarning("file: "__FILE__", line: %d, "
"client ip: %s, send fail, task offset: %d, length: %d, " "client ip: %s, send fail, task offset: %d, length: %d, "
"errno: %d, error info: %s", __LINE__, task->client_ip, "errno: %d, error info: %s", __LINE__, task->client_ip,
task->offset, task->length, errno, strerror(errno)); task->send.ptr->offset, task->send.ptr->length, errno, strerror(errno));
return -1; return -1;
} }
} else if (bytes == 0) { } else if (bytes == 0) {
logWarning("file: "__FILE__", line: %d, " logWarning("file: "__FILE__", line: %d, "
"client ip: %s, sock: %d, task length: %d, offset: %d, " "client ip: %s, sock: %d, task length: %d, offset: %d, "
"send failed, connection disconnected", __LINE__, "send failed, connection disconnected", __LINE__,
task->client_ip, task->event.fd, task->length, task->offset); task->client_ip, task->event.fd, task->send.ptr->length, task->send.ptr->offset);
return -1; return -1;
} }
task->offset += bytes; task->send.ptr->offset += bytes;
if (task->offset >= task->length) { if (task->send.ptr->offset >= task->send.ptr->length) {
*action = sf_comm_action_finish; *action = sf_comm_action_finish;
} else { } else {
*action = sf_comm_action_continue; *action = sf_comm_action_continue;
@ -623,16 +625,19 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, SFCommAction *action)
int recv_bytes; int recv_bytes;
bool new_alloc; bool new_alloc;
if (task->length == 0) { //recv header if (task->recv.ptr->length == 0) { //recv header
recv_bytes = SF_CTX->header_size - task->offset; recv_bytes = SF_CTX->header_size - task->recv.ptr->offset;
bytes = read(task->event.fd, task->data + task->offset, recv_bytes); bytes = read(task->event.fd, task->recv.ptr->data +
task->recv.ptr->offset, recv_bytes);
} else { } else {
recv_bytes = task->length - task->offset; recv_bytes = task->recv.ptr->length - task->recv.ptr->offset;
if (task->recv_body == NULL) { if (task->recv_body == NULL) {
bytes = read(task->event.fd, task->data + task->offset, recv_bytes); bytes = read(task->event.fd, task->recv.ptr->data +
task->recv.ptr->offset, recv_bytes);
} else { } else {
bytes = read(task->event.fd, task->recv_body + (task->offset - bytes = read(task->event.fd, task->recv_body +
SF_CTX->header_size), recv_bytes); (task->recv.ptr->offset - SF_CTX->
header_size), recv_bytes);
} }
} }
@ -655,19 +660,19 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, SFCommAction *action)
return -1; return -1;
} }
} else if (bytes == 0) { } else if (bytes == 0) {
if (task->offset > 0) { if (task->recv.ptr->offset > 0) {
if (task->length > 0) { if (task->recv.ptr->length > 0) {
logWarning("file: "__FILE__", line: %d, " logWarning("file: "__FILE__", line: %d, "
"client ip: %s, connection disconnected, " "client ip: %s, connection disconnected, "
"expect pkg length: %d, recv pkg length: %d", "expect pkg length: %d, recv pkg length: %d",
__LINE__, task->client_ip, task->length, __LINE__, task->client_ip, task->recv.ptr->length,
task->offset); task->recv.ptr->offset);
} else { } else {
logWarning("file: "__FILE__", line: %d, " logWarning("file: "__FILE__", line: %d, "
"client ip: %s, connection " "client ip: %s, connection "
"disconnected, recv pkg length: %d", "disconnected, recv pkg length: %d",
__LINE__, task->client_ip, __LINE__, task->client_ip,
task->offset); task->recv.ptr->offset);
} }
} else { } else {
logDebug("file: "__FILE__", line: %d, " logDebug("file: "__FILE__", line: %d, "
@ -680,9 +685,9 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, SFCommAction *action)
} }
TCP_SET_QUICK_ACK(task->event.fd); TCP_SET_QUICK_ACK(task->event.fd);
task->offset += bytes; task->recv.ptr->offset += bytes;
if (task->length == 0) { //pkg header if (task->recv.ptr->length == 0) { //pkg header
if (task->offset < SF_CTX->header_size) { if (task->recv.ptr->offset < SF_CTX->header_size) {
*action = sf_comm_action_continue; *action = sf_comm_action_continue;
return bytes; return bytes;
} }
@ -693,7 +698,7 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, SFCommAction *action)
if (SF_CTX->callbacks.alloc_recv_buffer != NULL) { if (SF_CTX->callbacks.alloc_recv_buffer != NULL) {
task->recv_body = SF_CTX->callbacks.alloc_recv_buffer(task, task->recv_body = SF_CTX->callbacks.alloc_recv_buffer(task,
task->length - SF_CTX->header_size, &new_alloc); task->recv.ptr->length - SF_CTX->header_size, &new_alloc);
if (new_alloc && task->recv_body == NULL) { if (new_alloc && task->recv_body == NULL) {
return -1; return -1;
} }
@ -702,36 +707,38 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, SFCommAction *action)
} }
if (!new_alloc) { if (!new_alloc) {
if (task->length > task->size) { if (task->recv.ptr->length > task->recv.ptr->size) {
int old_size; int old_size;
if (!SF_CTX->realloc_task_buffer) { if (!SF_CTX->realloc_task_buffer) {
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"client ip: %s, pkg length: %d exceeds " "client ip: %s, pkg length: %d exceeds "
"task size: %d, but realloc buffer disabled", "task size: %d, but realloc buffer disabled",
__LINE__, task->client_ip, task->size, __LINE__, task->client_ip, task->recv.ptr->size,
task->length); task->recv.ptr->length);
return -1; return -1;
} }
old_size = task->size; old_size = task->recv.ptr->size;
if (free_queue_realloc_buffer(task, task->length) != 0) { if (free_queue_realloc_recv_buffer(task, task->
recv.ptr->length) != 0)
{
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"client ip: %s, realloc buffer size " "client ip: %s, realloc buffer size from %d "
"from %d to %d fail", __LINE__, "to %d fail", __LINE__, task->client_ip,
task->client_ip, task->size, task->length); task->recv.ptr->size, task->recv.ptr->length);
return -1; return -1;
} }
logDebug("file: "__FILE__", line: %d, " logDebug("file: "__FILE__", line: %d, "
"client ip: %s, task length: %d, realloc buffer " "client ip: %s, task length: %d, realloc buffer "
"size from %d to %d", __LINE__, task->client_ip, "size from %d to %d", __LINE__, task->client_ip,
task->length, old_size, task->size); task->recv.ptr->length, old_size, task->recv.ptr->size);
} }
} }
} }
if (task->offset >= task->length) { //recv done if (task->recv.ptr->offset >= task->recv.ptr->length) { //recv done
*action = sf_comm_action_finish; *action = sf_comm_action_finish;
} else { } else {
*action = sf_comm_action_continue; *action = sf_comm_action_continue;
@ -876,7 +883,7 @@ int sf_client_sock_read(int sock, short event, void *arg)
} }
if (event & IOEVENT_TIMEOUT) { if (event & IOEVENT_TIMEOUT) {
if (task->offset == 0 && task->req_count > 0) { if (task->recv.ptr->offset == 0 && task->req_count > 0) {
if (SF_CTX->callbacks.task_timeout != NULL) { if (SF_CTX->callbacks.task_timeout != NULL) {
if (SF_CTX->callbacks.task_timeout(task) != 0) { if (SF_CTX->callbacks.task_timeout(task) != 0) {
ioevent_add_to_deleted_list(task); ioevent_add_to_deleted_list(task);
@ -889,12 +896,12 @@ int sf_client_sock_read(int sock, short event, void *arg)
fast_timer_add(&task->thread_data->timer, fast_timer_add(&task->thread_data->timer,
&task->event.timer); &task->event.timer);
} else { } else {
if (task->length > 0) { if (task->recv.ptr->length > 0) {
logWarning("file: "__FILE__", line: %d, " logWarning("file: "__FILE__", line: %d, "
"client ip: %s, recv timeout, " "client ip: %s, recv timeout, recv "
"recv offset: %d, expect length: %d", "offset: %d, expect length: %d", __LINE__,
__LINE__, task->client_ip, task->client_ip, task->recv.ptr->offset,
task->offset, task->length); task->recv.ptr->length);
} else { } else {
logWarning("file: "__FILE__", line: %d, " logWarning("file: "__FILE__", line: %d, "
"client ip: %s, req_count: %"PRId64", recv timeout", "client ip: %s, req_count: %"PRId64", recv timeout",
@ -962,8 +969,9 @@ int sf_client_sock_write(int sock, short event, void *arg)
if (event & IOEVENT_TIMEOUT) { if (event & IOEVENT_TIMEOUT) {
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"client ip: %s, send timeout. total length: %d, offset: %d, " "client ip: %s, send timeout. total length: %d, offset: %d, "
"remain: %d", __LINE__, task->client_ip, task->length, "remain: %d", __LINE__, task->client_ip, task->send.ptr->length,
task->offset, task->length - task->offset); task->send.ptr->offset, task->send.ptr->length -
task->send.ptr->offset);
ioevent_add_to_deleted_list(task); ioevent_add_to_deleted_list(task);
return -1; return -1;
@ -985,9 +993,11 @@ int sf_client_sock_write(int sock, short event, void *arg)
if (action == sf_comm_action_finish) { if (action == sf_comm_action_finish) {
release_iovec_buffer(task); release_iovec_buffer(task);
length = task->length; length = task->send.ptr->length;
task->offset = 0; if (task->free_queue->double_buffers) {
task->length = 0; task->send.ptr->offset = 0;
task->send.ptr->length = 0;
}
if (sf_set_read_event(task) != 0) { if (sf_set_read_event(task) != 0) {
return -1; return -1;
} }

View File

@ -86,7 +86,18 @@ static inline TaskCleanUpCallback sf_get_task_cleanup_callback_ex(
sf_get_task_cleanup_callback_ex(&g_sf_context) sf_get_task_cleanup_callback_ex(&g_sf_context)
#define sf_nio_task_is_idle(task) \ #define sf_nio_task_is_idle(task) \
(task->offset == 0 && task->length == 0) ((task->send.ptr->offset == 0 && task->send.ptr->length == 0) && \
(task->recv.ptr->offset == 0 && task->recv.ptr->length == 0))
static inline void sf_nio_reset_task_length(struct fast_task_info *task)
{
task->send.ptr->length = 0;
task->send.ptr->offset = 0;
if (task->free_queue->double_buffers) {
task->recv.ptr->length = 0;
task->recv.ptr->offset = 0;
}
}
void sf_recv_notify_read(int sock, short event, void *arg); void sf_recv_notify_read(int sock, short event, void *arg);
int sf_send_add_event(struct fast_task_info *task); int sf_send_add_event(struct fast_task_info *task);
@ -109,20 +120,20 @@ static inline int sf_set_body_length(struct fast_task_info *task)
if (SF_CTX->callbacks.set_body_length(task) != 0) { if (SF_CTX->callbacks.set_body_length(task) != 0) {
return -1; return -1;
} }
if (task->length < 0) { if (task->recv.ptr->length < 0) {
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"client ip: %s, pkg length: %d < 0", "client ip: %s, pkg length: %d < 0",
__LINE__, task->client_ip, __LINE__, task->client_ip,
task->length); task->recv.ptr->length);
return -1; return -1;
} }
task->length += SF_CTX->header_size; task->recv.ptr->length += SF_CTX->header_size;
if (task->length > g_sf_global_vars.max_pkg_size) { if (task->recv.ptr->length > g_sf_global_vars.max_pkg_size) {
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"client ip: %s, pkg length: %d > " "client ip: %s, pkg length: %d > "
"max pkg size: %d", __LINE__, "max pkg size: %d", __LINE__,
task->client_ip, task->length, task->client_ip, task->recv.ptr->length,
g_sf_global_vars.max_pkg_size); g_sf_global_vars.max_pkg_size);
return -1; return -1;
} }

View File

@ -30,7 +30,7 @@ int sf_proto_set_body_length(struct fast_task_info *task)
{ {
SFCommonProtoHeader *header; SFCommonProtoHeader *header;
header = (SFCommonProtoHeader *)task->data; header = (SFCommonProtoHeader *)task->recv.ptr->data;
if (!SF_PROTO_CHECK_MAGIC(header->magic)) { if (!SF_PROTO_CHECK_MAGIC(header->magic)) {
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"peer %s:%u, magic "SF_PROTO_MAGIC_FORMAT "peer %s:%u, magic "SF_PROTO_MAGIC_FORMAT
@ -41,7 +41,7 @@ int sf_proto_set_body_length(struct fast_task_info *task)
return EINVAL; return EINVAL;
} }
task->length = buff2int(header->body_len); //set body length task->recv.ptr->length = buff2int(header->body_len); //set body length
return 0; return 0;
} }
@ -655,18 +655,17 @@ int sf_proto_deal_task_done(struct fast_task_info *task,
} }
if (ctx->response.header.status == 0) { if (ctx->response.header.status == 0) {
task->offset = task->length = 0;
return sf_set_read_event(task); return sf_set_read_event(task);
} else { } else {
return FC_NEGATIVE(ctx->response.header.status); return FC_NEGATIVE(ctx->response.header.status);
} }
} }
proto_header = (SFCommonProtoHeader *)task->data; proto_header = (SFCommonProtoHeader *)task->send.ptr->data;
if (!ctx->response_done) { if (!ctx->response_done) {
ctx->response.header.body_len = ctx->response.error.length; ctx->response.header.body_len = ctx->response.error.length;
if (ctx->response.error.length > 0) { if (ctx->response.error.length > 0) {
memcpy(task->data + sizeof(SFCommonProtoHeader), memcpy(task->send.ptr->data + sizeof(SFCommonProtoHeader),
ctx->response.error.message, ctx->response.error.length); ctx->response.error.message, ctx->response.error.length);
} }
} }
@ -675,7 +674,8 @@ int sf_proto_deal_task_done(struct fast_task_info *task,
short2buff(status, proto_header->status); short2buff(status, proto_header->status);
proto_header->cmd = ctx->response.header.cmd; proto_header->cmd = ctx->response.header.cmd;
int2buff(ctx->response.header.body_len, proto_header->body_len); int2buff(ctx->response.header.body_len, proto_header->body_len);
task->length = sizeof(SFCommonProtoHeader) + ctx->response.header.body_len; task->send.ptr->length = sizeof(SFCommonProtoHeader) +
ctx->response.header.body_len;
r = sf_send_add_event(task); r = sf_send_add_event(task);
time_used = get_current_time_us() - ctx->req_start_time; time_used = get_current_time_us() - ctx->req_start_time;

View File

@ -89,8 +89,18 @@
int2buff((resp_header).body_len, (proto_header)->body_len);\ int2buff((resp_header).body_len, (proto_header)->body_len);\
} while (0) } while (0)
#define SF_PROTO_RESP_BODY(task) \
(task->data + sizeof(SFCommonProtoHeader)) #define SF_PROTO_SEND_BODY(task) \
(task->send.ptr->data + sizeof(SFCommonProtoHeader))
#define SF_PROTO_RECV_BODY(task) \
(task->recv.ptr->data + sizeof(SFCommonProtoHeader))
#define SF_RECV_BODY_LENGTH(task) \
(task->recv.ptr->length - sizeof(SFCommonProtoHeader))
#define SF_SEND_BUFF_END(task) (task->send.ptr->data + task->send.ptr->size)
#define SF_RECV_BUFF_END(task) (task->recv.ptr->data + task->recv.ptr->size)
#define SF_PROTO_UPDATE_EXTRA_BODY_SIZE \ #define SF_PROTO_UPDATE_EXTRA_BODY_SIZE \
sizeof(SFProtoIdempotencyAdditionalHeader) + FCFS_AUTH_SESSION_ID_LEN sizeof(SFProtoIdempotencyAdditionalHeader) + FCFS_AUTH_SESSION_ID_LEN
@ -282,6 +292,16 @@ const char *sf_get_cmd_caption(const int cmd);
int sf_proto_deal_task_done(struct fast_task_info *task, int sf_proto_deal_task_done(struct fast_task_info *task,
const char *service_name, SFCommonTaskContext *ctx); const char *service_name, SFCommonTaskContext *ctx);
static inline void sf_proto_init_task_magic(struct fast_task_info *task)
{
SF_PROTO_SET_MAGIC(((SFCommonProtoHeader *)
task->send.ptr->data)->magic);
if (task->free_queue->double_buffers) {
SF_PROTO_SET_MAGIC(((SFCommonProtoHeader *)
task->recv.ptr->data)->magic);
}
}
static inline void sf_proto_init_task_context(struct fast_task_info *task, static inline void sf_proto_init_task_context(struct fast_task_info *task,
SFCommonTaskContext *ctx) SFCommonTaskContext *ctx)
{ {
@ -295,14 +315,15 @@ static inline void sf_proto_init_task_context(struct fast_task_info *task,
ctx->response_done = false; ctx->response_done = false;
ctx->need_response = true; ctx->need_response = true;
ctx->request.header.cmd = ((SFCommonProtoHeader *)task->data)->cmd; ctx->request.header.cmd = ((SFCommonProtoHeader *)
ctx->request.header.body_len = task->length - sizeof(SFCommonProtoHeader); task->recv.ptr->data)->cmd;
ctx->request.header.body_len = SF_RECV_BODY_LENGTH(task);
ctx->request.header.status = buff2short(((SFCommonProtoHeader *) ctx->request.header.status = buff2short(((SFCommonProtoHeader *)
task->data)->status); task->recv.ptr->data)->status);
if (task->recv_body != NULL) { if (task->recv_body != NULL) {
ctx->request.body = task->recv_body; ctx->request.body = task->recv_body;
} else { } else {
ctx->request.body = task->data + sizeof(SFCommonProtoHeader); ctx->request.body = SF_PROTO_RECV_BODY(task);
} }
} }

View File

@ -66,22 +66,15 @@ int sf_init_task(struct fast_task_info *task)
static void *worker_thread_entrance(void *arg); static void *worker_thread_entrance(void *arg);
static int sf_init_free_queues(const int task_padding_size, static int sf_init_free_queue(struct fast_task_queue *free_queue,
const int task_arg_size, TaskInitCallback init_callback) const char *name, const bool double_buffers,
const int task_padding_size, const int task_arg_size,
TaskInitCallback init_callback)
{ {
#define ALLOC_CONNECTIONS_ONCE 1024
static bool sf_inited = false;
int result; int result;
int m; int m;
int init_connections;
int alloc_conn_once; int alloc_conn_once;
if (sf_inited) {
return 0;
}
sf_inited = true;
if ((result=set_rand_seed()) != 0) { if ((result=set_rand_seed()) != 0) {
logCrit("file: "__FILE__", line: %d, " logCrit("file: "__FILE__", line: %d, "
"set_rand_seed fail, program exit!", __LINE__); "set_rand_seed fail, program exit!", __LINE__);
@ -94,19 +87,13 @@ static int sf_init_free_queues(const int task_padding_size,
} else if (m > 16) { } else if (m > 16) {
m = 16; m = 16;
} }
alloc_conn_once = ALLOC_CONNECTIONS_ONCE / m; alloc_conn_once = 256 / m;
init_connections = g_sf_global_vars.max_connections < alloc_conn_once ? return free_queue_init_ex2(free_queue, name, double_buffers,
g_sf_global_vars.max_connections : alloc_conn_once; g_sf_global_vars.max_connections, alloc_conn_once,
if ((result=free_queue_init_ex2(g_sf_global_vars.max_connections, g_sf_global_vars.min_buff_size, g_sf_global_vars.
init_connections, alloc_conn_once, g_sf_global_vars. max_buff_size, task_padding_size, task_arg_size,
min_buff_size, g_sf_global_vars.max_buff_size, (init_callback != NULL ? init_callback :
task_padding_size, task_arg_size, init_callback != NULL ? sf_init_task));
init_callback : sf_init_task)) != 0)
{
return result;
}
return 0;
} }
int sf_service_init_ex2(SFContext *sf_context, const char *name, int sf_service_init_ex2(SFContext *sf_context, const char *name,
@ -120,8 +107,9 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
sf_deal_task_callback deal_func, TaskCleanUpCallback task_cleanup_func, sf_deal_task_callback deal_func, TaskCleanUpCallback task_cleanup_func,
sf_recv_timeout_callback timeout_callback, const int net_timeout_ms, sf_recv_timeout_callback timeout_callback, const int net_timeout_ms,
const int proto_header_size, const int task_padding_size, const int proto_header_size, const int task_padding_size,
const int task_arg_size, TaskInitCallback init_callback, const int task_arg_size, const bool double_buffers,
sf_release_buffer_callback release_buffer_callback) TaskInitCallback init_callback, sf_release_buffer_callback
release_buffer_callback)
{ {
int result; int result;
int bytes; int bytes;
@ -143,7 +131,8 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
send_done_callback, deal_func, task_cleanup_func, send_done_callback, deal_func, task_cleanup_func,
timeout_callback, release_buffer_callback); timeout_callback, release_buffer_callback);
if ((result=sf_init_free_queues(task_padding_size, if ((result=sf_init_free_queue(&sf_context->free_queue,
name, double_buffers, task_padding_size,
task_arg_size, init_callback)) != 0) task_arg_size, init_callback)) != 0)
{ {
return result; return result;
@ -283,7 +272,7 @@ int sf_service_destroy_ex(SFContext *sf_context)
{ {
struct nio_thread_data *data_end, *thread_data; struct nio_thread_data *data_end, *thread_data;
free_queue_destroy(); free_queue_destroy(&sf_context->free_queue);
data_end = sf_context->thread_data + sf_context->work_threads; data_end = sf_context->thread_data + sf_context->work_threads;
for (thread_data=sf_context->thread_data; thread_data<data_end; for (thread_data=sf_context->thread_data; thread_data<data_end;
thread_data++) thread_data++)

View File

@ -44,8 +44,9 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
sf_deal_task_callback deal_func, TaskCleanUpCallback task_cleanup_func, sf_deal_task_callback deal_func, TaskCleanUpCallback task_cleanup_func,
sf_recv_timeout_callback timeout_callback, const int net_timeout_ms, sf_recv_timeout_callback timeout_callback, const int net_timeout_ms,
const int proto_header_size, const int task_padding_size, const int proto_header_size, const int task_padding_size,
const int task_arg_size, TaskInitCallback init_callback, const int task_arg_size, const bool double_buffers,
sf_release_buffer_callback release_buffer_callback); TaskInitCallback init_callback, sf_release_buffer_callback
release_buffer_callback);
#define sf_service_init_ex(sf_context, name, alloc_thread_extra_data_callback,\ #define sf_service_init_ex(sf_context, name, alloc_thread_extra_data_callback,\
thread_loop_callback, accept_done_callback, set_body_length_func, \ thread_loop_callback, accept_done_callback, set_body_length_func, \
@ -55,7 +56,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
thread_loop_callback, accept_done_callback, set_body_length_func, \ thread_loop_callback, accept_done_callback, set_body_length_func, \
NULL, send_done_callback, deal_func, task_cleanup_func, \ NULL, send_done_callback, deal_func, task_cleanup_func, \
timeout_callback, net_timeout_ms, proto_header_size, \ timeout_callback, net_timeout_ms, proto_header_size, \
0, task_arg_size, NULL, NULL) 0, task_arg_size, false, NULL, NULL)
#define sf_service_init(name, alloc_thread_extra_data_callback, \ #define sf_service_init(name, alloc_thread_extra_data_callback, \
thread_loop_callback, accept_done_callback, set_body_length_func, \ thread_loop_callback, accept_done_callback, set_body_length_func, \
@ -64,7 +65,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
sf_service_init_ex2(&g_sf_context, name, alloc_thread_extra_data_callback, \ sf_service_init_ex2(&g_sf_context, name, alloc_thread_extra_data_callback, \
thread_loop_callback, accept_done_callback, set_body_length_func, NULL, \ thread_loop_callback, accept_done_callback, set_body_length_func, NULL, \
send_done_callback, deal_func, task_cleanup_func, timeout_callback, \ send_done_callback, deal_func, task_cleanup_func, timeout_callback, \
net_timeout_ms, proto_header_size, 0, task_arg_size, NULL, NULL) net_timeout_ms, proto_header_size, 0, task_arg_size, false, NULL, NULL)
int sf_service_destroy_ex(SFContext *sf_context); int sf_service_destroy_ex(SFContext *sf_context);
@ -153,7 +154,7 @@ static inline struct fast_task_info *sf_alloc_init_task_ex(
{ {
struct fast_task_info *task; struct fast_task_info *task;
task = free_queue_pop(); task = free_queue_pop(&handler->ctx->free_queue);
if (task == NULL) { if (task == NULL) {
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"malloc task buff failed, you should " "malloc task buff failed, you should "

View File

@ -158,6 +158,7 @@ typedef struct sf_context {
FCSmartPollingConfig smart_polling; FCSmartPollingConfig smart_polling;
SFNIOCallbacks callbacks; SFNIOCallbacks callbacks;
struct fast_task_queue free_queue;
} SFContext; } SFContext;
typedef struct { typedef struct {