445 lines
9.8 KiB
C
445 lines
9.8 KiB
C
/**
|
|
* Copyright (C) 2008 Happy Fish / YuQing
|
|
*
|
|
* FastDFS may be copied only under the terms of the GNU General
|
|
* Public License V3, which may be found in the FastDFS source kit.
|
|
* Please visit the FastDFS Home Page http://www.fastken.com/ for more detail.
|
|
**/
|
|
|
|
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
#include <string.h>
|
|
#include <errno.h>
|
|
#include <unistd.h>
|
|
#include <string.h>
|
|
#include <sys/stat.h>
|
|
#include <fcntl.h>
|
|
#include <errno.h>
|
|
#include <signal.h>
|
|
#include <sys/types.h>
|
|
#include <sys/time.h>
|
|
#include <sys/socket.h>
|
|
#include <netinet/in.h>
|
|
#include <pthread.h>
|
|
#include "fastcommon/shared_func.h"
|
|
#include "fastcommon/sched_thread.h"
|
|
#include "fdfs_global.h"
|
|
#include "fastcommon/logger.h"
|
|
#include "fastcommon/sockopt.h"
|
|
#include "fastcommon/fast_task_queue.h"
|
|
#include "tracker_types.h"
|
|
#include "tracker_proto.h"
|
|
#include "tracker_mem.h"
|
|
#include "tracker_global.h"
|
|
#include "tracker_service.h"
|
|
#include "fastcommon/ioevent_loop.h"
|
|
#include "tracker_nio.h"
|
|
|
|
static void client_sock_read(int sock, short event, void *arg);
|
|
static void client_sock_write(int sock, short event, void *arg);
|
|
|
|
void task_finish_clean_up(struct fast_task_info *pTask)
|
|
{
|
|
TrackerClientInfo *pClientInfo;
|
|
|
|
pClientInfo = (TrackerClientInfo *)pTask->arg;
|
|
|
|
if (pTask->finish_callback != NULL)
|
|
{
|
|
pTask->finish_callback(pTask);
|
|
pTask->finish_callback = NULL;
|
|
}
|
|
|
|
if (pClientInfo->pGroup != NULL)
|
|
{
|
|
if (pClientInfo->pStorage != NULL)
|
|
{
|
|
tracker_mem_offline_store_server(pClientInfo->pGroup, \
|
|
pClientInfo->pStorage);
|
|
}
|
|
}
|
|
|
|
ioevent_detach(&pTask->thread_data->ev_puller, pTask->event.fd);
|
|
close(pTask->event.fd);
|
|
pTask->event.fd = -1;
|
|
|
|
if (pTask->event.timer.expires > 0)
|
|
{
|
|
fast_timer_remove(&pTask->thread_data->timer,
|
|
&pTask->event.timer);
|
|
pTask->event.timer.expires = 0;
|
|
}
|
|
|
|
memset(pTask->arg, 0, sizeof(TrackerClientInfo));
|
|
free_queue_push(pTask);
|
|
__sync_fetch_and_sub(&g_connection_stat.current_count, 1);
|
|
}
|
|
|
|
void recv_notify_read(int sock, short event, void *arg)
|
|
{
|
|
int bytes;
|
|
int incomesock;
|
|
struct nio_thread_data *pThreadData;
|
|
struct fast_task_info *pTask;
|
|
char szClientIp[IP_ADDRESS_SIZE];
|
|
in_addr_t client_addr;
|
|
|
|
while (1)
|
|
{
|
|
if ((bytes=read(sock, &incomesock, sizeof(incomesock))) < 0)
|
|
{
|
|
if (!(errno == EAGAIN || errno == EWOULDBLOCK))
|
|
{
|
|
logError("file: "__FILE__", line: %d, " \
|
|
"call read failed, " \
|
|
"errno: %d, error info: %s", \
|
|
__LINE__, errno, STRERROR(errno));
|
|
}
|
|
|
|
break;
|
|
}
|
|
else if (bytes == 0)
|
|
{
|
|
break;
|
|
}
|
|
|
|
if (incomesock < 0)
|
|
{
|
|
return;
|
|
}
|
|
|
|
client_addr = getPeerIpaddr(incomesock, \
|
|
szClientIp, IP_ADDRESS_SIZE);
|
|
if (g_allow_ip_count >= 0)
|
|
{
|
|
if (bsearch(&client_addr, g_allow_ip_addrs, \
|
|
g_allow_ip_count, sizeof(in_addr_t), \
|
|
cmp_by_ip_addr_t) == NULL)
|
|
{
|
|
logError("file: "__FILE__", line: %d, " \
|
|
"ip addr %s is not allowed to access", \
|
|
__LINE__, szClientIp);
|
|
|
|
close(incomesock);
|
|
continue;
|
|
}
|
|
}
|
|
|
|
if (tcpsetnonblockopt(incomesock) != 0)
|
|
{
|
|
close(incomesock);
|
|
continue;
|
|
}
|
|
|
|
pTask = free_queue_pop();
|
|
if (pTask == NULL)
|
|
{
|
|
logError("file: "__FILE__", line: %d, "
|
|
"malloc task buff fail, you should "
|
|
"increase the parameter \"max_connections\" "
|
|
"in tracker.conf, or check your applications "
|
|
"for connection leaks", __LINE__);
|
|
close(incomesock);
|
|
continue;
|
|
}
|
|
|
|
strcpy(pTask->client_ip, szClientIp);
|
|
|
|
pThreadData = g_thread_data + incomesock % g_work_threads;
|
|
if (ioevent_set(pTask, pThreadData, incomesock, IOEVENT_READ,
|
|
client_sock_read, g_fdfs_network_timeout) != 0)
|
|
{
|
|
task_finish_clean_up(pTask);
|
|
continue;
|
|
}
|
|
}
|
|
}
|
|
|
|
static int set_send_event(struct fast_task_info *pTask)
|
|
{
|
|
int result;
|
|
|
|
if (pTask->event.callback == client_sock_write)
|
|
{
|
|
return 0;
|
|
}
|
|
|
|
pTask->event.callback = client_sock_write;
|
|
if (ioevent_modify(&pTask->thread_data->ev_puller,
|
|
pTask->event.fd, IOEVENT_WRITE, pTask) != 0)
|
|
{
|
|
result = errno != 0 ? errno : ENOENT;
|
|
task_finish_clean_up(pTask);
|
|
|
|
logError("file: "__FILE__", line: %d, "\
|
|
"ioevent_modify fail, " \
|
|
"errno: %d, error info: %s", \
|
|
__LINE__, result, STRERROR(result));
|
|
return result;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
int send_add_event(struct fast_task_info *pTask)
|
|
{
|
|
pTask->offset = 0;
|
|
|
|
/* direct send */
|
|
client_sock_write(pTask->event.fd, IOEVENT_WRITE, pTask);
|
|
return 0;
|
|
}
|
|
|
|
static void client_sock_read(int sock, short event, void *arg)
|
|
{
|
|
int bytes;
|
|
int recv_bytes;
|
|
struct fast_task_info *pTask;
|
|
|
|
pTask = (struct fast_task_info *)arg;
|
|
|
|
if (event & IOEVENT_TIMEOUT)
|
|
{
|
|
if (pTask->offset == 0)
|
|
{
|
|
if (pTask->req_count > 0)
|
|
{
|
|
pTask->event.timer.expires = g_current_time +
|
|
g_fdfs_network_timeout;
|
|
fast_timer_add(&pTask->thread_data->timer,
|
|
&pTask->event.timer);
|
|
}
|
|
else
|
|
{
|
|
logWarning("file: "__FILE__", line: %d, "
|
|
"client ip: %s, recv timeout. "
|
|
"after the connection is established, "
|
|
"you must send a request before %ds timeout, "
|
|
"maybe connections leak in you application.",
|
|
__LINE__, pTask->client_ip, g_fdfs_network_timeout);
|
|
task_finish_clean_up(pTask);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
logError("file: "__FILE__", line: %d, "
|
|
"client ip: %s, recv timeout, "
|
|
"recv offset: %d, expect length: %d, "
|
|
"req_count: %"PRId64, __LINE__, pTask->client_ip,
|
|
pTask->offset, pTask->length, pTask->req_count);
|
|
task_finish_clean_up(pTask);
|
|
}
|
|
|
|
return;
|
|
}
|
|
|
|
if (event & IOEVENT_ERROR)
|
|
{
|
|
logDebug("file: "__FILE__", line: %d, " \
|
|
"client ip: %s, recv error event: %d, "
|
|
"close connection", __LINE__, pTask->client_ip, event);
|
|
|
|
task_finish_clean_up(pTask);
|
|
return;
|
|
}
|
|
|
|
while (1)
|
|
{
|
|
fast_timer_modify(&pTask->thread_data->timer,
|
|
&pTask->event.timer, g_current_time +
|
|
g_fdfs_network_timeout);
|
|
if (pTask->length == 0) //recv header
|
|
{
|
|
recv_bytes = sizeof(TrackerHeader) - pTask->offset;
|
|
}
|
|
else
|
|
{
|
|
recv_bytes = pTask->length - pTask->offset;
|
|
}
|
|
|
|
bytes = recv(sock, pTask->data + pTask->offset, recv_bytes, 0);
|
|
if (bytes < 0)
|
|
{
|
|
if (errno == EAGAIN || errno == EWOULDBLOCK)
|
|
{
|
|
}
|
|
else if (errno == EINTR)
|
|
{
|
|
continue;
|
|
}
|
|
else
|
|
{
|
|
logError("file: "__FILE__", line: %d, " \
|
|
"client ip: %s, recv failed, " \
|
|
"errno: %d, error info: %s", \
|
|
__LINE__, pTask->client_ip, \
|
|
errno, STRERROR(errno));
|
|
|
|
task_finish_clean_up(pTask);
|
|
}
|
|
|
|
return;
|
|
}
|
|
else if (bytes == 0)
|
|
{
|
|
logDebug("file: "__FILE__", line: %d, " \
|
|
"client ip: %s, recv failed, " \
|
|
"connection disconnected.", \
|
|
__LINE__, pTask->client_ip);
|
|
|
|
task_finish_clean_up(pTask);
|
|
return;
|
|
}
|
|
|
|
if (pTask->length == 0) //header
|
|
{
|
|
if (pTask->offset + bytes < sizeof(TrackerHeader))
|
|
{
|
|
pTask->offset += bytes;
|
|
return;
|
|
}
|
|
|
|
pTask->length = buff2long(((TrackerHeader *) \
|
|
pTask->data)->pkg_len);
|
|
if (pTask->length < 0)
|
|
{
|
|
logError("file: "__FILE__", line: %d, " \
|
|
"client ip: %s, pkg length: %d < 0", \
|
|
__LINE__, pTask->client_ip, \
|
|
pTask->length);
|
|
|
|
task_finish_clean_up(pTask);
|
|
return;
|
|
}
|
|
|
|
pTask->length += sizeof(TrackerHeader);
|
|
if (pTask->length > TRACKER_MAX_PACKAGE_SIZE)
|
|
{
|
|
logError("file: "__FILE__", line: %d, " \
|
|
"client ip: %s, pkg length: %d > " \
|
|
"max pkg size: %d", __LINE__, \
|
|
pTask->client_ip, pTask->length, \
|
|
TRACKER_MAX_PACKAGE_SIZE);
|
|
|
|
task_finish_clean_up(pTask);
|
|
return;
|
|
}
|
|
}
|
|
|
|
pTask->offset += bytes;
|
|
if (pTask->offset >= pTask->length) //recv done
|
|
{
|
|
pTask->req_count++;
|
|
tracker_deal_task(pTask);
|
|
return;
|
|
}
|
|
}
|
|
|
|
return;
|
|
}
|
|
|
|
static void client_sock_write(int sock, short event, void *arg)
|
|
{
|
|
int bytes;
|
|
int result;
|
|
struct fast_task_info *pTask;
|
|
|
|
pTask = (struct fast_task_info *)arg;
|
|
if (event & IOEVENT_TIMEOUT)
|
|
{
|
|
logError("file: "__FILE__", line: %d, " \
|
|
"send timeout", __LINE__);
|
|
|
|
task_finish_clean_up(pTask);
|
|
|
|
return;
|
|
}
|
|
|
|
if (event & IOEVENT_ERROR)
|
|
{
|
|
logDebug("file: "__FILE__", line: %d, " \
|
|
"client ip: %s, recv error event: %d, "
|
|
"close connection", __LINE__, pTask->client_ip, event);
|
|
|
|
task_finish_clean_up(pTask);
|
|
return;
|
|
}
|
|
|
|
while (1)
|
|
{
|
|
fast_timer_modify(&pTask->thread_data->timer,
|
|
&pTask->event.timer, g_current_time +
|
|
g_fdfs_network_timeout);
|
|
|
|
bytes = send(sock, pTask->data + pTask->offset, \
|
|
pTask->length - pTask->offset, 0);
|
|
//printf("%08X sended %d bytes\n", (int)pTask, bytes);
|
|
if (bytes < 0)
|
|
{
|
|
if (errno == EAGAIN || errno == EWOULDBLOCK)
|
|
{
|
|
set_send_event(pTask);
|
|
}
|
|
else if (errno == EINTR)
|
|
{
|
|
continue;
|
|
}
|
|
else
|
|
{
|
|
logError("file: "__FILE__", line: %d, " \
|
|
"client ip: %s, recv failed, " \
|
|
"errno: %d, error info: %s", \
|
|
__LINE__, pTask->client_ip, \
|
|
errno, STRERROR(errno));
|
|
|
|
task_finish_clean_up(pTask);
|
|
}
|
|
|
|
return;
|
|
}
|
|
else if (bytes == 0)
|
|
{
|
|
logWarning("file: "__FILE__", line: %d, " \
|
|
"send failed, connection disconnected.", \
|
|
__LINE__);
|
|
|
|
task_finish_clean_up(pTask);
|
|
return;
|
|
}
|
|
|
|
pTask->offset += bytes;
|
|
if (pTask->offset >= pTask->length)
|
|
{
|
|
if (pTask->length == sizeof(TrackerHeader) && \
|
|
((TrackerHeader *)pTask->data)->status == EINVAL)
|
|
{
|
|
logDebug("file: "__FILE__", line: %d, "\
|
|
"close conn: #%d, client ip: %s", \
|
|
__LINE__, pTask->event.fd,
|
|
pTask->client_ip);
|
|
task_finish_clean_up(pTask);
|
|
return;
|
|
}
|
|
|
|
pTask->offset = 0;
|
|
pTask->length = 0;
|
|
|
|
pTask->event.callback = client_sock_read;
|
|
if (ioevent_modify(&pTask->thread_data->ev_puller,
|
|
pTask->event.fd, IOEVENT_READ, pTask) != 0)
|
|
{
|
|
result = errno != 0 ? errno : ENOENT;
|
|
task_finish_clean_up(pTask);
|
|
|
|
logError("file: "__FILE__", line: %d, "\
|
|
"ioevent_modify fail, " \
|
|
"errno: %d, error info: %s", \
|
|
__LINE__, result, STRERROR(result));
|
|
return;
|
|
}
|
|
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|