From 16bbaf7884cebe2a6956ad2541555c80054d6ebf Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Wed, 2 Oct 2019 14:40:55 +0800 Subject: [PATCH] storage support multi ip for tracker server --- storage/storage_disk_recovery.c | 6 +- storage/storage_func.c | 65 ++++---- storage/storage_ip_changed_dealer.c | 123 +++++++------- storage/storage_param_getter.c | 20 +-- storage/storage_service.c | 2 +- storage/storage_sync.c | 85 +++++----- storage/tracker_client_thread.c | 246 +++++++++++++--------------- storage/trunk_mgr/trunk_client.c | 6 +- tracker/fdfs_shared_func.c | 23 +++ tracker/fdfs_shared_func.h | 10 ++ tracker/tracker_proto.c | 14 +- tracker/tracker_proto.h | 18 +- 12 files changed, 333 insertions(+), 285 deletions(-) diff --git a/storage/storage_disk_recovery.c b/storage/storage_disk_recovery.c index ffa7bea..35ee241 100644 --- a/storage/storage_disk_recovery.c +++ b/storage/storage_disk_recovery.c @@ -531,7 +531,7 @@ static int storage_do_recovery(const char *pBasePath, StorageBinLogReader *pRead char local_filename[MAX_PATH_SIZE]; char src_filename[MAX_PATH_SIZE]; - pTrackerServer = g_tracker_group.servers; + pTrackerServer = g_tracker_group.servers->connections; //TODO: fix me !!! count = 0; total_count = 0; success_count = 0; @@ -549,7 +549,7 @@ static int storage_do_recovery(const char *pBasePath, StorageBinLogReader *pRead { break; } - if ((pStorageConn=tracker_connect_server(pSrcStorage, &result)) == NULL) + if ((pStorageConn=tracker_make_connection(pSrcStorage, &result)) == NULL) { sleep(5); continue; @@ -1114,7 +1114,7 @@ int storage_disk_recovery_start(const int store_path_index) return EINTR; } - if ((pStorageConn=tracker_connect_server(&srcStorage, &result)) == NULL) + if ((pStorageConn=tracker_make_connection(&srcStorage, &result)) == NULL) { return result; } diff --git a/storage/storage_func.c b/storage/storage_func.c index cf32938..58bb502 100644 --- a/storage/storage_func.c +++ b/storage/storage_func.c @@ -174,28 +174,27 @@ static int storage_do_get_group_name(ConnectionInfo *pTrackerServer) static int storage_get_group_name_from_tracker() { - ConnectionInfo *pTrackerServer; - ConnectionInfo *pServerEnd; + TrackerServerInfo *pTrackerServer; + TrackerServerInfo *pServerEnd; ConnectionInfo *pTrackerConn; - ConnectionInfo tracker_server; + TrackerServerInfo tracker_server; int result; result = ENOENT; pServerEnd = g_tracker_group.servers + g_tracker_group.server_count; - for (pTrackerServer=g_tracker_group.servers; \ + for (pTrackerServer=g_tracker_group.servers; pTrackerServerconnections + pServer->count; + for (conn=pServer->connections; connip_addr, conn->port); + if (strcmp(conn->ip_addr, "127.0.0.1") == 0) + { + logError("file: "__FILE__", line: %d, " + "conf file \"%s\", tracker: \"%s:%d\" is invalid, " + "tracker server ip can't be 127.0.0.1", + __LINE__, filename, conn->ip_addr, conn->port); + return EINVAL; + } + } + } + + return 0; +} + int storage_func_init(const char *filename, \ char *bind_addr, const int addr_size) { @@ -1070,8 +1097,6 @@ int storage_func_init(const char *filename, \ int64_t buff_size; int64_t rotate_access_log_size; int64_t rotate_error_log_size; - ConnectionInfo *pServer; - ConnectionInfo *pEnd; /* while (nThreadCount > 0) @@ -1181,23 +1206,7 @@ int storage_func_init(const char *filename, \ break; } - pEnd = g_tracker_group.servers + g_tracker_group.server_count; - for (pServer=g_tracker_group.servers; pServerip_addr, pServer->port); - if (strcmp(pServer->ip_addr, "127.0.0.1") == 0) - { - logError("file: "__FILE__", line: %d, " \ - "conf file \"%s\", " \ - "tracker: \"%s:%d\" is invalid, " \ - "tracker server ip can't be 127.0.0.1",\ - __LINE__, filename, pServer->ip_addr, \ - pServer->port); - result = EINVAL; - break; - } - } - if (result != 0) + if ((result=storage_check_tracker_ipaddr(filename)) != 0) { break; } diff --git a/storage/storage_ip_changed_dealer.c b/storage/storage_ip_changed_dealer.c index dbf4d7e..5dd5d20 100644 --- a/storage/storage_ip_changed_dealer.c +++ b/storage/storage_ip_changed_dealer.c @@ -119,15 +119,17 @@ static int storage_report_ip_changed(ConnectionInfo *pTrackerServer) int storage_get_my_tracker_client_ip() { - ConnectionInfo *pGlobalServer; - ConnectionInfo *pTServer; - ConnectionInfo *pTServerEnd; - ConnectionInfo trackerServer; + TrackerServerInfo *pGlobalServer; + TrackerServerInfo *pTServer; + TrackerServerInfo *pTServerEnd; + TrackerServerInfo trackerServer; + ConnectionInfo *conn; char tracker_client_ip[IP_ADDRESS_SIZE]; int success_count; int result; int i; + conn = NULL; result = 0; success_count = 0; pTServer = &trackerServer; @@ -138,13 +140,13 @@ int storage_get_my_tracker_client_ip() for (pGlobalServer=g_tracker_group.servers; pGlobalServersock = socketClientExAuto(pTServer->ip_addr, - pTServer->port, g_fdfs_connect_timeout, O_NONBLOCK, - g_client_bind_addr ? g_bind_addr : NULL, &result); - if (pTServer->sock >= 0) + conn = tracker_connect_server_no_pool_ex(pTServer, + g_client_bind_addr ? g_bind_addr : NULL, &result, false); + if (conn != NULL) { break; } @@ -152,37 +154,40 @@ int storage_get_my_tracker_client_ip() sleep(5); } - if (pTServer->sock < 0) + if (conn == NULL) { - logError("file: "__FILE__", line: %d, " \ - "connect to tracker server %s:%d fail, " \ - "errno: %d, error info: %s", \ - __LINE__, pTServer->ip_addr, pTServer->port, \ + logError("file: "__FILE__", line: %d, " + "connect to tracker server %s:%d fail, " + "errno: %d, error info: %s", + __LINE__, pTServer->connections[0].ip_addr, + pTServer->connections[0].port, result, STRERROR(result)); continue; } - getSockIpaddr(pTServer->sock,tracker_client_ip,IP_ADDRESS_SIZE); + //TODO support multi IPs !!! + + getSockIpaddr(conn->sock,tracker_client_ip,IP_ADDRESS_SIZE); if (*g_tracker_client_ip == '\0') { strcpy(g_tracker_client_ip, tracker_client_ip); } else if (strcmp(tracker_client_ip, g_tracker_client_ip) != 0) { - logError("file: "__FILE__", line: %d, " \ - "as a client of tracker server %s:%d, " \ - "my ip: %s != client ip: %s of other " \ - "tracker client", __LINE__, \ - pTServer->ip_addr, pTServer->port, \ + logError("file: "__FILE__", line: %d, " + "as a client of tracker server %s:%d, " + "my ip: %s != client ip: %s of other " + "tracker client", __LINE__, + conn->ip_addr, conn->port, tracker_client_ip, g_tracker_client_ip); - close(pTServer->sock); + close(conn->sock); return EINVAL; } - fdfs_quit(pTServer); - close(pTServer->sock); + fdfs_quit(conn); + close(conn->sock); success_count++; } } @@ -197,10 +202,11 @@ int storage_get_my_tracker_client_ip() static int storage_report_storage_ip_addr() { - ConnectionInfo *pGlobalServer; - ConnectionInfo *pTServer; - ConnectionInfo *pTServerEnd; - ConnectionInfo trackerServer; + TrackerServerInfo *pGlobalServer; + TrackerServerInfo *pTServer; + TrackerServerInfo *pTServerEnd; + TrackerServerInfo trackerServer; + ConnectionInfo *conn; int success_count; int result; int i; @@ -229,13 +235,13 @@ static int storage_report_storage_ip_addr() for (pGlobalServer=g_tracker_group.servers; pGlobalServersock = socketClientExAuto(pTServer->ip_addr, - pTServer->port, g_fdfs_connect_timeout, O_NONBLOCK, - g_client_bind_addr ? g_bind_addr : NULL, &result); - if (pTServer->sock >= 0) + conn = tracker_connect_server_no_pool_ex(pTServer, + g_client_bind_addr ? g_bind_addr : NULL, &result, false); + if (conn != NULL) { break; } @@ -243,18 +249,19 @@ static int storage_report_storage_ip_addr() sleep(1); } - if (pTServer->sock < 0) + if (conn == NULL) { - logError("file: "__FILE__", line: %d, " \ - "connect to tracker server %s:%d fail, " \ - "errno: %d, error info: %s", \ - __LINE__, pTServer->ip_addr, pTServer->port, \ + logError("file: "__FILE__", line: %d, " + "connect to tracker server %s:%d fail, " + "errno: %d, error info: %s", + __LINE__, pTServer->connections[0].ip_addr, + pTServer->connections[0].port, result, STRERROR(result)); continue; } - if ((result=storage_report_ip_changed(pTServer)) == 0) + if ((result=storage_report_ip_changed(conn)) == 0) { success_count++; } @@ -263,8 +270,8 @@ static int storage_report_storage_ip_addr() sleep(1); } - fdfs_quit(pTServer); - close(pTServer->sock); + fdfs_quit(conn); + close(conn->sock); } } @@ -278,10 +285,11 @@ static int storage_report_storage_ip_addr() int storage_changelog_req() { - ConnectionInfo *pGlobalServer; - ConnectionInfo *pTServer; - ConnectionInfo *pTServerEnd; - ConnectionInfo trackerServer; + TrackerServerInfo *pGlobalServer; + TrackerServerInfo *pTServer; + TrackerServerInfo *pTServerEnd; + TrackerServerInfo trackerServer; + ConnectionInfo *conn; int success_count; int result; int i; @@ -296,13 +304,13 @@ int storage_changelog_req() for (pGlobalServer=g_tracker_group.servers; pGlobalServersock = socketClientExAuto(pTServer->ip_addr, - pTServer->port, g_fdfs_connect_timeout, O_NONBLOCK, - g_client_bind_addr ? g_bind_addr : NULL, &result); - if (pTServer->sock >= 0) + conn = tracker_connect_server_no_pool_ex(pTServer, + g_client_bind_addr ? g_bind_addr : NULL, &result, false); + if (conn != NULL) { break; } @@ -310,18 +318,19 @@ int storage_changelog_req() sleep(1); } - if (pTServer->sock < 0) + if (conn == NULL) { - logError("file: "__FILE__", line: %d, " \ - "connect to tracker server %s:%d fail, " \ - "errno: %d, error info: %s", \ - __LINE__, pTServer->ip_addr, pTServer->port, \ + logError("file: "__FILE__", line: %d, " + "connect to tracker server %s:%d fail, " + "errno: %d, error info: %s", + __LINE__, pTServer->connections[0].ip_addr, + pTServer->connections[0].port, result, STRERROR(result)); continue; } - result = storage_do_changelog_req(pTServer); + result = storage_do_changelog_req(conn); if (result == 0 || result == ENOENT) { success_count++; @@ -331,8 +340,8 @@ int storage_changelog_req() sleep(1); } - fdfs_quit(pTServer); - close(pTServer->sock); + fdfs_quit(conn); + close(conn->sock); } } diff --git a/storage/storage_param_getter.c b/storage/storage_param_getter.c index 4f28830..443ebbf 100644 --- a/storage/storage_param_getter.c +++ b/storage/storage_param_getter.c @@ -33,29 +33,29 @@ static int storage_convert_src_server_id() { - ConnectionInfo *pTrackerServer; - ConnectionInfo *pServerEnd; + TrackerServerInfo *pTrackerServer; + TrackerServerInfo *pServerEnd; ConnectionInfo *pTrackerConn; - ConnectionInfo tracker_server; + TrackerServerInfo tracker_server; int result; result = ENOENT; pServerEnd = g_tracker_group.servers + g_tracker_group.server_count; - for (pTrackerServer=g_tracker_group.servers; \ + for (pTrackerServer=g_tracker_group.servers; pTrackerServersock = socketClientExAuto(pTServer->ip_addr, - pTServer->port, g_fdfs_connect_timeout, O_NONBLOCK, - g_client_bind_addr ? g_bind_addr : NULL, &result); - if (pTServer->sock >= 0) + conn = tracker_connect_server_no_pool_ex(pTServer, + g_client_bind_addr ? g_bind_addr : NULL, &result, false); + if (conn != NULL) { break; } @@ -1688,20 +1690,26 @@ int storage_report_storage_status(const char *storage_id, \ sleep(5); } - if (pTServer->sock < 0) - { - continue; - } + if (conn == NULL) + { + logError("file: "__FILE__", line: %d, " + "connect to tracker server %s:%d fail, " + "errno: %d, error info: %s", + __LINE__, pTServer->connections[0].ip_addr, + pTServer->connections[0].port, + result, STRERROR(result)); + continue; + } report_count++; - if ((result=tracker_report_storage_status(pTServer, \ + if ((result=tracker_report_storage_status(conn, &briefServer)) == 0) { success_count++; } - fdfs_quit(pTServer); - close(pTServer->sock); + fdfs_quit(conn); + close(conn->sock); } logDebug("file: "__FILE__", line: %d, " \ @@ -1714,9 +1722,10 @@ int storage_report_storage_status(const char *storage_id, \ static int storage_reader_sync_init_req(StorageBinLogReader *pReader) { - ConnectionInfo *pTrackerServers; - ConnectionInfo *pTServer; - ConnectionInfo *pTServerEnd; + TrackerServerInfo *pTrackerServers; + TrackerServerInfo *pTServer; + TrackerServerInfo *pTServerEnd; + ConnectionInfo *conn; char tracker_client_ip[IP_ADDRESS_SIZE]; int result; @@ -1733,27 +1742,27 @@ static int storage_reader_sync_init_req(StorageBinLogReader *pReader) } } - pTrackerServers = (ConnectionInfo *)malloc( \ - sizeof(ConnectionInfo) * g_tracker_group.server_count); + pTrackerServers = (TrackerServerInfo *)malloc( + sizeof(TrackerServerInfo) * g_tracker_group.server_count); if (pTrackerServers == NULL) { - logError("file: "__FILE__", line: %d, " \ - "malloc %d bytes fail", __LINE__, \ - (int)sizeof(ConnectionInfo) * \ + logError("file: "__FILE__", line: %d, " + "malloc %d bytes fail", __LINE__, + (int)sizeof(TrackerServerInfo) * g_tracker_group.server_count); return errno != 0 ? errno : ENOMEM; } - memcpy(pTrackerServers, g_tracker_group.servers, \ - sizeof(ConnectionInfo) * g_tracker_group.server_count); + memcpy(pTrackerServers, g_tracker_group.servers, + sizeof(TrackerServerInfo) * g_tracker_group.server_count); pTServerEnd = pTrackerServers + g_tracker_group.server_count; for (pTServer=pTrackerServers; pTServersock = -1; + fdfs_server_sock_reset(pTServer); } result = EINTR; - if (g_tracker_group.leader_index >= 0 && \ + if (g_tracker_group.leader_index >= 0 && g_tracker_group.leader_index < g_tracker_group.server_count) { pTServer = pTrackerServers + g_tracker_group.leader_index; @@ -1766,10 +1775,9 @@ static int storage_reader_sync_init_req(StorageBinLogReader *pReader) { while (g_continue_flag) { - pTServer->sock = socketClientExAuto(pTServer->ip_addr, - pTServer->port, g_fdfs_connect_timeout, O_NONBLOCK, - g_client_bind_addr ? g_bind_addr : NULL, &result); - if (pTServer->sock >= 0) + conn = tracker_connect_server_no_pool_ex(pTServer, + g_client_bind_addr ? g_bind_addr : NULL, &result, true); + if (conn != NULL) { break; } @@ -1788,20 +1796,19 @@ static int storage_reader_sync_init_req(StorageBinLogReader *pReader) break; } - getSockIpaddr(pTServer->sock, \ - tracker_client_ip, IP_ADDRESS_SIZE); + getSockIpaddr(conn->sock, tracker_client_ip, IP_ADDRESS_SIZE); insert_into_local_host_ip(tracker_client_ip); - if ((result=tracker_sync_src_req(pTServer, pReader)) != 0) + if ((result=tracker_sync_src_req(conn, pReader)) != 0) { - fdfs_quit(pTServer); - close(pTServer->sock); + fdfs_quit(conn); + close(conn->sock); sleep(g_heart_beat_interval); continue; } - fdfs_quit(pTServer); - close(pTServer->sock); + fdfs_quit(conn); + close(conn->sock); break; } while (1); diff --git a/storage/tracker_client_thread.c b/storage/tracker_client_thread.c index 7758114..07bf224 100644 --- a/storage/tracker_client_thread.c +++ b/storage/tracker_client_thread.c @@ -127,7 +127,7 @@ int kill_tracker_report_threads() return kill_res; } -static void thracker_report_thread_exit(ConnectionInfo *pTrackerServer) +static void thracker_report_thread_exit(TrackerServerInfo *pTrackerServer) { int result; int i; @@ -165,9 +165,10 @@ static void thracker_report_thread_exit(ConnectionInfo *pTrackerServer) __LINE__, result, STRERROR(result)); } - logDebug("file: "__FILE__", line: %d, " \ - "report thread to tracker server %s:%d exit", \ - __LINE__, pTrackerServer->ip_addr, pTrackerServer->port); + logDebug("file: "__FILE__", line: %d, " + "report thread to tracker server %s:%d exit", + __LINE__, pTrackerServer->connections[0].ip_addr, + pTrackerServer->connections[0].port); } static int tracker_unlink_mark_files(const char *storage_id) @@ -194,7 +195,8 @@ static int tracker_rename_mark_files(const char *old_ip_addr, \ static void *tracker_report_thread_entrance(void *arg) { - ConnectionInfo *pTrackerServer; + ConnectionInfo *conn; + TrackerServerInfo *pTrackerServer; char my_server_id[FDFS_STORAGE_ID_MAX_SIZE]; char tracker_client_ip[IP_ADDRESS_SIZE]; char szFailPrompt[36]; @@ -216,13 +218,14 @@ static void *tracker_report_thread_entrance(void *arg) bServerPortChanged = (g_last_server_port != 0) && \ (g_server_port != g_last_server_port); - pTrackerServer = (ConnectionInfo *)arg; - pTrackerServer->sock = -1; + pTrackerServer = (TrackerServerInfo *)arg; + fdfs_server_sock_reset(pTrackerServer); tracker_index = pTrackerServer - g_tracker_group.servers; - logDebug("file: "__FILE__", line: %d, " \ - "report thread to tracker server %s:%d started", \ - __LINE__, pTrackerServer->ip_addr, pTrackerServer->port); + logDebug("file: "__FILE__", line: %d, " + "report thread to tracker server %s:%d started", + __LINE__, pTrackerServer->connections[0].ip_addr, + pTrackerServer->connections[0].port); sync_old_done = g_sync_old_done; while (g_continue_flag && \ @@ -234,36 +237,25 @@ static void *tracker_report_thread_entrance(void *arg) result = 0; previousCode = 0; nContinuousFail = 0; + conn = NULL; while (g_continue_flag) { - if (pTrackerServer->sock >= 0) - { - close(pTrackerServer->sock); - } - - pTrackerServer->sock = socketCreateExAuto(pTrackerServer->ip_addr, - g_fdfs_connect_timeout, O_NONBLOCK, - g_client_bind_addr ? g_bind_addr : NULL, &result); - if (pTrackerServer->sock < 0) + if (conn != NULL) { - logCrit("file: "__FILE__", line: %d, " - "socket create fail, program exit!", __LINE__); - g_continue_flag = false; - break; + conn_pool_disconnect_server(conn); } - tcpsetserveropt(pTrackerServer->sock, g_fdfs_network_timeout); - if ((result=connectserverbyip_nb(pTrackerServer->sock, - pTrackerServer->ip_addr, pTrackerServer->port, - g_fdfs_connect_timeout)) != 0) + conn = tracker_connect_server_no_pool_ex(pTrackerServer, + g_client_bind_addr ? g_bind_addr : NULL, &result, false); + if (conn == NULL) { if (previousCode != result) { - logError("file: "__FILE__", line: %d, " \ - "connect to tracker server %s:%d fail" \ - ", errno: %d, error info: %s", \ - __LINE__, pTrackerServer->ip_addr, \ - pTrackerServer->port, \ + logError("file: "__FILE__", line: %d, " + "connect to tracker server %s:%d fail, " + "errno: %d, error info: %s", + __LINE__, pTrackerServer->connections[0].ip_addr, + pTrackerServer->connections[0].port, result, STRERROR(result)); previousCode = result; } @@ -280,8 +272,8 @@ static void *tracker_report_thread_entrance(void *arg) } } - getSockIpaddr(pTrackerServer->sock, \ - tracker_client_ip, IP_ADDRESS_SIZE); + tcpsetserveropt(conn->sock, g_fdfs_network_timeout); + getSockIpaddr(conn->sock, tracker_client_ip, IP_ADDRESS_SIZE); if (nContinuousFail == 0) { @@ -289,14 +281,14 @@ static void *tracker_report_thread_entrance(void *arg) } else { - sprintf(szFailPrompt, ", continuous fail count: %d", \ + sprintf(szFailPrompt, ", continuous fail count: %d", nContinuousFail); } - logInfo("file: "__FILE__", line: %d, " \ - "successfully connect to tracker server %s:%d%s, " \ - "as a tracker client, my ip is %s", \ - __LINE__, pTrackerServer->ip_addr, \ - pTrackerServer->port, szFailPrompt, tracker_client_ip); + logInfo("file: "__FILE__", line: %d, " + "successfully connect to tracker server %s:%d%s, " + "as a tracker client, my ip is %s", + __LINE__, conn->ip_addr, conn->port, + szFailPrompt, tracker_client_ip); previousCode = 0; nContinuousFail = 0; @@ -307,15 +299,15 @@ static void *tracker_report_thread_entrance(void *arg) } else if (strcmp(tracker_client_ip, g_tracker_client_ip) != 0) { - logError("file: "__FILE__", line: %d, " \ - "as a client of tracker server %s:%d, " \ - "my ip: %s != client ip: %s of other " \ - "tracker client", __LINE__, \ - pTrackerServer->ip_addr, pTrackerServer->port, \ + logError("file: "__FILE__", line: %d, " + "as a client of tracker server %s:%d, " + "my ip: %s != client ip: %s of other " + "tracker client", __LINE__, + conn->ip_addr, conn->port, tracker_client_ip, g_tracker_client_ip); - close(pTrackerServer->sock); - pTrackerServer->sock = -1; + close(conn->sock); + conn->sock = -1; break; } @@ -328,7 +320,7 @@ static void *tracker_report_thread_entrance(void *arg) //print_local_host_ip_addrs(); */ - if (tracker_report_join(pTrackerServer, tracker_index, \ + if (tracker_report_join(conn, tracker_index, sync_old_done) != 0) { sleep(g_heart_beat_interval); @@ -353,14 +345,14 @@ static void *tracker_report_thread_entrance(void *arg) "errno: %d, error info: %s", \ __LINE__, result, STRERROR(result)); - fdfs_quit(pTrackerServer); + fdfs_quit(conn); sleep(g_heart_beat_interval); continue; } if (!g_sync_old_done) { - if (tracker_sync_dest_req(pTrackerServer) == 0) + if (tracker_sync_dest_req(conn) == 0) { g_sync_old_done = true; if (storage_write_to_sync_ini_file() \ @@ -382,18 +374,18 @@ static void *tracker_report_thread_entrance(void *arg) pthread_mutex_unlock( \ &reporter_thread_lock); - fdfs_quit(pTrackerServer); + fdfs_quit(conn); sleep(g_heart_beat_interval); continue; } } else { - if (tracker_sync_notify(pTrackerServer, tracker_index) != 0) + if (tracker_sync_notify(conn, tracker_index) != 0) { pthread_mutex_unlock( \ &reporter_thread_lock); - fdfs_quit(pTrackerServer); + fdfs_quit(conn); sleep(g_heart_beat_interval); continue; } @@ -412,7 +404,7 @@ static void *tracker_report_thread_entrance(void *arg) } src_storage_status[tracker_index] = \ - tracker_sync_notify(pTrackerServer, tracker_index); + tracker_sync_notify(conn, tracker_index); if (src_storage_status[tracker_index] != 0) { int k; @@ -432,7 +424,7 @@ static void *tracker_report_thread_entrance(void *arg) g_tracker_client_ip, my_server_id, \ &my_status) == 0) { - tracker_sync_dest_query(pTrackerServer); + tracker_sync_dest_query(conn); if(my_status= \ g_heart_beat_interval) { - if (tracker_heart_beat(pTrackerServer, \ - &stat_chg_sync_count, \ + if (tracker_heart_beat(conn, &stat_chg_sync_count, &bServerPortChanged) != 0) { break; } - if (g_storage_ip_changed_auto_adjust && \ - tracker_storage_changelog_req( \ - pTrackerServer) != 0) + if (g_storage_ip_changed_auto_adjust && + tracker_storage_changelog_req(conn) != 0) { break; } @@ -483,12 +473,12 @@ static void *tracker_report_thread_entrance(void *arg) last_beat_time = current_time; } - if (sync_time_chg_count != g_sync_change_count && \ - current_time - last_sync_report_time >= \ + if (sync_time_chg_count != g_sync_change_count && + current_time - last_sync_report_time >= g_heart_beat_interval) { - if (tracker_report_sync_timestamp( \ - pTrackerServer, &bServerPortChanged)!=0) + if (tracker_report_sync_timestamp( + conn, &bServerPortChanged)!=0) { break; } @@ -497,10 +487,10 @@ static void *tracker_report_thread_entrance(void *arg) last_sync_report_time = current_time; } - if (current_time - last_df_report_time >= \ + if (current_time - last_df_report_time >= g_stat_report_interval) { - if (tracker_report_df_stat(pTrackerServer, \ + if (tracker_report_df_stat(conn, &bServerPortChanged) != 0) { break; @@ -513,7 +503,7 @@ static void *tracker_report_thread_entrance(void *arg) { if (last_trunk_file_id < g_current_trunk_file_id) { - if (tracker_report_trunk_fid(pTrackerServer)!=0) + if (tracker_report_trunk_fid(conn)!=0) { break; } @@ -522,7 +512,7 @@ static void *tracker_report_thread_entrance(void *arg) if (last_trunk_total_free_space != g_trunk_total_free_space) { - if (tracker_report_trunk_free_space(pTrackerServer)!=0) + if (tracker_report_trunk_free_space(conn)!=0) { break; } @@ -538,12 +528,7 @@ static void *tracker_report_thread_entrance(void *arg) sleep(1); } - if ((!g_continue_flag) && fdfs_quit(pTrackerServer) != 0) - { - } - - close(pTrackerServer->sock); - pTrackerServer->sock = -1; + conn_pool_disconnect_server(conn); if (g_continue_flag) { sleep(1); @@ -552,11 +537,11 @@ static void *tracker_report_thread_entrance(void *arg) if (nContinuousFail > 0) { - logError("file: "__FILE__", line: %d, " \ - "connect to tracker server %s:%d fail, try count: %d" \ - ", errno: %d, error info: %s", \ - __LINE__, pTrackerServer->ip_addr, \ - pTrackerServer->port, nContinuousFail, \ + logError("file: "__FILE__", line: %d, " + "connect to tracker server %s:%d fail, try count: %d" + ", errno: %d, error info: %s", + __LINE__, pTrackerServer->connections[0].ip_addr, + pTrackerServer->connections[0].port, nContinuousFail, result, STRERROR(result)); } @@ -998,7 +983,7 @@ static int tracker_merge_servers(ConnectionInfo *pTrackerServer, \ diffServers, pDiffServer - diffServers); } -static int _notify_reselect_tleader(ConnectionInfo *pTrackerServer) +static int _notify_reselect_tleader(ConnectionInfo *conn) { char out_buff[sizeof(TrackerHeader)]; TrackerHeader *pHeader; @@ -1008,19 +993,18 @@ static int _notify_reselect_tleader(ConnectionInfo *pTrackerServer) pHeader = (TrackerHeader *)out_buff; memset(out_buff, 0, sizeof(out_buff)); pHeader->cmd = TRACKER_PROTO_CMD_TRACKER_NOTIFY_RESELECT_LEADER; - if ((result=tcpsenddata_nb(pTrackerServer->sock, out_buff, \ + if ((result=tcpsenddata_nb(conn->sock, out_buff, \ sizeof(out_buff), g_fdfs_network_timeout)) != 0) { - logError("file: "__FILE__", line: %d, " \ - "tracker server %s:%d, send data fail, " \ - "errno: %d, error info: %s.", \ - __LINE__, pTrackerServer->ip_addr, \ - pTrackerServer->port, \ + logError("file: "__FILE__", line: %d, " + "tracker server %s:%d, send data fail, " + "errno: %d, error info: %s.", + __LINE__, conn->ip_addr, conn->port, result, STRERROR(result)); return result; } - if ((result=fdfs_recv_header(pTrackerServer, &in_bytes)) != 0) + if ((result=fdfs_recv_header(conn, &in_bytes)) != 0) { logError("file: "__FILE__", line: %d, " "fdfs_recv_header fail, result: %d", @@ -1030,28 +1014,28 @@ static int _notify_reselect_tleader(ConnectionInfo *pTrackerServer) if (in_bytes != 0) { - logError("file: "__FILE__", line: %d, " \ - "tracker server %s:%d, recv body length: " \ - "%"PRId64" != 0", __LINE__, pTrackerServer->ip_addr, \ - pTrackerServer->port, in_bytes); + logError("file: "__FILE__", line: %d, " + "tracker server %s:%d, recv body length: " + "%"PRId64" != 0", __LINE__, conn->ip_addr, + conn->port, in_bytes); return EINVAL; } return 0; } -static int notify_reselect_tracker_leader(ConnectionInfo *pTrackerServer) +static int notify_reselect_tracker_leader(TrackerServerInfo *pTrackerServer) { int result; ConnectionInfo *conn; - pTrackerServer->sock = -1; + fdfs_server_sock_reset(pTrackerServer); if ((conn=tracker_connect_server(pTrackerServer, &result)) == NULL) { return result; } - result = _notify_reselect_tleader(pTrackerServer); + result = _notify_reselect_tleader(conn); tracker_disconnect_server_ex(conn, result != 0); return result; } @@ -1063,22 +1047,24 @@ static void set_tracker_leader(const int leader_index) if (old_index >= 0 && old_index != leader_index) { TrackerRunningStatus tracker_status; - ConnectionInfo old_leader_server; + TrackerServerInfo old_leader_server; memcpy(&old_leader_server, g_tracker_group.servers + old_index, - sizeof(ConnectionInfo)); + sizeof(TrackerServerInfo)); if (fdfs_get_tracker_status(&old_leader_server, &tracker_status) == 0) { if (tracker_status.if_leader) { - ConnectionInfo new_leader_server; + TrackerServerInfo new_leader_server; memcpy(&new_leader_server, g_tracker_group.servers + leader_index, - sizeof(ConnectionInfo)); + sizeof(TrackerServerInfo)); logWarning("file: "__FILE__", line: %d, " "two tracker leaders occur, old leader is %s:%d, " "new leader is %s:%d, notify to re-select " "tracker leader", __LINE__, - old_leader_server.ip_addr, old_leader_server.port, - new_leader_server.ip_addr, new_leader_server.port); + old_leader_server.connections[0].ip_addr, + old_leader_server.connections[0].port, + new_leader_server.connections[0].ip_addr, + new_leader_server.connections[0].port); notify_reselect_tracker_leader(&old_leader_server); notify_reselect_tracker_leader(&new_leader_server); @@ -1094,12 +1080,12 @@ static void get_tracker_leader() { int i; TrackerRunningStatus tracker_status; - ConnectionInfo tracker_server; + TrackerServerInfo tracker_server; for (i=0; i= 0) { - ConnectionInfo *pTrackerLeader; - pTrackerLeader = g_tracker_group.servers + \ + TrackerServerInfo *pTrackerLeader; + pTrackerLeader = g_tracker_group.servers + g_tracker_group.leader_index; - logWarning("file: "__FILE__", line: %d, " \ - "tracker server %s:%d, " \ - "my tracker leader is: %s:%d, " \ - "but response tracker leader is null", \ - __LINE__, pTrackerServer->ip_addr, \ - pTrackerServer->port, pTrackerLeader->ip_addr, \ - pTrackerLeader->port); + logWarning("file: "__FILE__", line: %d, " + "tracker server %s:%d, " + "my tracker leader is: %s:%d, " + "but response tracker leader is null", + __LINE__, pTrackerServer->ip_addr, + pTrackerServer->port, pTrackerLeader->connections[0].ip_addr, + pTrackerLeader->connections[0].port); g_tracker_group.leader_index = -1; } @@ -1876,19 +1862,18 @@ int tracker_report_join(ConnectionInfo *pTrackerServer, \ const int tracker_index, const bool sync_old_done) { char out_buff[sizeof(TrackerHeader) + sizeof(TrackerStorageJoinBody) + \ - FDFS_MAX_TRACKERS * FDFS_PROTO_IP_PORT_SIZE]; + FDFS_MAX_TRACKERS * FDFS_PROTO_MULTI_IP_PORT_SIZE]; TrackerHeader *pHeader; TrackerStorageJoinBody *pReqBody; TrackerStorageJoinBodyResp respBody; char *pInBuff; char *p; - ConnectionInfo *pServer; - ConnectionInfo *pServerEnd; + TrackerServerInfo *pServer; + TrackerServerInfo *pServerEnd; FDFSStorageServer *pTargetServer; FDFSStorageServer **ppFound; FDFSStorageServer targetServer; int out_len; - //int tracker_count; int result; int i; int64_t in_bytes; @@ -1932,8 +1917,10 @@ int tracker_report_join(ConnectionInfo *pTrackerServer, \ { logInfo("file: "__FILE__", line: %d, " "tracker server: #%d. %s:%d, my_report_status: %d", - __LINE__, i, g_tracker_group.servers[i].ip_addr, - g_tracker_group.servers[i].port, my_report_status[i]); + __LINE__, i, + g_tracker_group.servers[i].connections[0].ip_addr, + g_tracker_group.servers[i].connections[0].port, + my_report_status[i]); break; } } @@ -1953,22 +1940,13 @@ int tracker_report_join(ConnectionInfo *pTrackerServer, \ } } - //tracker_count = 0; p = out_buff + sizeof(TrackerHeader) + sizeof(TrackerStorageJoinBody); pServerEnd = g_tracker_group.servers + g_tracker_group.server_count; for (pServer=g_tracker_group.servers; pServerip_addr, pTrackerServer->ip_addr) == 0 && \ - pServer->port == pTrackerServer->port) - { - continue; - } - tracker_count++; - */ - - sprintf(p, "%s:%d", pServer->ip_addr, pServer->port); - p += FDFS_PROTO_IP_PORT_SIZE; + fdfs_server_info_to_string(pServer, p, + FDFS_PROTO_MULTI_IP_PORT_SIZE); + p += FDFS_PROTO_MULTI_IP_PORT_SIZE; } out_len = p - out_buff; @@ -2463,8 +2441,8 @@ int tracker_deal_changelog_response(ConnectionInfo *pTrackerServer) int tracker_report_thread_start() { - ConnectionInfo *pTrackerServer; - ConnectionInfo *pServerEnd; + TrackerServerInfo *pTrackerServer; + TrackerServerInfo *pServerEnd; pthread_attr_t pattr; pthread_t tid; int result; diff --git a/storage/trunk_mgr/trunk_client.c b/storage/trunk_mgr/trunk_client.c index c664658..af9cf45 100644 --- a/storage/trunk_mgr/trunk_client.c +++ b/storage/trunk_mgr/trunk_client.c @@ -113,7 +113,7 @@ int trunk_client_trunk_alloc_space(const int file_size, \ } memcpy(&trunk_server, &g_trunk_server, sizeof(ConnectionInfo)); - if ((pTrunkServer=tracker_connect_server(&trunk_server, &result)) == NULL) + if ((pTrunkServer=tracker_make_connection(&trunk_server, &result)) == NULL) { logError("file: "__FILE__", line: %d, " \ "can't alloc trunk space because connect to trunk " \ @@ -216,7 +216,7 @@ int trunk_client_trunk_alloc_confirm(const FDFSTrunkFullInfo *pTrunkInfo, \ } memcpy(&trunk_server, &g_trunk_server, sizeof(ConnectionInfo)); - if ((pTrunkServer=tracker_connect_server(&trunk_server, &result)) == NULL) + if ((pTrunkServer=tracker_make_connection(&trunk_server, &result)) == NULL) { logError("file: "__FILE__", line: %d, " \ "trunk alloc confirm fail because connect to trunk " \ @@ -249,7 +249,7 @@ int trunk_client_trunk_free_space(const FDFSTrunkFullInfo *pTrunkInfo) } memcpy(&trunk_server, &g_trunk_server, sizeof(ConnectionInfo)); - if ((pTrunkServer=tracker_connect_server(&trunk_server, &result)) == NULL) + if ((pTrunkServer=tracker_make_connection(&trunk_server, &result)) == NULL) { logError("file: "__FILE__", line: %d, " \ "free trunk space fail because connect to trunk " \ diff --git a/tracker/fdfs_shared_func.c b/tracker/fdfs_shared_func.c index af3915f..ab38cba 100644 --- a/tracker/fdfs_shared_func.c +++ b/tracker/fdfs_shared_func.c @@ -1169,3 +1169,26 @@ int fdfs_parse_server_info(char *server_str, const int default_port, return 0; } + +int fdfs_server_info_to_string_ex(TrackerServerInfo *pServer, + const int port, char *buff, const int buffSize) +{ + ConnectionInfo *conn; + ConnectionInfo *end; + int len; + + if (pServer->count == 1) + { + return snprintf(buff, buffSize, "%s:%d", + pServer->connections[0].ip_addr, port); + } + + len = snprintf(buff, buffSize, "%s", pServer->connections[0].ip_addr); + end = pServer->connections + pServer->count; + for (conn=pServer->connections + 1; connip_addr); + } + len += snprintf(buff + len, buffSize - len, ":%d", port); + return len; +} diff --git a/tracker/fdfs_shared_func.h b/tracker/fdfs_shared_func.h index e8128b1..900065c 100644 --- a/tracker/fdfs_shared_func.h +++ b/tracker/fdfs_shared_func.h @@ -97,6 +97,16 @@ void fdfs_server_sock_reset(TrackerServerInfo *pServerInfo); int fdfs_parse_server_info(char *server_str, const int default_port, TrackerServerInfo *pServer); +int fdfs_server_info_to_string_ex(TrackerServerInfo *pServer, + const int port, char *buff, const int buffSize); + +static inline int fdfs_server_info_to_string(TrackerServerInfo *pServer, + char *buff, const int buffSize) +{ + return fdfs_server_info_to_string_ex(pServer, + pServer->connections[0].port, buff, buffSize); +} + #ifdef __cplusplus } #endif diff --git a/tracker/tracker_proto.c b/tracker/tracker_proto.c index a5e0b4d..47e2e76 100644 --- a/tracker/tracker_proto.c +++ b/tracker/tracker_proto.c @@ -471,7 +471,7 @@ ConnectionInfo *tracker_connect_server_ex(TrackerServerInfo *pServerInfo, } ConnectionInfo *tracker_connect_server_no_pool_ex(TrackerServerInfo *pServerInfo, - const char *bind_addr, int *err_no) + const char *bind_addr, int *err_no, const bool log_connect_error) { ConnectionInfo *conn; ConnectionInfo *end; @@ -484,7 +484,8 @@ ConnectionInfo *tracker_connect_server_no_pool_ex(TrackerServerInfo *pServerInfo } *err_no = conn_pool_connect_server_ex(pServerInfo->connections - + pServerInfo->index, g_fdfs_connect_timeout, bind_addr); + + pServerInfo->index, g_fdfs_connect_timeout, + bind_addr, log_connect_error); if (*err_no == 0) { return pServerInfo->connections + pServerInfo->index; @@ -502,7 +503,8 @@ ConnectionInfo *tracker_connect_server_no_pool_ex(TrackerServerInfo *pServerInfo if (current_index != pServerInfo->index) { if ((*err_no=conn_pool_connect_server_ex(conn, - g_fdfs_connect_timeout, bind_addr)) == 0) + g_fdfs_connect_timeout, bind_addr, + log_connect_error)) == 0) { pServerInfo->index = current_index; return pServerInfo->connections + pServerInfo->index; @@ -623,7 +625,7 @@ int fdfs_get_ini_context_from_tracker(TrackerServerGroup *pTrackerGroup, \ for (i=0; i < 3; i++) { conn = tracker_connect_server_no_pool_ex(pTServer, - bind_addr, &result); + bind_addr, &result, false); if (conn != NULL) { break; @@ -634,6 +636,10 @@ int fdfs_get_ini_context_from_tracker(TrackerServerGroup *pTrackerGroup, \ if (conn == NULL) { + logError("file: "__FILE__", line: %d, " + "connect to server %s:%d fail, errno: %d, " + "error info: %s", __LINE__, conn->ip_addr, + conn->port, result, STRERROR(result)); continue; } diff --git a/tracker/tracker_proto.h b/tracker/tracker_proto.h index 1c9f6fb..da377bd 100644 --- a/tracker/tracker_proto.h +++ b/tracker/tracker_proto.h @@ -103,9 +103,10 @@ #define STORAGE_SET_METADATA_FLAG_MERGE 'M' #define STORAGE_SET_METADATA_FLAG_MERGE_STR "M" -#define FDFS_PROTO_PKG_LEN_SIZE 8 -#define FDFS_PROTO_CMD_SIZE 1 -#define FDFS_PROTO_IP_PORT_SIZE (IP_ADDRESS_SIZE + 6) +#define FDFS_PROTO_PKG_LEN_SIZE 8 +#define FDFS_PROTO_CMD_SIZE 1 +#define FDFS_PROTO_IP_PORT_SIZE (IP_ADDRESS_SIZE + 6) +#define FDFS_PROTO_MULTI_IP_PORT_SIZE (2 * IP_ADDRESS_SIZE + 8) #define TRACKER_QUERY_STORAGE_FETCH_BODY_LEN (FDFS_GROUP_NAME_MAX_LEN \ + IP_ADDRESS_SIZE - 1 + FDFS_PROTO_PKG_LEN_SIZE) @@ -230,22 +231,27 @@ ConnectionInfo *tracker_connect_server_ex(TrackerServerInfo *pServerInfo, * connect to the tracker server directly without connection pool * params: * pTrackerServer: tracker server +* bind_ipaddr: the ip address to bind, NULL or empty for any +* err_no: return the error no +* log_connect_error: if log error info when connect fail * return: ConnectionInfo pointer for success, NULL for fail **/ ConnectionInfo *tracker_connect_server_no_pool_ex(TrackerServerInfo *pServerInfo, - const char *bind_addr, int *err_no); + const char *bind_addr, int *err_no, const bool log_connect_error); /** * connect to the tracker server directly without connection pool * params: * pTrackerServer: tracker server -* return: 0 for success, none zero for fail +* err_no: return the error no +* return: ConnectionInfo pointer for success, NULL for fail **/ static inline ConnectionInfo *tracker_connect_server_no_pool( TrackerServerInfo *pServerInfo, int *err_no) { const char *bind_addr = NULL; - return tracker_connect_server_no_pool_ex(pServerInfo, bind_addr, err_no); + return tracker_connect_server_no_pool_ex(pServerInfo, + bind_addr, err_no, true); } #define tracker_disconnect_server(pTrackerServer) \