multi_socket_client support timeout in ms

pull/37/head
YuQing 2020-02-22 15:53:50 +08:00
parent fd4368e6e2
commit faa1b6ddf2
3 changed files with 83 additions and 43 deletions

View File

@ -19,11 +19,17 @@
static int fast_multi_sock_client_do_recv(FastMultiSockClient *client,
FastMultiSockEntry *entry);
int fast_multi_sock_client_init(FastMultiSockClient *client,
static int64_t fms_get_current_time_ms()
{
return (int64_t)get_current_time() * 1000LL;
}
int fast_multi_sock_client_init_ex(FastMultiSockClient *client,
FastMultiSockEntry *entries, const int entry_count,
const int header_length,
fast_multi_sock_client_get_body_length_func get_body_length_func,
const int init_recv_buffer_size, const int timeout)
fms_client_get_body_length_func get_body_length_func,
fms_client_get_current_time_ms_func get_current_time_ms_func,
const int init_recv_buffer_size, const int timeout_ms)
{
int result;
int new_init_recv_buffer_size;
@ -45,7 +51,7 @@ int fast_multi_sock_client_init(FastMultiSockClient *client,
}
if ((result=ioevent_init(&client->ioevent, entry_count,
timeout * 1000, 0)) != 0)
timeout_ms, 0)) != 0)
{
logError("file: "__FILE__", line: %d, "
"ioevent_init fail, errno: %d, error info: %s",
@ -74,12 +80,24 @@ int fast_multi_sock_client_init(FastMultiSockClient *client,
client->entry_count = entry_count;
client->header_length = header_length;
client->get_body_length_func = get_body_length_func;
client->get_current_time_ms_func = get_current_time_ms_func;
client->entries = entries;
client->timeout = timeout;
client->timeout_ms = timeout_ms;
return 0;
}
int fast_multi_sock_client_init(FastMultiSockClient *client,
FastMultiSockEntry *entries, const int entry_count,
const int header_length,
fms_client_get_body_length_func get_body_length_func,
const int init_recv_buffer_size, const int timeout)
{
return fast_multi_sock_client_init_ex(client, entries, entry_count,
header_length, get_body_length_func, fms_get_current_time_ms,
init_recv_buffer_size, timeout * 1000);
}
void fast_multi_sock_client_destroy(FastMultiSockClient *client)
{
int i;
@ -104,15 +122,13 @@ static int fast_multi_sock_client_do_send(FastMultiSockClient *client,
if (bytes < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break;
}
else if (errno == EINTR) { //should retry
} else if (errno == EINTR) { //should retry
logDebug("file: "__FILE__", line: %d, "
"server: %s:%d, ignore interupt signal",
__LINE__, entry->conn->ip_addr,
entry->conn->port);
continue;
}
else {
} else {
result = errno != 0 ? errno : ECONNRESET;
logError("file: "__FILE__", line: %d, "
"send to server %s:%d fail, "
@ -123,8 +139,7 @@ static int fast_multi_sock_client_do_send(FastMultiSockClient *client,
break;
}
}
else if (bytes == 0) {
} else if (bytes == 0) {
logError("file: "__FILE__", line: %d, "
"send to server %s:%d, sock: %d fail, "
"connection disconnected",
@ -138,7 +153,7 @@ static int fast_multi_sock_client_do_send(FastMultiSockClient *client,
entry->remain -= bytes;
if (entry->remain == 0) {
entry->remain = client->header_length; //to recv pkg header
entry->recv_stage = fast_multi_sock_stage_recv_header;
entry->recv_stage = fms_stage_recv_header;
entry->io_callback = fast_multi_sock_client_do_recv;
if (ioevent_modify(&client->ioevent, entry->conn->sock,
IOEVENT_READ, entry) != 0)
@ -222,15 +237,13 @@ static int fast_multi_sock_client_do_recv(FastMultiSockClient *client,
if (bytes < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break;
}
else if (errno == EINTR) { //should retry
} else if (errno == EINTR) { //should retry
logDebug("file: "__FILE__", line: %d, "
"server: %s:%d, ignore interupt signal",
__LINE__, entry->conn->ip_addr,
entry->conn->port);
continue;
}
else {
} else {
result = errno != 0 ? errno : ECONNRESET;
logError("file: "__FILE__", line: %d, "
"server: %s:%d, recv failed, "
@ -241,8 +254,7 @@ static int fast_multi_sock_client_do_recv(FastMultiSockClient *client,
break;
}
}
else if (bytes == 0) {
} else if (bytes == 0) {
logError("file: "__FILE__", line: %d, "
"server: %s:%d, sock: %d, recv failed, "
"connection disconnected",
@ -255,24 +267,25 @@ static int fast_multi_sock_client_do_recv(FastMultiSockClient *client,
entry->recv_buffer.length += bytes;
entry->remain -= bytes;
if (entry->remain == 0 && entry->recv_stage == fast_multi_sock_stage_recv_header) {
if (entry->remain == 0 && entry->recv_stage == fms_stage_recv_header) {
int body_length;
entry->recv_stage = fast_multi_sock_stage_recv_body;
entry->recv_stage = fms_stage_recv_body;
body_length = client->get_body_length_func(&entry->recv_buffer);
if (body_length < 0) {
logError("file: "__FILE__", line: %d, "
"server: %s:%d, body_length: %d < 0",
__LINE__, entry->conn->ip_addr,
entry->conn->port, body_length);
result = EINVAL;
result = EPIPE;
break;
}
else if (body_length == 0) {
} else if (body_length == 0) {
break;
}
if ((result=fast_buffer_check(&entry->recv_buffer, body_length)) != 0) {
if ((result=fast_buffer_check(&entry->recv_buffer,
body_length)) != 0)
{
break;
}
entry->remain = body_length; //to recv body
@ -290,20 +303,22 @@ static int fast_multi_sock_client_deal_io(FastMultiSockClient *client)
int event;
int count;
int index;
time_t remain_timeout;
int remain_timeout;
FastMultiSockEntry *entry;
while (client->pulling_count > 0) {
remain_timeout = client->deadline_time - get_current_time();
remain_timeout = client->deadline_time_ms -
client->get_current_time_ms_func();
if (remain_timeout < 0) { //timeout
break;
}
count = ioevent_poll_ex(&client->ioevent, remain_timeout * 1000);
count = ioevent_poll_ex(&client->ioevent, remain_timeout);
logInfo("poll count: %d\n", count);
for (index=0; index<count; index++) {
event = IOEVENT_GET_EVENTS(&client->ioevent, index);
entry = (FastMultiSockEntry *)IOEVENT_GET_DATA(&client->ioevent, index);
entry = (FastMultiSockEntry *)IOEVENT_GET_DATA(
&client->ioevent, index);
if (event & IOEVENT_ERROR) {
logError("file: "__FILE__", line: %d, "
@ -333,7 +348,8 @@ static int fast_multi_sock_client_deal_io(FastMultiSockClient *client)
int i;
for (i=0; i<client->entry_count; i++) {
if (!client->entries[i].done) {
fast_multi_sock_client_finish(client, client->entries + i, ETIMEDOUT);
fast_multi_sock_client_finish(client,
client->entries + i, ETIMEDOUT);
logError("file: "__FILE__", line: %d, "
"recv from %s:%d timedout",
__LINE__, client->entries[i].conn->ip_addr,
@ -345,7 +361,8 @@ static int fast_multi_sock_client_deal_io(FastMultiSockClient *client)
logInfo("file: "__FILE__", line: %d, pulling_count: %d, "
"success_count: %d\n", __LINE__,
client->pulling_count, client->success_count);
return client->success_count > 0 ? 0 : (remain_timeout > 0 ? ENOENT : ETIMEDOUT);
return client->success_count > 0 ? 0 :
(remain_timeout > 0 ? ENOENT : ETIMEDOUT);
}
int fast_multi_sock_client_request(FastMultiSockClient *client,
@ -353,7 +370,8 @@ int fast_multi_sock_client_request(FastMultiSockClient *client,
{
int result;
client->deadline_time = get_current_time() + client->timeout;
client->deadline_time_ms = client->get_current_time_ms_func() +
client->timeout_ms;
client->pulling_count = 0;
client->success_count = 0;
if ((result=fast_multi_sock_client_send_data(client, send_buffer)) != 0) {

View File

@ -22,22 +22,24 @@
#include "ioevent.h"
typedef enum {
fast_multi_sock_stage_recv_header = 'H',
fast_multi_sock_stage_recv_body = 'B'
fms_stage_recv_header = 'H',
fms_stage_recv_body = 'B'
} FastMultiSockRecvStage;
struct fast_multi_sock_client;
struct fast_multi_sock_entry;
typedef int64_t (*fms_client_get_current_time_ms_func)();
//return the body length
typedef int (*fast_multi_sock_client_get_body_length_func)(const FastBuffer *recv_buffer);
typedef int (*fms_client_get_body_length_func)(const FastBuffer *recv_buffer);
//IO deal fucntion
typedef int (*fast_multi_sock_client_io_func)(struct fast_multi_sock_client *client,
struct fast_multi_sock_entry *entry);
typedef int (*fast_multi_sock_client_io_func)(struct fast_multi_sock_client *
client, struct fast_multi_sock_entry *entry);
typedef struct fast_multi_sock_entry {
ConnectionInfo *conn; //the socket must be non-block socket
ConnectionInfo *conn; //the connected socket must be non-block socket
FastBuffer *send_buffer; //send buffer for internal use
fast_multi_sock_client_io_func io_callback; //for internal use
FastBuffer recv_buffer; //recv buffer for response package
@ -52,10 +54,11 @@ typedef struct fast_multi_sock_client {
int header_length; //package header size
int pulling_count;
int success_count;
int timeout;
time_t deadline_time;
int timeout_ms;
int64_t deadline_time_ms;
FastMultiSockEntry *entries;
fast_multi_sock_client_get_body_length_func get_body_length_func;
fms_client_get_current_time_ms_func get_current_time_ms_func;
fms_client_get_body_length_func get_body_length_func;
IOEventPoller ioevent;
} FastMultiSockClient;
@ -71,14 +74,33 @@ extern "C" {
@param entry_count the count of socket entries
@param header_length the header length of a package
@param get_body_length_func the get body length function
@param init_recv_buffer_size the initial size of response buffer
@param get_current_time_ms_func the get current time in ms function
@param init_recv_buffer_size the initial size of response buffer
@param timeout_ms the timeout in milliseconds
@return error no, 0 for success, != 0 fail
*/
int fast_multi_sock_client_init_ex(FastMultiSockClient *client,
FastMultiSockEntry *entries, const int entry_count,
const int header_length,
fms_client_get_body_length_func get_body_length_func,
fms_client_get_current_time_ms_func get_current_time_ms_func,
const int init_recv_buffer_size, const int timeout_ms);
/**
init function
@param client the client context
@param entries the socket entries
@param entry_count the count of socket entries
@param header_length the header length of a package
@param get_body_length_func the get body length function
@param init_recv_buffer_size the initial size of response buffer
@param timeout the timeout in seconds
@return error no, 0 for success, != 0 fail
*/
int fast_multi_sock_client_init(FastMultiSockClient *client,
FastMultiSockEntry *entries, const int entry_count,
const int header_length,
fast_multi_sock_client_get_body_length_func get_body_length_func,
fms_client_get_body_length_func get_body_length_func,
const int init_recv_buffer_size, const int timeout);
/**

View File

@ -1434,7 +1434,7 @@ static void fc_server_log_one_server(FCServerConfig *ctx, FCServerInfo *server)
gaddr->address_array.count);
fc_server_log_group_servers(gaddr);
}
logInfo("");
logInfo(" ");
}
static void fc_server_log_servers(FCServerConfig *ctx)