2557 lines
63 KiB
C
2557 lines
63 KiB
C
/**
|
|
* Copyright (C) 2008 Happy Fish / YuQing
|
|
*
|
|
* FastDFS may be copied only under the terms of the GNU General
|
|
* Public License V3, which may be found in the FastDFS source kit.
|
|
* Please visit the FastDFS Home Page http://www.fastken.com/ for more detail.
|
|
**/
|
|
|
|
//trunk_sync.c
|
|
|
|
#include <sys/types.h>
|
|
#include <sys/stat.h>
|
|
#include <sys/socket.h>
|
|
#include <netinet/in.h>
|
|
#include <arpa/inet.h>
|
|
#include <unistd.h>
|
|
#include <fcntl.h>
|
|
#include <dirent.h>
|
|
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
#include <string.h>
|
|
#include <errno.h>
|
|
#include <time.h>
|
|
#include "fdfs_define.h"
|
|
#include "fastcommon/logger.h"
|
|
#include "fdfs_global.h"
|
|
#include "fastcommon/sockopt.h"
|
|
#include "fastcommon/shared_func.h"
|
|
#include "fastcommon/pthread_func.h"
|
|
#include "fastcommon/sched_thread.h"
|
|
#include "fastcommon/ini_file_reader.h"
|
|
#include "tracker_types.h"
|
|
#include "tracker_proto.h"
|
|
#include "storage_global.h"
|
|
#include "storage_func.h"
|
|
#include "storage_ip_changed_dealer.h"
|
|
#include "tracker_client_thread.h"
|
|
#include "storage_client.h"
|
|
#include "storage_sync_func.h"
|
|
#include "trunk_sync.h"
|
|
|
|
#define TRUNK_SYNC_BINLOG_FILENAME_STR "binlog"
|
|
#define TRUNK_SYNC_BINLOG_FILENAME_LEN (sizeof(TRUNK_SYNC_BINLOG_FILENAME_STR) - 1)
|
|
#define TRUNK_SYNC_BINLOG_ROLLBACK_EXT ".rollback"
|
|
#define TRUNK_SYNC_MARK_FILE_EXT_STR ".mark"
|
|
#define TRUNK_SYNC_MARK_FILE_EXT_LEN (sizeof(TRUNK_SYNC_MARK_FILE_EXT_STR) - 1)
|
|
#define TRUNK_DIR_NAME "trunk"
|
|
#define MARK_ITEM_BINLOG_FILE_OFFSET "binlog_offset"
|
|
|
|
static int trunk_binlog_fd = -1;
|
|
|
|
int g_trunk_sync_thread_count = 0;
|
|
static pthread_mutex_t trunk_sync_thread_lock;
|
|
static char *trunk_binlog_write_cache_buff = NULL;
|
|
static int trunk_binlog_write_cache_len = 0;
|
|
static int trunk_binlog_write_version = 1;
|
|
|
|
typedef struct
|
|
{
|
|
bool running;
|
|
bool reset_binlog_offset;
|
|
const FDFSStorageBrief *pStorage;
|
|
pthread_t tid;
|
|
} TrunkSyncThreadInfo;
|
|
|
|
typedef struct
|
|
{
|
|
TrunkSyncThreadInfo **thread_data;
|
|
int alloc_count;
|
|
} TrunkSyncThreadInfoArray;
|
|
|
|
/* save sync thread ids */
|
|
static TrunkSyncThreadInfoArray sync_thread_info_array = {NULL, 0};
|
|
|
|
static int trunk_write_to_mark_file(TrunkBinLogReader *pReader);
|
|
static int trunk_binlog_fsync_ex(const bool bNeedLock, \
|
|
const char *buff, int *length);
|
|
static int trunk_binlog_preread(TrunkBinLogReader *pReader);
|
|
|
|
#define trunk_binlog_fsync(bNeedLock) trunk_binlog_fsync_ex(bNeedLock, \
|
|
trunk_binlog_write_cache_buff, (&trunk_binlog_write_cache_len))
|
|
|
|
char *get_trunk_binlog_filename(char *full_filename)
|
|
{
|
|
snprintf(full_filename, MAX_PATH_SIZE, \
|
|
"%s/data/"TRUNK_DIR_NAME"/"TRUNK_SYNC_BINLOG_FILENAME_STR, \
|
|
g_fdfs_base_path);
|
|
return full_filename;
|
|
}
|
|
|
|
static char *get_trunk_binlog_rollback_filename(char *full_filename)
|
|
{
|
|
get_trunk_binlog_filename(full_filename);
|
|
if (strlen(full_filename) + sizeof(TRUNK_SYNC_BINLOG_ROLLBACK_EXT) >
|
|
MAX_PATH_SIZE)
|
|
{
|
|
return NULL;
|
|
}
|
|
strcat(full_filename, TRUNK_SYNC_BINLOG_ROLLBACK_EXT);
|
|
return full_filename;
|
|
}
|
|
|
|
static char *get_trunk_data_rollback_filename(char *full_filename)
|
|
{
|
|
storage_trunk_get_data_filename(full_filename);
|
|
if (strlen(full_filename) + sizeof(TRUNK_SYNC_BINLOG_ROLLBACK_EXT) >
|
|
MAX_PATH_SIZE)
|
|
{
|
|
return NULL;
|
|
}
|
|
strcat(full_filename, TRUNK_SYNC_BINLOG_ROLLBACK_EXT);
|
|
return full_filename;
|
|
}
|
|
|
|
char *get_trunk_binlog_tmp_filename_ex(const char *binlog_filename,
|
|
char *tmp_filename)
|
|
{
|
|
const char *true_binlog_filename;
|
|
char filename[MAX_PATH_SIZE];
|
|
|
|
if (binlog_filename == NULL)
|
|
{
|
|
get_trunk_binlog_filename(filename);
|
|
true_binlog_filename = filename;
|
|
}
|
|
else
|
|
{
|
|
true_binlog_filename = binlog_filename;
|
|
}
|
|
|
|
sprintf(tmp_filename, "%s.tmp", true_binlog_filename);
|
|
return tmp_filename;
|
|
}
|
|
|
|
static int trunk_binlog_open_writer(const char *binlog_filename)
|
|
{
|
|
trunk_binlog_fd = open(binlog_filename, O_WRONLY | O_CREAT |
|
|
O_APPEND, 0644);
|
|
if (trunk_binlog_fd < 0)
|
|
{
|
|
logError("file: "__FILE__", line: %d, " \
|
|
"open file \"%s\" fail, " \
|
|
"errno: %d, error info: %s", \
|
|
__LINE__, binlog_filename, \
|
|
errno, STRERROR(errno));
|
|
return errno != 0 ? errno : EACCES;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int trunk_binlog_close_writer(const bool needLock)
|
|
{
|
|
int result;
|
|
if (trunk_binlog_write_cache_len > 0)
|
|
{
|
|
if ((result=trunk_binlog_fsync(needLock)) != 0)
|
|
{
|
|
return result;
|
|
}
|
|
}
|
|
close(trunk_binlog_fd);
|
|
trunk_binlog_fd = -1;
|
|
return 0;
|
|
}
|
|
|
|
int trunk_sync_init()
|
|
{
|
|
char data_path[MAX_PATH_SIZE];
|
|
char sync_path[MAX_PATH_SIZE];
|
|
char binlog_filename[MAX_PATH_SIZE];
|
|
int result;
|
|
|
|
snprintf(data_path, sizeof(data_path), "%s/data", g_fdfs_base_path);
|
|
if (!fileExists(data_path))
|
|
{
|
|
if (mkdir(data_path, 0755) != 0)
|
|
{
|
|
logError("file: "__FILE__", line: %d, " \
|
|
"mkdir \"%s\" fail, " \
|
|
"errno: %d, error info: %s", \
|
|
__LINE__, data_path, \
|
|
errno, STRERROR(errno));
|
|
return errno != 0 ? errno : ENOENT;
|
|
}
|
|
|
|
STORAGE_CHOWN(data_path, geteuid(), getegid())
|
|
}
|
|
|
|
snprintf(sync_path, sizeof(sync_path), \
|
|
"%s/"TRUNK_DIR_NAME, data_path);
|
|
if (!fileExists(sync_path))
|
|
{
|
|
if (mkdir(sync_path, 0755) != 0)
|
|
{
|
|
logError("file: "__FILE__", line: %d, " \
|
|
"mkdir \"%s\" fail, " \
|
|
"errno: %d, error info: %s", \
|
|
__LINE__, sync_path, \
|
|
errno, STRERROR(errno));
|
|
return errno != 0 ? errno : ENOENT;
|
|
}
|
|
|
|
STORAGE_CHOWN(sync_path, geteuid(), getegid())
|
|
}
|
|
|
|
trunk_binlog_write_cache_buff = (char *)malloc( \
|
|
TRUNK_BINLOG_BUFFER_SIZE);
|
|
if (trunk_binlog_write_cache_buff == NULL)
|
|
{
|
|
logError("file: "__FILE__", line: %d, " \
|
|
"malloc %d bytes fail, " \
|
|
"errno: %d, error info: %s", \
|
|
__LINE__, TRUNK_BINLOG_BUFFER_SIZE, \
|
|
errno, STRERROR(errno));
|
|
return errno != 0 ? errno : ENOMEM;
|
|
}
|
|
|
|
get_trunk_binlog_filename(binlog_filename);
|
|
if ((result=trunk_binlog_open_writer(binlog_filename)) != 0)
|
|
{
|
|
return result;
|
|
}
|
|
|
|
if ((result=init_pthread_lock(&trunk_sync_thread_lock)) != 0)
|
|
{
|
|
return result;
|
|
}
|
|
|
|
STORAGE_FCHOWN(trunk_binlog_fd, binlog_filename, geteuid(), getegid())
|
|
|
|
return 0;
|
|
}
|
|
|
|
int trunk_sync_destroy()
|
|
{
|
|
if (trunk_binlog_fd >= 0)
|
|
{
|
|
trunk_binlog_fsync(true);
|
|
close(trunk_binlog_fd);
|
|
trunk_binlog_fd = -1;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
int kill_trunk_sync_threads()
|
|
{
|
|
int result;
|
|
int kill_res;
|
|
TrunkSyncThreadInfo **thread_info;
|
|
TrunkSyncThreadInfo **info_end;
|
|
|
|
if (sync_thread_info_array.thread_data == NULL)
|
|
{
|
|
return 0;
|
|
}
|
|
|
|
if ((result=pthread_mutex_lock(&trunk_sync_thread_lock)) != 0)
|
|
{
|
|
logError("file: "__FILE__", line: %d, "
|
|
"call pthread_mutex_lock fail, "
|
|
"errno: %d, error info: %s",
|
|
__LINE__, result, STRERROR(result));
|
|
}
|
|
|
|
kill_res = 0;
|
|
info_end = sync_thread_info_array.thread_data +
|
|
sync_thread_info_array.alloc_count;
|
|
for (thread_info=sync_thread_info_array.thread_data;
|
|
thread_info<info_end; thread_info++)
|
|
{
|
|
if ((*thread_info)->running && (kill_res=pthread_kill(
|
|
(*thread_info)->tid, SIGINT)) != 0)
|
|
{
|
|
logError("file: "__FILE__", line: %d, "
|
|
"kill thread failed, "
|
|
"errno: %d, error info: %s",
|
|
__LINE__, kill_res, STRERROR(kill_res));
|
|
}
|
|
}
|
|
|
|
if ((result=pthread_mutex_unlock(&trunk_sync_thread_lock)) != 0)
|
|
{
|
|
logError("file: "__FILE__", line: %d, "
|
|
"call pthread_mutex_unlock fail, "
|
|
"errno: %d, error info: %s",
|
|
__LINE__, result, STRERROR(result));
|
|
}
|
|
|
|
while (g_trunk_sync_thread_count > 0)
|
|
{
|
|
usleep(50000);
|
|
}
|
|
|
|
return kill_res;
|
|
}
|
|
|
|
int trunk_sync_notify_thread_reset_offset()
|
|
{
|
|
int result;
|
|
int i;
|
|
int count;
|
|
bool done;
|
|
TrunkSyncThreadInfo **thread_info;
|
|
TrunkSyncThreadInfo **info_end;
|
|
|
|
if (sync_thread_info_array.thread_data == NULL)
|
|
{
|
|
return EINVAL;
|
|
}
|
|
|
|
if ((result=pthread_mutex_lock(&trunk_sync_thread_lock)) != 0)
|
|
{
|
|
logError("file: "__FILE__", line: %d, "
|
|
"call pthread_mutex_lock fail, "
|
|
"errno: %d, error info: %s",
|
|
__LINE__, result, STRERROR(result));
|
|
}
|
|
|
|
count = 0;
|
|
info_end = sync_thread_info_array.thread_data +
|
|
sync_thread_info_array.alloc_count;
|
|
for (thread_info=sync_thread_info_array.thread_data;
|
|
thread_info<info_end; thread_info++)
|
|
{
|
|
if ((*thread_info)->running)
|
|
{
|
|
(*thread_info)->reset_binlog_offset = true;
|
|
count++;
|
|
}
|
|
}
|
|
|
|
if ((result=pthread_mutex_unlock(&trunk_sync_thread_lock)) != 0)
|
|
{
|
|
logError("file: "__FILE__", line: %d, "
|
|
"call pthread_mutex_unlock fail, "
|
|
"errno: %d, error info: %s",
|
|
__LINE__, result, STRERROR(result));
|
|
}
|
|
|
|
logInfo("file: "__FILE__", line: %d, "
|
|
"notify %d trunk sync threads to reset offset.",
|
|
__LINE__, count);
|
|
|
|
done = false;
|
|
for (i=0; i<300 && g_continue_flag; i++)
|
|
{
|
|
info_end = sync_thread_info_array.thread_data +
|
|
sync_thread_info_array.alloc_count;
|
|
for (thread_info=sync_thread_info_array.thread_data;
|
|
thread_info<info_end; thread_info++)
|
|
{
|
|
if ((*thread_info)->running && (*thread_info)->reset_binlog_offset)
|
|
{
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (thread_info == info_end)
|
|
{
|
|
done = true;
|
|
break;
|
|
}
|
|
|
|
sleep(1);
|
|
}
|
|
|
|
if (done)
|
|
{
|
|
logInfo("file: "__FILE__", line: %d, "
|
|
"trunk sync threads reset binlog offset done.",
|
|
__LINE__);
|
|
return 0;
|
|
}
|
|
else
|
|
{
|
|
count = 0;
|
|
info_end = sync_thread_info_array.thread_data +
|
|
sync_thread_info_array.alloc_count;
|
|
for (thread_info=sync_thread_info_array.thread_data;
|
|
thread_info<info_end; thread_info++)
|
|
{
|
|
if ((*thread_info)->running && (*thread_info)->reset_binlog_offset)
|
|
{
|
|
count++;
|
|
}
|
|
}
|
|
|
|
logWarning("file: "__FILE__", line: %d, "
|
|
"%d trunk sync threads reset binlog offset timeout.",
|
|
__LINE__, count);
|
|
return EBUSY;
|
|
}
|
|
}
|
|
|
|
int trunk_binlog_sync_func(void *args)
|
|
{
|
|
if (trunk_binlog_write_cache_len > 0)
|
|
{
|
|
return trunk_binlog_fsync(true);
|
|
}
|
|
else
|
|
{
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
#define BACKUP_FILENAME_LEN (TRUNK_SYNC_BINLOG_FILENAME_LEN + 15)
|
|
|
|
typedef struct
|
|
{
|
|
char filename[BACKUP_FILENAME_LEN + 1];
|
|
} TrunkBinlogBackupFileInfo;
|
|
|
|
typedef struct
|
|
{
|
|
TrunkBinlogBackupFileInfo *files;
|
|
int count;
|
|
int alloc;
|
|
} TrunkBinlogBackupFileArray;
|
|
|
|
static int trunk_binlog_check_alloc_filename_array(
|
|
TrunkBinlogBackupFileArray *file_array)
|
|
{
|
|
int bytes;
|
|
TrunkBinlogBackupFileInfo *files;
|
|
int alloc;
|
|
|
|
if (file_array->count < file_array->alloc)
|
|
{
|
|
return 0;
|
|
}
|
|
|
|
if (file_array->alloc == 0)
|
|
{
|
|
alloc = g_trunk_binlog_max_backups + 1;
|
|
}
|
|
else
|
|
{
|
|
alloc = file_array->alloc * 2;
|
|
}
|
|
|
|
bytes = sizeof(TrunkBinlogBackupFileInfo) * alloc;
|
|
files = (TrunkBinlogBackupFileInfo *)malloc(bytes);
|
|
if (files == NULL)
|
|
{
|
|
logError("file: "__FILE__", line: %d, "
|
|
"malloc %d bytes fail", __LINE__, bytes);
|
|
return ENOMEM;
|
|
}
|
|
|
|
if (file_array->count > 0)
|
|
{
|
|
memcpy(files, file_array->files, sizeof(TrunkBinlogBackupFileInfo) *
|
|
file_array->count);
|
|
}
|
|
|
|
if (file_array->files != NULL)
|
|
{
|
|
free(file_array->files);
|
|
}
|
|
|
|
file_array->files = files;
|
|
file_array->alloc = alloc;
|
|
return 0;
|
|
}
|
|
|
|
static int trunk_binlog_compare_filename(const void *p1, const void *p2)
|
|
{
|
|
return strcmp(((TrunkBinlogBackupFileInfo *)p1)->filename,
|
|
((TrunkBinlogBackupFileInfo *)p2)->filename);
|
|
}
|
|
|
|
static int trunk_binlog_delete_overflow_backups()
|
|
{
|
|
#define BACKUP_FILENAME_PREFIX_STR TRUNK_SYNC_BINLOG_FILENAME_STR"."
|
|
#define BACKUP_FILENAME_PREFIX_LEN (sizeof(BACKUP_FILENAME_PREFIX_STR) - 1)
|
|
|
|
int result;
|
|
int i;
|
|
int over_count;
|
|
char file_path[MAX_PATH_SIZE];
|
|
char full_filename[MAX_PATH_SIZE];
|
|
DIR *dir;
|
|
struct dirent *ent;
|
|
TrunkBinlogBackupFileArray file_array;
|
|
|
|
snprintf(file_path, sizeof(file_path),
|
|
"%s/data/%s", g_fdfs_base_path, TRUNK_DIR_NAME);
|
|
if ((dir=opendir(file_path)) == NULL)
|
|
{
|
|
result = errno != 0 ? errno : EPERM;
|
|
logError("file: "__FILE__", line: %d, "
|
|
"call opendir %s fail, errno: %d, error info: %s",
|
|
__LINE__, file_path, result, STRERROR(result));
|
|
return result;
|
|
}
|
|
|
|
result = 0;
|
|
file_array.files = NULL;
|
|
file_array.count = 0;
|
|
file_array.alloc = 0;
|
|
while ((ent=readdir(dir)) != NULL)
|
|
{
|
|
if (strlen(ent->d_name) == BACKUP_FILENAME_LEN &&
|
|
memcmp(ent->d_name, BACKUP_FILENAME_PREFIX_STR,
|
|
BACKUP_FILENAME_PREFIX_LEN) == 0)
|
|
{
|
|
if ((result=trunk_binlog_check_alloc_filename_array(
|
|
&file_array)) != 0)
|
|
{
|
|
break;
|
|
}
|
|
|
|
strcpy(file_array.files[file_array.count].
|
|
filename, ent->d_name);
|
|
file_array.count++;
|
|
}
|
|
}
|
|
|
|
closedir(dir);
|
|
|
|
over_count = (file_array.count - g_trunk_binlog_max_backups) + 1;
|
|
if (result != 0 || over_count <= 0)
|
|
{
|
|
if (file_array.files != NULL)
|
|
{
|
|
free(file_array.files);
|
|
}
|
|
return result;
|
|
}
|
|
|
|
qsort(file_array.files, file_array.count,
|
|
sizeof(TrunkBinlogBackupFileInfo),
|
|
trunk_binlog_compare_filename);
|
|
for (i=0; i<over_count; i++)
|
|
{
|
|
sprintf(full_filename, "%s/%s", file_path,
|
|
file_array.files[i].filename);
|
|
unlink(full_filename);
|
|
}
|
|
|
|
free(file_array.files);
|
|
return 0;
|
|
}
|
|
|
|
static int trunk_binlog_backup_and_truncate()
|
|
{
|
|
int result;
|
|
int open_res;
|
|
char binlog_filename[MAX_PATH_SIZE];
|
|
char backup_filename[MAX_PATH_SIZE];
|
|
time_t t;
|
|
struct tm tm;
|
|
|
|
if ((result=trunk_binlog_delete_overflow_backups()) != 0)
|
|
{
|
|
return result;
|
|
}
|
|
|
|
if ((result=trunk_binlog_close_writer(false)) != 0)
|
|
{
|
|
return result;
|
|
}
|
|
|
|
do
|
|
{
|
|
t = g_current_time;
|
|
localtime_r(&t, &tm);
|
|
|
|
get_trunk_binlog_filename(binlog_filename);
|
|
snprintf(backup_filename, sizeof(backup_filename),
|
|
"%s.%04d%02d%02d%02d%02d%02d", binlog_filename,
|
|
tm.tm_year+1900, tm.tm_mon+1, tm.tm_mday,
|
|
tm.tm_hour, tm.tm_min, tm.tm_sec);
|
|
if (rename(binlog_filename, backup_filename) != 0)
|
|
{
|
|
result = errno != 0 ? errno : EACCES;
|
|
logError("file: "__FILE__", line: %d, "
|
|
"rename file %s to %s fail, "
|
|
"errno: %d, error info: %s", __LINE__,
|
|
binlog_filename, backup_filename,
|
|
result, STRERROR(result));
|
|
break;
|
|
}
|
|
} while (0);
|
|
|
|
open_res = trunk_binlog_open_writer(binlog_filename);
|
|
return (result == 0) ? open_res : result;
|
|
}
|
|
|
|
int storage_delete_trunk_data_file()
|
|
{
|
|
char trunk_data_filename[MAX_PATH_SIZE];
|
|
int result;
|
|
|
|
storage_trunk_get_data_filename(trunk_data_filename);
|
|
if (unlink(trunk_data_filename) == 0)
|
|
{
|
|
return 0;
|
|
}
|
|
|
|
result = errno != 0 ? errno : ENOENT;
|
|
if (result == ENOENT)
|
|
{
|
|
result = 0;
|
|
}
|
|
else
|
|
{
|
|
logError("file: "__FILE__", line: %d, "
|
|
"unlink trunk data file: %s fail, "
|
|
"errno: %d, error info: %s",
|
|
__LINE__, trunk_data_filename,
|
|
result, STRERROR(result));
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
int trunk_binlog_truncate()
|
|
{
|
|
int result;
|
|
|
|
result = 0;
|
|
pthread_mutex_lock(&trunk_sync_thread_lock);
|
|
do
|
|
{
|
|
if (g_trunk_binlog_max_backups > 0)
|
|
{
|
|
result = trunk_binlog_backup_and_truncate();
|
|
}
|
|
else
|
|
{
|
|
if (trunk_binlog_write_cache_len > 0)
|
|
{
|
|
if ((result=trunk_binlog_fsync(false)) != 0)
|
|
{
|
|
break;
|
|
}
|
|
}
|
|
if (ftruncate(trunk_binlog_fd, 0) != 0)
|
|
{
|
|
result = errno != 0 ? errno : EIO;
|
|
logError("file: "__FILE__", line: %d, "
|
|
"call ftruncate fail, "
|
|
"errno: %d, error info: %s",
|
|
__LINE__, result, STRERROR(result));
|
|
break;
|
|
}
|
|
}
|
|
} while (0);
|
|
|
|
pthread_mutex_unlock(&trunk_sync_thread_lock);
|
|
if (result == 0)
|
|
{
|
|
result = storage_delete_trunk_data_file();
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
static int trunk_binlog_delete_rollback_file(const char *filename,
|
|
const bool silence)
|
|
{
|
|
int result;
|
|
if (access(filename, F_OK) == 0)
|
|
{
|
|
if (!silence)
|
|
{
|
|
logWarning("file: "__FILE__", line: %d, "
|
|
"rollback file %s exist, delete it!",
|
|
__LINE__, filename);
|
|
}
|
|
if (unlink(filename) != 0)
|
|
{
|
|
result = errno != 0 ? errno : EPERM;
|
|
if (result != ENOENT)
|
|
{
|
|
logError("file: "__FILE__", line: %d, "
|
|
"unlink file %s fail, errno: %d, error info: %s",
|
|
__LINE__, filename, result, STRERROR(result));
|
|
return result;
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
result = errno != 0 ? errno : EPERM;
|
|
if (result != ENOENT)
|
|
{
|
|
logError("file: "__FILE__", line: %d, "
|
|
"access file %s fail, errno: %d, error info: %s",
|
|
__LINE__, filename, result, STRERROR(result));
|
|
return result;
|
|
}
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
int trunk_binlog_compress_delete_binlog_rollback_file(const bool silence)
|
|
{
|
|
char binlog_rollback_filename[MAX_PATH_SIZE];
|
|
|
|
if (get_trunk_binlog_rollback_filename(binlog_rollback_filename) == NULL)
|
|
{
|
|
logError("file: "__FILE__", line: %d, "
|
|
"binlog rollback filename is too long", __LINE__);
|
|
return ENAMETOOLONG;
|
|
}
|
|
|
|
return trunk_binlog_delete_rollback_file(binlog_rollback_filename, silence);
|
|
}
|
|
|
|
int trunk_binlog_compress_delete_rollback_files(const bool silence)
|
|
{
|
|
int result;
|
|
char data_rollback_filename[MAX_PATH_SIZE];
|
|
|
|
if ((result=trunk_binlog_compress_delete_binlog_rollback_file(
|
|
silence)) != 0)
|
|
{
|
|
return result;
|
|
}
|
|
|
|
if (get_trunk_data_rollback_filename(data_rollback_filename) == NULL)
|
|
{
|
|
logError("file: "__FILE__", line: %d, "
|
|
"data rollback filename is too long", __LINE__);
|
|
return ENAMETOOLONG;
|
|
}
|
|
|
|
if ((result=trunk_binlog_delete_rollback_file(data_rollback_filename,
|
|
silence)) != 0)
|
|
{
|
|
return result;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int trunk_binlog_rename_file(const char *src_filename,
|
|
const char *dest_filename, const int log_ignore_errno)
|
|
{
|
|
int result;
|
|
if (access(src_filename, F_OK) == 0)
|
|
{
|
|
if (rename(src_filename, dest_filename) != 0)
|
|
{
|
|
result = errno != 0 ? errno : EIO;
|
|
logError("file: "__FILE__", line: %d, "
|
|
"rename %s to %s fail, "
|
|
"errno: %d, error info: %s",
|
|
__LINE__, src_filename,
|
|
dest_filename, result,
|
|
STRERROR(result));
|
|
return result;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
result = errno != 0 ? errno : EIO;
|
|
if (result - log_ignore_errno != 0)
|
|
{
|
|
logError("file: "__FILE__", line: %d, "
|
|
"call access %s fail, "
|
|
"errno: %d, error info: %s",
|
|
__LINE__, src_filename,
|
|
result, STRERROR(result));
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int trunk_binlog_open_read(const char *filename,
|
|
const bool skipFirstLine)
|
|
{
|
|
int result;
|
|
int fd;
|
|
char buff[32];
|
|
|
|
fd = open(filename, O_RDONLY);
|
|
if (fd < 0)
|
|
{
|
|
result = errno != 0 ? errno : EACCES;
|
|
logError("file: "__FILE__", line: %d, " \
|
|
"open file \"%s\" fail, " \
|
|
"errno: %d, error info: %s", \
|
|
__LINE__, filename, result, STRERROR(result));
|
|
return -1;
|
|
}
|
|
|
|
if (skipFirstLine)
|
|
{
|
|
if (fd_gets(fd, buff, sizeof(buff), 16) <= 0)
|
|
{
|
|
logWarning("file: "__FILE__", line: %d, " \
|
|
"skip first line fail!", __LINE__);
|
|
}
|
|
}
|
|
|
|
return fd;
|
|
}
|
|
|
|
static int trunk_binlog_merge_file(int old_fd, const int stage)
|
|
{
|
|
int result;
|
|
int tmp_fd;
|
|
int bytes;
|
|
char binlog_filename[MAX_PATH_SIZE];
|
|
char tmp_filename[MAX_PATH_SIZE];
|
|
char buff[64 * 1024];
|
|
|
|
get_trunk_binlog_filename(binlog_filename);
|
|
get_trunk_binlog_tmp_filename_ex(binlog_filename, tmp_filename);
|
|
tmp_fd = open(tmp_filename, O_WRONLY | O_CREAT | O_TRUNC, 0644);
|
|
if (tmp_fd < 0)
|
|
{
|
|
result = errno != 0 ? errno : EACCES;
|
|
logError("file: "__FILE__", line: %d, "
|
|
"open file \"%s\" fail, "
|
|
"errno: %d, error info: %s",
|
|
__LINE__, tmp_filename, result, STRERROR(result));
|
|
return result;
|
|
}
|
|
|
|
while ((bytes=fc_safe_read(old_fd, buff, sizeof(buff))) > 0)
|
|
{
|
|
if (fc_safe_write(tmp_fd, buff, bytes) != bytes)
|
|
{
|
|
result = errno != 0 ? errno : EACCES;
|
|
logError("file: "__FILE__", line: %d, " \
|
|
"write to file \"%s\" fail, " \
|
|
"errno: %d, error info: %s", \
|
|
__LINE__, tmp_filename,
|
|
result, STRERROR(result));
|
|
close(tmp_fd);
|
|
return result;
|
|
}
|
|
}
|
|
|
|
if (access(binlog_filename, F_OK) == 0)
|
|
{
|
|
int binlog_fd;
|
|
if ((binlog_fd=trunk_binlog_open_read(binlog_filename,
|
|
false)) < 0)
|
|
{
|
|
close(tmp_fd);
|
|
return errno != 0 ? errno : EPERM;
|
|
}
|
|
|
|
while ((bytes=fc_safe_read(binlog_fd, buff, sizeof(buff))) > 0)
|
|
{
|
|
if (fc_safe_write(tmp_fd, buff, bytes) != bytes)
|
|
{
|
|
result = errno != 0 ? errno : EACCES;
|
|
logError("file: "__FILE__", line: %d, " \
|
|
"write to file \"%s\" fail, " \
|
|
"errno: %d, error info: %s", \
|
|
__LINE__, tmp_filename,
|
|
result, STRERROR(result));
|
|
close(tmp_fd);
|
|
close(binlog_fd);
|
|
return result;
|
|
}
|
|
}
|
|
close(binlog_fd);
|
|
}
|
|
|
|
if (fsync(tmp_fd) != 0)
|
|
{
|
|
result = errno != 0 ? errno : EIO;
|
|
logError("file: "__FILE__", line: %d, " \
|
|
"sync file \"%s\" fail, " \
|
|
"errno: %d, error info: %s", \
|
|
__LINE__, tmp_filename, \
|
|
errno, STRERROR(errno));
|
|
close(tmp_fd);
|
|
return result;
|
|
}
|
|
close(tmp_fd);
|
|
|
|
g_trunk_binlog_compress_stage = stage;
|
|
storage_write_to_sync_ini_file();
|
|
|
|
if (rename(tmp_filename, binlog_filename) != 0)
|
|
{
|
|
result = errno != 0 ? errno : EPERM;
|
|
logError("file: "__FILE__", line: %d, " \
|
|
"rename %s to %s fail, " \
|
|
"errno: %d, error info: %s",
|
|
__LINE__, tmp_filename, binlog_filename,
|
|
result, STRERROR(result));
|
|
return result;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int trunk_compress_rollback_data_file()
|
|
{
|
|
int result;
|
|
char data_filename[MAX_PATH_SIZE];
|
|
char data_rollback_filename[MAX_PATH_SIZE];
|
|
struct stat fs;
|
|
|
|
storage_trunk_get_data_filename(data_filename);
|
|
get_trunk_data_rollback_filename(data_rollback_filename);
|
|
|
|
if (stat(data_rollback_filename, &fs) != 0)
|
|
{
|
|
result = errno != 0 ? errno : EPERM;
|
|
if (result == ENOENT)
|
|
{
|
|
return 0;
|
|
}
|
|
|
|
logError("file: "__FILE__", line: %d, "
|
|
"stat file %s fail, errno: %d, error info: %s",
|
|
__LINE__, data_rollback_filename,
|
|
result, STRERROR(result));
|
|
return result;
|
|
}
|
|
|
|
if (unlink(data_filename) != 0)
|
|
{
|
|
result = errno != 0 ? errno : EPERM;
|
|
if (result != ENOENT)
|
|
{
|
|
logError("file: "__FILE__", line: %d, "
|
|
"unlink %s fail, errno: %d, error info: %s",
|
|
__LINE__, data_filename, result, STRERROR(result));
|
|
return result;
|
|
}
|
|
}
|
|
|
|
if (fs.st_size == 0)
|
|
{
|
|
unlink(data_rollback_filename); //delete zero file directly
|
|
return 0;
|
|
}
|
|
|
|
if (rename(data_rollback_filename, data_filename) != 0)
|
|
{
|
|
result = errno != 0 ? errno : EPERM;
|
|
if (result == ENOENT)
|
|
{
|
|
return 0;
|
|
}
|
|
|
|
logError("file: "__FILE__", line: %d, "
|
|
"rename file %s to %s fail, "
|
|
"errno: %d, error info: %s",
|
|
__LINE__, data_rollback_filename,
|
|
data_filename, result, STRERROR(result));
|
|
return result;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int trunk_compress_rollback_binlog_file(const char *binlog_filename)
|
|
{
|
|
int result;
|
|
int rollback_fd;
|
|
char binlog_rollback_filename[MAX_PATH_SIZE];
|
|
struct stat fs;
|
|
|
|
get_trunk_binlog_rollback_filename(binlog_rollback_filename);
|
|
if (stat(binlog_rollback_filename, &fs) != 0)
|
|
{
|
|
result = errno != 0 ? errno : ENOENT;
|
|
if (result == ENOENT)
|
|
{
|
|
return 0;
|
|
}
|
|
logError("file: "__FILE__", line: %d, "
|
|
"stat file %s fail, errno: %d, error info: %s",
|
|
__LINE__, binlog_rollback_filename,
|
|
result, STRERROR(result));
|
|
return result;
|
|
}
|
|
|
|
if (fs.st_size == 0)
|
|
{
|
|
unlink(binlog_rollback_filename); //delete zero file directly
|
|
return 0;
|
|
}
|
|
|
|
if (access(binlog_filename, F_OK) != 0)
|
|
{
|
|
result = errno != 0 ? errno : EPERM;
|
|
if (result == ENOENT)
|
|
{
|
|
if (rename(binlog_rollback_filename, binlog_filename) != 0)
|
|
{
|
|
result = errno != 0 ? errno : EPERM;
|
|
if (result != ENOENT)
|
|
{
|
|
logError("file: "__FILE__", line: %d, "
|
|
"rename file %s to %s fail, "
|
|
"errno: %d, error info: %s",
|
|
__LINE__, binlog_rollback_filename,
|
|
binlog_filename, errno, STRERROR(errno));
|
|
return result;
|
|
}
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
else
|
|
{
|
|
logError("file: "__FILE__", line: %d, "
|
|
"access file %s fail, errno: %d, error info: %s",
|
|
__LINE__, binlog_filename, errno, STRERROR(errno));
|
|
return result;
|
|
}
|
|
}
|
|
|
|
if ((rollback_fd=trunk_binlog_open_read(binlog_rollback_filename,
|
|
false)) < 0)
|
|
{
|
|
result = errno != 0 ? errno : EPERM;
|
|
if (result == ENOENT)
|
|
{
|
|
return 0;
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
result = trunk_binlog_merge_file(rollback_fd,
|
|
STORAGE_TRUNK_COMPRESS_STAGE_ROLLBACK_MERGING);
|
|
close(rollback_fd);
|
|
|
|
g_trunk_binlog_compress_stage =
|
|
STORAGE_TRUNK_COMPRESS_STAGE_ROLLBACK_MERGE_DONE;
|
|
storage_write_to_sync_ini_file();
|
|
|
|
if (unlink(binlog_rollback_filename) != 0)
|
|
{
|
|
logWarning("file: "__FILE__", line: %d, "
|
|
"unlink %s fail, errno: %d, error info: %s",
|
|
__LINE__, binlog_rollback_filename,
|
|
errno, STRERROR(errno));
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
int trunk_binlog_compress_delete_temp_files_after_commit()
|
|
{
|
|
int result;
|
|
char data_filename[MAX_PATH_SIZE];
|
|
|
|
storage_trunk_get_data_filename(data_filename);
|
|
if (unlink(data_filename) != 0)
|
|
{
|
|
result = errno != 0 ? errno : ENOENT;
|
|
logError("file: "__FILE__", line: %d, "
|
|
"unlink %s fail, errno: %d, error info: %s",
|
|
__LINE__, data_filename,
|
|
result, STRERROR(result));
|
|
if (result != ENOENT)
|
|
{
|
|
return result;
|
|
}
|
|
}
|
|
|
|
return trunk_binlog_compress_delete_rollback_files(true);
|
|
}
|
|
|
|
int trunk_binlog_compress_apply()
|
|
{
|
|
int result;
|
|
int open_res;
|
|
bool need_open_binlog;
|
|
char binlog_filename[MAX_PATH_SIZE];
|
|
char data_filename[MAX_PATH_SIZE];
|
|
char binlog_rollback_filename[MAX_PATH_SIZE];
|
|
char data_rollback_filename[MAX_PATH_SIZE];
|
|
|
|
get_trunk_binlog_filename(binlog_filename);
|
|
if (get_trunk_binlog_rollback_filename(binlog_rollback_filename) == NULL)
|
|
{
|
|
logError("file: "__FILE__", line: %d, "
|
|
"filename: %s is too long",
|
|
__LINE__, binlog_filename);
|
|
return ENAMETOOLONG;
|
|
}
|
|
|
|
storage_trunk_get_data_filename(data_filename);
|
|
if (get_trunk_data_rollback_filename(data_rollback_filename) == NULL)
|
|
{
|
|
logError("file: "__FILE__", line: %d, "
|
|
"data rollback filename is too long", __LINE__);
|
|
return ENAMETOOLONG;
|
|
}
|
|
|
|
if (access(binlog_filename, F_OK) != 0)
|
|
{
|
|
result = errno != 0 ? errno : EPERM;
|
|
logError("file: "__FILE__", line: %d, "
|
|
"access file: %s is fail, "
|
|
"errno: %d, error info: %s",
|
|
__LINE__, binlog_filename,
|
|
result, STRERROR(result));
|
|
return result;
|
|
}
|
|
|
|
need_open_binlog = trunk_binlog_fd >= 0;
|
|
|
|
pthread_mutex_lock(&trunk_sync_thread_lock);
|
|
if (need_open_binlog)
|
|
{
|
|
trunk_binlog_close_writer(false);
|
|
}
|
|
|
|
do
|
|
{
|
|
result = trunk_binlog_rename_file(data_filename,
|
|
data_rollback_filename, ENOENT);
|
|
if (result != 0)
|
|
{
|
|
if (result == ENOENT)
|
|
{
|
|
result = writeToFile(data_rollback_filename, "", 0);
|
|
}
|
|
|
|
if (result != 0)
|
|
{
|
|
break;
|
|
}
|
|
}
|
|
|
|
if ((result=trunk_binlog_rename_file(binlog_filename,
|
|
binlog_rollback_filename, 0)) != 0)
|
|
{
|
|
trunk_compress_rollback_data_file();
|
|
break;
|
|
}
|
|
} while (0);
|
|
|
|
if (need_open_binlog)
|
|
{
|
|
if ((open_res=trunk_binlog_open_writer(binlog_filename)) != 0)
|
|
{
|
|
trunk_binlog_rename_file(binlog_rollback_filename,
|
|
binlog_filename, 0); //rollback
|
|
trunk_compress_rollback_data_file();
|
|
|
|
if (result == 0)
|
|
{
|
|
result = open_res;
|
|
}
|
|
}
|
|
}
|
|
|
|
pthread_mutex_unlock(&trunk_sync_thread_lock);
|
|
return result;
|
|
}
|
|
|
|
int trunk_binlog_compress_commit()
|
|
{
|
|
int result;
|
|
int data_fd;
|
|
bool need_open_binlog;
|
|
char binlog_filename[MAX_PATH_SIZE];
|
|
char data_filename[MAX_PATH_SIZE];
|
|
|
|
need_open_binlog = trunk_binlog_fd >= 0;
|
|
get_trunk_binlog_filename(binlog_filename);
|
|
storage_trunk_get_data_filename(data_filename);
|
|
|
|
if ((data_fd=trunk_binlog_open_read(data_filename, true)) < 0)
|
|
{
|
|
return errno != 0 ? errno : ENOENT;
|
|
}
|
|
|
|
pthread_mutex_lock(&trunk_sync_thread_lock);
|
|
if (need_open_binlog)
|
|
{
|
|
trunk_binlog_close_writer(false);
|
|
}
|
|
|
|
do
|
|
{
|
|
result = trunk_binlog_merge_file(data_fd,
|
|
STORAGE_TRUNK_COMPRESS_STAGE_COMMIT_MERGING);
|
|
close(data_fd);
|
|
if (result != 0)
|
|
{
|
|
break;
|
|
}
|
|
|
|
g_trunk_binlog_compress_stage =
|
|
STORAGE_TRUNK_COMPRESS_STAGE_COMMIT_MERGE_DONE;
|
|
storage_write_to_sync_ini_file();
|
|
|
|
if ((result=trunk_binlog_compress_delete_temp_files_after_commit()) != 0)
|
|
{
|
|
break;
|
|
}
|
|
|
|
g_trunk_binlog_compress_stage =
|
|
STORAGE_TRUNK_COMPRESS_STAGE_COMPRESS_SUCCESS;
|
|
storage_write_to_sync_ini_file();
|
|
|
|
if (need_open_binlog)
|
|
{
|
|
result = trunk_binlog_open_writer(binlog_filename);
|
|
}
|
|
} while (0);
|
|
|
|
pthread_mutex_unlock(&trunk_sync_thread_lock);
|
|
|
|
return result;
|
|
}
|
|
|
|
static int do_compress_rollback()
|
|
{
|
|
int result;
|
|
bool need_open_binlog;
|
|
char binlog_filename[MAX_PATH_SIZE];
|
|
|
|
need_open_binlog = trunk_binlog_fd >= 0;
|
|
get_trunk_binlog_filename(binlog_filename);
|
|
|
|
pthread_mutex_lock(&trunk_sync_thread_lock);
|
|
if (need_open_binlog)
|
|
{
|
|
trunk_binlog_close_writer(false);
|
|
}
|
|
|
|
do
|
|
{
|
|
if ((result=trunk_compress_rollback_binlog_file(binlog_filename)) != 0)
|
|
{
|
|
break;
|
|
}
|
|
|
|
if ((result=trunk_compress_rollback_data_file()) != 0)
|
|
{
|
|
break;
|
|
}
|
|
|
|
if (need_open_binlog)
|
|
{
|
|
result = trunk_binlog_open_writer(binlog_filename);
|
|
}
|
|
} while (0);
|
|
|
|
pthread_mutex_unlock(&trunk_sync_thread_lock);
|
|
return result;
|
|
}
|
|
|
|
int trunk_binlog_compress_rollback()
|
|
{
|
|
int result;
|
|
|
|
if ((result=do_compress_rollback()) == 0)
|
|
{
|
|
g_trunk_binlog_compress_stage =
|
|
STORAGE_TRUNK_COMPRESS_STAGE_FINISHED;
|
|
storage_write_to_sync_ini_file();
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
static int trunk_binlog_fsync_ex(const bool bNeedLock, \
|
|
const char *buff, int *length)
|
|
{
|
|
int result;
|
|
int write_ret;
|
|
char full_filename[MAX_PATH_SIZE];
|
|
|
|
if (bNeedLock && (result=pthread_mutex_lock(&trunk_sync_thread_lock)) != 0)
|
|
{
|
|
logError("file: "__FILE__", line: %d, " \
|
|
"call pthread_mutex_lock fail, " \
|
|
"errno: %d, error info: %s", \
|
|
__LINE__, result, STRERROR(result));
|
|
}
|
|
|
|
if (*length == 0) //ignore
|
|
{
|
|
write_ret = 0; //skip
|
|
}
|
|
else if (fc_safe_write(trunk_binlog_fd, buff, *length) != *length)
|
|
{
|
|
write_ret = errno != 0 ? errno : EIO;
|
|
logError("file: "__FILE__", line: %d, " \
|
|
"write to binlog file \"%s\" fail, fd=%d, " \
|
|
"errno: %d, error info: %s", \
|
|
__LINE__, get_trunk_binlog_filename(full_filename), \
|
|
trunk_binlog_fd, errno, STRERROR(errno));
|
|
}
|
|
else if (fsync(trunk_binlog_fd) != 0)
|
|
{
|
|
write_ret = errno != 0 ? errno : EIO;
|
|
logError("file: "__FILE__", line: %d, " \
|
|
"sync to binlog file \"%s\" fail, " \
|
|
"errno: %d, error info: %s", \
|
|
__LINE__, get_trunk_binlog_filename(full_filename), \
|
|
errno, STRERROR(errno));
|
|
}
|
|
else
|
|
{
|
|
write_ret = 0;
|
|
}
|
|
|
|
if (write_ret == 0)
|
|
{
|
|
trunk_binlog_write_version++;
|
|
*length = 0; //reset cache buff
|
|
}
|
|
|
|
if (bNeedLock && (result=pthread_mutex_unlock(&trunk_sync_thread_lock)) != 0)
|
|
{
|
|
logError("file: "__FILE__", line: %d, " \
|
|
"call pthread_mutex_unlock fail, " \
|
|
"errno: %d, error info: %s", \
|
|
__LINE__, result, STRERROR(result));
|
|
}
|
|
|
|
return write_ret;
|
|
}
|
|
|
|
int trunk_binlog_write(const int timestamp, const char op_type, \
|
|
const FDFSTrunkFullInfo *pTrunk)
|
|
{
|
|
int result;
|
|
int write_ret;
|
|
|
|
if ((result=pthread_mutex_lock(&trunk_sync_thread_lock)) != 0)
|
|
{
|
|
logError("file: "__FILE__", line: %d, " \
|
|
"call pthread_mutex_lock fail, " \
|
|
"errno: %d, error info: %s", \
|
|
__LINE__, result, STRERROR(result));
|
|
}
|
|
|
|
trunk_binlog_write_cache_len += sprintf(trunk_binlog_write_cache_buff + \
|
|
trunk_binlog_write_cache_len, \
|
|
"%d %c %d %d %d %d %d %d\n", \
|
|
timestamp, op_type, \
|
|
pTrunk->path.store_path_index, \
|
|
pTrunk->path.sub_path_high, \
|
|
pTrunk->path.sub_path_low, \
|
|
pTrunk->file.id, \
|
|
pTrunk->file.offset, \
|
|
pTrunk->file.size);
|
|
|
|
//check if buff full
|
|
if (TRUNK_BINLOG_BUFFER_SIZE - trunk_binlog_write_cache_len < 128)
|
|
{
|
|
write_ret = trunk_binlog_fsync(false); //sync to disk
|
|
}
|
|
else
|
|
{
|
|
write_ret = 0;
|
|
}
|
|
|
|
if ((result=pthread_mutex_unlock(&trunk_sync_thread_lock)) != 0)
|
|
{
|
|
logError("file: "__FILE__", line: %d, " \
|
|
"call pthread_mutex_unlock fail, " \
|
|
"errno: %d, error info: %s", \
|
|
__LINE__, result, STRERROR(result));
|
|
}
|
|
|
|
return write_ret;
|
|
}
|
|
|
|
int trunk_binlog_write_buffer(const char *buff, const int length)
|
|
{
|
|
int result;
|
|
int write_ret;
|
|
|
|
if ((result=pthread_mutex_lock(&trunk_sync_thread_lock)) != 0)
|
|
{
|
|
logError("file: "__FILE__", line: %d, " \
|
|
"call pthread_mutex_lock fail, " \
|
|
"errno: %d, error info: %s", \
|
|
__LINE__, result, STRERROR(result));
|
|
}
|
|
|
|
//check if buff full
|
|
if (TRUNK_BINLOG_BUFFER_SIZE - (trunk_binlog_write_cache_len + \
|
|
length) < 128)
|
|
{
|
|
write_ret = trunk_binlog_fsync(false); //sync to disk
|
|
}
|
|
else
|
|
{
|
|
write_ret = 0;
|
|
}
|
|
|
|
if (write_ret == 0)
|
|
{
|
|
if (length >= TRUNK_BINLOG_BUFFER_SIZE)
|
|
{
|
|
if (trunk_binlog_write_cache_len > 0)
|
|
{
|
|
write_ret = trunk_binlog_fsync(false);
|
|
}
|
|
|
|
if (write_ret == 0)
|
|
{
|
|
int len;
|
|
len = length;
|
|
write_ret = trunk_binlog_fsync_ex(false, \
|
|
buff, &len);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
memcpy(trunk_binlog_write_cache_buff + \
|
|
trunk_binlog_write_cache_len, buff, length);
|
|
trunk_binlog_write_cache_len += length;
|
|
}
|
|
}
|
|
if ((result=pthread_mutex_unlock(&trunk_sync_thread_lock)) != 0)
|
|
{
|
|
logError("file: "__FILE__", line: %d, " \
|
|
"call pthread_mutex_unlock fail, " \
|
|
"errno: %d, error info: %s", \
|
|
__LINE__, result, STRERROR(result));
|
|
}
|
|
|
|
return write_ret;
|
|
}
|
|
|
|
static char *get_binlog_readable_filename(const void *pArg,
|
|
char *full_filename)
|
|
{
|
|
static char buff[MAX_PATH_SIZE];
|
|
|
|
if (full_filename == NULL)
|
|
{
|
|
full_filename = buff;
|
|
}
|
|
|
|
snprintf(full_filename, MAX_PATH_SIZE,
|
|
"%s/data/"TRUNK_DIR_NAME"/"TRUNK_SYNC_BINLOG_FILENAME_STR,
|
|
g_fdfs_base_path);
|
|
return full_filename;
|
|
}
|
|
|
|
int trunk_open_readable_binlog(TrunkBinLogReader *pReader, \
|
|
get_filename_func filename_func, const void *pArg)
|
|
{
|
|
char full_filename[MAX_PATH_SIZE];
|
|
struct stat file_stat;
|
|
|
|
if (pReader->binlog_fd >= 0)
|
|
{
|
|
close(pReader->binlog_fd);
|
|
}
|
|
|
|
filename_func(pArg, full_filename);
|
|
pReader->binlog_fd = open(full_filename, O_RDONLY);
|
|
if (pReader->binlog_fd < 0)
|
|
{
|
|
logError("file: "__FILE__", line: %d, "
|
|
"open binlog file \"%s\" fail, "
|
|
"errno: %d, error info: %s",
|
|
__LINE__, full_filename,
|
|
errno, STRERROR(errno));
|
|
return errno != 0 ? errno : ENOENT;
|
|
}
|
|
|
|
if (fstat(pReader->binlog_fd, &file_stat) != 0)
|
|
{
|
|
logError("file: "__FILE__", line: %d, "
|
|
"stat binlog file \"%s\" fail, "
|
|
"errno: %d, error info: %s",
|
|
__LINE__, full_filename,
|
|
errno, STRERROR(errno));
|
|
return errno != 0 ? errno : ENOENT;
|
|
}
|
|
|
|
if (pReader->binlog_offset > file_stat.st_size)
|
|
{
|
|
logWarning("file: "__FILE__", line: %d, "
|
|
"binlog file \"%s\", binlog_offset: %"PRId64
|
|
" > file size: %"PRId64", set binlog_offset to 0",
|
|
__LINE__, full_filename, pReader->binlog_offset,
|
|
(int64_t)file_stat.st_size);
|
|
pReader->binlog_offset = 0;
|
|
}
|
|
|
|
if (pReader->binlog_offset > 0 && \
|
|
lseek(pReader->binlog_fd, pReader->binlog_offset, SEEK_SET) < 0)
|
|
{
|
|
logError("file: "__FILE__", line: %d, " \
|
|
"seek binlog file \"%s\" fail, file offset=" \
|
|
"%"PRId64", errno: %d, error info: %s", \
|
|
__LINE__, full_filename, pReader->binlog_offset, \
|
|
errno, STRERROR(errno));
|
|
|
|
close(pReader->binlog_fd);
|
|
pReader->binlog_fd = -1;
|
|
return errno != 0 ? errno : ESPIPE;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static char *trunk_get_mark_filename_by_id_and_port(const char *storage_id, \
|
|
const int port, char *full_filename, const int filename_size)
|
|
{
|
|
if (g_use_storage_id)
|
|
{
|
|
snprintf(full_filename, filename_size, \
|
|
"%s/data/"TRUNK_DIR_NAME"/%s%s", g_fdfs_base_path, \
|
|
storage_id, TRUNK_SYNC_MARK_FILE_EXT_STR);
|
|
}
|
|
else
|
|
{
|
|
snprintf(full_filename, filename_size, \
|
|
"%s/data/"TRUNK_DIR_NAME"/%s_%d%s", g_fdfs_base_path, \
|
|
storage_id, port, TRUNK_SYNC_MARK_FILE_EXT_STR);
|
|
}
|
|
|
|
return full_filename;
|
|
}
|
|
|
|
static char *trunk_get_mark_filename_by_ip_and_port(const char *ip_addr, \
|
|
const int port, char *full_filename, const int filename_size)
|
|
{
|
|
snprintf(full_filename, filename_size, \
|
|
"%s/data/"TRUNK_DIR_NAME"/%s_%d%s", g_fdfs_base_path, \
|
|
ip_addr, port, TRUNK_SYNC_MARK_FILE_EXT_STR);
|
|
|
|
return full_filename;
|
|
}
|
|
|
|
char *trunk_mark_filename_by_reader(const void *pArg, char *full_filename)
|
|
{
|
|
const TrunkBinLogReader *pReader;
|
|
static char buff[MAX_PATH_SIZE];
|
|
|
|
pReader = (const TrunkBinLogReader *)pArg;
|
|
if (full_filename == NULL)
|
|
{
|
|
full_filename = buff;
|
|
}
|
|
|
|
return trunk_get_mark_filename_by_id_and_port(pReader->storage_id, \
|
|
g_server_port, full_filename, MAX_PATH_SIZE);
|
|
}
|
|
|
|
static char *trunk_get_mark_filename_by_id(const char *storage_id,
|
|
char *full_filename, const int filename_size)
|
|
{
|
|
return trunk_get_mark_filename_by_id_and_port(storage_id, g_server_port, \
|
|
full_filename, filename_size);
|
|
}
|
|
|
|
int trunk_reader_init(const FDFSStorageBrief *pStorage,
|
|
TrunkBinLogReader *pReader, const bool reset_binlog_offset)
|
|
{
|
|
IniContext iniContext;
|
|
int result;
|
|
int64_t saved_binlog_offset;
|
|
bool bFileExist;
|
|
|
|
saved_binlog_offset = pReader->binlog_offset;
|
|
|
|
memset(pReader, 0, sizeof(TrunkBinLogReader));
|
|
pReader->binlog_fd = -1;
|
|
|
|
pReader->binlog_buff.buffer = (char *)malloc( \
|
|
TRUNK_BINLOG_BUFFER_SIZE);
|
|
if (pReader->binlog_buff.buffer == NULL)
|
|
{
|
|
logError("file: "__FILE__", line: %d, " \
|
|
"malloc %d bytes fail, " \
|
|
"errno: %d, error info: %s", \
|
|
__LINE__, TRUNK_BINLOG_BUFFER_SIZE, \
|
|
errno, STRERROR(errno));
|
|
return errno != 0 ? errno : ENOMEM;
|
|
}
|
|
pReader->binlog_buff.current = pReader->binlog_buff.buffer;
|
|
|
|
if (pStorage == NULL)
|
|
{
|
|
strcpy(pReader->storage_id, "0.0.0.0");
|
|
}
|
|
else
|
|
{
|
|
strcpy(pReader->storage_id, pStorage->id);
|
|
}
|
|
trunk_mark_filename_by_reader(pReader, pReader->mark_filename);
|
|
|
|
if (pStorage == NULL)
|
|
{
|
|
bFileExist = false;
|
|
pReader->binlog_offset = saved_binlog_offset;
|
|
}
|
|
else
|
|
{
|
|
bFileExist = fileExists(pReader->mark_filename);
|
|
if (!bFileExist && (g_use_storage_id && pStorage != NULL))
|
|
{
|
|
char old_mark_filename[MAX_PATH_SIZE];
|
|
trunk_get_mark_filename_by_ip_and_port(
|
|
pStorage->ip_addr, g_server_port,
|
|
old_mark_filename, sizeof(old_mark_filename));
|
|
if (fileExists(old_mark_filename))
|
|
{
|
|
if (rename(old_mark_filename,
|
|
pReader->mark_filename) != 0)
|
|
{
|
|
logError("file: "__FILE__", line: %d, "
|
|
"rename file %s to %s fail, "
|
|
"errno: %d, error info: %s",
|
|
__LINE__, old_mark_filename,
|
|
pReader->mark_filename, errno,
|
|
STRERROR(errno));
|
|
return errno != 0 ? errno : EACCES;
|
|
}
|
|
bFileExist = true;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (bFileExist)
|
|
{
|
|
memset(&iniContext, 0, sizeof(IniContext));
|
|
if ((result=iniLoadFromFile(pReader->mark_filename,
|
|
&iniContext)) != 0)
|
|
{
|
|
logError("file: "__FILE__", line: %d, "
|
|
"load from mark file \"%s\" fail, "
|
|
"error code: %d", __LINE__,
|
|
pReader->mark_filename, result);
|
|
return result;
|
|
}
|
|
|
|
if (iniContext.global.count < 1)
|
|
{
|
|
iniFreeContext(&iniContext);
|
|
logError("file: "__FILE__", line: %d, "
|
|
"in mark file \"%s\", item count: %d < 1",
|
|
__LINE__, pReader->mark_filename,
|
|
iniContext.global.count);
|
|
return ENOENT;
|
|
}
|
|
|
|
pReader->binlog_offset = iniGetInt64Value(NULL,
|
|
MARK_ITEM_BINLOG_FILE_OFFSET,
|
|
&iniContext, -1);
|
|
if (pReader->binlog_offset < 0)
|
|
{
|
|
iniFreeContext(&iniContext);
|
|
logError("file: "__FILE__", line: %d, "
|
|
"in mark file \"%s\", binlog_offset: "
|
|
"%"PRId64" < 0", __LINE__,
|
|
pReader->mark_filename,
|
|
pReader->binlog_offset);
|
|
return EINVAL;
|
|
}
|
|
|
|
iniFreeContext(&iniContext);
|
|
}
|
|
|
|
pReader->last_binlog_offset = pReader->binlog_offset;
|
|
if (!bFileExist && pStorage != NULL)
|
|
{
|
|
if ((result=trunk_write_to_mark_file(pReader)) != 0)
|
|
{
|
|
return result;
|
|
}
|
|
}
|
|
|
|
if (reset_binlog_offset && pReader->binlog_offset > 0)
|
|
{
|
|
pReader->binlog_offset = 0;
|
|
trunk_write_to_mark_file(pReader);
|
|
}
|
|
|
|
if ((result=trunk_open_readable_binlog(pReader,
|
|
get_binlog_readable_filename, pReader)) != 0)
|
|
{
|
|
return result;
|
|
}
|
|
|
|
result = trunk_binlog_preread(pReader);
|
|
if (result != 0 && result != ENOENT)
|
|
{
|
|
return result;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
void trunk_reader_destroy(TrunkBinLogReader *pReader)
|
|
{
|
|
if (pReader->binlog_fd >= 0)
|
|
{
|
|
close(pReader->binlog_fd);
|
|
pReader->binlog_fd = -1;
|
|
}
|
|
|
|
if (pReader->binlog_buff.buffer != NULL)
|
|
{
|
|
free(pReader->binlog_buff.buffer);
|
|
pReader->binlog_buff.buffer = NULL;
|
|
pReader->binlog_buff.current = NULL;
|
|
pReader->binlog_buff.length = 0;
|
|
}
|
|
}
|
|
|
|
static int trunk_write_to_mark_file(TrunkBinLogReader *pReader)
|
|
{
|
|
char buff[128];
|
|
int len;
|
|
int result;
|
|
|
|
len = sprintf(buff,
|
|
"%s=%"PRId64"\n",
|
|
MARK_ITEM_BINLOG_FILE_OFFSET, pReader->binlog_offset);
|
|
|
|
if ((result=safeWriteToFile(pReader->mark_filename, buff, len)) == 0)
|
|
{
|
|
STORAGE_CHOWN(pReader->mark_filename, geteuid(), getegid())
|
|
pReader->last_binlog_offset = pReader->binlog_offset;
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
static int trunk_binlog_preread(TrunkBinLogReader *pReader)
|
|
{
|
|
int bytes_read;
|
|
int saved_trunk_binlog_write_version;
|
|
|
|
if (pReader->binlog_buff.version == trunk_binlog_write_version &&
|
|
pReader->binlog_buff.length == 0)
|
|
{
|
|
return ENOENT;
|
|
}
|
|
|
|
if (pReader->binlog_buff.length == TRUNK_BINLOG_BUFFER_SIZE) //buff full
|
|
{
|
|
return 0;
|
|
}
|
|
|
|
saved_trunk_binlog_write_version = trunk_binlog_write_version;
|
|
if (pReader->binlog_buff.current != pReader->binlog_buff.buffer)
|
|
{
|
|
if (pReader->binlog_buff.length > 0)
|
|
{
|
|
memcpy(pReader->binlog_buff.buffer, \
|
|
pReader->binlog_buff.current, \
|
|
pReader->binlog_buff.length);
|
|
}
|
|
|
|
pReader->binlog_buff.current = pReader->binlog_buff.buffer;
|
|
}
|
|
|
|
bytes_read = fc_safe_read(pReader->binlog_fd, pReader->binlog_buff.buffer \
|
|
+ pReader->binlog_buff.length, \
|
|
TRUNK_BINLOG_BUFFER_SIZE - pReader->binlog_buff.length);
|
|
if (bytes_read < 0)
|
|
{
|
|
logError("file: "__FILE__", line: %d, " \
|
|
"read from binlog file \"%s\" fail, " \
|
|
"file offset: %"PRId64", " \
|
|
"error no: %d, error info: %s", __LINE__, \
|
|
get_binlog_readable_filename(pReader, NULL), \
|
|
pReader->binlog_offset + pReader->binlog_buff.length, \
|
|
errno, STRERROR(errno));
|
|
return errno != 0 ? errno : EIO;
|
|
}
|
|
else if (bytes_read == 0) //end of binlog file
|
|
{
|
|
pReader->binlog_buff.version = saved_trunk_binlog_write_version;
|
|
return (pReader->binlog_buff.length == 0) ? ENOENT : 0;
|
|
}
|
|
|
|
pReader->binlog_buff.length += bytes_read;
|
|
return 0;
|
|
}
|
|
|
|
static int trunk_binlog_do_line_read(TrunkBinLogReader *pReader, \
|
|
char *line, const int line_size, int *line_length)
|
|
{
|
|
char *pLineEnd;
|
|
|
|
if (pReader->binlog_buff.length == 0)
|
|
{
|
|
return ENOENT;
|
|
}
|
|
|
|
pLineEnd = (char *)memchr(pReader->binlog_buff.current, '\n', \
|
|
pReader->binlog_buff.length);
|
|
if (pLineEnd == NULL)
|
|
{
|
|
return ENOENT;
|
|
}
|
|
|
|
*line_length = (pLineEnd - pReader->binlog_buff.current) + 1;
|
|
if (*line_length >= line_size)
|
|
{
|
|
logError("file: "__FILE__", line: %d, " \
|
|
"read from binlog file \"%s\" fail, " \
|
|
"file offset: %"PRId64", " \
|
|
"line buffer size: %d is too small! " \
|
|
"<= line length: %d", __LINE__, \
|
|
get_binlog_readable_filename(pReader, NULL), \
|
|
pReader->binlog_offset, line_size, *line_length);
|
|
return ENOSPC;
|
|
}
|
|
|
|
memcpy(line, pReader->binlog_buff.current, *line_length);
|
|
*(line + *line_length) = '\0';
|
|
|
|
pReader->binlog_buff.current = pLineEnd + 1;
|
|
pReader->binlog_buff.length -= *line_length;
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int trunk_binlog_read_line(TrunkBinLogReader *pReader, \
|
|
char *line, const int line_size, int *line_length)
|
|
{
|
|
int result;
|
|
|
|
result = trunk_binlog_do_line_read(pReader, line, \
|
|
line_size, line_length);
|
|
if (result != ENOENT)
|
|
{
|
|
return result;
|
|
}
|
|
|
|
result = trunk_binlog_preread(pReader);
|
|
if (result != 0)
|
|
{
|
|
return result;
|
|
}
|
|
|
|
return trunk_binlog_do_line_read(pReader, line, \
|
|
line_size, line_length);
|
|
}
|
|
|
|
int trunk_binlog_read(TrunkBinLogReader *pReader, \
|
|
TrunkBinLogRecord *pRecord, int *record_length)
|
|
{
|
|
#define COL_COUNT 8
|
|
char line[TRUNK_BINLOG_LINE_SIZE];
|
|
char *cols[COL_COUNT];
|
|
int result;
|
|
|
|
result = trunk_binlog_read_line(pReader, line, \
|
|
sizeof(line), record_length);
|
|
if (result != 0)
|
|
{
|
|
return result;
|
|
}
|
|
|
|
if ((result=splitEx(line, ' ', cols, COL_COUNT)) < COL_COUNT)
|
|
{
|
|
logError("file: "__FILE__", line: %d, " \
|
|
"read data from binlog file \"%s\" fail, " \
|
|
"file offset: %"PRId64", " \
|
|
"read item count: %d < %d", \
|
|
__LINE__, get_binlog_readable_filename(pReader, NULL), \
|
|
pReader->binlog_offset, result, COL_COUNT);
|
|
return ENOENT;
|
|
}
|
|
|
|
pRecord->timestamp = atoi(cols[0]);
|
|
pRecord->op_type = *(cols[1]);
|
|
pRecord->trunk.path.store_path_index = atoi(cols[2]);
|
|
pRecord->trunk.path.sub_path_high = atoi(cols[3]);
|
|
pRecord->trunk.path.sub_path_low = atoi(cols[4]);
|
|
pRecord->trunk.file.id = atoi(cols[5]);
|
|
pRecord->trunk.file.offset = atoi(cols[6]);
|
|
pRecord->trunk.file.size = atoi(cols[7]);
|
|
|
|
return 0;
|
|
}
|
|
|
|
int trunk_unlink_mark_file(const char *storage_id)
|
|
{
|
|
char old_filename[MAX_PATH_SIZE];
|
|
char new_filename[MAX_PATH_SIZE];
|
|
time_t t;
|
|
struct tm tm;
|
|
|
|
t = g_current_time;
|
|
localtime_r(&t, &tm);
|
|
|
|
trunk_get_mark_filename_by_id(storage_id, old_filename,
|
|
sizeof(old_filename));
|
|
if (!fileExists(old_filename))
|
|
{
|
|
return ENOENT;
|
|
}
|
|
|
|
snprintf(new_filename, sizeof(new_filename),
|
|
"%s.%04d%02d%02d%02d%02d%02d", old_filename,
|
|
tm.tm_year+1900, tm.tm_mon+1, tm.tm_mday,
|
|
tm.tm_hour, tm.tm_min, tm.tm_sec);
|
|
if (rename(old_filename, new_filename) != 0)
|
|
{
|
|
logError("file: "__FILE__", line: %d, "
|
|
"rename file %s to %s fail, "
|
|
"errno: %d, error info: %s",
|
|
__LINE__, old_filename, new_filename,
|
|
errno, STRERROR(errno));
|
|
return errno != 0 ? errno : EACCES;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
int trunk_rename_mark_file(const char *old_ip_addr, const int old_port, \
|
|
const char *new_ip_addr, const int new_port)
|
|
{
|
|
char old_filename[MAX_PATH_SIZE];
|
|
char new_filename[MAX_PATH_SIZE];
|
|
|
|
trunk_get_mark_filename_by_id_and_port(old_ip_addr, old_port, \
|
|
old_filename, sizeof(old_filename));
|
|
if (!fileExists(old_filename))
|
|
{
|
|
return ENOENT;
|
|
}
|
|
|
|
trunk_get_mark_filename_by_id_and_port(new_ip_addr, new_port, \
|
|
new_filename, sizeof(new_filename));
|
|
if (fileExists(new_filename))
|
|
{
|
|
logWarning("file: "__FILE__", line: %d, " \
|
|
"mark file %s already exists, " \
|
|
"ignore rename file %s to %s", \
|
|
__LINE__, new_filename, old_filename, new_filename);
|
|
return EEXIST;
|
|
}
|
|
|
|
if (rename(old_filename, new_filename) != 0)
|
|
{
|
|
logError("file: "__FILE__", line: %d, " \
|
|
"rename file %s to %s fail" \
|
|
", errno: %d, error info: %s", \
|
|
__LINE__, old_filename, new_filename, \
|
|
errno, STRERROR(errno));
|
|
return errno != 0 ? errno : EACCES;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static void trunk_sync_thread_exit(TrunkSyncThreadInfo *thread_data,
|
|
const int port)
|
|
{
|
|
int result;
|
|
|
|
if ((result=pthread_mutex_lock(&trunk_sync_thread_lock)) != 0)
|
|
{
|
|
logError("file: "__FILE__", line: %d, "
|
|
"call pthread_mutex_lock fail, "
|
|
"errno: %d, error info: %s",
|
|
__LINE__, result, STRERROR(result));
|
|
}
|
|
|
|
thread_data->running = false;
|
|
g_trunk_sync_thread_count--;
|
|
|
|
if ((result=pthread_mutex_unlock(&trunk_sync_thread_lock)) != 0)
|
|
{
|
|
logError("file: "__FILE__", line: %d, "
|
|
"call pthread_mutex_unlock fail, "
|
|
"errno: %d, error info: %s",
|
|
__LINE__, result, STRERROR(result));
|
|
}
|
|
|
|
logInfo("file: "__FILE__", line: %d, "
|
|
"trunk sync thread to storage server %s:%d exit",
|
|
__LINE__, thread_data->pStorage->ip_addr, port);
|
|
}
|
|
|
|
static int trunk_sync_data(TrunkBinLogReader *pReader, \
|
|
ConnectionInfo *pStorage)
|
|
{
|
|
int length;
|
|
char *p;
|
|
int result;
|
|
TrackerHeader header;
|
|
char in_buff[1];
|
|
char *pBuff;
|
|
int64_t in_bytes;
|
|
|
|
p = pReader->binlog_buff.buffer + pReader->binlog_buff.length - 1;
|
|
while (p != pReader->binlog_buff.buffer && *p != '\n')
|
|
{
|
|
p--;
|
|
}
|
|
|
|
length = p - pReader->binlog_buff.buffer;
|
|
if (length == 0)
|
|
{
|
|
logWarning("FILE: "__FILE__", line: %d, " \
|
|
"no buffer to sync, buffer length: %d, " \
|
|
"should try again later", __LINE__, \
|
|
pReader->binlog_buff.length);
|
|
return ENOENT;
|
|
}
|
|
length++;
|
|
|
|
memset(&header, 0, sizeof(header));
|
|
long2buff(length, header.pkg_len);
|
|
header.cmd = STORAGE_PROTO_CMD_TRUNK_SYNC_BINLOG;
|
|
if ((result=tcpsenddata_nb(pStorage->sock, &header, \
|
|
sizeof(TrackerHeader), g_fdfs_network_timeout)) != 0)
|
|
{
|
|
logError("FILE: "__FILE__", line: %d, " \
|
|
"send data to storage server %s:%d fail, " \
|
|
"errno: %d, error info: %s", \
|
|
__LINE__, pStorage->ip_addr, pStorage->port, \
|
|
result, STRERROR(result));
|
|
return result;
|
|
}
|
|
|
|
if ((result=tcpsenddata_nb(pStorage->sock, pReader->binlog_buff.buffer,\
|
|
length, g_fdfs_network_timeout)) != 0)
|
|
{
|
|
logError("FILE: "__FILE__", line: %d, " \
|
|
"send data to storage server %s:%d fail, " \
|
|
"errno: %d, error info: %s", \
|
|
__LINE__, pStorage->ip_addr, pStorage->port, \
|
|
result, STRERROR(result));
|
|
return result;
|
|
}
|
|
|
|
pBuff = in_buff;
|
|
if ((result=fdfs_recv_response(pStorage, &pBuff, 0, &in_bytes)) != 0)
|
|
{
|
|
logError("file: "__FILE__", line: %d, "
|
|
"fdfs_recv_response fail, result: %d",
|
|
__LINE__, result);
|
|
return result;
|
|
}
|
|
|
|
pReader->binlog_offset += length;
|
|
pReader->binlog_buff.length -= length;
|
|
if (pReader->binlog_buff.length > 0)
|
|
{
|
|
pReader->binlog_buff.current = pReader->binlog_buff.buffer + length;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static void *trunk_sync_thread_entrance(void* arg)
|
|
{
|
|
TrunkSyncThreadInfo *thread_data;
|
|
const FDFSStorageBrief *pStorage;
|
|
TrunkBinLogReader reader;
|
|
ConnectionInfo storage_server;
|
|
char local_ip_addr[IP_ADDRESS_SIZE];
|
|
int read_result;
|
|
int sync_result;
|
|
int result;
|
|
time_t current_time;
|
|
time_t last_keep_alive_time;
|
|
|
|
memset(local_ip_addr, 0, sizeof(local_ip_addr));
|
|
memset(&reader, 0, sizeof(reader));
|
|
reader.binlog_fd = -1;
|
|
|
|
current_time = g_current_time;
|
|
last_keep_alive_time = 0;
|
|
|
|
thread_data = (TrunkSyncThreadInfo *)arg;
|
|
pStorage = thread_data->pStorage;
|
|
|
|
strcpy(storage_server.ip_addr, pStorage->ip_addr);
|
|
storage_server.port = g_server_port;
|
|
storage_server.sock = -1;
|
|
|
|
logInfo("file: "__FILE__", line: %d, " \
|
|
"trunk sync thread to storage server %s:%d started", \
|
|
__LINE__, storage_server.ip_addr, storage_server.port);
|
|
|
|
while (g_continue_flag && g_if_trunker_self && \
|
|
pStorage->status != FDFS_STORAGE_STATUS_DELETED && \
|
|
pStorage->status != FDFS_STORAGE_STATUS_IP_CHANGED && \
|
|
pStorage->status != FDFS_STORAGE_STATUS_NONE)
|
|
{
|
|
storage_sync_connect_storage_server_ex(pStorage,
|
|
&storage_server, &g_if_trunker_self);
|
|
|
|
if ((!g_continue_flag) || (!g_if_trunker_self) || \
|
|
pStorage->status == FDFS_STORAGE_STATUS_DELETED || \
|
|
pStorage->status == FDFS_STORAGE_STATUS_IP_CHANGED || \
|
|
pStorage->status == FDFS_STORAGE_STATUS_NONE)
|
|
{
|
|
logError("file: "__FILE__", line: %d, break loop." \
|
|
"g_continue_flag: %d, g_if_trunker_self: %d, " \
|
|
"dest storage status: %d", __LINE__, \
|
|
g_continue_flag, g_if_trunker_self, \
|
|
pStorage->status);
|
|
break;
|
|
}
|
|
|
|
if ((result=trunk_reader_init(pStorage, &reader,
|
|
thread_data->reset_binlog_offset)) != 0)
|
|
{
|
|
logCrit("file: "__FILE__", line: %d, "
|
|
"trunk_reader_init fail, errno=%d, "
|
|
"program exit!", __LINE__, result);
|
|
g_continue_flag = false;
|
|
break;
|
|
}
|
|
|
|
getSockIpaddr(storage_server.sock, \
|
|
local_ip_addr, IP_ADDRESS_SIZE);
|
|
insert_into_local_host_ip(local_ip_addr);
|
|
|
|
/*
|
|
//printf("file: "__FILE__", line: %d, " \
|
|
"storage_server.ip_addr=%s, " \
|
|
"local_ip_addr: %s\n", \
|
|
__LINE__, pStorage->ip_addr, local_ip_addr);
|
|
*/
|
|
|
|
if ((strcmp(pStorage->id, g_my_server_id_str) == 0) ||
|
|
is_local_host_ip(pStorage->ip_addr))
|
|
{ //can't self sync to self
|
|
logError("file: "__FILE__", line: %d, " \
|
|
"ip_addr %s belong to the local host," \
|
|
" trunk sync thread exit.", \
|
|
__LINE__, pStorage->ip_addr);
|
|
fdfs_quit(&storage_server);
|
|
close(storage_server.sock);
|
|
break;
|
|
}
|
|
|
|
if (thread_data->reset_binlog_offset)
|
|
{
|
|
thread_data->reset_binlog_offset = false;
|
|
if (reader.binlog_offset > 0)
|
|
{
|
|
reader.binlog_offset = 0;
|
|
trunk_write_to_mark_file(&reader);
|
|
}
|
|
}
|
|
|
|
if (reader.binlog_offset == 0)
|
|
{
|
|
if ((result=fdfs_deal_no_body_cmd(&storage_server, \
|
|
STORAGE_PROTO_CMD_TRUNK_TRUNCATE_BINLOG_FILE)) != 0)
|
|
{
|
|
logError("file: "__FILE__", line: %d, "
|
|
"fdfs_deal_no_body_cmd fail, result: %d",
|
|
__LINE__, result);
|
|
|
|
close(storage_server.sock);
|
|
trunk_reader_destroy(&reader);
|
|
sleep(5);
|
|
continue;
|
|
}
|
|
}
|
|
|
|
sync_result = 0;
|
|
while (g_continue_flag && !thread_data->reset_binlog_offset &&
|
|
pStorage->status != FDFS_STORAGE_STATUS_DELETED &&
|
|
pStorage->status != FDFS_STORAGE_STATUS_IP_CHANGED &&
|
|
pStorage->status != FDFS_STORAGE_STATUS_NONE)
|
|
{
|
|
read_result = trunk_binlog_preread(&reader);
|
|
if (read_result == ENOENT)
|
|
{
|
|
if (reader.last_binlog_offset !=
|
|
reader.binlog_offset)
|
|
{
|
|
if (trunk_write_to_mark_file(&reader)!=0)
|
|
{
|
|
logCrit("file: "__FILE__", line: %d, "
|
|
"trunk_write_to_mark_file fail, "
|
|
"program exit!", __LINE__);
|
|
g_continue_flag = false;
|
|
break;
|
|
}
|
|
}
|
|
|
|
current_time = g_current_time;
|
|
if (current_time - last_keep_alive_time >= \
|
|
g_heart_beat_interval)
|
|
{
|
|
if (fdfs_active_test(&storage_server)!=0)
|
|
{
|
|
break;
|
|
}
|
|
|
|
last_keep_alive_time = current_time;
|
|
}
|
|
|
|
if (!g_if_trunker_self)
|
|
{
|
|
break;
|
|
}
|
|
|
|
usleep(g_sync_wait_usec);
|
|
continue;
|
|
}
|
|
|
|
if (read_result != 0)
|
|
{
|
|
sleep(5);
|
|
continue;
|
|
}
|
|
|
|
if ((sync_result=trunk_sync_data(&reader, \
|
|
&storage_server)) != 0)
|
|
{
|
|
break;
|
|
}
|
|
|
|
if (g_sync_interval > 0)
|
|
{
|
|
usleep(g_sync_interval);
|
|
}
|
|
}
|
|
|
|
if (reader.last_binlog_offset != reader.binlog_offset)
|
|
{
|
|
if (trunk_write_to_mark_file(&reader) != 0)
|
|
{
|
|
logCrit("file: "__FILE__", line: %d, " \
|
|
"trunk_write_to_mark_file fail, " \
|
|
"program exit!", __LINE__);
|
|
g_continue_flag = false;
|
|
break;
|
|
}
|
|
}
|
|
|
|
close(storage_server.sock);
|
|
storage_server.sock = -1;
|
|
trunk_reader_destroy(&reader);
|
|
|
|
if (!g_continue_flag)
|
|
{
|
|
break;
|
|
}
|
|
|
|
if (!(sync_result == ENOTCONN || sync_result == EIO))
|
|
{
|
|
sleep(1);
|
|
}
|
|
}
|
|
|
|
if (storage_server.sock >= 0)
|
|
{
|
|
close(storage_server.sock);
|
|
}
|
|
trunk_reader_destroy(&reader);
|
|
|
|
trunk_sync_thread_exit(thread_data, storage_server.port);
|
|
|
|
return NULL;
|
|
}
|
|
|
|
int trunk_sync_thread_start_all()
|
|
{
|
|
FDFSStorageServer *pServer;
|
|
FDFSStorageServer *pEnd;
|
|
int result;
|
|
int ret;
|
|
|
|
result = 0;
|
|
pEnd = g_storage_servers + g_storage_count;
|
|
for (pServer=g_storage_servers; pServer<pEnd; pServer++)
|
|
{
|
|
ret = trunk_sync_thread_start(&(pServer->server));
|
|
if (ret != 0)
|
|
{
|
|
result = ret;
|
|
}
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
TrunkSyncThreadInfo *trunk_sync_alloc_thread_data()
|
|
{
|
|
TrunkSyncThreadInfo **thread_info;
|
|
TrunkSyncThreadInfo **info_end;
|
|
TrunkSyncThreadInfo **old_thread_data;
|
|
TrunkSyncThreadInfo **new_thread_data;
|
|
TrunkSyncThreadInfo **new_data_start;
|
|
int alloc_count;
|
|
int bytes;
|
|
|
|
if (g_trunk_sync_thread_count + 1 < sync_thread_info_array.alloc_count)
|
|
{
|
|
info_end = sync_thread_info_array.thread_data +
|
|
sync_thread_info_array.alloc_count;
|
|
for (thread_info=sync_thread_info_array.thread_data;
|
|
thread_info<info_end; thread_info++)
|
|
{
|
|
if (!(*thread_info)->running)
|
|
{
|
|
return *thread_info;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (sync_thread_info_array.alloc_count == 0)
|
|
{
|
|
alloc_count = 1;
|
|
}
|
|
else
|
|
{
|
|
alloc_count = sync_thread_info_array.alloc_count * 2;
|
|
}
|
|
|
|
bytes = sizeof(TrunkSyncThreadInfo *) * alloc_count;
|
|
new_thread_data = (TrunkSyncThreadInfo **)malloc(bytes);
|
|
if (new_thread_data == NULL)
|
|
{
|
|
logError("file: "__FILE__", line: %d, "
|
|
"malloc %d bytes fail, "
|
|
"errno: %d, error info: %s",
|
|
__LINE__, bytes, errno, STRERROR(errno));
|
|
return NULL;
|
|
}
|
|
|
|
logInfo("file: "__FILE__", line: %d, "
|
|
"alloc %d thread data entries",
|
|
__LINE__, alloc_count);
|
|
|
|
if (sync_thread_info_array.alloc_count > 0)
|
|
{
|
|
memcpy(new_thread_data, sync_thread_info_array.thread_data,
|
|
sizeof(TrunkSyncThreadInfo *) *
|
|
sync_thread_info_array.alloc_count);
|
|
}
|
|
|
|
new_data_start = new_thread_data + sync_thread_info_array.alloc_count;
|
|
info_end = new_thread_data + alloc_count;
|
|
for (thread_info=new_data_start; thread_info<info_end; thread_info++)
|
|
{
|
|
*thread_info = (TrunkSyncThreadInfo *)malloc(
|
|
sizeof(TrunkSyncThreadInfo));
|
|
if (*thread_info == NULL)
|
|
{
|
|
logError("file: "__FILE__", line: %d, "
|
|
"malloc %d bytes fail, "
|
|
"errno: %d, error info: %s",
|
|
__LINE__, (int)sizeof(TrunkSyncThreadInfo),
|
|
errno, STRERROR(errno));
|
|
return NULL;
|
|
}
|
|
|
|
memset(*thread_info, 0, sizeof(TrunkSyncThreadInfo));
|
|
}
|
|
|
|
old_thread_data = sync_thread_info_array.thread_data;
|
|
sync_thread_info_array.thread_data = new_thread_data;
|
|
sync_thread_info_array.alloc_count = alloc_count;
|
|
if (old_thread_data != NULL)
|
|
{
|
|
free(old_thread_data);
|
|
}
|
|
|
|
return *new_data_start;
|
|
}
|
|
|
|
int trunk_sync_thread_start(const FDFSStorageBrief *pStorage)
|
|
{
|
|
int result;
|
|
int lock_res;
|
|
pthread_attr_t pattr;
|
|
TrunkSyncThreadInfo *thread_data;
|
|
|
|
if (pStorage->status == FDFS_STORAGE_STATUS_DELETED ||
|
|
pStorage->status == FDFS_STORAGE_STATUS_IP_CHANGED ||
|
|
pStorage->status == FDFS_STORAGE_STATUS_NONE)
|
|
{
|
|
return 0;
|
|
}
|
|
|
|
if ((strcmp(pStorage->id, g_my_server_id_str) == 0) ||
|
|
is_local_host_ip(pStorage->ip_addr)) //can't self sync to self
|
|
{
|
|
return 0;
|
|
}
|
|
|
|
if ((result=init_pthread_attr(&pattr, g_thread_stack_size)) != 0)
|
|
{
|
|
return result;
|
|
}
|
|
|
|
if ((lock_res=pthread_mutex_lock(&trunk_sync_thread_lock)) != 0)
|
|
{
|
|
logError("file: "__FILE__", line: %d, "
|
|
"call pthread_mutex_lock fail, "
|
|
"errno: %d, error info: %s",
|
|
__LINE__, lock_res, STRERROR(lock_res));
|
|
}
|
|
|
|
do
|
|
{
|
|
thread_data = trunk_sync_alloc_thread_data();
|
|
if (thread_data == NULL)
|
|
{
|
|
result = ENOMEM;
|
|
break;
|
|
}
|
|
|
|
thread_data->running = true;
|
|
thread_data->pStorage = pStorage;
|
|
if ((result=pthread_create(&thread_data->tid, &pattr,
|
|
trunk_sync_thread_entrance,
|
|
(void *)thread_data)) != 0)
|
|
{
|
|
thread_data->running = false;
|
|
logError("file: "__FILE__", line: %d, "
|
|
"create thread failed, errno: %d, "
|
|
"error info: %s",
|
|
__LINE__, result, STRERROR(result));
|
|
break;
|
|
}
|
|
|
|
g_trunk_sync_thread_count++;
|
|
} while (0);
|
|
|
|
if ((lock_res=pthread_mutex_unlock(&trunk_sync_thread_lock)) != 0)
|
|
{
|
|
logError("file: "__FILE__", line: %d, "
|
|
"call pthread_mutex_unlock fail, "
|
|
"errno: %d, error info: %s",
|
|
__LINE__, lock_res, STRERROR(lock_res));
|
|
}
|
|
|
|
pthread_attr_destroy(&pattr);
|
|
return result;
|
|
}
|
|
|
|
void trunk_waiting_sync_thread_exit()
|
|
{
|
|
int saved_trunk_sync_thread_count;
|
|
int count;
|
|
|
|
saved_trunk_sync_thread_count = g_trunk_sync_thread_count;
|
|
if (saved_trunk_sync_thread_count > 0)
|
|
{
|
|
logInfo("file: "__FILE__", line: %d, "
|
|
"waiting %d trunk sync threads exit ...",
|
|
__LINE__, saved_trunk_sync_thread_count);
|
|
}
|
|
|
|
count = 0;
|
|
while (g_trunk_sync_thread_count > 0 && count < 60)
|
|
{
|
|
usleep(50000);
|
|
count++;
|
|
}
|
|
|
|
if (g_trunk_sync_thread_count > 0)
|
|
{
|
|
logWarning("file: "__FILE__", line: %d, "
|
|
"kill %d trunk sync threads.",
|
|
__LINE__, g_trunk_sync_thread_count);
|
|
kill_trunk_sync_threads();
|
|
}
|
|
|
|
if (saved_trunk_sync_thread_count > 0)
|
|
{
|
|
logInfo("file: "__FILE__", line: %d, "
|
|
"%d trunk sync threads exited",
|
|
__LINE__, saved_trunk_sync_thread_count);
|
|
}
|
|
}
|
|
|
|
int trunk_unlink_all_mark_files()
|
|
{
|
|
char file_path[MAX_PATH_SIZE];
|
|
char full_filename[MAX_PATH_SIZE];
|
|
DIR *dir;
|
|
struct dirent *ent;
|
|
int result;
|
|
int name_len;
|
|
time_t t;
|
|
struct tm tm;
|
|
|
|
t = g_current_time;
|
|
localtime_r(&t, &tm);
|
|
|
|
snprintf(file_path, sizeof(file_path),
|
|
"%s/data/%s", g_fdfs_base_path, TRUNK_DIR_NAME);
|
|
|
|
if ((dir=opendir(file_path)) == NULL)
|
|
{
|
|
result = errno != 0 ? errno : EPERM;
|
|
logError("file: "__FILE__", line: %d, "
|
|
"call opendir %s fail, errno: %d, error info: %s",
|
|
__LINE__, file_path, result, STRERROR(result));
|
|
return result;
|
|
}
|
|
|
|
result = 0;
|
|
while ((ent=readdir(dir)) != NULL)
|
|
{
|
|
name_len = strlen(ent->d_name);
|
|
if (name_len <= TRUNK_SYNC_MARK_FILE_EXT_LEN)
|
|
{
|
|
continue;
|
|
}
|
|
if (memcmp(ent->d_name + (name_len -
|
|
TRUNK_SYNC_MARK_FILE_EXT_LEN),
|
|
TRUNK_SYNC_MARK_FILE_EXT_STR,
|
|
TRUNK_SYNC_MARK_FILE_EXT_LEN) != 0)
|
|
{
|
|
continue;
|
|
}
|
|
|
|
snprintf(full_filename, sizeof(full_filename), "%s/%s",
|
|
file_path, ent->d_name);
|
|
if (unlink(full_filename) != 0)
|
|
{
|
|
result = errno != 0 ? errno : EPERM;
|
|
if (result == ENOENT)
|
|
{
|
|
result = 0;
|
|
}
|
|
else
|
|
{
|
|
logError("file: "__FILE__", line: %d, "
|
|
"unlink %s fail, errno: %d, error info: %s",
|
|
__LINE__, full_filename,
|
|
result, STRERROR(result));
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
closedir(dir);
|
|
return result;
|
|
}
|
|
|
|
int trunk_binlog_get_write_version()
|
|
{
|
|
return trunk_binlog_write_version;
|
|
}
|