From 03037c5d5fb3973235884a666052a1e876e6c75a Mon Sep 17 00:00:00 2001 From: yuqing Date: Mon, 21 May 2018 19:04:02 +0800 Subject: [PATCH] ioevent.[hc]: remove care_events in FreeBSD or MacOS --- HISTORY | 3 +- src/ioevent.c | 32 +++---- src/ioevent.h | 1 - src/multi_socket_client.c | 192 +++++++++++++++++++++++++++++++------- src/multi_socket_client.h | 26 ++++-- 5 files changed, 189 insertions(+), 65 deletions(-) diff --git a/HISTORY b/HISTORY index c681678..fb0c2a7 100644 --- a/HISTORY +++ b/HISTORY @@ -1,10 +1,11 @@ -Version 1.38 2018-05-18 +Version 1.38 2018-05-21 * connection_pool.c: set err_no to 0 when success * shared_func.h: add functions float2buff / buff2float, double2buff / buff2double * logger.h: add function log_get_level_caption * add files: common_blocked_queue.[hc] * add files: multi_socket_client.[hc] + * ioevent.[hc]: remove care_events in FreeBSD or MacOS Version 1.37 2018-02-24 * ini_file_reader.c function annotations LOCAL_IP_GET support index, such as: diff --git a/src/ioevent.c b/src/ioevent.c index 8c9cb88..f3443d5 100644 --- a/src/ioevent.c +++ b/src/ioevent.c @@ -47,7 +47,6 @@ int ioevent_init(IOEventPoller *ioevent, const int size, ioevent->poll_fd = kqueue(); bytes = sizeof(struct kevent) * size; ioevent->events = (struct kevent *)malloc(bytes); - ioevent->care_events = 0; #elif IOEVENT_USE_PORT ioevent->poll_fd = port_create(); bytes = sizeof(port_event_t) * size; @@ -93,7 +92,6 @@ int ioevent_attach(IOEventPoller *ioevent, const int fd, const int e, if (e & IOEVENT_WRITE) { EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD | ioevent->extra_events, 0, 0, data); } - ioevent->care_events = e; return kevent(ioevent->poll_fd, ev, n, NULL, 0, NULL); #elif IOEVENT_USE_PORT return port_associate(ioevent->poll_fd, PORT_SOURCE_FD, fd, e, data); @@ -112,21 +110,21 @@ int ioevent_modify(IOEventPoller *ioevent, const int fd, const int e, #elif IOEVENT_USE_KQUEUE struct kevent ev[2]; int n = 0; + if (e & IOEVENT_READ) { EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD | ioevent->extra_events, 0, 0, data); } - else if ((ioevent->care_events & IOEVENT_READ)) { + else { EV_SET(&ev[n++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); } if (e & IOEVENT_WRITE) { EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD | ioevent->extra_events, 0, 0, data); } - else if ((ioevent->care_events & IOEVENT_WRITE)) { + else { EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); } - ioevent->care_events = e; if (n > 0) { return kevent(ioevent->poll_fd, ev, n, NULL, 0, NULL); } @@ -143,22 +141,16 @@ int ioevent_detach(IOEventPoller *ioevent, const int fd) #if IOEVENT_USE_EPOLL return epoll_ctl(ioevent->poll_fd, EPOLL_CTL_DEL, fd, NULL); #elif IOEVENT_USE_KQUEUE - struct kevent ev[2]; - int n = 0; - if ((ioevent->care_events & IOEVENT_READ)) { - EV_SET(&ev[n++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); - } - if ((ioevent->care_events & IOEVENT_WRITE)) { - EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); - } + struct kevent ev[1]; + int r, w; - ioevent->care_events = 0; - if (n > 0) { - return kevent(ioevent->poll_fd, ev, n, NULL, 0, NULL); - } - else { - return 0; - } + EV_SET(&ev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); + r = kevent(ioevent->poll_fd, ev, 1, NULL, 0, NULL); + + EV_SET(&ev[0], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); + w = kevent(ioevent->poll_fd, ev, 1, NULL, 0, NULL); + + return (r == 0 || w == 0) ? 0 : r; #elif IOEVENT_USE_PORT return port_dissociate(ioevent->poll_fd, PORT_SOURCE_FD, fd); #endif diff --git a/src/ioevent.h b/src/ioevent.h index c76bd63..2206d0d 100644 --- a/src/ioevent.h +++ b/src/ioevent.h @@ -65,7 +65,6 @@ typedef struct ioevent_puller { #elif IOEVENT_USE_KQUEUE struct kevent *events; struct timespec timeout; - int care_events; #elif IOEVENT_USE_PORT port_event_t *events; timespec_t timeout; diff --git a/src/multi_socket_client.c b/src/multi_socket_client.c index b316231..48f1ac9 100644 --- a/src/multi_socket_client.c +++ b/src/multi_socket_client.c @@ -16,14 +16,17 @@ #include "fast_buffer.h" #include "multi_socket_client.h" +static int fast_multi_sock_client_do_recv(FastMultiSockClient *client, + FastMultiSockEntry *entry); + int fast_multi_sock_client_init(FastMultiSockClient *client, FastMultiSockEntry *entries, const int entry_count, const int header_length, fast_multi_sock_client_get_body_length_func get_body_length_func, - const int init_buffer_size, const int timeout) + const int init_recv_buffer_size, const int timeout) { int result; - int new_init_buffer_size; + int new_init_recv_buffer_size; int i; memset(client, 0, sizeof(FastMultiSockClient)); @@ -50,18 +53,20 @@ int fast_multi_sock_client_init(FastMultiSockClient *client, return result; } - if (init_buffer_size <= 0) { - new_init_buffer_size = 4 * 1024; + if (init_recv_buffer_size <= 0) { + new_init_recv_buffer_size = 4 * 1024; } else { - new_init_buffer_size = init_buffer_size; + new_init_recv_buffer_size = init_recv_buffer_size; } - if (new_init_buffer_size < header_length) { - new_init_buffer_size = header_length; + if (new_init_recv_buffer_size < header_length) { + new_init_recv_buffer_size = header_length; } for (i=0; iioevent); for (i=0; ientry_count; i++) { - fast_buffer_destroy(&client->entries[i].buffer); + fast_buffer_destroy(&client->entries[i].recv_buffer); } } +static int fast_multi_sock_client_do_send(FastMultiSockClient *client, + FastMultiSockEntry *entry) +{ + int bytes; + int result; + + logInfo("file: "__FILE__", line: %d, " + "send remain: %d", __LINE__, entry->remain); + result = 0; + while (entry->remain > 0) { + bytes = write(entry->conn->sock, entry->send_buffer->data + + (entry->send_buffer->length - entry->remain), entry->remain); + + logInfo("sock: %d, write bytes: %d, remain: %d", entry->conn->sock, bytes, entry->remain); + if (bytes < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + break; + } + else if (errno == EINTR) { //should retry + logDebug("file: "__FILE__", line: %d, " + "server: %s:%d, ignore interupt signal", + __LINE__, entry->conn->ip_addr, + entry->conn->port); + continue; + } + else { + result = errno != 0 ? errno : ECONNRESET; + logError("file: "__FILE__", line: %d, " + "send to server %s:%d fail, " + "errno: %d, error info: %s", + __LINE__, entry->conn->ip_addr, + entry->conn->port, + result, strerror(result)); + + break; + } + } + else if (bytes == 0) { + logError("file: "__FILE__", line: %d, " + "send to server %s:%d, sock: %d fail, " + "connection disconnected", + __LINE__, entry->conn->ip_addr, entry->conn->port, + entry->conn->sock); + + result = ECONNRESET; + break; + } + + entry->remain -= bytes; + if (entry->remain == 0) { + entry->remain = client->header_length; //to recv pkg header + entry->io_callback = fast_multi_sock_client_do_recv; + if ((result=ioevent_modify(&client->ioevent, + entry->conn->sock, IOEVENT_READ, entry)) != 0) + { + logError("file: "__FILE__", line: %d, " + "ioevent_modify fail, errno: %d, error info: %s", + __LINE__, result, STRERROR(result)); + } + break; + } + } + + return result; +} + static int fast_multi_sock_client_send_data(FastMultiSockClient *client, - FastBuffer *buffer) + FastBuffer *send_buffer) { int i; + int result; + int remain_timeout; + remain_timeout = 0; for (i=0; ientry_count; i++) { - client->entries[i].remain = client->header_length; + client->entries[i].remain = send_buffer->length; client->entries[i].done = false; - client->entries[i].buffer.length = 0; + client->entries[i].recv_buffer.length = 0; + client->entries[i].send_buffer = send_buffer; + client->entries[i].io_callback = fast_multi_sock_client_do_send; + + if (client->entries[i].conn->sock < 0) { + client->entries[i].error_no = ENOTCONN; + client->entries[i].done = true; + logError("file: "__FILE__", line: %d, " + "NOT connected to %s:%d", + __LINE__, client->entries[i].conn->ip_addr, + client->entries[i].conn->port); + continue; + } + + /* + remain_timeout = client->deadline_time - get_current_time(); + if (remain_timeout <= 0) { + client->entries[i].error_no = ETIMEDOUT; + client->entries[i].done = true; + logError("file: "__FILE__", line: %d, " + "tcpsenddata to %s:%d timedout", + __LINE__, client->entries[i].conn->ip_addr, + client->entries[i].conn->port); + continue; + } client->entries[i].error_no = tcpsenddata(client->entries[i].conn->sock, buffer->data, buffer->length, client->timeout); if (client->entries[i].error_no != 0) { client->entries[i].done = true; + result = client->entries[i].error_no; + logError("file: "__FILE__", line: %d, " + "tcpsenddata to %s:%d fail, " + "errno: %d, error info: %s", + __LINE__, client->entries[i].conn->ip_addr, + client->entries[i].conn->port, + result, STRERROR(result)); continue; } + */ client->entries[i].error_no = ioevent_attach(&client->ioevent, - client->entries[i].conn->sock, IOEVENT_READ, + client->entries[i].conn->sock, IOEVENT_WRITE, client->entries + i); if (client->entries[i].error_no != 0) { - int result; - client->entries[i].done = true; result = client->entries[i].error_no; logError("file: "__FILE__", line: %d, " @@ -119,7 +223,7 @@ static int fast_multi_sock_client_send_data(FastMultiSockClient *client, client->pulling_count++; } - return client->pulling_count > 0 ? 0 : ENOENT; + return client->pulling_count > 0 ? 0 : (remain_timeout > 0 ? ENOENT : ETIMEDOUT); } static inline void fast_multi_sock_client_finish(FastMultiSockClient *client, @@ -134,17 +238,18 @@ static inline void fast_multi_sock_client_finish(FastMultiSockClient *client, } } - static int fast_multi_sock_client_do_recv(FastMultiSockClient *client, FastMultiSockEntry *entry) { int bytes; int result; + logInfo("file: "__FILE__", line: %d, " + "recv remain: %d", __LINE__, entry->remain); result = 0; while (entry->remain > 0) { - bytes = read(entry->conn->sock, entry->buffer.data + - entry->buffer.length, entry->remain); + bytes = read(entry->conn->sock, entry->recv_buffer.data + + entry->recv_buffer.length, entry->remain); if (bytes < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { break; @@ -158,7 +263,7 @@ static int fast_multi_sock_client_do_recv(FastMultiSockClient *client, } else { result = errno != 0 ? errno : ECONNRESET; - logWarning("file: "__FILE__", line: %d, " + logError("file: "__FILE__", line: %d, " "server: %s:%d, recv failed, " "errno: %d, error info: %s", __LINE__, entry->conn->ip_addr, @@ -169,7 +274,7 @@ static int fast_multi_sock_client_do_recv(FastMultiSockClient *client, } } else if (bytes == 0) { - logDebug("file: "__FILE__", line: %d, " + logError("file: "__FILE__", line: %d, " "server: %s:%d, sock: %d, recv failed, " "connection disconnected", __LINE__, entry->conn->ip_addr, entry->conn->port, @@ -179,11 +284,11 @@ static int fast_multi_sock_client_do_recv(FastMultiSockClient *client, break; } - entry->buffer.length += bytes; + entry->recv_buffer.length += bytes; entry->remain -= bytes; - if (entry->remain == 0 && entry->buffer.length == client->header_length) { + if (entry->remain == 0 && entry->recv_buffer.length == client->header_length) { int body_length; - body_length = client->get_body_length_func(&entry->buffer); + body_length = client->get_body_length_func(&entry->recv_buffer); if (body_length < 0) { logError("file: "__FILE__", line: %d, " "server: %s:%d, body_length: %d < 0", @@ -192,38 +297,41 @@ static int fast_multi_sock_client_do_recv(FastMultiSockClient *client, result = EINVAL; break; } - if ((result=fast_buffer_check(&entry->buffer, body_length)) != 0) { + if ((result=fast_buffer_check(&entry->recv_buffer, body_length)) != 0) { break; } entry->remain = body_length; //to recv body } } + logInfo("file: "__FILE__", line: %d, " + "recv remain: %d", __LINE__, entry->remain); return result; } -static int fast_multi_sock_client_recv_data(FastMultiSockClient *client) +static int fast_multi_sock_client_deal_io(FastMultiSockClient *client) { int result; int event; int count; int index; - time_t remain_time; + time_t remain_timeout; FastMultiSockEntry *entry; while (client->pulling_count > 0) { - remain_time = client->deadline_time - get_current_time(); - if (remain_time <= 0) { //timeout + remain_timeout = client->deadline_time - get_current_time(); + if (remain_timeout <= 0) { //timeout break; } - count = ioevent_poll_ex(&client->ioevent, remain_time * 1000); + count = ioevent_poll_ex(&client->ioevent, remain_timeout * 1000); + logInfo("poll count: %d\n", count); for (index=0; indexioevent, index); entry = (FastMultiSockEntry *)IOEVENT_GET_DATA(&client->ioevent, index); if (event & IOEVENT_ERROR) { - logDebug("file: "__FILE__", line: %d, " + logError("file: "__FILE__", line: %d, " "server: %s:%d, recv error event: %d, " "connection reset", __LINE__, entry->conn->ip_addr, entry->conn->port, event); @@ -232,36 +340,48 @@ static int fast_multi_sock_client_recv_data(FastMultiSockClient *client) continue; } - result = fast_multi_sock_client_do_recv(client, entry); + logInfo("sock: %d, event: %d", entry->conn->sock, event); + + result = entry->io_callback(client, entry); if (result != 0 || entry->remain == 0) { fast_multi_sock_client_finish(client, entry, result); } } } + logInfo("file: "__FILE__", line: %d, pulling_count: %d, " + "success_count: %d\n", __LINE__, + client->pulling_count, client->success_count); if (client->pulling_count > 0) { int i; for (i=0; ientry_count; i++) { if (!client->entries[i].done) { fast_multi_sock_client_finish(client, client->entries + i, ETIMEDOUT); + logError("file: "__FILE__", line: %d, " + "recv from %s:%d timedout", + __LINE__, client->entries[i].conn->ip_addr, + client->entries[i].conn->port); } } } - return client->success_count > 0 ? 0 : ENOENT; + logInfo("file: "__FILE__", line: %d, pulling_count: %d, " + "success_count: %d\n", __LINE__, + client->pulling_count, client->success_count); + return client->success_count > 0 ? 0 : (remain_timeout > 0 ? ENOENT : ETIMEDOUT); } int fast_multi_sock_client_request(FastMultiSockClient *client, - FastBuffer *buffer) + FastBuffer *send_buffer) { int result; client->deadline_time = get_current_time() + client->timeout; client->pulling_count = 0; client->success_count = 0; - if ((result=fast_multi_sock_client_send_data(client, buffer)) != 0) { + if ((result=fast_multi_sock_client_send_data(client, send_buffer)) != 0) { return result; } - return fast_multi_sock_client_recv_data(client); + return fast_multi_sock_client_deal_io(client); } diff --git a/src/multi_socket_client.h b/src/multi_socket_client.h index 9afd95d..550dbf4 100644 --- a/src/multi_socket_client.h +++ b/src/multi_socket_client.h @@ -18,15 +18,27 @@ #include #include "common_define.h" #include "connection_pool.h" -#include "fast_timer.h" +#include "fast_buffer.h" #include "ioevent.h" +typedef enum { + fast_multi_sock_stage_send = 'S', + fast_multi_sock_stage_recv = 'R' +} FastMultiSockStage; + +struct fast_multi_sock_client; +struct fast_multi_sock_entry; + //return the body length -typedef int (*fast_multi_sock_client_get_body_length_func)(const FastBuffer *buffer); +typedef int (*fast_multi_sock_client_get_body_length_func)(const FastBuffer *recv_buffer); +typedef int (*fast_multi_sock_client_io_func)(struct fast_multi_sock_client *client, + struct fast_multi_sock_entry *entry); typedef struct fast_multi_sock_entry { ConnectionInfo *conn; - FastBuffer buffer; //recv buffer + FastBuffer *send_buffer; //send buffer for internal use + fast_multi_sock_client_io_func io_callback; //for internal use + FastBuffer recv_buffer; //recv buffer int remain; //remain bytes int error_no; //0 for success bool done; @@ -56,7 +68,7 @@ extern "C" { @param entry_count the count of socket entries @param header_length the header length of a package @param get_body_length_func the get body length function - @param init_buffer_size the initial size of response buffer + @param init_recv_buffer_size the initial size of response buffer @param timeout the timeout in seconds @return error no, 0 for success, != 0 fail */ @@ -64,7 +76,7 @@ extern "C" { FastMultiSockEntry *entries, const int entry_count, const int header_length, fast_multi_sock_client_get_body_length_func get_body_length_func, - const int init_buffer_size, const int timeout); + const int init_recv_buffer_size, const int timeout); /** destroy function @@ -76,11 +88,11 @@ extern "C" { /** request function @param client the client context - @param buffer the buffer to send + @param send_buffer the buffer to send @return error no, 0 for success, != 0 fail */ int fast_multi_sock_client_request(FastMultiSockClient *client, - FastBuffer *buffer); + FastBuffer *send_buffer); #ifdef __cplusplus }