compress the trunk binlog gracefully

pull/484/head
YuQing 2019-12-15 18:49:02 +08:00
parent cf0ec7e4cf
commit cab3a90d7f
6 changed files with 291 additions and 113 deletions

View File

@ -1,5 +1,5 @@
Version 6.05 2019-12-14
Version 6.05 2019-12-15
* fdfs_trackerd and fdfs_storaged print the server version in usage.
you can execute fdfs_trackerd or fdfs_storaged without parameters
to show the server version

View File

@ -29,7 +29,7 @@
#include "storage_func.h"
#include "storage_sync_func.h"
void storage_sync_connect_storage_server_ex(FDFSStorageBrief *pStorage,
void storage_sync_connect_storage_server_ex(const FDFSStorageBrief *pStorage,
ConnectionInfo *conn, bool *check_flag)
{
int nContinuousFail;

View File

@ -17,11 +17,11 @@
extern "C" {
#endif
void storage_sync_connect_storage_server_ex(FDFSStorageBrief *pStorage,
void storage_sync_connect_storage_server_ex(const FDFSStorageBrief *pStorage,
ConnectionInfo *conn, bool *check_flag);
static inline void storage_sync_connect_storage_server(
FDFSStorageBrief *pStorage, ConnectionInfo *conn)
const FDFSStorageBrief *pStorage, ConnectionInfo *conn)
{
bool check_flag = true;
storage_sync_connect_storage_server_ex(pStorage,

View File

@ -520,6 +520,15 @@ static int storage_trunk_compress()
{
int result;
if (g_current_time - g_up_time < 600)
{
logWarning("file: "__FILE__", line: %d, "
"too little time lapse: %ds afer startup, "
"skip trunk binlog compress", __LINE__,
(int)(g_current_time - g_up_time));
return EBUSY;
}
if (__sync_add_and_fetch(&g_trunk_binlog_compress_in_progress, 1) != 1)
{
__sync_sub_and_fetch(&g_trunk_binlog_compress_in_progress, 1);
@ -563,7 +572,6 @@ static int storage_trunk_compress()
{
logInfo("file: "__FILE__", line: %d, "
"compress trunk binlog successfully.", __LINE__);
return trunk_unlink_all_mark_files(); //because the binlog file be compressed
}
else
{
@ -576,6 +584,8 @@ static int storage_trunk_compress()
static int storage_trunk_save()
{
int result;
if (!(g_trunk_compress_binlog_min_interval > 0 &&
g_current_time - g_trunk_last_compress_time >
g_trunk_compress_binlog_min_interval))
@ -594,7 +604,12 @@ static int storage_trunk_save()
}
}
return storage_trunk_compress();
if ((result=storage_trunk_compress()) == 0)
{
return trunk_unlink_all_mark_files(); //because the binlog file be compressed
}
return result;
}
int trunk_binlog_compress_func(void *args)
@ -617,12 +632,7 @@ int trunk_binlog_compress_func(void *args)
return 0;
}
g_if_trunker_self = false; //for sync thread exit
trunk_waiting_sync_thread_exit();
g_if_trunker_self = true; //restore to true
trunk_sync_thread_start_all();
trunk_sync_notify_thread_reset_offset();
return 0;
}

View File

@ -52,8 +52,22 @@ static char *trunk_binlog_write_cache_buff = NULL;
static int trunk_binlog_write_cache_len = 0;
static int trunk_binlog_write_version = 1;
typedef struct
{
bool running;
bool reset_binlog_offset;
const FDFSStorageBrief *pStorage;
pthread_t tid;
} TrunkSyncThreadInfo;
typedef struct
{
TrunkSyncThreadInfo **thread_data;
int alloc_count;
} TrunkSyncThreadInfoArray;
/* save sync thread ids */
static pthread_t *trunk_sync_tids = NULL;
static TrunkSyncThreadInfoArray sync_thread_info_array = {NULL, 0};
static int trunk_write_to_mark_file(TrunkBinLogReader *pReader);
static int trunk_binlog_fsync_ex(const bool bNeedLock, \
@ -199,27 +213,43 @@ int kill_trunk_sync_threads()
{
int result;
int kill_res;
TrunkSyncThreadInfo **thread_info;
TrunkSyncThreadInfo **info_end;
if (trunk_sync_tids == NULL)
if (sync_thread_info_array.thread_data == NULL)
{
return 0;
}
if ((result=pthread_mutex_lock(&trunk_sync_thread_lock)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"call pthread_mutex_lock fail, " \
"errno: %d, error info: %s", \
logError("file: "__FILE__", line: %d, "
"call pthread_mutex_lock fail, "
"errno: %d, error info: %s",
__LINE__, result, STRERROR(result));
}
kill_res = kill_work_threads(trunk_sync_tids, g_trunk_sync_thread_count);
kill_res = 0;
info_end = sync_thread_info_array.thread_data +
sync_thread_info_array.alloc_count;
for (thread_info=sync_thread_info_array.thread_data;
thread_info<info_end; thread_info++)
{
if ((*thread_info)->running && (kill_res=pthread_kill(
(*thread_info)->tid, SIGINT)) != 0)
{
logError("file: "__FILE__", line: %d, "
"kill thread failed, "
"errno: %d, error info: %s",
__LINE__, kill_res, STRERROR(kill_res));
}
}
if ((result=pthread_mutex_unlock(&trunk_sync_thread_lock)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"call pthread_mutex_unlock fail, " \
"errno: %d, error info: %s", \
logError("file: "__FILE__", line: %d, "
"call pthread_mutex_unlock fail, "
"errno: %d, error info: %s",
__LINE__, result, STRERROR(result));
}
@ -231,6 +261,47 @@ int kill_trunk_sync_threads()
return kill_res;
}
int trunk_sync_notify_thread_reset_offset()
{
int result;
TrunkSyncThreadInfo **thread_info;
TrunkSyncThreadInfo **info_end;
if (sync_thread_info_array.thread_data == NULL)
{
return 0;
}
if ((result=pthread_mutex_lock(&trunk_sync_thread_lock)) != 0)
{
logError("file: "__FILE__", line: %d, "
"call pthread_mutex_lock fail, "
"errno: %d, error info: %s",
__LINE__, result, STRERROR(result));
}
info_end = sync_thread_info_array.thread_data +
sync_thread_info_array.alloc_count;
for (thread_info=sync_thread_info_array.thread_data;
thread_info<info_end; thread_info++)
{
if ((*thread_info)->running)
{
(*thread_info)->reset_binlog_offset = true;
}
}
if ((result=pthread_mutex_unlock(&trunk_sync_thread_lock)) != 0)
{
logError("file: "__FILE__", line: %d, "
"call pthread_mutex_unlock fail, "
"errno: %d, error info: %s",
__LINE__, result, STRERROR(result));
}
return result;
}
int trunk_binlog_sync_func(void *args)
{
if (trunk_binlog_write_cache_len > 0)
@ -809,6 +880,7 @@ int trunk_open_readable_binlog(TrunkBinLogReader *pReader, \
get_filename_func filename_func, const void *pArg)
{
char full_filename[MAX_PATH_SIZE];
struct stat file_stat;
if (pReader->binlog_fd >= 0)
{
@ -819,14 +891,34 @@ int trunk_open_readable_binlog(TrunkBinLogReader *pReader, \
pReader->binlog_fd = open(full_filename, O_RDONLY);
if (pReader->binlog_fd < 0)
{
logError("file: "__FILE__", line: %d, " \
"open binlog file \"%s\" fail, " \
"errno: %d, error info: %s", \
__LINE__, full_filename, \
logError("file: "__FILE__", line: %d, "
"open binlog file \"%s\" fail, "
"errno: %d, error info: %s",
__LINE__, full_filename,
errno, STRERROR(errno));
return errno != 0 ? errno : ENOENT;
}
if (fstat(pReader->binlog_fd, &file_stat) != 0)
{
logError("file: "__FILE__", line: %d, "
"stat binlog file \"%s\" fail, "
"errno: %d, error info: %s",
__LINE__, full_filename,
errno, STRERROR(errno));
return errno != 0 ? errno : ENOENT;
}
if (pReader->binlog_offset > file_stat.st_size)
{
logWarning("file: "__FILE__", line: %d, "
"binlog file \"%s\", binlog_offset: %"PRId64
" > file size: %"PRId64", set binlog_offset to 0",
__LINE__, full_filename, pReader->binlog_offset,
(int64_t)file_stat.st_size);
pReader->binlog_offset = 0;
}
if (pReader->binlog_offset > 0 && \
lseek(pReader->binlog_fd, pReader->binlog_offset, SEEK_SET) < 0)
{
@ -895,7 +987,8 @@ static char *trunk_get_mark_filename_by_id(const char *storage_id,
full_filename, filename_size);
}
int trunk_reader_init(FDFSStorageBrief *pStorage, TrunkBinLogReader *pReader)
int trunk_reader_init(const FDFSStorageBrief *pStorage,
TrunkBinLogReader *pReader)
{
char full_filename[MAX_PATH_SIZE];
IniContext iniContext;
@ -1067,11 +1160,11 @@ static int trunk_write_to_mark_file(TrunkBinLogReader *pReader)
int len;
int result;
len = sprintf(buff, \
"%s=%"PRId64"\n", \
len = sprintf(buff,
"%s=%"PRId64"\n",
MARK_ITEM_BINLOG_FILE_OFFSET, pReader->binlog_offset);
if ((result=storage_write_to_fd(pReader->mark_fd, \
if ((result=storage_write_to_fd(pReader->mark_fd,
trunk_mark_filename_by_reader, pReader, buff, len)) == 0)
{
pReader->last_binlog_offset = pReader->binlog_offset;
@ -1303,48 +1396,33 @@ int trunk_rename_mark_file(const char *old_ip_addr, const int old_port, \
return 0;
}
static void trunk_sync_thread_exit(ConnectionInfo *pStorage)
static void trunk_sync_thread_exit(TrunkSyncThreadInfo *thread_data,
const int port)
{
int result;
int i;
pthread_t tid;
if ((result=pthread_mutex_lock(&trunk_sync_thread_lock)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"call pthread_mutex_lock fail, " \
"errno: %d, error info: %s", \
logError("file: "__FILE__", line: %d, "
"call pthread_mutex_lock fail, "
"errno: %d, error info: %s",
__LINE__, result, STRERROR(result));
}
tid = pthread_self();
for (i=0; i<g_trunk_sync_thread_count; i++)
{
if (pthread_equal(trunk_sync_tids[i], tid))
{
break;
}
}
while (i < g_trunk_sync_thread_count - 1)
{
trunk_sync_tids[i] = trunk_sync_tids[i + 1];
i++;
}
thread_data->running = false;
g_trunk_sync_thread_count--;
if ((result=pthread_mutex_unlock(&trunk_sync_thread_lock)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"call pthread_mutex_unlock fail, " \
"errno: %d, error info: %s", \
logError("file: "__FILE__", line: %d, "
"call pthread_mutex_unlock fail, "
"errno: %d, error info: %s",
__LINE__, result, STRERROR(result));
}
logInfo("file: "__FILE__", line: %d, " \
"trunk sync thread to storage server %s:%d exit",
__LINE__, pStorage->ip_addr, pStorage->port);
logInfo("file: "__FILE__", line: %d, "
"trunk sync thread to storage server %s:%d exit",
__LINE__, thread_data->pStorage->ip_addr, port);
}
static int trunk_sync_data(TrunkBinLogReader *pReader, \
@ -1419,9 +1497,10 @@ static int trunk_sync_data(TrunkBinLogReader *pReader, \
return 0;
}
static void* trunk_sync_thread_entrance(void* arg)
static void *trunk_sync_thread_entrance(void* arg)
{
FDFSStorageBrief *pStorage;
TrunkSyncThreadInfo *thread_data;
const FDFSStorageBrief *pStorage;
TrunkBinLogReader reader;
ConnectionInfo storage_server;
char local_ip_addr[IP_ADDRESS_SIZE];
@ -1439,7 +1518,8 @@ static void* trunk_sync_thread_entrance(void* arg)
current_time = g_current_time;
last_keep_alive_time = 0;
pStorage = (FDFSStorageBrief *)arg;
thread_data = (TrunkSyncThreadInfo *)arg;
pStorage = thread_data->pStorage;
strcpy(storage_server.ip_addr, pStorage->ip_addr);
storage_server.port = g_server_port;
@ -1503,6 +1583,16 @@ static void* trunk_sync_thread_entrance(void* arg)
break;
}
if (thread_data->reset_binlog_offset)
{
thread_data->reset_binlog_offset = false;
if (reader.binlog_offset > 0)
{
reader.binlog_offset = 0;
trunk_write_to_mark_file(&reader);
}
}
if (reader.binlog_offset == 0)
{
if ((result=fdfs_deal_no_body_cmd(&storage_server, \
@ -1520,9 +1610,9 @@ static void* trunk_sync_thread_entrance(void* arg)
}
sync_result = 0;
while (g_continue_flag && \
pStorage->status != FDFS_STORAGE_STATUS_DELETED && \
pStorage->status != FDFS_STORAGE_STATUS_IP_CHANGED && \
while (g_continue_flag && !thread_data->reset_binlog_offset &&
pStorage->status != FDFS_STORAGE_STATUS_DELETED &&
pStorage->status != FDFS_STORAGE_STATUS_IP_CHANGED &&
pStorage->status != FDFS_STORAGE_STATUS_NONE)
{
read_result = trunk_binlog_preread(&reader);
@ -1613,7 +1703,7 @@ static void* trunk_sync_thread_entrance(void* arg)
}
trunk_reader_destroy(&reader);
trunk_sync_thread_exit(&storage_server);
trunk_sync_thread_exit(thread_data, storage_server.port);
return NULL;
}
@ -1639,14 +1729,98 @@ int trunk_sync_thread_start_all()
return result;
}
TrunkSyncThreadInfo *trunk_sync_alloc_thread_data()
{
TrunkSyncThreadInfo **thread_info;
TrunkSyncThreadInfo **info_end;
TrunkSyncThreadInfo **new_thread_data;
TrunkSyncThreadInfo **new_data_start;
int alloc_count;
int bytes;
if (g_trunk_sync_thread_count + 1 < sync_thread_info_array.alloc_count)
{
info_end = sync_thread_info_array.thread_data +
sync_thread_info_array.alloc_count;
for (thread_info=sync_thread_info_array.thread_data;
thread_info<info_end; thread_info++)
{
if (!(*thread_info)->running)
{
return *thread_info;
}
}
}
if (sync_thread_info_array.alloc_count == 0)
{
alloc_count = 1;
}
else
{
alloc_count = sync_thread_info_array.alloc_count * 2;
}
bytes = sizeof(TrunkSyncThreadInfo *) * alloc_count;
new_thread_data = (TrunkSyncThreadInfo **)malloc(bytes);
if (new_thread_data == NULL)
{
logError("file: "__FILE__", line: %d, "
"malloc %d bytes fail, "
"errno: %d, error info: %s",
__LINE__, bytes, errno, STRERROR(errno));
return NULL;
}
logInfo("file: "__FILE__", line: %d, "
"alloc %d thread data entries",
__LINE__, alloc_count);
if (sync_thread_info_array.alloc_count > 0)
{
memcpy(new_thread_data, sync_thread_info_array.thread_data,
sizeof(TrunkSyncThreadInfo *) *
sync_thread_info_array.alloc_count);
}
new_data_start = new_thread_data + sync_thread_info_array.alloc_count;
info_end = new_thread_data + alloc_count;
for (thread_info=new_data_start; thread_info<info_end; thread_info++)
{
*thread_info = (TrunkSyncThreadInfo *)malloc(
sizeof(TrunkSyncThreadInfo));
if (*thread_info == NULL)
{
logError("file: "__FILE__", line: %d, "
"malloc %d bytes fail, "
"errno: %d, error info: %s",
__LINE__, (int)sizeof(TrunkSyncThreadInfo),
errno, STRERROR(errno));
return NULL;
}
memset(*thread_info, 0, sizeof(TrunkSyncThreadInfo));
}
if (sync_thread_info_array.thread_data != NULL)
{
free(sync_thread_info_array.thread_data);
}
sync_thread_info_array.thread_data = new_thread_data;
sync_thread_info_array.alloc_count = alloc_count;
return *new_data_start;
}
int trunk_sync_thread_start(const FDFSStorageBrief *pStorage)
{
int result;
int lock_res;
pthread_attr_t pattr;
pthread_t tid;
TrunkSyncThreadInfo *thread_data;
if (pStorage->status == FDFS_STORAGE_STATUS_DELETED || \
pStorage->status == FDFS_STORAGE_STATUS_IP_CHANGED || \
if (pStorage->status == FDFS_STORAGE_STATUS_DELETED ||
pStorage->status == FDFS_STORAGE_STATUS_IP_CHANGED ||
pStorage->status == FDFS_STORAGE_STATUS_NONE)
{
return 0;
@ -1663,59 +1837,50 @@ int trunk_sync_thread_start(const FDFSStorageBrief *pStorage)
return result;
}
/*
//printf("start storage ip_addr: %s, g_trunk_sync_thread_count=%d\n",
pStorage->ip_addr, g_trunk_sync_thread_count);
*/
if ((result=pthread_create(&tid, &pattr, trunk_sync_thread_entrance, \
(void *)pStorage)) != 0)
if ((lock_res=pthread_mutex_lock(&trunk_sync_thread_lock)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"create thread failed, errno: %d, " \
"error info: %s", \
__LINE__, result, STRERROR(result));
pthread_attr_destroy(&pattr);
return result;
logError("file: "__FILE__", line: %d, "
"call pthread_mutex_lock fail, "
"errno: %d, error info: %s",
__LINE__, lock_res, STRERROR(lock_res));
}
if ((result=pthread_mutex_lock(&trunk_sync_thread_lock)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"call pthread_mutex_lock fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
}
do
{
thread_data = trunk_sync_alloc_thread_data();
if (thread_data == NULL)
{
result = ENOMEM;
break;
}
g_trunk_sync_thread_count++;
trunk_sync_tids = (pthread_t *)realloc(trunk_sync_tids, sizeof(pthread_t) * \
g_trunk_sync_thread_count);
if (trunk_sync_tids == NULL)
{
logError("file: "__FILE__", line: %d, " \
"malloc %d bytes fail, " \
"errno: %d, error info: %s", \
__LINE__, (int)sizeof(pthread_t) * \
g_trunk_sync_thread_count, \
errno, STRERROR(errno));
}
else
{
trunk_sync_tids[g_trunk_sync_thread_count - 1] = tid;
}
thread_data->running = true;
thread_data->pStorage = pStorage;
if ((result=pthread_create(&thread_data->tid, &pattr,
trunk_sync_thread_entrance,
(void *)thread_data)) != 0)
{
thread_data->running = false;
logError("file: "__FILE__", line: %d, "
"create thread failed, errno: %d, "
"error info: %s",
__LINE__, result, STRERROR(result));
break;
}
if ((result=pthread_mutex_unlock(&trunk_sync_thread_lock)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"call pthread_mutex_unlock fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
}
g_trunk_sync_thread_count++;
} while (0);
if ((lock_res=pthread_mutex_unlock(&trunk_sync_thread_lock)) != 0)
{
logError("file: "__FILE__", line: %d, "
"call pthread_mutex_unlock fail, "
"errno: %d, error info: %s",
__LINE__, lock_res, STRERROR(lock_res));
}
pthread_attr_destroy(&pattr);
return 0;
return result;
}
void trunk_waiting_sync_thread_exit()

View File

@ -73,7 +73,8 @@ int trunk_rename_mark_file(const char *old_ip_addr, const int old_port, \
int trunk_open_readable_binlog(TrunkBinLogReader *pReader, \
get_filename_func filename_func, const void *pArg);
int trunk_reader_init(FDFSStorageBrief *pStorage, TrunkBinLogReader *pReader);
int trunk_reader_init(const FDFSStorageBrief *pStorage,
TrunkBinLogReader *pReader);
void trunk_reader_destroy(TrunkBinLogReader *pReader);
//trunk binlog compress
@ -81,6 +82,8 @@ int trunk_binlog_compress_apply();
int trunk_binlog_compress_commit();
int trunk_binlog_compress_rollback();
int trunk_sync_notify_thread_reset_offset();
#ifdef __cplusplus
}
#endif