From 6c9b0b44018b9e9e15e65c2e793d48bf5d56e62e Mon Sep 17 00:00:00 2001 From: yuqing Date: Fri, 18 May 2018 16:48:26 +0800 Subject: [PATCH] add files: multi_socket_client.[hc] --- HISTORY | 3 +- src/Makefile.in | 9 +- src/fast_buffer.c | 2 +- src/fast_buffer.h | 8 +- src/flat_skiplist.c | 2 +- src/ioevent.h | 18 +-- src/multi_skiplist.c | 2 +- src/multi_socket_client.c | 267 ++++++++++++++++++++++++++++++++++++++ src/multi_socket_client.h | 89 +++++++++++++ 9 files changed, 381 insertions(+), 19 deletions(-) create mode 100644 src/multi_socket_client.c create mode 100644 src/multi_socket_client.h diff --git a/HISTORY b/HISTORY index a33f34e..c681678 100644 --- a/HISTORY +++ b/HISTORY @@ -1,9 +1,10 @@ -Version 1.38 2018-05-17 +Version 1.38 2018-05-18 * connection_pool.c: set err_no to 0 when success * shared_func.h: add functions float2buff / buff2float, double2buff / buff2double * logger.h: add function log_get_level_caption * add files: common_blocked_queue.[hc] + * add files: multi_socket_client.[hc] Version 1.37 2018-02-24 * ini_file_reader.c function annotations LOCAL_IP_GET support index, such as: diff --git a/src/Makefile.in b/src/Makefile.in index 92dbd9c..2e029cb 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -12,7 +12,8 @@ FAST_SHARED_OBJS = hash.lo chain.lo shared_func.lo ini_file_reader.lo \ connection_pool.lo fast_mpool.lo fast_allocator.lo \ fast_buffer.lo multi_skiplist.lo flat_skiplist.lo \ system_info.lo fast_blocked_queue.lo id_generator.lo \ - char_converter.lo char_convert_loader.lo common_blocked_queue.lo + char_converter.lo char_convert_loader.lo common_blocked_queue.lo \ + multi_socket_client.lo FAST_STATIC_OBJS = hash.o chain.o shared_func.o ini_file_reader.o \ logger.o sockopt.o base64.o sched_thread.o \ @@ -22,7 +23,8 @@ FAST_STATIC_OBJS = hash.o chain.o shared_func.o ini_file_reader.o \ connection_pool.o fast_mpool.o fast_allocator.o \ fast_buffer.o multi_skiplist.o flat_skiplist.o \ system_info.o fast_blocked_queue.o id_generator.o \ - char_converter.o char_convert_loader.o common_blocked_queue.o + char_converter.o char_convert_loader.o common_blocked_queue.o \ + multi_socket_client.o HEADER_FILES = common_define.h hash.h chain.h logger.h base64.h \ shared_func.h pthread_func.h ini_file_reader.h _os_define.h \ @@ -33,7 +35,8 @@ HEADER_FILES = common_define.h hash.h chain.h logger.h base64.h \ fast_buffer.h skiplist.h multi_skiplist.h flat_skiplist.h \ skiplist_common.h system_info.h fast_blocked_queue.h \ php7_ext_wrapper.h id_generator.h char_converter.h \ - char_convert_loader.h common_blocked_queue.h + char_convert_loader.h common_blocked_queue.h \ + multi_socket_client.h ALL_OBJS = $(FAST_STATIC_OBJS) $(FAST_SHARED_OBJS) diff --git a/src/fast_buffer.c b/src/fast_buffer.c index 9ffa6c8..24955a1 100644 --- a/src/fast_buffer.c +++ b/src/fast_buffer.c @@ -40,7 +40,7 @@ void fast_buffer_destroy(FastBuffer *buffer) } } -static int fast_buffer_check(FastBuffer *buffer, const int inc_len) +int fast_buffer_check(FastBuffer *buffer, const int inc_len) { int alloc_size; char *buff; diff --git a/src/fast_buffer.h b/src/fast_buffer.h index e1a1e92..7f7ee4c 100644 --- a/src/fast_buffer.h +++ b/src/fast_buffer.h @@ -4,9 +4,9 @@ #include typedef struct fast_buffer { - char *data; - int alloc_size; - int length; + char *data; + int alloc_size; + int length; } FastBuffer; #ifdef __cplusplus @@ -40,6 +40,8 @@ static inline void fast_buffer_reset(FastBuffer *buffer) void fast_buffer_destroy(FastBuffer *buffer); +int fast_buffer_check(FastBuffer *buffer, const int inc_len); + int fast_buffer_append(FastBuffer *buffer, const char *format, ...); int fast_buffer_append_buff(FastBuffer *buffer, const char *data, const int len); diff --git a/src/flat_skiplist.c b/src/flat_skiplist.c index ae0ff2b..8fbb8d3 100644 --- a/src/flat_skiplist.c +++ b/src/flat_skiplist.c @@ -35,7 +35,7 @@ int flat_skiplist_init_ex(FlatSkiplist *sl, const int level_count, return EINVAL; } - if (level_count > 20) { + if (level_count > 30) { logError("file: "__FILE__", line: %d, " "level count: %d is too large", __LINE__, level_count); diff --git a/src/ioevent.h b/src/ioevent.h index 797f037..c76bd63 100644 --- a/src/ioevent.h +++ b/src/ioevent.h @@ -74,39 +74,39 @@ typedef struct ioevent_puller { #if IOEVENT_USE_EPOLL #define IOEVENT_GET_EVENTS(ioevent, index) \ - ioevent->events[index].events + (ioevent)->events[index].events #elif IOEVENT_USE_KQUEUE #define IOEVENT_GET_EVENTS(ioevent, index) kqueue_ev_convert( \ - ioevent->events[index].filter, ioevent->events[index].flags) + (ioevent)->events[index].filter, (ioevent)->events[index].flags) #elif IOEVENT_USE_PORT #define IOEVENT_GET_EVENTS(ioevent, index) \ - ioevent->events[index].portev_events + (ioevent)->events[index].portev_events #else #error port me #endif #if IOEVENT_USE_EPOLL #define IOEVENT_GET_DATA(ioevent, index) \ - ioevent->events[index].data.ptr + (ioevent)->events[index].data.ptr #elif IOEVENT_USE_KQUEUE #define IOEVENT_GET_DATA(ioevent, index) \ - ioevent->events[index].udata + (ioevent)->events[index].udata #elif IOEVENT_USE_PORT #define IOEVENT_GET_DATA(ioevent, index) \ - ioevent->events[index].portev_user + (ioevent)->events[index].portev_user #else #error port me #endif #if IOEVENT_USE_EPOLL #define IOEVENT_CLEAR_DATA(ioevent, index) \ - ioevent->events[index].data.ptr = NULL + (ioevent)->events[index].data.ptr = NULL #elif IOEVENT_USE_KQUEUE #define IOEVENT_CLEAR_DATA(ioevent, index) \ - ioevent->events[index].udata = NULL + (ioevent)->events[index].udata = NULL #elif IOEVENT_USE_PORT #define IOEVENT_CLEAR_DATA(ioevent, index) \ - ioevent->events[index].portev_user = NULL + (ioevent)->events[index].portev_user = NULL #else #error port me #endif diff --git a/src/multi_skiplist.c b/src/multi_skiplist.c index 1432cb4..0003427 100644 --- a/src/multi_skiplist.c +++ b/src/multi_skiplist.c @@ -36,7 +36,7 @@ int multi_skiplist_init_ex(MultiSkiplist *sl, const int level_count, return EINVAL; } - if (level_count > 20) { + if (level_count > 30) { logError("file: "__FILE__", line: %d, " "level count: %d is too large", __LINE__, level_count); diff --git a/src/multi_socket_client.c b/src/multi_socket_client.c new file mode 100644 index 0000000..b316231 --- /dev/null +++ b/src/multi_socket_client.c @@ -0,0 +1,267 @@ +#include "common_define.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "logger.h" +#include "sockopt.h" +#include "sched_thread.h" +#include "fast_buffer.h" +#include "multi_socket_client.h" + +int fast_multi_sock_client_init(FastMultiSockClient *client, + FastMultiSockEntry *entries, const int entry_count, + const int header_length, + fast_multi_sock_client_get_body_length_func get_body_length_func, + const int init_buffer_size, const int timeout) +{ + int result; + int new_init_buffer_size; + int i; + + memset(client, 0, sizeof(FastMultiSockClient)); + if (entry_count <= 0) { + logError("file: "__FILE__", line: %d, " + "invalid entry_count: %d <= 0", + __LINE__, entry_count); + return EINVAL; + } + + if (header_length <= 0) { + logError("file: "__FILE__", line: %d, " + "invalid header_length: %d <= 0", + __LINE__, header_length); + return EINVAL; + } + + if ((result=ioevent_init(&client->ioevent, entry_count, + timeout * 1000, 0)) != 0) + { + logError("file: "__FILE__", line: %d, " + "ioevent_init fail, errno: %d, error info: %s", + __LINE__, result, STRERROR(result)); + return result; + } + + if (init_buffer_size <= 0) { + new_init_buffer_size = 4 * 1024; + } else { + new_init_buffer_size = init_buffer_size; + } + + if (new_init_buffer_size < header_length) { + new_init_buffer_size = header_length; + } + + for (i=0; ientry_count = entry_count; + client->header_length = header_length; + client->get_body_length_func = get_body_length_func; + client->entries = entries; + client->timeout = timeout; + + return 0; +} + +void fast_multi_sock_client_destroy(FastMultiSockClient *client) +{ + int i; + + ioevent_destroy(&client->ioevent); + for (i=0; ientry_count; i++) { + fast_buffer_destroy(&client->entries[i].buffer); + } +} + +static int fast_multi_sock_client_send_data(FastMultiSockClient *client, + FastBuffer *buffer) +{ + int i; + + for (i=0; ientry_count; i++) { + client->entries[i].remain = client->header_length; + client->entries[i].done = false; + client->entries[i].buffer.length = 0; + + client->entries[i].error_no = tcpsenddata(client->entries[i].conn->sock, + buffer->data, buffer->length, client->timeout); + if (client->entries[i].error_no != 0) { + client->entries[i].done = true; + continue; + } + + client->entries[i].error_no = ioevent_attach(&client->ioevent, + client->entries[i].conn->sock, IOEVENT_READ, + client->entries + i); + if (client->entries[i].error_no != 0) { + int result; + + client->entries[i].done = true; + result = client->entries[i].error_no; + logError("file: "__FILE__", line: %d, " + "ioevent_attach fail, errno: %d, error info: %s", + __LINE__, result, STRERROR(result)); + continue; + } + + client->pulling_count++; + } + + return client->pulling_count > 0 ? 0 : ENOENT; +} + +static inline void fast_multi_sock_client_finish(FastMultiSockClient *client, + FastMultiSockEntry *entry, const int error_no) +{ + entry->error_no = error_no; + entry->done = true; + client->pulling_count--; + ioevent_detach(&client->ioevent, entry->conn->sock); + if (error_no == 0) { + client->success_count++; + } +} + + +static int fast_multi_sock_client_do_recv(FastMultiSockClient *client, + FastMultiSockEntry *entry) +{ + int bytes; + int result; + + result = 0; + while (entry->remain > 0) { + bytes = read(entry->conn->sock, entry->buffer.data + + entry->buffer.length, entry->remain); + if (bytes < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + break; + } + else if (errno == EINTR) { //should retry + logDebug("file: "__FILE__", line: %d, " + "server: %s:%d, ignore interupt signal", + __LINE__, entry->conn->ip_addr, + entry->conn->port); + continue; + } + else { + result = errno != 0 ? errno : ECONNRESET; + logWarning("file: "__FILE__", line: %d, " + "server: %s:%d, recv failed, " + "errno: %d, error info: %s", + __LINE__, entry->conn->ip_addr, + entry->conn->port, + result, strerror(result)); + + break; + } + } + else if (bytes == 0) { + logDebug("file: "__FILE__", line: %d, " + "server: %s:%d, sock: %d, recv failed, " + "connection disconnected", + __LINE__, entry->conn->ip_addr, entry->conn->port, + entry->conn->sock); + + result = ECONNRESET; + break; + } + + entry->buffer.length += bytes; + entry->remain -= bytes; + if (entry->remain == 0 && entry->buffer.length == client->header_length) { + int body_length; + body_length = client->get_body_length_func(&entry->buffer); + if (body_length < 0) { + logError("file: "__FILE__", line: %d, " + "server: %s:%d, body_length: %d < 0", + __LINE__, entry->conn->ip_addr, + entry->conn->port, body_length); + result = EINVAL; + break; + } + if ((result=fast_buffer_check(&entry->buffer, body_length)) != 0) { + break; + } + entry->remain = body_length; //to recv body + } + } + + return result; +} + +static int fast_multi_sock_client_recv_data(FastMultiSockClient *client) +{ + int result; + int event; + int count; + int index; + time_t remain_time; + FastMultiSockEntry *entry; + + while (client->pulling_count > 0) { + remain_time = client->deadline_time - get_current_time(); + if (remain_time <= 0) { //timeout + break; + } + + count = ioevent_poll_ex(&client->ioevent, remain_time * 1000); + for (index=0; indexioevent, index); + entry = (FastMultiSockEntry *)IOEVENT_GET_DATA(&client->ioevent, index); + + if (event & IOEVENT_ERROR) { + logDebug("file: "__FILE__", line: %d, " + "server: %s:%d, recv error event: %d, " + "connection reset", __LINE__, + entry->conn->ip_addr, entry->conn->port, event); + + fast_multi_sock_client_finish(client, entry, ECONNRESET); + continue; + } + + result = fast_multi_sock_client_do_recv(client, entry); + if (result != 0 || entry->remain == 0) { + fast_multi_sock_client_finish(client, entry, result); + } + } + } + + if (client->pulling_count > 0) { + int i; + for (i=0; ientry_count; i++) { + if (!client->entries[i].done) { + fast_multi_sock_client_finish(client, client->entries + i, ETIMEDOUT); + } + } + } + + return client->success_count > 0 ? 0 : ENOENT; +} + +int fast_multi_sock_client_request(FastMultiSockClient *client, + FastBuffer *buffer) +{ + int result; + + client->deadline_time = get_current_time() + client->timeout; + client->pulling_count = 0; + client->success_count = 0; + if ((result=fast_multi_sock_client_send_data(client, buffer)) != 0) { + return result; + } + + return fast_multi_sock_client_recv_data(client); +} diff --git a/src/multi_socket_client.h b/src/multi_socket_client.h new file mode 100644 index 0000000..9afd95d --- /dev/null +++ b/src/multi_socket_client.h @@ -0,0 +1,89 @@ +/** +* Copyright (C) 2018 Happy Fish / YuQing +* +* FastDFS may be copied only under the terms of the GNU General +* Public License V3, which may be found in the FastDFS source kit. +* Please visit the FastDFS Home Page http://www.csource.org/ for more detail. +**/ + +//multi_socket_client.h + +#ifndef _MULTI_SOCKET_CLIENT_H_ +#define _MULTI_SOCKET_CLIENT_H_ + +#include +#include +#include +#include +#include +#include "common_define.h" +#include "connection_pool.h" +#include "fast_timer.h" +#include "ioevent.h" + +//return the body length +typedef int (*fast_multi_sock_client_get_body_length_func)(const FastBuffer *buffer); + +typedef struct fast_multi_sock_entry { + ConnectionInfo *conn; + FastBuffer buffer; //recv buffer + int remain; //remain bytes + int error_no; //0 for success + bool done; +} FastMultiSockEntry; + +typedef struct fast_multi_sock_client { + int entry_count; + int header_length; //package header size + int pulling_count; + int success_count; + int timeout; + time_t deadline_time; + FastMultiSockEntry *entries; + fast_multi_sock_client_get_body_length_func get_body_length_func; + IOEventPoller ioevent; +} FastMultiSockClient; + +#ifdef __cplusplus +extern "C" { +#endif + + + /** + init function + @param client the client context + @param entries the socket entries + @param entry_count the count of socket entries + @param header_length the header length of a package + @param get_body_length_func the get body length function + @param init_buffer_size the initial size of response buffer + @param timeout the timeout in seconds + @return error no, 0 for success, != 0 fail + */ + int fast_multi_sock_client_init(FastMultiSockClient *client, + FastMultiSockEntry *entries, const int entry_count, + const int header_length, + fast_multi_sock_client_get_body_length_func get_body_length_func, + const int init_buffer_size, const int timeout); + + /** + destroy function + @param client the client context + @return none + */ + void fast_multi_sock_client_destroy(FastMultiSockClient *client); + + /** + request function + @param client the client context + @param buffer the buffer to send + @return error no, 0 for success, != 0 fail + */ + int fast_multi_sock_client_request(FastMultiSockClient *client, + FastBuffer *buffer); + +#ifdef __cplusplus +} +#endif + +#endif