sockopt.[hc] support tcpwritev and tcpreadv

posix_api
YuQing 2022-01-31 17:03:07 +08:00
parent 787eb3a7d6
commit a19a0071db
3 changed files with 385 additions and 21 deletions

View File

@ -1,7 +1,8 @@
Version 1.56 2022-01-29
Version 1.56 2022-01-31
* add function fc_gettid
* function normalize_path: NULL from parameter for getcwd
* sockopt.[hc] support tcpwritev and tcpreadv
Version 1.55 2022-01-12
* fastcommon php extension adapt to php 8

View File

@ -23,6 +23,7 @@
#include <netdb.h>
#include <fcntl.h>
#include <sys/ioctl.h>
#include <sys/uio.h>
#define SUB_NET_TYPE_INNER_10_STR2 "inner_10"
#define SUB_NET_TYPE_INNER_172_STR2 "inner_172"
@ -32,6 +33,8 @@
#define SUB_NET_TYPE_INNER_172_STR3 "inner172"
#define SUB_NET_TYPE_INNER_192_STR3 "inner192"
#define FC_IOV_BATCH_SIZE 256
#if defined(OS_LINUX) || defined(OS_FREEBSD)
#include <ifaddrs.h>
@ -165,7 +168,6 @@ int tcprecvdata_ex(int sock, void *data, const int size, \
pollfds.events = POLLIN;
#endif
read_bytes = 0;
ret_code = 0;
p = (unsigned char*)data;
left_bytes = size;
@ -236,7 +238,7 @@ int tcprecvdata_ex(int sock, void *data, const int size, \
return ret_code;
}
int tcpsenddata(int sock, void* data, const int size, const int timeout)
int tcpsenddata(int sock, void *data, const int size, const int timeout)
{
int left_bytes;
int write_bytes;
@ -339,7 +341,6 @@ int tcprecvdata_nb_ms(int sock, void *data, const int size, \
pollfds.events = POLLIN;
#endif
read_bytes = 0;
ret_code = 0;
p = (unsigned char*)data;
left_bytes = size;
@ -350,13 +351,19 @@ int tcprecvdata_nb_ms(int sock, void *data, const int size, \
{
TCP_SET_QUICK_ACK(sock);
left_bytes -= read_bytes;
if (left_bytes == 0)
{
break;
}
p += read_bytes;
continue;
}
if (read_bytes < 0)
{
if (!(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR))
if (!(errno == EAGAIN || errno == EWOULDBLOCK ||
(errno == EINTR && try_again_when_interrupt)))
{
ret_code = errno != 0 ? errno : EINTR;
break;
@ -412,12 +419,174 @@ int tcprecvdata_nb_ms(int sock, void *data, const int size, \
return ret_code;
}
int tcpsenddata_nb(int sock, void* data, const int size, const int timeout)
int tcpreadv_nb_ms(int sock, const int size, const struct iovec *iov,
int iovcnt, const int timeout_ms, int *total_bytes)
{
int left_bytes;
int read_bytes;
int bytes;
int res;
int ret_code;
int remain_count;
int current_count;
int current_done;
int remain_len;
struct iovec *iob;
struct iovec iov_array[FC_IOV_BATCH_SIZE];
struct iovec *iovp;
#ifdef USE_SELECT
fd_set read_set;
struct timeval t;
#else
struct pollfd pollfds;
#endif
#ifdef USE_SELECT
FD_ZERO(&read_set);
FD_SET(sock, &read_set);
#else
pollfds.fd = sock;
pollfds.events = POLLIN;
#endif
ret_code = 0;
iovp = (struct iovec *)iov;
remain_count = current_count = iovcnt;
left_bytes = size;
while (left_bytes > 0)
{
read_bytes = readv(sock, iovp, current_count);
if (read_bytes > 0)
{
TCP_SET_QUICK_ACK(sock);
left_bytes -= read_bytes;
if (left_bytes == 0)
{
break;
}
iob = iovp;
bytes = iob->iov_len;
while (bytes < read_bytes)
{
++iob;
bytes += iob->iov_len;
}
if (bytes == read_bytes)
{
++iob;
if (iob < iovp + current_count) {
bytes += iob->iov_len;
}
}
current_done = iob - iovp;
remain_count -= current_done;
if (remain_count == 0) {
ret_code = EOVERFLOW;
break;
}
if (current_done == current_count) //full done
{
current_count = ((remain_count <= FC_IOV_BATCH_SIZE) ?
remain_count : FC_IOV_BATCH_SIZE);
memcpy(iov_array, iov + (iovcnt - remain_count),
current_count * sizeof(struct iovec));
iovp = iov_array;
}
else //partial done
{
if (iovp == (struct iovec *)iov)
{
current_count = ((remain_count <= FC_IOV_BATCH_SIZE) ?
remain_count : FC_IOV_BATCH_SIZE);
memcpy(iov_array, iob, current_count *
sizeof(struct iovec));
iovp = iov_array;
}
else
{
current_count -= current_done;
iovp = iob;
}
/* adjust the first element */
remain_len = bytes - read_bytes;
if (remain_len < iovp->iov_len)
{
iovp->iov_base = (char *)iovp->iov_base +
(iovp->iov_len - remain_len);
iovp->iov_len = remain_len;
}
}
continue;
}
else if (read_bytes == 0)
{
ret_code = ENOTCONN;
break;
}
if (!(errno == EAGAIN || errno == EWOULDBLOCK ||
(errno == EINTR && try_again_when_interrupt)))
{
ret_code = errno != 0 ? errno : EINTR;
break;
}
#ifdef USE_SELECT
if (timeout_ms <= 0)
{
res = select(sock+1, &read_set, NULL, NULL, NULL);
}
else
{
t.tv_usec = (timeout_ms % 1000) * 1000;
t.tv_sec = timeout_ms / 1000;
res = select(sock+1, &read_set, NULL, NULL, &t);
}
#else
res = poll(&pollfds, 1, timeout_ms);
if (res > 0 && (pollfds.revents & (POLLHUP | POLLERR)))
{
ret_code = ENOTCONN;
break;
}
#endif
if (res < 0)
{
if (errno == EINTR && try_again_when_interrupt)
{
continue;
}
ret_code = errno != 0 ? errno : EINTR;
break;
}
else if (res == 0)
{
ret_code = ETIMEDOUT;
break;
}
}
if (total_bytes != NULL)
{
*total_bytes = size - left_bytes;
}
return ret_code;
}
int tcpsenddata_nb(int sock, void *data, const int size, const int timeout)
{
int left_bytes;
int write_bytes;
int result;
unsigned char* p;
unsigned char *p;
#ifdef USE_SELECT
fd_set write_set;
struct timeval t;
@ -433,24 +602,172 @@ int tcpsenddata_nb(int sock, void* data, const int size, const int timeout)
pollfds.events = POLLOUT;
#endif
p = (unsigned char*)data;
p = (unsigned char *)data;
left_bytes = size;
while (left_bytes > 0)
{
write_bytes = send(sock, p, left_bytes, 0);
if (write_bytes < 0)
{
if (!(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR))
{
return errno != 0 ? errno : EINTR;
}
}
else
if (write_bytes > 0)
{
left_bytes -= write_bytes;
if (left_bytes == 0)
{
break;
}
p += write_bytes;
continue;
}
else if (write_bytes == 0)
{
return ENOTCONN;
}
if (!(errno == EAGAIN || errno == EWOULDBLOCK ||
(errno == EINTR && try_again_when_interrupt)))
{
return errno != 0 ? errno : EINTR;
}
#ifdef USE_SELECT
if (timeout <= 0)
{
result = select(sock+1, NULL, &write_set, NULL, NULL);
}
else
{
t.tv_usec = 0;
t.tv_sec = timeout;
result = select(sock+1, NULL, &write_set, NULL, &t);
}
#else
result = poll(&pollfds, 1, 1000 * timeout);
if (result > 0 && (pollfds.revents & (POLLHUP | POLLERR)))
{
return ENOTCONN;
}
#endif
if (result < 0)
{
if (errno == EINTR && try_again_when_interrupt)
{
continue;
}
return errno != 0 ? errno : EINTR;
}
else if (result == 0)
{
return ETIMEDOUT;
}
}
return 0;
}
int tcpwritev_nb(int sock, const struct iovec *iov,
int iovcnt, const int timeout)
{
int write_bytes;
int bytes;
int result;
int remain_count;
int current_count;
int current_done;
int remain_len;
struct iovec *iob;
struct iovec iov_array[FC_IOV_BATCH_SIZE];
struct iovec *iovp;
#ifdef USE_SELECT
fd_set write_set;
struct timeval t;
#else
struct pollfd pollfds;
#endif
#ifdef USE_SELECT
FD_ZERO(&write_set);
FD_SET(sock, &write_set);
#else
pollfds.fd = sock;
pollfds.events = POLLOUT;
#endif
iovp = (struct iovec *)iov;
remain_count = current_count = iovcnt;
while (remain_count > 0)
{
write_bytes = writev(sock, iovp, current_count);
if (write_bytes > 0)
{
iob = iovp;
bytes = iob->iov_len;
while (bytes < write_bytes)
{
++iob;
bytes += iob->iov_len;
}
if (bytes == write_bytes)
{
++iob;
if (iob < iovp + current_count) {
bytes += iob->iov_len;
}
}
current_done = iob - iovp;
remain_count -= current_done;
if (remain_count == 0) {
break;
}
if (current_done == current_count) //full done
{
current_count = ((remain_count <= FC_IOV_BATCH_SIZE) ?
remain_count : FC_IOV_BATCH_SIZE);
memcpy(iov_array, iov + (iovcnt - remain_count),
current_count * sizeof(struct iovec));
iovp = iov_array;
}
else //partial done
{
if (iovp == (struct iovec *)iov)
{
current_count = ((remain_count <= FC_IOV_BATCH_SIZE) ?
remain_count : FC_IOV_BATCH_SIZE);
memcpy(iov_array, iob, current_count *
sizeof(struct iovec));
iovp = iov_array;
}
else
{
current_count -= current_done;
iovp = iob;
}
/* adjust the first element */
remain_len = bytes - write_bytes;
if (remain_len < iovp->iov_len)
{
iovp->iov_base = (char *)iovp->iov_base +
(iovp->iov_len - remain_len);
iovp->iov_len = remain_len;
}
}
continue;
}
else if (write_bytes == 0)
{
return ENOTCONN;
}
if (!(errno == EAGAIN || errno == EWOULDBLOCK ||
(errno == EINTR && try_again_when_interrupt)))
{
return errno != 0 ? errno : EINTR;
}
#ifdef USE_SELECT
if (timeout <= 0)

View File

@ -159,7 +159,7 @@ int tcprecvdata_ex(int sock, void *data, const int size, \
* count: store the bytes recveived
* return: error no, 0 success, != 0 fail
*/
int tcprecvdata_nb_ex(int sock, void *data, const int size, \
int tcprecvdata_nb_ex(int sock, void *data, const int size,
const int timeout, int *count);
/** recv data (non-block mode) in ms
@ -167,13 +167,45 @@ int tcprecvdata_nb_ex(int sock, void *data, const int size, \
* sock: the socket
* data: the buffer
* size: buffer size (max bytes can receive)
* timeout: read timeout in milliseconds
* timeout_ms: read timeout in milliseconds
* count: store the bytes recveived
* return: error no, 0 success, != 0 fail
*/
int tcprecvdata_nb_ms(int sock, void *data, const int size, \
int tcprecvdata_nb_ms(int sock, void *data, const int size,
const int timeout_ms, int *count);
/** recv data by readv (non-block mode) in ms
* parameters:
* sock: the socket
* size: the expect size
* iov: the iov to send
* iovcnt: the iov count
* timeout_ms: read timeout in milliseconds
* total_bytes: store the bytes recveived
* return: error no, 0 success, != 0 fail
*/
int tcpreadv_nb_ms(int sock, const int size, const struct iovec *iov,
int iovcnt, const int timeout_ms, int *total_bytes);
/** recv data by readv (non-block mode)
* parameters:
* sock: the socket
* size: the expect size
* iov: the iov to send
* iovcnt: the iov count
* timeout: read timeout in seconds
* total_bytes: store the bytes recveived
* return: error no, 0 success, != 0 fail
*/
static inline int tcpreadv_nb_ex(int sock, const int size,
const struct iovec *iov, int iovcnt,
const int timeout, int *total_bytes)
{
return tcpreadv_nb_ms(sock, size, iov, iovcnt,
timeout * 1000, total_bytes);
}
/** send data (block mode)
* parameters:
* sock: the socket
@ -182,7 +214,7 @@ int tcprecvdata_nb_ms(int sock, void *data, const int size, \
* timeout: write timeout
* return: error no, 0 success, != 0 fail
*/
int tcpsenddata(int sock, void* data, const int size, const int timeout);
int tcpsenddata(int sock, void *data, const int size, const int timeout);
/** send data (non-block mode)
* parameters:
@ -192,7 +224,18 @@ int tcpsenddata(int sock, void* data, const int size, const int timeout);
* timeout: write timeout
* return: error no, 0 success, != 0 fail
*/
int tcpsenddata_nb(int sock, void* data, const int size, const int timeout);
int tcpsenddata_nb(int sock, void *data, const int size, const int timeout);
/** send data by writev (non-block mode)
* parameters:
* sock: the socket
* iov: the iov to send
* iovcnt: the iov count
* timeout: write timeout
* return: error no, 0 success, != 0 fail
*/
int tcpwritev_nb(int sock, const struct iovec *iov,
int iovcnt, const int timeout);
/** connect to server by block mode
* parameters:
@ -544,6 +587,9 @@ static inline int socketClientIPv6(const char *server_ip,
#define tcprecvdata_nb(sock, data, size, timeout) \
tcprecvdata_nb_ex(sock, data, size, timeout, NULL)
#define tcpreadv_nb(sock, size, iov, iovcnt, timeout) \
tcpreadv_nb_ex(sock, size, iov, iovcnt, timeout, NULL)
/** send a file
* parameters:
* sock: the socket