From 079bc4737bf916010589703b2a231f072485e91f Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Sat, 24 Dec 2022 10:29:42 +0800 Subject: [PATCH] use prctl to set pthread name under Linux --- HISTORY | 3 ++ common/fdfs_global.c | 2 +- storage/storage_dio.c | 43 ++++++++++++++++++---- storage/storage_dio.h | 3 ++ storage/storage_service.c | 2 +- storage/storage_sync.c | 63 +++++++++++++++++++-------------- storage/storage_sync.h | 2 +- storage/tracker_client_thread.c | 9 +++++ storage/trunk_mgr/trunk_sync.c | 42 ++++++++++++++-------- storage/trunk_mgr/trunk_sync.h | 2 +- tracker/tracker_relationship.c | 6 ++++ 11 files changed, 125 insertions(+), 52 deletions(-) diff --git a/HISTORY b/HISTORY index b540775..201b0e0 100644 --- a/HISTORY +++ b/HISTORY @@ -1,4 +1,7 @@ +Version 6.9.3 2022-12-24 + * use prctl to set pthread name under Linux + Version 6.9.2 2022-11-28 * space size such as total_mb and free_mb use int64_t instead of int * bugfixed: log connection ip_addr and port correctly diff --git a/common/fdfs_global.c b/common/fdfs_global.c index 8e072f9..d34fed4 100644 --- a/common/fdfs_global.c +++ b/common/fdfs_global.c @@ -20,7 +20,7 @@ #include "fastcommon/logger.h" #include "fdfs_global.h" -Version g_fdfs_version = {6, 9, 2}; +Version g_fdfs_version = {6, 9, 3}; bool g_use_connection_pool = false; ConnectionPool g_connection_pool; int g_connection_pool_max_idle_time = 3600; diff --git a/storage/storage_dio.c b/storage/storage_dio.c index a7a8755..a30b338 100644 --- a/storage/storage_dio.c +++ b/storage/storage_dio.c @@ -25,6 +25,7 @@ #include "fastcommon/logger.h" #include "fastcommon/sockopt.h" #include "fastcommon/ioevent_loop.h" +#include "fastcommon/fc_atomic.h" #include "sf/sf_service.h" #include "storage_global.h" #include "storage_service.h" @@ -87,13 +88,13 @@ int storage_dio_init() for (pThreadData=g_dio_thread_data; pThreadDatacount = threads_count_per_path; - pThreadData->contexts = g_dio_contexts + (pThreadData - \ + pThreadData->contexts = g_dio_contexts + (pThreadData - g_dio_thread_data) * threads_count_per_path; pThreadData->reader = pThreadData->contexts; pThreadData->writer = pThreadData->contexts+g_disk_reader_threads; pContextEnd = pThreadData->contexts + pThreadData->count; - for (pContext=pThreadData->contexts; pContextcontexts; pContextqueue))) != 0) @@ -101,7 +102,25 @@ int storage_dio_init() return result; } - if ((result=pthread_create(&tid, &thread_attr, \ + pContext->path_index = pThreadData - g_dio_thread_data; + pContext->thread_index = pContext - pThreadData->contexts; + if (g_disk_rw_separated) + { + if (pContext->thread_index < g_disk_reader_threads) + { + pContext->rw = "r"; + } + else + { + pContext->rw = "w"; + pContext->thread_index -= g_disk_reader_threads; + } + } + else + { + pContext->rw = "rw"; + } + if ((result=pthread_create(&tid, &thread_attr, dio_thread_entrance, pContext)) != 0) { logError("file: "__FILE__", line: %d, " \ @@ -723,12 +742,22 @@ void dio_trunk_write_finish_clean_up(struct fast_task_info *pTask) } } -static void *dio_thread_entrance(void* arg) +static void *dio_thread_entrance(void* arg) { struct storage_dio_context *pContext; struct fast_task_info *pTask; pContext = (struct storage_dio_context *)arg; + +#ifdef OS_LINUX + { + char thread_name[32]; + snprintf(thread_name, sizeof(thread_name), "dio-p%02d-%s[%d]", + pContext->path_index, pContext->rw, pContext->thread_index); + prctl(PR_SET_NAME, thread_name); + } +#endif + while (SF_G_CONTINUE_FLAG) { while ((pTask=blocked_queue_pop(&(pContext->queue))) != NULL) @@ -740,9 +769,9 @@ static void *dio_thread_entrance(void* arg) __sync_sub_and_fetch(&g_dio_thread_count, 1); - logDebug("file: "__FILE__", line: %d, " \ - "dio thread exited, thread count: %d", \ - __LINE__, g_dio_thread_count); + logDebug("file: "__FILE__", line: %d, " + "dio thread exited, thread count: %d", __LINE__, + FC_ATOMIC_GET(g_dio_thread_count)); return NULL; } diff --git a/storage/storage_dio.h b/storage/storage_dio.h index 4e6f09d..e421877 100644 --- a/storage/storage_dio.h +++ b/storage/storage_dio.h @@ -21,6 +21,9 @@ struct storage_dio_context { + int path_index; + int thread_index; + const char *rw; struct fast_blocked_queue queue; }; diff --git a/storage/storage_service.c b/storage/storage_service.c index 333c726..640d9e1 100644 --- a/storage/storage_service.c +++ b/storage/storage_service.c @@ -1653,7 +1653,7 @@ int storage_service_init() NULL, sock_accept_done_callback, storage_set_body_length, sock_send_done_callback, storage_deal_task, task_finish_clean_up, NULL, 1000, sizeof(TrackerHeader), sizeof(StorageClientInfo)); - sf_enable_thread_notify(false); + sf_enable_thread_notify(true); sf_set_remove_from_ready_list(false); return result; diff --git a/storage/storage_sync.c b/storage/storage_sync.c index d2cab16..a256c48 100644 --- a/storage/storage_sync.c +++ b/storage/storage_sync.c @@ -19,14 +19,15 @@ #include #include #include -#include "fdfs_define.h" #include "fastcommon/logger.h" -#include "fdfs_global.h" #include "fastcommon/sockopt.h" #include "fastcommon/shared_func.h" #include "fastcommon/pthread_func.h" #include "fastcommon/sched_thread.h" #include "fastcommon/ini_file_reader.h" +#include "fastcommon/fc_atomic.h" +#include "fdfs_define.h" +#include "fdfs_global.h" #include "tracker_types.h" #include "tracker_proto.h" #include "storage_global.h" @@ -62,7 +63,7 @@ int g_binlog_index = 0; static int64_t binlog_file_size = 0; static int binlog_compress_index = 0; -int g_storage_sync_thread_count = 0; +volatile int g_storage_sync_thread_count = 0; static pthread_mutex_t sync_thread_lock; static char *binlog_write_cache_buff = NULL; static int binlog_write_cache_len = 0; @@ -1551,7 +1552,8 @@ int kill_storage_sync_threads() __LINE__, result, STRERROR(result)); } - kill_res = kill_work_threads(sync_tids, g_storage_sync_thread_count); + kill_res = kill_work_threads(sync_tids, FC_ATOMIC_GET( + g_storage_sync_thread_count)); if ((result=pthread_mutex_unlock(&sync_thread_lock)) != 0) { @@ -1561,7 +1563,7 @@ int kill_storage_sync_threads() __LINE__, result, STRERROR(result)); } - while (g_storage_sync_thread_count > 0) + while (FC_ATOMIC_GET(g_storage_sync_thread_count) > 0) { usleep(50000); } @@ -2853,6 +2855,7 @@ static void storage_sync_thread_exit(ConnectionInfo *pStorage) { int result; int i; + int thread_count; pthread_t tid; if ((result=pthread_mutex_lock(&sync_thread_lock)) != 0) @@ -2863,8 +2866,9 @@ static void storage_sync_thread_exit(ConnectionInfo *pStorage) __LINE__, result, STRERROR(result)); } + thread_count = FC_ATOMIC_GET(g_storage_sync_thread_count); tid = pthread_self(); - for (i=0; iip_addr); storage_server.port = SF_G_INNER_PORT; storage_server.sock = -1; +#ifdef OS_LINUX + { + char thread_name[32]; + snprintf(thread_name, sizeof(thread_name), "data-sync[%d]", + FC_ATOMIC_GET(g_storage_sync_thread_count)); + prctl(PR_SET_NAME, thread_name); + } +#endif + memset(local_ip_addr, 0, sizeof(local_ip_addr)); pReader = (StorageBinLogReader *)malloc(sizeof(StorageBinLogReader)); if (pReader == NULL) @@ -3253,6 +3266,7 @@ static void* storage_sync_thread_entrance(void* arg) int storage_sync_thread_start(const FDFSStorageBrief *pStorage) { int result; + int thread_count; pthread_attr_t pattr; pthread_t tid; @@ -3286,13 +3300,12 @@ int storage_sync_thread_start(const FDFSStorageBrief *pStorage) pStorage->ip_addr, g_storage_sync_thread_count); */ - if ((result=pthread_create(&tid, &pattr, storage_sync_thread_entrance, \ - (void *)pStorage)) != 0) + if ((result=pthread_create(&tid, &pattr, storage_sync_thread_entrance, + (void *)pStorage)) != 0) { - logError("file: "__FILE__", line: %d, " \ - "create thread failed, errno: %d, " \ - "error info: %s", \ - __LINE__, result, STRERROR(result)); + logError("file: "__FILE__", line: %d, " + "create thread failed, errno: %d, error info: %s", + __LINE__, result, STRERROR(result)); pthread_attr_destroy(&pattr); return result; @@ -3306,21 +3319,19 @@ int storage_sync_thread_start(const FDFSStorageBrief *pStorage) __LINE__, result, STRERROR(result)); } - g_storage_sync_thread_count++; - sync_tids = (pthread_t *)realloc(sync_tids, sizeof(pthread_t) * \ - g_storage_sync_thread_count); + thread_count = FC_ATOMIC_INC(g_storage_sync_thread_count); + sync_tids = (pthread_t *)realloc(sync_tids, + sizeof(pthread_t) * thread_count); if (sync_tids == NULL) { - logError("file: "__FILE__", line: %d, " \ - "malloc %d bytes fail, " \ - "errno: %d, error info: %s", \ - __LINE__, (int)sizeof(pthread_t) * \ - g_storage_sync_thread_count, \ - errno, STRERROR(errno)); + logError("file: "__FILE__", line: %d, " + "malloc %d bytes fail, errno: %d, error info: %s", + __LINE__, (int)sizeof(pthread_t) * thread_count, + errno, STRERROR(errno)); } else { - sync_tids[g_storage_sync_thread_count - 1] = tid; + sync_tids[thread_count - 1] = tid; } if ((result=pthread_mutex_unlock(&sync_thread_lock)) != 0) diff --git a/storage/storage_sync.h b/storage/storage_sync.h index 7dfa99d..edb99cb 100644 --- a/storage/storage_sync.h +++ b/storage/storage_sync.h @@ -74,7 +74,7 @@ typedef struct extern int g_binlog_fd; extern int g_binlog_index; -extern int g_storage_sync_thread_count; +extern volatile int g_storage_sync_thread_count; int storage_sync_init(); int storage_sync_destroy(); diff --git a/storage/tracker_client_thread.c b/storage/tracker_client_thread.c index 9084c14..1dcb325 100644 --- a/storage/tracker_client_thread.c +++ b/storage/tracker_client_thread.c @@ -224,6 +224,15 @@ static void *tracker_report_thread_entrance(void *arg) fdfs_server_sock_reset(pTrackerServer); tracker_index = pTrackerServer - g_tracker_group.servers; +#ifdef OS_LINUX + { + char thread_name[32]; + snprintf(thread_name, sizeof(thread_name), + "tracker-cli[%d]", tracker_index); + prctl(PR_SET_NAME, thread_name); + } +#endif + logDebug("file: "__FILE__", line: %d, " "report thread to tracker server %s:%u started", __LINE__, pTrackerServer->connections[0].ip_addr, diff --git a/storage/trunk_mgr/trunk_sync.c b/storage/trunk_mgr/trunk_sync.c index c0abb8d..29fce2e 100644 --- a/storage/trunk_mgr/trunk_sync.c +++ b/storage/trunk_mgr/trunk_sync.c @@ -21,14 +21,15 @@ #include #include #include -#include "fdfs_define.h" -#include "fastcommon/logger.h" -#include "fdfs_global.h" #include "fastcommon/sockopt.h" #include "fastcommon/shared_func.h" #include "fastcommon/pthread_func.h" #include "fastcommon/sched_thread.h" #include "fastcommon/ini_file_reader.h" +#include "fastcommon/fc_atomic.h" +#include "fdfs_define.h" +#include "fastcommon/logger.h" +#include "fdfs_global.h" #include "tracker_types.h" #include "tracker_proto.h" #include "storage_global.h" @@ -49,7 +50,7 @@ static int trunk_binlog_fd = -1; -int g_trunk_sync_thread_count = 0; +volatile int g_trunk_sync_thread_count = 0; static pthread_mutex_t trunk_sync_thread_lock; static char *trunk_binlog_write_cache_buff = NULL; static int trunk_binlog_write_cache_len = 0; @@ -59,6 +60,7 @@ typedef struct { bool running; bool reset_binlog_offset; + int thread_index; const FDFSStorageBrief *pStorage; pthread_t tid; } TrunkSyncThreadInfo; @@ -288,7 +290,7 @@ int kill_trunk_sync_threads() __LINE__, result, STRERROR(result)); } - while (g_trunk_sync_thread_count > 0) + while (FC_ATOMIC_GET(g_trunk_sync_thread_count) > 0) { usleep(50000); } @@ -1961,7 +1963,7 @@ static void trunk_sync_thread_exit(TrunkSyncThreadInfo *thread_data, } thread_data->running = false; - g_trunk_sync_thread_count--; + FC_ATOMIC_DEC(g_trunk_sync_thread_count); if ((result=pthread_mutex_unlock(&trunk_sync_thread_lock)) != 0) { @@ -2060,7 +2062,17 @@ static void *trunk_sync_thread_entrance(void* arg) int result; time_t current_time; time_t last_keep_alive_time; - + + thread_data = (TrunkSyncThreadInfo *)arg; +#ifdef OS_LINUX + { + char thread_name[32]; + snprintf(thread_name, sizeof(thread_name), + "trunk-sync[%d]", thread_data->thread_index); + prctl(PR_SET_NAME, thread_name); + } +#endif + memset(local_ip_addr, 0, sizeof(local_ip_addr)); memset(&reader, 0, sizeof(reader)); reader.binlog_fd = -1; @@ -2068,9 +2080,7 @@ static void *trunk_sync_thread_entrance(void* arg) current_time = g_current_time; last_keep_alive_time = 0; - thread_data = (TrunkSyncThreadInfo *)arg; pStorage = thread_data->pStorage; - strcpy(storage_server.ip_addr, pStorage->ip_addr); storage_server.port = SF_G_INNER_PORT; storage_server.sock = -1; @@ -2289,7 +2299,8 @@ TrunkSyncThreadInfo *trunk_sync_alloc_thread_data() int alloc_count; int bytes; - if (g_trunk_sync_thread_count + 1 < sync_thread_info_array.alloc_count) + if (FC_ATOMIC_GET(g_trunk_sync_thread_count) + 1 < + sync_thread_info_array.alloc_count) { info_end = sync_thread_info_array.thread_data + sync_thread_info_array.alloc_count; @@ -2351,6 +2362,7 @@ TrunkSyncThreadInfo *trunk_sync_alloc_thread_data() } memset(*thread_info, 0, sizeof(TrunkSyncThreadInfo)); + (*thread_info)->thread_index = thread_info - new_thread_data; } old_thread_data = sync_thread_info_array.thread_data; @@ -2420,7 +2432,7 @@ int trunk_sync_thread_start(const FDFSStorageBrief *pStorage) break; } - g_trunk_sync_thread_count++; + FC_ATOMIC_INC(g_trunk_sync_thread_count); } while (0); if ((lock_res=pthread_mutex_unlock(&trunk_sync_thread_lock)) != 0) @@ -2440,7 +2452,7 @@ void trunk_waiting_sync_thread_exit() int saved_trunk_sync_thread_count; int count; - saved_trunk_sync_thread_count = g_trunk_sync_thread_count; + saved_trunk_sync_thread_count = FC_ATOMIC_GET(g_trunk_sync_thread_count); if (saved_trunk_sync_thread_count > 0) { logInfo("file: "__FILE__", line: %d, " @@ -2449,17 +2461,17 @@ void trunk_waiting_sync_thread_exit() } count = 0; - while (g_trunk_sync_thread_count > 0 && count < 60) + while (FC_ATOMIC_GET(g_trunk_sync_thread_count) > 0 && count < 60) { usleep(50000); count++; } - if (g_trunk_sync_thread_count > 0) + if (FC_ATOMIC_GET(g_trunk_sync_thread_count) > 0) { logWarning("file: "__FILE__", line: %d, " "kill %d trunk sync threads.", - __LINE__, g_trunk_sync_thread_count); + __LINE__, FC_ATOMIC_GET(g_trunk_sync_thread_count)); kill_trunk_sync_threads(); } diff --git a/storage/trunk_mgr/trunk_sync.h b/storage/trunk_mgr/trunk_sync.h index 958d7bd..4d0efa2 100644 --- a/storage/trunk_mgr/trunk_sync.h +++ b/storage/trunk_mgr/trunk_sync.h @@ -42,7 +42,7 @@ typedef struct FDFSTrunkFullInfo trunk; } TrunkBinLogRecord; -extern int g_trunk_sync_thread_count; +extern volatile int g_trunk_sync_thread_count; int trunk_sync_init(); int trunk_sync_destroy(); diff --git a/tracker/tracker_relationship.c b/tracker/tracker_relationship.c index 9b2bd80..8c5329a 100644 --- a/tracker/tracker_relationship.c +++ b/tracker/tracker_relationship.c @@ -561,6 +561,12 @@ static void *relationship_thread_entrance(void* arg) int fail_count; int sleep_seconds; +#ifdef OS_LINUX + { + prctl(PR_SET_NAME, "relationship"); + } +#endif + fail_count = 0; sleep_seconds = 1; while (SF_G_CONTINUE_FLAG)