diff --git a/src/Makefile.in b/src/Makefile.in index aa3638b..41a3940 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -16,7 +16,8 @@ IDEMP_SERVER_HEADER = idempotency/server/server_types.h \ IDEMP_CLIENT_HEADER = idempotency/client/client_types.h \ idempotency/client/receipt_handler.h \ - idempotency/client/client_channel.h + idempotency/client/client_channel.h \ + idempotency/client/rpc_wrapper.h ALL_HEADERS = $(TOP_HEADERS) $(IDEMP_SERVER_HEADER) $(IDEMP_CLIENT_HEADER) diff --git a/src/idempotency/client/rpc_wrapper.h b/src/idempotency/client/rpc_wrapper.h new file mode 100644 index 0000000..87109c2 --- /dev/null +++ b/src/idempotency/client/rpc_wrapper.h @@ -0,0 +1,106 @@ + +#ifndef _IDEMPOTENCY_RPC_WRAPPER_H +#define _IDEMPOTENCY_RPC_WRAPPER_H + +#include "../../sf_configs.h" + +#define SF_CLIENT_IDEMPOTENCY_UPDATE_WARPER(client_ctx, \ + GET_MASTER_CONNECTION, get_conn_arg1, update_callback, ...) \ + ConnectionInfo *conn; \ + IdempotencyClientChannel *old_channel; \ + int result; \ + int i; \ + uint64_t req_id; \ + SFNetRetryIntervalContext net_retry_ctx; \ + \ + if ((conn=GET_MASTER_CONNECTION(client_ctx, \ + get_conn_arg1, &result)) == NULL) \ + { \ + return SF_UNIX_ERRNO(result, EIO); \ + } \ + connection_params = client_ctx->conn_manager. \ + get_connection_params(client_ctx, conn); \ + \ + sf_init_net_retry_interval_context(&net_retry_ctx, \ + &client_ctx->net_retry_cfg.interval_mm, \ + &client_ctx->net_retry_cfg.network); \ + \ + while (1) { \ + if (client_ctx->idempotency_enabled) { \ + req_id = idempotency_client_channel_next_seq_id( \ + connection_params->channel); \ + } else { \ + req_id = 0; \ + } \ + \ + old_channel = connection_params->channel; \ + i = 0; \ + while (1) { \ + if (client_ctx->idempotency_enabled) { \ + result = idempotency_client_channel_check_wait( \ + connection_params->channel); \ + } else { \ + result = 0; \ + } \ + \ + if (result == 0) { \ + if ((result=update_callback(client_ctx, \ + conn, req_id, ##__VA_ARGS__)) == 0) \ + { \ + break; \ + } \ + } \ + \ + if (result == SF_RETRIABLE_ERROR_CHANNEL_INVALID && \ + client_ctx->idempotency_enabled) \ + { \ + idempotency_client_channel_check_reconnect( \ + connection_params->channel); \ + } \ + \ + SF_NET_RETRY_CHECK_AND_SLEEP(net_retry_ctx, client_ctx-> \ + net_retry_cfg.network.times, ++i, result); \ + \ + logInfo("file: "__FILE__", line: %d, func: %s, " \ + "net retry result: %d, retry count: %d", \ + __LINE__, __FUNCTION__, result, i); \ + \ + SF_CLIENT_RELEASE_CONNECTION(client_ctx, conn, result); \ + if ((conn=GET_MASTER_CONNECTION(client_ctx, \ + get_conn_arg1, &result)) == NULL) \ + { \ + return SF_UNIX_ERRNO(result, EIO); \ + } \ + \ + connection_params = client_ctx->conn_manager. \ + get_connection_params(client_ctx, conn); \ + if (connection_params->channel != old_channel) { \ + break; \ + } \ + } \ + \ + if (connection_params->channel != old_channel) { /* master changed */ \ + sf_reset_net_retry_interval(&net_retry_ctx); \ + continue; \ + } \ + \ + if (client_ctx->idempotency_enabled) { \ + idempotency_client_channel_push( \ + connection_params->channel, req_id); \ + } \ + break; \ + } \ + \ + SF_CLIENT_RELEASE_CONNECTION(client_ctx, conn, result); \ + return SF_UNIX_ERRNO(result, EIO) + + +#ifdef __cplusplus +extern "C" { +#endif + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/idempotency/server/request_htable.c b/src/idempotency/server/request_htable.c index 9005a6a..0e76a11 100644 --- a/src/idempotency/server/request_htable.c +++ b/src/idempotency/server/request_htable.c @@ -36,6 +36,7 @@ int idempotency_request_htable_add(IdempotencyRequestHTable *htable, while (current != NULL) { if (current->req_id == request->req_id) { request->output.result = current->output.result; + request->output.flags = current->output.flags; memcpy(request->output.response, current->output.response, request->output.rsize); request->finished = current->finished; diff --git a/src/idempotency/server/server_types.h b/src/idempotency/server/server_types.h index 33f4f01..8c30172 100644 --- a/src/idempotency/server/server_types.h +++ b/src/idempotency/server/server_types.h @@ -6,7 +6,8 @@ #include "fastcommon/fast_timer.h" typedef struct idempotency_request_result { - int rsize; //response size defined by application + short rsize; //response size defined by application + short flags; //for application int result; void *response; } IdempotencyRequestResult; diff --git a/src/sf_global.c b/src/sf_global.c index e561ee4..8a8aa61 100644 --- a/src/sf_global.c +++ b/src/sf_global.c @@ -84,11 +84,11 @@ static int load_network_parameters(IniFullContext *ini_ctx) &max_pkg_size)) != 0) { return result; - } else if (max_pkg_size < 4096) { + } else if (max_pkg_size < 8192) { logWarning("file: "__FILE__", line: %d, " - "max_pkg_size: %d is too small, set to 4096", + "max_pkg_size: %d is too small, set to 8192", __LINE__, (int)max_pkg_size); - max_pkg_size = 4096; + max_pkg_size = 8192; } g_sf_global_vars.max_pkg_size = (int)max_pkg_size; @@ -101,11 +101,11 @@ static int load_network_parameters(IniFullContext *ini_ctx) &min_buff_size)) != 0) { return result; - } else if (min_buff_size < 2048) { + } else if (min_buff_size < 8192) { logWarning("file: "__FILE__", line: %d, " - "min_buff_size: %d is too small, set to 2048", + "min_buff_size: %d is too small, set to 8192", __LINE__, (int)min_buff_size); - min_buff_size = 2048; + min_buff_size = 8192; } g_sf_global_vars.min_buff_size = (int)min_buff_size; diff --git a/src/sf_proto.h b/src/sf_proto.h index 320db81..ad0768b 100644 --- a/src/sf_proto.h +++ b/src/sf_proto.h @@ -8,6 +8,7 @@ #include "fastcommon/logger.h" #include "fastcommon/connection_pool.h" #include "fastcommon/sockopt.h" +#include "sf_define.h" #include "sf_types.h" #define SF_PROTO_ACK 116 @@ -253,6 +254,16 @@ static inline int sf_active_test(ConnectionInfo *conn, SF_PROTO_ACTIVE_TEST_RESP); } +#define SF_CLIENT_RELEASE_CONNECTION(client_ctx, conn, result) \ + do { \ + if (SF_FORCE_CLOSE_CONNECTION_ERROR(result)) { \ + client_ctx->conn_manager.close_connection(client_ctx, conn); \ + } else if (client_ctx->conn_manager.release_connection != NULL) { \ + client_ctx->conn_manager.release_connection(client_ctx, conn); \ + } \ + } while (0) + + #ifdef __cplusplus } #endif diff --git a/src/sf_types.h b/src/sf_types.h index 56ea59e..022cfc6 100644 --- a/src/sf_types.h +++ b/src/sf_types.h @@ -17,11 +17,11 @@ #define SF_SERVER_TASK_TYPE_CHANNEL_HOLDER 101 //for request idempotency #define SF_SERVER_TASK_TYPE_CHANNEL_USER 102 //for request idempotency -typedef void (*sf_accept_done_callback)(struct fast_task_info *pTask, +typedef void (*sf_accept_done_callback)(struct fast_task_info *task, const bool bInnerPort); -typedef int (*sf_set_body_length_callback)(struct fast_task_info *pTask); -typedef int (*sf_deal_task_func)(struct fast_task_info *pTask); -typedef int (*sf_recv_timeout_callback)(struct fast_task_info *pTask); +typedef int (*sf_set_body_length_callback)(struct fast_task_info *task); +typedef int (*sf_deal_task_func)(struct fast_task_info *task); +typedef int (*sf_recv_timeout_callback)(struct fast_task_info *task); typedef struct sf_context { struct nio_thread_data *thread_data;