use Linux eventfd for notify
parent
0f2b3a8e2c
commit
965c8277c7
448
src/sf_nio.c
448
src/sf_nio.c
|
|
@ -24,7 +24,7 @@
|
||||||
#include "sf_global.h"
|
#include "sf_global.h"
|
||||||
#include "sf_nio.h"
|
#include "sf_nio.h"
|
||||||
|
|
||||||
#define SF_CTX ((SFContext *)(pTask->ctx))
|
#define SF_CTX ((SFContext *)(task->ctx))
|
||||||
|
|
||||||
void sf_set_parameters_ex(SFContext *sf_context, const int header_size,
|
void sf_set_parameters_ex(SFContext *sf_context, const int header_size,
|
||||||
sf_set_body_length_callback set_body_length_func,
|
sf_set_body_length_callback set_body_length_func,
|
||||||
|
|
@ -38,67 +38,67 @@ void sf_set_parameters_ex(SFContext *sf_context, const int header_size,
|
||||||
sf_context->timeout_callback = timeout_callback;
|
sf_context->timeout_callback = timeout_callback;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void sf_task_detach_thread(struct fast_task_info *pTask)
|
static void sf_task_detach_thread(struct fast_task_info *task)
|
||||||
{
|
{
|
||||||
ioevent_detach(&pTask->thread_data->ev_puller, pTask->event.fd);
|
ioevent_detach(&task->thread_data->ev_puller, task->event.fd);
|
||||||
|
|
||||||
if (pTask->event.timer.expires > 0) {
|
if (task->event.timer.expires > 0) {
|
||||||
fast_timer_remove(&pTask->thread_data->timer,
|
fast_timer_remove(&task->thread_data->timer,
|
||||||
&pTask->event.timer);
|
&task->event.timer);
|
||||||
pTask->event.timer.expires = 0;
|
task->event.timer.expires = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (SF_CTX->remove_from_ready_list) {
|
if (SF_CTX->remove_from_ready_list) {
|
||||||
ioevent_remove(&pTask->thread_data->ev_puller, pTask);
|
ioevent_remove(&task->thread_data->ev_puller, task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void sf_task_switch_thread(struct fast_task_info *pTask,
|
void sf_task_switch_thread(struct fast_task_info *task,
|
||||||
const int new_thread_index)
|
const int new_thread_index)
|
||||||
{
|
{
|
||||||
sf_task_detach_thread(pTask);
|
sf_task_detach_thread(task);
|
||||||
pTask->thread_data = SF_CTX->thread_data + new_thread_index;
|
task->thread_data = SF_CTX->thread_data + new_thread_index;
|
||||||
}
|
}
|
||||||
|
|
||||||
void sf_task_finish_clean_up(struct fast_task_info *pTask)
|
void sf_task_finish_clean_up(struct fast_task_info *task)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
assert(pTask->event.fd >= 0);
|
assert(task->event.fd >= 0);
|
||||||
if (pTask->event.fd < 0) {
|
if (task->event.fd < 0) {
|
||||||
logWarning("file: "__FILE__", line: %d, "
|
logWarning("file: "__FILE__", line: %d, "
|
||||||
"pTask: %p already cleaned",
|
"task: %p already cleaned",
|
||||||
__LINE__, pTask);
|
__LINE__, task);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
|
|
||||||
if (pTask->finish_callback != NULL) {
|
if (task->finish_callback != NULL) {
|
||||||
pTask->finish_callback(pTask);
|
task->finish_callback(task);
|
||||||
pTask->finish_callback = NULL;
|
task->finish_callback = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
sf_task_detach_thread(pTask);
|
sf_task_detach_thread(task);
|
||||||
close(pTask->event.fd);
|
close(task->event.fd);
|
||||||
pTask->event.fd = -1;
|
task->event.fd = -1;
|
||||||
|
|
||||||
__sync_fetch_and_sub(&g_sf_global_vars.connection_stat.current_count, 1);
|
__sync_fetch_and_sub(&g_sf_global_vars.connection_stat.current_count, 1);
|
||||||
free_queue_push(pTask);
|
free_queue_push(task);
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline int set_write_event(struct fast_task_info *pTask)
|
static inline int set_write_event(struct fast_task_info *task)
|
||||||
{
|
{
|
||||||
int result;
|
int result;
|
||||||
|
|
||||||
if (pTask->event.callback == (IOEventCallback)sf_client_sock_write) {
|
if (task->event.callback == (IOEventCallback)sf_client_sock_write) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTask->event.callback = (IOEventCallback)sf_client_sock_write;
|
task->event.callback = (IOEventCallback)sf_client_sock_write;
|
||||||
if (ioevent_modify(&pTask->thread_data->ev_puller,
|
if (ioevent_modify(&task->thread_data->ev_puller,
|
||||||
pTask->event.fd, IOEVENT_WRITE, pTask) != 0)
|
task->event.fd, IOEVENT_WRITE, task) != 0)
|
||||||
{
|
{
|
||||||
result = errno != 0 ? errno : ENOENT;
|
result = errno != 0 ? errno : ENOENT;
|
||||||
SF_CTX->task_cleanup_func(pTask);
|
SF_CTX->task_cleanup_func(task);
|
||||||
|
|
||||||
logError("file: "__FILE__", line: %d, "
|
logError("file: "__FILE__", line: %d, "
|
||||||
"ioevent_modify fail, "
|
"ioevent_modify fail, "
|
||||||
|
|
@ -109,20 +109,20 @@ static inline int set_write_event(struct fast_task_info *pTask)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline int set_read_event(struct fast_task_info *pTask)
|
static inline int set_read_event(struct fast_task_info *task)
|
||||||
{
|
{
|
||||||
int result;
|
int result;
|
||||||
|
|
||||||
if (pTask->event.callback == (IOEventCallback)sf_client_sock_read) {
|
if (task->event.callback == (IOEventCallback)sf_client_sock_read) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTask->event.callback = (IOEventCallback)sf_client_sock_read;
|
task->event.callback = (IOEventCallback)sf_client_sock_read;
|
||||||
if (ioevent_modify(&pTask->thread_data->ev_puller,
|
if (ioevent_modify(&task->thread_data->ev_puller,
|
||||||
pTask->event.fd, IOEVENT_READ, pTask) != 0)
|
task->event.fd, IOEVENT_READ, task) != 0)
|
||||||
{
|
{
|
||||||
result = errno != 0 ? errno : ENOENT;
|
result = errno != 0 ? errno : ENOENT;
|
||||||
SF_CTX->task_cleanup_func(pTask);
|
SF_CTX->task_cleanup_func(task);
|
||||||
|
|
||||||
logError("file: "__FILE__", line: %d, "
|
logError("file: "__FILE__", line: %d, "
|
||||||
"ioevent_modify fail, "
|
"ioevent_modify fail, "
|
||||||
|
|
@ -134,38 +134,17 @@ static inline int set_read_event(struct fast_task_info *pTask)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int sf_nio_notify(struct fast_task_info *pTask, const int stage)
|
static int sf_ioevent_add(struct fast_task_info *task)
|
||||||
{
|
|
||||||
long task_addr;
|
|
||||||
|
|
||||||
task_addr = (long)pTask;
|
|
||||||
pTask->nio_stage = stage;
|
|
||||||
if (write(pTask->thread_data->pipe_fds[1], &task_addr,
|
|
||||||
sizeof(task_addr)) != sizeof(task_addr))
|
|
||||||
{
|
|
||||||
int result;
|
|
||||||
result = errno != 0 ? errno : EIO;
|
|
||||||
logError("file: "__FILE__", line: %d, "
|
|
||||||
"write to pipe %d fail, errno: %d, error info: %s",
|
|
||||||
__LINE__, pTask->thread_data->pipe_fds[1],
|
|
||||||
result, STRERROR(result));
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int sf_ioevent_add(struct fast_task_info *pTask)
|
|
||||||
{
|
{
|
||||||
int result;
|
int result;
|
||||||
|
|
||||||
result = ioevent_set(pTask, pTask->thread_data, pTask->event.fd,
|
result = ioevent_set(task, task->thread_data, task->event.fd,
|
||||||
IOEVENT_READ, (IOEventCallback)sf_client_sock_read,
|
IOEVENT_READ, (IOEventCallback)sf_client_sock_read,
|
||||||
g_sf_global_vars.network_timeout);
|
g_sf_global_vars.network_timeout);
|
||||||
return result > 0 ? -1 * result : result;
|
return result > 0 ? -1 * result : result;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int sf_nio_init(struct fast_task_info *pTask)
|
static int sf_nio_init(struct fast_task_info *task)
|
||||||
{
|
{
|
||||||
int current_connections;
|
int current_connections;
|
||||||
|
|
||||||
|
|
@ -175,15 +154,146 @@ static int sf_nio_init(struct fast_task_info *pTask)
|
||||||
g_sf_global_vars.connection_stat.max_count = current_connections;
|
g_sf_global_vars.connection_stat.max_count = current_connections;
|
||||||
}
|
}
|
||||||
|
|
||||||
return sf_ioevent_add(pTask);
|
return sf_ioevent_add(task);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int sf_nio_deal_task(struct fast_task_info *task)
|
||||||
|
{
|
||||||
|
int result;
|
||||||
|
switch (task->nio_stage) {
|
||||||
|
case SF_NIO_STAGE_INIT:
|
||||||
|
task->nio_stage = SF_NIO_STAGE_RECV;
|
||||||
|
result = sf_nio_init(task);
|
||||||
|
break;
|
||||||
|
case SF_NIO_STAGE_RECV:
|
||||||
|
if ((result=set_read_event(task)) == 0)
|
||||||
|
{
|
||||||
|
sf_client_sock_read(task->event.fd,
|
||||||
|
IOEVENT_READ, task);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case SF_NIO_STAGE_SEND:
|
||||||
|
result = sf_send_add_event(task);
|
||||||
|
break;
|
||||||
|
case SF_NIO_STAGE_CONTINUE: //continue deal
|
||||||
|
result = SF_CTX->deal_task(task);
|
||||||
|
break;
|
||||||
|
case SF_NIO_STAGE_FORWARDED: //forward by other thread
|
||||||
|
if ((result=sf_ioevent_add(task)) == 0) {
|
||||||
|
result = SF_CTX->deal_task(task);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case SF_NIO_STAGE_CLOSE:
|
||||||
|
result = -EIO; //close this socket
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
logError("file: "__FILE__", line: %d, "
|
||||||
|
"client ip: %s, invalid stage: %d",
|
||||||
|
__LINE__, task->client_ip, task->nio_stage);
|
||||||
|
result = -EINVAL;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (result < 0) {
|
||||||
|
SF_CTX->task_cleanup_func(task);
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
#if defined(OS_LINUX)
|
||||||
|
int sf_nio_notify(struct fast_task_info *task, const int stage)
|
||||||
|
{
|
||||||
|
int64_t n;
|
||||||
|
int result;
|
||||||
|
bool notify;
|
||||||
|
|
||||||
|
task->nio_stage = stage;
|
||||||
|
task->next = NULL;
|
||||||
|
|
||||||
|
pthread_mutex_lock(&task->thread_data.waiting_queue.lock);
|
||||||
|
if (task->thread_data.waiting_queue.tail == NULL) {
|
||||||
|
task->thread_data.waiting_queue.head = task;
|
||||||
|
notify = true;
|
||||||
|
} else {
|
||||||
|
task->thread_data.waiting_queue.tail->next = task;
|
||||||
|
notify = false;
|
||||||
|
}
|
||||||
|
task->thread_data.waiting_queue.tail = task;
|
||||||
|
pthread_mutex_unlock(&task->thread_data.waiting_queue.lock);
|
||||||
|
|
||||||
|
if (notify) {
|
||||||
|
n = 1;
|
||||||
|
if (write(NOTIFY_WRITE_FD(task->thread_data),
|
||||||
|
&n, sizeof(n)) != sizeof(n))
|
||||||
|
{
|
||||||
|
result = errno != 0 ? errno : EIO;
|
||||||
|
logError("file: "__FILE__", line: %d, "
|
||||||
|
"write eventfd %d fail, errno: %d, error info: %s",
|
||||||
|
__LINE__, NOTIFY_WRITE_FD(task->thread_data),
|
||||||
|
result, STRERROR(result));
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void sf_recv_notify_read(int sock, short event, void *arg)
|
||||||
|
{
|
||||||
|
int64_t n;
|
||||||
|
struct nio_thread_data *thread_data;
|
||||||
|
struct fast_task_info *task;
|
||||||
|
struct fast_task_info *current;
|
||||||
|
|
||||||
|
thread_data = (struct nio_thread_data *)arg;
|
||||||
|
if (read(sock, &n, sizeof(n)) < 0) {
|
||||||
|
logWarning("file: "__FILE__", line: %d, "
|
||||||
|
"read from eventfd %d fail, errno: %d, error info: %s",
|
||||||
|
__LINE__, sock, errno, STRERROR(errno));
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_mutex_lock(&thread_data.waiting_queue.lock);
|
||||||
|
current = thread_data->waiting_queue.head;
|
||||||
|
thread_data->waiting_queue.head = thread_data->waiting_queue.tail = NULL;
|
||||||
|
pthread_mutex_unlock(&thread_data.waiting_queue.lock);
|
||||||
|
|
||||||
|
while (current != NULL) {
|
||||||
|
task = current;
|
||||||
|
current = current->next;
|
||||||
|
|
||||||
|
sf_nio_deal_task(task);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#else
|
||||||
|
|
||||||
|
int sf_nio_notify(struct fast_task_info *task, const int stage)
|
||||||
|
{
|
||||||
|
long task_addr;
|
||||||
|
int result;
|
||||||
|
|
||||||
|
task_addr = (long)task;
|
||||||
|
task->nio_stage = stage;
|
||||||
|
if (write(NOTIFY_WRITE_FD(task->thread_data), &task_addr,
|
||||||
|
sizeof(task_addr)) != sizeof(task_addr))
|
||||||
|
{
|
||||||
|
result = errno != 0 ? errno : EIO;
|
||||||
|
logError("file: "__FILE__", line: %d, "
|
||||||
|
"write to pipe %d fail, errno: %d, error info: %s",
|
||||||
|
__LINE__, NOTIFY_WRITE_FD(task->thread_data),
|
||||||
|
result, STRERROR(result));
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void sf_recv_notify_read(int sock, short event, void *arg)
|
void sf_recv_notify_read(int sock, short event, void *arg)
|
||||||
{
|
{
|
||||||
int bytes;
|
int bytes;
|
||||||
int result;
|
|
||||||
long task_ptr;
|
long task_ptr;
|
||||||
struct fast_task_info *pTask;
|
struct fast_task_info *task;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
if ((bytes=read(sock, &task_ptr, sizeof(task_ptr))) < 0) {
|
if ((bytes=read(sock, &task_ptr, sizeof(task_ptr))) < 0) {
|
||||||
|
|
@ -200,53 +310,18 @@ void sf_recv_notify_read(int sock, short event, void *arg)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTask = (struct fast_task_info *)task_ptr;
|
task = (struct fast_task_info *)task_ptr;
|
||||||
switch (pTask->nio_stage) {
|
sf_nio_deal_task(task);
|
||||||
case SF_NIO_STAGE_INIT:
|
|
||||||
pTask->nio_stage = SF_NIO_STAGE_RECV;
|
|
||||||
result = sf_nio_init(pTask);
|
|
||||||
break;
|
|
||||||
case SF_NIO_STAGE_RECV:
|
|
||||||
if ((result=set_read_event(pTask)) == 0)
|
|
||||||
{
|
|
||||||
sf_client_sock_read(pTask->event.fd,
|
|
||||||
IOEVENT_READ, pTask);
|
|
||||||
}
|
}
|
||||||
break;
|
|
||||||
case SF_NIO_STAGE_SEND:
|
|
||||||
result = sf_send_add_event(pTask);
|
|
||||||
break;
|
|
||||||
case SF_NIO_STAGE_CONTINUE: //continue deal
|
|
||||||
result = SF_CTX->deal_task(pTask);
|
|
||||||
break;
|
|
||||||
case SF_NIO_STAGE_FORWARDED: //forward by other thread
|
|
||||||
if ((result=sf_ioevent_add(pTask)) == 0) {
|
|
||||||
result = SF_CTX->deal_task(pTask);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case SF_NIO_STAGE_CLOSE:
|
|
||||||
result = -EIO; //close this socket
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
logError("file: "__FILE__", line: %d, "
|
|
||||||
"client ip: %s, invalid stage: %d",
|
|
||||||
__LINE__, pTask->client_ip, pTask->nio_stage);
|
|
||||||
result = -EINVAL;
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
if (result < 0) {
|
int sf_send_add_event(struct fast_task_info *task)
|
||||||
SF_CTX->task_cleanup_func(pTask);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int sf_send_add_event(struct fast_task_info *pTask)
|
|
||||||
{
|
{
|
||||||
pTask->offset = 0;
|
task->offset = 0;
|
||||||
if (pTask->length > 0) {
|
if (task->length > 0) {
|
||||||
/* direct send */
|
/* direct send */
|
||||||
if (sf_client_sock_write(pTask->event.fd, IOEVENT_WRITE, pTask) < 0) {
|
if (sf_client_sock_write(task->event.fd, IOEVENT_WRITE, task) < 0) {
|
||||||
return errno != 0 ? errno : EIO;
|
return errno != 0 ? errno : EIO;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -259,43 +334,43 @@ int sf_client_sock_read(int sock, short event, void *arg)
|
||||||
int bytes;
|
int bytes;
|
||||||
int recv_bytes;
|
int recv_bytes;
|
||||||
int total_read;
|
int total_read;
|
||||||
struct fast_task_info *pTask;
|
struct fast_task_info *task;
|
||||||
|
|
||||||
pTask = (struct fast_task_info *)arg;
|
task = (struct fast_task_info *)arg;
|
||||||
if (pTask->nio_stage != SF_NIO_STAGE_RECV) {
|
if (task->nio_stage != SF_NIO_STAGE_RECV) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(sock >= 0);
|
assert(sock >= 0);
|
||||||
if (event & IOEVENT_TIMEOUT) {
|
if (event & IOEVENT_TIMEOUT) {
|
||||||
if (pTask->offset == 0 && pTask->req_count > 0) {
|
if (task->offset == 0 && task->req_count > 0) {
|
||||||
if (SF_CTX->timeout_callback != NULL) {
|
if (SF_CTX->timeout_callback != NULL) {
|
||||||
if (SF_CTX->timeout_callback(pTask) != 0) {
|
if (SF_CTX->timeout_callback(task) != 0) {
|
||||||
SF_CTX->task_cleanup_func(pTask);
|
SF_CTX->task_cleanup_func(task);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pTask->event.timer.expires = g_current_time +
|
task->event.timer.expires = g_current_time +
|
||||||
g_sf_global_vars.network_timeout;
|
g_sf_global_vars.network_timeout;
|
||||||
fast_timer_add(&pTask->thread_data->timer,
|
fast_timer_add(&task->thread_data->timer,
|
||||||
&pTask->event.timer);
|
&task->event.timer);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
if (pTask->length > 0) {
|
if (task->length > 0) {
|
||||||
logWarning("file: "__FILE__", line: %d, "
|
logWarning("file: "__FILE__", line: %d, "
|
||||||
"client ip: %s, recv timeout, "
|
"client ip: %s, recv timeout, "
|
||||||
"recv offset: %d, expect length: %d",
|
"recv offset: %d, expect length: %d",
|
||||||
__LINE__, pTask->client_ip,
|
__LINE__, task->client_ip,
|
||||||
pTask->offset, pTask->length);
|
task->offset, task->length);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
logWarning("file: "__FILE__", line: %d, "
|
logWarning("file: "__FILE__", line: %d, "
|
||||||
"client ip: %s, req_count: %"PRId64", recv timeout",
|
"client ip: %s, req_count: %"PRId64", recv timeout",
|
||||||
__LINE__, pTask->client_ip, pTask->req_count);
|
__LINE__, task->client_ip, task->req_count);
|
||||||
}
|
}
|
||||||
|
|
||||||
SF_CTX->task_cleanup_func(pTask);
|
SF_CTX->task_cleanup_func(task);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -305,25 +380,25 @@ int sf_client_sock_read(int sock, short event, void *arg)
|
||||||
if (event & IOEVENT_ERROR) {
|
if (event & IOEVENT_ERROR) {
|
||||||
logDebug("file: "__FILE__", line: %d, "
|
logDebug("file: "__FILE__", line: %d, "
|
||||||
"client ip: %s, recv error event: %d, "
|
"client ip: %s, recv error event: %d, "
|
||||||
"close connection", __LINE__, pTask->client_ip, event);
|
"close connection", __LINE__, task->client_ip, event);
|
||||||
|
|
||||||
SF_CTX->task_cleanup_func(pTask);
|
SF_CTX->task_cleanup_func(task);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
total_read = 0;
|
total_read = 0;
|
||||||
while (1) {
|
while (1) {
|
||||||
fast_timer_modify(&pTask->thread_data->timer,
|
fast_timer_modify(&task->thread_data->timer,
|
||||||
&pTask->event.timer, g_current_time +
|
&task->event.timer, g_current_time +
|
||||||
g_sf_global_vars.network_timeout);
|
g_sf_global_vars.network_timeout);
|
||||||
if (pTask->length == 0) { //recv header
|
if (task->length == 0) { //recv header
|
||||||
recv_bytes = SF_CTX->header_size - pTask->offset;
|
recv_bytes = SF_CTX->header_size - task->offset;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
recv_bytes = pTask->length - pTask->offset;
|
recv_bytes = task->length - task->offset;
|
||||||
}
|
}
|
||||||
|
|
||||||
bytes = read(sock, pTask->data + pTask->offset, recv_bytes);
|
bytes = read(sock, task->data + task->offset, recv_bytes);
|
||||||
if (bytes < 0) {
|
if (bytes < 0) {
|
||||||
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
||||||
break;
|
break;
|
||||||
|
|
@ -331,107 +406,107 @@ int sf_client_sock_read(int sock, short event, void *arg)
|
||||||
else if (errno == EINTR) { //should retry
|
else if (errno == EINTR) { //should retry
|
||||||
logDebug("file: "__FILE__", line: %d, "
|
logDebug("file: "__FILE__", line: %d, "
|
||||||
"client ip: %s, ignore interupt signal",
|
"client ip: %s, ignore interupt signal",
|
||||||
__LINE__, pTask->client_ip);
|
__LINE__, task->client_ip);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
logWarning("file: "__FILE__", line: %d, "
|
logWarning("file: "__FILE__", line: %d, "
|
||||||
"client ip: %s, recv fail, "
|
"client ip: %s, recv fail, "
|
||||||
"errno: %d, error info: %s",
|
"errno: %d, error info: %s",
|
||||||
__LINE__, pTask->client_ip,
|
__LINE__, task->client_ip,
|
||||||
errno, strerror(errno));
|
errno, strerror(errno));
|
||||||
|
|
||||||
SF_CTX->task_cleanup_func(pTask);
|
SF_CTX->task_cleanup_func(task);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (bytes == 0) {
|
else if (bytes == 0) {
|
||||||
if (pTask->offset > 0) {
|
if (task->offset > 0) {
|
||||||
if (pTask->length > 0) {
|
if (task->length > 0) {
|
||||||
logWarning("file: "__FILE__", line: %d, "
|
logWarning("file: "__FILE__", line: %d, "
|
||||||
"client ip: %s, connection "
|
"client ip: %s, connection "
|
||||||
"disconnected, expect pkg length: %d, "
|
"disconnected, expect pkg length: %d, "
|
||||||
"recv pkg length: %d", __LINE__,
|
"recv pkg length: %d", __LINE__,
|
||||||
pTask->client_ip, pTask->length,
|
task->client_ip, task->length,
|
||||||
pTask->offset);
|
task->offset);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
logWarning("file: "__FILE__", line: %d, "
|
logWarning("file: "__FILE__", line: %d, "
|
||||||
"client ip: %s, connection "
|
"client ip: %s, connection "
|
||||||
"disconnected, recv pkg length: %d",
|
"disconnected, recv pkg length: %d",
|
||||||
__LINE__, pTask->client_ip,
|
__LINE__, task->client_ip,
|
||||||
pTask->offset);
|
task->offset);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
logDebug("file: "__FILE__", line: %d, "
|
logDebug("file: "__FILE__", line: %d, "
|
||||||
"client ip: %s, sock: %d, recv fail, "
|
"client ip: %s, sock: %d, recv fail, "
|
||||||
"connection disconnected",
|
"connection disconnected",
|
||||||
__LINE__, pTask->client_ip, sock);
|
__LINE__, task->client_ip, sock);
|
||||||
}
|
}
|
||||||
|
|
||||||
SF_CTX->task_cleanup_func(pTask);
|
SF_CTX->task_cleanup_func(task);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
total_read += bytes;
|
total_read += bytes;
|
||||||
pTask->offset += bytes;
|
task->offset += bytes;
|
||||||
if (pTask->length == 0) { //header
|
if (task->length == 0) { //header
|
||||||
if (pTask->offset < SF_CTX->header_size) {
|
if (task->offset < SF_CTX->header_size) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (SF_CTX->set_body_length(pTask) != 0) {
|
if (SF_CTX->set_body_length(task) != 0) {
|
||||||
SF_CTX->task_cleanup_func(pTask);
|
SF_CTX->task_cleanup_func(task);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (pTask->length < 0) {
|
if (task->length < 0) {
|
||||||
logError("file: "__FILE__", line: %d, "
|
logError("file: "__FILE__", line: %d, "
|
||||||
"client ip: %s, pkg length: %d < 0",
|
"client ip: %s, pkg length: %d < 0",
|
||||||
__LINE__, pTask->client_ip,
|
__LINE__, task->client_ip,
|
||||||
pTask->length);
|
task->length);
|
||||||
|
|
||||||
SF_CTX->task_cleanup_func(pTask);
|
SF_CTX->task_cleanup_func(task);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTask->length += SF_CTX->header_size;
|
task->length += SF_CTX->header_size;
|
||||||
if (pTask->length > g_sf_global_vars.max_pkg_size) {
|
if (task->length > g_sf_global_vars.max_pkg_size) {
|
||||||
logError("file: "__FILE__", line: %d, "
|
logError("file: "__FILE__", line: %d, "
|
||||||
"client ip: %s, pkg length: %d > "
|
"client ip: %s, pkg length: %d > "
|
||||||
"max pkg size: %d", __LINE__,
|
"max pkg size: %d", __LINE__,
|
||||||
pTask->client_ip, pTask->length,
|
task->client_ip, task->length,
|
||||||
g_sf_global_vars.max_pkg_size);
|
g_sf_global_vars.max_pkg_size);
|
||||||
|
|
||||||
SF_CTX->task_cleanup_func(pTask);
|
SF_CTX->task_cleanup_func(task);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->length > pTask->size) {
|
if (task->length > task->size) {
|
||||||
int old_size;
|
int old_size;
|
||||||
old_size = pTask->size;
|
old_size = task->size;
|
||||||
if (free_queue_realloc_buffer(pTask, pTask->length) != 0) {
|
if (free_queue_realloc_buffer(task, task->length) != 0) {
|
||||||
logError("file: "__FILE__", line: %d, "
|
logError("file: "__FILE__", line: %d, "
|
||||||
"client ip: %s, realloc buffer size "
|
"client ip: %s, realloc buffer size "
|
||||||
"from %d to %d fail", __LINE__,
|
"from %d to %d fail", __LINE__,
|
||||||
pTask->client_ip, pTask->size, pTask->length);
|
task->client_ip, task->size, task->length);
|
||||||
|
|
||||||
SF_CTX->task_cleanup_func(pTask);
|
SF_CTX->task_cleanup_func(task);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
logDebug("file: "__FILE__", line: %d, "
|
logDebug("file: "__FILE__", line: %d, "
|
||||||
"client ip: %s, task length: %d, realloc buffer size "
|
"client ip: %s, task length: %d, realloc buffer size "
|
||||||
"from %d to %d", __LINE__, pTask->client_ip,
|
"from %d to %d", __LINE__, task->client_ip,
|
||||||
pTask->length, old_size, pTask->size);
|
task->length, old_size, task->size);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->offset >= pTask->length) { //recv done
|
if (task->offset >= task->length) { //recv done
|
||||||
pTask->req_count++;
|
task->req_count++;
|
||||||
pTask->nio_stage = SF_NIO_STAGE_SEND;
|
task->nio_stage = SF_NIO_STAGE_SEND;
|
||||||
if (SF_CTX->deal_task(pTask) < 0) { //fatal error
|
if (SF_CTX->deal_task(task) < 0) { //fatal error
|
||||||
SF_CTX->task_cleanup_func(pTask);
|
SF_CTX->task_cleanup_func(task);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
@ -445,41 +520,41 @@ int sf_client_sock_write(int sock, short event, void *arg)
|
||||||
{
|
{
|
||||||
int bytes;
|
int bytes;
|
||||||
int total_write;
|
int total_write;
|
||||||
struct fast_task_info *pTask;
|
struct fast_task_info *task;
|
||||||
|
|
||||||
assert(sock >= 0);
|
assert(sock >= 0);
|
||||||
pTask = (struct fast_task_info *)arg;
|
task = (struct fast_task_info *)arg;
|
||||||
if (event & IOEVENT_TIMEOUT) {
|
if (event & IOEVENT_TIMEOUT) {
|
||||||
logError("file: "__FILE__", line: %d, "
|
logError("file: "__FILE__", line: %d, "
|
||||||
"client ip: %s, send timeout. total length: %d, offset: %d, "
|
"client ip: %s, send timeout. total length: %d, offset: %d, "
|
||||||
"remain: %d", __LINE__, pTask->client_ip, pTask->length,
|
"remain: %d", __LINE__, task->client_ip, task->length,
|
||||||
pTask->offset, pTask->length - pTask->offset);
|
task->offset, task->length - task->offset);
|
||||||
|
|
||||||
SF_CTX->task_cleanup_func(pTask);
|
SF_CTX->task_cleanup_func(task);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (event & IOEVENT_ERROR) {
|
if (event & IOEVENT_ERROR) {
|
||||||
logDebug("file: "__FILE__", line: %d, "
|
logDebug("file: "__FILE__", line: %d, "
|
||||||
"client ip: %s, recv error event: %d, "
|
"client ip: %s, recv error event: %d, "
|
||||||
"close connection", __LINE__, pTask->client_ip, event);
|
"close connection", __LINE__, task->client_ip, event);
|
||||||
|
|
||||||
SF_CTX->task_cleanup_func(pTask);
|
SF_CTX->task_cleanup_func(task);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
total_write = 0;
|
total_write = 0;
|
||||||
while (1) {
|
while (1) {
|
||||||
fast_timer_modify(&pTask->thread_data->timer,
|
fast_timer_modify(&task->thread_data->timer,
|
||||||
&pTask->event.timer, g_current_time +
|
&task->event.timer, g_current_time +
|
||||||
g_sf_global_vars.network_timeout);
|
g_sf_global_vars.network_timeout);
|
||||||
|
|
||||||
bytes = write(sock, pTask->data + pTask->offset,
|
bytes = write(sock, task->data + task->offset,
|
||||||
pTask->length - pTask->offset);
|
task->length - task->offset);
|
||||||
if (bytes < 0) {
|
if (bytes < 0) {
|
||||||
if (errno == EAGAIN || errno == EWOULDBLOCK)
|
if (errno == EAGAIN || errno == EWOULDBLOCK)
|
||||||
{
|
{
|
||||||
if (set_write_event(pTask) != 0) {
|
if (set_write_event(task) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
@ -487,36 +562,37 @@ int sf_client_sock_write(int sock, short event, void *arg)
|
||||||
else if (errno == EINTR) { //should retry
|
else if (errno == EINTR) { //should retry
|
||||||
logDebug("file: "__FILE__", line: %d, "
|
logDebug("file: "__FILE__", line: %d, "
|
||||||
"client ip: %s, ignore interupt signal",
|
"client ip: %s, ignore interupt signal",
|
||||||
__LINE__, pTask->client_ip);
|
__LINE__, task->client_ip);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
logWarning("file: "__FILE__", line: %d, "
|
logWarning("file: "__FILE__", line: %d, "
|
||||||
"client ip: %s, send fail, "
|
"client ip: %s, send fail, "
|
||||||
"errno: %d, error info: %s",
|
"errno: %d, error info: %s",
|
||||||
__LINE__, pTask->client_ip,
|
__LINE__, task->client_ip,
|
||||||
errno, strerror(errno));
|
errno, strerror(errno));
|
||||||
|
|
||||||
SF_CTX->task_cleanup_func(pTask);
|
SF_CTX->task_cleanup_func(task);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (bytes == 0) {
|
else if (bytes == 0) {
|
||||||
logWarning("file: "__FILE__", line: %d, "
|
logWarning("file: "__FILE__", line: %d, "
|
||||||
"client ip: %s, sock: %d, send failed, connection disconnected",
|
"client ip: %s, sock: %d, send failed, "
|
||||||
__LINE__, pTask->client_ip, sock);
|
"connection disconnected",
|
||||||
|
__LINE__, task->client_ip, sock);
|
||||||
|
|
||||||
SF_CTX->task_cleanup_func(pTask);
|
SF_CTX->task_cleanup_func(task);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
total_write += bytes;
|
total_write += bytes;
|
||||||
pTask->offset += bytes;
|
task->offset += bytes;
|
||||||
if (pTask->offset >= pTask->length) {
|
if (task->offset >= task->length) {
|
||||||
pTask->offset = 0;
|
task->offset = 0;
|
||||||
pTask->length = 0;
|
task->length = 0;
|
||||||
pTask->nio_stage = SF_NIO_STAGE_RECV;
|
task->nio_stage = SF_NIO_STAGE_RECV;
|
||||||
if (set_read_event(pTask) != 0) {
|
if (set_read_event(task) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,9 @@
|
||||||
#include "sf_define.h"
|
#include "sf_define.h"
|
||||||
#include "sf_types.h"
|
#include "sf_types.h"
|
||||||
|
|
||||||
|
#define NOTIFY_READ_FD(tdata) (tdata)->pipe_fds[0]
|
||||||
|
#define NOTIFY_WRITE_FD(tdata) (tdata)->pipe_fds[1]
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,9 @@
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
#include <fcntl.h>
|
#include <fcntl.h>
|
||||||
|
#if defined(OS_LINUX)
|
||||||
|
#include <sys/eventfd.h>
|
||||||
|
#endif
|
||||||
#include "fastcommon/logger.h"
|
#include "fastcommon/logger.h"
|
||||||
#include "fastcommon/sockopt.h"
|
#include "fastcommon/sockopt.h"
|
||||||
#include "fastcommon/shared_func.h"
|
#include "fastcommon/shared_func.h"
|
||||||
|
|
@ -98,7 +101,7 @@ int sf_service_init_ex(SFContext *sf_context,
|
||||||
int bytes;
|
int bytes;
|
||||||
struct worker_thread_context *thread_contexts;
|
struct worker_thread_context *thread_contexts;
|
||||||
struct worker_thread_context *thread_ctx;
|
struct worker_thread_context *thread_ctx;
|
||||||
struct nio_thread_data *pThreadData;
|
struct nio_thread_data *thread_data;
|
||||||
struct nio_thread_data *pDataEnd;
|
struct nio_thread_data *pDataEnd;
|
||||||
pthread_t tid;
|
pthread_t tid;
|
||||||
pthread_attr_t thread_attr;
|
pthread_attr_t thread_attr;
|
||||||
|
|
@ -140,19 +143,19 @@ int sf_service_init_ex(SFContext *sf_context,
|
||||||
|
|
||||||
sf_context->thread_count = 0;
|
sf_context->thread_count = 0;
|
||||||
pDataEnd = sf_context->thread_data + sf_context->work_threads;
|
pDataEnd = sf_context->thread_data + sf_context->work_threads;
|
||||||
for (pThreadData=sf_context->thread_data,thread_ctx=thread_contexts;
|
for (thread_data=sf_context->thread_data,thread_ctx=thread_contexts;
|
||||||
pThreadData<pDataEnd; pThreadData++,thread_ctx++)
|
thread_data<pDataEnd; thread_data++,thread_ctx++)
|
||||||
{
|
{
|
||||||
pThreadData->thread_loop_callback = thread_loop_callback;
|
thread_data->thread_loop_callback = thread_loop_callback;
|
||||||
if (alloc_thread_extra_data_callback != NULL) {
|
if (alloc_thread_extra_data_callback != NULL) {
|
||||||
pThreadData->arg = alloc_thread_extra_data_callback(
|
thread_data->arg = alloc_thread_extra_data_callback(
|
||||||
(int)(pThreadData - sf_context->thread_data));
|
(int)(thread_data - sf_context->thread_data));
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
pThreadData->arg = NULL;
|
thread_data->arg = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ioevent_init(&pThreadData->ev_puller,
|
if (ioevent_init(&thread_data->ev_puller,
|
||||||
g_sf_global_vars.max_connections + 2, net_timeout_ms, 0) != 0)
|
g_sf_global_vars.max_connections + 2, net_timeout_ms, 0) != 0)
|
||||||
{
|
{
|
||||||
result = errno != 0 ? errno : ENOMEM;
|
result = errno != 0 ? errno : ENOMEM;
|
||||||
|
|
@ -163,7 +166,7 @@ int sf_service_init_ex(SFContext *sf_context,
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
result = fast_timer_init(&pThreadData->timer,
|
result = fast_timer_init(&thread_data->timer,
|
||||||
2 * g_sf_global_vars.network_timeout, g_current_time);
|
2 * g_sf_global_vars.network_timeout, g_current_time);
|
||||||
if (result != 0) {
|
if (result != 0) {
|
||||||
logError("file: "__FILE__", line: %d, "
|
logError("file: "__FILE__", line: %d, "
|
||||||
|
|
@ -173,7 +176,22 @@ int sf_service_init_ex(SFContext *sf_context,
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pipe(pThreadData->pipe_fds) != 0) {
|
#if defined(OS_LINUX)
|
||||||
|
if ((NOTIFY_READ_FD(thread_data)=eventfd(0, EFD_NONBLOCK) < 0) {
|
||||||
|
result = errno != 0 ? errno : EPERM;
|
||||||
|
logError("file: "__FILE__", line: %d, "
|
||||||
|
"call eventfd fail, "
|
||||||
|
"errno: %d, error info: %s",
|
||||||
|
__LINE__, result, strerror(result));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
NOTIFY_WRITE_FD(tdata) = NOTIFY_READ_FD(thread_data);
|
||||||
|
|
||||||
|
if ((result=init_pthread_lock(&thread_data.waiting_queue.lock)) != 0) {
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
#else
|
||||||
|
if (pipe(thread_data->pipe_fds) != 0) {
|
||||||
result = errno != 0 ? errno : EPERM;
|
result = errno != 0 ? errno : EPERM;
|
||||||
logError("file: "__FILE__", line: %d, "
|
logError("file: "__FILE__", line: %d, "
|
||||||
"call pipe fail, "
|
"call pipe fail, "
|
||||||
|
|
@ -181,15 +199,7 @@ int sf_service_init_ex(SFContext *sf_context,
|
||||||
__LINE__, result, strerror(result));
|
__LINE__, result, strerror(result));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
if ((result=fd_add_flags(NOTIFY_READ_FD(thread_data),
|
||||||
#if defined(OS_LINUX)
|
|
||||||
if ((result=fd_add_flags(pThreadData->pipe_fds[0],
|
|
||||||
O_NONBLOCK | O_NOATIME)) != 0)
|
|
||||||
{
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
#else
|
|
||||||
if ((result=fd_add_flags(pThreadData->pipe_fds[0],
|
|
||||||
O_NONBLOCK)) != 0)
|
O_NONBLOCK)) != 0)
|
||||||
{
|
{
|
||||||
break;
|
break;
|
||||||
|
|
@ -197,14 +207,14 @@ int sf_service_init_ex(SFContext *sf_context,
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
thread_ctx->sf_context = sf_context;
|
thread_ctx->sf_context = sf_context;
|
||||||
thread_ctx->thread_data = pThreadData;
|
thread_ctx->thread_data = thread_data;
|
||||||
if ((result=pthread_create(&tid, &thread_attr,
|
if ((result=pthread_create(&tid, &thread_attr,
|
||||||
worker_thread_entrance, thread_ctx)) != 0)
|
worker_thread_entrance, thread_ctx)) != 0)
|
||||||
{
|
{
|
||||||
logError("file: "__FILE__", line: %d, "
|
logError("file: "__FILE__", line: %d, "
|
||||||
"create thread failed, startup threads: %d, "
|
"create thread failed, startup threads: %d, "
|
||||||
"errno: %d, error info: %s",
|
"errno: %d, error info: %s",
|
||||||
__LINE__, (int)(pThreadData - sf_context->thread_data),
|
__LINE__, (int)(thread_data - sf_context->thread_data),
|
||||||
result, strerror(result));
|
result, strerror(result));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
@ -216,14 +226,14 @@ int sf_service_init_ex(SFContext *sf_context,
|
||||||
|
|
||||||
int sf_service_destroy_ex(SFContext *sf_context)
|
int sf_service_destroy_ex(SFContext *sf_context)
|
||||||
{
|
{
|
||||||
struct nio_thread_data *pDataEnd, *pThreadData;
|
struct nio_thread_data *pDataEnd, *thread_data;
|
||||||
|
|
||||||
free_queue_destroy();
|
free_queue_destroy();
|
||||||
pDataEnd = sf_context->thread_data + sf_context->work_threads;
|
pDataEnd = sf_context->thread_data + sf_context->work_threads;
|
||||||
for (pThreadData=sf_context->thread_data; pThreadData<pDataEnd;
|
for (thread_data=sf_context->thread_data; thread_data<pDataEnd;
|
||||||
pThreadData++)
|
thread_data++)
|
||||||
{
|
{
|
||||||
fast_timer_destroy(&pThreadData->timer);
|
fast_timer_destroy(&thread_data->timer);
|
||||||
}
|
}
|
||||||
free(sf_context->thread_data);
|
free(sf_context->thread_data);
|
||||||
sf_context->thread_data = NULL;
|
sf_context->thread_data = NULL;
|
||||||
|
|
@ -306,10 +316,9 @@ static void *accept_thread_entrance(void *arg)
|
||||||
{
|
{
|
||||||
struct accept_thread_context *accept_context;
|
struct accept_thread_context *accept_context;
|
||||||
int incomesock;
|
int incomesock;
|
||||||
long task_ptr;
|
|
||||||
struct sockaddr_in inaddr;
|
struct sockaddr_in inaddr;
|
||||||
socklen_t sockaddr_len;
|
socklen_t sockaddr_len;
|
||||||
struct fast_task_info *pTask;
|
struct fast_task_info *task;
|
||||||
char szClientIp[IP_ADDRESS_SIZE];
|
char szClientIp[IP_ADDRESS_SIZE];
|
||||||
|
|
||||||
accept_context = (struct accept_thread_context *)arg;
|
accept_context = (struct accept_thread_context *)arg;
|
||||||
|
|
@ -334,8 +343,8 @@ static void *accept_thread_entrance(void *arg)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTask = free_queue_pop();
|
task = free_queue_pop();
|
||||||
if (pTask == NULL) {
|
if (task == NULL) {
|
||||||
logError("file: "__FILE__", line: %d, "
|
logError("file: "__FILE__", line: %d, "
|
||||||
"malloc task buff failed, you should "
|
"malloc task buff failed, you should "
|
||||||
"increase the parameter: max_connections",
|
"increase the parameter: max_connections",
|
||||||
|
|
@ -343,30 +352,21 @@ static void *accept_thread_entrance(void *arg)
|
||||||
close(incomesock);
|
close(incomesock);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
strcpy(pTask->client_ip, szClientIp);
|
strcpy(task->client_ip, szClientIp);
|
||||||
|
|
||||||
pTask->ctx = accept_context->sf_context;
|
task->ctx = accept_context->sf_context;
|
||||||
pTask->nio_stage = SF_NIO_STAGE_INIT;
|
task->event.fd = incomesock;
|
||||||
pTask->event.fd = incomesock;
|
task->thread_data = accept_context->sf_context->thread_data +
|
||||||
pTask->thread_data = accept_context->sf_context->thread_data +
|
|
||||||
incomesock % accept_context->sf_context->work_threads;
|
incomesock % accept_context->sf_context->work_threads;
|
||||||
if (accept_context->sf_context->accept_done_func != NULL) {
|
if (accept_context->sf_context->accept_done_func != NULL) {
|
||||||
accept_context->sf_context->accept_done_func(pTask,
|
accept_context->sf_context->accept_done_func(task,
|
||||||
accept_context->server_sock ==
|
accept_context->server_sock ==
|
||||||
accept_context->sf_context->inner_sock);
|
accept_context->sf_context->inner_sock);
|
||||||
}
|
}
|
||||||
|
|
||||||
task_ptr = (long)pTask;
|
if (sf_nio_notify(task, SF_NIO_STAGE_INIT) != 0) {
|
||||||
if (write(pTask->thread_data->pipe_fds[1], &task_ptr,
|
|
||||||
sizeof(task_ptr)) != sizeof(task_ptr))
|
|
||||||
{
|
|
||||||
logError("file: "__FILE__", line: %d, "
|
|
||||||
"call write to pipe fd: %d fail, "
|
|
||||||
"errno: %d, error info: %s",
|
|
||||||
__LINE__, pTask->thread_data->pipe_fds[1],
|
|
||||||
errno, strerror(errno));
|
|
||||||
close(incomesock);
|
close(incomesock);
|
||||||
free_queue_push(pTask);
|
free_queue_push(task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue