callbacks impl. for socket

support_rdma
YuQing 2023-09-03 18:35:31 +08:00
parent 96c7bc9a42
commit 36e4922440
8 changed files with 369 additions and 281 deletions

View File

@ -171,12 +171,18 @@ void client_channel_destroy()
}
static struct fast_task_info *alloc_channel_task(IdempotencyClientChannel
*channel, const uint32_t hash_code, const char *server_ip,
const uint16_t port, int *err_no)
*channel, const uint32_t hash_code, const SFNetworkType network_type,
const char *server_ip, const uint16_t port, int *err_no)
{
struct fast_task_info *task;
SFNetworkHandler *handler;
if ((task=sf_alloc_init_task(&g_sf_context, -1)) == NULL) {
if (network_type == sf_network_type_sock) {
handler = g_sf_context.handlers + SF_SOCKET_NETWORK_HANDLER_INDEX;
} else {
handler = g_sf_context.handlers + SF_RDMACM_NETWORK_HANDLER_INDEX;
}
if ((task=sf_alloc_init_task(handler, -1)) == NULL) {
*err_no = ENOMEM;
return NULL;
}
@ -226,8 +232,8 @@ int idempotency_client_channel_check_reconnect(
}
struct idempotency_client_channel *idempotency_client_channel_get(
const char *server_ip, const uint16_t server_port,
const int timeout, int *err_no)
const SFNetworkType network_type, const char *server_ip,
const uint16_t server_port, const int timeout, int *err_no)
{
int r;
int key_len;
@ -277,8 +283,8 @@ struct idempotency_client_channel *idempotency_client_channel_get(
break;
}
channel->task = alloc_channel_task(channel,
hash_code, server_ip, server_port, err_no);
channel->task = alloc_channel_task(channel, hash_code,
network_type, server_ip, server_port, err_no);
if (channel->task == NULL) {
fast_mblock_free_object(&channel_context.
channel_allocator, channel);

View File

@ -40,8 +40,8 @@ void idempotency_client_channel_config_to_string_ex(
char *output, const int size, const bool add_comma);
struct idempotency_client_channel *idempotency_client_channel_get(
const char *server_ip, const uint16_t server_port,
const int timeout, int *err_no);
const SFNetworkType network_type, const char *server_ip,
const uint16_t server_port, const int timeout, int *err_no);
static inline uint64_t idempotency_client_channel_next_seq_id(
struct idempotency_client_channel *channel)

View File

@ -31,6 +31,7 @@
#include "fastcommon/process_ctrl.h"
#include "fastcommon/logger.h"
#include "sf_nio.h"
#include "sf_service.h"
#include "sf_global.h"
SFGlobalVariables g_sf_global_vars = {
@ -423,7 +424,16 @@ static int init_network_handler(SFNetworkHandler *handler,
if (handler->type == sf_network_type_sock) {
handler->inner.sock = -1;
handler->outer.sock = -1;
handler->create_server = sf_create_socket_server;
handler->close_server = sf_close_socket_server;
handler->accept_connection = sf_accept_socket_connection;
handler->async_connect_server = sf_async_connect_socket_server;
handler->connect_server_done = sf_connect_socket_server_done;
handler->close_connection = sf_close_socket_connection;
handler->send_data = sf_socket_send_data;
handler->recv_data = sf_socket_recv_data;
} else {
//TODO
}
return 0;
@ -483,8 +493,8 @@ int sf_load_context_from_config_ex(SFContext *sf_context,
}
if (sock_handler->inner.port == sock_handler->outer.port) {
sock_handler->inner.enabled = false;
sock_handler->outer.enabled = true;
sock_handler->inner.enabled = true;
sock_handler->outer.enabled = false;
} else {
sock_handler->inner.enabled = true;
sock_handler->outer.enabled = true;

View File

@ -111,10 +111,8 @@ void sf_task_finish_clean_up(struct fast_task_info *task)
}
release_iovec_buffer(task);
sf_task_detach_thread(task);
close(task->event.fd);
task->event.fd = -1;
task->handler->close_connection(task);
__sync_fetch_and_sub(&g_sf_global_vars.connection_stat.current_count, 1);
sf_release_task(task);
@ -130,15 +128,13 @@ static inline int set_write_event(struct fast_task_info *task)
task->event.callback = (IOEventCallback)sf_client_sock_write;
if (ioevent_modify(&task->thread_data->ev_puller,
task->event.fd, IOEVENT_WRITE, task) != 0)
task->event.fd, IOEVENT_WRITE, task) != 0)
{
result = errno != 0 ? errno : ENOENT;
ioevent_add_to_deleted_list(task);
logError("file: "__FILE__", line: %d, "
"ioevent_modify fail, "
"errno: %d, error info: %s",
__LINE__, result, strerror(result));
"ioevent_modify fail, "
"errno: %d, error info: %s",
__LINE__, result, strerror(result));
return result;
}
return 0;
@ -194,20 +190,28 @@ static inline int sf_nio_init(struct fast_task_info *task)
task->network_timeout);
}
static int sf_client_sock_connect(int sock, short event, void *arg)
int sf_connect_socket_server_done(struct fast_task_info *task)
{
int result;
socklen_t len;
len = sizeof(result);
if (getsockopt(task->event.fd, SOL_SOCKET, SO_ERROR, &result, &len) < 0) {
result = errno != 0 ? errno : EACCES;
}
return result;
}
static int sf_client_sock_connect(int sock, short event, void *arg)
{
int result;
struct fast_task_info *task;
task = (struct fast_task_info *)arg;
if (event & IOEVENT_TIMEOUT) {
result = ETIMEDOUT;
} else {
len = sizeof(result);
if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &result, &len) < 0) {
result = errno != 0 ? errno : EACCES;
}
result = task->handler->connect_server_done(task);
}
if (result != 0) {
@ -225,21 +229,27 @@ static int sf_client_sock_connect(int sock, short event, void *arg)
return SF_CTX->deal_task(task, SF_NIO_STAGE_HANDSHAKE);
}
static int sf_connect_server(struct fast_task_info *task)
int sf_async_connect_socket_server(struct fast_task_info *task)
{
int result;
if ((task->event.fd=socketCreateEx2(AF_UNSPEC, task->server_ip,
O_NONBLOCK, NULL, &result)) < 0)
{
return result > 0 ? -1 * result : result;
}
result = asyncconnectserverbyip(task->event.fd,
return asyncconnectserverbyip(task->event.fd,
task->server_ip, task->port);
}
static int sf_async_connect_server(struct fast_task_info *task)
{
int result;
result = task->handler->async_connect_server(task);
if (result == 0) {
if ((result=sf_ioevent_add(task, (IOEventCallback)
sf_client_sock_read, task->network_timeout)) != 0)
sf_client_sock_read, task->network_timeout)) != 0)
{
return result;
}
@ -254,8 +264,7 @@ static int sf_connect_server(struct fast_task_info *task)
sf_client_sock_connect, task->connect_timeout);
return result > 0 ? -1 * result : result;
} else {
close(task->event.fd);
task->event.fd = -1;
task->handler->close_connection(task);
logError("file: "__FILE__", line: %d, "
"connect to server %s:%u fail, errno: %d, "
"error info: %s", __LINE__, task->server_ip,
@ -274,7 +283,7 @@ static int sf_nio_deal_task(struct fast_task_info *task, const int stage)
result = sf_nio_init(task);
break;
case SF_NIO_STAGE_CONNECT:
result = sf_connect_server(task);
result = sf_async_connect_server(task);
break;
case SF_NIO_STAGE_RECV:
if ((result=sf_set_read_event(task)) == 0) {
@ -496,13 +505,231 @@ static inline int check_task(struct fast_task_info *task,
}
}
ssize_t sf_socket_send_data(struct fast_task_info *task, SFCommAction *action)
{
int bytes;
if (task->iovec_array.iovs != NULL) {
bytes = writev(task->event.fd, task->iovec_array.iovs,
FC_MIN(task->iovec_array.count, IOV_MAX));
} else {
bytes = write(task->event.fd, task->data + task->offset,
task->length - task->offset);
}
if (bytes < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
if (set_write_event(task) != 0) {
return -1;
}
*action = sf_comm_action_break;
return 0;
} else if (errno == EINTR) { //should retry
logDebug("file: "__FILE__", line: %d, "
"client ip: %s, ignore interupt signal",
__LINE__, task->client_ip);
*action = sf_comm_action_continue;
return 0;
} else {
logWarning("file: "__FILE__", line: %d, "
"client ip: %s, send fail, task offset: %d, length: %d, "
"errno: %d, error info: %s", __LINE__, task->client_ip,
task->offset, task->length, errno, strerror(errno));
return -1;
}
} else if (bytes == 0) {
logWarning("file: "__FILE__", line: %d, "
"client ip: %s, sock: %d, task length: %d, offset: %d, "
"send failed, connection disconnected", __LINE__,
task->client_ip, task->event.fd, task->length, task->offset);
return -1;
}
task->offset += bytes;
if (task->offset >= task->length) {
*action = sf_comm_action_finish;
} else {
*action = sf_comm_action_continue;
/* set next writev iovec array */
if (task->iovec_array.iovs != NULL) {
struct iovec *iov;
struct iovec *end;
int iov_sum;
int iov_remain;
iov = task->iovec_array.iovs;
end = task->iovec_array.iovs + task->iovec_array.count;
iov_sum = 0;
do {
iov_sum += iov->iov_len;
iov_remain = iov_sum - bytes;
if (iov_remain == 0) {
iov++;
break;
} else if (iov_remain > 0) {
iov->iov_base += (iov->iov_len - iov_remain);
iov->iov_len = iov_remain;
break;
}
iov++;
} while (iov < end);
task->iovec_array.iovs = iov;
task->iovec_array.count = end - iov;
}
}
return bytes;
}
ssize_t sf_socket_recv_data(struct fast_task_info *task, SFCommAction *action)
{
int bytes;
int recv_bytes;
bool new_alloc;
if (task->length == 0) { //recv header
recv_bytes = SF_CTX->header_size - task->offset;
bytes = read(task->event.fd, task->data + task->offset, recv_bytes);
} else {
recv_bytes = task->length - task->offset;
if (task->recv_body == NULL) {
bytes = read(task->event.fd, task->data + task->offset, recv_bytes);
} else {
bytes = read(task->event.fd, task->recv_body + (task->offset -
SF_CTX->header_size), recv_bytes);
}
}
if (bytes < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
*action = sf_comm_action_break;
return 0;
} else if (errno == EINTR) { //should retry
logDebug("file: "__FILE__", line: %d, "
"client ip: %s, ignore interupt signal",
__LINE__, task->client_ip);
*action = sf_comm_action_continue;
return 0;
} else {
logWarning("file: "__FILE__", line: %d, "
"client ip: %s, recv fail, "
"errno: %d, error info: %s",
__LINE__, task->client_ip,
errno, strerror(errno));
return -1;
}
} else if (bytes == 0) {
if (task->offset > 0) {
if (task->length > 0) {
logWarning("file: "__FILE__", line: %d, "
"client ip: %s, connection disconnected, "
"expect pkg length: %d, recv pkg length: %d",
__LINE__, task->client_ip, task->length,
task->offset);
} else {
logWarning("file: "__FILE__", line: %d, "
"client ip: %s, connection "
"disconnected, recv pkg length: %d",
__LINE__, task->client_ip,
task->offset);
}
} else {
logDebug("file: "__FILE__", line: %d, "
"client ip: %s, sock: %d, recv fail, "
"connection disconnected", __LINE__,
task->client_ip, task->event.fd);
}
return -1;
}
TCP_SET_QUICK_ACK(task->event.fd);
task->offset += bytes;
if (task->length == 0) { //pkg header
if (task->offset < SF_CTX->header_size) {
*action = sf_comm_action_continue;
return bytes;
}
if (SF_CTX->set_body_length(task) != 0) {
return -1;
}
if (task->length < 0) {
logError("file: "__FILE__", line: %d, "
"client ip: %s, pkg length: %d < 0",
__LINE__, task->client_ip,
task->length);
return -1;
}
task->length += SF_CTX->header_size;
if (task->length > g_sf_global_vars.max_pkg_size) {
logError("file: "__FILE__", line: %d, "
"client ip: %s, pkg length: %d > "
"max pkg size: %d", __LINE__,
task->client_ip, task->length,
g_sf_global_vars.max_pkg_size);
return -1;
}
if (SF_CTX->alloc_recv_buffer != NULL) {
task->recv_body = SF_CTX->alloc_recv_buffer(task,
task->length - SF_CTX->header_size, &new_alloc);
if (new_alloc && task->recv_body == NULL) {
return -1;
}
} else {
new_alloc = false;
}
if (!new_alloc) {
if (task->length > task->size) {
int old_size;
if (!SF_CTX->realloc_task_buffer) {
logError("file: "__FILE__", line: %d, "
"client ip: %s, pkg length: %d exceeds "
"task size: %d, but realloc buffer disabled",
__LINE__, task->client_ip, task->size,
task->length);
return -1;
}
old_size = task->size;
if (free_queue_realloc_buffer(task, task->length) != 0) {
logError("file: "__FILE__", line: %d, "
"client ip: %s, realloc buffer size "
"from %d to %d fail", __LINE__,
task->client_ip, task->size, task->length);
return -1;
}
logDebug("file: "__FILE__", line: %d, "
"client ip: %s, task length: %d, realloc buffer "
"size from %d to %d", __LINE__, task->client_ip,
task->length, old_size, task->size);
}
}
}
if (task->offset >= task->length) { //recv done
*action = sf_comm_action_finish;
} else {
*action = sf_comm_action_continue;
}
return 0;
}
int sf_client_sock_read(int sock, short event, void *arg)
{
int result;
int bytes;
int recv_bytes;
int total_read;
bool new_alloc;
SFCommAction action;
struct fast_task_info *task;
task = (struct fast_task_info *)arg;
@ -544,147 +771,19 @@ int sf_client_sock_read(int sock, short event, void *arg)
}
total_read = 0;
action = sf_comm_action_continue;
while (1) {
fast_timer_modify(&task->thread_data->timer,
&task->event.timer, g_current_time +
task->network_timeout);
if (task->length == 0) { //recv header
recv_bytes = SF_CTX->header_size - task->offset;
bytes = read(sock, task->data + task->offset, recv_bytes);
} else {
recv_bytes = task->length - task->offset;
if (task->recv_body == NULL) {
bytes = read(sock, task->data + task->offset, recv_bytes);
} else {
bytes = read(sock, task->recv_body + (task->offset -
SF_CTX->header_size), recv_bytes);
}
}
if (bytes < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break;
} else if (errno == EINTR) { //should retry
logDebug("file: "__FILE__", line: %d, "
"client ip: %s, ignore interupt signal",
__LINE__, task->client_ip);
continue;
} else {
logWarning("file: "__FILE__", line: %d, "
"client ip: %s, recv fail, "
"errno: %d, error info: %s",
__LINE__, task->client_ip,
errno, strerror(errno));
ioevent_add_to_deleted_list(task);
return -1;
}
} else if (bytes == 0) {
if (task->offset > 0) {
if (task->length > 0) {
logWarning("file: "__FILE__", line: %d, "
"client ip: %s, connection disconnected, "
"expect pkg length: %d, recv pkg length: %d",
__LINE__, task->client_ip, task->length,
task->offset);
} else {
logWarning("file: "__FILE__", line: %d, "
"client ip: %s, connection "
"disconnected, recv pkg length: %d",
__LINE__, task->client_ip,
task->offset);
}
} else {
logDebug("file: "__FILE__", line: %d, "
"client ip: %s, sock: %d, recv fail, "
"connection disconnected", __LINE__,
task->client_ip, sock);
}
if ((bytes=task->handler->recv_data(task, &action)) < 0) {
ioevent_add_to_deleted_list(task);
return -1;
}
TCP_SET_QUICK_ACK(sock);
total_read += bytes;
task->offset += bytes;
if (task->length == 0) { //pkg header
if (task->offset < SF_CTX->header_size) {
continue;
}
if (SF_CTX->set_body_length(task) != 0) {
ioevent_add_to_deleted_list(task);
return -1;
}
if (task->length < 0) {
logError("file: "__FILE__", line: %d, "
"client ip: %s, pkg length: %d < 0",
__LINE__, task->client_ip,
task->length);
ioevent_add_to_deleted_list(task);
return -1;
}
task->length += SF_CTX->header_size;
if (task->length > g_sf_global_vars.max_pkg_size) {
logError("file: "__FILE__", line: %d, "
"client ip: %s, pkg length: %d > "
"max pkg size: %d", __LINE__,
task->client_ip, task->length,
g_sf_global_vars.max_pkg_size);
ioevent_add_to_deleted_list(task);
return -1;
}
if (SF_CTX->alloc_recv_buffer != NULL) {
task->recv_body = SF_CTX->alloc_recv_buffer(task,
task->length - SF_CTX->header_size, &new_alloc);
if (new_alloc && task->recv_body == NULL) {
ioevent_add_to_deleted_list(task);
return -1;
}
} else {
new_alloc = false;
}
if (!new_alloc) {
if (task->length > task->size) {
int old_size;
if (!SF_CTX->realloc_task_buffer) {
logError("file: "__FILE__", line: %d, "
"client ip: %s, pkg length: %d exceeds "
"task size: %d, but realloc buffer disabled",
__LINE__, task->client_ip, task->size,
task->length);
ioevent_add_to_deleted_list(task);
return -1;
}
old_size = task->size;
if (free_queue_realloc_buffer(task, task->length) != 0) {
logError("file: "__FILE__", line: %d, "
"client ip: %s, realloc buffer size "
"from %d to %d fail", __LINE__,
task->client_ip, task->size, task->length);
ioevent_add_to_deleted_list(task);
return -1;
}
logDebug("file: "__FILE__", line: %d, "
"client ip: %s, task length: %d, realloc buffer "
"size from %d to %d", __LINE__, task->client_ip,
task->length, old_size, task->size);
}
}
}
if (task->offset >= task->length) { //recv done
if (action == sf_comm_action_finish) {
task->req_count++;
task->nio_stages.current = SF_NIO_STAGE_SEND;
if (SF_CTX->deal_task(task, SF_NIO_STAGE_SEND) < 0) { //fatal error
@ -692,6 +791,8 @@ int sf_client_sock_read(int sock, short event, void *arg)
return -1;
}
break;
} else if (action == sf_comm_action_break) {
break;
}
}
@ -704,6 +805,7 @@ int sf_client_sock_write(int sock, short event, void *arg)
int bytes;
int total_write;
int length;
SFCommAction action;
struct fast_task_info *task;
task = (struct fast_task_info *)arg;
@ -722,52 +824,19 @@ int sf_client_sock_write(int sock, short event, void *arg)
}
total_write = 0;
action = sf_comm_action_continue;
while (1) {
fast_timer_modify(&task->thread_data->timer,
&task->event.timer, g_current_time +
task->network_timeout);
if (task->iovec_array.iovs != NULL) {
bytes = writev(sock, task->iovec_array.iovs,
FC_MIN(task->iovec_array.count, IOV_MAX));
} else {
bytes = write(sock, task->data + task->offset,
task->length - task->offset);
}
if (bytes < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
if (set_write_event(task) != 0) {
return -1;
}
break;
} else if (errno == EINTR) { //should retry
logDebug("file: "__FILE__", line: %d, "
"client ip: %s, ignore interupt signal",
__LINE__, task->client_ip);
continue;
} else {
logWarning("file: "__FILE__", line: %d, "
"client ip: %s, send fail, task offset: %d, length: %d, "
"errno: %d, error info: %s", __LINE__, task->client_ip,
task->offset, task->length, errno, strerror(errno));
ioevent_add_to_deleted_list(task);
return -1;
}
} else if (bytes == 0) {
logWarning("file: "__FILE__", line: %d, "
"client ip: %s, sock: %d, task length: %d, offset: %d, "
"send failed, connection disconnected", __LINE__,
task->client_ip, sock, task->length, task->offset);
&task->event.timer, g_current_time +
task->network_timeout);
if ((bytes=task->handler->send_data(task, &action)) < 0) {
ioevent_add_to_deleted_list(task);
return -1;
}
total_write += bytes;
task->offset += bytes;
if (task->offset >= task->length) {
if (action == sf_comm_action_finish) {
release_iovec_buffer(task);
length = task->length;
@ -785,35 +854,8 @@ int sf_client_sock_write(int sock, short event, void *arg)
}
break;
}
/* set next writev iovec array */
if (task->iovec_array.iovs != NULL) {
struct iovec *iov;
struct iovec *end;
int iov_sum;
int iov_remain;
iov = task->iovec_array.iovs;
end = task->iovec_array.iovs + task->iovec_array.count;
iov_sum = 0;
do {
iov_sum += iov->iov_len;
iov_remain = iov_sum - bytes;
if (iov_remain == 0) {
iov++;
break;
} else if (iov_remain > 0) {
iov->iov_base += (iov->iov_len - iov_remain);
iov->iov_len = iov_remain;
break;
}
iov++;
} while (iov < end);
task->iovec_array.iovs = iov;
task->iovec_array.count = end - iov;
} else if (action == sf_comm_action_break) {
break;
}
}

View File

@ -26,7 +26,7 @@
#include "sf_define.h"
#include "sf_types.h"
#define SF_CTX ((SFContext *)(task->ctx))
#define SF_CTX (task->handler->ctx)
#ifdef __cplusplus
extern "C" {
@ -92,6 +92,12 @@ void sf_task_switch_thread(struct fast_task_info *task,
void sf_task_detach_thread(struct fast_task_info *task);
int sf_async_connect_socket_server(struct fast_task_info *task);
int sf_connect_socket_server_done(struct fast_task_info *task);
ssize_t sf_socket_send_data(struct fast_task_info *task, SFCommAction *action);
ssize_t sf_socket_recv_data(struct fast_task_info *task, SFCommAction *action);
static inline int sf_nio_forward_request(struct fast_task_info *task,
const int new_thread_index)
{

View File

@ -436,6 +436,49 @@ void sf_close_socket_server(SFListener *listener)
}
}
struct fast_task_info *sf_accept_socket_connection(SFListener *listener)
{
int incomesock;
int port;
socklen_t sockaddr_len;
struct fast_task_info *task;
sockaddr_len = sizeof(listener->inaddr);
incomesock = accept(listener->sock, (struct sockaddr *)
&listener->inaddr, &sockaddr_len);
if (incomesock < 0) { //error
if (!(errno == EINTR || errno == EAGAIN)) {
logError("file: "__FILE__", line: %d, "
"accept fail, errno: %d, error info: %s",
__LINE__, errno, strerror(errno));
}
return NULL;
}
if (tcpsetnonblockopt(incomesock) != 0) {
close(incomesock);
return NULL;
}
FC_SET_CLOEXEC(incomesock);
if ((task=sf_alloc_init_task(listener->handler, incomesock)) == NULL) {
close(incomesock);
return NULL;
}
getPeerIpAddPort(incomesock, task->client_ip,
sizeof(task->client_ip), &port);
task->port = port;
return task;
}
void sf_close_socket_connection(struct fast_task_info *task)
{
close(task->event.fd);
task->event.fd = -1;
}
void sf_socket_close_ex(SFContext *sf_context)
{
SFNetworkHandler *handler;
@ -457,57 +500,28 @@ void sf_socket_close_ex(SFContext *sf_context)
static void accept_run(SFListener *listener)
{
int incomesock;
int port;
struct sockaddr_in inaddr;
socklen_t sockaddr_len;
struct fast_task_info *task;
while (g_sf_global_vars.continue_flag) {
sockaddr_len = sizeof(inaddr);
incomesock = accept(listener->sock,
(struct sockaddr*)&inaddr, &sockaddr_len);
if (incomesock < 0) { //error
if (!(errno == EINTR || errno == EAGAIN)) {
logError("file: "__FILE__", line: %d, "
"accept fail, errno: %d, error info: %s",
__LINE__, errno, strerror(errno));
}
if ((task=listener->handler->accept_connection(listener)) == NULL) {
continue;
}
if (tcpsetnonblockopt(incomesock) != 0) {
close(incomesock);
continue;
}
FC_SET_CLOEXEC(incomesock);
if ((task=sf_alloc_init_task(listener->handler->ctx,
incomesock)) == NULL)
{
close(incomesock);
continue;
}
getPeerIpAddPort(incomesock, task->client_ip,
sizeof(task->client_ip), &port);
task->port = port;
task->thread_data = listener->handler->ctx->thread_data +
incomesock % listener->handler->ctx->work_threads;
task->event.fd % listener->handler->ctx->work_threads;
if (listener->handler->ctx->accept_done_func != NULL) {
if (listener->handler->ctx->accept_done_func(
task, inaddr.sin_addr.s_addr,
if (listener->handler->ctx->accept_done_func(task,
listener->inaddr.sin_addr.s_addr,
listener->is_inner) != 0)
{
close(incomesock);
listener->handler->close_connection(task);
sf_release_task(task);
continue;
}
}
if (sf_nio_notify(task, SF_NIO_STAGE_INIT) != 0) {
close(incomesock);
listener->handler->close_connection(task);
sf_release_task(task);
}
}

View File

@ -85,6 +85,9 @@ void sf_set_current_time();
int sf_create_socket_server(SFListener *listener, const char *bind_addr);
void sf_close_socket_server(SFListener *listener);
struct fast_task_info *sf_accept_socket_connection(SFListener *listener);
void sf_close_socket_connection(struct fast_task_info *task);
int sf_socket_server_ex(SFContext *sf_context);
#define sf_socket_server() sf_socket_server_ex(&g_sf_context)
@ -127,7 +130,7 @@ void sf_set_sig_quit_handler(sf_sig_quit_handler quit_handler);
int sf_init_task(struct fast_task_info *task);
static inline struct fast_task_info *sf_alloc_init_task(
SFContext *sf_context, const int sock)
SFNetworkHandler *handler, const int fd)
{
struct fast_task_info *task;
@ -139,11 +142,11 @@ static inline struct fast_task_info *sf_alloc_init_task(
__LINE__);
return NULL;
}
__sync_add_and_fetch(&task->reffer_count, 1);
__sync_bool_compare_and_swap(&task->canceled, 1, 0);
task->ctx = sf_context;
task->event.fd = sock;
task->handler = handler;
task->event.fd = fd;
return task;
}

View File

@ -53,6 +53,17 @@ typedef void (*sf_release_buffer_callback)(struct fast_task_info *task);
typedef int (*sf_error_handler_callback)(const int errnum);
typedef enum {
sf_network_type_sock = 's',
sf_network_type_rdma = 'r'
} SFNetworkType;
typedef enum {
sf_comm_action_continue = 'c',
sf_comm_action_break = 'b',
sf_comm_action_finish = 'f'
} SFCommAction;
struct sf_listener;
typedef int (*sf_create_server_callback)(struct sf_listener *listener,
const char *bind_addr);
@ -63,15 +74,10 @@ typedef int (*sf_async_connect_server_callback)(struct fast_task_info *task);
typedef int (*sf_connect_server_done_callback)(struct fast_task_info *task);
typedef void (*sf_close_connection_callback)(struct fast_task_info *task);
typedef int (*sf_send_data_callback)(struct fast_task_info *task,
bool *send_done);
typedef int (*sf_recv_data_callback)(struct fast_task_info *task,
bool *recv_done);
typedef enum {
sf_network_type_sock = 's',
sf_network_type_rdma = 'r'
} SFNetworkType;
typedef ssize_t (*sf_send_data_callback)(struct fast_task_info *task,
SFCommAction *action);
typedef ssize_t (*sf_recv_data_callback)(struct fast_task_info *task,
SFCommAction *action);
struct sf_network_handler;
typedef struct sf_listener {
@ -83,6 +89,7 @@ typedef struct sf_listener {
int sock; //for socket
void *id; //for rdma_cm
};
struct sockaddr_in inaddr; //for accept
} SFListener;
struct sf_context;