diff --git a/README.md b/README.md index 45e1bc9..b7ccc5d 100644 --- a/README.md +++ b/README.md @@ -1 +1,2 @@ -# libserverframe + +network service framework library extract from FastDFS diff --git a/libserverframe.spec b/libserverframe.spec new file mode 100644 index 0000000..f1b71c1 --- /dev/null +++ b/libserverframe.spec @@ -0,0 +1,60 @@ +%define LibserverframeDevel libserverframe-devel +%define CommitVersion %(echo $COMMIT_VERSION) + +Name: libserverframe +Version: 1.0.10 +Release: 1%{?dist} +Summary: mc common framework library +License: GPL +Group: Arch/Tech +URL: http://github.com/happyfish100/libfastcommon/ +Source: http://github.com/happyfish100/libfastcommon/%{name}-%{version}.tar.gz + +BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root-%(%{__id_u} -n) + +BuildRequires: libfastcommon-devel >= 1.0.29 +Requires: %__cp %__mv %__chmod %__grep %__mkdir %__install %__id +Requires: libfastcommon >= 1.0.29 +%description +common framework library +commit version: %{CommitVersion} + +%package devel +Summary: Development header file +Requires: %{name}%{?_isa} = %{version}-%{release} + +%description devel +This package provides the header files of libserverframe +commit version: %{CommitVersion} + + +%prep +%setup -q + +%build +./make.sh + +%install +rm -rf %{buildroot} +DESTDIR=$RPM_BUILD_ROOT ./make.sh install + +%post + +%preun + +%postun + +%clean +rm -rf %{buildroot} + +%files +%defattr(-,root,root,-) +/usr/lib64/libserverframe.so* + +%files devel +%defattr(-,root,root,-) +/usr/include/sf/* + +%changelog +* Mon Jun 23 2014 Zaixue Liao +- first RPM release (1.0) diff --git a/make.sh b/make.sh new file mode 100755 index 0000000..8b64a91 --- /dev/null +++ b/make.sh @@ -0,0 +1,99 @@ +ENABLE_STATIC_LIB=0 +ENABLE_SHARED_LIB=1 +TARGET_PREFIX=$DESTDIR/usr +TARGET_CONF_PATH=$DESTDIR/etc/mc + +DEBUG_FLAG=1 + +CFLAGS="$CFLAGS -Wall -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -I/usr/local/include/fastcommon" +if [ "$DEBUG_FLAG" = "1" ]; then + CFLAGS="$CFLAGS -g -O1 -DDEBUG_FLAG" +else + CFLAGS="$CFLAGS -O3" +fi + +LIBS='' +uname=$(uname) +if [ "$uname" = "Linux" ]; then + CFLAGS="$CFLAGS" +elif [ "$uname" = "FreeBSD" ]; then + CFLAGS="$CFLAGS" +elif [ "$uname" = "SunOS" ]; then + CFLAGS="$CFLAGS -D_THREAD_SAFE" + LIBS="$LIBS -lsocket -lnsl -lresolv" + export CC=gcc +elif [ "$uname" = "AIX" ]; then + CFLAGS="$CFLAGS -D_THREAD_SAFE" + export CC=gcc +elif [ "$uname" = "HP-UX" ]; then + CFLAGS="$CFLAGS" +fi + +have_pthread=0 +if [ -f /usr/lib/libpthread.so ] || [ -f /usr/local/lib/libpthread.so ] || [ -f /lib64/libpthread.so ] || [ -f /usr/lib64/libpthread.so ] || [ -f /usr/lib/libpthread.a ] || [ -f /usr/local/lib/libpthread.a ] || [ -f /lib64/libpthread.a ] || [ -f /usr/lib64/libpthread.a ]; then + LIBS="$LIBS -lpthread" + have_pthread=1 +elif [ "$uname" = "HP-UX" ]; then + lib_path="/usr/lib/hpux$OS_BITS" + if [ -f $lib_path/libpthread.so ]; then + LIBS="-L$lib_path -lpthread" + have_pthread=1 + fi +elif [ "$uname" = "FreeBSD" ]; then + if [ -f /usr/lib/libc_r.so ]; then + line=$(nm -D /usr/lib/libc_r.so | grep pthread_create | grep -w T) + if [ $? -eq 0 ]; then + LIBS="$LIBS -lc_r" + have_pthread=1 + fi + elif [ -f /lib64/libc_r.so ]; then + line=$(nm -D /lib64/libc_r.so | grep pthread_create | grep -w T) + if [ $? -eq 0 ]; then + LIBS="$LIBS -lc_r" + have_pthread=1 + fi + elif [ -f /usr/lib64/libc_r.so ]; then + line=$(nm -D /usr/lib64/libc_r.so | grep pthread_create | grep -w T) + if [ $? -eq 0 ]; then + LIBS="$LIBS -lc_r" + have_pthread=1 + fi + fi +fi + +if [ $have_pthread -eq 0 ] && [ "$uname" = "Linux" ]; then + /sbin/ldconfig -p | fgrep libpthread.so > /dev/null + if [ $? -eq 0 ]; then + LIBS="$LIBS -lpthread" + else + echo -E 'Require pthread lib, please check!' + exit 2 + fi +fi + +if [ "$DEBUG_FLAG" = "1" ]; then + if [ "$uname" = "Linux" ]; then + LIBS="$LIBS -ldl -rdynamic" + fi +fi + +sed_replace() +{ + sed_cmd=$1 + filename=$2 + if [ "$uname" = "FreeBSD" ] || [ "$uname" = "Darwin" ]; then + sed -i "" "$sed_cmd" $filename + else + sed -i "$sed_cmd" $filename + fi +} + +cd src +cp Makefile.in Makefile +sed_replace "s#\$(CFLAGS)#$CFLAGS#g" Makefile +sed_replace "s#\$(LIBS)#$LIBS#g" Makefile +sed_replace "s#\$(TARGET_PREFIX)#$TARGET_PREFIX#g" Makefile +sed_replace "s#\$(ENABLE_STATIC_LIB)#$ENABLE_STATIC_LIB#g" Makefile +sed_replace "s#\$(ENABLE_SHARED_LIB)#$ENABLE_SHARED_LIB#g" Makefile +make $1 $2 + diff --git a/src/Makefile.in b/src/Makefile.in new file mode 100644 index 0000000..891afcd --- /dev/null +++ b/src/Makefile.in @@ -0,0 +1,34 @@ +.SUFFIXES: .c .lo + +COMPILE = $(CC) $(CFLAGS) -fPIC +INC_PATH = -I/usr/include -I/usr/include/fastcommon +LIB_PATH = $(LIBS) -lfastcommon +TARGET_LIB = $(TARGET_PREFIX)/lib64 +TARGET_PATH = $(TARGET_PREFIX)/bin + +ALL_HEADERS = sf_types.h sf_global.h sf_define.h sf_nio.h sf_service.h sf_func.h sf_util.h list.h +ALL_LIBS = libserverframe.so + +all: $(ALL_LIBS) + +libserverframe.so: sf_nio.lo sf_service.lo sf_global.lo sf_func.lo sf_util.lo + cc -shared -o $@ $^ $(LIB_PATH) + +.lo: + $(COMPILE) -o $@ $< $(SHARED_OBJS) $(LIB_PATH) $(INC_PATH) +.c: + $(COMPILE) -o $@ $< $(ALL_OBJS) $(LIB_PATH) $(INC_PATH) +.c.lo: + $(COMPILE) -c -o $@ $< $(INC_PATH) +install: + mkdir -p $(TARGET_PATH) + mkdir -p $(TARGET_LIB) + mkdir -p $(TARGET_PREFIX)/include/sf + + cp -f $(ALL_LIBS) $(TARGET_LIB) + cp -f $(ALL_HEADERS) $(TARGET_PREFIX)/include/sf + +# if [ ! -h $(TARGET_LIB)/libserverframe.so ]; then cd $(TARGET_LIB); ln -s libserverframe.so.1 libserverframe.so; fi +clean: + rm -f *.lo $(ALL_LIBS) $(ALL_PRGS) + diff --git a/src/list.h b/src/list.h new file mode 100644 index 0000000..156d209 --- /dev/null +++ b/src/list.h @@ -0,0 +1,167 @@ +#ifndef _LLIST_H +#define _LLIST_H + + +struct list_head { + struct list_head *next; + struct list_head *prev; +}; + + +#define INIT_LIST_HEAD(head) do { \ + (head)->next = (head)->prev = head; \ + } while (0) + + +static inline void +list_add (struct list_head *_new, struct list_head *head) +{ + _new->prev = head; + _new->next = head->next; + + _new->prev->next = _new; + _new->next->prev = _new; +} + + +static inline void +list_add_tail (struct list_head *_new, struct list_head *head) +{ + _new->next = head; + _new->prev = head->prev; + + _new->prev->next = _new; + _new->next->prev = _new; +} + +static inline void +list_add_internal(struct list_head *_new, struct list_head *prev, + struct list_head *next) +{ + next->prev = _new; + _new->next = next; + + _new->prev = prev; + prev->next = _new; +} + +static inline void +list_del (struct list_head *old) +{ + old->prev->next = old->next; + old->next->prev = old->prev; + + old->next = (struct list_head *)0xbabebabe; + old->prev = (struct list_head *)0xcafecafe; +} + + +static inline void +list_del_init (struct list_head *old) +{ + old->prev->next = old->next; + old->next->prev = old->prev; + + old->next = old; + old->prev = old; +} + + +static inline void +list_move (struct list_head *list, struct list_head *head) +{ + list->prev->next = list->next; + list->next->prev = list->prev; + list_add (list, head); +} + + +static inline void +list_move_tail (struct list_head *list, struct list_head *head) +{ + list->prev->next = list->next; + list->next->prev = list->prev; + list_add_tail (list, head); +} + + +static inline int +list_empty (struct list_head *head) +{ + return (head->next == head); +} + + +static inline void +__list_splice (struct list_head *list, struct list_head *head) +{ + (list->prev)->next = (head->next); + (head->next)->prev = (list->prev); + + (head)->next = (list->next); + (list->next)->prev = (head); +} + + +static inline void +list_splice (struct list_head *list, struct list_head *head) +{ + if (list_empty (list)) + return; + + __list_splice (list, head); +} + + +static inline void +list_splice_init (struct list_head *list, struct list_head *head) +{ + if (list_empty (list)) + return; + + __list_splice (list, head); + INIT_LIST_HEAD (list); +} + +static inline int list_is_last(const struct list_head *list, + const struct list_head *head) +{ + return list->next == head; +} + +static inline int list_count(struct list_head *head) +{ + struct list_head *pos; + int count; + + count = 0; + for (pos = head->next; pos != head; pos = pos->next) { + ++count; + } + return count; +} + +#define list_entry(ptr, type, member) \ + ((type *)((char *)(ptr)-(unsigned long)(&((type *)0)->member))) + + +#define list_for_each(pos, head) \ + for (pos = (head)->next; pos != (head); pos = pos->next) + + +#define list_for_each_entry(pos, head, member) \ + for (pos = list_entry((head)->next, typeof(*pos), member); \ + &pos->member != (head); \ + pos = list_entry(pos->member.next, typeof(*pos), member)) + + +#define list_for_each_entry_safe(pos, n, head, member) \ + for (pos = list_entry((head)->next, typeof(*pos), member), \ + n = list_entry(pos->member.next, typeof(*pos), member); \ + &pos->member != (head); \ + pos = n, n = list_entry(n->member.next, typeof(*n), member)) + +#define list_for_each_prev(pos, head) \ + for (pos = (head)->prev; pos != (head); pos = pos->prev) + +#endif /* _LLIST_H */ diff --git a/src/sf_define.h b/src/sf_define.h new file mode 100644 index 0000000..3c97277 --- /dev/null +++ b/src/sf_define.h @@ -0,0 +1,23 @@ +//sf_define.h + +#ifndef _SF_DEFINE_H_ +#define _SF_DEFINE_H_ + +#include "common_define.h" + +#define SF_DEF_THREAD_STACK_SIZE (64 * 1024) +#define SF_DEF_MAX_PACKAGE_SIZE (16 * 1024) +#define SF_DEF_MIN_BUFF_SIZE (64 * 1024) +#define SF_DEF_MAX_BUFF_SIZE (64 * 1024) + +#ifdef __cplusplus +extern "C" { +#endif + + +#ifdef __cplusplus +} +#endif + +#endif + diff --git a/src/sf_func.c b/src/sf_func.c new file mode 100644 index 0000000..753d785 --- /dev/null +++ b/src/sf_func.c @@ -0,0 +1,116 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "shared_func.h" +#include "logger.h" +#include "sockopt.h" +#include "http_func.h" +#include "sf_define.h" +#include "sf_global.h" +#include "sf_func.h" + +int sf_parse_server_info(const char* pServerStr, SFServerInfo* pServerInfo, + const int default_port) +{ + char *parts[2]; + char server_info[256]; + int len; + int count; + + len = strlen(pServerStr); + if (len == 0) { + logError("file: "__FILE__", line: %d, " + "pServerStr \"%s\" is empty!", + __LINE__, pServerStr); + return EINVAL; + } + if (len >= sizeof(server_info)) { + logError("file: "__FILE__", line: %d, " + "pServerStr \"%s\" is too long!", + __LINE__, pServerStr); + return ENAMETOOLONG; + } + + memcpy(server_info, pServerStr, len); + *(server_info + len) = '\0'; + + count = splitEx(server_info, ':', parts, 2); + if (count == 1) { + pServerInfo->port = default_port; + } + else { + char *endptr = NULL; + pServerInfo->port = (int)strtol(parts[1], &endptr, 10); + if ((endptr != NULL && *endptr != '\0') || pServerInfo->port <= 0) { + logError("file: "__FILE__", line: %d, " + "pServerStr: %s, invalid port: %s!", + __LINE__, pServerStr, parts[1]); + return EINVAL; + } + } + + if (getIpaddrByName(parts[0], pServerInfo->ip_addr, + sizeof(pServerInfo->ip_addr)) == INADDR_NONE) + { + logError("file: "__FILE__", line: %d, " + "pServerStr: %s, invalid hostname: %s!", + __LINE__, pServerStr, parts[0]); + return EINVAL; + } + + return 0; +} + +int sf_load_server_info(IniContext *pIniContext, const char *filename, + const char *item_name, SFServerInfo *pServerInfo, + const int default_port) +{ + char *pServerStr; + + pServerStr = iniGetStrValue(NULL, item_name, pIniContext); + if (pServerStr == NULL) { + logError("file: "__FILE__", line: %d, " + "config file: %s, item \"%s\" not exist!", + __LINE__, filename, item_name); + return ENOENT; + } + + return sf_parse_server_info(pServerStr, pServerInfo, default_port); +} + +int sf_connect_to_server(const char *ip_addr, const int port, int *sock) +{ + int result; + *sock = socket(AF_INET, SOCK_STREAM, 0); + if(*sock < 0) { + return errno != 0 ? errno : ENOMEM; + } + tcpsetserveropt(*sock, g_sf_network_timeout); + + if ((result=tcpsetnonblockopt(*sock)) != 0) { + close(*sock); + *sock = -1; + return result; + } + + if ((result=connectserverbyip_nb(*sock, ip_addr, port, + g_sf_connect_timeout)) != 0) + { + close(*sock); + *sock = -1; + return result; + } + + return 0; +} + diff --git a/src/sf_func.h b/src/sf_func.h new file mode 100644 index 0000000..c3dbfaf --- /dev/null +++ b/src/sf_func.h @@ -0,0 +1,26 @@ +//sf_func.h + +#ifndef _SF_FUNC_H +#define _SF_FUNC_H + +#include "common_define.h" +#include "sf_types.h" +#ifdef __cplusplus +extern "C" { +#endif + +int sf_load_server_info(IniContext *pIniContext, const char *filename, + const char *item_name, SFServerInfo *pServerInfo, + const int default_port); + +int sf_parse_server_info(const char* pServerStr, SFServerInfo* pServerInfo, + const int default_port); + +int sf_connect_to_server(const char *ip_addr, const int port, int *sock); + +#ifdef __cplusplus +} +#endif + +#endif + diff --git a/src/sf_global.c b/src/sf_global.c new file mode 100644 index 0000000..b0cdd88 --- /dev/null +++ b/src/sf_global.c @@ -0,0 +1,293 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "shared_func.h" +#include "logger.h" +#include "common_define.h" +#include "sf_define.h" +#include "sf_global.h" + +int g_sf_connect_timeout = DEFAULT_CONNECT_TIMEOUT; +int g_sf_network_timeout = DEFAULT_NETWORK_TIMEOUT; +char g_sf_base_path[MAX_PATH_SIZE] = {'/', 't', 'm', 'p', '\0'}; + +struct nio_thread_data *g_thread_data = NULL; +volatile bool g_continue_flag = true; +int g_outer_port = 0; +int g_inner_port = 0; +int g_max_connections = DEFAULT_MAX_CONNECTONS; +int g_accept_threads = 1; +int g_work_threads = DEFAULT_WORK_THREADS; +int g_thread_stack_size = SF_DEF_THREAD_STACK_SIZE; +int g_max_pkg_size = SF_DEF_MAX_PACKAGE_SIZE; +int g_min_buff_size = SF_DEF_MIN_BUFF_SIZE; +int g_max_buff_size = SF_DEF_MAX_BUFF_SIZE; +int g_sync_log_buff_interval = SYNC_LOG_BUFF_DEF_INTERVAL; + +bool g_rotate_error_log = false; +int g_log_file_keep_days = 0; + +gid_t g_run_by_gid; +uid_t g_run_by_uid; +char g_run_by_group[32] = {0}; +char g_run_by_user[32] = {0}; +time_t g_up_time = 0; + +char g_inner_bind_addr[IP_ADDRESS_SIZE] = {0}; +char g_outer_bind_addr[IP_ADDRESS_SIZE] = {0}; + +SFConnectionStat g_connection_stat = {0, 0}; + +int sf_load_config(const char *server_name, const char *filename, + IniContext *pIniContext, const int default_inner_port, + const int default_outer_port) +{ + char *pBasePath; + char *pBindAddr; + char *pRunByGroup; + char *pRunByUser; + char *pMaxPkgSize; + char *pMinBuffSize; + char *pMaxBuffSize; + char *pThreadStackSize; + int result; + int64_t max_pkg_size; + int64_t min_buff_size; + int64_t max_buff_size; + int64_t thread_stack_size; + + pBasePath = iniGetStrValue(NULL, "base_path", pIniContext); + if (pBasePath == NULL) { + logError("file: "__FILE__", line: %d, " + "conf file \"%s\" must have item " + "\"base_path\"!", __LINE__, filename); + return ENOENT; + } + + snprintf(g_sf_base_path, sizeof(g_sf_base_path), "%s", pBasePath); + chopPath(g_sf_base_path); + if (!fileExists(g_sf_base_path)) { + logError("file: "__FILE__", line: %d, " + "\"%s\" can't be accessed, error info: %s", + __LINE__, g_sf_base_path, strerror(errno)); + return errno != 0 ? errno : ENOENT; + } + if (!isDir(g_sf_base_path)) { + logError("file: "__FILE__", line: %d, " + "\"%s\" is not a directory!", + __LINE__, g_sf_base_path); + return ENOTDIR; + } + + g_sf_connect_timeout = iniGetIntValue(NULL, "connect_timeout", + pIniContext, DEFAULT_CONNECT_TIMEOUT); + if (g_sf_connect_timeout <= 0) { + g_sf_connect_timeout = DEFAULT_CONNECT_TIMEOUT; + } + + g_sf_network_timeout = iniGetIntValue(NULL, "network_timeout", + pIniContext, DEFAULT_NETWORK_TIMEOUT); + if (g_sf_network_timeout <= 0) { + g_sf_network_timeout = DEFAULT_NETWORK_TIMEOUT; + } + + g_inner_port = iniGetIntValue(NULL, "inner_port", pIniContext, + default_inner_port); + if (g_inner_port <= 0) { + g_inner_port = default_inner_port; + } + g_outer_port = iniGetIntValue(NULL, "outer_port", pIniContext, + default_outer_port); + if (g_outer_port <= 0) { + g_outer_port = default_outer_port; + } + + pBindAddr = iniGetStrValue(NULL, "inner_bind_addr", pIniContext); + if (pBindAddr == NULL) { + *g_inner_bind_addr = '\0'; + } + else { + snprintf(g_inner_bind_addr, sizeof(g_inner_bind_addr), "%s", pBindAddr); + } + + pBindAddr = iniGetStrValue(NULL, "outer_bind_addr", pIniContext); + if (pBindAddr == NULL) { + *g_outer_bind_addr = '\0'; + } + else { + snprintf(g_outer_bind_addr, sizeof(g_outer_bind_addr), "%s", pBindAddr); + } + + g_max_connections = iniGetIntValue(NULL, "max_connections", + pIniContext, DEFAULT_MAX_CONNECTONS); + if (g_max_connections <= 0) { + g_max_connections = DEFAULT_MAX_CONNECTONS; + } + + g_accept_threads = iniGetIntValue(NULL, "accept_threads", + pIniContext, 1); + if (g_accept_threads <= 0) { + logError("file: "__FILE__", line: %d, " + "item \"accept_threads\" is invalid, " + "value: %d <= 0!", __LINE__, g_accept_threads); + return EINVAL; + } + + g_work_threads = iniGetIntValue(NULL, "work_threads", + pIniContext, DEFAULT_WORK_THREADS); + if (g_work_threads <= 0) { + logError("file: "__FILE__", line: %d, " + "item \"work_threads\" is invalid, " + "value: %d <= 0!", __LINE__, g_work_threads); + return EINVAL; + } + + if ((result=set_rlimit(RLIMIT_NOFILE, g_max_connections)) != 0) { + return result; + } + + pMaxPkgSize = iniGetStrValue(NULL, + "max_pkg_size", pIniContext); + if (pMaxPkgSize == NULL) { + max_pkg_size = SF_DEF_MAX_PACKAGE_SIZE; + } + else if ((result=parse_bytes(pMaxPkgSize, 1, + &max_pkg_size)) != 0) + { + return result; + } + g_max_pkg_size = (int)max_pkg_size; + + pMinBuffSize = iniGetStrValue(NULL, + "min_buff_size", pIniContext); + if (pMinBuffSize == NULL) { + min_buff_size = SF_DEF_MIN_BUFF_SIZE; + } + else if ((result=parse_bytes(pMinBuffSize, 1, + &min_buff_size)) != 0) + { + return result; + } + g_min_buff_size = (int)min_buff_size; + + pMaxBuffSize = iniGetStrValue(NULL, + "max_buff_size", pIniContext); + if (pMaxBuffSize == NULL) { + max_buff_size = SF_DEF_MAX_BUFF_SIZE; + } + else if ((result=parse_bytes(pMaxBuffSize, 1, + &max_buff_size)) != 0) + { + return result; + } + g_max_buff_size = (int)max_buff_size; + + if (pMinBuffSize == NULL || pMaxBuffSize == NULL) { + g_min_buff_size = g_max_pkg_size; + g_max_buff_size = g_max_pkg_size; + } + else if (g_max_buff_size < g_max_pkg_size) { + g_max_buff_size = g_max_pkg_size; + } + + pRunByGroup = iniGetStrValue(NULL, "run_by_group", pIniContext); + pRunByUser = iniGetStrValue(NULL, "run_by_user", pIniContext); + if (pRunByGroup == NULL) { + *g_run_by_group = '\0'; + } + else { + snprintf(g_run_by_group, sizeof(g_run_by_group), + "%s", pRunByGroup); + } + if (*g_run_by_group == '\0') { + g_run_by_gid = getegid(); + } + else { + struct group *pGroup; + + pGroup = getgrnam(g_run_by_group); + if (pGroup == NULL) { + result = errno != 0 ? errno : ENOENT; + logError("file: "__FILE__", line: %d, " + "getgrnam fail, errno: %d, " + "error info: %s", __LINE__, + result, strerror(result)); + return result; + } + + g_run_by_gid = pGroup->gr_gid; + } + + if (pRunByUser == NULL) { + *g_run_by_user = '\0'; + } + else { + snprintf(g_run_by_user, sizeof(g_run_by_user), + "%s", pRunByUser); + } + if (*g_run_by_user == '\0') { + g_run_by_uid = geteuid(); + } + else { + struct passwd *pUser; + + pUser = getpwnam(g_run_by_user); + if (pUser == NULL) { + result = errno != 0 ? errno : ENOENT; + logError("file: "__FILE__", line: %d, " + "getpwnam fail, errno: %d, " + "error info: %s", __LINE__, + result, strerror(result)); + return result; + } + + g_run_by_uid = pUser->pw_uid; + } + + if ((result=set_run_by(g_run_by_group, g_run_by_user)) != 0) { + return result; + } + + g_sync_log_buff_interval = iniGetIntValue(NULL, + "sync_log_buff_interval", pIniContext, + SYNC_LOG_BUFF_DEF_INTERVAL); + if (g_sync_log_buff_interval <= 0) { + g_sync_log_buff_interval = SYNC_LOG_BUFF_DEF_INTERVAL; + } + + pThreadStackSize = iniGetStrValue(NULL, + "thread_stack_size", pIniContext); + if (pThreadStackSize == NULL) { + thread_stack_size = SF_DEF_THREAD_STACK_SIZE; + } + else if ((result=parse_bytes(pThreadStackSize, 1, + &thread_stack_size)) != 0) + { + return result; + } + g_thread_stack_size = (int)thread_stack_size; + + g_rotate_error_log = iniGetBoolValue(NULL, "rotate_error_log", + pIniContext, false); + g_log_file_keep_days = iniGetIntValue(NULL, "log_file_keep_days", + pIniContext, 0); + + load_log_level(pIniContext); + if ((result=log_set_prefix(g_sf_base_path, server_name)) != 0) { + return result; + } + + //log_set_time_precision(&g_log_context, LOG_TIME_PRECISION_MSECOND); + return 0; +} + diff --git a/src/sf_global.h b/src/sf_global.h new file mode 100644 index 0000000..99db639 --- /dev/null +++ b/src/sf_global.h @@ -0,0 +1,61 @@ +//sf_global.h + +#ifndef _SF_GLOBAL_H +#define _SF_GLOBAL_H + +#include "common_define.h" +#include "ini_file_reader.h" +#include "ioevent.h" + +typedef struct sf_connection_stat { + volatile int current_count; + volatile int max_count; +} SFConnectionStat; + +#ifdef __cplusplus +extern "C" { +#endif + +extern int g_sf_connect_timeout; +extern int g_sf_network_timeout; +extern char g_sf_base_path[MAX_PATH_SIZE]; + +extern struct nio_thread_data *g_thread_data; + +extern volatile bool g_continue_flag; +extern int g_outer_port; +extern int g_inner_port; +extern int g_max_connections; +extern int g_accept_threads; +extern int g_work_threads; +extern int g_thread_stack_size; +extern int g_max_pkg_size; +extern int g_min_buff_size; +extern int g_max_buff_size; +extern int g_sync_log_buff_interval; //sync log buff to disk every interval seconds + +extern time_t g_up_time; + +extern gid_t g_run_by_gid; +extern uid_t g_run_by_uid; +extern char g_run_by_group[32]; +extern char g_run_by_user[32]; + +extern bool g_rotate_error_log; +extern int g_log_file_keep_days; + +extern char g_inner_bind_addr[IP_ADDRESS_SIZE]; +extern char g_outer_bind_addr[IP_ADDRESS_SIZE]; + +extern SFConnectionStat g_connection_stat; + +int sf_load_config(const char *server_name, const char *filename, + IniContext *pIniContext, const int default_inner_port, + const int default_outer_port); + +#ifdef __cplusplus +} +#endif + +#endif + diff --git a/src/sf_nio.c b/src/sf_nio.c new file mode 100644 index 0000000..d547b4e --- /dev/null +++ b/src/sf_nio.c @@ -0,0 +1,441 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "shared_func.h" +#include "sched_thread.h" +#include "pthread_func.h" +#include "logger.h" +#include "sockopt.h" +#include "fast_task_queue.h" +#include "ioevent_loop.h" +#include "sf_global.h" +#include "sf_types.h" +#include "sf_nio.h" + +static int sf_header_size = 0; +static bool sf_remove_from_ready_list = true; +static sf_deal_task_func sf_deal_task = NULL; +static sf_set_body_length_callback sf_set_body_length = NULL; +static TaskCleanUpCallback sf_task_cleanup_func = sf_task_finish_clean_up; +static sf_recv_timeout_callback sf_timeout_callback = NULL; + +static int client_sock_read(int sock, short event, void *arg); + +void sf_set_parameters(const int header_size, sf_set_body_length_callback + set_body_length_func, sf_deal_task_func deal_func, + TaskCleanUpCallback cleanup_func, + sf_recv_timeout_callback timeout_callback) +{ + sf_header_size = header_size; + sf_set_body_length = set_body_length_func; + sf_deal_task = deal_func; + sf_task_cleanup_func = cleanup_func; + sf_timeout_callback = timeout_callback; +} + +void sf_set_remove_from_ready_list(const bool enabled) +{ + sf_remove_from_ready_list = enabled; +} + +TaskCleanUpCallback sf_get_task_cleanup_func() +{ + return sf_task_cleanup_func; +} + +void sf_task_finish_clean_up(struct fast_task_info *pTask) +{ + assert(pTask->event.fd >= 0); + /* + if (pTask->event.fd < 0) { + logWarning("file: "__FILE__", line: %d, " + "pTask: %p already cleaned", + __LINE__, pTask); + return; + } + */ + + if (pTask->finish_callback != NULL) { + pTask->finish_callback(pTask); + pTask->finish_callback = NULL; + } + + ioevent_detach(&pTask->thread_data->ev_puller, pTask->event.fd); + close(pTask->event.fd); + pTask->event.fd = -1; + + if (pTask->event.timer.expires > 0) { + fast_timer_remove(&pTask->thread_data->timer, + &pTask->event.timer); + pTask->event.timer.expires = 0; + } + + if (sf_remove_from_ready_list) { + ioevent_remove(&pTask->thread_data->ev_puller, pTask); + } + + __sync_fetch_and_sub(&g_connection_stat.current_count, 1); + free_queue_push(pTask); +} + +void sf_recv_notify_read(int sock, short event, void *arg) +{ + int bytes; + int current_connections; + long task_ptr; + struct fast_task_info *pTask; + + while (1) { + if ((bytes=read(sock, &task_ptr, sizeof(task_ptr))) < 0) { + if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { + logError("file: "__FILE__", line: %d, " + "call read failed, " + "errno: %d, error info: %s", + __LINE__, errno, strerror(errno)); + } + + break; + } + else if (bytes == 0) { + break; + } + + current_connections = __sync_add_and_fetch(&g_connection_stat. + current_count, 1); + if (current_connections > g_connection_stat.max_count) { + g_connection_stat.max_count = current_connections; + } + + pTask = (struct fast_task_info *)task_ptr; + if (ioevent_set(pTask, pTask->thread_data, pTask->event.fd, IOEVENT_READ, + (IOEventCallback)client_sock_read, g_sf_network_timeout) != 0) + { + sf_task_cleanup_func(pTask); + continue; + } + } +} + +static inline int set_write_event(struct fast_task_info *pTask) +{ + int result; + + if (pTask->event.callback == (IOEventCallback)sf_client_sock_write) { + return 0; + } + + pTask->event.callback = (IOEventCallback)sf_client_sock_write; + if (ioevent_modify(&pTask->thread_data->ev_puller, + pTask->event.fd, IOEVENT_WRITE, pTask) != 0) + { + result = errno != 0 ? errno : ENOENT; + sf_task_cleanup_func(pTask); + + logError("file: "__FILE__", line: %d, " + "ioevent_modify fail, " + "errno: %d, error info: %s", + __LINE__, result, strerror(result)); + return result; + } + return 0; +} + +static inline int set_read_event(struct fast_task_info *pTask) +{ + int result; + + if (pTask->event.callback == (IOEventCallback)client_sock_read) { + return 0; + } + + pTask->event.callback = (IOEventCallback)client_sock_read; + if (ioevent_modify(&pTask->thread_data->ev_puller, + pTask->event.fd, IOEVENT_READ, pTask) != 0) + { + result = errno != 0 ? errno : ENOENT; + sf_task_cleanup_func(pTask); + + logError("file: "__FILE__", line: %d, " + "ioevent_modify fail, " + "errno: %d, error info: %s", + __LINE__, result, strerror(result)); + return result; + } + + return 0; +} + +int sf_send_add_event(struct fast_task_info *pTask) +{ + pTask->offset = 0; + if (pTask->length > 0) { + /* direct send */ + if (sf_client_sock_write(pTask->event.fd, IOEVENT_WRITE, pTask) < 0) { + return errno != 0 ? errno : EIO; + } + } + + return 0; +} + +static int client_sock_read(int sock, short event, void *arg) +{ + int bytes; + int recv_bytes; + int total_read; + struct fast_task_info *pTask; + + assert(sock >= 0); + pTask = (struct fast_task_info *)arg; + if (event & IOEVENT_TIMEOUT) { + if (pTask->offset == 0 && pTask->req_count > 0) { + if (sf_timeout_callback != NULL) { + if (sf_timeout_callback(pTask) != 0) { + sf_task_cleanup_func(pTask); + return -1; + } + } + + pTask->event.timer.expires = g_current_time + + g_sf_network_timeout; + fast_timer_add(&pTask->thread_data->timer, + &pTask->event.timer); + } + else { + if (pTask->length > 0) { + logWarning("file: "__FILE__", line: %d, " + "client ip: %s, recv timeout, " + "recv offset: %d, expect length: %d", + __LINE__, pTask->client_ip, + pTask->offset, pTask->length); + } + else { + logWarning("file: "__FILE__", line: %d, " + "client ip: %s, req_count: %ld, recv timeout", + __LINE__, pTask->client_ip, pTask->req_count); + } + + sf_task_cleanup_func(pTask); + return -1; + } + + return 0; + } + + if (event & IOEVENT_ERROR) { + logDebug("file: "__FILE__", line: %d, " + "client ip: %s, recv error event: %d, " + "close connection", __LINE__, pTask->client_ip, event); + + sf_task_cleanup_func(pTask); + return -1; + } + + total_read = 0; + while (1) { + fast_timer_modify(&pTask->thread_data->timer, + &pTask->event.timer, g_current_time + + g_sf_network_timeout); + if (pTask->length == 0) { //recv header + recv_bytes = sf_header_size - pTask->offset; + } + else { + recv_bytes = pTask->length - pTask->offset; + } + + bytes = read(sock, pTask->data + pTask->offset, recv_bytes); + if (bytes < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + break; + } + else if (errno == EINTR) { //should retry + logDebug("file: "__FILE__", line: %d, " + "client ip: %s, ignore interupt signal", + __LINE__, pTask->client_ip); + continue; + } + else { + logWarning("file: "__FILE__", line: %d, " + "client ip: %s, recv failed, " + "errno: %d, error info: %s", + __LINE__, pTask->client_ip, + errno, strerror(errno)); + + sf_task_cleanup_func(pTask); + return -1; + } + } + else if (bytes == 0) { + logDebug("file: "__FILE__", line: %d, " + "client ip: %s, sock: %d, recv failed, " + "connection disconnected", + __LINE__, pTask->client_ip, sock); + + sf_task_cleanup_func(pTask); + return -1; + } + + total_read += bytes; + pTask->offset += bytes; + if (pTask->length == 0) { //header + if (pTask->offset < sf_header_size) { + break; + } + + if (sf_set_body_length(pTask) != 0) { + sf_task_cleanup_func(pTask); + return -1; + } + if (pTask->length < 0) { + logError("file: "__FILE__", line: %d, " + "client ip: %s, pkg length: %d < 0", + __LINE__, pTask->client_ip, + pTask->length); + + sf_task_cleanup_func(pTask); + return -1; + } + + pTask->length += sf_header_size; + if (pTask->length > g_max_pkg_size) { + logError("file: "__FILE__", line: %d, " + "client ip: %s, pkg length: %d > " + "max pkg size: %d", __LINE__, + pTask->client_ip, pTask->length, + g_max_pkg_size); + + sf_task_cleanup_func(pTask); + return -1; + } + + if (pTask->length > pTask->size) { + int old_size; + old_size = pTask->size; + if (free_queue_realloc_buffer(pTask, pTask->length) != 0) { + logError("file: "__FILE__", line: %d, " + "client ip: %s, realloc buffer size " + "from %d to %d fail", __LINE__, + pTask->client_ip, pTask->size, pTask->length); + + sf_task_cleanup_func(pTask); + return -1; + } + + logDebug("file: "__FILE__", line: %d, " + "client ip: %s, task length: %d, realloc buffer size " + "from %d to %d", __LINE__, pTask->client_ip, + pTask->length, old_size, pTask->size); + } + } + + if (pTask->offset >= pTask->length) { //recv done + pTask->req_count++; + if (sf_deal_task(pTask) < 0) { //fatal error + sf_task_cleanup_func(pTask); + return -1; + } + break; + } + } + + return total_read; +} + +int sf_client_sock_write(int sock, short event, void *arg) +{ + int bytes; + int total_write; + struct fast_task_info *pTask; + + assert(sock >= 0); + pTask = (struct fast_task_info *)arg; + if (event & IOEVENT_TIMEOUT) { + logError("file: "__FILE__", line: %d, " + "client ip: %s, send timeout. total length: %d, offset: %d, " + "remain: %d", __LINE__, pTask->client_ip, pTask->length, + pTask->offset, pTask->length - pTask->offset); + + sf_task_cleanup_func(pTask); + return -1; + } + + if (event & IOEVENT_ERROR) { + logDebug("file: "__FILE__", line: %d, " + "client ip: %s, recv error event: %d, " + "close connection", __LINE__, pTask->client_ip, event); + + sf_task_cleanup_func(pTask); + return -1; + } + + total_write = 0; + while (1) { + fast_timer_modify(&pTask->thread_data->timer, + &pTask->event.timer, g_current_time + + g_sf_network_timeout); + + bytes = write(sock, pTask->data + pTask->offset, + pTask->length - pTask->offset); + //printf("%08X sended %d bytes\n", (int)pTask, bytes); + if (bytes < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) + { + if (set_write_event(pTask) != 0) { + return -1; + } + break; + } + else if (errno == EINTR) { //should retry + logDebug("file: "__FILE__", line: %d, " + "client ip: %s, ignore interupt signal", + __LINE__, pTask->client_ip); + continue; + } + else { + logWarning("file: "__FILE__", line: %d, " + "client ip: %s, send fail, " + "errno: %d, error info: %s", + __LINE__, pTask->client_ip, + errno, strerror(errno)); + + sf_task_cleanup_func(pTask); + return -1; + } + } + else if (bytes == 0) { + logWarning("file: "__FILE__", line: %d, " + "client ip: %s, sock: %d, send failed, connection disconnected", + __LINE__, pTask->client_ip, sock); + + sf_task_cleanup_func(pTask); + return -1; + } + + total_write += bytes; + pTask->offset += bytes; + if (pTask->offset >= pTask->length) { + pTask->offset = 0; + pTask->length = 0; + if (set_read_event(pTask) != 0) { + return -1; + } + break; + } + } + + return total_write; +} + diff --git a/src/sf_nio.h b/src/sf_nio.h new file mode 100644 index 0000000..0352a5d --- /dev/null +++ b/src/sf_nio.h @@ -0,0 +1,34 @@ +//sf_nio.h + +#ifndef _SF_NIO_H +#define _SF_NIO_H + +#include +#include +#include +#include "fast_task_queue.h" +#include "sf_types.h" + +#ifdef __cplusplus +extern "C" { +#endif + +void sf_set_parameters(const int header_size, sf_set_body_length_callback + set_body_length_func, sf_deal_task_func deal_func, + TaskCleanUpCallback cleanup_func, + sf_recv_timeout_callback timeout_callback); +void sf_set_remove_from_ready_list(const bool enabled); +TaskCleanUpCallback sf_get_task_cleanup_func(); + +void sf_recv_notify_read(int sock, short event, void *arg); +int sf_send_add_event(struct fast_task_info *pTask); +int sf_client_sock_write(int sock, short event, void *arg); + +void sf_task_finish_clean_up(struct fast_task_info *pTask); + +#ifdef __cplusplus +} +#endif + +#endif + diff --git a/src/sf_service.c b/src/sf_service.c new file mode 100644 index 0000000..aa111ce --- /dev/null +++ b/src/sf_service.c @@ -0,0 +1,468 @@ +//sf_service.c + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "logger.h" +#include "sockopt.h" +#include "shared_func.h" +#include "pthread_func.h" +#include "sched_thread.h" +#include "ioevent_loop.h" +#include "sf_global.h" +#include "sf_nio.h" +#include "sf_service.h" + +int g_worker_thread_count = 0; +int g_server_outer_sock = -1; +int g_server_inner_sock = -1; +static sf_accept_done_callback sf_accept_done_func = NULL; + +static bool bTerminateFlag = false; + +static void sigQuitHandler(int sig); +static void sigHupHandler(int sig); +static void sigUsrHandler(int sig); + +#if defined(DEBUG_FLAG) +static void sigDumpHandler(int sig); +#endif + + +static void *worker_thread_entrance(void* arg); + +int sf_service_init(sf_alloc_thread_extra_data_callback + alloc_thread_extra_data_callback, + ThreadLoopCallback thread_loop_callback, + sf_accept_done_callback accept_done_callback, + sf_set_body_length_callback set_body_length_func, + sf_deal_task_func deal_func, TaskCleanUpCallback task_cleanup_func, + sf_recv_timeout_callback timeout_callback, const int net_timeout_ms, + const int proto_header_size, const int task_arg_size) +{ +#define ALLOC_CONNECTIONS_ONCE 256 + int result; + int bytes; + int m; + int init_connections; + int alloc_conn_once; + struct nio_thread_data *pThreadData; + struct nio_thread_data *pDataEnd; + pthread_t tid; + pthread_attr_t thread_attr; + + sf_accept_done_func = accept_done_callback; + sf_set_parameters(proto_header_size, set_body_length_func, deal_func, + task_cleanup_func, timeout_callback); + + if ((result=set_rand_seed()) != 0) { + logCrit("file: "__FILE__", line: %d, " + "set_rand_seed fail, program exit!", __LINE__); + return result; + } + + if ((result=init_pthread_attr(&thread_attr, g_thread_stack_size)) != 0) { + logError("file: "__FILE__", line: %d, " + "init_pthread_attr fail, program exit!", __LINE__); + return result; + } + + m = g_min_buff_size / (16 * 1024); + if (m == 0) { + m = 1; + } else if (m > 16) { + m = 16; + } + alloc_conn_once = ALLOC_CONNECTIONS_ONCE / m; + init_connections = g_max_connections < alloc_conn_once ? + g_max_connections : alloc_conn_once; + if ((result=free_queue_init_ex(g_max_connections, init_connections, + alloc_conn_once, g_min_buff_size, g_max_buff_size, + task_arg_size)) != 0) + { + return result; + } + + bytes = sizeof(struct nio_thread_data) * g_work_threads; + g_thread_data = (struct nio_thread_data *)malloc(bytes); + if (g_thread_data == NULL) { + logError("file: "__FILE__", line: %d, " + "malloc %d bytes fail, errno: %d, error info: %s", + __LINE__, bytes, errno, strerror(errno)); + return errno != 0 ? errno : ENOMEM; + } + memset(g_thread_data, 0, bytes); + + g_worker_thread_count = 0; + pDataEnd = g_thread_data + g_work_threads; + for (pThreadData=g_thread_data; pThreadDatathread_loop_callback = thread_loop_callback; + if (alloc_thread_extra_data_callback != NULL) { + pThreadData->arg = alloc_thread_extra_data_callback( + (int)(pThreadData - g_thread_data)); + } + else { + pThreadData->arg = NULL; + } + + if (ioevent_init(&pThreadData->ev_puller, + g_max_connections + 2, net_timeout_ms, 0) != 0) + { + result = errno != 0 ? errno : ENOMEM; + logError("file: "__FILE__", line: %d, " + "ioevent_init fail, " + "errno: %d, error info: %s", + __LINE__, result, strerror(result)); + return result; + } + + result = fast_timer_init(&pThreadData->timer, + 2 * g_sf_network_timeout, g_current_time); + if (result != 0) { + logError("file: "__FILE__", line: %d, " + "fast_timer_init fail, " + "errno: %d, error info: %s", + __LINE__, result, strerror(result)); + return result; + } + + if (pipe(pThreadData->pipe_fds) != 0) { + result = errno != 0 ? errno : EPERM; + logError("file: "__FILE__", line: %d, " + "call pipe fail, " + "errno: %d, error info: %s", + __LINE__, result, strerror(result)); + break; + } + +#if defined(OS_LINUX) + if ((result=fd_add_flags(pThreadData->pipe_fds[0], + O_NONBLOCK | O_NOATIME)) != 0) + { + break; + } +#else + if ((result=fd_add_flags(pThreadData->pipe_fds[0], + O_NONBLOCK)) != 0) + { + break; + } +#endif + + if ((result=pthread_create(&tid, &thread_attr, + worker_thread_entrance, pThreadData)) != 0) + { + logError("file: "__FILE__", line: %d, " + "create thread failed, startup threads: %d, " + "errno: %d, error info: %s", + __LINE__, g_worker_thread_count, + result, strerror(result)); + break; + } + else { + __sync_fetch_and_add(&g_worker_thread_count, 1); + } + } + + pthread_attr_destroy(&thread_attr); + + return 0; +} + +int sf_service_destroy() +{ + struct nio_thread_data *pDataEnd, *pThreadData; + + free_queue_destroy(); + pDataEnd = g_thread_data + g_work_threads; + for (pThreadData=g_thread_data; pThreadDatatimer); + } + free(g_thread_data); + g_thread_data = NULL; + return 0; +} + +static void *worker_thread_entrance(void* arg) +{ + struct nio_thread_data *pThreadData; + + pThreadData = (struct nio_thread_data *)arg; + ioevent_loop(pThreadData, sf_recv_notify_read, sf_get_task_cleanup_func(), + &g_continue_flag); + ioevent_destroy(&pThreadData->ev_puller); + + __sync_fetch_and_sub(&g_worker_thread_count, 1); + return NULL; +} + +static int _socket_server(const char *bind_addr, int port, int *sock) +{ + int result; + *sock = socketServer(bind_addr, port, &result); + if (*sock < 0) { + return result; + } + + if ((result=tcpsetserveropt(*sock, g_sf_network_timeout)) != 0) { + return result; + } + + return 0; +} + +int sf_socket_server() +{ + int result; + if (g_outer_port != g_inner_port) { + if ((result=_socket_server(g_outer_bind_addr, g_outer_port, + &g_server_outer_sock)) != 0) + { + return result; + } + + if ((result=_socket_server(g_inner_bind_addr, g_inner_port, + &g_server_inner_sock)) != 0) + { + return result; + } + } else { + const char *bind_addr; + if (*g_outer_bind_addr != '\0') { + if (*g_inner_bind_addr != '\0') { + bind_addr = ""; + } else { + bind_addr = g_outer_bind_addr; + } + } else { + bind_addr = g_inner_bind_addr; + } + + if ((result=_socket_server(bind_addr, g_outer_port, + &g_server_outer_sock)) != 0) + { + return result; + } + } + + return 0; +} + +static void *accept_thread_entrance(void* arg) +{ + int server_sock; + int incomesock; + long task_ptr; + struct sockaddr_in inaddr; + socklen_t sockaddr_len; + struct fast_task_info *pTask; + char szClientIp[IP_ADDRESS_SIZE]; + + server_sock = (long)arg; + while (g_continue_flag) { + sockaddr_len = sizeof(inaddr); + incomesock = accept(server_sock, (struct sockaddr*)&inaddr, &sockaddr_len); + if (incomesock < 0) { //error + if (!(errno == EINTR || errno == EAGAIN)) { + logError("file: "__FILE__", line: %d, " + "accept fail, errno: %d, error info: %s", + __LINE__, errno, strerror(errno)); + } + + continue; + } + + getPeerIpaddr(incomesock, + szClientIp, IP_ADDRESS_SIZE); + if (tcpsetnonblockopt(incomesock) != 0) { + close(incomesock); + continue; + } + + pTask = free_queue_pop(); + if (pTask == NULL) { + logError("file: "__FILE__", line: %d, " + "malloc task buff failed, you should " + "increase the parameter: max_connections", + __LINE__); + close(incomesock); + continue; + } + strcpy(pTask->client_ip, szClientIp); + + pTask->event.fd = incomesock; + pTask->thread_data = g_thread_data + incomesock % g_work_threads; + if (sf_accept_done_func != NULL) { + sf_accept_done_func(pTask, server_sock == g_server_inner_sock); + } + + task_ptr = (long)pTask; + if (write(pTask->thread_data->pipe_fds[1], &task_ptr, + sizeof(task_ptr)) != sizeof(task_ptr)) + { + logError("file: "__FILE__", line: %d, " + "call write to pipe fd: %d fail, " + "errno: %d, error info: %s", + __LINE__, pTask->thread_data->pipe_fds[1], + errno, strerror(errno)); + close(incomesock); + free_queue_push(pTask); + } + } + + return NULL; +} + +void _accept_loop(int server_sock, const int accept_threads) +{ + pthread_t tid; + pthread_attr_t thread_attr; + int result; + int i; + + if ((result=init_pthread_attr(&thread_attr, g_thread_stack_size)) != 0) { + logWarning("file: "__FILE__", line: %d, " + "init_pthread_attr fail!", __LINE__); + } + else { + for (i=0; i +#include +#include +#include "ioevent.h" +#include "fast_task_queue.h" +#include "sf_types.h" + +typedef void* (*sf_alloc_thread_extra_data_callback)(const int thread_index); + +#ifdef __cplusplus +extern "C" { +#endif + +extern int g_server_outer_sock; +extern int g_server_inner_sock; + +extern int g_worker_thread_count; + +int sf_service_init(sf_alloc_thread_extra_data_callback + alloc_thread_extra_data_callback, + ThreadLoopCallback thread_loop_callback, + sf_accept_done_callback accept_done_callback, + sf_set_body_length_callback set_body_length_func, + sf_deal_task_func deal_func, TaskCleanUpCallback task_cleanup_func, + sf_recv_timeout_callback timeout_callback, const int net_timeout_ms, + const int proto_header_size, const int task_arg_size); +int sf_service_destroy(); + +int sf_setup_signal_handler(); +int sf_socket_server(); +void sf_accept_loop(); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/sf_types.h b/src/sf_types.h new file mode 100644 index 0000000..0c5e183 --- /dev/null +++ b/src/sf_types.h @@ -0,0 +1,26 @@ +//sf_types.h + +#ifndef _SF_TYPES_H_ +#define _SF_TYPES_H_ + +#include +#include +#include +#include +#include +#include "connection_pool.h" + +struct fast_task_info; +typedef void (*sf_accept_done_callback)(struct fast_task_info *pTask, + const bool bInnerPort); +typedef int (*sf_set_body_length_callback)(struct fast_task_info *pTask); +typedef int (*sf_deal_task_func)(struct fast_task_info *pTask); +typedef int (*sf_recv_timeout_callback)(struct fast_task_info *pTask); + +typedef struct { + char ip_addr[IP_ADDRESS_SIZE]; + int port; +} SFServerInfo; + +#endif + diff --git a/src/sf_util.c b/src/sf_util.c new file mode 100644 index 0000000..e322082 --- /dev/null +++ b/src/sf_util.c @@ -0,0 +1,52 @@ +#include +#include +#include +#include "sf_util.h" + +int64_t getticks() +{ + struct timeval tv; + gettimeofday(&tv, NULL); + return tv.tv_sec * 1000 + tv.tv_usec / 1000; +} + +void log_plus(const int priority, const char* file, + int line, const char* fmt, ...) +{ + char buf[2048]; + int hlen; + if (g_log_context.log_level < priority) { + return; + } +#ifdef DEBUG_FLAG + hlen = snprintf(buf, sizeof(buf), "%s:%d %ld ", file, line, (long)syscall(SYS_gettid)); +#else + hlen = snprintf(buf, sizeof(buf), "%s:%d ", file, line); +#endif + va_list ap; + va_start(ap, fmt); + hlen += vsnprintf(buf+hlen, sizeof(buf)-hlen, fmt, ap); + va_end(ap); + log_it_ex1(&g_log_context, priority, buf, hlen); +} +int sf_printbuffer(char* buffer,int32_t len) +{ + int i; + if(buffer==NULL) + { + fprintf(stderr, "common-utils parameter is fail"); + return(-1); + } + for(i=0;i +#ifdef DEBUG_FLAG /*only for format check*/ + +#define lemerg(...) snprintf(0,0,__VA_ARGS__), log_plus(LOG_EMERG, __FILE__, __LINE__, __VA_ARGS__) +#define lcrit(...) snprintf(0,0,__VA_ARGS__), log_plus(LOG_CRIT, __FILE__, __LINE__, __VA_ARGS__) +#define lalert(...) snprintf(0,0,__VA_ARGS__), log_plus(LOG_ALERT, __FILE__, __LINE__, __VA_ARGS__) +#define lerr(...) snprintf(0,0,__VA_ARGS__), log_plus(LOG_ERR, __FILE__, __LINE__, __VA_ARGS__) +#define lwarning(...) snprintf(0,0,__VA_ARGS__), log_plus(LOG_WARNING, __FILE__, __LINE__, __VA_ARGS__) +#define lnotice(...) snprintf(0,0,__VA_ARGS__), log_plus(LOG_NOTICE, __FILE__, __LINE__, __VA_ARGS__) +#define linfo(...) snprintf(0,0,__VA_ARGS__), log_plus(LOG_INFO, __FILE__, __LINE__, __VA_ARGS__) +#define ldebug(...) snprintf(0,0,__VA_ARGS__), log_plus(LOG_DEBUG, __FILE__, __LINE__, __VA_ARGS__) + +#else +#define lemerg(...) log_plus(LOG_EMERG, __FILE__, __LINE__, __VA_ARGS__) +#define lcrit(...) log_plus(LOG_CRIT, __FILE__, __LINE__, __VA_ARGS__) +#define lalert(...) log_plus(LOG_ALERT, __FILE__, __LINE__, __VA_ARGS__) +#define lerr(...) log_plus(LOG_ERR, __FILE__, __LINE__, __VA_ARGS__) +#define lwarning(...) log_plus(LOG_WARNING, __FILE__, __LINE__, __VA_ARGS__) +#define lnotice(...) log_plus(LOG_NOTICE, __FILE__, __LINE__, __VA_ARGS__) +#define linfo(...) log_plus(LOG_INFO, __FILE__, __LINE__, __VA_ARGS__) +#define ldebug(...) log_plus(LOG_DEBUG, __FILE__, __LINE__, __VA_ARGS__) + +#endif + +#define returnif(b) if(b){ return b; } +#define breakif(b) if(b){ break; } + +#define failvars int eln, eres; const char* emsg + +#define gofailif(b, msg) if(b) { eln=__LINE__; emsg=msg; eres=(b); goto FAIL_; } + +#define logfail() lerr("error at %s:%d errno: %d msg: %s errmsg: %s", \ +__FILE__, eln, eres, emsg, strerror(eres)) + +#define dszoffset(cls, mem) ((char*)&((cls*)0)->mem - ((char*)0)) + +#ifdef __cplusplus +extern "C" { +#endif + +int64_t getticks() ; + +void log_plus(const int priority, const char* file, int line, const char* fmt, ...); + +int sf_printbuffer(char* buffer,int32_t len); +#ifdef __cplusplus +} +#endif + +#endif +