diff --git a/src/sf_nio.c b/src/sf_nio.c index 4f07210..9b04803 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -98,7 +98,7 @@ static inline int set_write_event(struct fast_task_info *task) task->event.fd, IOEVENT_WRITE, task) != 0) { result = errno != 0 ? errno : ENOENT; - SF_CTX->task_cleanup_func(task); + iovent_add_to_deleted_list(task); logError("file: "__FILE__", line: %d, " "ioevent_modify fail, " @@ -122,7 +122,7 @@ static inline int set_read_event(struct fast_task_info *task) task->event.fd, IOEVENT_READ, task) != 0) { result = errno != 0 ? errno : ENOENT; - SF_CTX->task_cleanup_func(task); + iovent_add_to_deleted_list(task); logError("file: "__FILE__", line: %d, " "ioevent_modify fail, " @@ -195,13 +195,12 @@ static int sf_nio_deal_task(struct fast_task_info *task) } if (result < 0) { - SF_CTX->task_cleanup_func(task); + iovent_add_to_deleted_list(task); } return result; } -#if defined(OS_LINUX) int sf_nio_notify(struct fast_task_info *task, const int stage) { int64_t n; @@ -262,59 +261,11 @@ void sf_recv_notify_read(int sock, short event, void *arg) task = current; current = current->next; - sf_nio_deal_task(task); - } -} - -#else - -int sf_nio_notify(struct fast_task_info *task, const int stage) -{ - long task_addr; - int result; - - task_addr = (long)task; - task->nio_stage = stage; - if (write(NOTIFY_WRITE_FD(task->thread_data), &task_addr, - sizeof(task_addr)) != sizeof(task_addr)) - { - result = errno != 0 ? errno : EIO; - logError("file: "__FILE__", line: %d, " - "write to pipe %d fail, errno: %d, error info: %s", - __LINE__, NOTIFY_WRITE_FD(task->thread_data), - result, STRERROR(result)); - return result; - } - - return 0; -} - -void sf_recv_notify_read(int sock, short event, void *arg) -{ - int bytes; - long task_ptr; - struct fast_task_info *task; - - while (1) { - if ((bytes=read(sock, &task_ptr, sizeof(task_ptr))) < 0) { - if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { - logError("file: "__FILE__", line: %d, " - "call read failed, " - "errno: %d, error info: %s", - __LINE__, errno, strerror(errno)); - } - - break; + if (!task->canceled) { + sf_nio_deal_task(task); } - else if (bytes == 0) { - break; - } - - task = (struct fast_task_info *)task_ptr; - sf_nio_deal_task(task); } } -#endif int sf_send_add_event(struct fast_task_info *task) { @@ -337,7 +288,7 @@ int sf_client_sock_read(int sock, short event, void *arg) struct fast_task_info *task; task = (struct fast_task_info *)arg; - if (task->nio_stage != SF_NIO_STAGE_RECV) { + if (task->canceled || (task->nio_stage != SF_NIO_STAGE_RECV)) { return 0; } @@ -346,7 +297,7 @@ int sf_client_sock_read(int sock, short event, void *arg) if (task->offset == 0 && task->req_count > 0) { if (SF_CTX->timeout_callback != NULL) { if (SF_CTX->timeout_callback(task) != 0) { - SF_CTX->task_cleanup_func(task); + iovent_add_to_deleted_list(task); return -1; } } @@ -370,7 +321,7 @@ int sf_client_sock_read(int sock, short event, void *arg) __LINE__, task->client_ip, task->req_count); } - SF_CTX->task_cleanup_func(task); + iovent_add_to_deleted_list(task); return -1; } @@ -382,7 +333,7 @@ int sf_client_sock_read(int sock, short event, void *arg) "client ip: %s, recv error event: %d, " "close connection", __LINE__, task->client_ip, event); - SF_CTX->task_cleanup_func(task); + iovent_add_to_deleted_list(task); return -1; } @@ -416,7 +367,7 @@ int sf_client_sock_read(int sock, short event, void *arg) __LINE__, task->client_ip, errno, strerror(errno)); - SF_CTX->task_cleanup_func(task); + iovent_add_to_deleted_list(task); return -1; } } @@ -445,7 +396,7 @@ int sf_client_sock_read(int sock, short event, void *arg) __LINE__, task->client_ip, sock); } - SF_CTX->task_cleanup_func(task); + iovent_add_to_deleted_list(task); return -1; } @@ -457,7 +408,7 @@ int sf_client_sock_read(int sock, short event, void *arg) } if (SF_CTX->set_body_length(task) != 0) { - SF_CTX->task_cleanup_func(task); + iovent_add_to_deleted_list(task); return -1; } if (task->length < 0) { @@ -466,7 +417,7 @@ int sf_client_sock_read(int sock, short event, void *arg) __LINE__, task->client_ip, task->length); - SF_CTX->task_cleanup_func(task); + iovent_add_to_deleted_list(task); return -1; } @@ -478,7 +429,7 @@ int sf_client_sock_read(int sock, short event, void *arg) task->client_ip, task->length, g_sf_global_vars.max_pkg_size); - SF_CTX->task_cleanup_func(task); + iovent_add_to_deleted_list(task); return -1; } @@ -491,7 +442,7 @@ int sf_client_sock_read(int sock, short event, void *arg) "from %d to %d fail", __LINE__, task->client_ip, task->size, task->length); - SF_CTX->task_cleanup_func(task); + iovent_add_to_deleted_list(task); return -1; } @@ -506,7 +457,7 @@ int sf_client_sock_read(int sock, short event, void *arg) task->req_count++; task->nio_stage = SF_NIO_STAGE_SEND; if (SF_CTX->deal_task(task) < 0) { //fatal error - SF_CTX->task_cleanup_func(task); + iovent_add_to_deleted_list(task); return -1; } break; @@ -524,13 +475,17 @@ int sf_client_sock_write(int sock, short event, void *arg) assert(sock >= 0); task = (struct fast_task_info *)arg; + if (task->canceled) { + return 0; + } + if (event & IOEVENT_TIMEOUT) { logError("file: "__FILE__", line: %d, " "client ip: %s, send timeout. total length: %d, offset: %d, " "remain: %d", __LINE__, task->client_ip, task->length, task->offset, task->length - task->offset); - SF_CTX->task_cleanup_func(task); + iovent_add_to_deleted_list(task); return -1; } @@ -539,7 +494,7 @@ int sf_client_sock_write(int sock, short event, void *arg) "client ip: %s, recv error event: %d, " "close connection", __LINE__, task->client_ip, event); - SF_CTX->task_cleanup_func(task); + iovent_add_to_deleted_list(task); return -1; } @@ -572,7 +527,7 @@ int sf_client_sock_write(int sock, short event, void *arg) __LINE__, task->client_ip, errno, strerror(errno)); - SF_CTX->task_cleanup_func(task); + iovent_add_to_deleted_list(task); return -1; } } @@ -582,7 +537,7 @@ int sf_client_sock_write(int sock, short event, void *arg) "connection disconnected", __LINE__, task->client_ip, sock); - SF_CTX->task_cleanup_func(task); + iovent_add_to_deleted_list(task); return -1; } @@ -601,4 +556,3 @@ int sf_client_sock_write(int sock, short event, void *arg) return total_write; } - diff --git a/src/sf_service.c b/src/sf_service.c index 4569cdd..9430776 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -176,6 +176,9 @@ int sf_service_init_ex(SFContext *sf_context, return result; } + if ((result=init_pthread_lock(&thread_data->waiting_queue.lock)) != 0) { + return result; + } #if defined(OS_LINUX) NOTIFY_READ_FD(thread_data) = eventfd(0, EFD_NONBLOCK); if (NOTIFY_READ_FD(thread_data) < 0) { @@ -187,10 +190,6 @@ int sf_service_init_ex(SFContext *sf_context, break; } NOTIFY_WRITE_FD(thread_data) = NOTIFY_READ_FD(thread_data); - - if ((result=init_pthread_lock(&thread_data->waiting_queue.lock)) != 0) { - return result; - } #else if (pipe(thread_data->pipe_fds) != 0) { result = errno != 0 ? errno : EPERM; @@ -355,6 +354,7 @@ static void *accept_thread_entrance(void *arg) } strcpy(task->client_ip, szClientIp); + task->canceled = false; task->ctx = accept_context->sf_context; task->event.fd = incomesock; task->thread_data = accept_context->sf_context->thread_data +