larger network_timeout for fetching one-store-path binlog

v6.03_dev
YuQing 2019-11-20 08:38:38 +08:00
parent 5a6acbdff8
commit 9a29048ae5
5 changed files with 63 additions and 42 deletions

View File

@ -1,5 +1,5 @@
Version 6.03 2019-11-19 Version 6.03 2019-11-20
* dual IPs support two different types of inner (intranet) IPs * dual IPs support two different types of inner (intranet) IPs
* storage server request tracker server to change it's status * storage server request tracker server to change it's status
to that of tracker leader when the storage server found to that of tracker leader when the storage server found
@ -11,6 +11,8 @@ Version 6.03 2019-11-19
and extent struct FDFSStorePathInfo and extent struct FDFSStorePathInfo
* check store path's mark file to prevent confusion * check store path's mark file to prevent confusion
* new selected tracker leader do NOT notify self by network * new selected tracker leader do NOT notify self by network
* larger network_timeout for fetching one-store-path binlog
when disk recovery
NOTE: the tracker and storage server must upgrade together NOTE: the tracker and storage server must upgrade together

View File

@ -65,6 +65,7 @@ static int storage_do_fetch_binlog(ConnectionInfo *pSrcStorage, \
int64_t in_bytes; int64_t in_bytes;
int64_t file_bytes; int64_t file_bytes;
int result; int result;
int network_timeout;
pBasePath = g_fdfs_store_paths.paths[store_path_index].path; pBasePath = g_fdfs_store_paths.paths[store_path_index].path;
recovery_get_binlog_filename(pBasePath, full_binlog_filename); recovery_get_binlog_filename(pBasePath, full_binlog_filename);
@ -75,21 +76,30 @@ static int storage_do_fetch_binlog(ConnectionInfo *pSrcStorage, \
long2buff(FDFS_GROUP_NAME_MAX_LEN + 1, pHeader->pkg_len); long2buff(FDFS_GROUP_NAME_MAX_LEN + 1, pHeader->pkg_len);
pHeader->cmd = STORAGE_PROTO_CMD_FETCH_ONE_PATH_BINLOG; pHeader->cmd = STORAGE_PROTO_CMD_FETCH_ONE_PATH_BINLOG;
strcpy(out_buff + sizeof(TrackerHeader), g_group_name); strcpy(out_buff + sizeof(TrackerHeader), g_group_name);
*(out_buff + sizeof(TrackerHeader) + FDFS_GROUP_NAME_MAX_LEN) = \ *(out_buff + sizeof(TrackerHeader) + FDFS_GROUP_NAME_MAX_LEN) =
store_path_index; store_path_index;
if((result=tcpsenddata_nb(pSrcStorage->sock, out_buff, \ if((result=tcpsenddata_nb(pSrcStorage->sock, out_buff,
sizeof(out_buff), g_fdfs_network_timeout)) != 0) sizeof(out_buff), g_fdfs_network_timeout)) != 0)
{ {
logError("file: "__FILE__", line: %d, " \ logError("file: "__FILE__", line: %d, "
"tracker server %s:%d, send data fail, " \ "storage server %s:%d, send data fail, "
"errno: %d, error info: %s.", \ "errno: %d, error info: %s.",
__LINE__, pSrcStorage->ip_addr, pSrcStorage->port, \ __LINE__, pSrcStorage->ip_addr, pSrcStorage->port,
result, STRERROR(result)); result, STRERROR(result));
return result; return result;
} }
if ((result=fdfs_recv_header(pSrcStorage, &in_bytes)) != 0) if (g_fdfs_network_timeout >= 600)
{
network_timeout = g_fdfs_network_timeout;
}
else
{
network_timeout = 600;
}
if ((result=fdfs_recv_header_ex(pSrcStorage, network_timeout,
&in_bytes)) != 0)
{ {
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"fdfs_recv_header fail, result: %d", "fdfs_recv_header fail, result: %d",
@ -97,21 +107,20 @@ static int storage_do_fetch_binlog(ConnectionInfo *pSrcStorage, \
return result; return result;
} }
if ((result=tcprecvfile(pSrcStorage->sock, full_binlog_filename, \ if ((result=tcprecvfile(pSrcStorage->sock, full_binlog_filename,
in_bytes, 0, g_fdfs_network_timeout, \ in_bytes, 0, network_timeout, &file_bytes)) != 0)
&file_bytes)) != 0)
{ {
logError("file: "__FILE__", line: %d, " \ logError("file: "__FILE__", line: %d, "
"tracker server %s:%d, tcprecvfile fail, " \ "storage server %s:%d, tcprecvfile fail, "
"errno: %d, error info: %s.", \ "errno: %d, error info: %s.",
__LINE__, pSrcStorage->ip_addr, pSrcStorage->port, \ __LINE__, pSrcStorage->ip_addr, pSrcStorage->port,
result, STRERROR(result)); result, STRERROR(result));
return result; return result;
} }
logInfo("file: "__FILE__", line: %d, " \ logInfo("file: "__FILE__", line: %d, "
"recovery binlog file size: %"PRId64, \ "recovery binlog from %s:%d, file size: %"PRId64, __LINE__,
__LINE__, file_bytes); pSrcStorage->ip_addr, pSrcStorage->port, file_bytes);
return 0; return 0;
} }

View File

@ -4144,7 +4144,7 @@ static int storage_server_trunk_confirm_or_free(struct fast_task_info *pTask)
} }
} }
static int storage_server_fetch_one_path_binlog_dealer( \ static int storage_server_fetch_one_path_binlog_dealer(
struct fast_task_info *pTask) struct fast_task_info *pTask)
{ {
#define STORAGE_LAST_AHEAD_BYTES (2 * FDFS_PROTO_PKG_LEN_SIZE) #define STORAGE_LAST_AHEAD_BYTES (2 * FDFS_PROTO_PKG_LEN_SIZE)
@ -4167,7 +4167,7 @@ static int storage_server_fetch_one_path_binlog_dealer( \
int64_t pkg_len; int64_t pkg_len;
pClientInfo = (StorageClientInfo *)pTask->arg; pClientInfo = (StorageClientInfo *)pTask->arg;
if (pClientInfo->total_length - pClientInfo->total_offset <= \ if (pClientInfo->total_length - pClientInfo->total_offset <=
STORAGE_LAST_AHEAD_BYTES) //finished, close the connection STORAGE_LAST_AHEAD_BYTES) //finished, close the connection
{ {
STORAGE_NIO_NOTIFY_CLOSE(pTask); STORAGE_NIO_NOTIFY_CLOSE(pTask);
@ -4177,13 +4177,13 @@ static int storage_server_fetch_one_path_binlog_dealer( \
pFileContext = &(pClientInfo->file_context); pFileContext = &(pClientInfo->file_context);
pReader = (StorageBinLogReader *)pClientInfo->extra_arg; pReader = (StorageBinLogReader *)pClientInfo->extra_arg;
store_path_index = pFileContext->extra_info.upload.trunk_info. \ store_path_index = pFileContext->extra_info.upload.trunk_info.
path.store_path_index; path.store_path_index;
pBasePath = g_fdfs_store_paths.paths[store_path_index].path; pBasePath = g_fdfs_store_paths.paths[store_path_index].path;
pOutBuff = pTask->data; pOutBuff = pTask->data;
bLast = false; bLast = false;
sprintf(diskLogicPath, "%c"FDFS_STORAGE_DATA_DIR_FORMAT, \ sprintf(diskLogicPath, "%c"FDFS_STORAGE_DATA_DIR_FORMAT,
FDFS_STORAGE_STORE_PATH_PREFIX_CHAR, store_path_index); FDFS_STORAGE_STORE_PATH_PREFIX_CHAR, store_path_index);
do do
@ -4368,12 +4368,12 @@ static int storage_server_fetch_one_path_binlog_dealer( \
pTask->length = pOutBuff - pTask->data; pTask->length = pOutBuff - pTask->data;
if (bLast) if (bLast)
{ {
pkg_len = pClientInfo->total_offset + pTask->length - \ pkg_len = pClientInfo->total_offset + pTask->length -
sizeof(TrackerHeader); sizeof(TrackerHeader);
long2buff(pkg_len, pOutBuff); long2buff(pkg_len, pOutBuff);
pTask->length += FDFS_PROTO_PKG_LEN_SIZE; pTask->length += FDFS_PROTO_PKG_LEN_SIZE;
pClientInfo->total_length = pkg_len + FDFS_PROTO_PKG_LEN_SIZE \ pClientInfo->total_length = pkg_len + FDFS_PROTO_PKG_LEN_SIZE
+ STORAGE_LAST_AHEAD_BYTES; + STORAGE_LAST_AHEAD_BYTES;
} }
@ -4406,7 +4406,7 @@ static void fetch_one_path_binlog_finish_clean_up(struct fast_task_info *pTask)
free(pReader); free(pReader);
} }
static int storage_server_do_fetch_one_path_binlog( \ static int storage_server_do_fetch_one_path_binlog(
struct fast_task_info *pTask, const int store_path_index) struct fast_task_info *pTask, const int store_path_index)
{ {
StorageClientInfo *pClientInfo; StorageClientInfo *pClientInfo;
@ -4441,20 +4441,20 @@ static int storage_server_do_fetch_one_path_binlog( \
pFileContext->fd = -1; pFileContext->fd = -1;
pFileContext->op = FDFS_STORAGE_FILE_OP_READ; pFileContext->op = FDFS_STORAGE_FILE_OP_READ;
pFileContext->dio_thread_index = storage_dio_get_thread_index( \ pFileContext->dio_thread_index = storage_dio_get_thread_index(
pTask, store_path_index, pFileContext->op); pTask, store_path_index, pFileContext->op);
pFileContext->extra_info.upload.trunk_info.path.store_path_index = pFileContext->extra_info.upload.trunk_info.path.store_path_index =
store_path_index; store_path_index;
pClientInfo->extra_arg = pReader; pClientInfo->extra_arg = pReader;
pClientInfo->total_length = INFINITE_FILE_SIZE + \ pClientInfo->total_length = INFINITE_FILE_SIZE +
sizeof(TrackerHeader); sizeof(TrackerHeader);
pClientInfo->total_offset = 0; pClientInfo->total_offset = 0;
pTask->length = sizeof(TrackerHeader); pTask->length = sizeof(TrackerHeader);
pHeader = (TrackerHeader *)pTask->data; pHeader = (TrackerHeader *)pTask->data;
pHeader->status = 0; pHeader->status = 0;
pHeader->cmd = STORAGE_PROTO_CMD_RESP; pHeader->cmd = STORAGE_PROTO_CMD_RESP;
long2buff(pClientInfo->total_length - sizeof(TrackerHeader), \ long2buff(pClientInfo->total_length - sizeof(TrackerHeader),
pHeader->pkg_len); pHeader->pkg_len);
storage_nio_notify(pTask); storage_nio_notify(pTask);
@ -4512,7 +4512,7 @@ static int storage_server_fetch_one_path_binlog(struct fast_task_info *pTask)
return EINVAL; return EINVAL;
} }
return storage_server_do_fetch_one_path_binlog( \ return storage_server_do_fetch_one_path_binlog(
pTask, store_path_index); pTask, store_path_index);
} }

View File

@ -24,19 +24,20 @@
#include "tracker_proto.h" #include "tracker_proto.h"
#include "fdfs_shared_func.h" #include "fdfs_shared_func.h"
int fdfs_recv_header(ConnectionInfo *pTrackerServer, int64_t *in_bytes) int fdfs_recv_header_ex(ConnectionInfo *pTrackerServer,
const int network_timeout, int64_t *in_bytes)
{ {
TrackerHeader resp; TrackerHeader resp;
int result; int result;
if ((result=tcprecvdata_nb(pTrackerServer->sock, &resp, \ if ((result=tcprecvdata_nb(pTrackerServer->sock, &resp,
sizeof(resp), g_fdfs_network_timeout)) != 0) sizeof(resp), network_timeout)) != 0)
{ {
logError("file: "__FILE__", line: %d, " \ logError("file: "__FILE__", line: %d, "
"server: %s:%d, recv data fail, " \ "server: %s:%d, recv data fail, "
"errno: %d, error info: %s", \ "errno: %d, error info: %s",
__LINE__, pTrackerServer->ip_addr, \ __LINE__, pTrackerServer->ip_addr,
pTrackerServer->port, \ pTrackerServer->port,
result, STRERROR(result)); result, STRERROR(result));
*in_bytes = 0; *in_bytes = 0;
return result; return result;
@ -56,10 +57,10 @@ int fdfs_recv_header(ConnectionInfo *pTrackerServer, int64_t *in_bytes)
*in_bytes = buff2long(resp.pkg_len); *in_bytes = buff2long(resp.pkg_len);
if (*in_bytes < 0) if (*in_bytes < 0)
{ {
logError("file: "__FILE__", line: %d, " \ logError("file: "__FILE__", line: %d, "
"server: %s:%d, recv package size " \ "server: %s:%d, recv package size "
"%"PRId64" is not correct", \ "%"PRId64" is not correct",
__LINE__, pTrackerServer->ip_addr, \ __LINE__, pTrackerServer->ip_addr,
pTrackerServer->port, *in_bytes); pTrackerServer->port, *in_bytes);
*in_bytes = 0; *in_bytes = 0;
return EINVAL; return EINVAL;

View File

@ -12,6 +12,7 @@
#define _TRACKER_PROTO_H_ #define _TRACKER_PROTO_H_
#include "tracker_types.h" #include "tracker_types.h"
#include "fdfs_global.h"
#include "fastcommon/connection_pool.h" #include "fastcommon/connection_pool.h"
#include "fastcommon/ini_file_reader.h" #include "fastcommon/ini_file_reader.h"
@ -287,7 +288,15 @@ int metadata_cmp_by_name(const void *p1, const void *p2);
const char *get_storage_status_caption(const int status); const char *get_storage_status_caption(const int status);
int fdfs_recv_header(ConnectionInfo *pTrackerServer, int64_t *in_bytes); int fdfs_recv_header_ex(ConnectionInfo *pTrackerServer,
const int network_timeout, int64_t *in_bytes);
static inline int fdfs_recv_header(ConnectionInfo *pTrackerServer,
int64_t *in_bytes)
{
return fdfs_recv_header_ex(pTrackerServer,
g_fdfs_network_timeout, in_bytes);
}
int fdfs_recv_response(ConnectionInfo *pTrackerServer, \ int fdfs_recv_response(ConnectionInfo *pTrackerServer, \
char **buff, const int buff_size, \ char **buff, const int buff_size, \