fastdfs/storage/storage_sync.c

3404 lines
85 KiB
C

/** * Copyright (C) 2008 Happy Fish / YuQing
*
* FastDFS may be copied only under the terms of the GNU General
* Public License V3, which may be found in the FastDFS source kit.
* Please visit the FastDFS Home Page http://www.fastken.com/ for more detail.
**/
//storage_sync.c
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include "fdfs_define.h"
#include "fastcommon/logger.h"
#include "fdfs_global.h"
#include "fastcommon/sockopt.h"
#include "fastcommon/shared_func.h"
#include "fastcommon/pthread_func.h"
#include "fastcommon/sched_thread.h"
#include "fastcommon/ini_file_reader.h"
#include "tracker_types.h"
#include "tracker_proto.h"
#include "storage_global.h"
#include "storage_func.h"
#include "storage_ip_changed_dealer.h"
#include "tracker_client_thread.h"
#include "storage_client.h"
#include "trunk_mem.h"
#include "storage_sync_func.h"
#include "storage_sync.h"
/* Size bound (1 GiB) for a binlog file. Parenthesized so the macro stays
 * one value when it is used inside division, modulo or other expressions. */
#define SYNC_BINLOG_FILE_MAX_SIZE (1024 * 1024 * 1024)

/* Name pieces of the files kept in the sync directory. */
#define SYNC_BINLOG_FILE_PREFIX "binlog"
#define SYNC_BINLOG_INDEX_FILENAME_OLD SYNC_BINLOG_FILE_PREFIX".index"
#define SYNC_BINLOG_INDEX_FILENAME SYNC_BINLOG_FILE_PREFIX"_index.dat"
#define SYNC_MARK_FILE_EXT ".mark"
#define SYNC_BINLOG_FILE_EXT_FMT ".%03d"
#define SYNC_DIR_NAME "sync"

/* Item names of a mark file. */
#define MARK_ITEM_BINLOG_FILE_INDEX "binlog_index"
#define MARK_ITEM_BINLOG_FILE_OFFSET "binlog_offset"
#define MARK_ITEM_NEED_SYNC_OLD "need_sync_old"
#define MARK_ITEM_SYNC_OLD_DONE "sync_old_done"
#define MARK_ITEM_UNTIL_TIMESTAMP "until_timestamp"
#define MARK_ITEM_SCAN_ROW_COUNT "scan_row_count"
#define MARK_ITEM_SYNC_ROW_COUNT "sync_row_count"

#define SYNC_BINLOG_WRITE_BUFF_SIZE (16 * 1024)

/* Item names of the binlog index file (SYNC_BINLOG_INDEX_FILENAME). */
#define BINLOG_INDEX_ITEM_CURRENT_WRITE "current_write"
#define BINLOG_INDEX_ITEM_CURRENT_COMPRESS "current_compress"
/* Descriptor of the currently open binlog file; -1 while none is open
 * (open_next_writable_binlog closes it and resets it to -1). */
int g_binlog_fd = -1;
/* Numeric suffix of the current writable binlog file; used to build the
 * file name in get_writable_binlog_filename(). */
int g_binlog_index = 0;
/* NOTE(review): presumably the byte size of the current binlog file - confirm at its users. */
static int64_t binlog_file_size = 0;
/* Stored as "current_compress" in the binlog index file. */
static int binlog_compress_index = 0;
/* NOTE(review): presumably the number of running sync threads - confirm. */
int g_storage_sync_thread_count = 0;
/* NOTE(review): presumably protects the sync thread bookkeeping - confirm. */
static pthread_mutex_t sync_thread_lock;
/* Binlog write cache and the number of bytes currently held in it.
 * NOTE(review): presumably sized by SYNC_BINLOG_WRITE_BUFF_SIZE - confirm. */
static char *binlog_write_cache_buff = NULL;
static int binlog_write_cache_len = 0;
/* NOTE(review): version counter for binlog writes; semantics not visible here - confirm. */
static int binlog_write_version = 1;
/* save sync thread ids */
static pthread_t *sync_tids = NULL;
/* NOTE(review): presumably the list head of active binlog readers - confirm. */
static struct fc_list_head reader_head;
/* Forward declarations of static helpers that are used before their definitions. */
static int storage_write_to_mark_file(StorageBinLogReader *pReader);
static int storage_binlog_reader_skip(StorageBinLogReader *pReader);
static int storage_binlog_fsync(const bool bNeedLock);
static int storage_binlog_preread(StorageBinLogReader *pReader);
/**
 * Push one data file to a peer storage server (create or update).
 *
 * Request body layout:
 *   8 bytes: filename bytes
 *   8 bytes: file size
 *   4 bytes: source op timestamp
 *   FDFS_GROUP_NAME_MAX_LEN bytes: group_name
 *   filename bytes : filename
 *   file size bytes: file content (left out when the peer already holds
 *                    a file of the same size; header status is EEXIST then)
 *
 * @param pStorageServer  connection to the destination storage server
 * @param pReader         binlog reader; its last_file_exist flag is read
 *                        and updated here
 * @param pRecord         binlog record describing the file to send
 * @param proto_cmd       sync command to put into the request header
 * @return 0 on success, when the local file no longer exists, or when the
 *         peer answers EEXIST; an errno style code otherwise
 */
static int storage_sync_copy_file(ConnectionInfo *pStorageServer,
	StorageBinLogReader *pReader, const StorageBinLogRecord *pRecord,
	char proto_cmd)
{
	TrackerHeader *pHeader;
	char *p;
	char *pBuff;
	char full_filename[MAX_PATH_SIZE];
	char out_buff[sizeof(TrackerHeader)+FDFS_GROUP_NAME_MAX_LEN+256];
	char in_buff[1];
	struct stat stat_buf;
	FDFSTrunkFullInfo trunkInfo;
	FDFSTrunkHeader trunkHeader;
	int64_t file_offset;
	int64_t in_bytes;
	int64_t total_send_bytes;
	int result;
	bool need_sync_file;

	if ((result=trunk_file_stat(pRecord->store_path_index,
		pRecord->true_filename, pRecord->true_filename_len,
		&stat_buf, &trunkInfo, &trunkHeader)) != 0)
	{
		if (result == ENOENT)
		{
			/* the local file is gone: nothing to send, skip the record */
			if(pRecord->op_type==STORAGE_OP_TYPE_SOURCE_CREATE_FILE)
			{
				logDebug("file: "__FILE__", line: %d, " \
					"sync data file, logic file: %s " \
					"not exists, maybe deleted later?", \
					__LINE__, pRecord->filename);
			}

			return 0;
		}
		else
		{
			logError("file: "__FILE__", line: %d, " \
				"call stat fail, logic file: %s, "\
				"error no: %d, error info: %s", \
				__LINE__, pRecord->filename, \
				result, STRERROR(result));
			return result;
		}
	}

	need_sync_file = true;
	if (pReader->last_file_exist && proto_cmd == \
			STORAGE_PROTO_CMD_SYNC_CREATE_FILE)
	{
		/* the previous create hit an existing file on the peer, so ask
		   the peer first whether this one is already there too */
		FDFSFileInfo file_info;
		result = storage_query_file_info_ex(NULL, \
				pStorageServer, g_group_name, \
				pRecord->filename, &file_info, true);
		if (result == 0)
		{
			if (file_info.file_size == stat_buf.st_size)
			{
				logDebug("file: "__FILE__", line: %d, " \
					"sync data file, logic file: %s " \
					"on dest server %s:%d already exists, "\
					"and same as mine, ignore it", \
					__LINE__, pRecord->filename, \
					pStorageServer->ip_addr, \
					pStorageServer->port);
				need_sync_file = false;
			}
			else
			{
				logWarning("file: "__FILE__", line: %d, " \
					"sync data file, logic file: %s " \
					"on dest server %s:%d already exists, "\
					"but file size: %"PRId64 \
					" not same as mine: %"PRId64 \
					", need re-sync it", __LINE__, \
					pRecord->filename, pStorageServer->ip_addr,\
					pStorageServer->port, \
					file_info.file_size, \
					(int64_t)stat_buf.st_size);

				proto_cmd = STORAGE_PROTO_CMD_SYNC_UPDATE_FILE;
			}
		}
		else if (result != ENOENT)
		{
			return result;
		}
	}

	if (IS_TRUNK_FILE_BY_ID(trunkInfo))
	{
		file_offset = TRUNK_FILE_START_OFFSET(trunkInfo);
		trunk_get_full_filename((&trunkInfo), full_filename, \
				sizeof(full_filename));
	}
	else
	{
		file_offset = 0;
		/* bounded like the path building in the sibling sync functions */
		snprintf(full_filename, sizeof(full_filename), "%s/data/%s",
			g_fdfs_store_paths.paths[pRecord->store_path_index].path,
			pRecord->true_filename);
	}

	total_send_bytes = 0;
	//printf("sync create file: %s\n", pRecord->filename);
	do
	{
		int64_t body_len;

		/* zero the whole packet, not only the header: the bytes that
		   follow the group name inside its fixed width field are sent
		   too and must not carry uninitialized stack content */
		memset(out_buff, 0, sizeof(out_buff));
		pHeader = (TrackerHeader *)out_buff;

		body_len = 2 * FDFS_PROTO_PKG_LEN_SIZE + \
				4 + FDFS_GROUP_NAME_MAX_LEN + \
				pRecord->filename_len;
		if (need_sync_file)
		{
			body_len += stat_buf.st_size;
		}

		long2buff(body_len, pHeader->pkg_len);
		pHeader->cmd = proto_cmd;
		pHeader->status = need_sync_file ? 0 : EEXIST;

		p = out_buff + sizeof(TrackerHeader);

		long2buff(pRecord->filename_len, p);
		p += FDFS_PROTO_PKG_LEN_SIZE;

		long2buff(stat_buf.st_size, p);
		p += FDFS_PROTO_PKG_LEN_SIZE;

		int2buff(pRecord->timestamp, p);
		p += 4;

		sprintf(p, "%s", g_group_name);
		p += FDFS_GROUP_NAME_MAX_LEN;
		memcpy(p, pRecord->filename, pRecord->filename_len);
		p += pRecord->filename_len;

		if((result=tcpsenddata_nb(pStorageServer->sock, out_buff, \
			p - out_buff, SF_G_NETWORK_TIMEOUT)) != 0)
		{
			logError("file: "__FILE__", line: %d, " \
				"sync data to storage server %s:%d fail, " \
				"errno: %d, error info: %s", \
				__LINE__, pStorageServer->ip_addr, \
				pStorageServer->port, \
				result, STRERROR(result));

			break;
		}

		if (need_sync_file && (stat_buf.st_size > 0) && \
			((result=tcpsendfile_ex(pStorageServer->sock, \
			full_filename, file_offset, stat_buf.st_size, \
			SF_G_NETWORK_TIMEOUT, &total_send_bytes)) != 0))
		{
			logError("file: "__FILE__", line: %d, " \
				"sync data to storage server %s:%d fail, " \
				"errno: %d, error info: %s", \
				__LINE__, pStorageServer->ip_addr, \
				pStorageServer->port, \
				result, STRERROR(result));

			break;
		}

		pBuff = in_buff;
		if ((result=fdfs_recv_response(pStorageServer, \
			&pBuff, 0, &in_bytes)) != 0)
		{
			logError("file: "__FILE__", line: %d, "
				"fdfs_recv_response fail, result: %d",
				__LINE__, result);
			break;
		}
	} while (0);

	__sync_add_and_fetch(&g_storage_stat.total_sync_out_bytes,
			total_send_bytes);
	if (result == 0)
	{
		__sync_add_and_fetch(&g_storage_stat.success_sync_out_bytes,
				total_send_bytes);
	}

	if (result == EEXIST)
	{
		if (need_sync_file && pRecord->op_type == \
			STORAGE_OP_TYPE_SOURCE_CREATE_FILE)
		{
			logWarning("file: "__FILE__", line: %d, " \
				"storage server ip: %s:%d, data file: %s " \
				"already exists, maybe some mistake?", \
				__LINE__, pStorageServer->ip_addr, \
				pStorageServer->port, pRecord->filename);
		}

		/* remember the hit so the next create queries the peer first */
		pReader->last_file_exist = true;
		return 0;
	}
	else if (result == 0)
	{
		pReader->last_file_exist = false;
		return 0;
	}
	else
	{
		return result;
	}
}
/**
 * Send an append/modify of an appender file to a peer storage server.
 *
 * The binlog filename field holds three space separated values:
 * the logic filename, the start offset and the length of the change.
 *
 * Request body layout:
 *   8 bytes: filename bytes
 *   8 bytes: start offset
 *   8 bytes: append length
 *   4 bytes: source op timestamp
 *   FDFS_GROUP_NAME_MAX_LEN bytes: group_name
 *   filename bytes : filename
 *   file size bytes: file content
 *
 * @param pStorageServer  connection to the destination storage server
 * @param pReader         binlog reader (not used by this function)
 * @param pRecord         binlog record; its filename is split in place and
 *                        the filename/true filename fields are rewritten
 * @param cmd             sync command to put into the request header
 * @return 0 on success, when the record is skipped (local file missing or
 *         shorter than offset + length) or when the peer answers EEXIST;
 *         EINVAL for a malformed record; an errno style code otherwise
 */
static int storage_sync_modify_file(ConnectionInfo *pStorageServer,
	StorageBinLogReader *pReader, StorageBinLogRecord *pRecord,
	const char cmd)
{
#define SYNC_MODIFY_FIELD_COUNT  3
	TrackerHeader *pHeader;
	char *p;
	char *pBuff;
	char *fields[SYNC_MODIFY_FIELD_COUNT];
	char full_filename[MAX_PATH_SIZE];
	char out_buff[sizeof(TrackerHeader)+FDFS_GROUP_NAME_MAX_LEN+256];
	char in_buff[1];
	struct stat stat_buf;
	int64_t in_bytes;
	int64_t total_send_bytes;
	int64_t start_offset;
	int64_t modify_length;
	int result;
	int count;

	if ((count=splitEx(pRecord->filename, ' ', fields, SYNC_MODIFY_FIELD_COUNT))
			!= SYNC_MODIFY_FIELD_COUNT)
	{
		logError("file: "__FILE__", line: %d, " \
			"the format of binlog not correct, filename: %s", \
			__LINE__, pRecord->filename);
		return EINVAL;
	}

	start_offset = strtoll((fields[1]), NULL, 10);
	modify_length = strtoll((fields[2]), NULL, 10);

	/* after the split the filename buffer holds the logic filename only */
	pRecord->filename_len = strlen(pRecord->filename);
	pRecord->true_filename_len = pRecord->filename_len;
	if ((result=storage_split_filename_ex(pRecord->filename, \
			&pRecord->true_filename_len, pRecord->true_filename, \
			&pRecord->store_path_index)) != 0)
	{
		return result;
	}

	snprintf(full_filename, sizeof(full_filename), \
		"%s/data/%s", g_fdfs_store_paths.paths[pRecord->store_path_index].path, \
		pRecord->true_filename);
	if (lstat(full_filename, &stat_buf) != 0)
	{
		if (errno == ENOENT)
		{
			logDebug("file: "__FILE__", line: %d, " \
				"sync appender file, file: %s not exists, "\
				"maybe deleted later?", \
				__LINE__, full_filename);

			return 0;
		}
		else
		{
			result = errno != 0 ? errno : EPERM;
			logError("file: "__FILE__", line: %d, " \
				"call stat fail, appender file: %s, "\
				"error no: %d, error info: %s", \
				__LINE__, full_filename, \
				result, STRERROR(result));
			return result;
		}
	}

	if (stat_buf.st_size < start_offset + modify_length)
	{
		/* st_size is an off_t: cast it so it matches the PRId64 format */
		logWarning("file: "__FILE__", line: %d, " \
			"appender file: %s 'size: %"PRId64 \
			" < %"PRId64", maybe some mistakes " \
			"happened, skip sync this appender file", __LINE__, \
			full_filename, (int64_t)stat_buf.st_size, \
			start_offset + modify_length);

		return 0;
	}

	total_send_bytes = 0;
	//printf("sync create file: %s\n", pRecord->filename);
	do
	{
		int64_t body_len;

		/* zero the whole packet, not only the header: the bytes that
		   follow the group name inside its fixed width field are sent
		   too and must not carry uninitialized stack content */
		memset(out_buff, 0, sizeof(out_buff));
		pHeader = (TrackerHeader *)out_buff;

		body_len = 3 * FDFS_PROTO_PKG_LEN_SIZE + \
				4 + FDFS_GROUP_NAME_MAX_LEN + \
				pRecord->filename_len + modify_length;

		long2buff(body_len, pHeader->pkg_len);
		pHeader->cmd = cmd;
		pHeader->status = 0;

		p = out_buff + sizeof(TrackerHeader);

		long2buff(pRecord->filename_len, p);
		p += FDFS_PROTO_PKG_LEN_SIZE;

		long2buff(start_offset, p);
		p += FDFS_PROTO_PKG_LEN_SIZE;

		long2buff(modify_length, p);
		p += FDFS_PROTO_PKG_LEN_SIZE;

		int2buff(pRecord->timestamp, p);
		p += 4;

		sprintf(p, "%s", g_group_name);
		p += FDFS_GROUP_NAME_MAX_LEN;
		memcpy(p, pRecord->filename, pRecord->filename_len);
		p += pRecord->filename_len;

		if((result=tcpsenddata_nb(pStorageServer->sock, out_buff, \
			p - out_buff, SF_G_NETWORK_TIMEOUT)) != 0)
		{
			logError("file: "__FILE__", line: %d, " \
				"sync data to storage server %s:%d fail, " \
				"errno: %d, error info: %s", \
				__LINE__, pStorageServer->ip_addr, \
				pStorageServer->port, \
				result, STRERROR(result));

			break;
		}

		if ((result=tcpsendfile_ex(pStorageServer->sock, \
			full_filename, start_offset, modify_length, \
			SF_G_NETWORK_TIMEOUT, &total_send_bytes)) != 0)
		{
			logError("file: "__FILE__", line: %d, " \
				"sync data to storage server %s:%d fail, " \
				"errno: %d, error info: %s", \
				__LINE__, pStorageServer->ip_addr, \
				pStorageServer->port, \
				result, STRERROR(result));

			break;
		}

		pBuff = in_buff;
		if ((result=fdfs_recv_response(pStorageServer, \
			&pBuff, 0, &in_bytes)) != 0)
		{
			logError("file: "__FILE__", line: %d, "
				"fdfs_recv_response fail, result: %d",
				__LINE__, result);
			break;
		}
	} while (0);

	__sync_add_and_fetch(&g_storage_stat.total_sync_out_bytes,
			total_send_bytes);
	if (result == 0)
	{
		__sync_add_and_fetch(&g_storage_stat.success_sync_out_bytes,
				total_send_bytes);
	}

	return result == EEXIST ? 0 : result;
}
/**
 * Send a truncate of an appender file to a peer storage server.
 *
 * The binlog filename field holds three space separated values:
 * the logic filename, the old file size and the new file size.
 *
 * Request body layout:
 *   8 bytes: filename bytes
 *   8 bytes: old file size
 *   8 bytes: new file size
 *   4 bytes: source op timestamp
 *   FDFS_GROUP_NAME_MAX_LEN bytes: group_name
 *   filename bytes : filename
 *
 * @param pStorageServer  connection to the destination storage server
 * @param pReader         binlog reader (not used by this function)
 * @param pRecord         binlog record; its filename is split in place and
 *                        the filename/true filename fields are rewritten
 * @return 0 on success, when the local file no longer exists or when the
 *         peer answers EEXIST; EINVAL for a malformed record; an errno
 *         style code otherwise
 */
static int storage_sync_truncate_file(ConnectionInfo *pStorageServer,
	StorageBinLogReader *pReader, StorageBinLogRecord *pRecord)
{
#define SYNC_TRUNCATE_FIELD_COUNT  3
	TrackerHeader *pHeader;
	char *p;
	char *pBuff;
	char *fields[SYNC_TRUNCATE_FIELD_COUNT];
	char full_filename[MAX_PATH_SIZE];
	char out_buff[sizeof(TrackerHeader)+FDFS_GROUP_NAME_MAX_LEN+256];
	char in_buff[1];
	struct stat stat_buf;
	int64_t in_bytes;
	int64_t old_file_size;
	int64_t new_file_size;
	int result;
	int count;

	if ((count=splitEx(pRecord->filename, ' ', fields,
		SYNC_TRUNCATE_FIELD_COUNT)) != SYNC_TRUNCATE_FIELD_COUNT)
	{
		logError("file: "__FILE__", line: %d, " \
			"the format of binlog not correct, filename: %s", \
			__LINE__, pRecord->filename);
		return EINVAL;
	}

	old_file_size = strtoll((fields[1]), NULL, 10);
	new_file_size = strtoll((fields[2]), NULL, 10);

	/* after the split the filename buffer holds the logic filename only */
	pRecord->filename_len = strlen(pRecord->filename);
	pRecord->true_filename_len = pRecord->filename_len;
	if ((result=storage_split_filename_ex(pRecord->filename, \
			&pRecord->true_filename_len, pRecord->true_filename, \
			&pRecord->store_path_index)) != 0)
	{
		return result;
	}

	snprintf(full_filename, sizeof(full_filename), \
		"%s/data/%s", g_fdfs_store_paths.paths[pRecord->store_path_index].path, \
		pRecord->true_filename);
	if (lstat(full_filename, &stat_buf) != 0)
	{
		if (errno == ENOENT)
		{
			logDebug("file: "__FILE__", line: %d, " \
				"sync appender file, file: %s not exists, "\
				"maybe deleted later?", \
				__LINE__, full_filename);

			return 0;
		}
		else
		{
			result = errno != 0 ? errno : EPERM;
			logError("file: "__FILE__", line: %d, " \
				"call stat fail, appender file: %s, "\
				"error no: %d, error info: %s", \
				__LINE__, full_filename, \
				result, STRERROR(result));
			return result;
		}
	}

	if (stat_buf.st_size != new_file_size)
	{
		/* only reported, the truncate is still sent;
		   st_size is an off_t: cast it so it matches the PRId64 format */
		logDebug("file: "__FILE__", line: %d, " \
			"appender file: %s 'size: %"PRId64 \
			" != %"PRId64", maybe append/modify later",\
			__LINE__, full_filename, (int64_t)stat_buf.st_size,
			new_file_size);
	}

	do
	{
		int64_t body_len;

		/* zero the whole packet, not only the header: the bytes that
		   follow the group name inside its fixed width field are sent
		   too and must not carry uninitialized stack content */
		memset(out_buff, 0, sizeof(out_buff));
		pHeader = (TrackerHeader *)out_buff;

		body_len = 3 * FDFS_PROTO_PKG_LEN_SIZE + \
				4 + FDFS_GROUP_NAME_MAX_LEN + \
				pRecord->filename_len;

		long2buff(body_len, pHeader->pkg_len);
		pHeader->cmd = STORAGE_PROTO_CMD_SYNC_TRUNCATE_FILE;
		pHeader->status = 0;

		p = out_buff + sizeof(TrackerHeader);

		long2buff(pRecord->filename_len, p);
		p += FDFS_PROTO_PKG_LEN_SIZE;

		long2buff(old_file_size, p);
		p += FDFS_PROTO_PKG_LEN_SIZE;

		long2buff(new_file_size, p);
		p += FDFS_PROTO_PKG_LEN_SIZE;

		int2buff(pRecord->timestamp, p);
		p += 4;

		sprintf(p, "%s", g_group_name);
		p += FDFS_GROUP_NAME_MAX_LEN;
		memcpy(p, pRecord->filename, pRecord->filename_len);
		p += pRecord->filename_len;

		if((result=tcpsenddata_nb(pStorageServer->sock, out_buff, \
			p - out_buff, SF_G_NETWORK_TIMEOUT)) != 0)
		{
			logError("file: "__FILE__", line: %d, " \
				"sync data to storage server %s:%d fail, " \
				"errno: %d, error info: %s", \
				__LINE__, pStorageServer->ip_addr, \
				pStorageServer->port, \
				result, STRERROR(result));

			break;
		}

		pBuff = in_buff;
		if ((result=fdfs_recv_response(pStorageServer, \
			&pBuff, 0, &in_bytes)) != 0)
		{
			logError("file: "__FILE__", line: %d, "
				"fdfs_recv_response fail, result: %d",
				__LINE__, result);
			break;
		}
	} while (0);

	return result == EEXIST ? 0 : result;
}
/**
 * Tell a peer storage server to delete a file.
 *
 * Request body layout:
 *   4 bytes: source delete timestamp
 *   FDFS_GROUP_NAME_MAX_LEN bytes: group_name
 *   remain bytes: filename
 *
 * Nothing is sent when the file can still be stat'ed locally.
 *
 * @param pStorageServer  connection to the destination storage server
 * @param pRecord         binlog record naming the deleted file
 * @return 0 on success, when the record is skipped or when the peer
 *         answers ENOENT; an errno style code otherwise
 */
static int storage_sync_delete_file(ConnectionInfo *pStorageServer,
		const StorageBinLogRecord *pRecord)
{
	char out_buff[sizeof(TrackerHeader)+FDFS_GROUP_NAME_MAX_LEN+256];
	char in_buff[1];
	char *body;
	char *response;
	TrackerHeader *header;
	struct stat st;
	FDFSTrunkFullInfo trunk_info;
	FDFSTrunkHeader trunk_header;
	int64_t response_len;
	int body_len;
	int status;

	status = trunk_file_stat(pRecord->store_path_index,
			pRecord->true_filename, pRecord->true_filename_len,
			&st, &trunk_info, &trunk_header);
	if (status == 0)
	{
		/* the file is present locally, so the delete is not forwarded */
		if (pRecord->op_type == STORAGE_OP_TYPE_SOURCE_DELETE_FILE)
		{
			logWarning("file: "__FILE__", line: %d, "
				"sync data file, logic file: %s exists, "
				"maybe created later?",
				__LINE__, pRecord->filename);
		}
		return 0;
	}

	memset(out_buff, 0, sizeof(out_buff));
	header = (TrackerHeader *)out_buff;
	body = out_buff + sizeof(TrackerHeader);
	body_len = 4 + FDFS_GROUP_NAME_MAX_LEN + pRecord->filename_len;

	/* body: timestamp, fixed width group name, filename */
	int2buff(pRecord->timestamp, body);
	memcpy(body + 4, g_group_name, sizeof(g_group_name));
	memcpy(body + 4 + FDFS_GROUP_NAME_MAX_LEN,
			pRecord->filename, pRecord->filename_len);

	long2buff(body_len, header->pkg_len);
	header->cmd = STORAGE_PROTO_CMD_SYNC_DELETE_FILE;

	status = tcpsenddata_nb(pStorageServer->sock, out_buff,
			sizeof(TrackerHeader) + body_len, SF_G_NETWORK_TIMEOUT);
	if (status != 0)
	{
		logError("FILE: "__FILE__", line: %d, "
			"send data to storage server %s:%d fail, "
			"errno: %d, error info: %s",
			__LINE__, pStorageServer->ip_addr,
			pStorageServer->port,
			status, STRERROR(status));
		return status;
	}

	response = in_buff;
	status = fdfs_recv_response(pStorageServer, &response, 0, &response_len);
	if (status == ENOENT)
	{
		/* already absent on the peer: counts as done */
		return 0;
	}
	if (status != 0)
	{
		logError("file: "__FILE__", line: %d, "
			"fdfs_recv_response fail, result: %d",
			__LINE__, status);
	}

	return status;
}
/**
 * Report the id of this storage server to a peer storage server.
 *
 * Request body layout:
 *   FDFS_STORAGE_ID_MAX_SIZE bytes: my server id
 *
 * @param pStorageServer  connection to the destination storage server
 * @return 0 on success, otherwise the code returned by the send or by
 *         fdfs_recv_response
 */
static int storage_report_my_server_id(ConnectionInfo *pStorageServer)
{
	int result;
	TrackerHeader *pHeader;
	char out_buff[sizeof(TrackerHeader) + FDFS_STORAGE_ID_MAX_SIZE];
	char in_buff[1];
	char *pBuff;
	int64_t in_bytes;

	pHeader = (TrackerHeader *)out_buff;
	memset(out_buff, 0, sizeof(out_buff));

	/* the declared body length must be the number of body bytes that are
	   really sent below, which is FDFS_STORAGE_ID_MAX_SIZE */
	long2buff(FDFS_STORAGE_ID_MAX_SIZE, pHeader->pkg_len);
	pHeader->cmd = STORAGE_PROTO_CMD_REPORT_SERVER_ID;

	/* bounded copy: the body has room for FDFS_STORAGE_ID_MAX_SIZE bytes only */
	snprintf(out_buff + sizeof(TrackerHeader), FDFS_STORAGE_ID_MAX_SIZE,
			"%s", g_my_server_id_str);

	if ((result=tcpsenddata_nb(pStorageServer->sock, out_buff, \
		sizeof(TrackerHeader) + FDFS_STORAGE_ID_MAX_SIZE, \
		SF_G_NETWORK_TIMEOUT)) != 0)
	{
		logError("FILE: "__FILE__", line: %d, " \
			"send data to storage server %s:%d fail, " \
			"errno: %d, error info: %s", \
			__LINE__, pStorageServer->ip_addr, \
			pStorageServer->port, \
			result, STRERROR(result));
		return result;
	}

	pBuff = in_buff;
	result = fdfs_recv_response(pStorageServer, &pBuff, 0, &in_bytes);
	if (result != 0)
	{
		logError("file: "__FILE__", line: %d, "
			"fdfs_recv_response fail, result: %d",
			__LINE__, result);
	}

	return result;
}
/**
 * Tell a peer storage server to create a symbolic link.
 *
 * Request body layout:
 *   8 bytes: dest(link) filename length
 *   8 bytes: source filename length
 *   4 bytes: source op timestamp
 *   FDFS_GROUP_NAME_MAX_LEN bytes: group_name
 *   dest filename length: dest filename
 *   source filename length: source filename
 *
 * When the record carries no source filename it is recovered from the
 * trunk file content or, for a regular symlink, from readlink().
 *
 * @param pStorageServer  connection to the destination storage server
 * @param pRecord         binlog record; src_filename / src_filename_len
 *                        are filled in when they were empty
 * @return 0 on success, when the record is skipped (link missing, not a
 *         link, source name not readable) or when the peer answers ENOENT;
 *         EINVAL when the link target is not a valid data file path;
 *         an errno style code otherwise
 */
static int storage_sync_link_file(ConnectionInfo *pStorageServer,
		StorageBinLogRecord *pRecord)
{
	TrackerHeader *pHeader;
	int result;
	char out_buff[sizeof(TrackerHeader) + 2 * FDFS_PROTO_PKG_LEN_SIZE + \
			4 + FDFS_GROUP_NAME_MAX_LEN + 256];
	char in_buff[1];
	FDFSTrunkFullInfo trunkInfo;
	FDFSTrunkHeader trunkHeader;
	int out_body_len;
	int64_t in_bytes;
	char *pBuff;
	struct stat stat_buf;
	int fd;

	fd = -1;
	if ((result=trunk_file_lstat_ex(pRecord->store_path_index, \
		pRecord->true_filename, pRecord->true_filename_len, \
		&stat_buf, &trunkInfo, &trunkHeader, &fd)) != 0)
	{
		if (result == ENOENT)
		{
			if (pRecord->op_type == STORAGE_OP_TYPE_SOURCE_CREATE_LINK)
			{
				logDebug("file: "__FILE__", line: %d, " \
					"sync data file, logic file: %s does not " \
					"exist, maybe delete later?", \
					__LINE__, pRecord->filename);
			}
		}
		else
		{
			logError("file: "__FILE__", line: %d, " \
				"call stat fail, logic file: %s, "\
				"error no: %d, error info: %s", \
				__LINE__, pRecord->filename, \
				result, STRERROR(result));
		}

		/* in both cases the record is skipped */
		return 0;
	}

	if (!S_ISLNK(stat_buf.st_mode))
	{
		if (fd >= 0)
		{
			close(fd);
		}

		if (pRecord->op_type == STORAGE_OP_TYPE_SOURCE_CREATE_LINK)
		{
			logWarning("file: "__FILE__", line: %d, " \
				"sync data file, logic file %s is not " \
				"a symbol link, maybe create later?", \
				__LINE__, pRecord->filename);
		}

		return 0;
	}

	if (pRecord->src_filename_len > 0)
	{
		/* the record already names the source file */
		if (fd >= 0)
		{
			close(fd);
		}
	}
	else if (IS_TRUNK_FILE_BY_ID(trunkInfo))
	{
		result = trunk_file_get_content(&trunkInfo, \
				stat_buf.st_size, &fd, pRecord->src_filename, \
				sizeof(pRecord->src_filename));
		close(fd);

		if (result != 0)
		{
			logWarning("file: "__FILE__", line: %d, " \
				"logic file: %s, get file content fail, " \
				"errno: %d, error info: %s", \
				__LINE__, pRecord->filename, \
				result, STRERROR(result));
			return 0;
		}

		pRecord->src_filename_len = stat_buf.st_size;
		*(pRecord->src_filename + pRecord->src_filename_len) = '\0';
	}
	else
	{
		char full_filename[MAX_PATH_SIZE];
		char src_full_filename[MAX_PATH_SIZE];
		char *p;
		char *pSrcFilename;
		int src_path_index;
		int src_filename_len;

		/* NOTE(review): defensive - this branch never uses fd; release it
		   in case trunk_file_lstat_ex opened one (to be confirmed) */
		if (fd >= 0)
		{
			close(fd);
			fd = -1;
		}

		snprintf(full_filename, sizeof(full_filename), \
			"%s/data/%s", g_fdfs_store_paths.paths[pRecord->store_path_index].path, \
			pRecord->true_filename);
		src_filename_len = readlink(full_filename, src_full_filename, \
					sizeof(src_full_filename) - 1);
		if (src_filename_len <= 0)
		{
			/* capture errno once, and report the link path itself:
			   src_full_filename holds no valid data when readlink fails */
			result = errno;
			logWarning("file: "__FILE__", line: %d, " \
				"data file: %s, readlink fail, "\
				"errno: %d, error info: %s", \
				__LINE__, full_filename, result, STRERROR(result));
			return 0;
		}
		*(src_full_filename + src_filename_len) = '\0';

		/* keep what follows the LAST "/data/" of the link target */
		pSrcFilename = strstr(src_full_filename, "/data/");
		if (pSrcFilename == NULL)
		{
			logError("file: "__FILE__", line: %d, " \
				"source data file: %s is invalid", \
				__LINE__, src_full_filename);
			return EINVAL;
		}

		pSrcFilename += 6;
		p = strstr(pSrcFilename, "/data/");
		while (p != NULL)
		{
			pSrcFilename = p + 6;
			p = strstr(pSrcFilename, "/data/");
		}

		if (g_fdfs_store_paths.count == 1)
		{
			src_path_index = 0;
		}
		else
		{
			/* cut the target at "/data/" and look the prefix up
			   among the configured store paths */
			*(pSrcFilename - 6) = '\0';

			for (src_path_index=0; src_path_index<g_fdfs_store_paths.count; \
				src_path_index++)
			{
				if (strcmp(src_full_filename, \
					g_fdfs_store_paths.paths[src_path_index].path) == 0)
				{
					break;
				}
			}

			if (src_path_index == g_fdfs_store_paths.count)
			{
				logError("file: "__FILE__", line: %d, " \
					"source data file: %s is invalid", \
					__LINE__, src_full_filename);
				return EINVAL;
			}
		}

		/* + 4 for the store path prefix written by the sprintf below */
		pRecord->src_filename_len = src_filename_len - (pSrcFilename - \
					src_full_filename) + 4;
		if (pRecord->src_filename_len >= sizeof(pRecord->src_filename))
		{
			logError("file: "__FILE__", line: %d, " \
				"source data file: %s is invalid", \
				__LINE__, src_full_filename);
			return EINVAL;
		}

		sprintf(pRecord->src_filename, "%c"FDFS_STORAGE_DATA_DIR_FORMAT"/%s", \
			FDFS_STORAGE_STORE_PATH_PREFIX_CHAR, \
			src_path_index, pSrcFilename);
	}

	pHeader = (TrackerHeader *)out_buff;
	memset(out_buff, 0, sizeof(out_buff));

	long2buff(pRecord->filename_len, out_buff + sizeof(TrackerHeader));
	long2buff(pRecord->src_filename_len, out_buff + sizeof(TrackerHeader) + \
			FDFS_PROTO_PKG_LEN_SIZE);

	int2buff(pRecord->timestamp, out_buff + sizeof(TrackerHeader) + \
			2 * FDFS_PROTO_PKG_LEN_SIZE);

	sprintf(out_buff + sizeof(TrackerHeader) + 2 * FDFS_PROTO_PKG_LEN_SIZE\
		 + 4, "%s", g_group_name);
	memcpy(out_buff + sizeof(TrackerHeader) + 2 * FDFS_PROTO_PKG_LEN_SIZE \
		+ 4 + FDFS_GROUP_NAME_MAX_LEN, \
		pRecord->filename, pRecord->filename_len);
	memcpy(out_buff + sizeof(TrackerHeader) + 2 * FDFS_PROTO_PKG_LEN_SIZE \
		+ 4 + FDFS_GROUP_NAME_MAX_LEN + pRecord->filename_len, \
		pRecord->src_filename, pRecord->src_filename_len);

	out_body_len = 2 * FDFS_PROTO_PKG_LEN_SIZE + 4 + \
		FDFS_GROUP_NAME_MAX_LEN + pRecord->filename_len + \
		pRecord->src_filename_len;
	long2buff(out_body_len, pHeader->pkg_len);
	pHeader->cmd = STORAGE_PROTO_CMD_SYNC_CREATE_LINK;

	if ((result=tcpsenddata_nb(pStorageServer->sock, out_buff, \
		sizeof(TrackerHeader) + out_body_len, \
		SF_G_NETWORK_TIMEOUT)) != 0)
	{
		logError("FILE: "__FILE__", line: %d, " \
			"send data to storage server %s:%d fail, " \
			"errno: %d, error info: %s", \
			__LINE__, pStorageServer->ip_addr, \
			pStorageServer->port, \
			result, STRERROR(result));
		return result;
	}

	pBuff = in_buff;
	result = fdfs_recv_response(pStorageServer, &pBuff, 0, &in_bytes);
	if (result != 0)
	{
		if (result == ENOENT)
		{
			result = 0;
		}
		else
		{
			logError("file: "__FILE__", line: %d, "
				"fdfs_recv_response fail, result: %d",
				__LINE__, result);
		}
	}

	return result;
}
/**
 * Tell a peer storage server to rename a file.
 *
 * Request body layout:
 *   8 bytes: dest filename length
 *   8 bytes: source filename length
 *   4 bytes: source op timestamp
 *   FDFS_GROUP_NAME_MAX_LEN bytes: group_name
 *   dest filename length: dest filename
 *   source filename length: source filename
 *
 * A replica rename record, and a rename the peer answers with ENOENT,
 * are handled by sending the whole file via storage_sync_copy_file.
 *
 * @param pStorageServer  connection to the destination storage server
 * @param pReader         binlog reader, handed on to storage_sync_copy_file
 * @param pRecord         binlog record holding both file names
 * @return 0 on success, when the local file no longer exists or when the
 *         peer answers EEXIST; an errno style code otherwise
 */
static int storage_sync_rename_file(ConnectionInfo *pStorageServer,
		StorageBinLogReader *pReader, StorageBinLogRecord *pRecord)
{
	char out_buff[sizeof(TrackerHeader) + 2 * FDFS_PROTO_PKG_LEN_SIZE +
			4 + FDFS_GROUP_NAME_MAX_LEN + 256];
	char in_buff[1];
	char full_filename[MAX_PATH_SIZE];
	char *cursor;
	char *response;
	TrackerHeader *header;
	struct stat st;
	int64_t response_len;
	int body_len;
	int status;

	if (pRecord->op_type == STORAGE_OP_TYPE_REPLICA_RENAME_FILE)
	{
		return storage_sync_copy_file(pStorageServer, pReader,
				pRecord, STORAGE_PROTO_CMD_SYNC_CREATE_FILE);
	}

	snprintf(full_filename, sizeof(full_filename), "%s/data/%s",
			g_fdfs_store_paths.paths[pRecord->store_path_index].path,
			pRecord->true_filename);
	if (lstat(full_filename, &st) != 0)
	{
		if (errno == ENOENT)
		{
			logWarning("file: "__FILE__", line: %d, "
				"sync file rename, file: %s not exists, "
				"maybe deleted later?", __LINE__, full_filename);
			return 0;
		}

		status = errno != 0 ? errno : EPERM;
		logError("file: "__FILE__", line: %d, "
			"call stat fail, file: %s, "
			"error no: %d, error info: %s",
			__LINE__, full_filename,
			status, STRERROR(status));
		return status;
	}

	memset(out_buff, 0, sizeof(out_buff));
	header = (TrackerHeader *)out_buff;

	/* fill the body field by field, moving a cursor along */
	cursor = out_buff + sizeof(TrackerHeader);
	long2buff(pRecord->filename_len, cursor);
	cursor += FDFS_PROTO_PKG_LEN_SIZE;
	long2buff(pRecord->src_filename_len, cursor);
	cursor += FDFS_PROTO_PKG_LEN_SIZE;
	int2buff(pRecord->timestamp, cursor);
	cursor += 4;
	sprintf(cursor, "%s", g_group_name);
	cursor += FDFS_GROUP_NAME_MAX_LEN;
	memcpy(cursor, pRecord->filename, pRecord->filename_len);
	cursor += pRecord->filename_len;
	memcpy(cursor, pRecord->src_filename, pRecord->src_filename_len);

	body_len = 2 * FDFS_PROTO_PKG_LEN_SIZE + 4 +
		FDFS_GROUP_NAME_MAX_LEN + pRecord->filename_len +
		pRecord->src_filename_len;
	long2buff(body_len, header->pkg_len);
	header->cmd = STORAGE_PROTO_CMD_SYNC_RENAME_FILE;

	status = tcpsenddata_nb(pStorageServer->sock, out_buff,
			sizeof(TrackerHeader) + body_len, SF_G_NETWORK_TIMEOUT);
	if (status != 0)
	{
		logError("FILE: "__FILE__", line: %d, "
			"send data to storage server %s:%d fail, "
			"errno: %d, error info: %s",
			__LINE__, pStorageServer->ip_addr,
			pStorageServer->port, status, STRERROR(status));
		return status;
	}

	response = in_buff;
	status = fdfs_recv_response(pStorageServer, &response, 0, &response_len);
	switch (status)
	{
		case 0:
			break;
		case ENOENT:
			/* the peer lacks the source file: send the file itself */
			return storage_sync_copy_file(pStorageServer, pReader,
					pRecord, STORAGE_PROTO_CMD_SYNC_CREATE_FILE);
		case EEXIST:
			logDebug("file: "__FILE__", line: %d, "
				"storage server ip: %s:%d, data file: %s "
				"already exists", __LINE__, pStorageServer->ip_addr,
				pStorageServer->port, pRecord->filename);
			return 0;
		default:
			logError("file: "__FILE__", line: %d, "
				"fdfs_recv_response fail, result: %d",
				__LINE__, status);
			break;
	}

	return status;
}
/**
 * Guard for replica binlog records: makes the enclosing function return 0
 * unless the reader still has old data to sync (need_sync_old set,
 * sync_old_done not set) and the record is not newer than until_timestamp.
 *
 * Expands to a complete if statement containing "return 0;", so it may
 * only be used inside a function returning int and is written without a
 * trailing semicolon. The last line deliberately has no line continuation,
 * otherwise the source line that follows would become part of the macro.
 */
#define STARAGE_CHECK_IF_NEED_SYNC_OLD(pReader, pRecord) \
	if ((!(pReader)->need_sync_old) || (pReader)->sync_old_done || \
		((pRecord)->timestamp > (pReader)->until_timestamp)) \
	{ \
		return 0; \
	}
/**
 * Replay one binlog record against a peer storage server.
 *
 * Source (SOURCE_*) records are always forwarded. Replica create, delete,
 * update, link and rename records are forwarded only when
 * STARAGE_CHECK_IF_NEED_SYNC_OLD lets them through; replica append,
 * modify and truncate records are ignored. After each record handled with
 * result 0 the synced row counter is advanced and, once
 * g_write_mark_file_freq rows have piled up, the mark file is written.
 *
 * @param pReader         binlog reader the record was read from
 * @param pStorageServer  connection to the destination storage server
 * @param pRecord         the record to replay
 * @return 0 on success or when the record is ignored, EINVAL for an
 *         unknown operation type, otherwise the code of the failing step
 */
static int storage_sync_data(StorageBinLogReader *pReader,
		ConnectionInfo *pStorageServer,
		StorageBinLogRecord *pRecord)
{
	int status;
	char modify_cmd;

	switch (pRecord->op_type)
	{
		case STORAGE_OP_TYPE_REPLICA_CREATE_FILE:
			STARAGE_CHECK_IF_NEED_SYNC_OLD(pReader, pRecord)
			/* fall through */
		case STORAGE_OP_TYPE_SOURCE_CREATE_FILE:
			status = storage_sync_copy_file(pStorageServer,
					pReader, pRecord,
					STORAGE_PROTO_CMD_SYNC_CREATE_FILE);
			break;

		case STORAGE_OP_TYPE_REPLICA_DELETE_FILE:
			STARAGE_CHECK_IF_NEED_SYNC_OLD(pReader, pRecord)
			/* fall through */
		case STORAGE_OP_TYPE_SOURCE_DELETE_FILE:
			status = storage_sync_delete_file(pStorageServer, pRecord);
			break;

		case STORAGE_OP_TYPE_REPLICA_UPDATE_FILE:
			STARAGE_CHECK_IF_NEED_SYNC_OLD(pReader, pRecord)
			/* fall through */
		case STORAGE_OP_TYPE_SOURCE_UPDATE_FILE:
			status = storage_sync_copy_file(pStorageServer,
					pReader, pRecord,
					STORAGE_PROTO_CMD_SYNC_UPDATE_FILE);
			break;

		case STORAGE_OP_TYPE_SOURCE_APPEND_FILE:
		case STORAGE_OP_TYPE_SOURCE_MODIFY_FILE:
			if (pRecord->op_type == STORAGE_OP_TYPE_SOURCE_APPEND_FILE)
			{
				modify_cmd = STORAGE_PROTO_CMD_SYNC_APPEND_FILE;
			}
			else
			{
				modify_cmd = STORAGE_PROTO_CMD_SYNC_MODIFY_FILE;
			}

			status = storage_sync_modify_file(pStorageServer,
					pReader, pRecord, modify_cmd);
			if (status == ENOENT)  //resync appender file
			{
				status = storage_sync_copy_file(pStorageServer,
						pReader, pRecord,
						STORAGE_PROTO_CMD_SYNC_UPDATE_FILE);
			}
			break;

		case STORAGE_OP_TYPE_SOURCE_TRUNCATE_FILE:
			status = storage_sync_truncate_file(pStorageServer,
					pReader, pRecord);
			break;

		case STORAGE_OP_TYPE_REPLICA_RENAME_FILE:
			STARAGE_CHECK_IF_NEED_SYNC_OLD(pReader, pRecord)
			/* fall through */
		case STORAGE_OP_TYPE_SOURCE_RENAME_FILE:
			status = storage_sync_rename_file(pStorageServer,
					pReader, pRecord);
			break;

		case STORAGE_OP_TYPE_REPLICA_CREATE_LINK:
			STARAGE_CHECK_IF_NEED_SYNC_OLD(pReader, pRecord)
			/* fall through */
		case STORAGE_OP_TYPE_SOURCE_CREATE_LINK:
			status = storage_sync_link_file(pStorageServer, pRecord);
			break;

		case STORAGE_OP_TYPE_REPLICA_APPEND_FILE:
		case STORAGE_OP_TYPE_REPLICA_MODIFY_FILE:
		case STORAGE_OP_TYPE_REPLICA_TRUNCATE_FILE:
			/* never forwarded and not counted as a synced row */
			return 0;

		default:
			logError("file: "__FILE__", line: %d, "
				"invalid file operation type: %d",
				__LINE__, pRecord->op_type);
			return EINVAL;
	}

	if (status != 0)
	{
		return status;
	}

	pReader->sync_row_count++;
	if (pReader->sync_row_count - pReader->last_sync_rows <
			g_write_mark_file_freq)
	{
		return 0;
	}

	status = storage_write_to_mark_file(pReader);
	if (status != 0)
	{
		logCrit("file: "__FILE__", line: %d, "
			"storage_write_to_mark_file "
			"fail, program exit!", __LINE__);
		SF_G_CONTINUE_FLAG = false;
	}

	return status;
}
/**
 * Rewrite the binlog index file (SYNC_BINLOG_INDEX_FILENAME in the sync
 * directory) with the given write index and the current value of
 * binlog_compress_index.
 *
 * @param binlog_index  value stored as BINLOG_INDEX_ITEM_CURRENT_WRITE
 * @return 0 on success, an errno style code when the file cannot be
 *         opened or written
 */
static int write_to_binlog_index(const int binlog_index)
{
	char full_filename[MAX_PATH_SIZE];
	char buff[256];
	int fd;
	int len;
	int result;

	snprintf(full_filename, sizeof(full_filename),
			"%s/data/"SYNC_DIR_NAME"/%s", SF_G_BASE_PATH_STR,
			SYNC_BINLOG_INDEX_FILENAME);
	if ((fd=open(full_filename, O_WRONLY | O_CREAT | O_TRUNC, 0644)) < 0)
	{
		/* capture errno first: logging may overwrite it */
		result = errno != 0 ? errno : ENOENT;
		logError("file: "__FILE__", line: %d, "
			"open file \"%s\" fail, "
			"errno: %d, error info: %s",
			__LINE__, full_filename,
			result, STRERROR(result));
		return result;
	}

	len = snprintf(buff, sizeof(buff), "%s=%d\n"
			"%s=%d\n",
			BINLOG_INDEX_ITEM_CURRENT_WRITE, binlog_index,
			BINLOG_INDEX_ITEM_CURRENT_COMPRESS, binlog_compress_index);
	if (fc_safe_write(fd, buff, len) != len)
	{
		/* capture errno before logging and close() can overwrite it */
		result = errno != 0 ? errno : EIO;
		logError("file: "__FILE__", line: %d, "
			"write to file \"%s\" fail, "
			"errno: %d, error info: %s",
			__LINE__, full_filename,
			result, STRERROR(result));
		close(fd);
		return result;
	}

	close(fd);

	SF_CHOWN_TO_RUNBY_RETURN_ON_ERROR(full_filename);

	return 0;
}
/**
 * Load g_binlog_index from the old style index file
 * (SYNC_BINLOG_INDEX_FILENAME_OLD), whose content is the index as text.
 * When that file cannot be opened g_binlog_index is set to 0.
 *
 * @return 0 on success, EINVAL when the stored index is negative, EIO or
 *         the read error when the file content cannot be read
 */
static int get_binlog_index_from_file_old()
{
	char full_filename[MAX_PATH_SIZE];
	char file_buff[64];
	int fd;
	int bytes;
	int read_errno;

	snprintf(full_filename, sizeof(full_filename),
			"%s/data/"SYNC_DIR_NAME"/%s", SF_G_BASE_PATH_STR,
			SYNC_BINLOG_INDEX_FILENAME_OLD);
	if ((fd=open(full_filename, O_RDONLY)) < 0)
	{
		g_binlog_index = 0;
		return 0;
	}

	bytes = fc_safe_read(fd, file_buff, sizeof(file_buff) - 1);
	/* capture errno before close() and logging can overwrite it */
	read_errno = errno;
	close(fd);
	if (bytes <= 0)
	{
		logError("file: "__FILE__", line: %d, " \
			"read file \"%s\" fail, bytes read: %d", \
			__LINE__, full_filename, bytes);
		/* an empty file sets no errno of its own: report EIO instead
		   of whatever stale value errno happens to hold */
		return (bytes < 0 && read_errno != 0) ? read_errno : EIO;
	}

	file_buff[bytes] = '\0';
	g_binlog_index = atoi(file_buff);
	if (g_binlog_index < 0)
	{
		logError("file: "__FILE__", line: %d, " \
			"in file \"%s\", binlog_index: %d < 0", \
			__LINE__, full_filename, g_binlog_index);
		return EINVAL;
	}

	return 0;
}
static int get_binlog_index_from_file()
{
char full_filename[MAX_PATH_SIZE];
IniContext iniContext;
int result;
snprintf(full_filename, sizeof(full_filename),
"%s/data/"SYNC_DIR_NAME"/%s", SF_G_BASE_PATH_STR,
SYNC_BINLOG_INDEX_FILENAME);
if (access(full_filename, F_OK) != 0)
{
if (errno == ENOENT)
{
if ((result=get_binlog_index_from_file_old()) == 0)
{
if ((result=write_to_binlog_index(g_binlog_index)) != 0)
{
return result;
}
}
return result;
}
}
memset(&iniContext, 0, sizeof(IniContext));
if ((result=iniLoadFromFile(full_filename, &iniContext)) != 0)
{
logError("file: "__FILE__", line: %d, "
"load from file \"%s\" fail, "
"error code: %d",
__LINE__, full_filename, result);
return result;
}
g_binlog_index = iniGetIntValue(NULL,
BINLOG_INDEX_ITEM_CURRENT_WRITE,
&iniContext, 0);
binlog_compress_index = iniGetIntValue(NULL,
BINLOG_INDEX_ITEM_CURRENT_COMPRESS,
&iniContext, 0);
iniFreeContext(&iniContext);
return 0;
}
/**
 * Build the path of the binlog file with index g_binlog_index.
 *
 * @param full_filename  output buffer of at least MAX_PATH_SIZE bytes,
 *                       or NULL to use a function-local static buffer
 * @return the buffer that received the path
 */
static char *get_writable_binlog_filename(char *full_filename)
{
    static char shared_buff[MAX_PATH_SIZE];
    char *target;

    target = (full_filename != NULL) ? full_filename : shared_buff;
    snprintf(target, MAX_PATH_SIZE,
            "%s/data/"SYNC_DIR_NAME"/"SYNC_BINLOG_FILE_PREFIX""
            SYNC_BINLOG_FILE_EXT_FMT,
            SF_G_BASE_PATH_STR, g_binlog_index);
    return target;
}
/**
 * Build the path of the binlog file with the given index.
 *
 * @param full_filename  output buffer of at least MAX_PATH_SIZE bytes
 * @param binlog_index   index used for the file extension
 * @return full_filename
 */
static char *get_writable_binlog_filename1(char *full_filename,
        const int binlog_index)
{
    snprintf(full_filename, MAX_PATH_SIZE,
            "%s/data/"SYNC_DIR_NAME"/"SYNC_BINLOG_FILE_PREFIX""
            SYNC_BINLOG_FILE_EXT_FMT,
            SF_G_BASE_PATH_STR, binlog_index);

    return full_filename;
}
/**
 * Close the current binlog fd (if open) and open the binlog file with
 * index g_binlog_index + 1 for appending. A file already present under
 * that name is unlinked first. g_binlog_index is incremented only after
 * the new file is opened and its ownership adjusted.
 *
 * @return 0 on success, an errno-style code on failure
 */
static int open_next_writable_binlog()
{
    char next_filename[MAX_PATH_SIZE];
    bool stale_exists;

    if (g_binlog_fd >= 0)
    {
        close(g_binlog_fd);
        g_binlog_fd = -1;
    }

    get_writable_binlog_filename1(next_filename, g_binlog_index + 1);

    stale_exists = fileExists(next_filename);
    if (stale_exists && unlink(next_filename) != 0)
    {
        logError("file: "__FILE__", line: %d, "
                "unlink file \"%s\" fail, "
                "errno: %d, error info: %s",
                __LINE__, next_filename,
                errno, STRERROR(errno));
        return errno != 0 ? errno : ENOENT;
    }
    if (stale_exists)
    {
        logError("file: "__FILE__", line: %d, "
                "binlog file \"%s\" already exists, truncate",
                __LINE__, next_filename);
    }

    g_binlog_fd = open(next_filename, O_WRONLY | O_CREAT | O_APPEND, 0644);
    if (g_binlog_fd < 0)
    {
        logError("file: "__FILE__", line: %d, "
                "open file \"%s\" fail, "
                "errno: %d, error info: %s",
                __LINE__, next_filename,
                errno, STRERROR(errno));
        return errno != 0 ? errno : EACCES;
    }

    /* macro returns from this function itself when it fails */
    SF_FCHOWN_TO_RUNBY_RETURN_ON_ERROR(g_binlog_fd, next_filename);

    g_binlog_index += 1;
    return 0;
}
/**
 * Initialise the binlog writer:
 *   - create <base path>/data and <base path>/data/sync when missing,
 *   - allocate the binlog write cache,
 *   - load the binlog index and open the current binlog for appending,
 *   - record the current binlog size,
 *   - initialise sync_thread_lock, the local host IP list and reader_head.
 *
 * @return 0 on success, an errno-style code on failure
 */
int storage_sync_init()
{
    char data_path[MAX_PATH_SIZE];
    char sync_path[MAX_PATH_SIZE];
    char full_filename[MAX_PATH_SIZE];
    int result;
    int saved_errno;

    snprintf(data_path, sizeof(data_path), "%s/data", SF_G_BASE_PATH_STR);
    if (!fileExists(data_path))
    {
        if (mkdir(data_path, 0755) != 0)
        {
            logError("file: "__FILE__", line: %d, " \
                    "mkdir \"%s\" fail, " \
                    "errno: %d, error info: %s", \
                    __LINE__, data_path, \
                    errno, STRERROR(errno));
            return errno != 0 ? errno : ENOENT;
        }

        SF_CHOWN_TO_RUNBY_RETURN_ON_ERROR(data_path);
    }

    snprintf(sync_path, sizeof(sync_path), \
            "%s/"SYNC_DIR_NAME, data_path);
    if (!fileExists(sync_path))
    {
        if (mkdir(sync_path, 0755) != 0)
        {
            logError("file: "__FILE__", line: %d, " \
                    "mkdir \"%s\" fail, " \
                    "errno: %d, error info: %s", \
                    __LINE__, sync_path, \
                    errno, STRERROR(errno));
            return errno != 0 ? errno : ENOENT;
        }

        SF_CHOWN_TO_RUNBY_RETURN_ON_ERROR(sync_path);
    }

    binlog_write_cache_buff = (char *)malloc(SYNC_BINLOG_WRITE_BUFF_SIZE);
    if (binlog_write_cache_buff == NULL)
    {
        logError("file: "__FILE__", line: %d, " \
                "malloc %d bytes fail, " \
                "errno: %d, error info: %s", \
                __LINE__, SYNC_BINLOG_WRITE_BUFF_SIZE, \
                errno, STRERROR(errno));
        return errno != 0 ? errno : ENOMEM;
    }

    if ((result=get_binlog_index_from_file()) != 0)
    {
        return result;
    }

    get_writable_binlog_filename(full_filename);
    g_binlog_fd = open(full_filename, O_WRONLY | O_CREAT | O_APPEND, 0644);
    if (g_binlog_fd < 0)
    {
        logError("file: "__FILE__", line: %d, " \
                "open file \"%s\" fail, " \
                "errno: %d, error info: %s", \
                __LINE__, full_filename, \
                errno, STRERROR(errno));
        return errno != 0 ? errno : EACCES;
    }

    binlog_file_size = lseek(g_binlog_fd, 0, SEEK_END);
    if (binlog_file_size < 0)
    {
        saved_errno = errno;
        logError("file: "__FILE__", line: %d, " \
                "ftell file \"%s\" fail, " \
                "errno: %d, error info: %s", \
                __LINE__, full_filename, \
                saved_errno, STRERROR(saved_errno));

        /*
         * Release what was acquired so far directly.
         * storage_sync_destroy() must not be used here: it locks and
         * destroys sync_thread_lock, which is only initialised further
         * down, and it would overwrite errno before it is returned.
         */
        close(g_binlog_fd);
        g_binlog_fd = -1;
        free(binlog_write_cache_buff);
        binlog_write_cache_buff = NULL;
        return saved_errno != 0 ? saved_errno : EIO;
    }

    SF_FCHOWN_TO_RUNBY_RETURN_ON_ERROR(g_binlog_fd, full_filename);

    if ((result=init_pthread_lock(&sync_thread_lock)) != 0)
    {
        return result;
    }

    load_local_host_ip_addrs();

    FC_INIT_LIST_HEAD(&reader_head);
    return 0;
}
/**
 * Release the binlog writer resources: flush and close the binlog fd
 * when it is open, then free the write cache and destroy
 * sync_thread_lock when the cache is allocated.
 *
 * @return 0 on success, or the pthread_mutex_destroy() error code
 */
int storage_sync_destroy()
{
    int destroy_ret;

    if (g_binlog_fd >= 0)
    {
        storage_binlog_fsync(true);
        close(g_binlog_fd);
        g_binlog_fd = -1;
    }

    if (binlog_write_cache_buff == NULL)
    {
        return 0;
    }

    free(binlog_write_cache_buff);
    binlog_write_cache_buff = NULL;

    destroy_ret = pthread_mutex_destroy(&sync_thread_lock);
    if (destroy_ret != 0)
    {
        logError("file: "__FILE__", line: %d, "
                "call pthread_mutex_destroy fail, "
                "errno: %d, error info: %s",
                __LINE__, destroy_ret, STRERROR(destroy_ret));
    }
    return destroy_ret;
}
/**
 * Signal all sync threads listed in sync_tids via kill_work_threads()
 * while holding sync_thread_lock, then poll every 50 ms until
 * g_storage_sync_thread_count drops to zero.
 *
 * @return 0 when there are no sync threads, otherwise the value
 *         returned by kill_work_threads()
 */
int kill_storage_sync_threads()
{
    int lock_ret;
    int kill_ret;

    if (sync_tids == NULL)
    {
        return 0;
    }

    lock_ret = pthread_mutex_lock(&sync_thread_lock);
    if (lock_ret != 0)
    {
        logError("file: "__FILE__", line: %d, "
                "call pthread_mutex_lock fail, "
                "errno: %d, error info: %s",
                __LINE__, lock_ret, STRERROR(lock_ret));
    }

    kill_ret = kill_work_threads(sync_tids, g_storage_sync_thread_count);

    lock_ret = pthread_mutex_unlock(&sync_thread_lock);
    if (lock_ret != 0)
    {
        logError("file: "__FILE__", line: %d, "
                "call pthread_mutex_unlock fail, "
                "errno: %d, error info: %s",
                __LINE__, lock_ret, STRERROR(lock_ret));
    }

    /* wait for the threads to finish */
    while (g_storage_sync_thread_count > 0)
    {
        usleep(50000);
    }

    return kill_ret;
}
/**
 * Flush the binlog write cache to disk when it holds data.
 *
 * @param args  unused
 * @return 0 when the cache is empty, otherwise the result of
 *         storage_binlog_fsync(true)
 */
int fdfs_binlog_sync_func(void *args)
{
    if (binlog_write_cache_len <= 0)
    {
        return 0;
    }

    return storage_binlog_fsync(true);
}
/**
 * Write the cached binlog data (binlog_write_cache_buff,
 * binlog_write_cache_len bytes) to g_binlog_fd and fsync it.
 *
 * After a successful write + fsync the cached length is added to
 * binlog_file_size; once that reaches SYNC_BINLOG_FILE_MAX_SIZE the index
 * file is rewritten with g_binlog_index + 1 and the next binlog file is
 * opened. If that rotation fails, SF_G_CONTINUE_FLAG is set to false.
 *
 * binlog_write_version is incremented and the cache length reset to 0 on
 * every call, including when the write or fsync failed.
 *
 * @param bNeedLock  true to take sync_thread_lock around the operation;
 *                   false when the caller already holds it
 * @return 0 on success (or empty cache), otherwise an errno-style code
 */
static int storage_binlog_fsync(const bool bNeedLock)
{
int result;
int write_ret;
/* a lock failure is only logged; the flush is still attempted */
if (bNeedLock && (result=pthread_mutex_lock(&sync_thread_lock)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"call pthread_mutex_lock fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
}
if (binlog_write_cache_len == 0) //ignore
{
write_ret = 0; //skip
}
else if (fc_safe_write(g_binlog_fd, binlog_write_cache_buff, \
binlog_write_cache_len) != binlog_write_cache_len)
{
logError("file: "__FILE__", line: %d, " \
"write to binlog file \"%s\" fail, fd=%d, " \
"errno: %d, error info: %s", \
__LINE__, get_writable_binlog_filename(NULL), \
g_binlog_fd, errno, STRERROR(errno));
write_ret = errno != 0 ? errno : EIO;
}
else if (fsync(g_binlog_fd) != 0)
{
logError("file: "__FILE__", line: %d, " \
"sync to binlog file \"%s\" fail, " \
"errno: %d, error info: %s", \
__LINE__, get_writable_binlog_filename(NULL), \
errno, STRERROR(errno));
write_ret = errno != 0 ? errno : EIO;
}
else
{
binlog_file_size += binlog_write_cache_len;
/* size limit reached: persist the next index, then switch files */
if (binlog_file_size >= SYNC_BINLOG_FILE_MAX_SIZE)
{
if ((write_ret=write_to_binlog_index( \
g_binlog_index + 1)) == 0)
{
write_ret = open_next_writable_binlog();
}
binlog_file_size = 0;
if (write_ret != 0)
{
SF_G_CONTINUE_FLAG = false;
logCrit("file: "__FILE__", line: %d, " \
"open binlog file \"%s\" fail, " \
"program exit!", \
__LINE__, \
get_writable_binlog_filename(NULL));
}
}
else
{
write_ret = 0;
}
}
/* executed on every path, so cached data is dropped even after a failure */
binlog_write_version++;
binlog_write_cache_len = 0; //reset cache buff
if (bNeedLock && (result=pthread_mutex_unlock(&sync_thread_lock)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"call pthread_mutex_unlock fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
}
return write_ret;
}
/**
 * Format one binlog record into dest; "extra" is appended as a fourth
 * column when it is not NULL.
 *
 * @return the untruncated record length as reported by snprintf()
 */
static int storage_binlog_format_record(char *dest, const int size,
        const int timestamp, const char op_type,
        const char *filename, const char *extra)
{
    if (extra != NULL)
    {
        return snprintf(dest, size, "%d %c %s %s\n",
                timestamp, op_type, filename, extra);
    }

    return snprintf(dest, size, "%d %c %s\n",
            timestamp, op_type, filename);
}

/**
 * Append one record to the binlog write cache under sync_thread_lock.
 * The cache is flushed to disk when fewer than 256 bytes remain free
 * after the append.
 *
 * The record is formatted with a bound on the free cache space. A record
 * that does not fit behind the cached data triggers a flush and is then
 * formatted again into the empty cache; a record larger than the whole
 * cache is rejected with ENOSPC instead of overrunning the buffer.
 *
 * @param timestamp  record timestamp
 * @param op_type    operation type character
 * @param filename   file name column
 * @param extra      optional extra column, may be NULL
 * @return 0 on success, otherwise an errno-style code
 */
int storage_binlog_write_ex(const int timestamp, const char op_type, \
        const char *filename, const char *extra)
{
    int result;
    int write_ret;
    int remain;
    int record_len;

    if ((result=pthread_mutex_lock(&sync_thread_lock)) != 0)
    {
        logError("file: "__FILE__", line: %d, " \
                "call pthread_mutex_lock fail, " \
                "errno: %d, error info: %s", \
                __LINE__, result, STRERROR(result));
    }

    write_ret = 0;
    remain = SYNC_BINLOG_WRITE_BUFF_SIZE - binlog_write_cache_len;
    record_len = storage_binlog_format_record(binlog_write_cache_buff +
            binlog_write_cache_len, remain, timestamp, op_type,
            filename, extra);
    if (record_len >= remain)
    {
        /*
         * Truncated. The cached length was not advanced, so the flush
         * writes only complete records and resets the cache to empty.
         */
        if ((write_ret=storage_binlog_fsync(false)) != 0)
        {
            record_len = 0;  /* record dropped, error reported to caller */
        }
        else
        {
            remain = SYNC_BINLOG_WRITE_BUFF_SIZE;
            record_len = storage_binlog_format_record(
                    binlog_write_cache_buff, remain, timestamp,
                    op_type, filename, extra);
        }
    }

    if (record_len < 0 || record_len >= remain)
    {
        logError("file: "__FILE__", line: %d, " \
                "binlog record length: %d is invalid, " \
                "write buffer size: %d", __LINE__, \
                record_len, SYNC_BINLOG_WRITE_BUFF_SIZE);
        write_ret = ENOSPC;
    }
    else
    {
        binlog_write_cache_len += record_len;

        //check if buff full
        if (write_ret == 0 && SYNC_BINLOG_WRITE_BUFF_SIZE -
                binlog_write_cache_len < 256)
        {
            write_ret = storage_binlog_fsync(false);  //sync to disk
        }
    }

    if ((result=pthread_mutex_unlock(&sync_thread_lock)) != 0)
    {
        logError("file: "__FILE__", line: %d, " \
                "call pthread_mutex_unlock fail, " \
                "errno: %d, error info: %s", \
                __LINE__, result, STRERROR(result));
    }

    return write_ret;
}
/**
 * Build the path of the binlog file with the given index.
 *
 * @param binlog_index   index used for the file extension
 * @param full_filename  output buffer of at least MAX_PATH_SIZE bytes,
 *                       or NULL to use a function-local static buffer
 * @return the buffer that received the path
 */
static char *get_binlog_readable_filename_ex(
        const int binlog_index, char *full_filename)
{
    static char shared_buff[MAX_PATH_SIZE];
    char *target;

    target = (full_filename != NULL) ? full_filename : shared_buff;
    snprintf(target, MAX_PATH_SIZE,
            "%s/data/"SYNC_DIR_NAME"/"SYNC_BINLOG_FILE_PREFIX""
            SYNC_BINLOG_FILE_EXT_FMT,
            SF_G_BASE_PATH_STR, binlog_index);
    return target;
}
static inline char *get_binlog_readable_filename(const void *pArg,
char *full_filename)
{
return get_binlog_readable_filename_ex(
((const StorageBinLogReader *)pArg)->binlog_index,
full_filename);
}
/**
 * Derive the flag file name for a binlog file: the base name is prefixed
 * with '.' and suffixed with ".flag", in the same directory.
 * Example: "/a/b/binlog.000" -> "/a/b/.binlog.000.flag".
 *
 * @param filepath       binlog file path, with or without a directory
 * @param flag_filename  output buffer
 * @param size           size of the output buffer in bytes
 */
static void get_binlog_flag_file(const char *filepath,
        char *flag_filename, const int size)
{
    const char *last_slash;
    int dir_len;

    last_slash = strrchr(filepath, '/');
    if (last_slash != NULL)
    {
        /* directory part including the trailing '/' */
        dir_len = (int)(last_slash - filepath + 1);
        snprintf(flag_filename, size, "%.*s.%s.flag",
                dir_len, filepath, last_slash + 1);
        return;
    }

    snprintf(flag_filename, size, ".%s.flag", filepath);
}
/**
 * Uncompress "<filename>.gz" with the gzip command.
 *
 * A flag file guards the operation: when one exists and is at most 3600
 * seconds old, the call returns EINPROGRESS; an older flag file is
 * treated as expired. The flag file is removed once the command returns.
 * When the uncompressed file exists afterwards and the reader's
 * binlog_index is below binlog_compress_index, the compress index is
 * lowered to it and the index file rewritten.
 *
 * @param pReader   reader whose binlog_index identifies the file
 * @param filename  path of the uncompressed binlog file
 * @return 0 on success, otherwise an error code
 */
static int uncompress_binlog_file(StorageBinLogReader *pReader,
        const char *filename)
{
    char gz_path[MAX_PATH_SIZE];
    char flag_path[MAX_PATH_SIZE];
    char cmd[MAX_PATH_SIZE];
    char cmd_output[256];
    struct stat flag_st;
    int ret;

    snprintf(gz_path, sizeof(gz_path), "%s.gz", filename);
    if (access(gz_path, F_OK) != 0)
    {
        return errno != 0 ? errno : ENOENT;
    }

    get_binlog_flag_file(filename, flag_path, sizeof(flag_path));
    if (stat(flag_path, &flag_st) == 0)
    {
        if (g_current_time - flag_st.st_mtime <= 3600)
        {
            logWarning("file: "__FILE__", line: %d, "
                    "uncompress %s already in progress",
                    __LINE__, gz_path);
            return EINPROGRESS;
        }

        logInfo("file: "__FILE__", line: %d, "
                "flag file %s expired, continue to uncompress",
                __LINE__, flag_path);
    }

    ret = writeToFile(flag_path, "unzip", 5);
    if (ret != 0)
    {
        return ret;
    }

    logInfo("file: "__FILE__", line: %d, "
            "try to uncompress binlog %s",
            __LINE__, gz_path);

    snprintf(cmd, sizeof(cmd), "%s -d %s 2>&1",
            get_gzip_command_filename(), gz_path);
    ret = getExecResult(cmd, cmd_output, sizeof(cmd_output));

    /* the flag file is removed whether or not the command succeeded */
    unlink(flag_path);
    if (ret != 0)
    {
        logError("file: "__FILE__", line: %d, "
                "exec command \"%s\" fail, "
                "errno: %d, error info: %s",
                __LINE__, cmd, ret, STRERROR(ret));
        return ret;
    }

    if (*cmd_output != '\0')
    {
        logWarning("file: "__FILE__", line: %d, "
                "exec command \"%s\", output: %s",
                __LINE__, cmd, cmd_output);
    }

    if (access(filename, F_OK) == 0 &&
            pReader->binlog_index < binlog_compress_index)
    {
        binlog_compress_index = pReader->binlog_index;
        write_to_binlog_index(g_binlog_index);
    }

    logInfo("file: "__FILE__", line: %d, "
            "uncompress binlog %s done",
            __LINE__, gz_path);
    return 0;
}
/**
 * Compress a binlog file with the gzip command.
 *
 * Nothing is done when "<filename>.gz" already exists. A flag file guards
 * the operation: when one exists and is at most 3600 seconds old, the
 * call returns EINPROGRESS; an older flag file is treated as expired.
 * The flag file is removed once the command returns.
 *
 * @param filename  path of the binlog file to compress
 * @return 0 on success, otherwise an error code
 */
static int compress_binlog_file(const char *filename)
{
    char gz_path[MAX_PATH_SIZE];
    char flag_path[MAX_PATH_SIZE];
    char cmd[MAX_PATH_SIZE];
    char cmd_output[256];
    struct stat flag_st;
    int ret;

    snprintf(gz_path, sizeof(gz_path), "%s.gz", filename);
    if (access(gz_path, F_OK) == 0)
    {
        return 0;
    }
    if (access(filename, F_OK) != 0)
    {
        return errno != 0 ? errno : ENOENT;
    }

    get_binlog_flag_file(filename, flag_path, sizeof(flag_path));
    if (stat(flag_path, &flag_st) == 0)
    {
        if (g_current_time - flag_st.st_mtime <= 3600)
        {
            logWarning("file: "__FILE__", line: %d, "
                    "compress %s already in progress",
                    __LINE__, filename);
            return EINPROGRESS;
        }

        logInfo("file: "__FILE__", line: %d, "
                "flag file %s expired, continue to compress",
                __LINE__, flag_path);
    }

    ret = writeToFile(flag_path, "zip", 3);
    if (ret != 0)
    {
        return ret;
    }

    logInfo("file: "__FILE__", line: %d, "
            "try to compress binlog %s",
            __LINE__, filename);

    snprintf(cmd, sizeof(cmd), "%s %s 2>&1",
            get_gzip_command_filename(), filename);
    ret = getExecResult(cmd, cmd_output, sizeof(cmd_output));

    /* the flag file is removed whether or not the command succeeded */
    unlink(flag_path);
    if (ret != 0)
    {
        logError("file: "__FILE__", line: %d, "
                "exec command \"%s\" fail, "
                "errno: %d, error info: %s",
                __LINE__, cmd, ret, STRERROR(ret));
        return ret;
    }

    if (*cmd_output != '\0')
    {
        logWarning("file: "__FILE__", line: %d, "
                "exec command \"%s\", output: %s",
                __LINE__, cmd, cmd_output);
    }

    logInfo("file: "__FILE__", line: %d, "
            "compress binlog %s done",
            __LINE__, filename);
    return 0;
}
/**
 * (Re)open the binlog file of a reader for reading and seek to
 * pReader->binlog_offset.
 *
 * An already open fd is closed first. The file name is obtained from
 * filename_func(pArg, ...). When the file does not exist, an attempt is
 * made to uncompress it from its .gz form; the result of that attempt is
 * not checked, the following open() decides.
 *
 * @param pReader        the binlog reader; binlog_fd is updated
 * @param filename_func  callback that builds the binlog file name
 * @param pArg           argument passed to filename_func
 * @return 0 on success, an errno-style code on failure
 */
int storage_open_readable_binlog(StorageBinLogReader *pReader, \
        get_filename_func filename_func, const void *pArg)
{
    char full_filename[MAX_PATH_SIZE];
    int saved_errno;

    if (pReader->binlog_fd >= 0)
    {
        close(pReader->binlog_fd);
        pReader->binlog_fd = -1;
    }

    filename_func(pArg, full_filename);
    if (access(full_filename, F_OK) != 0 && errno == ENOENT)
    {
        /* best effort: open() below reports the failure if any */
        uncompress_binlog_file(pReader, full_filename);
    }

    pReader->binlog_fd = open(full_filename, O_RDONLY);
    if (pReader->binlog_fd < 0)
    {
        /* keep the failure code: logging may overwrite errno */
        saved_errno = errno;
        logError("file: "__FILE__", line: %d, " \
                "open binlog file \"%s\" fail, " \
                "errno: %d, error info: %s", \
                __LINE__, full_filename, \
                saved_errno, STRERROR(saved_errno));
        return saved_errno != 0 ? saved_errno : ENOENT;
    }

    if (pReader->binlog_offset > 0 && \
            lseek(pReader->binlog_fd, pReader->binlog_offset, SEEK_SET) < 0)
    {
        /* keep the seek error: logError() and close() may change errno */
        saved_errno = errno;
        logError("file: "__FILE__", line: %d, " \
                "seek binlog file \"%s\" fail, file offset=" \
                "%"PRId64", errno: %d, error info: %s", \
                __LINE__, full_filename, pReader->binlog_offset, \
                saved_errno, STRERROR(saved_errno));

        close(pReader->binlog_fd);
        pReader->binlog_fd = -1;
        return saved_errno != 0 ? saved_errno : ESPIPE;
    }

    return 0;
}
/**
 * Build the mark file path for a peer storage server.
 * With g_use_storage_id set the name is "<storage_id>.mark"; otherwise it
 * is "<storage_id>_<port>.mark". Both live under <base path>/data/sync/.
 *
 * @param storage_id     storage id used in the file name
 * @param port           port, used only when g_use_storage_id is false
 * @param full_filename  output buffer
 * @param filename_size  size of the output buffer in bytes
 * @return full_filename
 */
static char *get_mark_filename_by_id_and_port(const char *storage_id,
        const int port, char *full_filename, const int filename_size)
{
    if (!g_use_storage_id)
    {
        snprintf(full_filename, filename_size,
                "%s/data/"SYNC_DIR_NAME"/%s_%d%s", SF_G_BASE_PATH_STR,
                storage_id, port, SYNC_MARK_FILE_EXT);
        return full_filename;
    }

    snprintf(full_filename, filename_size,
            "%s/data/"SYNC_DIR_NAME"/%s%s", SF_G_BASE_PATH_STR,
            storage_id, SYNC_MARK_FILE_EXT);
    return full_filename;
}
/**
 * Build the mark file path "<ip_addr>_<port>.mark" under
 * <base path>/data/sync/.
 *
 * @param ip_addr        IP address used in the file name
 * @param port           port used in the file name
 * @param full_filename  output buffer
 * @param filename_size  size of the output buffer in bytes
 * @return full_filename
 */
static char *get_mark_filename_by_ip_and_port(const char *ip_addr,
        const int port, char *full_filename, const int filename_size)
{
    snprintf(full_filename, filename_size,
            "%s/data/"SYNC_DIR_NAME"/%s_%d%s",
            SF_G_BASE_PATH_STR, ip_addr, port,
            SYNC_MARK_FILE_EXT);

    return full_filename;
}
/**
 * Fill pReader->mark_filename with the mark file path derived from the
 * reader's storage_id and SF_G_INNER_PORT.
 *
 * @param pReader  the binlog reader
 * @return pReader->mark_filename
 */
char *get_mark_filename_by_reader(StorageBinLogReader *pReader)
{
    char *mark_path;
    const int mark_path_size = sizeof(pReader->mark_filename);

    mark_path = pReader->mark_filename;
    return get_mark_filename_by_id_and_port(pReader->storage_id,
            SF_G_INNER_PORT, mark_path, mark_path_size);
}
/**
 * Build the mark file path for a storage id, using SF_G_INNER_PORT as
 * the port.
 *
 * @param storage_id     storage id used in the file name
 * @param full_filename  output buffer
 * @param filename_size  size of the output buffer in bytes
 * @return full_filename
 */
static char *get_mark_filename_by_id(const char *storage_id,
        char *full_filename, const int filename_size)
{
    const int port = SF_G_INNER_PORT;

    return get_mark_filename_by_id_and_port(storage_id, port,
            full_filename, filename_size);
}
/**
 * Report the status of a storage server to every tracker server in
 * g_tracker_group.
 *
 * When g_sync_old_done is still false the call first waits (polling once
 * per second) until it becomes true or the program is asked to stop; in
 * the latter case 0 is returned without reporting. Each tracker is tried
 * up to 3 times with a 5 second pause between connection attempts.
 *
 * @param storage_id  id of the storage server being reported
 * @param ip_addr     IP address of that storage server
 * @param status      status value to report
 * @return 0 when at least one tracker accepted the report (or when the
 *         program is stopping), EAGAIN otherwise
 */
int storage_report_storage_status(const char *storage_id, \
        const char *ip_addr, const char status)
{
    FDFSStorageBrief briefServer;
    TrackerServerInfo trackerServer;
    TrackerServerInfo *pGlobalServer;
    TrackerServerInfo *pTServer;
    TrackerServerInfo *pTServerEnd;
    ConnectionInfo *conn;
    int result;
    int report_count;
    int success_count;
    int i;

    memset(&briefServer, 0, sizeof(FDFSStorageBrief));
    /*
     * Bounded copies: an over-long argument is truncated instead of
     * overrunning the struct. Relies on id and ip_addr being char array
     * members (sizeof gives the array size).
     */
    snprintf(briefServer.id, sizeof(briefServer.id), "%s", storage_id);
    snprintf(briefServer.ip_addr, sizeof(briefServer.ip_addr),
            "%s", ip_addr);
    briefServer.status = status;

    logDebug("file: "__FILE__", line: %d, " \
            "begin to report storage %s 's status as: %d", \
            __LINE__, ip_addr, status);

    if (!g_sync_old_done)
    {
        logDebug("file: "__FILE__", line: %d, " \
                "report storage %s 's status as: %d, " \
                "waiting for g_sync_old_done turn to true...", \
                __LINE__, ip_addr, status);

        while (SF_G_CONTINUE_FLAG && !g_sync_old_done)
        {
            sleep(1);
        }

        if (!SF_G_CONTINUE_FLAG)
        {
            return 0;
        }

        logDebug("file: "__FILE__", line: %d, " \
                "report storage %s 's status as: %d, " \
                "ok, g_sync_old_done turn to true", \
                __LINE__, ip_addr, status);
    }

    conn = NULL;
    report_count = 0;
    success_count = 0;
    result = 0;

    /* work on a private copy so the global tracker entry is not touched */
    pTServer = &trackerServer;
    pTServerEnd = g_tracker_group.servers + g_tracker_group.server_count;
    for (pGlobalServer=g_tracker_group.servers; pGlobalServer<pTServerEnd; \
            pGlobalServer++)
    {
        memcpy(pTServer, pGlobalServer, sizeof(TrackerServerInfo));
        fdfs_server_sock_reset(pTServer);
        for (i=0; i < 3; i++)
        {
            conn = tracker_connect_server_no_pool_ex(pTServer,
                    g_client_bind_addr ? SF_G_INNER_BIND_ADDR : NULL,
                    &result, false);
            if (conn != NULL)
            {
                break;
            }

            sleep(5);
        }

        if (conn == NULL)
        {
            logError("file: "__FILE__", line: %d, "
                    "connect to tracker server %s:%d fail, "
                    "errno: %d, error info: %s",
                    __LINE__, pTServer->connections[0].ip_addr,
                    pTServer->connections[0].port,
                    result, STRERROR(result));
            continue;
        }

        report_count++;
        if ((result=tracker_report_storage_status(conn,
                        &briefServer)) == 0)
        {
            success_count++;
        }

        fdfs_quit(conn);
        close(conn->sock);
    }

    logDebug("file: "__FILE__", line: %d, " \
            "report storage %s 's status as: %d done, " \
            "report count: %d, success count: %d", \
            __LINE__, ip_addr, status, report_count, success_count);

    return success_count > 0 ? 0 : EAGAIN;
}
/**
 * Send a sync source request for this reader to a tracker server
 * (tracker_sync_src_req), retrying until it succeeds or the program is
 * asked to stop.
 *
 * Steps:
 *   1. wait (polling once per second) until g_sync_old_done is true;
 *      return EINTR if SF_G_CONTINUE_FLAG is cleared meanwhile;
 *   2. work on a heap copy of g_tracker_group.servers with reset sockets;
 *   3. start with the tracker at leader_index when that index is valid,
 *      otherwise with the first tracker;
 *   4. on a connect failure move to the next tracker (wrapping around)
 *      after sleeping g_heart_beat_interval seconds;
 *   5. once connected, register the local IP seen on the socket and send
 *      the request; on failure close the connection, sleep and retry.
 *
 * @param pReader  the binlog reader passed to tracker_sync_src_req()
 * @return 0 on success, EINTR when stopped while waiting in step 1,
 *         ENOMEM (or errno) when the copy cannot be allocated, otherwise
 *         the last error code stored in result
 */
static int storage_reader_sync_init_req(StorageBinLogReader *pReader)
{
TrackerServerInfo *pTrackerServers;
TrackerServerInfo *pTServer;
TrackerServerInfo *pTServerEnd;
ConnectionInfo *conn;
char tracker_client_ip[IP_ADDRESS_SIZE];
int result;
if (!g_sync_old_done)
{
while (SF_G_CONTINUE_FLAG && !g_sync_old_done)
{
sleep(1);
}
if (!SF_G_CONTINUE_FLAG)
{
return EINTR;
}
}
pTrackerServers = (TrackerServerInfo *)malloc(
sizeof(TrackerServerInfo) * g_tracker_group.server_count);
if (pTrackerServers == NULL)
{
logError("file: "__FILE__", line: %d, "
"malloc %d bytes fail", __LINE__,
(int)sizeof(TrackerServerInfo) *
g_tracker_group.server_count);
return errno != 0 ? errno : ENOMEM;
}
memcpy(pTrackerServers, g_tracker_group.servers,
sizeof(TrackerServerInfo) * g_tracker_group.server_count);
pTServerEnd = pTrackerServers + g_tracker_group.server_count;
for (pTServer=pTrackerServers; pTServer<pTServerEnd; pTServer++)
{
fdfs_server_sock_reset(pTServer);
}
/* value returned when the loop below is left because of a stop request
   before any connect attempt stored its own code */
result = EINTR;
if (g_tracker_group.leader_index >= 0 &&
g_tracker_group.leader_index < g_tracker_group.server_count)
{
pTServer = pTrackerServers + g_tracker_group.leader_index;
}
else
{
pTServer = pTrackerServers;
}
do
{
conn = NULL;
/* rotate through the trackers until one accepts the connection */
while (SF_G_CONTINUE_FLAG)
{
conn = tracker_connect_server_no_pool_ex(pTServer,
g_client_bind_addr ? SF_G_INNER_BIND_ADDR : NULL, &result, true);
if (conn != NULL)
{
break;
}
pTServer++;
if (pTServer >= pTServerEnd)
{
pTServer = pTrackerServers;
}
sleep(g_heart_beat_interval);
}
if (!SF_G_CONTINUE_FLAG)
{
break;
}
getSockIpaddr(conn->sock, tracker_client_ip, IP_ADDRESS_SIZE);
insert_into_local_host_ip(tracker_client_ip);
if ((result=tracker_sync_src_req(conn, pReader)) != 0)
{
fdfs_quit(conn);
close(conn->sock);
sleep(g_heart_beat_interval);
/* jumps to the while (1) test, i.e. starts another round */
continue;
}
fdfs_quit(conn);
close(conn->sock);
break;
} while (1);
free(pTrackerServers);
/*
//printf("need_sync_old=%d, until_timestamp=%d\n", \
pReader->need_sync_old, pReader->until_timestamp);
*/
return result;
}
/**
 * Initialise a binlog reader for a peer storage server.
 *
 * The reader is zeroed, its read buffer allocated and its mark file name
 * derived from the storage id ("0.0.0.0" when pStorage is NULL). Then:
 *   - when a mark file exists (only looked for when the storage status is
 *     above FDFS_STORAGE_STATUS_WAIT_SYNC; an old "<ip>_<port>" mark file
 *     is renamed first when storage ids are in use), the read position
 *     and counters are loaded from it;
 *   - when no mark file is used and pStorage is not NULL, the sync source
 *     request is sent to a tracker, the binlog is opened, records older
 *     than until_timestamp are skipped when no old-data sync is needed,
 *     and a fresh mark file is written;
 *   - finally the binlog is opened (if not yet) and pre-read.
 *
 * @param pStorage  peer storage server, may be NULL
 * @param pReader   reader to initialise
 * @return 0 on success, otherwise an error code
 */
int storage_reader_init(FDFSStorageBrief *pStorage, StorageBinLogReader *pReader)
{
    IniContext iniContext;
    int result;
    bool bFileExist;
    bool bNeedSyncOld;

    memset(pReader, 0, sizeof(StorageBinLogReader));
    pReader->binlog_fd = -1;

    pReader->binlog_buff.buffer = (char *)malloc(
            STORAGE_BINLOG_BUFFER_SIZE);
    if (pReader->binlog_buff.buffer == NULL)
    {
        logError("file: "__FILE__", line: %d, "
                "malloc %d bytes fail, "
                "errno: %d, error info: %s",
                __LINE__, STORAGE_BINLOG_BUFFER_SIZE,
                errno, STRERROR(errno));
        return errno != 0 ? errno : ENOMEM;
    }
    pReader->binlog_buff.current = pReader->binlog_buff.buffer;

    if (pStorage == NULL)
    {
        strcpy(pReader->storage_id, "0.0.0.0");
    }
    else
    {
        strcpy(pReader->storage_id, pStorage->id);
    }
    get_mark_filename_by_reader(pReader);

    if (pStorage == NULL)
    {
        bFileExist = false;
    }
    else if (pStorage->status <= FDFS_STORAGE_STATUS_WAIT_SYNC)
    {
        bFileExist = false;
    }
    else
    {
        bFileExist = fileExists(pReader->mark_filename);
        if (!bFileExist && g_use_storage_id)
        {
            /* migrate a mark file still named after the peer IP and port */
            char old_mark_filename[MAX_PATH_SIZE];

            get_mark_filename_by_ip_and_port(pStorage->ip_addr,
                    SF_G_INNER_PORT, old_mark_filename,
                    sizeof(old_mark_filename));
            if (fileExists(old_mark_filename))
            {
                if (rename(old_mark_filename,
                            pReader->mark_filename) != 0)
                {
                    logError("file: "__FILE__", line: %d, "
                            "rename file %s to %s fail"
                            ", errno: %d, error info: %s",
                            __LINE__, old_mark_filename,
                            pReader->mark_filename, errno,
                            STRERROR(errno));
                    return errno != 0 ? errno : EACCES;
                }
                bFileExist = true;
            }
        }
    }

    if (pStorage != NULL && !bFileExist)
    {
        if ((result=storage_reader_sync_init_req(pReader)) != 0)
        {
            return result;
        }
    }

    if (bFileExist)
    {
        memset(&iniContext, 0, sizeof(IniContext));
        if ((result=iniLoadFromFile(pReader->mark_filename,
                        &iniContext)) != 0)
        {
            logError("file: "__FILE__", line: %d, "
                    "load from mark file \"%s\" fail, "
                    "error code: %d", __LINE__,
                    pReader->mark_filename, result);
            return result;
        }

        if (iniContext.global.count < 7)
        {
            /* log first: the item count is read from the context,
               which must not be accessed after it has been freed */
            logError("file: "__FILE__", line: %d, "
                    "in mark file \"%s\", item count: %d < 7",
                    __LINE__, pReader->mark_filename,
                    iniContext.global.count);
            iniFreeContext(&iniContext);
            return ENOENT;
        }

        bNeedSyncOld = iniGetBoolValue(NULL,
                MARK_ITEM_NEED_SYNC_OLD,
                &iniContext, false);
        if (pStorage != NULL && pStorage->status ==
                FDFS_STORAGE_STATUS_SYNCING)
        {
            if ((result=storage_reader_sync_init_req(pReader)) != 0)
            {
                iniFreeContext(&iniContext);
                return result;
            }

            if (pReader->need_sync_old && !bNeedSyncOld)
            {
                bFileExist = false;  //re-sync
            }
            else
            {
                pReader->need_sync_old = bNeedSyncOld;
            }
        }
        else
        {
            pReader->need_sync_old = bNeedSyncOld;
        }

        if (bFileExist)
        {
            pReader->binlog_index = iniGetIntValue(NULL,
                    MARK_ITEM_BINLOG_FILE_INDEX,
                    &iniContext, -1);
            pReader->binlog_offset = iniGetInt64Value(NULL,
                    MARK_ITEM_BINLOG_FILE_OFFSET,
                    &iniContext, -1);
            pReader->sync_old_done = iniGetBoolValue(NULL,
                    MARK_ITEM_SYNC_OLD_DONE,
                    &iniContext, false);
            pReader->until_timestamp = iniGetIntValue(NULL,
                    MARK_ITEM_UNTIL_TIMESTAMP,
                    &iniContext, -1);
            pReader->scan_row_count = iniGetInt64Value(NULL,
                    MARK_ITEM_SCAN_ROW_COUNT,
                    &iniContext, 0);
            pReader->sync_row_count = iniGetInt64Value(NULL,
                    MARK_ITEM_SYNC_ROW_COUNT,
                    &iniContext, 0);

            if (pReader->binlog_index < 0)
            {
                iniFreeContext(&iniContext);
                logError("file: "__FILE__", line: %d, " \
                        "in mark file \"%s\", " \
                        "binlog_index: %d < 0", \
                        __LINE__, pReader->mark_filename, \
                        pReader->binlog_index);
                return EINVAL;
            }
            if (pReader->binlog_offset < 0)
            {
                iniFreeContext(&iniContext);
                logError("file: "__FILE__", line: %d, "
                        "in mark file \"%s\", binlog_offset: "
                        "%"PRId64" < 0", __LINE__,
                        pReader->mark_filename,
                        pReader->binlog_offset);
                return EINVAL;
            }
        }

        iniFreeContext(&iniContext);
    }

    pReader->last_scan_rows = pReader->scan_row_count;
    pReader->last_sync_rows = pReader->sync_row_count;

    if ((result=storage_open_readable_binlog(pReader,
                    get_binlog_readable_filename, pReader)) != 0)
    {
        return result;
    }

    if (pStorage != NULL && !bFileExist)
    {
        if (!pReader->need_sync_old && pReader->until_timestamp > 0)
        {
            if ((result=storage_binlog_reader_skip(pReader)) != 0)
            {
                return result;
            }
        }

        if ((result=storage_write_to_mark_file(pReader)) != 0)
        {
            return result;
        }
    }

    /* ENOENT from the pre-read is not treated as a failure */
    result = storage_binlog_preread(pReader);
    if (result != 0 && result != ENOENT)
    {
        return result;
    }

    return 0;
}
/**
 * Release the resources of a binlog reader: close the binlog fd when it
 * is open and free the read buffer when it is allocated.
 *
 * @param pReader  the binlog reader
 */
void storage_reader_destroy(StorageBinLogReader *pReader)
{
    if (pReader->binlog_fd >= 0)
    {
        close(pReader->binlog_fd);
        pReader->binlog_fd = -1;
    }

    if (pReader->binlog_buff.buffer == NULL)
    {
        return;
    }

    free(pReader->binlog_buff.buffer);
    pReader->binlog_buff.buffer = NULL;
    pReader->binlog_buff.current = NULL;
    pReader->binlog_buff.length = 0;
}
/**
 * Write the reader state to its mark file as seven "name=value" lines
 * (binlog index, binlog offset, need_sync_old, sync_old_done,
 * until_timestamp, scan row count, sync row count).
 * After a successful write the reader's last_scan_rows / last_sync_rows
 * are updated to the values just written.
 *
 * @param pReader  the binlog reader
 * @return 0 on success, otherwise an error code
 */
static int storage_write_to_mark_file(StorageBinLogReader *pReader)
{
    char content[256];
    int content_len;
    int ret;

    content_len = sprintf(content,
            "%s=%d\n"
            "%s=%"PRId64"\n"
            "%s=%d\n"
            "%s=%d\n"
            "%s=%d\n"
            "%s=%"PRId64"\n"
            "%s=%"PRId64"\n",
            MARK_ITEM_BINLOG_FILE_INDEX, pReader->binlog_index,
            MARK_ITEM_BINLOG_FILE_OFFSET, pReader->binlog_offset,
            MARK_ITEM_NEED_SYNC_OLD, pReader->need_sync_old,
            MARK_ITEM_SYNC_OLD_DONE, pReader->sync_old_done,
            MARK_ITEM_UNTIL_TIMESTAMP, (int)pReader->until_timestamp,
            MARK_ITEM_SCAN_ROW_COUNT, pReader->scan_row_count,
            MARK_ITEM_SYNC_ROW_COUNT, pReader->sync_row_count);

    ret = safeWriteToFile(pReader->mark_filename, content, content_len);
    if (ret != 0)
    {
        return ret;
    }

    /* macro returns from this function itself when it fails */
    SF_CHOWN_TO_RUNBY_RETURN_ON_ERROR(pReader->mark_filename);

    pReader->last_scan_rows = pReader->scan_row_count;
    pReader->last_sync_rows = pReader->sync_row_count;
    return 0;
}
/**
 * Seek the binlog fd back to pReader->binlog_offset and discard the
 * content of the read buffer.
 *
 * @param pReader  the binlog reader
 * @return 0 on success, an errno-style code when the seek fails
 */
static int rewind_to_prev_rec_end(StorageBinLogReader *pReader)
{
    const off_t pos = lseek(pReader->binlog_fd,
            pReader->binlog_offset, SEEK_SET);

    if (pos < 0)
    {
        logError("file: "__FILE__", line: %d, "
                "seek binlog file \"%s\"fail, "
                "file offset: %"PRId64", "
                "errno: %d, error info: %s",
                __LINE__, get_binlog_readable_filename(pReader, NULL),
                pReader->binlog_offset,
                errno, STRERROR(errno));
        return errno != 0 ? errno : ENOENT;
    }

    /* buffered data no longer matches the file position */
    pReader->binlog_buff.length = 0;
    pReader->binlog_buff.current = pReader->binlog_buff.buffer;
    return 0;
}
/**
 * Refill the reader's buffer from the binlog file.
 *
 * Unconsumed bytes are first moved to the start of the buffer, then the
 * free space behind them is filled with a read from binlog_fd.
 * When the buffer is empty and the reader's buffer version equals
 * binlog_write_version, no read is attempted. On end of file the buffer
 * version is set to the write version sampled before the read.
 *
 * @param pReader  the binlog reader
 * @return 0 when data was read, ENOENT when there is nothing new to
 *         read, otherwise an errno-style code
 */
static int storage_binlog_preread(StorageBinLogReader *pReader)
{
    int bytes_read;
    int saved_binlog_write_version;

    if (pReader->binlog_buff.version == binlog_write_version &&
            pReader->binlog_buff.length == 0)
    {
        return ENOENT;
    }

    saved_binlog_write_version = binlog_write_version;
    if (pReader->binlog_buff.current != pReader->binlog_buff.buffer)
    {
        if (pReader->binlog_buff.length > 0)
        {
            /* source and destination lie in the same buffer and may
               overlap, so memmove is required instead of memcpy */
            memmove(pReader->binlog_buff.buffer, \
                    pReader->binlog_buff.current, \
                    pReader->binlog_buff.length);
        }

        pReader->binlog_buff.current = pReader->binlog_buff.buffer;
    }

    bytes_read = fc_safe_read(pReader->binlog_fd, pReader->binlog_buff.buffer \
            + pReader->binlog_buff.length, \
            STORAGE_BINLOG_BUFFER_SIZE - pReader->binlog_buff.length);
    if (bytes_read < 0)
    {
        logError("file: "__FILE__", line: %d, " \
                "read from binlog file \"%s\" fail, " \
                "file offset: %"PRId64", " \
                "error no: %d, error info: %s", __LINE__, \
                get_binlog_readable_filename(pReader, NULL), \
                pReader->binlog_offset + pReader->binlog_buff.length, \
                errno, STRERROR(errno));
        return errno != 0 ? errno : EIO;
    }
    else if (bytes_read == 0)  //end of binlog file
    {
        pReader->binlog_buff.version = saved_binlog_write_version;
        return ENOENT;
    }

    pReader->binlog_buff.length += bytes_read;
    return 0;
}
/**
 * Extract the next '\n'-terminated line from the reader's buffer.
 * The line, including its '\n', is copied to "line" and NUL-terminated;
 * the buffer's current pointer and length are advanced past it.
 *
 * @param pReader      the binlog reader
 * @param line         output buffer
 * @param line_size    size of the output buffer in bytes
 * @param line_length  receives the line length including '\n'; set to 0
 *                     when no complete line is buffered
 * @return 0 on success, ENOENT when no complete line is buffered,
 *         ENOSPC when the line does not fit into the output buffer
 */
static int storage_binlog_do_line_read(StorageBinLogReader *pReader,
        char *line, const int line_size, int *line_length)
{
    char *newline_pos;

    if (pReader->binlog_buff.length == 0 ||
            (newline_pos=(char *)memchr(pReader->binlog_buff.current,
                '\n', pReader->binlog_buff.length)) == NULL)
    {
        *line_length = 0;
        return ENOENT;
    }

    *line_length = (newline_pos - pReader->binlog_buff.current) + 1;
    if (*line_length >= line_size)
    {
        logError("file: "__FILE__", line: %d, "
                "read from binlog file \"%s\" fail, "
                "file offset: %"PRId64", "
                "line buffer size: %d is too small! "
                "<= line length: %d", __LINE__,
                get_binlog_readable_filename(pReader, NULL),
                pReader->binlog_offset, line_size, *line_length);
        return ENOSPC;
    }

    memcpy(line, pReader->binlog_buff.current, *line_length);
    line[*line_length] = '\0';

    /* consume the line from the buffer */
    pReader->binlog_buff.length -= *line_length;
    pReader->binlog_buff.current = newline_pos + 1;
    return 0;
}
/**
 * Read the next binlog line. The buffered data is tried first; when it
 * holds no complete line, the buffer is refilled from the file and the
 * extraction is attempted once more.
 *
 * @param pReader      the binlog reader
 * @param line         output buffer
 * @param line_size    size of the output buffer in bytes
 * @param line_length  receives the line length
 * @return 0 on success, ENOENT when no complete line is available,
 *         otherwise an error code
 */
static int storage_binlog_read_line(StorageBinLogReader *pReader,
        char *line, const int line_size, int *line_length)
{
    int ret;

    ret = storage_binlog_do_line_read(pReader, line,
            line_size, line_length);
    if (ret == ENOENT)
    {
        ret = storage_binlog_preread(pReader);
        if (ret == 0)
        {
            ret = storage_binlog_do_line_read(pReader, line,
                    line_size, line_length);
        }
    }

    return ret;
}
/**
 * Read and parse the next binlog record.
 *
 * When the current binlog file has no further complete line and the
 * reader's binlog_index is below g_binlog_index, the reader rotates to
 * the next binlog file (offset reset to 0, file reopened, mark file
 * rewritten) and tries again.
 *
 * A line has the form "<timestamp> <op_type> <filename>[ <extra>]\n".
 * For the create-link and rename operation types the third column is
 * split at its first space into filename and src_filename. The filename
 * is then passed to storage_split_filename_ex() to fill true_filename and
 * store_path_index.
 *
 * This function does not modify pReader->binlog_offset except for the
 * reset to 0 on rotation.
 *
 * @param pReader        the binlog reader
 * @param pRecord        receives the parsed record
 * @param record_length  receives the length of the line read
 * @return 0 on success, ENOENT when no record is available, EINVAL for a
 *         malformed record, otherwise an error code
 */
int storage_binlog_read(StorageBinLogReader *pReader, \
StorageBinLogRecord *pRecord, int *record_length)
{
char line[STORAGE_BINLOG_LINE_SIZE];
char *cols[3];
int result;
while (1)
{
result = storage_binlog_read_line(pReader, line, \
sizeof(line), record_length);
if (result == 0)
{
break;
}
else if (result != ENOENT)
{
return result;
}
/* nothing more to read; already at the binlog currently being written */
if (pReader->binlog_index >= g_binlog_index)
{
return ENOENT;
}
/* leftover bytes without '\n' in an older binlog file */
if (pReader->binlog_buff.length != 0)
{
logError("file: "__FILE__", line: %d, " \
"binlog file \"%s\" not ended by \\n, " \
"file offset: %"PRId64, __LINE__, \
get_binlog_readable_filename(pReader, NULL), \
pReader->binlog_offset);
return ENOENT;
}
//rotate
pReader->binlog_index++;
pReader->binlog_offset = 0;
pReader->binlog_buff.version = 0;
if ((result=storage_open_readable_binlog(pReader, \
get_binlog_readable_filename, pReader)) != 0)
{
return result;
}
if ((result=storage_write_to_mark_file(pReader)) != 0)
{
return result;
}
}
/* at most 3 columns: the third one keeps any further spaces */
if ((result=splitEx(line, ' ', cols, 3)) < 3)
{
logError("file: "__FILE__", line: %d, " \
"read data from binlog file \"%s\" fail, " \
"file offset: %"PRId64", " \
"read item count: %d < 3", \
__LINE__, get_binlog_readable_filename(pReader, NULL), \
pReader->binlog_offset, result);
return EINVAL;
}
pRecord->timestamp = atoi(cols[0]);
pRecord->op_type = *(cols[1]);
pRecord->filename_len = strlen(cols[2]) - 1; //need trim new line \n
if (pRecord->filename_len > sizeof(pRecord->filename) - 1)
{
logError("file: "__FILE__", line: %d, " \
"item \"filename\" in binlog " \
"file \"%s\" is invalid, file offset: " \
"%"PRId64", filename length: %d > %d", \
__LINE__, get_binlog_readable_filename(pReader, NULL), \
pReader->binlog_offset, \
pRecord->filename_len, (int)sizeof(pRecord->filename)-1);
return EINVAL;
}
memcpy(pRecord->filename, cols[2], pRecord->filename_len);
*(pRecord->filename + pRecord->filename_len) = '\0';
/* link / rename records carry "<filename> <src_filename>" */
if (pRecord->op_type == STORAGE_OP_TYPE_SOURCE_CREATE_LINK ||
pRecord->op_type == STORAGE_OP_TYPE_REPLICA_CREATE_LINK ||
pRecord->op_type == STORAGE_OP_TYPE_SOURCE_RENAME_FILE ||
pRecord->op_type == STORAGE_OP_TYPE_REPLICA_RENAME_FILE)
{
char *p;
p = strchr(pRecord->filename, ' ');
if (p == NULL)
{
*(pRecord->src_filename) = '\0';
pRecord->src_filename_len = 0;
}
else
{
pRecord->src_filename_len = pRecord->filename_len -
(p - pRecord->filename) - 1;
pRecord->filename_len = p - pRecord->filename;
*p = '\0';
/* NOTE(review): no check against sizeof(pRecord->src_filename) here;
   presumably it is at least as large as filename - confirm */
memcpy(pRecord->src_filename, p + 1,
pRecord->src_filename_len);
*(pRecord->src_filename +
pRecord->src_filename_len) = '\0';
}
}
else
{
*(pRecord->src_filename) = '\0';
pRecord->src_filename_len = 0;
}
pRecord->true_filename_len = pRecord->filename_len;
if ((result=storage_split_filename_ex(pRecord->filename,
&pRecord->true_filename_len, pRecord->true_filename,
&pRecord->store_path_index)) != 0)
{
return result;
}
/*
//printf("timestamp=%d, op_type=%c, filename=%s(%d), line length=%d, " \
"offset=%d\n", \
pRecord->timestamp, pRecord->op_type, \
pRecord->filename, strlen(pRecord->filename), \
*record_length, pReader->binlog_offset);
*/
return 0;
}
/**
 * Advance the reader past all records whose timestamp is below
 * pReader->until_timestamp.
 *
 * Each skipped record adds its length to pReader->binlog_offset. At the
 * first record with timestamp >= until_timestamp the file position is
 * rewound to the end of the previous record and the loop stops. Invalid
 * records (EINVAL) are tolerated when g_file_sync_skip_invalid_record is
 * set.
 *
 * @param pReader  the binlog reader
 * @return 0 on success or when the end of the binlog is reached,
 *         otherwise an error code
 */
static int storage_binlog_reader_skip(StorageBinLogReader *pReader)
{
    StorageBinLogRecord record;
    int result;
    int record_len;

    while (1)
    {
        /*
         * storage_binlog_read() can fail with EINVAL before it stores
         * the timestamp; give it a defined value so the comparison below
         * never reads an uninitialized field. With 0, such a record is
         * stepped over like any record older than until_timestamp.
         */
        record.timestamp = 0;
        record_len = 0;

        result = storage_binlog_read(pReader, \
                &record, &record_len);
        if (result != 0)
        {
            if (result == ENOENT)
            {
                return 0;
            }

            if (result == EINVAL && g_file_sync_skip_invalid_record)
            {
                logWarning("file: "__FILE__", line: %d, " \
                        "skip invalid record!", __LINE__);
            }
            else
            {
                return result;
            }
        }

        if (record.timestamp >= pReader->until_timestamp)
        {
            result = rewind_to_prev_rec_end(pReader);
            break;
        }

        pReader->binlog_offset += record_len;
    }

    return result;
}
/**
 * Retire the mark file of a storage server by renaming it to
 * "<mark file>.<YYYYmmddHHMMSS>", using the local time derived from
 * g_current_time.
 *
 * @param storage_id  id of the storage server
 * @return 0 on success, ENOENT when the mark file does not exist,
 *         otherwise an errno-style code
 */
int storage_unlink_mark_file(const char *storage_id)
{
    char mark_path[MAX_PATH_SIZE];
    char backup_path[MAX_PATH_SIZE];
    time_t now;
    struct tm now_tm;

    now = g_current_time;
    localtime_r(&now, &now_tm);

    get_mark_filename_by_id(storage_id, mark_path, sizeof(mark_path));
    if (!fileExists(mark_path))
    {
        return ENOENT;
    }

    snprintf(backup_path, sizeof(backup_path),
            "%s.%04d%02d%02d%02d%02d%02d", mark_path,
            now_tm.tm_year+1900, now_tm.tm_mon+1, now_tm.tm_mday,
            now_tm.tm_hour, now_tm.tm_min, now_tm.tm_sec);
    if (rename(mark_path, backup_path) == 0)
    {
        return 0;
    }

    logError("file: "__FILE__", line: %d, "
            "rename file %s to %s fail"
            ", errno: %d, error info: %s",
            __LINE__, mark_path, backup_path,
            errno, STRERROR(errno));
    return errno != 0 ? errno : EACCES;
}
/**
 * Rename the mark file of a storage server after its address changed.
 *
 * An already existing mark file for the new address is never overwritten.
 *
 * @param old_ip_addr previous ip address of the storage server
 * @param old_port    previous port of the storage server
 * @param new_ip_addr new ip address of the storage server
 * @param new_port    new port of the storage server
 * @return 0 on success, ENOENT if the old mark file does not exist,
 *         EEXIST if the new mark file already exists, otherwise the errno
 *         of the failed rename() (EACCES if errno is 0)
 */
int storage_rename_mark_file(const char *old_ip_addr, const int old_port, \
		const char *new_ip_addr, const int new_port)
{
	char old_filename[MAX_PATH_SIZE];
	char new_filename[MAX_PATH_SIZE];
	int result;

	get_mark_filename_by_id_and_port(old_ip_addr, old_port, \
			old_filename, sizeof(old_filename));
	if (!fileExists(old_filename))
	{
		return ENOENT;
	}

	get_mark_filename_by_id_and_port(new_ip_addr, new_port, \
			new_filename, sizeof(new_filename));
	if (fileExists(new_filename))
	{
		logWarning("file: "__FILE__", line: %d, " \
			"mark file %s already exists, " \
			"ignore rename file %s to %s", \
			__LINE__, new_filename, old_filename, new_filename);
		return EEXIST;
	}

	if (rename(old_filename, new_filename) != 0)
	{
		//capture errno before logging, the logger may overwrite it
		result = errno != 0 ? errno : EACCES;
		logError("file: "__FILE__", line: %d, " \
			"rename file %s to %s fail" \
			", errno: %d, error info: %s", \
			__LINE__, old_filename, new_filename, \
			result, STRERROR(result));
		return result;
	}

	return 0;
}
/**
 * Convert a daily "HH:MM - HH:MM" window into absolute timestamps.
 *
 * The start is placed on the local calendar day of current_time. When the
 * end time of day is earlier than the start time of day, the end is placed
 * on the day obtained from current_time + 24 hours. Seconds are always 0.
 *
 * @param current_time reference timestamp
 * @param pStartTime   start time of day (hour, minute)
 * @param pEndTime     end time of day (hour, minute)
 * @param start_time   output: absolute start timestamp
 * @param end_time     output: absolute end timestamp
 */
static void storage_sync_get_start_end_times(time_t current_time, \
	const TimeInfo *pStartTime, const TimeInfo *pEndTime, \
	time_t *start_time, time_t *end_time)
{
	struct tm day_tm;
	int ends_next_day;

	localtime_r(&current_time, &day_tm);
	day_tm.tm_hour = pStartTime->hour;
	day_tm.tm_min = pStartTime->minute;
	day_tm.tm_sec = 0;
	*start_time = mktime(&day_tm);

	//does the window wrap around midnight?
	if (pEndTime->hour != pStartTime->hour)
	{
		ends_next_day = pEndTime->hour < pStartTime->hour;
	}
	else
	{
		ends_next_day = pEndTime->minute < pStartTime->minute;
	}

	if (ends_next_day)
	{
		current_time += 24 * 3600;
		localtime_r(&current_time, &day_tm);
		day_tm.tm_sec = 0;
	}

	day_tm.tm_hour = pEndTime->hour;
	day_tm.tm_min = pEndTime->minute;
	*end_time = mktime(&day_tm);
}
/**
 * Bookkeeping done by a sync thread right before it terminates: drop the
 * calling thread's id from sync_tids (closing the gap) and decrement
 * g_storage_sync_thread_count, all under sync_thread_lock.
 *
 * Lock / unlock failures are logged and otherwise ignored.
 *
 * @param pStorage connection info of the peer, used for the debug log only
 */
static void storage_sync_thread_exit(ConnectionInfo *pStorage)
{
	int lock_status;
	int pos;
	pthread_t self_tid;

	lock_status = pthread_mutex_lock(&sync_thread_lock);
	if (lock_status != 0)
	{
		logError("file: "__FILE__", line: %d, " \
			"call pthread_mutex_lock fail, " \
			"errno: %d, error info: %s", \
			__LINE__, lock_status, STRERROR(lock_status));
	}

	//locate the slot holding the calling thread's id
	self_tid = pthread_self();
	pos = 0;
	while (pos < g_storage_sync_thread_count)
	{
		if (pthread_equal(sync_tids[pos], self_tid))
		{
			break;
		}
		pos++;
	}

	//shift the following ids down by one slot
	for (; pos < g_storage_sync_thread_count - 1; pos++)
	{
		sync_tids[pos] = sync_tids[pos + 1];
	}
	g_storage_sync_thread_count--;

	lock_status = pthread_mutex_unlock(&sync_thread_lock);
	if (lock_status != 0)
	{
		logError("file: "__FILE__", line: %d, " \
			"call pthread_mutex_unlock fail, " \
			"errno: %d, error info: %s", \
			__LINE__, lock_status, STRERROR(lock_status));
	}

	logDebug("file: "__FILE__", line: %d, " \
		"sync thread to storage server %s:%d exit",
		__LINE__, pStorage->ip_addr, pStorage->port);
}
/**
 * Entry point of a sync thread: replays the local binlog to one peer
 * storage server until the program stops or the peer's status becomes
 * DELETED, IP_CHANGED or NONE.
 *
 * Each round of the outer loop:
 *   1. waits while the peer status is INIT / OFFLINE / ONLINE,
 *   2. when part-time sync is enabled, waits while the current time lies
 *      between the configured sync end time and the next sync start time,
 *   3. connects to the peer and (re)initializes the binlog reader,
 *   4. reads binlog records and pushes them to the peer until an error
 *      occurs, the sync window closes or the peer status changes,
 *   5. saves the progress to the mark file and closes the connection.
 *
 * storage_server.sock is kept at -1 whenever the socket is closed, so the
 * descriptor is never closed twice.
 *
 * @param arg the FDFSStorageBrief of the peer storage server
 * @return always NULL
 */
static void* storage_sync_thread_entrance(void* arg)
{
	FDFSStorageBrief *pStorage;
	StorageBinLogReader *pReader;
	StorageBinLogRecord record;
	ConnectionInfo storage_server;
	char local_ip_addr[IP_ADDRESS_SIZE];
	int read_result;
	int sync_result;
	int result;
	int record_len;
	time_t current_time;
	time_t start_time;
	time_t end_time;
	time_t last_keep_alive_time;

	pStorage = (FDFSStorageBrief *)arg;

	strcpy(storage_server.ip_addr, pStorage->ip_addr);
	storage_server.port = SF_G_INNER_PORT;
	storage_server.sock = -1;
	memset(local_ip_addr, 0, sizeof(local_ip_addr));

	pReader = (StorageBinLogReader *)malloc(sizeof(StorageBinLogReader));
	if (pReader == NULL)
	{
		logCrit("file: "__FILE__", line: %d, "
			"malloc %d bytes fail, "
			"fail, program exit!",
			__LINE__, (int)sizeof(StorageBinLogReader));
		SF_G_CONTINUE_FLAG = false;
		storage_sync_thread_exit(&storage_server);
		return NULL;
	}

	memset(pReader, 0, sizeof(StorageBinLogReader));
	pReader->binlog_fd = -1;
	storage_reader_add_to_list(pReader);

	current_time = g_current_time;
	last_keep_alive_time = 0;
	start_time = 0;
	end_time = 0;

	logDebug("file: "__FILE__", line: %d, "
		"sync thread to storage server %s:%d started",
		__LINE__, storage_server.ip_addr, storage_server.port);

	while (SF_G_CONTINUE_FLAG &&
		pStorage->status != FDFS_STORAGE_STATUS_DELETED &&
		pStorage->status != FDFS_STORAGE_STATUS_IP_CHANGED &&
		pStorage->status != FDFS_STORAGE_STATUS_NONE)
	{
		//wait until the peer leaves the INIT / OFFLINE / ONLINE states
		while (SF_G_CONTINUE_FLAG &&
			(pStorage->status == FDFS_STORAGE_STATUS_INIT ||
			 pStorage->status == FDFS_STORAGE_STATUS_OFFLINE ||
			 pStorage->status == FDFS_STORAGE_STATUS_ONLINE))
		{
			sleep(1);
		}

		if ((!SF_G_CONTINUE_FLAG) ||
			pStorage->status == FDFS_STORAGE_STATUS_DELETED ||
			pStorage->status == FDFS_STORAGE_STATUS_IP_CHANGED ||
			pStorage->status == FDFS_STORAGE_STATUS_NONE)
		{
			break;
		}

		if (g_sync_part_time)
		{
			/*
			 * The end / start settings are passed in swapped order, so
			 * [start_time, end_time] is the interval from the configured
			 * sync end to the next sync start. It is shrunk by 60 seconds
			 * on each side and the thread sleeps while inside it.
			 */
			current_time = g_current_time;
			storage_sync_get_start_end_times(current_time,
				&g_sync_end_time, &g_sync_start_time,
				&start_time, &end_time);
			start_time += 60;
			end_time -= 60;
			while (SF_G_CONTINUE_FLAG && (current_time >= start_time
					&& current_time <= end_time))
			{
				current_time = g_current_time;
				sleep(1);
			}
		}

		storage_sync_connect_storage_server(pStorage, &storage_server);
		if ((!SF_G_CONTINUE_FLAG) ||
			pStorage->status == FDFS_STORAGE_STATUS_DELETED ||
			pStorage->status == FDFS_STORAGE_STATUS_IP_CHANGED ||
			pStorage->status == FDFS_STORAGE_STATUS_NONE)
		{
			break;
		}

		if (pStorage->status != FDFS_STORAGE_STATUS_ACTIVE &&
			pStorage->status != FDFS_STORAGE_STATUS_WAIT_SYNC &&
			pStorage->status != FDFS_STORAGE_STATUS_SYNCING)
		{
			close(storage_server.sock);
			storage_server.sock = -1;
			sleep(5);
			continue;
		}

		//the reader is taken off the shared list while it is re-initialized
		storage_reader_remove_from_list(pReader);
		result = storage_reader_init(pStorage, pReader);
		storage_reader_add_to_list(pReader);
		if (result != 0)
		{
			logCrit("file: "__FILE__", line: %d, "
				"storage_reader_init fail, errno=%d, "
				"program exit!",
				__LINE__, result);
			SF_G_CONTINUE_FLAG = false;
			break;
		}

		if (!pReader->need_sync_old)
		{
			//no old data to sync: wait for the peer to become ACTIVE
			while (SF_G_CONTINUE_FLAG &&
				(pStorage->status != FDFS_STORAGE_STATUS_ACTIVE &&
				 pStorage->status != FDFS_STORAGE_STATUS_DELETED &&
				 pStorage->status != FDFS_STORAGE_STATUS_IP_CHANGED &&
				 pStorage->status != FDFS_STORAGE_STATUS_NONE))
			{
				sleep(1);
			}

			if (pStorage->status != FDFS_STORAGE_STATUS_ACTIVE)
			{
				close(storage_server.sock);
				storage_server.sock = -1;
				storage_reader_destroy(pReader);
				continue;
			}
		}

		getSockIpaddr(storage_server.sock,
			local_ip_addr, IP_ADDRESS_SIZE);
		insert_into_local_host_ip(local_ip_addr);

		if (strcmp(pStorage->id, g_my_server_id_str) == 0 ||
			is_local_host_ip(pStorage->ip_addr))
		{  //can't self sync to self
			logError("file: "__FILE__", line: %d, "
				"ip_addr %s belong to the local host,"
				" sync thread exit.",
				__LINE__, pStorage->ip_addr);
			fdfs_quit(&storage_server);
			close(storage_server.sock);
			storage_server.sock = -1;
			break;
		}

		if (storage_report_my_server_id(&storage_server) != 0)
		{
			close(storage_server.sock);
			storage_server.sock = -1;
			storage_reader_destroy(pReader);
			sleep(1);
			continue;
		}

		if (pStorage->status == FDFS_STORAGE_STATUS_WAIT_SYNC)
		{
			pStorage->status = FDFS_STORAGE_STATUS_SYNCING;
			storage_report_storage_status(pStorage->id,
				pStorage->ip_addr, pStorage->status);
		}

		if (pStorage->status == FDFS_STORAGE_STATUS_SYNCING)
		{
			if (pReader->need_sync_old && pReader->sync_old_done)
			{
				//old data was already synced completely
				pStorage->status = FDFS_STORAGE_STATUS_OFFLINE;
				storage_report_storage_status(pStorage->id,
					pStorage->ip_addr,
					pStorage->status);
			}
		}

		if (g_sync_part_time)
		{
			//this time in normal order: the window in which sync may run
			current_time = g_current_time;
			storage_sync_get_start_end_times(current_time,
				&g_sync_start_time, &g_sync_end_time,
				&start_time, &end_time);
		}

		sync_result = 0;
		while (SF_G_CONTINUE_FLAG && (!g_sync_part_time ||
			(current_time >= start_time &&
			 current_time <= end_time)) &&
			(pStorage->status == FDFS_STORAGE_STATUS_ACTIVE ||
			 pStorage->status == FDFS_STORAGE_STATUS_SYNCING))
		{
			read_result = storage_binlog_read(pReader,
					&record, &record_len);
			if (read_result == ENOENT)
			{
				//end of binlog reached
				if (pReader->need_sync_old &&
					!pReader->sync_old_done)
				{
					pReader->sync_old_done = true;
					if (storage_write_to_mark_file(pReader) != 0)
					{
						logCrit("file: "__FILE__", line: %d, "
							"storage_write_to_mark_file "
							"fail, program exit!",
							__LINE__);
						SF_G_CONTINUE_FLAG = false;
						break;
					}

					if (pStorage->status ==
						FDFS_STORAGE_STATUS_SYNCING)
					{
						pStorage->status =
							FDFS_STORAGE_STATUS_OFFLINE;
						storage_report_storage_status(
							pStorage->id,
							pStorage->ip_addr,
							pStorage->status);
					}
				}

				if (pReader->last_scan_rows!=pReader->scan_row_count)
				{
					if (storage_write_to_mark_file(pReader)!=0)
					{
						logCrit("file: "__FILE__", line: %d, "
							"storage_write_to_mark_file fail, "
							"program exit!", __LINE__);
						SF_G_CONTINUE_FLAG = false;
						break;
					}
				}

				//idle: send an active test once per heart beat interval
				current_time = g_current_time;
				if (current_time - last_keep_alive_time >=
					g_heart_beat_interval)
				{
					if (fdfs_active_test(&storage_server)!=0)
					{
						break;
					}

					last_keep_alive_time = current_time;
				}

				usleep(g_sync_wait_usec);
				continue;
			}

			if (g_sync_part_time)
			{
				current_time = g_current_time;
			}

			if (read_result != 0)
			{
				if (read_result == EINVAL &&
					g_file_sync_skip_invalid_record)
				{
					logWarning("file: "__FILE__", line: %d, "
						"skip invalid record, binlog index: "
						"%d, offset: %"PRId64,
						__LINE__, pReader->binlog_index,
						pReader->binlog_offset);
				}
				else
				{
					sleep(5);
					break;
				}
			}
			else if ((sync_result=storage_sync_data(pReader,
				&storage_server, &record)) != 0)
			{
				logDebug("file: "__FILE__", line: %d, "
					"binlog index: %d, current record "
					"offset: %"PRId64", next "
					"record offset: %"PRId64,
					__LINE__, pReader->binlog_index,
					pReader->binlog_offset,
					pReader->binlog_offset + record_len);
				//the failed record must be read again next round
				if (rewind_to_prev_rec_end(pReader) != 0)
				{
					logCrit("file: "__FILE__", line: %d, "
						"rewind_to_prev_rec_end fail, "
						"program exit!", __LINE__);
					SF_G_CONTINUE_FLAG = false;
				}

				break;
			}

			pReader->binlog_offset += record_len;
			pReader->scan_row_count++;

			if (g_sync_interval > 0)
			{
				usleep(g_sync_interval);
			}
		}

		if (pReader->last_scan_rows != pReader->scan_row_count)
		{
			if (storage_write_to_mark_file(pReader) != 0)
			{
				logCrit("file: "__FILE__", line: %d, "
					"storage_write_to_mark_file fail, "
					"program exit!", __LINE__);
				SF_G_CONTINUE_FLAG = false;
				break;
			}
		}

		close(storage_server.sock);
		storage_server.sock = -1;
		storage_reader_destroy(pReader);

		if (!SF_G_CONTINUE_FLAG)
		{
			break;
		}

		//retry without delay only after ENOTCONN / EIO from the peer
		if (!(sync_result == ENOTCONN || sync_result == EIO))
		{
			sleep(1);
		}
	}

	if (storage_server.sock >= 0)
	{
		close(storage_server.sock);
		storage_server.sock = -1;
	}

	storage_reader_remove_from_list(pReader);
	storage_reader_destroy(pReader);
	//the reader is off the shared list now, release the block malloc'ed above
	free(pReader);
	pReader = NULL;

	if (pStorage->status == FDFS_STORAGE_STATUS_DELETED
		|| pStorage->status == FDFS_STORAGE_STATUS_IP_CHANGED)
	{
		storage_changelog_req();
		sleep(2 * g_heart_beat_interval + 1);
		pStorage->status = FDFS_STORAGE_STATUS_NONE;
	}

	storage_sync_thread_exit(&storage_server);
	return NULL;
}
/**
 * Start a sync thread for the given peer storage server and register its
 * thread id in sync_tids.
 *
 * Nothing is started (and 0 is returned) when the peer status is DELETED,
 * IP_CHANGED or NONE, or when the peer is this server itself.
 *
 * The slot in sync_tids is reserved before the thread is created and the
 * thread is created while sync_thread_lock is held. A failed allocation
 * therefore neither loses the existing array nor leaves an untracked
 * thread running, and the new thread cannot reach
 * storage_sync_thread_exit() before its id has been registered.
 *
 * @param pStorage the peer storage server to sync to
 * @return 0 on success or when no thread is needed, otherwise an errno
 *         value (thread attribute init, memory allocation or thread
 *         creation failed)
 */
int storage_sync_thread_start(const FDFSStorageBrief *pStorage)
{
	int result;
	int lock_result;
	size_t alloc_bytes;
	pthread_attr_t pattr;
	pthread_t tid;
	pthread_t *new_tids;

	if (pStorage->status == FDFS_STORAGE_STATUS_DELETED || \
	    pStorage->status == FDFS_STORAGE_STATUS_IP_CHANGED || \
	    pStorage->status == FDFS_STORAGE_STATUS_NONE)
	{
		logWarning("file: "__FILE__", line: %d, " \
			"storage id: %s 's status: %d is invalid, " \
			"can't start sync thread!", __LINE__, \
			pStorage->id, pStorage->status);
		return 0;
	}

	if (strcmp(pStorage->id, g_my_server_id_str) == 0 ||
		is_local_host_ip(pStorage->ip_addr)) //can't self sync to self
	{
		logWarning("file: "__FILE__", line: %d, "
			"storage id: %s is myself, can't start sync thread!",
			__LINE__, pStorage->id);
		return 0;
	}

	if ((result=init_pthread_attr(&pattr, SF_G_THREAD_STACK_SIZE)) != 0)
	{
		return result;
	}

	if ((lock_result=pthread_mutex_lock(&sync_thread_lock)) != 0)
	{
		logError("file: "__FILE__", line: %d, " \
			"call pthread_mutex_lock fail, " \
			"errno: %d, error info: %s", \
			__LINE__, lock_result, STRERROR(lock_result));
	}

	//grow through a temporary so the old array survives a failed realloc
	alloc_bytes = sizeof(pthread_t) * (size_t)(g_storage_sync_thread_count + 1);
	new_tids = (pthread_t *)realloc(sync_tids, alloc_bytes);
	if (new_tids == NULL)
	{
		result = errno != 0 ? errno : ENOMEM;
		logError("file: "__FILE__", line: %d, " \
			"malloc %d bytes fail, " \
			"errno: %d, error info: %s", \
			__LINE__, (int)alloc_bytes, \
			result, STRERROR(result));
	}
	else
	{
		sync_tids = new_tids;
		result = pthread_create(&tid, &pattr, \
				storage_sync_thread_entrance, (void *)pStorage);
		if (result == 0)
		{
			sync_tids[g_storage_sync_thread_count] = tid;
			g_storage_sync_thread_count++;
		}
		else
		{
			//the spare slot is simply reused by the next start
			logError("file: "__FILE__", line: %d, " \
				"create thread failed, errno: %d, " \
				"error info: %s", \
				__LINE__, result, STRERROR(result));
		}
	}

	if ((lock_result=pthread_mutex_unlock(&sync_thread_lock)) != 0)
	{
		logError("file: "__FILE__", line: %d, " \
			"call pthread_mutex_unlock fail, " \
			"errno: %d, error info: %s", \
			__LINE__, lock_result, STRERROR(lock_result));
	}

	pthread_attr_destroy(&pattr);
	return result;
}
/**
 * Append a binlog reader to the tail of the reader list (reader_head).
 *
 * The list is modified while holding sync_thread_lock, the same lock
 * calc_compress_until_binlog_index() holds when it walks the list.
 * The return codes of the lock / unlock calls are not checked.
 *
 * @param pReader the reader whose embedded `link` node is appended
 */
void storage_reader_add_to_list(StorageBinLogReader *pReader)
{
pthread_mutex_lock(&sync_thread_lock);
fc_list_add_tail(&pReader->link, &reader_head);
pthread_mutex_unlock(&sync_thread_lock);
}
/**
 * Unlink a binlog reader from the reader list while holding
 * sync_thread_lock.
 *
 * The return codes of the lock / unlock calls are not checked.
 * NOTE(review): fc_list_del_init() presumably re-initializes the node so
 * that it can be added to the list again later -- confirm in fc_list.h.
 *
 * @param pReader the reader whose embedded `link` node is unlinked
 */
void storage_reader_remove_from_list(StorageBinLogReader *pReader)
{
pthread_mutex_lock(&sync_thread_lock);
fc_list_del_init(&pReader->link);
pthread_mutex_unlock(&sync_thread_lock);
}
/**
 * Determine the binlog index at which compression has to stop.
 *
 * Starts from g_binlog_index and lowers the result to the smallest
 * binlog_index among the registered readers that have a binlog file open
 * (binlog_fd >= 0) and a non-negative binlog_index. The reader list is
 * walked while holding sync_thread_lock.
 *
 * @return the exclusive upper bound used by the compress loop
 */
static int calc_compress_until_binlog_index()
{
	StorageBinLogReader *reader;
	int lowest;

	pthread_mutex_lock(&sync_thread_lock);

	lowest = g_binlog_index;
	fc_list_for_each_entry(reader, &reader_head, link)
	{
		//only readers with an open binlog file and a valid index count
		if (reader->binlog_fd >= 0 && reader->binlog_index >= 0)
		{
			if (reader->binlog_index < lowest)
			{
				lowest = reader->binlog_index;
			}
		}
	}

	pthread_mutex_unlock(&sync_thread_lock);
	return lowest;
}
/**
 * Compress binlog files from binlog_compress_index up to (not including)
 * the index returned by calc_compress_until_binlog_index().
 *
 * After each file that was compressed or is missing (ENOENT),
 * binlog_compress_index is advanced and the binlog index file is
 * rewritten. Any other error stops the pass.
 *
 * @param args not used
 * @return always 0
 */
int fdfs_binlog_compress_func(void *args)
{
	char binlog_path[MAX_PATH_SIZE];
	int stop_index;
	int current;
	int status;

	//nothing to do when everything before the current binlog is done
	if (binlog_compress_index >= g_binlog_index)
	{
		return 0;
	}

	stop_index = calc_compress_until_binlog_index();
	current = binlog_compress_index;
	while (current < stop_index)
	{
		get_binlog_readable_filename_ex(current, binlog_path);
		status = compress_binlog_file(binlog_path);
		if (status != 0 && status != ENOENT)
		{
			break;
		}

		//record the progress before moving on to the next file
		++current;
		binlog_compress_index = current;
		write_to_binlog_index(g_binlog_index);
	}

	return 0;
}