change idempotency/server files

connection_manager
YuQing 2020-09-15 17:18:57 +08:00
parent 70a5822bdc
commit 6d4af38931
11 changed files with 269 additions and 20 deletions

View File

@ -1,18 +1,31 @@
.SUFFIXES: .c .lo
COMPILE = $(CC) $(CFLAGS) -fPIC
INC_PATH = -I/usr/include -I/usr/local/include
INC_PATH = -I/usr/local/include
LIB_PATH = $(LIBS) -lfastcommon
TARGET_LIB = $(TARGET_PREFIX)/$(LIB_VERSION)
ALL_HEADERS = sf_types.h sf_global.h sf_define.h sf_nio.h sf_service.h \
sf_func.h sf_util.h sf_configs.h
TOP_HEADERS = sf_types.h sf_global.h sf_define.h sf_nio.h sf_service.h \
sf_func.h sf_util.h sf_configs.h sf_proto.h
IDEMP_SERVER_HEADER = idempotency/server/server_types.h \
idempotency/server/server_channel.h \
idempotency/server/request_htable.h \
idempotency/server/channel_htable.h \
idempotency/server/server_handler.h
IDEMP_CLIENT_HEADER = idempotency/client/client_types.h \
idempotency/client/receipt_handler.h \
idempotency/client/client_channel.h
ALL_HEADERS = $(TOP_HEADERS) $(IDEMP_SERVER_HEADER) $(IDEMP_CLIENT_HEADER)
SHARED_OBJS = sf_nio.lo sf_service.lo sf_global.lo \
sf_func.lo sf_util.lo sf_configs.lo sf_proto.lo \
idempotency/server/channel.lo \
idempotency/server/server_channel.lo \
idempotency/server/request_htable.lo \
idempotency/server/channel_htable.lo \
idempotency/server/server_handler.lo \
idempotency/client/receipt_handler.lo \
idempotency/client/client_channel.lo
@ -34,10 +47,13 @@ libserverframe.so: $(SHARED_OBJS)
install:
mkdir -p $(TARGET_LIB)
mkdir -p $(TARGET_PREFIX)/lib
mkdir -p $(TARGET_PREFIX)/include/sf
mkdir -p $(TARGET_PREFIX)/include/sf/idempotency/server
mkdir -p $(TARGET_PREFIX)/include/sf/idempotency/client
install -m 755 $(ALL_LIBS) $(TARGET_LIB)
cp -f $(ALL_HEADERS) $(TARGET_PREFIX)/include/sf
cp -f $(TOP_HEADERS) $(TARGET_PREFIX)/include/sf
cp -f $(IDEMP_SERVER_HEADER) $(TARGET_PREFIX)/include/sf/idempotency/server
cp -f $(IDEMP_CLIENT_HEADER) $(TARGET_PREFIX)/include/sf/idempotency/client
if [ ! -e $(TARGET_PREFIX)/lib/libserverframe.so ]; then ln -s $(TARGET_LIB)/libserverframe.so $(TARGET_PREFIX)/lib/libserverframe.so; fi
clean:

View File

@ -16,11 +16,11 @@
#include "fastcommon/pthread_func.h"
#include "fastcommon/sched_thread.h"
#include "fastcommon/fc_queue.h"
#include "sf/sf_util.h"
#include "sf/sf_func.h"
#include "sf/sf_nio.h"
#include "sf/sf_global.h"
#include "sf/sf_service.h"
#include "../../sf_util.h"
#include "../../sf_func.h"
#include "../../sf_nio.h"
#include "../../sf_global.h"
#include "../../sf_service.h"
#include "client_channel.h"
typedef struct {

View File

@ -2,7 +2,7 @@
#ifndef _SF_IDEMPOTENCY_CHANNEL_HTABLE_H
#define _SF_IDEMPOTENCY_CHANNEL_HTABLE_H
#include "idempotency_types.h"
#include "server_types.h"
typedef struct channel_shared_locks {
pthread_mutex_t *locks;

View File

@ -2,7 +2,7 @@
#ifndef _SF_IDEMPOTENCY_REQUEST_HTABLE_H
#define _SF_IDEMPOTENCY_REQUEST_HTABLE_H
#include "idempotency_types.h"
#include "server_types.h"
#ifdef __cplusplus
extern "C" {

View File

@ -7,7 +7,7 @@
#include "fastcommon/sched_thread.h"
#include "sf/sf_global.h"
#include "channel_htable.h"
#include "channel.h"
#include "server_channel.h"
typedef struct {
IdempotencyChannel **buckets;

View File

@ -0,0 +1,157 @@
//server_handler.c
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/stat.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <limits.h>
#include <fcntl.h>
#include "fastcommon/logger.h"
#include "fastcommon/sockopt.h"
#include "fastcommon/shared_func.h"
#include "fastcommon/pthread_func.h"
#include "fastcommon/sched_thread.h"
#include "../../sf_util.h"
#include "../../sf_global.h"
#include "../../sf_proto.h"
#include "server_channel.h"
#include "server_handler.h"
#define SF_TASK_BODY_LENGTH(task) \
(task->length - sizeof(SFCommonProtoHeader))
int service_deal_setup_channel(struct fast_task_info *task,
int *task_type, IdempotencyChannel **channel,
SFResponseInfo *response)
{
int result;
FSProtoSetupChannelReq *req;
FSProtoSetupChannelResp *resp;
uint32_t channel_id;
int key;
response->header.cmd = FS_SERVICE_PROTO_SETUP_CHANNEL_RESP;
if ((result=sf_server_expect_body_length(response,
SF_TASK_BODY_LENGTH(task),
sizeof(FSProtoSetupChannelReq))) != 0)
{
return result;
}
req = (FSProtoSetupChannelReq *)(task->data + sizeof(SFCommonProtoHeader));
channel_id = buff2int(req->channel_id);
key = buff2int(req->key);
if (*channel != NULL) {
response->error.length = sprintf(response->error.message,
"channel already setup, the channel id: %d", (*channel)->id);
return EEXIST;
}
*channel = idempotency_channel_alloc(channel_id, key);
if (*channel == NULL) {
response->error.length = sprintf(response->error.message,
"alloc channel fail, hint channel id: %d", channel_id);
return ENOMEM;
}
*task_type = FS_SERVER_TASK_TYPE_CHANNEL_HOLDER;
resp = (FSProtoSetupChannelResp *)(task->data +
sizeof(SFCommonProtoHeader));
int2buff((*channel)->id, resp->channel_id);
int2buff((*channel)->key, resp->key);
response->header.body_len = sizeof(FSProtoSetupChannelResp);
//TASK_ARG->context.response_done = true;
return 0;
}
/*
static int check_holder_channel(struct fast_task_info *task)
{
if (SERVER_TASK_TYPE != FS_SERVER_TASK_TYPE_CHANNEL_HOLDER) {
RESPONSE.error.length = sprintf(RESPONSE.error.message,
"unexpect task type: %d", SERVER_TASK_TYPE);
return EINVAL;
}
if (*channel == NULL) {
RESPONSE.error.length = sprintf(
RESPONSE.error.message,
"channel not exist");
return SF_RETRIABLE_ERROR_NO_CHANNEL;
}
return 0;
}
int service_deal_close_channel(struct fast_task_info *task)
{
int result;
if ((result=check_holder_channel(task)) != 0) {
return result;
}
RESPONSE.header.cmd = FS_SERVICE_PROTO_CLOSE_CHANNEL_RESP;
idempotency_channel_free(*channel);
*channel = NULL;
*task_type = FS_SERVER_TASK_TYPE_NONE;
return 0;
}
int service_deal_report_req_receipt(struct fast_task_info *task)
{
int result;
int count;
int success;
int body_len;
int calc_body_len;
int64_t req_id;
FSProtoReportReqReceiptHeader *body_header;
FSProtoReportReqReceiptBody *body_part;
FSProtoReportReqReceiptBody *body_end;
if ((result=check_holder_channel(task)) != 0) {
return result;
}
body_len = SF_TASK_BODY_LENGTH(task);
if ((result=sf_server_check_min_body_length(response, body_len,
sizeof(FSProtoReportReqReceiptHeader))) != 0)
{
return result;
}
body_header = (FSProtoReportReqReceiptHeader *)
(task->data + sizeof(SFCommonProtoHeader));
count = buff2int(body_header->count);
calc_body_len = sizeof(FSProtoReportReqReceiptHeader) +
sizeof(FSProtoReportReqReceiptBody) * count;
if (body_len != calc_body_len) {
RESPONSE.error.length = sprintf(RESPONSE.error.message,
"body length: %d != calculated body length: %d",
body_len, calc_body_len);
return EINVAL;
}
success = 0;
body_part = (FSProtoReportReqReceiptBody *)(body_header + 1);
body_end = body_part + count;
for (; body_part < body_end; body_part++) {
req_id = buff2long(body_part->req_id);
if (idempotency_channel_remove_request(*channel, req_id) == 0) {
success++;
}
}
logInfo("receipt count: %d, success: %d", count, success);
RESPONSE.header.cmd = FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP;
return 0;
}
*/

View File

@ -0,0 +1,16 @@
//server_handler.h
#ifndef _FS_IDEMPOTENCY_SERVER_HANDLER_H
#define _FS_IDEMPOTENCY_SERVER_HANDLER_H
#include "server_types.h"
#ifdef __cplusplus
extern "C" {
#endif
#ifdef __cplusplus
}
#endif
#endif

View File

@ -11,12 +11,12 @@
#include "sf_types.h"
//for request idempotency
#define FS_SERVICE_PROTO_SETUP_CHANNEL_REQ 51
#define FS_SERVICE_PROTO_SETUP_CHANNEL_RESP 52
#define FS_SERVICE_PROTO_CLOSE_CHANNEL_REQ 53
#define FS_SERVICE_PROTO_CLOSE_CHANNEL_RESP 54
#define FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ 55
#define FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP 56
#define FS_SERVICE_PROTO_SETUP_CHANNEL_REQ 111
#define FS_SERVICE_PROTO_SETUP_CHANNEL_RESP 112
#define FS_SERVICE_PROTO_CLOSE_CHANNEL_REQ 113
#define FS_SERVICE_PROTO_CLOSE_CHANNEL_RESP 114
#define FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ 115
#define FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP 116
#define FS_PROTO_MAGIC_CHAR '@'
#define FS_PROTO_SET_MAGIC(m) \
@ -106,6 +106,64 @@ static inline void sf_log_network_error_ex(SFResponseInfo *response,
#define sf_log_network_error(response, conn, result) \
sf_log_network_error_ex(response, conn, result, __LINE__)
static inline int sf_server_expect_body_length(SFResponseInfo *response,
const int body_length, const int expect_body_len)
{
if (body_length != expect_body_len) {
response->error.length = sprintf(
response->error.message,
"request body length: %d != %d",
body_length, expect_body_len);
return EINVAL;
}
return 0;
}
static inline int sf_server_check_min_body_length(SFResponseInfo *response,
const int body_length, const int min_body_length)
{
if (body_length < min_body_length) {
response->error.length = sprintf(
response->error.message,
"request body length: %d < %d",
body_length, min_body_length);
return EINVAL;
}
return 0;
}
static inline int sf_server_check_max_body_length(SFResponseInfo *response,
const int body_length, const int max_body_length)
{
if (body_length > max_body_length) {
response->error.length = sprintf(
response->error.message,
"request body length: %d > %d",
body_length, max_body_length);
return EINVAL;
}
return 0;
}
static inline int sf_server_check_body_length(
SFResponseInfo *response, const int body_length,
const int min_body_length, const int max_body_length)
{
int result;
if ((result=sf_server_check_min_body_length(response,
body_length, min_body_length)) != 0)
{
return result;
}
return sf_server_check_max_body_length(response,
body_length, max_body_length);
}
#ifdef __cplusplus
}
#endif

View File

@ -12,6 +12,8 @@
#include "fastcommon/fast_task_queue.h"
#define FS_ERROR_INFO_SIZE 256
#define FS_SERVER_TASK_TYPE_CHANNEL_HOLDER 101 //for request idempotency
#define FS_SERVER_TASK_TYPE_CHANNEL_USER 102 //for request idempotency
typedef void (*sf_accept_done_callback)(struct fast_task_info *pTask,
const bool bInnerPort);