enable epoll edge trigger by global variable epoll_edge_trigger
parent
1dd9ac656f
commit
c1ae024da5
|
|
@ -40,6 +40,7 @@ typedef struct sf_global_variables {
|
|||
|
||||
volatile bool continue_flag;
|
||||
bool tcp_quick_ack;
|
||||
bool epoll_edge_trigger;
|
||||
int max_connections;
|
||||
int max_pkg_size;
|
||||
int min_buff_size;
|
||||
|
|
@ -98,6 +99,8 @@ extern SFContext g_sf_context;
|
|||
#define SF_G_ERROR_HANDLER g_sf_global_vars.error_handler
|
||||
#define SF_G_EMPTY_STRING g_sf_global_vars.empty
|
||||
|
||||
#define SF_G_EPOLL_EDGE_TRIGGER g_sf_global_vars.epoll_edge_trigger
|
||||
|
||||
#define SF_WORK_THREADS(sf_context) sf_context.work_threads
|
||||
#define SF_ALIVE_THREAD_COUNT(sf_context) sf_context.thread_count
|
||||
#define SF_THREAD_INDEX(sf_context, tdata) (int)(tdata - sf_context.thread_data)
|
||||
|
|
|
|||
59
src/sf_nio.c
59
src/sf_nio.c
|
|
@ -37,6 +37,7 @@
|
|||
#include "fastcommon/sockopt.h"
|
||||
#include "fastcommon/fast_task_queue.h"
|
||||
#include "fastcommon/ioevent_loop.h"
|
||||
#include "fastcommon/fc_atomic.h"
|
||||
#include "sf_global.h"
|
||||
#include "sf_service.h"
|
||||
#include "sf_nio.h"
|
||||
|
|
@ -143,7 +144,7 @@ static inline int set_write_event(struct fast_task_info *task)
|
|||
return 0;
|
||||
}
|
||||
|
||||
int sf_set_read_event(struct fast_task_info *task)
|
||||
static int setup_read_event(struct fast_task_info *task)
|
||||
{
|
||||
int result;
|
||||
|
||||
|
|
@ -169,6 +170,27 @@ int sf_set_read_event(struct fast_task_info *task)
|
|||
return 0;
|
||||
}
|
||||
|
||||
int sf_set_read_event(struct fast_task_info *task)
|
||||
{
|
||||
int result;
|
||||
|
||||
if ((result=setup_read_event(task)) != 0) {
|
||||
return result;
|
||||
}
|
||||
|
||||
if (SF_G_EPOLL_EDGE_TRIGGER) {
|
||||
if (FC_ATOMIC_GET(task->nio_stages.notify) == SF_NIO_STAGE_SEND) {
|
||||
__sync_bool_compare_and_swap(&task->nio_stages.next,
|
||||
SF_NIO_STAGE_NONE, SF_NIO_STAGE_RECV);
|
||||
return 0;
|
||||
} else {
|
||||
return sf_nio_notify(task, SF_NIO_STAGE_RECV);
|
||||
}
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
static inline int sf_ioevent_add(struct fast_task_info *task,
|
||||
IOEventCallback callback, const int timeout)
|
||||
{
|
||||
|
|
@ -276,8 +298,7 @@ static int sf_nio_deal_task(struct fast_task_info *task, const int stage)
|
|||
result = sf_connect_server(task);
|
||||
break;
|
||||
case SF_NIO_STAGE_RECV:
|
||||
if ((result=sf_set_read_event(task)) == 0)
|
||||
{
|
||||
if ((result=setup_read_event(task)) == 0) {
|
||||
sf_client_sock_read(task->event.fd,
|
||||
IOEVENT_READ, task);
|
||||
}
|
||||
|
|
@ -321,7 +342,7 @@ int sf_nio_notify(struct fast_task_info *task, const int stage)
|
|||
int old_stage;
|
||||
bool notify;
|
||||
|
||||
if (__sync_add_and_fetch(&task->canceled, 0)) {
|
||||
if (FC_ATOMIC_GET(task->canceled)) {
|
||||
if (stage == SF_NIO_STAGE_CONTINUE) {
|
||||
if (task->continue_callback != NULL) {
|
||||
return task->continue_callback(task);
|
||||
|
|
@ -342,12 +363,22 @@ int sf_nio_notify(struct fast_task_info *task, const int stage)
|
|||
while (!__sync_bool_compare_and_swap(&task->nio_stages.notify,
|
||||
SF_NIO_STAGE_NONE, stage))
|
||||
{
|
||||
old_stage = __sync_fetch_and_add(&task->nio_stages.notify, 0);
|
||||
old_stage = FC_ATOMIC_GET(task->nio_stages.notify);
|
||||
if (old_stage == stage) {
|
||||
logDebug("file: "__FILE__", line: %d, "
|
||||
"current stage: %d equals to the target, skip set",
|
||||
__LINE__, stage);
|
||||
return 0;
|
||||
|
||||
} else if (SF_G_EPOLL_EDGE_TRIGGER && (
|
||||
(old_stage == SF_NIO_STAGE_RECV &&
|
||||
stage == SF_NIO_STAGE_SEND) ||
|
||||
(old_stage == SF_NIO_STAGE_SEND &&
|
||||
stage == SF_NIO_STAGE_RECV)))
|
||||
{
|
||||
__sync_bool_compare_and_swap(&task->nio_stages.
|
||||
next, SF_NIO_STAGE_NONE, stage);
|
||||
return 0;
|
||||
} else if (old_stage != SF_NIO_STAGE_NONE) {
|
||||
logWarning("file: "__FILE__", line: %d, "
|
||||
"current stage: %d != %d, skip set stage to %d",
|
||||
|
|
@ -390,6 +421,7 @@ void sf_recv_notify_read(int sock, short event, void *arg)
|
|||
{
|
||||
int64_t n;
|
||||
int stage;
|
||||
int next;
|
||||
struct nio_thread_data *thread_data;
|
||||
struct fast_task_info *task;
|
||||
struct fast_task_info *current;
|
||||
|
|
@ -410,7 +442,7 @@ void sf_recv_notify_read(int sock, short event, void *arg)
|
|||
task = current;
|
||||
current = current->next;
|
||||
|
||||
stage = __sync_add_and_fetch(&task->nio_stages.notify, 0);
|
||||
stage = FC_ATOMIC_GET(task->nio_stages.notify);
|
||||
if (!task->canceled) {
|
||||
if (stage == SF_NIO_STAGE_CONTINUE) {
|
||||
/* MUST set to SF_NIO_STAGE_NONE first for re-entry */
|
||||
|
|
@ -422,6 +454,15 @@ void sf_recv_notify_read(int sock, short event, void *arg)
|
|||
__sync_bool_compare_and_swap(&task->nio_stages.notify,
|
||||
stage, SF_NIO_STAGE_NONE);
|
||||
}
|
||||
|
||||
if (SF_G_EPOLL_EDGE_TRIGGER) {
|
||||
next = FC_ATOMIC_GET(task->nio_stages.next);
|
||||
if (next != SF_NIO_STAGE_NONE) {
|
||||
sf_nio_notify(task, next);
|
||||
__sync_bool_compare_and_swap(&task->nio_stages.next,
|
||||
next, SF_NIO_STAGE_NONE);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (stage != SF_NIO_STAGE_NONE) {
|
||||
if (stage == SF_NIO_STAGE_CONTINUE) {
|
||||
|
|
@ -749,9 +790,9 @@ int sf_client_sock_write(int sock, short event, void *arg)
|
|||
}
|
||||
} else if (bytes == 0) {
|
||||
logWarning("file: "__FILE__", line: %d, "
|
||||
"client ip: %s, sock: %d, send failed, "
|
||||
"connection disconnected",
|
||||
__LINE__, task->client_ip, sock);
|
||||
"client ip: %s, sock: %d, task length: %d, offset: %d, "
|
||||
"send failed, connection disconnected", __LINE__,
|
||||
task->client_ip, sock, task->length, task->offset);
|
||||
|
||||
ioevent_add_to_deleted_list(task);
|
||||
return -1;
|
||||
|
|
|
|||
|
|
@ -131,6 +131,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
|
|||
{
|
||||
int result;
|
||||
int bytes;
|
||||
int extra_events;
|
||||
struct worker_thread_context *thread_contexts;
|
||||
struct worker_thread_context *thread_ctx;
|
||||
struct nio_thread_data *thread_data;
|
||||
|
|
@ -172,6 +173,18 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
|
|||
return ENOMEM;
|
||||
}
|
||||
|
||||
if (SF_G_EPOLL_EDGE_TRIGGER) {
|
||||
#ifdef OS_LINUX
|
||||
extra_events = EPOLLET;
|
||||
#elif defined(OS_FREEBSD)
|
||||
extra_events = EV_CLEAR;
|
||||
#else
|
||||
extra_events = 0;
|
||||
#endif
|
||||
} else {
|
||||
extra_events = 0;
|
||||
}
|
||||
|
||||
g_current_time = time(NULL);
|
||||
sf_context->thread_count = 0;
|
||||
data_end = sf_context->thread_data + sf_context->work_threads;
|
||||
|
|
@ -187,8 +200,8 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
|
|||
thread_data->arg = NULL;
|
||||
}
|
||||
|
||||
if (ioevent_init(&thread_data->ev_puller,
|
||||
g_sf_global_vars.max_connections + 2, net_timeout_ms, 0) != 0)
|
||||
if (ioevent_init(&thread_data->ev_puller, 2 + g_sf_global_vars.
|
||||
max_connections, net_timeout_ms, extra_events) != 0)
|
||||
{
|
||||
result = errno != 0 ? errno : ENOMEM;
|
||||
logError("file: "__FILE__", line: %d, "
|
||||
|
|
|
|||
Loading…
Reference in New Issue