/*
* Copyright (c) 2020 YuQing <384681@qq.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
//sf_service.c
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "fastcommon/logger.h"
#include "fastcommon/sockopt.h"
#include "fastcommon/shared_func.h"
#include "fastcommon/pthread_func.h"
#include "fastcommon/sched_thread.h"
#include "fastcommon/ioevent_loop.h"
#include "fastcommon/fc_memory.h"
#include "sf_proto.h"
#include "sf_util.h"
#include "sf_service.h"
#if defined(OS_LINUX)
#include
#endif
/* Set by sigQuitHandler on the first quit signal so later ones are ignored. */
static bool terminate_flag = false;
/* Optional callback invoked by sigQuitHandler with the signal number;
 * NULL means no callback. No setter is visible in this part of the file. */
static sf_sig_quit_handler sig_quit_handler = NULL;
/* Signal handlers installed by sf_setup_signal_handler(). */
static void sigQuitHandler(int sig);
static void sigHupHandler(int sig);
static void sigUsrHandler(int sig);
#if defined(DEBUG_FLAG)
/* Debug builds only: replaces the SIGUSR1 / SIGUSR2 handler. */
static void sigDumpHandler(int sig);
#endif
/* Argument handed to each worker thread: the owning service context
 * plus the per-thread NIO data slot that the thread serves. */
struct worker_thread_context {
SFContext *sf_context;
struct nio_thread_data *thread_data;
};
/* Thread start routine; arg is a struct worker_thread_context pointer. */
static void *worker_thread_entrance(void *arg);
/**
 * Seed the random generator and initialize the task free queue of a context.
 *
 * The number of tasks allocated per batch is 256 divided by a multiple
 * derived from a reference buffer size (in 64KB units):
 *   - "cluster" / "replica" contexts: max(4MB, max_buff_size), multiple
 *     capped at 64;
 *   - any other context: min_buff_size, multiple capped at 16.
 *
 * @param sf_context  context owning the free queue and the buffer config
 * @param name        context name, also passed on as the queue name
 * @param double_buffers, need_shrink_task_buffer, task_padding_size,
 *        task_arg_size, init_callback, init_arg
 *                    forwarded unchanged to free_queue_init_ex2()
 * @return 0 on success, otherwise the error code of the failing call
 */
static int sf_init_free_queue(SFContext *sf_context, const char *name,
        const bool double_buffers, const bool need_shrink_task_buffer,
        const int task_padding_size, const int task_arg_size,
        TaskInitCallback init_callback, void *init_arg)
{
    int status;
    int reference_size;
    int multiple;
    int multiple_limit;
    int tasks_per_alloc;

    status = set_rand_seed();
    if (status != 0) {
        logCrit("file: "__FILE__", line: %d, "
                "set_rand_seed fail, program exit!", __LINE__);
        return status;
    }

    if (strcmp(name, "cluster") == 0 || strcmp(name, "replica") == 0) {
        reference_size = FC_MAX(4 * 1024 * 1024, sf_context->
                net_buffer_cfg.max_buff_size);
        multiple_limit = 64;
    } else {
        reference_size = sf_context->net_buffer_cfg.min_buff_size;
        multiple_limit = 16;
    }

    /* how many 64KB units fit into the reference size */
    multiple = reference_size / (64 * 1024);
    if (multiple == 0) {
        multiple = 1;
    } else if (multiple > multiple_limit) {
        multiple = multiple_limit;
    }

    /* the larger the buffers, the fewer tasks are allocated per batch */
    tasks_per_alloc = 256 / multiple;

    return free_queue_init_ex2(&sf_context->free_queue, name,
            double_buffers, need_shrink_task_buffer,
            sf_context->net_buffer_cfg.max_connections, tasks_per_alloc,
            sf_context->net_buffer_cfg.min_buff_size,
            sf_context->net_buffer_cfg.max_buff_size,
            task_padding_size, task_arg_size, init_callback, init_arg);
}
/**
 * Initialize a service context and start its network worker threads.
 *
 * Steps: store the name, callbacks and protocol parameters in sf_context;
 * initialize the task free queue; allocate one nio_thread_data per work
 * thread; for every thread set up the event poller, the timer, the
 * waiting-queue lock and the notify fd (eventfd on Linux, a pipe
 * elsewhere) and finally spawn the worker thread.
 *
 * @param sf_context   context to initialize; work_threads, net_buffer_cfg,
 *                     smart_polling and use_io_uring are read from it
 * @param name         context name, copied into sf_context->name
 * @param alloc_thread_extra_data_callback  optional; called with the thread
 *                     index, its result is stored in thread_data->arg
 * @param thread_loop_callback  stored in every thread_data
 * @param accept_done_callback  stored in sf_context->callbacks.accept_done
 * @param set_body_length_func, alloc_recv_buffer_func, send_done_callback,
 *        deal_func, task_cleanup_func, timeout_callback,
 *        release_buffer_callback, proto_header_size
 *                     forwarded to sf_set_parameters_ex()
 * @param net_timeout_ms  stored as thread_data->timeout_ms and passed to
 *                     ioevent_init()
 * @param task_padding_size, task_arg_size, double_buffers,
 *        need_shrink_task_buffer, init_callback, init_arg
 *                     forwarded to the free queue initialization
 * @param explicit_post_recv  when true, flag the RDMA handler of every
 *                     address family accordingly
 * @return 0 on success, otherwise an errno-style error code
 */
int sf_service_init_ex2(SFContext *sf_context, const char *name,
        sf_alloc_thread_extra_data_callback
        alloc_thread_extra_data_callback,
        ThreadLoopCallback thread_loop_callback,
        sf_accept_done_callback accept_done_callback,
        sf_set_body_length_callback set_body_length_func,
        sf_alloc_recv_buffer_callback alloc_recv_buffer_func,
        sf_send_done_callback send_done_callback,
        sf_deal_task_callback deal_func, TaskCleanUpCallback task_cleanup_func,
        sf_recv_timeout_callback timeout_callback, const int net_timeout_ms,
        const int proto_header_size, const int task_padding_size,
        const int task_arg_size, const bool double_buffers,
        const bool need_shrink_task_buffer, const bool explicit_post_recv,
        TaskInitCallback init_callback, void *init_arg,
        sf_release_buffer_callback release_buffer_callback)
{
    int result;
    int bytes;
    int extra_events;
    int max_entries;
    int i;
    struct worker_thread_context *thread_contexts;
    struct worker_thread_context *thread_ctx;
    struct nio_thread_data *thread_data;
    struct nio_thread_data *data_end;
    pthread_t tid;
    pthread_attr_t thread_attr;

    fc_safe_strcpy(sf_context->name, name);
    sf_context->connect_need_log = true;
    /* task buffers can only grow when min and max sizes differ */
    sf_context->realloc_task_buffer = sf_context->net_buffer_cfg.
        min_buff_size < sf_context->net_buffer_cfg.max_buff_size;
    sf_context->callbacks.accept_done = accept_done_callback;
    sf_set_parameters_ex(sf_context, proto_header_size,
            set_body_length_func, alloc_recv_buffer_func,
            send_done_callback, deal_func, task_cleanup_func,
            timeout_callback, release_buffer_callback);
    if (explicit_post_recv) {
        for (i=0; i<SF_ADDRESS_FAMILY_COUNT; i++) {
            sf_context->handlers[i].handlers[SF_RDMACM_NETWORK_HANDLER_INDEX].
                explicit_post_recv = true;
        }
    }

    if ((result=sf_init_free_queue(sf_context, name, double_buffers,
                    need_shrink_task_buffer, task_padding_size,
                    task_arg_size, init_callback, init_arg)) != 0)
    {
        return result;
    }

    if ((result=init_pthread_attr(&thread_attr, g_sf_global_vars.
                    thread_stack_size)) != 0)
    {
        logError("file: "__FILE__", line: %d, "
                "init_pthread_attr fail, program exit!", __LINE__);
        return result;
    }

    bytes = sizeof(struct nio_thread_data) * sf_context->work_threads;
    sf_context->thread_data = (struct nio_thread_data *)fc_malloc(bytes);
    if (sf_context->thread_data == NULL) {
        pthread_attr_destroy(&thread_attr);
        return ENOMEM;
    }
    memset(sf_context->thread_data, 0, bytes);

    /* not freed in this function: every started worker thread keeps
     * using its entry until the thread exits */
    bytes = sizeof(struct worker_thread_context) * sf_context->work_threads;
    thread_contexts = (struct worker_thread_context *)fc_malloc(bytes);
    if (thread_contexts == NULL) {
        pthread_attr_destroy(&thread_attr);
        return ENOMEM;
    }

    if (SF_G_EPOLL_EDGE_TRIGGER) {
#ifdef OS_LINUX
#if IOEVENT_USE_EPOLL
        extra_events = EPOLLET;
#else
        extra_events = 0;
#endif
#elif defined(OS_FREEBSD)
        extra_events = EV_CLEAR;
#else
        extra_events = 0;
#endif
    } else {
        extra_events = 0;
    }

    /* poller capacity per thread: ceil(max_connections / work_threads),
     * then some headroom that shrinks as the value grows */
    max_entries = (sf_context->net_buffer_cfg.max_connections +
            sf_context->work_threads - 1) / sf_context->work_threads;
    if (strcmp(sf_context->name, "service") == 0) {
        if (max_entries < 4 * 1024) {
            max_entries = max_entries * 2;
        } else if (max_entries < 8 * 1024) {
            max_entries = (max_entries * 3) / 2;
        } else if (max_entries < 16 * 1024) {
            max_entries = (max_entries * 5) / 4;
        } else if (max_entries < 32 * 1024) {
            max_entries = (max_entries * 6) / 5;
#if IOEVENT_USE_URING
            if (max_entries > 32 * 1024) {
                max_entries = 32 * 1024;
            }
#else
        } else if (max_entries < 64 * 1024) {
            max_entries = (max_entries * 11) / 10;
        } else if (max_entries < 128 * 1024) {
            max_entries = (max_entries * 21) / 20;
#endif
        }
    } else {
        if (max_entries < 1024) {
            max_entries += 8;
        } else {
            max_entries = 1024;
        }
    }

    g_current_time = time(NULL);
    sf_context->thread_count = 0;
    data_end = sf_context->thread_data + sf_context->work_threads;
    for (thread_data=sf_context->thread_data,thread_ctx=thread_contexts;
            thread_data<data_end; thread_data++,thread_ctx++)
    {
        thread_data->timeout_ms = net_timeout_ms;
        FC_INIT_LIST_HEAD(&thread_data->polling_queue);
        if (sf_context->smart_polling.enabled) {
            thread_data->busy_polling_callback =
                sf_rdma_busy_polling_callback;
        } else {
            thread_data->busy_polling_callback = NULL;
        }

        thread_data->thread_loop_callback = thread_loop_callback;
        if (alloc_thread_extra_data_callback != NULL) {
            thread_data->arg = alloc_thread_extra_data_callback(
                    (int)(thread_data - sf_context->thread_data));
        } else {
            thread_data->arg = NULL;
        }

        if ((result=ioevent_init(&thread_data->ev_puller, sf_context->
                        name, sf_context->use_io_uring, max_entries,
                        net_timeout_ms, extra_events)) != 0)
        {
            char prompt[256];
#if IOEVENT_USE_URING
            if (result == EPERM) {
                snprintf(prompt, sizeof(prompt),
                        " make sure kernel.io_uring_disabled set to 0");
            } else if (result == EINVAL) {
                snprintf(prompt, sizeof(prompt),
                        " maybe max_connections: %d is too large"
                        " or [%s]'s work_threads: %d is too small",
                        sf_context->net_buffer_cfg.max_connections,
                        sf_context->name, sf_context->work_threads);
            } else {
                *prompt = '\0';
            }
#else
            *prompt = '\0';
#endif

            logError("file: "__FILE__", line: %d, "
                    "ioevent_init fail, errno: %d, error info: %s.%s"
                    , __LINE__, result, strerror(result), prompt);
            /* leave through the common exit so thread_attr is released */
            break;
        }

#if IOEVENT_USE_URING
        if (send_done_callback != NULL) {
            ioevent_set_send_zc_done_notify(&thread_data->ev_puller, true);
        }
#endif

        result = fast_timer_init(&thread_data->timer, 2 * sf_context->
                net_buffer_cfg.network_timeout, g_current_time);
        if (result != 0) {
            logError("file: "__FILE__", line: %d, "
                    "fast_timer_init fail, errno: %d, error info: %s",
                    __LINE__, result, strerror(result));
            break;
        }

        if ((result=init_pthread_lock(&thread_data->
                        waiting_queue.lock)) != 0)
        {
            break;
        }

#if defined(OS_LINUX)
        /* one eventfd serves as both ends of the notify channel */
        FC_NOTIFY_READ_FD(thread_data) = eventfd(0,
                EFD_NONBLOCK | EFD_CLOEXEC);
        if (FC_NOTIFY_READ_FD(thread_data) < 0) {
            result = errno != 0 ? errno : EPERM;
            logError("file: "__FILE__", line: %d, "
                    "call eventfd fail, "
                    "errno: %d, error info: %s",
                    __LINE__, result, strerror(result));
            break;
        }
        FC_NOTIFY_WRITE_FD(thread_data) = FC_NOTIFY_READ_FD(thread_data);
#else
        if (pipe(thread_data->pipe_fds) != 0) {
            result = errno != 0 ? errno : EPERM;
            logError("file: "__FILE__", line: %d, "
                    "call pipe fail, "
                    "errno: %d, error info: %s",
                    __LINE__, result, strerror(result));
            break;
        }
        if ((result=fd_add_flags(FC_NOTIFY_READ_FD(thread_data),
                        O_NONBLOCK)) != 0)
        {
            break;
        }
        FC_SET_CLOEXEC(FC_NOTIFY_READ_FD(thread_data));
        FC_SET_CLOEXEC(FC_NOTIFY_WRITE_FD(thread_data));
#endif

        thread_ctx->sf_context = sf_context;
        thread_ctx->thread_data = thread_data;
        if ((result=pthread_create(&tid, &thread_attr,
                        worker_thread_entrance, thread_ctx)) != 0)
        {
            logError("file: "__FILE__", line: %d, "
                    "create thread failed, startup threads: %d, "
                    "errno: %d, error info: %s",
                    __LINE__, (int)(thread_data - sf_context->thread_data),
                    result, strerror(result));
            break;
        }
    }

    pthread_attr_destroy(&thread_attr);
    return result;
}
/**
 * Release the resources of a service context: the task free queue, the
 * timer of every worker thread slot and the thread data array itself.
 *
 * Nothing beyond the free queue is touched when sf_context->thread_data
 * is NULL (never allocated or already destroyed).
 *
 * @param sf_context  the context to tear down
 * @return always 0
 */
int sf_service_destroy_ex(SFContext *sf_context)
{
    struct nio_thread_data *data_end, *thread_data;

    free_queue_destroy(&sf_context->free_queue);
    if (sf_context->thread_data == NULL) {
        return 0;
    }

    data_end = sf_context->thread_data + sf_context->work_threads;
    for (thread_data=sf_context->thread_data; thread_data<data_end;
            thread_data++)
    {
        fast_timer_destroy(&thread_data->timer);
    }

    free(sf_context->thread_data);
    sf_context->thread_data = NULL;
    return 0;
}
/**
 * Install the same loop callback on every worker thread slot of a context.
 *
 * @param sf_context            context whose thread_data array is updated
 * @param thread_loop_callback  value stored in each slot's
 *                              thread_loop_callback field
 */
void sf_service_set_thread_loop_callback_ex(SFContext *sf_context,
        ThreadLoopCallback thread_loop_callback)
{
    struct nio_thread_data *data_end, *thread_data;

    data_end = sf_context->thread_data + sf_context->work_threads;
    for (thread_data=sf_context->thread_data; thread_data<data_end;
            thread_data++)
    {
        thread_data->thread_loop_callback = thread_loop_callback;
    }
}
static void *worker_thread_entrance(void *arg)
{
struct worker_thread_context *thread_ctx;
int thread_count;
thread_ctx = (struct worker_thread_context *)arg;
#ifdef OS_LINUX
{
char thread_name[32];
snprintf(thread_name, sizeof(thread_name), "%s-net[%d]",
thread_ctx->sf_context->name, (int)(thread_ctx->
thread_data - thread_ctx->sf_context->thread_data));
prctl(PR_SET_NAME, thread_name);
}
#endif
thread_count = __sync_add_and_fetch(&thread_ctx->
sf_context->thread_count, 1);
logDebug("file: "__FILE__", line: %d, "
"worker thread enter, current thread index: %d, "
"current thread count: %d", __LINE__,
(int)(thread_ctx->thread_data - thread_ctx->
sf_context->thread_data), thread_count);
ioevent_loop(thread_ctx->thread_data,
sf_recv_notify_read,
thread_ctx->sf_context->callbacks.task_cleanup,
&g_sf_global_vars.continue_flag);
ioevent_destroy(&thread_ctx->thread_data->ev_puller);
thread_count = __sync_sub_and_fetch(&thread_ctx->
sf_context->thread_count, 1);
logDebug("file: "__FILE__", line: %d, "
"worker thread exit, current thread index: %d, "
"current thread count: %d", __LINE__,
(int)(thread_ctx->thread_data - thread_ctx->
sf_context->thread_data), thread_count);
return NULL;
}
/**
 * Create the listening socket of a listener and apply the server options.
 *
 * @param listener   listener whose port is used and whose sock field
 *                   receives the new socket
 * @param af         address family passed to socketServer2()
 * @param bind_addr  address to bind to, passed to socketServer2()
 * @return 0 on success, otherwise the error code of the failing call;
 *         when setting the options fails the socket is closed again and
 *         listener->sock is reset to -1
 */
int sf_socket_create_server(SFListener *listener,
        int af, const char *bind_addr)
{
    int result;

    listener->sock = socketServer2(af, bind_addr,
            listener->port, &result);
    if (listener->sock < 0) {
        return result;
    }

    result = tcpsetserveropt(listener->sock, listener->handler->
            fh->ctx->net_buffer_cfg.network_timeout);
    if (result != 0) {
        /* do not leave a half-configured listening socket behind */
        close(listener->sock);
        listener->sock = -1;
        return result;
    }

    return 0;
}
/**
 * Create the listening sockets of every enabled network handler of every
 * configured address family.
 *
 * For each handler:
 *   - inner and outer port equal, and at least one bind address empty:
 *     a single server bound to "" is created on the outer listener;
 *   - inner and outer port equal, and both bind addresses identical:
 *     a single server is created, on the inner listener when the address
 *     is a private IP, otherwise on the outer listener;
 *   - in every other case both the outer and the inner server are created.
 * The enabled flag of a listener is set only after its server was created.
 *
 * @param sf_context  context holding the address family handlers
 * @return 0 on success, otherwise the error code of the first
 *         create_server call that failed
 */
int sf_socket_server_ex(SFContext *sf_context)
{
    int result;
    int i;
    bool dual_ports;
    const char *bind_addr;
    SFAddressFamilyHandler *fh;
    SFNetworkHandler *handler;
    SFNetworkHandler *end;

    for (i=0; i<SF_ADDRESS_FAMILY_COUNT; i++) {
        fh = sf_context->handlers + i;
        if (fh->af == AF_UNSPEC) {
            continue;
        }

        end = fh->handlers + SF_NETWORK_HANDLER_COUNT;
        for (handler=fh->handlers; handler<end; handler++) {
            if (!handler->enabled) {
                continue;
            }

            handler->inner.enabled = false;
            handler->outer.enabled = false;
            if (handler->outer.port == handler->inner.port) {
                if (*fh->outer_bind_addr == '\0' ||
                        *fh->inner_bind_addr == '\0')
                {
                    bind_addr = "";
                    if ((result=handler->create_server(&handler->
                                    outer, fh->af, bind_addr)) != 0)
                    {
                        return result;
                    }
                    handler->outer.enabled = true;
                    dual_ports = false;
                } else if (strcmp(fh->outer_bind_addr,
                            fh->inner_bind_addr) == 0)
                {
                    bind_addr = fh->outer_bind_addr;
                    if (is_private_ip(bind_addr)) {
                        if ((result=handler->create_server(&handler->
                                        inner, fh->af, bind_addr)) != 0)
                        {
                            return result;
                        }
                        handler->inner.enabled = true;
                    } else {
                        if ((result=handler->create_server(&handler->
                                        outer, fh->af, bind_addr)) != 0)
                        {
                            return result;
                        }
                        handler->outer.enabled = true;
                    }
                    dual_ports = false;
                } else {
                    /* same port but two distinct bind addresses */
                    dual_ports = true;
                }
            } else {
                dual_ports = true;
            }

            if (dual_ports) {
                if ((result=handler->create_server(&handler->outer,
                                fh->af, fh->outer_bind_addr)) != 0)
                {
                    return result;
                }

                if ((result=handler->create_server(&handler->inner,
                                fh->af, fh->inner_bind_addr)) != 0)
                {
                    return result;
                }
                handler->inner.enabled = true;
                handler->outer.enabled = true;
            }
        }
    }

    return 0;
}
/**
 * Close the listening socket of a listener, if one is open, and mark the
 * listener as closed by resetting its sock field to -1.
 *
 * @param listener  the listener to close
 */
void sf_socket_close_server(SFListener *listener)
{
    if (listener->sock < 0) {
        /* nothing open */
        return;
    }

    close(listener->sock);
    listener->sock = -1;
}
/**
 * Accept one incoming connection on a listener and wrap it in a task.
 *
 * The accepted socket is switched to non-blocking mode and marked
 * close-on-exec, a server task is allocated for it, and the peer IP and
 * port are recorded in the task.
 *
 * @param listener  listening endpoint; its inaddr field receives the peer
 *                  address returned by accept()
 * @return the new task, or NULL when accept fails (only errors other than
 *         EINTR / EAGAIN are logged), when the socket cannot be made
 *         non-blocking, or when no task can be allocated
 */
struct fast_task_info *sf_socket_accept_connection(SFListener *listener)
{
    struct fast_task_info *conn_task;
    socklen_t addr_len;
    int client_fd;
    int peer_port;

    addr_len = sizeof(listener->inaddr);
    client_fd = accept(listener->sock, (struct sockaddr *)
            &listener->inaddr, &addr_len);
    if (client_fd < 0) {
        if (errno != EINTR && errno != EAGAIN) {
            logError("file: "__FILE__", line: %d, "
                    "accept fail, errno: %d, error info: %s",
                    __LINE__, errno, strerror(errno));
        }
        return NULL;
    }

    if (tcpsetnonblockopt(client_fd) != 0) {
        close(client_fd);
        return NULL;
    }
    FC_SET_CLOEXEC(client_fd);

    conn_task = sf_alloc_init_server_task(listener->handler, client_fd);
    if (conn_task == NULL) {
        close(client_fd);
        return NULL;
    }

    getPeerIpAddPort(client_fd, conn_task->client_ip,
            sizeof(conn_task->client_ip), &peer_port);
    conn_task->port = peer_port;
    return conn_task;
}
/**
 * Close every enabled listener (outer and inner) of every enabled network
 * handler, for all configured address families of a context.
 *
 * @param sf_context  context holding the address family handlers
 */
void sf_socket_close_ex(SFContext *sf_context)
{
    int i;
    SFNetworkHandler *handler;
    SFNetworkHandler *end;

    for (i=0; i<SF_ADDRESS_FAMILY_COUNT; i++) {
        if (sf_context->handlers[i].af == AF_UNSPEC) {
            continue;
        }

        end = sf_context->handlers[i].handlers + SF_NETWORK_HANDLER_COUNT;
        for (handler=sf_context->handlers[i].handlers;
                handler<end; handler++)
        {
            if (!handler->enabled) {
                continue;
            }
            if (handler->outer.enabled) {
                handler->close_server(&handler->outer);
            }
            if (handler->inner.enabled) {
                handler->close_server(&handler->inner);
            }
        }
    }
}
/**
 * Accept loop of one listener; runs until the global continue flag is
 * cleared.
 *
 * Every accepted task is assigned to a worker thread slot chosen by
 * (fd modulo work_threads), offered to the optional accept_done callback
 * and then handed to the NIO layer. When the callback or the notification
 * reports failure, the connection is closed and the task released.
 *
 * @param listener  the listener to accept connections on
 */
static void accept_run(SFListener *listener)
{
    struct fast_task_info *conn;
    bool accepted;

    while (g_sf_global_vars.continue_flag) {
        conn = listener->handler->accept_connection(listener);
        if (conn == NULL) {
            continue;
        }

        conn->thread_data = listener->handler->fh->ctx->thread_data +
            conn->event.fd % listener->handler->fh->ctx->work_threads;

        accepted = true;
        if (listener->handler->fh->ctx->callbacks.accept_done != NULL) {
            accepted = (listener->handler->fh->ctx->callbacks.
                    accept_done(conn, listener->inaddr.sin_addr.s_addr,
                        listener->is_inner) == 0);
        }
        if (accepted) {
            accepted = (sf_nio_notify(conn, SF_NIO_STAGE_INIT) == 0);
        }

        if (!accepted) {
            listener->handler->close_connection(conn);
            sf_release_task(conn);
        }
    }
}
/**
 * Start routine of an accept thread: names the thread (Linux only) as
 * "<sock|rdma>-<context name>-listen" and then runs the accept loop.
 *
 * @param listener  the listener served by this thread
 * @return always NULL
 */
static void *accept_thread_entrance(SFListener *listener)
{
#ifdef OS_LINUX
    {
        const char *comm_name;
        char title[32];

        if (listener->handler->comm_type == fc_comm_type_sock) {
            comm_name = "sock";
        } else {
            comm_name = "rdma";
        }
        snprintf(title, sizeof(title), "%s-%s-listen",
                comm_name, listener->handler->fh->ctx->name);
        prctl(PR_SET_NAME, title);
    }
#endif

    accept_run(listener);
    return NULL;
}
/*
 * Accept-thread startup for one listener.
 *
 * The visible prologue returns 0 right away when accept_threads <= 0 and
 * otherwise initializes the thread attributes with the global thread
 * stack size.
 *
 * NOTE(review): source text is missing in the middle of this block. The
 * "for (i=0; ..." line below is damaged, and the statements after it use
 * names that are not declared here (sf_context, handler, hend, listeners,
 * last, lend, blocked). Presumably the rest of _accept_loop and the header
 * and locals of a second function were lost -- restore from the upstream
 * file before building.
 */
int _accept_loop(SFListener *listener, const int accept_threads)
{
pthread_t tid;
pthread_attr_t thread_attr;
int result;
int i;
/* nothing to start */
if (accept_threads <= 0) {
return 0;
}
if ((result=init_pthread_attr(&thread_attr, g_sf_global_vars.
thread_stack_size)) != 0)
{
logWarning("file: "__FILE__", line: %d, "
"init_pthread_attr fail!", __LINE__);
return result;
}
/* NOTE(review): damaged line, the loop condition and what followed it
 * are missing; from here on address families without a configured af
 * (AF_UNSPEC) are skipped */
for (i=0; ihandlers[i].af == AF_UNSPEC) {
continue;
}
hend = sf_context->handlers[i].handlers + SF_NETWORK_HANDLER_COUNT;
/* NOTE(review): damaged loop header on the next two lines */
for (handler=sf_context->handlers[i].handlers;
handlerenabled) {
continue;
}
/* collect every enabled listener, inner before outer */
if (handler->inner.enabled) {
*listener++ = &handler->inner;
}
if (handler->outer.enabled) {
*listener++ = &handler->outer;
}
}
}
/* no listener was collected */
if (listener == listeners) {
logError("file: "__FILE__", line: %d, "
"no listener!", __LINE__);
return ENOENT;
}
last = listener - 1;
/* in blocked mode the last listener is left out of the loop below and
 * handled separately afterwards */
if (blocked) {
lend = listener - 1;
} else {
lend = listener;
}
/* NOTE(review): damaged loop header and call on the next line */
for (listener=listeners; listeneraccept_threads);
}
/* blocked mode: start one accept thread fewer for the last listener and
 * run its accept loop on the calling thread */
if (blocked) {
_accept_loop(*last, sf_context->accept_threads - 1);
accept_run(*last);
}
return 0;
}
#if defined(DEBUG_FLAG)
/**
 * Debug-build signal handler: builds the dump file name
 * "<base path>/logs/sf_dump.log". The dump call itself is currently
 * disabled, so nothing is written. A static flag keeps a second signal
 * from entering while a dump is in progress.
 *
 * @param sig  the signal number (unused)
 */
static void sigDumpHandler(int sig)
{
    static bool dump_in_progress = false;
    char dump_path[256];

    if (!dump_in_progress) {
        dump_in_progress = true;

        snprintf(dump_path, sizeof(dump_path),
                "%s/logs/sf_dump.log", SF_G_BASE_PATH_STR);
        /* disabled: manager_dump_global_vars_to_file(dump_path); */

        dump_in_progress = false;
    }
}
#endif
/**
 * Handler for the quit signals. Only the first signal has an effect:
 * it clears the global continue flag, invokes the optional quit callback
 * and logs the event.
 *
 * @param sig  the signal number, passed on to the callback and the log
 */
static void sigQuitHandler(int sig)
{
    if (terminate_flag) {
        /* shutdown already requested */
        return;
    }

    terminate_flag = true;
    g_sf_global_vars.continue_flag = false;

    if (sig_quit_handler != NULL) {
        sig_quit_handler(sig);
    }

    logCrit("file: "__FILE__", line: %d, "
            "catch signal %d, program exiting...",
            __LINE__, sig);
}
/**
 * SIGHUP handler: only records the signal in the log.
 *
 * @param sig  the signal number that was caught
 */
static void sigHupHandler(int sig)
{
    const int caught = sig;

    logInfo("file: "__FILE__", line: %d, "
            "catch signal %d", __LINE__, caught);
}
/**
 * SIGUSR1 / SIGUSR2 handler: logs the signal and otherwise ignores it.
 *
 * @param sig  the signal number that was caught
 */
static void sigUsrHandler(int sig)
{
    const int caught = sig;

    logInfo("file: "__FILE__", line: %d, "
            "catch signal %d, ignore it", __LINE__, caught);
}
/* Log a sigaction failure detected at source line `line` with the already
 * captured error code `err`, and hand that code back to the caller. */
static int sf_log_sigaction_failure(const int line, const int err)
{
    logCrit("file: "__FILE__", line: %d, "
            "call sigaction fail, errno: %d, error info: %s",
            line, err, strerror(err));
    logCrit("exit abnormally!\n");
    return err;
}

/**
 * Install the process signal handlers:
 *   - SIGUSR1, SIGUSR2        -> sigUsrHandler (sigDumpHandler when
 *                                DEBUG_FLAG is defined)
 *   - SIGHUP                  -> sigHupHandler
 *   - SIGPIPE                 -> ignored
 *   - SIGINT, SIGTERM, SIGQUIT -> sigQuitHandler
 *
 * The error code is captured from errno before anything is logged, so a
 * logging call that changes errno cannot alter the returned value.
 *
 * @return 0 on success, otherwise the errno of the failing sigaction call
 *         (EPERM when errno was not set)
 */
int sf_setup_signal_handler()
{
    struct sigaction act;

    memset(&act, 0, sizeof(act));
    sigemptyset(&act.sa_mask);

    act.sa_handler = sigUsrHandler;
    if (sigaction(SIGUSR1, &act, NULL) < 0 ||
            sigaction(SIGUSR2, &act, NULL) < 0)
    {
        return sf_log_sigaction_failure(__LINE__,
                errno != 0 ? errno : EPERM);
    }

    act.sa_handler = sigHupHandler;
    if (sigaction(SIGHUP, &act, NULL) < 0) {
        return sf_log_sigaction_failure(__LINE__,
                errno != 0 ? errno : EPERM);
    }

    act.sa_handler = SIG_IGN;
    if (sigaction(SIGPIPE, &act, NULL) < 0) {
        return sf_log_sigaction_failure(__LINE__,
                errno != 0 ? errno : EPERM);
    }

    act.sa_handler = sigQuitHandler;
    if (sigaction(SIGINT, &act, NULL) < 0 ||
            sigaction(SIGTERM, &act, NULL) < 0 ||
            sigaction(SIGQUIT, &act, NULL) < 0)
    {
        return sf_log_sigaction_failure(__LINE__,
                errno != 0 ? errno : EPERM);
    }

#if defined(DEBUG_FLAG)
    /* debug builds: SIGUSR1 / SIGUSR2 trigger the dump handler instead */
    memset(&act, 0, sizeof(act));
    sigemptyset(&act.sa_mask);
    act.sa_handler = sigDumpHandler;
    if (sigaction(SIGUSR1, &act, NULL) < 0 ||
            sigaction(SIGUSR2, &act, NULL) < 0)
    {
        return sf_log_sigaction_failure(__LINE__,
                errno != 0 ? errno : EPERM);
    }
#endif

    return 0;
}
/**
 * Fill a schedule array with the logger schedule entries of the global
 * log context and start the scheduler with it.
 *
 * @param schedule_tid  passed to sched_start() together with the entries
 * @return the result of sched_start()
 */
int sf_startup_schedule(pthread_t *schedule_tid)
{
    ScheduleEntry entries[SF_LOG_SCHEDULE_ENTRIES_COUNT];
    ScheduleArray schedules;
    int status;

    schedules.entries = entries;
    sf_logger_setup_schedule(&g_log_context,
            &g_sf_global_vars.error_log, &schedules);

    status = sched_start(&schedules, schedule_tid,
            g_sf_global_vars.thread_stack_size, (bool * volatile)
            &g_sf_global_vars.continue_flag);
    return status;
}
/**
 * Initialize the slow-log logger and register its schedule entries.
 * Does nothing when the slow log is disabled in the configuration.
 *
 * @param slowlog_ctx  slow-log context holding the config and the logger
 * @return 0 when disabled, the error code of sf_logger_init() when that
 *         fails, otherwise the result of sched_add_entries()
 */
int sf_add_slow_log_schedule(SFSlowLogContext *slowlog_ctx)
{
    ScheduleEntry entries[SF_LOG_SCHEDULE_ENTRIES_COUNT];
    ScheduleArray schedules;
    int status;

    if (!slowlog_ctx->cfg.enabled) {
        return 0;
    }

    status = sf_logger_init(&slowlog_ctx->ctx,
            slowlog_ctx->cfg.filename_prefix);
    if (status != 0) {
        return status;
    }

    schedules.entries = entries;
    sf_logger_setup_schedule(&slowlog_ctx->ctx,
            &slowlog_ctx->cfg.log_cfg, &schedules);
    return sched_add_entries(&schedules);
}
/**
 * Record the current wall-clock time as both the global current time and
 * the service start-up time, and seed rand() with the start-up time.
 */
void sf_set_current_time()
{
    const time_t now = time(NULL);

    g_current_time = now;
    g_sf_global_vars.up_time = now;
    srand(g_sf_global_vars.up_time);
}
/**
 * Global start-up step: record the current time, then set the log file
 * prefix below the configured base path.
 *
 * @param log_filename_prefix  prefix passed to log_set_prefix()
 * @return the result of log_set_prefix()
 */
int sf_global_init(const char *log_filename_prefix)
{
    int status;

    sf_set_current_time();
    status = log_set_prefix(SF_G_BASE_PATH_STR, log_filename_prefix);
    return status;
}
/**
 * Set the notify.enabled flag of every worker thread slot of a context.
 *
 * @param sf_context  context whose thread_data array is updated
 * @param enabled     value stored in each slot's notify.enabled field
 */
void sf_enable_thread_notify_ex(SFContext *sf_context, const bool enabled)
{
    struct nio_thread_data *thread_data;
    struct nio_thread_data *pDataEnd;

    pDataEnd = sf_context->thread_data + sf_context->work_threads;
    for (thread_data=sf_context->thread_data; thread_data<pDataEnd;
            thread_data++)
    {
        thread_data->notify.enabled = enabled;
    }
}
/**
 * Pick one worker thread slot of a context at random.
 *
 * rand() yields values in [0, RAND_MAX], so the product is divided by
 * RAND_MAX + 1: the index then always lies in [0, work_threads - 1],
 * including when rand() returns RAND_MAX.
 *
 * @param sf_context  context whose thread_data array is indexed
 * @return pointer to the selected slot
 */
struct nio_thread_data *sf_get_random_thread_data_ex(SFContext *sf_context)
{
    uint32_t index;

    index = (uint32_t)((uint64_t)sf_context->work_threads *
            (uint64_t)rand() / ((uint64_t)RAND_MAX + 1));
    return sf_context->thread_data + index;
}
void sf_notify_all_threads_ex(SFContext *sf_context)
{
struct nio_thread_data *tdata;
struct nio_thread_data *tend;
tend = sf_context->thread_data + sf_context->work_threads;
for (tdata=sf_context->thread_data; tdata