diff --git a/HISTORY b/HISTORY index 0d875ae..e2a6155 100644 --- a/HISTORY +++ b/HISTORY @@ -1,7 +1,8 @@ -Version 1.56 2022-01-29 +Version 1.56 2022-01-31 * add function fc_gettid * function normalize_path: NULL from parameter for getcwd + * sockopt.[hc] support tcpwritev and tcpreadv Version 1.55 2022-01-12 * fastcommon php extension adapt to php 8 diff --git a/src/sockopt.c b/src/sockopt.c index 5c0650f..691219e 100644 --- a/src/sockopt.c +++ b/src/sockopt.c @@ -23,6 +23,7 @@ #include #include #include +#include #define SUB_NET_TYPE_INNER_10_STR2 "inner_10" #define SUB_NET_TYPE_INNER_172_STR2 "inner_172" @@ -32,6 +33,8 @@ #define SUB_NET_TYPE_INNER_172_STR3 "inner172" #define SUB_NET_TYPE_INNER_192_STR3 "inner192" +#define FC_IOV_BATCH_SIZE 256 + #if defined(OS_LINUX) || defined(OS_FREEBSD) #include @@ -165,7 +168,6 @@ int tcprecvdata_ex(int sock, void *data, const int size, \ pollfds.events = POLLIN; #endif - read_bytes = 0; ret_code = 0; p = (unsigned char*)data; left_bytes = size; @@ -236,7 +238,7 @@ int tcprecvdata_ex(int sock, void *data, const int size, \ return ret_code; } -int tcpsenddata(int sock, void* data, const int size, const int timeout) +int tcpsenddata(int sock, void *data, const int size, const int timeout) { int left_bytes; int write_bytes; @@ -339,7 +341,6 @@ int tcprecvdata_nb_ms(int sock, void *data, const int size, \ pollfds.events = POLLIN; #endif - read_bytes = 0; ret_code = 0; p = (unsigned char*)data; left_bytes = size; @@ -350,13 +351,19 @@ int tcprecvdata_nb_ms(int sock, void *data, const int size, \ { TCP_SET_QUICK_ACK(sock); left_bytes -= read_bytes; + if (left_bytes == 0) + { + break; + } + p += read_bytes; continue; } if (read_bytes < 0) { - if (!(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) + if (!(errno == EAGAIN || errno == EWOULDBLOCK || + (errno == EINTR && try_again_when_interrupt))) { ret_code = errno != 0 ? errno : EINTR; break; @@ -412,12 +419,174 @@ int tcprecvdata_nb_ms(int sock, void *data, const int size, \ return ret_code; } -int tcpsenddata_nb(int sock, void* data, const int size, const int timeout) +int tcpreadv_nb_ms(int sock, const int size, const struct iovec *iov, + int iovcnt, const int timeout_ms, int *total_bytes) +{ + int left_bytes; + int read_bytes; + int bytes; + int res; + int ret_code; + int remain_count; + int current_count; + int current_done; + int remain_len; + struct iovec *iob; + struct iovec iov_array[FC_IOV_BATCH_SIZE]; + struct iovec *iovp; + +#ifdef USE_SELECT + fd_set read_set; + struct timeval t; +#else + struct pollfd pollfds; +#endif + +#ifdef USE_SELECT + FD_ZERO(&read_set); + FD_SET(sock, &read_set); +#else + pollfds.fd = sock; + pollfds.events = POLLIN; +#endif + + ret_code = 0; + iovp = (struct iovec *)iov; + remain_count = current_count = iovcnt; + left_bytes = size; + while (left_bytes > 0) + { + read_bytes = readv(sock, iovp, current_count); + if (read_bytes > 0) + { + TCP_SET_QUICK_ACK(sock); + left_bytes -= read_bytes; + if (left_bytes == 0) + { + break; + } + + iob = iovp; + bytes = iob->iov_len; + while (bytes < read_bytes) + { + ++iob; + bytes += iob->iov_len; + } + if (bytes == read_bytes) + { + ++iob; + if (iob < iovp + current_count) { + bytes += iob->iov_len; + } + } + + current_done = iob - iovp; + remain_count -= current_done; + if (remain_count == 0) { + ret_code = EOVERFLOW; + break; + } + + if (current_done == current_count) //full done + { + current_count = ((remain_count <= FC_IOV_BATCH_SIZE) ? + remain_count : FC_IOV_BATCH_SIZE); + memcpy(iov_array, iov + (iovcnt - remain_count), + current_count * sizeof(struct iovec)); + iovp = iov_array; + } + else //partial done + { + if (iovp == (struct iovec *)iov) + { + current_count = ((remain_count <= FC_IOV_BATCH_SIZE) ? + remain_count : FC_IOV_BATCH_SIZE); + memcpy(iov_array, iob, current_count * + sizeof(struct iovec)); + iovp = iov_array; + } + else + { + current_count -= current_done; + iovp = iob; + } + + /* adjust the first element */ + remain_len = bytes - read_bytes; + if (remain_len < iovp->iov_len) + { + iovp->iov_base = (char *)iovp->iov_base + + (iovp->iov_len - remain_len); + iovp->iov_len = remain_len; + } + } + + continue; + } + else if (read_bytes == 0) + { + ret_code = ENOTCONN; + break; + } + + if (!(errno == EAGAIN || errno == EWOULDBLOCK || + (errno == EINTR && try_again_when_interrupt))) + { + ret_code = errno != 0 ? errno : EINTR; + break; + } + +#ifdef USE_SELECT + if (timeout_ms <= 0) + { + res = select(sock+1, &read_set, NULL, NULL, NULL); + } + else + { + t.tv_usec = (timeout_ms % 1000) * 1000; + t.tv_sec = timeout_ms / 1000; + res = select(sock+1, &read_set, NULL, NULL, &t); + } +#else + res = poll(&pollfds, 1, timeout_ms); + if (res > 0 && (pollfds.revents & (POLLHUP | POLLERR))) + { + ret_code = ENOTCONN; + break; + } +#endif + + if (res < 0) + { + if (errno == EINTR && try_again_when_interrupt) + { + continue; + } + ret_code = errno != 0 ? errno : EINTR; + break; + } + else if (res == 0) + { + ret_code = ETIMEDOUT; + break; + } + } + + if (total_bytes != NULL) + { + *total_bytes = size - left_bytes; + } + + return ret_code; +} + +int tcpsenddata_nb(int sock, void *data, const int size, const int timeout) { int left_bytes; int write_bytes; int result; - unsigned char* p; + unsigned char *p; #ifdef USE_SELECT fd_set write_set; struct timeval t; @@ -433,24 +602,172 @@ int tcpsenddata_nb(int sock, void* data, const int size, const int timeout) pollfds.events = POLLOUT; #endif - p = (unsigned char*)data; + p = (unsigned char *)data; left_bytes = size; while (left_bytes > 0) { write_bytes = send(sock, p, left_bytes, 0); - if (write_bytes < 0) - { - if (!(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) - { - return errno != 0 ? errno : EINTR; - } - } - else + if (write_bytes > 0) { left_bytes -= write_bytes; + if (left_bytes == 0) + { + break; + } + p += write_bytes; continue; } + else if (write_bytes == 0) + { + return ENOTCONN; + } + + if (!(errno == EAGAIN || errno == EWOULDBLOCK || + (errno == EINTR && try_again_when_interrupt))) + { + return errno != 0 ? errno : EINTR; + } + +#ifdef USE_SELECT + if (timeout <= 0) + { + result = select(sock+1, NULL, &write_set, NULL, NULL); + } + else + { + t.tv_usec = 0; + t.tv_sec = timeout; + result = select(sock+1, NULL, &write_set, NULL, &t); + } +#else + result = poll(&pollfds, 1, 1000 * timeout); + if (result > 0 && (pollfds.revents & (POLLHUP | POLLERR))) + { + return ENOTCONN; + } +#endif + + if (result < 0) + { + if (errno == EINTR && try_again_when_interrupt) + { + continue; + } + return errno != 0 ? errno : EINTR; + } + else if (result == 0) + { + return ETIMEDOUT; + } + } + + return 0; +} + +int tcpwritev_nb(int sock, const struct iovec *iov, + int iovcnt, const int timeout) +{ + int write_bytes; + int bytes; + int result; + int remain_count; + int current_count; + int current_done; + int remain_len; + struct iovec *iob; + struct iovec iov_array[FC_IOV_BATCH_SIZE]; + struct iovec *iovp; + +#ifdef USE_SELECT + fd_set write_set; + struct timeval t; +#else + struct pollfd pollfds; +#endif + +#ifdef USE_SELECT + FD_ZERO(&write_set); + FD_SET(sock, &write_set); +#else + pollfds.fd = sock; + pollfds.events = POLLOUT; +#endif + + iovp = (struct iovec *)iov; + remain_count = current_count = iovcnt; + while (remain_count > 0) + { + write_bytes = writev(sock, iovp, current_count); + if (write_bytes > 0) + { + iob = iovp; + bytes = iob->iov_len; + while (bytes < write_bytes) + { + ++iob; + bytes += iob->iov_len; + } + if (bytes == write_bytes) + { + ++iob; + if (iob < iovp + current_count) { + bytes += iob->iov_len; + } + } + + current_done = iob - iovp; + remain_count -= current_done; + if (remain_count == 0) { + break; + } + + if (current_done == current_count) //full done + { + current_count = ((remain_count <= FC_IOV_BATCH_SIZE) ? + remain_count : FC_IOV_BATCH_SIZE); + memcpy(iov_array, iov + (iovcnt - remain_count), + current_count * sizeof(struct iovec)); + iovp = iov_array; + } + else //partial done + { + if (iovp == (struct iovec *)iov) + { + current_count = ((remain_count <= FC_IOV_BATCH_SIZE) ? + remain_count : FC_IOV_BATCH_SIZE); + memcpy(iov_array, iob, current_count * + sizeof(struct iovec)); + iovp = iov_array; + } + else + { + current_count -= current_done; + iovp = iob; + } + + /* adjust the first element */ + remain_len = bytes - write_bytes; + if (remain_len < iovp->iov_len) + { + iovp->iov_base = (char *)iovp->iov_base + + (iovp->iov_len - remain_len); + iovp->iov_len = remain_len; + } + } + + continue; + } + else if (write_bytes == 0) + { + return ENOTCONN; + } + + if (!(errno == EAGAIN || errno == EWOULDBLOCK || + (errno == EINTR && try_again_when_interrupt))) + { + return errno != 0 ? errno : EINTR; + } #ifdef USE_SELECT if (timeout <= 0) diff --git a/src/sockopt.h b/src/sockopt.h index eff3e84..6d98dd6 100644 --- a/src/sockopt.h +++ b/src/sockopt.h @@ -159,7 +159,7 @@ int tcprecvdata_ex(int sock, void *data, const int size, \ * count: store the bytes recveived * return: error no, 0 success, != 0 fail */ -int tcprecvdata_nb_ex(int sock, void *data, const int size, \ +int tcprecvdata_nb_ex(int sock, void *data, const int size, const int timeout, int *count); /** recv data (non-block mode) in ms @@ -167,13 +167,45 @@ int tcprecvdata_nb_ex(int sock, void *data, const int size, \ * sock: the socket * data: the buffer * size: buffer size (max bytes can receive) - * timeout: read timeout in milliseconds + * timeout_ms: read timeout in milliseconds * count: store the bytes recveived * return: error no, 0 success, != 0 fail */ -int tcprecvdata_nb_ms(int sock, void *data, const int size, \ +int tcprecvdata_nb_ms(int sock, void *data, const int size, const int timeout_ms, int *count); +/** recv data by readv (non-block mode) in ms + * parameters: + * sock: the socket + * size: the expect size + * iov: the iov to send + * iovcnt: the iov count + * timeout_ms: read timeout in milliseconds + * total_bytes: store the bytes recveived + * return: error no, 0 success, != 0 fail +*/ +int tcpreadv_nb_ms(int sock, const int size, const struct iovec *iov, + int iovcnt, const int timeout_ms, int *total_bytes); + + +/** recv data by readv (non-block mode) + * parameters: + * sock: the socket + * size: the expect size + * iov: the iov to send + * iovcnt: the iov count + * timeout: read timeout in seconds + * total_bytes: store the bytes recveived + * return: error no, 0 success, != 0 fail +*/ +static inline int tcpreadv_nb_ex(int sock, const int size, + const struct iovec *iov, int iovcnt, + const int timeout, int *total_bytes) +{ + return tcpreadv_nb_ms(sock, size, iov, iovcnt, + timeout * 1000, total_bytes); +} + /** send data (block mode) * parameters: * sock: the socket @@ -182,7 +214,7 @@ int tcprecvdata_nb_ms(int sock, void *data, const int size, \ * timeout: write timeout * return: error no, 0 success, != 0 fail */ -int tcpsenddata(int sock, void* data, const int size, const int timeout); +int tcpsenddata(int sock, void *data, const int size, const int timeout); /** send data (non-block mode) * parameters: @@ -192,7 +224,18 @@ int tcpsenddata(int sock, void* data, const int size, const int timeout); * timeout: write timeout * return: error no, 0 success, != 0 fail */ -int tcpsenddata_nb(int sock, void* data, const int size, const int timeout); +int tcpsenddata_nb(int sock, void *data, const int size, const int timeout); + +/** send data by writev (non-block mode) + * parameters: + * sock: the socket + * iov: the iov to send + * iovcnt: the iov count + * timeout: write timeout + * return: error no, 0 success, != 0 fail +*/ +int tcpwritev_nb(int sock, const struct iovec *iov, + int iovcnt, const int timeout); /** connect to server by block mode * parameters: @@ -544,6 +587,9 @@ static inline int socketClientIPv6(const char *server_ip, #define tcprecvdata_nb(sock, data, size, timeout) \ tcprecvdata_nb_ex(sock, data, size, timeout, NULL) +#define tcpreadv_nb(sock, size, iov, iovcnt, timeout) \ + tcpreadv_nb_ex(sock, size, iov, iovcnt, timeout, NULL) + /** send a file * parameters: * sock: the socket