ioevent.[hc]: remove care_events in FreeBSD or MacOS

pull/37/head
yuqing 2018-05-21 19:04:02 +08:00
parent 6c9b0b4401
commit 03037c5d5f
5 changed files with 189 additions and 65 deletions

View File

@ -1,10 +1,11 @@
Version 1.38 2018-05-18
Version 1.38 2018-05-21
* connection_pool.c: set err_no to 0 when success
* shared_func.h: add functions float2buff / buff2float, double2buff / buff2double
* logger.h: add function log_get_level_caption
* add files: common_blocked_queue.[hc]
* add files: multi_socket_client.[hc]
* ioevent.[hc]: remove care_events in FreeBSD or MacOS
Version 1.37 2018-02-24
* ini_file_reader.c function annotations LOCAL_IP_GET support index, such as:

View File

@ -47,7 +47,6 @@ int ioevent_init(IOEventPoller *ioevent, const int size,
ioevent->poll_fd = kqueue();
bytes = sizeof(struct kevent) * size;
ioevent->events = (struct kevent *)malloc(bytes);
ioevent->care_events = 0;
#elif IOEVENT_USE_PORT
ioevent->poll_fd = port_create();
bytes = sizeof(port_event_t) * size;
@ -93,7 +92,6 @@ int ioevent_attach(IOEventPoller *ioevent, const int fd, const int e,
if (e & IOEVENT_WRITE) {
EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD | ioevent->extra_events, 0, 0, data);
}
ioevent->care_events = e;
return kevent(ioevent->poll_fd, ev, n, NULL, 0, NULL);
#elif IOEVENT_USE_PORT
return port_associate(ioevent->poll_fd, PORT_SOURCE_FD, fd, e, data);
@ -112,21 +110,21 @@ int ioevent_modify(IOEventPoller *ioevent, const int fd, const int e,
#elif IOEVENT_USE_KQUEUE
struct kevent ev[2];
int n = 0;
if (e & IOEVENT_READ) {
EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD | ioevent->extra_events, 0, 0, data);
}
else if ((ioevent->care_events & IOEVENT_READ)) {
else {
EV_SET(&ev[n++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
}
if (e & IOEVENT_WRITE) {
EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD | ioevent->extra_events, 0, 0, data);
}
else if ((ioevent->care_events & IOEVENT_WRITE)) {
else {
EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
}
ioevent->care_events = e;
if (n > 0) {
return kevent(ioevent->poll_fd, ev, n, NULL, 0, NULL);
}
@ -143,22 +141,16 @@ int ioevent_detach(IOEventPoller *ioevent, const int fd)
#if IOEVENT_USE_EPOLL
return epoll_ctl(ioevent->poll_fd, EPOLL_CTL_DEL, fd, NULL);
#elif IOEVENT_USE_KQUEUE
struct kevent ev[2];
int n = 0;
if ((ioevent->care_events & IOEVENT_READ)) {
EV_SET(&ev[n++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
}
if ((ioevent->care_events & IOEVENT_WRITE)) {
EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
}
struct kevent ev[1];
int r, w;
ioevent->care_events = 0;
if (n > 0) {
return kevent(ioevent->poll_fd, ev, n, NULL, 0, NULL);
}
else {
return 0;
}
EV_SET(&ev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
r = kevent(ioevent->poll_fd, ev, 1, NULL, 0, NULL);
EV_SET(&ev[0], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
w = kevent(ioevent->poll_fd, ev, 1, NULL, 0, NULL);
return (r == 0 || w == 0) ? 0 : r;
#elif IOEVENT_USE_PORT
return port_dissociate(ioevent->poll_fd, PORT_SOURCE_FD, fd);
#endif

View File

@ -65,7 +65,6 @@ typedef struct ioevent_puller {
#elif IOEVENT_USE_KQUEUE
struct kevent *events;
struct timespec timeout;
int care_events;
#elif IOEVENT_USE_PORT
port_event_t *events;
timespec_t timeout;

View File

@ -16,14 +16,17 @@
#include "fast_buffer.h"
#include "multi_socket_client.h"
static int fast_multi_sock_client_do_recv(FastMultiSockClient *client,
FastMultiSockEntry *entry);
int fast_multi_sock_client_init(FastMultiSockClient *client,
FastMultiSockEntry *entries, const int entry_count,
const int header_length,
fast_multi_sock_client_get_body_length_func get_body_length_func,
const int init_buffer_size, const int timeout)
const int init_recv_buffer_size, const int timeout)
{
int result;
int new_init_buffer_size;
int new_init_recv_buffer_size;
int i;
memset(client, 0, sizeof(FastMultiSockClient));
@ -50,18 +53,20 @@ int fast_multi_sock_client_init(FastMultiSockClient *client,
return result;
}
if (init_buffer_size <= 0) {
new_init_buffer_size = 4 * 1024;
if (init_recv_buffer_size <= 0) {
new_init_recv_buffer_size = 4 * 1024;
} else {
new_init_buffer_size = init_buffer_size;
new_init_recv_buffer_size = init_recv_buffer_size;
}
if (new_init_buffer_size < header_length) {
new_init_buffer_size = header_length;
if (new_init_recv_buffer_size < header_length) {
new_init_recv_buffer_size = header_length;
}
for (i=0; i<entry_count; i++) {
if ((result=fast_buffer_init_ex(&entries[i].buffer, new_init_buffer_size)) != 0) {
if ((result=fast_buffer_init_ex(&entries[i].recv_buffer,
new_init_recv_buffer_size)) != 0)
{
return result;
}
}
@ -81,33 +86,132 @@ void fast_multi_sock_client_destroy(FastMultiSockClient *client)
ioevent_destroy(&client->ioevent);
for (i=0; i<client->entry_count; i++) {
fast_buffer_destroy(&client->entries[i].buffer);
fast_buffer_destroy(&client->entries[i].recv_buffer);
}
}
static int fast_multi_sock_client_do_send(FastMultiSockClient *client,
FastMultiSockEntry *entry)
{
int bytes;
int result;
logInfo("file: "__FILE__", line: %d, "
"send remain: %d", __LINE__, entry->remain);
result = 0;
while (entry->remain > 0) {
bytes = write(entry->conn->sock, entry->send_buffer->data +
(entry->send_buffer->length - entry->remain), entry->remain);
logInfo("sock: %d, write bytes: %d, remain: %d", entry->conn->sock, bytes, entry->remain);
if (bytes < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break;
}
else if (errno == EINTR) { //should retry
logDebug("file: "__FILE__", line: %d, "
"server: %s:%d, ignore interupt signal",
__LINE__, entry->conn->ip_addr,
entry->conn->port);
continue;
}
else {
result = errno != 0 ? errno : ECONNRESET;
logError("file: "__FILE__", line: %d, "
"send to server %s:%d fail, "
"errno: %d, error info: %s",
__LINE__, entry->conn->ip_addr,
entry->conn->port,
result, strerror(result));
break;
}
}
else if (bytes == 0) {
logError("file: "__FILE__", line: %d, "
"send to server %s:%d, sock: %d fail, "
"connection disconnected",
__LINE__, entry->conn->ip_addr, entry->conn->port,
entry->conn->sock);
result = ECONNRESET;
break;
}
entry->remain -= bytes;
if (entry->remain == 0) {
entry->remain = client->header_length; //to recv pkg header
entry->io_callback = fast_multi_sock_client_do_recv;
if ((result=ioevent_modify(&client->ioevent,
entry->conn->sock, IOEVENT_READ, entry)) != 0)
{
logError("file: "__FILE__", line: %d, "
"ioevent_modify fail, errno: %d, error info: %s",
__LINE__, result, STRERROR(result));
}
break;
}
}
return result;
}
static int fast_multi_sock_client_send_data(FastMultiSockClient *client,
FastBuffer *buffer)
FastBuffer *send_buffer)
{
int i;
int result;
int remain_timeout;
remain_timeout = 0;
for (i=0; i<client->entry_count; i++) {
client->entries[i].remain = client->header_length;
client->entries[i].remain = send_buffer->length;
client->entries[i].done = false;
client->entries[i].buffer.length = 0;
client->entries[i].recv_buffer.length = 0;
client->entries[i].send_buffer = send_buffer;
client->entries[i].io_callback = fast_multi_sock_client_do_send;
if (client->entries[i].conn->sock < 0) {
client->entries[i].error_no = ENOTCONN;
client->entries[i].done = true;
logError("file: "__FILE__", line: %d, "
"NOT connected to %s:%d",
__LINE__, client->entries[i].conn->ip_addr,
client->entries[i].conn->port);
continue;
}
/*
remain_timeout = client->deadline_time - get_current_time();
if (remain_timeout <= 0) {
client->entries[i].error_no = ETIMEDOUT;
client->entries[i].done = true;
logError("file: "__FILE__", line: %d, "
"tcpsenddata to %s:%d timedout",
__LINE__, client->entries[i].conn->ip_addr,
client->entries[i].conn->port);
continue;
}
client->entries[i].error_no = tcpsenddata(client->entries[i].conn->sock,
buffer->data, buffer->length, client->timeout);
if (client->entries[i].error_no != 0) {
client->entries[i].done = true;
result = client->entries[i].error_no;
logError("file: "__FILE__", line: %d, "
"tcpsenddata to %s:%d fail, "
"errno: %d, error info: %s",
__LINE__, client->entries[i].conn->ip_addr,
client->entries[i].conn->port,
result, STRERROR(result));
continue;
}
*/
client->entries[i].error_no = ioevent_attach(&client->ioevent,
client->entries[i].conn->sock, IOEVENT_READ,
client->entries[i].conn->sock, IOEVENT_WRITE,
client->entries + i);
if (client->entries[i].error_no != 0) {
int result;
client->entries[i].done = true;
result = client->entries[i].error_no;
logError("file: "__FILE__", line: %d, "
@ -119,7 +223,7 @@ static int fast_multi_sock_client_send_data(FastMultiSockClient *client,
client->pulling_count++;
}
return client->pulling_count > 0 ? 0 : ENOENT;
return client->pulling_count > 0 ? 0 : (remain_timeout > 0 ? ENOENT : ETIMEDOUT);
}
static inline void fast_multi_sock_client_finish(FastMultiSockClient *client,
@ -134,17 +238,18 @@ static inline void fast_multi_sock_client_finish(FastMultiSockClient *client,
}
}
static int fast_multi_sock_client_do_recv(FastMultiSockClient *client,
FastMultiSockEntry *entry)
{
int bytes;
int result;
logInfo("file: "__FILE__", line: %d, "
"recv remain: %d", __LINE__, entry->remain);
result = 0;
while (entry->remain > 0) {
bytes = read(entry->conn->sock, entry->buffer.data +
entry->buffer.length, entry->remain);
bytes = read(entry->conn->sock, entry->recv_buffer.data +
entry->recv_buffer.length, entry->remain);
if (bytes < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break;
@ -158,7 +263,7 @@ static int fast_multi_sock_client_do_recv(FastMultiSockClient *client,
}
else {
result = errno != 0 ? errno : ECONNRESET;
logWarning("file: "__FILE__", line: %d, "
logError("file: "__FILE__", line: %d, "
"server: %s:%d, recv failed, "
"errno: %d, error info: %s",
__LINE__, entry->conn->ip_addr,
@ -169,7 +274,7 @@ static int fast_multi_sock_client_do_recv(FastMultiSockClient *client,
}
}
else if (bytes == 0) {
logDebug("file: "__FILE__", line: %d, "
logError("file: "__FILE__", line: %d, "
"server: %s:%d, sock: %d, recv failed, "
"connection disconnected",
__LINE__, entry->conn->ip_addr, entry->conn->port,
@ -179,11 +284,11 @@ static int fast_multi_sock_client_do_recv(FastMultiSockClient *client,
break;
}
entry->buffer.length += bytes;
entry->recv_buffer.length += bytes;
entry->remain -= bytes;
if (entry->remain == 0 && entry->buffer.length == client->header_length) {
if (entry->remain == 0 && entry->recv_buffer.length == client->header_length) {
int body_length;
body_length = client->get_body_length_func(&entry->buffer);
body_length = client->get_body_length_func(&entry->recv_buffer);
if (body_length < 0) {
logError("file: "__FILE__", line: %d, "
"server: %s:%d, body_length: %d < 0",
@ -192,38 +297,41 @@ static int fast_multi_sock_client_do_recv(FastMultiSockClient *client,
result = EINVAL;
break;
}
if ((result=fast_buffer_check(&entry->buffer, body_length)) != 0) {
if ((result=fast_buffer_check(&entry->recv_buffer, body_length)) != 0) {
break;
}
entry->remain = body_length; //to recv body
}
}
logInfo("file: "__FILE__", line: %d, "
"recv remain: %d", __LINE__, entry->remain);
return result;
}
static int fast_multi_sock_client_recv_data(FastMultiSockClient *client)
static int fast_multi_sock_client_deal_io(FastMultiSockClient *client)
{
int result;
int event;
int count;
int index;
time_t remain_time;
time_t remain_timeout;
FastMultiSockEntry *entry;
while (client->pulling_count > 0) {
remain_time = client->deadline_time - get_current_time();
if (remain_time <= 0) { //timeout
remain_timeout = client->deadline_time - get_current_time();
if (remain_timeout <= 0) { //timeout
break;
}
count = ioevent_poll_ex(&client->ioevent, remain_time * 1000);
count = ioevent_poll_ex(&client->ioevent, remain_timeout * 1000);
logInfo("poll count: %d\n", count);
for (index=0; index<count; index++) {
event = IOEVENT_GET_EVENTS(&client->ioevent, index);
entry = (FastMultiSockEntry *)IOEVENT_GET_DATA(&client->ioevent, index);
if (event & IOEVENT_ERROR) {
logDebug("file: "__FILE__", line: %d, "
logError("file: "__FILE__", line: %d, "
"server: %s:%d, recv error event: %d, "
"connection reset", __LINE__,
entry->conn->ip_addr, entry->conn->port, event);
@ -232,36 +340,48 @@ static int fast_multi_sock_client_recv_data(FastMultiSockClient *client)
continue;
}
result = fast_multi_sock_client_do_recv(client, entry);
logInfo("sock: %d, event: %d", entry->conn->sock, event);
result = entry->io_callback(client, entry);
if (result != 0 || entry->remain == 0) {
fast_multi_sock_client_finish(client, entry, result);
}
}
}
logInfo("file: "__FILE__", line: %d, pulling_count: %d, "
"success_count: %d\n", __LINE__,
client->pulling_count, client->success_count);
if (client->pulling_count > 0) {
int i;
for (i=0; i<client->entry_count; i++) {
if (!client->entries[i].done) {
fast_multi_sock_client_finish(client, client->entries + i, ETIMEDOUT);
logError("file: "__FILE__", line: %d, "
"recv from %s:%d timedout",
__LINE__, client->entries[i].conn->ip_addr,
client->entries[i].conn->port);
}
}
}
return client->success_count > 0 ? 0 : ENOENT;
logInfo("file: "__FILE__", line: %d, pulling_count: %d, "
"success_count: %d\n", __LINE__,
client->pulling_count, client->success_count);
return client->success_count > 0 ? 0 : (remain_timeout > 0 ? ENOENT : ETIMEDOUT);
}
int fast_multi_sock_client_request(FastMultiSockClient *client,
FastBuffer *buffer)
FastBuffer *send_buffer)
{
int result;
client->deadline_time = get_current_time() + client->timeout;
client->pulling_count = 0;
client->success_count = 0;
if ((result=fast_multi_sock_client_send_data(client, buffer)) != 0) {
if ((result=fast_multi_sock_client_send_data(client, send_buffer)) != 0) {
return result;
}
return fast_multi_sock_client_recv_data(client);
return fast_multi_sock_client_deal_io(client);
}

View File

@ -18,15 +18,27 @@
#include <sys/socket.h>
#include "common_define.h"
#include "connection_pool.h"
#include "fast_timer.h"
#include "fast_buffer.h"
#include "ioevent.h"
typedef enum {
fast_multi_sock_stage_send = 'S',
fast_multi_sock_stage_recv = 'R'
} FastMultiSockStage;
struct fast_multi_sock_client;
struct fast_multi_sock_entry;
//return the body length
typedef int (*fast_multi_sock_client_get_body_length_func)(const FastBuffer *buffer);
typedef int (*fast_multi_sock_client_get_body_length_func)(const FastBuffer *recv_buffer);
typedef int (*fast_multi_sock_client_io_func)(struct fast_multi_sock_client *client,
struct fast_multi_sock_entry *entry);
typedef struct fast_multi_sock_entry {
ConnectionInfo *conn;
FastBuffer buffer; //recv buffer
FastBuffer *send_buffer; //send buffer for internal use
fast_multi_sock_client_io_func io_callback; //for internal use
FastBuffer recv_buffer; //recv buffer
int remain; //remain bytes
int error_no; //0 for success
bool done;
@ -56,7 +68,7 @@ extern "C" {
@param entry_count the count of socket entries
@param header_length the header length of a package
@param get_body_length_func the get body length function
@param init_buffer_size the initial size of response buffer
@param init_recv_buffer_size the initial size of response buffer
@param timeout the timeout in seconds
@return error no, 0 for success, != 0 fail
*/
@ -64,7 +76,7 @@ extern "C" {
FastMultiSockEntry *entries, const int entry_count,
const int header_length,
fast_multi_sock_client_get_body_length_func get_body_length_func,
const int init_buffer_size, const int timeout);
const int init_recv_buffer_size, const int timeout);
/**
destroy function
@ -76,11 +88,11 @@ extern "C" {
/**
request function
@param client the client context
@param buffer the buffer to send
@param send_buffer the buffer to send
@return error no, 0 for success, != 0 fail
*/
int fast_multi_sock_client_request(FastMultiSockClient *client,
FastBuffer *buffer);
FastBuffer *send_buffer);
#ifdef __cplusplus
}