diff --git a/src/sf_connection_manager.c b/src/sf_connection_manager.c index 95e121a..878d0f0 100644 --- a/src/sf_connection_manager.c +++ b/src/sf_connection_manager.c @@ -522,7 +522,6 @@ int sf_connection_manager_init_ex(SFConnectionManager *cm, const bool bg_thread_enabled) { const int socket_domain = AF_INET; - const int padding_size = 1024; struct { ConnectionExtraParams holder; ConnectionExtraParams *ptr; @@ -545,16 +544,9 @@ int sf_connection_manager_init_ex(SFConnectionManager *cm, if (server_group->comm_type == fc_comm_type_sock) { extra_params.ptr = NULL; } else { - FCServerInfo *first_server; - - first_server = FC_SID_SERVERS(*server_cfg); - extra_params.holder.buffer_size = server_group-> - buffer_size + padding_size; - extra_params.holder.pd = fc_alloc_rdma_pd( - G_RDMA_CONNECTION_CALLBACKS.alloc_pd, - &first_server->group_addrs[server_group_index]. - address_array, &result); - if (result != 0) { + if ((result=conn_pool_set_rdma_extra_params(&extra_params.holder, + server_cfg, server_group_index)) != 0) + { return result; } extra_params.ptr = &extra_params.holder; diff --git a/src/sf_proto.c b/src/sf_proto.c index 905507f..a344421 100644 --- a/src/sf_proto.c +++ b/src/sf_proto.c @@ -70,8 +70,14 @@ int sf_check_response(ConnectionInfo *conn, SFResponseInfo *response, response->error.length = response->header.body_len; } - if ((result=tcprecvdata_nb_ex(conn->sock, response->error.message, - response->error.length, network_timeout, &recv_bytes)) == 0) + if (conn->comm_type == fc_comm_type_rdma) { + memcpy(response->error.message, G_RDMA_CONNECTION_CALLBACKS. + get_buffer(conn)->buff + sizeof(SFCommonProtoHeader), + response->error.length); + response->error.message[response->error.length] = '\0'; + } else if ((result=tcprecvdata_nb_ex(conn->sock, response-> + error.message, response->error.length, + network_timeout, &recv_bytes)) == 0) { response->error.message[response->error.length] = '\0'; } else { @@ -96,19 +102,41 @@ static inline int sf_recv_response_header(ConnectionInfo *conn, SFResponseInfo *response, const int network_timeout) { int result; + BufferInfo *buffer; SFCommonProtoHeader header_proto; - if ((result=tcprecvdata_nb(conn->sock, &header_proto, - sizeof(SFCommonProtoHeader), network_timeout)) != 0) - { - response->error.length = snprintf(response->error.message, - sizeof(response->error.message), - "recv data fail, errno: %d, error info: %s", - result, STRERROR(result)); - return result; - } + if (conn->comm_type == fc_comm_type_rdma) { + buffer = G_RDMA_CONNECTION_CALLBACKS.get_buffer(conn); + if ((result=sf_proto_parse_header((SFCommonProtoHeader *) + buffer->buff, response)) != 0) + { + return result; + } - return sf_proto_parse_header(&header_proto, response); + if (buffer->length != (sizeof(SFCommonProtoHeader) + + response->header.body_len)) + { + response->error.length = snprintf(response->error.message, + sizeof(response->error.message), + "recv package length: %d != calculate: %d", + buffer->length, (int)(sizeof(SFCommonProtoHeader) + + response->header.body_len)); + return EINVAL; + } + + return 0; + } else { + if ((result=tcprecvdata_nb(conn->sock, &header_proto, + sizeof(SFCommonProtoHeader), network_timeout)) != 0) + { + response->error.length = snprintf(response->error.message, + sizeof(response->error.message), + "recv data fail, errno: %d, error info: %s", + result, STRERROR(result)); + return result; + } + return sf_proto_parse_header(&header_proto, response); + } } int sf_send_and_recv_response_header(ConnectionInfo *conn, char *data, @@ -116,11 +144,9 @@ int sf_send_and_recv_response_header(ConnectionInfo *conn, char *data, { int result; - if ((result=tcpsenddata_nb(conn->sock, data, len, network_timeout)) != 0) { - response->error.length = snprintf(response->error.message, - sizeof(response->error.message), - "send data fail, errno: %d, error info: %s", - result, STRERROR(result)); + if ((result=sf_proto_send_buf1(conn, data, len, + response, network_timeout)) != 0) + { return result; } @@ -183,7 +209,10 @@ int sf_send_and_recv_response_ex(ConnectionInfo *conn, char *send_data, return 0; } - if ((result=tcprecvdata_nb_ex(conn->sock, recv_data, response-> + if (conn->comm_type == fc_comm_type_rdma) { + memcpy(recv_data, G_RDMA_CONNECTION_CALLBACKS.get_buffer(conn)->buff + + sizeof(SFCommonProtoHeader), response->header.body_len); + } else if ((result=tcprecvdata_nb_ex(conn->sock, recv_data, response-> header.body_len, network_timeout, &recv_bytes)) != 0) { response->error.length = snprintf(response->error.message, @@ -223,7 +252,11 @@ int sf_send_and_recv_response_ex1(ConnectionInfo *conn, char *send_data, return EOVERFLOW; } - if ((result=tcprecvdata_nb_ex(conn->sock, recv_data, response-> + if (conn->comm_type == fc_comm_type_rdma) { + memcpy(recv_data, G_RDMA_CONNECTION_CALLBACKS.get_buffer(conn)->buff + + sizeof(SFCommonProtoHeader), response->header.body_len); + *body_len = response->header.body_len; + } else if ((result=tcprecvdata_nb_ex(conn->sock, recv_data, response-> header.body_len, network_timeout, body_len)) != 0) { response->error.length = snprintf(response->error.message, @@ -264,7 +297,10 @@ int sf_recv_response(ConnectionInfo *conn, SFResponseInfo *response, return 0; } - if ((result=tcprecvdata_nb_ex(conn->sock, recv_data, expect_body_len, + if (conn->comm_type == fc_comm_type_rdma) { + memcpy(recv_data, G_RDMA_CONNECTION_CALLBACKS.get_buffer(conn)->buff + + sizeof(SFCommonProtoHeader), response->header.body_len); + } else if ((result=tcprecvdata_nb_ex(conn->sock, recv_data, expect_body_len, network_timeout, &recv_bytes)) != 0) { response->error.length = snprintf(response->error.message, @@ -332,7 +368,10 @@ int sf_recv_vary_response(ConnectionInfo *conn, SFResponseInfo *response, buffer->alloc_size = alloc_size; } - if ((result=tcprecvdata_nb_ex(conn->sock, buffer->buff, response-> + if (conn->comm_type == fc_comm_type_rdma) { + memcpy(buffer->buff, G_RDMA_CONNECTION_CALLBACKS.get_buffer(conn)-> + buff + sizeof(SFCommonProtoHeader), response->header.body_len); + } else if ((result=tcprecvdata_nb_ex(conn->sock, buffer->buff, response-> header.body_len, network_timeout, &recv_bytes)) != 0) { response->error.length = snprintf(response->error.message, @@ -353,13 +392,9 @@ int sf_send_and_recv_vary_response(ConnectionInfo *conn, { int result; - if ((result=tcpsenddata_nb(conn->sock, send_data, - send_len, network_timeout)) != 0) + if ((result=sf_proto_send_buf1(conn, send_data, send_len, + response, network_timeout)) != 0) { - response->error.length = snprintf(response->error.message, - sizeof(response->error.message), - "send data fail, errno: %d, error info: %s", - result, STRERROR(result)); return result; } @@ -566,6 +601,7 @@ int sf_proto_get_leader(ConnectionInfo *conn, const char *service_name, memcpy(leader->conn.ip_addr, server_resp.ip_addr, IP_ADDRESS_SIZE); *(leader->conn.ip_addr + IP_ADDRESS_SIZE - 1) = '\0'; leader->conn.port = buff2short(server_resp.port); + leader->conn.comm_type = conn->comm_type; } return result; diff --git a/src/sf_proto.h b/src/sf_proto.h index 5871f9f..430d97c 100644 --- a/src/sf_proto.h +++ b/src/sf_proto.h @@ -475,6 +475,27 @@ static inline void sf_free_recv_buffer(SFProtoRecvBuffer *buffer) } } +static inline int sf_proto_send_buf1(ConnectionInfo *conn, char *data, + const int len, SFResponseInfo *response, const int network_timeout) +{ + int result; + + if (conn->comm_type == fc_comm_type_rdma) { + result = G_RDMA_CONNECTION_CALLBACKS.request_by_buf1( + conn, data, len, network_timeout * 1000); + } else { + result = tcpsenddata_nb(conn->sock, data, len, network_timeout); + } + if (result != 0) { + response->error.length = snprintf(response->error.message, + sizeof(response->error.message), + "send data fail, errno: %d, error info: %s", + result, STRERROR(result)); + } + + return result; +} + int sf_send_and_recv_response_header(ConnectionInfo *conn, char *data, const int len, SFResponseInfo *response, const int network_timeout);