diff --git a/src/Makefile.in b/src/Makefile.in index d7386f1..28f7af0 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -8,12 +8,20 @@ TARGET_LIB = $(TARGET_PREFIX)/$(LIB_VERSION) ALL_HEADERS = sf_types.h sf_global.h sf_define.h sf_nio.h sf_service.h \ sf_func.h sf_util.h sf_configs.h +SHARED_OBJS = sf_nio.lo sf_service.lo sf_global.lo \ + sf_func.lo sf_util.lo sf_configs.lo sf_proto.lo \ + idempotency/server/channel.lo \ + idempotency/server/request_htable.lo \ + idempotency/server/channel_htable.lo \ + idempotency/client/receipt_handler.lo \ + idempotency/client/client_channel.lo + +ALL_OBJS = $(SHARED_OBJS) ALL_LIBS = libserverframe.so all: $(ALL_LIBS) -libserverframe.so: sf_nio.lo sf_service.lo sf_global.lo sf_func.lo sf_util.lo \ - sf_configs.lo +libserverframe.so: $(SHARED_OBJS) cc -shared -o $@ $^ $(LIB_PATH) @@ -33,5 +41,5 @@ install: if [ ! -e $(TARGET_PREFIX)/lib/libserverframe.so ]; then ln -s $(TARGET_LIB)/libserverframe.so $(TARGET_PREFIX)/lib/libserverframe.so; fi clean: - rm -f *.lo $(ALL_LIBS) $(ALL_PRGS) + rm -f $(ALL_OBJS) $(ALL_LIBS) $(ALL_PRGS) diff --git a/src/idempotency/client/client_channel.c b/src/idempotency/client/client_channel.c new file mode 100644 index 0000000..4564c17 --- /dev/null +++ b/src/idempotency/client/client_channel.c @@ -0,0 +1,280 @@ +//client_channel.c + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "fastcommon/logger.h" +#include "fastcommon/shared_func.h" +#include "fastcommon/pthread_func.h" +#include "fastcommon/sched_thread.h" +#include "fastcommon/fc_queue.h" +#include "sf/sf_util.h" +#include "sf/sf_func.h" +#include "sf/sf_nio.h" +#include "sf/sf_global.h" +#include "sf/sf_service.h" +#include "client_channel.h" + +typedef struct { + IdempotencyClientChannel **buckets; + uint32_t capacity; + uint32_t count; + pthread_mutex_t lock; +} ClientChannelHashtable; + +typedef struct { + struct fast_mblock_man channel_allocator; + ClientChannelHashtable htable; +} ClientChannelContext; + +static ClientChannelContext channel_context; + +static int init_htable(ClientChannelHashtable *htable, const int hint_capacity) +{ + int result; + int bytes; + + if ((result=init_pthread_lock(&htable->lock)) != 0) { + return result; + } + + if (hint_capacity <= 1024) { + htable->capacity = 1361; + } else { + htable->capacity = fc_ceil_prime(hint_capacity); + } + bytes = sizeof(IdempotencyClientChannel *) * htable->capacity; + htable->buckets = (IdempotencyClientChannel **)fc_malloc(bytes); + if (htable->buckets == NULL) { + return ENOMEM; + } + memset(htable->buckets, 0, bytes); + htable->count = 0; + + return 0; +} + +static int idempotency_channel_alloc_init(void *element, void *args) +{ + int result; + IdempotencyClientChannel *channel; + + channel = (IdempotencyClientChannel *)element; + if ((result=fast_mblock_init_ex1(&channel->receipt_allocator, + "idempotency_receipt", sizeof(IdempotencyClientReceipt), + 1024, 0, NULL, NULL, true)) != 0) + { + return result; + } + + if ((result=init_pthread_lock_cond_pair(&channel->lc_pair)) != 0) { + return result; + } + + FC_INIT_LIST_HEAD(&channel->dlink); + return fc_queue_init(&channel->queue, (long) + (&((IdempotencyClientReceipt *)NULL)->next)); +} + +int client_channel_init_ex(const int hint_capacity) +{ + int result; + if ((result=fast_mblock_init_ex1(&channel_context.channel_allocator, + "channel_info", sizeof(IdempotencyClientChannel), + 64, 0, idempotency_channel_alloc_init, NULL, true)) != 0) + { + return result; + } + + if ((result=init_htable(&channel_context.htable, hint_capacity)) != 0) { + return result; + } + + return 0; +} + +void client_channel_destroy() +{ +} + +struct fast_task_info *alloc_channel_task(IdempotencyClientChannel *channel, + const uint32_t hash_code, const char *server_ip, const short port, + int *err_no) +{ + struct fast_task_info *task; + + task = free_queue_pop(); + if (task == NULL) { + logError("file: "__FILE__", line: %d, " + "malloc task buff failed, you should " + "increase the parameter: max_connections", + __LINE__); + *err_no = ENOMEM; + return NULL; + } + + snprintf(task->server_ip, sizeof(task->server_ip), "%s", server_ip); + task->port = port; + task->canceled = false; + task->ctx = &g_sf_context; + task->event.fd = -1; + task->arg = channel; + task->thread_data = g_sf_context.thread_data + + hash_code % g_sf_context.work_threads; + channel->in_ioevent = 1; + channel->last_connect_time = get_current_time(); + if ((*err_no=sf_nio_notify(task, SF_NIO_STAGE_CONNECT)) != 0) { + channel->in_ioevent = 0; + free_queue_push(task); + return NULL; + } + return task; +} + +int idempotency_client_channel_check_reconnect( + IdempotencyClientChannel *channel) +{ + int result; + time_t current_time; + + if (!__sync_bool_compare_and_swap(&channel->in_ioevent, 0, 1)) { + return 0; + } + + current_time = get_current_time(); + if (channel->last_connect_time >= current_time) { + sleep(1); + channel->last_connect_time = ++current_time; + } + + logDebug("file: "__FILE__", line: %d, " + "trigger connect to server %s:%d", + __LINE__, channel->task->server_ip, + channel->task->port); + + channel->task->canceled = false; + if ((result=sf_nio_notify(channel->task, SF_NIO_STAGE_CONNECT)) == 0) { + channel->last_connect_time = current_time; + } else { + __sync_bool_compare_and_swap(&channel->in_ioevent, 1, 0); //rollback + } + return result; +} + +struct idempotency_client_channel *idempotency_client_channel_get( + const char *server_ip, const short server_port, + const int timeout, int *err_no) +{ + int r; + int key_len; + bool found; + char key[64]; + uint32_t hash_code; + IdempotencyClientChannel **bucket; + IdempotencyClientChannel *previous; + IdempotencyClientChannel *current; + IdempotencyClientChannel *channel; + + key_len = snprintf(key, sizeof(key), "%s_%d", server_ip, server_port); + hash_code = simple_hash(key, key_len); + bucket = channel_context.htable.buckets + + hash_code % channel_context.htable.capacity; + previous = NULL; + channel = NULL; + *err_no = 0; + found = false; + + PTHREAD_MUTEX_LOCK(&channel_context.htable.lock); + do { + current = *bucket; + while (current != NULL) { + r = conn_pool_compare_ip_and_port(current->task->server_ip, + current->task->port, server_ip, server_port); + if (r == 0) { + channel = current; + found = true; + break; + } else if (r > 0) { + break; + } + + previous = current; + current = current->next; + } + + if (found) { + break; + } + + channel = (IdempotencyClientChannel *)fast_mblock_alloc_object( + &channel_context.channel_allocator); + if (channel == NULL) { + *err_no = ENOMEM; + break; + } + + channel->task = alloc_channel_task(channel, + hash_code, server_ip, server_port, err_no); + if (channel->task == NULL) { + fast_mblock_free_object(&channel_context. + channel_allocator, channel); + channel = NULL; + break; + } + + if (previous == NULL) { + channel->next = *bucket; + *bucket = channel; + } else { + channel->next = previous->next; + previous->next = channel; + } + channel_context.htable.count++; + } while (0); + PTHREAD_MUTEX_UNLOCK(&channel_context.htable.lock); + + if (channel != NULL) { + if ((*err_no=idempotency_client_channel_check_wait_ex( + channel, timeout)) != 0) + { + return NULL; + } + } + + return channel; +} + +int idempotency_client_channel_push(struct idempotency_client_channel *channel, + const uint64_t req_id) +{ + IdempotencyClientReceipt *receipt; + bool notify; + + receipt = (IdempotencyClientReceipt *)fast_mblock_alloc_object( + &channel->receipt_allocator); + if (receipt == NULL) { + return ENOMEM; + } + + receipt->req_id = req_id; + fc_queue_push_ex(&channel->queue, receipt, ¬ify); + if (notify) { + if (__sync_add_and_fetch(&channel->in_ioevent, 0)) { + if (__sync_add_and_fetch(&channel->established, 0)) { + sf_nio_notify(channel->task, SF_NIO_STAGE_CONTINUE); + } + } else { + return idempotency_client_channel_check_reconnect(channel); + } + } + + return 0; +} diff --git a/src/idempotency/client/client_channel.h b/src/idempotency/client/client_channel.h new file mode 100644 index 0000000..b10c8f2 --- /dev/null +++ b/src/idempotency/client/client_channel.h @@ -0,0 +1,76 @@ +//client_channel.h + +#ifndef IDEMPOTENCY_CLIENT_CHANNEL_H +#define IDEMPOTENCY_CLIENT_CHANNEL_H + +#include "fastcommon/pthread_func.h" +#include "fastcommon/sched_thread.h" +#include "fastcommon/fc_atomic.h" +#include "client_types.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#define client_channel_init() client_channel_init_ex(0) + +int client_channel_init_ex(const int hint_capacity); +void client_channel_destroy(); + +struct idempotency_client_channel *idempotency_client_channel_get( + const char *server_ip, const short server_port, + const int timeout, int *err_no); + +static inline uint64_t idempotency_client_channel_next_seq_id( + struct idempotency_client_channel *channel) +{ + return __sync_add_and_fetch(&channel->next_req_id, 1); +} + +int idempotency_client_channel_push(struct idempotency_client_channel *channel, + const uint64_t req_id); + +int idempotency_client_channel_check_reconnect( + IdempotencyClientChannel *channel); + +static inline void idempotency_client_channel_set_id_key( + IdempotencyClientChannel *channel, const uint32_t new_id, + const uint32_t new_key) +{ + uint32_t old_id; + uint32_t old_key; + + old_id = __sync_add_and_fetch(&channel->id, 0); + old_key = __sync_add_and_fetch(&channel->key, 0); + FC_ATOMIC_CAS(channel->id, old_id, new_id); + FC_ATOMIC_CAS(channel->key, old_key, new_key); +} + +#define idempotency_client_channel_check_wait(channel) \ + idempotency_client_channel_check_wait_ex(channel, 1) + +static inline int idempotency_client_channel_check_wait_ex( + struct idempotency_client_channel *channel, const int timeout) +{ + struct timespec ts; + + if (__sync_add_and_fetch(&channel->established, 0)) { + return 0; + } + + idempotency_client_channel_check_reconnect(channel); + PTHREAD_MUTEX_LOCK(&channel->lc_pair.lock); + ts.tv_sec = get_current_time() + timeout; + ts.tv_nsec = 0; + pthread_cond_timedwait(&channel->lc_pair.cond, + &channel->lc_pair.lock, &ts); + PTHREAD_MUTEX_UNLOCK(&channel->lc_pair.lock); + + return __sync_add_and_fetch(&channel->established, 0) ? 0 : ETIMEDOUT; +} + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/idempotency/client/client_types.h b/src/idempotency/client/client_types.h new file mode 100644 index 0000000..58606dc --- /dev/null +++ b/src/idempotency/client/client_types.h @@ -0,0 +1,45 @@ + +#ifndef _IDEMPOTENCY_CLIENT_TYPES_H +#define _IDEMPOTENCY_CLIENT_TYPES_H + +#include "fastcommon/fast_task_queue.h" +#include "fastcommon/fast_mblock.h" +#include "fastcommon/fc_list.h" +#include "fastcommon/fc_queue.h" + +typedef struct idempotency_client_receipt { + uint64_t req_id; + struct idempotency_client_receipt *next; +} IdempotencyClientReceipt; + +typedef struct idempotency_client_channel { + volatile uint32_t id; //channel id, 0 for invalid + volatile int key; //channel key + volatile char in_ioevent; + volatile char in_heartbeat; + volatile char established; + time_t last_connect_time; + time_t last_pkg_time; //last communication time + pthread_lock_cond_pair_t lc_pair; //for channel valid check and notify + volatile uint64_t next_req_id; + struct fast_mblock_man receipt_allocator; + struct fast_task_info *task; + struct fc_queue queue; + struct fc_queue_info waiting_resp_qinfo; + struct fc_list_head dlink; //LRU chain for heartbeat + struct idempotency_client_channel *next; +} IdempotencyClientChannel; + +typedef struct idempotency_receipt_thread_context { + struct fc_list_head head; //LRU head for hearbeat +} IdempotencyReceiptThreadContext; + +#ifdef __cplusplus +extern "C" { +#endif + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/idempotency/client/receipt_handler.c b/src/idempotency/client/receipt_handler.c new file mode 100644 index 0000000..0b04d82 --- /dev/null +++ b/src/idempotency/client/receipt_handler.c @@ -0,0 +1,401 @@ +//receipt_handler.c + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "fastcommon/logger.h" +#include "fastcommon/sockopt.h" +#include "fastcommon/shared_func.h" +#include "fastcommon/pthread_func.h" +#include "fastcommon/sched_thread.h" +#include "fastcommon/ioevent_loop.h" +#include "../../sf_util.h" +#include "../../sf_func.h" +#include "../../sf_nio.h" +#include "../../sf_global.h" +#include "../../sf_service.h" +#include "../../sf_proto.h" +#include "client_channel.h" +#include "receipt_handler.h" + +static IdempotencyReceiptThreadContext *receipt_thread_contexts = NULL; + +static int receipt_init_task(struct fast_task_info *task) +{ + task->connect_timeout = SF_G_CONNECT_TIMEOUT; //for client side + task->network_timeout = SF_G_NETWORK_TIMEOUT; + return 0; +} + +static int receipt_recv_timeout_callback(struct fast_task_info *task) +{ + IdempotencyClientChannel *channel; + + if (SF_NIO_TASK_STAGE_FETCH(task) == SF_NIO_STAGE_CONNECT) { + logError("file: "__FILE__", line: %d, " + "connect to server %s:%d timeout", + __LINE__, task->server_ip, task->port); + return ETIMEDOUT; + } + + channel = (IdempotencyClientChannel *)task->arg; + if (channel->waiting_resp_qinfo.head != NULL) { + logError("file: "__FILE__", line: %d, " + "waiting receipt response from server %s:%d timeout", + __LINE__, task->server_ip, task->port); + return ETIMEDOUT; + } + + return 0; +} + +static void receipt_task_finish_cleanup(struct fast_task_info *task) +{ + IdempotencyClientChannel *channel; + + if (task->event.fd >= 0) { + sf_task_detach_thread(task); + close(task->event.fd); + task->event.fd = -1; + } + + channel = (IdempotencyClientChannel *)task->arg; + + fc_list_del_init(&channel->dlink); + __sync_bool_compare_and_swap(&channel->established, 1, 0); + __sync_bool_compare_and_swap(&channel->in_ioevent, 1, 0); + + logDebug("file: "__FILE__", line: %d, " + "receipt task for server %s:%d exit", + __LINE__, task->server_ip, task->port); +} + +static int setup_channel_request(struct fast_task_info *task) +{ + IdempotencyClientChannel *channel; + SFCommonProtoHeader *header; + FSProtoSetupChannelReq *req; + + channel = (IdempotencyClientChannel *)task->arg; + header = (SFCommonProtoHeader *)task->data; + req = (FSProtoSetupChannelReq *)(header + 1); + int2buff(__sync_add_and_fetch(&channel->id, 0), req->channel_id); + int2buff(__sync_add_and_fetch(&channel->key, 0), req->key); + + FS_PROTO_SET_HEADER(header, FS_SERVICE_PROTO_SETUP_CHANNEL_REQ, + sizeof(FSProtoSetupChannelReq)); + task->length = sizeof(SFCommonProtoHeader) + sizeof(FSProtoSetupChannelReq); + return sf_send_add_event(task); +} + +static int check_report_req_receipt(struct fast_task_info *task, + int *count) +{ + IdempotencyClientChannel *channel; + SFCommonProtoHeader *header; + FSProtoReportReqReceiptHeader *rheader; + FSProtoReportReqReceiptBody *rbody; + FSProtoReportReqReceiptBody *rstart; + IdempotencyClientReceipt *last; + IdempotencyClientReceipt *receipt; + char *buff_end; + + if (task->length > 0) { + *count = 0; + logWarning("file: "__FILE__", line: %d, " + "server %s:%d, task length: %d != 0, skip check " + "and report receipt request!", __LINE__, + task->server_ip, task->port, task->length); + return 0; + } + + channel = (IdempotencyClientChannel *)task->arg; + if (channel->waiting_resp_qinfo.head != NULL) { + *count = 0; + return 0; + } + + fc_queue_pop_to_queue(&channel->queue, &channel->waiting_resp_qinfo); + if (channel->waiting_resp_qinfo.head == NULL) { + *count = 0; + return 0; + } + + header = (SFCommonProtoHeader *)task->data; + rheader = (FSProtoReportReqReceiptHeader *)(header + 1); + rbody = rstart = (FSProtoReportReqReceiptBody *)(rheader + 1); + buff_end = task->data + task->size; + last = NULL; + receipt = channel->waiting_resp_qinfo.head; + do { + //check buffer remain space + if (buff_end - (char *)rbody < sizeof(FSProtoReportReqReceiptBody)) { + break; + } + + long2buff(receipt->req_id, rbody->req_id); + rbody++; + + last = receipt; + receipt = receipt->next; + } while (receipt != NULL); + + if (receipt != NULL) { //repush to queue + struct fc_queue_info qinfo; + bool notify; + + qinfo.head = receipt; + qinfo.tail = channel->waiting_resp_qinfo.tail; + fc_queue_push_queue_to_head_ex(&channel->queue, &qinfo, ¬ify); + + last->next = NULL; + channel->waiting_resp_qinfo.tail = last; + } + + *count = rbody - rstart; + int2buff(*count, rheader->count); + task->length = (char *)rbody - task->data; + int2buff(task->length - sizeof(SFCommonProtoHeader), header->body_len); + header->cmd = FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ; + return sf_send_add_event(task); +} + +static inline void update_lru_chain(struct fast_task_info *task) +{ + IdempotencyReceiptThreadContext *thread_ctx; + IdempotencyClientChannel *channel; + + thread_ctx = (IdempotencyReceiptThreadContext *)task->thread_data->arg; + channel = (IdempotencyClientChannel *)task->arg; + channel->last_pkg_time = g_current_time; + fc_list_move_tail(&channel->dlink, &thread_ctx->head); +} + +static int report_req_receipt_request(struct fast_task_info *task, + const bool update_lru) +{ + int result; + int count; + + if ((result=check_report_req_receipt(task, &count)) != 0) { + return result; + } + + if (count == 0) { + result = sf_set_read_event(task); + } else if (update_lru) { + update_lru_chain(task); + } + + return 0; +} + +static inline int receipt_expect_body_length(struct fast_task_info *task, + const int expect_body_len) +{ + if ((int)(task->length - sizeof(SFCommonProtoHeader)) != expect_body_len) { + logError("file: "__FILE__", line: %d, " + "server %s:%d, response body length: %d != %d", + __LINE__, task->server_ip, task->port, (int)(task->length - + sizeof(SFCommonProtoHeader)), expect_body_len); + return EINVAL; + } + + return 0; +} + +static int deal_setup_channel_response(struct fast_task_info *task) +{ + int result; + IdempotencyReceiptThreadContext *thread_ctx; + FSProtoSetupChannelResp *resp; + IdempotencyClientChannel *channel; + int channel_id; + int channel_key; + + if ((result=receipt_expect_body_length(task, + sizeof(FSProtoSetupChannelResp))) != 0) + { + return result; + } + + channel = (IdempotencyClientChannel *)task->arg; + if (__sync_add_and_fetch(&channel->established, 0)) { + logWarning("file: "__FILE__", line: %d, " + "response from server %s:%d, unexpected cmd: " + "SETUP_CHANNEL_RESP, ignore it!", + __LINE__, task->server_ip, task->port); + return 0; + } + + resp = (FSProtoSetupChannelResp *)(task->data + sizeof(SFCommonProtoHeader)); + channel_id = buff2int(resp->channel_id); + channel_key = buff2int(resp->key); + idempotency_client_channel_set_id_key(channel, channel_id, channel_key); + if (__sync_bool_compare_and_swap(&channel->established, 0, 1)) { + thread_ctx = (IdempotencyReceiptThreadContext *)task->thread_data->arg; + fc_list_add_tail(&channel->dlink, &thread_ctx->head); + } + + PTHREAD_MUTEX_LOCK(&channel->lc_pair.lock); + pthread_cond_broadcast(&channel->lc_pair.cond); + PTHREAD_MUTEX_UNLOCK(&channel->lc_pair.lock); + + if (channel->waiting_resp_qinfo.head != NULL) { + bool notify; + fc_queue_push_queue_to_head_ex(&channel->queue, + &channel->waiting_resp_qinfo, ¬ify); + channel->waiting_resp_qinfo.head = NULL; + channel->waiting_resp_qinfo.tail = NULL; + } + + return 0; +} + +static inline int deal_report_req_receipt_response(struct fast_task_info *task) +{ + int result; + IdempotencyClientChannel *channel; + IdempotencyClientReceipt *current; + IdempotencyClientReceipt *deleted; + + if ((result=receipt_expect_body_length(task, 0)) != 0) { + return result; + } + + channel = (IdempotencyClientChannel *)task->arg; + if (channel->waiting_resp_qinfo.head == NULL) { + logWarning("file: "__FILE__", line: %d, " + "response from server %s:%d, unexpect cmd: " + "REPORT_REQ_RECEIPT_RESP", __LINE__, + task->server_ip, task->port); + return 0; + } + + current = channel->waiting_resp_qinfo.head; + do { + deleted = current; + current = current->next; + + fast_mblock_free_object(&channel->receipt_allocator, deleted); + } while (current != NULL); + + channel->waiting_resp_qinfo.head = NULL; + channel->waiting_resp_qinfo.tail = NULL; + return 0; +} + +static int receipt_deal_task(struct fast_task_info *task) +{ + int result; + int stage; + + do { + stage = SF_NIO_TASK_STAGE_FETCH(task); + if (stage == SF_NIO_STAGE_HANDSHAKE) { + result = setup_channel_request(task); + break; + } else if (stage == SF_NIO_STAGE_CONTINUE) { + if (((IdempotencyClientChannel *)task->arg)->established) { + result = report_req_receipt_request(task, true); + } else { + result = 0; //just ignore + } + break; + } + + result = buff2short(((SFCommonProtoHeader *)task->data)->status); + if (result != 0) { + int msg_len; + char *message; + + msg_len = task->length - sizeof(SFCommonProtoHeader); + message = task->data + sizeof(SFCommonProtoHeader); + logError("file: "__FILE__", line: %d, " + "response from server %s:%d, cmd: %d (%s), " + "status: %d, error info: %.*s", + __LINE__, task->server_ip, task->port, + ((SFCommonProtoHeader *)task->data)->cmd, + sf_get_cmd_caption(((SFCommonProtoHeader *)task->data)->cmd), + result, msg_len, message); + break; + } + + switch (((SFCommonProtoHeader *)task->data)->cmd) { + case FS_SERVICE_PROTO_SETUP_CHANNEL_RESP: + result = deal_setup_channel_response(task); + break; + case FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP: + result = deal_report_req_receipt_response(task); + break; + default: + logError("file: "__FILE__", line: %d, " + "response from server %s:%d, unexpect cmd: %d (%s)", + __LINE__, task->server_ip, task->port, + ((SFCommonProtoHeader *)task->data)->cmd, + sf_get_cmd_caption(((SFCommonProtoHeader *)task->data)->cmd)); + result = EINVAL; + break; + } + + if (result == 0) { + update_lru_chain(task); + task->offset = task->length = 0; + result = report_req_receipt_request(task, false); + } + } while (0); + + return result > 0 ? -1 * result : result; +} + +static int receipt_thread_loop_callback(struct nio_thread_data *thread_data) +{ + IdempotencyClientChannel *channel; + IdempotencyClientChannel *tmp; + IdempotencyReceiptThreadContext *thread_ctx; + + thread_ctx = (IdempotencyReceiptThreadContext *)thread_data->arg; + fc_list_for_each_entry_safe(channel, tmp, &thread_ctx->head, dlink) { + //check heartbeat + //channel->task + } + + return 0; +} + +static void *receipt_alloc_thread_extra_data(const int thread_index) +{ + IdempotencyReceiptThreadContext *ctx; + + ctx = receipt_thread_contexts + thread_index; + FC_INIT_LIST_HEAD(&ctx->head); + return ctx; +} + +int receipt_handler_init() +{ + receipt_thread_contexts = (IdempotencyReceiptThreadContext *)fc_malloc( + sizeof(IdempotencyReceiptThreadContext) * SF_G_WORK_THREADS); + if (receipt_thread_contexts == NULL) { + return ENOMEM; + } + + return sf_service_init_ex2(&g_sf_context, + receipt_alloc_thread_extra_data, receipt_thread_loop_callback, + NULL, sf_proto_set_body_length, receipt_deal_task, + receipt_task_finish_cleanup, receipt_recv_timeout_callback, + 1000, sizeof(SFCommonProtoHeader), 0, receipt_init_task); +} + +int receipt_handler_destroy() +{ + return 0; +} diff --git a/src/idempotency/client/receipt_handler.h b/src/idempotency/client/receipt_handler.h new file mode 100644 index 0000000..1402cb5 --- /dev/null +++ b/src/idempotency/client/receipt_handler.h @@ -0,0 +1,19 @@ +//receipt_handler.h + +#ifndef _FS_IDEMPOTENCY_RECEIPT_HANDLER_H +#define _FS_IDEMPOTENCY_RECEIPT_HANDLER_H + +#include "client_types.h" + +#ifdef __cplusplus +extern "C" { +#endif + +int receipt_handler_init(); +int receipt_handler_destroy(); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/idempotency/server/channel.c b/src/idempotency/server/channel.c new file mode 100644 index 0000000..48fe193 --- /dev/null +++ b/src/idempotency/server/channel.c @@ -0,0 +1,289 @@ +#include +#include +#include +#include "fastcommon/shared_func.h" +#include "fastcommon/logger.h" +#include "fastcommon/fast_mblock.h" +#include "fastcommon/sched_thread.h" +#include "sf/sf_global.h" +#include "channel_htable.h" +#include "channel.h" + +typedef struct { + IdempotencyChannel **buckets; + uint32_t capacity; + uint32_t count; + pthread_mutex_t lock; +} ChannelHashtable; + +typedef struct { + struct { + uint32_t max; + uint32_t current; + } channel_ids; + struct fast_mblock_man channel_allocator; + ChannelHashtable delay_free_htable; //for delay free + ChannelHTableContext htable_ctx; + + struct { + uint32_t reserve_interval; //channel reserve interval in seconds + time_t last_check_time; + FastTimer timer; + } timeout_ctx; + +} ChannelContext; + +static ChannelContext channel_context; + +static int init_htable(ChannelHashtable *htable, const int hint_capacity) +{ + int result; + int bytes; + + if ((result=init_pthread_lock(&htable->lock)) != 0) { + return result; + } + + if (hint_capacity < 1024) { + htable->capacity = 1361; + } else { + htable->capacity = fc_ceil_prime(hint_capacity); + } + bytes = sizeof(IdempotencyChannel *) * htable->capacity; + htable->buckets = (IdempotencyChannel **)fc_malloc(bytes); + if (htable->buckets == NULL) { + return ENOMEM; + } + memset(htable->buckets, 0, bytes); + htable->count = 0; + + return 0; +} + +static int idempotency_channel_alloc_init(void *element, void *args) +{ + IdempotencyChannel *channel; + + channel = (IdempotencyChannel *)element; + channel->id = ++channel_context.channel_ids.current; + channel->request_htable.buckets = (IdempotencyRequest **)(channel + 1); + return init_pthread_lock(&channel->request_htable.lock); +} + +int idempotency_channel_init(const uint32_t max_channel_id, + const int request_hint_capacity, + const uint32_t reserve_interval, + const uint32_t shared_lock_count) +{ + int result; + int request_htable_capacity; + int element_size; + + request_htable_capacity = fc_ceil_prime(request_hint_capacity); + idempotency_request_init(request_htable_capacity); + + element_size = sizeof(IdempotencyChannel) + sizeof(IdempotencyRequest *) * + request_htable_capacity; + if ((result=fast_mblock_init_ex1(&channel_context.channel_allocator, + "channel_info", element_size, 1024, max_channel_id, + idempotency_channel_alloc_init, NULL, true)) != 0) + { + return result; + } + + if ((result=fast_timer_init(&channel_context.timeout_ctx.timer, + 2 * reserve_interval + 1, + get_current_time())) != 0) + { + return result; + } + + channel_context.channel_ids.max = max_channel_id; + channel_context.channel_ids.current = 0; + channel_context.timeout_ctx.last_check_time = get_current_time(); + channel_context.timeout_ctx.reserve_interval = reserve_interval; + if ((result=init_htable(&channel_context.delay_free_htable, + max_channel_id / 100)) != 0) + { + return result; + } + + return idempotency_channel_htable_init(&channel_context.htable_ctx, + shared_lock_count, max_channel_id / 10); +} + +static void add_to_delay_free_htable(IdempotencyChannel *channel) +{ + IdempotencyChannel **bucket; + + bucket = channel_context.delay_free_htable.buckets + channel->id % + channel_context.delay_free_htable.capacity; + PTHREAD_MUTEX_LOCK(&channel_context.delay_free_htable.lock); + channel->next = *bucket; + *bucket = channel; + channel_context.delay_free_htable.count++; + + fast_timer_add(&channel_context.timeout_ctx.timer, &channel->timer); + PTHREAD_MUTEX_UNLOCK(&channel_context.delay_free_htable.lock); +} + +IdempotencyChannel *idempotency_channel_find_and_hold( + const uint32_t channel_id, const int key, int *result) +{ + IdempotencyChannel *channel; + if ((channel=idempotency_channel_htable_find(&channel_context. + htable_ctx, channel_id)) == NULL) + { + *result = ENOENT; + return NULL; + } + + if (channel->key != key) { + *result = EPERM; + return NULL; + } + + *result = 0; + __sync_add_and_fetch(&channel->ref_count, 1); + return channel; +} + +static IdempotencyChannel *htable_remove(const uint32_t channel_id, + const bool need_lock, const bool remove_timer) +{ + IdempotencyChannel **bucket; + IdempotencyChannel *previous; + IdempotencyChannel *channel; + + bucket = channel_context.delay_free_htable.buckets + channel_id % + channel_context.delay_free_htable.capacity; + previous = NULL; + if (need_lock) { + PTHREAD_MUTEX_LOCK(&channel_context.delay_free_htable.lock); + } + channel = *bucket; + while (channel != NULL) { + if (channel->id == channel_id) { + if (previous == NULL) { + *bucket = channel->next; + } else { + previous->next = channel->next; + } + channel_context.delay_free_htable.count--; + if (remove_timer) { + fast_timer_remove(&channel_context.timeout_ctx.timer, + &channel->timer); + } + break; + } + + previous = channel; + channel = channel->next; + } + + if (need_lock) { + PTHREAD_MUTEX_UNLOCK(&channel_context.delay_free_htable.lock); + } + return channel; +} + +static void do_free_channel(IdempotencyChannel *channel) +{ + idempotency_request_htable_clear(&channel->request_htable); + fast_mblock_free_object(&channel_context.channel_allocator, channel); +} + +static void recycle_timeout_entries() +{ + uint32_t channel_id; + FastTimerEntry head; + FastTimerEntry *entry; + IdempotencyChannel *channel; + + PTHREAD_MUTEX_LOCK(&channel_context.delay_free_htable.lock); + if (g_current_time - channel_context.timeout_ctx.last_check_time <= 10) { + PTHREAD_MUTEX_UNLOCK(&channel_context.delay_free_htable.lock); + return; + } + + channel_context.timeout_ctx.last_check_time = g_current_time; + fast_timer_timeouts_get(&channel_context.timeout_ctx.timer, + g_current_time, &head); + entry = head.next; + while (entry != NULL) { + channel_id = ((IdempotencyChannel *)entry)->id; + entry = entry->next; + + if ((channel=htable_remove(channel_id, false, false)) != NULL) { + do_free_channel(channel); + } + } + + PTHREAD_MUTEX_UNLOCK(&channel_context.delay_free_htable.lock); +} + +IdempotencyChannel *idempotency_channel_alloc(const uint32_t channel_id, + const int key) +{ + IdempotencyChannel *channel; + + do { + if (channel_id != 0) { + if ((channel=htable_remove(channel_id, true, true)) != NULL) { + if (channel->key == key) { + break; + } else { + add_to_delay_free_htable(channel); + } + } + } + + if (channel_context.delay_free_htable.count > 0 && (g_current_time - + channel_context.timeout_ctx.last_check_time) > 10) + { + recycle_timeout_entries(); + } + + if ((channel=(IdempotencyChannel *)fast_mblock_alloc_object( + &channel_context.channel_allocator)) == NULL) + { + return NULL; + } + channel->key = rand(); + } while (0); + + __sync_bool_compare_and_swap(&channel->is_valid, 0, 1); + __sync_add_and_fetch(&channel->ref_count, 1); + idempotency_channel_htable_add(&channel_context. + htable_ctx, channel); + return channel; +} + +void idempotency_channel_release(IdempotencyChannel *channel, + const bool is_holder) +{ + if (is_holder) { + channel->timer.expires = g_current_time + + channel_context.timeout_ctx.reserve_interval; + __sync_bool_compare_and_swap(&channel->is_valid, 1, 0); + } + + if (__sync_sub_and_fetch(&channel->ref_count, 1) == 0) { + if (channel->timer.expires <= g_current_time) { //expired + do_free_channel(channel); + } else { + add_to_delay_free_htable(channel); + } + } +} + +void idempotency_channel_free(IdempotencyChannel *channel) +{ + __sync_bool_compare_and_swap(&channel->is_valid, 1, 0); + if (__sync_sub_and_fetch(&channel->ref_count, 1) == 0) { + do_free_channel(channel); + } else { + channel->timer.expires = g_current_time + + channel_context.timeout_ctx.reserve_interval; + } +} diff --git a/src/idempotency/server/channel.h b/src/idempotency/server/channel.h new file mode 100644 index 0000000..9909b37 --- /dev/null +++ b/src/idempotency/server/channel.h @@ -0,0 +1,46 @@ + +#ifndef _SF_IDEMPOTENCY_SERVER_CHANNEL_H +#define _SF_IDEMPOTENCY_SERVER_CHANNEL_H + +#include "fastcommon/fast_timer.h" +#include "request_htable.h" + +#ifdef __cplusplus +extern "C" { +#endif + + int idempotency_channel_init(const uint32_t max_channel_id, + const int request_hint_capacity, + const uint32_t reserve_interval, + const uint32_t shared_lock_count); + + IdempotencyChannel *idempotency_channel_alloc(const uint32_t channel_id, + const int key); + + void idempotency_channel_release(IdempotencyChannel *channel, + const bool is_holder); + + IdempotencyChannel *idempotency_channel_find_and_hold( + const uint32_t channel_id, const int key, int *result); + + void idempotency_channel_free(IdempotencyChannel *channel); + + static inline int idempotency_channel_add_request(IdempotencyChannel * + channel, IdempotencyRequest *request) + { + return idempotency_request_htable_add( + &channel->request_htable, request); + } + + static inline int idempotency_channel_remove_request( + IdempotencyChannel *channel, const uint64_t req_id) + { + return idempotency_request_htable_remove( + &channel->request_htable, req_id); + } + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/idempotency/server/channel_htable.c b/src/idempotency/server/channel_htable.c new file mode 100644 index 0000000..c3e6605 --- /dev/null +++ b/src/idempotency/server/channel_htable.c @@ -0,0 +1,149 @@ +#include +#include +#include +#include "fastcommon/shared_func.h" +#include "fastcommon/logger.h" +#include "fastcommon/fast_mblock.h" +#include "fastcommon/sched_thread.h" +#include "sf/sf_global.h" +#include "channel_htable.h" + +int idempotency_channel_htable_init(ChannelHTableContext *ctx, + const uint32_t shared_lock_count, const uint32_t hint_capacity) +{ + int result; + int64_t bytes; + pthread_mutex_t *lock; + pthread_mutex_t *end; + + ctx->shared_locks.count = fc_ceil_prime(shared_lock_count); + ctx->htable.capacity = fc_ceil_prime(hint_capacity); + + bytes = sizeof(pthread_mutex_t) * ctx->shared_locks.count; + ctx->shared_locks.locks = (pthread_mutex_t *)fc_malloc(bytes); + if (ctx->shared_locks.locks == NULL) { + return ENOMEM; + } + end = ctx->shared_locks.locks + ctx->shared_locks.count; + for (lock=ctx->shared_locks.locks; lockhtable.capacity; + ctx->htable.buckets = (IdempotencyChannel **)fc_malloc(bytes); + if (ctx->htable.buckets == NULL) { + return ENOMEM; + } + memset(ctx->htable.buckets, 0, bytes); + ctx->htable.count = 0; + + return 0; +} + +int idempotency_channel_htable_add(ChannelHTableContext *ctx, + IdempotencyChannel *channel) +{ + int result; + pthread_mutex_t *lock; + IdempotencyChannel **bucket; + IdempotencyChannel *previous; + IdempotencyChannel *current; + + lock = ctx->shared_locks.locks + channel->id % ctx->shared_locks.count; + bucket = ctx->htable.buckets + channel->id % ctx->htable.capacity; + previous = NULL; + result = 0; + + PTHREAD_MUTEX_LOCK(lock); + current = *bucket; + while (current != NULL) { + if (current->id == channel->id) { + result = EEXIST; + break; + } else if (current->id > channel->id) { + break; + } + + previous = current; + current = current->next; + } + + if (result == 0) { + if (previous == NULL) { + channel->next = *bucket; + *bucket = channel; + } else { + channel->next = previous->next; + previous->next = channel; + } + ctx->htable.count++; + } + PTHREAD_MUTEX_UNLOCK(lock); + + return result; +} + +IdempotencyChannel *idempotency_channel_htable_remove( + ChannelHTableContext *ctx, const uint32_t channel_id) +{ + pthread_mutex_t *lock; + IdempotencyChannel **bucket; + IdempotencyChannel *previous; + IdempotencyChannel *current; + + lock = ctx->shared_locks.locks + channel_id % ctx->shared_locks.count; + bucket = ctx->htable.buckets + channel_id % ctx->htable.capacity; + previous = NULL; + + PTHREAD_MUTEX_LOCK(lock); + current = *bucket; + while (current != NULL) { + if (current->id == channel_id) { + if (previous == NULL) { + *bucket = current->next; + } else { + previous->next = current->next; + } + ctx->htable.count--; + break; + } else if (current->id > channel_id) { + current = NULL; + break; + } + + previous = current; + current = current->next; + } + PTHREAD_MUTEX_UNLOCK(lock); + + return current; +} + +IdempotencyChannel *idempotency_channel_htable_find( + ChannelHTableContext *ctx, const uint32_t channel_id) +{ + pthread_mutex_t *lock; + IdempotencyChannel **bucket; + IdempotencyChannel *current; + + lock = ctx->shared_locks.locks + channel_id % ctx->shared_locks.count; + bucket = ctx->htable.buckets + channel_id % ctx->htable.capacity; + + PTHREAD_MUTEX_LOCK(lock); + current = *bucket; + while (current != NULL) { + if (current->id == channel_id) { + break; + } else if (current->id > channel_id) { + current = NULL; + break; + } + + current = current->next; + } + PTHREAD_MUTEX_UNLOCK(lock); + + return current; +} diff --git a/src/idempotency/server/channel_htable.h b/src/idempotency/server/channel_htable.h new file mode 100644 index 0000000..7db0f20 --- /dev/null +++ b/src/idempotency/server/channel_htable.h @@ -0,0 +1,43 @@ + +#ifndef _SF_IDEMPOTENCY_CHANNEL_HTABLE_H +#define _SF_IDEMPOTENCY_CHANNEL_HTABLE_H + +#include "idempotency_types.h" + +typedef struct channel_shared_locks { + pthread_mutex_t *locks; + uint32_t count; +} ChannelSharedLocks; + +typedef struct idempotency_channel_htable { + IdempotencyChannel **buckets; + uint32_t capacity; + uint32_t count; +} IdempotencyChannelHTable; + +typedef struct channel_htable_context { + ChannelSharedLocks shared_locks; + IdempotencyChannelHTable htable; +} ChannelHTableContext; + +#ifdef __cplusplus +extern "C" { +#endif + + int idempotency_channel_htable_init(ChannelHTableContext *ctx, + const uint32_t shared_lock_count, const uint32_t hint_capacity); + + int idempotency_channel_htable_add(ChannelHTableContext *ctx, + IdempotencyChannel *channel); + + IdempotencyChannel *idempotency_channel_htable_remove( + ChannelHTableContext *ctx, const uint32_t channel_id); + + IdempotencyChannel *idempotency_channel_htable_find( + ChannelHTableContext *ctx, const uint32_t channel_id); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/idempotency/server/idempotency_types.h b/src/idempotency/server/idempotency_types.h new file mode 100644 index 0000000..efe400f --- /dev/null +++ b/src/idempotency/server/idempotency_types.h @@ -0,0 +1,44 @@ + +#ifndef _IDEMPOTENCY_SERVER_TYPES_H +#define _IDEMPOTENCY_SERVER_TYPES_H + +#include "fastcommon/fast_mblock.h" +#include "fastcommon/fast_timer.h" + +typedef struct idempotency_request { + uint64_t req_id; + volatile int ref_count; + bool finished; + struct { + int result; + int inc_alloc; + } output; + struct fast_mblock_man *allocator; //for free + struct idempotency_request *next; +} IdempotencyRequest; + +typedef struct idempotency_request_htable { + IdempotencyRequest **buckets; + int count; + pthread_mutex_t lock; +} IdempotencyRequestHTable; + +typedef struct idempotency_channel { + FastTimerEntry timer; //must be the first + uint32_t id; + int key; //for retrieve validation + volatile int ref_count; + volatile char is_valid; + IdempotencyRequestHTable request_htable; + struct idempotency_channel *next; +} IdempotencyChannel; + +#ifdef __cplusplus +extern "C" { +#endif + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/idempotency/server/request_htable.c b/src/idempotency/server/request_htable.c new file mode 100644 index 0000000..92b0f0a --- /dev/null +++ b/src/idempotency/server/request_htable.c @@ -0,0 +1,159 @@ +#include +#include +#include +#include "fastcommon/shared_func.h" +#include "fastcommon/logger.h" +#include "fastcommon/fast_mblock.h" +#include "fastcommon/sched_thread.h" +#include "sf/sf_global.h" +#include "request_htable.h" + +typedef struct idempotency_request_context { + uint32_t htable_capacity; +} IdempotencyRequestContext; + +static IdempotencyRequestContext request_ctx; + +void idempotency_request_init(const uint32_t htable_capacity) +{ + request_ctx.htable_capacity = htable_capacity; +} + +int idempotency_request_htable_add(IdempotencyRequestHTable *htable, + IdempotencyRequest *request) +{ + int result; + IdempotencyRequest **bucket; + IdempotencyRequest *previous; + IdempotencyRequest *current; + + bucket = htable->buckets + request->req_id % request_ctx.htable_capacity; + previous = NULL; + result = 0; + + PTHREAD_MUTEX_LOCK(&htable->lock); + current = *bucket; + while (current != NULL) { + if (current->req_id == request->req_id) { + request->output = current->output; + request->finished = current->finished; + result = EEXIST; + break; + } else if (current->req_id > request->req_id) { + break; + } + + previous = current; + current = current->next; + } + + if (result == 0) { + if (previous == NULL) { + request->next = *bucket; + *bucket = request; + } else { + request->next = previous->next; + previous->next = request; + } + htable->count++; + } + PTHREAD_MUTEX_UNLOCK(&htable->lock); + + if (result == 0) { + __sync_add_and_fetch(&request->ref_count, 2); + } + return result; +} + +int idempotency_request_htable_remove(IdempotencyRequestHTable *htable, + const uint64_t req_id) +{ + IdempotencyRequest **bucket; + IdempotencyRequest *previous; + IdempotencyRequest *current; + + bucket = htable->buckets + req_id % request_ctx.htable_capacity; + previous = NULL; + + PTHREAD_MUTEX_LOCK(&htable->lock); + current = *bucket; + while (current != NULL) { + if (current->req_id == req_id) { + if (previous == NULL) { + *bucket = current->next; + } else { + previous->next = current->next; + } + htable->count--; + break; + } else if (current->req_id > req_id) { + current = NULL; + break; + } + + previous = current; + current = current->next; + } + PTHREAD_MUTEX_UNLOCK(&htable->lock); + + if (current != NULL) { + idempotency_request_release(current); + return 0; + } else { + return ENOENT; + } +} + +void idempotency_request_htable_clear(IdempotencyRequestHTable *htable) +{ + IdempotencyRequest **bucket; + IdempotencyRequest **end; + IdempotencyRequest *head; + IdempotencyRequest *previous; + IdempotencyRequest *current; + IdempotencyRequest *deleted; + + head = NULL; + PTHREAD_MUTEX_LOCK(&htable->lock); + do { + if (htable->count == 0) { + break; + } + + previous = NULL; + end = htable->buckets + request_ctx.htable_capacity; + for (bucket=htable->buckets; bucketnext = current; + } + + previous = current; + current = current->next; + } while (current != NULL); + + *bucket = NULL; + } + + if (previous != NULL) { + previous->next = NULL; + } + + htable->count = 0; + } while (0); + PTHREAD_MUTEX_UNLOCK(&htable->lock); + + while (head != NULL) { + deleted = head; + head = head->next; + + idempotency_request_release(deleted); + } +} diff --git a/src/idempotency/server/request_htable.h b/src/idempotency/server/request_htable.h new file mode 100644 index 0000000..1ee091e --- /dev/null +++ b/src/idempotency/server/request_htable.h @@ -0,0 +1,32 @@ + +#ifndef _SF_IDEMPOTENCY_REQUEST_HTABLE_H +#define _SF_IDEMPOTENCY_REQUEST_HTABLE_H + +#include "idempotency_types.h" + +#ifdef __cplusplus +extern "C" { +#endif + + void idempotency_request_init(const uint32_t hint_capacity); + + int idempotency_request_htable_add(IdempotencyRequestHTable + *htable, IdempotencyRequest *request); + + int idempotency_request_htable_remove(IdempotencyRequestHTable *htable, + const uint64_t req_id); + + void idempotency_request_htable_clear(IdempotencyRequestHTable *htable); + + static inline void idempotency_request_release(IdempotencyRequest *request) + { + if (__sync_sub_and_fetch(&request->ref_count, 1) == 0) { + fast_mblock_free_object(request->allocator, request); + } + } + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/sf_proto.c b/src/sf_proto.c new file mode 100644 index 0000000..a7698a1 --- /dev/null +++ b/src/sf_proto.c @@ -0,0 +1,42 @@ + +#include +#include "fastcommon/shared_func.h" +#include "sf_proto.h" + +int sf_proto_set_body_length(struct fast_task_info *task) +{ + SFCommonProtoHeader *header; + + header = (SFCommonProtoHeader *)task->data; + if (!FS_PROTO_CHECK_MAGIC(header->magic)) { + logError("file: "__FILE__", line: %d, " + "client ip: %s, magic "FS_PROTO_MAGIC_FORMAT + " is invalid, expect: "FS_PROTO_MAGIC_FORMAT, + __LINE__, task->client_ip, + FS_PROTO_MAGIC_PARAMS(header->magic), + FS_PROTO_MAGIC_EXPECT_PARAMS); + return EINVAL; + } + + task->length = buff2int(header->body_len); //set body length + return 0; +} +const char *sf_get_cmd_caption(const int cmd) +{ + switch (cmd) { + case FS_SERVICE_PROTO_SETUP_CHANNEL_REQ: + return "SETUP_CHANNEL_REQ"; + case FS_SERVICE_PROTO_SETUP_CHANNEL_RESP: + return "SETUP_CHANNEL_RESP"; + case FS_SERVICE_PROTO_CLOSE_CHANNEL_REQ: + return "CLOSE_CHANNEL_REQ"; + case FS_SERVICE_PROTO_CLOSE_CHANNEL_RESP: + return "CLOSE_CHANNEL_RESP"; + case FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ: + return "REPORT_REQ_RECEIPT_REQ"; + case FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP: + return "REPORT_REQ_RECEIPT_RESP"; + default: + return "UNKOWN"; + } +} diff --git a/src/sf_proto.h b/src/sf_proto.h new file mode 100644 index 0000000..65f38fe --- /dev/null +++ b/src/sf_proto.h @@ -0,0 +1,113 @@ +//sf_proto.h + +#ifndef _FS_IDEMPOTENCY_PROTO_H +#define _FS_IDEMPOTENCY_PROTO_H + +#include "fastcommon/fast_task_queue.h" +#include "fastcommon/shared_func.h" +#include "fastcommon/logger.h" +#include "fastcommon/connection_pool.h" +#include "fastcommon/sockopt.h" +#include "sf_types.h" + +//for request idempotency +#define FS_SERVICE_PROTO_SETUP_CHANNEL_REQ 51 +#define FS_SERVICE_PROTO_SETUP_CHANNEL_RESP 52 +#define FS_SERVICE_PROTO_CLOSE_CHANNEL_REQ 53 +#define FS_SERVICE_PROTO_CLOSE_CHANNEL_RESP 54 +#define FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ 55 +#define FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP 56 + +#define FS_PROTO_MAGIC_CHAR '@' +#define FS_PROTO_SET_MAGIC(m) \ + m[0] = m[1] = m[2] = m[3] = FS_PROTO_MAGIC_CHAR + +#define FS_PROTO_CHECK_MAGIC(m) \ + (m[0] == FS_PROTO_MAGIC_CHAR && m[1] == FS_PROTO_MAGIC_CHAR && \ + m[2] == FS_PROTO_MAGIC_CHAR && m[3] == FS_PROTO_MAGIC_CHAR) + +#define FS_PROTO_MAGIC_FORMAT "0x%02X%02X%02X%02X" +#define FS_PROTO_MAGIC_EXPECT_PARAMS \ + FS_PROTO_MAGIC_CHAR, FS_PROTO_MAGIC_CHAR, \ + FS_PROTO_MAGIC_CHAR, FS_PROTO_MAGIC_CHAR + +#define FS_PROTO_MAGIC_PARAMS(m) \ + m[0], m[1], m[2], m[3] + +#define FS_PROTO_SET_HEADER(header, _cmd, _body_len) \ + do { \ + FS_PROTO_SET_MAGIC((header)->magic); \ + (header)->cmd = _cmd; \ + (header)->status[0] = (header)->status[1] = 0; \ + int2buff(_body_len, (header)->body_len); \ + } while (0) + +#define FS_PROTO_SET_RESPONSE_HEADER(proto_header, resp_header) \ + do { \ + (proto_header)->cmd = (resp_header).cmd; \ + short2buff((resp_header).status, (proto_header)->status); \ + int2buff((resp_header).body_len, (proto_header)->body_len);\ + } while (0) + + +typedef struct sf_common_proto_header { + unsigned char magic[4]; //magic number + char body_len[4]; //body length + char status[2]; //status to store errno + char flags[2]; + unsigned char cmd; //the command code + char padding[3]; +} SFCommonProtoHeader; + +typedef struct fs_proto_setup_channel_req { + char channel_id[4]; //for hint + char key[4]; //for validate when channel_id > 0 +} FSProtoSetupChannelReq; + +typedef struct fs_proto_setup_channel_resp { + char channel_id[4]; + char key[4]; +} FSProtoSetupChannelResp; + +typedef struct fs_proto_report_req_receipt_header { + char count[4]; + char padding[4]; +} FSProtoReportReqReceiptHeader; + +typedef struct fs_proto_report_req_receipt_body { + char req_id[8]; +} FSProtoReportReqReceiptBody; + +#ifdef __cplusplus +extern "C" { +#endif + +int sf_proto_set_body_length(struct fast_task_info *task); + +const char *sf_get_cmd_caption(const int cmd); + +static inline void sf_log_network_error_ex(SFResponseInfo *response, + const ConnectionInfo *conn, const int result, const int line) +{ + if (response->error.length > 0) { + logError("file: "__FILE__", line: %d, " + "server %s:%d, %s", line, + conn->ip_addr, conn->port, + response->error.message); + } else { + logError("file: "__FILE__", line: %d, " + "communicate with server %s:%d fail, " + "errno: %d, error info: %s", line, + conn->ip_addr, conn->port, + result, STRERROR(result)); + } +} + +#define sf_log_network_error(response, conn, result) \ + sf_log_network_error_ex(response, conn, result, __LINE__) + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/sf_types.h b/src/sf_types.h index 856b2cd..926308a 100644 --- a/src/sf_types.h +++ b/src/sf_types.h @@ -11,6 +11,8 @@ #include "fastcommon/connection_pool.h" #include "fastcommon/fast_task_queue.h" +#define FS_ERROR_INFO_SIZE 256 + typedef void (*sf_accept_done_callback)(struct fast_task_info *pTask, const bool bInnerPort); typedef int (*sf_set_body_length_callback)(struct fast_task_info *pTask); @@ -41,5 +43,24 @@ typedef struct sf_context { sf_recv_timeout_callback timeout_callback; } SFContext; -#endif +typedef struct { + int body_len; //body length + short flags; + short status; + unsigned char cmd; //command +} SFHeaderInfo; +typedef struct { + SFHeaderInfo header; + char *body; +} SFRequestInfo; + +typedef struct { + SFHeaderInfo header; + struct { + int length; + char message[FS_ERROR_INFO_SIZE]; + } error; +} SFResponseInfo; + +#endif