diff --git a/storage/storage_func.c b/storage/storage_func.c index 521cd98..e0eb796 100644 --- a/storage/storage_func.c +++ b/storage/storage_func.c @@ -1084,6 +1084,38 @@ static int storage_check_tracker_ipaddr(const char *filename) return 0; } +static int init_my_status_per_tracker() +{ + int bytes; + TrackerServerInfo *pTrackerServer; + TrackerServerInfo *pServerEnd; + StorageStatusPerTracker *pReportStatus; + + bytes = sizeof(StorageStatusPerTracker) * g_tracker_group.server_count; + g_my_report_status = (StorageStatusPerTracker *)malloc(bytes); + if (g_my_report_status == NULL) + { + logError("file: "__FILE__", line: %d, " + "malloc %d bytes fail, " + "errno: %d, error info: %s", __LINE__, + bytes, errno, STRERROR(errno)); + return errno != 0 ? errno : ENOMEM; + } + memset(g_my_report_status, 0, bytes); + + pReportStatus = g_my_report_status; + pServerEnd = g_tracker_group.servers + g_tracker_group.server_count; + for (pTrackerServer=g_tracker_group.servers; pTrackerServermy_status = -1; + pReportStatus->src_storage_status = -1; + pReportStatus++; + } + + return 0; +} + int storage_func_init(const char *filename, \ char *bind_addr, const int addr_size) { @@ -1890,6 +1922,11 @@ int storage_func_init(const char *filename, \ return result; } + if ((result=init_my_status_per_tracker()) != 0) + { + return result; + } + if ((result=storage_get_my_tracker_client_ip()) != 0) { return result; @@ -2010,6 +2047,115 @@ bool storage_id_is_myself(const char *storage_id) } } +static int storage_get_my_ip_from_tracker(ConnectionInfo *conn, + char *ip_addrs, const int buff_size) +{ + char out_buff[sizeof(TrackerHeader) + FDFS_GROUP_NAME_MAX_LEN]; + TrackerHeader *pHeader; + int result; + int64_t in_bytes; + + memset(out_buff, 0, sizeof(out_buff)); + pHeader = (TrackerHeader *)out_buff; + + long2buff(FDFS_GROUP_NAME_MAX_LEN, pHeader->pkg_len); + pHeader->cmd = TRACKER_PROTO_CMD_STORAGE_GET_MY_IP; + strcpy(out_buff + sizeof(TrackerHeader), g_group_name); + if((result=tcpsenddata_nb(conn->sock, out_buff, + sizeof(out_buff), g_fdfs_network_timeout)) != 0) + { + logError("file: "__FILE__", line: %d, " + "tracker server %s:%d, send data fail, " + "errno: %d, error info: %s.", + __LINE__, conn->ip_addr, conn->port, + result, STRERROR(result)); + return result; + } + + if ((result=fdfs_recv_response(conn, &ip_addrs, + buff_size - 1, &in_bytes)) != 0) + { + logError("file: "__FILE__", line: %d, " + "tracker server %s:%d, recv response fail, " + "errno: %d, error info: %s.", + __LINE__, conn->ip_addr, conn->port, + result, STRERROR(result)); + return result; + } + + *(ip_addrs + in_bytes) = '\0'; + return 0; +} + +int storage_set_tracker_client_ips(ConnectionInfo *conn, + const int tracker_index) +{ + char my_ip_addrs[256]; + char error_info[256]; + FDFSMultiIP multi_ip; + int result; + int i; + + if (g_my_report_status[tracker_index].get_my_ip_done) + { + return 0; + } + + if ((result=storage_get_my_ip_from_tracker(conn, my_ip_addrs, + sizeof(my_ip_addrs))) != 0) + { + return result; + } + + if ((result=fdfs_parse_multi_ips_ex(my_ip_addrs, &multi_ip, + error_info, sizeof(error_info), false)) != 0) + { + return result; + } + + for (i = 0; i < multi_ip.count; i++) + { + result = storage_insert_ip_addr_to_multi_ips(&g_tracker_client_ip, + multi_ip.ips[i], multi_ip.count); + if (result == 0) + { + if ((result=fdfs_check_and_format_ips(&g_tracker_client_ip, + error_info, sizeof(error_info))) != 0) + { + logCrit("file: "__FILE__", line: %d, " + "as a client of tracker server %s:%d, " + "my ip: %s not valid, error info: %s. " + "program exit!", __LINE__, + conn->ip_addr, conn->port, + multi_ip.ips[i], error_info); + + return result; + } + + insert_into_local_host_ip(multi_ip.ips[i]); + } + else if (result != EEXIST) + { + char ip_str[256]; + + fdfs_multi_ips_to_string(&g_tracker_client_ip, + ip_str, sizeof(ip_str)); + logError("file: "__FILE__", line: %d, " + "as a client of tracker server %s:%d, " + "my ip: %s not consistent with client ips: %s " + "of other tracker client. program exit!", __LINE__, + conn->ip_addr, conn->port, + multi_ip.ips[i], ip_str); + + return result; + } + } + + g_my_report_status[tracker_index].get_my_ip_done = true; + return 0; +} + + /* int write_serialized(int fd, const char *buff, size_t count, const bool bSync) { diff --git a/storage/storage_func.h b/storage/storage_func.h index e959d2a..1208498 100644 --- a/storage/storage_func.h +++ b/storage/storage_func.h @@ -34,6 +34,9 @@ bool storage_server_is_myself(const FDFSStorageBrief *pStorageBrief); bool storage_id_is_myself(const char *storage_id); +int storage_set_tracker_client_ips(ConnectionInfo *conn, + const int tracker_index); + #define STORAGE_CHOWN(path, current_uid, current_gid) \ if (!(g_run_by_gid == current_gid && g_run_by_uid == current_uid)) \ { \ diff --git a/storage/storage_global.c b/storage/storage_global.c index 99065db..9be4a42 100644 --- a/storage/storage_global.c +++ b/storage/storage_global.c @@ -90,6 +90,7 @@ char g_key_namespace[FDHT_MAX_NAMESPACE_LEN+1] = {0}; int g_namespace_len = 0; int g_allow_ip_count = 0; in_addr_t *g_allow_ip_addrs = NULL; +StorageStatusPerTracker *g_my_report_status = NULL; //returned by tracker server TimeInfo g_access_log_rotate_time = {0, 0}; //rotate access log time base TimeInfo g_error_log_rotate_time = {0, 0}; //rotate error log time base diff --git a/storage/storage_global.h b/storage/storage_global.h index 4223f58..0fc19ce 100644 --- a/storage/storage_global.h +++ b/storage/storage_global.h @@ -57,6 +57,13 @@ typedef struct int free_mb; //free spaces } FDFSStorePathInfo; +typedef struct +{ + signed char my_status; + signed char src_storage_status; + bool get_my_ip_done; +} StorageStatusPerTracker; + extern volatile bool g_continue_flag; extern FDFSStorePathInfo *g_path_space_list; @@ -140,6 +147,8 @@ extern int g_namespace_len; extern int g_allow_ip_count; /* -1 means match any ip address */ extern in_addr_t *g_allow_ip_addrs; /* sorted array, asc order */ +extern StorageStatusPerTracker *g_my_report_status; //returned by tracker server + extern gid_t g_run_by_gid; extern uid_t g_run_by_uid; diff --git a/storage/storage_ip_changed_dealer.c b/storage/storage_ip_changed_dealer.c index 985b2fb..acd6a26 100644 --- a/storage/storage_ip_changed_dealer.c +++ b/storage/storage_ip_changed_dealer.c @@ -117,107 +117,6 @@ static int storage_report_ip_changed(ConnectionInfo *pTrackerServer) } } -static int storage_get_my_ip_from_tracker(ConnectionInfo *conn, - char *ip_addrs, const int buff_size) -{ - char out_buff[sizeof(TrackerHeader) + FDFS_GROUP_NAME_MAX_LEN]; - TrackerHeader *pHeader; - int result; - int64_t in_bytes; - - memset(out_buff, 0, sizeof(out_buff)); - pHeader = (TrackerHeader *)out_buff; - - long2buff(FDFS_GROUP_NAME_MAX_LEN, pHeader->pkg_len); - pHeader->cmd = TRACKER_PROTO_CMD_STORAGE_GET_MY_IP; - strcpy(out_buff + sizeof(TrackerHeader), g_group_name); - if((result=tcpsenddata_nb(conn->sock, out_buff, - sizeof(out_buff), g_fdfs_network_timeout)) != 0) - { - logError("file: "__FILE__", line: %d, " - "tracker server %s:%d, send data fail, " - "errno: %d, error info: %s.", - __LINE__, conn->ip_addr, conn->port, - result, STRERROR(result)); - return result; - } - - if ((result=fdfs_recv_response(conn, &ip_addrs, - buff_size - 1, &in_bytes)) != 0) - { - logError("file: "__FILE__", line: %d, " - "tracker server %s:%d, recv response fail, " - "errno: %d, error info: %s.", - __LINE__, conn->ip_addr, conn->port, - result, STRERROR(result)); - return result; - } - - *(ip_addrs + in_bytes) = '\0'; - return 0; -} - -static int storage_set_tracker_client_ips(ConnectionInfo *conn) -{ - char my_ip_addrs[256]; - char error_info[256]; - FDFSMultiIP multi_ip; - int result; - int i; - - if ((result=storage_get_my_ip_from_tracker(conn, my_ip_addrs, - sizeof(my_ip_addrs))) != 0) - { - return result; - } - - if ((result=fdfs_parse_multi_ips_ex(my_ip_addrs, &multi_ip, - error_info, sizeof(error_info), false)) != 0) - { - return result; - } - - for (i = 0; i < multi_ip.count; i++) - { - result = storage_insert_ip_addr_to_multi_ips(&g_tracker_client_ip, - multi_ip.ips[i], multi_ip.count); - if (result == 0) - { - if ((result=fdfs_check_and_format_ips(&g_tracker_client_ip, - error_info, sizeof(error_info))) != 0) - { - logCrit("file: "__FILE__", line: %d, " - "as a client of tracker server %s:%d, " - "my ip: %s not valid, error info: %s. " - "program exit!", __LINE__, - conn->ip_addr, conn->port, - multi_ip.ips[i], error_info); - - return result; - } - - insert_into_local_host_ip(multi_ip.ips[i]); - } - else if (result != EEXIST) - { - char ip_str[256]; - - fdfs_multi_ips_to_string(&g_tracker_client_ip, - ip_str, sizeof(ip_str)); - logError("file: "__FILE__", line: %d, " - "as a client of tracker server %s:%d, " - "my ip: %s not consistent with client ips: %s " - "of other tracker client. program exit!", __LINE__, - conn->ip_addr, conn->port, - multi_ip.ips[i], ip_str); - - return result; - } - } - - return 0; -} - int storage_get_my_tracker_client_ip() { TrackerServerInfo *pGlobalServer; @@ -267,11 +166,13 @@ int storage_get_my_tracker_client_ip() continue; } - if ((result=storage_set_tracker_client_ips(conn)) != 0) + if ((result=storage_set_tracker_client_ips(conn, + pGlobalServer - g_tracker_group.servers)) != 0) { close(conn->sock); return result; } + //pGlobalServer-> getSockIpaddr(conn->sock, tracker_client_ip, IP_ADDRESS_SIZE); insert_into_local_host_ip(tracker_client_ip); diff --git a/storage/storage_service.c b/storage/storage_service.c index f69582f..c6e7f47 100644 --- a/storage/storage_service.c +++ b/storage/storage_service.c @@ -3668,7 +3668,7 @@ static int storage_server_query_file_info(struct fast_task_info *pTask) pStorageIdInfo = fdfs_get_storage_by_id(id); if (pStorageIdInfo != NULL) { - strcpy(p, fdfs_get_ipaddr_by_client_ip( + strcpy(p, fdfs_get_ipaddr_by_peer_ip( &pStorageIdInfo->ip_addrs, pTask->client_ip)); } diff --git a/storage/tracker_client_thread.c b/storage/tracker_client_thread.c index 0ccf92f..4982b89 100644 --- a/storage/tracker_client_thread.c +++ b/storage/tracker_client_thread.c @@ -43,8 +43,6 @@ static pthread_mutex_t reporter_thread_lock; /* save report thread ids */ static pthread_t *report_tids = NULL; -static int *src_storage_status = NULL; //returned by tracker server -static signed char *my_report_status = NULL; //returned by tracker server static bool need_rejoin_tracker = false; static int tracker_heart_beat(ConnectionInfo *pTrackerServer, \ @@ -212,7 +210,6 @@ static void *tracker_report_thread_entrance(void *arg) int previousCode; int nContinuousFail; int tracker_index; - int ips_limit; int64_t last_trunk_total_free_space; bool bServerPortChanged; @@ -239,7 +236,6 @@ static void *tracker_report_thread_entrance(void *arg) previousCode = 0; nContinuousFail = 0; conn = NULL; - ips_limit = g_use_storage_id ? FDFS_MULTI_IP_MAX_COUNT : 1; while (g_continue_flag) { if (conn != NULL) @@ -274,6 +270,12 @@ static void *tracker_report_thread_entrance(void *arg) } } + if ((result=storage_set_tracker_client_ips(conn, tracker_index)) != 0) + { + g_continue_flag = false; + break; + } + tcpsetserveropt(conn->sock, g_fdfs_network_timeout); getSockIpaddr(conn->sock, tracker_client_ip, IP_ADDRESS_SIZE); @@ -290,7 +292,8 @@ static void *tracker_report_thread_entrance(void *arg) "successfully connect to tracker server %s:%d%s, " "as a tracker client, my ip is %s", __LINE__, conn->ip_addr, conn->port, - szFailPrompt, tracker_client_ip); + szFailPrompt, fdfs_get_ipaddr_by_peer_ip( + &g_tracker_client_ip, conn->ip_addr)); previousCode = 0; nContinuousFail = 0; @@ -387,14 +390,14 @@ static void *tracker_report_thread_entrance(void *arg) sync_old_done = true; } - src_storage_status[tracker_index] = \ + g_my_report_status[tracker_index].src_storage_status = tracker_sync_notify(conn, tracker_index); - if (src_storage_status[tracker_index] != 0) + if (g_my_report_status[tracker_index].src_storage_status != 0) { int k; for (k=0; kip_addr, \ pTrackerServer->port, \ (int)sizeof(respBody), in_bytes); - my_report_status[tracker_index] = EINVAL; + g_my_report_status[tracker_index].my_status = EINVAL; return EINVAL; } @@ -2463,6 +2466,7 @@ int tracker_report_thread_start() TrackerServerInfo *pServerEnd; pthread_attr_t pattr; pthread_t tid; + int bytes; int result; if ((result=init_pthread_attr(&pattr, g_thread_stack_size)) != 0) @@ -2470,49 +2474,21 @@ int tracker_report_thread_start() return result; } - report_tids = (pthread_t *)malloc(sizeof(pthread_t) * \ - g_tracker_group.server_count); + bytes = sizeof(pthread_t) * g_tracker_group.server_count; + report_tids = (pthread_t *)malloc(bytes); if (report_tids == NULL) { - logError("file: "__FILE__", line: %d, " \ - "malloc %d bytes fail, " \ - "errno: %d, error info: %s", \ - __LINE__, (int)sizeof(pthread_t) * \ - g_tracker_group.server_count, \ - errno, STRERROR(errno)); + logError("file: "__FILE__", line: %d, " + "malloc %d bytes fail, " + "errno: %d, error info: %s", + __LINE__, bytes, errno, STRERROR(errno)); return errno != 0 ? errno : ENOMEM; } - memset(report_tids, 0, sizeof(pthread_t)*g_tracker_group.server_count); - - src_storage_status = (int *)malloc(sizeof(int) * \ - g_tracker_group.server_count); - if (src_storage_status == NULL) - { - logError("file: "__FILE__", line: %d, " \ - "malloc %d bytes fail, " \ - "errno: %d, error info: %s", __LINE__, \ - (int)sizeof(int) * g_tracker_group.server_count, \ - errno, STRERROR(errno)); - return errno != 0 ? errno : ENOMEM; - } - memset(src_storage_status,-1,sizeof(int)*g_tracker_group.server_count); - - my_report_status = (signed char *)malloc(sizeof(signed char) * \ - g_tracker_group.server_count); - if (my_report_status == NULL) - { - logError("file: "__FILE__", line: %d, " \ - "malloc %d bytes fail, " \ - "errno: %d, error info: %s", __LINE__, \ - (int)sizeof(signed char) * g_tracker_group.server_count, \ - errno, STRERROR(errno)); - return errno != 0 ? errno : ENOMEM; - } - memset(my_report_status, -1, sizeof(char)*g_tracker_group.server_count); + memset(report_tids, 0, bytes); g_tracker_reporter_count = 0; pServerEnd = g_tracker_group.servers + g_tracker_group.server_count; - for (pTrackerServer=g_tracker_group.servers; pTrackerServerdata + sizeof(TrackerHeader); memcpy(p, pGroup->group_name, FDFS_GROUP_NAME_MAX_LEN); p += FDFS_GROUP_NAME_MAX_LEN; - strcpy(p, fdfs_get_ipaddr_by_client_ip( + strcpy(p, fdfs_get_ipaddr_by_peer_ip( &ppStoreServers[0]->ip_addrs, pTask->client_ip)); p += IP_ADDRESS_SIZE - 1; long2buff(pGroup->storage_port, p); @@ -2562,7 +2562,7 @@ static int tracker_deal_service_query_fetch_update( \ for (ppServer=ppStoreServers+1; ppServerip_addrs, pTask->client_ip)); p += IP_ADDRESS_SIZE - 1; } @@ -2939,7 +2939,7 @@ static int tracker_deal_service_query_storage( \ for (ppServer=pStoreGroup->active_servers; ppServerip_addrs, pTask->client_ip)); p += IP_ADDRESS_SIZE - 1; @@ -2949,7 +2949,7 @@ static int tracker_deal_service_query_storage( \ } else { - strcpy(p, fdfs_get_ipaddr_by_client_ip( + strcpy(p, fdfs_get_ipaddr_by_peer_ip( &pStorageServer->ip_addrs, pTask->client_ip)); p += IP_ADDRESS_SIZE - 1;