#include #include #include #include #include #include #include #include #include #include //#include #include #include #include #include #include #include "fastcommon/shared_func.h" #include "fastcommon/sched_thread.h" #include "fastcommon/pthread_func.h" #include "fastcommon/logger.h" #include "fastcommon/sockopt.h" #include "fastcommon/fast_task_queue.h" #include "fastcommon/ioevent_loop.h" #include "sf_global.h" #include "sf_nio.h" #define SF_CTX ((SFContext *)(task->ctx)) void sf_set_parameters_ex(SFContext *sf_context, const int header_size, sf_set_body_length_callback set_body_length_func, sf_deal_task_func deal_func, TaskCleanUpCallback cleanup_func, sf_recv_timeout_callback timeout_callback) { sf_context->header_size = header_size; sf_context->set_body_length = set_body_length_func; sf_context->deal_task = deal_func; sf_context->task_cleanup_func = cleanup_func; sf_context->timeout_callback = timeout_callback; } void sf_task_detach_thread(struct fast_task_info *task) { ioevent_detach(&task->thread_data->ev_puller, task->event.fd); if (task->event.timer.expires > 0) { fast_timer_remove(&task->thread_data->timer, &task->event.timer); task->event.timer.expires = 0; } if (SF_CTX->remove_from_ready_list) { ioevent_remove(&task->thread_data->ev_puller, task); } } void sf_task_switch_thread(struct fast_task_info *task, const int new_thread_index) { sf_task_detach_thread(task); task->thread_data = SF_CTX->thread_data + new_thread_index; } void sf_task_finish_clean_up(struct fast_task_info *task) { /* assert(task->event.fd >= 0); if (task->event.fd < 0) { logWarning("file: "__FILE__", line: %d, " "task: %p already cleaned", __LINE__, task); return; } */ if (task->finish_callback != NULL) { task->finish_callback(task); task->finish_callback = NULL; } sf_task_detach_thread(task); close(task->event.fd); task->event.fd = -1; __sync_fetch_and_sub(&g_sf_global_vars.connection_stat.current_count, 1); free_queue_push(task); } static inline int set_write_event(struct fast_task_info *task) { int result; if (task->event.callback == (IOEventCallback)sf_client_sock_write) { return 0; } task->event.callback = (IOEventCallback)sf_client_sock_write; if (ioevent_modify(&task->thread_data->ev_puller, task->event.fd, IOEVENT_WRITE, task) != 0) { result = errno != 0 ? errno : ENOENT; iovent_add_to_deleted_list(task); logError("file: "__FILE__", line: %d, " "ioevent_modify fail, " "errno: %d, error info: %s", __LINE__, result, strerror(result)); return result; } return 0; } int sf_set_read_event(struct fast_task_info *task) { int result; sf_nio_set_stage(task, SF_NIO_STAGE_RECV); if (task->event.callback == (IOEventCallback)sf_client_sock_read) { return 0; } task->event.callback = (IOEventCallback)sf_client_sock_read; if (ioevent_modify(&task->thread_data->ev_puller, task->event.fd, IOEVENT_READ, task) != 0) { result = errno != 0 ? errno : ENOENT; iovent_add_to_deleted_list(task); logError("file: "__FILE__", line: %d, " "ioevent_modify fail, " "errno: %d, error info: %s", __LINE__, result, strerror(result)); return result; } return 0; } static inline int sf_ioevent_add(struct fast_task_info *task, IOEventCallback callback, const int timeout) { int result; result = ioevent_set(task, task->thread_data, task->event.fd, IOEVENT_READ, callback, timeout); return result > 0 ? -1 * result : result; } static inline int sf_nio_init(struct fast_task_info *task) { int current_connections; current_connections = __sync_add_and_fetch( &g_sf_global_vars.connection_stat.current_count, 1); if (current_connections > g_sf_global_vars.connection_stat.max_count) { g_sf_global_vars.connection_stat.max_count = current_connections; } return sf_ioevent_add(task, (IOEventCallback)sf_client_sock_read, task->network_timeout); } static int sf_client_sock_connect(int sock, short event, void *arg) { int result; socklen_t len; struct fast_task_info *task; task = (struct fast_task_info *)arg; if (event & IOEVENT_TIMEOUT) { result = ETIMEDOUT; } else { len = sizeof(result); if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &result, &len) < 0) { result = errno != 0 ? errno : EACCES; } } if (result != 0) { logError("file: "__FILE__", line: %d, " "connect to server %s:%d fail, errno: %d, " "error info: %s", __LINE__, task->server_ip, task->port, result, STRERROR(result)); iovent_add_to_deleted_list(task); return -1; } logInfo("file: "__FILE__", line: %d, " "connect to server %s:%d successfully", __LINE__, task->server_ip, task->port); sf_nio_set_stage(task, SF_NIO_STAGE_HANDSHAKE); return SF_CTX->deal_task(task); } static int sf_connect_server(struct fast_task_info *task) { int result; if ((task->event.fd=socketCreateEx2(AF_UNSPEC, task->server_ip, O_NONBLOCK, NULL, &result)) < 0) { return result > 0 ? -1 * result : result; } result = asyncconnectserverbyip(task->event.fd, task->server_ip, task->port); if (result == 0) { if ((result=sf_ioevent_add(task, (IOEventCallback) sf_client_sock_read, task->network_timeout)) != 0) { return result; } logInfo("file: "__FILE__", line: %d, " "connect to server %s:%d successfully", __LINE__, task->server_ip, task->port); sf_nio_set_stage(task, SF_NIO_STAGE_HANDSHAKE); return SF_CTX->deal_task(task); } else if (result == EINPROGRESS) { result = ioevent_set(task, task->thread_data, task->event.fd, IOEVENT_READ | IOEVENT_WRITE, (IOEventCallback) sf_client_sock_connect, task->connect_timeout); return result > 0 ? -1 * result : result; } else { close(task->event.fd); task->event.fd = -1; logError("file: "__FILE__", line: %d, " "connect to server %s:%d fail, errno: %d, " "error info: %s", __LINE__, task->server_ip, task->port, result, STRERROR(result)); return result > 0 ? -1 * result : result; } } static int sf_nio_deal_task(struct fast_task_info *task) { int result; int stage; stage = SF_NIO_TASK_STAGE_FETCH(task); switch (SF_NIO_STAGE_ONLY(stage)) { case SF_NIO_STAGE_INIT: sf_nio_set_stage(task, SF_NIO_STAGE_RECV); result = sf_nio_init(task); break; case SF_NIO_STAGE_CONNECT: result = sf_connect_server(task); break; case SF_NIO_STAGE_RECV: if ((result=sf_set_read_event(task)) == 0) { sf_client_sock_read(task->event.fd, IOEVENT_READ, task); } break; case SF_NIO_STAGE_SEND: result = sf_send_add_event(task); break; case SF_NIO_STAGE_CONTINUE: //continue deal result = SF_CTX->deal_task(task); break; case SF_NIO_STAGE_FORWARDED: //forward by other thread if ((result=sf_ioevent_add(task, (IOEventCallback) sf_client_sock_read, task->network_timeout)) == 0) { result = SF_CTX->deal_task(task); } break; case SF_NIO_STAGE_CLOSE: result = -EIO; //close this socket break; default: logError("file: "__FILE__", line: %d, " "client ip: %s, invalid stage: %d", __LINE__, task->client_ip, stage); result = -EINVAL; break; } if (result < 0) { iovent_add_to_deleted_list(task); } return result; } int sf_nio_notify_ex(struct fast_task_info *task, const int new_stage, const char *file, const int line) { int64_t n; int result; int old_stage; bool notify; old_stage = SF_NIO_TASK_STAGE_FETCH(task); if (!(new_stage == SF_NIO_STAGE_INIT || new_stage == SF_NIO_STAGE_CONNECT || new_stage == SF_NIO_STAGE_CLOSE)) { if (SF_NIO_STAGE_IS_INPROGRESS(old_stage)) { logWarning("file: "__FILE__", line: %d, " "from caller {file: %s, line: %d}, " "client ip: %s, nio stage in progress, " "current stage: %d, skip set to %d", __LINE__, file, line, task->client_ip, old_stage, new_stage); return EBUSY; } } if (!__sync_bool_compare_and_swap(&task->nio_stage, old_stage, new_stage)) { logWarning("file: "__FILE__", line: %d, " "from caller {file: %s, line: %d}, " "client ip: %s, skip set stage to %d because stage " "changed, current stage: %d", __LINE__, file, line, task->client_ip, new_stage, SF_NIO_TASK_STAGE_FETCH(task)); return EEXIST; } task->next = NULL; pthread_mutex_lock(&task->thread_data->waiting_queue.lock); if (task->thread_data->waiting_queue.tail == NULL) { task->thread_data->waiting_queue.head = task; notify = true; } else { task->thread_data->waiting_queue.tail->next = task; notify = false; } task->thread_data->waiting_queue.tail = task; pthread_mutex_unlock(&task->thread_data->waiting_queue.lock); if (notify) { n = 1; if (write(FC_NOTIFY_WRITE_FD(task->thread_data), &n, sizeof(n)) != sizeof(n)) { result = errno != 0 ? errno : EIO; logError("file: "__FILE__", line: %d, " "write eventfd %d fail, errno: %d, error info: %s", __LINE__, FC_NOTIFY_WRITE_FD(task->thread_data), result, STRERROR(result)); return result; } } return 0; } void sf_recv_notify_read(int sock, short event, void *arg) { int64_t n; struct nio_thread_data *thread_data; struct fast_task_info *task; struct fast_task_info *current; thread_data = (struct nio_thread_data *)arg; if (read(sock, &n, sizeof(n)) < 0) { logWarning("file: "__FILE__", line: %d, " "read from eventfd %d fail, errno: %d, error info: %s", __LINE__, sock, errno, STRERROR(errno)); } pthread_mutex_lock(&thread_data->waiting_queue.lock); current = thread_data->waiting_queue.head; thread_data->waiting_queue.head = thread_data->waiting_queue.tail = NULL; pthread_mutex_unlock(&thread_data->waiting_queue.lock); while (current != NULL) { task = current; current = current->next; if (!task->canceled) { sf_nio_deal_task(task); } } } int sf_send_add_event(struct fast_task_info *task) { task->offset = 0; if (task->length > 0) { /* direct send */ sf_nio_set_stage(task, SF_NIO_STAGE_SEND); if (sf_client_sock_write(task->event.fd, IOEVENT_WRITE, task) < 0) { return errno != 0 ? errno : EIO; } } return 0; } int sf_client_sock_read(int sock, short event, void *arg) { int stage; int bytes; int recv_bytes; int total_read; struct fast_task_info *task; task = (struct fast_task_info *)arg; stage = SF_NIO_TASK_STAGE_FETCH(task); if (task->canceled || (SF_NIO_STAGE_ONLY(stage) != SF_NIO_STAGE_RECV)) { return 0; } //assert(sock >= 0); if (event & IOEVENT_TIMEOUT) { if (task->offset == 0 && task->req_count > 0) { if (SF_CTX->timeout_callback != NULL) { if (SF_CTX->timeout_callback(task) != 0) { iovent_add_to_deleted_list(task); return -1; } } task->event.timer.expires = g_current_time + task->network_timeout; fast_timer_add(&task->thread_data->timer, &task->event.timer); } else { if (task->length > 0) { logWarning("file: "__FILE__", line: %d, " "client ip: %s, recv timeout, " "recv offset: %d, expect length: %d", __LINE__, task->client_ip, task->offset, task->length); } else { logWarning("file: "__FILE__", line: %d, " "client ip: %s, req_count: %"PRId64", recv timeout", __LINE__, task->client_ip, task->req_count); } iovent_add_to_deleted_list(task); return -1; } return 0; } if (event & IOEVENT_ERROR) { logDebug("file: "__FILE__", line: %d, " "client ip: %s, recv error event: %d, " "close connection", __LINE__, task->client_ip, event); iovent_add_to_deleted_list(task); return -1; } if (stage != SF_NIO_STAGE_RECV_INPROGRESS) { if (!__sync_bool_compare_and_swap(&task->nio_stage, stage, SF_NIO_STAGE_RECV_INPROGRESS)) { logWarning("file: "__FILE__", line: %d, " "client ip: %s, nio stage change from %d to %d, " "skip read!", __LINE__, task->client_ip, stage, SF_NIO_TASK_STAGE_FETCH(task)); return 0; } } total_read = 0; while (1) { fast_timer_modify(&task->thread_data->timer, &task->event.timer, g_current_time + task->network_timeout); if (task->length == 0) { //recv header recv_bytes = SF_CTX->header_size - task->offset; } else { recv_bytes = task->length - task->offset; } bytes = read(sock, task->data + task->offset, recv_bytes); if (bytes < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { break; } else if (errno == EINTR) { //should retry logDebug("file: "__FILE__", line: %d, " "client ip: %s, ignore interupt signal", __LINE__, task->client_ip); continue; } else { logWarning("file: "__FILE__", line: %d, " "client ip: %s, recv fail, " "errno: %d, error info: %s", __LINE__, task->client_ip, errno, strerror(errno)); iovent_add_to_deleted_list(task); return -1; } } else if (bytes == 0) { if (task->offset > 0) { if (task->length > 0) { logWarning("file: "__FILE__", line: %d, " "client ip: %s, connection " "disconnected, expect pkg length: %d, " "recv pkg length: %d", __LINE__, task->client_ip, task->length, task->offset); } else { logWarning("file: "__FILE__", line: %d, " "client ip: %s, connection " "disconnected, recv pkg length: %d", __LINE__, task->client_ip, task->offset); } } else { logDebug("file: "__FILE__", line: %d, " "client ip: %s, sock: %d, recv fail, " "connection disconnected", __LINE__, task->client_ip, sock); } iovent_add_to_deleted_list(task); return -1; } total_read += bytes; task->offset += bytes; if (task->length == 0) { //header if (task->offset < SF_CTX->header_size) { break; } if (SF_CTX->set_body_length(task) != 0) { iovent_add_to_deleted_list(task); return -1; } if (task->length < 0) { logError("file: "__FILE__", line: %d, " "client ip: %s, pkg length: %d < 0", __LINE__, task->client_ip, task->length); iovent_add_to_deleted_list(task); return -1; } task->length += SF_CTX->header_size; if (task->length > g_sf_global_vars.max_pkg_size) { logError("file: "__FILE__", line: %d, " "client ip: %s, pkg length: %d > " "max pkg size: %d", __LINE__, task->client_ip, task->length, g_sf_global_vars.max_pkg_size); iovent_add_to_deleted_list(task); return -1; } if (task->length > task->size) { int old_size; if (!SF_CTX->realloc_task_buffer) { logError("file: "__FILE__", line: %d, " "client ip: %s, pkg length: %d exceeds " "task size: %d, but realloc buffer disabled", __LINE__, task->client_ip, task->size, task->length); iovent_add_to_deleted_list(task); return -1; } old_size = task->size; if (free_queue_realloc_buffer(task, task->length) != 0) { logError("file: "__FILE__", line: %d, " "client ip: %s, realloc buffer size " "from %d to %d fail", __LINE__, task->client_ip, task->size, task->length); iovent_add_to_deleted_list(task); return -1; } logDebug("file: "__FILE__", line: %d, " "client ip: %s, task length: %d, realloc buffer size " "from %d to %d", __LINE__, task->client_ip, task->length, old_size, task->size); } } if (task->offset >= task->length) { //recv done task->req_count++; sf_nio_set_stage(task, SF_NIO_STAGE_SEND); if (SF_CTX->deal_task(task) < 0) { //fatal error iovent_add_to_deleted_list(task); return -1; } break; } } return total_read; } int sf_client_sock_write(int sock, short event, void *arg) { int stage; int bytes; int total_write; struct fast_task_info *task; //assert(sock >= 0); task = (struct fast_task_info *)arg; stage = SF_NIO_TASK_STAGE_FETCH(task); if (task->canceled || (SF_NIO_STAGE_ONLY(stage) != SF_NIO_STAGE_SEND)) { return 0; } if (event & IOEVENT_TIMEOUT) { logError("file: "__FILE__", line: %d, " "client ip: %s, send timeout. total length: %d, offset: %d, " "remain: %d", __LINE__, task->client_ip, task->length, task->offset, task->length - task->offset); iovent_add_to_deleted_list(task); return -1; } if (event & IOEVENT_ERROR) { logDebug("file: "__FILE__", line: %d, " "client ip: %s, recv error event: %d, " "close connection", __LINE__, task->client_ip, event); iovent_add_to_deleted_list(task); return -1; } if (stage != SF_NIO_STAGE_SEND_INPROGRESS) { if (!__sync_bool_compare_and_swap(&task->nio_stage, stage, SF_NIO_STAGE_SEND_INPROGRESS)) { logWarning("file: "__FILE__", line: %d, " "client ip: %s, nio stage change from %d to %d, " "skip write!", __LINE__, task->client_ip, stage, SF_NIO_TASK_STAGE_FETCH(task)); return 0; } } total_write = 0; while (1) { fast_timer_modify(&task->thread_data->timer, &task->event.timer, g_current_time + task->network_timeout); bytes = write(sock, task->data + task->offset, task->length - task->offset); if (bytes < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { if (set_write_event(task) != 0) { return -1; } break; } else if (errno == EINTR) { //should retry logDebug("file: "__FILE__", line: %d, " "client ip: %s, ignore interupt signal", __LINE__, task->client_ip); continue; } else { logWarning("file: "__FILE__", line: %d, " "client ip: %s, send fail, " "errno: %d, error info: %s", __LINE__, task->client_ip, errno, strerror(errno)); iovent_add_to_deleted_list(task); return -1; } } else if (bytes == 0) { logWarning("file: "__FILE__", line: %d, " "client ip: %s, sock: %d, send failed, " "connection disconnected", __LINE__, task->client_ip, sock); iovent_add_to_deleted_list(task); return -1; } total_write += bytes; task->offset += bytes; if (task->offset >= task->length) { task->offset = 0; task->length = 0; if (sf_set_read_event(task) != 0) { return -1; } break; } } return total_write; }