send and recv data adapt for RDMA

support_rdma
YuQing 2023-09-12 16:03:22 +08:00
parent fca50e6d49
commit c6d4612862
3 changed files with 87 additions and 38 deletions

View File

@ -522,7 +522,6 @@ int sf_connection_manager_init_ex(SFConnectionManager *cm,
const bool bg_thread_enabled) const bool bg_thread_enabled)
{ {
const int socket_domain = AF_INET; const int socket_domain = AF_INET;
const int padding_size = 1024;
struct { struct {
ConnectionExtraParams holder; ConnectionExtraParams holder;
ConnectionExtraParams *ptr; ConnectionExtraParams *ptr;
@ -545,16 +544,9 @@ int sf_connection_manager_init_ex(SFConnectionManager *cm,
if (server_group->comm_type == fc_comm_type_sock) { if (server_group->comm_type == fc_comm_type_sock) {
extra_params.ptr = NULL; extra_params.ptr = NULL;
} else { } else {
FCServerInfo *first_server; if ((result=conn_pool_set_rdma_extra_params(&extra_params.holder,
server_cfg, server_group_index)) != 0)
first_server = FC_SID_SERVERS(*server_cfg); {
extra_params.holder.buffer_size = server_group->
buffer_size + padding_size;
extra_params.holder.pd = fc_alloc_rdma_pd(
G_RDMA_CONNECTION_CALLBACKS.alloc_pd,
&first_server->group_addrs[server_group_index].
address_array, &result);
if (result != 0) {
return result; return result;
} }
extra_params.ptr = &extra_params.holder; extra_params.ptr = &extra_params.holder;

View File

@ -70,8 +70,14 @@ int sf_check_response(ConnectionInfo *conn, SFResponseInfo *response,
response->error.length = response->header.body_len; response->error.length = response->header.body_len;
} }
if ((result=tcprecvdata_nb_ex(conn->sock, response->error.message, if (conn->comm_type == fc_comm_type_rdma) {
response->error.length, network_timeout, &recv_bytes)) == 0) memcpy(response->error.message, G_RDMA_CONNECTION_CALLBACKS.
get_buffer(conn)->buff + sizeof(SFCommonProtoHeader),
response->error.length);
response->error.message[response->error.length] = '\0';
} else if ((result=tcprecvdata_nb_ex(conn->sock, response->
error.message, response->error.length,
network_timeout, &recv_bytes)) == 0)
{ {
response->error.message[response->error.length] = '\0'; response->error.message[response->error.length] = '\0';
} else { } else {
@ -96,19 +102,41 @@ static inline int sf_recv_response_header(ConnectionInfo *conn,
SFResponseInfo *response, const int network_timeout) SFResponseInfo *response, const int network_timeout)
{ {
int result; int result;
BufferInfo *buffer;
SFCommonProtoHeader header_proto; SFCommonProtoHeader header_proto;
if ((result=tcprecvdata_nb(conn->sock, &header_proto, if (conn->comm_type == fc_comm_type_rdma) {
sizeof(SFCommonProtoHeader), network_timeout)) != 0) buffer = G_RDMA_CONNECTION_CALLBACKS.get_buffer(conn);
{ if ((result=sf_proto_parse_header((SFCommonProtoHeader *)
response->error.length = snprintf(response->error.message, buffer->buff, response)) != 0)
sizeof(response->error.message), {
"recv data fail, errno: %d, error info: %s", return result;
result, STRERROR(result)); }
return result;
}
return sf_proto_parse_header(&header_proto, response); if (buffer->length != (sizeof(SFCommonProtoHeader) +
response->header.body_len))
{
response->error.length = snprintf(response->error.message,
sizeof(response->error.message),
"recv package length: %d != calculate: %d",
buffer->length, (int)(sizeof(SFCommonProtoHeader) +
response->header.body_len));
return EINVAL;
}
return 0;
} else {
if ((result=tcprecvdata_nb(conn->sock, &header_proto,
sizeof(SFCommonProtoHeader), network_timeout)) != 0)
{
response->error.length = snprintf(response->error.message,
sizeof(response->error.message),
"recv data fail, errno: %d, error info: %s",
result, STRERROR(result));
return result;
}
return sf_proto_parse_header(&header_proto, response);
}
} }
int sf_send_and_recv_response_header(ConnectionInfo *conn, char *data, int sf_send_and_recv_response_header(ConnectionInfo *conn, char *data,
@ -116,11 +144,9 @@ int sf_send_and_recv_response_header(ConnectionInfo *conn, char *data,
{ {
int result; int result;
if ((result=tcpsenddata_nb(conn->sock, data, len, network_timeout)) != 0) { if ((result=sf_proto_send_buf1(conn, data, len,
response->error.length = snprintf(response->error.message, response, network_timeout)) != 0)
sizeof(response->error.message), {
"send data fail, errno: %d, error info: %s",
result, STRERROR(result));
return result; return result;
} }
@ -183,7 +209,10 @@ int sf_send_and_recv_response_ex(ConnectionInfo *conn, char *send_data,
return 0; return 0;
} }
if ((result=tcprecvdata_nb_ex(conn->sock, recv_data, response-> if (conn->comm_type == fc_comm_type_rdma) {
memcpy(recv_data, G_RDMA_CONNECTION_CALLBACKS.get_buffer(conn)->buff +
sizeof(SFCommonProtoHeader), response->header.body_len);
} else if ((result=tcprecvdata_nb_ex(conn->sock, recv_data, response->
header.body_len, network_timeout, &recv_bytes)) != 0) header.body_len, network_timeout, &recv_bytes)) != 0)
{ {
response->error.length = snprintf(response->error.message, response->error.length = snprintf(response->error.message,
@ -223,7 +252,11 @@ int sf_send_and_recv_response_ex1(ConnectionInfo *conn, char *send_data,
return EOVERFLOW; return EOVERFLOW;
} }
if ((result=tcprecvdata_nb_ex(conn->sock, recv_data, response-> if (conn->comm_type == fc_comm_type_rdma) {
memcpy(recv_data, G_RDMA_CONNECTION_CALLBACKS.get_buffer(conn)->buff +
sizeof(SFCommonProtoHeader), response->header.body_len);
*body_len = response->header.body_len;
} else if ((result=tcprecvdata_nb_ex(conn->sock, recv_data, response->
header.body_len, network_timeout, body_len)) != 0) header.body_len, network_timeout, body_len)) != 0)
{ {
response->error.length = snprintf(response->error.message, response->error.length = snprintf(response->error.message,
@ -264,7 +297,10 @@ int sf_recv_response(ConnectionInfo *conn, SFResponseInfo *response,
return 0; return 0;
} }
if ((result=tcprecvdata_nb_ex(conn->sock, recv_data, expect_body_len, if (conn->comm_type == fc_comm_type_rdma) {
memcpy(recv_data, G_RDMA_CONNECTION_CALLBACKS.get_buffer(conn)->buff +
sizeof(SFCommonProtoHeader), response->header.body_len);
} else if ((result=tcprecvdata_nb_ex(conn->sock, recv_data, expect_body_len,
network_timeout, &recv_bytes)) != 0) network_timeout, &recv_bytes)) != 0)
{ {
response->error.length = snprintf(response->error.message, response->error.length = snprintf(response->error.message,
@ -332,7 +368,10 @@ int sf_recv_vary_response(ConnectionInfo *conn, SFResponseInfo *response,
buffer->alloc_size = alloc_size; buffer->alloc_size = alloc_size;
} }
if ((result=tcprecvdata_nb_ex(conn->sock, buffer->buff, response-> if (conn->comm_type == fc_comm_type_rdma) {
memcpy(buffer->buff, G_RDMA_CONNECTION_CALLBACKS.get_buffer(conn)->
buff + sizeof(SFCommonProtoHeader), response->header.body_len);
} else if ((result=tcprecvdata_nb_ex(conn->sock, buffer->buff, response->
header.body_len, network_timeout, &recv_bytes)) != 0) header.body_len, network_timeout, &recv_bytes)) != 0)
{ {
response->error.length = snprintf(response->error.message, response->error.length = snprintf(response->error.message,
@ -353,13 +392,9 @@ int sf_send_and_recv_vary_response(ConnectionInfo *conn,
{ {
int result; int result;
if ((result=tcpsenddata_nb(conn->sock, send_data, if ((result=sf_proto_send_buf1(conn, send_data, send_len,
send_len, network_timeout)) != 0) response, network_timeout)) != 0)
{ {
response->error.length = snprintf(response->error.message,
sizeof(response->error.message),
"send data fail, errno: %d, error info: %s",
result, STRERROR(result));
return result; return result;
} }
@ -566,6 +601,7 @@ int sf_proto_get_leader(ConnectionInfo *conn, const char *service_name,
memcpy(leader->conn.ip_addr, server_resp.ip_addr, IP_ADDRESS_SIZE); memcpy(leader->conn.ip_addr, server_resp.ip_addr, IP_ADDRESS_SIZE);
*(leader->conn.ip_addr + IP_ADDRESS_SIZE - 1) = '\0'; *(leader->conn.ip_addr + IP_ADDRESS_SIZE - 1) = '\0';
leader->conn.port = buff2short(server_resp.port); leader->conn.port = buff2short(server_resp.port);
leader->conn.comm_type = conn->comm_type;
} }
return result; return result;

View File

@ -475,6 +475,27 @@ static inline void sf_free_recv_buffer(SFProtoRecvBuffer *buffer)
} }
} }
static inline int sf_proto_send_buf1(ConnectionInfo *conn, char *data,
const int len, SFResponseInfo *response, const int network_timeout)
{
int result;
if (conn->comm_type == fc_comm_type_rdma) {
result = G_RDMA_CONNECTION_CALLBACKS.request_by_buf1(
conn, data, len, network_timeout * 1000);
} else {
result = tcpsenddata_nb(conn->sock, data, len, network_timeout);
}
if (result != 0) {
response->error.length = snprintf(response->error.message,
sizeof(response->error.message),
"send data fail, errno: %d, error info: %s",
result, STRERROR(result));
}
return result;
}
int sf_send_and_recv_response_header(ConnectionInfo *conn, char *data, int sf_send_and_recv_response_header(ConnectionInfo *conn, char *data,
const int len, SFResponseInfo *response, const int network_timeout); const int len, SFResponseInfo *response, const int network_timeout);