trunk binlog compression support transaction

pull/484/head
YuQing 2019-12-18 21:16:34 +08:00
parent fd8772976d
commit 2c5955c1fe
8 changed files with 867 additions and 305 deletions

View File

@ -1,11 +1,12 @@
Version 6.05 2019-12-15
Version 6.05 2019-12-18
* fdfs_trackerd and fdfs_storaged print the server version in usage.
you can execute fdfs_trackerd or fdfs_storaged without parameters
to show the server version
* trunk server support compress the trunk binlog periodically,
config items in tracker.conf: trunk_compress_binlog_interval
and trunk_compress_binlog_time_base
* trunk binlog compression support transaction
Version 6.04 2019-12-05
* storage_report_ip_changed ignore result EEXIST

View File

@ -73,8 +73,8 @@ typedef struct
#define INIT_ITEM_LAST_HTTP_PORT "last_http_port"
#define INIT_ITEM_CURRENT_TRUNK_FILE_ID "current_trunk_file_id"
#define INIT_ITEM_TRUNK_LAST_COMPRESS_TIME "trunk_last_compress_time"
#define INIT_ITEM_TRUNK_BINLOG_COMPRESS_IN_PROGRESS \
"trunk_binlog_compress_in_progress"
#define INIT_ITEM_TRUNK_BINLOG_COMPRESS_STAGE \
"trunk_binlog_compress_stage"
#define INIT_ITEM_STORE_PATH_MARK_PREFIX "store_path_mark"
#define STAT_ITEM_TOTAL_UPLOAD "total_upload_count"
@ -670,10 +670,10 @@ int storage_write_to_sync_ini_file()
INIT_ITEM_LAST_SERVER_PORT, g_last_server_port,
INIT_ITEM_LAST_HTTP_PORT, g_last_http_port,
INIT_ITEM_CURRENT_TRUNK_FILE_ID, g_current_trunk_file_id,
INIT_ITEM_TRUNK_LAST_COMPRESS_TIME, (int)g_trunk_last_compress_time,
INIT_ITEM_TRUNK_BINLOG_COMPRESS_IN_PROGRESS,
g_trunk_binlog_compress_in_progress
);
INIT_ITEM_TRUNK_LAST_COMPRESS_TIME,
(int)g_trunk_last_compress_time,
INIT_ITEM_TRUNK_BINLOG_COMPRESS_STAGE,
g_trunk_binlog_compress_stage);
if (g_check_store_path_mark)
{
@ -1070,9 +1070,9 @@ static int storage_check_and_make_data_dirs()
INIT_ITEM_CURRENT_TRUNK_FILE_ID, &iniContext, 0);
g_trunk_last_compress_time = iniGetIntValue(NULL,
INIT_ITEM_TRUNK_LAST_COMPRESS_TIME , &iniContext, 0);
g_trunk_binlog_compress_in_progress = iniGetIntValue(NULL,
INIT_ITEM_TRUNK_BINLOG_COMPRESS_IN_PROGRESS,
&iniContext, 0);
g_trunk_binlog_compress_stage = iniGetIntValue(NULL,
INIT_ITEM_TRUNK_BINLOG_COMPRESS_STAGE,
&iniContext, STORAGE_TRUNK_COMPRESS_STAGE_NONE);
if ((result=storage_load_store_path_marks(&iniContext)) != 0)
{
@ -2302,6 +2302,11 @@ int storage_func_init(const char *filename, \
return result;
}
if ((result=storage_trunk_binlog_compress_check_recovery()) != 0)
{
return result;
}
if ((result=init_pthread_lock(&sync_stat_file_lock)) != 0)
{
return result;

View File

@ -4064,7 +4064,7 @@ static int storage_server_trunk_delete_binlog_marks(struct fast_task_info *pTask
return result;
}
return trunk_unlink_all_mark_files();
return trunk_unlink_all_mark_files(false);
}
/**

View File

@ -1262,7 +1262,7 @@ static void do_unset_trunk_server_myself(ConnectionInfo *pTrackerServer)
trunk_waiting_sync_thread_exit();
storage_trunk_destroy_ex(true);
storage_trunk_destroy_ex(true, true);
if (g_trunk_create_file_advance &&
g_trunk_create_file_interval > 0)
{

View File

@ -62,13 +62,14 @@ bool g_if_trunker_self = false;
bool g_trunk_create_file_advance = false;
bool g_trunk_init_check_occupying = false;
bool g_trunk_init_reload_from_binlog = false;
volatile int g_trunk_binlog_compress_in_progress = 0;
volatile int g_trunk_data_save_in_progress = 0;
static byte trunk_init_flag = STORAGE_TRUNK_INIT_FLAG_NONE;
int g_trunk_binlog_compress_stage = STORAGE_TRUNK_COMPRESS_STAGE_NONE;
int64_t g_trunk_total_free_space = 0;
int64_t g_trunk_create_file_space_threshold = 0;
time_t g_trunk_last_compress_time = 0;
static byte trunk_init_flag = STORAGE_TRUNK_INIT_FLAG_NONE;
static volatile int trunk_binlog_compress_in_progress = 0;
static volatile int trunk_data_save_in_progress = 0;
static pthread_mutex_t trunk_file_lock;
static pthread_mutex_t trunk_mem_lock;
static struct fast_mblock_man free_blocks_man;
@ -272,7 +273,8 @@ int storage_trunk_init()
return 0;
}
int storage_trunk_destroy_ex(const bool bNeedSleep)
int storage_trunk_destroy_ex(const bool bNeedSleep,
const bool bSaveData)
{
int result;
int i;
@ -292,7 +294,14 @@ int storage_trunk_destroy_ex(const bool bNeedSleep)
logDebug("file: "__FILE__", line: %d, " \
"storage trunk destroy", __LINE__);
if (bSaveData)
{
result = storage_trunk_save();
}
else
{
result = 0;
}
for (i=0; i<g_fdfs_store_paths.count; i++)
{
@ -500,18 +509,97 @@ static int do_save_trunk_data()
static int storage_trunk_do_save()
{
int result;
if (__sync_add_and_fetch(&g_trunk_data_save_in_progress, 1) != 1)
if (__sync_add_and_fetch(&trunk_data_save_in_progress, 1) != 1)
{
__sync_sub_and_fetch(&g_trunk_data_save_in_progress, 1);
__sync_sub_and_fetch(&trunk_data_save_in_progress, 1);
logError("file: "__FILE__", line: %d, "
"trunk binlog compress already in progress, "
"g_trunk_data_save_in_progress=%d", __LINE__,
g_trunk_data_save_in_progress);
"trunk_data_save_in_progress=%d", __LINE__,
trunk_data_save_in_progress);
return EINPROGRESS;
}
result = do_save_trunk_data();
__sync_sub_and_fetch(&g_trunk_data_save_in_progress, 1);
__sync_sub_and_fetch(&trunk_data_save_in_progress, 1);
return result;
}
int storage_trunk_binlog_compress_check_recovery()
{
int result;
char tmp_filename[MAX_PATH_SIZE];
if (g_trunk_binlog_compress_stage ==
STORAGE_TRUNK_COMPRESS_STAGE_NONE ||
g_trunk_binlog_compress_stage ==
STORAGE_TRUNK_COMPRESS_STAGE_FINISHED)
{
return 0;
}
result = 0;
do {
if (g_trunk_binlog_compress_stage ==
STORAGE_TRUNK_COMPRESS_STAGE_COMMIT_MERGING)
{
get_trunk_binlog_tmp_filename(tmp_filename);
if (access(tmp_filename, F_OK) != 0)
{
if (errno == ENOENT)
{
g_trunk_binlog_compress_stage =
STORAGE_TRUNK_COMPRESS_STAGE_COMMIT_MERGE_DONE;
}
}
}
else if (g_trunk_binlog_compress_stage ==
STORAGE_TRUNK_COMPRESS_STAGE_ROLLBACK_MERGING)
{
get_trunk_binlog_tmp_filename(tmp_filename);
if (access(tmp_filename, F_OK) != 0)
{
if (errno == ENOENT)
{
g_trunk_binlog_compress_stage =
STORAGE_TRUNK_COMPRESS_STAGE_ROLLBACK_MERGE_DONE;
}
}
}
switch (g_trunk_binlog_compress_stage)
{
case STORAGE_TRUNK_COMPRESS_STAGE_COMPRESS_BEGIN:
case STORAGE_TRUNK_COMPRESS_STAGE_APPLY_DONE:
case STORAGE_TRUNK_COMPRESS_STAGE_SAVE_DONE:
case STORAGE_TRUNK_COMPRESS_STAGE_COMMIT_MERGING:
case STORAGE_TRUNK_COMPRESS_STAGE_ROLLBACK_MERGING:
result = trunk_binlog_compress_rollback();
break;
case STORAGE_TRUNK_COMPRESS_STAGE_ROLLBACK_MERGE_DONE:
if ((result=trunk_binlog_compress_delete_binlog_rollback_file(
true)) == 0)
{
result = trunk_binlog_compress_rollback();
}
break;
case STORAGE_TRUNK_COMPRESS_STAGE_COMMIT_MERGE_DONE:
if ((result=trunk_binlog_compress_delete_temp_files_after_commit()) != 0)
{
break;
}
case STORAGE_TRUNK_COMPRESS_STAGE_COMPRESS_SUCCESS:
/* unlink all mark files because the binlog file be compressed */
result = trunk_unlink_all_mark_files(true);
if (result == 0)
{
g_trunk_binlog_compress_stage =
STORAGE_TRUNK_COMPRESS_STAGE_FINISHED;
result = storage_write_to_sync_ini_file();
}
break;
}
} while (0);
return result;
}
@ -522,6 +610,18 @@ static int storage_trunk_compress()
int current_write_version;
int result;
if (!(g_trunk_binlog_compress_stage ==
STORAGE_TRUNK_COMPRESS_STAGE_NONE ||
g_trunk_binlog_compress_stage ==
STORAGE_TRUNK_COMPRESS_STAGE_FINISHED))
{
logWarning("file: "__FILE__", line: %d, "
"g_trunk_binlog_compress_stage = %d, "
"can't start trunk binglog compress!",
__LINE__, g_trunk_binlog_compress_stage);
return EAGAIN;
}
if (g_current_time - g_up_time < 600)
{
logWarning("file: "__FILE__", line: %d, "
@ -540,33 +640,48 @@ static int storage_trunk_compress()
return EALREADY;
}
if (__sync_add_and_fetch(&g_trunk_binlog_compress_in_progress, 1) != 1)
if (__sync_add_and_fetch(&trunk_binlog_compress_in_progress, 1) != 1)
{
__sync_sub_and_fetch(&g_trunk_binlog_compress_in_progress, 1);
__sync_sub_and_fetch(&trunk_binlog_compress_in_progress, 1);
logError("file: "__FILE__", line: %d, "
"trunk binlog compress already in progress, "
"g_trunk_binlog_compress_in_progress=%d",
__LINE__, g_trunk_binlog_compress_in_progress);
"trunk_binlog_compress_in_progress=%d",
__LINE__, trunk_binlog_compress_in_progress);
return EINPROGRESS;
}
storage_write_to_sync_ini_file();
logInfo("file: "__FILE__", line: %d, "
"start compress trunk binlog ...", __LINE__);
do
{
if ((result=trunk_binlog_compress_delete_rollback_files(false)) != 0)
{
break;
}
g_trunk_binlog_compress_stage =
STORAGE_TRUNK_COMPRESS_STAGE_COMPRESS_BEGIN;
storage_write_to_sync_ini_file();
if ((result=trunk_binlog_compress_apply()) != 0)
{
break;
}
g_trunk_binlog_compress_stage =
STORAGE_TRUNK_COMPRESS_STAGE_APPLY_DONE;
storage_write_to_sync_ini_file();
if ((result=storage_trunk_do_save()) != 0)
{
trunk_binlog_compress_rollback();
break;
}
g_trunk_binlog_compress_stage =
STORAGE_TRUNK_COMPRESS_STAGE_SAVE_DONE;
storage_write_to_sync_ini_file();
if ((result=trunk_binlog_compress_commit()) != 0)
{
trunk_binlog_compress_rollback();
@ -575,10 +690,12 @@ static int storage_trunk_compress()
g_trunk_last_compress_time = g_current_time;
last_write_version = current_write_version;
/* unlink all mark files because the binlog file be compressed */
result = trunk_unlink_all_mark_files(true);
} while (0);
__sync_sub_and_fetch(&g_trunk_binlog_compress_in_progress, 1);
storage_write_to_sync_ini_file();
__sync_sub_and_fetch(&trunk_binlog_compress_in_progress, 1);
if (result == 0)
{
@ -586,10 +703,26 @@ static int storage_trunk_compress()
"compress trunk binlog successfully.", __LINE__);
}
else
{
if (g_trunk_binlog_compress_stage !=
STORAGE_TRUNK_COMPRESS_STAGE_FINISHED)
{
logCrit("file: "__FILE__", line: %d, "
"compress trunk binlog fail, "
"g_trunk_binlog_compress_stage = %d, "
"set g_if_trunker_self to false!", __LINE__,
g_trunk_binlog_compress_stage);
g_if_trunker_self = false;
trunk_waiting_sync_thread_exit();
storage_trunk_destroy_ex(true, false);
}
else
{
logError("file: "__FILE__", line: %d, "
"compress trunk binlog fail.", __LINE__);
}
}
return result;
}
@ -602,7 +735,7 @@ static int storage_trunk_save()
g_current_time - g_trunk_last_compress_time >
g_trunk_compress_binlog_min_interval))
{
if (__sync_add_and_fetch(&g_trunk_binlog_compress_in_progress, 0) == 0)
if (__sync_add_and_fetch(&trunk_binlog_compress_in_progress, 0) == 0)
{
return storage_trunk_do_save();
}
@ -610,15 +743,16 @@ static int storage_trunk_save()
{
logWarning("file: "__FILE__", line: %d, "
"trunk binlog compress already in progress, "
"g_trunk_binlog_compress_in_progress=%d",
__LINE__, g_trunk_binlog_compress_in_progress);
"trunk_binlog_compress_in_progress=%d",
__LINE__, trunk_binlog_compress_in_progress);
return 0;
}
}
if ((result=storage_trunk_compress()) == 0)
{
return trunk_unlink_all_mark_files(); //because the binlog file be compressed
g_trunk_binlog_compress_stage = STORAGE_TRUNK_COMPRESS_STAGE_FINISHED;
return storage_write_to_sync_ini_file();
}
return (result == EAGAIN || result == EALREADY ||
@ -634,19 +768,20 @@ int trunk_binlog_compress_func(void *args)
return 0;
}
result = storage_trunk_compress();
if (result != 0)
if ((result=storage_trunk_compress()) != 0)
{
return result;
}
if (!g_if_trunker_self)
{
return 0;
g_trunk_binlog_compress_stage = STORAGE_TRUNK_COMPRESS_STAGE_FINISHED;
return storage_write_to_sync_ini_file();
}
trunk_sync_notify_thread_reset_offset();
return 0;
g_trunk_binlog_compress_stage = STORAGE_TRUNK_COMPRESS_STAGE_FINISHED;
return storage_write_to_sync_ini_file();
}
static bool storage_trunk_is_space_occupied(const FDFSTrunkFullInfo *pTrunkInfo)
@ -980,11 +1115,14 @@ static int storage_trunk_restore(const int64_t restore_offset)
trunk_mark_filename_by_reader(&reader, trunk_mark_filename);
if (unlink(trunk_mark_filename) != 0)
{
logError("file: "__FILE__", line: %d, " \
"unlink file %s fail, " \
"errno: %d, error info: %s", __LINE__, \
if (errno != ENOENT)
{
logError("file: "__FILE__", line: %d, "
"unlink file %s fail, "
"errno: %d, error info: %s", __LINE__,
trunk_mark_filename, errno, STRERROR(errno));
}
}
if (result != 0)
{
@ -1039,10 +1177,10 @@ int storage_delete_trunk_data_file()
result = errno != 0 ? errno : ENOENT;
if (result != ENOENT)
{
logError("file: "__FILE__", line: %d, " \
"unlink trunk data file: %s fail, " \
"errno: %d, error info: %s", \
__LINE__, trunk_data_filename, \
logError("file: "__FILE__", line: %d, "
"unlink trunk data file: %s fail, "
"errno: %d, error info: %s",
__LINE__, trunk_data_filename,
result, STRERROR(result));
}

View File

@ -22,6 +22,17 @@
#include "trunk_shared.h"
#include "fdfs_shared_func.h"
#define STORAGE_TRUNK_COMPRESS_STAGE_NONE 0
#define STORAGE_TRUNK_COMPRESS_STAGE_COMPRESS_BEGIN 1
#define STORAGE_TRUNK_COMPRESS_STAGE_APPLY_DONE 2
#define STORAGE_TRUNK_COMPRESS_STAGE_SAVE_DONE 3
#define STORAGE_TRUNK_COMPRESS_STAGE_COMMIT_MERGING 4
#define STORAGE_TRUNK_COMPRESS_STAGE_COMMIT_MERGE_DONE 5
#define STORAGE_TRUNK_COMPRESS_STAGE_COMPRESS_SUCCESS 6
#define STORAGE_TRUNK_COMPRESS_STAGE_ROLLBACK_MERGING 7
#define STORAGE_TRUNK_COMPRESS_STAGE_ROLLBACK_MERGE_DONE 8
#define STORAGE_TRUNK_COMPRESS_STAGE_FINISHED 9
#ifdef __cplusplus
extern "C" {
#endif
@ -44,7 +55,7 @@ extern bool g_if_use_trunk_file; //if use trunk file
extern bool g_trunk_create_file_advance;
extern bool g_trunk_init_check_occupying;
extern bool g_trunk_init_reload_from_binlog;
extern volatile int g_trunk_binlog_compress_in_progress;
extern int g_trunk_binlog_compress_stage;
extern bool g_if_trunker_self; //if am i trunk server
extern int64_t g_trunk_create_file_space_threshold;
extern int64_t g_trunk_total_free_space; //trunk total free space in bytes
@ -63,9 +74,10 @@ typedef struct {
} FDFSTrunkSlot;
int storage_trunk_init();
int storage_trunk_destroy_ex(const bool bNeedSleep);
int storage_trunk_destroy_ex(const bool bNeedSleep,
const bool bSaveData);
#define storage_trunk_destroy() storage_trunk_destroy_ex(false)
#define storage_trunk_destroy() storage_trunk_destroy_ex(false, true)
int trunk_alloc_space(const int size, FDFSTrunkFullInfo *pResult);
int trunk_alloc_confirm(const FDFSTrunkFullInfo *pTrunkInfo, const int status);
@ -92,6 +104,8 @@ int trunk_create_trunk_file_advance(void *args);
int trunk_binlog_compress_func(void *args);
int storage_trunk_binlog_compress_check_recovery();
int storage_delete_trunk_data_file();
char *storage_trunk_get_data_filename(char *full_filename);

File diff suppressed because it is too large Load Diff

View File

@ -28,8 +28,8 @@ extern "C" {
typedef struct
{
char storage_id[FDFS_STORAGE_ID_MAX_SIZE];
char mark_filename[MAX_PATH_SIZE];
BinLogBuffer binlog_buff;
int mark_fd;
int binlog_fd;
int64_t binlog_offset;
int64_t last_binlog_offset; //for write to mark file
@ -65,7 +65,7 @@ void trunk_waiting_sync_thread_exit();
char *get_trunk_binlog_filename(char *full_filename);
char *trunk_mark_filename_by_reader(const void *pArg, char *full_filename);
int trunk_unlink_all_mark_files();
int trunk_unlink_all_mark_files(const bool force_delete);
int trunk_unlink_mark_file(const char *storage_id);
int trunk_rename_mark_file(const char *old_ip_addr, const int old_port, \
const char *new_ip_addr, const int new_port);
@ -78,6 +78,9 @@ int trunk_reader_init(const FDFSStorageBrief *pStorage,
void trunk_reader_destroy(TrunkBinLogReader *pReader);
//trunk binlog compress
int trunk_binlog_compress_delete_binlog_rollback_file(const bool silence);
int trunk_binlog_compress_delete_rollback_files(const bool silence);
int trunk_binlog_compress_delete_temp_files_after_commit();
int trunk_binlog_compress_apply();
int trunk_binlog_compress_commit();
int trunk_binlog_compress_rollback();
@ -85,6 +88,14 @@ int trunk_binlog_compress_rollback();
int trunk_sync_notify_thread_reset_offset();
int trunk_binlog_get_write_version();
char *get_trunk_binlog_tmp_filename_ex(const char *binlog_filename,
char *tmp_filename);
static inline char *get_trunk_binlog_tmp_filename(char *tmp_filename)
{
return get_trunk_binlog_tmp_filename_ex(NULL, tmp_filename);
}
#ifdef __cplusplus
}
#endif