From 4aeec5385aafe9fb801a21f544539c62e358a1bc Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Fri, 19 Feb 2021 21:05:38 +0800 Subject: [PATCH] sf_connection_manager impl all interfaces --- src/sf_connection_manager.c | 65 +++++++++++++++++++++++++++---------- src/sf_proto.c | 33 +++++++++++++++++++ src/sf_proto.h | 13 ++++++++ src/sf_types.h | 7 +++- 4 files changed, 99 insertions(+), 19 deletions(-) diff --git a/src/sf_connection_manager.c b/src/sf_connection_manager.c index 5690f97..578e469 100644 --- a/src/sf_connection_manager.c +++ b/src/sf_connection_manager.c @@ -135,6 +135,15 @@ static ConnectionInfo *get_connection(SFConnectionManager *cm, return NULL; } +static inline void set_connection_params(ConnectionInfo *conn, + SFCMServerEntry *server, SFCMServerPtrArray *old_alives) +{ + SFConnectionParameters *cparam; + cparam = (SFConnectionParameters *)conn->args; + cparam->cm.sentry = server; + cparam->cm.old_alives = old_alives; +} + static inline ConnectionInfo *make_master_connection(SFConnectionManager *cm, SFCMConnGroupEntry *group, int *err_no) { @@ -146,6 +155,13 @@ static inline ConnectionInfo *make_master_connection(SFConnectionManager *cm, if ((conn=make_connection(cm, master->addr_array, err_no)) != NULL) { + if (cm->common_cfg->read_rule == sf_data_read_rule_master_only) { + set_connection_params(conn, master, NULL); + } else { + SFCMServerPtrArray *alives; + alives = (SFCMServerPtrArray *)FC_ATOMIC_GET(group->alives); + set_connection_params(conn, master, alives); + } return conn; } @@ -222,6 +238,8 @@ static inline ConnectionInfo *make_readable_connection(SFConnectionManager *cm, addr_array, err_no)) == NULL) { remove_from_alives(cm, group, alives, alives->servers[index]); + } else { + set_connection_params(conn, alives->servers[index], alives); } return conn; @@ -316,9 +334,11 @@ static ConnectionInfo *get_readable_connection(SFConnectionManager *cm, static void release_connection(SFConnectionManager *cm, ConnectionInfo *conn) { - //TODO - if (((SFConnectionParameters *)conn->args)->group_id > 0) { - ((SFConnectionParameters *)conn->args)->group_id = 0; + SFConnectionParameters *cparam; + cparam = (SFConnectionParameters *)conn->args; + if (cparam->cm.sentry != NULL) { + cparam->cm.sentry = NULL; + cparam->cm.old_alives = NULL; } conn_pool_close_connection_ex(&cm->cpool, conn, false); @@ -326,12 +346,21 @@ static void release_connection(SFConnectionManager *cm, static void close_connection(SFConnectionManager *cm, ConnectionInfo *conn) { - //TODO - if (((SFConnectionParameters *)conn->args)->group_id > 0) { - int group_index; - group_index = ((SFConnectionParameters *)conn->args)-> - group_id - 1; - ((SFConnectionParameters *)conn->args)->group_id = 0; + SFConnectionParameters *cparam; + SFCMServerEntry *server; + SFCMConnGroupEntry *group; + + cparam = (SFConnectionParameters *)conn->args; + if (cparam->cm.sentry != NULL) { + server = cparam->cm.sentry; + group = cm->groups.entries + server->group_index; + if (cm->common_cfg->read_rule == sf_data_read_rule_master_only) { + __sync_bool_compare_and_swap(&group->master, server, NULL); + } else { + remove_from_alives(cm, group, cparam->cm.old_alives, server); + } + cparam->cm.sentry = NULL; + cparam->cm.old_alives = NULL; } conn_pool_close_connection_ex(&cm->cpool, conn, true); @@ -359,20 +388,20 @@ static ConnectionInfo *get_leader_connection(SFConnectionManager *cm, break; } - /* - if ((*err_no=fs_client_proto_get_leader(client_ctx, - conn, &leader)) != 0) - { - close_connection(cm, conn); - break; - } - */ + if ((*err_no=sf_proto_get_leader(conn, cm->common_cfg-> + network_timeout, &leader)) != 0) + { + close_connection(cm, conn); + break; + } if (FC_CONNECTION_SERVER_EQUAL1(*conn, leader.conn)) { return conn; } release_connection(cm, conn); - if ((conn=get_spec_connection(cm, &leader.conn, err_no)) == NULL) { + if ((conn=get_spec_connection(cm,&leader.conn, + err_no)) == NULL) + { break; } diff --git a/src/sf_proto.c b/src/sf_proto.c index 8448203..61bbf53 100644 --- a/src/sf_proto.c +++ b/src/sf_proto.c @@ -300,6 +300,10 @@ const char *sf_get_cmd_caption(const int cmd) return "GET_GROUP_SERVERS_REQ"; case SF_SERVICE_PROTO_GET_GROUP_SERVERS_RESP: return "GET_GROUP_SERVERS_RESP"; + case SF_SERVICE_PROTO_GET_LEADER_REQ: + return "GET_LEADER_REQ"; + case SF_SERVICE_PROTO_GET_LEADER_RESP: + return "GET_LEADER_RESP"; default: return "UNKOWN"; } @@ -437,3 +441,32 @@ int sf_proto_get_group_servers(ConnectionInfo *conn, return 0; } + +int sf_proto_get_leader(ConnectionInfo *conn, + const int network_timeout, + SFClientServerEntry *leader) +{ + int result; + SFCommonProtoHeader *header; + SFResponseInfo response; + SFProtoGetServerResp server_resp; + char out_buff[sizeof(SFCommonProtoHeader)]; + + header = (SFCommonProtoHeader *)out_buff; + SF_PROTO_SET_HEADER(header, SF_SERVICE_PROTO_GET_LEADER_REQ, + sizeof(out_buff) - sizeof(SFCommonProtoHeader)); + if ((result=sf_send_and_recv_response(conn, out_buff, + sizeof(out_buff), &response, network_timeout, + SF_SERVICE_PROTO_GET_LEADER_RESP, (char *)&server_resp, + sizeof(SFProtoGetServerResp))) != 0) + { + sf_log_network_error(&response, conn, result); + } else { + leader->server_id = buff2int(server_resp.server_id); + memcpy(leader->conn.ip_addr, server_resp.ip_addr, IP_ADDRESS_SIZE); + *(leader->conn.ip_addr + IP_ADDRESS_SIZE - 1) = '\0'; + leader->conn.port = buff2short(server_resp.port); + } + + return result; +} diff --git a/src/sf_proto.h b/src/sf_proto.h index db0efdc..c0c76cc 100644 --- a/src/sf_proto.h +++ b/src/sf_proto.h @@ -30,6 +30,8 @@ //for connection manager #define SF_SERVICE_PROTO_GET_GROUP_SERVERS_REQ 111 #define SF_SERVICE_PROTO_GET_GROUP_SERVERS_RESP 112 +#define SF_SERVICE_PROTO_GET_LEADER_REQ 113 +#define SF_SERVICE_PROTO_GET_LEADER_RESP 114 #define SF_PROTO_ACK 116 @@ -110,6 +112,13 @@ typedef struct sf_proto_get_group_servers_resp_body_part { char padding[2]; } SFProtoGetGroupServersRespBodyPart; +typedef struct sf_proto_get_server_resp { + char ip_addr[IP_ADDRESS_SIZE]; + char server_id[4]; + char port[2]; + char padding[2]; +} SFProtoGetServerResp; + typedef struct sf_proto_idempotency_additional_header { char req_id[8]; } SFProtoIdempotencyAdditionalHeader; @@ -364,6 +373,10 @@ int sf_proto_get_group_servers(ConnectionInfo *conn, const int network_timeout, const int group_id, SFGroupServerArray *sarray); +int sf_proto_get_leader(ConnectionInfo *conn, + const int network_timeout, + SFClientServerEntry *leader); + #define SF_CLIENT_RELEASE_CONNECTION(client_ctx, conn, result) \ do { \ if (SF_FORCE_CLOSE_CONNECTION_ERROR(result)) { \ diff --git a/src/sf_types.h b/src/sf_types.h index 63381cc..2f86411 100644 --- a/src/sf_types.h +++ b/src/sf_types.h @@ -171,9 +171,14 @@ typedef struct sf_client_common_config { SFNetRetryConfig net_retry_cfg; } SFClientCommonConfig; +struct sf_cm_server_entry; +struct sf_cm_server_ptr_array; typedef struct sf_connection_parameters { int buffer_size; - int group_id; + struct { + struct sf_cm_server_entry *sentry; + struct sf_cm_server_ptr_array *old_alives; + } cm; //for connection manager struct idempotency_client_channel *channel; } SFConnectionParameters;