From c27cb2a9af6bb78a29b47fbf25ee9ad3a0c8c6af Mon Sep 17 00:00:00 2001
From: YuQing <384681@qq.com>
Date: Sat, 27 Aug 2022 21:39:31 +0800
Subject: [PATCH] add files: sf_shared_mbuffer.[hc]
---
src/Makefile.in | 4 +-
src/sf_shared_mbuffer.c | 94 +++++++++++++++++++++++++++++++++++++++
src/sf_shared_mbuffer.h | 97 +++++++++++++++++++++++++++++++++++++++++
3 files changed, 194 insertions(+), 1 deletion(-)
create mode 100644 src/sf_shared_mbuffer.c
create mode 100644 src/sf_shared_mbuffer.h
diff --git a/src/Makefile.in b/src/Makefile.in
index 21b9bd7..b05189d 100644
--- a/src/Makefile.in
+++ b/src/Makefile.in
@@ -9,7 +9,8 @@ TOP_HEADERS = sf_types.h sf_global.h sf_define.h sf_nio.h sf_service.h \
sf_func.h sf_util.h sf_configs.h sf_proto.h sf_cluster_cfg.h \
sf_sharding_htable.h sf_connection_manager.h sf_serializer.h \
sf_binlog_index.h sf_file_writer.h sf_binlog_writer.h \
- sf_ordered_writer.h sf_buffered_writer.h sf_iov.h
+ sf_ordered_writer.h sf_buffered_writer.h sf_iov.h \
+ sf_shared_mbuffer.h
IDEMP_COMMON_HEADER = idempotency/common/idempotency_types.h
@@ -33,6 +34,7 @@ SHARED_OBJS = sf_nio.lo sf_iov.lo sf_service.lo sf_global.lo \
sf_connection_manager.lo sf_serializer.lo \
sf_binlog_index.lo sf_file_writer.lo \
sf_binlog_writer.lo sf_ordered_writer.lo \
+ sf_shared_mbuffer.lo \
idempotency/server/server_channel.lo \
idempotency/server/request_htable.lo \
idempotency/server/channel_htable.lo \
diff --git a/src/sf_shared_mbuffer.c b/src/sf_shared_mbuffer.c
new file mode 100644
index 0000000..2e6e691
--- /dev/null
+++ b/src/sf_shared_mbuffer.c
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2020 YuQing <384681@qq.com>
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the Lesser GNU General Public License, version 3
+ * or later ("LGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the Lesser GNU General Public License
+ * along with this program. If not, see .
+ */
+
+#include "sf_shared_mbuffer.h"
+
+static int sf_shared_mbuffer_alloc_init(void *element, void *args)
+{
+ SFSharedMBuffer *buffer;
+
+ buffer = (SFSharedMBuffer *)element;
+ buffer->buff = (char *)(buffer + 1);
+ buffer->ctx = (SFSharedMBufferContext *)args;
+ return 0;
+}
+
+int sf_shared_mbuffer_init_ex(SFSharedMBufferContext *context,
+ const char *name_prefix, const int buff_extra_size,
+ const int min_buff_size, const int max_buff_size,
+ const int min_alloc_once, const int64_t memory_limit,
+ const bool need_lock)
+{
+ const double expect_usage_ratio = 0.75;
+ const int reclaim_interval = 1;
+ struct fast_region_info regions[32];
+ struct fast_mblock_object_callbacks object_callbacks;
+ int count;
+ int start;
+ int end;
+ int alloc_once;
+ int buff_size;
+ int i;
+
+ alloc_once = (4 * 1024 * 1024) / max_buff_size;
+ if (alloc_once == 0) {
+ alloc_once = min_alloc_once;
+ } else {
+ i = min_alloc_once;
+ while (i < alloc_once) {
+ i *= 2;
+ }
+ alloc_once = i;
+ }
+
+ count = 1;
+ buff_size = min_buff_size;
+ while (buff_size < max_buff_size) {
+ buff_size *= 2;
+ ++count;
+ alloc_once *= 2;
+ }
+
+ buff_size = min_buff_size;
+ start = 0;
+ end = buff_extra_size + buff_size;
+ FAST_ALLOCATOR_INIT_REGION(regions[0], start, end,
+ end - start, alloc_once);
+
+ //logInfo("[1] start: %d, end: %d, alloc_once: %d", start, end, alloc_once);
+
+ start = end;
+ for (i=1; iallocator, name_prefix,
+ sizeof(SFSharedMBuffer), &object_callbacks, regions, count,
+ memory_limit, expect_usage_ratio, reclaim_interval, need_lock);
+}
+
+void sf_shared_mbuffer_destroy(SFSharedMBufferContext *context)
+{
+ fast_allocator_destroy(&context->allocator);
+}
diff --git a/src/sf_shared_mbuffer.h b/src/sf_shared_mbuffer.h
new file mode 100644
index 0000000..7ed06bb
--- /dev/null
+++ b/src/sf_shared_mbuffer.h
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) 2020 YuQing <384681@qq.com>
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the Lesser GNU General Public License, version 3
+ * or later ("LGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the Lesser GNU General Public License
+ * along with this program. If not, see .
+ */
+
+#ifndef _SF_SHARED_MBUFFER_H__
+#define _SF_SHARED_MBUFFER_H__
+
+#include "fastcommon/fast_allocator.h"
+#include "fastcommon/shared_func.h"
+#include "fastcommon/logger.h"
+
+typedef struct sf_shared_mbuffer_context {
+ struct fast_allocator_context allocator;
+} SFSharedMBufferContext;
+
+typedef struct sf_shared_mbuffer {
+ char *buff;
+ int length;
+ volatile int reffer_count;
+ SFSharedMBufferContext *ctx;
+} SFSharedMBuffer;
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define sf_shared_mbuffer_init(context, name_prefix, buff_extra_size, \
+ min_buff_size, max_buff_size, min_alloc_once, memory_limit) \
+ sf_shared_mbuffer_init_ex(context, name_prefix, buff_extra_size, \
+ min_buff_size, max_buff_size, min_alloc_once, memory_limit, true)
+
+int sf_shared_mbuffer_init_ex(SFSharedMBufferContext *context,
+ const char *name_prefix, const int buff_extra_size,
+ const int min_buff_size, const int max_buff_size,
+ const int min_alloc_once, const int64_t memory_limit,
+ const bool need_lock);
+
+void sf_shared_mbuffer_destroy(SFSharedMBufferContext *context);
+
+#define sf_shared_mbuffer_alloc(context, buffer_size) \
+ sf_shared_mbuffer_alloc_ex(context, buffer_size, 1)
+
+static inline SFSharedMBuffer *sf_shared_mbuffer_alloc_ex(
+ SFSharedMBufferContext *context, const int buffer_size,
+ const int init_reffer_count)
+{
+ SFSharedMBuffer *buffer;
+ int sleep_ms;
+
+ sleep_ms = 5;
+ while ((buffer=fast_allocator_alloc(&context->allocator,
+ buffer_size)) == NULL)
+ {
+ if (sleep_ms < 100) {
+ sleep_ms *= 2;
+ }
+ fc_sleep_ms(sleep_ms);
+ }
+
+ if (init_reffer_count > 0) {
+ __sync_add_and_fetch(&buffer->reffer_count, init_reffer_count);
+ }
+ return buffer;
+}
+
+static inline void sf_shared_mbuffer_hold(SFSharedMBuffer *buffer)
+{
+ __sync_add_and_fetch(&buffer->reffer_count, 1);
+}
+
+static inline void sf_shared_mbuffer_release(SFSharedMBuffer *buffer)
+{
+ if (__sync_sub_and_fetch(&buffer->reffer_count, 1) == 0) {
+ /*
+ logDebug("file: "__FILE__", line: %d, "
+ "free shared buffer: %p", __LINE__, buffer);
+ */
+ fast_allocator_free(&buffer->ctx->allocator, buffer);
+ }
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif