654 lines
20 KiB
C
654 lines
20 KiB
C
/*
 * Copyright (c) 2020 YuQing <384681@qq.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 */
|
|
|
|
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
#include <string.h>
|
|
#include <errno.h>
|
|
#include <unistd.h>
|
|
#include <string.h>
|
|
#include <sys/stat.h>
|
|
#include <fcntl.h>
|
|
#include <errno.h>
|
|
#include <signal.h>
|
|
//#include <assert.h>
|
|
#include <sys/types.h>
|
|
#include <sys/time.h>
|
|
#include <sys/socket.h>
|
|
#include <netinet/in.h>
|
|
#include <pthread.h>
|
|
#include "fastcommon/shared_func.h"
|
|
#include "fastcommon/sched_thread.h"
|
|
#include "fastcommon/pthread_func.h"
|
|
#include "fastcommon/logger.h"
|
|
#include "fastcommon/sockopt.h"
|
|
#include "fastcommon/fast_task_queue.h"
|
|
#include "fastcommon/ioevent_loop.h"
|
|
#include "sf_global.h"
|
|
#include "sf_nio.h"
|
|
|
|
/* Shorthand for the SFContext stored in task->ctx. The expansion refers to
 * an identifier named `task`, so it is only usable where such a variable
 * (a struct fast_task_info pointer) is in scope. */
#define SF_CTX ((SFContext *)(task->ctx))
|
|
|
|
void sf_set_parameters_ex(SFContext *sf_context, const int header_size,
|
|
sf_set_body_length_callback set_body_length_func,
|
|
sf_deal_task_func deal_func, TaskCleanUpCallback cleanup_func,
|
|
sf_recv_timeout_callback timeout_callback)
|
|
{
|
|
sf_context->header_size = header_size;
|
|
sf_context->set_body_length = set_body_length_func;
|
|
sf_context->deal_task = deal_func;
|
|
sf_context->task_cleanup_func = cleanup_func;
|
|
sf_context->timeout_callback = timeout_callback;
|
|
}
|
|
|
|
void sf_task_detach_thread(struct fast_task_info *task)
|
|
{
|
|
ioevent_detach(&task->thread_data->ev_puller, task->event.fd);
|
|
|
|
if (task->event.timer.expires > 0) {
|
|
fast_timer_remove(&task->thread_data->timer,
|
|
&task->event.timer);
|
|
task->event.timer.expires = 0;
|
|
}
|
|
|
|
if (SF_CTX->remove_from_ready_list) {
|
|
ioevent_remove(&task->thread_data->ev_puller, task);
|
|
}
|
|
}
|
|
|
|
void sf_task_switch_thread(struct fast_task_info *task,
|
|
const int new_thread_index)
|
|
{
|
|
sf_task_detach_thread(task);
|
|
task->thread_data = SF_CTX->thread_data + new_thread_index;
|
|
}
|
|
|
|
void sf_task_finish_clean_up(struct fast_task_info *task)
|
|
{
|
|
/*
|
|
assert(task->event.fd >= 0);
|
|
if (task->event.fd < 0) {
|
|
logWarning("file: "__FILE__", line: %d, "
|
|
"task: %p already cleaned",
|
|
__LINE__, task);
|
|
return;
|
|
}
|
|
*/
|
|
|
|
if (task->finish_callback != NULL) {
|
|
task->finish_callback(task);
|
|
task->finish_callback = NULL;
|
|
}
|
|
|
|
sf_task_detach_thread(task);
|
|
close(task->event.fd);
|
|
task->event.fd = -1;
|
|
|
|
__sync_fetch_and_sub(&g_sf_global_vars.connection_stat.current_count, 1);
|
|
free_queue_push(task);
|
|
}
|
|
|
|
static inline int set_write_event(struct fast_task_info *task)
|
|
{
|
|
int result;
|
|
|
|
if (task->event.callback == (IOEventCallback)sf_client_sock_write) {
|
|
return 0;
|
|
}
|
|
|
|
task->event.callback = (IOEventCallback)sf_client_sock_write;
|
|
if (ioevent_modify(&task->thread_data->ev_puller,
|
|
task->event.fd, IOEVENT_WRITE, task) != 0)
|
|
{
|
|
result = errno != 0 ? errno : ENOENT;
|
|
ioevent_add_to_deleted_list(task);
|
|
|
|
logError("file: "__FILE__", line: %d, "
|
|
"ioevent_modify fail, "
|
|
"errno: %d, error info: %s",
|
|
__LINE__, result, strerror(result));
|
|
return result;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
int sf_set_read_event(struct fast_task_info *task)
|
|
{
|
|
int result;
|
|
|
|
task->nio_stages.current = SF_NIO_STAGE_RECV;
|
|
if (task->event.callback == (IOEventCallback)sf_client_sock_read) {
|
|
return 0;
|
|
}
|
|
|
|
task->event.callback = (IOEventCallback)sf_client_sock_read;
|
|
if (ioevent_modify(&task->thread_data->ev_puller,
|
|
task->event.fd, IOEVENT_READ, task) != 0)
|
|
{
|
|
result = errno != 0 ? errno : ENOENT;
|
|
ioevent_add_to_deleted_list(task);
|
|
|
|
logError("file: "__FILE__", line: %d, "
|
|
"ioevent_modify fail, "
|
|
"errno: %d, error info: %s",
|
|
__LINE__, result, strerror(result));
|
|
return result;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static inline int sf_ioevent_add(struct fast_task_info *task,
|
|
IOEventCallback callback, const int timeout)
|
|
{
|
|
int result;
|
|
|
|
result = ioevent_set(task, task->thread_data, task->event.fd,
|
|
IOEVENT_READ, callback, timeout);
|
|
return result > 0 ? -1 * result : result;
|
|
}
|
|
|
|
static inline int sf_nio_init(struct fast_task_info *task)
|
|
{
|
|
int current_connections;
|
|
|
|
current_connections = __sync_add_and_fetch(
|
|
&g_sf_global_vars.connection_stat.current_count, 1);
|
|
if (current_connections > g_sf_global_vars.connection_stat.max_count) {
|
|
g_sf_global_vars.connection_stat.max_count = current_connections;
|
|
}
|
|
|
|
return sf_ioevent_add(task, (IOEventCallback)sf_client_sock_read,
|
|
task->network_timeout);
|
|
}
|
|
|
|
static int sf_client_sock_connect(int sock, short event, void *arg)
|
|
{
|
|
int result;
|
|
socklen_t len;
|
|
struct fast_task_info *task;
|
|
|
|
task = (struct fast_task_info *)arg;
|
|
if (event & IOEVENT_TIMEOUT) {
|
|
result = ETIMEDOUT;
|
|
} else {
|
|
len = sizeof(result);
|
|
if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &result, &len) < 0) {
|
|
result = errno != 0 ? errno : EACCES;
|
|
}
|
|
}
|
|
|
|
if (result != 0) {
|
|
logError("file: "__FILE__", line: %d, "
|
|
"connect to server %s:%u fail, errno: %d, "
|
|
"error info: %s", __LINE__, task->server_ip,
|
|
task->port, result, STRERROR(result));
|
|
ioevent_add_to_deleted_list(task);
|
|
return -1;
|
|
}
|
|
|
|
logInfo("file: "__FILE__", line: %d, "
|
|
"connect to server %s:%u successfully",
|
|
__LINE__, task->server_ip, task->port);
|
|
return SF_CTX->deal_task(task, SF_NIO_STAGE_HANDSHAKE);
|
|
}
|
|
|
|
static int sf_connect_server(struct fast_task_info *task)
|
|
{
|
|
int result;
|
|
|
|
if ((task->event.fd=socketCreateEx2(AF_UNSPEC, task->server_ip,
|
|
O_NONBLOCK, NULL, &result)) < 0)
|
|
{
|
|
return result > 0 ? -1 * result : result;
|
|
}
|
|
|
|
result = asyncconnectserverbyip(task->event.fd,
|
|
task->server_ip, task->port);
|
|
if (result == 0) {
|
|
if ((result=sf_ioevent_add(task, (IOEventCallback)
|
|
sf_client_sock_read, task->network_timeout)) != 0)
|
|
{
|
|
return result;
|
|
}
|
|
|
|
logInfo("file: "__FILE__", line: %d, "
|
|
"connect to server %s:%u successfully",
|
|
__LINE__, task->server_ip, task->port);
|
|
return SF_CTX->deal_task(task, SF_NIO_STAGE_HANDSHAKE);
|
|
} else if (result == EINPROGRESS) {
|
|
result = ioevent_set(task, task->thread_data, task->event.fd,
|
|
IOEVENT_READ | IOEVENT_WRITE, (IOEventCallback)
|
|
sf_client_sock_connect, task->connect_timeout);
|
|
return result > 0 ? -1 * result : result;
|
|
} else {
|
|
close(task->event.fd);
|
|
task->event.fd = -1;
|
|
logError("file: "__FILE__", line: %d, "
|
|
"connect to server %s:%u fail, errno: %d, "
|
|
"error info: %s", __LINE__, task->server_ip,
|
|
task->port, result, STRERROR(result));
|
|
return result > 0 ? -1 * result : result;
|
|
}
|
|
}
|
|
|
|
static int sf_nio_deal_task(struct fast_task_info *task)
|
|
{
|
|
int result;
|
|
|
|
switch (task->nio_stages.notify) {
|
|
case SF_NIO_STAGE_INIT:
|
|
task->nio_stages.current = SF_NIO_STAGE_RECV;
|
|
result = sf_nio_init(task);
|
|
break;
|
|
case SF_NIO_STAGE_CONNECT:
|
|
result = sf_connect_server(task);
|
|
break;
|
|
case SF_NIO_STAGE_RECV:
|
|
if ((result=sf_set_read_event(task)) == 0)
|
|
{
|
|
sf_client_sock_read(task->event.fd,
|
|
IOEVENT_READ, task);
|
|
}
|
|
break;
|
|
case SF_NIO_STAGE_SEND:
|
|
result = sf_send_add_event(task);
|
|
break;
|
|
case SF_NIO_STAGE_CONTINUE: //continue deal
|
|
result = SF_CTX->deal_task(task, SF_NIO_STAGE_CONTINUE);
|
|
break;
|
|
case SF_NIO_STAGE_FORWARDED: //forward by other thread
|
|
if ((result=sf_ioevent_add(task, (IOEventCallback)
|
|
sf_client_sock_read,
|
|
task->network_timeout)) == 0)
|
|
{
|
|
result = SF_CTX->deal_task(task, SF_NIO_STAGE_SEND);
|
|
}
|
|
break;
|
|
case SF_NIO_STAGE_CLOSE:
|
|
result = -EIO; //close this socket
|
|
break;
|
|
default:
|
|
logError("file: "__FILE__", line: %d, "
|
|
"client ip: %s, invalid notify stage: %d",
|
|
__LINE__, task->client_ip, task->nio_stages.notify);
|
|
result = -EINVAL;
|
|
break;
|
|
}
|
|
|
|
if (result < 0) {
|
|
ioevent_add_to_deleted_list(task);
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
int sf_nio_notify(struct fast_task_info *task, const int stage)
|
|
{
|
|
int64_t n;
|
|
int result;
|
|
bool notify;
|
|
|
|
PTHREAD_MUTEX_LOCK(&task->thread_data->waiting_queue.lock);
|
|
task->nio_stages.notify = stage;
|
|
task->next = NULL;
|
|
|
|
if (task->thread_data->waiting_queue.tail == NULL) {
|
|
task->thread_data->waiting_queue.head = task;
|
|
notify = true;
|
|
} else {
|
|
task->thread_data->waiting_queue.tail->next = task;
|
|
notify = false;
|
|
}
|
|
task->thread_data->waiting_queue.tail = task;
|
|
PTHREAD_MUTEX_UNLOCK(&task->thread_data->waiting_queue.lock);
|
|
|
|
if (notify) {
|
|
n = 1;
|
|
if (write(FC_NOTIFY_WRITE_FD(task->thread_data),
|
|
&n, sizeof(n)) != sizeof(n))
|
|
{
|
|
result = errno != 0 ? errno : EIO;
|
|
logError("file: "__FILE__", line: %d, "
|
|
"write eventfd %d fail, errno: %d, error info: %s",
|
|
__LINE__, FC_NOTIFY_WRITE_FD(task->thread_data),
|
|
result, STRERROR(result));
|
|
return result;
|
|
}
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
void sf_recv_notify_read(int sock, short event, void *arg)
|
|
{
|
|
int64_t n;
|
|
struct nio_thread_data *thread_data;
|
|
struct fast_task_info *task;
|
|
struct fast_task_info *current;
|
|
|
|
thread_data = (struct nio_thread_data *)arg;
|
|
if (read(sock, &n, sizeof(n)) < 0) {
|
|
logWarning("file: "__FILE__", line: %d, "
|
|
"read from eventfd %d fail, errno: %d, error info: %s",
|
|
__LINE__, sock, errno, STRERROR(errno));
|
|
}
|
|
|
|
PTHREAD_MUTEX_LOCK(&thread_data->waiting_queue.lock);
|
|
current = thread_data->waiting_queue.head;
|
|
thread_data->waiting_queue.head = thread_data->waiting_queue.tail = NULL;
|
|
PTHREAD_MUTEX_UNLOCK(&thread_data->waiting_queue.lock);
|
|
|
|
while (current != NULL) {
|
|
task = current;
|
|
current = current->next;
|
|
|
|
if (!task->canceled) {
|
|
sf_nio_deal_task(task);
|
|
}
|
|
}
|
|
}
|
|
|
|
int sf_send_add_event(struct fast_task_info *task)
|
|
{
|
|
task->offset = 0;
|
|
if (task->length > 0) {
|
|
/* direct send */
|
|
task->nio_stages.current = SF_NIO_STAGE_SEND;
|
|
if (sf_client_sock_write(task->event.fd, IOEVENT_WRITE, task) < 0) {
|
|
return errno != 0 ? errno : EIO;
|
|
}
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
int sf_client_sock_read(int sock, short event, void *arg)
|
|
{
|
|
int bytes;
|
|
int recv_bytes;
|
|
int total_read;
|
|
struct fast_task_info *task;
|
|
|
|
task = (struct fast_task_info *)arg;
|
|
if (task->canceled || (task->nio_stages.current != SF_NIO_STAGE_RECV)) {
|
|
return 0;
|
|
}
|
|
|
|
//assert(sock >= 0);
|
|
if (event & IOEVENT_TIMEOUT) {
|
|
if (task->offset == 0 && task->req_count > 0) {
|
|
if (SF_CTX->timeout_callback != NULL) {
|
|
if (SF_CTX->timeout_callback(task) != 0) {
|
|
ioevent_add_to_deleted_list(task);
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
task->event.timer.expires = g_current_time +
|
|
task->network_timeout;
|
|
fast_timer_add(&task->thread_data->timer,
|
|
&task->event.timer);
|
|
} else {
|
|
if (task->length > 0) {
|
|
logWarning("file: "__FILE__", line: %d, "
|
|
"client ip: %s, recv timeout, "
|
|
"recv offset: %d, expect length: %d",
|
|
__LINE__, task->client_ip,
|
|
task->offset, task->length);
|
|
} else {
|
|
logWarning("file: "__FILE__", line: %d, "
|
|
"client ip: %s, req_count: %"PRId64", recv timeout",
|
|
__LINE__, task->client_ip, task->req_count);
|
|
}
|
|
|
|
ioevent_add_to_deleted_list(task);
|
|
return -1;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
if (event & IOEVENT_ERROR) {
|
|
logDebug("file: "__FILE__", line: %d, "
|
|
"client ip: %s, recv error event: %d, "
|
|
"close connection", __LINE__, task->client_ip, event);
|
|
|
|
ioevent_add_to_deleted_list(task);
|
|
return -1;
|
|
}
|
|
|
|
total_read = 0;
|
|
while (1) {
|
|
fast_timer_modify(&task->thread_data->timer,
|
|
&task->event.timer, g_current_time +
|
|
task->network_timeout);
|
|
if (task->length == 0) { //recv header
|
|
recv_bytes = SF_CTX->header_size - task->offset;
|
|
} else {
|
|
recv_bytes = task->length - task->offset;
|
|
}
|
|
|
|
bytes = read(sock, task->data + task->offset, recv_bytes);
|
|
if (bytes < 0) {
|
|
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
|
break;
|
|
} else if (errno == EINTR) { //should retry
|
|
logDebug("file: "__FILE__", line: %d, "
|
|
"client ip: %s, ignore interupt signal",
|
|
__LINE__, task->client_ip);
|
|
continue;
|
|
} else {
|
|
logWarning("file: "__FILE__", line: %d, "
|
|
"client ip: %s, recv fail, "
|
|
"errno: %d, error info: %s",
|
|
__LINE__, task->client_ip,
|
|
errno, strerror(errno));
|
|
|
|
ioevent_add_to_deleted_list(task);
|
|
return -1;
|
|
}
|
|
} else if (bytes == 0) {
|
|
if (task->offset > 0) {
|
|
if (task->length > 0) {
|
|
logWarning("file: "__FILE__", line: %d, "
|
|
"client ip: %s, connection "
|
|
"disconnected, expect pkg length: %d, "
|
|
"recv pkg length: %d", __LINE__,
|
|
task->client_ip, task->length,
|
|
task->offset);
|
|
} else {
|
|
logWarning("file: "__FILE__", line: %d, "
|
|
"client ip: %s, connection "
|
|
"disconnected, recv pkg length: %d",
|
|
__LINE__, task->client_ip,
|
|
task->offset);
|
|
}
|
|
} else {
|
|
logDebug("file: "__FILE__", line: %d, "
|
|
"client ip: %s, sock: %d, recv fail, "
|
|
"connection disconnected",
|
|
__LINE__, task->client_ip, sock);
|
|
}
|
|
|
|
ioevent_add_to_deleted_list(task);
|
|
return -1;
|
|
}
|
|
|
|
total_read += bytes;
|
|
task->offset += bytes;
|
|
if (task->length == 0) { //header
|
|
if (task->offset < SF_CTX->header_size) {
|
|
break;
|
|
}
|
|
|
|
if (SF_CTX->set_body_length(task) != 0) {
|
|
ioevent_add_to_deleted_list(task);
|
|
return -1;
|
|
}
|
|
if (task->length < 0) {
|
|
logError("file: "__FILE__", line: %d, "
|
|
"client ip: %s, pkg length: %d < 0",
|
|
__LINE__, task->client_ip,
|
|
task->length);
|
|
|
|
ioevent_add_to_deleted_list(task);
|
|
return -1;
|
|
}
|
|
|
|
task->length += SF_CTX->header_size;
|
|
if (task->length > g_sf_global_vars.max_pkg_size) {
|
|
logError("file: "__FILE__", line: %d, "
|
|
"client ip: %s, pkg length: %d > "
|
|
"max pkg size: %d", __LINE__,
|
|
task->client_ip, task->length,
|
|
g_sf_global_vars.max_pkg_size);
|
|
|
|
ioevent_add_to_deleted_list(task);
|
|
return -1;
|
|
}
|
|
|
|
if (task->length > task->size) {
|
|
int old_size;
|
|
|
|
if (!SF_CTX->realloc_task_buffer) {
|
|
logError("file: "__FILE__", line: %d, "
|
|
"client ip: %s, pkg length: %d exceeds "
|
|
"task size: %d, but realloc buffer disabled",
|
|
__LINE__, task->client_ip, task->size,
|
|
task->length);
|
|
|
|
ioevent_add_to_deleted_list(task);
|
|
return -1;
|
|
}
|
|
|
|
old_size = task->size;
|
|
if (free_queue_realloc_buffer(task, task->length) != 0) {
|
|
logError("file: "__FILE__", line: %d, "
|
|
"client ip: %s, realloc buffer size "
|
|
"from %d to %d fail", __LINE__,
|
|
task->client_ip, task->size, task->length);
|
|
|
|
ioevent_add_to_deleted_list(task);
|
|
return -1;
|
|
}
|
|
|
|
logDebug("file: "__FILE__", line: %d, "
|
|
"client ip: %s, task length: %d, realloc buffer size "
|
|
"from %d to %d", __LINE__, task->client_ip,
|
|
task->length, old_size, task->size);
|
|
}
|
|
}
|
|
|
|
if (task->offset >= task->length) { //recv done
|
|
task->req_count++;
|
|
task->nio_stages.current = SF_NIO_STAGE_SEND;
|
|
if (SF_CTX->deal_task(task, SF_NIO_STAGE_SEND) < 0) { //fatal error
|
|
ioevent_add_to_deleted_list(task);
|
|
return -1;
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
|
|
return total_read;
|
|
}
|
|
|
|
int sf_client_sock_write(int sock, short event, void *arg)
|
|
{
|
|
int bytes;
|
|
int total_write;
|
|
struct fast_task_info *task;
|
|
|
|
//assert(sock >= 0);
|
|
task = (struct fast_task_info *)arg;
|
|
if (task->canceled || (task->nio_stages.current != SF_NIO_STAGE_SEND)) {
|
|
return 0;
|
|
}
|
|
|
|
if (event & IOEVENT_TIMEOUT) {
|
|
logError("file: "__FILE__", line: %d, "
|
|
"client ip: %s, send timeout. total length: %d, offset: %d, "
|
|
"remain: %d", __LINE__, task->client_ip, task->length,
|
|
task->offset, task->length - task->offset);
|
|
|
|
ioevent_add_to_deleted_list(task);
|
|
return -1;
|
|
}
|
|
|
|
if (event & IOEVENT_ERROR) {
|
|
logDebug("file: "__FILE__", line: %d, "
|
|
"client ip: %s, recv error event: %d, "
|
|
"close connection", __LINE__, task->client_ip, event);
|
|
|
|
ioevent_add_to_deleted_list(task);
|
|
return -1;
|
|
}
|
|
|
|
total_write = 0;
|
|
while (1) {
|
|
fast_timer_modify(&task->thread_data->timer,
|
|
&task->event.timer, g_current_time +
|
|
task->network_timeout);
|
|
|
|
bytes = write(sock, task->data + task->offset,
|
|
task->length - task->offset);
|
|
if (bytes < 0) {
|
|
if (errno == EAGAIN || errno == EWOULDBLOCK)
|
|
{
|
|
if (set_write_event(task) != 0) {
|
|
return -1;
|
|
}
|
|
break;
|
|
} else if (errno == EINTR) { //should retry
|
|
logDebug("file: "__FILE__", line: %d, "
|
|
"client ip: %s, ignore interupt signal",
|
|
__LINE__, task->client_ip);
|
|
continue;
|
|
} else {
|
|
logWarning("file: "__FILE__", line: %d, "
|
|
"client ip: %s, send fail, "
|
|
"errno: %d, error info: %s",
|
|
__LINE__, task->client_ip,
|
|
errno, strerror(errno));
|
|
|
|
ioevent_add_to_deleted_list(task);
|
|
return -1;
|
|
}
|
|
} else if (bytes == 0) {
|
|
logWarning("file: "__FILE__", line: %d, "
|
|
"client ip: %s, sock: %d, send failed, "
|
|
"connection disconnected",
|
|
__LINE__, task->client_ip, sock);
|
|
|
|
ioevent_add_to_deleted_list(task);
|
|
return -1;
|
|
}
|
|
|
|
total_write += bytes;
|
|
task->offset += bytes;
|
|
if (task->offset >= task->length) {
|
|
task->offset = 0;
|
|
task->length = 0;
|
|
if (sf_set_read_event(task) != 0) {
|
|
return -1;
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
|
|
return total_write;
|
|
}
|