storage server request tracker server to change it's status

v6.03_dev
YuQing 2019-11-15 13:19:26 +08:00
parent 6ea2f5e1ca
commit 22865e0542
10 changed files with 275 additions and 41 deletions

View File

@ -1,6 +1,9 @@
Version 6.03 2019-11-14
Version 6.03 2019-11-15
* dual IPs support two different types of inner (intranet) IPs
* storage server request tracker server to change it's status
to that of tracker leader when the storage server found
it's status inconsistence
Version 6.02 2019-11-12
* get_file_info calculate CRC32 for appender file type

View File

@ -1429,8 +1429,8 @@ int tracker_set_trunk_server(TrackerServerGroup *pTrackerGroup, \
return result;
}
int tracker_get_storage_status(ConnectionInfo *pTrackerServer, \
const char *group_name, const char *ip_addr, \
int tracker_get_storage_status(ConnectionInfo *pTrackerServer,
const char *group_name, const char *ip_addr,
FDFSStorageBrief *pDestBuff)
{
TrackerHeader *pHeader;

View File

@ -1121,6 +1121,7 @@ static int init_my_result_per_tracker()
for (pTrackerServer=g_tracker_group.servers; pTrackerServer<pServerEnd;
pTrackerServer++)
{
pReportStatus->my_status = -1;
pReportStatus->my_result = -1;
pReportStatus->src_storage_result = -1;
pReportStatus++;

View File

@ -59,9 +59,11 @@ typedef struct
typedef struct
{
signed char my_result;
signed char src_storage_result;
signed char my_status; //my status from tracker server
signed char my_result; //my report result
signed char src_storage_result; //src storage report result
bool get_my_ip_done;
bool report_my_status;
} StorageStatusPerTracker;
extern volatile bool g_continue_flag;

View File

@ -45,12 +45,16 @@ static pthread_mutex_t reporter_thread_lock;
static pthread_t *report_tids = NULL;
static bool need_rejoin_tracker = false;
static int tracker_heart_beat(ConnectionInfo *pTrackerServer, \
int *pstat_chg_sync_count, bool *bServerPortChanged);
static int tracker_report_df_stat(ConnectionInfo *pTrackerServer, \
bool *bServerPortChanged);
static int tracker_report_sync_timestamp(ConnectionInfo *pTrackerServer, \
bool *bServerPortChanged);
static int tracker_heart_beat(ConnectionInfo *pTrackerServer,
const int tracker_index, int *pstat_chg_sync_count,
bool *bServerPortChanged);
static int tracker_report_df_stat(ConnectionInfo *pTrackerServer,
const int tracker_index, bool *bServerPortChanged);
static int tracker_report_sync_timestamp(ConnectionInfo *pTrackerServer,
const int tracker_index, bool *bServerPortChanged);
static int tracker_storage_change_status(ConnectionInfo *pTrackerServer,
const int tracker_index);
static int tracker_sync_dest_req(ConnectionInfo *pTrackerServer);
static int tracker_sync_dest_query(ConnectionInfo *pTrackerServer);
@ -440,11 +444,12 @@ static void *tracker_report_thread_entrance(void *arg)
while (g_continue_flag)
{
current_time = g_current_time;
if (current_time - last_beat_time >= \
if (current_time - last_beat_time >=
g_heart_beat_interval)
{
if (tracker_heart_beat(conn, &stat_chg_sync_count,
&bServerPortChanged) != 0)
if (tracker_heart_beat(conn, tracker_index,
&stat_chg_sync_count,
&bServerPortChanged) != 0)
{
break;
}
@ -462,8 +467,9 @@ static void *tracker_report_thread_entrance(void *arg)
current_time - last_sync_report_time >=
g_heart_beat_interval)
{
if (tracker_report_sync_timestamp(
conn, &bServerPortChanged)!=0)
if (tracker_report_sync_timestamp(conn,
tracker_index,
&bServerPortChanged) != 0)
{
break;
}
@ -476,7 +482,8 @@ static void *tracker_report_thread_entrance(void *arg)
g_stat_report_interval)
{
if (tracker_report_df_stat(conn,
&bServerPortChanged) != 0)
tracker_index,
&bServerPortChanged) != 0)
{
break;
}
@ -484,6 +491,16 @@ static void *tracker_report_thread_entrance(void *arg)
last_df_report_time = current_time;
}
if (g_my_report_status[tracker_index].report_my_status)
{
if (tracker_storage_change_status(conn, tracker_index) == 0)
{
g_my_report_status[tracker_index].report_my_status = false;
}
break;
}
if (g_if_trunker_self)
{
if (last_trunk_file_id < g_current_trunk_file_id)
@ -715,8 +732,48 @@ static int tracker_start_sync_threads(const FDFSStorageBrief *pStorage)
return result;
}
static void tracker_check_my_status(const int tracker_index)
{
int my_status;
int leader_index;
int leader_status;
leader_index = g_tracker_group.leader_index;
if ((leader_index < 0) || (tracker_index == leader_index))
{
return;
}
my_status = g_my_report_status[tracker_index].my_status;
leader_status = g_my_report_status[leader_index].my_status;
if (my_status < 0 || leader_status < 0) //NOT inited
{
return;
}
if (my_status == leader_status)
{
return;
}
if (FDFS_IS_AVAILABLE_STATUS(my_status) &&
FDFS_IS_AVAILABLE_STATUS(leader_status))
{
return;
}
g_my_report_status[tracker_index].report_my_status = true;
logInfo("file: "__FILE__", line: %d, "
"my status: %d (%s) from tracker #%d != my status: %d (%s)"
"from leader tracker #%d, set report_my_status to true",
__LINE__, my_status, get_storage_status_caption(
my_status), tracker_index, leader_status,
get_storage_status_caption(leader_status), leader_index);
}
static int tracker_merge_servers(ConnectionInfo *pTrackerServer,
FDFSStorageBrief *briefServers, const int server_count)
const int tracker_index, FDFSStorageBrief *briefServers,
const int server_count)
{
FDFSStorageBrief *pServer;
FDFSStorageBrief *pEnd;
@ -742,9 +799,10 @@ static int tracker_merge_servers(ConnectionInfo *pTrackerServer,
{
memcpy(&(targetServer.server),pServer,sizeof(FDFSStorageBrief));
if (is_local_host_ip(pServer->ip_addr))
if (strcmp(pServer->id, g_my_server_id_str) == 0)
{
g_my_report_status[tracker_index].my_status = pServer->status;
tracker_check_my_status(tracker_index);
}
ppFound = (FDFSStorageServer **)bsearch(&pTargetServer,
@ -1034,6 +1092,21 @@ static int notify_reselect_tracker_leader(TrackerServerInfo *pTrackerServer)
return result;
}
static void check_my_status_for_all_trackers()
{
int tracker_index;
if (g_tracker_group.leader_index < 0)
{
return;
}
for (tracker_index=0; tracker_index<g_tracker_group.server_count;
tracker_index++)
{
tracker_check_my_status(tracker_index);
}
}
static void set_tracker_leader(const int leader_index)
{
int old_index;
@ -1067,7 +1140,12 @@ static void set_tracker_leader(const int leader_index)
}
}
}
g_tracker_group.leader_index = leader_index;
if (g_tracker_group.leader_index != leader_index)
{
g_tracker_group.leader_index = leader_index;
check_my_status_for_all_trackers();
}
}
static void get_tracker_leader()
@ -1085,6 +1163,8 @@ static void get_tracker_leader()
if (tracker_status.if_leader)
{
g_tracker_group.leader_index = i;
check_my_status_for_all_trackers();
logInfo("file: "__FILE__", line: %d, "
"the tracker server leader is #%d. %s:%d",
__LINE__, i, tracker_server.connections[0].ip_addr,
@ -1124,8 +1204,8 @@ static void set_trunk_server(const char *ip_addr, const int port)
}
}
static int tracker_check_response(ConnectionInfo *pTrackerServer, \
bool *bServerPortChanged)
static int tracker_check_response(ConnectionInfo *pTrackerServer,
const int tracker_index, bool *bServerPortChanged)
{
int64_t nInPackLen;
TrackerHeader resp;
@ -1458,8 +1538,8 @@ static int tracker_check_response(ConnectionInfo *pTrackerServer, \
}
}
return tracker_merge_servers(pTrackerServer, \
pBriefServers, server_count);
return tracker_merge_servers(pTrackerServer, tracker_index,
pBriefServers, server_count);
}
int tracker_sync_src_req(ConnectionInfo *pTrackerServer, \
@ -2016,6 +2096,9 @@ int tracker_report_join(ConnectionInfo *pTrackerServer, \
return EINVAL;
}
g_my_report_status[tracker_index].my_status = respBody.my_status;
tracker_check_my_status(tracker_index);
if (*(respBody.src_id) == '\0' && *g_sync_src_id != '\0')
{
return tracker_sync_notify(pTrackerServer, tracker_index);
@ -2026,8 +2109,8 @@ int tracker_report_join(ConnectionInfo *pTrackerServer, \
}
}
static int tracker_report_sync_timestamp(ConnectionInfo *pTrackerServer, \
bool *bServerPortChanged)
static int tracker_report_sync_timestamp(ConnectionInfo *pTrackerServer,
const int tracker_index, bool *bServerPortChanged)
{
char out_buff[sizeof(TrackerHeader) + (FDFS_STORAGE_ID_MAX_SIZE + 4) * \
FDFS_MAX_SERVERS_EACH_GROUP];
@ -2072,11 +2155,12 @@ static int tracker_report_sync_timestamp(ConnectionInfo *pTrackerServer, \
return result;
}
return tracker_check_response(pTrackerServer, bServerPortChanged);
return tracker_check_response(pTrackerServer, tracker_index,
bServerPortChanged);
}
static int tracker_report_df_stat(ConnectionInfo *pTrackerServer, \
bool *bServerPortChanged)
static int tracker_report_df_stat(ConnectionInfo *pTrackerServer,
const int tracker_index, bool *bServerPortChanged)
{
char out_buff[sizeof(TrackerHeader) + \
sizeof(TrackerStatReportReqBody) * 16];
@ -2182,11 +2266,13 @@ static int tracker_report_df_stat(ConnectionInfo *pTrackerServer, \
return result;
}
return tracker_check_response(pTrackerServer, bServerPortChanged);
return tracker_check_response(pTrackerServer, tracker_index,
bServerPortChanged);
}
static int tracker_heart_beat(ConnectionInfo *pTrackerServer, \
int *pstat_chg_sync_count, bool *bServerPortChanged)
static int tracker_heart_beat(ConnectionInfo *pTrackerServer,
const int tracker_index, int *pstat_chg_sync_count,
bool *bServerPortChanged)
{
char out_buff[sizeof(TrackerHeader) + sizeof(FDFSStorageStatBuff)];
TrackerHeader *pHeader;
@ -2312,7 +2398,84 @@ static int tracker_heart_beat(ConnectionInfo *pTrackerServer, \
return result;
}
return tracker_check_response(pTrackerServer, bServerPortChanged);
return tracker_check_response(pTrackerServer, tracker_index,
bServerPortChanged);
}
static int tracker_storage_change_status(ConnectionInfo *pTrackerServer,
const int tracker_index)
{
char out_buff[sizeof(TrackerHeader) + 8];
char in_buff[8];
TrackerHeader *pHeader;
char *pInBuff;
int result;
int leader_index;
int old_status;
int new_status;
int body_len;
int64_t nInPackLen;
leader_index = g_tracker_group.leader_index;
if (leader_index < 0 || tracker_index == leader_index)
{
return 0;
}
old_status = g_my_report_status[tracker_index].my_status;
new_status = g_my_report_status[leader_index].my_status;
if (new_status < 0 || new_status == old_status)
{
return 0;
}
logInfo("file: "__FILE__", line: %d, "
"tracker server: %s:%d, try to set storage "
"status from %d (%s) to %d (%s)", __LINE__,
pTrackerServer->ip_addr, pTrackerServer->port,
old_status, get_storage_status_caption(old_status),
new_status, get_storage_status_caption(new_status));
body_len = 1;
memset(out_buff, 0, sizeof(out_buff));
pHeader = (TrackerHeader *)out_buff;
long2buff(body_len, pHeader->pkg_len);
pHeader->cmd = TRACKER_PROTO_CMD_STORAGE_CHANGE_STATUS;
*(out_buff + sizeof(TrackerHeader)) = new_status;
if((result=tcpsenddata_nb(pTrackerServer->sock, out_buff,
sizeof(TrackerHeader) + body_len, g_fdfs_network_timeout)) != 0)
{
logError("file: "__FILE__", line: %d, "
"tracker server %s:%d, send data fail, "
"errno: %d, error info: %s.",
__LINE__, pTrackerServer->ip_addr,
pTrackerServer->port,
result, STRERROR(result));
return result;
}
pInBuff = in_buff;
result = fdfs_recv_response(pTrackerServer,
&pInBuff, sizeof(in_buff), &nInPackLen);
if (result != 0)
{
logError("file: "__FILE__", line: %d, "
"fdfs_recv_response fail, result: %d",
__LINE__, result);
return result;
}
if (nInPackLen != 0)
{
logError("file: "__FILE__", line: %d, "
"tracker server %s:%d, response body length: %d != 0",
__LINE__, pTrackerServer->ip_addr, pTrackerServer->port,
(int)nInPackLen);
return EINVAL;
}
return 0;
}
static int tracker_storage_changelog_req(ConnectionInfo *pTrackerServer)

View File

@ -743,3 +743,4 @@ void fdfs_set_server_info_ex(TrackerServerInfo *pServer,
ip_addrs->ips[i].address, port);
}
}

View File

@ -23,6 +23,11 @@
#define FDFS_IP_TYPE_PRIVATE_192 3
#define FDFS_IP_TYPE_OUTER 4
#define FDFS_IS_AVAILABLE_STATUS(status) \
(status == FDFS_STORAGE_STATUS_OFFLINE || \
status == FDFS_STORAGE_STATUS_ONLINE || \
status == FDFS_STORAGE_STATUS_ACTIVE)
#ifdef __cplusplus
extern "C" {
#endif

View File

@ -4883,14 +4883,14 @@ int tracker_mem_sync_storages(FDFSGroupInfo *pGroup, \
continue;
}
memcpy(target_storage.id, pServer->id, \
memcpy(target_storage.id, pServer->id,
FDFS_STORAGE_ID_MAX_SIZE);
pTargetStorage = &target_storage;
if ((ppFound=(FDFSStorageDetail **)bsearch( \
&pTargetStorage, \
pGroup->sorted_servers, \
pGroup->count, \
sizeof(FDFSStorageDetail *), \
if ((ppFound=(FDFSStorageDetail **)bsearch(
&pTargetStorage,
pGroup->sorted_servers,
pGroup->count,
sizeof(FDFSStorageDetail *),
tracker_mem_cmp_by_storage_id)) != NULL)
{
if ((*ppFound)->status == pServer->status \
@ -4948,7 +4948,7 @@ int tracker_mem_sync_storages(FDFSGroupInfo *pGroup, \
&pStorageServer, pServer->id,
pServer->ip_addr, true, false,
&bInserted);
if (result != 0)
if (result == 0 && bInserted)
{
pStorageServer->status = pServer->status;
}

View File

@ -35,6 +35,7 @@
#define TRACKER_PROTO_CMD_STORAGE_GET_STATUS 71 //get storage status from tracker
#define TRACKER_PROTO_CMD_STORAGE_GET_SERVER_ID 70 //get storage server id from tracker
#define TRACKER_PROTO_CMD_STORAGE_GET_MY_IP 60 //get storage server ip from tracker
#define TRACKER_PROTO_CMD_STORAGE_CHANGE_STATUS 59 //current storage can change it's status
#define TRACKER_PROTO_CMD_STORAGE_FETCH_STORAGE_IDS 69 //get all storage ids from tracker
#define TRACKER_PROTO_CMD_STORAGE_GET_GROUP_NAME 109 //get storage group name from tracker
@ -146,6 +147,7 @@ typedef struct
typedef struct
{
unsigned char my_status; //storage server status
char src_id[FDFS_STORAGE_ID_MAX_SIZE]; //src storage id
} TrackerStorageJoinBodyResp;

View File

@ -1382,6 +1382,59 @@ static int tracker_deal_storage_report_status(struct fast_task_info *pTask)
return tracker_mem_sync_storages(pGroup, briefServers, 1);
}
static int tracker_deal_storage_change_status(struct fast_task_info *pTask)
{
TrackerClientInfo *pClientInfo;
int old_status;
int new_status;
if (pTask->length - sizeof(TrackerHeader) != 1)
{
logError("file: "__FILE__", line: %d, "
"cmd=%d, client ip addr: %s, "
"body size "PKG_LEN_PRINTF_FORMAT" "
"is not correct", __LINE__,
TRACKER_PROTO_CMD_STORAGE_CHANGE_STATUS,
pTask->client_ip, pTask->length -
(int)sizeof(TrackerHeader));
pTask->length = sizeof(TrackerHeader);
return EINVAL;
}
pClientInfo = (TrackerClientInfo *)pTask->arg;
old_status = pClientInfo->pStorage->status;
pTask->length = sizeof(TrackerHeader);
new_status = *(pTask->data + sizeof(TrackerHeader));
if ((old_status == new_status) ||
(FDFS_IS_AVAILABLE_STATUS(old_status) &&
FDFS_IS_AVAILABLE_STATUS(new_status)))
{
logInfo("file: "__FILE__", line: %d, "
"client ip: %s, do NOT change storage status, "
"old status: %d (%s), new status: %d (%s)",
__LINE__, pTask->client_ip,
old_status, get_storage_status_caption(old_status),
new_status, get_storage_status_caption(new_status));
return 0;
}
if (new_status == FDFS_STORAGE_STATUS_ONLINE ||
new_status == FDFS_STORAGE_STATUS_ACTIVE)
{
new_status = FDFS_STORAGE_STATUS_OFFLINE;
}
pClientInfo->pStorage->status = new_status;
tracker_save_storages();
logInfo("file: "__FILE__", line: %d, "
"client ip: %s, set storage status from %d (%s) "
"to %d (%s)", __LINE__, pTask->client_ip,
old_status, get_storage_status_caption(old_status),
new_status, get_storage_status_caption(new_status));
return 0;
}
static int tracker_deal_storage_join(struct fast_task_info *pTask)
{
TrackerStorageJoinBodyResp *pJoinBodyResp;
@ -3863,6 +3916,10 @@ int tracker_deal_task(struct fast_task_info *pTask)
case TRACKER_PROTO_CMD_STORAGE_REPORT_STATUS:
result = tracker_deal_storage_report_status(pTask);
break;
case TRACKER_PROTO_CMD_STORAGE_CHANGE_STATUS:
TRACKER_CHECK_LOGINED(pTask)
result = tracker_deal_storage_change_status(pTask);
break;
case TRACKER_PROTO_CMD_STORAGE_GET_STATUS:
result = tracker_deal_server_get_storage_status(pTask);
break;