| |
@@ -7,9 +7,15 @@
|
| |
#include <stdlib.h>
|
| |
#include <time.h>
|
| |
#include <pthread.h>
|
| |
+ #include <sys/epoll.h>
|
| |
+ #include <fcntl.h>
|
| |
+ #include <sys/timerfd.h>
|
| |
|
| |
#define FRAGMENT_BIT (1 << 31)
|
| |
|
| |
+ #define RESPONSE_TIMEOUT 15
|
| |
+ #define MAX_TIMEOUT_RETRY 3
|
| |
+
|
| |
struct gpm_ctx {
|
| |
pthread_mutex_t lock;
|
| |
int fd;
|
| |
@@ -20,6 +26,9 @@
|
| |
gid_t gid;
|
| |
|
| |
int next_xid;
|
| |
+
|
| |
+ int epollfd;
|
| |
+ int timerfd;
|
| |
};
|
| |
|
| |
/* a single global struct is not particularly efficient,
|
| |
@@ -39,6 +48,8 @@
|
| |
pthread_mutex_init(&gpm_global_ctx.lock, &attr);
|
| |
|
| |
gpm_global_ctx.fd = -1;
|
| |
+ gpm_global_ctx.epollfd = -1;
|
| |
+ gpm_global_ctx.timerfd = -1;
|
| |
|
| |
seedp = time(NULL) + getpid() + pthread_self();
|
| |
gpm_global_ctx.next_xid = rand_r(&seedp);
|
| |
@@ -69,6 +80,7 @@
|
| |
struct sockaddr_un addr = {0};
|
| |
char name[PATH_MAX];
|
| |
int ret;
|
| |
+ unsigned flags;
|
| |
int fd = -1;
|
| |
|
| |
ret = get_pipe_name(name);
|
| |
@@ -86,6 +98,18 @@
|
| |
goto done;
|
| |
}
|
| |
|
| |
+ ret = fcntl(fd, F_GETFD, &flags);
|
| |
+ if (ret != 0) {
|
| |
+ ret = errno;
|
| |
+ goto done;
|
| |
+ }
|
| |
+
|
| |
+ ret = fcntl(fd, F_SETFD, flags | O_NONBLOCK);
|
| |
+ if (ret != 0) {
|
| |
+ ret = errno;
|
| |
+ goto done;
|
| |
+ }
|
| |
+
|
| |
ret = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
|
| |
if (ret == -1) {
|
| |
ret = errno;
|
| |
@@ -161,6 +185,158 @@
|
| |
return pthread_mutex_unlock(&gpmctx->lock);
|
| |
}
|
| |
|
| |
+ static void gpm_timer_close(struct gpm_ctx *gpmctx) {
|
| |
+ if (gpmctx->timerfd < 0) {
|
| |
+ return;
|
| |
+ }
|
| |
+
|
| |
+ close(gpmctx->timerfd);
|
| |
+ gpmctx->timerfd = -1;
|
| |
+ }
|
| |
+
|
| |
+ static int gpm_timer_setup(struct gpm_ctx *gpmctx, int timeout_seconds) {
|
| |
+ int ret;
|
| |
+ struct itimerspec its;
|
| |
+
|
| |
+ if (gpmctx->timerfd >= 0) {
|
| |
+ gpm_timer_close(gpmctx);
|
| |
+ }
|
| |
+
|
| |
+ gpmctx->timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
|
| |
+ if (gpmctx->timerfd < 0) {
|
| |
+ return errno;
|
| |
+ }
|
| |
+
|
| |
+ its.it_interval.tv_sec = timeout_seconds;
|
| |
+ its.it_interval.tv_nsec = 0;
|
| |
+ its.it_value.tv_sec = timeout_seconds;
|
| |
+ its.it_value.tv_nsec = 0;
|
| |
+
|
| |
+ ret = timerfd_settime(gpmctx->timerfd, 0, &its, NULL);
|
| |
+ if (ret) {
|
| |
+ ret = errno;
|
| |
+ gpm_timer_close(gpmctx);
|
| |
+ return ret;
|
| |
+ }
|
| |
+
|
| |
+ return 0;
|
| |
+ }
|
| |
+
|
| |
+ static void gpm_epoll_close(struct gpm_ctx *gpmctx) {
|
| |
+ if (gpmctx->epollfd < 0) {
|
| |
+ return;
|
| |
+ }
|
| |
+
|
| |
+ close(gpmctx->epollfd);
|
| |
+ gpmctx->epollfd = -1;
|
| |
+ }
|
| |
+
|
| |
+ static int gpm_epoll_setup(struct gpm_ctx *gpmctx) {
|
| |
+ struct epoll_event ev;
|
| |
+ int ret;
|
| |
+
|
| |
+ if (gpmctx->epollfd >= 0) {
|
| |
+ gpm_epoll_close(gpmctx);
|
| |
+ }
|
| |
+
|
| |
+ gpmctx->epollfd = epoll_create1(EPOLL_CLOEXEC);
|
| |
+ if (gpmctx->epollfd == -1) {
|
| |
+ return errno;
|
| |
+ }
|
| |
+
|
| |
+ /* Add timer */
|
| |
+ ev.events = EPOLLIN;
|
| |
+ ev.data.fd = gpmctx->timerfd;
|
| |
+ ret = epoll_ctl(gpmctx->epollfd, EPOLL_CTL_ADD, gpmctx->timerfd, &ev);
|
| |
+ if (ret == -1) {
|
| |
+ ret = errno;
|
| |
+ gpm_epoll_close(gpmctx);
|
| |
+ return ret;
|
| |
+ }
|
| |
+
|
| |
+ return ret;
|
| |
+ }
|
| |
+
|
| |
+ static int gpm_epoll_wait(struct gpm_ctx *gpmctx, uint32_t event_flags) {
|
| |
+ int ret;
|
| |
+ int epoll_ret;
|
| |
+ struct epoll_event ev;
|
| |
+ struct epoll_event events[2];
|
| |
+ uint64_t timer_read;
|
| |
+
|
| |
+ if (gpmctx->epollfd < 0) {
|
| |
+ ret = gpm_epoll_setup(gpmctx);
|
| |
+ if (ret)
|
| |
+ return ret;
|
| |
+ }
|
| |
+
|
| |
+ ev.events = event_flags;
|
| |
+ ev.data.fd = gpmctx->fd;
|
| |
+ epoll_ret = epoll_ctl(gpmctx->epollfd, EPOLL_CTL_ADD, gpmctx->fd, &ev);
|
| |
+ if (epoll_ret == -1) {
|
| |
+ ret = errno;
|
| |
+ gpm_epoll_close(gpmctx);
|
| |
+ return ret;
|
| |
+ }
|
| |
+
|
| |
+ do {
|
| |
+ epoll_ret = epoll_wait(gpmctx->epollfd, events, 2, -1);
|
| |
+ } while (epoll_ret < 0 && errno == EINTR);
|
| |
+
|
| |
+ if (epoll_ret < 0) {
|
| |
+ /* Error while waiting that isn't EINTR */
|
| |
+ ret = errno;
|
| |
+ gpm_epoll_close(gpmctx);
|
| |
+ } else if (epoll_ret == 0) {
|
| |
+ /* Shouldn't happen as timeout == -1; treat it like a timeout
|
| |
+ * occurred. */
|
| |
+ ret = ETIMEDOUT;
|
| |
+ gpm_epoll_close(gpmctx);
|
| |
+ } else if (epoll_ret == 1 && events[0].data.fd == gpmctx->timerfd) {
|
| |
+ /* Got an event which is only our timer */
|
| |
+ ret = read(gpmctx->timerfd, &timer_read, sizeof(uint64_t));
|
| |
+ if (ret == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
|
| |
+ /* In the case when reading from the timer failed, don't hide the
|
| |
+ * timer error behind ETIMEDOUT such that it isn't retried */
|
| |
+ ret = errno;
|
| |
+ } else {
|
| |
+ /* If ret == 0, then we definitely timed out. Else, if ret == -1
|
| |
+ * and errno == EAGAIN or errno == EWOULDBLOCK, we're in a weird
|
| |
+ * edge case where epoll thinks the timer can be read, but it
|
| |
+ * is blocking more; treat it like a TIMEOUT and retry, as
|
| |
+ * nothing around us would handle EAGAIN from timer and retry
|
| |
+ * it. */
|
| |
+ ret = ETIMEDOUT;
|
| |
+ }
|
| |
+ gpm_epoll_close(gpmctx);
|
| |
+ } else {
|
| |
+ /* If ret == 2, then we ignore the timerfd; that way if the next
|
| |
+ * operation cannot be performed immediately, we timeout and retry.
|
| |
+ * If ret == 1 and data.fd == gpmctx->fd, return 0. */
|
| |
+ ret = 0;
|
| |
+ }
|
| |
+
|
| |
+ epoll_ret = epoll_ctl(gpmctx->epollfd, EPOLL_CTL_DEL, gpmctx->fd, NULL);
|
| |
+ if (epoll_ret == -1) {
|
| |
+ /* If we previously had an error, expose that error instead of
|
| |
+ * clobbering it with errno; else if no error, then assume it is
|
| |
+ * better to notify of the error deleting the event than it is
|
| |
+ * to continue. */
|
| |
+ if (ret == 0)
|
| |
+ ret = errno;
|
| |
+ gpm_epoll_close(gpmctx);
|
| |
+ }
|
| |
+
|
| |
+ return ret;
|
| |
+ }
|
| |
+
|
| |
/* Throw away the current connection state (epoll instance and socket)
 * and open the socket again. Returns whatever gpm_open_socket() returns. */
static int gpm_retry_socket(struct gpm_ctx *gpmctx)
{
    int rc;

    gpm_epoll_close(gpmctx);
    gpm_close_socket(gpmctx);

    rc = gpm_open_socket(gpmctx);
    return rc;
}
|
| |
+
|
| |
/* must be called after the lock has been grabbed */
|
| |
static int gpm_send_buffer(struct gpm_ctx *gpmctx,
|
| |
char *buffer, uint32_t length)
|
| |
@@ -181,8 +357,13 @@
|
| |
retry = false;
|
| |
do {
|
| |
do {
|
| |
+ ret = gpm_epoll_wait(gpmctx, EPOLLOUT);
|
| |
+ if (ret != 0) {
|
| |
+ goto done;
|
| |
+ }
|
| |
+
|
| |
ret = 0;
|
| |
- wn = send(gpmctx->fd, &size, sizeof(uint32_t), MSG_NOSIGNAL);
|
| |
+ wn = write(gpmctx->fd, &size, sizeof(uint32_t));
|
| |
if (wn == -1) {
|
| |
ret = errno;
|
| |
}
|
| |
@@ -190,8 +371,7 @@
|
| |
if (wn != 4) {
|
| |
/* reopen and retry once */
|
| |
if (retry == false) {
|
| |
- gpm_close_socket(gpmctx);
|
| |
- ret = gpm_open_socket(gpmctx);
|
| |
+ ret = gpm_retry_socket(gpmctx);
|
| |
if (ret == 0) {
|
| |
retry = true;
|
| |
continue;
|
| |
@@ -206,9 +386,14 @@
|
| |
|
| |
pos = 0;
|
| |
while (length > pos) {
|
| |
- wn = send(gpmctx->fd, buffer + pos, length - pos, MSG_NOSIGNAL);
|
| |
+ ret = gpm_epoll_wait(gpmctx, EPOLLOUT);
|
| |
+ if (ret) {
|
| |
+ goto done;
|
| |
+ }
|
| |
+
|
| |
+ wn = write(gpmctx->fd, buffer + pos, length - pos);
|
| |
if (wn == -1) {
|
| |
- if (errno == EINTR) {
|
| |
+ if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
|
| |
continue;
|
| |
}
|
| |
ret = errno;
|
| |
@@ -229,7 +414,7 @@
|
| |
|
| |
/* must be called after the lock has been grabbed */
|
| |
static int gpm_recv_buffer(struct gpm_ctx *gpmctx,
|
| |
- char *buffer, uint32_t *length)
|
| |
+ char **buffer, uint32_t *length)
|
| |
{
|
| |
uint32_t size;
|
| |
ssize_t rn;
|
| |
@@ -237,6 +422,11 @@
|
| |
int ret;
|
| |
|
| |
do {
|
| |
+ ret = gpm_epoll_wait(gpmctx, EPOLLIN);
|
| |
+ if (ret) {
|
| |
+ goto done;
|
| |
+ }
|
| |
+
|
| |
ret = 0;
|
| |
rn = read(gpmctx->fd, &size, sizeof(uint32_t));
|
| |
if (rn == -1) {
|
| |
@@ -256,11 +446,22 @@
|
| |
goto done;
|
| |
}
|
| |
|
| |
+ *buffer = malloc(*length);
|
| |
+ if (*buffer == NULL) {
|
| |
+ ret = ENOMEM;
|
| |
+ goto done;
|
| |
+ }
|
| |
+
|
| |
pos = 0;
|
| |
while (*length > pos) {
|
| |
- rn = read(gpmctx->fd, buffer + pos, *length - pos);
|
| |
+ ret = gpm_epoll_wait(gpmctx, EPOLLIN);
|
| |
+ if (ret) {
|
| |
+ goto done;
|
| |
+ }
|
| |
+
|
| |
+ rn = read(gpmctx->fd, *buffer + pos, *length - pos);
|
| |
if (rn == -1) {
|
| |
- if (errno == EINTR) {
|
| |
+ if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
|
| |
continue;
|
| |
}
|
| |
ret = errno;
|
| |
@@ -279,6 +480,7 @@
|
| |
if (ret) {
|
| |
/* on errors we can only close the fd and return */
|
| |
gpm_close_socket(gpmctx);
|
| |
+ gpm_epoll_close(gpmctx);
|
| |
}
|
| |
return ret;
|
| |
}
|
| |
@@ -312,6 +514,63 @@
|
| |
return &gpm_global_ctx;
|
| |
}
|
| |
|
| |
+ static int gpm_send_recv_loop(struct gpm_ctx *gpmctx, char *send_buffer,
|
| |
+ uint32_t send_length, char** recv_buffer,
|
| |
+ uint32_t *recv_length)
|
| |
+ {
|
| |
+ int ret;
|
| |
+ int retry_count;
|
| |
+
|
| |
+ /* setup timer */
|
| |
+ ret = gpm_timer_setup(gpmctx, RESPONSE_TIMEOUT);
|
| |
+ if (ret)
|
| |
+ return ret;
|
| |
+
|
| |
+ for (retry_count = 0; retry_count < MAX_TIMEOUT_RETRY; retry_count++) {
|
| |
+ /* send to proxy */
|
| |
+ ret = gpm_send_buffer(gpmctx, send_buffer, send_length);
|
| |
+
|
| |
+ if (ret == 0) {
|
| |
+ /* No error, continue to recv */
|
| |
+ } else if (ret == ETIMEDOUT) {
|
| |
+ /* Close and reopen socket before trying again */
|
| |
+ ret = gpm_retry_socket(gpmctx);
|
| |
+ if (ret != 0)
|
| |
+ return ret;
|
| |
+ ret = ETIMEDOUT;
|
| |
+
|
| |
+ /* RETRY entire send */
|
| |
+ continue;
|
| |
+ } else {
|
| |
+ /* Other error */
|
| |
+ return ret;
|
| |
+ }
|
| |
+
|
| |
+ /* receive answer */
|
| |
+ ret = gpm_recv_buffer(gpmctx, recv_buffer, recv_length);
|
| |
+ if (ret == 0) {
|
| |
+ /* No error */
|
| |
+ break;
|
| |
+ } else if (ret == ETIMEDOUT) {
|
| |
+ /* Close and reopen socket before trying again */
|
| |
+ ret = gpm_retry_socket(gpmctx);
|
| |
+
|
| |
+ /* Free buffer and set it to NULL to prevent free(xdr_reply_ctx) */
|
| |
+ free(recv_buffer);
|
| |
+ recv_buffer = NULL;
|
| |
+
|
| |
+ if (ret != 0)
|
| |
+ return ret;
|
| |
+ ret = ETIMEDOUT;
|
| |
+ } else {
|
| |
+ /* Other error */
|
| |
+ return ret;
|
| |
+ }
|
| |
+ }
|
| |
+
|
| |
+ return ret;
|
| |
+ }
|
| |
+
|
| |
OM_uint32 gpm_release_buffer(OM_uint32 *minor_status,
|
| |
gss_buffer_t buffer)
|
| |
{
|
| |
@@ -402,15 +661,20 @@
|
| |
gp_rpc_msg msg;
|
| |
XDR xdr_call_ctx;
|
| |
XDR xdr_reply_ctx;
|
| |
- char buffer[MAX_RPC_SIZE];
|
| |
- uint32_t length;
|
| |
+ char *send_buffer = NULL;
|
| |
+ char *recv_buffer = NULL;
|
| |
+ uint32_t send_length;
|
| |
+ uint32_t recv_length;
|
| |
uint32_t xid;
|
| |
bool xdrok;
|
| |
bool sockgrab = false;
|
| |
int ret;
|
| |
|
| |
- xdrmem_create(&xdr_call_ctx, buffer, MAX_RPC_SIZE, XDR_ENCODE);
|
| |
- xdrmem_create(&xdr_reply_ctx, buffer, MAX_RPC_SIZE, XDR_DECODE);
|
| |
+ send_buffer = malloc(MAX_RPC_SIZE);
|
| |
+ if (send_buffer == NULL)
|
| |
+ return ENOMEM;
|
| |
+
|
| |
+ xdrmem_create(&xdr_call_ctx, send_buffer, MAX_RPC_SIZE, XDR_ENCODE);
|
| |
|
| |
memset(&msg, 0, sizeof(gp_rpc_msg));
|
| |
msg.header.type = GP_RPC_CALL;
|
| |
@@ -453,22 +717,22 @@
|
| |
goto done;
|
| |
}
|
| |
|
| |
- /* send to proxy */
|
| |
- ret = gpm_send_buffer(gpmctx, buffer, xdr_getpos(&xdr_call_ctx));
|
| |
- if (ret) {
|
| |
- goto done;
|
| |
- }
|
| |
+ /* set send_length */
|
| |
+ send_length = xdr_getpos(&xdr_call_ctx);
|
| |
|
| |
- /* receive answer */
|
| |
- ret = gpm_recv_buffer(gpmctx, buffer, &length);
|
| |
- if (ret) {
|
| |
+ /* Send request, receive response with timeout */
|
| |
+ ret = gpm_send_recv_loop(gpmctx, send_buffer, send_length, &recv_buffer,
|
| |
+ &recv_length);
|
| |
+ if (ret)
|
| |
goto done;
|
| |
- }
|
| |
|
| |
/* release the lock */
|
| |
gpm_release_sock(gpmctx);
|
| |
sockgrab = false;
|
| |
|
| |
+ /* Create the reply context */
|
| |
+ xdrmem_create(&xdr_reply_ctx, recv_buffer, recv_length, XDR_DECODE);
|
| |
+
|
| |
/* decode header */
|
| |
memset(&msg, 0, sizeof(gp_rpc_msg));
|
| |
xdrok = xdr_gp_rpc_msg(&xdr_reply_ctx, &msg);
|
| |
@@ -492,12 +756,21 @@
|
| |
}
|
| |
|
| |
done:
|
| |
+ gpm_timer_close(gpmctx);
|
| |
+ gpm_epoll_close(gpmctx);
|
| |
+
|
| |
if (sockgrab) {
|
| |
gpm_release_sock(gpmctx);
|
| |
}
|
| |
xdr_free((xdrproc_t)xdr_gp_rpc_msg, (char *)&msg);
|
| |
xdr_destroy(&xdr_call_ctx);
|
| |
- xdr_destroy(&xdr_reply_ctx);
|
| |
+
|
| |
+ if (recv_buffer != NULL)
|
| |
+ xdr_destroy(&xdr_reply_ctx);
|
| |
+
|
| |
+ free(send_buffer);
|
| |
+ free(recv_buffer);
|
| |
+
|
| |
return ret;
|
| |
}
|
| |
|
| |
This converts client socket requests to use O_NONBLOCK in order to
add timeout functionality. The current timeout window is three periods
of 15 seconds each. Upon failure, ETIMEDOUT is returned. If reading a
request times out, the entire request is retried.
This also adds extended logging on the server in level=3, allowing requests
and responses to be tracked through reading, decoding, encoding the reply,
and writing.