function prototype for socket and rdma both

support_rdma
YuQing 2023-09-03 11:50:50 +08:00
parent 58a796e169
commit 96c7bc9a42
4 changed files with 286 additions and 135 deletions

View File

@ -43,10 +43,10 @@ SFGlobalVariables g_sf_global_vars = {
{0, 0}, NULL, {NULL, 0}
};
SFContext g_sf_context = {
{'\0'}, NULL, 0, -1, -1, 0, 0, 1, DEFAULT_WORK_THREADS,
{'\0'}, {'\0'}, 0, true, true, NULL, NULL, NULL, NULL,
NULL, sf_task_finish_clean_up, NULL
SFContext g_sf_context = {{'\0'}, NULL, 0,
{{false, sf_network_type_sock}, {false, sf_network_type_rdma}},
1, DEFAULT_WORK_THREADS, {'\0'}, {'\0'}, 0, true, true, NULL,
NULL, NULL, NULL, NULL, sf_task_finish_clean_up, NULL
};
static inline void set_config_str_value(const char *value,
@ -411,17 +411,50 @@ int sf_load_config_ex(const char *server_name, SFContextIniConfig *config,
return sf_load_context_from_config_ex(&g_sf_context, config);
}
static int init_network_handler(SFNetworkHandler *handler,
SFContext *sf_context)
{
handler->ctx = sf_context;
handler->inner.handler = handler;
handler->outer.handler = handler;
handler->inner.is_inner = true;
handler->outer.is_inner = false;
if (handler->type == sf_network_type_sock) {
handler->inner.sock = -1;
handler->outer.sock = -1;
} else {
}
return 0;
}
int sf_load_context_from_config_ex(SFContext *sf_context,
SFContextIniConfig *config)
{
SFNetworkHandler *sock_handler;
SFNetworkHandler *rdma_handler;
char *inner_port;
char *outer_port;
char *inner_bind_addr;
char *outer_bind_addr;
char *bind_addr;
int port;
int i;
int result;
sf_context->inner_port = sf_context->outer_port = 0;
memset(sf_context->handlers, 0, sizeof(sf_context->handlers));
sock_handler = sf_context->handlers + SF_SOCKET_NETWORK_HANDLER_INDEX;
rdma_handler = sf_context->handlers + SF_RDMACM_NETWORK_HANDLER_INDEX;
sock_handler->type = sf_network_type_sock;
rdma_handler->type = sf_network_type_rdma;
for (i=0; i<SF_NETWORK_HANDLER_COUNT; i++) {
if ((result=init_network_handler(sf_context->handlers + i,
sf_context)) != 0)
{
return result;
}
}
inner_port = iniGetStrValue(config->ini_ctx.section_name,
"inner_port", config->ini_ctx.context);
@ -431,22 +464,30 @@ int sf_load_context_from_config_ex(SFContext *sf_context,
port = iniGetIntValue(config->ini_ctx.section_name,
"port", config->ini_ctx.context, 0);
if (port > 0) {
sf_context->inner_port = sf_context->outer_port = port;
sock_handler->inner.port = sock_handler->outer.port = port;
}
} else {
if (inner_port != NULL) {
sf_context->inner_port = atoi(inner_port);
sock_handler->inner.port = strtol(inner_port, NULL, 10);
}
if (outer_port != NULL) {
sf_context->outer_port = atoi(outer_port);
sock_handler->outer.port = strtol(outer_port, NULL, 10);
}
}
if (sf_context->inner_port <= 0) {
sf_context->inner_port = config->default_inner_port;
if (sock_handler->inner.port <= 0) {
sock_handler->inner.port = config->default_inner_port;
}
if (sf_context->outer_port <= 0) {
sf_context->outer_port = config->default_outer_port;
if (sock_handler->outer.port <= 0) {
sock_handler->outer.port = config->default_outer_port;
}
if (sock_handler->inner.port == sock_handler->outer.port) {
sock_handler->inner.enabled = false;
sock_handler->outer.enabled = true;
} else {
sock_handler->inner.enabled = true;
sock_handler->outer.enabled = true;
}
inner_bind_addr = iniGetStrValue(config->ini_ctx.section_name,
@ -495,23 +536,25 @@ int sf_load_context_from_config_ex(SFContext *sf_context,
void sf_context_config_to_string(const SFContext *sf_context,
char *output, const int size)
{
const SFNetworkHandler *sock_handler;
int len;
sock_handler = sf_context->handlers + SF_SOCKET_NETWORK_HANDLER_INDEX;
len = 0;
if ((sf_context->inner_port == sf_context->outer_port) &&
if ((sock_handler->inner.port == sock_handler->outer.port) &&
(strcmp(sf_context->inner_bind_addr,
sf_context->outer_bind_addr) == 0))
{
len += snprintf(output + len, size - len,
"port=%u, bind_addr=%s",
sf_context->inner_port,
sock_handler->inner.port,
sf_context->inner_bind_addr);
} else {
len += snprintf(output + len, size - len,
"inner_port=%u, inner_bind_addr=%s, "
"outer_port=%u, outer_bind_addr=%s",
sf_context->inner_port, sf_context->inner_bind_addr,
sf_context->outer_port, sf_context->outer_bind_addr);
sock_handler->inner.port, sf_context->inner_bind_addr,
sock_handler->outer.port, sf_context->outer_bind_addr);
}
len += snprintf(output + len, size - len,

View File

@ -57,12 +57,6 @@ struct worker_thread_context {
struct nio_thread_data *thread_data;
};
struct accept_thread_context {
SFContext *sf_context;
int server_sock;
};
int sf_init_task(struct fast_task_info *task)
{
task->connect_timeout = SF_G_CONNECT_TIMEOUT; //for client side
@ -345,15 +339,16 @@ static void *worker_thread_entrance(void *arg)
return NULL;
}
static int _socket_server(const char *bind_addr, int port, int *sock)
int sf_create_socket_server(SFListener *listener, const char *bind_addr)
{
int result;
*sock = socketServer(bind_addr, port, &result);
if (*sock < 0) {
listener->sock = socketServer(bind_addr, listener->port, &result);
if (listener->sock < 0) {
return result;
}
if ((result=tcpsetserveropt(*sock, SF_G_NETWORK_TIMEOUT)) != 0) {
if ((result=tcpsetserveropt(listener->sock, SF_G_NETWORK_TIMEOUT)) != 0) {
return result;
}
@ -363,57 +358,104 @@ static int _socket_server(const char *bind_addr, int port, int *sock)
int sf_socket_server_ex(SFContext *sf_context)
{
int result;
bool dual_ports;
const char *bind_addr;
SFNetworkHandler *handler;
SFNetworkHandler *end;
sf_context->inner_sock = sf_context->outer_sock = -1;
if (sf_context->outer_port == sf_context->inner_port) {
if (*sf_context->outer_bind_addr == '\0' ||
*sf_context->inner_bind_addr == '\0') {
bind_addr = "";
return _socket_server(bind_addr, sf_context->outer_port,
&sf_context->outer_sock);
} else if (strcmp(sf_context->outer_bind_addr,
sf_context->inner_bind_addr) == 0) {
bind_addr = sf_context->outer_bind_addr;
if (is_private_ip(bind_addr)) {
return _socket_server(bind_addr, sf_context->
inner_port, &sf_context->inner_sock);
} else {
return _socket_server(bind_addr, sf_context->
outer_port, &sf_context->outer_sock);
}
end = sf_context->handlers + SF_NETWORK_HANDLER_COUNT;
for (handler=sf_context->handlers; handler<end; handler++) {
if (!handler->enabled) {
continue;
}
}
if ((result=_socket_server(sf_context->outer_bind_addr,
sf_context->outer_port, &sf_context->outer_sock)) != 0)
{
return result;
}
handler->inner.enabled = false;
handler->outer.enabled = false;
if (handler->outer.port == handler->inner.port) {
if (*sf_context->outer_bind_addr == '\0' ||
*sf_context->inner_bind_addr == '\0') {
bind_addr = "";
if ((result=handler->create_server(&handler->
outer, bind_addr)) != 0)
{
return result;
}
handler->outer.enabled = true;
dual_ports = false;
} else if (strcmp(sf_context->outer_bind_addr,
sf_context->inner_bind_addr) == 0) {
bind_addr = sf_context->outer_bind_addr;
if (is_private_ip(bind_addr)) {
if ((result=handler->create_server(&handler->
inner, bind_addr)) != 0)
{
return result;
}
handler->inner.enabled = true;
} else {
if ((result=handler->create_server(&handler->
outer, bind_addr)) != 0)
{
return result;
}
handler->outer.enabled = true;
}
dual_ports = false;
} else {
dual_ports = true;
}
} else {
dual_ports = true;
}
if ((result=_socket_server(sf_context->inner_bind_addr,
sf_context->inner_port, &sf_context->inner_sock)) != 0)
{
return result;
if (dual_ports) {
if ((result=handler->create_server(&handler->outer,
sf_context->outer_bind_addr)) != 0)
{
return result;
}
if ((result=handler->create_server(&handler->inner,
sf_context->inner_bind_addr)) != 0)
{
return result;
}
handler->inner.enabled = true;
handler->outer.enabled = true;
}
}
return 0;
}
void sf_socket_close_ex(SFContext *sf_context)
void sf_close_socket_server(SFListener *listener)
{
if (sf_context->inner_sock >= 0) {
close(sf_context->inner_sock);
sf_context->inner_sock = -1;
}
if (sf_context->outer_sock >= 0) {
close(sf_context->outer_sock);
sf_context->outer_sock = -1;
if (listener->sock >= 0) {
close(listener->sock);
listener->sock = -1;
}
}
static void accept_run(struct accept_thread_context *accept_context)
void sf_socket_close_ex(SFContext *sf_context)
{
SFNetworkHandler *handler;
SFNetworkHandler *end;
end = sf_context->handlers + SF_NETWORK_HANDLER_COUNT;
for (handler=sf_context->handlers; handler<end; handler++) {
if (!handler->enabled) {
continue;
}
if (handler->outer.enabled) {
handler->close_server(&handler->outer);
}
if (handler->inner.enabled) {
handler->close_server(&handler->inner);
}
}
}
static void accept_run(SFListener *listener)
{
int incomesock;
int port;
@ -423,7 +465,7 @@ static void accept_run(struct accept_thread_context *accept_context)
while (g_sf_global_vars.continue_flag) {
sockaddr_len = sizeof(inaddr);
incomesock = accept(accept_context->server_sock,
incomesock = accept(listener->sock,
(struct sockaddr*)&inaddr, &sockaddr_len);
if (incomesock < 0) { //error
if (!(errno == EINTR || errno == EAGAIN)) {
@ -441,8 +483,8 @@ static void accept_run(struct accept_thread_context *accept_context)
}
FC_SET_CLOEXEC(incomesock);
if ((task=sf_alloc_init_task(accept_context->
sf_context, incomesock)) == NULL)
if ((task=sf_alloc_init_task(listener->handler->ctx,
incomesock)) == NULL)
{
close(incomesock);
continue;
@ -451,13 +493,12 @@ static void accept_run(struct accept_thread_context *accept_context)
getPeerIpAddPort(incomesock, task->client_ip,
sizeof(task->client_ip), &port);
task->port = port;
task->thread_data = accept_context->sf_context->thread_data +
incomesock % accept_context->sf_context->work_threads;
if (accept_context->sf_context->accept_done_func != NULL) {
if (accept_context->sf_context->accept_done_func(task,
inaddr.sin_addr.s_addr,
accept_context->server_sock ==
accept_context->sf_context->inner_sock) != 0)
task->thread_data = listener->handler->ctx->thread_data +
incomesock % listener->handler->ctx->work_threads;
if (listener->handler->ctx->accept_done_func != NULL) {
if (listener->handler->ctx->accept_done_func(
task, inaddr.sin_addr.s_addr,
listener->is_inner) != 0)
{
close(incomesock);
sf_release_task(task);
@ -472,24 +513,23 @@ static void accept_run(struct accept_thread_context *accept_context)
}
}
static void *accept_thread_entrance(struct accept_thread_context
*accept_context)
static void *accept_thread_entrance(SFListener *listener)
{
#ifdef OS_LINUX
{
char thread_name[32];
snprintf(thread_name, sizeof(thread_name), "%s-listen",
accept_context->sf_context->name);
snprintf(thread_name, sizeof(thread_name), "%s-%s-listen",
listener->handler->type == sf_network_type_sock ?
"sock" : "rdma", listener->handler->ctx->name);
prctl(PR_SET_NAME, thread_name);
}
#endif
accept_run(accept_context);
accept_run(listener);
return NULL;
}
void _accept_loop(struct accept_thread_context *accept_context,
const int accept_threads)
int _accept_loop(SFListener *listener, const int accept_threads)
{
pthread_t tid;
pthread_attr_t thread_attr;
@ -497,7 +537,7 @@ void _accept_loop(struct accept_thread_context *accept_context,
int i;
if (accept_threads <= 0) {
return;
return 0;
}
if ((result=init_pthread_attr(&thread_attr, g_sf_global_vars.
@ -505,68 +545,73 @@ void _accept_loop(struct accept_thread_context *accept_context,
{
logWarning("file: "__FILE__", line: %d, "
"init_pthread_attr fail!", __LINE__);
return result;
}
else {
for (i=0; i<accept_threads; i++) {
if ((result=pthread_create(&tid, &thread_attr,
(void * (*)(void *))accept_thread_entrance,
accept_context)) != 0)
{
logError("file: "__FILE__", line: %d, "
"create thread failed, startup threads: %d, "
"errno: %d, error info: %s",
__LINE__, i, result, strerror(result));
break;
}
}
pthread_attr_destroy(&thread_attr);
for (i=0; i<accept_threads; i++) {
if ((result=pthread_create(&tid, &thread_attr,
(void * (*)(void *))accept_thread_entrance,
listener)) != 0)
{
logError("file: "__FILE__", line: %d, "
"create thread failed, startup threads: %d, "
"errno: %d, error info: %s",
__LINE__, i, result, strerror(result));
return result;
}
}
pthread_attr_destroy(&thread_attr);
return 0;
}
void sf_accept_loop_ex(SFContext *sf_context, const bool block)
int sf_accept_loop_ex(SFContext *sf_context, const bool blocked)
{
struct accept_thread_context *accept_contexts;
int count;
int bytes;
SFNetworkHandler *handler;
SFNetworkHandler *hend;
SFListener *listeners[SF_NETWORK_HANDLER_COUNT * 2];
SFListener **listener;
SFListener **last;
SFListener **lend;
if (sf_context->outer_sock >= 0) {
count = 2;
listener = listeners;
hend = sf_context->handlers + SF_NETWORK_HANDLER_COUNT;
for (handler=sf_context->handlers; handler<hend; handler++) {
if (!handler->enabled) {
continue;
}
if (handler->inner.enabled) {
*listener++ = &handler->inner;
}
if (handler->outer.enabled) {
*listener++ = &handler->outer;
}
}
if (listener == listeners) {
logError("file: "__FILE__", line: %d, "
"no listener!", __LINE__);
return ENOENT;
}
last = listener - 1;
if (blocked) {
lend = listener - 1;
} else {
count = 1;
lend = listener;
}
bytes = sizeof(struct accept_thread_context) * count;
accept_contexts = (struct accept_thread_context *)fc_malloc(bytes);
if (accept_contexts == NULL) {
return;
for (listener=listeners; listener<lend; listener++) {
_accept_loop(*listener, sf_context->accept_threads);
}
accept_contexts[0].sf_context = sf_context;
accept_contexts[0].server_sock = sf_context->inner_sock;
if (sf_context->outer_sock >= 0) {
accept_contexts[1].sf_context = sf_context;
accept_contexts[1].server_sock = sf_context->outer_sock;
if (sf_context->inner_sock >= 0) {
_accept_loop(accept_contexts, sf_context->accept_threads);
}
if (block) {
_accept_loop(accept_contexts + 1, sf_context->accept_threads - 1);
accept_run(accept_contexts + 1);
} else {
_accept_loop(accept_contexts + 1, sf_context->accept_threads);
}
} else {
if (block) {
_accept_loop(accept_contexts, sf_context->accept_threads - 1);
accept_run(accept_contexts);
} else {
_accept_loop(accept_contexts, sf_context->accept_threads);
}
if (blocked) {
_accept_loop(*last, sf_context->accept_threads - 1);
accept_run(*last);
}
return 0;
}
#if defined(DEBUG_FLAG)

View File

@ -83,13 +83,16 @@ int sf_add_slow_log_schedule(SFSlowLogContext *slowlog_ctx);
void sf_set_current_time();
int sf_create_socket_server(SFListener *listener, const char *bind_addr);
void sf_close_socket_server(SFListener *listener);
int sf_socket_server_ex(SFContext *sf_context);
#define sf_socket_server() sf_socket_server_ex(&g_sf_context)
void sf_socket_close_ex(SFContext *sf_context);
#define sf_socket_close() sf_socket_close_ex(&g_sf_context)
void sf_accept_loop_ex(SFContext *sf_context, const bool block);
int sf_accept_loop_ex(SFContext *sf_context, const bool blocked);
#define sf_accept_loop() sf_accept_loop_ex(&g_sf_context, true)

View File

@ -34,6 +34,10 @@
#define SF_SERVER_TASK_TYPE_CHANNEL_HOLDER 101 //for request idempotency
#define SF_SERVER_TASK_TYPE_CHANNEL_USER 102 //for request idempotency
#define SF_NETWORK_HANDLER_COUNT 2
#define SF_SOCKET_NETWORK_HANDLER_INDEX 0
#define SF_RDMACM_NETWORK_HANDLER_INDEX 1
typedef int (*sf_accept_done_callback)(struct fast_task_info *task,
const in_addr_t client_addr, const bool bInnerPort);
typedef int (*sf_set_body_length_callback)(struct fast_task_info *task);
@ -49,15 +53,71 @@ typedef void (*sf_release_buffer_callback)(struct fast_task_info *task);
typedef int (*sf_error_handler_callback)(const int errnum);
struct sf_listener;
typedef int (*sf_create_server_callback)(struct sf_listener *listener,
const char *bind_addr);
typedef void (*sf_close_server_callback)(struct sf_listener *listener);
typedef struct fast_task_info * (*sf_accept_connection_callback)(
struct sf_listener *listener);
typedef int (*sf_async_connect_server_callback)(struct fast_task_info *task);
typedef int (*sf_connect_server_done_callback)(struct fast_task_info *task);
typedef void (*sf_close_connection_callback)(struct fast_task_info *task);
typedef int (*sf_send_data_callback)(struct fast_task_info *task,
bool *send_done);
typedef int (*sf_recv_data_callback)(struct fast_task_info *task,
bool *recv_done);
typedef enum {
sf_network_type_sock = 's',
sf_network_type_rdma = 'r'
} SFNetworkType;
struct sf_network_handler;
typedef struct sf_listener {
struct sf_network_handler *handler;
int port;
bool enabled;
bool is_inner;
union {
int sock; //for socket
void *id; //for rdma_cm
};
} SFListener;
struct sf_context;
typedef struct sf_network_handler {
bool enabled;
SFNetworkType type;
struct sf_context *ctx;
SFListener inner;
SFListener outer;
/* for server side */
sf_create_server_callback create_server;
sf_close_server_callback close_server;
sf_accept_connection_callback accept_connection;
/* for client side */
sf_async_connect_server_callback async_connect_server;
sf_connect_server_done_callback connect_server_done;
/* server and client both */
sf_close_connection_callback close_connection;
sf_send_data_callback send_data;
sf_recv_data_callback recv_data;
} SFNetworkHandler;
typedef struct sf_context {
char name[64];
struct nio_thread_data *thread_data;
volatile int thread_count;
int outer_sock;
int inner_sock;
int outer_port;
int inner_port;
//int rdma_port_offset;
SFNetworkHandler handlers[SF_NETWORK_HANDLER_COUNT];
int accept_threads;
int work_threads;