From 6d4af38931135e106b4f6a8539200cfd7e757d5e Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Tue, 15 Sep 2020 17:18:57 +0800 Subject: [PATCH] change idempotency/server files --- src/Makefile.in | 28 +++- src/idempotency/client/client_channel.c | 10 +- src/idempotency/server/channel_htable.h | 2 +- src/idempotency/server/request_htable.h | 2 +- .../server/{channel.c => server_channel.c} | 2 +- .../server/{channel.h => server_channel.h} | 0 src/idempotency/server/server_handler.c | 157 ++++++++++++++++++ src/idempotency/server/server_handler.h | 16 ++ .../{idempotency_types.h => server_types.h} | 0 src/sf_proto.h | 70 +++++++- src/sf_types.h | 2 + 11 files changed, 269 insertions(+), 20 deletions(-) rename src/idempotency/server/{channel.c => server_channel.c} (99%) rename src/idempotency/server/{channel.h => server_channel.h} (100%) create mode 100644 src/idempotency/server/server_handler.c create mode 100644 src/idempotency/server/server_handler.h rename src/idempotency/server/{idempotency_types.h => server_types.h} (100%) diff --git a/src/Makefile.in b/src/Makefile.in index 28f7af0..aa3638b 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -1,18 +1,31 @@ .SUFFIXES: .c .lo COMPILE = $(CC) $(CFLAGS) -fPIC -INC_PATH = -I/usr/include -I/usr/local/include +INC_PATH = -I/usr/local/include LIB_PATH = $(LIBS) -lfastcommon TARGET_LIB = $(TARGET_PREFIX)/$(LIB_VERSION) -ALL_HEADERS = sf_types.h sf_global.h sf_define.h sf_nio.h sf_service.h \ - sf_func.h sf_util.h sf_configs.h +TOP_HEADERS = sf_types.h sf_global.h sf_define.h sf_nio.h sf_service.h \ + sf_func.h sf_util.h sf_configs.h sf_proto.h + +IDEMP_SERVER_HEADER = idempotency/server/server_types.h \ + idempotency/server/server_channel.h \ + idempotency/server/request_htable.h \ + idempotency/server/channel_htable.h \ + idempotency/server/server_handler.h + +IDEMP_CLIENT_HEADER = idempotency/client/client_types.h \ + idempotency/client/receipt_handler.h \ + idempotency/client/client_channel.h + +ALL_HEADERS = $(TOP_HEADERS) $(IDEMP_SERVER_HEADER) $(IDEMP_CLIENT_HEADER) SHARED_OBJS = sf_nio.lo sf_service.lo sf_global.lo \ sf_func.lo sf_util.lo sf_configs.lo sf_proto.lo \ - idempotency/server/channel.lo \ + idempotency/server/server_channel.lo \ idempotency/server/request_htable.lo \ idempotency/server/channel_htable.lo \ + idempotency/server/server_handler.lo \ idempotency/client/receipt_handler.lo \ idempotency/client/client_channel.lo @@ -34,10 +47,13 @@ libserverframe.so: $(SHARED_OBJS) install: mkdir -p $(TARGET_LIB) mkdir -p $(TARGET_PREFIX)/lib - mkdir -p $(TARGET_PREFIX)/include/sf + mkdir -p $(TARGET_PREFIX)/include/sf/idempotency/server + mkdir -p $(TARGET_PREFIX)/include/sf/idempotency/client install -m 755 $(ALL_LIBS) $(TARGET_LIB) - cp -f $(ALL_HEADERS) $(TARGET_PREFIX)/include/sf + cp -f $(TOP_HEADERS) $(TARGET_PREFIX)/include/sf + cp -f $(IDEMP_SERVER_HEADER) $(TARGET_PREFIX)/include/sf/idempotency/server + cp -f $(IDEMP_CLIENT_HEADER) $(TARGET_PREFIX)/include/sf/idempotency/client if [ ! -e $(TARGET_PREFIX)/lib/libserverframe.so ]; then ln -s $(TARGET_LIB)/libserverframe.so $(TARGET_PREFIX)/lib/libserverframe.so; fi clean: diff --git a/src/idempotency/client/client_channel.c b/src/idempotency/client/client_channel.c index 4564c17..813253c 100644 --- a/src/idempotency/client/client_channel.c +++ b/src/idempotency/client/client_channel.c @@ -16,11 +16,11 @@ #include "fastcommon/pthread_func.h" #include "fastcommon/sched_thread.h" #include "fastcommon/fc_queue.h" -#include "sf/sf_util.h" -#include "sf/sf_func.h" -#include "sf/sf_nio.h" -#include "sf/sf_global.h" -#include "sf/sf_service.h" +#include "../../sf_util.h" +#include "../../sf_func.h" +#include "../../sf_nio.h" +#include "../../sf_global.h" +#include "../../sf_service.h" #include "client_channel.h" typedef struct { diff --git a/src/idempotency/server/channel_htable.h b/src/idempotency/server/channel_htable.h index 7db0f20..c849658 100644 --- a/src/idempotency/server/channel_htable.h +++ b/src/idempotency/server/channel_htable.h @@ -2,7 +2,7 @@ #ifndef _SF_IDEMPOTENCY_CHANNEL_HTABLE_H #define _SF_IDEMPOTENCY_CHANNEL_HTABLE_H -#include "idempotency_types.h" +#include "server_types.h" typedef struct channel_shared_locks { pthread_mutex_t *locks; diff --git a/src/idempotency/server/request_htable.h b/src/idempotency/server/request_htable.h index 1ee091e..c5b2ba5 100644 --- a/src/idempotency/server/request_htable.h +++ b/src/idempotency/server/request_htable.h @@ -2,7 +2,7 @@ #ifndef _SF_IDEMPOTENCY_REQUEST_HTABLE_H #define _SF_IDEMPOTENCY_REQUEST_HTABLE_H -#include "idempotency_types.h" +#include "server_types.h" #ifdef __cplusplus extern "C" { diff --git a/src/idempotency/server/channel.c b/src/idempotency/server/server_channel.c similarity index 99% rename from src/idempotency/server/channel.c rename to src/idempotency/server/server_channel.c index 48fe193..d9d3dfc 100644 --- a/src/idempotency/server/channel.c +++ b/src/idempotency/server/server_channel.c @@ -7,7 +7,7 @@ #include "fastcommon/sched_thread.h" #include "sf/sf_global.h" #include "channel_htable.h" -#include "channel.h" +#include "server_channel.h" typedef struct { IdempotencyChannel **buckets; diff --git a/src/idempotency/server/channel.h b/src/idempotency/server/server_channel.h similarity index 100% rename from src/idempotency/server/channel.h rename to src/idempotency/server/server_channel.h diff --git a/src/idempotency/server/server_handler.c b/src/idempotency/server/server_handler.c new file mode 100644 index 0000000..a8a41e0 --- /dev/null +++ b/src/idempotency/server/server_handler.c @@ -0,0 +1,157 @@ +//server_handler.c + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "fastcommon/logger.h" +#include "fastcommon/sockopt.h" +#include "fastcommon/shared_func.h" +#include "fastcommon/pthread_func.h" +#include "fastcommon/sched_thread.h" +#include "../../sf_util.h" +#include "../../sf_global.h" +#include "../../sf_proto.h" +#include "server_channel.h" +#include "server_handler.h" + +#define SF_TASK_BODY_LENGTH(task) \ + (task->length - sizeof(SFCommonProtoHeader)) + +int service_deal_setup_channel(struct fast_task_info *task, + int *task_type, IdempotencyChannel **channel, + SFResponseInfo *response) +{ + int result; + FSProtoSetupChannelReq *req; + FSProtoSetupChannelResp *resp; + uint32_t channel_id; + int key; + + response->header.cmd = FS_SERVICE_PROTO_SETUP_CHANNEL_RESP; + if ((result=sf_server_expect_body_length(response, + SF_TASK_BODY_LENGTH(task), + sizeof(FSProtoSetupChannelReq))) != 0) + { + return result; + } + + req = (FSProtoSetupChannelReq *)(task->data + sizeof(SFCommonProtoHeader)); + channel_id = buff2int(req->channel_id); + key = buff2int(req->key); + if (*channel != NULL) { + response->error.length = sprintf(response->error.message, + "channel already setup, the channel id: %d", (*channel)->id); + return EEXIST; + } + + *channel = idempotency_channel_alloc(channel_id, key); + if (*channel == NULL) { + response->error.length = sprintf(response->error.message, + "alloc channel fail, hint channel id: %d", channel_id); + return ENOMEM; + } + + *task_type = FS_SERVER_TASK_TYPE_CHANNEL_HOLDER; + + resp = (FSProtoSetupChannelResp *)(task->data + + sizeof(SFCommonProtoHeader)); + int2buff((*channel)->id, resp->channel_id); + int2buff((*channel)->key, resp->key); + response->header.body_len = sizeof(FSProtoSetupChannelResp); + //TASK_ARG->context.response_done = true; + return 0; +} + +/* +static int check_holder_channel(struct fast_task_info *task) +{ + if (SERVER_TASK_TYPE != FS_SERVER_TASK_TYPE_CHANNEL_HOLDER) { + RESPONSE.error.length = sprintf(RESPONSE.error.message, + "unexpect task type: %d", SERVER_TASK_TYPE); + return EINVAL; + } + + if (*channel == NULL) { + RESPONSE.error.length = sprintf( + RESPONSE.error.message, + "channel not exist"); + return SF_RETRIABLE_ERROR_NO_CHANNEL; + } + + return 0; +} + +int service_deal_close_channel(struct fast_task_info *task) +{ + int result; + if ((result=check_holder_channel(task)) != 0) { + return result; + } + + RESPONSE.header.cmd = FS_SERVICE_PROTO_CLOSE_CHANNEL_RESP; + idempotency_channel_free(*channel); + *channel = NULL; + *task_type = FS_SERVER_TASK_TYPE_NONE; + return 0; +} + +int service_deal_report_req_receipt(struct fast_task_info *task) +{ + int result; + int count; + int success; + int body_len; + int calc_body_len; + int64_t req_id; + FSProtoReportReqReceiptHeader *body_header; + FSProtoReportReqReceiptBody *body_part; + FSProtoReportReqReceiptBody *body_end; + + if ((result=check_holder_channel(task)) != 0) { + return result; + } + + body_len = SF_TASK_BODY_LENGTH(task); + if ((result=sf_server_check_min_body_length(response, body_len, + sizeof(FSProtoReportReqReceiptHeader))) != 0) + { + return result; + } + + body_header = (FSProtoReportReqReceiptHeader *) + (task->data + sizeof(SFCommonProtoHeader)); + count = buff2int(body_header->count); + calc_body_len = sizeof(FSProtoReportReqReceiptHeader) + + sizeof(FSProtoReportReqReceiptBody) * count; + if (body_len != calc_body_len) { + RESPONSE.error.length = sprintf(RESPONSE.error.message, + "body length: %d != calculated body length: %d", + body_len, calc_body_len); + return EINVAL; + } + + success = 0; + body_part = (FSProtoReportReqReceiptBody *)(body_header + 1); + body_end = body_part + count; + for (; body_part < body_end; body_part++) { + req_id = buff2long(body_part->req_id); + if (idempotency_channel_remove_request(*channel, req_id) == 0) { + success++; + } + } + + logInfo("receipt count: %d, success: %d", count, success); + + RESPONSE.header.cmd = FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP; + return 0; +} + +*/ diff --git a/src/idempotency/server/server_handler.h b/src/idempotency/server/server_handler.h new file mode 100644 index 0000000..047a67c --- /dev/null +++ b/src/idempotency/server/server_handler.h @@ -0,0 +1,16 @@ +//server_handler.h + +#ifndef _FS_IDEMPOTENCY_SERVER_HANDLER_H +#define _FS_IDEMPOTENCY_SERVER_HANDLER_H + +#include "server_types.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/idempotency/server/idempotency_types.h b/src/idempotency/server/server_types.h similarity index 100% rename from src/idempotency/server/idempotency_types.h rename to src/idempotency/server/server_types.h diff --git a/src/sf_proto.h b/src/sf_proto.h index 65f38fe..280ce1f 100644 --- a/src/sf_proto.h +++ b/src/sf_proto.h @@ -11,12 +11,12 @@ #include "sf_types.h" //for request idempotency -#define FS_SERVICE_PROTO_SETUP_CHANNEL_REQ 51 -#define FS_SERVICE_PROTO_SETUP_CHANNEL_RESP 52 -#define FS_SERVICE_PROTO_CLOSE_CHANNEL_REQ 53 -#define FS_SERVICE_PROTO_CLOSE_CHANNEL_RESP 54 -#define FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ 55 -#define FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP 56 +#define FS_SERVICE_PROTO_SETUP_CHANNEL_REQ 111 +#define FS_SERVICE_PROTO_SETUP_CHANNEL_RESP 112 +#define FS_SERVICE_PROTO_CLOSE_CHANNEL_REQ 113 +#define FS_SERVICE_PROTO_CLOSE_CHANNEL_RESP 114 +#define FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ 115 +#define FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP 116 #define FS_PROTO_MAGIC_CHAR '@' #define FS_PROTO_SET_MAGIC(m) \ @@ -106,6 +106,64 @@ static inline void sf_log_network_error_ex(SFResponseInfo *response, #define sf_log_network_error(response, conn, result) \ sf_log_network_error_ex(response, conn, result, __LINE__) + +static inline int sf_server_expect_body_length(SFResponseInfo *response, + const int body_length, const int expect_body_len) +{ + if (body_length != expect_body_len) { + response->error.length = sprintf( + response->error.message, + "request body length: %d != %d", + body_length, expect_body_len); + return EINVAL; + } + + return 0; +} + +static inline int sf_server_check_min_body_length(SFResponseInfo *response, + const int body_length, const int min_body_length) +{ + if (body_length < min_body_length) { + response->error.length = sprintf( + response->error.message, + "request body length: %d < %d", + body_length, min_body_length); + return EINVAL; + } + + return 0; +} + +static inline int sf_server_check_max_body_length(SFResponseInfo *response, + const int body_length, const int max_body_length) +{ + if (body_length > max_body_length) { + response->error.length = sprintf( + response->error.message, + "request body length: %d > %d", + body_length, max_body_length); + return EINVAL; + } + + return 0; +} + +static inline int sf_server_check_body_length( + SFResponseInfo *response, const int body_length, + const int min_body_length, const int max_body_length) +{ + int result; + if ((result=sf_server_check_min_body_length(response, + body_length, min_body_length)) != 0) + { + return result; + } + return sf_server_check_max_body_length(response, + body_length, max_body_length); +} + + #ifdef __cplusplus } #endif diff --git a/src/sf_types.h b/src/sf_types.h index 926308a..35c2e8b 100644 --- a/src/sf_types.h +++ b/src/sf_types.h @@ -12,6 +12,8 @@ #include "fastcommon/fast_task_queue.h" #define FS_ERROR_INFO_SIZE 256 +#define FS_SERVER_TASK_TYPE_CHANNEL_HOLDER 101 //for request idempotency +#define FS_SERVER_TASK_TYPE_CHANNEL_USER 102 //for request idempotency typedef void (*sf_accept_done_callback)(struct fast_task_info *pTask, const bool bInnerPort);