add files: multi_socket_client.[hc]

pull/37/head
yuqing 2018-05-18 16:48:26 +08:00
parent 83d1eb2bde
commit 6c9b0b4401
9 changed files with 381 additions and 19 deletions

View File

@ -1,9 +1,10 @@
Version 1.38 2018-05-17 Version 1.38 2018-05-18
* connection_pool.c: set err_no to 0 when success * connection_pool.c: set err_no to 0 when success
* shared_func.h: add functions float2buff / buff2float, double2buff / buff2double * shared_func.h: add functions float2buff / buff2float, double2buff / buff2double
* logger.h: add function log_get_level_caption * logger.h: add function log_get_level_caption
* add files: common_blocked_queue.[hc] * add files: common_blocked_queue.[hc]
* add files: multi_socket_client.[hc]
Version 1.37 2018-02-24 Version 1.37 2018-02-24
* ini_file_reader.c function annotations LOCAL_IP_GET support index, such as: * ini_file_reader.c function annotations LOCAL_IP_GET support index, such as:

View File

@ -12,7 +12,8 @@ FAST_SHARED_OBJS = hash.lo chain.lo shared_func.lo ini_file_reader.lo \
connection_pool.lo fast_mpool.lo fast_allocator.lo \ connection_pool.lo fast_mpool.lo fast_allocator.lo \
fast_buffer.lo multi_skiplist.lo flat_skiplist.lo \ fast_buffer.lo multi_skiplist.lo flat_skiplist.lo \
system_info.lo fast_blocked_queue.lo id_generator.lo \ system_info.lo fast_blocked_queue.lo id_generator.lo \
char_converter.lo char_convert_loader.lo common_blocked_queue.lo char_converter.lo char_convert_loader.lo common_blocked_queue.lo \
multi_socket_client.lo
FAST_STATIC_OBJS = hash.o chain.o shared_func.o ini_file_reader.o \ FAST_STATIC_OBJS = hash.o chain.o shared_func.o ini_file_reader.o \
logger.o sockopt.o base64.o sched_thread.o \ logger.o sockopt.o base64.o sched_thread.o \
@ -22,7 +23,8 @@ FAST_STATIC_OBJS = hash.o chain.o shared_func.o ini_file_reader.o \
connection_pool.o fast_mpool.o fast_allocator.o \ connection_pool.o fast_mpool.o fast_allocator.o \
fast_buffer.o multi_skiplist.o flat_skiplist.o \ fast_buffer.o multi_skiplist.o flat_skiplist.o \
system_info.o fast_blocked_queue.o id_generator.o \ system_info.o fast_blocked_queue.o id_generator.o \
char_converter.o char_convert_loader.o common_blocked_queue.o char_converter.o char_convert_loader.o common_blocked_queue.o \
multi_socket_client.o
HEADER_FILES = common_define.h hash.h chain.h logger.h base64.h \ HEADER_FILES = common_define.h hash.h chain.h logger.h base64.h \
shared_func.h pthread_func.h ini_file_reader.h _os_define.h \ shared_func.h pthread_func.h ini_file_reader.h _os_define.h \
@ -33,7 +35,8 @@ HEADER_FILES = common_define.h hash.h chain.h logger.h base64.h \
fast_buffer.h skiplist.h multi_skiplist.h flat_skiplist.h \ fast_buffer.h skiplist.h multi_skiplist.h flat_skiplist.h \
skiplist_common.h system_info.h fast_blocked_queue.h \ skiplist_common.h system_info.h fast_blocked_queue.h \
php7_ext_wrapper.h id_generator.h char_converter.h \ php7_ext_wrapper.h id_generator.h char_converter.h \
char_convert_loader.h common_blocked_queue.h char_convert_loader.h common_blocked_queue.h \
multi_socket_client.h
ALL_OBJS = $(FAST_STATIC_OBJS) $(FAST_SHARED_OBJS) ALL_OBJS = $(FAST_STATIC_OBJS) $(FAST_SHARED_OBJS)

View File

@ -40,7 +40,7 @@ void fast_buffer_destroy(FastBuffer *buffer)
} }
} }
static int fast_buffer_check(FastBuffer *buffer, const int inc_len) int fast_buffer_check(FastBuffer *buffer, const int inc_len)
{ {
int alloc_size; int alloc_size;
char *buff; char *buff;

View File

@ -4,9 +4,9 @@
#include <stdint.h> #include <stdint.h>
typedef struct fast_buffer { typedef struct fast_buffer {
char *data; char *data;
int alloc_size; int alloc_size;
int length; int length;
} FastBuffer; } FastBuffer;
#ifdef __cplusplus #ifdef __cplusplus
@ -40,6 +40,8 @@ static inline void fast_buffer_reset(FastBuffer *buffer)
void fast_buffer_destroy(FastBuffer *buffer); void fast_buffer_destroy(FastBuffer *buffer);
int fast_buffer_check(FastBuffer *buffer, const int inc_len);
int fast_buffer_append(FastBuffer *buffer, const char *format, ...); int fast_buffer_append(FastBuffer *buffer, const char *format, ...);
int fast_buffer_append_buff(FastBuffer *buffer, const char *data, const int len); int fast_buffer_append_buff(FastBuffer *buffer, const char *data, const int len);

View File

@ -35,7 +35,7 @@ int flat_skiplist_init_ex(FlatSkiplist *sl, const int level_count,
return EINVAL; return EINVAL;
} }
if (level_count > 20) { if (level_count > 30) {
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"level count: %d is too large", "level count: %d is too large",
__LINE__, level_count); __LINE__, level_count);

View File

@ -74,39 +74,39 @@ typedef struct ioevent_puller {
#if IOEVENT_USE_EPOLL #if IOEVENT_USE_EPOLL
#define IOEVENT_GET_EVENTS(ioevent, index) \ #define IOEVENT_GET_EVENTS(ioevent, index) \
ioevent->events[index].events (ioevent)->events[index].events
#elif IOEVENT_USE_KQUEUE #elif IOEVENT_USE_KQUEUE
#define IOEVENT_GET_EVENTS(ioevent, index) kqueue_ev_convert( \ #define IOEVENT_GET_EVENTS(ioevent, index) kqueue_ev_convert( \
ioevent->events[index].filter, ioevent->events[index].flags) (ioevent)->events[index].filter, (ioevent)->events[index].flags)
#elif IOEVENT_USE_PORT #elif IOEVENT_USE_PORT
#define IOEVENT_GET_EVENTS(ioevent, index) \ #define IOEVENT_GET_EVENTS(ioevent, index) \
ioevent->events[index].portev_events (ioevent)->events[index].portev_events
#else #else
#error port me #error port me
#endif #endif
#if IOEVENT_USE_EPOLL #if IOEVENT_USE_EPOLL
#define IOEVENT_GET_DATA(ioevent, index) \ #define IOEVENT_GET_DATA(ioevent, index) \
ioevent->events[index].data.ptr (ioevent)->events[index].data.ptr
#elif IOEVENT_USE_KQUEUE #elif IOEVENT_USE_KQUEUE
#define IOEVENT_GET_DATA(ioevent, index) \ #define IOEVENT_GET_DATA(ioevent, index) \
ioevent->events[index].udata (ioevent)->events[index].udata
#elif IOEVENT_USE_PORT #elif IOEVENT_USE_PORT
#define IOEVENT_GET_DATA(ioevent, index) \ #define IOEVENT_GET_DATA(ioevent, index) \
ioevent->events[index].portev_user (ioevent)->events[index].portev_user
#else #else
#error port me #error port me
#endif #endif
#if IOEVENT_USE_EPOLL #if IOEVENT_USE_EPOLL
#define IOEVENT_CLEAR_DATA(ioevent, index) \ #define IOEVENT_CLEAR_DATA(ioevent, index) \
ioevent->events[index].data.ptr = NULL (ioevent)->events[index].data.ptr = NULL
#elif IOEVENT_USE_KQUEUE #elif IOEVENT_USE_KQUEUE
#define IOEVENT_CLEAR_DATA(ioevent, index) \ #define IOEVENT_CLEAR_DATA(ioevent, index) \
ioevent->events[index].udata = NULL (ioevent)->events[index].udata = NULL
#elif IOEVENT_USE_PORT #elif IOEVENT_USE_PORT
#define IOEVENT_CLEAR_DATA(ioevent, index) \ #define IOEVENT_CLEAR_DATA(ioevent, index) \
ioevent->events[index].portev_user = NULL (ioevent)->events[index].portev_user = NULL
#else #else
#error port me #error port me
#endif #endif

View File

@ -36,7 +36,7 @@ int multi_skiplist_init_ex(MultiSkiplist *sl, const int level_count,
return EINVAL; return EINVAL;
} }
if (level_count > 20) { if (level_count > 30) {
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"level count: %d is too large", "level count: %d is too large",
__LINE__, level_count); __LINE__, level_count);

267
src/multi_socket_client.c Normal file
View File

@ -0,0 +1,267 @@
#include "common_define.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <netdb.h>
#include <fcntl.h>
#include <sys/ioctl.h>
#include "logger.h"
#include "sockopt.h"
#include "sched_thread.h"
#include "fast_buffer.h"
#include "multi_socket_client.h"
int fast_multi_sock_client_init(FastMultiSockClient *client,
FastMultiSockEntry *entries, const int entry_count,
const int header_length,
fast_multi_sock_client_get_body_length_func get_body_length_func,
const int init_buffer_size, const int timeout)
{
int result;
int new_init_buffer_size;
int i;
memset(client, 0, sizeof(FastMultiSockClient));
if (entry_count <= 0) {
logError("file: "__FILE__", line: %d, "
"invalid entry_count: %d <= 0",
__LINE__, entry_count);
return EINVAL;
}
if (header_length <= 0) {
logError("file: "__FILE__", line: %d, "
"invalid header_length: %d <= 0",
__LINE__, header_length);
return EINVAL;
}
if ((result=ioevent_init(&client->ioevent, entry_count,
timeout * 1000, 0)) != 0)
{
logError("file: "__FILE__", line: %d, "
"ioevent_init fail, errno: %d, error info: %s",
__LINE__, result, STRERROR(result));
return result;
}
if (init_buffer_size <= 0) {
new_init_buffer_size = 4 * 1024;
} else {
new_init_buffer_size = init_buffer_size;
}
if (new_init_buffer_size < header_length) {
new_init_buffer_size = header_length;
}
for (i=0; i<entry_count; i++) {
if ((result=fast_buffer_init_ex(&entries[i].buffer, new_init_buffer_size)) != 0) {
return result;
}
}
client->entry_count = entry_count;
client->header_length = header_length;
client->get_body_length_func = get_body_length_func;
client->entries = entries;
client->timeout = timeout;
return 0;
}
void fast_multi_sock_client_destroy(FastMultiSockClient *client)
{
int i;
ioevent_destroy(&client->ioevent);
for (i=0; i<client->entry_count; i++) {
fast_buffer_destroy(&client->entries[i].buffer);
}
}
static int fast_multi_sock_client_send_data(FastMultiSockClient *client,
FastBuffer *buffer)
{
int i;
for (i=0; i<client->entry_count; i++) {
client->entries[i].remain = client->header_length;
client->entries[i].done = false;
client->entries[i].buffer.length = 0;
client->entries[i].error_no = tcpsenddata(client->entries[i].conn->sock,
buffer->data, buffer->length, client->timeout);
if (client->entries[i].error_no != 0) {
client->entries[i].done = true;
continue;
}
client->entries[i].error_no = ioevent_attach(&client->ioevent,
client->entries[i].conn->sock, IOEVENT_READ,
client->entries + i);
if (client->entries[i].error_no != 0) {
int result;
client->entries[i].done = true;
result = client->entries[i].error_no;
logError("file: "__FILE__", line: %d, "
"ioevent_attach fail, errno: %d, error info: %s",
__LINE__, result, STRERROR(result));
continue;
}
client->pulling_count++;
}
return client->pulling_count > 0 ? 0 : ENOENT;
}
static inline void fast_multi_sock_client_finish(FastMultiSockClient *client,
FastMultiSockEntry *entry, const int error_no)
{
entry->error_no = error_no;
entry->done = true;
client->pulling_count--;
ioevent_detach(&client->ioevent, entry->conn->sock);
if (error_no == 0) {
client->success_count++;
}
}
static int fast_multi_sock_client_do_recv(FastMultiSockClient *client,
FastMultiSockEntry *entry)
{
int bytes;
int result;
result = 0;
while (entry->remain > 0) {
bytes = read(entry->conn->sock, entry->buffer.data +
entry->buffer.length, entry->remain);
if (bytes < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break;
}
else if (errno == EINTR) { //should retry
logDebug("file: "__FILE__", line: %d, "
"server: %s:%d, ignore interupt signal",
__LINE__, entry->conn->ip_addr,
entry->conn->port);
continue;
}
else {
result = errno != 0 ? errno : ECONNRESET;
logWarning("file: "__FILE__", line: %d, "
"server: %s:%d, recv failed, "
"errno: %d, error info: %s",
__LINE__, entry->conn->ip_addr,
entry->conn->port,
result, strerror(result));
break;
}
}
else if (bytes == 0) {
logDebug("file: "__FILE__", line: %d, "
"server: %s:%d, sock: %d, recv failed, "
"connection disconnected",
__LINE__, entry->conn->ip_addr, entry->conn->port,
entry->conn->sock);
result = ECONNRESET;
break;
}
entry->buffer.length += bytes;
entry->remain -= bytes;
if (entry->remain == 0 && entry->buffer.length == client->header_length) {
int body_length;
body_length = client->get_body_length_func(&entry->buffer);
if (body_length < 0) {
logError("file: "__FILE__", line: %d, "
"server: %s:%d, body_length: %d < 0",
__LINE__, entry->conn->ip_addr,
entry->conn->port, body_length);
result = EINVAL;
break;
}
if ((result=fast_buffer_check(&entry->buffer, body_length)) != 0) {
break;
}
entry->remain = body_length; //to recv body
}
}
return result;
}
static int fast_multi_sock_client_recv_data(FastMultiSockClient *client)
{
int result;
int event;
int count;
int index;
time_t remain_time;
FastMultiSockEntry *entry;
while (client->pulling_count > 0) {
remain_time = client->deadline_time - get_current_time();
if (remain_time <= 0) { //timeout
break;
}
count = ioevent_poll_ex(&client->ioevent, remain_time * 1000);
for (index=0; index<count; index++) {
event = IOEVENT_GET_EVENTS(&client->ioevent, index);
entry = (FastMultiSockEntry *)IOEVENT_GET_DATA(&client->ioevent, index);
if (event & IOEVENT_ERROR) {
logDebug("file: "__FILE__", line: %d, "
"server: %s:%d, recv error event: %d, "
"connection reset", __LINE__,
entry->conn->ip_addr, entry->conn->port, event);
fast_multi_sock_client_finish(client, entry, ECONNRESET);
continue;
}
result = fast_multi_sock_client_do_recv(client, entry);
if (result != 0 || entry->remain == 0) {
fast_multi_sock_client_finish(client, entry, result);
}
}
}
if (client->pulling_count > 0) {
int i;
for (i=0; i<client->entry_count; i++) {
if (!client->entries[i].done) {
fast_multi_sock_client_finish(client, client->entries + i, ETIMEDOUT);
}
}
}
return client->success_count > 0 ? 0 : ENOENT;
}
int fast_multi_sock_client_request(FastMultiSockClient *client,
FastBuffer *buffer)
{
int result;
client->deadline_time = get_current_time() + client->timeout;
client->pulling_count = 0;
client->success_count = 0;
if ((result=fast_multi_sock_client_send_data(client, buffer)) != 0) {
return result;
}
return fast_multi_sock_client_recv_data(client);
}

89
src/multi_socket_client.h Normal file
View File

@ -0,0 +1,89 @@
/**
* Copyright (C) 2018 Happy Fish / YuQing
*
* FastDFS may be copied only under the terms of the GNU General
* Public License V3, which may be found in the FastDFS source kit.
* Please visit the FastDFS Home Page http://www.csource.org/ for more detail.
**/
//multi_socket_client.h
#ifndef _MULTI_SOCKET_CLIENT_H_
#define _MULTI_SOCKET_CLIENT_H_
#include <net/if.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include "common_define.h"
#include "connection_pool.h"
#include "fast_timer.h"
#include "ioevent.h"
//return the body length
typedef int (*fast_multi_sock_client_get_body_length_func)(const FastBuffer *buffer);
typedef struct fast_multi_sock_entry {
ConnectionInfo *conn;
FastBuffer buffer; //recv buffer
int remain; //remain bytes
int error_no; //0 for success
bool done;
} FastMultiSockEntry;
typedef struct fast_multi_sock_client {
int entry_count;
int header_length; //package header size
int pulling_count;
int success_count;
int timeout;
time_t deadline_time;
FastMultiSockEntry *entries;
fast_multi_sock_client_get_body_length_func get_body_length_func;
IOEventPoller ioevent;
} FastMultiSockClient;
#ifdef __cplusplus
extern "C" {
#endif
/**
init function
@param client the client context
@param entries the socket entries
@param entry_count the count of socket entries
@param header_length the header length of a package
@param get_body_length_func the get body length function
@param init_buffer_size the initial size of response buffer
@param timeout the timeout in seconds
@return error no, 0 for success, != 0 fail
*/
int fast_multi_sock_client_init(FastMultiSockClient *client,
FastMultiSockEntry *entries, const int entry_count,
const int header_length,
fast_multi_sock_client_get_body_length_func get_body_length_func,
const int init_buffer_size, const int timeout);
/**
destroy function
@param client the client context
@return none
*/
void fast_multi_sock_client_destroy(FastMultiSockClient *client);
/**
request function
@param client the client context
@param buffer the buffer to send
@return error no, 0 for success, != 0 fail
*/
int fast_multi_sock_client_request(FastMultiSockClient *client,
FastBuffer *buffer);
#ifdef __cplusplus
}
#endif
#endif