tracker support multi ip

multi_ipaddr
YuQing 2019-09-30 19:46:16 +08:00
parent 4af6511d3f
commit 2b11a518d4
11 changed files with 523 additions and 299 deletions

View File

@ -68,7 +68,7 @@ static void insert_into_sorted_servers(TrackerServerGroup *pTrackerGroup, \
memcpy(pDestServer, pInsertedServer, sizeof(ConnectionInfo)); memcpy(pDestServer, pInsertedServer, sizeof(ConnectionInfo));
} }
static int copy_tracker_servers(TrackerServerGroup *pTrackerGroup, \ static int copy_tracker_servers(TrackerServerGroup *pTrackerGroup,
const char *filename, char **ppTrackerServers) const char *filename, char **ppTrackerServers)
{ {
char **ppSrc; char **ppSrc;
@ -86,7 +86,7 @@ static int copy_tracker_servers(TrackerServerGroup *pTrackerGroup, \
pTrackerGroup->server_count = 0; pTrackerGroup->server_count = 0;
for (ppSrc=ppTrackerServers; ppSrc<ppEnd; ppSrc++) for (ppSrc=ppTrackerServers; ppSrc<ppEnd; ppSrc++)
{ {
if ((pSeperator=strchr(*ppSrc, ':')) == NULL) if ((pSeperator=strrchr(*ppSrc, ':')) == NULL)
{ {
logError("file: "__FILE__", line: %d, " \ logError("file: "__FILE__", line: %d, " \
"conf file \"%s\", " \ "conf file \"%s\", " \
@ -148,6 +148,7 @@ int fdfs_load_tracker_group_ex(TrackerServerGroup *pTrackerGroup, \
const char *conf_filename, IniContext *pIniContext) const char *conf_filename, IniContext *pIniContext)
{ {
int result; int result;
int bytes;
char *ppTrackerServers[FDFS_MAX_TRACKERS]; char *ppTrackerServers[FDFS_MAX_TRACKERS];
if ((pTrackerGroup->server_count=iniGetValues(NULL, "tracker_server", \ if ((pTrackerGroup->server_count=iniGetValues(NULL, "tracker_server", \
@ -160,20 +161,17 @@ int fdfs_load_tracker_group_ex(TrackerServerGroup *pTrackerGroup, \
return ENOENT; return ENOENT;
} }
pTrackerGroup->servers = (ConnectionInfo *)malloc( \ bytes = sizeof(MultiConnectionInfo) * pTrackerGroup->server_count);
sizeof(ConnectionInfo) * pTrackerGroup->server_count); pTrackerGroup->servers = (MultiConnectionInfo *)malloc(bytes);
if (pTrackerGroup->servers == NULL) if (pTrackerGroup->servers == NULL)
{ {
logError("file: "__FILE__", line: %d, " \ logError("file: "__FILE__", line: %d, "
"malloc %d bytes fail", __LINE__, \ "malloc %d bytes fail", __LINE__, bytes);
(int)sizeof(ConnectionInfo) * \
pTrackerGroup->server_count);
pTrackerGroup->server_count = 0; pTrackerGroup->server_count = 0;
return errno != 0 ? errno : ENOMEM; return errno != 0 ? errno : ENOMEM;
} }
memset(pTrackerGroup->servers, 0, \ memset(pTrackerGroup->servers, 0, bytes);
sizeof(ConnectionInfo) * pTrackerGroup->server_count);
if ((result=copy_tracker_servers(pTrackerGroup, conf_filename, \ if ((result=copy_tracker_servers(pTrackerGroup, conf_filename, \
ppTrackerServers)) != 0) ppTrackerServers)) != 0)
{ {

View File

@ -9,6 +9,7 @@
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <limits.h> #include <limits.h>
#include <netdb.h>
#include "fastcommon/logger.h" #include "fastcommon/logger.h"
#include "fastcommon/sockopt.h" #include "fastcommon/sockopt.h"
#include "fastcommon/shared_func.h" #include "fastcommon/shared_func.h"
@ -22,11 +23,97 @@ static FDFSStorageIdInfo **g_storage_ids_by_ip_port = NULL; //sorted by storage
int g_storage_id_count = 0; int g_storage_id_count = 0;
bool fdfs_server_contain(TrackerServerInfo *pServerInfo,
const char *target_ip, const int target_port)
{
ConnectionInfo *conn;
ConnectionInfo *end;
if (pServerInfo->count == 1)
{
return FC_CONNECTION_SERVER_EQUAL(pServerInfo->connections[0],
target_ip, target_port);
}
else if (pServerInfo->count == 2)
{
return FC_CONNECTION_SERVER_EQUAL(pServerInfo->connections[0],
target_ip, target_port) ||
FC_CONNECTION_SERVER_EQUAL(pServerInfo->connections[1],
target_ip, target_port);
}
end = pServerInfo->connections + pServerInfo->count;
for (conn=pServerInfo->connections; conn<end; conn++)
{
if (FC_CONNECTION_SERVER_EQUAL(*conn, target_ip, target_port))
{
return true;
}
}
return false;
}
bool fdfs_server_contain_ex(TrackerServerInfo *pServer1,
TrackerServerInfo *pServer2)
{
ConnectionInfo *conn;
ConnectionInfo *end;
if (pServer1->count == 1)
{
return fdfs_server_contain1(pServer2, pServer1->connections + 0);
}
else if (pServer1->count == 2)
{
if (fdfs_server_contain1(pServer2, pServer1->connections + 0))
{
return true;
}
return fdfs_server_contain1(pServer2, pServer1->connections + 1);
}
end = pServer1->connections + pServer1->count;
for (conn=pServer1->connections; conn<end; conn++)
{
if (fdfs_server_contain1(pServer2, conn))
{
return true;
}
}
return false;
}
void fdfs_server_sock_reset(TrackerServerInfo *pServerInfo)
{
ConnectionInfo *conn;
ConnectionInfo *end;
if (pServerInfo->count == 1)
{
pServerInfo->connections[0].sock = -1;
}
else if (pServerInfo->count == 2)
{
pServerInfo->connections[0].sock = -1;
pServerInfo->connections[1].sock = -1;
}
else
{
end = pServerInfo->connections + pServerInfo->count;
for (conn=pServerInfo->connections; conn<end; conn++)
{
conn->sock = -1;
}
}
}
int fdfs_get_tracker_leader_index_ex(TrackerServerGroup *pServerGroup, \ int fdfs_get_tracker_leader_index_ex(TrackerServerGroup *pServerGroup, \
const char *leaderIp, const int leaderPort) const char *leaderIp, const int leaderPort)
{ {
ConnectionInfo *pServer; TrackerServerInfo *pServer;
ConnectionInfo *pEnd; TrackerServerInfo *pEnd;
if (pServerGroup->server_count == 0) if (pServerGroup->server_count == 0)
{ {
@ -36,8 +123,7 @@ int fdfs_get_tracker_leader_index_ex(TrackerServerGroup *pServerGroup, \
pEnd = pServerGroup->servers + pServerGroup->server_count; pEnd = pServerGroup->servers + pServerGroup->server_count;
for (pServer=pServerGroup->servers; pServer<pEnd; pServer++) for (pServer=pServerGroup->servers; pServer<pEnd; pServer++)
{ {
if (strcmp(pServer->ip_addr, leaderIp) == 0 && \ if (fdfs_server_contain(pServer, leaderIp, leaderPort))
pServer->port == leaderPort)
{ {
return pServer - pServerGroup->servers; return pServer - pServerGroup->servers;
} }
@ -691,7 +777,7 @@ int fdfs_load_storage_ids(char *content, const char *pStorageIdsFilename)
return result; return result;
} }
int fdfs_get_storage_ids_from_tracker_server(ConnectionInfo *pTrackerServer) int fdfs_get_storage_ids_from_tracker_server(TrackerServerInfo *pTrackerServer)
{ {
#define MAX_REQUEST_LOOP 32 #define MAX_REQUEST_LOOP 32
TrackerHeader *pHeader; TrackerHeader *pHeader;
@ -730,21 +816,19 @@ int fdfs_get_storage_ids_from_tracker_server(ConnectionInfo *pTrackerServer)
while (1) while (1)
{ {
int2buff(start_index, p); int2buff(start_index, p);
if ((result=tcpsenddata_nb(conn->sock, out_buff, \ if ((result=tcpsenddata_nb(conn->sock, out_buff,
sizeof(out_buff), g_fdfs_network_timeout)) != 0) sizeof(out_buff), g_fdfs_network_timeout)) != 0)
{ {
logError("file: "__FILE__", line: %d, " \ logError("file: "__FILE__", line: %d, "
"send data to tracker server %s:%d fail, " \ "send data to tracker server %s:%d fail, "
"errno: %d, error info: %s", __LINE__, \ "errno: %d, error info: %s", __LINE__,
pTrackerServer->ip_addr, \ conn->ip_addr, conn->port,
pTrackerServer->port, \
result, STRERROR(result)); result, STRERROR(result));
} }
else else
{ {
response = NULL; response = NULL;
result = fdfs_recv_response(conn, \ result = fdfs_recv_response(conn, &response, 0, &in_bytes);
&response, 0, &in_bytes);
if (result != 0) if (result != 0)
{ {
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
@ -760,11 +844,10 @@ int fdfs_get_storage_ids_from_tracker_server(ConnectionInfo *pTrackerServer)
if (in_bytes < 2 * sizeof(int)) if (in_bytes < 2 * sizeof(int))
{ {
logError("file: "__FILE__", line: %d, " \ logError("file: "__FILE__", line: %d, "
"tracker server %s:%d, recv data length: %d "\ "tracker server %s:%d, recv data length: %d "
"is invalid", __LINE__, "is invalid", __LINE__,
pTrackerServer->ip_addr, \ conn->ip_addr, conn->port, (int)in_bytes);
pTrackerServer->port, (int)in_bytes);
result = EINVAL; result = EINVAL;
break; break;
} }
@ -773,22 +856,21 @@ int fdfs_get_storage_ids_from_tracker_server(ConnectionInfo *pTrackerServer)
current_count = buff2int(response + sizeof(int)); current_count = buff2int(response + sizeof(int));
if (total_count <= start_index) if (total_count <= start_index)
{ {
logError("file: "__FILE__", line: %d, " \ logError("file: "__FILE__", line: %d, "
"tracker server %s:%d, total storage " \ "tracker server %s:%d, total storage "
"count: %d is invalid, which <= start " \ "count: %d is invalid, which <= start "
"index: %d", __LINE__, pTrackerServer->ip_addr,\ "index: %d", __LINE__, conn->ip_addr,
pTrackerServer->port, total_count, start_index); conn->port, total_count, start_index);
result = EINVAL; result = EINVAL;
break; break;
} }
if (current_count <= 0) if (current_count <= 0)
{ {
logError("file: "__FILE__", line: %d, " \ logError("file: "__FILE__", line: %d, "
"tracker server %s:%d, current storage " \ "tracker server %s:%d, current storage "
"count: %d is invalid, which <= 0", \ "count: %d is invalid, which <= 0", __LINE__,
__LINE__, pTrackerServer->ip_addr,\ conn->ip_addr, conn->port, current_count);
pTrackerServer->port, current_count);
result = EINVAL; result = EINVAL;
break; break;
} }
@ -814,8 +896,8 @@ int fdfs_get_storage_ids_from_tracker_server(ConnectionInfo *pTrackerServer)
logError("file: "__FILE__", line: %d, " \ logError("file: "__FILE__", line: %d, " \
"response data from tracker " \ "response data from tracker " \
"server %s:%d is too large", \ "server %s:%d is too large", \
__LINE__, pTrackerServer->ip_addr,\ __LINE__, conn->ip_addr,\
pTrackerServer->port); conn->port);
result = ENOSPC; result = ENOSPC;
break; break;
} }
@ -874,11 +956,11 @@ int fdfs_get_storage_ids_from_tracker_server(ConnectionInfo *pTrackerServer)
int fdfs_get_storage_ids_from_tracker_group(TrackerServerGroup *pTrackerGroup) int fdfs_get_storage_ids_from_tracker_group(TrackerServerGroup *pTrackerGroup)
{ {
ConnectionInfo *pGServer; TrackerServerInfo *pGServer;
ConnectionInfo *pTServer; TrackerServerInfo *pTServer;
ConnectionInfo *pServerStart; TrackerServerInfo *pServerStart;
ConnectionInfo *pServerEnd; TrackerServerInfo *pServerEnd;
ConnectionInfo trackerServer; TrackerServerInfo trackerServer;
int result; int result;
int leader_index; int leader_index;
int i; int i;
@ -901,8 +983,8 @@ int fdfs_get_storage_ids_from_tracker_group(TrackerServerGroup *pTrackerGroup)
{ {
for (pGServer=pServerStart; pGServer<pServerEnd; pGServer++) for (pGServer=pServerStart; pGServer<pServerEnd; pGServer++)
{ {
memcpy(pTServer, pGServer, sizeof(ConnectionInfo)); memcpy(pTServer, pGServer, sizeof(TrackerServerInfo));
pTServer->sock = -1; fdfs_server_sock_reset(pTServer);
result = fdfs_get_storage_ids_from_tracker_server(pTServer); result = fdfs_get_storage_ids_from_tracker_server(pTServer);
if (result == 0) if (result == 0)
{ {
@ -1036,3 +1118,54 @@ void fdfs_set_log_rotate_size(LogContext *pContext, const int64_t log_rotate_siz
} }
} }
int fdfs_parse_server_info(char *server_str, const int default_port,
TrackerServerInfo *pServer)
{
char *pColon;
char *hosts[FDFS_MULTI_IP_MAX_COUNT];
ConnectionInfo *conn;
int port;
int i;
memset(pServer, 0, sizeof(TrackerServerInfo));
if ((pColon=strrchr(server_str, ':')) == NULL)
{
logInfo("file: "__FILE__", line: %d, "
"no port part in %s, set port to %d",
__LINE__, server_str, default_port);
port = default_port;
}
else
{
port = atoi(pColon + 1);
}
*pColon = '\0';
conn = pServer->connections;
pServer->count = splitEx(server_str, ',',
hosts, FDFS_MULTI_IP_MAX_COUNT);
if (pServer->count == 1)
{
if (getIpaddrByName(hosts[0], conn->ip_addr,
sizeof(conn->ip_addr)) == INADDR_NONE)
{
logError("file: "__FILE__", line: %d, "
"host \"%s\" is invalid, error info: %s",
__LINE__, hosts[0], hstrerror(h_errno));
return EINVAL;
}
conn->port = port;
conn->sock = -1;
return 0;
}
for (i=0; i<pServer->count; i++)
{
snprintf(conn->ip_addr, sizeof(conn->ip_addr), "%s", hosts[i]);
conn->port = port;
conn->sock = -1;
conn++;
}
return 0;
}

View File

@ -66,7 +66,7 @@ FDFSStorageIdInfo *fdfs_get_storage_id_by_ip_port(const char *pIpAddr,
int fdfs_check_storage_id(const char *group_name, const char *id); int fdfs_check_storage_id(const char *group_name, const char *id);
int fdfs_get_storage_ids_from_tracker_server(ConnectionInfo *pTrackerServer); int fdfs_get_storage_ids_from_tracker_server(TrackerServerInfo *pTrackerServer);
int fdfs_get_storage_ids_from_tracker_group(TrackerServerGroup *pTrackerGroup); int fdfs_get_storage_ids_from_tracker_group(TrackerServerGroup *pTrackerGroup);
@ -80,6 +80,23 @@ void fdfs_connection_pool_destroy();
void fdfs_set_log_rotate_size(LogContext *pContext, const int64_t log_rotate_size); void fdfs_set_log_rotate_size(LogContext *pContext, const int64_t log_rotate_size);
bool fdfs_server_contain(TrackerServerInfo *pServerInfo,
const char *target_ip, const int target_port);
static inline bool fdfs_server_contain1(TrackerServerInfo *pServerInfo,
const ConnectionInfo *target)
{
return fdfs_server_contain(pServerInfo, target->ip_addr, target->port);
}
bool fdfs_server_contain_ex(TrackerServerInfo *pServer1,
TrackerServerInfo *pServer2);
void fdfs_server_sock_reset(TrackerServerInfo *pServerInfo);
int fdfs_parse_server_info(char *server_str, const int default_port,
TrackerServerInfo *pServer);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -409,8 +409,9 @@ static int fdfs_dump_global_vars(char *buff, const int buffSize)
static int fdfs_dump_tracker_servers(char *buff, const int buffSize) static int fdfs_dump_tracker_servers(char *buff, const int buffSize)
{ {
int total_len; int total_len;
ConnectionInfo *pTrackerServer; TrackerServerInfo *pTrackerServer;
ConnectionInfo *pTrackerEnd; TrackerServerInfo *pTrackerEnd;
ConnectionInfo *conn;
total_len = snprintf(buff, buffSize, \ total_len = snprintf(buff, buffSize, \
"g_tracker_servers.server_count=%d, " \ "g_tracker_servers.server_count=%d, " \
@ -426,10 +427,11 @@ static int fdfs_dump_tracker_servers(char *buff, const int buffSize)
for (pTrackerServer=g_tracker_servers.servers; \ for (pTrackerServer=g_tracker_servers.servers; \
pTrackerServer<pTrackerEnd; pTrackerServer++) pTrackerServer<pTrackerEnd; pTrackerServer++)
{ {
conn = pTrackerServer->connections;
total_len += snprintf(buff + total_len, buffSize - total_len, total_len += snprintf(buff + total_len, buffSize - total_len,
"\t%d. tracker server=%s:%d\n", \ "\t%d. tracker server=%s:%d\n", \
(int)(pTrackerServer - g_tracker_servers.servers) + 1, \ (int)(pTrackerServer - g_tracker_servers.servers) + 1, \
pTrackerServer->ip_addr, pTrackerServer->port); conn->ip_addr, conn->port);
} }
return total_len; return total_len;

View File

@ -107,7 +107,7 @@
#define STORAGE_ITEM_LAST_HEART_BEAT_TIME "last_heart_beat_time" #define STORAGE_ITEM_LAST_HEART_BEAT_TIME "last_heart_beat_time"
TrackerServerGroup g_tracker_servers = {0, 0, -1, NULL}; TrackerServerGroup g_tracker_servers = {0, 0, -1, NULL};
ConnectionInfo *g_last_tracker_servers = NULL; //for delay free TrackerServerInfo *g_last_tracker_servers = NULL; //for delay free
int g_next_leader_index = -1; //next leader index int g_next_leader_index = -1; //next leader index
int g_trunk_server_chg_count = 1; //for notify other trackers int g_trunk_server_chg_count = 1; //for notify other trackers
int g_tracker_leader_chg_count = 0; //for notify storage servers int g_tracker_leader_chg_count = 0; //for notify storage servers
@ -3901,13 +3901,13 @@ static int tracker_mem_get_one_sys_file(ConnectionInfo *pTrackerServer, \
return result; return result;
} }
static int tracker_mem_get_sys_files(ConnectionInfo *pTrackerServer) static int tracker_mem_get_sys_files(TrackerServerInfo *pTrackerServer)
{ {
ConnectionInfo *conn; ConnectionInfo *conn;
int result; int result;
int index; int index;
pTrackerServer->sock = -1; fdfs_server_sock_reset(pTrackerServer);
if ((conn=tracker_connect_server(pTrackerServer, &result)) == NULL) if ((conn=tracker_connect_server(pTrackerServer, &result)) == NULL)
{ {
return result; return result;
@ -3963,15 +3963,15 @@ static int tracker_mem_cmp_tracker_running_status(const void *p1, const void *p2
static int tracker_mem_first_add_tracker_servers(FDFSStorageJoinBody *pJoinBody) static int tracker_mem_first_add_tracker_servers(FDFSStorageJoinBody *pJoinBody)
{ {
ConnectionInfo *pLocalTracker; TrackerServerInfo *pLocalTracker;
ConnectionInfo *pLocalEnd; TrackerServerInfo *pLocalEnd;
ConnectionInfo *servers; TrackerServerInfo *servers;
int tracker_count; int tracker_count;
int bytes; int bytes;
tracker_count = pJoinBody->tracker_count; tracker_count = pJoinBody->tracker_count;
bytes = sizeof(ConnectionInfo) * tracker_count; bytes = sizeof(TrackerServerInfo) * tracker_count;
servers = (ConnectionInfo *)malloc(bytes); servers = (TrackerServerInfo *)malloc(bytes);
if (servers == NULL) if (servers == NULL)
{ {
logError("file: "__FILE__", line: %d, " \ logError("file: "__FILE__", line: %d, " \
@ -3986,7 +3986,7 @@ static int tracker_mem_first_add_tracker_servers(FDFSStorageJoinBody *pJoinBody)
for (pLocalTracker=servers; pLocalTracker<pLocalEnd; \ for (pLocalTracker=servers; pLocalTracker<pLocalEnd; \
pLocalTracker++) pLocalTracker++)
{ {
pLocalTracker->sock = -1; fdfs_server_sock_reset(pLocalTracker);
} }
g_tracker_servers.servers = servers; g_tracker_servers.servers = servers;
@ -3996,27 +3996,25 @@ static int tracker_mem_first_add_tracker_servers(FDFSStorageJoinBody *pJoinBody)
static int tracker_mem_check_add_tracker_servers(FDFSStorageJoinBody *pJoinBody) static int tracker_mem_check_add_tracker_servers(FDFSStorageJoinBody *pJoinBody)
{ {
ConnectionInfo *pJoinTracker; TrackerServerInfo *pJoinTracker;
ConnectionInfo *pJoinEnd; TrackerServerInfo *pJoinEnd;
ConnectionInfo *pLocalTracker; TrackerServerInfo *pLocalTracker;
ConnectionInfo *pLocalEnd; TrackerServerInfo *pLocalEnd;
ConnectionInfo *pNewServer; TrackerServerInfo *pNewServer;
ConnectionInfo *new_servers; TrackerServerInfo *new_servers;
int add_count; int add_count;
int bytes; int bytes;
add_count = 0; add_count = 0;
pLocalEnd = g_tracker_servers.servers + g_tracker_servers.server_count; pLocalEnd = g_tracker_servers.servers + g_tracker_servers.server_count;
pJoinEnd = pJoinBody->tracker_servers + pJoinBody->tracker_count; pJoinEnd = pJoinBody->tracker_servers + pJoinBody->tracker_count;
for (pJoinTracker=pJoinBody->tracker_servers; \ for (pJoinTracker=pJoinBody->tracker_servers;
pJoinTracker<pJoinEnd; pJoinTracker++) pJoinTracker<pJoinEnd; pJoinTracker++)
{ {
for (pLocalTracker=g_tracker_servers.servers; \ for (pLocalTracker=g_tracker_servers.servers;
pLocalTracker<pLocalEnd; pLocalTracker++) pLocalTracker<pLocalEnd; pLocalTracker++)
{ {
if (pJoinTracker->port == pLocalTracker->port && \ if (fdfs_server_contain_ex(pJoinTracker, pLocalTracker))
strcmp(pJoinTracker->ip_addr, \
pLocalTracker->ip_addr) == 0)
{ {
break; break;
} }
@ -4043,36 +4041,34 @@ static int tracker_mem_check_add_tracker_servers(FDFSStorageJoinBody *pJoinBody)
if (g_tracker_servers.server_count + add_count > FDFS_MAX_TRACKERS) if (g_tracker_servers.server_count + add_count > FDFS_MAX_TRACKERS)
{ {
logError("file: "__FILE__", line: %d, " \ logError("file: "__FILE__", line: %d, "
"too many tracker servers: %d", \ "too many tracker servers: %d",
__LINE__, g_tracker_servers.server_count + add_count); __LINE__, g_tracker_servers.server_count + add_count);
return ENOSPC; return ENOSPC;
} }
bytes = sizeof(ConnectionInfo) * (g_tracker_servers.server_count \ bytes = sizeof(TrackerServerInfo) * (g_tracker_servers.server_count
+ add_count); + add_count);
new_servers = (ConnectionInfo *)malloc(bytes); new_servers = (TrackerServerInfo *)malloc(bytes);
if (new_servers == NULL) if (new_servers == NULL)
{ {
logError("file: "__FILE__", line: %d, " \ logError("file: "__FILE__", line: %d, "
"malloc %d bytes fail, " \ "malloc %d bytes fail, "
"errno: %d, error info: %s", \ "errno: %d, error info: %s",
__LINE__, bytes, errno, STRERROR(errno)); __LINE__, bytes, errno, STRERROR(errno));
return errno != 0 ? errno : ENOMEM; return errno != 0 ? errno : ENOMEM;
} }
memcpy(new_servers, g_tracker_servers.servers, sizeof(ConnectionInfo)* \ memcpy(new_servers, g_tracker_servers.servers, sizeof(TrackerServerInfo) *
g_tracker_servers.server_count); g_tracker_servers.server_count);
pNewServer = new_servers + g_tracker_servers.server_count; pNewServer = new_servers + g_tracker_servers.server_count;
for (pJoinTracker=pJoinBody->tracker_servers; \ for (pJoinTracker=pJoinBody->tracker_servers;
pJoinTracker<pJoinEnd; pJoinTracker++) pJoinTracker<pJoinEnd; pJoinTracker++)
{ {
for (pLocalTracker=new_servers; \ for (pLocalTracker=new_servers;
pLocalTracker<pNewServer; pLocalTracker++) pLocalTracker<pNewServer; pLocalTracker++)
{ {
if (pJoinTracker->port == pLocalTracker->port && \ if (fdfs_server_contain_ex(pJoinTracker, pLocalTracker))
strcmp(pJoinTracker->ip_addr, \
pLocalTracker->ip_addr) == 0)
{ {
break; break;
} }
@ -4080,9 +4076,9 @@ static int tracker_mem_check_add_tracker_servers(FDFSStorageJoinBody *pJoinBody)
if (pLocalTracker == pNewServer) if (pLocalTracker == pNewServer)
{ {
memcpy(pNewServer, pJoinTracker, \ memcpy(pNewServer, pJoinTracker,
sizeof(ConnectionInfo)); sizeof(TrackerServerInfo));
pNewServer->sock = -1; fdfs_server_sock_reset(pNewServer);
pNewServer++; pNewServer++;
} }
} }
@ -4092,8 +4088,8 @@ static int tracker_mem_check_add_tracker_servers(FDFSStorageJoinBody *pJoinBody)
g_tracker_servers.servers = new_servers; g_tracker_servers.servers = new_servers;
g_tracker_servers.server_count += add_count; g_tracker_servers.server_count += add_count;
logInfo("file: "__FILE__", line: %d, " \ logInfo("file: "__FILE__", line: %d, "
"add %d tracker servers, total tracker servers: %d", \ "add %d tracker servers, total tracker servers: %d",
__LINE__, add_count, g_tracker_servers.server_count); __LINE__, add_count, g_tracker_servers.server_count);
return 0; return 0;
@ -4102,8 +4098,8 @@ static int tracker_mem_check_add_tracker_servers(FDFSStorageJoinBody *pJoinBody)
static int tracker_mem_get_tracker_server(FDFSStorageJoinBody *pJoinBody, \ static int tracker_mem_get_tracker_server(FDFSStorageJoinBody *pJoinBody, \
TrackerRunningStatus *pTrackerStatus) TrackerRunningStatus *pTrackerStatus)
{ {
ConnectionInfo *pTrackerServer; TrackerServerInfo *pTrackerServer;
ConnectionInfo *pTrackerEnd; TrackerServerInfo *pTrackerEnd;
TrackerRunningStatus *pStatus; TrackerRunningStatus *pStatus;
TrackerRunningStatus trackerStatus[FDFS_MAX_TRACKERS]; TrackerRunningStatus trackerStatus[FDFS_MAX_TRACKERS];
int count; int count;
@ -4115,11 +4111,11 @@ static int tracker_mem_get_tracker_server(FDFSStorageJoinBody *pJoinBody, \
pStatus = trackerStatus; pStatus = trackerStatus;
result = 0; result = 0;
pTrackerEnd = pJoinBody->tracker_servers + pJoinBody->tracker_count; pTrackerEnd = pJoinBody->tracker_servers + pJoinBody->tracker_count;
for (pTrackerServer=pJoinBody->tracker_servers; \ for (pTrackerServer=pJoinBody->tracker_servers;
pTrackerServer<pTrackerEnd; pTrackerServer++) pTrackerServer<pTrackerEnd; pTrackerServer++)
{ {
if (pTrackerServer->port == g_server_port && \ if (pTrackerServer->connections[0].port == g_server_port &&
is_local_host_ip(pTrackerServer->ip_addr)) is_local_host_ip(pTrackerServer->connections[0].ip_addr))
{ {
continue; continue;
} }
@ -4150,13 +4146,13 @@ static int tracker_mem_get_tracker_server(FDFSStorageJoinBody *pJoinBody, \
for (i=0; i<count; i++) for (i=0; i<count; i++)
{ {
logDebug("file: "__FILE__", line: %d, " \ logDebug("file: "__FILE__", line: %d, "
"%s:%d leader: %d, running time: %d, " \ "%s:%d leader: %d, running time: %d, "
"restart interval: %d", __LINE__, \ "restart interval: %d", __LINE__,
trackerStatus[i].pTrackerServer->ip_addr, \ trackerStatus[i].pTrackerServer->connections[0].ip_addr,
trackerStatus[i].pTrackerServer->port, \ trackerStatus[i].pTrackerServer->connections[0].port,
trackerStatus[i].if_leader, \ trackerStatus[i].if_leader,
trackerStatus[i].running_time, \ trackerStatus[i].running_time,
trackerStatus[i].restart_interval); trackerStatus[i].restart_interval);
} }
@ -4171,7 +4167,7 @@ static int tracker_mem_get_sys_files_from_others(FDFSStorageJoinBody *pJoinBody,
{ {
int result; int result;
TrackerRunningStatus trackerStatus; TrackerRunningStatus trackerStatus;
ConnectionInfo *pTrackerServer; TrackerServerInfo *pTrackerServer;
FDFSGroups newGroups; FDFSGroups newGroups;
FDFSGroups tempGroups; FDFSGroups tempGroups;
@ -4188,18 +4184,18 @@ static int tracker_mem_get_sys_files_from_others(FDFSStorageJoinBody *pJoinBody,
if (pRunningStatus != NULL) if (pRunningStatus != NULL)
{ {
if (tracker_mem_cmp_tracker_running_status(pRunningStatus, \ if (tracker_mem_cmp_tracker_running_status(pRunningStatus,
&trackerStatus) >= 0) &trackerStatus) >= 0)
{ {
logDebug("file: "__FILE__", line: %d, " \ logDebug("file: "__FILE__", line: %d, "
"%s:%d running time: %d, restart interval: %d, "\ "%s:%d running time: %d, restart interval: %d, "
"my running time: %d, restart interval: %d, " \ "my running time: %d, restart interval: %d, "
"do not need sync system files", __LINE__, \ "do not need sync system files", __LINE__,
trackerStatus.pTrackerServer->ip_addr, \ trackerStatus.pTrackerServer->connections[0].ip_addr,
trackerStatus.pTrackerServer->port, \ trackerStatus.pTrackerServer->connections[0].port,
trackerStatus.running_time, \ trackerStatus.running_time,
trackerStatus.restart_interval, \ trackerStatus.restart_interval,
pRunningStatus->running_time, \ pRunningStatus->running_time,
pRunningStatus->restart_interval); pRunningStatus->restart_interval);
return 0; return 0;
@ -4213,10 +4209,10 @@ static int tracker_mem_get_sys_files_from_others(FDFSStorageJoinBody *pJoinBody,
return result; return result;
} }
logInfo("file: "__FILE__", line: %d, " \ logInfo("file: "__FILE__", line: %d, "
"sys files loaded from tracker server %s:%d", \ "sys files loaded from tracker server %s:%d",
__LINE__, pTrackerServer->ip_addr, \ __LINE__, pTrackerServer->connections[0].ip_addr,
pTrackerServer->port); pTrackerServer->connections[0].port);
memset(&newGroups, 0, sizeof(newGroups)); memset(&newGroups, 0, sizeof(newGroups));
newGroups.store_lookup = g_groups.store_lookup; newGroups.store_lookup = g_groups.store_lookup;
@ -4903,7 +4899,7 @@ static int tracker_mem_get_trunk_binlog_size(
strcpy(storage_server.ip_addr, storage_ip); strcpy(storage_server.ip_addr, storage_ip);
storage_server.port = port; storage_server.port = port;
storage_server.sock = -1; storage_server.sock = -1;
if ((conn=tracker_connect_server(&storage_server, &result)) == NULL) if ((conn=tracker_make_connection(&storage_server, &result)) == NULL)
{ {
return result; return result;
} }

View File

@ -30,7 +30,7 @@ extern "C" {
#endif #endif
extern TrackerServerGroup g_tracker_servers; //save all tracker servers from storage server extern TrackerServerGroup g_tracker_servers; //save all tracker servers from storage server
extern ConnectionInfo *g_last_tracker_servers; //for delay free extern TrackerServerInfo *g_last_tracker_servers; //for delay free
extern int g_next_leader_index; //next leader index extern int g_next_leader_index; //next leader index
extern int g_tracker_leader_chg_count; //for notify storage servers extern int g_tracker_leader_chg_count; //for notify storage servers
extern int g_trunk_server_chg_count; //for notify other trackers extern int g_trunk_server_chg_count; //for notify other trackers

View File

@ -22,6 +22,7 @@
#include "fastcommon/sockopt.h" #include "fastcommon/sockopt.h"
#include "tracker_types.h" #include "tracker_types.h"
#include "tracker_proto.h" #include "tracker_proto.h"
#include "fdfs_shared_func.h"
int fdfs_recv_header(ConnectionInfo *pTrackerServer, int64_t *in_bytes) int fdfs_recv_header(ConnectionInfo *pTrackerServer, int64_t *in_bytes)
{ {
@ -212,7 +213,7 @@ int fdfs_deal_no_body_cmd_ex(const char *ip_addr, const int port, const int cmd)
strcpy(server_info.ip_addr, ip_addr); strcpy(server_info.ip_addr, ip_addr);
server_info.port = port; server_info.port = port;
server_info.sock = -1; server_info.sock = -1;
if ((conn=tracker_connect_server(&server_info, &result)) == NULL) if ((conn=tracker_make_connection(&server_info, &result)) == NULL)
{ {
return result; return result;
} }
@ -434,40 +435,106 @@ void tracker_disconnect_server_ex(ConnectionInfo *conn, \
} }
} }
ConnectionInfo *tracker_connect_server_ex(ConnectionInfo *pTrackerServer, \ ConnectionInfo *tracker_connect_server_ex(TrackerServerInfo *pServerInfo,
const int connect_timeout, int *err_no)
{
ConnectionInfo *conn;
ConnectionInfo *end;
ConnectionInfo *c;
int current_index;
c = tracker_make_connection(pServerInfo->connections + pServerInfo->index, err_no);
if (c != NULL)
{
return c;
}
if (pServerInfo->count == 1)
{
return NULL;
}
end = pServerInfo->connections + pServerInfo->count;
for (conn=pServerInfo->connections; conn<end; conn++)
{
current_index = conn - pServerInfo->connections;
if (current_index != pServerInfo->index)
{
if ((c=tracker_make_connection(conn, err_no)) != NULL)
{
pServerInfo->index = current_index;
return c;
}
}
}
return NULL;
}
ConnectionInfo *tracker_connect_server_no_pool_ex(TrackerServerInfo *pServerInfo,
const char *bind_addr, int *err_no)
{
ConnectionInfo *conn;
ConnectionInfo *end;
int current_index;
if (pServerInfo->connections[pServerInfo->index].sock >= 0)
{
*err_no = 0;
return pServerInfo->connections + pServerInfo->index;
}
*err_no = conn_pool_connect_server_ex(pServerInfo->connections
+ pServerInfo->index, g_fdfs_connect_timeout, bind_addr);
if (*err_no == 0)
{
return pServerInfo->connections + pServerInfo->index;
}
if (pServerInfo->count == 1)
{
return NULL;
}
end = pServerInfo->connections + pServerInfo->count;
for (conn=pServerInfo->connections; conn<end; conn++)
{
current_index = conn - pServerInfo->connections;
if (current_index != pServerInfo->index)
{
if ((*err_no=conn_pool_connect_server_ex(conn,
g_fdfs_connect_timeout, bind_addr)) == 0)
{
pServerInfo->index = current_index;
return pServerInfo->connections + pServerInfo->index;
}
}
}
return NULL;
}
ConnectionInfo *tracker_make_connection_ex(ConnectionInfo *conn,
const int connect_timeout, int *err_no) const int connect_timeout, int *err_no)
{ {
if (g_use_connection_pool) if (g_use_connection_pool)
{ {
return conn_pool_get_connection(&g_connection_pool, return conn_pool_get_connection(&g_connection_pool,
pTrackerServer, err_no); conn, err_no);
} }
else else
{ {
*err_no = conn_pool_connect_server(pTrackerServer, \ *err_no = conn_pool_connect_server(conn, connect_timeout);
connect_timeout);
if (*err_no != 0) if (*err_no != 0)
{ {
return NULL; return NULL;
} }
else else
{ {
return pTrackerServer; return conn;
} }
} }
} }
int tracker_connect_server_no_pool(ConnectionInfo *pTrackerServer)
{
if (pTrackerServer->sock >= 0)
{
return 0;
}
return conn_pool_connect_server(pTrackerServer, \
g_fdfs_connect_timeout);
}
static int fdfs_do_parameter_req(ConnectionInfo *pTrackerServer, \ static int fdfs_do_parameter_req(ConnectionInfo *pTrackerServer, \
char *buff, const int buff_size) char *buff, const int buff_size)
{ {
@ -515,11 +582,12 @@ int fdfs_get_ini_context_from_tracker(TrackerServerGroup *pTrackerGroup, \
IniContext *iniContext, bool * volatile continue_flag, \ IniContext *iniContext, bool * volatile continue_flag, \
const bool client_bind_addr, const char *bind_addr) const bool client_bind_addr, const char *bind_addr)
{ {
ConnectionInfo *pGlobalServer; ConnectionInfo *conn;
ConnectionInfo *pTServer; TrackerServerInfo *pGlobalServer;
ConnectionInfo *pServerStart; TrackerServerInfo *pServerStart;
ConnectionInfo *pServerEnd; TrackerServerInfo *pServerEnd;
ConnectionInfo trackerServer; TrackerServerInfo *pTServer;
TrackerServerInfo trackerServer;
char in_buff[1024]; char in_buff[1024];
int result; int result;
int leader_index; int leader_index;
@ -545,57 +613,56 @@ int fdfs_get_ini_context_from_tracker(TrackerServerGroup *pTrackerGroup, \
} }
do do
{ {
for (pGlobalServer=pServerStart; pGlobalServer<pServerEnd; \ conn = NULL;
pGlobalServer++) for (pGlobalServer=pServerStart; pGlobalServer<pServerEnd;
{ pGlobalServer++)
memcpy(pTServer, pGlobalServer, sizeof(ConnectionInfo)); {
for (i=0; i < 3; i++) memcpy(pTServer, pGlobalServer, sizeof(TrackerServerInfo));
{ fdfs_server_sock_reset(pTServer);
pTServer->sock = socketClientExAuto(pTServer->ip_addr, for (i=0; i < 3; i++)
pTServer->port, g_fdfs_connect_timeout,
O_NONBLOCK, bind_addr, &result);
if (pTServer->sock >= 0)
{ {
break; conn = tracker_connect_server_no_pool_ex(pTServer,
bind_addr, &result);
if (conn != NULL)
{
break;
}
sleep(1);
} }
sleep(1); if (conn == NULL)
} {
continue;
}
if (pTServer->sock < 0) result = fdfs_do_parameter_req(conn, in_buff, sizeof(in_buff));
{ if (result == 0)
continue; {
} result = iniLoadFromBuffer(in_buff, iniContext);
result = fdfs_do_parameter_req(pTServer, in_buff, \ close(conn->sock);
sizeof(in_buff)); return result;
if (result == 0) }
{
result = iniLoadFromBuffer(in_buff, iniContext);
close(pTServer->sock); logError("file: "__FILE__", line: %d, "
return result; "get parameters from tracker server %s:%d fail",
} __LINE__, conn->ip_addr, conn->port);
close(conn->sock);
sleep(1);
}
logError("file: "__FILE__", line: %d, " if (pServerStart != pTrackerGroup->servers)
"get parameters from tracker server %s:%d fail", {
__LINE__, pTServer->ip_addr, pTServer->port); pServerStart = pTrackerGroup->servers;
fdfs_quit(pTServer); }
close(pTServer->sock); } while (*continue_flag);
sleep(1);
}
if (pServerStart != pTrackerGroup->servers)
{
pServerStart = pTrackerGroup->servers;
}
} while (*continue_flag);
return EINTR; return EINTR;
} }
int fdfs_get_tracker_status(ConnectionInfo *pTrackerServer, \ int fdfs_get_tracker_status(TrackerServerInfo *pTrackerServer,
TrackerRunningStatus *pStatus) TrackerRunningStatus *pStatus)
{ {
char in_buff[1 + 2 * FDFS_PROTO_PKG_LEN_SIZE]; char in_buff[1 + 2 * FDFS_PROTO_PKG_LEN_SIZE];
@ -605,7 +672,7 @@ int fdfs_get_tracker_status(ConnectionInfo *pTrackerServer, \
int64_t in_bytes; int64_t in_bytes;
int result; int result;
pTrackerServer->sock = -1; fdfs_server_sock_reset(pTrackerServer);
if ((conn=tracker_connect_server(pTrackerServer, &result)) == NULL) if ((conn=tracker_connect_server(pTrackerServer, &result)) == NULL)
{ {
return result; return result;
@ -615,14 +682,13 @@ int fdfs_get_tracker_status(ConnectionInfo *pTrackerServer, \
{ {
memset(&header, 0, sizeof(header)); memset(&header, 0, sizeof(header));
header.cmd = TRACKER_PROTO_CMD_TRACKER_GET_STATUS; header.cmd = TRACKER_PROTO_CMD_TRACKER_GET_STATUS;
if ((result=tcpsenddata_nb(conn->sock, &header, \ if ((result=tcpsenddata_nb(conn->sock, &header,
sizeof(header), g_fdfs_network_timeout)) != 0) sizeof(header), g_fdfs_network_timeout)) != 0)
{ {
logError("file: "__FILE__", line: %d, " \ logError("file: "__FILE__", line: %d, "
"send data to tracker server %s:%d fail, " \ "send data to tracker server %s:%d fail, "
"errno: %d, error info: %s", __LINE__, \ "errno: %d, error info: %s", __LINE__,
pTrackerServer->ip_addr, \ conn->ip_addr, conn->port,
pTrackerServer->port, \
result, STRERROR(result)); result, STRERROR(result));
result = (result == ENOENT ? EACCES : result); result = (result == ENOENT ? EACCES : result);
@ -630,7 +696,7 @@ int fdfs_get_tracker_status(ConnectionInfo *pTrackerServer, \
} }
pInBuff = in_buff; pInBuff = in_buff;
result = fdfs_recv_response(conn, &pInBuff, \ result = fdfs_recv_response(conn, &pInBuff,
sizeof(in_buff), &in_bytes); sizeof(in_buff), &in_bytes);
if (result != 0) if (result != 0)
{ {
@ -639,11 +705,11 @@ int fdfs_get_tracker_status(ConnectionInfo *pTrackerServer, \
if (in_bytes != sizeof(in_buff)) if (in_bytes != sizeof(in_buff))
{ {
logError("file: "__FILE__", line: %d, " \ logError("file: "__FILE__", line: %d, "
"tracker server %s:%d response data " \ "tracker server %s:%d response data "
"length: %"PRId64" is invalid, " \ "length: %"PRId64" is invalid, "
"expect length: %d.", __LINE__, \ "expect length: %d.", __LINE__,
pTrackerServer->ip_addr, pTrackerServer->port, \ conn->ip_addr, conn->port,
in_bytes, (int)sizeof(in_buff)); in_bytes, (int)sizeof(in_buff));
result = EINVAL; result = EINVAL;
break; break;
@ -660,4 +726,3 @@ int fdfs_get_tracker_status(ConnectionInfo *pTrackerServer, \
return result; return result;
} }

View File

@ -208,8 +208,11 @@ typedef struct
extern "C" { extern "C" {
#endif #endif
#define tracker_connect_server(pTrackerServer, err_no) \ #define tracker_connect_server(pServerInfo, err_no) \
tracker_connect_server_ex(pTrackerServer, g_fdfs_connect_timeout, err_no) tracker_connect_server_ex(pServerInfo, g_fdfs_connect_timeout, err_no)
#define tracker_make_connection(conn, err_no) \
tracker_make_connection_ex(conn, g_fdfs_connect_timeout, err_no)
/** /**
* connect to the tracker server * connect to the tracker server
@ -219,17 +222,31 @@ extern "C" {
* err_no: return the error no * err_no: return the error no
* return: ConnectionInfo pointer for success, NULL for fail * return: ConnectionInfo pointer for success, NULL for fail
**/ **/
ConnectionInfo *tracker_connect_server_ex(ConnectionInfo *pTrackerServer, \ ConnectionInfo *tracker_connect_server_ex(TrackerServerInfo *pServerInfo,
const int connect_timeout, int *err_no); const int connect_timeout, int *err_no);
/**
* connect to the tracker server directly without connection pool
* params:
* pTrackerServer: tracker server
* return: ConnectionInfo pointer for success, NULL for fail
**/
ConnectionInfo *tracker_connect_server_no_pool_ex(TrackerServerInfo *pServerInfo,
const char *bind_addr, int *err_no);
/** /**
* connect to the tracker server directly without connection pool * connect to the tracker server directly without connection pool
* params: * params:
* pTrackerServer: tracker server * pTrackerServer: tracker server
* return: 0 for success, none zero for fail * return: 0 for success, none zero for fail
**/ **/
int tracker_connect_server_no_pool(ConnectionInfo *pTrackerServer); static inline ConnectionInfo *tracker_connect_server_no_pool(
TrackerServerInfo *pServerInfo, int *err_no)
{
const char *bind_addr = NULL;
return tracker_connect_server_no_pool_ex(pServerInfo, bind_addr, err_no);
}
#define tracker_disconnect_server(pTrackerServer) \ #define tracker_disconnect_server(pTrackerServer) \
tracker_disconnect_server_ex(pTrackerServer, false) tracker_disconnect_server_ex(pTrackerServer, false)
@ -241,9 +258,13 @@ int tracker_connect_server_no_pool(ConnectionInfo *pTrackerServer);
* bForceClose: if force close the connection when use connection pool * bForceClose: if force close the connection when use connection pool
* return: * return:
**/ **/
void tracker_disconnect_server_ex(ConnectionInfo *pTrackerServer, \ void tracker_disconnect_server_ex(ConnectionInfo *conn, \
const bool bForceClose); const bool bForceClose);
ConnectionInfo *tracker_make_connection_ex(ConnectionInfo *conn,
const int connect_timeout, int *err_no);
int fdfs_validate_group_name(const char *group_name); int fdfs_validate_group_name(const char *group_name);
int fdfs_validate_filename(const char *filename); int fdfs_validate_filename(const char *filename);
int metadata_cmp_by_name(const void *p1, const void *p2); int metadata_cmp_by_name(const void *p1, const void *p2);
@ -278,7 +299,7 @@ int fdfs_get_ini_context_from_tracker(TrackerServerGroup *pTrackerGroup, \
IniContext *iniContext, bool * volatile continue_flag, \ IniContext *iniContext, bool * volatile continue_flag, \
const bool client_bind_addr, const char *bind_addr); const bool client_bind_addr, const char *bind_addr);
int fdfs_get_tracker_status(ConnectionInfo *pTrackerServer, \ int fdfs_get_tracker_status(TrackerServerInfo *pTrackerServer,
TrackerRunningStatus *pStatus); TrackerRunningStatus *pStatus);
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -21,9 +21,10 @@
#include "fdfs_define.h" #include "fdfs_define.h"
#include "fastcommon/logger.h" #include "fastcommon/logger.h"
#include "fastcommon/sockopt.h" #include "fastcommon/sockopt.h"
#include "fdfs_global.h"
#include "fastcommon/shared_func.h" #include "fastcommon/shared_func.h"
#include "fastcommon/pthread_func.h" #include "fastcommon/pthread_func.h"
#include "fdfs_global.h"
#include "fdfs_shared_func.h"
#include "tracker_global.h" #include "tracker_global.h"
#include "tracker_proto.h" #include "tracker_proto.h"
#include "tracker_mem.h" #include "tracker_mem.h"
@ -141,8 +142,8 @@ static int relationship_cmp_tracker_status(const void *p1, const void *p2)
{ {
TrackerRunningStatus *pStatus1; TrackerRunningStatus *pStatus1;
TrackerRunningStatus *pStatus2; TrackerRunningStatus *pStatus2;
ConnectionInfo *pTrackerServer1; ConnectionInfo *conn1;
ConnectionInfo *pTrackerServer2; ConnectionInfo *conn2;
int sub; int sub;
pStatus1 = (TrackerRunningStatus *)p1; pStatus1 = (TrackerRunningStatus *)p1;
@ -165,21 +166,21 @@ static int relationship_cmp_tracker_status(const void *p1, const void *p2)
return sub; return sub;
} }
pTrackerServer1 = pStatus1->pTrackerServer; conn1 = pStatus1->pTrackerServer->connections;
pTrackerServer2 = pStatus2->pTrackerServer; conn2 = pStatus2->pTrackerServer->connections;
sub = strcmp(pTrackerServer1->ip_addr, pTrackerServer2->ip_addr); sub = strcmp(conn1->ip_addr, conn2->ip_addr);
if (sub != 0) if (sub != 0)
{ {
return sub; return sub;
} }
return pTrackerServer1->port - pTrackerServer2->port; return conn1->port - conn2->port;
} }
static int relationship_get_tracker_leader(TrackerRunningStatus *pTrackerStatus) static int relationship_get_tracker_leader(TrackerRunningStatus *pTrackerStatus)
{ {
ConnectionInfo *pTrackerServer; TrackerServerInfo *pTrackerServer;
ConnectionInfo *pTrackerEnd; TrackerServerInfo *pTrackerEnd;
TrackerRunningStatus *pStatus; TrackerRunningStatus *pStatus;
TrackerRunningStatus trackerStatus[FDFS_MAX_TRACKERS]; TrackerRunningStatus trackerStatus[FDFS_MAX_TRACKERS];
int count; int count;
@ -224,8 +225,8 @@ static int relationship_get_tracker_leader(TrackerRunningStatus *pTrackerStatus)
logDebug("file: "__FILE__", line: %d, " \ logDebug("file: "__FILE__", line: %d, " \
"%s:%d if_leader: %d, running time: %d, " \ "%s:%d if_leader: %d, running time: %d, " \
"restart interval: %d", __LINE__, \ "restart interval: %d", __LINE__, \
trackerStatus[i].pTrackerServer->ip_addr, \ trackerStatus[i].pTrackerServer->connections->ip_addr, \
trackerStatus[i].pTrackerServer->port, \ trackerStatus[i].pTrackerServer->connections->port, \
trackerStatus[i].if_leader, \ trackerStatus[i].if_leader, \
trackerStatus[i].running_time, \ trackerStatus[i].running_time, \
trackerStatus[i].restart_interval); trackerStatus[i].restart_interval);
@ -244,7 +245,7 @@ static int relationship_get_tracker_leader(TrackerRunningStatus *pTrackerStatus)
do_notify_leader_changed(pTrackerServer, pLeader, \ do_notify_leader_changed(pTrackerServer, pLeader, \
TRACKER_PROTO_CMD_TRACKER_COMMIT_NEXT_LEADER, bConnectFail) TRACKER_PROTO_CMD_TRACKER_COMMIT_NEXT_LEADER, bConnectFail)
static int do_notify_leader_changed(ConnectionInfo *pTrackerServer, \ static int do_notify_leader_changed(TrackerServerInfo *pTrackerServer, \
ConnectionInfo *pLeader, const char cmd, bool *bConnectFail) ConnectionInfo *pLeader, const char cmd, bool *bConnectFail)
{ {
char out_buff[sizeof(TrackerHeader) + FDFS_PROTO_IP_PORT_SIZE]; char out_buff[sizeof(TrackerHeader) + FDFS_PROTO_IP_PORT_SIZE];
@ -255,7 +256,7 @@ static int do_notify_leader_changed(ConnectionInfo *pTrackerServer, \
int64_t in_bytes; int64_t in_bytes;
int result; int result;
pTrackerServer->sock = -1; fdfs_server_sock_reset(pTrackerServer);
if ((conn=tracker_connect_server(pTrackerServer, &result)) == NULL) if ((conn=tracker_connect_server(pTrackerServer, &result)) == NULL)
{ {
*bConnectFail = true; *bConnectFail = true;
@ -274,12 +275,10 @@ static int do_notify_leader_changed(ConnectionInfo *pTrackerServer, \
if ((result=tcpsenddata_nb(conn->sock, out_buff, \ if ((result=tcpsenddata_nb(conn->sock, out_buff, \
sizeof(out_buff), g_fdfs_network_timeout)) != 0) sizeof(out_buff), g_fdfs_network_timeout)) != 0)
{ {
logError("file: "__FILE__", line: %d, " \ logError("file: "__FILE__", line: %d, "
"send data to tracker server %s:%d fail, " \ "send data to tracker server %s:%d fail, "
"errno: %d, error info: %s", __LINE__, \ "errno: %d, error info: %s", __LINE__,
pTrackerServer->ip_addr, \ conn->ip_addr, conn->port, result, STRERROR(result));
pTrackerServer->port, \
result, STRERROR(result));
result = (result == ENOENT ? EACCES : result); result = (result == ENOENT ? EACCES : result);
break; break;
@ -292,26 +291,24 @@ static int do_notify_leader_changed(ConnectionInfo *pTrackerServer, \
{ {
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"fdfs_recv_response from tracker server %s:%d fail, " "fdfs_recv_response from tracker server %s:%d fail, "
"result: %d", __LINE__, pTrackerServer->ip_addr, "result: %d", __LINE__, conn->ip_addr, conn->port, result);
pTrackerServer->port, result);
break; break;
} }
if (in_bytes != 0) if (in_bytes != 0)
{ {
logError("file: "__FILE__", line: %d, " \ logError("file: "__FILE__", line: %d, "
"tracker server %s:%d response data " \ "tracker server %s:%d response data "
"length: %"PRId64" is invalid, " \ "length: %"PRId64" is invalid, "
"expect length: %d.", __LINE__, \ "expect length: %d.", __LINE__,
pTrackerServer->ip_addr, pTrackerServer->port, \ conn->ip_addr, conn->port, in_bytes, 0);
in_bytes, 0);
result = EINVAL; result = EINVAL;
break; break;
} }
} while (0); } while (0);
if (pTrackerServer->port == g_server_port && \ if (conn->port == g_server_port &&
is_local_host_ip(pTrackerServer->ip_addr)) is_local_host_ip(conn->ip_addr))
{ {
tracker_disconnect_server_ex(conn, true); tracker_disconnect_server_ex(conn, true);
} }
@ -325,8 +322,8 @@ static int do_notify_leader_changed(ConnectionInfo *pTrackerServer, \
static int relationship_notify_leader_changed(ConnectionInfo *pLeader) static int relationship_notify_leader_changed(ConnectionInfo *pLeader)
{ {
ConnectionInfo *pTrackerServer; TrackerServerInfo *pTrackerServer;
ConnectionInfo *pTrackerEnd; TrackerServerInfo *pTrackerEnd;
int result; int result;
bool bConnectFail; bool bConnectFail;
int success_count; int success_count;
@ -386,6 +383,7 @@ static int relationship_select_leader()
{ {
int result; int result;
TrackerRunningStatus trackerStatus; TrackerRunningStatus trackerStatus;
ConnectionInfo *conn;
if (g_tracker_servers.server_count <= 0) if (g_tracker_servers.server_count <= 0)
{ {
@ -400,19 +398,17 @@ static int relationship_select_leader()
return result; return result;
} }
if (trackerStatus.pTrackerServer->port == g_server_port && \ conn = trackerStatus.pTrackerServer->connections;
is_local_host_ip(trackerStatus.pTrackerServer->ip_addr)) if (conn->port == g_server_port && is_local_host_ip(conn->ip_addr))
{ {
if ((result=relationship_notify_leader_changed( \ if ((result=relationship_notify_leader_changed(conn)) != 0)
trackerStatus.pTrackerServer)) != 0)
{ {
return result; return result;
} }
logInfo("file: "__FILE__", line: %d, " \ logInfo("file: "__FILE__", line: %d, "
"I am the new tracker leader %s:%d", \ "I am the new tracker leader %s:%d",
__LINE__, trackerStatus.pTrackerServer->ip_addr, \ __LINE__, conn->ip_addr, conn->port);
trackerStatus.pTrackerServer->port);
tracker_mem_find_trunk_servers(); tracker_mem_find_trunk_servers();
} }
@ -434,14 +430,13 @@ static int relationship_select_leader()
return EINVAL; return EINVAL;
} }
logInfo("file: "__FILE__", line: %d, " \ logInfo("file: "__FILE__", line: %d, "
"the tracker leader %s:%d", __LINE__, \ "the tracker leader %s:%d", __LINE__,
trackerStatus.pTrackerServer->ip_addr, \ conn->ip_addr, conn->port);
trackerStatus.pTrackerServer->port);
} }
else else
{ {
logInfo("file: "__FILE__", line: %d, " \ logInfo("file: "__FILE__", line: %d, "
"waiting for tracker leader notify", __LINE__); "waiting for tracker leader notify", __LINE__);
return ENOENT; return ENOENT;
} }
@ -454,7 +449,8 @@ static int relationship_ping_leader()
{ {
int result; int result;
int leader_index; int leader_index;
ConnectionInfo *pTrackerServer; TrackerServerInfo *pTrackerServer;
ConnectionInfo *conn;
if (g_if_leader_self) if (g_if_leader_self)
{ {
@ -468,21 +464,13 @@ static int relationship_ping_leader()
} }
pTrackerServer = g_tracker_servers.servers + leader_index; pTrackerServer = g_tracker_servers.servers + leader_index;
if (pTrackerServer->sock < 0) if ((conn=tracker_connect_server(pTrackerServer, &result)) == NULL)
{ {
if ((result=conn_pool_connect_server(pTrackerServer, \ return result;
g_fdfs_connect_timeout)) != 0)
{
return result;
}
}
if ((result=fdfs_ping_leader(pTrackerServer)) != 0)
{
close(pTrackerServer->sock);
pTrackerServer->sock = -1;
} }
result = fdfs_ping_leader(conn);
tracker_disconnect_server_ex(conn, result != 0);
return result; return result;
} }

View File

@ -382,13 +382,15 @@ static int tracker_check_and_sync(struct fast_task_info *pTask, \
leader_index = g_tracker_servers.leader_index; leader_index = g_tracker_servers.leader_index;
if (leader_index >= 0) if (leader_index >= 0)
{ {
ConnectionInfo *pTServer; TrackerServerInfo *pTServer;
ConnectionInfo *conn;
pTServer = g_tracker_servers.servers + leader_index; pTServer = g_tracker_servers.servers + leader_index;
snprintf(pDestServer->id, FDFS_STORAGE_ID_MAX_SIZE, \ conn = pTServer->connections;
"%s", pTServer->ip_addr); snprintf(pDestServer->id, FDFS_STORAGE_ID_MAX_SIZE,
memcpy(pDestServer->ip_addr, pTServer->ip_addr, \ "%s", conn->ip_addr);
memcpy(pDestServer->ip_addr, conn->ip_addr,
IP_ADDRESS_SIZE); IP_ADDRESS_SIZE);
int2buff(pTServer->port, pDestServer->port); int2buff(conn->port, pDestServer->port);
} }
pDestServer++; pDestServer++;
@ -1321,10 +1323,9 @@ static int tracker_deal_storage_join(struct fast_task_info *pTask)
{ {
TrackerStorageJoinBodyResp *pJoinBodyResp; TrackerStorageJoinBodyResp *pJoinBodyResp;
TrackerStorageJoinBody *pBody; TrackerStorageJoinBody *pBody;
ConnectionInfo *pTrackerServer; TrackerServerInfo *pTrackerServer;
ConnectionInfo *pTrackerEnd; TrackerServerInfo *pTrackerEnd;
char *p; char *p;
char *pSeperator;
FDFSStorageJoinBody joinBody; FDFSStorageJoinBody joinBody;
int result; int result;
TrackerClientInfo *pClientInfo; TrackerClientInfo *pClientInfo;
@ -1438,26 +1439,18 @@ static int tracker_deal_storage_join(struct fast_task_info *pTask)
} }
p = pTask->data+sizeof(TrackerHeader)+sizeof(TrackerStorageJoinBody); p = pTask->data+sizeof(TrackerHeader)+sizeof(TrackerStorageJoinBody);
pTrackerEnd = joinBody.tracker_servers + \ pTrackerEnd = joinBody.tracker_servers + joinBody.tracker_count;
joinBody.tracker_count; for (pTrackerServer=joinBody.tracker_servers;
for (pTrackerServer=joinBody.tracker_servers; \
pTrackerServer<pTrackerEnd; pTrackerServer++) pTrackerServer<pTrackerEnd; pTrackerServer++)
{ {
* (p + FDFS_PROTO_IP_PORT_SIZE - 1) = '\0'; * (p + FDFS_PROTO_IP_PORT_SIZE - 1) = '\0';
if ((pSeperator=strchr(p, ':')) == NULL)
{
logError("file: "__FILE__", line: %d, " \
"client ip: %s, invalid tracker server ip " \
"and port: %s", __LINE__, pTask->client_ip, p);
pTask->length = sizeof(TrackerHeader);
return EINVAL;
}
*pSeperator = '\0'; if ((result=fdfs_parse_server_info(p, FDFS_TRACKER_SERVER_DEF_PORT,
snprintf(pTrackerServer->ip_addr, \ pTrackerServer)) != 0)
sizeof(pTrackerServer->ip_addr), "%s", p); {
pTrackerServer->port = atoi(pSeperator + 1); pTask->length = sizeof(TrackerHeader);
pTrackerServer->sock = -1; return result;
}
p += FDFS_PROTO_IP_PORT_SIZE; p += FDFS_PROTO_IP_PORT_SIZE;
} }

View File

@ -122,6 +122,10 @@
#define TRACKER_STORAGE_RESERVED_SPACE_FLAG_MB 0 #define TRACKER_STORAGE_RESERVED_SPACE_FLAG_MB 0
#define TRACKER_STORAGE_RESERVED_SPACE_FLAG_RATIO 1 #define TRACKER_STORAGE_RESERVED_SPACE_FLAG_RATIO 1
#define FDFS_MULTI_IP_INDEX_INNER 0 //inner ip index
#define FDFS_MULTI_IP_INDEX_OUTER 1 //outer ip index
#define FDFS_MULTI_IP_MAX_COUNT 2
typedef struct typedef struct
{ {
char status; char status;
@ -386,20 +390,27 @@ typedef struct
typedef struct typedef struct
{ {
int storage_port; int count;
int storage_http_port; int index; //current index for fast connect
int store_path_count; ConnectionInfo connections[FDFS_MULTI_IP_MAX_COUNT];
int subdir_count_per_path; } TrackerServerInfo;
int upload_priority;
int join_time; //storage join timestamp (create timestamp) typedef struct
int up_time; //storage service started timestamp {
char version[FDFS_VERSION_SIZE]; //storage version int storage_port;
char group_name[FDFS_GROUP_NAME_MAX_LEN + 1]; int storage_http_port;
char domain_name[FDFS_DOMAIN_NAME_MAX_SIZE]; int store_path_count;
char init_flag; int subdir_count_per_path;
signed char status; int upload_priority;
int tracker_count; int join_time; //storage join timestamp (create timestamp)
ConnectionInfo tracker_servers[FDFS_MAX_TRACKERS]; int up_time; //storage service started timestamp
char version[FDFS_VERSION_SIZE]; //storage version
char group_name[FDFS_GROUP_NAME_MAX_LEN + 1];
char domain_name[FDFS_DOMAIN_NAME_MAX_SIZE];
char init_flag;
signed char status;
int tracker_count;
TrackerServerInfo tracker_servers[FDFS_MAX_TRACKERS];
} FDFSStorageJoinBody; } FDFSStorageJoinBody;
typedef struct typedef struct
@ -407,7 +418,7 @@ typedef struct
int server_count; int server_count;
int server_index; //server index for roundrobin int server_index; //server index for roundrobin
int leader_index; //leader server index int leader_index; //leader server index
ConnectionInfo *servers; TrackerServerInfo *servers;
} TrackerServerGroup; } TrackerServerGroup;
typedef struct typedef struct
@ -447,7 +458,7 @@ typedef struct {
} FDFSStorePaths; } FDFSStorePaths;
typedef struct { typedef struct {
ConnectionInfo *pTrackerServer; TrackerServerInfo *pTrackerServer;
int running_time; //running seconds, more means higher weight int running_time; //running seconds, more means higher weight
int restart_interval; //restart interval, less mean higher weight int restart_interval; //restart interval, less mean higher weight
bool if_leader; //if leader bool if_leader; //if leader