diff --git a/HISTORY b/HISTORY index 73e9214..c682110 100644 --- a/HISTORY +++ b/HISTORY @@ -1,6 +1,9 @@ -Version 6.03 2019-11-14 +Version 6.03 2019-11-15 * dual IPs support two different types of inner (intranet) IPs + * storage server request tracker server to change it's status + to that of tracker leader when the storage server found + it's status inconsistence Version 6.02 2019-11-12 * get_file_info calculate CRC32 for appender file type diff --git a/client/tracker_client.c b/client/tracker_client.c index bbd7ceb..3062b05 100644 --- a/client/tracker_client.c +++ b/client/tracker_client.c @@ -1429,8 +1429,8 @@ int tracker_set_trunk_server(TrackerServerGroup *pTrackerGroup, \ return result; } -int tracker_get_storage_status(ConnectionInfo *pTrackerServer, \ - const char *group_name, const char *ip_addr, \ +int tracker_get_storage_status(ConnectionInfo *pTrackerServer, + const char *group_name, const char *ip_addr, FDFSStorageBrief *pDestBuff) { TrackerHeader *pHeader; diff --git a/storage/storage_func.c b/storage/storage_func.c index 7d7cb12..722da52 100644 --- a/storage/storage_func.c +++ b/storage/storage_func.c @@ -1121,6 +1121,7 @@ static int init_my_result_per_tracker() for (pTrackerServer=g_tracker_group.servers; pTrackerServermy_status = -1; pReportStatus->my_result = -1; pReportStatus->src_storage_result = -1; pReportStatus++; diff --git a/storage/storage_global.h b/storage/storage_global.h index a2f24b0..3687700 100644 --- a/storage/storage_global.h +++ b/storage/storage_global.h @@ -59,9 +59,11 @@ typedef struct typedef struct { - signed char my_result; - signed char src_storage_result; + signed char my_status; //my status from tracker server + signed char my_result; //my report result + signed char src_storage_result; //src storage report result bool get_my_ip_done; + bool report_my_status; } StorageStatusPerTracker; extern volatile bool g_continue_flag; diff --git a/storage/tracker_client_thread.c b/storage/tracker_client_thread.c index f2a1e3b..8259830 100644 --- a/storage/tracker_client_thread.c +++ b/storage/tracker_client_thread.c @@ -45,12 +45,16 @@ static pthread_mutex_t reporter_thread_lock; static pthread_t *report_tids = NULL; static bool need_rejoin_tracker = false; -static int tracker_heart_beat(ConnectionInfo *pTrackerServer, \ - int *pstat_chg_sync_count, bool *bServerPortChanged); -static int tracker_report_df_stat(ConnectionInfo *pTrackerServer, \ - bool *bServerPortChanged); -static int tracker_report_sync_timestamp(ConnectionInfo *pTrackerServer, \ - bool *bServerPortChanged); +static int tracker_heart_beat(ConnectionInfo *pTrackerServer, + const int tracker_index, int *pstat_chg_sync_count, + bool *bServerPortChanged); +static int tracker_report_df_stat(ConnectionInfo *pTrackerServer, + const int tracker_index, bool *bServerPortChanged); +static int tracker_report_sync_timestamp(ConnectionInfo *pTrackerServer, + const int tracker_index, bool *bServerPortChanged); + +static int tracker_storage_change_status(ConnectionInfo *pTrackerServer, + const int tracker_index); static int tracker_sync_dest_req(ConnectionInfo *pTrackerServer); static int tracker_sync_dest_query(ConnectionInfo *pTrackerServer); @@ -440,11 +444,12 @@ static void *tracker_report_thread_entrance(void *arg) while (g_continue_flag) { current_time = g_current_time; - if (current_time - last_beat_time >= \ + if (current_time - last_beat_time >= g_heart_beat_interval) { - if (tracker_heart_beat(conn, &stat_chg_sync_count, - &bServerPortChanged) != 0) + if (tracker_heart_beat(conn, tracker_index, + &stat_chg_sync_count, + &bServerPortChanged) != 0) { break; } @@ -462,8 +467,9 @@ static void *tracker_report_thread_entrance(void *arg) current_time - last_sync_report_time >= g_heart_beat_interval) { - if (tracker_report_sync_timestamp( - conn, &bServerPortChanged)!=0) + if (tracker_report_sync_timestamp(conn, + tracker_index, + &bServerPortChanged) != 0) { break; } @@ -476,7 +482,8 @@ static void *tracker_report_thread_entrance(void *arg) g_stat_report_interval) { if (tracker_report_df_stat(conn, - &bServerPortChanged) != 0) + tracker_index, + &bServerPortChanged) != 0) { break; } @@ -484,6 +491,16 @@ static void *tracker_report_thread_entrance(void *arg) last_df_report_time = current_time; } + if (g_my_report_status[tracker_index].report_my_status) + { + if (tracker_storage_change_status(conn, tracker_index) == 0) + { + g_my_report_status[tracker_index].report_my_status = false; + } + + break; + } + if (g_if_trunker_self) { if (last_trunk_file_id < g_current_trunk_file_id) @@ -715,8 +732,48 @@ static int tracker_start_sync_threads(const FDFSStorageBrief *pStorage) return result; } +static void tracker_check_my_status(const int tracker_index) +{ + int my_status; + int leader_index; + int leader_status; + + leader_index = g_tracker_group.leader_index; + if ((leader_index < 0) || (tracker_index == leader_index)) + { + return; + } + + my_status = g_my_report_status[tracker_index].my_status; + leader_status = g_my_report_status[leader_index].my_status; + if (my_status < 0 || leader_status < 0) //NOT inited + { + return; + } + if (my_status == leader_status) + { + return; + } + + if (FDFS_IS_AVAILABLE_STATUS(my_status) && + FDFS_IS_AVAILABLE_STATUS(leader_status)) + { + return; + } + + g_my_report_status[tracker_index].report_my_status = true; + + logInfo("file: "__FILE__", line: %d, " + "my status: %d (%s) from tracker #%d != my status: %d (%s)" + "from leader tracker #%d, set report_my_status to true", + __LINE__, my_status, get_storage_status_caption( + my_status), tracker_index, leader_status, + get_storage_status_caption(leader_status), leader_index); +} + static int tracker_merge_servers(ConnectionInfo *pTrackerServer, - FDFSStorageBrief *briefServers, const int server_count) + const int tracker_index, FDFSStorageBrief *briefServers, + const int server_count) { FDFSStorageBrief *pServer; FDFSStorageBrief *pEnd; @@ -742,9 +799,10 @@ static int tracker_merge_servers(ConnectionInfo *pTrackerServer, { memcpy(&(targetServer.server),pServer,sizeof(FDFSStorageBrief)); - - if (is_local_host_ip(pServer->ip_addr)) + if (strcmp(pServer->id, g_my_server_id_str) == 0) { + g_my_report_status[tracker_index].my_status = pServer->status; + tracker_check_my_status(tracker_index); } ppFound = (FDFSStorageServer **)bsearch(&pTargetServer, @@ -1034,6 +1092,21 @@ static int notify_reselect_tracker_leader(TrackerServerInfo *pTrackerServer) return result; } +static void check_my_status_for_all_trackers() +{ + int tracker_index; + + if (g_tracker_group.leader_index < 0) + { + return; + } + for (tracker_index=0; tracker_indexip_addr, pTrackerServer->port, + old_status, get_storage_status_caption(old_status), + new_status, get_storage_status_caption(new_status)); + + body_len = 1; + memset(out_buff, 0, sizeof(out_buff)); + pHeader = (TrackerHeader *)out_buff; + long2buff(body_len, pHeader->pkg_len); + pHeader->cmd = TRACKER_PROTO_CMD_STORAGE_CHANGE_STATUS; + *(out_buff + sizeof(TrackerHeader)) = new_status; + + if((result=tcpsenddata_nb(pTrackerServer->sock, out_buff, + sizeof(TrackerHeader) + body_len, g_fdfs_network_timeout)) != 0) + { + logError("file: "__FILE__", line: %d, " + "tracker server %s:%d, send data fail, " + "errno: %d, error info: %s.", + __LINE__, pTrackerServer->ip_addr, + pTrackerServer->port, + result, STRERROR(result)); + return result; + } + + pInBuff = in_buff; + result = fdfs_recv_response(pTrackerServer, + &pInBuff, sizeof(in_buff), &nInPackLen); + if (result != 0) + { + logError("file: "__FILE__", line: %d, " + "fdfs_recv_response fail, result: %d", + __LINE__, result); + return result; + } + + if (nInPackLen != 0) + { + logError("file: "__FILE__", line: %d, " + "tracker server %s:%d, response body length: %d != 0", + __LINE__, pTrackerServer->ip_addr, pTrackerServer->port, + (int)nInPackLen); + return EINVAL; + } + + return 0; } static int tracker_storage_changelog_req(ConnectionInfo *pTrackerServer) diff --git a/tracker/fdfs_shared_func.c b/tracker/fdfs_shared_func.c index f283c7c..299e00d 100644 --- a/tracker/fdfs_shared_func.c +++ b/tracker/fdfs_shared_func.c @@ -743,3 +743,4 @@ void fdfs_set_server_info_ex(TrackerServerInfo *pServer, ip_addrs->ips[i].address, port); } } + diff --git a/tracker/fdfs_shared_func.h b/tracker/fdfs_shared_func.h index dd6a026..a82080f 100644 --- a/tracker/fdfs_shared_func.h +++ b/tracker/fdfs_shared_func.h @@ -23,6 +23,11 @@ #define FDFS_IP_TYPE_PRIVATE_192 3 #define FDFS_IP_TYPE_OUTER 4 +#define FDFS_IS_AVAILABLE_STATUS(status) \ + (status == FDFS_STORAGE_STATUS_OFFLINE || \ + status == FDFS_STORAGE_STATUS_ONLINE || \ + status == FDFS_STORAGE_STATUS_ACTIVE) + #ifdef __cplusplus extern "C" { #endif diff --git a/tracker/tracker_mem.c b/tracker/tracker_mem.c index 827a8b8..c29b699 100644 --- a/tracker/tracker_mem.c +++ b/tracker/tracker_mem.c @@ -4883,14 +4883,14 @@ int tracker_mem_sync_storages(FDFSGroupInfo *pGroup, \ continue; } - memcpy(target_storage.id, pServer->id, \ + memcpy(target_storage.id, pServer->id, FDFS_STORAGE_ID_MAX_SIZE); pTargetStorage = &target_storage; - if ((ppFound=(FDFSStorageDetail **)bsearch( \ - &pTargetStorage, \ - pGroup->sorted_servers, \ - pGroup->count, \ - sizeof(FDFSStorageDetail *), \ + if ((ppFound=(FDFSStorageDetail **)bsearch( + &pTargetStorage, + pGroup->sorted_servers, + pGroup->count, + sizeof(FDFSStorageDetail *), tracker_mem_cmp_by_storage_id)) != NULL) { if ((*ppFound)->status == pServer->status \ @@ -4948,7 +4948,7 @@ int tracker_mem_sync_storages(FDFSGroupInfo *pGroup, \ &pStorageServer, pServer->id, pServer->ip_addr, true, false, &bInserted); - if (result != 0) + if (result == 0 && bInserted) { pStorageServer->status = pServer->status; } diff --git a/tracker/tracker_proto.h b/tracker/tracker_proto.h index 2d754dc..c47ba85 100644 --- a/tracker/tracker_proto.h +++ b/tracker/tracker_proto.h @@ -35,6 +35,7 @@ #define TRACKER_PROTO_CMD_STORAGE_GET_STATUS 71 //get storage status from tracker #define TRACKER_PROTO_CMD_STORAGE_GET_SERVER_ID 70 //get storage server id from tracker #define TRACKER_PROTO_CMD_STORAGE_GET_MY_IP 60 //get storage server ip from tracker +#define TRACKER_PROTO_CMD_STORAGE_CHANGE_STATUS 59 //current storage can change it's status #define TRACKER_PROTO_CMD_STORAGE_FETCH_STORAGE_IDS 69 //get all storage ids from tracker #define TRACKER_PROTO_CMD_STORAGE_GET_GROUP_NAME 109 //get storage group name from tracker @@ -146,6 +147,7 @@ typedef struct typedef struct { + unsigned char my_status; //storage server status char src_id[FDFS_STORAGE_ID_MAX_SIZE]; //src storage id } TrackerStorageJoinBodyResp; diff --git a/tracker/tracker_service.c b/tracker/tracker_service.c index 5095860..bb9e3b5 100644 --- a/tracker/tracker_service.c +++ b/tracker/tracker_service.c @@ -1382,6 +1382,59 @@ static int tracker_deal_storage_report_status(struct fast_task_info *pTask) return tracker_mem_sync_storages(pGroup, briefServers, 1); } +static int tracker_deal_storage_change_status(struct fast_task_info *pTask) +{ + TrackerClientInfo *pClientInfo; + int old_status; + int new_status; + + if (pTask->length - sizeof(TrackerHeader) != 1) + { + logError("file: "__FILE__", line: %d, " + "cmd=%d, client ip addr: %s, " + "body size "PKG_LEN_PRINTF_FORMAT" " + "is not correct", __LINE__, + TRACKER_PROTO_CMD_STORAGE_CHANGE_STATUS, + pTask->client_ip, pTask->length - + (int)sizeof(TrackerHeader)); + pTask->length = sizeof(TrackerHeader); + return EINVAL; + } + + pClientInfo = (TrackerClientInfo *)pTask->arg; + old_status = pClientInfo->pStorage->status; + pTask->length = sizeof(TrackerHeader); + + new_status = *(pTask->data + sizeof(TrackerHeader)); + if ((old_status == new_status) || + (FDFS_IS_AVAILABLE_STATUS(old_status) && + FDFS_IS_AVAILABLE_STATUS(new_status))) + { + logInfo("file: "__FILE__", line: %d, " + "client ip: %s, do NOT change storage status, " + "old status: %d (%s), new status: %d (%s)", + __LINE__, pTask->client_ip, + old_status, get_storage_status_caption(old_status), + new_status, get_storage_status_caption(new_status)); + return 0; + } + if (new_status == FDFS_STORAGE_STATUS_ONLINE || + new_status == FDFS_STORAGE_STATUS_ACTIVE) + { + new_status = FDFS_STORAGE_STATUS_OFFLINE; + } + + pClientInfo->pStorage->status = new_status; + tracker_save_storages(); + + logInfo("file: "__FILE__", line: %d, " + "client ip: %s, set storage status from %d (%s) " + "to %d (%s)", __LINE__, pTask->client_ip, + old_status, get_storage_status_caption(old_status), + new_status, get_storage_status_caption(new_status)); + return 0; +} + static int tracker_deal_storage_join(struct fast_task_info *pTask) { TrackerStorageJoinBodyResp *pJoinBodyResp; @@ -3863,6 +3916,10 @@ int tracker_deal_task(struct fast_task_info *pTask) case TRACKER_PROTO_CMD_STORAGE_REPORT_STATUS: result = tracker_deal_storage_report_status(pTask); break; + case TRACKER_PROTO_CMD_STORAGE_CHANGE_STATUS: + TRACKER_CHECK_LOGINED(pTask) + result = tracker_deal_storage_change_status(pTask); + break; case TRACKER_PROTO_CMD_STORAGE_GET_STATUS: result = tracker_deal_server_get_storage_status(pTask); break;