diff --git a/src/multi_socket_client.c b/src/multi_socket_client.c index d32434b..61fa9bf 100644 --- a/src/multi_socket_client.c +++ b/src/multi_socket_client.c @@ -19,11 +19,17 @@ static int fast_multi_sock_client_do_recv(FastMultiSockClient *client, FastMultiSockEntry *entry); -int fast_multi_sock_client_init(FastMultiSockClient *client, +static int64_t fms_get_current_time_ms() +{ + return (int64_t)get_current_time() * 1000LL; +} + +int fast_multi_sock_client_init_ex(FastMultiSockClient *client, FastMultiSockEntry *entries, const int entry_count, const int header_length, - fast_multi_sock_client_get_body_length_func get_body_length_func, - const int init_recv_buffer_size, const int timeout) + fms_client_get_body_length_func get_body_length_func, + fms_client_get_current_time_ms_func get_current_time_ms_func, + const int init_recv_buffer_size, const int timeout_ms) { int result; int new_init_recv_buffer_size; @@ -45,7 +51,7 @@ int fast_multi_sock_client_init(FastMultiSockClient *client, } if ((result=ioevent_init(&client->ioevent, entry_count, - timeout * 1000, 0)) != 0) + timeout_ms, 0)) != 0) { logError("file: "__FILE__", line: %d, " "ioevent_init fail, errno: %d, error info: %s", @@ -74,12 +80,24 @@ int fast_multi_sock_client_init(FastMultiSockClient *client, client->entry_count = entry_count; client->header_length = header_length; client->get_body_length_func = get_body_length_func; + client->get_current_time_ms_func = get_current_time_ms_func; client->entries = entries; - client->timeout = timeout; + client->timeout_ms = timeout_ms; return 0; } +int fast_multi_sock_client_init(FastMultiSockClient *client, + FastMultiSockEntry *entries, const int entry_count, + const int header_length, + fms_client_get_body_length_func get_body_length_func, + const int init_recv_buffer_size, const int timeout) +{ + return fast_multi_sock_client_init_ex(client, entries, entry_count, + header_length, get_body_length_func, fms_get_current_time_ms, + init_recv_buffer_size, timeout * 1000); +} + void fast_multi_sock_client_destroy(FastMultiSockClient *client) { int i; @@ -104,15 +122,13 @@ static int fast_multi_sock_client_do_send(FastMultiSockClient *client, if (bytes < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { break; - } - else if (errno == EINTR) { //should retry + } else if (errno == EINTR) { //should retry logDebug("file: "__FILE__", line: %d, " "server: %s:%d, ignore interupt signal", __LINE__, entry->conn->ip_addr, entry->conn->port); continue; - } - else { + } else { result = errno != 0 ? errno : ECONNRESET; logError("file: "__FILE__", line: %d, " "send to server %s:%d fail, " @@ -123,8 +139,7 @@ static int fast_multi_sock_client_do_send(FastMultiSockClient *client, break; } - } - else if (bytes == 0) { + } else if (bytes == 0) { logError("file: "__FILE__", line: %d, " "send to server %s:%d, sock: %d fail, " "connection disconnected", @@ -138,7 +153,7 @@ static int fast_multi_sock_client_do_send(FastMultiSockClient *client, entry->remain -= bytes; if (entry->remain == 0) { entry->remain = client->header_length; //to recv pkg header - entry->recv_stage = fast_multi_sock_stage_recv_header; + entry->recv_stage = fms_stage_recv_header; entry->io_callback = fast_multi_sock_client_do_recv; if (ioevent_modify(&client->ioevent, entry->conn->sock, IOEVENT_READ, entry) != 0) @@ -222,15 +237,13 @@ static int fast_multi_sock_client_do_recv(FastMultiSockClient *client, if (bytes < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { break; - } - else if (errno == EINTR) { //should retry + } else if (errno == EINTR) { //should retry logDebug("file: "__FILE__", line: %d, " "server: %s:%d, ignore interupt signal", __LINE__, entry->conn->ip_addr, entry->conn->port); continue; - } - else { + } else { result = errno != 0 ? errno : ECONNRESET; logError("file: "__FILE__", line: %d, " "server: %s:%d, recv failed, " @@ -241,8 +254,7 @@ static int fast_multi_sock_client_do_recv(FastMultiSockClient *client, break; } - } - else if (bytes == 0) { + } else if (bytes == 0) { logError("file: "__FILE__", line: %d, " "server: %s:%d, sock: %d, recv failed, " "connection disconnected", @@ -255,24 +267,25 @@ static int fast_multi_sock_client_do_recv(FastMultiSockClient *client, entry->recv_buffer.length += bytes; entry->remain -= bytes; - if (entry->remain == 0 && entry->recv_stage == fast_multi_sock_stage_recv_header) { + if (entry->remain == 0 && entry->recv_stage == fms_stage_recv_header) { int body_length; - entry->recv_stage = fast_multi_sock_stage_recv_body; + entry->recv_stage = fms_stage_recv_body; body_length = client->get_body_length_func(&entry->recv_buffer); if (body_length < 0) { logError("file: "__FILE__", line: %d, " "server: %s:%d, body_length: %d < 0", __LINE__, entry->conn->ip_addr, entry->conn->port, body_length); - result = EINVAL; + result = EPIPE; break; - } - else if (body_length == 0) { + } else if (body_length == 0) { break; } - if ((result=fast_buffer_check(&entry->recv_buffer, body_length)) != 0) { + if ((result=fast_buffer_check(&entry->recv_buffer, + body_length)) != 0) + { break; } entry->remain = body_length; //to recv body @@ -290,20 +303,22 @@ static int fast_multi_sock_client_deal_io(FastMultiSockClient *client) int event; int count; int index; - time_t remain_timeout; + int remain_timeout; FastMultiSockEntry *entry; while (client->pulling_count > 0) { - remain_timeout = client->deadline_time - get_current_time(); + remain_timeout = client->deadline_time_ms - + client->get_current_time_ms_func(); if (remain_timeout < 0) { //timeout break; } - count = ioevent_poll_ex(&client->ioevent, remain_timeout * 1000); + count = ioevent_poll_ex(&client->ioevent, remain_timeout); logInfo("poll count: %d\n", count); for (index=0; indexioevent, index); - entry = (FastMultiSockEntry *)IOEVENT_GET_DATA(&client->ioevent, index); + entry = (FastMultiSockEntry *)IOEVENT_GET_DATA( + &client->ioevent, index); if (event & IOEVENT_ERROR) { logError("file: "__FILE__", line: %d, " @@ -333,7 +348,8 @@ static int fast_multi_sock_client_deal_io(FastMultiSockClient *client) int i; for (i=0; ientry_count; i++) { if (!client->entries[i].done) { - fast_multi_sock_client_finish(client, client->entries + i, ETIMEDOUT); + fast_multi_sock_client_finish(client, + client->entries + i, ETIMEDOUT); logError("file: "__FILE__", line: %d, " "recv from %s:%d timedout", __LINE__, client->entries[i].conn->ip_addr, @@ -345,7 +361,8 @@ static int fast_multi_sock_client_deal_io(FastMultiSockClient *client) logInfo("file: "__FILE__", line: %d, pulling_count: %d, " "success_count: %d\n", __LINE__, client->pulling_count, client->success_count); - return client->success_count > 0 ? 0 : (remain_timeout > 0 ? ENOENT : ETIMEDOUT); + return client->success_count > 0 ? 0 : + (remain_timeout > 0 ? ENOENT : ETIMEDOUT); } int fast_multi_sock_client_request(FastMultiSockClient *client, @@ -353,7 +370,8 @@ int fast_multi_sock_client_request(FastMultiSockClient *client, { int result; - client->deadline_time = get_current_time() + client->timeout; + client->deadline_time_ms = client->get_current_time_ms_func() + + client->timeout_ms; client->pulling_count = 0; client->success_count = 0; if ((result=fast_multi_sock_client_send_data(client, send_buffer)) != 0) { diff --git a/src/multi_socket_client.h b/src/multi_socket_client.h index 6445ad8..1df0c1e 100644 --- a/src/multi_socket_client.h +++ b/src/multi_socket_client.h @@ -22,22 +22,24 @@ #include "ioevent.h" typedef enum { - fast_multi_sock_stage_recv_header = 'H', - fast_multi_sock_stage_recv_body = 'B' + fms_stage_recv_header = 'H', + fms_stage_recv_body = 'B' } FastMultiSockRecvStage; struct fast_multi_sock_client; struct fast_multi_sock_entry; +typedef int64_t (*fms_client_get_current_time_ms_func)(); + //return the body length -typedef int (*fast_multi_sock_client_get_body_length_func)(const FastBuffer *recv_buffer); +typedef int (*fms_client_get_body_length_func)(const FastBuffer *recv_buffer); //IO deal fucntion -typedef int (*fast_multi_sock_client_io_func)(struct fast_multi_sock_client *client, - struct fast_multi_sock_entry *entry); +typedef int (*fast_multi_sock_client_io_func)(struct fast_multi_sock_client * + client, struct fast_multi_sock_entry *entry); typedef struct fast_multi_sock_entry { - ConnectionInfo *conn; //the socket must be non-block socket + ConnectionInfo *conn; //the connected socket must be non-block socket FastBuffer *send_buffer; //send buffer for internal use fast_multi_sock_client_io_func io_callback; //for internal use FastBuffer recv_buffer; //recv buffer for response package @@ -52,10 +54,11 @@ typedef struct fast_multi_sock_client { int header_length; //package header size int pulling_count; int success_count; - int timeout; - time_t deadline_time; + int timeout_ms; + int64_t deadline_time_ms; FastMultiSockEntry *entries; - fast_multi_sock_client_get_body_length_func get_body_length_func; + fms_client_get_current_time_ms_func get_current_time_ms_func; + fms_client_get_body_length_func get_body_length_func; IOEventPoller ioevent; } FastMultiSockClient; @@ -71,14 +74,33 @@ extern "C" { @param entry_count the count of socket entries @param header_length the header length of a package @param get_body_length_func the get body length function - @param init_recv_buffer_size the initial size of response buffer + @param get_current_time_ms_func the get current time in ms function + @param init_recv_buffer_size the initial size of response buffer + @param timeout_ms the timeout in milliseconds + @return error no, 0 for success, != 0 fail + */ + int fast_multi_sock_client_init_ex(FastMultiSockClient *client, + FastMultiSockEntry *entries, const int entry_count, + const int header_length, + fms_client_get_body_length_func get_body_length_func, + fms_client_get_current_time_ms_func get_current_time_ms_func, + const int init_recv_buffer_size, const int timeout_ms); + + /** + init function + @param client the client context + @param entries the socket entries + @param entry_count the count of socket entries + @param header_length the header length of a package + @param get_body_length_func the get body length function + @param init_recv_buffer_size the initial size of response buffer @param timeout the timeout in seconds @return error no, 0 for success, != 0 fail */ int fast_multi_sock_client_init(FastMultiSockClient *client, FastMultiSockEntry *entries, const int entry_count, const int header_length, - fast_multi_sock_client_get_body_length_func get_body_length_func, + fms_client_get_body_length_func get_body_length_func, const int init_recv_buffer_size, const int timeout); /** diff --git a/src/server_id_func.c b/src/server_id_func.c index 947a5eb..d1e8d56 100644 --- a/src/server_id_func.c +++ b/src/server_id_func.c @@ -1434,7 +1434,7 @@ static void fc_server_log_one_server(FCServerConfig *ctx, FCServerInfo *server) gaddr->address_array.count); fc_server_log_group_servers(gaddr); } - logInfo(""); + logInfo(" "); } static void fc_server_log_servers(FCServerConfig *ctx)