add rpc idempotency

connection_manager
YuQing 2020-09-15 14:49:34 +08:00
parent cf9999b0b1
commit 70a5822bdc
16 changed files with 1771 additions and 4 deletions

View File

@ -8,12 +8,20 @@ TARGET_LIB = $(TARGET_PREFIX)/$(LIB_VERSION)
ALL_HEADERS = sf_types.h sf_global.h sf_define.h sf_nio.h sf_service.h \
sf_func.h sf_util.h sf_configs.h
SHARED_OBJS = sf_nio.lo sf_service.lo sf_global.lo \
sf_func.lo sf_util.lo sf_configs.lo sf_proto.lo \
idempotency/server/channel.lo \
idempotency/server/request_htable.lo \
idempotency/server/channel_htable.lo \
idempotency/client/receipt_handler.lo \
idempotency/client/client_channel.lo
ALL_OBJS = $(SHARED_OBJS)
ALL_LIBS = libserverframe.so
all: $(ALL_LIBS)
libserverframe.so: sf_nio.lo sf_service.lo sf_global.lo sf_func.lo sf_util.lo \
sf_configs.lo
libserverframe.so: $(SHARED_OBJS)
cc -shared -o $@ $^ $(LIB_PATH)
@ -33,5 +41,5 @@ install:
if [ ! -e $(TARGET_PREFIX)/lib/libserverframe.so ]; then ln -s $(TARGET_LIB)/libserverframe.so $(TARGET_PREFIX)/lib/libserverframe.so; fi
clean:
rm -f *.lo $(ALL_LIBS) $(ALL_PRGS)
rm -f $(ALL_OBJS) $(ALL_LIBS) $(ALL_PRGS)

View File

@ -0,0 +1,280 @@
//client_channel.c
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/stat.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <limits.h>
#include <fcntl.h>
#include "fastcommon/logger.h"
#include "fastcommon/shared_func.h"
#include "fastcommon/pthread_func.h"
#include "fastcommon/sched_thread.h"
#include "fastcommon/fc_queue.h"
#include "sf/sf_util.h"
#include "sf/sf_func.h"
#include "sf/sf_nio.h"
#include "sf/sf_global.h"
#include "sf/sf_service.h"
#include "client_channel.h"
typedef struct {
IdempotencyClientChannel **buckets;
uint32_t capacity;
uint32_t count;
pthread_mutex_t lock;
} ClientChannelHashtable;
typedef struct {
struct fast_mblock_man channel_allocator;
ClientChannelHashtable htable;
} ClientChannelContext;
static ClientChannelContext channel_context;
static int init_htable(ClientChannelHashtable *htable, const int hint_capacity)
{
int result;
int bytes;
if ((result=init_pthread_lock(&htable->lock)) != 0) {
return result;
}
if (hint_capacity <= 1024) {
htable->capacity = 1361;
} else {
htable->capacity = fc_ceil_prime(hint_capacity);
}
bytes = sizeof(IdempotencyClientChannel *) * htable->capacity;
htable->buckets = (IdempotencyClientChannel **)fc_malloc(bytes);
if (htable->buckets == NULL) {
return ENOMEM;
}
memset(htable->buckets, 0, bytes);
htable->count = 0;
return 0;
}
static int idempotency_channel_alloc_init(void *element, void *args)
{
int result;
IdempotencyClientChannel *channel;
channel = (IdempotencyClientChannel *)element;
if ((result=fast_mblock_init_ex1(&channel->receipt_allocator,
"idempotency_receipt", sizeof(IdempotencyClientReceipt),
1024, 0, NULL, NULL, true)) != 0)
{
return result;
}
if ((result=init_pthread_lock_cond_pair(&channel->lc_pair)) != 0) {
return result;
}
FC_INIT_LIST_HEAD(&channel->dlink);
return fc_queue_init(&channel->queue, (long)
(&((IdempotencyClientReceipt *)NULL)->next));
}
int client_channel_init_ex(const int hint_capacity)
{
int result;
if ((result=fast_mblock_init_ex1(&channel_context.channel_allocator,
"channel_info", sizeof(IdempotencyClientChannel),
64, 0, idempotency_channel_alloc_init, NULL, true)) != 0)
{
return result;
}
if ((result=init_htable(&channel_context.htable, hint_capacity)) != 0) {
return result;
}
return 0;
}
void client_channel_destroy()
{
}
struct fast_task_info *alloc_channel_task(IdempotencyClientChannel *channel,
const uint32_t hash_code, const char *server_ip, const short port,
int *err_no)
{
struct fast_task_info *task;
task = free_queue_pop();
if (task == NULL) {
logError("file: "__FILE__", line: %d, "
"malloc task buff failed, you should "
"increase the parameter: max_connections",
__LINE__);
*err_no = ENOMEM;
return NULL;
}
snprintf(task->server_ip, sizeof(task->server_ip), "%s", server_ip);
task->port = port;
task->canceled = false;
task->ctx = &g_sf_context;
task->event.fd = -1;
task->arg = channel;
task->thread_data = g_sf_context.thread_data +
hash_code % g_sf_context.work_threads;
channel->in_ioevent = 1;
channel->last_connect_time = get_current_time();
if ((*err_no=sf_nio_notify(task, SF_NIO_STAGE_CONNECT)) != 0) {
channel->in_ioevent = 0;
free_queue_push(task);
return NULL;
}
return task;
}
int idempotency_client_channel_check_reconnect(
IdempotencyClientChannel *channel)
{
int result;
time_t current_time;
if (!__sync_bool_compare_and_swap(&channel->in_ioevent, 0, 1)) {
return 0;
}
current_time = get_current_time();
if (channel->last_connect_time >= current_time) {
sleep(1);
channel->last_connect_time = ++current_time;
}
logDebug("file: "__FILE__", line: %d, "
"trigger connect to server %s:%d",
__LINE__, channel->task->server_ip,
channel->task->port);
channel->task->canceled = false;
if ((result=sf_nio_notify(channel->task, SF_NIO_STAGE_CONNECT)) == 0) {
channel->last_connect_time = current_time;
} else {
__sync_bool_compare_and_swap(&channel->in_ioevent, 1, 0); //rollback
}
return result;
}
struct idempotency_client_channel *idempotency_client_channel_get(
const char *server_ip, const short server_port,
const int timeout, int *err_no)
{
int r;
int key_len;
bool found;
char key[64];
uint32_t hash_code;
IdempotencyClientChannel **bucket;
IdempotencyClientChannel *previous;
IdempotencyClientChannel *current;
IdempotencyClientChannel *channel;
key_len = snprintf(key, sizeof(key), "%s_%d", server_ip, server_port);
hash_code = simple_hash(key, key_len);
bucket = channel_context.htable.buckets +
hash_code % channel_context.htable.capacity;
previous = NULL;
channel = NULL;
*err_no = 0;
found = false;
PTHREAD_MUTEX_LOCK(&channel_context.htable.lock);
do {
current = *bucket;
while (current != NULL) {
r = conn_pool_compare_ip_and_port(current->task->server_ip,
current->task->port, server_ip, server_port);
if (r == 0) {
channel = current;
found = true;
break;
} else if (r > 0) {
break;
}
previous = current;
current = current->next;
}
if (found) {
break;
}
channel = (IdempotencyClientChannel *)fast_mblock_alloc_object(
&channel_context.channel_allocator);
if (channel == NULL) {
*err_no = ENOMEM;
break;
}
channel->task = alloc_channel_task(channel,
hash_code, server_ip, server_port, err_no);
if (channel->task == NULL) {
fast_mblock_free_object(&channel_context.
channel_allocator, channel);
channel = NULL;
break;
}
if (previous == NULL) {
channel->next = *bucket;
*bucket = channel;
} else {
channel->next = previous->next;
previous->next = channel;
}
channel_context.htable.count++;
} while (0);
PTHREAD_MUTEX_UNLOCK(&channel_context.htable.lock);
if (channel != NULL) {
if ((*err_no=idempotency_client_channel_check_wait_ex(
channel, timeout)) != 0)
{
return NULL;
}
}
return channel;
}
int idempotency_client_channel_push(struct idempotency_client_channel *channel,
const uint64_t req_id)
{
IdempotencyClientReceipt *receipt;
bool notify;
receipt = (IdempotencyClientReceipt *)fast_mblock_alloc_object(
&channel->receipt_allocator);
if (receipt == NULL) {
return ENOMEM;
}
receipt->req_id = req_id;
fc_queue_push_ex(&channel->queue, receipt, &notify);
if (notify) {
if (__sync_add_and_fetch(&channel->in_ioevent, 0)) {
if (__sync_add_and_fetch(&channel->established, 0)) {
sf_nio_notify(channel->task, SF_NIO_STAGE_CONTINUE);
}
} else {
return idempotency_client_channel_check_reconnect(channel);
}
}
return 0;
}

View File

@ -0,0 +1,76 @@
//client_channel.h
#ifndef IDEMPOTENCY_CLIENT_CHANNEL_H
#define IDEMPOTENCY_CLIENT_CHANNEL_H
#include "fastcommon/pthread_func.h"
#include "fastcommon/sched_thread.h"
#include "fastcommon/fc_atomic.h"
#include "client_types.h"
#ifdef __cplusplus
extern "C" {
#endif
#define client_channel_init() client_channel_init_ex(0)
int client_channel_init_ex(const int hint_capacity);
void client_channel_destroy();
struct idempotency_client_channel *idempotency_client_channel_get(
const char *server_ip, const short server_port,
const int timeout, int *err_no);
static inline uint64_t idempotency_client_channel_next_seq_id(
struct idempotency_client_channel *channel)
{
return __sync_add_and_fetch(&channel->next_req_id, 1);
}
int idempotency_client_channel_push(struct idempotency_client_channel *channel,
const uint64_t req_id);
int idempotency_client_channel_check_reconnect(
IdempotencyClientChannel *channel);
static inline void idempotency_client_channel_set_id_key(
IdempotencyClientChannel *channel, const uint32_t new_id,
const uint32_t new_key)
{
uint32_t old_id;
uint32_t old_key;
old_id = __sync_add_and_fetch(&channel->id, 0);
old_key = __sync_add_and_fetch(&channel->key, 0);
FC_ATOMIC_CAS(channel->id, old_id, new_id);
FC_ATOMIC_CAS(channel->key, old_key, new_key);
}
#define idempotency_client_channel_check_wait(channel) \
idempotency_client_channel_check_wait_ex(channel, 1)
static inline int idempotency_client_channel_check_wait_ex(
struct idempotency_client_channel *channel, const int timeout)
{
struct timespec ts;
if (__sync_add_and_fetch(&channel->established, 0)) {
return 0;
}
idempotency_client_channel_check_reconnect(channel);
PTHREAD_MUTEX_LOCK(&channel->lc_pair.lock);
ts.tv_sec = get_current_time() + timeout;
ts.tv_nsec = 0;
pthread_cond_timedwait(&channel->lc_pair.cond,
&channel->lc_pair.lock, &ts);
PTHREAD_MUTEX_UNLOCK(&channel->lc_pair.lock);
return __sync_add_and_fetch(&channel->established, 0) ? 0 : ETIMEDOUT;
}
#ifdef __cplusplus
}
#endif
#endif

View File

@ -0,0 +1,45 @@
#ifndef _IDEMPOTENCY_CLIENT_TYPES_H
#define _IDEMPOTENCY_CLIENT_TYPES_H
#include "fastcommon/fast_task_queue.h"
#include "fastcommon/fast_mblock.h"
#include "fastcommon/fc_list.h"
#include "fastcommon/fc_queue.h"
typedef struct idempotency_client_receipt {
uint64_t req_id;
struct idempotency_client_receipt *next;
} IdempotencyClientReceipt;
typedef struct idempotency_client_channel {
volatile uint32_t id; //channel id, 0 for invalid
volatile int key; //channel key
volatile char in_ioevent;
volatile char in_heartbeat;
volatile char established;
time_t last_connect_time;
time_t last_pkg_time; //last communication time
pthread_lock_cond_pair_t lc_pair; //for channel valid check and notify
volatile uint64_t next_req_id;
struct fast_mblock_man receipt_allocator;
struct fast_task_info *task;
struct fc_queue queue;
struct fc_queue_info waiting_resp_qinfo;
struct fc_list_head dlink; //LRU chain for heartbeat
struct idempotency_client_channel *next;
} IdempotencyClientChannel;
typedef struct idempotency_receipt_thread_context {
struct fc_list_head head; //LRU head for hearbeat
} IdempotencyReceiptThreadContext;
#ifdef __cplusplus
extern "C" {
#endif
#ifdef __cplusplus
}
#endif
#endif

View File

@ -0,0 +1,401 @@
//receipt_handler.c
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/stat.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <limits.h>
#include <fcntl.h>
#include "fastcommon/logger.h"
#include "fastcommon/sockopt.h"
#include "fastcommon/shared_func.h"
#include "fastcommon/pthread_func.h"
#include "fastcommon/sched_thread.h"
#include "fastcommon/ioevent_loop.h"
#include "../../sf_util.h"
#include "../../sf_func.h"
#include "../../sf_nio.h"
#include "../../sf_global.h"
#include "../../sf_service.h"
#include "../../sf_proto.h"
#include "client_channel.h"
#include "receipt_handler.h"
static IdempotencyReceiptThreadContext *receipt_thread_contexts = NULL;
static int receipt_init_task(struct fast_task_info *task)
{
task->connect_timeout = SF_G_CONNECT_TIMEOUT; //for client side
task->network_timeout = SF_G_NETWORK_TIMEOUT;
return 0;
}
static int receipt_recv_timeout_callback(struct fast_task_info *task)
{
IdempotencyClientChannel *channel;
if (SF_NIO_TASK_STAGE_FETCH(task) == SF_NIO_STAGE_CONNECT) {
logError("file: "__FILE__", line: %d, "
"connect to server %s:%d timeout",
__LINE__, task->server_ip, task->port);
return ETIMEDOUT;
}
channel = (IdempotencyClientChannel *)task->arg;
if (channel->waiting_resp_qinfo.head != NULL) {
logError("file: "__FILE__", line: %d, "
"waiting receipt response from server %s:%d timeout",
__LINE__, task->server_ip, task->port);
return ETIMEDOUT;
}
return 0;
}
static void receipt_task_finish_cleanup(struct fast_task_info *task)
{
IdempotencyClientChannel *channel;
if (task->event.fd >= 0) {
sf_task_detach_thread(task);
close(task->event.fd);
task->event.fd = -1;
}
channel = (IdempotencyClientChannel *)task->arg;
fc_list_del_init(&channel->dlink);
__sync_bool_compare_and_swap(&channel->established, 1, 0);
__sync_bool_compare_and_swap(&channel->in_ioevent, 1, 0);
logDebug("file: "__FILE__", line: %d, "
"receipt task for server %s:%d exit",
__LINE__, task->server_ip, task->port);
}
static int setup_channel_request(struct fast_task_info *task)
{
IdempotencyClientChannel *channel;
SFCommonProtoHeader *header;
FSProtoSetupChannelReq *req;
channel = (IdempotencyClientChannel *)task->arg;
header = (SFCommonProtoHeader *)task->data;
req = (FSProtoSetupChannelReq *)(header + 1);
int2buff(__sync_add_and_fetch(&channel->id, 0), req->channel_id);
int2buff(__sync_add_and_fetch(&channel->key, 0), req->key);
FS_PROTO_SET_HEADER(header, FS_SERVICE_PROTO_SETUP_CHANNEL_REQ,
sizeof(FSProtoSetupChannelReq));
task->length = sizeof(SFCommonProtoHeader) + sizeof(FSProtoSetupChannelReq);
return sf_send_add_event(task);
}
static int check_report_req_receipt(struct fast_task_info *task,
int *count)
{
IdempotencyClientChannel *channel;
SFCommonProtoHeader *header;
FSProtoReportReqReceiptHeader *rheader;
FSProtoReportReqReceiptBody *rbody;
FSProtoReportReqReceiptBody *rstart;
IdempotencyClientReceipt *last;
IdempotencyClientReceipt *receipt;
char *buff_end;
if (task->length > 0) {
*count = 0;
logWarning("file: "__FILE__", line: %d, "
"server %s:%d, task length: %d != 0, skip check "
"and report receipt request!", __LINE__,
task->server_ip, task->port, task->length);
return 0;
}
channel = (IdempotencyClientChannel *)task->arg;
if (channel->waiting_resp_qinfo.head != NULL) {
*count = 0;
return 0;
}
fc_queue_pop_to_queue(&channel->queue, &channel->waiting_resp_qinfo);
if (channel->waiting_resp_qinfo.head == NULL) {
*count = 0;
return 0;
}
header = (SFCommonProtoHeader *)task->data;
rheader = (FSProtoReportReqReceiptHeader *)(header + 1);
rbody = rstart = (FSProtoReportReqReceiptBody *)(rheader + 1);
buff_end = task->data + task->size;
last = NULL;
receipt = channel->waiting_resp_qinfo.head;
do {
//check buffer remain space
if (buff_end - (char *)rbody < sizeof(FSProtoReportReqReceiptBody)) {
break;
}
long2buff(receipt->req_id, rbody->req_id);
rbody++;
last = receipt;
receipt = receipt->next;
} while (receipt != NULL);
if (receipt != NULL) { //repush to queue
struct fc_queue_info qinfo;
bool notify;
qinfo.head = receipt;
qinfo.tail = channel->waiting_resp_qinfo.tail;
fc_queue_push_queue_to_head_ex(&channel->queue, &qinfo, &notify);
last->next = NULL;
channel->waiting_resp_qinfo.tail = last;
}
*count = rbody - rstart;
int2buff(*count, rheader->count);
task->length = (char *)rbody - task->data;
int2buff(task->length - sizeof(SFCommonProtoHeader), header->body_len);
header->cmd = FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ;
return sf_send_add_event(task);
}
static inline void update_lru_chain(struct fast_task_info *task)
{
IdempotencyReceiptThreadContext *thread_ctx;
IdempotencyClientChannel *channel;
thread_ctx = (IdempotencyReceiptThreadContext *)task->thread_data->arg;
channel = (IdempotencyClientChannel *)task->arg;
channel->last_pkg_time = g_current_time;
fc_list_move_tail(&channel->dlink, &thread_ctx->head);
}
static int report_req_receipt_request(struct fast_task_info *task,
const bool update_lru)
{
int result;
int count;
if ((result=check_report_req_receipt(task, &count)) != 0) {
return result;
}
if (count == 0) {
result = sf_set_read_event(task);
} else if (update_lru) {
update_lru_chain(task);
}
return 0;
}
static inline int receipt_expect_body_length(struct fast_task_info *task,
const int expect_body_len)
{
if ((int)(task->length - sizeof(SFCommonProtoHeader)) != expect_body_len) {
logError("file: "__FILE__", line: %d, "
"server %s:%d, response body length: %d != %d",
__LINE__, task->server_ip, task->port, (int)(task->length -
sizeof(SFCommonProtoHeader)), expect_body_len);
return EINVAL;
}
return 0;
}
static int deal_setup_channel_response(struct fast_task_info *task)
{
int result;
IdempotencyReceiptThreadContext *thread_ctx;
FSProtoSetupChannelResp *resp;
IdempotencyClientChannel *channel;
int channel_id;
int channel_key;
if ((result=receipt_expect_body_length(task,
sizeof(FSProtoSetupChannelResp))) != 0)
{
return result;
}
channel = (IdempotencyClientChannel *)task->arg;
if (__sync_add_and_fetch(&channel->established, 0)) {
logWarning("file: "__FILE__", line: %d, "
"response from server %s:%d, unexpected cmd: "
"SETUP_CHANNEL_RESP, ignore it!",
__LINE__, task->server_ip, task->port);
return 0;
}
resp = (FSProtoSetupChannelResp *)(task->data + sizeof(SFCommonProtoHeader));
channel_id = buff2int(resp->channel_id);
channel_key = buff2int(resp->key);
idempotency_client_channel_set_id_key(channel, channel_id, channel_key);
if (__sync_bool_compare_and_swap(&channel->established, 0, 1)) {
thread_ctx = (IdempotencyReceiptThreadContext *)task->thread_data->arg;
fc_list_add_tail(&channel->dlink, &thread_ctx->head);
}
PTHREAD_MUTEX_LOCK(&channel->lc_pair.lock);
pthread_cond_broadcast(&channel->lc_pair.cond);
PTHREAD_MUTEX_UNLOCK(&channel->lc_pair.lock);
if (channel->waiting_resp_qinfo.head != NULL) {
bool notify;
fc_queue_push_queue_to_head_ex(&channel->queue,
&channel->waiting_resp_qinfo, &notify);
channel->waiting_resp_qinfo.head = NULL;
channel->waiting_resp_qinfo.tail = NULL;
}
return 0;
}
static inline int deal_report_req_receipt_response(struct fast_task_info *task)
{
int result;
IdempotencyClientChannel *channel;
IdempotencyClientReceipt *current;
IdempotencyClientReceipt *deleted;
if ((result=receipt_expect_body_length(task, 0)) != 0) {
return result;
}
channel = (IdempotencyClientChannel *)task->arg;
if (channel->waiting_resp_qinfo.head == NULL) {
logWarning("file: "__FILE__", line: %d, "
"response from server %s:%d, unexpect cmd: "
"REPORT_REQ_RECEIPT_RESP", __LINE__,
task->server_ip, task->port);
return 0;
}
current = channel->waiting_resp_qinfo.head;
do {
deleted = current;
current = current->next;
fast_mblock_free_object(&channel->receipt_allocator, deleted);
} while (current != NULL);
channel->waiting_resp_qinfo.head = NULL;
channel->waiting_resp_qinfo.tail = NULL;
return 0;
}
static int receipt_deal_task(struct fast_task_info *task)
{
int result;
int stage;
do {
stage = SF_NIO_TASK_STAGE_FETCH(task);
if (stage == SF_NIO_STAGE_HANDSHAKE) {
result = setup_channel_request(task);
break;
} else if (stage == SF_NIO_STAGE_CONTINUE) {
if (((IdempotencyClientChannel *)task->arg)->established) {
result = report_req_receipt_request(task, true);
} else {
result = 0; //just ignore
}
break;
}
result = buff2short(((SFCommonProtoHeader *)task->data)->status);
if (result != 0) {
int msg_len;
char *message;
msg_len = task->length - sizeof(SFCommonProtoHeader);
message = task->data + sizeof(SFCommonProtoHeader);
logError("file: "__FILE__", line: %d, "
"response from server %s:%d, cmd: %d (%s), "
"status: %d, error info: %.*s",
__LINE__, task->server_ip, task->port,
((SFCommonProtoHeader *)task->data)->cmd,
sf_get_cmd_caption(((SFCommonProtoHeader *)task->data)->cmd),
result, msg_len, message);
break;
}
switch (((SFCommonProtoHeader *)task->data)->cmd) {
case FS_SERVICE_PROTO_SETUP_CHANNEL_RESP:
result = deal_setup_channel_response(task);
break;
case FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP:
result = deal_report_req_receipt_response(task);
break;
default:
logError("file: "__FILE__", line: %d, "
"response from server %s:%d, unexpect cmd: %d (%s)",
__LINE__, task->server_ip, task->port,
((SFCommonProtoHeader *)task->data)->cmd,
sf_get_cmd_caption(((SFCommonProtoHeader *)task->data)->cmd));
result = EINVAL;
break;
}
if (result == 0) {
update_lru_chain(task);
task->offset = task->length = 0;
result = report_req_receipt_request(task, false);
}
} while (0);
return result > 0 ? -1 * result : result;
}
static int receipt_thread_loop_callback(struct nio_thread_data *thread_data)
{
IdempotencyClientChannel *channel;
IdempotencyClientChannel *tmp;
IdempotencyReceiptThreadContext *thread_ctx;
thread_ctx = (IdempotencyReceiptThreadContext *)thread_data->arg;
fc_list_for_each_entry_safe(channel, tmp, &thread_ctx->head, dlink) {
//check heartbeat
//channel->task
}
return 0;
}
static void *receipt_alloc_thread_extra_data(const int thread_index)
{
IdempotencyReceiptThreadContext *ctx;
ctx = receipt_thread_contexts + thread_index;
FC_INIT_LIST_HEAD(&ctx->head);
return ctx;
}
int receipt_handler_init()
{
receipt_thread_contexts = (IdempotencyReceiptThreadContext *)fc_malloc(
sizeof(IdempotencyReceiptThreadContext) * SF_G_WORK_THREADS);
if (receipt_thread_contexts == NULL) {
return ENOMEM;
}
return sf_service_init_ex2(&g_sf_context,
receipt_alloc_thread_extra_data, receipt_thread_loop_callback,
NULL, sf_proto_set_body_length, receipt_deal_task,
receipt_task_finish_cleanup, receipt_recv_timeout_callback,
1000, sizeof(SFCommonProtoHeader), 0, receipt_init_task);
}
int receipt_handler_destroy()
{
return 0;
}

View File

@ -0,0 +1,19 @@
//receipt_handler.h
#ifndef _FS_IDEMPOTENCY_RECEIPT_HANDLER_H
#define _FS_IDEMPOTENCY_RECEIPT_HANDLER_H
#include "client_types.h"
#ifdef __cplusplus
extern "C" {
#endif
int receipt_handler_init();
int receipt_handler_destroy();
#ifdef __cplusplus
}
#endif
#endif

View File

@ -0,0 +1,289 @@
#include <limits.h>
#include <sys/stat.h>
#include <sys/statvfs.h>
#include "fastcommon/shared_func.h"
#include "fastcommon/logger.h"
#include "fastcommon/fast_mblock.h"
#include "fastcommon/sched_thread.h"
#include "sf/sf_global.h"
#include "channel_htable.h"
#include "channel.h"
typedef struct {
IdempotencyChannel **buckets;
uint32_t capacity;
uint32_t count;
pthread_mutex_t lock;
} ChannelHashtable;
typedef struct {
struct {
uint32_t max;
uint32_t current;
} channel_ids;
struct fast_mblock_man channel_allocator;
ChannelHashtable delay_free_htable; //for delay free
ChannelHTableContext htable_ctx;
struct {
uint32_t reserve_interval; //channel reserve interval in seconds
time_t last_check_time;
FastTimer timer;
} timeout_ctx;
} ChannelContext;
static ChannelContext channel_context;
static int init_htable(ChannelHashtable *htable, const int hint_capacity)
{
int result;
int bytes;
if ((result=init_pthread_lock(&htable->lock)) != 0) {
return result;
}
if (hint_capacity < 1024) {
htable->capacity = 1361;
} else {
htable->capacity = fc_ceil_prime(hint_capacity);
}
bytes = sizeof(IdempotencyChannel *) * htable->capacity;
htable->buckets = (IdempotencyChannel **)fc_malloc(bytes);
if (htable->buckets == NULL) {
return ENOMEM;
}
memset(htable->buckets, 0, bytes);
htable->count = 0;
return 0;
}
static int idempotency_channel_alloc_init(void *element, void *args)
{
IdempotencyChannel *channel;
channel = (IdempotencyChannel *)element;
channel->id = ++channel_context.channel_ids.current;
channel->request_htable.buckets = (IdempotencyRequest **)(channel + 1);
return init_pthread_lock(&channel->request_htable.lock);
}
int idempotency_channel_init(const uint32_t max_channel_id,
const int request_hint_capacity,
const uint32_t reserve_interval,
const uint32_t shared_lock_count)
{
int result;
int request_htable_capacity;
int element_size;
request_htable_capacity = fc_ceil_prime(request_hint_capacity);
idempotency_request_init(request_htable_capacity);
element_size = sizeof(IdempotencyChannel) + sizeof(IdempotencyRequest *) *
request_htable_capacity;
if ((result=fast_mblock_init_ex1(&channel_context.channel_allocator,
"channel_info", element_size, 1024, max_channel_id,
idempotency_channel_alloc_init, NULL, true)) != 0)
{
return result;
}
if ((result=fast_timer_init(&channel_context.timeout_ctx.timer,
2 * reserve_interval + 1,
get_current_time())) != 0)
{
return result;
}
channel_context.channel_ids.max = max_channel_id;
channel_context.channel_ids.current = 0;
channel_context.timeout_ctx.last_check_time = get_current_time();
channel_context.timeout_ctx.reserve_interval = reserve_interval;
if ((result=init_htable(&channel_context.delay_free_htable,
max_channel_id / 100)) != 0)
{
return result;
}
return idempotency_channel_htable_init(&channel_context.htable_ctx,
shared_lock_count, max_channel_id / 10);
}
static void add_to_delay_free_htable(IdempotencyChannel *channel)
{
IdempotencyChannel **bucket;
bucket = channel_context.delay_free_htable.buckets + channel->id %
channel_context.delay_free_htable.capacity;
PTHREAD_MUTEX_LOCK(&channel_context.delay_free_htable.lock);
channel->next = *bucket;
*bucket = channel;
channel_context.delay_free_htable.count++;
fast_timer_add(&channel_context.timeout_ctx.timer, &channel->timer);
PTHREAD_MUTEX_UNLOCK(&channel_context.delay_free_htable.lock);
}
IdempotencyChannel *idempotency_channel_find_and_hold(
const uint32_t channel_id, const int key, int *result)
{
IdempotencyChannel *channel;
if ((channel=idempotency_channel_htable_find(&channel_context.
htable_ctx, channel_id)) == NULL)
{
*result = ENOENT;
return NULL;
}
if (channel->key != key) {
*result = EPERM;
return NULL;
}
*result = 0;
__sync_add_and_fetch(&channel->ref_count, 1);
return channel;
}
static IdempotencyChannel *htable_remove(const uint32_t channel_id,
const bool need_lock, const bool remove_timer)
{
IdempotencyChannel **bucket;
IdempotencyChannel *previous;
IdempotencyChannel *channel;
bucket = channel_context.delay_free_htable.buckets + channel_id %
channel_context.delay_free_htable.capacity;
previous = NULL;
if (need_lock) {
PTHREAD_MUTEX_LOCK(&channel_context.delay_free_htable.lock);
}
channel = *bucket;
while (channel != NULL) {
if (channel->id == channel_id) {
if (previous == NULL) {
*bucket = channel->next;
} else {
previous->next = channel->next;
}
channel_context.delay_free_htable.count--;
if (remove_timer) {
fast_timer_remove(&channel_context.timeout_ctx.timer,
&channel->timer);
}
break;
}
previous = channel;
channel = channel->next;
}
if (need_lock) {
PTHREAD_MUTEX_UNLOCK(&channel_context.delay_free_htable.lock);
}
return channel;
}
static void do_free_channel(IdempotencyChannel *channel)
{
idempotency_request_htable_clear(&channel->request_htable);
fast_mblock_free_object(&channel_context.channel_allocator, channel);
}
static void recycle_timeout_entries()
{
uint32_t channel_id;
FastTimerEntry head;
FastTimerEntry *entry;
IdempotencyChannel *channel;
PTHREAD_MUTEX_LOCK(&channel_context.delay_free_htable.lock);
if (g_current_time - channel_context.timeout_ctx.last_check_time <= 10) {
PTHREAD_MUTEX_UNLOCK(&channel_context.delay_free_htable.lock);
return;
}
channel_context.timeout_ctx.last_check_time = g_current_time;
fast_timer_timeouts_get(&channel_context.timeout_ctx.timer,
g_current_time, &head);
entry = head.next;
while (entry != NULL) {
channel_id = ((IdempotencyChannel *)entry)->id;
entry = entry->next;
if ((channel=htable_remove(channel_id, false, false)) != NULL) {
do_free_channel(channel);
}
}
PTHREAD_MUTEX_UNLOCK(&channel_context.delay_free_htable.lock);
}
IdempotencyChannel *idempotency_channel_alloc(const uint32_t channel_id,
const int key)
{
IdempotencyChannel *channel;
do {
if (channel_id != 0) {
if ((channel=htable_remove(channel_id, true, true)) != NULL) {
if (channel->key == key) {
break;
} else {
add_to_delay_free_htable(channel);
}
}
}
if (channel_context.delay_free_htable.count > 0 && (g_current_time -
channel_context.timeout_ctx.last_check_time) > 10)
{
recycle_timeout_entries();
}
if ((channel=(IdempotencyChannel *)fast_mblock_alloc_object(
&channel_context.channel_allocator)) == NULL)
{
return NULL;
}
channel->key = rand();
} while (0);
__sync_bool_compare_and_swap(&channel->is_valid, 0, 1);
__sync_add_and_fetch(&channel->ref_count, 1);
idempotency_channel_htable_add(&channel_context.
htable_ctx, channel);
return channel;
}
void idempotency_channel_release(IdempotencyChannel *channel,
const bool is_holder)
{
if (is_holder) {
channel->timer.expires = g_current_time +
channel_context.timeout_ctx.reserve_interval;
__sync_bool_compare_and_swap(&channel->is_valid, 1, 0);
}
if (__sync_sub_and_fetch(&channel->ref_count, 1) == 0) {
if (channel->timer.expires <= g_current_time) { //expired
do_free_channel(channel);
} else {
add_to_delay_free_htable(channel);
}
}
}
void idempotency_channel_free(IdempotencyChannel *channel)
{
__sync_bool_compare_and_swap(&channel->is_valid, 1, 0);
if (__sync_sub_and_fetch(&channel->ref_count, 1) == 0) {
do_free_channel(channel);
} else {
channel->timer.expires = g_current_time +
channel_context.timeout_ctx.reserve_interval;
}
}

View File

@ -0,0 +1,46 @@
#ifndef _SF_IDEMPOTENCY_SERVER_CHANNEL_H
#define _SF_IDEMPOTENCY_SERVER_CHANNEL_H
#include "fastcommon/fast_timer.h"
#include "request_htable.h"
#ifdef __cplusplus
extern "C" {
#endif
int idempotency_channel_init(const uint32_t max_channel_id,
const int request_hint_capacity,
const uint32_t reserve_interval,
const uint32_t shared_lock_count);
IdempotencyChannel *idempotency_channel_alloc(const uint32_t channel_id,
const int key);
void idempotency_channel_release(IdempotencyChannel *channel,
const bool is_holder);
IdempotencyChannel *idempotency_channel_find_and_hold(
const uint32_t channel_id, const int key, int *result);
void idempotency_channel_free(IdempotencyChannel *channel);
static inline int idempotency_channel_add_request(IdempotencyChannel *
channel, IdempotencyRequest *request)
{
return idempotency_request_htable_add(
&channel->request_htable, request);
}
static inline int idempotency_channel_remove_request(
IdempotencyChannel *channel, const uint64_t req_id)
{
return idempotency_request_htable_remove(
&channel->request_htable, req_id);
}
#ifdef __cplusplus
}
#endif
#endif

View File

@ -0,0 +1,149 @@
#include <limits.h>
#include <sys/stat.h>
#include <sys/statvfs.h>
#include "fastcommon/shared_func.h"
#include "fastcommon/logger.h"
#include "fastcommon/fast_mblock.h"
#include "fastcommon/sched_thread.h"
#include "sf/sf_global.h"
#include "channel_htable.h"
int idempotency_channel_htable_init(ChannelHTableContext *ctx,
const uint32_t shared_lock_count, const uint32_t hint_capacity)
{
int result;
int64_t bytes;
pthread_mutex_t *lock;
pthread_mutex_t *end;
ctx->shared_locks.count = fc_ceil_prime(shared_lock_count);
ctx->htable.capacity = fc_ceil_prime(hint_capacity);
bytes = sizeof(pthread_mutex_t) * ctx->shared_locks.count;
ctx->shared_locks.locks = (pthread_mutex_t *)fc_malloc(bytes);
if (ctx->shared_locks.locks == NULL) {
return ENOMEM;
}
end = ctx->shared_locks.locks + ctx->shared_locks.count;
for (lock=ctx->shared_locks.locks; lock<end; lock++) {
if ((result=init_pthread_lock(lock)) != 0) {
return result;
}
}
bytes = sizeof(IdempotencyChannel *) * ctx->htable.capacity;
ctx->htable.buckets = (IdempotencyChannel **)fc_malloc(bytes);
if (ctx->htable.buckets == NULL) {
return ENOMEM;
}
memset(ctx->htable.buckets, 0, bytes);
ctx->htable.count = 0;
return 0;
}
int idempotency_channel_htable_add(ChannelHTableContext *ctx,
IdempotencyChannel *channel)
{
int result;
pthread_mutex_t *lock;
IdempotencyChannel **bucket;
IdempotencyChannel *previous;
IdempotencyChannel *current;
lock = ctx->shared_locks.locks + channel->id % ctx->shared_locks.count;
bucket = ctx->htable.buckets + channel->id % ctx->htable.capacity;
previous = NULL;
result = 0;
PTHREAD_MUTEX_LOCK(lock);
current = *bucket;
while (current != NULL) {
if (current->id == channel->id) {
result = EEXIST;
break;
} else if (current->id > channel->id) {
break;
}
previous = current;
current = current->next;
}
if (result == 0) {
if (previous == NULL) {
channel->next = *bucket;
*bucket = channel;
} else {
channel->next = previous->next;
previous->next = channel;
}
ctx->htable.count++;
}
PTHREAD_MUTEX_UNLOCK(lock);
return result;
}
IdempotencyChannel *idempotency_channel_htable_remove(
ChannelHTableContext *ctx, const uint32_t channel_id)
{
pthread_mutex_t *lock;
IdempotencyChannel **bucket;
IdempotencyChannel *previous;
IdempotencyChannel *current;
lock = ctx->shared_locks.locks + channel_id % ctx->shared_locks.count;
bucket = ctx->htable.buckets + channel_id % ctx->htable.capacity;
previous = NULL;
PTHREAD_MUTEX_LOCK(lock);
current = *bucket;
while (current != NULL) {
if (current->id == channel_id) {
if (previous == NULL) {
*bucket = current->next;
} else {
previous->next = current->next;
}
ctx->htable.count--;
break;
} else if (current->id > channel_id) {
current = NULL;
break;
}
previous = current;
current = current->next;
}
PTHREAD_MUTEX_UNLOCK(lock);
return current;
}
IdempotencyChannel *idempotency_channel_htable_find(
ChannelHTableContext *ctx, const uint32_t channel_id)
{
pthread_mutex_t *lock;
IdempotencyChannel **bucket;
IdempotencyChannel *current;
lock = ctx->shared_locks.locks + channel_id % ctx->shared_locks.count;
bucket = ctx->htable.buckets + channel_id % ctx->htable.capacity;
PTHREAD_MUTEX_LOCK(lock);
current = *bucket;
while (current != NULL) {
if (current->id == channel_id) {
break;
} else if (current->id > channel_id) {
current = NULL;
break;
}
current = current->next;
}
PTHREAD_MUTEX_UNLOCK(lock);
return current;
}

View File

@ -0,0 +1,43 @@
#ifndef _SF_IDEMPOTENCY_CHANNEL_HTABLE_H
#define _SF_IDEMPOTENCY_CHANNEL_HTABLE_H
#include "idempotency_types.h"
typedef struct channel_shared_locks {
pthread_mutex_t *locks;
uint32_t count;
} ChannelSharedLocks;
typedef struct idempotency_channel_htable {
IdempotencyChannel **buckets;
uint32_t capacity;
uint32_t count;
} IdempotencyChannelHTable;
typedef struct channel_htable_context {
ChannelSharedLocks shared_locks;
IdempotencyChannelHTable htable;
} ChannelHTableContext;
#ifdef __cplusplus
extern "C" {
#endif
int idempotency_channel_htable_init(ChannelHTableContext *ctx,
const uint32_t shared_lock_count, const uint32_t hint_capacity);
int idempotency_channel_htable_add(ChannelHTableContext *ctx,
IdempotencyChannel *channel);
IdempotencyChannel *idempotency_channel_htable_remove(
ChannelHTableContext *ctx, const uint32_t channel_id);
IdempotencyChannel *idempotency_channel_htable_find(
ChannelHTableContext *ctx, const uint32_t channel_id);
#ifdef __cplusplus
}
#endif
#endif

View File

@ -0,0 +1,44 @@
#ifndef _IDEMPOTENCY_SERVER_TYPES_H
#define _IDEMPOTENCY_SERVER_TYPES_H
#include "fastcommon/fast_mblock.h"
#include "fastcommon/fast_timer.h"
typedef struct idempotency_request {
uint64_t req_id;
volatile int ref_count;
bool finished;
struct {
int result;
int inc_alloc;
} output;
struct fast_mblock_man *allocator; //for free
struct idempotency_request *next;
} IdempotencyRequest;
typedef struct idempotency_request_htable {
IdempotencyRequest **buckets;
int count;
pthread_mutex_t lock;
} IdempotencyRequestHTable;
typedef struct idempotency_channel {
FastTimerEntry timer; //must be the first
uint32_t id;
int key; //for retrieve validation
volatile int ref_count;
volatile char is_valid;
IdempotencyRequestHTable request_htable;
struct idempotency_channel *next;
} IdempotencyChannel;
#ifdef __cplusplus
extern "C" {
#endif
#ifdef __cplusplus
}
#endif
#endif

View File

@ -0,0 +1,159 @@
#include <limits.h>
#include <sys/stat.h>
#include <sys/statvfs.h>
#include "fastcommon/shared_func.h"
#include "fastcommon/logger.h"
#include "fastcommon/fast_mblock.h"
#include "fastcommon/sched_thread.h"
#include "sf/sf_global.h"
#include "request_htable.h"
typedef struct idempotency_request_context {
uint32_t htable_capacity;
} IdempotencyRequestContext;
static IdempotencyRequestContext request_ctx;
void idempotency_request_init(const uint32_t htable_capacity)
{
request_ctx.htable_capacity = htable_capacity;
}
int idempotency_request_htable_add(IdempotencyRequestHTable *htable,
IdempotencyRequest *request)
{
int result;
IdempotencyRequest **bucket;
IdempotencyRequest *previous;
IdempotencyRequest *current;
bucket = htable->buckets + request->req_id % request_ctx.htable_capacity;
previous = NULL;
result = 0;
PTHREAD_MUTEX_LOCK(&htable->lock);
current = *bucket;
while (current != NULL) {
if (current->req_id == request->req_id) {
request->output = current->output;
request->finished = current->finished;
result = EEXIST;
break;
} else if (current->req_id > request->req_id) {
break;
}
previous = current;
current = current->next;
}
if (result == 0) {
if (previous == NULL) {
request->next = *bucket;
*bucket = request;
} else {
request->next = previous->next;
previous->next = request;
}
htable->count++;
}
PTHREAD_MUTEX_UNLOCK(&htable->lock);
if (result == 0) {
__sync_add_and_fetch(&request->ref_count, 2);
}
return result;
}
int idempotency_request_htable_remove(IdempotencyRequestHTable *htable,
const uint64_t req_id)
{
IdempotencyRequest **bucket;
IdempotencyRequest *previous;
IdempotencyRequest *current;
bucket = htable->buckets + req_id % request_ctx.htable_capacity;
previous = NULL;
PTHREAD_MUTEX_LOCK(&htable->lock);
current = *bucket;
while (current != NULL) {
if (current->req_id == req_id) {
if (previous == NULL) {
*bucket = current->next;
} else {
previous->next = current->next;
}
htable->count--;
break;
} else if (current->req_id > req_id) {
current = NULL;
break;
}
previous = current;
current = current->next;
}
PTHREAD_MUTEX_UNLOCK(&htable->lock);
if (current != NULL) {
idempotency_request_release(current);
return 0;
} else {
return ENOENT;
}
}
void idempotency_request_htable_clear(IdempotencyRequestHTable *htable)
{
IdempotencyRequest **bucket;
IdempotencyRequest **end;
IdempotencyRequest *head;
IdempotencyRequest *previous;
IdempotencyRequest *current;
IdempotencyRequest *deleted;
head = NULL;
PTHREAD_MUTEX_LOCK(&htable->lock);
do {
if (htable->count == 0) {
break;
}
previous = NULL;
end = htable->buckets + request_ctx.htable_capacity;
for (bucket=htable->buckets; bucket<end; bucket++) {
if (*bucket == NULL) {
continue;
}
current = *bucket;
do {
if (previous == NULL) {
head = current;
} else {
previous->next = current;
}
previous = current;
current = current->next;
} while (current != NULL);
*bucket = NULL;
}
if (previous != NULL) {
previous->next = NULL;
}
htable->count = 0;
} while (0);
PTHREAD_MUTEX_UNLOCK(&htable->lock);
while (head != NULL) {
deleted = head;
head = head->next;
idempotency_request_release(deleted);
}
}

View File

@ -0,0 +1,32 @@
#ifndef _SF_IDEMPOTENCY_REQUEST_HTABLE_H
#define _SF_IDEMPOTENCY_REQUEST_HTABLE_H
#include "idempotency_types.h"
#ifdef __cplusplus
extern "C" {
#endif
void idempotency_request_init(const uint32_t hint_capacity);
int idempotency_request_htable_add(IdempotencyRequestHTable
*htable, IdempotencyRequest *request);
int idempotency_request_htable_remove(IdempotencyRequestHTable *htable,
const uint64_t req_id);
void idempotency_request_htable_clear(IdempotencyRequestHTable *htable);
static inline void idempotency_request_release(IdempotencyRequest *request)
{
if (__sync_sub_and_fetch(&request->ref_count, 1) == 0) {
fast_mblock_free_object(request->allocator, request);
}
}
#ifdef __cplusplus
}
#endif
#endif

42
src/sf_proto.c Normal file
View File

@ -0,0 +1,42 @@
#include <errno.h>
#include "fastcommon/shared_func.h"
#include "sf_proto.h"
int sf_proto_set_body_length(struct fast_task_info *task)
{
SFCommonProtoHeader *header;
header = (SFCommonProtoHeader *)task->data;
if (!FS_PROTO_CHECK_MAGIC(header->magic)) {
logError("file: "__FILE__", line: %d, "
"client ip: %s, magic "FS_PROTO_MAGIC_FORMAT
" is invalid, expect: "FS_PROTO_MAGIC_FORMAT,
__LINE__, task->client_ip,
FS_PROTO_MAGIC_PARAMS(header->magic),
FS_PROTO_MAGIC_EXPECT_PARAMS);
return EINVAL;
}
task->length = buff2int(header->body_len); //set body length
return 0;
}
const char *sf_get_cmd_caption(const int cmd)
{
switch (cmd) {
case FS_SERVICE_PROTO_SETUP_CHANNEL_REQ:
return "SETUP_CHANNEL_REQ";
case FS_SERVICE_PROTO_SETUP_CHANNEL_RESP:
return "SETUP_CHANNEL_RESP";
case FS_SERVICE_PROTO_CLOSE_CHANNEL_REQ:
return "CLOSE_CHANNEL_REQ";
case FS_SERVICE_PROTO_CLOSE_CHANNEL_RESP:
return "CLOSE_CHANNEL_RESP";
case FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ:
return "REPORT_REQ_RECEIPT_REQ";
case FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP:
return "REPORT_REQ_RECEIPT_RESP";
default:
return "UNKOWN";
}
}

113
src/sf_proto.h Normal file
View File

@ -0,0 +1,113 @@
//sf_proto.h
#ifndef _FS_IDEMPOTENCY_PROTO_H
#define _FS_IDEMPOTENCY_PROTO_H
#include "fastcommon/fast_task_queue.h"
#include "fastcommon/shared_func.h"
#include "fastcommon/logger.h"
#include "fastcommon/connection_pool.h"
#include "fastcommon/sockopt.h"
#include "sf_types.h"
//for request idempotency
#define FS_SERVICE_PROTO_SETUP_CHANNEL_REQ 51
#define FS_SERVICE_PROTO_SETUP_CHANNEL_RESP 52
#define FS_SERVICE_PROTO_CLOSE_CHANNEL_REQ 53
#define FS_SERVICE_PROTO_CLOSE_CHANNEL_RESP 54
#define FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ 55
#define FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP 56
#define FS_PROTO_MAGIC_CHAR '@'
#define FS_PROTO_SET_MAGIC(m) \
m[0] = m[1] = m[2] = m[3] = FS_PROTO_MAGIC_CHAR
#define FS_PROTO_CHECK_MAGIC(m) \
(m[0] == FS_PROTO_MAGIC_CHAR && m[1] == FS_PROTO_MAGIC_CHAR && \
m[2] == FS_PROTO_MAGIC_CHAR && m[3] == FS_PROTO_MAGIC_CHAR)
#define FS_PROTO_MAGIC_FORMAT "0x%02X%02X%02X%02X"
#define FS_PROTO_MAGIC_EXPECT_PARAMS \
FS_PROTO_MAGIC_CHAR, FS_PROTO_MAGIC_CHAR, \
FS_PROTO_MAGIC_CHAR, FS_PROTO_MAGIC_CHAR
#define FS_PROTO_MAGIC_PARAMS(m) \
m[0], m[1], m[2], m[3]
#define FS_PROTO_SET_HEADER(header, _cmd, _body_len) \
do { \
FS_PROTO_SET_MAGIC((header)->magic); \
(header)->cmd = _cmd; \
(header)->status[0] = (header)->status[1] = 0; \
int2buff(_body_len, (header)->body_len); \
} while (0)
#define FS_PROTO_SET_RESPONSE_HEADER(proto_header, resp_header) \
do { \
(proto_header)->cmd = (resp_header).cmd; \
short2buff((resp_header).status, (proto_header)->status); \
int2buff((resp_header).body_len, (proto_header)->body_len);\
} while (0)
typedef struct sf_common_proto_header {
unsigned char magic[4]; //magic number
char body_len[4]; //body length
char status[2]; //status to store errno
char flags[2];
unsigned char cmd; //the command code
char padding[3];
} SFCommonProtoHeader;
typedef struct fs_proto_setup_channel_req {
char channel_id[4]; //for hint
char key[4]; //for validate when channel_id > 0
} FSProtoSetupChannelReq;
typedef struct fs_proto_setup_channel_resp {
char channel_id[4];
char key[4];
} FSProtoSetupChannelResp;
typedef struct fs_proto_report_req_receipt_header {
char count[4];
char padding[4];
} FSProtoReportReqReceiptHeader;
typedef struct fs_proto_report_req_receipt_body {
char req_id[8];
} FSProtoReportReqReceiptBody;
#ifdef __cplusplus
extern "C" {
#endif
int sf_proto_set_body_length(struct fast_task_info *task);
const char *sf_get_cmd_caption(const int cmd);
static inline void sf_log_network_error_ex(SFResponseInfo *response,
const ConnectionInfo *conn, const int result, const int line)
{
if (response->error.length > 0) {
logError("file: "__FILE__", line: %d, "
"server %s:%d, %s", line,
conn->ip_addr, conn->port,
response->error.message);
} else {
logError("file: "__FILE__", line: %d, "
"communicate with server %s:%d fail, "
"errno: %d, error info: %s", line,
conn->ip_addr, conn->port,
result, STRERROR(result));
}
}
#define sf_log_network_error(response, conn, result) \
sf_log_network_error_ex(response, conn, result, __LINE__)
#ifdef __cplusplus
}
#endif
#endif

View File

@ -11,6 +11,8 @@
#include "fastcommon/connection_pool.h"
#include "fastcommon/fast_task_queue.h"
#define FS_ERROR_INFO_SIZE 256
typedef void (*sf_accept_done_callback)(struct fast_task_info *pTask,
const bool bInnerPort);
typedef int (*sf_set_body_length_callback)(struct fast_task_info *pTask);
@ -41,5 +43,24 @@ typedef struct sf_context {
sf_recv_timeout_callback timeout_callback;
} SFContext;
#endif
typedef struct {
int body_len; //body length
short flags;
short status;
unsigned char cmd; //command
} SFHeaderInfo;
typedef struct {
SFHeaderInfo header;
char *body;
} SFRequestInfo;
typedef struct {
SFHeaderInfo header;
struct {
int length;
char message[FS_ERROR_INFO_SIZE];
} error;
} SFResponseInfo;
#endif