sf_connection_manager impl all interfaces

connection_manager
YuQing 2021-02-19 21:05:38 +08:00
parent 85f76e2f47
commit 4aeec5385a
4 changed files with 99 additions and 19 deletions

View File

@ -135,6 +135,15 @@ static ConnectionInfo *get_connection(SFConnectionManager *cm,
return NULL;
}
static inline void set_connection_params(ConnectionInfo *conn,
SFCMServerEntry *server, SFCMServerPtrArray *old_alives)
{
SFConnectionParameters *cparam;
cparam = (SFConnectionParameters *)conn->args;
cparam->cm.sentry = server;
cparam->cm.old_alives = old_alives;
}
static inline ConnectionInfo *make_master_connection(SFConnectionManager *cm,
SFCMConnGroupEntry *group, int *err_no)
{
@ -146,6 +155,13 @@ static inline ConnectionInfo *make_master_connection(SFConnectionManager *cm,
if ((conn=make_connection(cm, master->addr_array,
err_no)) != NULL)
{
if (cm->common_cfg->read_rule == sf_data_read_rule_master_only) {
set_connection_params(conn, master, NULL);
} else {
SFCMServerPtrArray *alives;
alives = (SFCMServerPtrArray *)FC_ATOMIC_GET(group->alives);
set_connection_params(conn, master, alives);
}
return conn;
}
@ -222,6 +238,8 @@ static inline ConnectionInfo *make_readable_connection(SFConnectionManager *cm,
addr_array, err_no)) == NULL)
{
remove_from_alives(cm, group, alives, alives->servers[index]);
} else {
set_connection_params(conn, alives->servers[index], alives);
}
return conn;
@ -316,9 +334,11 @@ static ConnectionInfo *get_readable_connection(SFConnectionManager *cm,
static void release_connection(SFConnectionManager *cm,
ConnectionInfo *conn)
{
//TODO
if (((SFConnectionParameters *)conn->args)->group_id > 0) {
((SFConnectionParameters *)conn->args)->group_id = 0;
SFConnectionParameters *cparam;
cparam = (SFConnectionParameters *)conn->args;
if (cparam->cm.sentry != NULL) {
cparam->cm.sentry = NULL;
cparam->cm.old_alives = NULL;
}
conn_pool_close_connection_ex(&cm->cpool, conn, false);
@ -326,12 +346,21 @@ static void release_connection(SFConnectionManager *cm,
static void close_connection(SFConnectionManager *cm, ConnectionInfo *conn)
{
//TODO
if (((SFConnectionParameters *)conn->args)->group_id > 0) {
int group_index;
group_index = ((SFConnectionParameters *)conn->args)->
group_id - 1;
((SFConnectionParameters *)conn->args)->group_id = 0;
SFConnectionParameters *cparam;
SFCMServerEntry *server;
SFCMConnGroupEntry *group;
cparam = (SFConnectionParameters *)conn->args;
if (cparam->cm.sentry != NULL) {
server = cparam->cm.sentry;
group = cm->groups.entries + server->group_index;
if (cm->common_cfg->read_rule == sf_data_read_rule_master_only) {
__sync_bool_compare_and_swap(&group->master, server, NULL);
} else {
remove_from_alives(cm, group, cparam->cm.old_alives, server);
}
cparam->cm.sentry = NULL;
cparam->cm.old_alives = NULL;
}
conn_pool_close_connection_ex(&cm->cpool, conn, true);
@ -359,20 +388,20 @@ static ConnectionInfo *get_leader_connection(SFConnectionManager *cm,
break;
}
/*
if ((*err_no=fs_client_proto_get_leader(client_ctx,
conn, &leader)) != 0)
{
close_connection(cm, conn);
break;
}
*/
if ((*err_no=sf_proto_get_leader(conn, cm->common_cfg->
network_timeout, &leader)) != 0)
{
close_connection(cm, conn);
break;
}
if (FC_CONNECTION_SERVER_EQUAL1(*conn, leader.conn)) {
return conn;
}
release_connection(cm, conn);
if ((conn=get_spec_connection(cm, &leader.conn, err_no)) == NULL) {
if ((conn=get_spec_connection(cm,&leader.conn,
err_no)) == NULL)
{
break;
}

View File

@ -300,6 +300,10 @@ const char *sf_get_cmd_caption(const int cmd)
return "GET_GROUP_SERVERS_REQ";
case SF_SERVICE_PROTO_GET_GROUP_SERVERS_RESP:
return "GET_GROUP_SERVERS_RESP";
case SF_SERVICE_PROTO_GET_LEADER_REQ:
return "GET_LEADER_REQ";
case SF_SERVICE_PROTO_GET_LEADER_RESP:
return "GET_LEADER_RESP";
default:
return "UNKOWN";
}
@ -437,3 +441,32 @@ int sf_proto_get_group_servers(ConnectionInfo *conn,
return 0;
}
int sf_proto_get_leader(ConnectionInfo *conn,
const int network_timeout,
SFClientServerEntry *leader)
{
int result;
SFCommonProtoHeader *header;
SFResponseInfo response;
SFProtoGetServerResp server_resp;
char out_buff[sizeof(SFCommonProtoHeader)];
header = (SFCommonProtoHeader *)out_buff;
SF_PROTO_SET_HEADER(header, SF_SERVICE_PROTO_GET_LEADER_REQ,
sizeof(out_buff) - sizeof(SFCommonProtoHeader));
if ((result=sf_send_and_recv_response(conn, out_buff,
sizeof(out_buff), &response, network_timeout,
SF_SERVICE_PROTO_GET_LEADER_RESP, (char *)&server_resp,
sizeof(SFProtoGetServerResp))) != 0)
{
sf_log_network_error(&response, conn, result);
} else {
leader->server_id = buff2int(server_resp.server_id);
memcpy(leader->conn.ip_addr, server_resp.ip_addr, IP_ADDRESS_SIZE);
*(leader->conn.ip_addr + IP_ADDRESS_SIZE - 1) = '\0';
leader->conn.port = buff2short(server_resp.port);
}
return result;
}

View File

@ -30,6 +30,8 @@
//for connection manager
#define SF_SERVICE_PROTO_GET_GROUP_SERVERS_REQ 111
#define SF_SERVICE_PROTO_GET_GROUP_SERVERS_RESP 112
#define SF_SERVICE_PROTO_GET_LEADER_REQ 113
#define SF_SERVICE_PROTO_GET_LEADER_RESP 114
#define SF_PROTO_ACK 116
@ -110,6 +112,13 @@ typedef struct sf_proto_get_group_servers_resp_body_part {
char padding[2];
} SFProtoGetGroupServersRespBodyPart;
typedef struct sf_proto_get_server_resp {
char ip_addr[IP_ADDRESS_SIZE];
char server_id[4];
char port[2];
char padding[2];
} SFProtoGetServerResp;
typedef struct sf_proto_idempotency_additional_header {
char req_id[8];
} SFProtoIdempotencyAdditionalHeader;
@ -364,6 +373,10 @@ int sf_proto_get_group_servers(ConnectionInfo *conn,
const int network_timeout, const int group_id,
SFGroupServerArray *sarray);
int sf_proto_get_leader(ConnectionInfo *conn,
const int network_timeout,
SFClientServerEntry *leader);
#define SF_CLIENT_RELEASE_CONNECTION(client_ctx, conn, result) \
do { \
if (SF_FORCE_CLOSE_CONNECTION_ERROR(result)) { \

View File

@ -171,9 +171,14 @@ typedef struct sf_client_common_config {
SFNetRetryConfig net_retry_cfg;
} SFClientCommonConfig;
struct sf_cm_server_entry;
struct sf_cm_server_ptr_array;
typedef struct sf_connection_parameters {
int buffer_size;
int group_id;
struct {
struct sf_cm_server_entry *sentry;
struct sf_cm_server_ptr_array *old_alives;
} cm; //for connection manager
struct idempotency_client_channel *channel;
} SFConnectionParameters;