add idempotency/client/rpc_wrapper.h

connection_manager
YuQing 2020-09-18 23:17:17 +08:00
parent 451f6da578
commit a363235a7e
7 changed files with 132 additions and 12 deletions

View File

@ -16,7 +16,8 @@ IDEMP_SERVER_HEADER = idempotency/server/server_types.h \
IDEMP_CLIENT_HEADER = idempotency/client/client_types.h \
idempotency/client/receipt_handler.h \
idempotency/client/client_channel.h
idempotency/client/client_channel.h \
idempotency/client/rpc_wrapper.h
ALL_HEADERS = $(TOP_HEADERS) $(IDEMP_SERVER_HEADER) $(IDEMP_CLIENT_HEADER)

View File

@ -0,0 +1,106 @@
#ifndef _IDEMPOTENCY_RPC_WRAPPER_H
#define _IDEMPOTENCY_RPC_WRAPPER_H
#include "../../sf_configs.h"
#define SF_CLIENT_IDEMPOTENCY_UPDATE_WARPER(client_ctx, \
GET_MASTER_CONNECTION, get_conn_arg1, update_callback, ...) \
ConnectionInfo *conn; \
IdempotencyClientChannel *old_channel; \
int result; \
int i; \
uint64_t req_id; \
SFNetRetryIntervalContext net_retry_ctx; \
\
if ((conn=GET_MASTER_CONNECTION(client_ctx, \
get_conn_arg1, &result)) == NULL) \
{ \
return SF_UNIX_ERRNO(result, EIO); \
} \
connection_params = client_ctx->conn_manager. \
get_connection_params(client_ctx, conn); \
\
sf_init_net_retry_interval_context(&net_retry_ctx, \
&client_ctx->net_retry_cfg.interval_mm, \
&client_ctx->net_retry_cfg.network); \
\
while (1) { \
if (client_ctx->idempotency_enabled) { \
req_id = idempotency_client_channel_next_seq_id( \
connection_params->channel); \
} else { \
req_id = 0; \
} \
\
old_channel = connection_params->channel; \
i = 0; \
while (1) { \
if (client_ctx->idempotency_enabled) { \
result = idempotency_client_channel_check_wait( \
connection_params->channel); \
} else { \
result = 0; \
} \
\
if (result == 0) { \
if ((result=update_callback(client_ctx, \
conn, req_id, ##__VA_ARGS__)) == 0) \
{ \
break; \
} \
} \
\
if (result == SF_RETRIABLE_ERROR_CHANNEL_INVALID && \
client_ctx->idempotency_enabled) \
{ \
idempotency_client_channel_check_reconnect( \
connection_params->channel); \
} \
\
SF_NET_RETRY_CHECK_AND_SLEEP(net_retry_ctx, client_ctx-> \
net_retry_cfg.network.times, ++i, result); \
\
logInfo("file: "__FILE__", line: %d, func: %s, " \
"net retry result: %d, retry count: %d", \
__LINE__, __FUNCTION__, result, i); \
\
SF_CLIENT_RELEASE_CONNECTION(client_ctx, conn, result); \
if ((conn=GET_MASTER_CONNECTION(client_ctx, \
get_conn_arg1, &result)) == NULL) \
{ \
return SF_UNIX_ERRNO(result, EIO); \
} \
\
connection_params = client_ctx->conn_manager. \
get_connection_params(client_ctx, conn); \
if (connection_params->channel != old_channel) { \
break; \
} \
} \
\
if (connection_params->channel != old_channel) { /* master changed */ \
sf_reset_net_retry_interval(&net_retry_ctx); \
continue; \
} \
\
if (client_ctx->idempotency_enabled) { \
idempotency_client_channel_push( \
connection_params->channel, req_id); \
} \
break; \
} \
\
SF_CLIENT_RELEASE_CONNECTION(client_ctx, conn, result); \
return SF_UNIX_ERRNO(result, EIO)
#ifdef __cplusplus
extern "C" {
#endif
#ifdef __cplusplus
}
#endif
#endif

View File

@ -36,6 +36,7 @@ int idempotency_request_htable_add(IdempotencyRequestHTable *htable,
while (current != NULL) {
if (current->req_id == request->req_id) {
request->output.result = current->output.result;
request->output.flags = current->output.flags;
memcpy(request->output.response, current->output.response,
request->output.rsize);
request->finished = current->finished;

View File

@ -6,7 +6,8 @@
#include "fastcommon/fast_timer.h"
typedef struct idempotency_request_result {
int rsize; //response size defined by application
short rsize; //response size defined by application
short flags; //for application
int result;
void *response;
} IdempotencyRequestResult;

View File

@ -84,11 +84,11 @@ static int load_network_parameters(IniFullContext *ini_ctx)
&max_pkg_size)) != 0)
{
return result;
} else if (max_pkg_size < 4096) {
} else if (max_pkg_size < 8192) {
logWarning("file: "__FILE__", line: %d, "
"max_pkg_size: %d is too small, set to 4096",
"max_pkg_size: %d is too small, set to 8192",
__LINE__, (int)max_pkg_size);
max_pkg_size = 4096;
max_pkg_size = 8192;
}
g_sf_global_vars.max_pkg_size = (int)max_pkg_size;
@ -101,11 +101,11 @@ static int load_network_parameters(IniFullContext *ini_ctx)
&min_buff_size)) != 0)
{
return result;
} else if (min_buff_size < 2048) {
} else if (min_buff_size < 8192) {
logWarning("file: "__FILE__", line: %d, "
"min_buff_size: %d is too small, set to 2048",
"min_buff_size: %d is too small, set to 8192",
__LINE__, (int)min_buff_size);
min_buff_size = 2048;
min_buff_size = 8192;
}
g_sf_global_vars.min_buff_size = (int)min_buff_size;

View File

@ -8,6 +8,7 @@
#include "fastcommon/logger.h"
#include "fastcommon/connection_pool.h"
#include "fastcommon/sockopt.h"
#include "sf_define.h"
#include "sf_types.h"
#define SF_PROTO_ACK 116
@ -253,6 +254,16 @@ static inline int sf_active_test(ConnectionInfo *conn,
SF_PROTO_ACTIVE_TEST_RESP);
}
#define SF_CLIENT_RELEASE_CONNECTION(client_ctx, conn, result) \
do { \
if (SF_FORCE_CLOSE_CONNECTION_ERROR(result)) { \
client_ctx->conn_manager.close_connection(client_ctx, conn); \
} else if (client_ctx->conn_manager.release_connection != NULL) { \
client_ctx->conn_manager.release_connection(client_ctx, conn); \
} \
} while (0)
#ifdef __cplusplus
}
#endif

View File

@ -17,11 +17,11 @@
#define SF_SERVER_TASK_TYPE_CHANNEL_HOLDER 101 //for request idempotency
#define SF_SERVER_TASK_TYPE_CHANNEL_USER 102 //for request idempotency
typedef void (*sf_accept_done_callback)(struct fast_task_info *pTask,
typedef void (*sf_accept_done_callback)(struct fast_task_info *task,
const bool bInnerPort);
typedef int (*sf_set_body_length_callback)(struct fast_task_info *pTask);
typedef int (*sf_deal_task_func)(struct fast_task_info *pTask);
typedef int (*sf_recv_timeout_callback)(struct fast_task_info *pTask);
typedef int (*sf_set_body_length_callback)(struct fast_task_info *task);
typedef int (*sf_deal_task_func)(struct fast_task_info *task);
typedef int (*sf_recv_timeout_callback)(struct fast_task_info *task);
typedef struct sf_context {
struct nio_thread_data *thread_data;