From cf0ec7e4cf3f4c26791512582c13c06d3556b22c Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Sat, 14 Dec 2019 21:03:35 +0800 Subject: [PATCH] trunk server support compress the trunk binlog periodically --- HISTORY | 5 +- conf/tracker.conf | 17 ++- storage/storage_func.c | 15 +- storage/storage_param_getter.c | 19 ++- storage/storage_sync.c | 9 +- storage/tracker_client_thread.c | 148 ++++++++++--------- storage/trunk_mgr/trunk_mem.c | 143 +++++++++++++++--- storage/trunk_mgr/trunk_mem.h | 5 + storage/trunk_mgr/trunk_sync.c | 247 ++++++++++++++++++++------------ storage/trunk_mgr/trunk_sync.h | 1 + tracker/tracker_func.c | 18 ++- tracker/tracker_global.c | 2 + tracker/tracker_global.h | 2 + tracker/tracker_service.c | 5 + 14 files changed, 438 insertions(+), 198 deletions(-) diff --git a/HISTORY b/HISTORY index 5592bab..05c1522 100644 --- a/HISTORY +++ b/HISTORY @@ -1,8 +1,11 @@ -Version 6.05 2019-12-13 +Version 6.05 2019-12-14 * fdfs_trackerd and fdfs_storaged print the server version in usage. you can execute fdfs_trackerd or fdfs_storaged without parameters to show the server version + * trunk server support compress the trunk binlog periodically, + config items in tracker.conf: trunk_compress_binlog_interval + and trunk_compress_binlog_time_base Version 6.04 2019-12-05 * storage_report_ip_changed ignore result EEXIST diff --git a/conf/tracker.conf b/conf/tracker.conf index a92e0aa..adccc65 100644 --- a/conf/tracker.conf +++ b/conf/tracker.conf @@ -197,13 +197,26 @@ trunk_init_check_occupying = false trunk_init_reload_from_binlog = false # the min interval for compressing the trunk binlog file -# unit: second +# unit: second, 0 means never compress # FastDFS compress the trunk binlog when trunk init and trunk destroy # recommand to set this parameter to 86400 (one day) -# default value is 0, 0 means never compress +# default value is 0 # since V5.01 trunk_compress_binlog_min_interval = 86400 +# the interval for compressing the trunk binlog file +# unit: second, 0 means never compress +# recommand to set this parameter to 86400 (one day) +# default value is 0 +# since V6.05 +trunk_compress_binlog_interval = 86400 + +# compress the trunk binlog time base, time format: Hour:Minute +# Hour from 0 to 23, Minute from 0 to 59 +# default value is 03:00 +# since V6.05 +trunk_compress_binlog_time_base = 03:00 + # if use storage server ID instead of IP address # if you want to use dual IPs for storage server, you MUST set # this parameter to true, and configure the dual IPs in the file diff --git a/storage/storage_func.c b/storage/storage_func.c index 7d167c6..44acd99 100644 --- a/storage/storage_func.c +++ b/storage/storage_func.c @@ -73,6 +73,8 @@ typedef struct #define INIT_ITEM_LAST_HTTP_PORT "last_http_port" #define INIT_ITEM_CURRENT_TRUNK_FILE_ID "current_trunk_file_id" #define INIT_ITEM_TRUNK_LAST_COMPRESS_TIME "trunk_last_compress_time" +#define INIT_ITEM_TRUNK_BINLOG_COMPRESS_IN_PROGRESS \ + "trunk_binlog_compress_in_progress" #define INIT_ITEM_STORE_PATH_MARK_PREFIX "store_path_mark" #define STAT_ITEM_TOTAL_UPLOAD "total_upload_count" @@ -658,6 +660,7 @@ int storage_write_to_sync_ini_file() "%s=%d\n" "%s=%d\n" "%s=%d\n" + "%s=%d\n" "%s=%d\n", INIT_ITEM_STORAGE_JOIN_TIME, g_storage_join_time, INIT_ITEM_SYNC_OLD_DONE, g_sync_old_done, @@ -667,10 +670,11 @@ int storage_write_to_sync_ini_file() INIT_ITEM_LAST_SERVER_PORT, g_last_server_port, INIT_ITEM_LAST_HTTP_PORT, g_last_http_port, INIT_ITEM_CURRENT_TRUNK_FILE_ID, g_current_trunk_file_id, - INIT_ITEM_TRUNK_LAST_COMPRESS_TIME, (int)g_trunk_last_compress_time + INIT_ITEM_TRUNK_LAST_COMPRESS_TIME, (int)g_trunk_last_compress_time, + INIT_ITEM_TRUNK_BINLOG_COMPRESS_IN_PROGRESS, + g_trunk_binlog_compress_in_progress ); - if (g_check_store_path_mark) { for (i=0; iip_addr, local_ip_addr); */ - if (is_local_host_ip(pStorage->ip_addr)) + if (strcmp(pStorage->id, g_my_server_id_str) == 0 || + is_local_host_ip(pStorage->ip_addr)) { //can't self sync to self logError("file: "__FILE__", line: %d, " \ "ip_addr %s belong to the local host," \ @@ -3263,11 +3264,11 @@ int storage_sync_thread_start(const FDFSStorageBrief *pStorage) return 0; } - if (storage_server_is_myself(pStorage) || \ + if (strcmp(pStorage->id, g_my_server_id_str) == 0 || is_local_host_ip(pStorage->ip_addr)) //can't self sync to self { - logWarning("file: "__FILE__", line: %d, " \ - "storage id: %s is myself, can't start sync thread!", \ + logWarning("file: "__FILE__", line: %d, " + "storage id: %s is myself, can't start sync thread!", __LINE__, pStorage->id); return 0; } diff --git a/storage/tracker_client_thread.c b/storage/tracker_client_thread.c index a7f4fe3..cd48015 100644 --- a/storage/tracker_client_thread.c +++ b/storage/tracker_client_thread.c @@ -37,7 +37,8 @@ #include "trunk_sync.h" #include "storage_param_getter.h" -#define TRUNK_FILE_CREATOR_TASK_ID 88 +#define TRUNK_FILE_CREATOR_TASK_ID 88 +#define TRUNK_BINLOG_COMPRESS_TASK_ID 89 static pthread_mutex_t reporter_thread_lock; @@ -876,9 +877,11 @@ static int tracker_merge_servers(ConnectionInfo *pTrackerServer, FDFS_STORAGE_STATUS_SYNCING)) && \ ((*ppFound)->server.status > pServer->status)) { + pServer->id[FDFS_STORAGE_ID_MAX_SIZE - 1] = '\0'; *(pServer->ip_addr + IP_ADDRESS_SIZE - 1) = '\0'; - if (is_local_host_ip(pServer->ip_addr) && \ - buff2int(pServer->port) == g_server_port) + if ((strcmp(pServer->id, g_my_server_id_str) == 0) || + (is_local_host_ip(pServer->ip_addr) && + buff2int(pServer->port) == g_server_port)) { need_rejoin_tracker = true; logWarning("file: "__FILE__", line: %d, " \ @@ -1204,6 +1207,74 @@ static void set_trunk_server(const char *ip_addr, const int port) } } +static int do_set_trunk_server_myself(ConnectionInfo *pTrackerServer) +{ + int result; + ScheduleArray scheduleArray; + ScheduleEntry entries[2]; + ScheduleEntry *entry; + + tracker_fetch_trunk_fid(pTrackerServer); + g_if_trunker_self = true; + + if ((result=storage_trunk_init()) != 0) + { + return result; + } + + scheduleArray.entries = entries; + entry = entries; + if (g_trunk_create_file_advance && + g_trunk_create_file_interval > 0) + { + INIT_SCHEDULE_ENTRY_EX(*entry, TRUNK_FILE_CREATOR_TASK_ID, + g_trunk_create_file_time_base, + g_trunk_create_file_interval, + trunk_create_trunk_file_advance, NULL); + entry->new_thread = true; + entry++; + } + + if (g_trunk_compress_binlog_interval > 0) + { + INIT_SCHEDULE_ENTRY_EX(*entry, TRUNK_BINLOG_COMPRESS_TASK_ID, + g_trunk_compress_binlog_time_base, + g_trunk_compress_binlog_interval, + trunk_binlog_compress_func, NULL); + entry->new_thread = true; + entry++; + } + + scheduleArray.count = entry - entries; + if (scheduleArray.count > 0) + { + sched_add_entries(&scheduleArray); + } + + trunk_sync_thread_start_all(); + return 0; +} + +static void do_unset_trunk_server_myself(ConnectionInfo *pTrackerServer) +{ + tracker_report_trunk_fid(pTrackerServer); + g_if_trunker_self = false; + + trunk_waiting_sync_thread_exit(); + + storage_trunk_destroy_ex(true); + if (g_trunk_create_file_advance && + g_trunk_create_file_interval > 0) + { + sched_del_entry(TRUNK_FILE_CREATOR_TASK_ID); + } + + if (g_trunk_compress_binlog_interval > 0) + { + sched_del_entry(TRUNK_BINLOG_COMPRESS_TASK_ID); + } +} + static int tracker_check_response(ConnectionInfo *pTrackerServer, const int tracker_index, bool *bServerPortChanged) { @@ -1384,11 +1455,13 @@ static int tracker_check_response(ConnectionInfo *pTrackerServer, { int port; + pBriefServers->id[FDFS_STORAGE_ID_MAX_SIZE - 1] = '\0'; pBriefServers->ip_addr[IP_ADDRESS_SIZE - 1] = '\0'; port = buff2int(pBriefServers->port); set_trunk_server(pBriefServers->ip_addr, port); - if (is_local_host_ip(pBriefServers->ip_addr) && - port == g_server_port) + if ((strcmp(pBriefServers->id, g_my_server_id_str) == 0) || + (is_local_host_ip(pBriefServers->ip_addr) && + port == g_server_port)) { if (g_if_trunker_self) { @@ -1403,32 +1476,10 @@ static int tracker_check_response(ConnectionInfo *pTrackerServer, "I am the the trunk server %s:%d", __LINE__, pBriefServers->ip_addr, port); - tracker_fetch_trunk_fid(pTrackerServer); - g_if_trunker_self = true; - - if ((result=storage_trunk_init()) != 0) - { - return result; - } - - if (g_trunk_create_file_advance && - g_trunk_create_file_interval > 0) - { - ScheduleArray scheduleArray; - ScheduleEntry entries[1]; - - entries[0].id = TRUNK_FILE_CREATOR_TASK_ID; - entries[0].time_base = g_trunk_create_file_time_base; - entries[0].interval = g_trunk_create_file_interval; - entries[0].task_func = trunk_create_trunk_file_advance; - entries[0].func_args = NULL; - - scheduleArray.count = 1; - scheduleArray.entries = entries; - sched_add_entries(&scheduleArray); - } - - trunk_sync_thread_start_all(); + if ((result=do_set_trunk_server_myself(pTrackerServer)) != 0) + { + return result; + } } } else @@ -1440,46 +1491,13 @@ static int tracker_check_response(ConnectionInfo *pTrackerServer, if (g_if_trunker_self) { - int saved_trunk_sync_thread_count; - logWarning("file: "__FILE__", line: %d, " \ "I am the old trunk server, " \ "the new trunk server is %s:%d", \ __LINE__, g_trunk_server.connections[0].ip_addr, \ g_trunk_server.connections[0].port); - tracker_report_trunk_fid(pTrackerServer); - g_if_trunker_self = false; - - saved_trunk_sync_thread_count = \ - g_trunk_sync_thread_count; - if (saved_trunk_sync_thread_count > 0) - { - logInfo("file: "__FILE__", line: %d, "\ - "waiting %d trunk sync " \ - "threads exit ...", __LINE__, \ - saved_trunk_sync_thread_count); - } - - while (g_trunk_sync_thread_count > 0) - { - usleep(50000); - } - - if (saved_trunk_sync_thread_count > 0) - { - logInfo("file: "__FILE__", line: %d, " \ - "%d trunk sync threads exited",\ - __LINE__, \ - saved_trunk_sync_thread_count); - } - - storage_trunk_destroy_ex(true); - if (g_trunk_create_file_advance && \ - g_trunk_create_file_interval > 0) - { - sched_del_entry(TRUNK_FILE_CREATOR_TASK_ID); - } + do_unset_trunk_server_myself(pTrackerServer); } } } diff --git a/storage/trunk_mgr/trunk_mem.c b/storage/trunk_mgr/trunk_mem.c index 1add39d..882a82d 100644 --- a/storage/trunk_mgr/trunk_mem.c +++ b/storage/trunk_mgr/trunk_mem.c @@ -52,14 +52,18 @@ int g_avg_storage_reserved_mb = FDFS_DEF_STORAGE_RESERVED_MB; int g_store_path_index = 0; int g_current_trunk_file_id = 0; TimeInfo g_trunk_create_file_time_base = {0, 0}; +TimeInfo g_trunk_compress_binlog_time_base = {0, 0}; int g_trunk_create_file_interval = 86400; int g_trunk_compress_binlog_min_interval = 0; +int g_trunk_compress_binlog_interval = 0; TrackerServerInfo g_trunk_server = {0, 0}; bool g_if_use_trunk_file = false; bool g_if_trunker_self = false; bool g_trunk_create_file_advance = false; bool g_trunk_init_check_occupying = false; bool g_trunk_init_reload_from_binlog = false; +volatile int g_trunk_binlog_compress_in_progress = 0; +volatile int g_trunk_data_save_in_progress = 0; static byte trunk_init_flag = STORAGE_TRUNK_INIT_FLAG_NONE; int64_t g_trunk_total_free_space = 0; int64_t g_trunk_create_file_space_threshold = 0; @@ -387,7 +391,7 @@ static int tree_walk_callback(void *data, void *args) return 0; } -static int storage_trunk_do_save() +static int do_save_trunk_data() { int64_t trunk_binlog_size; char trunk_data_filename[MAX_PATH_SIZE]; @@ -493,40 +497,133 @@ static int storage_trunk_do_save() return result; } -static int storage_trunk_save() +static int storage_trunk_do_save() { int result; + if (__sync_add_and_fetch(&g_trunk_data_save_in_progress, 1) != 1) + { + __sync_sub_and_fetch(&g_trunk_data_save_in_progress, 1); + logError("file: "__FILE__", line: %d, " + "trunk binlog compress already in progress, " + "g_trunk_data_save_in_progress=%d", __LINE__, + g_trunk_data_save_in_progress); + return EINPROGRESS; + } - if (!(g_trunk_compress_binlog_min_interval > 0 && \ + result = do_save_trunk_data(); + __sync_sub_and_fetch(&g_trunk_data_save_in_progress, 1); + + return result; +} + +static int storage_trunk_compress() +{ + int result; + + if (__sync_add_and_fetch(&g_trunk_binlog_compress_in_progress, 1) != 1) + { + __sync_sub_and_fetch(&g_trunk_binlog_compress_in_progress, 1); + logError("file: "__FILE__", line: %d, " + "trunk binlog compress already in progress, " + "g_trunk_binlog_compress_in_progress=%d", + __LINE__, g_trunk_binlog_compress_in_progress); + return EINPROGRESS; + } + + storage_write_to_sync_ini_file(); + + logInfo("file: "__FILE__", line: %d, " + "start compress trunk binlog ...", __LINE__); + do + { + if ((result=trunk_binlog_compress_apply()) != 0) + { + break; + } + + if ((result=storage_trunk_do_save()) != 0) + { + trunk_binlog_compress_rollback(); + break; + } + + if ((result=trunk_binlog_compress_commit()) != 0) + { + trunk_binlog_compress_rollback(); + break; + } + + g_trunk_last_compress_time = g_current_time; + } while (0); + + __sync_sub_and_fetch(&g_trunk_binlog_compress_in_progress, 1); + storage_write_to_sync_ini_file(); + + if (result == 0) + { + logInfo("file: "__FILE__", line: %d, " + "compress trunk binlog successfully.", __LINE__); + return trunk_unlink_all_mark_files(); //because the binlog file be compressed + } + else + { + logError("file: "__FILE__", line: %d, " + "compress trunk binlog fail.", __LINE__); + } + + return result; +} + +static int storage_trunk_save() +{ + if (!(g_trunk_compress_binlog_min_interval > 0 && g_current_time - g_trunk_last_compress_time > g_trunk_compress_binlog_min_interval)) { - return storage_trunk_do_save(); + if (__sync_add_and_fetch(&g_trunk_binlog_compress_in_progress, 0) == 0) + { + return storage_trunk_do_save(); + } + else + { + logWarning("file: "__FILE__", line: %d, " + "trunk binlog compress already in progress, " + "g_trunk_binlog_compress_in_progress=%d", + __LINE__, g_trunk_binlog_compress_in_progress); + return 0; + } } + + return storage_trunk_compress(); +} - logInfo("start compress trunk binlog ..."); - if ((result=trunk_binlog_compress_apply()) != 0) - { - return result; - } +int trunk_binlog_compress_func(void *args) +{ + int result; - if ((result=storage_trunk_do_save()) != 0) - { - trunk_binlog_compress_rollback(); - return result; - } + if (!g_if_trunker_self) + { + return 0; + } - if ((result=trunk_binlog_compress_commit()) != 0) - { - trunk_binlog_compress_rollback(); - return result; - } + result = storage_trunk_compress(); + if (result != 0) + { + return result; + } - g_trunk_last_compress_time = g_current_time; - storage_write_to_sync_ini_file(); + if (!g_if_trunker_self) + { + return 0; + } - logInfo("compress trunk binlog done."); - return trunk_unlink_all_mark_files(); //because the binlog file be compressed + g_if_trunker_self = false; //for sync thread exit + trunk_waiting_sync_thread_exit(); + + g_if_trunker_self = true; //restore to true + trunk_sync_thread_start_all(); + + return 0; } static bool storage_trunk_is_space_occupied(const FDFSTrunkFullInfo *pTrunkInfo) diff --git a/storage/trunk_mgr/trunk_mem.h b/storage/trunk_mgr/trunk_mem.h index d47de3a..7cd1f85 100644 --- a/storage/trunk_mgr/trunk_mem.h +++ b/storage/trunk_mgr/trunk_mem.h @@ -35,13 +35,16 @@ extern int g_avg_storage_reserved_mb; //calc by above var: g_storage_reserved_m extern int g_store_path_index; //store to which path extern int g_current_trunk_file_id; //current trunk file id extern TimeInfo g_trunk_create_file_time_base; +extern TimeInfo g_trunk_compress_binlog_time_base; extern int g_trunk_create_file_interval; extern int g_trunk_compress_binlog_min_interval; +extern int g_trunk_compress_binlog_interval; extern TrackerServerInfo g_trunk_server; //the trunk server extern bool g_if_use_trunk_file; //if use trunk file extern bool g_trunk_create_file_advance; extern bool g_trunk_init_check_occupying; extern bool g_trunk_init_reload_from_binlog; +extern volatile int g_trunk_binlog_compress_in_progress; extern bool g_if_trunker_self; //if am i trunk server extern int64_t g_trunk_create_file_space_threshold; extern int64_t g_trunk_total_free_space; //trunk total free space in bytes @@ -87,6 +90,8 @@ int trunk_file_delete(const char *trunk_filename, \ int trunk_create_trunk_file_advance(void *args); +int trunk_binlog_compress_func(void *args); + int storage_delete_trunk_data_file(); char *storage_trunk_get_data_filename(char *full_filename); diff --git a/storage/trunk_mgr/trunk_sync.c b/storage/trunk_mgr/trunk_sync.c index ca86522..b08adc2 100644 --- a/storage/trunk_mgr/trunk_sync.c +++ b/storage/trunk_mgr/trunk_sync.c @@ -313,29 +313,35 @@ int trunk_binlog_compress_apply() return 0; } - if ((result=trunk_binlog_close_writer(true)) != 0) - { - return result; - } + pthread_mutex_lock(&trunk_sync_thread_lock); - if (rename(binlog_filename, rollback_filename) != 0) - { - result = errno != 0 ? errno : EIO; - logError("file: "__FILE__", line: %d, " \ - "rename %s to %s fail, " \ - "errno: %d, error info: %s", - __LINE__, binlog_filename, rollback_filename, - result, STRERROR(result)); - return result; - } + do + { + if ((result=trunk_binlog_close_writer(false)) != 0) + { + break; + } - if ((result=trunk_binlog_open_writer(binlog_filename)) != 0) - { - rename(rollback_filename, binlog_filename); //rollback - return result; - } + if (rename(binlog_filename, rollback_filename) != 0) + { + result = errno != 0 ? errno : EIO; + logError("file: "__FILE__", line: %d, " \ + "rename %s to %s fail, " \ + "errno: %d, error info: %s", + __LINE__, binlog_filename, rollback_filename, + result, STRERROR(result)); + break; + } - return 0; + if ((result=trunk_binlog_open_writer(binlog_filename)) != 0) + { + rename(rollback_filename, binlog_filename); //rollback + break; + } + } while (0); + + pthread_mutex_unlock(&trunk_sync_thread_lock); + return result; } static int trunk_binlog_open_read(const char *filename, @@ -478,46 +484,53 @@ int trunk_binlog_compress_commit() return errno != 0 ? errno : ENOENT; } + pthread_mutex_lock(&trunk_sync_thread_lock); if (need_open_binlog) { - trunk_binlog_close_writer(true); + trunk_binlog_close_writer(false); } - result = trunk_binlog_merge_file(data_fd); - close(data_fd); - if (result != 0) - { - return result; - } - if (unlink(data_filename) != 0) - { - result = errno != 0 ? errno : EPERM; - logError("file: "__FILE__", line: %d, " \ - "unlink %s fail, errno: %d, error info: %s", - __LINE__, data_filename, - result, STRERROR(result)); - return result; - } + do + { + result = trunk_binlog_merge_file(data_fd); + close(data_fd); + if (result != 0) + { + break; + } + if (unlink(data_filename) != 0) + { + result = errno != 0 ? errno : EPERM; + logError("file: "__FILE__", line: %d, " + "unlink %s fail, errno: %d, error info: %s", + __LINE__, data_filename, + result, STRERROR(result)); + break; + } - get_trunk_rollback_filename(rollback_filename); - if (access(rollback_filename, F_OK) == 0) - { - if (unlink(rollback_filename) != 0) - { - result = errno != 0 ? errno : EPERM; - logWarning("file: "__FILE__", line: %d, " \ - "unlink %s fail, errno: %d, error info: %s", - __LINE__, rollback_filename, - result, STRERROR(result)); - } - } + get_trunk_rollback_filename(rollback_filename); + if (access(rollback_filename, F_OK) == 0) + { + if (unlink(rollback_filename) != 0) + { + result = errno != 0 ? errno : EPERM; + logWarning("file: "__FILE__", line: %d, " + "unlink %s fail, errno: %d, error info: %s", + __LINE__, rollback_filename, + result, STRERROR(result)); + break; + } + } - if (need_open_binlog) - { - return trunk_binlog_open_writer(binlog_filename); - } + if (need_open_binlog) + { + result = trunk_binlog_open_writer(binlog_filename); + } + } while (0); - return 0; + pthread_mutex_unlock(&trunk_sync_thread_lock); + + return result; } int trunk_binlog_compress_rollback() @@ -557,7 +570,7 @@ int trunk_binlog_compress_rollback() { return 0; } - logError("file: "__FILE__", line: %d, " \ + logError("file: "__FILE__", line: %d, " "stat file %s fail, errno: %d, error info: %s", __LINE__, rollback_filename, result, STRERROR(result)); @@ -570,38 +583,46 @@ int trunk_binlog_compress_rollback() return 0; } - if ((result=trunk_binlog_close_writer(true)) != 0) - { - return result; - } + pthread_mutex_lock(&trunk_sync_thread_lock); + do + { + if ((result=trunk_binlog_close_writer(false)) != 0) + { + break; + } - if ((rollback_fd=trunk_binlog_open_read(rollback_filename, - false)) < 0) - { - return errno != 0 ? errno : ENOENT; - } + if ((rollback_fd=trunk_binlog_open_read(rollback_filename, + false)) < 0) + { + result = errno != 0 ? errno : ENOENT; + break; + } - result = trunk_binlog_merge_file(rollback_fd); - close(rollback_fd); - if (result == 0) - { - if (unlink(rollback_filename) != 0) - { - result = errno != 0 ? errno : EPERM; - logWarning("file: "__FILE__", line: %d, " \ - "unlink %s fail, " \ - "errno: %d, error info: %s", - __LINE__, rollback_filename, - result, STRERROR(result)); - } + result = trunk_binlog_merge_file(rollback_fd); + close(rollback_fd); + if (result == 0) + { + if (unlink(rollback_filename) != 0) + { + result = errno != 0 ? errno : EPERM; + logWarning("file: "__FILE__", line: %d, " \ + "unlink %s fail, " \ + "errno: %d, error info: %s", + __LINE__, rollback_filename, + result, STRERROR(result)); + break; + } - return trunk_binlog_open_writer(binlog_filename); - } - else - { - trunk_binlog_open_writer(binlog_filename); - return result; - } + result = trunk_binlog_open_writer(binlog_filename); + } + else + { + result = trunk_binlog_open_writer(binlog_filename); + } + } while (0); + + pthread_mutex_unlock(&trunk_sync_thread_lock); + return result; } static int trunk_binlog_fsync_ex(const bool bNeedLock, \ @@ -1221,23 +1242,23 @@ int trunk_unlink_mark_file(const char *storage_id) t = g_current_time; localtime_r(&t, &tm); - trunk_get_mark_filename_by_id(storage_id, old_filename, \ + trunk_get_mark_filename_by_id(storage_id, old_filename, sizeof(old_filename)); if (!fileExists(old_filename)) { return ENOENT; } - snprintf(new_filename, sizeof(new_filename), \ - "%s.%04d%02d%02d%02d%02d%02d", old_filename, \ - tm.tm_year+1900, tm.tm_mon+1, tm.tm_mday, \ + snprintf(new_filename, sizeof(new_filename), + "%s.%04d%02d%02d%02d%02d%02d", old_filename, + tm.tm_year+1900, tm.tm_mon+1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec); if (rename(old_filename, new_filename) != 0) { - logError("file: "__FILE__", line: %d, " \ - "rename file %s to %s fail" \ - ", errno: %d, error info: %s", \ - __LINE__, old_filename, new_filename, \ + logError("file: "__FILE__", line: %d, " + "rename file %s to %s fail, " + "errno: %d, error info: %s", + __LINE__, old_filename, new_filename, errno, STRERROR(errno)); return errno != 0 ? errno : EACCES; } @@ -1470,7 +1491,8 @@ static void* trunk_sync_thread_entrance(void* arg) __LINE__, pStorage->ip_addr, local_ip_addr); */ - if (is_local_host_ip(pStorage->ip_addr)) + if ((strcmp(pStorage->id, g_my_server_id_str) == 0) || + is_local_host_ip(pStorage->ip_addr)) { //can't self sync to self logError("file: "__FILE__", line: %d, " \ "ip_addr %s belong to the local host," \ @@ -1630,7 +1652,8 @@ int trunk_sync_thread_start(const FDFSStorageBrief *pStorage) return 0; } - if (is_local_host_ip(pStorage->ip_addr)) //can't self sync to self + if ((strcmp(pStorage->id, g_my_server_id_str) == 0) || + is_local_host_ip(pStorage->ip_addr)) //can't self sync to self { return 0; } @@ -1695,6 +1718,42 @@ int trunk_sync_thread_start(const FDFSStorageBrief *pStorage) return 0; } +void trunk_waiting_sync_thread_exit() +{ + int saved_trunk_sync_thread_count; + int count; + + saved_trunk_sync_thread_count = g_trunk_sync_thread_count; + if (saved_trunk_sync_thread_count > 0) + { + logInfo("file: "__FILE__", line: %d, " + "waiting %d trunk sync threads exit ...", + __LINE__, saved_trunk_sync_thread_count); + } + + count = 0; + while (g_trunk_sync_thread_count > 0 && count < 20) + { + usleep(50000); + count++; + } + + if (g_trunk_sync_thread_count > 0) + { + logWarning("file: "__FILE__", line: %d, " + "kill %d trunk sync threads.", + __LINE__, g_trunk_sync_thread_count); + kill_trunk_sync_threads(); + } + + if (saved_trunk_sync_thread_count > 0) + { + logInfo("file: "__FILE__", line: %d, " + "%d trunk sync threads exited", + __LINE__, saved_trunk_sync_thread_count); + } +} + int trunk_unlink_all_mark_files() { FDFSStorageServer *pStorageServer; @@ -1710,7 +1769,7 @@ int trunk_unlink_all_mark_files() continue; } - if ((result=trunk_unlink_mark_file( \ + if ((result=trunk_unlink_mark_file( pStorageServer->server.id)) != 0) { if (result != ENOENT) diff --git a/storage/trunk_mgr/trunk_sync.h b/storage/trunk_mgr/trunk_sync.h index 58e4b6a..40ad243 100644 --- a/storage/trunk_mgr/trunk_sync.h +++ b/storage/trunk_mgr/trunk_sync.h @@ -61,6 +61,7 @@ int trunk_sync_thread_start_all(); int trunk_sync_thread_start(const FDFSStorageBrief *pStorage); int kill_trunk_sync_threads(); int trunk_binlog_sync_func(void *args); +void trunk_waiting_sync_thread_exit(); char *get_trunk_binlog_filename(char *full_filename); char *trunk_mark_filename_by_reader(const void *pArg, char *full_filename); diff --git a/tracker/tracker_func.c b/tracker/tracker_func.c index 6b3e397..79917c8 100644 --- a/tracker/tracker_func.c +++ b/tracker/tracker_func.c @@ -563,9 +563,18 @@ int tracker_load_from_conf_file(const char *filename, \ { return result; } - g_trunk_compress_binlog_min_interval = iniGetIntValue(NULL, \ - "trunk_compress_binlog_min_interval", \ + g_trunk_compress_binlog_min_interval = iniGetIntValue(NULL, + "trunk_compress_binlog_min_interval", &iniContext, 0); + g_trunk_compress_binlog_interval = iniGetIntValue(NULL, + "trunk_compress_binlog_interval", + &iniContext, 0); + if ((result=get_time_item_from_conf(&iniContext, + "trunk_compress_binlog_time_base", + &g_trunk_compress_binlog_time_base, 3, 0)) != 0) + { + return result; + } g_trunk_init_check_occupying = iniGetBoolValue(NULL, \ "trunk_init_check_occupying", &iniContext, false); @@ -750,6 +759,8 @@ int tracker_load_from_conf_file(const char *filename, \ "trunk_init_check_occupying=%d, " \ "trunk_init_reload_from_binlog=%d, " \ "trunk_compress_binlog_min_interval=%d, " \ + "trunk_compress_binlog_interval=%d, " \ + "trunk_compress_binlog_time_base=%02d:%02d, " \ "use_storage_id=%d, " \ "id_type_in_filename=%s, " \ "storage_id/ip_count=%d / %d, " \ @@ -789,6 +800,9 @@ int tracker_load_from_conf_file(const char *filename, \ (FDFS_ONE_MB * 1024)), g_trunk_init_check_occupying, \ g_trunk_init_reload_from_binlog, \ g_trunk_compress_binlog_min_interval, \ + g_trunk_compress_binlog_interval, \ + g_trunk_compress_binlog_time_base.hour, \ + g_trunk_compress_binlog_time_base.minute, \ g_use_storage_id, g_id_type_in_filename == \ FDFS_ID_TYPE_SERVER_ID ? "id" : "ip", \ g_storage_ids_by_id.count, g_storage_ids_by_ip.count, \ diff --git a/tracker/tracker_global.c b/tracker/tracker_global.c index fddc830..de47160 100644 --- a/tracker/tracker_global.c +++ b/tracker/tracker_global.c @@ -55,7 +55,9 @@ int g_slot_min_size = 256; //slot min size, such as 256 bytes int g_slot_max_size = 16 * 1024 * 1024; //slot max size, such as 16MB int g_trunk_file_size = 64 * 1024 * 1024; //the trunk file size, such as 64MB TimeInfo g_trunk_create_file_time_base = {0, 0}; +TimeInfo g_trunk_compress_binlog_time_base = {0, 0}; int g_trunk_create_file_interval = 86400; +int g_trunk_compress_binlog_interval = 0; int g_trunk_compress_binlog_min_interval = 0; int64_t g_trunk_create_file_space_threshold = 0; diff --git a/tracker/tracker_global.h b/tracker/tracker_global.h index 85502b1..1bec35c 100644 --- a/tracker/tracker_global.h +++ b/tracker/tracker_global.h @@ -79,7 +79,9 @@ extern int g_slot_min_size; //slot min size, such as 256 bytes extern int g_slot_max_size; //slot max size, such as 16MB extern int g_trunk_file_size; //the trunk file size, such as 64MB extern TimeInfo g_trunk_create_file_time_base; +extern TimeInfo g_trunk_compress_binlog_time_base; extern int g_trunk_create_file_interval; +extern int g_trunk_compress_binlog_interval; extern int g_trunk_compress_binlog_min_interval; extern int64_t g_trunk_create_file_space_threshold; diff --git a/tracker/tracker_service.c b/tracker/tracker_service.c index 1f30432..f75d0b4 100644 --- a/tracker/tracker_service.c +++ b/tracker/tracker_service.c @@ -695,6 +695,8 @@ static int tracker_deal_parameter_req(struct fast_task_info *pTask) "trunk_init_check_occupying=%d\n" \ "trunk_init_reload_from_binlog=%d\n" \ "trunk_compress_binlog_min_interval=%d\n" \ + "trunk_compress_binlog_interval=%d\n" \ + "trunk_compress_binlog_time_base=%02d:%02d\n" \ "store_slave_file_use_link=%d\n", \ g_use_storage_id, g_id_type_in_filename == \ FDFS_ID_TYPE_SERVER_ID ? "id" : "ip", \ @@ -712,6 +714,9 @@ static int tracker_deal_parameter_req(struct fast_task_info *pTask) g_trunk_init_check_occupying, \ g_trunk_init_reload_from_binlog, \ g_trunk_compress_binlog_min_interval, \ + g_trunk_compress_binlog_interval, \ + g_trunk_compress_binlog_time_base.hour, \ + g_trunk_compress_binlog_time_base.minute, \ g_store_slave_file_use_link); return 0;