diff --git a/debian/changelog b/debian/changelog index 21241c1..0d410b7 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,21 @@ +libserverframe (1.2.0-3) unstable; urgency=medium + + * upgrade to 1.2.0-3 + + -- YuQing <384681@qq.com> Tue, 21 Nov 2023 14:36:16 +0000 + +libserverframe (1.2.0-2) unstable; urgency=medium + + * upgrade to 1.2.0-2 + + -- YuQing <384681@qq.com> Mon, 20 Nov 2023 13:24:02 +0000 + +libserverframe (1.2.0-1) unstable; urgency=medium + + * upgrade to 1.2.0-1 + + -- YuQing <384681@qq.com> Sun, 19 Nov 2023 14:46:16 +0000 + libserverframe (1.1.29-1) unstable; urgency=medium * upgrade to 1.1.29-1 diff --git a/debian/substvars b/debian/substvars index cc6b3ba..cb8793a 100644 --- a/debian/substvars +++ b/debian/substvars @@ -1 +1 @@ -libfastcommon:Version=1.0.69 +libfastcommon:Version=1.0.70 diff --git a/libserverframe.spec b/libserverframe.spec index 6bc68e4..e66d341 100644 --- a/libserverframe.spec +++ b/libserverframe.spec @@ -2,7 +2,7 @@ %define CommitVersion %(echo $COMMIT_VERSION) Name: libserverframe -Version: 1.1.29 +Version: 1.2.0 Release: 1%{?dist} Summary: network framework library License: AGPL v3.0 @@ -10,11 +10,11 @@ Group: Arch/Tech URL: http://github.com/happyfish100/libserverframe/ Source: http://github.com/happyfish100/libserverframe/%{name}-%{version}.tar.gz -BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root-%(%{__id_u} -n) +BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root-%(%{__id_u} -n) -BuildRequires: libfastcommon-devel >= 1.0.69 +BuildRequires: libfastcommon-devel >= 1.0.70 Requires: %__cp %__mv %__chmod %__grep %__mkdir %__install %__id -Requires: libfastcommon >= 1.0.69 +Requires: libfastcommon >= 1.0.70 %description common framework library diff --git a/src/idempotency/client/client_channel.c b/src/idempotency/client/client_channel.c index 7253f46..822f008 100644 --- a/src/idempotency/client/client_channel.c +++ b/src/idempotency/client/client_channel.c @@ -31,11 +31,11 @@ #include "fastcommon/pthread_func.h" #include "fastcommon/sched_thread.h" #include "fastcommon/fc_queue.h" -#include "../../sf_util.h" -#include "../../sf_func.h" -#include "../../sf_nio.h" -#include "../../sf_global.h" -#include "../../sf_service.h" +#include "sf/sf_util.h" +#include "sf/sf_func.h" +#include "sf/sf_nio.h" +#include "sf/sf_global.h" +#include "sf/sf_service.h" #include "client_channel.h" typedef struct { @@ -171,12 +171,18 @@ void client_channel_destroy() } static struct fast_task_info *alloc_channel_task(IdempotencyClientChannel - *channel, const uint32_t hash_code, const char *server_ip, - const uint16_t port, int *err_no) + *channel, const uint32_t hash_code, const FCCommunicationType comm_type, + const char *server_ip, const uint16_t port, int *err_no) { struct fast_task_info *task; + SFNetworkHandler *handler; - if ((task=sf_alloc_init_task(&g_sf_context, -1)) == NULL) { + if (comm_type == fc_comm_type_sock) { + handler = g_sf_context.handlers + SF_SOCKET_NETWORK_HANDLER_INDEX; + } else { + handler = g_sf_context.handlers + SF_RDMACM_NETWORK_HANDLER_INDEX; + } + if ((task=sf_alloc_init_task(handler, -1)) == NULL) { *err_no = ENOMEM; return NULL; } @@ -226,8 +232,8 @@ int idempotency_client_channel_check_reconnect( } struct idempotency_client_channel *idempotency_client_channel_get( - const char *server_ip, const uint16_t server_port, - const int timeout, int *err_no) + const FCCommunicationType comm_type, const char *server_ip, + const uint16_t server_port, const int timeout, int *err_no) { int r; int key_len; @@ -239,7 +245,7 
@@ struct idempotency_client_channel *idempotency_client_channel_get( IdempotencyClientChannel *current; IdempotencyClientChannel *channel; - key_len = snprintf(key, sizeof(key), "%s_%u", server_ip, server_port); + key_len = snprintf(key, sizeof(key), "%s-%u", server_ip, server_port); hash_code = fc_simple_hash(key, key_len); bucket = channel_context.htable.buckets + hash_code % channel_context.htable.capacity; @@ -277,8 +283,8 @@ struct idempotency_client_channel *idempotency_client_channel_get( break; } - channel->task = alloc_channel_task(channel, - hash_code, server_ip, server_port, err_no); + channel->task = alloc_channel_task(channel, hash_code, + comm_type, server_ip, server_port, err_no); if (channel->task == NULL) { fast_mblock_free_object(&channel_context. channel_allocator, channel); diff --git a/src/idempotency/client/client_channel.h b/src/idempotency/client/client_channel.h index aafbe65..ba31309 100644 --- a/src/idempotency/client/client_channel.h +++ b/src/idempotency/client/client_channel.h @@ -22,6 +22,7 @@ #include "fastcommon/pthread_func.h" #include "fastcommon/sched_thread.h" #include "fastcommon/fc_atomic.h" +#include "sf/sf_types.h" #include "client_types.h" #ifdef __cplusplus @@ -40,8 +41,8 @@ void idempotency_client_channel_config_to_string_ex( char *output, const int size, const bool add_comma); struct idempotency_client_channel *idempotency_client_channel_get( - const char *server_ip, const uint16_t server_port, - const int timeout, int *err_no); + const FCCommunicationType comm_type, const char *server_ip, + const uint16_t server_port, const int timeout, int *err_no); static inline uint64_t idempotency_client_channel_next_seq_id( struct idempotency_client_channel *channel) diff --git a/src/idempotency/client/client_types.h b/src/idempotency/client/client_types.h index 59fd063..0ba8a7f 100644 --- a/src/idempotency/client/client_types.h +++ b/src/idempotency/client/client_types.h @@ -63,6 +63,15 @@ typedef struct idempotency_receipt_thread_context { } last_check_times; } IdempotencyReceiptThreadContext; +typedef struct idempotency_receipt_global_vars { + struct { + int task_padding_size; + sf_init_connection_callback init_connection; + struct ibv_pd *pd; + } rdma; + IdempotencyReceiptThreadContext *thread_contexts; +} IdempotencyReceiptGlobalVars; + #ifdef __cplusplus extern "C" { #endif diff --git a/src/idempotency/client/receipt_handler.c b/src/idempotency/client/receipt_handler.c index ecaf850..b487da2 100644 --- a/src/idempotency/client/receipt_handler.c +++ b/src/idempotency/client/receipt_handler.c @@ -41,13 +41,22 @@ #include "client_channel.h" #include "receipt_handler.h" -static IdempotencyReceiptThreadContext *receipt_thread_contexts = NULL; +static IdempotencyReceiptGlobalVars receipt_global_vars; + +#define RECEIPT_THREAD_CONTEXTS receipt_global_vars.thread_contexts +#define TASK_PADDING_SIZE receipt_global_vars.rdma.task_padding_size +#define RDMA_INIT_CONNECTION receipt_global_vars.rdma.init_connection +#define RDMA_PD receipt_global_vars.rdma.pd static int receipt_init_task(struct fast_task_info *task) { task->connect_timeout = SF_G_CONNECT_TIMEOUT; //for client side task->network_timeout = SF_G_NETWORK_TIMEOUT; - return 0; + if (RDMA_INIT_CONNECTION != NULL) { + return RDMA_INIT_CONNECTION(task, RDMA_PD); + } else { + return 0; + } } static int receipt_recv_timeout_callback(struct fast_task_info *task) @@ -83,13 +92,12 @@ static void receipt_task_finish_cleanup(struct fast_task_info *task) if (task->event.fd >= 0) { sf_task_detach_thread(task); - 
close(task->event.fd); - task->event.fd = -1; + task->handler->close_connection(task); } - task->length = 0; - task->offset = 0; + sf_nio_reset_task_length(task); task->req_count = 0; + task->pending_send_count = 0; channel = (IdempotencyClientChannel *)task->arg; fc_list_del_init(&channel->dlink); @@ -108,14 +116,15 @@ static void setup_channel_request(struct fast_task_info *task) SFProtoSetupChannelReq *req; channel = (IdempotencyClientChannel *)task->arg; - header = (SFCommonProtoHeader *)task->data; + header = (SFCommonProtoHeader *)task->send.ptr->data; req = (SFProtoSetupChannelReq *)(header + 1); int2buff(__sync_add_and_fetch(&channel->id, 0), req->channel_id); int2buff(__sync_add_and_fetch(&channel->key, 0), req->key); SF_PROTO_SET_HEADER(header, SF_SERVICE_PROTO_SETUP_CHANNEL_REQ, sizeof(SFProtoSetupChannelReq)); - task->length = sizeof(SFCommonProtoHeader) + sizeof(SFProtoSetupChannelReq); + task->send.ptr->length = sizeof(SFCommonProtoHeader) + + sizeof(SFProtoSetupChannelReq); sf_send_add_event(task); } @@ -142,10 +151,10 @@ static int check_report_req_receipt(struct fast_task_info *task) return 0; } - header = (SFCommonProtoHeader *)task->data; + header = (SFCommonProtoHeader *)task->send.ptr->data; rheader = (SFProtoReportReqReceiptHeader *)(header + 1); rbody = rstart = (SFProtoReportReqReceiptBody *)(rheader + 1); - buff_end = task->data + channel->buffer_size; + buff_end = task->send.ptr->data + channel->buffer_size; last = NULL; receipt = channel->waiting_resp_qinfo.head; do { @@ -175,8 +184,9 @@ static int check_report_req_receipt(struct fast_task_info *task) count = rbody - rstart; int2buff(count, rheader->count); - task->length = (char *)rbody - task->data; - int2buff(task->length - sizeof(SFCommonProtoHeader), header->body_len); + task->send.ptr->length = (char *)rbody - task->send.ptr->data; + int2buff(task->send.ptr->length - sizeof(SFCommonProtoHeader), + header->body_len); header->cmd = SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ; sf_send_add_event(task); return count; @@ -190,18 +200,18 @@ static void close_channel_request(struct fast_task_info *task) channel = (IdempotencyClientChannel *)task->arg; idempotency_client_channel_set_id_key(channel, 0, 0); - header = (SFCommonProtoHeader *)task->data; + header = (SFCommonProtoHeader *)task->send.ptr->data; SF_PROTO_SET_HEADER(header, SF_SERVICE_PROTO_CLOSE_CHANNEL_REQ, 0); - task->length = sizeof(SFCommonProtoHeader); + task->send.ptr->length = sizeof(SFCommonProtoHeader); sf_send_add_event(task); } static void active_test_request(struct fast_task_info *task) { SFCommonProtoHeader *header; - header = (SFCommonProtoHeader *)task->data; + header = (SFCommonProtoHeader *)task->send.ptr->data; SF_PROTO_SET_HEADER(header, SF_PROTO_ACTIVE_TEST_REQ, 0); - task->length = sizeof(SFCommonProtoHeader); + task->send.ptr->length = sizeof(SFCommonProtoHeader); sf_send_add_event(task); } @@ -229,17 +239,19 @@ static void report_req_receipt_request(struct fast_task_info *task, if (update_lru) { update_lru_chain(task); } + task->pending_send_count++; } } static inline int receipt_expect_body_length(struct fast_task_info *task, const int expect_body_len) { - if ((int)(task->length - sizeof(SFCommonProtoHeader)) != expect_body_len) { + int body_len; + body_len = task->recv.ptr->length - sizeof(SFCommonProtoHeader); + if (body_len != expect_body_len) { logError("file: "__FILE__", line: %d, " - "server %s:%u, response body length: %d != %d", - __LINE__, task->server_ip, task->port, (int)(task->length - - sizeof(SFCommonProtoHeader)), 
expect_body_len); + "server %s:%u, response body length: %d != %d", __LINE__, + task->server_ip, task->port, body_len, expect_body_len); return EINVAL; } @@ -271,8 +283,7 @@ static int deal_setup_channel_response(struct fast_task_info *task) return 0; } - resp = (SFProtoSetupChannelResp *)(task->data + - sizeof(SFCommonProtoHeader)); + resp = (SFProtoSetupChannelResp *)SF_PROTO_RECV_BODY(task); channel_id = buff2int(resp->channel_id); channel_key = buff2int(resp->key); buffer_size = buff2int(resp->buffer_size); @@ -282,7 +293,7 @@ static int deal_setup_channel_response(struct fast_task_info *task) thread_ctx = (IdempotencyReceiptThreadContext *)task->thread_data->arg; fc_list_add_tail(&channel->dlink, &thread_ctx->head); } - channel->buffer_size = FC_MIN(buffer_size, task->size); + channel->buffer_size = FC_MIN(buffer_size, task->send.ptr->size); PTHREAD_MUTEX_LOCK(&channel->lcp.lock); pthread_cond_broadcast(&channel->lcp.cond); @@ -316,9 +327,10 @@ static inline int deal_report_req_receipt_response(struct fast_task_info *task) "response from server %s:%u, unexpect cmd: " "REPORT_REQ_RECEIPT_RESP", __LINE__, task->server_ip, task->port); - return 0; + return EINVAL; } + task->pending_send_count--; current = channel->waiting_resp_qinfo.head; do { deleted = current; @@ -335,6 +347,7 @@ static inline int deal_report_req_receipt_response(struct fast_task_info *task) static int receipt_deal_task(struct fast_task_info *task, const int stage) { int result; + SFCommonProtoHeader *header; do { if (stage == SF_NIO_STAGE_HANDSHAKE) { @@ -342,7 +355,7 @@ static int receipt_deal_task(struct fast_task_info *task, const int stage) result = 0; break; } else if (stage == SF_NIO_STAGE_CONTINUE) { - if (task->length == 0 && task->offset == 0) { + if (task->pending_send_count == 0) { if (((IdempotencyClientChannel *)task->arg)->established) { report_req_receipt_request(task, true); } else if (task->req_count > 0) { @@ -354,24 +367,24 @@ static int receipt_deal_task(struct fast_task_info *task, const int stage) break; } - result = buff2short(((SFCommonProtoHeader *)task->data)->status); + header = (SFCommonProtoHeader *)task->recv.ptr->data; + result = buff2short(header->status); if (result != 0) { int msg_len; char *message; - msg_len = task->length - sizeof(SFCommonProtoHeader); - message = task->data + sizeof(SFCommonProtoHeader); + msg_len = SF_RECV_BODY_LENGTH(task); + message = SF_PROTO_RECV_BODY(task); logError("file: "__FILE__", line: %d, " "response from server %s:%u, cmd: %d (%s), " - "status: %d, error info: %.*s", - __LINE__, task->server_ip, task->port, - ((SFCommonProtoHeader *)task->data)->cmd, - sf_get_cmd_caption(((SFCommonProtoHeader *)task->data)->cmd), + "status: %d, error info: %.*s", __LINE__, + task->server_ip, task->port, header->cmd, + sf_get_cmd_caption(header->cmd), result, msg_len, message); break; } - switch (((SFCommonProtoHeader *)task->data)->cmd) { + switch (header->cmd) { case SF_SERVICE_PROTO_SETUP_CHANNEL_RESP: result = deal_setup_channel_response(task); break; @@ -379,6 +392,7 @@ static int receipt_deal_task(struct fast_task_info *task, const int stage) result = deal_report_req_receipt_response(task); break; case SF_PROTO_ACTIVE_TEST_RESP: + task->pending_send_count--; result = 0; break; case SF_SERVICE_PROTO_CLOSE_CHANNEL_RESP: @@ -390,17 +404,19 @@ static int receipt_deal_task(struct fast_task_info *task, const int stage) default: logError("file: "__FILE__", line: %d, " "response from server %s:%u, unexpect cmd: %d (%s)", - __LINE__, task->server_ip, task->port, - 
((SFCommonProtoHeader *)task->data)->cmd, - sf_get_cmd_caption(((SFCommonProtoHeader *)task->data)->cmd)); + __LINE__, task->server_ip, task->port, header->cmd, + sf_get_cmd_caption(header->cmd)); result = EINVAL; break; } if (result == 0) { update_lru_chain(task); - task->offset = task->length = 0; - report_req_receipt_request(task, false); + task->recv.ptr->length = 0; + task->recv.ptr->offset = 0; + if (task->pending_send_count == 0) { + report_req_receipt_request(task, false); + } } } while (0); @@ -420,9 +436,10 @@ static void receipt_thread_check_heartbeat( break; } - if (sf_nio_task_is_idle(channel->task)) { + if (channel->task->pending_send_count == 0) { channel->last_pkg_time = g_current_time; active_test_request(channel->task); + channel->task->pending_send_count++; } } } @@ -434,7 +451,7 @@ static void receipt_thread_close_idle_channel( IdempotencyClientChannel *tmp; fc_list_for_each_entry_safe(channel, tmp, &thread_ctx->head, dlink) { - if (!sf_nio_task_is_idle(channel->task)) { + if (channel->task->pending_send_count > 0) { continue; } @@ -474,35 +491,55 @@ static void *receipt_alloc_thread_extra_data(const int thread_index) { IdempotencyReceiptThreadContext *ctx; - ctx = receipt_thread_contexts + thread_index; + ctx = RECEIPT_THREAD_CONTEXTS + thread_index; FC_INIT_LIST_HEAD(&ctx->head); return ctx; } -static int do_init() +static int do_init(FCAddressPtrArray *address_array) { + const int task_arg_size = 0; + const bool double_buffers = false; + const bool explicit_post_recv = false; + int result; int bytes; + SFNetworkHandler *rdma_handler; bytes = sizeof(IdempotencyReceiptThreadContext) * SF_G_WORK_THREADS; - receipt_thread_contexts = (IdempotencyReceiptThreadContext *) + RECEIPT_THREAD_CONTEXTS = (IdempotencyReceiptThreadContext *) fc_malloc(bytes); - if (receipt_thread_contexts == NULL) { + if (RECEIPT_THREAD_CONTEXTS == NULL) { return ENOMEM; } - memset(receipt_thread_contexts, 0, bytes); + memset(RECEIPT_THREAD_CONTEXTS, 0, bytes); + if ((rdma_handler=sf_get_rdma_network_handler(&g_sf_context)) != NULL) { + if ((result=sf_alloc_rdma_pd(&g_sf_context, address_array)) != 0) { + return result; + } + + TASK_PADDING_SIZE = rdma_handler->get_connection_size(); + RDMA_INIT_CONNECTION = rdma_handler->init_connection; + RDMA_PD = rdma_handler->pd; + } else { + TASK_PADDING_SIZE = 0; + RDMA_INIT_CONNECTION = NULL; + RDMA_PD = NULL; + } return sf_service_init_ex2(&g_sf_context, "idemp-receipt", receipt_alloc_thread_extra_data, receipt_thread_loop_callback, NULL, sf_proto_set_body_length, NULL, NULL, receipt_deal_task, receipt_task_finish_cleanup, receipt_recv_timeout_callback, - 1000, sizeof(SFCommonProtoHeader), 0, receipt_init_task, NULL); + 1000, sizeof(SFCommonProtoHeader), TASK_PADDING_SIZE, + task_arg_size, double_buffers, explicit_post_recv, + receipt_init_task, NULL); } -int receipt_handler_init() +int receipt_handler_init(FCAddressPtrArray *address_array) { int result; - if ((result=do_init()) != 0) { + if ((result=do_init(address_array)) != 0) { return result; } diff --git a/src/idempotency/client/receipt_handler.h b/src/idempotency/client/receipt_handler.h index 953f614..7ea2dbe 100644 --- a/src/idempotency/client/receipt_handler.h +++ b/src/idempotency/client/receipt_handler.h @@ -24,7 +24,7 @@ extern "C" { #endif -int receipt_handler_init(); +int receipt_handler_init(FCAddressPtrArray *address_array); int receipt_handler_destroy(); #ifdef __cplusplus diff --git a/src/idempotency/server/server_handler.c b/src/idempotency/server/server_handler.c index b3cfbaa..1a7cfa3 
100644 --- a/src/idempotency/server/server_handler.c +++ b/src/idempotency/server/server_handler.c @@ -37,9 +37,6 @@ #include "server_channel.h" #include "server_handler.h" -#define SF_TASK_BODY_LENGTH(task) \ - (task->length - sizeof(SFCommonProtoHeader)) - int sf_server_deal_setup_channel(struct fast_task_info *task, int *task_type, const int server_id, IdempotencyChannel **channel, SFResponseInfo *response) @@ -52,13 +49,13 @@ int sf_server_deal_setup_channel(struct fast_task_info *task, response->header.cmd = SF_SERVICE_PROTO_SETUP_CHANNEL_RESP; if ((result=sf_server_expect_body_length(response, - SF_TASK_BODY_LENGTH(task), + SF_RECV_BODY_LENGTH(task), sizeof(SFProtoSetupChannelReq))) != 0) { return result; } - req = (SFProtoSetupChannelReq *)(task->data + sizeof(SFCommonProtoHeader)); + req = (SFProtoSetupChannelReq *)SF_PROTO_RECV_BODY(task); channel_id = buff2int(req->channel_id); key = buff2int(req->key); if (*channel != NULL) { @@ -76,12 +73,11 @@ int sf_server_deal_setup_channel(struct fast_task_info *task, } *task_type = SF_SERVER_TASK_TYPE_CHANNEL_HOLDER; - resp = (SFProtoSetupChannelResp *)(task->data + - sizeof(SFCommonProtoHeader)); + resp = (SFProtoSetupChannelResp *)SF_PROTO_SEND_BODY(task); int2buff((*channel)->id, resp->channel_id); int2buff((*channel)->key, resp->key); int2buff(server_id, resp->server_id); - int2buff(task->size, resp->buffer_size); + int2buff(task->send.ptr->size, resp->buffer_size); response->header.body_len = sizeof(SFProtoSetupChannelResp); return 0; } @@ -135,19 +131,19 @@ int sf_server_deal_report_req_receipt(struct fast_task_info *task, SFProtoReportReqReceiptBody *body_part; SFProtoReportReqReceiptBody *body_end; + response->header.cmd = SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP; if ((result=check_holder_channel(task_type, channel, response)) != 0) { return result; } - body_len = SF_TASK_BODY_LENGTH(task); + body_len = SF_RECV_BODY_LENGTH(task); if ((result=sf_server_check_min_body_length(response, body_len, sizeof(SFProtoReportReqReceiptHeader))) != 0) { return result; } - body_header = (SFProtoReportReqReceiptHeader *) - (task->data + sizeof(SFCommonProtoHeader)); + body_header = (SFProtoReportReqReceiptHeader *)SF_PROTO_RECV_BODY(task); count = buff2int(body_header->count); calc_body_len = sizeof(SFProtoReportReqReceiptHeader) + sizeof(SFProtoReportReqReceiptBody) * count; @@ -169,7 +165,6 @@ int sf_server_deal_report_req_receipt(struct fast_task_info *task, } //logInfo("receipt count: %d, success: %d", count, success); - response->header.cmd = SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP; return 0; } @@ -220,7 +215,7 @@ int sf_server_deal_rebind_channel(struct fast_task_info *task, SFProtoRebindChannelReq *req; if ((result=sf_server_expect_body_length(response, - SF_TASK_BODY_LENGTH(task), + SF_RECV_BODY_LENGTH(task), sizeof(SFProtoRebindChannelReq))) != 0) { return result; @@ -240,7 +235,7 @@ int sf_server_deal_rebind_channel(struct fast_task_info *task, } idempotency_channel_release(*channel, false); - req = (SFProtoRebindChannelReq *)(task->data + sizeof(SFCommonProtoHeader)); + req = (SFProtoRebindChannelReq *)SF_PROTO_RECV_BODY(task); channel_id = buff2int(req->channel_id); key = buff2int(req->key); *channel = idempotency_channel_find_and_hold(channel_id, key, &result); diff --git a/src/sf_connection_manager.c b/src/sf_connection_manager.c index 0a42955..e7e35d0 100644 --- a/src/sf_connection_manager.c +++ b/src/sf_connection_manager.c @@ -522,6 +522,12 @@ int sf_connection_manager_init_ex(SFConnectionManager *cm, const bool 
bg_thread_enabled) { const int socket_domain = AF_UNSPEC; + struct { + ConnectionExtraParams holder; + ConnectionExtraParams *ptr; + } extra_params; + FCServerGroupInfo *server_group; + int htable_init_capacity; int result; @@ -529,11 +535,29 @@ int sf_connection_manager_init_ex(SFConnectionManager *cm, if (htable_init_capacity < 256) { htable_init_capacity = 256; } + + if ((server_group=fc_server_get_group_by_index(server_cfg, + server_group_index)) == NULL) + { + return ENOENT; + } + + if (server_group->comm_type == fc_comm_type_sock) { + extra_params.ptr = NULL; + } else { + if ((result=conn_pool_set_rdma_extra_params(&extra_params.holder, + server_cfg, server_group_index)) != 0) + { + return result; + } + extra_params.ptr = &extra_params.holder; + } if ((result=conn_pool_init_ex1(&cm->cpool, common_cfg->connect_timeout, max_count_per_entry, max_idle_time, socket_domain, htable_init_capacity, connect_done_callback, args, sf_cm_validate_connection_callback, cm, - sizeof(SFConnectionParameters))) != 0) + sizeof(SFConnectionParameters), + extra_params.ptr)) != 0) { return result; } diff --git a/src/sf_global.c b/src/sf_global.c index ae251a5..4754bd3 100644 --- a/src/sf_global.c +++ b/src/sf_global.c @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -31,6 +32,7 @@ #include "fastcommon/process_ctrl.h" #include "fastcommon/logger.h" #include "sf_nio.h" +#include "sf_service.h" #include "sf_global.h" SFGlobalVariables g_sf_global_vars = { @@ -38,15 +40,16 @@ SFGlobalVariables g_sf_global_vars = { {{'/', 't', 'm', 'p', '\0'}, false}, true, true, false, DEFAULT_MAX_CONNECTONS, SF_DEF_MAX_PACKAGE_SIZE, SF_DEF_MIN_BUFF_SIZE, - SF_DEF_MAX_BUFF_SIZE, 0, SF_DEF_THREAD_STACK_SIZE, - 0, 0, 0, {'\0'}, {'\0'}, {SF_DEF_SYNC_LOG_BUFF_INTERVAL, false}, + SF_DEF_MAX_BUFF_SIZE, 0, SF_DEF_THREAD_STACK_SIZE, 0, + {false, 0, 0, {'\0'}, {'\0'}}, + {SF_DEF_SYNC_LOG_BUFF_INTERVAL, false}, {0, 0}, NULL, {NULL, 0} }; -SFContext g_sf_context = { - {'\0'}, NULL, 0, -1, -1, 0, 0, 1, DEFAULT_WORK_THREADS, - {'\0'}, {'\0'}, 0, true, true, NULL, NULL, NULL, NULL, - NULL, sf_task_finish_clean_up, NULL +SFContext g_sf_context = {{'\0'}, NULL, 0, + {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}, + 1, DEFAULT_WORK_THREADS, {'\0'}, {'\0'}, 0, true, true, true, + {false, 0, 0}, {sf_task_finish_clean_up} }; static inline void set_config_str_value(const char *value, @@ -60,10 +63,11 @@ static inline void set_config_str_value(const char *value, } static int load_network_parameters(IniFullContext *ini_ctx, - const char *max_pkg_size_item_nm, + const char *max_pkg_size_item_nm, const int fixed_buff_size, const int task_buffer_extra_size) { int result; + int padding_buff_size; char *pMinBuffSize; char *pMaxBuffSize; @@ -93,6 +97,14 @@ static int load_network_parameters(IniFullContext *ini_ctx, return result; } + if (fixed_buff_size > 0) { + padding_buff_size = fixed_buff_size + task_buffer_extra_size; + g_sf_global_vars.min_buff_size = padding_buff_size; + g_sf_global_vars.max_buff_size = padding_buff_size; + g_sf_global_vars.max_pkg_size = padding_buff_size; + return 0; + } + g_sf_global_vars.max_pkg_size = iniGetByteCorrectValueEx(ini_ctx, max_pkg_size_item_nm, SF_DEF_MAX_PACKAGE_SIZE, 1, 8192, SF_MAX_NETWORK_BUFF_SIZE, true); @@ -281,10 +293,10 @@ int sf_load_global_base_path(IniFullContext *ini_ctx) return 0; } -int sf_load_global_config_ex(const char *server_name, +int sf_load_global_config_ex(const char *log_filename_prefix, IniFullContext *ini_ctx, const bool load_network_params, - 
const char *max_pkg_size_item_nm, const int task_buffer_extra_size, - const bool need_set_run_by) + const char *max_pkg_size_item_nm, const int fixed_buff_size, + const int task_buffer_extra_size, const bool need_set_run_by) { int result; const char *old_section_name; @@ -301,7 +313,7 @@ int sf_load_global_config_ex(const char *server_name, tcp_set_quick_ack(g_sf_global_vars.tcp_quick_ack); if (load_network_params) { if ((result=load_network_parameters(ini_ctx, max_pkg_size_item_nm, - task_buffer_extra_size)) != 0) + fixed_buff_size, task_buffer_extra_size)) != 0) { return result; } @@ -310,20 +322,20 @@ int sf_load_global_config_ex(const char *server_name, pRunByGroup = iniGetStrValue(NULL, "run_by_group", ini_ctx->context); pRunByUser = iniGetStrValue(NULL, "run_by_user", ini_ctx->context); if (pRunByGroup == NULL) { - *g_sf_global_vars.run_by_group = '\0'; + *g_sf_global_vars.run_by.group = '\0'; } else { - snprintf(g_sf_global_vars.run_by_group, - sizeof(g_sf_global_vars.run_by_group), + snprintf(g_sf_global_vars.run_by.group, + sizeof(g_sf_global_vars.run_by.group), "%s", pRunByGroup); } - if (*(g_sf_global_vars.run_by_group) == '\0') { - g_sf_global_vars.run_by_gid = getegid(); + if (*(g_sf_global_vars.run_by.group) == '\0') { + g_sf_global_vars.run_by.gid = getegid(); } else { struct group *pGroup; - pGroup = getgrnam(g_sf_global_vars.run_by_group); + pGroup = getgrnam(g_sf_global_vars.run_by.group); if (pGroup == NULL) { result = errno != 0 ? errno : ENOENT; logError("file: "__FILE__", line: %d, " @@ -333,24 +345,24 @@ int sf_load_global_config_ex(const char *server_name, return result; } - g_sf_global_vars.run_by_gid = pGroup->gr_gid; + g_sf_global_vars.run_by.gid = pGroup->gr_gid; } if (pRunByUser == NULL) { - *g_sf_global_vars.run_by_user = '\0'; + *g_sf_global_vars.run_by.user = '\0'; } else { - snprintf(g_sf_global_vars.run_by_user, - sizeof(g_sf_global_vars.run_by_user), + snprintf(g_sf_global_vars.run_by.user, + sizeof(g_sf_global_vars.run_by.user), "%s", pRunByUser); } - if (*(g_sf_global_vars.run_by_user) == '\0') { - g_sf_global_vars.run_by_uid = geteuid(); + if (*(g_sf_global_vars.run_by.user) == '\0') { + g_sf_global_vars.run_by.uid = geteuid(); } else { struct passwd *pUser; - pUser = getpwnam(g_sf_global_vars.run_by_user); + pUser = getpwnam(g_sf_global_vars.run_by.user); if (pUser == NULL) { result = errno != 0 ? 
errno : ENOENT; logError("file: "__FILE__", line: %d, " @@ -360,16 +372,17 @@ int sf_load_global_config_ex(const char *server_name, return result; } - g_sf_global_vars.run_by_uid = pUser->pw_uid; + g_sf_global_vars.run_by.uid = pUser->pw_uid; } + g_sf_global_vars.run_by.inited = true; if (SF_G_BASE_PATH_CREATED) { SF_CHOWN_TO_RUNBY_RETURN_ON_ERROR(SF_G_BASE_PATH_STR); } if (need_set_run_by) { - if ((result=set_run_by(g_sf_global_vars.run_by_group, - g_sf_global_vars.run_by_user)) != 0) + if ((result=set_run_by(g_sf_global_vars.run_by.group, + g_sf_global_vars.run_by.user)) != 0) { return result; } @@ -389,8 +402,10 @@ int sf_load_global_config_ex(const char *server_name, ini_ctx->section_name = old_section_name; load_log_level(ini_ctx->context); - if (server_name != NULL) { - if ((result=log_set_prefix(SF_G_BASE_PATH_STR, server_name)) != 0) { + if (log_filename_prefix != NULL) { + if ((result=log_set_prefix(SF_G_BASE_PATH_STR, + log_filename_prefix)) != 0) + { return result; } } @@ -398,12 +413,13 @@ int sf_load_global_config_ex(const char *server_name, return 0; } -int sf_load_config_ex(const char *server_name, SFContextIniConfig *config, +int sf_load_config_ex(const char *log_filename_prefix, + SFContextIniConfig *config, const int fixed_buff_size, const int task_buffer_extra_size, const bool need_set_run_by) { int result; - if ((result=sf_load_global_config_ex(server_name, &config->ini_ctx, - true, config->max_pkg_size_item_name, + if ((result=sf_load_global_config_ex(log_filename_prefix, &config->ini_ctx, + true, config->max_pkg_size_item_name, fixed_buff_size, task_buffer_extra_size, need_set_run_by)) != 0) { return result; @@ -411,17 +427,118 @@ int sf_load_config_ex(const char *server_name, SFContextIniConfig *config, return sf_load_context_from_config_ex(&g_sf_context, config); } +#define API_PREFIX_NAME "fast_rdma_" + +#define LOAD_API_EX(handler, prefix, fname) \ + do { \ + handler->fname = dlsym(dlhandle, API_PREFIX_NAME#prefix#fname); \ + if (handler->fname == NULL) { \ + logError("file: "__FILE__", line: %d, " \ + "dlsym api %s fail, error info: %s", \ + __LINE__, API_PREFIX_NAME#prefix#fname, dlerror()); \ + return ENOENT; \ + } \ + } while (0) + +#define LOAD_API(handler, fname) LOAD_API_EX(handler, server_, fname) + +static int load_rdma_apis(SFNetworkHandler *handler) +{ + const char *library = "libfastrdma.so"; + void *dlhandle; + + dlhandle = dlopen(library, RTLD_LAZY); + if (dlhandle == NULL) { + logError("file: "__FILE__", line: %d, " + "dlopen %s fail, error info: %s", + __LINE__, library, dlerror()); + return EFAULT; + } + + LOAD_API(handler, get_connection_size); + LOAD_API(handler, init_connection); + LOAD_API(handler, alloc_pd); + LOAD_API_EX(handler, , create_server); + LOAD_API_EX(handler, , close_server); + LOAD_API(handler, accept_connection); + LOAD_API_EX(handler, , async_connect_server); + LOAD_API_EX(handler, , async_connect_check); + LOAD_API(handler, close_connection); + LOAD_API(handler, send_data); + LOAD_API(handler, recv_data); + LOAD_API(handler, post_recv); + + return 0; +} + +static int init_network_handler(SFNetworkHandler *handler, + SFContext *sf_context) +{ + handler->ctx = sf_context; + handler->inner.handler = handler; + handler->outer.handler = handler; + handler->inner.is_inner = true; + handler->outer.is_inner = false; + handler->explicit_post_recv = false; + + if (handler->comm_type == fc_comm_type_sock) { + handler->inner.sock = -1; + handler->outer.sock = -1; + handler->create_server = sf_socket_create_server; + handler->close_server 
= sf_socket_close_server; + handler->accept_connection = sf_socket_accept_connection; + handler->async_connect_server = sf_socket_async_connect_server; + handler->async_connect_check = sf_socket_async_connect_check; + handler->close_connection = sf_socket_close_connection; + handler->send_data = sf_socket_send_data; + handler->recv_data = sf_socket_recv_data; + handler->post_recv = NULL; + return 0; + } else { + handler->inner.id = NULL; + handler->outer.id = NULL; + return load_rdma_apis(handler); + } +} + int sf_load_context_from_config_ex(SFContext *sf_context, SFContextIniConfig *config) { + SFNetworkHandler *sock_handler; + SFNetworkHandler *rdma_handler; char *inner_port; char *outer_port; char *inner_bind_addr; char *outer_bind_addr; char *bind_addr; int port; + int i; + int result; - sf_context->inner_port = sf_context->outer_port = 0; + sock_handler = sf_context->handlers + SF_SOCKET_NETWORK_HANDLER_INDEX; + rdma_handler = sf_context->handlers + SF_RDMACM_NETWORK_HANDLER_INDEX; + sock_handler->comm_type = fc_comm_type_sock; + rdma_handler->comm_type = fc_comm_type_rdma; + if (config->comm_type == fc_comm_type_sock) { + sock_handler->enabled = true; + rdma_handler->enabled = false; + } else if (config->comm_type == fc_comm_type_rdma) { + sock_handler->enabled = false; + rdma_handler->enabled = true; + } else if (config->comm_type == fc_comm_type_both) { + sock_handler->enabled = true; + rdma_handler->enabled = true; + } + for (i=0; ihandlers[i].enabled) { + continue; + } + if ((result=init_network_handler(sf_context->handlers + i, + sf_context)) != 0) + { + return result; + } + } inner_port = iniGetStrValue(config->ini_ctx.section_name, "inner_port", config->ini_ctx.context); @@ -431,24 +548,37 @@ int sf_load_context_from_config_ex(SFContext *sf_context, port = iniGetIntValue(config->ini_ctx.section_name, "port", config->ini_ctx.context, 0); if (port > 0) { - sf_context->inner_port = sf_context->outer_port = port; + sock_handler->inner.port = sock_handler->outer.port = port; } } else { if (inner_port != NULL) { - sf_context->inner_port = atoi(inner_port); + sock_handler->inner.port = strtol(inner_port, NULL, 10); } if (outer_port != NULL) { - sf_context->outer_port = atoi(outer_port); + sock_handler->outer.port = strtol(outer_port, NULL, 10); } } - if (sf_context->inner_port <= 0) { - sf_context->inner_port = config->default_inner_port; + if (sock_handler->inner.port <= 0) { + sock_handler->inner.port = config->default_inner_port; } - if (sf_context->outer_port <= 0) { - sf_context->outer_port = config->default_outer_port; + if (sock_handler->outer.port <= 0) { + sock_handler->outer.port = config->default_outer_port; } + if (sock_handler->inner.port == sock_handler->outer.port) { + sock_handler->inner.enabled = true; + sock_handler->outer.enabled = false; + } else { + sock_handler->inner.enabled = true; + sock_handler->outer.enabled = true; + } + + rdma_handler->inner.port = sock_handler->inner.port; + rdma_handler->inner.enabled = sock_handler->inner.enabled; + rdma_handler->outer.port = sock_handler->outer.port; + rdma_handler->outer.enabled = sock_handler->outer.enabled; + inner_bind_addr = iniGetStrValue(config->ini_ctx.section_name, "inner_bind_addr", config->ini_ctx.context); outer_bind_addr = iniGetStrValue(config->ini_ctx.section_name, @@ -492,26 +622,44 @@ int sf_load_context_from_config_ex(SFContext *sf_context, return 0; } +int sf_alloc_rdma_pd(SFContext *sf_context, + FCAddressPtrArray *address_array) +{ + SFNetworkHandler *handler; + int result; + + handler = 
sf_context->handlers + SF_RDMACM_NETWORK_HANDLER_INDEX; + if (!handler->enabled) { + return 0; + } + + handler->pd = fc_alloc_rdma_pd(handler->alloc_pd, + address_array, &result); + return result; +} + void sf_context_config_to_string(const SFContext *sf_context, char *output, const int size) { + const SFNetworkHandler *sock_handler; int len; + sock_handler = sf_context->handlers + SF_SOCKET_NETWORK_HANDLER_INDEX; len = 0; - if ((sf_context->inner_port == sf_context->outer_port) && + if ((sock_handler->inner.port == sock_handler->outer.port) && (strcmp(sf_context->inner_bind_addr, sf_context->outer_bind_addr) == 0)) { len += snprintf(output + len, size - len, "port=%u, bind_addr=%s", - sf_context->inner_port, + sock_handler->inner.port, sf_context->inner_bind_addr); } else { len += snprintf(output + len, size - len, "inner_port=%u, inner_bind_addr=%s, " "outer_port=%u, outer_bind_addr=%s", - sf_context->inner_port, sf_context->inner_bind_addr, - sf_context->outer_port, sf_context->outer_bind_addr); + sock_handler->inner.port, sf_context->inner_bind_addr, + sock_handler->outer.port, sf_context->outer_bind_addr); } len += snprintf(output + len, size - len, @@ -596,8 +744,8 @@ void sf_global_config_to_string_ex(const char *max_pkg_size_item_nm, g_sf_global_vars.thread_stack_size / 1024, pkg_buff, g_sf_global_vars.tcp_quick_ack, log_get_level_caption(), - g_sf_global_vars.run_by_group, - g_sf_global_vars.run_by_user + g_sf_global_vars.run_by.group, + g_sf_global_vars.run_by.user ); sf_log_config_to_string(&g_sf_global_vars.error_log, diff --git a/src/sf_global.h b/src/sf_global.h index fc44599..00b17cd 100644 --- a/src/sf_global.h +++ b/src/sf_global.h @@ -49,10 +49,13 @@ typedef struct sf_global_variables { int thread_stack_size; time_t up_time; - gid_t run_by_gid; - uid_t run_by_uid; - char run_by_group[32]; - char run_by_user[32]; + struct { + bool inited; + gid_t gid; + uid_t uid; + char group[32]; + char user[32]; + } run_by; SFLogConfig error_log; SFConnectionStat connection_stat; @@ -66,6 +69,7 @@ typedef struct sf_context_ini_config { int default_inner_port; int default_outer_port; int default_work_threads; + FCCommunicationType comm_type; const char *max_pkg_size_item_name; } SFContextIniConfig; @@ -84,9 +88,12 @@ extern SFContext g_sf_context; #define SF_G_NETWORK_TIMEOUT g_sf_global_vars.network_timeout #define SF_G_MAX_CONNECTIONS g_sf_global_vars.max_connections #define SF_G_THREAD_STACK_SIZE g_sf_global_vars.thread_stack_size +#define SF_G_UP_TIME g_sf_global_vars.up_time -#define SF_G_OUTER_PORT g_sf_context.outer_port -#define SF_G_INNER_PORT g_sf_context.inner_port +#define SF_G_SOCK_HANDLER (g_sf_context.handlers + \ + SF_SOCKET_NETWORK_HANDLER_INDEX) +#define SF_G_OUTER_PORT SF_G_SOCK_HANDLER->outer.port +#define SF_G_INNER_PORT SF_G_SOCK_HANDLER->inner.port #define SF_G_OUTER_BIND_ADDR g_sf_context.outer_bind_addr #define SF_G_INNER_BIND_ADDR g_sf_context.inner_bind_addr @@ -110,19 +117,20 @@ extern SFContext g_sf_context; #define SF_CHOWN_RETURN_ON_ERROR(path, current_uid, current_gid) \ do { \ - if (!(g_sf_global_vars.run_by_gid == current_gid && \ - g_sf_global_vars.run_by_uid == current_uid)) \ - { \ - if (chown(path, g_sf_global_vars.run_by_uid, \ - g_sf_global_vars.run_by_gid) != 0) \ + if (g_sf_global_vars.run_by.inited && !(g_sf_global_vars. \ + run_by.gid == current_gid && g_sf_global_vars. 
\ + run_by.uid == current_uid)) \ { \ - logError("file: "__FILE__", line: %d, " \ - "chown \"%s\" fail, " \ - "errno: %d, error info: %s", \ - __LINE__, path, errno, STRERROR(errno)); \ - return errno != 0 ? errno : EPERM; \ + if (chown(path, g_sf_global_vars.run_by.uid, \ + g_sf_global_vars.run_by.gid) != 0) \ + { \ + logError("file: "__FILE__", line: %d, " \ + "chown \"%s\" fail, " \ + "errno: %d, error info: %s", \ + __LINE__, path, errno, STRERROR(errno)); \ + return errno != 0 ? errno : EPERM; \ + } \ } \ - } \ } while (0) #define SF_CHOWN_TO_RUNBY_RETURN_ON_ERROR(path) \ @@ -131,74 +139,82 @@ extern SFContext g_sf_context; #define SF_FCHOWN_RETURN_ON_ERROR(fd, path, current_uid, current_gid) \ do { \ - if (!(g_sf_global_vars.run_by_gid == current_gid && \ - g_sf_global_vars.run_by_uid == current_uid)) \ - { \ - if (fchown(fd, g_sf_global_vars.run_by_uid, \ - g_sf_global_vars.run_by_gid) != 0) \ + if (g_sf_global_vars.run_by.inited && !(g_sf_global_vars. \ + run_by.gid == current_gid && g_sf_global_vars. \ + run_by.uid == current_uid)) \ { \ - logError("file: "__FILE__", line: %d, " \ - "fchown \"%s\" fail, " \ - "errno: %d, error info: %s", \ - __LINE__, path, errno, STRERROR(errno)); \ - return errno != 0 ? errno : EPERM; \ + if (fchown(fd, g_sf_global_vars.run_by.uid, \ + g_sf_global_vars.run_by.gid) != 0) \ + { \ + logError("file: "__FILE__", line: %d, " \ + "fchown \"%s\" fail, " \ + "errno: %d, error info: %s", \ + __LINE__, path, errno, STRERROR(errno)); \ + return errno != 0 ? errno : EPERM; \ + } \ } \ - } \ } while (0) #define SF_FCHOWN_TO_RUNBY_RETURN_ON_ERROR(fd, path) \ SF_FCHOWN_RETURN_ON_ERROR(fd, path, geteuid(), getegid()) -#define SF_SET_CONTEXT_INI_CONFIG_EX(config, filename, pIniContext, \ - section_name, def_inner_port, def_outer_port, def_work_threads, \ - max_pkg_size_item_nm) \ +#define SF_SET_CONTEXT_INI_CONFIG_EX(config, the_comm_type, filename, \ + pIniContext, section_name, def_inner_port, def_outer_port, \ + def_work_threads, max_pkg_size_item_nm) \ do { \ FAST_INI_SET_FULL_CTX_EX(config.ini_ctx, filename, \ section_name, pIniContext); \ + config.comm_type = the_comm_type; \ config.default_inner_port = def_inner_port; \ config.default_outer_port = def_outer_port; \ config.default_work_threads = def_work_threads; \ config.max_pkg_size_item_name = max_pkg_size_item_nm; \ } while (0) -#define SF_SET_CONTEXT_INI_CONFIG(config, filename, pIniContext, \ - section_name, def_inner_port, def_outer_port, def_work_threads) \ - SF_SET_CONTEXT_INI_CONFIG_EX(config, filename, pIniContext, \ - section_name, def_inner_port, def_outer_port, def_work_threads, \ - "max_pkg_size") +#define SF_SET_CONTEXT_INI_CONFIG(config, the_comm_type, \ + filename, pIniContext, section_name, def_inner_port, \ + def_outer_port, def_work_threads) \ + SF_SET_CONTEXT_INI_CONFIG_EX(config, the_comm_type, filename, \ + pIniContext, section_name, def_inner_port, def_outer_port, \ + def_work_threads, "max_pkg_size") -int sf_load_global_config_ex(const char *server_name, +int sf_load_global_config_ex(const char *log_filename_prefix, IniFullContext *ini_ctx, const bool load_network_params, - const char *max_pkg_size_item_nm, const int task_buffer_extra_size, - const bool need_set_run_by); + const char *max_pkg_size_item_nm, const int fixed_buff_size, + const int task_buffer_extra_size, const bool need_set_run_by); -static inline int sf_load_global_config(const char *server_name, +static inline int sf_load_global_config(const char *log_filename_prefix, IniFullContext *ini_ctx) { const bool 
load_network_params = true; const char *max_pkg_size_item_nm = "max_pkg_size"; + const int fixed_buff_size = 0; const int task_buffer_extra_size = 0; const bool need_set_run_by = true; - return sf_load_global_config_ex(server_name, ini_ctx, load_network_params, - max_pkg_size_item_nm, task_buffer_extra_size, need_set_run_by); + return sf_load_global_config_ex(log_filename_prefix, ini_ctx, + load_network_params, max_pkg_size_item_nm, fixed_buff_size, + task_buffer_extra_size, need_set_run_by); } -int sf_load_config_ex(const char *server_name, SFContextIniConfig *config, +int sf_load_config_ex(const char *log_filename_prefix, + SFContextIniConfig *config, const int fixed_buff_size, const int task_buffer_extra_size, const bool need_set_run_by); -static inline int sf_load_config(const char *server_name, +static inline int sf_load_config(const char *log_filename_prefix, + const FCCommunicationType comm_type, const char *filename, IniContext *pIniContext, const char *section_name, const int default_inner_port, - const int default_outer_port, const int task_buffer_extra_size) + const int default_outer_port, const int fixed_buff_size, + const int task_buffer_extra_size) { const bool need_set_run_by = true; SFContextIniConfig config; - SF_SET_CONTEXT_INI_CONFIG(config, filename, pIniContext, + SF_SET_CONTEXT_INI_CONFIG(config, comm_type, filename, pIniContext, section_name, default_inner_port, default_outer_port, DEFAULT_WORK_THREADS); - return sf_load_config_ex(server_name, &config, + return sf_load_config_ex(log_filename_prefix, &config, fixed_buff_size, task_buffer_extra_size, need_set_run_by); } @@ -206,18 +222,22 @@ int sf_load_context_from_config_ex(SFContext *sf_context, SFContextIniConfig *config); static inline int sf_load_context_from_config(SFContext *sf_context, + const FCCommunicationType comm_type, const char *filename, IniContext *pIniContext, const char *section_name, const int default_inner_port, const int default_outer_port) { SFContextIniConfig config; - SF_SET_CONTEXT_INI_CONFIG(config, filename, pIniContext, + SF_SET_CONTEXT_INI_CONFIG(config, comm_type, filename, pIniContext, section_name, default_inner_port, default_outer_port, DEFAULT_WORK_THREADS); return sf_load_context_from_config_ex(sf_context, &config); } +int sf_alloc_rdma_pd(SFContext *sf_context, + FCAddressPtrArray *address_array); + int sf_load_log_config(IniFullContext *ini_ctx, LogContext *log_ctx, SFLogConfig *log_cfg); diff --git a/src/sf_nio.c b/src/sf_nio.c index c143c94..c4d7350 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -38,7 +38,6 @@ #include "fastcommon/fast_task_queue.h" #include "fastcommon/ioevent_loop.h" #include "fastcommon/fc_atomic.h" -#include "sf_global.h" #include "sf_service.h" #include "sf_nio.h" @@ -46,18 +45,18 @@ void sf_set_parameters_ex(SFContext *sf_context, const int header_size, sf_set_body_length_callback set_body_length_func, sf_alloc_recv_buffer_callback alloc_recv_buffer_func, sf_send_done_callback send_done_callback, - sf_deal_task_func deal_func, TaskCleanUpCallback cleanup_func, + sf_deal_task_callback deal_func, TaskCleanUpCallback cleanup_func, sf_recv_timeout_callback timeout_callback, sf_release_buffer_callback release_buffer_callback) { sf_context->header_size = header_size; - sf_context->set_body_length = set_body_length_func; - sf_context->alloc_recv_buffer = alloc_recv_buffer_func; - sf_context->send_done_callback = send_done_callback; - sf_context->deal_task = deal_func; - sf_context->task_cleanup_func = cleanup_func; - sf_context->timeout_callback = 
timeout_callback; - sf_context->release_buffer_callback = release_buffer_callback; + sf_context->callbacks.set_body_length = set_body_length_func; + sf_context->callbacks.alloc_recv_buffer = alloc_recv_buffer_func; + sf_context->callbacks.send_done = send_done_callback; + sf_context->callbacks.deal_task = deal_func; + sf_context->callbacks.task_cleanup = cleanup_func; + sf_context->callbacks.task_timeout = timeout_callback; + sf_context->callbacks.release_buffer = release_buffer_callback; } void sf_task_detach_thread(struct fast_task_info *task) @@ -85,8 +84,8 @@ void sf_task_switch_thread(struct fast_task_info *task, static inline void release_iovec_buffer(struct fast_task_info *task) { if (task->iovec_array.iovs != NULL) { - if (SF_CTX->release_buffer_callback != NULL) { - SF_CTX->release_buffer_callback(task); + if (SF_CTX->callbacks.release_buffer != NULL) { + SF_CTX->callbacks.release_buffer(task); } task->iovec_array.iovs = NULL; task->iovec_array.count = 0; @@ -111,10 +110,8 @@ void sf_task_finish_clean_up(struct fast_task_info *task) } release_iovec_buffer(task); - sf_task_detach_thread(task); - close(task->event.fd); - task->event.fd = -1; + task->handler->close_connection(task); __sync_fetch_and_sub(&g_sf_global_vars.connection_stat.current_count, 1); sf_release_task(task); @@ -130,25 +127,22 @@ static inline int set_write_event(struct fast_task_info *task) task->event.callback = (IOEventCallback)sf_client_sock_write; if (ioevent_modify(&task->thread_data->ev_puller, - task->event.fd, IOEVENT_WRITE, task) != 0) + task->event.fd, IOEVENT_WRITE, task) != 0) { result = errno != 0 ? errno : ENOENT; - ioevent_add_to_deleted_list(task); - logError("file: "__FILE__", line: %d, " - "ioevent_modify fail, " - "errno: %d, error info: %s", - __LINE__, result, strerror(result)); + "ioevent_modify fail, " + "errno: %d, error info: %s", + __LINE__, result, strerror(result)); return result; } return 0; } -int sf_set_read_event(struct fast_task_info *task) +static inline int set_read_event(struct fast_task_info *task) { int result; - task->nio_stages.current = SF_NIO_STAGE_RECV; if (task->event.callback == (IOEventCallback)sf_client_sock_read) { return 0; } @@ -170,6 +164,14 @@ int sf_set_read_event(struct fast_task_info *task) return 0; } +int sf_set_read_event(struct fast_task_info *task) +{ + task->recv.ptr->offset = 0; + task->recv.ptr->length = 0; + task->nio_stages.current = SF_NIO_STAGE_RECV; + return set_read_event(task); +} + static inline int sf_ioevent_add(struct fast_task_info *task, IOEventCallback callback, const int timeout) { @@ -180,87 +182,128 @@ static inline int sf_ioevent_add(struct fast_task_info *task, return result > 0 ? -1 * result : result; } -static inline int sf_nio_init(struct fast_task_info *task) +static inline void inc_connection_current_count() { int current_connections; - current_connections = __sync_add_and_fetch( - &g_sf_global_vars.connection_stat.current_count, 1); + current_connections = FC_ATOMIC_INC(g_sf_global_vars. 
+ connection_stat.current_count); if (current_connections > g_sf_global_vars.connection_stat.max_count) { g_sf_global_vars.connection_stat.max_count = current_connections; } +} +static inline int sf_nio_init(struct fast_task_info *task) +{ + inc_connection_current_count(); return sf_ioevent_add(task, (IOEventCallback)sf_client_sock_read, task->network_timeout); } -static int sf_client_sock_connect(int sock, short event, void *arg) +int sf_socket_async_connect_check(struct fast_task_info *task) { int result; socklen_t len; + + len = sizeof(result); + if (getsockopt(task->event.fd, SOL_SOCKET, SO_ERROR, &result, &len) < 0) { + result = errno != 0 ? errno : EACCES; + } + return result; +} + +static int sf_client_connect_done(int sock, short event, void *arg) +{ + int result; struct fast_task_info *task; task = (struct fast_task_info *)arg; + if (task->canceled) { + return ENOTCONN; + } + if (event & IOEVENT_TIMEOUT) { result = ETIMEDOUT; } else { - len = sizeof(result); - if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &result, &len) < 0) { - result = errno != 0 ? errno : EACCES; + result = task->handler->async_connect_check(task); + if (result == EINPROGRESS) { + return 0; } } + if (SF_CTX->callbacks.connect_done != NULL) { + SF_CTX->callbacks.connect_done(task, result); + } + if (result != 0) { - logError("file: "__FILE__", line: %d, " - "connect to server %s:%u fail, errno: %d, " - "error info: %s", __LINE__, task->server_ip, - task->port, result, STRERROR(result)); + if (SF_CTX->connect_need_log) { + logError("file: "__FILE__", line: %d, " + "connect to server %s:%u fail, errno: %d, " + "error info: %s", __LINE__, task->server_ip, + task->port, result, STRERROR(result)); + } ioevent_add_to_deleted_list(task); return -1; } - logInfo("file: "__FILE__", line: %d, " - "connect to server %s:%u successfully", - __LINE__, task->server_ip, task->port); - return SF_CTX->deal_task(task, SF_NIO_STAGE_HANDSHAKE); + if (SF_CTX->connect_need_log) { + logInfo("file: "__FILE__", line: %d, " + "connect to server %s:%u successfully", + __LINE__, task->server_ip, task->port); + } + return SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_HANDSHAKE); } -static int sf_connect_server(struct fast_task_info *task) +int sf_socket_async_connect_server(struct fast_task_info *task) { int result; - if ((task->event.fd=socketCreateEx2(AF_UNSPEC, task->server_ip, O_NONBLOCK, NULL, &result)) < 0) { return result > 0 ? -1 * result : result; } - result = asyncconnectserverbyip(task->event.fd, + return asyncconnectserverbyip(task->event.fd, task->server_ip, task->port); - if (result == 0) { - if ((result=sf_ioevent_add(task, (IOEventCallback) - sf_client_sock_read, task->network_timeout)) != 0) - { - return result; - } +} - logInfo("file: "__FILE__", line: %d, " - "connect to server %s:%u successfully", - __LINE__, task->server_ip, task->port); - return SF_CTX->deal_task(task, SF_NIO_STAGE_HANDSHAKE); - } else if (result == EINPROGRESS) { +static int sf_async_connect_server(struct fast_task_info *task) +{ + int result; + + if ((result=task->handler->async_connect_server(task)) == EINPROGRESS) { result = ioevent_set(task, task->thread_data, task->event.fd, IOEVENT_READ | IOEVENT_WRITE, (IOEventCallback) - sf_client_sock_connect, task->connect_timeout); + sf_client_connect_done, task->connect_timeout); return result > 0 ? 
-1 * result : result; } else { - close(task->event.fd); - task->event.fd = -1; - logError("file: "__FILE__", line: %d, " - "connect to server %s:%u fail, errno: %d, " - "error info: %s", __LINE__, task->server_ip, - task->port, result, STRERROR(result)); - return result > 0 ? -1 * result : result; + if (SF_CTX->callbacks.connect_done != NULL) { + SF_CTX->callbacks.connect_done(task, result); + } + + if (result == 0) { + if ((result=sf_ioevent_add(task, (IOEventCallback) + sf_client_sock_read, task->network_timeout)) != 0) + { + return result; + } + + if (SF_CTX->connect_need_log) { + logInfo("file: "__FILE__", line: %d, " + "connect to server %s:%u successfully", + __LINE__, task->server_ip, task->port); + } + return SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_HANDSHAKE); + } else { + task->handler->close_connection(task); + if (SF_CTX->connect_need_log) { + logError("file: "__FILE__", line: %d, " + "connect to server %s:%u fail, errno: %d, " + "error info: %s", __LINE__, task->server_ip, + task->port, result, STRERROR(result)); + } + return result > 0 ? -1 * result : result; + } } } @@ -269,15 +312,17 @@ static int sf_nio_deal_task(struct fast_task_info *task, const int stage) int result; switch (stage) { - case SF_NIO_STAGE_INIT: + case SF_NIO_STAGE_INIT: //for server init task->nio_stages.current = SF_NIO_STAGE_RECV; result = sf_nio_init(task); break; - case SF_NIO_STAGE_CONNECT: - result = sf_connect_server(task); + case SF_NIO_STAGE_CONNECT: //for client init + inc_connection_current_count(); + result = sf_async_connect_server(task); break; case SF_NIO_STAGE_RECV: - if ((result=sf_set_read_event(task)) == 0) { + task->nio_stages.current = SF_NIO_STAGE_RECV; + if ((result=set_read_event(task)) == 0) { if (sf_client_sock_read(task->event.fd, IOEVENT_READ, task) < 0) { @@ -289,14 +334,14 @@ static int sf_nio_deal_task(struct fast_task_info *task, const int stage) result = sf_send_add_event(task); break; case SF_NIO_STAGE_CONTINUE: //continue deal - result = SF_CTX->deal_task(task, SF_NIO_STAGE_CONTINUE); + result = SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_CONTINUE); break; case SF_NIO_STAGE_FORWARDED: //forward by other thread if ((result=sf_ioevent_add(task, (IOEventCallback) sf_client_sock_read, task->network_timeout)) == 0) { - result = SF_CTX->deal_task(task, SF_NIO_STAGE_SEND); + result = SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_SEND); } break; case SF_NIO_STAGE_CLOSE: @@ -451,8 +496,8 @@ void sf_recv_notify_read(int sock, short event, void *arg) int sf_send_add_event(struct fast_task_info *task) { - task->offset = 0; - if (task->length > 0) { + task->send.ptr->offset = 0; + if (task->send.ptr->length > 0) { /* direct send */ task->nio_stages.current = SF_NIO_STAGE_SEND; if (sf_client_sock_write(task->event.fd, IOEVENT_WRITE, task) < 0) { @@ -484,308 +529,77 @@ static inline int check_task(struct fast_task_info *task, return 0; } - if (tcp_socket_connected(task->event.fd)) { - return EAGAIN; + if (task->handler->comm_type == fc_comm_type_sock) { + if (tcp_socket_connected(task->event.fd)) { + return EAGAIN; + } else { + logDebug("file: "__FILE__", line: %d, " + "client ip: %s, connection is closed", + __LINE__, task->client_ip); + + ioevent_add_to_deleted_list(task); + return -1; + } } else { - logDebug("file: "__FILE__", line: %d, " - "client ip: %s, connection is closed", - __LINE__, task->client_ip); - - ioevent_add_to_deleted_list(task); - return -1; + return EAGAIN; } } -int sf_client_sock_read(int sock, short event, void *arg) +ssize_t sf_socket_send_data(struct 
fast_task_info *task, + SFCommAction *action, bool *send_done) { - int result; int bytes; - int recv_bytes; - int total_read; - bool new_alloc; - struct fast_task_info *task; - task = (struct fast_task_info *)arg; - if ((result=check_task(task, event, SF_NIO_STAGE_RECV)) != 0) { - return result >= 0 ? 0 : -1; + if (task->iovec_array.iovs != NULL) { + bytes = writev(task->event.fd, task->iovec_array.iovs, + FC_MIN(task->iovec_array.count, IOV_MAX)); + } else { + bytes = write(task->event.fd, task->send.ptr->data + + task->send.ptr->offset, task->send.ptr->length - + task->send.ptr->offset); } - - if (event & IOEVENT_TIMEOUT) { - if (task->offset == 0 && task->req_count > 0) { - if (SF_CTX->timeout_callback != NULL) { - if (SF_CTX->timeout_callback(task) != 0) { - ioevent_add_to_deleted_list(task); - return -1; - } + if (bytes < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) + { + if (set_write_event(task) != 0) { + return -1; } - - task->event.timer.expires = g_current_time + - task->network_timeout; - fast_timer_add(&task->thread_data->timer, - &task->event.timer); - } else { - if (task->length > 0) { - logWarning("file: "__FILE__", line: %d, " - "client ip: %s, recv timeout, " - "recv offset: %d, expect length: %d", - __LINE__, task->client_ip, - task->offset, task->length); - } else { - logWarning("file: "__FILE__", line: %d, " - "client ip: %s, req_count: %"PRId64", recv timeout", - __LINE__, task->client_ip, task->req_count); - } - - ioevent_add_to_deleted_list(task); - return -1; - } - - return 0; - } - - total_read = 0; - while (1) { - fast_timer_modify(&task->thread_data->timer, - &task->event.timer, g_current_time + - task->network_timeout); - if (task->length == 0) { //recv header - recv_bytes = SF_CTX->header_size - task->offset; - bytes = read(sock, task->data + task->offset, recv_bytes); - } else { - recv_bytes = task->length - task->offset; - if (task->recv_body == NULL) { - bytes = read(sock, task->data + task->offset, recv_bytes); - } else { - bytes = read(sock, task->recv_body + (task->offset - - SF_CTX->header_size), recv_bytes); - } - } - - if (bytes < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - break; - } else if (errno == EINTR) { //should retry - logDebug("file: "__FILE__", line: %d, " + *action = sf_comm_action_break; + return 0; + } else if (errno == EINTR) { //should retry + logDebug("file: "__FILE__", line: %d, " "client ip: %s, ignore interupt signal", __LINE__, task->client_ip); - continue; - } else { - logWarning("file: "__FILE__", line: %d, " - "client ip: %s, recv fail, " - "errno: %d, error info: %s", - __LINE__, task->client_ip, - errno, strerror(errno)); - - ioevent_add_to_deleted_list(task); - return -1; - } - } else if (bytes == 0) { - if (task->offset > 0) { - if (task->length > 0) { - logWarning("file: "__FILE__", line: %d, " - "client ip: %s, connection disconnected, " - "expect pkg length: %d, recv pkg length: %d", - __LINE__, task->client_ip, task->length, - task->offset); - } else { - logWarning("file: "__FILE__", line: %d, " - "client ip: %s, connection " - "disconnected, recv pkg length: %d", - __LINE__, task->client_ip, - task->offset); - } - } else { - logDebug("file: "__FILE__", line: %d, " - "client ip: %s, sock: %d, recv fail, " - "connection disconnected", __LINE__, - task->client_ip, sock); - } - - ioevent_add_to_deleted_list(task); - return -1; - } - - TCP_SET_QUICK_ACK(sock); - total_read += bytes; - task->offset += bytes; - if (task->length == 0) { //pkg header - if (task->offset < SF_CTX->header_size) { - continue; - } 
- - if (SF_CTX->set_body_length(task) != 0) { - ioevent_add_to_deleted_list(task); - return -1; - } - if (task->length < 0) { - logError("file: "__FILE__", line: %d, " - "client ip: %s, pkg length: %d < 0", - __LINE__, task->client_ip, - task->length); - - ioevent_add_to_deleted_list(task); - return -1; - } - - task->length += SF_CTX->header_size; - if (task->length > g_sf_global_vars.max_pkg_size) { - logError("file: "__FILE__", line: %d, " - "client ip: %s, pkg length: %d > " - "max pkg size: %d", __LINE__, - task->client_ip, task->length, - g_sf_global_vars.max_pkg_size); - - ioevent_add_to_deleted_list(task); - return -1; - } - - if (SF_CTX->alloc_recv_buffer != NULL) { - task->recv_body = SF_CTX->alloc_recv_buffer(task, - task->length - SF_CTX->header_size, &new_alloc); - if (new_alloc && task->recv_body == NULL) { - ioevent_add_to_deleted_list(task); - return -1; - } - } else { - new_alloc = false; - } - - if (!new_alloc) { - if (task->length > task->size) { - int old_size; - - if (!SF_CTX->realloc_task_buffer) { - logError("file: "__FILE__", line: %d, " - "client ip: %s, pkg length: %d exceeds " - "task size: %d, but realloc buffer disabled", - __LINE__, task->client_ip, task->size, - task->length); - - ioevent_add_to_deleted_list(task); - return -1; - } - - old_size = task->size; - if (free_queue_realloc_buffer(task, task->length) != 0) { - logError("file: "__FILE__", line: %d, " - "client ip: %s, realloc buffer size " - "from %d to %d fail", __LINE__, - task->client_ip, task->size, task->length); - - ioevent_add_to_deleted_list(task); - return -1; - } - - logDebug("file: "__FILE__", line: %d, " - "client ip: %s, task length: %d, realloc buffer " - "size from %d to %d", __LINE__, task->client_ip, - task->length, old_size, task->size); - } - } - } - - if (task->offset >= task->length) { //recv done - task->req_count++; - task->nio_stages.current = SF_NIO_STAGE_SEND; - if (SF_CTX->deal_task(task, SF_NIO_STAGE_SEND) < 0) { //fatal error - ioevent_add_to_deleted_list(task); - return -1; - } - break; - } - } - - return total_read; -} - -int sf_client_sock_write(int sock, short event, void *arg) -{ - int result; - int bytes; - int total_write; - int length; - struct fast_task_info *task; - - task = (struct fast_task_info *)arg; - if ((result=check_task(task, event, SF_NIO_STAGE_SEND)) != 0) { - return result >= 0 ? 0 : -1; - } - - if (event & IOEVENT_TIMEOUT) { - logError("file: "__FILE__", line: %d, " - "client ip: %s, send timeout. 
total length: %d, offset: %d, " - "remain: %d", __LINE__, task->client_ip, task->length, - task->offset, task->length - task->offset); - - ioevent_add_to_deleted_list(task); - return -1; - } - - total_write = 0; - while (1) { - fast_timer_modify(&task->thread_data->timer, - &task->event.timer, g_current_time + - task->network_timeout); - - if (task->iovec_array.iovs != NULL) { - bytes = writev(sock, task->iovec_array.iovs, - FC_MIN(task->iovec_array.count, IOV_MAX)); + *action = sf_comm_action_continue; + return 0; } else { - bytes = write(sock, task->data + task->offset, - task->length - task->offset); - } - if (bytes < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK) - { - if (set_write_event(task) != 0) { - return -1; - } - break; - } else if (errno == EINTR) { //should retry - logDebug("file: "__FILE__", line: %d, " - "client ip: %s, ignore interupt signal", - __LINE__, task->client_ip); - continue; - } else { - logWarning("file: "__FILE__", line: %d, " + logWarning("file: "__FILE__", line: %d, " "client ip: %s, send fail, task offset: %d, length: %d, " "errno: %d, error info: %s", __LINE__, task->client_ip, - task->offset, task->length, errno, strerror(errno)); - - ioevent_add_to_deleted_list(task); - return -1; - } - } else if (bytes == 0) { - logWarning("file: "__FILE__", line: %d, " - "client ip: %s, sock: %d, task length: %d, offset: %d, " - "send failed, connection disconnected", __LINE__, - task->client_ip, sock, task->length, task->offset); - - ioevent_add_to_deleted_list(task); + task->send.ptr->offset, task->send.ptr->length, + errno, strerror(errno)); return -1; } + } else if (bytes == 0) { + logWarning("file: "__FILE__", line: %d, " + "client ip: %s, sock: %d, task length: %d, offset: %d, " + "send failed, connection disconnected", __LINE__, + task->client_ip, task->event.fd, task->send.ptr->length, + task->send.ptr->offset); + return -1; + } - total_write += bytes; - task->offset += bytes; - if (task->offset >= task->length) { - release_iovec_buffer(task); - - length = task->length; - task->offset = 0; - task->length = 0; - if (sf_set_read_event(task) != 0) { - return -1; - } - - if (SF_CTX->send_done_callback != NULL) { - if (SF_CTX->send_done_callback(task, length) != 0) { - ioevent_add_to_deleted_list(task); - return -1; - } - } - - break; + task->send.ptr->offset += bytes; + if (task->send.ptr->offset >= task->send.ptr->length) { + if (task->send.ptr != task->recv.ptr) { //double buffers + task->send.ptr->offset = 0; + task->send.ptr->length = 0; } + *action = sf_comm_action_finish; + *send_done = true; + } else { + *action = sf_comm_action_continue; + *send_done = false; /* set next writev iovec array */ if (task->iovec_array.iovs != NULL) { @@ -817,5 +631,427 @@ int sf_client_sock_write(int sock, short event, void *arg) } } + return bytes; +} + +ssize_t sf_socket_recv_data(struct fast_task_info *task, + const bool call_post_recv, SFCommAction *action) +{ + int bytes; + int recv_bytes; + bool new_alloc; + + if (task->recv.ptr->length == 0) { //recv header + recv_bytes = SF_CTX->header_size - task->recv.ptr->offset; + bytes = read(task->event.fd, task->recv.ptr->data + + task->recv.ptr->offset, recv_bytes); + } else { + recv_bytes = task->recv.ptr->length - task->recv.ptr->offset; + if (task->recv_body == NULL) { + bytes = read(task->event.fd, task->recv.ptr->data + + task->recv.ptr->offset, recv_bytes); + } else { + bytes = read(task->event.fd, task->recv_body + + (task->recv.ptr->offset - SF_CTX-> + header_size), recv_bytes); + } + } + + if (bytes < 0) { + if 
(errno == EAGAIN || errno == EWOULDBLOCK) { + *action = sf_comm_action_break; + return 0; + } else if (errno == EINTR) { //should retry + logDebug("file: "__FILE__", line: %d, " + "client ip: %s, ignore interupt signal", + __LINE__, task->client_ip); + *action = sf_comm_action_continue; + return 0; + } else { + logWarning("file: "__FILE__", line: %d, " + "client ip: %s, recv fail, " + "errno: %d, error info: %s", + __LINE__, task->client_ip, + errno, strerror(errno)); + return -1; + } + } else if (bytes == 0) { + if (task->recv.ptr->offset > 0) { + if (task->recv.ptr->length > 0) { + logWarning("file: "__FILE__", line: %d, " + "client ip: %s, connection disconnected, " + "expect pkg length: %d, recv pkg length: %d", + __LINE__, task->client_ip, task->recv.ptr->length, + task->recv.ptr->offset); + } else { + logWarning("file: "__FILE__", line: %d, " + "client ip: %s, connection " + "disconnected, recv pkg length: %d", + __LINE__, task->client_ip, + task->recv.ptr->offset); + } + } else { + logDebug("file: "__FILE__", line: %d, " + "client ip: %s, sock: %d, recv fail, " + "connection disconnected", __LINE__, + task->client_ip, task->event.fd); + } + + return -1; + } + + TCP_SET_QUICK_ACK(task->event.fd); + task->recv.ptr->offset += bytes; + if (task->recv.ptr->length == 0) { //pkg header + if (task->recv.ptr->offset < SF_CTX->header_size) { + *action = sf_comm_action_continue; + return bytes; + } + + if (sf_set_body_length(task) != 0) { + return -1; + } + + if (SF_CTX->callbacks.alloc_recv_buffer != NULL) { + task->recv_body = SF_CTX->callbacks.alloc_recv_buffer(task, + task->recv.ptr->length - SF_CTX->header_size, &new_alloc); + if (new_alloc && task->recv_body == NULL) { + return -1; + } + } else { + new_alloc = false; + } + + if (!new_alloc) { + if (task->recv.ptr->length > task->recv.ptr->size) { + int old_size; + + if (!SF_CTX->realloc_task_buffer) { + logError("file: "__FILE__", line: %d, " + "client ip: %s, pkg length: %d exceeds " + "task size: %d, but realloc buffer disabled", + __LINE__, task->client_ip, task->recv.ptr->size, + task->recv.ptr->length); + return -1; + } + + old_size = task->recv.ptr->size; + if (free_queue_realloc_recv_buffer(task, task-> + recv.ptr->length) != 0) + { + logError("file: "__FILE__", line: %d, " + "client ip: %s, realloc buffer size from %d " + "to %d fail", __LINE__, task->client_ip, + task->recv.ptr->size, task->recv.ptr->length); + return -1; + } + + logDebug("file: "__FILE__", line: %d, " + "client ip: %s, task length: %d, realloc buffer " + "size from %d to %d", __LINE__, task->client_ip, + task->recv.ptr->length, old_size, task->recv.ptr->size); + } + } + } + + if (task->recv.ptr->offset >= task->recv.ptr->length) { //recv done + *action = sf_comm_action_finish; + } else { + *action = sf_comm_action_continue; + } + + return bytes; +} + +static int calc_iops_and_trigger_polling(struct fast_task_info *task) +{ + int time_distance; + int result = 0; + + time_distance = g_current_time - task->polling.last_calc_time; + if (time_distance > 0) { + if ((task->req_count - task->polling.last_req_count) / + time_distance >= SF_CTX->smart_polling.switch_on_iops) + { + task->polling.continuous_count++; + if (task->polling.continuous_count >= SF_CTX-> + smart_polling.switch_on_count) + { + task->polling.continuous_count = 0; + task->polling.in_queue = true; + result = ioevent_detach(&task->thread_data-> + ev_puller, task->event.fd); + fast_timer_remove(&task->thread_data->timer, + &task->event.timer); + + if (fc_list_empty(&task->thread_data->polling_queue)) { 
+ ioevent_set_timeout(&task->thread_data-> + ev_puller, 0); + } + fc_list_add_tail(&task->polling.dlink, + &task->thread_data->polling_queue); + + logInfo("file: "__FILE__", line: %d, client: %s:%u, " + "trigger polling iops: %"PRId64, __LINE__, + task->client_ip, task->port, (task->req_count - + task->polling.last_req_count) / time_distance); + } + } else { + if (task->polling.continuous_count > 0) { + task->polling.continuous_count = 0; + } + } + + task->polling.last_calc_time = g_current_time; + task->polling.last_req_count = task->req_count; + } + + return result; +} + +static int calc_iops_and_remove_polling(struct fast_task_info *task) +{ + int time_distance; + int result = 0; + + time_distance = g_current_time - task->polling.last_calc_time; + if (time_distance > 0) { + if ((task->req_count - task->polling.last_req_count) / + time_distance < SF_CTX->smart_polling.switch_on_iops) + { + task->polling.continuous_count++; + if (task->polling.continuous_count >= SF_CTX-> + smart_polling.switch_on_count) + { + task->polling.continuous_count = 0; + task->polling.in_queue = false; + fc_list_del_init(&task->polling.dlink); + if (fc_list_empty(&task->thread_data->polling_queue)) { + ioevent_set_timeout(&task->thread_data->ev_puller, + task->thread_data->timeout_ms); + } + result = sf_ioevent_add(task, (IOEventCallback) + sf_client_sock_read, task->network_timeout); + + logInfo("file: "__FILE__", line: %d, client: %s:%u, " + "remove polling iops: %"PRId64, __LINE__, + task->client_ip, task->port, (task->req_count - + task->polling.last_req_count) / time_distance); + } + } else { + if (task->polling.continuous_count > 0) { + task->polling.continuous_count = 0; + } + } + + task->polling.last_calc_time = g_current_time; + task->polling.last_req_count = task->req_count; + } + + return result; +} + +int sf_rdma_busy_polling_callback(struct nio_thread_data *thread_data) +{ + struct fast_task_info *task; + struct fast_task_info *tmp; + int bytes; + SFCommAction action; + + fc_list_for_each_entry_safe(task, tmp, &thread_data-> + polling_queue, polling.dlink) + { + if (task->canceled) { + continue; + } + if ((bytes=task->handler->recv_data(task, !task->handler-> + explicit_post_recv, &action)) < 0) + { + ioevent_add_to_deleted_list(task); + continue; + } + + if (action == sf_comm_action_finish) { + task->req_count++; + task->nio_stages.current = SF_NIO_STAGE_SEND; + if (SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_SEND) < 0) { + /* fatal error */ + ioevent_add_to_deleted_list(task); + } else if (task->handler->explicit_post_recv) { + if (task->handler->post_recv(task) != 0) { + ioevent_add_to_deleted_list(task); + } + } + } else { + if (calc_iops_and_remove_polling(task) != 0) { + ioevent_add_to_deleted_list(task); + } + } + } + + return 0; +} + +int sf_client_sock_read(int sock, short event, void *arg) +{ + int result; + int bytes; + int total_read; + SFCommAction action; + struct fast_task_info *task; + + task = (struct fast_task_info *)arg; + if ((result=check_task(task, event, SF_NIO_STAGE_RECV)) != 0) { + return result >= 0 ? 
0 : -1; + } + + if (event & IOEVENT_TIMEOUT) { + if (task->recv.ptr->offset == 0 && task->req_count > 0) { + if (SF_CTX->callbacks.task_timeout != NULL) { + if (SF_CTX->callbacks.task_timeout(task) != 0) { + ioevent_add_to_deleted_list(task); + return -1; + } + } + + task->event.timer.expires = g_current_time + + task->network_timeout; + fast_timer_add(&task->thread_data->timer, + &task->event.timer); + } else { + if (task->recv.ptr->length > 0) { + logWarning("file: "__FILE__", line: %d, " + "client ip: %s, recv timeout, recv " + "offset: %d, expect length: %d", __LINE__, + task->client_ip, task->recv.ptr->offset, + task->recv.ptr->length); + } else { + logWarning("file: "__FILE__", line: %d, " + "client ip: %s, req_count: %"PRId64", recv timeout", + __LINE__, task->client_ip, task->req_count); + } + + ioevent_add_to_deleted_list(task); + return -1; + } + + return 0; + } + + total_read = 0; + action = sf_comm_action_continue; + while (1) { + fast_timer_modify(&task->thread_data->timer, + &task->event.timer, g_current_time + + task->network_timeout); + + if ((bytes=task->handler->recv_data(task, !task->handler-> + explicit_post_recv, &action)) < 0) + { + ioevent_add_to_deleted_list(task); + return -1; + } + + total_read += bytes; + if (action == sf_comm_action_finish) { + task->req_count++; + task->nio_stages.current = SF_NIO_STAGE_SEND; + if (SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_SEND) < 0) { + ioevent_add_to_deleted_list(task); + return -1; + } + + if (task->handler->explicit_post_recv) { + if (task->handler->post_recv(task) != 0) { + ioevent_add_to_deleted_list(task); + return -1; + } + } + + if (SF_CTX->smart_polling.enabled) { + if (calc_iops_and_trigger_polling(task) != 0) { + ioevent_add_to_deleted_list(task); + return -1; + } + } + + break; + } else if (action == sf_comm_action_break) { + break; + } + } + + return total_read; +} + +int sf_client_sock_write(int sock, short event, void *arg) +{ + int result; + int bytes; + int total_write; + int length; + int next_stage; + SFCommAction action; + bool send_done; + struct fast_task_info *task; + + task = (struct fast_task_info *)arg; + if ((result=check_task(task, event, SF_NIO_STAGE_SEND)) != 0) { + return result >= 0 ? 0 : -1; + } + + if (event & IOEVENT_TIMEOUT) { + logError("file: "__FILE__", line: %d, " + "client ip: %s, send timeout. 
total length: %d, offset: %d, " + "remain: %d", __LINE__, task->client_ip, task->send.ptr->length, + task->send.ptr->offset, task->send.ptr->length - + task->send.ptr->offset); + + ioevent_add_to_deleted_list(task); + return -1; + } + + total_write = 0; + length = task->send.ptr->length; + action = sf_comm_action_continue; + while (1) { + fast_timer_modify(&task->thread_data->timer, + &task->event.timer, g_current_time + + task->network_timeout); + + if ((bytes=task->handler->send_data(task, &action, &send_done)) < 0) { + ioevent_add_to_deleted_list(task); + return -1; + } + + total_write += bytes; + if (action == sf_comm_action_finish) { + release_iovec_buffer(task); + task->recv.ptr->offset = 0; + task->recv.ptr->length = 0; + if (set_read_event(task) != 0) { + return -1; + } + + if (SF_CTX->callbacks.send_done == NULL || !send_done) { + task->nio_stages.current = SF_NIO_STAGE_RECV; + } else { + if (SF_CTX->callbacks.send_done(task, + length, &next_stage) != 0) + { + return -1; + } + + if (task->nio_stages.current != next_stage) { + task->nio_stages.current = next_stage; + } + } + + break; + } else if (action == sf_comm_action_break) { + break; + } + } + return total_write; } diff --git a/src/sf_nio.h b/src/sf_nio.h index cbdefb4..1841429 100644 --- a/src/sf_nio.h +++ b/src/sf_nio.h @@ -25,8 +25,9 @@ #include "fastcommon/ioevent_loop.h" #include "sf_define.h" #include "sf_types.h" +#include "sf_global.h" -#define SF_CTX ((SFContext *)(task->ctx)) +#define SF_CTX (task->handler->ctx) #ifdef __cplusplus extern "C" { @@ -36,7 +37,7 @@ void sf_set_parameters_ex(SFContext *sf_context, const int header_size, sf_set_body_length_callback set_body_length_func, sf_alloc_recv_buffer_callback alloc_recv_buffer_func, sf_send_done_callback send_done_callback, - sf_deal_task_func deal_func, TaskCleanUpCallback cleanup_func, + sf_deal_task_callback deal_func, TaskCleanUpCallback cleanup_func, sf_recv_timeout_callback timeout_callback, sf_release_buffer_callback release_buffer_callback); @@ -46,17 +47,28 @@ void sf_set_parameters_ex(SFContext *sf_context, const int header_size, set_body_length_func, alloc_recv_buffer_func, \ deal_func, cleanup_func, timeout_callback, NULL) -static inline void sf_set_deal_task_func_ex(SFContext *sf_context, - sf_deal_task_func deal_func) +static inline void sf_set_deal_task_callback_ex(SFContext *sf_context, + sf_deal_task_callback deal_func) { - sf_context->deal_task = deal_func; + sf_context->callbacks.deal_task = deal_func; } -#define sf_set_deal_task_func(deal_func) \ - sf_set_deal_task_func_ex(&g_sf_context, deal_func) +#define sf_set_deal_task_callback(deal_func) \ + sf_set_deal_task_callback_ex(&g_sf_context, deal_func) -static inline void sf_set_remove_from_ready_list_ex(SFContext *sf_context, - const bool enabled) + +static inline void sf_set_connect_done_callback_ex(SFContext *sf_context, + sf_connect_done_callback done_callback) +{ + sf_context->callbacks.connect_done = done_callback; +} + +#define sf_set_connect_done_callback(done_callback) \ + sf_set_connect_done_callback_ex(&g_sf_context, done_callback) + + +static inline void sf_set_remove_from_ready_list_ex( + SFContext *sf_context, const bool enabled) { sf_context->remove_from_ready_list = enabled; } @@ -64,17 +76,27 @@ static inline void sf_set_remove_from_ready_list_ex(SFContext *sf_context, #define sf_set_remove_from_ready_list(enabled) \ sf_set_remove_from_ready_list_ex(&g_sf_context, enabled); -static inline TaskCleanUpCallback sf_get_task_cleanup_func_ex( +static inline TaskCleanUpCallback 
sf_get_task_cleanup_callback_ex( SFContext *sf_context) { - return sf_context->task_cleanup_func; + return sf_context->callbacks.task_cleanup; } -#define sf_get_task_cleanup_func() \ - sf_get_task_cleanup_func_ex(&g_sf_context) +#define sf_get_task_cleanup_callback() \ + sf_get_task_cleanup_callback_ex(&g_sf_context) -#define sf_nio_task_is_idle(task) \ - (task->offset == 0 && task->length == 0) +#define sf_nio_task_send_done(task) \ + (task->send.ptr->offset == 0 && task->send.ptr->length == 0) + +static inline void sf_nio_reset_task_length(struct fast_task_info *task) +{ + task->send.ptr->length = 0; + task->send.ptr->offset = 0; + if (task->recv.ptr != task->send.ptr) { + task->recv.ptr->length = 0; + task->recv.ptr->offset = 0; + } +} void sf_recv_notify_read(int sock, short event, void *arg); int sf_send_add_event(struct fast_task_info *task); @@ -92,6 +114,42 @@ void sf_task_switch_thread(struct fast_task_info *task, void sf_task_detach_thread(struct fast_task_info *task); +static inline int sf_set_body_length(struct fast_task_info *task) +{ + if (SF_CTX->callbacks.set_body_length(task) != 0) { + return -1; + } + if (task->recv.ptr->length < 0) { + logError("file: "__FILE__", line: %d, " + "client ip: %s, pkg length: %d < 0", + __LINE__, task->client_ip, + task->recv.ptr->length); + return -1; + } + + task->recv.ptr->length += SF_CTX->header_size; + if (task->recv.ptr->length > g_sf_global_vars.max_pkg_size) { + logError("file: "__FILE__", line: %d, " + "client ip: %s, pkg length: %d > " + "max pkg size: %d", __LINE__, + task->client_ip, task->recv.ptr->length, + g_sf_global_vars.max_pkg_size); + return -1; + } + + return 0; +} + +int sf_socket_async_connect_server(struct fast_task_info *task); +int sf_socket_async_connect_check(struct fast_task_info *task); + +ssize_t sf_socket_send_data(struct fast_task_info *task, + SFCommAction *action, bool *send_done); +ssize_t sf_socket_recv_data(struct fast_task_info *task, + const bool call_post_recv, SFCommAction *action); + +int sf_rdma_busy_polling_callback(struct nio_thread_data *thread_data); + static inline int sf_nio_forward_request(struct fast_task_info *task, const int new_thread_index) { diff --git a/src/sf_proto.c b/src/sf_proto.c index ffe049c..0350435 100644 --- a/src/sf_proto.c +++ b/src/sf_proto.c @@ -30,18 +30,20 @@ int sf_proto_set_body_length(struct fast_task_info *task) { SFCommonProtoHeader *header; - header = (SFCommonProtoHeader *)task->data; + header = (SFCommonProtoHeader *)task->recv.ptr->data; if (!SF_PROTO_CHECK_MAGIC(header->magic)) { logError("file: "__FILE__", line: %d, " - "peer %s:%u, magic "SF_PROTO_MAGIC_FORMAT - " is invalid, expect: "SF_PROTO_MAGIC_FORMAT, - __LINE__, task->client_ip, task->port, + "%s peer %s:%u, magic "SF_PROTO_MAGIC_FORMAT" is invalid, " + "expect: "SF_PROTO_MAGIC_FORMAT", cmd: %d, body length: %d", + __LINE__, (task->handler != NULL ? 
task->handler->ctx->name : + ""), task->client_ip, task->port, SF_PROTO_MAGIC_PARAMS(header->magic), - SF_PROTO_MAGIC_EXPECT_PARAMS); + SF_PROTO_MAGIC_EXPECT_PARAMS, header->cmd, + buff2int(header->body_len)); return EINVAL; } - task->length = buff2int(header->body_len); //set body length + task->recv.ptr->length = buff2int(header->body_len); //set body length return 0; } @@ -70,8 +72,14 @@ int sf_check_response(ConnectionInfo *conn, SFResponseInfo *response, response->error.length = response->header.body_len; } - if ((result=tcprecvdata_nb_ex(conn->sock, response->error.message, - response->error.length, network_timeout, &recv_bytes)) == 0) + if (conn->comm_type == fc_comm_type_rdma) { + memcpy(response->error.message, G_RDMA_CONNECTION_CALLBACKS. + get_recv_buffer(conn)->buff + sizeof(SFCommonProtoHeader), + response->error.length); + response->error.message[response->error.length] = '\0'; + } else if ((result=tcprecvdata_nb_ex(conn->sock, response-> + error.message, response->error.length, + network_timeout, &recv_bytes)) == 0) { response->error.message[response->error.length] = '\0'; } else { @@ -96,30 +104,48 @@ static inline int sf_recv_response_header(ConnectionInfo *conn, SFResponseInfo *response, const int network_timeout) { int result; + BufferInfo *buffer; SFCommonProtoHeader header_proto; - if ((result=tcprecvdata_nb(conn->sock, &header_proto, - sizeof(SFCommonProtoHeader), network_timeout)) != 0) - { - response->error.length = snprintf(response->error.message, - sizeof(response->error.message), - "recv data fail, errno: %d, error info: %s", - result, STRERROR(result)); - return result; - } + if (conn->comm_type == fc_comm_type_rdma) { + buffer = G_RDMA_CONNECTION_CALLBACKS.get_recv_buffer(conn); + if (buffer->length < sizeof(SFCommonProtoHeader)) { + response->error.length = sprintf(response->error.message, + "recv pkg length: %d < header size: %d", + buffer->length, (int)sizeof(SFCommonProtoHeader)); + return EINVAL; + } - if (!SF_PROTO_CHECK_MAGIC(header_proto.magic)) { - response->error.length = snprintf(response->error.message, - sizeof(response->error.message), - "magic "SF_PROTO_MAGIC_FORMAT" is invalid, " - "expect: "SF_PROTO_MAGIC_FORMAT, - SF_PROTO_MAGIC_PARAMS(header_proto.magic), - SF_PROTO_MAGIC_EXPECT_PARAMS); - return EINVAL; - } + if ((result=sf_proto_parse_header((SFCommonProtoHeader *) + buffer->buff, response)) != 0) + { + return result; + } - sf_proto_extract_header(&header_proto, &response->header); - return 0; + if (buffer->length != (sizeof(SFCommonProtoHeader) + + response->header.body_len)) + { + response->error.length = snprintf(response->error.message, + sizeof(response->error.message), + "recv package length: %d != calculate: %d", + buffer->length, (int)(sizeof(SFCommonProtoHeader) + + response->header.body_len)); + return EINVAL; + } + + return 0; + } else { + if ((result=tcprecvdata_nb(conn->sock, &header_proto, + sizeof(SFCommonProtoHeader), network_timeout)) != 0) + { + response->error.length = snprintf(response->error.message, + sizeof(response->error.message), + "recv data fail, errno: %d, error info: %s", + result, STRERROR(result)); + return result; + } + return sf_proto_parse_header(&header_proto, response); + } } int sf_send_and_recv_response_header(ConnectionInfo *conn, char *data, @@ -127,11 +153,9 @@ int sf_send_and_recv_response_header(ConnectionInfo *conn, char *data, { int result; - if ((result=tcpsenddata_nb(conn->sock, data, len, network_timeout)) != 0) { - response->error.length = snprintf(response->error.message, - 
sizeof(response->error.message), - "send data fail, errno: %d, error info: %s", - result, STRERROR(result)); + if ((result=sf_proto_send_buf1(conn, data, len, + response, network_timeout)) != 0) + { return result; } @@ -194,7 +218,10 @@ int sf_send_and_recv_response_ex(ConnectionInfo *conn, char *send_data, return 0; } - if ((result=tcprecvdata_nb_ex(conn->sock, recv_data, response-> + if (conn->comm_type == fc_comm_type_rdma) { + memcpy(recv_data, G_RDMA_CONNECTION_CALLBACKS.get_recv_buffer(conn)-> + buff + sizeof(SFCommonProtoHeader), response->header.body_len); + } else if ((result=tcprecvdata_nb_ex(conn->sock, recv_data, response-> header.body_len, network_timeout, &recv_bytes)) != 0) { response->error.length = snprintf(response->error.message, @@ -234,7 +261,11 @@ int sf_send_and_recv_response_ex1(ConnectionInfo *conn, char *send_data, return EOVERFLOW; } - if ((result=tcprecvdata_nb_ex(conn->sock, recv_data, response-> + if (conn->comm_type == fc_comm_type_rdma) { + memcpy(recv_data, G_RDMA_CONNECTION_CALLBACKS.get_recv_buffer(conn)-> + buff + sizeof(SFCommonProtoHeader), response->header.body_len); + *body_len = response->header.body_len; + } else if ((result=tcprecvdata_nb_ex(conn->sock, recv_data, response-> header.body_len, network_timeout, body_len)) != 0) { response->error.length = snprintf(response->error.message, @@ -275,7 +306,10 @@ int sf_recv_response(ConnectionInfo *conn, SFResponseInfo *response, return 0; } - if ((result=tcprecvdata_nb_ex(conn->sock, recv_data, expect_body_len, + if (conn->comm_type == fc_comm_type_rdma) { + memcpy(recv_data, G_RDMA_CONNECTION_CALLBACKS.get_recv_buffer(conn)-> + buff + sizeof(SFCommonProtoHeader), response->header.body_len); + } else if ((result=tcprecvdata_nb_ex(conn->sock, recv_data, expect_body_len, network_timeout, &recv_bytes)) != 0) { response->error.length = snprintf(response->error.message, @@ -343,7 +377,10 @@ int sf_recv_vary_response(ConnectionInfo *conn, SFResponseInfo *response, buffer->alloc_size = alloc_size; } - if ((result=tcprecvdata_nb_ex(conn->sock, buffer->buff, response-> + if (conn->comm_type == fc_comm_type_rdma) { + memcpy(buffer->buff, G_RDMA_CONNECTION_CALLBACKS.get_recv_buffer(conn)-> + buff + sizeof(SFCommonProtoHeader), response->header.body_len); + } else if ((result=tcprecvdata_nb_ex(conn->sock, buffer->buff, response-> header.body_len, network_timeout, &recv_bytes)) != 0) { response->error.length = snprintf(response->error.message, @@ -364,13 +401,9 @@ int sf_send_and_recv_vary_response(ConnectionInfo *conn, { int result; - if ((result=tcpsenddata_nb(conn->sock, send_data, - send_len, network_timeout)) != 0) + if ((result=sf_proto_send_buf1(conn, send_data, send_len, + response, network_timeout)) != 0) { - response->error.length = snprintf(response->error.message, - sizeof(response->error.message), - "send data fail, errno: %d, error info: %s", - result, STRERROR(result)); return result; } @@ -577,6 +610,7 @@ int sf_proto_get_leader(ConnectionInfo *conn, const char *service_name, memcpy(leader->conn.ip_addr, server_resp.ip_addr, IP_ADDRESS_SIZE); *(leader->conn.ip_addr + IP_ADDRESS_SIZE - 1) = '\0'; leader->conn.port = buff2short(server_resp.port); + leader->conn.comm_type = conn->comm_type; } return result; @@ -589,7 +623,7 @@ void sf_proto_set_handler_context(const SFHandlerContext *ctx) } int sf_proto_deal_task_done(struct fast_task_info *task, - SFCommonTaskContext *ctx) + const char *service_name, SFCommonTaskContext *ctx) { SFCommonProtoHeader *proto_header; int status; @@ -600,10 +634,10 @@ int 
sf_proto_deal_task_done(struct fast_task_info *task, if (ctx->log_level != LOG_NOTHING && ctx->response.error.length > 0) { log_it_ex(&g_log_context, ctx->log_level, - "file: "__FILE__", line: %d, " + "file: "__FILE__", line: %d, %s " "peer %s:%u, cmd: %d (%s), req body length: %d, " - "resp status: %d, %s", __LINE__, task->client_ip, - task->port, ctx->request.header.cmd, + "resp status: %d, %s", __LINE__, service_name, + task->client_ip, task->port, ctx->request.header.cmd, GET_CMD_CAPTION(ctx->request.header.cmd), ctx->request.header.body_len, ctx->response.header.status, ctx->response.error.message); @@ -614,8 +648,8 @@ int sf_proto_deal_task_done(struct fast_task_info *task, time_used = get_current_time_us() - ctx->req_start_time; log_level = GET_CMD_LOG_LEVEL(ctx->request.header.cmd); log_it_ex(&g_log_context, log_level, "file: "__FILE__", line: %d, " - "client %s:%u, req cmd: %d (%s), req body_len: %d, " - "resp status: %d, time used: %s us", __LINE__, + "%s client %s:%u, req cmd: %d (%s), req body_len: %d, " + "resp status: %d, time used: %s us", __LINE__, service_name, task->client_ip, task->port, ctx->request.header.cmd, GET_CMD_CAPTION(ctx->request.header.cmd), ctx->request.header.body_len, ctx->response.header.status, @@ -623,18 +657,17 @@ int sf_proto_deal_task_done(struct fast_task_info *task, if (ctx->response.header.status == 0) { - task->offset = task->length = 0; return sf_set_read_event(task); } else { return FC_NEGATIVE(ctx->response.header.status); } } - proto_header = (SFCommonProtoHeader *)task->data; + proto_header = (SFCommonProtoHeader *)task->send.ptr->data; if (!ctx->response_done) { ctx->response.header.body_len = ctx->response.error.length; if (ctx->response.error.length > 0) { - memcpy(task->data + sizeof(SFCommonProtoHeader), + memcpy(task->send.ptr->data + sizeof(SFCommonProtoHeader), ctx->response.error.message, ctx->response.error.length); } } @@ -643,7 +676,8 @@ int sf_proto_deal_task_done(struct fast_task_info *task, short2buff(status, proto_header->status); proto_header->cmd = ctx->response.header.cmd; int2buff(ctx->response.header.body_len, proto_header->body_len); - task->length = sizeof(SFCommonProtoHeader) + ctx->response.header.body_len; + task->send.ptr->length = sizeof(SFCommonProtoHeader) + + ctx->response.header.body_len; r = sf_send_add_event(task); time_used = get_current_time_us() - ctx->req_start_time; @@ -653,11 +687,11 @@ int sf_proto_deal_task_done(struct fast_task_info *task, char buff[256]; int blen; - blen = sprintf(buff, "timed used: %s us, client %s:%u, " + blen = sprintf(buff, "time used: %s us, %s client %s:%u, " "req cmd: %d (%s), req body len: %d, resp cmd: %d (%s), " "status: %d, resp body len: %d", long_to_comma_str(time_used, - time_buff), task->client_ip, task->port, ctx->request.
- header.cmd, GET_CMD_CAPTION(ctx->request.header.cmd), + time_buff), service_name, task->client_ip, task->port, ctx-> + request.header.cmd, GET_CMD_CAPTION(ctx->request.header.cmd), ctx->request.header.body_len, ctx->response.header.cmd, GET_CMD_CAPTION(ctx->response.header.cmd), ctx->response.header.status, ctx->response.header.body_len); @@ -667,9 +701,9 @@ int sf_proto_deal_task_done(struct fast_task_info *task, if (sf_handler_ctx.callbacks.get_cmd_log_level != NULL) { log_level = GET_CMD_LOG_LEVEL(ctx->request.header.cmd); log_it_ex(&g_log_context, log_level, "file: "__FILE__", line: %d, " - "client %s:%u, req cmd: %d (%s), req body_len: %d, " + "%s client %s:%u, req cmd: %d (%s), req body_len: %d, " "resp cmd: %d (%s), status: %d, resp body_len: %d, " - "time used: %s us", __LINE__, + "time used: %s us", __LINE__, service_name, task->client_ip, task->port, ctx->request.header.cmd, GET_CMD_CAPTION(ctx->request.header.cmd), ctx->request.header.body_len, ctx->response.header.cmd, diff --git a/src/sf_proto.h b/src/sf_proto.h index f7bcf95..dd63dce 100644 --- a/src/sf_proto.h +++ b/src/sf_proto.h @@ -89,8 +89,18 @@ int2buff((resp_header).body_len, (proto_header)->body_len);\ } while (0) -#define SF_PROTO_RESP_BODY(task) \ - (task->data + sizeof(SFCommonProtoHeader)) + +#define SF_PROTO_SEND_BODY(task) \ + (task->send.ptr->data + sizeof(SFCommonProtoHeader)) + +#define SF_PROTO_RECV_BODY(task) \ + (task->recv.ptr->data + sizeof(SFCommonProtoHeader)) + +#define SF_RECV_BODY_LENGTH(task) \ + (task->recv.ptr->length - sizeof(SFCommonProtoHeader)) + +#define SF_SEND_BUFF_END(task) (task->send.ptr->data + task->send.ptr->size) +#define SF_RECV_BUFF_END(task) (task->recv.ptr->data + task->recv.ptr->size) #define SF_PROTO_UPDATE_EXTRA_BODY_SIZE \ sizeof(SFProtoIdempotencyAdditionalHeader) + FCFS_AUTH_SESSION_ID_LEN @@ -280,7 +290,17 @@ int sf_proto_set_body_length(struct fast_task_info *task); const char *sf_get_cmd_caption(const int cmd); int sf_proto_deal_task_done(struct fast_task_info *task, - SFCommonTaskContext *ctx); + const char *service_name, SFCommonTaskContext *ctx); + +static inline void sf_proto_init_task_magic(struct fast_task_info *task) +{ + SF_PROTO_SET_MAGIC(((SFCommonProtoHeader *) + task->send.ptr->data)->magic); + if (task->recv.ptr != task->send.ptr) { + SF_PROTO_SET_MAGIC(((SFCommonProtoHeader *) + task->recv.ptr->data)->magic); + } +} static inline void sf_proto_init_task_context(struct fast_task_info *task, SFCommonTaskContext *ctx) @@ -295,17 +315,116 @@ static inline void sf_proto_init_task_context(struct fast_task_info *task, ctx->response_done = false; ctx->need_response = true; - ctx->request.header.cmd = ((SFCommonProtoHeader *)task->data)->cmd; - ctx->request.header.body_len = task->length - sizeof(SFCommonProtoHeader); + ctx->request.header.cmd = ((SFCommonProtoHeader *) + task->recv.ptr->data)->cmd; + ctx->request.header.body_len = SF_RECV_BODY_LENGTH(task); ctx->request.header.status = buff2short(((SFCommonProtoHeader *) - task->data)->status); + task->recv.ptr->data)->status); if (task->recv_body != NULL) { ctx->request.body = task->recv_body; } else { - ctx->request.body = task->data + sizeof(SFCommonProtoHeader); + ctx->request.body = SF_PROTO_RECV_BODY(task); } } +/* task send and recv buffer operations */ +static inline int sf_set_task_send_buffer_size( + struct fast_task_info *task, const int expect_size) +{ + int result; + if ((result=free_queue_set_buffer_size(task, task->send.ptr, + expect_size)) == 0) + { + SF_PROTO_SET_MAGIC(((SFCommonProtoHeader *) + 
task->send.ptr->data)->magic); + } + return result; +} + +static inline int sf_set_task_recv_buffer_size( + struct fast_task_info *task, const int expect_size) +{ + int result; + if ((result=free_queue_set_buffer_size(task, task->recv.ptr, + expect_size)) == 0) + { + SF_PROTO_SET_MAGIC(((SFCommonProtoHeader *) + task->recv.ptr->data)->magic); + } + return result; +} + +static inline int sf_set_task_send_max_buffer_size( + struct fast_task_info *task) +{ + int result; + if ((result=free_queue_set_max_buffer_size(task, task->send.ptr)) == 0) { + SF_PROTO_SET_MAGIC(((SFCommonProtoHeader *) + task->send.ptr->data)->magic); + } + return result; +} + +static inline int sf_set_task_recv_max_buffer_size( + struct fast_task_info *task) +{ + int result; + if ((result=free_queue_set_max_buffer_size(task, task->recv.ptr)) == 0) { + SF_PROTO_SET_MAGIC(((SFCommonProtoHeader *) + task->recv.ptr->data)->magic); + } + return result; +} + +static inline int sf_realloc_task_send_buffer( + struct fast_task_info *task, const int expect_size) +{ + int result; + if ((result=free_queue_realloc_buffer(task, task->send.ptr, + expect_size)) == 0) + { + SF_PROTO_SET_MAGIC(((SFCommonProtoHeader *) + task->send.ptr->data)->magic); + } + return result; +} + +static inline int sf_realloc_task_recv_buffer( + struct fast_task_info *task, const int expect_size) +{ + int result; + if ((result=free_queue_realloc_buffer(task, task->recv.ptr, + expect_size)) == 0) + { + SF_PROTO_SET_MAGIC(((SFCommonProtoHeader *) + task->recv.ptr->data)->magic); + } + return result; +} + +static inline int sf_realloc_task_send_max_buffer( + struct fast_task_info *task) +{ + int result; + if ((result=free_queue_realloc_max_buffer(task, task->send.ptr)) == 0) { + SF_PROTO_SET_MAGIC(((SFCommonProtoHeader *) + task->send.ptr->data)->magic); + } + return result; +} + +static inline int sf_realloc_task_recv_max_buffer( + struct fast_task_info *task) +{ + int result; + if ((result=free_queue_realloc_max_buffer(task, task->recv.ptr)) == 0) { + SF_PROTO_SET_MAGIC(((SFCommonProtoHeader *) + task->recv.ptr->data)->magic); + } + return result; +} + + static inline void sf_log_network_error_ex1(SFResponseInfo *response, const ConnectionInfo *conn, const char *service_name, const int result, const int log_level, @@ -475,6 +594,27 @@ static inline void sf_free_recv_buffer(SFProtoRecvBuffer *buffer) } } +static inline int sf_proto_send_buf1(ConnectionInfo *conn, char *data, + const int len, SFResponseInfo *response, const int network_timeout) +{ + int result; + + if (conn->comm_type == fc_comm_type_rdma) { + result = G_RDMA_CONNECTION_CALLBACKS.request_by_buf1( + conn, data, len, network_timeout * 1000); + } else { + result = tcpsenddata_nb(conn->sock, data, len, network_timeout); + } + if (result != 0) { + response->error.length = snprintf(response->error.message, + sizeof(response->error.message), + "send data fail, errno: %d, error info: %s", + result, STRERROR(result)); + } + + return result; +} + int sf_send_and_recv_response_header(ConnectionInfo *conn, char *data, const int len, SFResponseInfo *response, const int network_timeout); @@ -535,16 +675,27 @@ int sf_send_and_recv_vary_response(ConnectionInfo *conn, const int network_timeout, const unsigned char expect_cmd, SFProtoRecvBuffer *buffer, const int min_body_len); -static inline void sf_proto_extract_header(const SFCommonProtoHeader - *header_proto, SFHeaderInfo *header_info) +static inline int sf_proto_parse_header(const SFCommonProtoHeader + *header_proto, SFResponseInfo *response) { - 
header_info->cmd = header_proto->cmd; - header_info->body_len = buff2int(header_proto->body_len); - header_info->flags = buff2short(header_proto->flags); - header_info->status = buff2short(header_proto->status); - if (header_info->status > 255) { - header_info->status = sf_localize_errno(header_info->status); + if (!SF_PROTO_CHECK_MAGIC(header_proto->magic)) { + response->error.length = snprintf(response->error.message, + sizeof(response->error.message), + "magic "SF_PROTO_MAGIC_FORMAT" is invalid, " + "expect: "SF_PROTO_MAGIC_FORMAT, + SF_PROTO_MAGIC_PARAMS(header_proto->magic), + SF_PROTO_MAGIC_EXPECT_PARAMS); + return EINVAL; } + + response->header.cmd = header_proto->cmd; + response->header.body_len = buff2int(header_proto->body_len); + response->header.flags = buff2short(header_proto->flags); + response->header.status = buff2short(header_proto->status); + if (response->header.status > 255) { + response->header.status = sf_localize_errno(response->header.status); + } + return 0; } static inline void sf_proto_pack_limit(const SFListLimitInfo diff --git a/src/sf_service.c b/src/sf_service.c index 829a076..a202283 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -58,12 +58,6 @@ struct worker_thread_context { struct nio_thread_data *thread_data; }; -struct accept_thread_context { - SFContext *sf_context; - int server_sock; -}; - - int sf_init_task(struct fast_task_info *task) { task->connect_timeout = SF_G_CONNECT_TIMEOUT; //for client side @@ -73,22 +67,15 @@ int sf_init_task(struct fast_task_info *task) static void *worker_thread_entrance(void *arg); -static int sf_init_free_queues(const int task_arg_size, +static int sf_init_free_queue(struct fast_task_queue *free_queue, + const char *name, const bool double_buffers, + const int task_padding_size, const int task_arg_size, TaskInitCallback init_callback) { -#define ALLOC_CONNECTIONS_ONCE 1024 - - static bool sf_inited = false; int result; int m; - int init_connections; int alloc_conn_once; - if (sf_inited) { - return 0; - } - - sf_inited = true; if ((result=set_rand_seed()) != 0) { logCrit("file: "__FILE__", line: %d, " "set_rand_seed fail, program exit!", __LINE__); @@ -101,19 +88,13 @@ static int sf_init_free_queues(const int task_arg_size, } else if (m > 16) { m = 16; } - alloc_conn_once = ALLOC_CONNECTIONS_ONCE / m; - init_connections = g_sf_global_vars.max_connections < alloc_conn_once ? - g_sf_global_vars.max_connections : alloc_conn_once; - if ((result=free_queue_init_ex2(g_sf_global_vars.max_connections, - init_connections, alloc_conn_once, g_sf_global_vars. - min_buff_size, g_sf_global_vars.max_buff_size, - task_arg_size, init_callback != NULL ? - init_callback : sf_init_task)) != 0) - { - return result; - } - - return 0; + alloc_conn_once = 256 / m; + return free_queue_init_ex2(free_queue, name, double_buffers, + g_sf_global_vars.max_connections, alloc_conn_once, + g_sf_global_vars.min_buff_size, g_sf_global_vars. + max_buff_size, task_padding_size, task_arg_size, + (init_callback != NULL ? 
init_callback : + sf_init_task)); } int sf_service_init_ex2(SFContext *sf_context, const char *name, @@ -124,11 +105,12 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, sf_set_body_length_callback set_body_length_func, sf_alloc_recv_buffer_callback alloc_recv_buffer_func, sf_send_done_callback send_done_callback, - sf_deal_task_func deal_func, TaskCleanUpCallback task_cleanup_func, + sf_deal_task_callback deal_func, TaskCleanUpCallback task_cleanup_func, sf_recv_timeout_callback timeout_callback, const int net_timeout_ms, - const int proto_header_size, const int task_arg_size, - TaskInitCallback init_callback, sf_release_buffer_callback - release_buffer_callback) + const int proto_header_size, const int task_padding_size, + const int task_arg_size, const bool double_buffers, + const bool explicit_post_recv, TaskInitCallback init_callback, + sf_release_buffer_callback release_buffer_callback) { int result; int bytes; @@ -141,15 +123,23 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, pthread_attr_t thread_attr; snprintf(sf_context->name, sizeof(sf_context->name), "%s", name); + sf_context->connect_need_log = true; sf_context->realloc_task_buffer = g_sf_global_vars. min_buff_size < g_sf_global_vars.max_buff_size; - sf_context->accept_done_func = accept_done_callback; + sf_context->callbacks.accept_done = accept_done_callback; sf_set_parameters_ex(sf_context, proto_header_size, set_body_length_func, alloc_recv_buffer_func, send_done_callback, deal_func, task_cleanup_func, timeout_callback, release_buffer_callback); + if (explicit_post_recv) { + sf_context->handlers[SF_RDMACM_NETWORK_HANDLER_INDEX]. + explicit_post_recv = true; + } - if ((result=sf_init_free_queues(task_arg_size, init_callback)) != 0) { + if ((result=sf_init_free_queue(&sf_context->free_queue, + name, double_buffers, task_padding_size, + task_arg_size, init_callback)) != 0) + { return result; } @@ -192,6 +182,15 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, for (thread_data=sf_context->thread_data,thread_ctx=thread_contexts; thread_datatimeout_ms = net_timeout_ms; + FC_INIT_LIST_HEAD(&thread_data->polling_queue); + if (sf_context->smart_polling.enabled) { + thread_data->busy_polling_callback = + sf_rdma_busy_polling_callback; + } else { + thread_data->busy_polling_callback = NULL; + } + thread_data->thread_loop_callback = thread_loop_callback; if (alloc_thread_extra_data_callback != NULL) { thread_data->arg = alloc_thread_extra_data_callback( @@ -278,7 +277,7 @@ int sf_service_destroy_ex(SFContext *sf_context) { struct nio_thread_data *data_end, *thread_data; - free_queue_destroy(); + free_queue_destroy(&sf_context->free_queue); data_end = sf_context->thread_data + sf_context->work_threads; for (thread_data=sf_context->thread_data; thread_datathread_data, sf_recv_notify_read, - thread_ctx->sf_context->task_cleanup_func, + thread_ctx->sf_context->callbacks.task_cleanup, &g_sf_global_vars.continue_flag); ioevent_destroy(&thread_ctx->thread_data->ev_puller); @@ -346,31 +345,32 @@ static void *worker_thread_entrance(void *arg) return NULL; } -static int _socket_server(const char *bind_addr, int port, int *sock) +int sf_socket_create_server(SFListener *listener, + int af, const char *bind_addr) { int result; // if bind_addr is not set - if(strlen(bind_addr) == 0){ - + if (strlen(bind_addr) == 0) { // if the host has no IPv4 address but does have an IPv6 address, bind the IPv6 address automatically if(!checkHostHasIPv4Addr() && checkHostHasIPv6Addr()){ - *sock = socketServerIPv6(bind_addr, port, &result); + listener->sock = socketServerIPv6(bind_addr, port, &result); }else{ - *sock = socketServer(bind_addr, port, &result); + listener->sock = socketServer(bind_addr, port, &result); } - }else if (is_ipv6_addr(bind_addr)) // check whether the IP address is IPv4 or IPv6 and initialize accordingly + } else if (is_ipv6_addr(bind_addr)) // check whether the IP address is IPv4 or IPv6 and initialize accordingly { - *sock = socketServerIPv6(bind_addr, port, &result); + listener->sock = socketServerIPv6(bind_addr, port, &result); }else{ - *sock = socketServer(bind_addr, port, &result); + listener->sock = socketServer(bind_addr, port, &result); } - if (*sock < 0) { + // listener->sock = socketServer2(af, bind_addr, listener->port, &result); + if (listener->sock < 0) { return result; } - if ((result=tcpsetserveropt(*sock, SF_G_NETWORK_TIMEOUT)) != 0) { + if ((result=tcpsetserveropt(listener->sock, SF_G_NETWORK_TIMEOUT)) != 0) { return result; } @@ -380,133 +380,201 @@ int sf_socket_server_ex(SFContext *sf_context) { int result; + int af = AF_INET; + bool dual_ports; const char *bind_addr; + SFNetworkHandler *handler; + SFNetworkHandler *end; - sf_context->inner_sock = sf_context->outer_sock = -1; - if (sf_context->outer_port == sf_context->inner_port) { - if (*sf_context->outer_bind_addr == '\0' || - *sf_context->inner_bind_addr == '\0') { - bind_addr = ""; - return _socket_server(bind_addr, sf_context->outer_port, - &sf_context->outer_sock); - } else if (strcmp(sf_context->outer_bind_addr, - sf_context->inner_bind_addr) == 0) { - bind_addr = sf_context->outer_bind_addr; - if (is_private_ip(bind_addr)) { - return _socket_server(bind_addr, sf_context-> - inner_port, &sf_context->inner_sock); - } else { - return _socket_server(bind_addr, sf_context-> - outer_port, &sf_context->outer_sock); - } + end = sf_context->handlers + SF_NETWORK_HANDLER_COUNT; + for (handler=sf_context->handlers; handlerenabled) { + continue; } - } - if ((result=_socket_server(sf_context->outer_bind_addr, - sf_context->outer_port, &sf_context->outer_sock)) != 0) - { - return result; - } + handler->inner.enabled = false; + handler->outer.enabled = false; + if (handler->outer.port == handler->inner.port) { + if (*sf_context->outer_bind_addr == '\0' || + *sf_context->inner_bind_addr == '\0') { + bind_addr = ""; + if ((result=handler->create_server(&handler-> + outer, af, bind_addr)) != 0) + { + return result; + } + handler->outer.enabled = true; + dual_ports = false; + } else if (strcmp(sf_context->outer_bind_addr, + sf_context->inner_bind_addr) == 0) { + bind_addr = sf_context->outer_bind_addr; + if (is_private_ip(bind_addr)) { + if ((result=handler->create_server(&handler-> + inner, af, bind_addr)) != 0) + { + return result; + } + handler->inner.enabled = true; + } else { + if ((result=handler->create_server(&handler-> + outer, af, bind_addr)) != 0) + { + return result; + } + handler->outer.enabled = true; + } + dual_ports = false; + } else { + dual_ports = true; + } + } else { + dual_ports = true; + } - if ((result=_socket_server(sf_context->inner_bind_addr, - sf_context->inner_port, &sf_context->inner_sock)) != 0) - { - return result; + if (dual_ports) { + if ((result=handler->create_server(&handler->outer, af, + sf_context->outer_bind_addr)) != 0) + { + return result; + } + + if ((result=handler->create_server(&handler->inner, af, + sf_context->inner_bind_addr)) != 0) + { + return result; + } + handler->inner.enabled = true; + handler->outer.enabled = true; + } + + /* + logInfo("%p [%d] inner {port: %d, enabled: %d}, " + "outer {port: %d, enabled: %d}", 
sf_context, + (int)(handler-sf_context->handlers), + handler->inner.port, handler->inner.enabled, + handler->outer.port, handler->outer.enabled); + */ } return 0; } -void sf_socket_close_ex(SFContext *sf_context) +void sf_socket_close_server(SFListener *listener) { - if (sf_context->inner_sock >= 0) { - close(sf_context->inner_sock); - sf_context->inner_sock = -1; - } - - if (sf_context->outer_sock >= 0) { - close(sf_context->outer_sock); - sf_context->outer_sock = -1; + if (listener->sock >= 0) { + close(listener->sock); + listener->sock = -1; } } -static void accept_run(struct accept_thread_context *accept_context) +struct fast_task_info *sf_socket_accept_connection(SFListener *listener) { int incomesock; int port; - struct sockaddr_in inaddr; socklen_t sockaddr_len; struct fast_task_info *task; + sockaddr_len = sizeof(listener->inaddr); + incomesock = accept(listener->sock, (struct sockaddr *) + &listener->inaddr, &sockaddr_len); + if (incomesock < 0) { //error + if (!(errno == EINTR || errno == EAGAIN)) { + logError("file: "__FILE__", line: %d, " + "accept fail, errno: %d, error info: %s", + __LINE__, errno, strerror(errno)); + } + + return NULL; + } + + if (tcpsetnonblockopt(incomesock) != 0) { + close(incomesock); + return NULL; + } + FC_SET_CLOEXEC(incomesock); + + if ((task=sf_alloc_init_task(listener->handler, incomesock)) == NULL) { + close(incomesock); + return NULL; + } + + getPeerIpAddPort(incomesock, task->client_ip, + sizeof(task->client_ip), &port); + task->port = port; + return task; +} + +void sf_socket_close_connection(struct fast_task_info *task) +{ + close(task->event.fd); + task->event.fd = -1; +} + +void sf_socket_close_ex(SFContext *sf_context) +{ + SFNetworkHandler *handler; + SFNetworkHandler *end; + + end = sf_context->handlers + SF_NETWORK_HANDLER_COUNT; + for (handler=sf_context->handlers; handlerenabled) { + continue; + } + if (handler->outer.enabled) { + handler->close_server(&handler->outer); + } + if (handler->inner.enabled) { + handler->close_server(&handler->inner); + } + } +} + +static void accept_run(SFListener *listener) +{ + struct fast_task_info *task; + while (g_sf_global_vars.continue_flag) { - sockaddr_len = sizeof(inaddr); - incomesock = accept(accept_context->server_sock, - (struct sockaddr*)&inaddr, &sockaddr_len); - if (incomesock < 0) { //error - if (!(errno == EINTR || errno == EAGAIN)) { - logError("file: "__FILE__", line: %d, " - "accept fail, errno: %d, error info: %s", - __LINE__, errno, strerror(errno)); - } - + if ((task=listener->handler->accept_connection(listener)) == NULL) { continue; } - if (tcpsetnonblockopt(incomesock) != 0) { - close(incomesock); - continue; - } - FC_SET_CLOEXEC(incomesock); - - if ((task=sf_alloc_init_task(accept_context-> - sf_context, incomesock)) == NULL) - { - close(incomesock); - continue; - } - - getPeerIpAddPort(incomesock, task->client_ip, - sizeof(task->client_ip), &port); - task->port = port; - task->thread_data = accept_context->sf_context->thread_data + - incomesock % accept_context->sf_context->work_threads; - if (accept_context->sf_context->accept_done_func != NULL) { - if (accept_context->sf_context->accept_done_func(task, - inaddr.sin_addr.s_addr, - accept_context->server_sock == - accept_context->sf_context->inner_sock) != 0) + task->thread_data = listener->handler->ctx->thread_data + + task->event.fd % listener->handler->ctx->work_threads; + if (listener->handler->ctx->callbacks.accept_done != NULL) { + if (listener->handler->ctx->callbacks.accept_done(task, + 
listener->inaddr.sin_addr.s_addr, + listener->is_inner) != 0) { - close(incomesock); + listener->handler->close_connection(task); sf_release_task(task); continue; } } if (sf_nio_notify(task, SF_NIO_STAGE_INIT) != 0) { - close(incomesock); + listener->handler->close_connection(task); sf_release_task(task); } } } -static void *accept_thread_entrance(struct accept_thread_context - *accept_context) +static void *accept_thread_entrance(SFListener *listener) { #ifdef OS_LINUX { char thread_name[32]; - snprintf(thread_name, sizeof(thread_name), "%s-listen", - accept_context->sf_context->name); + snprintf(thread_name, sizeof(thread_name), "%s-%s-listen", + listener->handler->comm_type == fc_comm_type_sock ? + "sock" : "rdma", listener->handler->ctx->name); prctl(PR_SET_NAME, thread_name); } #endif - accept_run(accept_context); + accept_run(listener); return NULL; } -void _accept_loop(struct accept_thread_context *accept_context, - const int accept_threads) +int _accept_loop(SFListener *listener, const int accept_threads) { pthread_t tid; pthread_attr_t thread_attr; @@ -514,7 +582,7 @@ void _accept_loop(struct accept_thread_context *accept_context, int i; if (accept_threads <= 0) { - return; + return 0; } if ((result=init_pthread_attr(&thread_attr, g_sf_global_vars. @@ -522,68 +590,73 @@ void _accept_loop(struct accept_thread_context *accept_context, { logWarning("file: "__FILE__", line: %d, " "init_pthread_attr fail!", __LINE__); + return result; } - else { - for (i=0; iouter_sock >= 0) { - count = 2; + listener = listeners; + hend = sf_context->handlers + SF_NETWORK_HANDLER_COUNT; + for (handler=sf_context->handlers; handlerenabled) { + continue; + } + + if (handler->inner.enabled) { + *listener++ = &handler->inner; + } + if (handler->outer.enabled) { + *listener++ = &handler->outer; + } + } + + if (listener == listeners) { + logError("file: "__FILE__", line: %d, " + "no listener!", __LINE__); + return ENOENT; + } + + last = listener - 1; + if (blocked) { + lend = listener - 1; } else { - count = 1; + lend = listener; } - bytes = sizeof(struct accept_thread_context) * count; - accept_contexts = (struct accept_thread_context *)fc_malloc(bytes); - if (accept_contexts == NULL) { - return; + for (listener=listeners; listeneraccept_threads); } - accept_contexts[0].sf_context = sf_context; - accept_contexts[0].server_sock = sf_context->inner_sock; - - if (sf_context->outer_sock >= 0) { - accept_contexts[1].sf_context = sf_context; - accept_contexts[1].server_sock = sf_context->outer_sock; - - if (sf_context->inner_sock >= 0) { - _accept_loop(accept_contexts, sf_context->accept_threads); - } - - if (block) { - _accept_loop(accept_contexts + 1, sf_context->accept_threads - 1); - accept_run(accept_contexts + 1); - } else { - _accept_loop(accept_contexts + 1, sf_context->accept_threads); - } - } else { - if (block) { - _accept_loop(accept_contexts, sf_context->accept_threads - 1); - accept_run(accept_contexts); - } else { - _accept_loop(accept_contexts, sf_context->accept_threads); - } + if (blocked) { + _accept_loop(*last, sf_context->accept_threads - 1); + accept_run(*last); } + + return 0; } #if defined(DEBUG_FLAG) @@ -741,6 +814,12 @@ void sf_set_current_time() srand(g_sf_global_vars.up_time); } +int sf_global_init(const char *log_filename_prefix) +{ + sf_set_current_time(); + return log_set_prefix(SF_G_BASE_PATH_STR, log_filename_prefix); +} + void sf_enable_thread_notify_ex(SFContext *sf_context, const bool enabled) { struct nio_thread_data *thread_data; diff --git a/src/sf_service.h 
b/src/sf_service.h index d628737..dd2b428 100644 --- a/src/sf_service.h +++ b/src/sf_service.h @@ -41,11 +41,12 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, sf_set_body_length_callback set_body_length_func, sf_alloc_recv_buffer_callback alloc_recv_buffer_func, sf_send_done_callback send_done_callback, - sf_deal_task_func deal_func, TaskCleanUpCallback task_cleanup_func, + sf_deal_task_callback deal_func, TaskCleanUpCallback task_cleanup_func, sf_recv_timeout_callback timeout_callback, const int net_timeout_ms, - const int proto_header_size, const int task_arg_size, - TaskInitCallback init_callback, sf_release_buffer_callback - release_buffer_callback); + const int proto_header_size, const int task_padding_size, + const int task_arg_size, const bool double_buffers, + const bool explicit_post_recv, TaskInitCallback init_callback, + sf_release_buffer_callback release_buffer_callback); #define sf_service_init_ex(sf_context, name, alloc_thread_extra_data_callback,\ thread_loop_callback, accept_done_callback, set_body_length_func, \ @@ -55,16 +56,17 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, thread_loop_callback, accept_done_callback, set_body_length_func, \ NULL, send_done_callback, deal_func, task_cleanup_func, \ timeout_callback, net_timeout_ms, proto_header_size, \ - task_arg_size, NULL, NULL) + 0, task_arg_size, false, false, NULL, NULL) #define sf_service_init(name, alloc_thread_extra_data_callback, \ thread_loop_callback, accept_done_callback, set_body_length_func, \ send_done_callback, deal_func, task_cleanup_func, timeout_callback, \ net_timeout_ms, proto_header_size, task_arg_size) \ - sf_service_init_ex2(&g_sf_context, name, alloc_thread_extra_data_callback, \ - thread_loop_callback, accept_done_callback, set_body_length_func, NULL, \ + sf_service_init_ex2(&g_sf_context, name, alloc_thread_extra_data_callback, \ + thread_loop_callback, accept_done_callback, set_body_length_func, NULL,\ send_done_callback, deal_func, task_cleanup_func, timeout_callback, \ - net_timeout_ms, proto_header_size, task_arg_size, NULL, NULL) + net_timeout_ms, proto_header_size, 0, task_arg_size, false, false, \ + NULL, NULL) int sf_service_destroy_ex(SFContext *sf_context); @@ -76,12 +78,37 @@ void sf_service_set_thread_loop_callback_ex(SFContext *sf_context, #define sf_service_set_thread_loop_callback(thread_loop_callback) \ sf_service_set_thread_loop_callback_ex(&g_sf_context, thread_loop_callback) +static inline void sf_service_set_smart_polling_ex(SFContext *sf_context, + const FCSmartPollingConfig *smart_polling) +{ + sf_context->smart_polling = *smart_polling; +} +#define sf_service_set_smart_polling(smart_polling) \ + sf_service_set_smart_polling_ex(&g_sf_context, smart_polling) + +static inline void sf_service_set_connect_need_log_ex( + SFContext *sf_context, const bool need_log) +{ + sf_context->connect_need_log = need_log; +} +#define sf_service_set_connect_need_log(need_log) \ + sf_service_set_connect_need_log_ex(&g_sf_context, need_log) + + int sf_setup_signal_handler(); int sf_startup_schedule(pthread_t *schedule_tid); int sf_add_slow_log_schedule(SFSlowLogContext *slowlog_ctx); void sf_set_current_time(); +int sf_global_init(const char *log_filename_prefix); + +int sf_socket_create_server(SFListener *listener, + int af, const char *bind_addr); +void sf_socket_close_server(SFListener *listener); +struct fast_task_info *sf_socket_accept_connection(SFListener *listener); + +void sf_socket_close_connection(struct fast_task_info *task); int 
@@ -89,7 +116,7 @@ int sf_socket_server_ex(SFContext *sf_context);
 void sf_socket_close_ex(SFContext *sf_context);
 #define sf_socket_close() sf_socket_close_ex(&g_sf_context)
 
-void sf_accept_loop_ex(SFContext *sf_context, const bool block);
+int sf_accept_loop_ex(SFContext *sf_context, const bool blocked);
 #define sf_accept_loop() sf_accept_loop_ex(&g_sf_context, true)
 
@@ -123,12 +150,13 @@ void sf_set_sig_quit_handler(sf_sig_quit_handler quit_handler);
 
 int sf_init_task(struct fast_task_info *task);
 
-static inline struct fast_task_info *sf_alloc_init_task(
-        SFContext *sf_context, const int sock)
+static inline struct fast_task_info *sf_alloc_init_task_ex(
+        SFNetworkHandler *handler, const int fd,
+        const int reffer_count)
 {
     struct fast_task_info *task;
 
-    task = free_queue_pop();
+    task = free_queue_pop(&handler->ctx->free_queue);
     if (task == NULL) {
         logError("file: "__FILE__", line: %d, "
                 "malloc task buff failed, you should "
@@ -136,15 +164,16 @@ static inline struct fast_task_info *sf_alloc_init_task(
                 __LINE__);
         return NULL;
     }
-    __sync_add_and_fetch(&task->reffer_count, 1);
-    __sync_bool_compare_and_swap(&task->canceled, 1, 0);
-    task->ctx = sf_context;
-    task->event.fd = sock;
+    __sync_add_and_fetch(&task->reffer_count, reffer_count);
+    __sync_bool_compare_and_swap(&task->canceled, 1, 0);
+    task->handler = handler;
+    task->event.fd = fd;
     return task;
 }
 
 #define sf_hold_task(task) __sync_add_and_fetch(&task->reffer_count, 1)
+#define sf_alloc_init_task(handler, fd) sf_alloc_init_task_ex(handler, fd, 1)
 
 static inline void sf_release_task(struct fast_task_info *task)
 {
@@ -167,6 +196,60 @@ bool checkHostHasIPv4Addr();
 // check whether the current server has an IPv6 address
 bool checkHostHasIPv6Addr();
 
+static inline SFNetworkHandler *sf_get_first_network_handler_ex(
+        SFContext *sf_context)
+{
+    SFNetworkHandler *handler;
+    SFNetworkHandler *end;
+
+    end = sf_context->handlers + SF_NETWORK_HANDLER_COUNT;
+    for (handler=sf_context->handlers; handler<end; handler++) {
+        if (handler->enabled) {
+            return handler;
+        }
+    }
+
+    return NULL;
+}
+#define sf_get_first_network_handler() \
+    sf_get_first_network_handler_ex(&g_sf_context)
+
+
+static inline SFNetworkHandler *sf_get_rdma_network_handler(
+        SFContext *sf_context)
+{
+    SFNetworkHandler *handler;
+
+    handler = sf_context->handlers + SF_RDMACM_NETWORK_HANDLER_INDEX;
+    return (handler->enabled ? handler : NULL);
+}
+
+static inline SFNetworkHandler *sf_get_rdma_network_handler2(
+        SFContext *sf_context1, SFContext *sf_context2)
+{
+    SFNetworkHandler *handler;
+
+    if ((handler=sf_get_rdma_network_handler(sf_context1)) != NULL) {
+        return handler;
+    }
+    return sf_get_rdma_network_handler(sf_context2);
+}
+
+static inline SFNetworkHandler *sf_get_rdma_network_handler3(
+        SFContext *sf_context1, SFContext *sf_context2,
+        SFContext *sf_context3)
+{
+    SFNetworkHandler *handler;
+
+    if ((handler=sf_get_rdma_network_handler(sf_context1)) != NULL) {
+        return handler;
+    }
+    if ((handler=sf_get_rdma_network_handler(sf_context2)) != NULL) {
+        return handler;
+    }
+    return sf_get_rdma_network_handler(sf_context3);
+}
+
 #ifdef __cplusplus
 }
 #endif
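Task allocation is now bound to an SFNetworkHandler instead of a whole SFContext, and the inline getters above pick a handler either by index or as the first enabled one. The sketch below shows how a caller might combine them; demo_alloc_task and demo_free_task are hypothetical names and error handling is reduced to NULL returns.

#include "sf/sf_service.h"

/* allocate a task from the context's free_queue, bound to a handler */
static struct fast_task_info *demo_alloc_task(SFContext *ctx)
{
    SFNetworkHandler *handler;
    struct fast_task_info *task;

    /* first enabled handler: socket (index 0) before rdma_cm (index 1) */
    if ((handler=sf_get_first_network_handler_ex(ctx)) == NULL) {
        return NULL;
    }

    /* fd is not known yet, so pass -1; the reference count starts at 1 */
    if ((task=sf_alloc_init_task(handler, -1)) == NULL) {
        return NULL;
    }
    return task;
}

static void demo_free_task(struct fast_task_info *task)
{
    /* drop the reference taken by sf_alloc_init_task() */
    sf_release_task(task);
}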
diff --git a/src/sf_types.h b/src/sf_types.h
index 80646ca..32f427a 100644
--- a/src/sf_types.h
+++ b/src/sf_types.h
@@ -34,30 +34,120 @@
 #define SF_SERVER_TASK_TYPE_CHANNEL_HOLDER 101 //for request idempotency
 #define SF_SERVER_TASK_TYPE_CHANNEL_USER 102 //for request idempotency
 
+#define SF_NETWORK_HANDLER_COUNT 2
+#define SF_SOCKET_NETWORK_HANDLER_INDEX 0
+#define SF_RDMACM_NETWORK_HANDLER_INDEX 1
+
 typedef int (*sf_accept_done_callback)(struct fast_task_info *task,
         const in_addr_64_t client_addr, const bool bInnerPort);
 
 typedef int (*sf_set_body_length_callback)(struct fast_task_info *task);
 typedef char *(*sf_alloc_recv_buffer_callback)(struct fast_task_info *task,
         const int buff_size, bool *new_alloc);
-typedef int (*sf_deal_task_func)(struct fast_task_info *task, const int stage);
+typedef int (*sf_deal_task_callback)(struct fast_task_info *task, const int stage);
 typedef int (*sf_recv_timeout_callback)(struct fast_task_info *task);
 typedef int (*sf_send_done_callback)(struct fast_task_info *task,
-        const int length);
+        const int length, int *next_stage);
+typedef void (*sf_connect_done_callback)(struct fast_task_info *task,
+        const int err_no);
 
 /* calback for release iovec buffer */
 typedef void (*sf_release_buffer_callback)(struct fast_task_info *task);
 
 typedef int (*sf_error_handler_callback)(const int errnum);
 
+typedef enum {
+    sf_comm_action_continue = 'c',
+    sf_comm_action_break = 'b',
+    sf_comm_action_finish = 'f'
+} SFCommAction;
+
+struct ibv_pd;
+struct sf_listener;
+
+typedef int (*sf_get_connection_size_callback)();
+typedef int (*sf_init_connection_callback)(
+        struct fast_task_info *task, void *arg);
+#define sf_alloc_pd_callback fc_alloc_pd_callback
+
+typedef int (*sf_create_server_callback)(struct sf_listener
+        *listener, int af, const char *bind_addr);
+typedef void (*sf_close_server_callback)(struct sf_listener *listener);
+typedef struct fast_task_info * (*sf_accept_connection_callback)(
+        struct sf_listener *listener);
+typedef int (*sf_async_connect_server_callback)(struct fast_task_info *task);
+typedef int (*sf_async_connect_check_callback)(struct fast_task_info *task);
+typedef void (*sf_close_connection_callback)(struct fast_task_info *task);
+
+typedef ssize_t (*sf_send_data_callback)(struct fast_task_info *task,
+        SFCommAction *action, bool *send_done);
+typedef ssize_t (*sf_recv_data_callback)(struct fast_task_info *task,
+        const bool call_post_recv, SFCommAction *action);
+typedef int (*sf_post_recv_callback)(struct fast_task_info *task);
+
+struct sf_network_handler;
+typedef struct sf_listener {
+    struct sf_network_handler *handler;
+    int port;
+    bool enabled;
+    bool is_inner;
+    union {
+        int sock;   //for socket
+        void *id;   //for rdma_cm
+    };
+    struct sockaddr_in inaddr;  //for accept
+} SFListener;
+
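The SFCommAction enum and the send/recv callback typedefs above define the per-transport I/O contract each handler implements (plain sockets and rdma_cm). The skeleton below only illustrates the output parameters; the action semantics ('c' continue, 'b' break, 'f' request finished) are inferred from the names, and the demo_* functions are hypothetical stubs, not the library's socket or RDMA implementations.

#include <stdbool.h>
#include <sys/types.h>
#include "sf/sf_types.h"

/* shape of an sf_send_data_callback implementation */
static ssize_t demo_send_data(struct fast_task_info *task,
        SFCommAction *action, bool *send_done)
{
    ssize_t bytes = 0;              /* bytes pushed to the transport (stub) */

    (void)task;
    *send_done = true;              /* the whole response has been sent */
    *action = sf_comm_action_finish;
    return bytes;
}

/* shape of an sf_recv_data_callback implementation; for rdma_cm,
 * call_post_recv asks the handler to re-post a receive buffer
 * (see post_recv in SFNetworkHandler below) */
static ssize_t demo_recv_data(struct fast_task_info *task,
        const bool call_post_recv, SFCommAction *action)
{
    (void)task;
    (void)call_post_recv;
    *action = sf_comm_action_continue;
    return 0;
}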
+struct sf_context;
+typedef struct sf_network_handler {
+    bool enabled;
+    bool explicit_post_recv;
+    FCCommunicationType comm_type;
+    struct sf_context *ctx;
+    struct ibv_pd *pd;
+
+    SFListener inner;
+    SFListener outer;
+
+    /* for server side */
+    sf_get_connection_size_callback get_connection_size;
+    sf_init_connection_callback init_connection;
+    sf_alloc_pd_callback alloc_pd;
+    sf_create_server_callback create_server;
+    sf_close_server_callback close_server;
+    sf_accept_connection_callback accept_connection;
+
+    /* for client side */
+    sf_async_connect_server_callback async_connect_server;
+    sf_async_connect_check_callback async_connect_check;
+
+    /* server and client both */
+    sf_close_connection_callback close_connection;
+
+    sf_send_data_callback send_data;
+    sf_recv_data_callback recv_data;
+    sf_post_recv_callback post_recv;  //for rdma
+} SFNetworkHandler;
+
+typedef struct sf_nio_callbacks {
+    TaskCleanUpCallback task_cleanup;
+    sf_deal_task_callback deal_task;
+    sf_set_body_length_callback set_body_length;
+    sf_alloc_recv_buffer_callback alloc_recv_buffer;
+    sf_accept_done_callback accept_done;
+    sf_connect_done_callback connect_done;
+    sf_send_done_callback send_done;
+    sf_recv_timeout_callback task_timeout;
+    sf_release_buffer_callback release_buffer;
+} SFNIOCallbacks;
+
 typedef struct sf_context {
     char name[64];
     struct nio_thread_data *thread_data;
     volatile int thread_count;
-    int outer_sock;
-    int inner_sock;
-    int outer_port;
-    int inner_port;
+    //int rdma_port_offset;
+    SFNetworkHandler handlers[SF_NETWORK_HANDLER_COUNT];
+
     int accept_threads;
     int work_threads;
@@ -67,14 +157,11 @@ typedef struct sf_context {
     int header_size;
     bool remove_from_ready_list;
     bool realloc_task_buffer;
-    sf_deal_task_func deal_task;
-    sf_set_body_length_callback set_body_length;
-    sf_alloc_recv_buffer_callback alloc_recv_buffer;
-    sf_accept_done_callback accept_done_func;
-    sf_send_done_callback send_done_callback;
-    TaskCleanUpCallback task_cleanup_func;
-    sf_recv_timeout_callback timeout_callback;
-    sf_release_buffer_callback release_buffer_callback;
+    bool connect_need_log;  //for client connect
+    FCSmartPollingConfig smart_polling;
+
+    SFNIOCallbacks callbacks;
+    struct fast_task_queue free_queue;
 } SFContext;
 
 typedef struct {