diff --git a/src/idempotency/client/client_channel.c b/src/idempotency/client/client_channel.c index 7253f46..12a012d 100644 --- a/src/idempotency/client/client_channel.c +++ b/src/idempotency/client/client_channel.c @@ -171,12 +171,18 @@ void client_channel_destroy() } static struct fast_task_info *alloc_channel_task(IdempotencyClientChannel - *channel, const uint32_t hash_code, const char *server_ip, - const uint16_t port, int *err_no) + *channel, const uint32_t hash_code, const SFNetworkType network_type, + const char *server_ip, const uint16_t port, int *err_no) { struct fast_task_info *task; + SFNetworkHandler *handler; - if ((task=sf_alloc_init_task(&g_sf_context, -1)) == NULL) { + if (network_type == sf_network_type_sock) { + handler = g_sf_context.handlers + SF_SOCKET_NETWORK_HANDLER_INDEX; + } else { + handler = g_sf_context.handlers + SF_RDMACM_NETWORK_HANDLER_INDEX; + } + if ((task=sf_alloc_init_task(handler, -1)) == NULL) { *err_no = ENOMEM; return NULL; } @@ -226,8 +232,8 @@ int idempotency_client_channel_check_reconnect( } struct idempotency_client_channel *idempotency_client_channel_get( - const char *server_ip, const uint16_t server_port, - const int timeout, int *err_no) + const SFNetworkType network_type, const char *server_ip, + const uint16_t server_port, const int timeout, int *err_no) { int r; int key_len; @@ -277,8 +283,8 @@ struct idempotency_client_channel *idempotency_client_channel_get( break; } - channel->task = alloc_channel_task(channel, - hash_code, server_ip, server_port, err_no); + channel->task = alloc_channel_task(channel, hash_code, + network_type, server_ip, server_port, err_no); if (channel->task == NULL) { fast_mblock_free_object(&channel_context. channel_allocator, channel); diff --git a/src/idempotency/client/client_channel.h b/src/idempotency/client/client_channel.h index aafbe65..e942909 100644 --- a/src/idempotency/client/client_channel.h +++ b/src/idempotency/client/client_channel.h @@ -40,8 +40,8 @@ void idempotency_client_channel_config_to_string_ex( char *output, const int size, const bool add_comma); struct idempotency_client_channel *idempotency_client_channel_get( - const char *server_ip, const uint16_t server_port, - const int timeout, int *err_no); + const SFNetworkType network_type, const char *server_ip, + const uint16_t server_port, const int timeout, int *err_no); static inline uint64_t idempotency_client_channel_next_seq_id( struct idempotency_client_channel *channel) diff --git a/src/sf_global.c b/src/sf_global.c index 7e33872..7bfb8fd 100644 --- a/src/sf_global.c +++ b/src/sf_global.c @@ -31,6 +31,7 @@ #include "fastcommon/process_ctrl.h" #include "fastcommon/logger.h" #include "sf_nio.h" +#include "sf_service.h" #include "sf_global.h" SFGlobalVariables g_sf_global_vars = { @@ -423,7 +424,16 @@ static int init_network_handler(SFNetworkHandler *handler, if (handler->type == sf_network_type_sock) { handler->inner.sock = -1; handler->outer.sock = -1; + handler->create_server = sf_create_socket_server; + handler->close_server = sf_close_socket_server; + handler->accept_connection = sf_accept_socket_connection; + handler->async_connect_server = sf_async_connect_socket_server; + handler->connect_server_done = sf_connect_socket_server_done; + handler->close_connection = sf_close_socket_connection; + handler->send_data = sf_socket_send_data; + handler->recv_data = sf_socket_recv_data; } else { + //TODO } return 0; @@ -483,8 +493,8 @@ int sf_load_context_from_config_ex(SFContext *sf_context, } if (sock_handler->inner.port == sock_handler->outer.port) { - sock_handler->inner.enabled = false; - sock_handler->outer.enabled = true; + sock_handler->inner.enabled = true; + sock_handler->outer.enabled = false; } else { sock_handler->inner.enabled = true; sock_handler->outer.enabled = true; diff --git a/src/sf_nio.c b/src/sf_nio.c index c143c94..3eaf2b0 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -111,10 +111,8 @@ void sf_task_finish_clean_up(struct fast_task_info *task) } release_iovec_buffer(task); - sf_task_detach_thread(task); - close(task->event.fd); - task->event.fd = -1; + task->handler->close_connection(task); __sync_fetch_and_sub(&g_sf_global_vars.connection_stat.current_count, 1); sf_release_task(task); @@ -130,15 +128,13 @@ static inline int set_write_event(struct fast_task_info *task) task->event.callback = (IOEventCallback)sf_client_sock_write; if (ioevent_modify(&task->thread_data->ev_puller, - task->event.fd, IOEVENT_WRITE, task) != 0) + task->event.fd, IOEVENT_WRITE, task) != 0) { result = errno != 0 ? errno : ENOENT; - ioevent_add_to_deleted_list(task); - logError("file: "__FILE__", line: %d, " - "ioevent_modify fail, " - "errno: %d, error info: %s", - __LINE__, result, strerror(result)); + "ioevent_modify fail, " + "errno: %d, error info: %s", + __LINE__, result, strerror(result)); return result; } return 0; @@ -194,20 +190,28 @@ static inline int sf_nio_init(struct fast_task_info *task) task->network_timeout); } -static int sf_client_sock_connect(int sock, short event, void *arg) +int sf_connect_socket_server_done(struct fast_task_info *task) { int result; socklen_t len; + + len = sizeof(result); + if (getsockopt(task->event.fd, SOL_SOCKET, SO_ERROR, &result, &len) < 0) { + result = errno != 0 ? errno : EACCES; + } + return result; +} + +static int sf_client_sock_connect(int sock, short event, void *arg) +{ + int result; struct fast_task_info *task; task = (struct fast_task_info *)arg; if (event & IOEVENT_TIMEOUT) { result = ETIMEDOUT; } else { - len = sizeof(result); - if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &result, &len) < 0) { - result = errno != 0 ? errno : EACCES; - } + result = task->handler->connect_server_done(task); } if (result != 0) { @@ -225,21 +229,27 @@ static int sf_client_sock_connect(int sock, short event, void *arg) return SF_CTX->deal_task(task, SF_NIO_STAGE_HANDSHAKE); } -static int sf_connect_server(struct fast_task_info *task) +int sf_async_connect_socket_server(struct fast_task_info *task) { int result; - if ((task->event.fd=socketCreateEx2(AF_UNSPEC, task->server_ip, O_NONBLOCK, NULL, &result)) < 0) { return result > 0 ? -1 * result : result; } - result = asyncconnectserverbyip(task->event.fd, + return asyncconnectserverbyip(task->event.fd, task->server_ip, task->port); +} + +static int sf_async_connect_server(struct fast_task_info *task) +{ + int result; + + result = task->handler->async_connect_server(task); if (result == 0) { if ((result=sf_ioevent_add(task, (IOEventCallback) - sf_client_sock_read, task->network_timeout)) != 0) + sf_client_sock_read, task->network_timeout)) != 0) { return result; } @@ -254,8 +264,7 @@ static int sf_connect_server(struct fast_task_info *task) sf_client_sock_connect, task->connect_timeout); return result > 0 ? -1 * result : result; } else { - close(task->event.fd); - task->event.fd = -1; + task->handler->close_connection(task); logError("file: "__FILE__", line: %d, " "connect to server %s:%u fail, errno: %d, " "error info: %s", __LINE__, task->server_ip, @@ -274,7 +283,7 @@ static int sf_nio_deal_task(struct fast_task_info *task, const int stage) result = sf_nio_init(task); break; case SF_NIO_STAGE_CONNECT: - result = sf_connect_server(task); + result = sf_async_connect_server(task); break; case SF_NIO_STAGE_RECV: if ((result=sf_set_read_event(task)) == 0) { @@ -496,13 +505,231 @@ static inline int check_task(struct fast_task_info *task, } } +ssize_t sf_socket_send_data(struct fast_task_info *task, SFCommAction *action) +{ + int bytes; + + if (task->iovec_array.iovs != NULL) { + bytes = writev(task->event.fd, task->iovec_array.iovs, + FC_MIN(task->iovec_array.count, IOV_MAX)); + } else { + bytes = write(task->event.fd, task->data + task->offset, + task->length - task->offset); + } + if (bytes < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) + { + if (set_write_event(task) != 0) { + return -1; + } + *action = sf_comm_action_break; + return 0; + } else if (errno == EINTR) { //should retry + logDebug("file: "__FILE__", line: %d, " + "client ip: %s, ignore interupt signal", + __LINE__, task->client_ip); + *action = sf_comm_action_continue; + return 0; + } else { + logWarning("file: "__FILE__", line: %d, " + "client ip: %s, send fail, task offset: %d, length: %d, " + "errno: %d, error info: %s", __LINE__, task->client_ip, + task->offset, task->length, errno, strerror(errno)); + return -1; + } + } else if (bytes == 0) { + logWarning("file: "__FILE__", line: %d, " + "client ip: %s, sock: %d, task length: %d, offset: %d, " + "send failed, connection disconnected", __LINE__, + task->client_ip, task->event.fd, task->length, task->offset); + return -1; + } + + task->offset += bytes; + if (task->offset >= task->length) { + *action = sf_comm_action_finish; + } else { + *action = sf_comm_action_continue; + + /* set next writev iovec array */ + if (task->iovec_array.iovs != NULL) { + struct iovec *iov; + struct iovec *end; + int iov_sum; + int iov_remain; + + iov = task->iovec_array.iovs; + end = task->iovec_array.iovs + task->iovec_array.count; + iov_sum = 0; + do { + iov_sum += iov->iov_len; + iov_remain = iov_sum - bytes; + if (iov_remain == 0) { + iov++; + break; + } else if (iov_remain > 0) { + iov->iov_base += (iov->iov_len - iov_remain); + iov->iov_len = iov_remain; + break; + } + + iov++; + } while (iov < end); + + task->iovec_array.iovs = iov; + task->iovec_array.count = end - iov; + } + } + + return bytes; +} + +ssize_t sf_socket_recv_data(struct fast_task_info *task, SFCommAction *action) +{ + int bytes; + int recv_bytes; + bool new_alloc; + + if (task->length == 0) { //recv header + recv_bytes = SF_CTX->header_size - task->offset; + bytes = read(task->event.fd, task->data + task->offset, recv_bytes); + } else { + recv_bytes = task->length - task->offset; + if (task->recv_body == NULL) { + bytes = read(task->event.fd, task->data + task->offset, recv_bytes); + } else { + bytes = read(task->event.fd, task->recv_body + (task->offset - + SF_CTX->header_size), recv_bytes); + } + } + + if (bytes < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + *action = sf_comm_action_break; + return 0; + } else if (errno == EINTR) { //should retry + logDebug("file: "__FILE__", line: %d, " + "client ip: %s, ignore interupt signal", + __LINE__, task->client_ip); + *action = sf_comm_action_continue; + return 0; + } else { + logWarning("file: "__FILE__", line: %d, " + "client ip: %s, recv fail, " + "errno: %d, error info: %s", + __LINE__, task->client_ip, + errno, strerror(errno)); + return -1; + } + } else if (bytes == 0) { + if (task->offset > 0) { + if (task->length > 0) { + logWarning("file: "__FILE__", line: %d, " + "client ip: %s, connection disconnected, " + "expect pkg length: %d, recv pkg length: %d", + __LINE__, task->client_ip, task->length, + task->offset); + } else { + logWarning("file: "__FILE__", line: %d, " + "client ip: %s, connection " + "disconnected, recv pkg length: %d", + __LINE__, task->client_ip, + task->offset); + } + } else { + logDebug("file: "__FILE__", line: %d, " + "client ip: %s, sock: %d, recv fail, " + "connection disconnected", __LINE__, + task->client_ip, task->event.fd); + } + + return -1; + } + + TCP_SET_QUICK_ACK(task->event.fd); + task->offset += bytes; + if (task->length == 0) { //pkg header + if (task->offset < SF_CTX->header_size) { + *action = sf_comm_action_continue; + return bytes; + } + + if (SF_CTX->set_body_length(task) != 0) { + return -1; + } + if (task->length < 0) { + logError("file: "__FILE__", line: %d, " + "client ip: %s, pkg length: %d < 0", + __LINE__, task->client_ip, + task->length); + return -1; + } + + task->length += SF_CTX->header_size; + if (task->length > g_sf_global_vars.max_pkg_size) { + logError("file: "__FILE__", line: %d, " + "client ip: %s, pkg length: %d > " + "max pkg size: %d", __LINE__, + task->client_ip, task->length, + g_sf_global_vars.max_pkg_size); + return -1; + } + + if (SF_CTX->alloc_recv_buffer != NULL) { + task->recv_body = SF_CTX->alloc_recv_buffer(task, + task->length - SF_CTX->header_size, &new_alloc); + if (new_alloc && task->recv_body == NULL) { + return -1; + } + } else { + new_alloc = false; + } + + if (!new_alloc) { + if (task->length > task->size) { + int old_size; + + if (!SF_CTX->realloc_task_buffer) { + logError("file: "__FILE__", line: %d, " + "client ip: %s, pkg length: %d exceeds " + "task size: %d, but realloc buffer disabled", + __LINE__, task->client_ip, task->size, + task->length); + return -1; + } + + old_size = task->size; + if (free_queue_realloc_buffer(task, task->length) != 0) { + logError("file: "__FILE__", line: %d, " + "client ip: %s, realloc buffer size " + "from %d to %d fail", __LINE__, + task->client_ip, task->size, task->length); + return -1; + } + + logDebug("file: "__FILE__", line: %d, " + "client ip: %s, task length: %d, realloc buffer " + "size from %d to %d", __LINE__, task->client_ip, + task->length, old_size, task->size); + } + } + } + + if (task->offset >= task->length) { //recv done + *action = sf_comm_action_finish; + } else { + *action = sf_comm_action_continue; + } + + return 0; +} + int sf_client_sock_read(int sock, short event, void *arg) { int result; int bytes; - int recv_bytes; int total_read; - bool new_alloc; + SFCommAction action; struct fast_task_info *task; task = (struct fast_task_info *)arg; @@ -544,147 +771,19 @@ int sf_client_sock_read(int sock, short event, void *arg) } total_read = 0; + action = sf_comm_action_continue; while (1) { fast_timer_modify(&task->thread_data->timer, &task->event.timer, g_current_time + task->network_timeout); - if (task->length == 0) { //recv header - recv_bytes = SF_CTX->header_size - task->offset; - bytes = read(sock, task->data + task->offset, recv_bytes); - } else { - recv_bytes = task->length - task->offset; - if (task->recv_body == NULL) { - bytes = read(sock, task->data + task->offset, recv_bytes); - } else { - bytes = read(sock, task->recv_body + (task->offset - - SF_CTX->header_size), recv_bytes); - } - } - - if (bytes < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - break; - } else if (errno == EINTR) { //should retry - logDebug("file: "__FILE__", line: %d, " - "client ip: %s, ignore interupt signal", - __LINE__, task->client_ip); - continue; - } else { - logWarning("file: "__FILE__", line: %d, " - "client ip: %s, recv fail, " - "errno: %d, error info: %s", - __LINE__, task->client_ip, - errno, strerror(errno)); - - ioevent_add_to_deleted_list(task); - return -1; - } - } else if (bytes == 0) { - if (task->offset > 0) { - if (task->length > 0) { - logWarning("file: "__FILE__", line: %d, " - "client ip: %s, connection disconnected, " - "expect pkg length: %d, recv pkg length: %d", - __LINE__, task->client_ip, task->length, - task->offset); - } else { - logWarning("file: "__FILE__", line: %d, " - "client ip: %s, connection " - "disconnected, recv pkg length: %d", - __LINE__, task->client_ip, - task->offset); - } - } else { - logDebug("file: "__FILE__", line: %d, " - "client ip: %s, sock: %d, recv fail, " - "connection disconnected", __LINE__, - task->client_ip, sock); - } + if ((bytes=task->handler->recv_data(task, &action)) < 0) { ioevent_add_to_deleted_list(task); return -1; } - TCP_SET_QUICK_ACK(sock); total_read += bytes; - task->offset += bytes; - if (task->length == 0) { //pkg header - if (task->offset < SF_CTX->header_size) { - continue; - } - - if (SF_CTX->set_body_length(task) != 0) { - ioevent_add_to_deleted_list(task); - return -1; - } - if (task->length < 0) { - logError("file: "__FILE__", line: %d, " - "client ip: %s, pkg length: %d < 0", - __LINE__, task->client_ip, - task->length); - - ioevent_add_to_deleted_list(task); - return -1; - } - - task->length += SF_CTX->header_size; - if (task->length > g_sf_global_vars.max_pkg_size) { - logError("file: "__FILE__", line: %d, " - "client ip: %s, pkg length: %d > " - "max pkg size: %d", __LINE__, - task->client_ip, task->length, - g_sf_global_vars.max_pkg_size); - - ioevent_add_to_deleted_list(task); - return -1; - } - - if (SF_CTX->alloc_recv_buffer != NULL) { - task->recv_body = SF_CTX->alloc_recv_buffer(task, - task->length - SF_CTX->header_size, &new_alloc); - if (new_alloc && task->recv_body == NULL) { - ioevent_add_to_deleted_list(task); - return -1; - } - } else { - new_alloc = false; - } - - if (!new_alloc) { - if (task->length > task->size) { - int old_size; - - if (!SF_CTX->realloc_task_buffer) { - logError("file: "__FILE__", line: %d, " - "client ip: %s, pkg length: %d exceeds " - "task size: %d, but realloc buffer disabled", - __LINE__, task->client_ip, task->size, - task->length); - - ioevent_add_to_deleted_list(task); - return -1; - } - - old_size = task->size; - if (free_queue_realloc_buffer(task, task->length) != 0) { - logError("file: "__FILE__", line: %d, " - "client ip: %s, realloc buffer size " - "from %d to %d fail", __LINE__, - task->client_ip, task->size, task->length); - - ioevent_add_to_deleted_list(task); - return -1; - } - - logDebug("file: "__FILE__", line: %d, " - "client ip: %s, task length: %d, realloc buffer " - "size from %d to %d", __LINE__, task->client_ip, - task->length, old_size, task->size); - } - } - } - - if (task->offset >= task->length) { //recv done + if (action == sf_comm_action_finish) { task->req_count++; task->nio_stages.current = SF_NIO_STAGE_SEND; if (SF_CTX->deal_task(task, SF_NIO_STAGE_SEND) < 0) { //fatal error @@ -692,6 +791,8 @@ int sf_client_sock_read(int sock, short event, void *arg) return -1; } break; + } else if (action == sf_comm_action_break) { + break; } } @@ -704,6 +805,7 @@ int sf_client_sock_write(int sock, short event, void *arg) int bytes; int total_write; int length; + SFCommAction action; struct fast_task_info *task; task = (struct fast_task_info *)arg; @@ -722,52 +824,19 @@ int sf_client_sock_write(int sock, short event, void *arg) } total_write = 0; + action = sf_comm_action_continue; while (1) { fast_timer_modify(&task->thread_data->timer, - &task->event.timer, g_current_time + - task->network_timeout); - - if (task->iovec_array.iovs != NULL) { - bytes = writev(sock, task->iovec_array.iovs, - FC_MIN(task->iovec_array.count, IOV_MAX)); - } else { - bytes = write(sock, task->data + task->offset, - task->length - task->offset); - } - if (bytes < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK) - { - if (set_write_event(task) != 0) { - return -1; - } - break; - } else if (errno == EINTR) { //should retry - logDebug("file: "__FILE__", line: %d, " - "client ip: %s, ignore interupt signal", - __LINE__, task->client_ip); - continue; - } else { - logWarning("file: "__FILE__", line: %d, " - "client ip: %s, send fail, task offset: %d, length: %d, " - "errno: %d, error info: %s", __LINE__, task->client_ip, - task->offset, task->length, errno, strerror(errno)); - - ioevent_add_to_deleted_list(task); - return -1; - } - } else if (bytes == 0) { - logWarning("file: "__FILE__", line: %d, " - "client ip: %s, sock: %d, task length: %d, offset: %d, " - "send failed, connection disconnected", __LINE__, - task->client_ip, sock, task->length, task->offset); + &task->event.timer, g_current_time + + task->network_timeout); + if ((bytes=task->handler->send_data(task, &action)) < 0) { ioevent_add_to_deleted_list(task); return -1; } total_write += bytes; - task->offset += bytes; - if (task->offset >= task->length) { + if (action == sf_comm_action_finish) { release_iovec_buffer(task); length = task->length; @@ -785,35 +854,8 @@ int sf_client_sock_write(int sock, short event, void *arg) } break; - } - - /* set next writev iovec array */ - if (task->iovec_array.iovs != NULL) { - struct iovec *iov; - struct iovec *end; - int iov_sum; - int iov_remain; - - iov = task->iovec_array.iovs; - end = task->iovec_array.iovs + task->iovec_array.count; - iov_sum = 0; - do { - iov_sum += iov->iov_len; - iov_remain = iov_sum - bytes; - if (iov_remain == 0) { - iov++; - break; - } else if (iov_remain > 0) { - iov->iov_base += (iov->iov_len - iov_remain); - iov->iov_len = iov_remain; - break; - } - - iov++; - } while (iov < end); - - task->iovec_array.iovs = iov; - task->iovec_array.count = end - iov; + } else if (action == sf_comm_action_break) { + break; } } diff --git a/src/sf_nio.h b/src/sf_nio.h index cbdefb4..c608371 100644 --- a/src/sf_nio.h +++ b/src/sf_nio.h @@ -26,7 +26,7 @@ #include "sf_define.h" #include "sf_types.h" -#define SF_CTX ((SFContext *)(task->ctx)) +#define SF_CTX (task->handler->ctx) #ifdef __cplusplus extern "C" { @@ -92,6 +92,12 @@ void sf_task_switch_thread(struct fast_task_info *task, void sf_task_detach_thread(struct fast_task_info *task); +int sf_async_connect_socket_server(struct fast_task_info *task); +int sf_connect_socket_server_done(struct fast_task_info *task); + +ssize_t sf_socket_send_data(struct fast_task_info *task, SFCommAction *action); +ssize_t sf_socket_recv_data(struct fast_task_info *task, SFCommAction *action); + static inline int sf_nio_forward_request(struct fast_task_info *task, const int new_thread_index) { diff --git a/src/sf_service.c b/src/sf_service.c index 0c34faf..5ca78a2 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -436,6 +436,49 @@ void sf_close_socket_server(SFListener *listener) } } +struct fast_task_info *sf_accept_socket_connection(SFListener *listener) +{ + int incomesock; + int port; + socklen_t sockaddr_len; + struct fast_task_info *task; + + sockaddr_len = sizeof(listener->inaddr); + incomesock = accept(listener->sock, (struct sockaddr *) + &listener->inaddr, &sockaddr_len); + if (incomesock < 0) { //error + if (!(errno == EINTR || errno == EAGAIN)) { + logError("file: "__FILE__", line: %d, " + "accept fail, errno: %d, error info: %s", + __LINE__, errno, strerror(errno)); + } + + return NULL; + } + + if (tcpsetnonblockopt(incomesock) != 0) { + close(incomesock); + return NULL; + } + FC_SET_CLOEXEC(incomesock); + + if ((task=sf_alloc_init_task(listener->handler, incomesock)) == NULL) { + close(incomesock); + return NULL; + } + + getPeerIpAddPort(incomesock, task->client_ip, + sizeof(task->client_ip), &port); + task->port = port; + return task; +} + +void sf_close_socket_connection(struct fast_task_info *task) +{ + close(task->event.fd); + task->event.fd = -1; +} + void sf_socket_close_ex(SFContext *sf_context) { SFNetworkHandler *handler; @@ -457,57 +500,28 @@ void sf_socket_close_ex(SFContext *sf_context) static void accept_run(SFListener *listener) { - int incomesock; - int port; - struct sockaddr_in inaddr; - socklen_t sockaddr_len; struct fast_task_info *task; while (g_sf_global_vars.continue_flag) { - sockaddr_len = sizeof(inaddr); - incomesock = accept(listener->sock, - (struct sockaddr*)&inaddr, &sockaddr_len); - if (incomesock < 0) { //error - if (!(errno == EINTR || errno == EAGAIN)) { - logError("file: "__FILE__", line: %d, " - "accept fail, errno: %d, error info: %s", - __LINE__, errno, strerror(errno)); - } - + if ((task=listener->handler->accept_connection(listener)) == NULL) { continue; } - if (tcpsetnonblockopt(incomesock) != 0) { - close(incomesock); - continue; - } - FC_SET_CLOEXEC(incomesock); - - if ((task=sf_alloc_init_task(listener->handler->ctx, - incomesock)) == NULL) - { - close(incomesock); - continue; - } - - getPeerIpAddPort(incomesock, task->client_ip, - sizeof(task->client_ip), &port); - task->port = port; task->thread_data = listener->handler->ctx->thread_data + - incomesock % listener->handler->ctx->work_threads; + task->event.fd % listener->handler->ctx->work_threads; if (listener->handler->ctx->accept_done_func != NULL) { - if (listener->handler->ctx->accept_done_func( - task, inaddr.sin_addr.s_addr, + if (listener->handler->ctx->accept_done_func(task, + listener->inaddr.sin_addr.s_addr, listener->is_inner) != 0) { - close(incomesock); + listener->handler->close_connection(task); sf_release_task(task); continue; } } if (sf_nio_notify(task, SF_NIO_STAGE_INIT) != 0) { - close(incomesock); + listener->handler->close_connection(task); sf_release_task(task); } } diff --git a/src/sf_service.h b/src/sf_service.h index b793963..c4585dc 100644 --- a/src/sf_service.h +++ b/src/sf_service.h @@ -85,6 +85,9 @@ void sf_set_current_time(); int sf_create_socket_server(SFListener *listener, const char *bind_addr); void sf_close_socket_server(SFListener *listener); +struct fast_task_info *sf_accept_socket_connection(SFListener *listener); + +void sf_close_socket_connection(struct fast_task_info *task); int sf_socket_server_ex(SFContext *sf_context); #define sf_socket_server() sf_socket_server_ex(&g_sf_context) @@ -127,7 +130,7 @@ void sf_set_sig_quit_handler(sf_sig_quit_handler quit_handler); int sf_init_task(struct fast_task_info *task); static inline struct fast_task_info *sf_alloc_init_task( - SFContext *sf_context, const int sock) + SFNetworkHandler *handler, const int fd) { struct fast_task_info *task; @@ -139,11 +142,11 @@ static inline struct fast_task_info *sf_alloc_init_task( __LINE__); return NULL; } + __sync_add_and_fetch(&task->reffer_count, 1); __sync_bool_compare_and_swap(&task->canceled, 1, 0); - task->ctx = sf_context; - task->event.fd = sock; - + task->handler = handler; + task->event.fd = fd; return task; } diff --git a/src/sf_types.h b/src/sf_types.h index 1f96905..d91d6aa 100644 --- a/src/sf_types.h +++ b/src/sf_types.h @@ -53,6 +53,17 @@ typedef void (*sf_release_buffer_callback)(struct fast_task_info *task); typedef int (*sf_error_handler_callback)(const int errnum); +typedef enum { + sf_network_type_sock = 's', + sf_network_type_rdma = 'r' +} SFNetworkType; + +typedef enum { + sf_comm_action_continue = 'c', + sf_comm_action_break = 'b', + sf_comm_action_finish = 'f' +} SFCommAction; + struct sf_listener; typedef int (*sf_create_server_callback)(struct sf_listener *listener, const char *bind_addr); @@ -63,15 +74,10 @@ typedef int (*sf_async_connect_server_callback)(struct fast_task_info *task); typedef int (*sf_connect_server_done_callback)(struct fast_task_info *task); typedef void (*sf_close_connection_callback)(struct fast_task_info *task); -typedef int (*sf_send_data_callback)(struct fast_task_info *task, - bool *send_done); -typedef int (*sf_recv_data_callback)(struct fast_task_info *task, - bool *recv_done); - -typedef enum { - sf_network_type_sock = 's', - sf_network_type_rdma = 'r' -} SFNetworkType; +typedef ssize_t (*sf_send_data_callback)(struct fast_task_info *task, + SFCommAction *action); +typedef ssize_t (*sf_recv_data_callback)(struct fast_task_info *task, + SFCommAction *action); struct sf_network_handler; typedef struct sf_listener { @@ -83,6 +89,7 @@ typedef struct sf_listener { int sock; //for socket void *id; //for rdma_cm }; + struct sockaddr_in inaddr; //for accept } SFListener; struct sf_context;