From cab3a90d7f545079e188c5df1520676dc2f62726 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Sun, 15 Dec 2019 18:49:02 +0800 Subject: [PATCH] compress the trunk binlog gracefully --- HISTORY | 2 +- storage/storage_sync_func.c | 2 +- storage/storage_sync_func.h | 4 +- storage/trunk_mgr/trunk_mem.c | 26 ++- storage/trunk_mgr/trunk_sync.c | 365 ++++++++++++++++++++++++--------- storage/trunk_mgr/trunk_sync.h | 5 +- 6 files changed, 291 insertions(+), 113 deletions(-) diff --git a/HISTORY b/HISTORY index 05c1522..4919bc3 100644 --- a/HISTORY +++ b/HISTORY @@ -1,5 +1,5 @@ -Version 6.05 2019-12-14 +Version 6.05 2019-12-15 * fdfs_trackerd and fdfs_storaged print the server version in usage. you can execute fdfs_trackerd or fdfs_storaged without parameters to show the server version diff --git a/storage/storage_sync_func.c b/storage/storage_sync_func.c index 72a7fb8..34cb47a 100644 --- a/storage/storage_sync_func.c +++ b/storage/storage_sync_func.c @@ -29,7 +29,7 @@ #include "storage_func.h" #include "storage_sync_func.h" -void storage_sync_connect_storage_server_ex(FDFSStorageBrief *pStorage, +void storage_sync_connect_storage_server_ex(const FDFSStorageBrief *pStorage, ConnectionInfo *conn, bool *check_flag) { int nContinuousFail; diff --git a/storage/storage_sync_func.h b/storage/storage_sync_func.h index 487ec3b..94b2b08 100644 --- a/storage/storage_sync_func.h +++ b/storage/storage_sync_func.h @@ -17,11 +17,11 @@ extern "C" { #endif -void storage_sync_connect_storage_server_ex(FDFSStorageBrief *pStorage, +void storage_sync_connect_storage_server_ex(const FDFSStorageBrief *pStorage, ConnectionInfo *conn, bool *check_flag); static inline void storage_sync_connect_storage_server( - FDFSStorageBrief *pStorage, ConnectionInfo *conn) + const FDFSStorageBrief *pStorage, ConnectionInfo *conn) { bool check_flag = true; storage_sync_connect_storage_server_ex(pStorage, diff --git a/storage/trunk_mgr/trunk_mem.c b/storage/trunk_mgr/trunk_mem.c index 882a82d..a32137d 100644 --- a/storage/trunk_mgr/trunk_mem.c +++ b/storage/trunk_mgr/trunk_mem.c @@ -520,6 +520,15 @@ static int storage_trunk_compress() { int result; + if (g_current_time - g_up_time < 600) + { + logWarning("file: "__FILE__", line: %d, " + "too little time lapse: %ds afer startup, " + "skip trunk binlog compress", __LINE__, + (int)(g_current_time - g_up_time)); + return EBUSY; + } + if (__sync_add_and_fetch(&g_trunk_binlog_compress_in_progress, 1) != 1) { __sync_sub_and_fetch(&g_trunk_binlog_compress_in_progress, 1); @@ -563,7 +572,6 @@ static int storage_trunk_compress() { logInfo("file: "__FILE__", line: %d, " "compress trunk binlog successfully.", __LINE__); - return trunk_unlink_all_mark_files(); //because the binlog file be compressed } else { @@ -576,6 +584,8 @@ static int storage_trunk_compress() static int storage_trunk_save() { + int result; + if (!(g_trunk_compress_binlog_min_interval > 0 && g_current_time - g_trunk_last_compress_time > g_trunk_compress_binlog_min_interval)) @@ -594,7 +604,12 @@ static int storage_trunk_save() } } - return storage_trunk_compress(); + if ((result=storage_trunk_compress()) == 0) + { + return trunk_unlink_all_mark_files(); //because the binlog file be compressed + } + + return result; } int trunk_binlog_compress_func(void *args) @@ -617,12 +632,7 @@ int trunk_binlog_compress_func(void *args) return 0; } - g_if_trunker_self = false; //for sync thread exit - trunk_waiting_sync_thread_exit(); - - g_if_trunker_self = true; //restore to true - trunk_sync_thread_start_all(); - + trunk_sync_notify_thread_reset_offset(); return 0; } diff --git a/storage/trunk_mgr/trunk_sync.c b/storage/trunk_mgr/trunk_sync.c index b08adc2..65909a0 100644 --- a/storage/trunk_mgr/trunk_sync.c +++ b/storage/trunk_mgr/trunk_sync.c @@ -52,8 +52,22 @@ static char *trunk_binlog_write_cache_buff = NULL; static int trunk_binlog_write_cache_len = 0; static int trunk_binlog_write_version = 1; +typedef struct +{ + bool running; + bool reset_binlog_offset; + const FDFSStorageBrief *pStorage; + pthread_t tid; +} TrunkSyncThreadInfo; + +typedef struct +{ + TrunkSyncThreadInfo **thread_data; + int alloc_count; +} TrunkSyncThreadInfoArray; + /* save sync thread ids */ -static pthread_t *trunk_sync_tids = NULL; +static TrunkSyncThreadInfoArray sync_thread_info_array = {NULL, 0}; static int trunk_write_to_mark_file(TrunkBinLogReader *pReader); static int trunk_binlog_fsync_ex(const bool bNeedLock, \ @@ -199,27 +213,43 @@ int kill_trunk_sync_threads() { int result; int kill_res; + TrunkSyncThreadInfo **thread_info; + TrunkSyncThreadInfo **info_end; - if (trunk_sync_tids == NULL) + if (sync_thread_info_array.thread_data == NULL) { return 0; } if ((result=pthread_mutex_lock(&trunk_sync_thread_lock)) != 0) { - logError("file: "__FILE__", line: %d, " \ - "call pthread_mutex_lock fail, " \ - "errno: %d, error info: %s", \ + logError("file: "__FILE__", line: %d, " + "call pthread_mutex_lock fail, " + "errno: %d, error info: %s", __LINE__, result, STRERROR(result)); } - kill_res = kill_work_threads(trunk_sync_tids, g_trunk_sync_thread_count); + kill_res = 0; + info_end = sync_thread_info_array.thread_data + + sync_thread_info_array.alloc_count; + for (thread_info=sync_thread_info_array.thread_data; + thread_inforunning && (kill_res=pthread_kill( + (*thread_info)->tid, SIGINT)) != 0) + { + logError("file: "__FILE__", line: %d, " + "kill thread failed, " + "errno: %d, error info: %s", + __LINE__, kill_res, STRERROR(kill_res)); + } + } if ((result=pthread_mutex_unlock(&trunk_sync_thread_lock)) != 0) { - logError("file: "__FILE__", line: %d, " \ - "call pthread_mutex_unlock fail, " \ - "errno: %d, error info: %s", \ + logError("file: "__FILE__", line: %d, " + "call pthread_mutex_unlock fail, " + "errno: %d, error info: %s", __LINE__, result, STRERROR(result)); } @@ -231,6 +261,47 @@ int kill_trunk_sync_threads() return kill_res; } +int trunk_sync_notify_thread_reset_offset() +{ + int result; + TrunkSyncThreadInfo **thread_info; + TrunkSyncThreadInfo **info_end; + + if (sync_thread_info_array.thread_data == NULL) + { + return 0; + } + + if ((result=pthread_mutex_lock(&trunk_sync_thread_lock)) != 0) + { + logError("file: "__FILE__", line: %d, " + "call pthread_mutex_lock fail, " + "errno: %d, error info: %s", + __LINE__, result, STRERROR(result)); + } + + info_end = sync_thread_info_array.thread_data + + sync_thread_info_array.alloc_count; + for (thread_info=sync_thread_info_array.thread_data; + thread_inforunning) + { + (*thread_info)->reset_binlog_offset = true; + } + } + + if ((result=pthread_mutex_unlock(&trunk_sync_thread_lock)) != 0) + { + logError("file: "__FILE__", line: %d, " + "call pthread_mutex_unlock fail, " + "errno: %d, error info: %s", + __LINE__, result, STRERROR(result)); + } + + return result; +} + int trunk_binlog_sync_func(void *args) { if (trunk_binlog_write_cache_len > 0) @@ -809,6 +880,7 @@ int trunk_open_readable_binlog(TrunkBinLogReader *pReader, \ get_filename_func filename_func, const void *pArg) { char full_filename[MAX_PATH_SIZE]; + struct stat file_stat; if (pReader->binlog_fd >= 0) { @@ -819,14 +891,34 @@ int trunk_open_readable_binlog(TrunkBinLogReader *pReader, \ pReader->binlog_fd = open(full_filename, O_RDONLY); if (pReader->binlog_fd < 0) { - logError("file: "__FILE__", line: %d, " \ - "open binlog file \"%s\" fail, " \ - "errno: %d, error info: %s", \ - __LINE__, full_filename, \ + logError("file: "__FILE__", line: %d, " + "open binlog file \"%s\" fail, " + "errno: %d, error info: %s", + __LINE__, full_filename, errno, STRERROR(errno)); return errno != 0 ? errno : ENOENT; } + if (fstat(pReader->binlog_fd, &file_stat) != 0) + { + logError("file: "__FILE__", line: %d, " + "stat binlog file \"%s\" fail, " + "errno: %d, error info: %s", + __LINE__, full_filename, + errno, STRERROR(errno)); + return errno != 0 ? errno : ENOENT; + } + + if (pReader->binlog_offset > file_stat.st_size) + { + logWarning("file: "__FILE__", line: %d, " + "binlog file \"%s\", binlog_offset: %"PRId64 + " > file size: %"PRId64", set binlog_offset to 0", + __LINE__, full_filename, pReader->binlog_offset, + (int64_t)file_stat.st_size); + pReader->binlog_offset = 0; + } + if (pReader->binlog_offset > 0 && \ lseek(pReader->binlog_fd, pReader->binlog_offset, SEEK_SET) < 0) { @@ -895,7 +987,8 @@ static char *trunk_get_mark_filename_by_id(const char *storage_id, full_filename, filename_size); } -int trunk_reader_init(FDFSStorageBrief *pStorage, TrunkBinLogReader *pReader) +int trunk_reader_init(const FDFSStorageBrief *pStorage, + TrunkBinLogReader *pReader) { char full_filename[MAX_PATH_SIZE]; IniContext iniContext; @@ -1067,11 +1160,11 @@ static int trunk_write_to_mark_file(TrunkBinLogReader *pReader) int len; int result; - len = sprintf(buff, \ - "%s=%"PRId64"\n", \ + len = sprintf(buff, + "%s=%"PRId64"\n", MARK_ITEM_BINLOG_FILE_OFFSET, pReader->binlog_offset); - if ((result=storage_write_to_fd(pReader->mark_fd, \ + if ((result=storage_write_to_fd(pReader->mark_fd, trunk_mark_filename_by_reader, pReader, buff, len)) == 0) { pReader->last_binlog_offset = pReader->binlog_offset; @@ -1303,48 +1396,33 @@ int trunk_rename_mark_file(const char *old_ip_addr, const int old_port, \ return 0; } -static void trunk_sync_thread_exit(ConnectionInfo *pStorage) +static void trunk_sync_thread_exit(TrunkSyncThreadInfo *thread_data, + const int port) { int result; - int i; - pthread_t tid; if ((result=pthread_mutex_lock(&trunk_sync_thread_lock)) != 0) { - logError("file: "__FILE__", line: %d, " \ - "call pthread_mutex_lock fail, " \ - "errno: %d, error info: %s", \ + logError("file: "__FILE__", line: %d, " + "call pthread_mutex_lock fail, " + "errno: %d, error info: %s", __LINE__, result, STRERROR(result)); } - - tid = pthread_self(); - for (i=0; irunning = false; g_trunk_sync_thread_count--; if ((result=pthread_mutex_unlock(&trunk_sync_thread_lock)) != 0) { - logError("file: "__FILE__", line: %d, " \ - "call pthread_mutex_unlock fail, " \ - "errno: %d, error info: %s", \ + logError("file: "__FILE__", line: %d, " + "call pthread_mutex_unlock fail, " + "errno: %d, error info: %s", __LINE__, result, STRERROR(result)); } - logInfo("file: "__FILE__", line: %d, " \ - "trunk sync thread to storage server %s:%d exit", - __LINE__, pStorage->ip_addr, pStorage->port); + logInfo("file: "__FILE__", line: %d, " + "trunk sync thread to storage server %s:%d exit", + __LINE__, thread_data->pStorage->ip_addr, port); } static int trunk_sync_data(TrunkBinLogReader *pReader, \ @@ -1419,9 +1497,10 @@ static int trunk_sync_data(TrunkBinLogReader *pReader, \ return 0; } -static void* trunk_sync_thread_entrance(void* arg) +static void *trunk_sync_thread_entrance(void* arg) { - FDFSStorageBrief *pStorage; + TrunkSyncThreadInfo *thread_data; + const FDFSStorageBrief *pStorage; TrunkBinLogReader reader; ConnectionInfo storage_server; char local_ip_addr[IP_ADDRESS_SIZE]; @@ -1439,7 +1518,8 @@ static void* trunk_sync_thread_entrance(void* arg) current_time = g_current_time; last_keep_alive_time = 0; - pStorage = (FDFSStorageBrief *)arg; + thread_data = (TrunkSyncThreadInfo *)arg; + pStorage = thread_data->pStorage; strcpy(storage_server.ip_addr, pStorage->ip_addr); storage_server.port = g_server_port; @@ -1503,6 +1583,16 @@ static void* trunk_sync_thread_entrance(void* arg) break; } + if (thread_data->reset_binlog_offset) + { + thread_data->reset_binlog_offset = false; + if (reader.binlog_offset > 0) + { + reader.binlog_offset = 0; + trunk_write_to_mark_file(&reader); + } + } + if (reader.binlog_offset == 0) { if ((result=fdfs_deal_no_body_cmd(&storage_server, \ @@ -1520,9 +1610,9 @@ static void* trunk_sync_thread_entrance(void* arg) } sync_result = 0; - while (g_continue_flag && \ - pStorage->status != FDFS_STORAGE_STATUS_DELETED && \ - pStorage->status != FDFS_STORAGE_STATUS_IP_CHANGED && \ + while (g_continue_flag && !thread_data->reset_binlog_offset && + pStorage->status != FDFS_STORAGE_STATUS_DELETED && + pStorage->status != FDFS_STORAGE_STATUS_IP_CHANGED && pStorage->status != FDFS_STORAGE_STATUS_NONE) { read_result = trunk_binlog_preread(&reader); @@ -1613,7 +1703,7 @@ static void* trunk_sync_thread_entrance(void* arg) } trunk_reader_destroy(&reader); - trunk_sync_thread_exit(&storage_server); + trunk_sync_thread_exit(thread_data, storage_server.port); return NULL; } @@ -1639,14 +1729,98 @@ int trunk_sync_thread_start_all() return result; } +TrunkSyncThreadInfo *trunk_sync_alloc_thread_data() +{ + TrunkSyncThreadInfo **thread_info; + TrunkSyncThreadInfo **info_end; + TrunkSyncThreadInfo **new_thread_data; + TrunkSyncThreadInfo **new_data_start; + int alloc_count; + int bytes; + + if (g_trunk_sync_thread_count + 1 < sync_thread_info_array.alloc_count) + { + info_end = sync_thread_info_array.thread_data + + sync_thread_info_array.alloc_count; + for (thread_info=sync_thread_info_array.thread_data; + thread_inforunning) + { + return *thread_info; + } + } + } + + if (sync_thread_info_array.alloc_count == 0) + { + alloc_count = 1; + } + else + { + alloc_count = sync_thread_info_array.alloc_count * 2; + } + + bytes = sizeof(TrunkSyncThreadInfo *) * alloc_count; + new_thread_data = (TrunkSyncThreadInfo **)malloc(bytes); + if (new_thread_data == NULL) + { + logError("file: "__FILE__", line: %d, " + "malloc %d bytes fail, " + "errno: %d, error info: %s", + __LINE__, bytes, errno, STRERROR(errno)); + return NULL; + } + + logInfo("file: "__FILE__", line: %d, " + "alloc %d thread data entries", + __LINE__, alloc_count); + + if (sync_thread_info_array.alloc_count > 0) + { + memcpy(new_thread_data, sync_thread_info_array.thread_data, + sizeof(TrunkSyncThreadInfo *) * + sync_thread_info_array.alloc_count); + } + + new_data_start = new_thread_data + sync_thread_info_array.alloc_count; + info_end = new_thread_data + alloc_count; + for (thread_info=new_data_start; thread_infostatus == FDFS_STORAGE_STATUS_DELETED || \ - pStorage->status == FDFS_STORAGE_STATUS_IP_CHANGED || \ + if (pStorage->status == FDFS_STORAGE_STATUS_DELETED || + pStorage->status == FDFS_STORAGE_STATUS_IP_CHANGED || pStorage->status == FDFS_STORAGE_STATUS_NONE) { return 0; @@ -1663,59 +1837,50 @@ int trunk_sync_thread_start(const FDFSStorageBrief *pStorage) return result; } - /* - //printf("start storage ip_addr: %s, g_trunk_sync_thread_count=%d\n", - pStorage->ip_addr, g_trunk_sync_thread_count); - */ - - if ((result=pthread_create(&tid, &pattr, trunk_sync_thread_entrance, \ - (void *)pStorage)) != 0) + if ((lock_res=pthread_mutex_lock(&trunk_sync_thread_lock)) != 0) { - logError("file: "__FILE__", line: %d, " \ - "create thread failed, errno: %d, " \ - "error info: %s", \ - __LINE__, result, STRERROR(result)); - - pthread_attr_destroy(&pattr); - return result; + logError("file: "__FILE__", line: %d, " + "call pthread_mutex_lock fail, " + "errno: %d, error info: %s", + __LINE__, lock_res, STRERROR(lock_res)); } - if ((result=pthread_mutex_lock(&trunk_sync_thread_lock)) != 0) - { - logError("file: "__FILE__", line: %d, " \ - "call pthread_mutex_lock fail, " \ - "errno: %d, error info: %s", \ - __LINE__, result, STRERROR(result)); - } + do + { + thread_data = trunk_sync_alloc_thread_data(); + if (thread_data == NULL) + { + result = ENOMEM; + break; + } - g_trunk_sync_thread_count++; - trunk_sync_tids = (pthread_t *)realloc(trunk_sync_tids, sizeof(pthread_t) * \ - g_trunk_sync_thread_count); - if (trunk_sync_tids == NULL) - { - logError("file: "__FILE__", line: %d, " \ - "malloc %d bytes fail, " \ - "errno: %d, error info: %s", \ - __LINE__, (int)sizeof(pthread_t) * \ - g_trunk_sync_thread_count, \ - errno, STRERROR(errno)); - } - else - { - trunk_sync_tids[g_trunk_sync_thread_count - 1] = tid; - } + thread_data->running = true; + thread_data->pStorage = pStorage; + if ((result=pthread_create(&thread_data->tid, &pattr, + trunk_sync_thread_entrance, + (void *)thread_data)) != 0) + { + thread_data->running = false; + logError("file: "__FILE__", line: %d, " + "create thread failed, errno: %d, " + "error info: %s", + __LINE__, result, STRERROR(result)); + break; + } - if ((result=pthread_mutex_unlock(&trunk_sync_thread_lock)) != 0) - { - logError("file: "__FILE__", line: %d, " \ - "call pthread_mutex_unlock fail, " \ - "errno: %d, error info: %s", \ - __LINE__, result, STRERROR(result)); - } + g_trunk_sync_thread_count++; + } while (0); + + if ((lock_res=pthread_mutex_unlock(&trunk_sync_thread_lock)) != 0) + { + logError("file: "__FILE__", line: %d, " + "call pthread_mutex_unlock fail, " + "errno: %d, error info: %s", + __LINE__, lock_res, STRERROR(lock_res)); + } pthread_attr_destroy(&pattr); - - return 0; + return result; } void trunk_waiting_sync_thread_exit() diff --git a/storage/trunk_mgr/trunk_sync.h b/storage/trunk_mgr/trunk_sync.h index 40ad243..e94c471 100644 --- a/storage/trunk_mgr/trunk_sync.h +++ b/storage/trunk_mgr/trunk_sync.h @@ -73,7 +73,8 @@ int trunk_rename_mark_file(const char *old_ip_addr, const int old_port, \ int trunk_open_readable_binlog(TrunkBinLogReader *pReader, \ get_filename_func filename_func, const void *pArg); -int trunk_reader_init(FDFSStorageBrief *pStorage, TrunkBinLogReader *pReader); +int trunk_reader_init(const FDFSStorageBrief *pStorage, + TrunkBinLogReader *pReader); void trunk_reader_destroy(TrunkBinLogReader *pReader); //trunk binlog compress @@ -81,6 +82,8 @@ int trunk_binlog_compress_apply(); int trunk_binlog_compress_commit(); int trunk_binlog_compress_rollback(); +int trunk_sync_notify_thread_reset_offset(); + #ifdef __cplusplus } #endif