add codes

connection_manager
yuqing 2018-05-10 11:05:08 +08:00
parent 9764049709
commit 6b69e90016
17 changed files with 2001 additions and 1 deletions

View File

@ -1 +1,2 @@
# libserverframe
network service framework library extract from FastDFS

60
libserverframe.spec Normal file
View File

@ -0,0 +1,60 @@
%define LibserverframeDevel libserverframe-devel
%define CommitVersion %(echo $COMMIT_VERSION)
Name: libserverframe
Version: 1.0.10
Release: 1%{?dist}
Summary: mc common framework library
License: GPL
Group: Arch/Tech
URL: http://github.com/happyfish100/libfastcommon/
Source: http://github.com/happyfish100/libfastcommon/%{name}-%{version}.tar.gz
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root-%(%{__id_u} -n)
BuildRequires: libfastcommon-devel >= 1.0.29
Requires: %__cp %__mv %__chmod %__grep %__mkdir %__install %__id
Requires: libfastcommon >= 1.0.29
%description
common framework library
commit version: %{CommitVersion}
%package devel
Summary: Development header file
Requires: %{name}%{?_isa} = %{version}-%{release}
%description devel
This package provides the header files of libserverframe
commit version: %{CommitVersion}
%prep
%setup -q
%build
./make.sh
%install
rm -rf %{buildroot}
DESTDIR=$RPM_BUILD_ROOT ./make.sh install
%post
%preun
%postun
%clean
rm -rf %{buildroot}
%files
%defattr(-,root,root,-)
/usr/lib64/libserverframe.so*
%files devel
%defattr(-,root,root,-)
/usr/include/sf/*
%changelog
* Mon Jun 23 2014 Zaixue Liao <liaozaixue@yongche.com>
- first RPM release (1.0)

99
make.sh Executable file
View File

@ -0,0 +1,99 @@
ENABLE_STATIC_LIB=0
ENABLE_SHARED_LIB=1
TARGET_PREFIX=$DESTDIR/usr
TARGET_CONF_PATH=$DESTDIR/etc/mc
DEBUG_FLAG=1
CFLAGS="$CFLAGS -Wall -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -I/usr/local/include/fastcommon"
if [ "$DEBUG_FLAG" = "1" ]; then
CFLAGS="$CFLAGS -g -O1 -DDEBUG_FLAG"
else
CFLAGS="$CFLAGS -O3"
fi
LIBS=''
uname=$(uname)
if [ "$uname" = "Linux" ]; then
CFLAGS="$CFLAGS"
elif [ "$uname" = "FreeBSD" ]; then
CFLAGS="$CFLAGS"
elif [ "$uname" = "SunOS" ]; then
CFLAGS="$CFLAGS -D_THREAD_SAFE"
LIBS="$LIBS -lsocket -lnsl -lresolv"
export CC=gcc
elif [ "$uname" = "AIX" ]; then
CFLAGS="$CFLAGS -D_THREAD_SAFE"
export CC=gcc
elif [ "$uname" = "HP-UX" ]; then
CFLAGS="$CFLAGS"
fi
have_pthread=0
if [ -f /usr/lib/libpthread.so ] || [ -f /usr/local/lib/libpthread.so ] || [ -f /lib64/libpthread.so ] || [ -f /usr/lib64/libpthread.so ] || [ -f /usr/lib/libpthread.a ] || [ -f /usr/local/lib/libpthread.a ] || [ -f /lib64/libpthread.a ] || [ -f /usr/lib64/libpthread.a ]; then
LIBS="$LIBS -lpthread"
have_pthread=1
elif [ "$uname" = "HP-UX" ]; then
lib_path="/usr/lib/hpux$OS_BITS"
if [ -f $lib_path/libpthread.so ]; then
LIBS="-L$lib_path -lpthread"
have_pthread=1
fi
elif [ "$uname" = "FreeBSD" ]; then
if [ -f /usr/lib/libc_r.so ]; then
line=$(nm -D /usr/lib/libc_r.so | grep pthread_create | grep -w T)
if [ $? -eq 0 ]; then
LIBS="$LIBS -lc_r"
have_pthread=1
fi
elif [ -f /lib64/libc_r.so ]; then
line=$(nm -D /lib64/libc_r.so | grep pthread_create | grep -w T)
if [ $? -eq 0 ]; then
LIBS="$LIBS -lc_r"
have_pthread=1
fi
elif [ -f /usr/lib64/libc_r.so ]; then
line=$(nm -D /usr/lib64/libc_r.so | grep pthread_create | grep -w T)
if [ $? -eq 0 ]; then
LIBS="$LIBS -lc_r"
have_pthread=1
fi
fi
fi
if [ $have_pthread -eq 0 ] && [ "$uname" = "Linux" ]; then
/sbin/ldconfig -p | fgrep libpthread.so > /dev/null
if [ $? -eq 0 ]; then
LIBS="$LIBS -lpthread"
else
echo -E 'Require pthread lib, please check!'
exit 2
fi
fi
if [ "$DEBUG_FLAG" = "1" ]; then
if [ "$uname" = "Linux" ]; then
LIBS="$LIBS -ldl -rdynamic"
fi
fi
sed_replace()
{
sed_cmd=$1
filename=$2
if [ "$uname" = "FreeBSD" ] || [ "$uname" = "Darwin" ]; then
sed -i "" "$sed_cmd" $filename
else
sed -i "$sed_cmd" $filename
fi
}
cd src
cp Makefile.in Makefile
sed_replace "s#\$(CFLAGS)#$CFLAGS#g" Makefile
sed_replace "s#\$(LIBS)#$LIBS#g" Makefile
sed_replace "s#\$(TARGET_PREFIX)#$TARGET_PREFIX#g" Makefile
sed_replace "s#\$(ENABLE_STATIC_LIB)#$ENABLE_STATIC_LIB#g" Makefile
sed_replace "s#\$(ENABLE_SHARED_LIB)#$ENABLE_SHARED_LIB#g" Makefile
make $1 $2

34
src/Makefile.in Normal file
View File

@ -0,0 +1,34 @@
.SUFFIXES: .c .lo
COMPILE = $(CC) $(CFLAGS) -fPIC
INC_PATH = -I/usr/include -I/usr/include/fastcommon
LIB_PATH = $(LIBS) -lfastcommon
TARGET_LIB = $(TARGET_PREFIX)/lib64
TARGET_PATH = $(TARGET_PREFIX)/bin
ALL_HEADERS = sf_types.h sf_global.h sf_define.h sf_nio.h sf_service.h sf_func.h sf_util.h list.h
ALL_LIBS = libserverframe.so
all: $(ALL_LIBS)
libserverframe.so: sf_nio.lo sf_service.lo sf_global.lo sf_func.lo sf_util.lo
cc -shared -o $@ $^ $(LIB_PATH)
.lo:
$(COMPILE) -o $@ $< $(SHARED_OBJS) $(LIB_PATH) $(INC_PATH)
.c:
$(COMPILE) -o $@ $< $(ALL_OBJS) $(LIB_PATH) $(INC_PATH)
.c.lo:
$(COMPILE) -c -o $@ $< $(INC_PATH)
install:
mkdir -p $(TARGET_PATH)
mkdir -p $(TARGET_LIB)
mkdir -p $(TARGET_PREFIX)/include/sf
cp -f $(ALL_LIBS) $(TARGET_LIB)
cp -f $(ALL_HEADERS) $(TARGET_PREFIX)/include/sf
# if [ ! -h $(TARGET_LIB)/libserverframe.so ]; then cd $(TARGET_LIB); ln -s libserverframe.so.1 libserverframe.so; fi
clean:
rm -f *.lo $(ALL_LIBS) $(ALL_PRGS)

167
src/list.h Normal file
View File

@ -0,0 +1,167 @@
#ifndef _LLIST_H
#define _LLIST_H
struct list_head {
struct list_head *next;
struct list_head *prev;
};
#define INIT_LIST_HEAD(head) do { \
(head)->next = (head)->prev = head; \
} while (0)
static inline void
list_add (struct list_head *_new, struct list_head *head)
{
_new->prev = head;
_new->next = head->next;
_new->prev->next = _new;
_new->next->prev = _new;
}
static inline void
list_add_tail (struct list_head *_new, struct list_head *head)
{
_new->next = head;
_new->prev = head->prev;
_new->prev->next = _new;
_new->next->prev = _new;
}
static inline void
list_add_internal(struct list_head *_new, struct list_head *prev,
struct list_head *next)
{
next->prev = _new;
_new->next = next;
_new->prev = prev;
prev->next = _new;
}
static inline void
list_del (struct list_head *old)
{
old->prev->next = old->next;
old->next->prev = old->prev;
old->next = (struct list_head *)0xbabebabe;
old->prev = (struct list_head *)0xcafecafe;
}
static inline void
list_del_init (struct list_head *old)
{
old->prev->next = old->next;
old->next->prev = old->prev;
old->next = old;
old->prev = old;
}
static inline void
list_move (struct list_head *list, struct list_head *head)
{
list->prev->next = list->next;
list->next->prev = list->prev;
list_add (list, head);
}
static inline void
list_move_tail (struct list_head *list, struct list_head *head)
{
list->prev->next = list->next;
list->next->prev = list->prev;
list_add_tail (list, head);
}
static inline int
list_empty (struct list_head *head)
{
return (head->next == head);
}
static inline void
__list_splice (struct list_head *list, struct list_head *head)
{
(list->prev)->next = (head->next);
(head->next)->prev = (list->prev);
(head)->next = (list->next);
(list->next)->prev = (head);
}
static inline void
list_splice (struct list_head *list, struct list_head *head)
{
if (list_empty (list))
return;
__list_splice (list, head);
}
static inline void
list_splice_init (struct list_head *list, struct list_head *head)
{
if (list_empty (list))
return;
__list_splice (list, head);
INIT_LIST_HEAD (list);
}
static inline int list_is_last(const struct list_head *list,
const struct list_head *head)
{
return list->next == head;
}
static inline int list_count(struct list_head *head)
{
struct list_head *pos;
int count;
count = 0;
for (pos = head->next; pos != head; pos = pos->next) {
++count;
}
return count;
}
#define list_entry(ptr, type, member) \
((type *)((char *)(ptr)-(unsigned long)(&((type *)0)->member)))
#define list_for_each(pos, head) \
for (pos = (head)->next; pos != (head); pos = pos->next)
#define list_for_each_entry(pos, head, member) \
for (pos = list_entry((head)->next, typeof(*pos), member); \
&pos->member != (head); \
pos = list_entry(pos->member.next, typeof(*pos), member))
#define list_for_each_entry_safe(pos, n, head, member) \
for (pos = list_entry((head)->next, typeof(*pos), member), \
n = list_entry(pos->member.next, typeof(*pos), member); \
&pos->member != (head); \
pos = n, n = list_entry(n->member.next, typeof(*n), member))
#define list_for_each_prev(pos, head) \
for (pos = (head)->prev; pos != (head); pos = pos->prev)
#endif /* _LLIST_H */

23
src/sf_define.h Normal file
View File

@ -0,0 +1,23 @@
//sf_define.h
#ifndef _SF_DEFINE_H_
#define _SF_DEFINE_H_
#include "common_define.h"
#define SF_DEF_THREAD_STACK_SIZE (64 * 1024)
#define SF_DEF_MAX_PACKAGE_SIZE (16 * 1024)
#define SF_DEF_MIN_BUFF_SIZE (64 * 1024)
#define SF_DEF_MAX_BUFF_SIZE (64 * 1024)
#ifdef __cplusplus
extern "C" {
#endif
#ifdef __cplusplus
}
#endif
#endif

116
src/sf_func.c Normal file
View File

@ -0,0 +1,116 @@
#include <time.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <errno.h>
#include <grp.h>
#include <pwd.h>
#include "shared_func.h"
#include "logger.h"
#include "sockopt.h"
#include "http_func.h"
#include "sf_define.h"
#include "sf_global.h"
#include "sf_func.h"
int sf_parse_server_info(const char* pServerStr, SFServerInfo* pServerInfo,
const int default_port)
{
char *parts[2];
char server_info[256];
int len;
int count;
len = strlen(pServerStr);
if (len == 0) {
logError("file: "__FILE__", line: %d, "
"pServerStr \"%s\" is empty!",
__LINE__, pServerStr);
return EINVAL;
}
if (len >= sizeof(server_info)) {
logError("file: "__FILE__", line: %d, "
"pServerStr \"%s\" is too long!",
__LINE__, pServerStr);
return ENAMETOOLONG;
}
memcpy(server_info, pServerStr, len);
*(server_info + len) = '\0';
count = splitEx(server_info, ':', parts, 2);
if (count == 1) {
pServerInfo->port = default_port;
}
else {
char *endptr = NULL;
pServerInfo->port = (int)strtol(parts[1], &endptr, 10);
if ((endptr != NULL && *endptr != '\0') || pServerInfo->port <= 0) {
logError("file: "__FILE__", line: %d, "
"pServerStr: %s, invalid port: %s!",
__LINE__, pServerStr, parts[1]);
return EINVAL;
}
}
if (getIpaddrByName(parts[0], pServerInfo->ip_addr,
sizeof(pServerInfo->ip_addr)) == INADDR_NONE)
{
logError("file: "__FILE__", line: %d, "
"pServerStr: %s, invalid hostname: %s!",
__LINE__, pServerStr, parts[0]);
return EINVAL;
}
return 0;
}
int sf_load_server_info(IniContext *pIniContext, const char *filename,
const char *item_name, SFServerInfo *pServerInfo,
const int default_port)
{
char *pServerStr;
pServerStr = iniGetStrValue(NULL, item_name, pIniContext);
if (pServerStr == NULL) {
logError("file: "__FILE__", line: %d, "
"config file: %s, item \"%s\" not exist!",
__LINE__, filename, item_name);
return ENOENT;
}
return sf_parse_server_info(pServerStr, pServerInfo, default_port);
}
int sf_connect_to_server(const char *ip_addr, const int port, int *sock)
{
int result;
*sock = socket(AF_INET, SOCK_STREAM, 0);
if(*sock < 0) {
return errno != 0 ? errno : ENOMEM;
}
tcpsetserveropt(*sock, g_sf_network_timeout);
if ((result=tcpsetnonblockopt(*sock)) != 0) {
close(*sock);
*sock = -1;
return result;
}
if ((result=connectserverbyip_nb(*sock, ip_addr, port,
g_sf_connect_timeout)) != 0)
{
close(*sock);
*sock = -1;
return result;
}
return 0;
}

26
src/sf_func.h Normal file
View File

@ -0,0 +1,26 @@
//sf_func.h
#ifndef _SF_FUNC_H
#define _SF_FUNC_H
#include "common_define.h"
#include "sf_types.h"
#ifdef __cplusplus
extern "C" {
#endif
int sf_load_server_info(IniContext *pIniContext, const char *filename,
const char *item_name, SFServerInfo *pServerInfo,
const int default_port);
int sf_parse_server_info(const char* pServerStr, SFServerInfo* pServerInfo,
const int default_port);
int sf_connect_to_server(const char *ip_addr, const int port, int *sock);
#ifdef __cplusplus
}
#endif
#endif

293
src/sf_global.c Normal file
View File

@ -0,0 +1,293 @@
#include <time.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <errno.h>
#include <grp.h>
#include <pwd.h>
#include "shared_func.h"
#include "logger.h"
#include "common_define.h"
#include "sf_define.h"
#include "sf_global.h"
int g_sf_connect_timeout = DEFAULT_CONNECT_TIMEOUT;
int g_sf_network_timeout = DEFAULT_NETWORK_TIMEOUT;
char g_sf_base_path[MAX_PATH_SIZE] = {'/', 't', 'm', 'p', '\0'};
struct nio_thread_data *g_thread_data = NULL;
volatile bool g_continue_flag = true;
int g_outer_port = 0;
int g_inner_port = 0;
int g_max_connections = DEFAULT_MAX_CONNECTONS;
int g_accept_threads = 1;
int g_work_threads = DEFAULT_WORK_THREADS;
int g_thread_stack_size = SF_DEF_THREAD_STACK_SIZE;
int g_max_pkg_size = SF_DEF_MAX_PACKAGE_SIZE;
int g_min_buff_size = SF_DEF_MIN_BUFF_SIZE;
int g_max_buff_size = SF_DEF_MAX_BUFF_SIZE;
int g_sync_log_buff_interval = SYNC_LOG_BUFF_DEF_INTERVAL;
bool g_rotate_error_log = false;
int g_log_file_keep_days = 0;
gid_t g_run_by_gid;
uid_t g_run_by_uid;
char g_run_by_group[32] = {0};
char g_run_by_user[32] = {0};
time_t g_up_time = 0;
char g_inner_bind_addr[IP_ADDRESS_SIZE] = {0};
char g_outer_bind_addr[IP_ADDRESS_SIZE] = {0};
SFConnectionStat g_connection_stat = {0, 0};
int sf_load_config(const char *server_name, const char *filename,
IniContext *pIniContext, const int default_inner_port,
const int default_outer_port)
{
char *pBasePath;
char *pBindAddr;
char *pRunByGroup;
char *pRunByUser;
char *pMaxPkgSize;
char *pMinBuffSize;
char *pMaxBuffSize;
char *pThreadStackSize;
int result;
int64_t max_pkg_size;
int64_t min_buff_size;
int64_t max_buff_size;
int64_t thread_stack_size;
pBasePath = iniGetStrValue(NULL, "base_path", pIniContext);
if (pBasePath == NULL) {
logError("file: "__FILE__", line: %d, "
"conf file \"%s\" must have item "
"\"base_path\"!", __LINE__, filename);
return ENOENT;
}
snprintf(g_sf_base_path, sizeof(g_sf_base_path), "%s", pBasePath);
chopPath(g_sf_base_path);
if (!fileExists(g_sf_base_path)) {
logError("file: "__FILE__", line: %d, "
"\"%s\" can't be accessed, error info: %s",
__LINE__, g_sf_base_path, strerror(errno));
return errno != 0 ? errno : ENOENT;
}
if (!isDir(g_sf_base_path)) {
logError("file: "__FILE__", line: %d, "
"\"%s\" is not a directory!",
__LINE__, g_sf_base_path);
return ENOTDIR;
}
g_sf_connect_timeout = iniGetIntValue(NULL, "connect_timeout",
pIniContext, DEFAULT_CONNECT_TIMEOUT);
if (g_sf_connect_timeout <= 0) {
g_sf_connect_timeout = DEFAULT_CONNECT_TIMEOUT;
}
g_sf_network_timeout = iniGetIntValue(NULL, "network_timeout",
pIniContext, DEFAULT_NETWORK_TIMEOUT);
if (g_sf_network_timeout <= 0) {
g_sf_network_timeout = DEFAULT_NETWORK_TIMEOUT;
}
g_inner_port = iniGetIntValue(NULL, "inner_port", pIniContext,
default_inner_port);
if (g_inner_port <= 0) {
g_inner_port = default_inner_port;
}
g_outer_port = iniGetIntValue(NULL, "outer_port", pIniContext,
default_outer_port);
if (g_outer_port <= 0) {
g_outer_port = default_outer_port;
}
pBindAddr = iniGetStrValue(NULL, "inner_bind_addr", pIniContext);
if (pBindAddr == NULL) {
*g_inner_bind_addr = '\0';
}
else {
snprintf(g_inner_bind_addr, sizeof(g_inner_bind_addr), "%s", pBindAddr);
}
pBindAddr = iniGetStrValue(NULL, "outer_bind_addr", pIniContext);
if (pBindAddr == NULL) {
*g_outer_bind_addr = '\0';
}
else {
snprintf(g_outer_bind_addr, sizeof(g_outer_bind_addr), "%s", pBindAddr);
}
g_max_connections = iniGetIntValue(NULL, "max_connections",
pIniContext, DEFAULT_MAX_CONNECTONS);
if (g_max_connections <= 0) {
g_max_connections = DEFAULT_MAX_CONNECTONS;
}
g_accept_threads = iniGetIntValue(NULL, "accept_threads",
pIniContext, 1);
if (g_accept_threads <= 0) {
logError("file: "__FILE__", line: %d, "
"item \"accept_threads\" is invalid, "
"value: %d <= 0!", __LINE__, g_accept_threads);
return EINVAL;
}
g_work_threads = iniGetIntValue(NULL, "work_threads",
pIniContext, DEFAULT_WORK_THREADS);
if (g_work_threads <= 0) {
logError("file: "__FILE__", line: %d, "
"item \"work_threads\" is invalid, "
"value: %d <= 0!", __LINE__, g_work_threads);
return EINVAL;
}
if ((result=set_rlimit(RLIMIT_NOFILE, g_max_connections)) != 0) {
return result;
}
pMaxPkgSize = iniGetStrValue(NULL,
"max_pkg_size", pIniContext);
if (pMaxPkgSize == NULL) {
max_pkg_size = SF_DEF_MAX_PACKAGE_SIZE;
}
else if ((result=parse_bytes(pMaxPkgSize, 1,
&max_pkg_size)) != 0)
{
return result;
}
g_max_pkg_size = (int)max_pkg_size;
pMinBuffSize = iniGetStrValue(NULL,
"min_buff_size", pIniContext);
if (pMinBuffSize == NULL) {
min_buff_size = SF_DEF_MIN_BUFF_SIZE;
}
else if ((result=parse_bytes(pMinBuffSize, 1,
&min_buff_size)) != 0)
{
return result;
}
g_min_buff_size = (int)min_buff_size;
pMaxBuffSize = iniGetStrValue(NULL,
"max_buff_size", pIniContext);
if (pMaxBuffSize == NULL) {
max_buff_size = SF_DEF_MAX_BUFF_SIZE;
}
else if ((result=parse_bytes(pMaxBuffSize, 1,
&max_buff_size)) != 0)
{
return result;
}
g_max_buff_size = (int)max_buff_size;
if (pMinBuffSize == NULL || pMaxBuffSize == NULL) {
g_min_buff_size = g_max_pkg_size;
g_max_buff_size = g_max_pkg_size;
}
else if (g_max_buff_size < g_max_pkg_size) {
g_max_buff_size = g_max_pkg_size;
}
pRunByGroup = iniGetStrValue(NULL, "run_by_group", pIniContext);
pRunByUser = iniGetStrValue(NULL, "run_by_user", pIniContext);
if (pRunByGroup == NULL) {
*g_run_by_group = '\0';
}
else {
snprintf(g_run_by_group, sizeof(g_run_by_group),
"%s", pRunByGroup);
}
if (*g_run_by_group == '\0') {
g_run_by_gid = getegid();
}
else {
struct group *pGroup;
pGroup = getgrnam(g_run_by_group);
if (pGroup == NULL) {
result = errno != 0 ? errno : ENOENT;
logError("file: "__FILE__", line: %d, "
"getgrnam fail, errno: %d, "
"error info: %s", __LINE__,
result, strerror(result));
return result;
}
g_run_by_gid = pGroup->gr_gid;
}
if (pRunByUser == NULL) {
*g_run_by_user = '\0';
}
else {
snprintf(g_run_by_user, sizeof(g_run_by_user),
"%s", pRunByUser);
}
if (*g_run_by_user == '\0') {
g_run_by_uid = geteuid();
}
else {
struct passwd *pUser;
pUser = getpwnam(g_run_by_user);
if (pUser == NULL) {
result = errno != 0 ? errno : ENOENT;
logError("file: "__FILE__", line: %d, "
"getpwnam fail, errno: %d, "
"error info: %s", __LINE__,
result, strerror(result));
return result;
}
g_run_by_uid = pUser->pw_uid;
}
if ((result=set_run_by(g_run_by_group, g_run_by_user)) != 0) {
return result;
}
g_sync_log_buff_interval = iniGetIntValue(NULL,
"sync_log_buff_interval", pIniContext,
SYNC_LOG_BUFF_DEF_INTERVAL);
if (g_sync_log_buff_interval <= 0) {
g_sync_log_buff_interval = SYNC_LOG_BUFF_DEF_INTERVAL;
}
pThreadStackSize = iniGetStrValue(NULL,
"thread_stack_size", pIniContext);
if (pThreadStackSize == NULL) {
thread_stack_size = SF_DEF_THREAD_STACK_SIZE;
}
else if ((result=parse_bytes(pThreadStackSize, 1,
&thread_stack_size)) != 0)
{
return result;
}
g_thread_stack_size = (int)thread_stack_size;
g_rotate_error_log = iniGetBoolValue(NULL, "rotate_error_log",
pIniContext, false);
g_log_file_keep_days = iniGetIntValue(NULL, "log_file_keep_days",
pIniContext, 0);
load_log_level(pIniContext);
if ((result=log_set_prefix(g_sf_base_path, server_name)) != 0) {
return result;
}
//log_set_time_precision(&g_log_context, LOG_TIME_PRECISION_MSECOND);
return 0;
}

61
src/sf_global.h Normal file
View File

@ -0,0 +1,61 @@
//sf_global.h
#ifndef _SF_GLOBAL_H
#define _SF_GLOBAL_H
#include "common_define.h"
#include "ini_file_reader.h"
#include "ioevent.h"
typedef struct sf_connection_stat {
volatile int current_count;
volatile int max_count;
} SFConnectionStat;
#ifdef __cplusplus
extern "C" {
#endif
extern int g_sf_connect_timeout;
extern int g_sf_network_timeout;
extern char g_sf_base_path[MAX_PATH_SIZE];
extern struct nio_thread_data *g_thread_data;
extern volatile bool g_continue_flag;
extern int g_outer_port;
extern int g_inner_port;
extern int g_max_connections;
extern int g_accept_threads;
extern int g_work_threads;
extern int g_thread_stack_size;
extern int g_max_pkg_size;
extern int g_min_buff_size;
extern int g_max_buff_size;
extern int g_sync_log_buff_interval; //sync log buff to disk every interval seconds
extern time_t g_up_time;
extern gid_t g_run_by_gid;
extern uid_t g_run_by_uid;
extern char g_run_by_group[32];
extern char g_run_by_user[32];
extern bool g_rotate_error_log;
extern int g_log_file_keep_days;
extern char g_inner_bind_addr[IP_ADDRESS_SIZE];
extern char g_outer_bind_addr[IP_ADDRESS_SIZE];
extern SFConnectionStat g_connection_stat;
int sf_load_config(const char *server_name, const char *filename,
IniContext *pIniContext, const int default_inner_port,
const int default_outer_port);
#ifdef __cplusplus
}
#endif
#endif

441
src/sf_nio.c Normal file
View File

@ -0,0 +1,441 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <pthread.h>
#include "shared_func.h"
#include "sched_thread.h"
#include "pthread_func.h"
#include "logger.h"
#include "sockopt.h"
#include "fast_task_queue.h"
#include "ioevent_loop.h"
#include "sf_global.h"
#include "sf_types.h"
#include "sf_nio.h"
static int sf_header_size = 0;
static bool sf_remove_from_ready_list = true;
static sf_deal_task_func sf_deal_task = NULL;
static sf_set_body_length_callback sf_set_body_length = NULL;
static TaskCleanUpCallback sf_task_cleanup_func = sf_task_finish_clean_up;
static sf_recv_timeout_callback sf_timeout_callback = NULL;
static int client_sock_read(int sock, short event, void *arg);
void sf_set_parameters(const int header_size, sf_set_body_length_callback
set_body_length_func, sf_deal_task_func deal_func,
TaskCleanUpCallback cleanup_func,
sf_recv_timeout_callback timeout_callback)
{
sf_header_size = header_size;
sf_set_body_length = set_body_length_func;
sf_deal_task = deal_func;
sf_task_cleanup_func = cleanup_func;
sf_timeout_callback = timeout_callback;
}
void sf_set_remove_from_ready_list(const bool enabled)
{
sf_remove_from_ready_list = enabled;
}
TaskCleanUpCallback sf_get_task_cleanup_func()
{
return sf_task_cleanup_func;
}
void sf_task_finish_clean_up(struct fast_task_info *pTask)
{
assert(pTask->event.fd >= 0);
/*
if (pTask->event.fd < 0) {
logWarning("file: "__FILE__", line: %d, "
"pTask: %p already cleaned",
__LINE__, pTask);
return;
}
*/
if (pTask->finish_callback != NULL) {
pTask->finish_callback(pTask);
pTask->finish_callback = NULL;
}
ioevent_detach(&pTask->thread_data->ev_puller, pTask->event.fd);
close(pTask->event.fd);
pTask->event.fd = -1;
if (pTask->event.timer.expires > 0) {
fast_timer_remove(&pTask->thread_data->timer,
&pTask->event.timer);
pTask->event.timer.expires = 0;
}
if (sf_remove_from_ready_list) {
ioevent_remove(&pTask->thread_data->ev_puller, pTask);
}
__sync_fetch_and_sub(&g_connection_stat.current_count, 1);
free_queue_push(pTask);
}
void sf_recv_notify_read(int sock, short event, void *arg)
{
int bytes;
int current_connections;
long task_ptr;
struct fast_task_info *pTask;
while (1) {
if ((bytes=read(sock, &task_ptr, sizeof(task_ptr))) < 0) {
if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
logError("file: "__FILE__", line: %d, "
"call read failed, "
"errno: %d, error info: %s",
__LINE__, errno, strerror(errno));
}
break;
}
else if (bytes == 0) {
break;
}
current_connections = __sync_add_and_fetch(&g_connection_stat.
current_count, 1);
if (current_connections > g_connection_stat.max_count) {
g_connection_stat.max_count = current_connections;
}
pTask = (struct fast_task_info *)task_ptr;
if (ioevent_set(pTask, pTask->thread_data, pTask->event.fd, IOEVENT_READ,
(IOEventCallback)client_sock_read, g_sf_network_timeout) != 0)
{
sf_task_cleanup_func(pTask);
continue;
}
}
}
static inline int set_write_event(struct fast_task_info *pTask)
{
int result;
if (pTask->event.callback == (IOEventCallback)sf_client_sock_write) {
return 0;
}
pTask->event.callback = (IOEventCallback)sf_client_sock_write;
if (ioevent_modify(&pTask->thread_data->ev_puller,
pTask->event.fd, IOEVENT_WRITE, pTask) != 0)
{
result = errno != 0 ? errno : ENOENT;
sf_task_cleanup_func(pTask);
logError("file: "__FILE__", line: %d, "
"ioevent_modify fail, "
"errno: %d, error info: %s",
__LINE__, result, strerror(result));
return result;
}
return 0;
}
static inline int set_read_event(struct fast_task_info *pTask)
{
int result;
if (pTask->event.callback == (IOEventCallback)client_sock_read) {
return 0;
}
pTask->event.callback = (IOEventCallback)client_sock_read;
if (ioevent_modify(&pTask->thread_data->ev_puller,
pTask->event.fd, IOEVENT_READ, pTask) != 0)
{
result = errno != 0 ? errno : ENOENT;
sf_task_cleanup_func(pTask);
logError("file: "__FILE__", line: %d, "
"ioevent_modify fail, "
"errno: %d, error info: %s",
__LINE__, result, strerror(result));
return result;
}
return 0;
}
int sf_send_add_event(struct fast_task_info *pTask)
{
pTask->offset = 0;
if (pTask->length > 0) {
/* direct send */
if (sf_client_sock_write(pTask->event.fd, IOEVENT_WRITE, pTask) < 0) {
return errno != 0 ? errno : EIO;
}
}
return 0;
}
static int client_sock_read(int sock, short event, void *arg)
{
int bytes;
int recv_bytes;
int total_read;
struct fast_task_info *pTask;
assert(sock >= 0);
pTask = (struct fast_task_info *)arg;
if (event & IOEVENT_TIMEOUT) {
if (pTask->offset == 0 && pTask->req_count > 0) {
if (sf_timeout_callback != NULL) {
if (sf_timeout_callback(pTask) != 0) {
sf_task_cleanup_func(pTask);
return -1;
}
}
pTask->event.timer.expires = g_current_time +
g_sf_network_timeout;
fast_timer_add(&pTask->thread_data->timer,
&pTask->event.timer);
}
else {
if (pTask->length > 0) {
logWarning("file: "__FILE__", line: %d, "
"client ip: %s, recv timeout, "
"recv offset: %d, expect length: %d",
__LINE__, pTask->client_ip,
pTask->offset, pTask->length);
}
else {
logWarning("file: "__FILE__", line: %d, "
"client ip: %s, req_count: %ld, recv timeout",
__LINE__, pTask->client_ip, pTask->req_count);
}
sf_task_cleanup_func(pTask);
return -1;
}
return 0;
}
if (event & IOEVENT_ERROR) {
logDebug("file: "__FILE__", line: %d, "
"client ip: %s, recv error event: %d, "
"close connection", __LINE__, pTask->client_ip, event);
sf_task_cleanup_func(pTask);
return -1;
}
total_read = 0;
while (1) {
fast_timer_modify(&pTask->thread_data->timer,
&pTask->event.timer, g_current_time +
g_sf_network_timeout);
if (pTask->length == 0) { //recv header
recv_bytes = sf_header_size - pTask->offset;
}
else {
recv_bytes = pTask->length - pTask->offset;
}
bytes = read(sock, pTask->data + pTask->offset, recv_bytes);
if (bytes < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break;
}
else if (errno == EINTR) { //should retry
logDebug("file: "__FILE__", line: %d, "
"client ip: %s, ignore interupt signal",
__LINE__, pTask->client_ip);
continue;
}
else {
logWarning("file: "__FILE__", line: %d, "
"client ip: %s, recv failed, "
"errno: %d, error info: %s",
__LINE__, pTask->client_ip,
errno, strerror(errno));
sf_task_cleanup_func(pTask);
return -1;
}
}
else if (bytes == 0) {
logDebug("file: "__FILE__", line: %d, "
"client ip: %s, sock: %d, recv failed, "
"connection disconnected",
__LINE__, pTask->client_ip, sock);
sf_task_cleanup_func(pTask);
return -1;
}
total_read += bytes;
pTask->offset += bytes;
if (pTask->length == 0) { //header
if (pTask->offset < sf_header_size) {
break;
}
if (sf_set_body_length(pTask) != 0) {
sf_task_cleanup_func(pTask);
return -1;
}
if (pTask->length < 0) {
logError("file: "__FILE__", line: %d, "
"client ip: %s, pkg length: %d < 0",
__LINE__, pTask->client_ip,
pTask->length);
sf_task_cleanup_func(pTask);
return -1;
}
pTask->length += sf_header_size;
if (pTask->length > g_max_pkg_size) {
logError("file: "__FILE__", line: %d, "
"client ip: %s, pkg length: %d > "
"max pkg size: %d", __LINE__,
pTask->client_ip, pTask->length,
g_max_pkg_size);
sf_task_cleanup_func(pTask);
return -1;
}
if (pTask->length > pTask->size) {
int old_size;
old_size = pTask->size;
if (free_queue_realloc_buffer(pTask, pTask->length) != 0) {
logError("file: "__FILE__", line: %d, "
"client ip: %s, realloc buffer size "
"from %d to %d fail", __LINE__,
pTask->client_ip, pTask->size, pTask->length);
sf_task_cleanup_func(pTask);
return -1;
}
logDebug("file: "__FILE__", line: %d, "
"client ip: %s, task length: %d, realloc buffer size "
"from %d to %d", __LINE__, pTask->client_ip,
pTask->length, old_size, pTask->size);
}
}
if (pTask->offset >= pTask->length) { //recv done
pTask->req_count++;
if (sf_deal_task(pTask) < 0) { //fatal error
sf_task_cleanup_func(pTask);
return -1;
}
break;
}
}
return total_read;
}
int sf_client_sock_write(int sock, short event, void *arg)
{
int bytes;
int total_write;
struct fast_task_info *pTask;
assert(sock >= 0);
pTask = (struct fast_task_info *)arg;
if (event & IOEVENT_TIMEOUT) {
logError("file: "__FILE__", line: %d, "
"client ip: %s, send timeout. total length: %d, offset: %d, "
"remain: %d", __LINE__, pTask->client_ip, pTask->length,
pTask->offset, pTask->length - pTask->offset);
sf_task_cleanup_func(pTask);
return -1;
}
if (event & IOEVENT_ERROR) {
logDebug("file: "__FILE__", line: %d, "
"client ip: %s, recv error event: %d, "
"close connection", __LINE__, pTask->client_ip, event);
sf_task_cleanup_func(pTask);
return -1;
}
total_write = 0;
while (1) {
fast_timer_modify(&pTask->thread_data->timer,
&pTask->event.timer, g_current_time +
g_sf_network_timeout);
bytes = write(sock, pTask->data + pTask->offset,
pTask->length - pTask->offset);
//printf("%08X sended %d bytes\n", (int)pTask, bytes);
if (bytes < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
if (set_write_event(pTask) != 0) {
return -1;
}
break;
}
else if (errno == EINTR) { //should retry
logDebug("file: "__FILE__", line: %d, "
"client ip: %s, ignore interupt signal",
__LINE__, pTask->client_ip);
continue;
}
else {
logWarning("file: "__FILE__", line: %d, "
"client ip: %s, send fail, "
"errno: %d, error info: %s",
__LINE__, pTask->client_ip,
errno, strerror(errno));
sf_task_cleanup_func(pTask);
return -1;
}
}
else if (bytes == 0) {
logWarning("file: "__FILE__", line: %d, "
"client ip: %s, sock: %d, send failed, connection disconnected",
__LINE__, pTask->client_ip, sock);
sf_task_cleanup_func(pTask);
return -1;
}
total_write += bytes;
pTask->offset += bytes;
if (pTask->offset >= pTask->length) {
pTask->offset = 0;
pTask->length = 0;
if (set_read_event(pTask) != 0) {
return -1;
}
break;
}
}
return total_write;
}

34
src/sf_nio.h Normal file
View File

@ -0,0 +1,34 @@
//sf_nio.h
#ifndef _SF_NIO_H
#define _SF_NIO_H
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "fast_task_queue.h"
#include "sf_types.h"
#ifdef __cplusplus
extern "C" {
#endif
void sf_set_parameters(const int header_size, sf_set_body_length_callback
set_body_length_func, sf_deal_task_func deal_func,
TaskCleanUpCallback cleanup_func,
sf_recv_timeout_callback timeout_callback);
void sf_set_remove_from_ready_list(const bool enabled);
TaskCleanUpCallback sf_get_task_cleanup_func();
void sf_recv_notify_read(int sock, short event, void *arg);
int sf_send_add_event(struct fast_task_info *pTask);
int sf_client_sock_write(int sock, short event, void *arg);
void sf_task_finish_clean_up(struct fast_task_info *pTask);
#ifdef __cplusplus
}
#endif
#endif

468
src/sf_service.c Normal file
View File

@ -0,0 +1,468 @@
//sf_service.c
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/stat.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include "logger.h"
#include "sockopt.h"
#include "shared_func.h"
#include "pthread_func.h"
#include "sched_thread.h"
#include "ioevent_loop.h"
#include "sf_global.h"
#include "sf_nio.h"
#include "sf_service.h"
int g_worker_thread_count = 0;
int g_server_outer_sock = -1;
int g_server_inner_sock = -1;
static sf_accept_done_callback sf_accept_done_func = NULL;
static bool bTerminateFlag = false;
static void sigQuitHandler(int sig);
static void sigHupHandler(int sig);
static void sigUsrHandler(int sig);
#if defined(DEBUG_FLAG)
static void sigDumpHandler(int sig);
#endif
static void *worker_thread_entrance(void* arg);
int sf_service_init(sf_alloc_thread_extra_data_callback
alloc_thread_extra_data_callback,
ThreadLoopCallback thread_loop_callback,
sf_accept_done_callback accept_done_callback,
sf_set_body_length_callback set_body_length_func,
sf_deal_task_func deal_func, TaskCleanUpCallback task_cleanup_func,
sf_recv_timeout_callback timeout_callback, const int net_timeout_ms,
const int proto_header_size, const int task_arg_size)
{
#define ALLOC_CONNECTIONS_ONCE 256
int result;
int bytes;
int m;
int init_connections;
int alloc_conn_once;
struct nio_thread_data *pThreadData;
struct nio_thread_data *pDataEnd;
pthread_t tid;
pthread_attr_t thread_attr;
sf_accept_done_func = accept_done_callback;
sf_set_parameters(proto_header_size, set_body_length_func, deal_func,
task_cleanup_func, timeout_callback);
if ((result=set_rand_seed()) != 0) {
logCrit("file: "__FILE__", line: %d, "
"set_rand_seed fail, program exit!", __LINE__);
return result;
}
if ((result=init_pthread_attr(&thread_attr, g_thread_stack_size)) != 0) {
logError("file: "__FILE__", line: %d, "
"init_pthread_attr fail, program exit!", __LINE__);
return result;
}
m = g_min_buff_size / (16 * 1024);
if (m == 0) {
m = 1;
} else if (m > 16) {
m = 16;
}
alloc_conn_once = ALLOC_CONNECTIONS_ONCE / m;
init_connections = g_max_connections < alloc_conn_once ?
g_max_connections : alloc_conn_once;
if ((result=free_queue_init_ex(g_max_connections, init_connections,
alloc_conn_once, g_min_buff_size, g_max_buff_size,
task_arg_size)) != 0)
{
return result;
}
bytes = sizeof(struct nio_thread_data) * g_work_threads;
g_thread_data = (struct nio_thread_data *)malloc(bytes);
if (g_thread_data == NULL) {
logError("file: "__FILE__", line: %d, "
"malloc %d bytes fail, errno: %d, error info: %s",
__LINE__, bytes, errno, strerror(errno));
return errno != 0 ? errno : ENOMEM;
}
memset(g_thread_data, 0, bytes);
g_worker_thread_count = 0;
pDataEnd = g_thread_data + g_work_threads;
for (pThreadData=g_thread_data; pThreadData<pDataEnd; pThreadData++) {
pThreadData->thread_loop_callback = thread_loop_callback;
if (alloc_thread_extra_data_callback != NULL) {
pThreadData->arg = alloc_thread_extra_data_callback(
(int)(pThreadData - g_thread_data));
}
else {
pThreadData->arg = NULL;
}
if (ioevent_init(&pThreadData->ev_puller,
g_max_connections + 2, net_timeout_ms, 0) != 0)
{
result = errno != 0 ? errno : ENOMEM;
logError("file: "__FILE__", line: %d, "
"ioevent_init fail, "
"errno: %d, error info: %s",
__LINE__, result, strerror(result));
return result;
}
result = fast_timer_init(&pThreadData->timer,
2 * g_sf_network_timeout, g_current_time);
if (result != 0) {
logError("file: "__FILE__", line: %d, "
"fast_timer_init fail, "
"errno: %d, error info: %s",
__LINE__, result, strerror(result));
return result;
}
if (pipe(pThreadData->pipe_fds) != 0) {
result = errno != 0 ? errno : EPERM;
logError("file: "__FILE__", line: %d, "
"call pipe fail, "
"errno: %d, error info: %s",
__LINE__, result, strerror(result));
break;
}
#if defined(OS_LINUX)
if ((result=fd_add_flags(pThreadData->pipe_fds[0],
O_NONBLOCK | O_NOATIME)) != 0)
{
break;
}
#else
if ((result=fd_add_flags(pThreadData->pipe_fds[0],
O_NONBLOCK)) != 0)
{
break;
}
#endif
if ((result=pthread_create(&tid, &thread_attr,
worker_thread_entrance, pThreadData)) != 0)
{
logError("file: "__FILE__", line: %d, "
"create thread failed, startup threads: %d, "
"errno: %d, error info: %s",
__LINE__, g_worker_thread_count,
result, strerror(result));
break;
}
else {
__sync_fetch_and_add(&g_worker_thread_count, 1);
}
}
pthread_attr_destroy(&thread_attr);
return 0;
}
int sf_service_destroy()
{
struct nio_thread_data *pDataEnd, *pThreadData;
free_queue_destroy();
pDataEnd = g_thread_data + g_work_threads;
for (pThreadData=g_thread_data; pThreadData<pDataEnd; pThreadData++) {
fast_timer_destroy(&pThreadData->timer);
}
free(g_thread_data);
g_thread_data = NULL;
return 0;
}
static void *worker_thread_entrance(void* arg)
{
struct nio_thread_data *pThreadData;
pThreadData = (struct nio_thread_data *)arg;
ioevent_loop(pThreadData, sf_recv_notify_read, sf_get_task_cleanup_func(),
&g_continue_flag);
ioevent_destroy(&pThreadData->ev_puller);
__sync_fetch_and_sub(&g_worker_thread_count, 1);
return NULL;
}
static int _socket_server(const char *bind_addr, int port, int *sock)
{
int result;
*sock = socketServer(bind_addr, port, &result);
if (*sock < 0) {
return result;
}
if ((result=tcpsetserveropt(*sock, g_sf_network_timeout)) != 0) {
return result;
}
return 0;
}
int sf_socket_server()
{
int result;
if (g_outer_port != g_inner_port) {
if ((result=_socket_server(g_outer_bind_addr, g_outer_port,
&g_server_outer_sock)) != 0)
{
return result;
}
if ((result=_socket_server(g_inner_bind_addr, g_inner_port,
&g_server_inner_sock)) != 0)
{
return result;
}
} else {
const char *bind_addr;
if (*g_outer_bind_addr != '\0') {
if (*g_inner_bind_addr != '\0') {
bind_addr = "";
} else {
bind_addr = g_outer_bind_addr;
}
} else {
bind_addr = g_inner_bind_addr;
}
if ((result=_socket_server(bind_addr, g_outer_port,
&g_server_outer_sock)) != 0)
{
return result;
}
}
return 0;
}
static void *accept_thread_entrance(void* arg)
{
int server_sock;
int incomesock;
long task_ptr;
struct sockaddr_in inaddr;
socklen_t sockaddr_len;
struct fast_task_info *pTask;
char szClientIp[IP_ADDRESS_SIZE];
server_sock = (long)arg;
while (g_continue_flag) {
sockaddr_len = sizeof(inaddr);
incomesock = accept(server_sock, (struct sockaddr*)&inaddr, &sockaddr_len);
if (incomesock < 0) { //error
if (!(errno == EINTR || errno == EAGAIN)) {
logError("file: "__FILE__", line: %d, "
"accept fail, errno: %d, error info: %s",
__LINE__, errno, strerror(errno));
}
continue;
}
getPeerIpaddr(incomesock,
szClientIp, IP_ADDRESS_SIZE);
if (tcpsetnonblockopt(incomesock) != 0) {
close(incomesock);
continue;
}
pTask = free_queue_pop();
if (pTask == NULL) {
logError("file: "__FILE__", line: %d, "
"malloc task buff failed, you should "
"increase the parameter: max_connections",
__LINE__);
close(incomesock);
continue;
}
strcpy(pTask->client_ip, szClientIp);
pTask->event.fd = incomesock;
pTask->thread_data = g_thread_data + incomesock % g_work_threads;
if (sf_accept_done_func != NULL) {
sf_accept_done_func(pTask, server_sock == g_server_inner_sock);
}
task_ptr = (long)pTask;
if (write(pTask->thread_data->pipe_fds[1], &task_ptr,
sizeof(task_ptr)) != sizeof(task_ptr))
{
logError("file: "__FILE__", line: %d, "
"call write to pipe fd: %d fail, "
"errno: %d, error info: %s",
__LINE__, pTask->thread_data->pipe_fds[1],
errno, strerror(errno));
close(incomesock);
free_queue_push(pTask);
}
}
return NULL;
}
void _accept_loop(int server_sock, const int accept_threads)
{
pthread_t tid;
pthread_attr_t thread_attr;
int result;
int i;
if ((result=init_pthread_attr(&thread_attr, g_thread_stack_size)) != 0) {
logWarning("file: "__FILE__", line: %d, "
"init_pthread_attr fail!", __LINE__);
}
else {
for (i=0; i<accept_threads; i++) {
if ((result=pthread_create(&tid, &thread_attr,
accept_thread_entrance,
(void *)(long)server_sock)) != 0)
{
logError("file: "__FILE__", line: %d, "
"create thread failed, startup threads: %d, "
"errno: %d, error info: %s",
__LINE__, i, result, strerror(result));
break;
}
}
pthread_attr_destroy(&thread_attr);
}
}
void sf_accept_loop()
{
if (g_outer_port != g_inner_port) {
_accept_loop(g_server_inner_sock, g_accept_threads);
}
_accept_loop(g_server_outer_sock, g_accept_threads - 1);
accept_thread_entrance((void *)(long)g_server_outer_sock);
}
#if defined(DEBUG_FLAG)
static void sigDumpHandler(int sig)
{
static bool bDumpFlag = false;
char filename[256];
if (bDumpFlag) {
return;
}
bDumpFlag = true;
snprintf(filename, sizeof(filename),
"%s/logs/sf_dump.log", g_sf_base_path);
//manager_dump_global_vars_to_file(filename);
bDumpFlag = false;
}
#endif
static void sigQuitHandler(int sig)
{
if (!bTerminateFlag) {
bTerminateFlag = true;
g_continue_flag = false;
logCrit("file: "__FILE__", line: %d, " \
"catch signal %d, program exiting...", \
__LINE__, sig);
}
}
static void sigHupHandler(int sig)
{
logInfo("file: "__FILE__", line: %d, " \
"catch signal %d", __LINE__, sig);
}
static void sigUsrHandler(int sig)
{
logInfo("file: "__FILE__", line: %d, "
"catch signal %d, ignore it", __LINE__, sig);
}
int sf_setup_signal_handler()
{
struct sigaction act;
memset(&act, 0, sizeof(act));
sigemptyset(&act.sa_mask);
act.sa_handler = sigUsrHandler;
if(sigaction(SIGUSR1, &act, NULL) < 0 ||
sigaction(SIGUSR2, &act, NULL) < 0)
{
logCrit("file: "__FILE__", line: %d, "
"call sigaction fail, errno: %d, error info: %s",
__LINE__, errno, strerror(errno));
logCrit("exit abnormally!\n");
return errno;
}
act.sa_handler = sigHupHandler;
if(sigaction(SIGHUP, &act, NULL) < 0) {
logCrit("file: "__FILE__", line: %d, "
"call sigaction fail, errno: %d, error info: %s",
__LINE__, errno, strerror(errno));
logCrit("exit abnormally!\n");
return errno;
}
act.sa_handler = SIG_IGN;
if(sigaction(SIGPIPE, &act, NULL) < 0) {
logCrit("file: "__FILE__", line: %d, "
"call sigaction fail, errno: %d, error info: %s",
__LINE__, errno, strerror(errno));
logCrit("exit abnormally!\n");
return errno;
}
act.sa_handler = sigQuitHandler;
if(sigaction(SIGINT, &act, NULL) < 0 ||
sigaction(SIGTERM, &act, NULL) < 0 ||
sigaction(SIGQUIT, &act, NULL) < 0)
{
logCrit("file: "__FILE__", line: %d, "
"call sigaction fail, errno: %d, error info: %s",
__LINE__, errno, strerror(errno));
logCrit("exit abnormally!\n");
return errno;
}
#if defined(DEBUG_FLAG)
memset(&act, 0, sizeof(act));
sigemptyset(&act.sa_mask);
act.sa_handler = sigDumpHandler;
if(sigaction(SIGUSR1, &act, NULL) < 0 ||
sigaction(SIGUSR2, &act, NULL) < 0)
{
logCrit("file: "__FILE__", line: %d, "
"call sigaction fail, errno: %d, error info: %s",
__LINE__, errno, strerror(errno));
logCrit("exit abnormally!\n");
return errno;
}
#endif
return 0;
}

42
src/sf_service.h Normal file
View File

@ -0,0 +1,42 @@
//sf_service.h
#ifndef _SF_SERVICE_H_
#define _SF_SERVICE_H_
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "ioevent.h"
#include "fast_task_queue.h"
#include "sf_types.h"
typedef void* (*sf_alloc_thread_extra_data_callback)(const int thread_index);
#ifdef __cplusplus
extern "C" {
#endif
extern int g_server_outer_sock;
extern int g_server_inner_sock;
extern int g_worker_thread_count;
int sf_service_init(sf_alloc_thread_extra_data_callback
alloc_thread_extra_data_callback,
ThreadLoopCallback thread_loop_callback,
sf_accept_done_callback accept_done_callback,
sf_set_body_length_callback set_body_length_func,
sf_deal_task_func deal_func, TaskCleanUpCallback task_cleanup_func,
sf_recv_timeout_callback timeout_callback, const int net_timeout_ms,
const int proto_header_size, const int task_arg_size);
int sf_service_destroy();
int sf_setup_signal_handler();
int sf_socket_server();
void sf_accept_loop();
#ifdef __cplusplus
}
#endif
#endif

26
src/sf_types.h Normal file
View File

@ -0,0 +1,26 @@
//sf_types.h
#ifndef _SF_TYPES_H_
#define _SF_TYPES_H_
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <time.h>
#include "connection_pool.h"
struct fast_task_info;
typedef void (*sf_accept_done_callback)(struct fast_task_info *pTask,
const bool bInnerPort);
typedef int (*sf_set_body_length_callback)(struct fast_task_info *pTask);
typedef int (*sf_deal_task_func)(struct fast_task_info *pTask);
typedef int (*sf_recv_timeout_callback)(struct fast_task_info *pTask);
typedef struct {
char ip_addr[IP_ADDRESS_SIZE];
int port;
} SFServerInfo;
#endif

52
src/sf_util.c Normal file
View File

@ -0,0 +1,52 @@
#include <sys/syscall.h>
#include <stdarg.h>
#include <stdio.h>
#include "sf_util.h"
int64_t getticks()
{
struct timeval tv;
gettimeofday(&tv, NULL);
return tv.tv_sec * 1000 + tv.tv_usec / 1000;
}
void log_plus(const int priority, const char* file,
int line, const char* fmt, ...)
{
char buf[2048];
int hlen;
if (g_log_context.log_level < priority) {
return;
}
#ifdef DEBUG_FLAG
hlen = snprintf(buf, sizeof(buf), "%s:%d %ld ", file, line, (long)syscall(SYS_gettid));
#else
hlen = snprintf(buf, sizeof(buf), "%s:%d ", file, line);
#endif
va_list ap;
va_start(ap, fmt);
hlen += vsnprintf(buf+hlen, sizeof(buf)-hlen, fmt, ap);
va_end(ap);
log_it_ex1(&g_log_context, priority, buf, hlen);
}
int sf_printbuffer(char* buffer,int32_t len)
{
int i;
if(buffer==NULL)
{
fprintf(stderr, "common-utils parameter is fail");
return(-1);
}
for(i=0;i<len;i++)
{
if(i%16 == 0)
{
fprintf(stderr,"\n");
}
fprintf(stderr,"[%02x]", (unsigned char)buffer[i]);
}
fprintf(stderr,"\n");
return(0);
}

57
src/sf_util.h Normal file
View File

@ -0,0 +1,57 @@
//sf_util.h
#ifndef _SF_UTIL_H_
#define _SF_UTIL_H_
#include "logger.h"
#include <assert.h>
#ifdef DEBUG_FLAG /*only for format check*/
#define lemerg(...) snprintf(0,0,__VA_ARGS__), log_plus(LOG_EMERG, __FILE__, __LINE__, __VA_ARGS__)
#define lcrit(...) snprintf(0,0,__VA_ARGS__), log_plus(LOG_CRIT, __FILE__, __LINE__, __VA_ARGS__)
#define lalert(...) snprintf(0,0,__VA_ARGS__), log_plus(LOG_ALERT, __FILE__, __LINE__, __VA_ARGS__)
#define lerr(...) snprintf(0,0,__VA_ARGS__), log_plus(LOG_ERR, __FILE__, __LINE__, __VA_ARGS__)
#define lwarning(...) snprintf(0,0,__VA_ARGS__), log_plus(LOG_WARNING, __FILE__, __LINE__, __VA_ARGS__)
#define lnotice(...) snprintf(0,0,__VA_ARGS__), log_plus(LOG_NOTICE, __FILE__, __LINE__, __VA_ARGS__)
#define linfo(...) snprintf(0,0,__VA_ARGS__), log_plus(LOG_INFO, __FILE__, __LINE__, __VA_ARGS__)
#define ldebug(...) snprintf(0,0,__VA_ARGS__), log_plus(LOG_DEBUG, __FILE__, __LINE__, __VA_ARGS__)
#else
#define lemerg(...) log_plus(LOG_EMERG, __FILE__, __LINE__, __VA_ARGS__)
#define lcrit(...) log_plus(LOG_CRIT, __FILE__, __LINE__, __VA_ARGS__)
#define lalert(...) log_plus(LOG_ALERT, __FILE__, __LINE__, __VA_ARGS__)
#define lerr(...) log_plus(LOG_ERR, __FILE__, __LINE__, __VA_ARGS__)
#define lwarning(...) log_plus(LOG_WARNING, __FILE__, __LINE__, __VA_ARGS__)
#define lnotice(...) log_plus(LOG_NOTICE, __FILE__, __LINE__, __VA_ARGS__)
#define linfo(...) log_plus(LOG_INFO, __FILE__, __LINE__, __VA_ARGS__)
#define ldebug(...) log_plus(LOG_DEBUG, __FILE__, __LINE__, __VA_ARGS__)
#endif
#define returnif(b) if(b){ return b; }
#define breakif(b) if(b){ break; }
#define failvars int eln, eres; const char* emsg
#define gofailif(b, msg) if(b) { eln=__LINE__; emsg=msg; eres=(b); goto FAIL_; }
#define logfail() lerr("error at %s:%d errno: %d msg: %s errmsg: %s", \
__FILE__, eln, eres, emsg, strerror(eres))
#define dszoffset(cls, mem) ((char*)&((cls*)0)->mem - ((char*)0))
#ifdef __cplusplus
extern "C" {
#endif
int64_t getticks() ;
void log_plus(const int priority, const char* file, int line, const char* fmt, ...);
int sf_printbuffer(char* buffer,int32_t len);
#ifdef __cplusplus
}
#endif
#endif