trunk server support compress the trunk binlog periodically

pull/484/head
YuQing 2019-12-14 21:03:35 +08:00
parent a49735ae5a
commit cf0ec7e4cf
14 changed files with 438 additions and 198 deletions

View File

@ -1,8 +1,11 @@
Version 6.05 2019-12-13 Version 6.05 2019-12-14
* fdfs_trackerd and fdfs_storaged print the server version in usage. * fdfs_trackerd and fdfs_storaged print the server version in usage.
you can execute fdfs_trackerd or fdfs_storaged without parameters you can execute fdfs_trackerd or fdfs_storaged without parameters
to show the server version to show the server version
* trunk server support compress the trunk binlog periodically,
config items in tracker.conf: trunk_compress_binlog_interval
and trunk_compress_binlog_time_base
Version 6.04 2019-12-05 Version 6.04 2019-12-05
* storage_report_ip_changed ignore result EEXIST * storage_report_ip_changed ignore result EEXIST

View File

@ -197,13 +197,26 @@ trunk_init_check_occupying = false
trunk_init_reload_from_binlog = false trunk_init_reload_from_binlog = false
# the min interval for compressing the trunk binlog file # the min interval for compressing the trunk binlog file
# unit: second # unit: second, 0 means never compress
# FastDFS compress the trunk binlog when trunk init and trunk destroy # FastDFS compress the trunk binlog when trunk init and trunk destroy
# recommand to set this parameter to 86400 (one day) # recommand to set this parameter to 86400 (one day)
# default value is 0, 0 means never compress # default value is 0
# since V5.01 # since V5.01
trunk_compress_binlog_min_interval = 86400 trunk_compress_binlog_min_interval = 86400
# the interval for compressing the trunk binlog file
# unit: second, 0 means never compress
# recommand to set this parameter to 86400 (one day)
# default value is 0
# since V6.05
trunk_compress_binlog_interval = 86400
# compress the trunk binlog time base, time format: Hour:Minute
# Hour from 0 to 23, Minute from 0 to 59
# default value is 03:00
# since V6.05
trunk_compress_binlog_time_base = 03:00
# if use storage server ID instead of IP address # if use storage server ID instead of IP address
# if you want to use dual IPs for storage server, you MUST set # if you want to use dual IPs for storage server, you MUST set
# this parameter to true, and configure the dual IPs in the file # this parameter to true, and configure the dual IPs in the file

View File

@ -73,6 +73,8 @@ typedef struct
#define INIT_ITEM_LAST_HTTP_PORT "last_http_port" #define INIT_ITEM_LAST_HTTP_PORT "last_http_port"
#define INIT_ITEM_CURRENT_TRUNK_FILE_ID "current_trunk_file_id" #define INIT_ITEM_CURRENT_TRUNK_FILE_ID "current_trunk_file_id"
#define INIT_ITEM_TRUNK_LAST_COMPRESS_TIME "trunk_last_compress_time" #define INIT_ITEM_TRUNK_LAST_COMPRESS_TIME "trunk_last_compress_time"
#define INIT_ITEM_TRUNK_BINLOG_COMPRESS_IN_PROGRESS \
"trunk_binlog_compress_in_progress"
#define INIT_ITEM_STORE_PATH_MARK_PREFIX "store_path_mark" #define INIT_ITEM_STORE_PATH_MARK_PREFIX "store_path_mark"
#define STAT_ITEM_TOTAL_UPLOAD "total_upload_count" #define STAT_ITEM_TOTAL_UPLOAD "total_upload_count"
@ -658,6 +660,7 @@ int storage_write_to_sync_ini_file()
"%s=%d\n" "%s=%d\n"
"%s=%d\n" "%s=%d\n"
"%s=%d\n" "%s=%d\n"
"%s=%d\n"
"%s=%d\n", "%s=%d\n",
INIT_ITEM_STORAGE_JOIN_TIME, g_storage_join_time, INIT_ITEM_STORAGE_JOIN_TIME, g_storage_join_time,
INIT_ITEM_SYNC_OLD_DONE, g_sync_old_done, INIT_ITEM_SYNC_OLD_DONE, g_sync_old_done,
@ -667,10 +670,11 @@ int storage_write_to_sync_ini_file()
INIT_ITEM_LAST_SERVER_PORT, g_last_server_port, INIT_ITEM_LAST_SERVER_PORT, g_last_server_port,
INIT_ITEM_LAST_HTTP_PORT, g_last_http_port, INIT_ITEM_LAST_HTTP_PORT, g_last_http_port,
INIT_ITEM_CURRENT_TRUNK_FILE_ID, g_current_trunk_file_id, INIT_ITEM_CURRENT_TRUNK_FILE_ID, g_current_trunk_file_id,
INIT_ITEM_TRUNK_LAST_COMPRESS_TIME, (int)g_trunk_last_compress_time INIT_ITEM_TRUNK_LAST_COMPRESS_TIME, (int)g_trunk_last_compress_time,
INIT_ITEM_TRUNK_BINLOG_COMPRESS_IN_PROGRESS,
g_trunk_binlog_compress_in_progress
); );
if (g_check_store_path_mark) if (g_check_store_path_mark)
{ {
for (i=0; i<g_fdfs_store_paths.count; i++) for (i=0; i<g_fdfs_store_paths.count; i++)
@ -1062,10 +1066,13 @@ static int storage_check_and_make_data_dirs()
g_last_http_port = atoi(pValue); g_last_http_port = atoi(pValue);
} }
g_current_trunk_file_id = iniGetIntValue(NULL, \ g_current_trunk_file_id = iniGetIntValue(NULL,
INIT_ITEM_CURRENT_TRUNK_FILE_ID, &iniContext, 0); INIT_ITEM_CURRENT_TRUNK_FILE_ID, &iniContext, 0);
g_trunk_last_compress_time = iniGetIntValue(NULL, \ g_trunk_last_compress_time = iniGetIntValue(NULL,
INIT_ITEM_TRUNK_LAST_COMPRESS_TIME , &iniContext, 0); INIT_ITEM_TRUNK_LAST_COMPRESS_TIME , &iniContext, 0);
g_trunk_binlog_compress_in_progress = iniGetIntValue(NULL,
INIT_ITEM_TRUNK_BINLOG_COMPRESS_IN_PROGRESS,
&iniContext, 0);
if ((result=storage_load_store_path_marks(&iniContext)) != 0) if ((result=storage_load_store_path_marks(&iniContext)) != 0)
{ {

View File

@ -132,12 +132,20 @@ int storage_get_params_from_tracker()
"trunk_create_file_space_threshold", \ "trunk_create_file_space_threshold", \
&iniContext, 0); &iniContext, 0);
g_trunk_init_check_occupying = iniGetBoolValue(NULL, \ g_trunk_init_check_occupying = iniGetBoolValue(NULL,
"trunk_init_check_occupying", &iniContext, false); "trunk_init_check_occupying", &iniContext, false);
g_trunk_init_reload_from_binlog = iniGetBoolValue(NULL, \ g_trunk_init_reload_from_binlog = iniGetBoolValue(NULL,
"trunk_init_reload_from_binlog", &iniContext, false); "trunk_init_reload_from_binlog", &iniContext, false);
g_trunk_compress_binlog_min_interval = iniGetIntValue(NULL, \ g_trunk_compress_binlog_min_interval = iniGetIntValue(NULL,
"trunk_compress_binlog_min_interval", &iniContext, 0); "trunk_compress_binlog_min_interval", &iniContext, 0);
g_trunk_compress_binlog_interval = iniGetIntValue(NULL,
"trunk_compress_binlog_interval", &iniContext, 0);
if ((result=get_time_item_from_conf(&iniContext,
"trunk_compress_binlog_time_base",
&g_trunk_compress_binlog_time_base, 3, 0)) != 0)
{
return result;
}
g_store_slave_file_use_link = iniGetBoolValue(NULL, \ g_store_slave_file_use_link = iniGetBoolValue(NULL, \
"store_slave_file_use_link", &iniContext, false); "store_slave_file_use_link", &iniContext, false);
@ -187,6 +195,8 @@ int storage_get_params_from_tracker()
"trunk_init_check_occupying=%d, " \ "trunk_init_check_occupying=%d, " \
"trunk_init_reload_from_binlog=%d, " \ "trunk_init_reload_from_binlog=%d, " \
"trunk_compress_binlog_min_interval=%d, " \ "trunk_compress_binlog_min_interval=%d, " \
"trunk_compress_binlog_interval=%d, " \
"trunk_compress_binlog_time_base=%02d:%02d, " \
"store_slave_file_use_link=%d", \ "store_slave_file_use_link=%d", \
__LINE__, g_use_storage_id, \ __LINE__, g_use_storage_id, \
g_id_type_in_filename == FDFS_ID_TYPE_SERVER_ID ? "id" : "ip", \ g_id_type_in_filename == FDFS_ID_TYPE_SERVER_ID ? "id" : "ip", \
@ -204,6 +214,9 @@ int storage_get_params_from_tracker()
(FDFS_ONE_MB * 1024)), g_trunk_init_check_occupying, \ (FDFS_ONE_MB * 1024)), g_trunk_init_check_occupying, \
g_trunk_init_reload_from_binlog, \ g_trunk_init_reload_from_binlog, \
g_trunk_compress_binlog_min_interval, \ g_trunk_compress_binlog_min_interval, \
g_trunk_compress_binlog_interval, \
g_trunk_compress_binlog_time_base.hour, \
g_trunk_compress_binlog_time_base.minute, \
g_store_slave_file_use_link); g_store_slave_file_use_link);
if (g_use_storage_id && *g_sync_src_id != '\0' && \ if (g_use_storage_id && *g_sync_src_id != '\0' && \

View File

@ -3037,7 +3037,8 @@ static void* storage_sync_thread_entrance(void* arg)
__LINE__, pStorage->ip_addr, local_ip_addr); __LINE__, pStorage->ip_addr, local_ip_addr);
*/ */
if (is_local_host_ip(pStorage->ip_addr)) if (strcmp(pStorage->id, g_my_server_id_str) == 0 ||
is_local_host_ip(pStorage->ip_addr))
{ //can't self sync to self { //can't self sync to self
logError("file: "__FILE__", line: %d, " \ logError("file: "__FILE__", line: %d, " \
"ip_addr %s belong to the local host," \ "ip_addr %s belong to the local host," \
@ -3263,11 +3264,11 @@ int storage_sync_thread_start(const FDFSStorageBrief *pStorage)
return 0; return 0;
} }
if (storage_server_is_myself(pStorage) || \ if (strcmp(pStorage->id, g_my_server_id_str) == 0 ||
is_local_host_ip(pStorage->ip_addr)) //can't self sync to self is_local_host_ip(pStorage->ip_addr)) //can't self sync to self
{ {
logWarning("file: "__FILE__", line: %d, " \ logWarning("file: "__FILE__", line: %d, "
"storage id: %s is myself, can't start sync thread!", \ "storage id: %s is myself, can't start sync thread!",
__LINE__, pStorage->id); __LINE__, pStorage->id);
return 0; return 0;
} }

View File

@ -37,7 +37,8 @@
#include "trunk_sync.h" #include "trunk_sync.h"
#include "storage_param_getter.h" #include "storage_param_getter.h"
#define TRUNK_FILE_CREATOR_TASK_ID 88 #define TRUNK_FILE_CREATOR_TASK_ID 88
#define TRUNK_BINLOG_COMPRESS_TASK_ID 89
static pthread_mutex_t reporter_thread_lock; static pthread_mutex_t reporter_thread_lock;
@ -876,9 +877,11 @@ static int tracker_merge_servers(ConnectionInfo *pTrackerServer,
FDFS_STORAGE_STATUS_SYNCING)) && \ FDFS_STORAGE_STATUS_SYNCING)) && \
((*ppFound)->server.status > pServer->status)) ((*ppFound)->server.status > pServer->status))
{ {
pServer->id[FDFS_STORAGE_ID_MAX_SIZE - 1] = '\0';
*(pServer->ip_addr + IP_ADDRESS_SIZE - 1) = '\0'; *(pServer->ip_addr + IP_ADDRESS_SIZE - 1) = '\0';
if (is_local_host_ip(pServer->ip_addr) && \ if ((strcmp(pServer->id, g_my_server_id_str) == 0) ||
buff2int(pServer->port) == g_server_port) (is_local_host_ip(pServer->ip_addr) &&
buff2int(pServer->port) == g_server_port))
{ {
need_rejoin_tracker = true; need_rejoin_tracker = true;
logWarning("file: "__FILE__", line: %d, " \ logWarning("file: "__FILE__", line: %d, " \
@ -1204,6 +1207,74 @@ static void set_trunk_server(const char *ip_addr, const int port)
} }
} }
static int do_set_trunk_server_myself(ConnectionInfo *pTrackerServer)
{
int result;
ScheduleArray scheduleArray;
ScheduleEntry entries[2];
ScheduleEntry *entry;
tracker_fetch_trunk_fid(pTrackerServer);
g_if_trunker_self = true;
if ((result=storage_trunk_init()) != 0)
{
return result;
}
scheduleArray.entries = entries;
entry = entries;
if (g_trunk_create_file_advance &&
g_trunk_create_file_interval > 0)
{
INIT_SCHEDULE_ENTRY_EX(*entry, TRUNK_FILE_CREATOR_TASK_ID,
g_trunk_create_file_time_base,
g_trunk_create_file_interval,
trunk_create_trunk_file_advance, NULL);
entry->new_thread = true;
entry++;
}
if (g_trunk_compress_binlog_interval > 0)
{
INIT_SCHEDULE_ENTRY_EX(*entry, TRUNK_BINLOG_COMPRESS_TASK_ID,
g_trunk_compress_binlog_time_base,
g_trunk_compress_binlog_interval,
trunk_binlog_compress_func, NULL);
entry->new_thread = true;
entry++;
}
scheduleArray.count = entry - entries;
if (scheduleArray.count > 0)
{
sched_add_entries(&scheduleArray);
}
trunk_sync_thread_start_all();
return 0;
}
static void do_unset_trunk_server_myself(ConnectionInfo *pTrackerServer)
{
tracker_report_trunk_fid(pTrackerServer);
g_if_trunker_self = false;
trunk_waiting_sync_thread_exit();
storage_trunk_destroy_ex(true);
if (g_trunk_create_file_advance &&
g_trunk_create_file_interval > 0)
{
sched_del_entry(TRUNK_FILE_CREATOR_TASK_ID);
}
if (g_trunk_compress_binlog_interval > 0)
{
sched_del_entry(TRUNK_BINLOG_COMPRESS_TASK_ID);
}
}
static int tracker_check_response(ConnectionInfo *pTrackerServer, static int tracker_check_response(ConnectionInfo *pTrackerServer,
const int tracker_index, bool *bServerPortChanged) const int tracker_index, bool *bServerPortChanged)
{ {
@ -1384,11 +1455,13 @@ static int tracker_check_response(ConnectionInfo *pTrackerServer,
{ {
int port; int port;
pBriefServers->id[FDFS_STORAGE_ID_MAX_SIZE - 1] = '\0';
pBriefServers->ip_addr[IP_ADDRESS_SIZE - 1] = '\0'; pBriefServers->ip_addr[IP_ADDRESS_SIZE - 1] = '\0';
port = buff2int(pBriefServers->port); port = buff2int(pBriefServers->port);
set_trunk_server(pBriefServers->ip_addr, port); set_trunk_server(pBriefServers->ip_addr, port);
if (is_local_host_ip(pBriefServers->ip_addr) && if ((strcmp(pBriefServers->id, g_my_server_id_str) == 0) ||
port == g_server_port) (is_local_host_ip(pBriefServers->ip_addr) &&
port == g_server_port))
{ {
if (g_if_trunker_self) if (g_if_trunker_self)
{ {
@ -1403,32 +1476,10 @@ static int tracker_check_response(ConnectionInfo *pTrackerServer,
"I am the the trunk server %s:%d", __LINE__, "I am the the trunk server %s:%d", __LINE__,
pBriefServers->ip_addr, port); pBriefServers->ip_addr, port);
tracker_fetch_trunk_fid(pTrackerServer); if ((result=do_set_trunk_server_myself(pTrackerServer)) != 0)
g_if_trunker_self = true; {
return result;
if ((result=storage_trunk_init()) != 0) }
{
return result;
}
if (g_trunk_create_file_advance &&
g_trunk_create_file_interval > 0)
{
ScheduleArray scheduleArray;
ScheduleEntry entries[1];
entries[0].id = TRUNK_FILE_CREATOR_TASK_ID;
entries[0].time_base = g_trunk_create_file_time_base;
entries[0].interval = g_trunk_create_file_interval;
entries[0].task_func = trunk_create_trunk_file_advance;
entries[0].func_args = NULL;
scheduleArray.count = 1;
scheduleArray.entries = entries;
sched_add_entries(&scheduleArray);
}
trunk_sync_thread_start_all();
} }
} }
else else
@ -1440,46 +1491,13 @@ static int tracker_check_response(ConnectionInfo *pTrackerServer,
if (g_if_trunker_self) if (g_if_trunker_self)
{ {
int saved_trunk_sync_thread_count;
logWarning("file: "__FILE__", line: %d, " \ logWarning("file: "__FILE__", line: %d, " \
"I am the old trunk server, " \ "I am the old trunk server, " \
"the new trunk server is %s:%d", \ "the new trunk server is %s:%d", \
__LINE__, g_trunk_server.connections[0].ip_addr, \ __LINE__, g_trunk_server.connections[0].ip_addr, \
g_trunk_server.connections[0].port); g_trunk_server.connections[0].port);
tracker_report_trunk_fid(pTrackerServer); do_unset_trunk_server_myself(pTrackerServer);
g_if_trunker_self = false;
saved_trunk_sync_thread_count = \
g_trunk_sync_thread_count;
if (saved_trunk_sync_thread_count > 0)
{
logInfo("file: "__FILE__", line: %d, "\
"waiting %d trunk sync " \
"threads exit ...", __LINE__, \
saved_trunk_sync_thread_count);
}
while (g_trunk_sync_thread_count > 0)
{
usleep(50000);
}
if (saved_trunk_sync_thread_count > 0)
{
logInfo("file: "__FILE__", line: %d, " \
"%d trunk sync threads exited",\
__LINE__, \
saved_trunk_sync_thread_count);
}
storage_trunk_destroy_ex(true);
if (g_trunk_create_file_advance && \
g_trunk_create_file_interval > 0)
{
sched_del_entry(TRUNK_FILE_CREATOR_TASK_ID);
}
} }
} }
} }

View File

@ -52,14 +52,18 @@ int g_avg_storage_reserved_mb = FDFS_DEF_STORAGE_RESERVED_MB;
int g_store_path_index = 0; int g_store_path_index = 0;
int g_current_trunk_file_id = 0; int g_current_trunk_file_id = 0;
TimeInfo g_trunk_create_file_time_base = {0, 0}; TimeInfo g_trunk_create_file_time_base = {0, 0};
TimeInfo g_trunk_compress_binlog_time_base = {0, 0};
int g_trunk_create_file_interval = 86400; int g_trunk_create_file_interval = 86400;
int g_trunk_compress_binlog_min_interval = 0; int g_trunk_compress_binlog_min_interval = 0;
int g_trunk_compress_binlog_interval = 0;
TrackerServerInfo g_trunk_server = {0, 0}; TrackerServerInfo g_trunk_server = {0, 0};
bool g_if_use_trunk_file = false; bool g_if_use_trunk_file = false;
bool g_if_trunker_self = false; bool g_if_trunker_self = false;
bool g_trunk_create_file_advance = false; bool g_trunk_create_file_advance = false;
bool g_trunk_init_check_occupying = false; bool g_trunk_init_check_occupying = false;
bool g_trunk_init_reload_from_binlog = false; bool g_trunk_init_reload_from_binlog = false;
volatile int g_trunk_binlog_compress_in_progress = 0;
volatile int g_trunk_data_save_in_progress = 0;
static byte trunk_init_flag = STORAGE_TRUNK_INIT_FLAG_NONE; static byte trunk_init_flag = STORAGE_TRUNK_INIT_FLAG_NONE;
int64_t g_trunk_total_free_space = 0; int64_t g_trunk_total_free_space = 0;
int64_t g_trunk_create_file_space_threshold = 0; int64_t g_trunk_create_file_space_threshold = 0;
@ -387,7 +391,7 @@ static int tree_walk_callback(void *data, void *args)
return 0; return 0;
} }
static int storage_trunk_do_save() static int do_save_trunk_data()
{ {
int64_t trunk_binlog_size; int64_t trunk_binlog_size;
char trunk_data_filename[MAX_PATH_SIZE]; char trunk_data_filename[MAX_PATH_SIZE];
@ -493,40 +497,133 @@ static int storage_trunk_do_save()
return result; return result;
} }
static int storage_trunk_save() static int storage_trunk_do_save()
{ {
int result; int result;
if (__sync_add_and_fetch(&g_trunk_data_save_in_progress, 1) != 1)
{
__sync_sub_and_fetch(&g_trunk_data_save_in_progress, 1);
logError("file: "__FILE__", line: %d, "
"trunk binlog compress already in progress, "
"g_trunk_data_save_in_progress=%d", __LINE__,
g_trunk_data_save_in_progress);
return EINPROGRESS;
}
if (!(g_trunk_compress_binlog_min_interval > 0 && \ result = do_save_trunk_data();
__sync_sub_and_fetch(&g_trunk_data_save_in_progress, 1);
return result;
}
static int storage_trunk_compress()
{
int result;
if (__sync_add_and_fetch(&g_trunk_binlog_compress_in_progress, 1) != 1)
{
__sync_sub_and_fetch(&g_trunk_binlog_compress_in_progress, 1);
logError("file: "__FILE__", line: %d, "
"trunk binlog compress already in progress, "
"g_trunk_binlog_compress_in_progress=%d",
__LINE__, g_trunk_binlog_compress_in_progress);
return EINPROGRESS;
}
storage_write_to_sync_ini_file();
logInfo("file: "__FILE__", line: %d, "
"start compress trunk binlog ...", __LINE__);
do
{
if ((result=trunk_binlog_compress_apply()) != 0)
{
break;
}
if ((result=storage_trunk_do_save()) != 0)
{
trunk_binlog_compress_rollback();
break;
}
if ((result=trunk_binlog_compress_commit()) != 0)
{
trunk_binlog_compress_rollback();
break;
}
g_trunk_last_compress_time = g_current_time;
} while (0);
__sync_sub_and_fetch(&g_trunk_binlog_compress_in_progress, 1);
storage_write_to_sync_ini_file();
if (result == 0)
{
logInfo("file: "__FILE__", line: %d, "
"compress trunk binlog successfully.", __LINE__);
return trunk_unlink_all_mark_files(); //because the binlog file be compressed
}
else
{
logError("file: "__FILE__", line: %d, "
"compress trunk binlog fail.", __LINE__);
}
return result;
}
static int storage_trunk_save()
{
if (!(g_trunk_compress_binlog_min_interval > 0 &&
g_current_time - g_trunk_last_compress_time > g_current_time - g_trunk_last_compress_time >
g_trunk_compress_binlog_min_interval)) g_trunk_compress_binlog_min_interval))
{ {
return storage_trunk_do_save(); if (__sync_add_and_fetch(&g_trunk_binlog_compress_in_progress, 0) == 0)
{
return storage_trunk_do_save();
}
else
{
logWarning("file: "__FILE__", line: %d, "
"trunk binlog compress already in progress, "
"g_trunk_binlog_compress_in_progress=%d",
__LINE__, g_trunk_binlog_compress_in_progress);
return 0;
}
} }
return storage_trunk_compress();
}
logInfo("start compress trunk binlog ..."); int trunk_binlog_compress_func(void *args)
if ((result=trunk_binlog_compress_apply()) != 0) {
{ int result;
return result;
}
if ((result=storage_trunk_do_save()) != 0) if (!g_if_trunker_self)
{ {
trunk_binlog_compress_rollback(); return 0;
return result; }
}
if ((result=trunk_binlog_compress_commit()) != 0) result = storage_trunk_compress();
{ if (result != 0)
trunk_binlog_compress_rollback(); {
return result; return result;
} }
g_trunk_last_compress_time = g_current_time; if (!g_if_trunker_self)
storage_write_to_sync_ini_file(); {
return 0;
}
logInfo("compress trunk binlog done."); g_if_trunker_self = false; //for sync thread exit
return trunk_unlink_all_mark_files(); //because the binlog file be compressed trunk_waiting_sync_thread_exit();
g_if_trunker_self = true; //restore to true
trunk_sync_thread_start_all();
return 0;
} }
static bool storage_trunk_is_space_occupied(const FDFSTrunkFullInfo *pTrunkInfo) static bool storage_trunk_is_space_occupied(const FDFSTrunkFullInfo *pTrunkInfo)

View File

@ -35,13 +35,16 @@ extern int g_avg_storage_reserved_mb; //calc by above var: g_storage_reserved_m
extern int g_store_path_index; //store to which path extern int g_store_path_index; //store to which path
extern int g_current_trunk_file_id; //current trunk file id extern int g_current_trunk_file_id; //current trunk file id
extern TimeInfo g_trunk_create_file_time_base; extern TimeInfo g_trunk_create_file_time_base;
extern TimeInfo g_trunk_compress_binlog_time_base;
extern int g_trunk_create_file_interval; extern int g_trunk_create_file_interval;
extern int g_trunk_compress_binlog_min_interval; extern int g_trunk_compress_binlog_min_interval;
extern int g_trunk_compress_binlog_interval;
extern TrackerServerInfo g_trunk_server; //the trunk server extern TrackerServerInfo g_trunk_server; //the trunk server
extern bool g_if_use_trunk_file; //if use trunk file extern bool g_if_use_trunk_file; //if use trunk file
extern bool g_trunk_create_file_advance; extern bool g_trunk_create_file_advance;
extern bool g_trunk_init_check_occupying; extern bool g_trunk_init_check_occupying;
extern bool g_trunk_init_reload_from_binlog; extern bool g_trunk_init_reload_from_binlog;
extern volatile int g_trunk_binlog_compress_in_progress;
extern bool g_if_trunker_self; //if am i trunk server extern bool g_if_trunker_self; //if am i trunk server
extern int64_t g_trunk_create_file_space_threshold; extern int64_t g_trunk_create_file_space_threshold;
extern int64_t g_trunk_total_free_space; //trunk total free space in bytes extern int64_t g_trunk_total_free_space; //trunk total free space in bytes
@ -87,6 +90,8 @@ int trunk_file_delete(const char *trunk_filename, \
int trunk_create_trunk_file_advance(void *args); int trunk_create_trunk_file_advance(void *args);
int trunk_binlog_compress_func(void *args);
int storage_delete_trunk_data_file(); int storage_delete_trunk_data_file();
char *storage_trunk_get_data_filename(char *full_filename); char *storage_trunk_get_data_filename(char *full_filename);

View File

@ -313,29 +313,35 @@ int trunk_binlog_compress_apply()
return 0; return 0;
} }
if ((result=trunk_binlog_close_writer(true)) != 0) pthread_mutex_lock(&trunk_sync_thread_lock);
{
return result;
}
if (rename(binlog_filename, rollback_filename) != 0) do
{ {
result = errno != 0 ? errno : EIO; if ((result=trunk_binlog_close_writer(false)) != 0)
logError("file: "__FILE__", line: %d, " \ {
"rename %s to %s fail, " \ break;
"errno: %d, error info: %s", }
__LINE__, binlog_filename, rollback_filename,
result, STRERROR(result));
return result;
}
if ((result=trunk_binlog_open_writer(binlog_filename)) != 0) if (rename(binlog_filename, rollback_filename) != 0)
{ {
rename(rollback_filename, binlog_filename); //rollback result = errno != 0 ? errno : EIO;
return result; logError("file: "__FILE__", line: %d, " \
} "rename %s to %s fail, " \
"errno: %d, error info: %s",
__LINE__, binlog_filename, rollback_filename,
result, STRERROR(result));
break;
}
return 0; if ((result=trunk_binlog_open_writer(binlog_filename)) != 0)
{
rename(rollback_filename, binlog_filename); //rollback
break;
}
} while (0);
pthread_mutex_unlock(&trunk_sync_thread_lock);
return result;
} }
static int trunk_binlog_open_read(const char *filename, static int trunk_binlog_open_read(const char *filename,
@ -478,46 +484,53 @@ int trunk_binlog_compress_commit()
return errno != 0 ? errno : ENOENT; return errno != 0 ? errno : ENOENT;
} }
pthread_mutex_lock(&trunk_sync_thread_lock);
if (need_open_binlog) if (need_open_binlog)
{ {
trunk_binlog_close_writer(true); trunk_binlog_close_writer(false);
} }
result = trunk_binlog_merge_file(data_fd); do
close(data_fd); {
if (result != 0) result = trunk_binlog_merge_file(data_fd);
{ close(data_fd);
return result; if (result != 0)
} {
if (unlink(data_filename) != 0) break;
{ }
result = errno != 0 ? errno : EPERM; if (unlink(data_filename) != 0)
logError("file: "__FILE__", line: %d, " \ {
"unlink %s fail, errno: %d, error info: %s", result = errno != 0 ? errno : EPERM;
__LINE__, data_filename, logError("file: "__FILE__", line: %d, "
result, STRERROR(result)); "unlink %s fail, errno: %d, error info: %s",
return result; __LINE__, data_filename,
} result, STRERROR(result));
break;
}
get_trunk_rollback_filename(rollback_filename); get_trunk_rollback_filename(rollback_filename);
if (access(rollback_filename, F_OK) == 0) if (access(rollback_filename, F_OK) == 0)
{ {
if (unlink(rollback_filename) != 0) if (unlink(rollback_filename) != 0)
{ {
result = errno != 0 ? errno : EPERM; result = errno != 0 ? errno : EPERM;
logWarning("file: "__FILE__", line: %d, " \ logWarning("file: "__FILE__", line: %d, "
"unlink %s fail, errno: %d, error info: %s", "unlink %s fail, errno: %d, error info: %s",
__LINE__, rollback_filename, __LINE__, rollback_filename,
result, STRERROR(result)); result, STRERROR(result));
} break;
} }
}
if (need_open_binlog) if (need_open_binlog)
{ {
return trunk_binlog_open_writer(binlog_filename); result = trunk_binlog_open_writer(binlog_filename);
} }
} while (0);
return 0; pthread_mutex_unlock(&trunk_sync_thread_lock);
return result;
} }
int trunk_binlog_compress_rollback() int trunk_binlog_compress_rollback()
@ -557,7 +570,7 @@ int trunk_binlog_compress_rollback()
{ {
return 0; return 0;
} }
logError("file: "__FILE__", line: %d, " \ logError("file: "__FILE__", line: %d, "
"stat file %s fail, errno: %d, error info: %s", "stat file %s fail, errno: %d, error info: %s",
__LINE__, rollback_filename, __LINE__, rollback_filename,
result, STRERROR(result)); result, STRERROR(result));
@ -570,38 +583,46 @@ int trunk_binlog_compress_rollback()
return 0; return 0;
} }
if ((result=trunk_binlog_close_writer(true)) != 0) pthread_mutex_lock(&trunk_sync_thread_lock);
{ do
return result; {
} if ((result=trunk_binlog_close_writer(false)) != 0)
{
break;
}
if ((rollback_fd=trunk_binlog_open_read(rollback_filename, if ((rollback_fd=trunk_binlog_open_read(rollback_filename,
false)) < 0) false)) < 0)
{ {
return errno != 0 ? errno : ENOENT; result = errno != 0 ? errno : ENOENT;
} break;
}
result = trunk_binlog_merge_file(rollback_fd); result = trunk_binlog_merge_file(rollback_fd);
close(rollback_fd); close(rollback_fd);
if (result == 0) if (result == 0)
{ {
if (unlink(rollback_filename) != 0) if (unlink(rollback_filename) != 0)
{ {
result = errno != 0 ? errno : EPERM; result = errno != 0 ? errno : EPERM;
logWarning("file: "__FILE__", line: %d, " \ logWarning("file: "__FILE__", line: %d, " \
"unlink %s fail, " \ "unlink %s fail, " \
"errno: %d, error info: %s", "errno: %d, error info: %s",
__LINE__, rollback_filename, __LINE__, rollback_filename,
result, STRERROR(result)); result, STRERROR(result));
} break;
}
return trunk_binlog_open_writer(binlog_filename); result = trunk_binlog_open_writer(binlog_filename);
} }
else else
{ {
trunk_binlog_open_writer(binlog_filename); result = trunk_binlog_open_writer(binlog_filename);
return result; }
} } while (0);
pthread_mutex_unlock(&trunk_sync_thread_lock);
return result;
} }
static int trunk_binlog_fsync_ex(const bool bNeedLock, \ static int trunk_binlog_fsync_ex(const bool bNeedLock, \
@ -1221,23 +1242,23 @@ int trunk_unlink_mark_file(const char *storage_id)
t = g_current_time; t = g_current_time;
localtime_r(&t, &tm); localtime_r(&t, &tm);
trunk_get_mark_filename_by_id(storage_id, old_filename, \ trunk_get_mark_filename_by_id(storage_id, old_filename,
sizeof(old_filename)); sizeof(old_filename));
if (!fileExists(old_filename)) if (!fileExists(old_filename))
{ {
return ENOENT; return ENOENT;
} }
snprintf(new_filename, sizeof(new_filename), \ snprintf(new_filename, sizeof(new_filename),
"%s.%04d%02d%02d%02d%02d%02d", old_filename, \ "%s.%04d%02d%02d%02d%02d%02d", old_filename,
tm.tm_year+1900, tm.tm_mon+1, tm.tm_mday, \ tm.tm_year+1900, tm.tm_mon+1, tm.tm_mday,
tm.tm_hour, tm.tm_min, tm.tm_sec); tm.tm_hour, tm.tm_min, tm.tm_sec);
if (rename(old_filename, new_filename) != 0) if (rename(old_filename, new_filename) != 0)
{ {
logError("file: "__FILE__", line: %d, " \ logError("file: "__FILE__", line: %d, "
"rename file %s to %s fail" \ "rename file %s to %s fail, "
", errno: %d, error info: %s", \ "errno: %d, error info: %s",
__LINE__, old_filename, new_filename, \ __LINE__, old_filename, new_filename,
errno, STRERROR(errno)); errno, STRERROR(errno));
return errno != 0 ? errno : EACCES; return errno != 0 ? errno : EACCES;
} }
@ -1470,7 +1491,8 @@ static void* trunk_sync_thread_entrance(void* arg)
__LINE__, pStorage->ip_addr, local_ip_addr); __LINE__, pStorage->ip_addr, local_ip_addr);
*/ */
if (is_local_host_ip(pStorage->ip_addr)) if ((strcmp(pStorage->id, g_my_server_id_str) == 0) ||
is_local_host_ip(pStorage->ip_addr))
{ //can't self sync to self { //can't self sync to self
logError("file: "__FILE__", line: %d, " \ logError("file: "__FILE__", line: %d, " \
"ip_addr %s belong to the local host," \ "ip_addr %s belong to the local host," \
@ -1630,7 +1652,8 @@ int trunk_sync_thread_start(const FDFSStorageBrief *pStorage)
return 0; return 0;
} }
if (is_local_host_ip(pStorage->ip_addr)) //can't self sync to self if ((strcmp(pStorage->id, g_my_server_id_str) == 0) ||
is_local_host_ip(pStorage->ip_addr)) //can't self sync to self
{ {
return 0; return 0;
} }
@ -1695,6 +1718,42 @@ int trunk_sync_thread_start(const FDFSStorageBrief *pStorage)
return 0; return 0;
} }
void trunk_waiting_sync_thread_exit()
{
int saved_trunk_sync_thread_count;
int count;
saved_trunk_sync_thread_count = g_trunk_sync_thread_count;
if (saved_trunk_sync_thread_count > 0)
{
logInfo("file: "__FILE__", line: %d, "
"waiting %d trunk sync threads exit ...",
__LINE__, saved_trunk_sync_thread_count);
}
count = 0;
while (g_trunk_sync_thread_count > 0 && count < 20)
{
usleep(50000);
count++;
}
if (g_trunk_sync_thread_count > 0)
{
logWarning("file: "__FILE__", line: %d, "
"kill %d trunk sync threads.",
__LINE__, g_trunk_sync_thread_count);
kill_trunk_sync_threads();
}
if (saved_trunk_sync_thread_count > 0)
{
logInfo("file: "__FILE__", line: %d, "
"%d trunk sync threads exited",
__LINE__, saved_trunk_sync_thread_count);
}
}
int trunk_unlink_all_mark_files() int trunk_unlink_all_mark_files()
{ {
FDFSStorageServer *pStorageServer; FDFSStorageServer *pStorageServer;
@ -1710,7 +1769,7 @@ int trunk_unlink_all_mark_files()
continue; continue;
} }
if ((result=trunk_unlink_mark_file( \ if ((result=trunk_unlink_mark_file(
pStorageServer->server.id)) != 0) pStorageServer->server.id)) != 0)
{ {
if (result != ENOENT) if (result != ENOENT)

View File

@ -61,6 +61,7 @@ int trunk_sync_thread_start_all();
int trunk_sync_thread_start(const FDFSStorageBrief *pStorage); int trunk_sync_thread_start(const FDFSStorageBrief *pStorage);
int kill_trunk_sync_threads(); int kill_trunk_sync_threads();
int trunk_binlog_sync_func(void *args); int trunk_binlog_sync_func(void *args);
void trunk_waiting_sync_thread_exit();
char *get_trunk_binlog_filename(char *full_filename); char *get_trunk_binlog_filename(char *full_filename);
char *trunk_mark_filename_by_reader(const void *pArg, char *full_filename); char *trunk_mark_filename_by_reader(const void *pArg, char *full_filename);

View File

@ -563,9 +563,18 @@ int tracker_load_from_conf_file(const char *filename, \
{ {
return result; return result;
} }
g_trunk_compress_binlog_min_interval = iniGetIntValue(NULL, \ g_trunk_compress_binlog_min_interval = iniGetIntValue(NULL,
"trunk_compress_binlog_min_interval", \ "trunk_compress_binlog_min_interval",
&iniContext, 0); &iniContext, 0);
g_trunk_compress_binlog_interval = iniGetIntValue(NULL,
"trunk_compress_binlog_interval",
&iniContext, 0);
if ((result=get_time_item_from_conf(&iniContext,
"trunk_compress_binlog_time_base",
&g_trunk_compress_binlog_time_base, 3, 0)) != 0)
{
return result;
}
g_trunk_init_check_occupying = iniGetBoolValue(NULL, \ g_trunk_init_check_occupying = iniGetBoolValue(NULL, \
"trunk_init_check_occupying", &iniContext, false); "trunk_init_check_occupying", &iniContext, false);
@ -750,6 +759,8 @@ int tracker_load_from_conf_file(const char *filename, \
"trunk_init_check_occupying=%d, " \ "trunk_init_check_occupying=%d, " \
"trunk_init_reload_from_binlog=%d, " \ "trunk_init_reload_from_binlog=%d, " \
"trunk_compress_binlog_min_interval=%d, " \ "trunk_compress_binlog_min_interval=%d, " \
"trunk_compress_binlog_interval=%d, " \
"trunk_compress_binlog_time_base=%02d:%02d, " \
"use_storage_id=%d, " \ "use_storage_id=%d, " \
"id_type_in_filename=%s, " \ "id_type_in_filename=%s, " \
"storage_id/ip_count=%d / %d, " \ "storage_id/ip_count=%d / %d, " \
@ -789,6 +800,9 @@ int tracker_load_from_conf_file(const char *filename, \
(FDFS_ONE_MB * 1024)), g_trunk_init_check_occupying, \ (FDFS_ONE_MB * 1024)), g_trunk_init_check_occupying, \
g_trunk_init_reload_from_binlog, \ g_trunk_init_reload_from_binlog, \
g_trunk_compress_binlog_min_interval, \ g_trunk_compress_binlog_min_interval, \
g_trunk_compress_binlog_interval, \
g_trunk_compress_binlog_time_base.hour, \
g_trunk_compress_binlog_time_base.minute, \
g_use_storage_id, g_id_type_in_filename == \ g_use_storage_id, g_id_type_in_filename == \
FDFS_ID_TYPE_SERVER_ID ? "id" : "ip", \ FDFS_ID_TYPE_SERVER_ID ? "id" : "ip", \
g_storage_ids_by_id.count, g_storage_ids_by_ip.count, \ g_storage_ids_by_id.count, g_storage_ids_by_ip.count, \

View File

@ -55,7 +55,9 @@ int g_slot_min_size = 256; //slot min size, such as 256 bytes
int g_slot_max_size = 16 * 1024 * 1024; //slot max size, such as 16MB int g_slot_max_size = 16 * 1024 * 1024; //slot max size, such as 16MB
int g_trunk_file_size = 64 * 1024 * 1024; //the trunk file size, such as 64MB int g_trunk_file_size = 64 * 1024 * 1024; //the trunk file size, such as 64MB
TimeInfo g_trunk_create_file_time_base = {0, 0}; TimeInfo g_trunk_create_file_time_base = {0, 0};
TimeInfo g_trunk_compress_binlog_time_base = {0, 0};
int g_trunk_create_file_interval = 86400; int g_trunk_create_file_interval = 86400;
int g_trunk_compress_binlog_interval = 0;
int g_trunk_compress_binlog_min_interval = 0; int g_trunk_compress_binlog_min_interval = 0;
int64_t g_trunk_create_file_space_threshold = 0; int64_t g_trunk_create_file_space_threshold = 0;

View File

@ -79,7 +79,9 @@ extern int g_slot_min_size; //slot min size, such as 256 bytes
extern int g_slot_max_size; //slot max size, such as 16MB extern int g_slot_max_size; //slot max size, such as 16MB
extern int g_trunk_file_size; //the trunk file size, such as 64MB extern int g_trunk_file_size; //the trunk file size, such as 64MB
extern TimeInfo g_trunk_create_file_time_base; extern TimeInfo g_trunk_create_file_time_base;
extern TimeInfo g_trunk_compress_binlog_time_base;
extern int g_trunk_create_file_interval; extern int g_trunk_create_file_interval;
extern int g_trunk_compress_binlog_interval;
extern int g_trunk_compress_binlog_min_interval; extern int g_trunk_compress_binlog_min_interval;
extern int64_t g_trunk_create_file_space_threshold; extern int64_t g_trunk_create_file_space_threshold;

View File

@ -695,6 +695,8 @@ static int tracker_deal_parameter_req(struct fast_task_info *pTask)
"trunk_init_check_occupying=%d\n" \ "trunk_init_check_occupying=%d\n" \
"trunk_init_reload_from_binlog=%d\n" \ "trunk_init_reload_from_binlog=%d\n" \
"trunk_compress_binlog_min_interval=%d\n" \ "trunk_compress_binlog_min_interval=%d\n" \
"trunk_compress_binlog_interval=%d\n" \
"trunk_compress_binlog_time_base=%02d:%02d\n" \
"store_slave_file_use_link=%d\n", \ "store_slave_file_use_link=%d\n", \
g_use_storage_id, g_id_type_in_filename == \ g_use_storage_id, g_id_type_in_filename == \
FDFS_ID_TYPE_SERVER_ID ? "id" : "ip", \ FDFS_ID_TYPE_SERVER_ID ? "id" : "ip", \
@ -712,6 +714,9 @@ static int tracker_deal_parameter_req(struct fast_task_info *pTask)
g_trunk_init_check_occupying, \ g_trunk_init_check_occupying, \
g_trunk_init_reload_from_binlog, \ g_trunk_init_reload_from_binlog, \
g_trunk_compress_binlog_min_interval, \ g_trunk_compress_binlog_min_interval, \
g_trunk_compress_binlog_interval, \
g_trunk_compress_binlog_time_base.hour, \
g_trunk_compress_binlog_time_base.minute, \
g_store_slave_file_use_link); g_store_slave_file_use_link);
return 0; return 0;