From f8e3fcdc5519db27e0909f29b686a08ec934c727 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Mon, 25 Sep 2023 18:37:53 +0800 Subject: [PATCH] adapt to the newest struct fast_task_info from libfastcommon --- src/idempotency/client/receipt_handler.c | 71 ++++++++-------- src/idempotency/server/server_handler.c | 21 ++--- src/sf_nio.c | 104 +++++++++++++---------- src/sf_nio.h | 23 +++-- src/sf_proto.c | 12 +-- src/sf_proto.h | 33 +++++-- src/sf_service.c | 45 ++++------ src/sf_service.h | 11 +-- src/sf_types.h | 1 + 9 files changed, 176 insertions(+), 145 deletions(-) diff --git a/src/idempotency/client/receipt_handler.c b/src/idempotency/client/receipt_handler.c index 78a5d3d..ac1e62a 100644 --- a/src/idempotency/client/receipt_handler.c +++ b/src/idempotency/client/receipt_handler.c @@ -95,8 +95,7 @@ static void receipt_task_finish_cleanup(struct fast_task_info *task) task->handler->close_connection(task); } - task->length = 0; - task->offset = 0; + sf_nio_reset_task_length(task); task->req_count = 0; channel = (IdempotencyClientChannel *)task->arg; @@ -116,14 +115,15 @@ static void setup_channel_request(struct fast_task_info *task) SFProtoSetupChannelReq *req; channel = (IdempotencyClientChannel *)task->arg; - header = (SFCommonProtoHeader *)task->data; + header = (SFCommonProtoHeader *)task->send.ptr->data; req = (SFProtoSetupChannelReq *)(header + 1); int2buff(__sync_add_and_fetch(&channel->id, 0), req->channel_id); int2buff(__sync_add_and_fetch(&channel->key, 0), req->key); SF_PROTO_SET_HEADER(header, SF_SERVICE_PROTO_SETUP_CHANNEL_REQ, sizeof(SFProtoSetupChannelReq)); - task->length = sizeof(SFCommonProtoHeader) + sizeof(SFProtoSetupChannelReq); + task->send.ptr->length = sizeof(SFCommonProtoHeader) + + sizeof(SFProtoSetupChannelReq); sf_send_add_event(task); } @@ -150,10 +150,10 @@ static int check_report_req_receipt(struct fast_task_info *task) return 0; } - header = (SFCommonProtoHeader *)task->data; + header = (SFCommonProtoHeader *)task->send.ptr->data; rheader = (SFProtoReportReqReceiptHeader *)(header + 1); rbody = rstart = (SFProtoReportReqReceiptBody *)(rheader + 1); - buff_end = task->data + channel->buffer_size; + buff_end = task->send.ptr->data + channel->buffer_size; last = NULL; receipt = channel->waiting_resp_qinfo.head; do { @@ -183,8 +183,9 @@ static int check_report_req_receipt(struct fast_task_info *task) count = rbody - rstart; int2buff(count, rheader->count); - task->length = (char *)rbody - task->data; - int2buff(task->length - sizeof(SFCommonProtoHeader), header->body_len); + task->send.ptr->length = (char *)rbody - task->send.ptr->data; + int2buff(task->send.ptr->length - sizeof(SFCommonProtoHeader), + header->body_len); header->cmd = SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ; sf_send_add_event(task); return count; @@ -198,18 +199,18 @@ static void close_channel_request(struct fast_task_info *task) channel = (IdempotencyClientChannel *)task->arg; idempotency_client_channel_set_id_key(channel, 0, 0); - header = (SFCommonProtoHeader *)task->data; + header = (SFCommonProtoHeader *)task->send.ptr->data; SF_PROTO_SET_HEADER(header, SF_SERVICE_PROTO_CLOSE_CHANNEL_REQ, 0); - task->length = sizeof(SFCommonProtoHeader); + task->send.ptr->length = sizeof(SFCommonProtoHeader); sf_send_add_event(task); } static void active_test_request(struct fast_task_info *task) { SFCommonProtoHeader *header; - header = (SFCommonProtoHeader *)task->data; + header = (SFCommonProtoHeader *)task->send.ptr->data; SF_PROTO_SET_HEADER(header, SF_PROTO_ACTIVE_TEST_REQ, 0); - task->length = sizeof(SFCommonProtoHeader); + task->send.ptr->length = sizeof(SFCommonProtoHeader); sf_send_add_event(task); } @@ -243,11 +244,12 @@ static void report_req_receipt_request(struct fast_task_info *task, static inline int receipt_expect_body_length(struct fast_task_info *task, const int expect_body_len) { - if ((int)(task->length - sizeof(SFCommonProtoHeader)) != expect_body_len) { + int body_len; + body_len = task->recv.ptr->length - sizeof(SFCommonProtoHeader); + if (body_len != expect_body_len) { logError("file: "__FILE__", line: %d, " - "server %s:%u, response body length: %d != %d", - __LINE__, task->server_ip, task->port, (int)(task->length - - sizeof(SFCommonProtoHeader)), expect_body_len); + "server %s:%u, response body length: %d != %d", __LINE__, + task->server_ip, task->port, body_len, expect_body_len); return EINVAL; } @@ -279,8 +281,7 @@ static int deal_setup_channel_response(struct fast_task_info *task) return 0; } - resp = (SFProtoSetupChannelResp *)(task->data + - sizeof(SFCommonProtoHeader)); + resp = (SFProtoSetupChannelResp *)SF_PROTO_RECV_BODY(task); channel_id = buff2int(resp->channel_id); channel_key = buff2int(resp->key); buffer_size = buff2int(resp->buffer_size); @@ -290,7 +291,7 @@ static int deal_setup_channel_response(struct fast_task_info *task) thread_ctx = (IdempotencyReceiptThreadContext *)task->thread_data->arg; fc_list_add_tail(&channel->dlink, &thread_ctx->head); } - channel->buffer_size = FC_MIN(buffer_size, task->size); + channel->buffer_size = FC_MIN(buffer_size, task->send.ptr->size); PTHREAD_MUTEX_LOCK(&channel->lcp.lock); pthread_cond_broadcast(&channel->lcp.cond); @@ -343,6 +344,7 @@ static inline int deal_report_req_receipt_response(struct fast_task_info *task) static int receipt_deal_task(struct fast_task_info *task, const int stage) { int result; + SFCommonProtoHeader *header; do { if (stage == SF_NIO_STAGE_HANDSHAKE) { @@ -350,7 +352,7 @@ static int receipt_deal_task(struct fast_task_info *task, const int stage) result = 0; break; } else if (stage == SF_NIO_STAGE_CONTINUE) { - if (task->length == 0 && task->offset == 0) { + if (sf_nio_task_is_idle(task)) { if (((IdempotencyClientChannel *)task->arg)->established) { report_req_receipt_request(task, true); } else if (task->req_count > 0) { @@ -362,24 +364,24 @@ static int receipt_deal_task(struct fast_task_info *task, const int stage) break; } - result = buff2short(((SFCommonProtoHeader *)task->data)->status); + header = (SFCommonProtoHeader *)task->recv.ptr->data; + result = buff2short(header->status); if (result != 0) { int msg_len; char *message; - msg_len = task->length - sizeof(SFCommonProtoHeader); - message = task->data + sizeof(SFCommonProtoHeader); + msg_len = SF_RECV_BODY_LENGTH(task); + message = SF_PROTO_RECV_BODY(task); logError("file: "__FILE__", line: %d, " "response from server %s:%u, cmd: %d (%s), " - "status: %d, error info: %.*s", - __LINE__, task->server_ip, task->port, - ((SFCommonProtoHeader *)task->data)->cmd, - sf_get_cmd_caption(((SFCommonProtoHeader *)task->data)->cmd), + "status: %d, error info: %.*s", __LINE__, + task->server_ip, task->port, header->cmd, + sf_get_cmd_caption(header->cmd), result, msg_len, message); break; } - switch (((SFCommonProtoHeader *)task->data)->cmd) { + switch (header->cmd) { case SF_SERVICE_PROTO_SETUP_CHANNEL_RESP: result = deal_setup_channel_response(task); break; @@ -398,16 +400,15 @@ static int receipt_deal_task(struct fast_task_info *task, const int stage) default: logError("file: "__FILE__", line: %d, " "response from server %s:%u, unexpect cmd: %d (%s)", - __LINE__, task->server_ip, task->port, - ((SFCommonProtoHeader *)task->data)->cmd, - sf_get_cmd_caption(((SFCommonProtoHeader *)task->data)->cmd)); + __LINE__, task->server_ip, task->port, header->cmd, + sf_get_cmd_caption(header->cmd)); result = EINVAL; break; } if (result == 0) { update_lru_chain(task); - task->offset = task->length = 0; + sf_nio_reset_task_length(task); report_req_receipt_request(task, false); } } while (0); @@ -489,6 +490,8 @@ static void *receipt_alloc_thread_extra_data(const int thread_index) static int do_init(FCAddressPtrArray *address_array) { + const int task_arg_size = 0; + const bool double_buffers = false; int result; int bytes; SFNetworkHandler *rdma_handler; @@ -518,8 +521,8 @@ static int do_init(FCAddressPtrArray *address_array) receipt_alloc_thread_extra_data, receipt_thread_loop_callback, NULL, sf_proto_set_body_length, NULL, NULL, receipt_deal_task, receipt_task_finish_cleanup, receipt_recv_timeout_callback, - 1000, sizeof(SFCommonProtoHeader), TASK_PADDING_SIZE, 0, - receipt_init_task, NULL); + 1000, sizeof(SFCommonProtoHeader), TASK_PADDING_SIZE, + task_arg_size, double_buffers, receipt_init_task, NULL); } int receipt_handler_init(FCAddressPtrArray *address_array) diff --git a/src/idempotency/server/server_handler.c b/src/idempotency/server/server_handler.c index b3cfbaa..d5a9e13 100644 --- a/src/idempotency/server/server_handler.c +++ b/src/idempotency/server/server_handler.c @@ -37,9 +37,6 @@ #include "server_channel.h" #include "server_handler.h" -#define SF_TASK_BODY_LENGTH(task) \ - (task->length - sizeof(SFCommonProtoHeader)) - int sf_server_deal_setup_channel(struct fast_task_info *task, int *task_type, const int server_id, IdempotencyChannel **channel, SFResponseInfo *response) @@ -52,13 +49,13 @@ int sf_server_deal_setup_channel(struct fast_task_info *task, response->header.cmd = SF_SERVICE_PROTO_SETUP_CHANNEL_RESP; if ((result=sf_server_expect_body_length(response, - SF_TASK_BODY_LENGTH(task), + SF_RECV_BODY_LENGTH(task), sizeof(SFProtoSetupChannelReq))) != 0) { return result; } - req = (SFProtoSetupChannelReq *)(task->data + sizeof(SFCommonProtoHeader)); + req = (SFProtoSetupChannelReq *)SF_PROTO_RECV_BODY(task); channel_id = buff2int(req->channel_id); key = buff2int(req->key); if (*channel != NULL) { @@ -76,12 +73,11 @@ int sf_server_deal_setup_channel(struct fast_task_info *task, } *task_type = SF_SERVER_TASK_TYPE_CHANNEL_HOLDER; - resp = (SFProtoSetupChannelResp *)(task->data + - sizeof(SFCommonProtoHeader)); + resp = (SFProtoSetupChannelResp *)SF_PROTO_SEND_BODY(task); int2buff((*channel)->id, resp->channel_id); int2buff((*channel)->key, resp->key); int2buff(server_id, resp->server_id); - int2buff(task->size, resp->buffer_size); + int2buff(task->send.ptr->size, resp->buffer_size); response->header.body_len = sizeof(SFProtoSetupChannelResp); return 0; } @@ -139,15 +135,14 @@ int sf_server_deal_report_req_receipt(struct fast_task_info *task, return result; } - body_len = SF_TASK_BODY_LENGTH(task); + body_len = SF_RECV_BODY_LENGTH(task); if ((result=sf_server_check_min_body_length(response, body_len, sizeof(SFProtoReportReqReceiptHeader))) != 0) { return result; } - body_header = (SFProtoReportReqReceiptHeader *) - (task->data + sizeof(SFCommonProtoHeader)); + body_header = (SFProtoReportReqReceiptHeader *)SF_PROTO_RECV_BODY(task); count = buff2int(body_header->count); calc_body_len = sizeof(SFProtoReportReqReceiptHeader) + sizeof(SFProtoReportReqReceiptBody) * count; @@ -220,7 +215,7 @@ int sf_server_deal_rebind_channel(struct fast_task_info *task, SFProtoRebindChannelReq *req; if ((result=sf_server_expect_body_length(response, - SF_TASK_BODY_LENGTH(task), + SF_RECV_BODY_LENGTH(task), sizeof(SFProtoRebindChannelReq))) != 0) { return result; @@ -240,7 +235,7 @@ int sf_server_deal_rebind_channel(struct fast_task_info *task, } idempotency_channel_release(*channel, false); - req = (SFProtoRebindChannelReq *)(task->data + sizeof(SFCommonProtoHeader)); + req = (SFProtoRebindChannelReq *)SF_PROTO_RECV_BODY(task); channel_id = buff2int(req->channel_id); key = buff2int(req->key); *channel = idempotency_channel_find_and_hold(channel_id, key, &result); diff --git a/src/sf_nio.c b/src/sf_nio.c index a2255af..e264be9 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -143,6 +143,8 @@ int sf_set_read_event(struct fast_task_info *task) { int result; + task->recv.ptr->offset = 0; + task->recv.ptr->length = 0; task->nio_stages.current = SF_NIO_STAGE_RECV; if (task->event.callback == (IOEventCallback)sf_client_sock_read) { return 0; @@ -488,8 +490,8 @@ void sf_recv_notify_read(int sock, short event, void *arg) int sf_send_add_event(struct fast_task_info *task) { - task->offset = 0; - if (task->length > 0) { + task->send.ptr->offset = 0; + if (task->send.ptr->length > 0) { /* direct send */ task->nio_stages.current = SF_NIO_STAGE_SEND; if (sf_client_sock_write(task->event.fd, IOEVENT_WRITE, task) < 0) { @@ -533,8 +535,7 @@ static inline int check_task(struct fast_task_info *task, return -1; } } else { - //TODO: for streaming should return EAGAIN - return 0; + return EAGAIN; } } @@ -546,8 +547,9 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, SFCommAction *action) bytes = writev(task->event.fd, task->iovec_array.iovs, FC_MIN(task->iovec_array.count, IOV_MAX)); } else { - bytes = write(task->event.fd, task->data + task->offset, - task->length - task->offset); + bytes = write(task->event.fd, task->send.ptr->data + + task->send.ptr->offset, task->send.ptr->length - + task->send.ptr->offset); } if (bytes < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) @@ -567,19 +569,19 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, SFCommAction *action) logWarning("file: "__FILE__", line: %d, " "client ip: %s, send fail, task offset: %d, length: %d, " "errno: %d, error info: %s", __LINE__, task->client_ip, - task->offset, task->length, errno, strerror(errno)); + task->send.ptr->offset, task->send.ptr->length, errno, strerror(errno)); return -1; } } else if (bytes == 0) { logWarning("file: "__FILE__", line: %d, " "client ip: %s, sock: %d, task length: %d, offset: %d, " "send failed, connection disconnected", __LINE__, - task->client_ip, task->event.fd, task->length, task->offset); + task->client_ip, task->event.fd, task->send.ptr->length, task->send.ptr->offset); return -1; } - task->offset += bytes; - if (task->offset >= task->length) { + task->send.ptr->offset += bytes; + if (task->send.ptr->offset >= task->send.ptr->length) { *action = sf_comm_action_finish; } else { *action = sf_comm_action_continue; @@ -623,16 +625,19 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, SFCommAction *action) int recv_bytes; bool new_alloc; - if (task->length == 0) { //recv header - recv_bytes = SF_CTX->header_size - task->offset; - bytes = read(task->event.fd, task->data + task->offset, recv_bytes); + if (task->recv.ptr->length == 0) { //recv header + recv_bytes = SF_CTX->header_size - task->recv.ptr->offset; + bytes = read(task->event.fd, task->recv.ptr->data + + task->recv.ptr->offset, recv_bytes); } else { - recv_bytes = task->length - task->offset; + recv_bytes = task->recv.ptr->length - task->recv.ptr->offset; if (task->recv_body == NULL) { - bytes = read(task->event.fd, task->data + task->offset, recv_bytes); + bytes = read(task->event.fd, task->recv.ptr->data + + task->recv.ptr->offset, recv_bytes); } else { - bytes = read(task->event.fd, task->recv_body + (task->offset - - SF_CTX->header_size), recv_bytes); + bytes = read(task->event.fd, task->recv_body + + (task->recv.ptr->offset - SF_CTX-> + header_size), recv_bytes); } } @@ -655,19 +660,19 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, SFCommAction *action) return -1; } } else if (bytes == 0) { - if (task->offset > 0) { - if (task->length > 0) { + if (task->recv.ptr->offset > 0) { + if (task->recv.ptr->length > 0) { logWarning("file: "__FILE__", line: %d, " "client ip: %s, connection disconnected, " "expect pkg length: %d, recv pkg length: %d", - __LINE__, task->client_ip, task->length, - task->offset); + __LINE__, task->client_ip, task->recv.ptr->length, + task->recv.ptr->offset); } else { logWarning("file: "__FILE__", line: %d, " "client ip: %s, connection " "disconnected, recv pkg length: %d", __LINE__, task->client_ip, - task->offset); + task->recv.ptr->offset); } } else { logDebug("file: "__FILE__", line: %d, " @@ -680,9 +685,9 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, SFCommAction *action) } TCP_SET_QUICK_ACK(task->event.fd); - task->offset += bytes; - if (task->length == 0) { //pkg header - if (task->offset < SF_CTX->header_size) { + task->recv.ptr->offset += bytes; + if (task->recv.ptr->length == 0) { //pkg header + if (task->recv.ptr->offset < SF_CTX->header_size) { *action = sf_comm_action_continue; return bytes; } @@ -693,7 +698,7 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, SFCommAction *action) if (SF_CTX->callbacks.alloc_recv_buffer != NULL) { task->recv_body = SF_CTX->callbacks.alloc_recv_buffer(task, - task->length - SF_CTX->header_size, &new_alloc); + task->recv.ptr->length - SF_CTX->header_size, &new_alloc); if (new_alloc && task->recv_body == NULL) { return -1; } @@ -702,36 +707,38 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, SFCommAction *action) } if (!new_alloc) { - if (task->length > task->size) { + if (task->recv.ptr->length > task->recv.ptr->size) { int old_size; if (!SF_CTX->realloc_task_buffer) { logError("file: "__FILE__", line: %d, " "client ip: %s, pkg length: %d exceeds " "task size: %d, but realloc buffer disabled", - __LINE__, task->client_ip, task->size, - task->length); + __LINE__, task->client_ip, task->recv.ptr->size, + task->recv.ptr->length); return -1; } - old_size = task->size; - if (free_queue_realloc_buffer(task, task->length) != 0) { + old_size = task->recv.ptr->size; + if (free_queue_realloc_recv_buffer(task, task-> + recv.ptr->length) != 0) + { logError("file: "__FILE__", line: %d, " - "client ip: %s, realloc buffer size " - "from %d to %d fail", __LINE__, - task->client_ip, task->size, task->length); + "client ip: %s, realloc buffer size from %d " + "to %d fail", __LINE__, task->client_ip, + task->recv.ptr->size, task->recv.ptr->length); return -1; } logDebug("file: "__FILE__", line: %d, " "client ip: %s, task length: %d, realloc buffer " "size from %d to %d", __LINE__, task->client_ip, - task->length, old_size, task->size); + task->recv.ptr->length, old_size, task->recv.ptr->size); } } } - if (task->offset >= task->length) { //recv done + if (task->recv.ptr->offset >= task->recv.ptr->length) { //recv done *action = sf_comm_action_finish; } else { *action = sf_comm_action_continue; @@ -876,7 +883,7 @@ int sf_client_sock_read(int sock, short event, void *arg) } if (event & IOEVENT_TIMEOUT) { - if (task->offset == 0 && task->req_count > 0) { + if (task->recv.ptr->offset == 0 && task->req_count > 0) { if (SF_CTX->callbacks.task_timeout != NULL) { if (SF_CTX->callbacks.task_timeout(task) != 0) { ioevent_add_to_deleted_list(task); @@ -889,12 +896,12 @@ int sf_client_sock_read(int sock, short event, void *arg) fast_timer_add(&task->thread_data->timer, &task->event.timer); } else { - if (task->length > 0) { + if (task->recv.ptr->length > 0) { logWarning("file: "__FILE__", line: %d, " - "client ip: %s, recv timeout, " - "recv offset: %d, expect length: %d", - __LINE__, task->client_ip, - task->offset, task->length); + "client ip: %s, recv timeout, recv " + "offset: %d, expect length: %d", __LINE__, + task->client_ip, task->recv.ptr->offset, + task->recv.ptr->length); } else { logWarning("file: "__FILE__", line: %d, " "client ip: %s, req_count: %"PRId64", recv timeout", @@ -962,8 +969,9 @@ int sf_client_sock_write(int sock, short event, void *arg) if (event & IOEVENT_TIMEOUT) { logError("file: "__FILE__", line: %d, " "client ip: %s, send timeout. total length: %d, offset: %d, " - "remain: %d", __LINE__, task->client_ip, task->length, - task->offset, task->length - task->offset); + "remain: %d", __LINE__, task->client_ip, task->send.ptr->length, + task->send.ptr->offset, task->send.ptr->length - + task->send.ptr->offset); ioevent_add_to_deleted_list(task); return -1; @@ -985,9 +993,11 @@ int sf_client_sock_write(int sock, short event, void *arg) if (action == sf_comm_action_finish) { release_iovec_buffer(task); - length = task->length; - task->offset = 0; - task->length = 0; + length = task->send.ptr->length; + if (task->free_queue->double_buffers) { + task->send.ptr->offset = 0; + task->send.ptr->length = 0; + } if (sf_set_read_event(task) != 0) { return -1; } diff --git a/src/sf_nio.h b/src/sf_nio.h index d4a8a1f..3297723 100644 --- a/src/sf_nio.h +++ b/src/sf_nio.h @@ -86,7 +86,18 @@ static inline TaskCleanUpCallback sf_get_task_cleanup_callback_ex( sf_get_task_cleanup_callback_ex(&g_sf_context) #define sf_nio_task_is_idle(task) \ - (task->offset == 0 && task->length == 0) + ((task->send.ptr->offset == 0 && task->send.ptr->length == 0) && \ + (task->recv.ptr->offset == 0 && task->recv.ptr->length == 0)) + +static inline void sf_nio_reset_task_length(struct fast_task_info *task) +{ + task->send.ptr->length = 0; + task->send.ptr->offset = 0; + if (task->free_queue->double_buffers) { + task->recv.ptr->length = 0; + task->recv.ptr->offset = 0; + } +} void sf_recv_notify_read(int sock, short event, void *arg); int sf_send_add_event(struct fast_task_info *task); @@ -109,20 +120,20 @@ static inline int sf_set_body_length(struct fast_task_info *task) if (SF_CTX->callbacks.set_body_length(task) != 0) { return -1; } - if (task->length < 0) { + if (task->recv.ptr->length < 0) { logError("file: "__FILE__", line: %d, " "client ip: %s, pkg length: %d < 0", __LINE__, task->client_ip, - task->length); + task->recv.ptr->length); return -1; } - task->length += SF_CTX->header_size; - if (task->length > g_sf_global_vars.max_pkg_size) { + task->recv.ptr->length += SF_CTX->header_size; + if (task->recv.ptr->length > g_sf_global_vars.max_pkg_size) { logError("file: "__FILE__", line: %d, " "client ip: %s, pkg length: %d > " "max pkg size: %d", __LINE__, - task->client_ip, task->length, + task->client_ip, task->recv.ptr->length, g_sf_global_vars.max_pkg_size); return -1; } diff --git a/src/sf_proto.c b/src/sf_proto.c index aabf519..727bbb9 100644 --- a/src/sf_proto.c +++ b/src/sf_proto.c @@ -30,7 +30,7 @@ int sf_proto_set_body_length(struct fast_task_info *task) { SFCommonProtoHeader *header; - header = (SFCommonProtoHeader *)task->data; + header = (SFCommonProtoHeader *)task->recv.ptr->data; if (!SF_PROTO_CHECK_MAGIC(header->magic)) { logError("file: "__FILE__", line: %d, " "peer %s:%u, magic "SF_PROTO_MAGIC_FORMAT @@ -41,7 +41,7 @@ int sf_proto_set_body_length(struct fast_task_info *task) return EINVAL; } - task->length = buff2int(header->body_len); //set body length + task->recv.ptr->length = buff2int(header->body_len); //set body length return 0; } @@ -655,18 +655,17 @@ int sf_proto_deal_task_done(struct fast_task_info *task, } if (ctx->response.header.status == 0) { - task->offset = task->length = 0; return sf_set_read_event(task); } else { return FC_NEGATIVE(ctx->response.header.status); } } - proto_header = (SFCommonProtoHeader *)task->data; + proto_header = (SFCommonProtoHeader *)task->send.ptr->data; if (!ctx->response_done) { ctx->response.header.body_len = ctx->response.error.length; if (ctx->response.error.length > 0) { - memcpy(task->data + sizeof(SFCommonProtoHeader), + memcpy(task->send.ptr->data + sizeof(SFCommonProtoHeader), ctx->response.error.message, ctx->response.error.length); } } @@ -675,7 +674,8 @@ int sf_proto_deal_task_done(struct fast_task_info *task, short2buff(status, proto_header->status); proto_header->cmd = ctx->response.header.cmd; int2buff(ctx->response.header.body_len, proto_header->body_len); - task->length = sizeof(SFCommonProtoHeader) + ctx->response.header.body_len; + task->send.ptr->length = sizeof(SFCommonProtoHeader) + + ctx->response.header.body_len; r = sf_send_add_event(task); time_used = get_current_time_us() - ctx->req_start_time; diff --git a/src/sf_proto.h b/src/sf_proto.h index 430d97c..a137d43 100644 --- a/src/sf_proto.h +++ b/src/sf_proto.h @@ -89,8 +89,18 @@ int2buff((resp_header).body_len, (proto_header)->body_len);\ } while (0) -#define SF_PROTO_RESP_BODY(task) \ - (task->data + sizeof(SFCommonProtoHeader)) + +#define SF_PROTO_SEND_BODY(task) \ + (task->send.ptr->data + sizeof(SFCommonProtoHeader)) + +#define SF_PROTO_RECV_BODY(task) \ + (task->recv.ptr->data + sizeof(SFCommonProtoHeader)) + +#define SF_RECV_BODY_LENGTH(task) \ + (task->recv.ptr->length - sizeof(SFCommonProtoHeader)) + +#define SF_SEND_BUFF_END(task) (task->send.ptr->data + task->send.ptr->size) +#define SF_RECV_BUFF_END(task) (task->recv.ptr->data + task->recv.ptr->size) #define SF_PROTO_UPDATE_EXTRA_BODY_SIZE \ sizeof(SFProtoIdempotencyAdditionalHeader) + FCFS_AUTH_SESSION_ID_LEN @@ -282,6 +292,16 @@ const char *sf_get_cmd_caption(const int cmd); int sf_proto_deal_task_done(struct fast_task_info *task, const char *service_name, SFCommonTaskContext *ctx); +static inline void sf_proto_init_task_magic(struct fast_task_info *task) +{ + SF_PROTO_SET_MAGIC(((SFCommonProtoHeader *) + task->send.ptr->data)->magic); + if (task->free_queue->double_buffers) { + SF_PROTO_SET_MAGIC(((SFCommonProtoHeader *) + task->recv.ptr->data)->magic); + } +} + static inline void sf_proto_init_task_context(struct fast_task_info *task, SFCommonTaskContext *ctx) { @@ -295,14 +315,15 @@ static inline void sf_proto_init_task_context(struct fast_task_info *task, ctx->response_done = false; ctx->need_response = true; - ctx->request.header.cmd = ((SFCommonProtoHeader *)task->data)->cmd; - ctx->request.header.body_len = task->length - sizeof(SFCommonProtoHeader); + ctx->request.header.cmd = ((SFCommonProtoHeader *) + task->recv.ptr->data)->cmd; + ctx->request.header.body_len = SF_RECV_BODY_LENGTH(task); ctx->request.header.status = buff2short(((SFCommonProtoHeader *) - task->data)->status); + task->recv.ptr->data)->status); if (task->recv_body != NULL) { ctx->request.body = task->recv_body; } else { - ctx->request.body = task->data + sizeof(SFCommonProtoHeader); + ctx->request.body = SF_PROTO_RECV_BODY(task); } } diff --git a/src/sf_service.c b/src/sf_service.c index e883a16..ab931f5 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -66,22 +66,15 @@ int sf_init_task(struct fast_task_info *task) static void *worker_thread_entrance(void *arg); -static int sf_init_free_queues(const int task_padding_size, - const int task_arg_size, TaskInitCallback init_callback) +static int sf_init_free_queue(struct fast_task_queue *free_queue, + const char *name, const bool double_buffers, + const int task_padding_size, const int task_arg_size, + TaskInitCallback init_callback) { -#define ALLOC_CONNECTIONS_ONCE 1024 - - static bool sf_inited = false; int result; int m; - int init_connections; int alloc_conn_once; - if (sf_inited) { - return 0; - } - - sf_inited = true; if ((result=set_rand_seed()) != 0) { logCrit("file: "__FILE__", line: %d, " "set_rand_seed fail, program exit!", __LINE__); @@ -94,19 +87,13 @@ static int sf_init_free_queues(const int task_padding_size, } else if (m > 16) { m = 16; } - alloc_conn_once = ALLOC_CONNECTIONS_ONCE / m; - init_connections = g_sf_global_vars.max_connections < alloc_conn_once ? - g_sf_global_vars.max_connections : alloc_conn_once; - if ((result=free_queue_init_ex2(g_sf_global_vars.max_connections, - init_connections, alloc_conn_once, g_sf_global_vars. - min_buff_size, g_sf_global_vars.max_buff_size, - task_padding_size, task_arg_size, init_callback != NULL ? - init_callback : sf_init_task)) != 0) - { - return result; - } - - return 0; + alloc_conn_once = 256 / m; + return free_queue_init_ex2(free_queue, name, double_buffers, + g_sf_global_vars.max_connections, alloc_conn_once, + g_sf_global_vars.min_buff_size, g_sf_global_vars. + max_buff_size, task_padding_size, task_arg_size, + (init_callback != NULL ? init_callback : + sf_init_task)); } int sf_service_init_ex2(SFContext *sf_context, const char *name, @@ -120,8 +107,9 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, sf_deal_task_callback deal_func, TaskCleanUpCallback task_cleanup_func, sf_recv_timeout_callback timeout_callback, const int net_timeout_ms, const int proto_header_size, const int task_padding_size, - const int task_arg_size, TaskInitCallback init_callback, - sf_release_buffer_callback release_buffer_callback) + const int task_arg_size, const bool double_buffers, + TaskInitCallback init_callback, sf_release_buffer_callback + release_buffer_callback) { int result; int bytes; @@ -143,7 +131,8 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, send_done_callback, deal_func, task_cleanup_func, timeout_callback, release_buffer_callback); - if ((result=sf_init_free_queues(task_padding_size, + if ((result=sf_init_free_queue(&sf_context->free_queue, + name, double_buffers, task_padding_size, task_arg_size, init_callback)) != 0) { return result; @@ -283,7 +272,7 @@ int sf_service_destroy_ex(SFContext *sf_context) { struct nio_thread_data *data_end, *thread_data; - free_queue_destroy(); + free_queue_destroy(&sf_context->free_queue); data_end = sf_context->thread_data + sf_context->work_threads; for (thread_data=sf_context->thread_data; thread_datactx->free_queue); if (task == NULL) { logError("file: "__FILE__", line: %d, " "malloc task buff failed, you should " diff --git a/src/sf_types.h b/src/sf_types.h index 36ce1ed..7230c56 100644 --- a/src/sf_types.h +++ b/src/sf_types.h @@ -158,6 +158,7 @@ typedef struct sf_context { FCSmartPollingConfig smart_polling; SFNIOCallbacks callbacks; + struct fast_task_queue free_queue; } SFContext; typedef struct {