#205 Non-blocking IO + Extended request debug logging
Merged 6 years ago by cipherboy. Opened 6 years ago by cipherboy.
cipherboy/gssproxy nonblocking  into  master

file modified
+295 -22
@@ -7,9 +7,15 @@ 

  #include <stdlib.h>

  #include <time.h>

  #include <pthread.h>

+ #include <sys/epoll.h>

+ #include <fcntl.h>

+ #include <sys/timerfd.h>

  

  #define FRAGMENT_BIT (1 << 31)

  

+ #define RESPONSE_TIMEOUT 15

+ #define MAX_TIMEOUT_RETRY 3

+ 

  struct gpm_ctx {

      pthread_mutex_t lock;

      int fd;
@@ -20,6 +26,9 @@ 

      gid_t gid;

  

      int next_xid;

+ 

+     int epollfd;

+     int timerfd;

  };

  

  /* a single global struct is not particularly efficient,
@@ -39,6 +48,8 @@ 

      pthread_mutex_init(&gpm_global_ctx.lock, &attr);

  

      gpm_global_ctx.fd = -1;

+     gpm_global_ctx.epollfd = -1;

+     gpm_global_ctx.timerfd = -1;

  

      seedp = time(NULL) + getpid() + pthread_self();

      gpm_global_ctx.next_xid = rand_r(&seedp);
@@ -69,6 +80,7 @@ 

      struct sockaddr_un addr = {0};

      char name[PATH_MAX];

      int ret;

+     unsigned flags;

      int fd = -1;

  

      ret = get_pipe_name(name);
@@ -86,6 +98,18 @@ 

          goto done;

      }

  

+     ret = fcntl(fd, F_GETFD, &flags);

+     if (ret != 0) {

+         ret = errno;

+         goto done;

+     }

+ 

+     ret = fcntl(fd, F_SETFD, flags | O_NONBLOCK);

+     if (ret != 0) {

+         ret = errno;

+         goto done;

+     }

+ 

      ret = connect(fd, (struct sockaddr *)&addr, sizeof(addr));

      if (ret == -1) {

          ret = errno;
@@ -161,6 +185,158 @@ 

      return pthread_mutex_unlock(&gpmctx->lock);

  }

  

+ static void gpm_timer_close(struct gpm_ctx *gpmctx) {

+     if (gpmctx->timerfd < 0) {

+         return;

+     }

+ 

+     close(gpmctx->timerfd);

+     gpmctx->timerfd = -1;

+ }

+ 

+ static int gpm_timer_setup(struct gpm_ctx *gpmctx, int timeout_seconds) {

+     int ret;

+     struct itimerspec its;

+ 

+     if (gpmctx->timerfd >= 0) {

+         gpm_timer_close(gpmctx);

+     }

+ 

+     gpmctx->timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);

+     if (gpmctx->timerfd < 0) {

+         return errno;

+     }

+ 

+     its.it_interval.tv_sec = timeout_seconds;

+     its.it_interval.tv_nsec = 0;

+     its.it_value.tv_sec = timeout_seconds;

+     its.it_value.tv_nsec = 0;

+ 

+     ret = timerfd_settime(gpmctx->timerfd, 0, &its, NULL);

+     if (ret) {

+         ret = errno;

+         gpm_timer_close(gpmctx);

+         return ret;

+     }

+ 

+     return 0;

+ }

+ 

+ static void gpm_epoll_close(struct gpm_ctx *gpmctx) {

+     if (gpmctx->epollfd < 0) {

+         return;

+     }

+ 

+     close(gpmctx->epollfd);

+     gpmctx->epollfd = -1;

+ }

+ 

+ static int gpm_epoll_setup(struct gpm_ctx *gpmctx) {

+     struct epoll_event ev;

+     int ret;

+ 

+     if (gpmctx->epollfd >= 0) {

+         gpm_epoll_close(gpmctx);

+     }

+ 

+     gpmctx->epollfd = epoll_create1(EPOLL_CLOEXEC);

+     if (gpmctx->epollfd == -1) {

+         return errno;

+     }

+ 

+     /* Add timer */

+     ev.events = EPOLLIN;

+     ev.data.fd = gpmctx->timerfd;

+     ret = epoll_ctl(gpmctx->epollfd, EPOLL_CTL_ADD, gpmctx->timerfd, &ev);

+     if (ret == -1) {

+         ret = errno;

+         gpm_epoll_close(gpmctx);

+         return ret;

+     }

+ 

+     return ret;

+ }

+ 

+ static int gpm_epoll_wait(struct gpm_ctx *gpmctx, uint32_t event_flags) {

+     int ret;

+     int epoll_ret;

+     struct epoll_event ev;

+     struct epoll_event events[2];

+     uint64_t timer_read;

+ 

+     if (gpmctx->epollfd < 0) {

+         ret = gpm_epoll_setup(gpmctx);

+         if (ret)

+             return ret;

+     }

+ 

+     ev.events = event_flags;

+     ev.data.fd = gpmctx->fd;

+     epoll_ret = epoll_ctl(gpmctx->epollfd, EPOLL_CTL_ADD, gpmctx->fd, &ev);

+     if (epoll_ret == -1) {

+         ret = errno;

+         gpm_epoll_close(gpmctx);

+         return ret;

+     }

+ 

+     do {

+         epoll_ret = epoll_wait(gpmctx->epollfd, events, 2, -1);

+     } while (epoll_ret < 0 && errno == EINTR);

+ 

+     if (epoll_ret < 0) {

+         /* Error while waiting that isn't EINTR */

+         ret = errno;

+         gpm_epoll_close(gpmctx);

+     } else if (epoll_ret == 0) {

+         /* Shouldn't happen as timeout == -1; treat it like a timeout

+          * occurred. */

+         ret = ETIMEDOUT;

+         gpm_epoll_close(gpmctx);

+     } else if (epoll_ret == 1 && events[0].data.fd == gpmctx->timerfd) {

+         /* Got an event which is only our timer */

+         ret = read(gpmctx->timerfd, &timer_read, sizeof(uint64_t));

+         if (ret == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {

+             /* In the case when reading from the timer failed, don't hide the

+              * timer error behind ETIMEDOUT such that it isn't retried */

+             ret = errno;

+         } else {

+             /* If ret == 0, then we definitely timed out. Else, if ret == -1

+              * and errno == EAGAIN or errno == EWOULDBLOCK, we're in a weird

+              * edge case where epoll thinks the timer can be read, but it

+              * is blocking more; treat it like a TIMEOUT and retry, as

+              * nothing around us would handle EAGAIN from timer and retry

+              * it. */

+             ret = ETIMEDOUT;

+         }

+         gpm_epoll_close(gpmctx);

+     } else {

+         /* If ret == 2, then we ignore the timerfd; that way if the next

+          * operation cannot be performed immediately, we timeout and retry.

+          * If ret == 1 and data.fd == gpmctx->fd, return 0. */

+         ret = 0;

+     }

+ 

+     epoll_ret = epoll_ctl(gpmctx->epollfd, EPOLL_CTL_DEL, gpmctx->fd, NULL);

+     if (epoll_ret == -1) {

+         /* If we previously had an error, expose that error instead of

+          * clobbering it with errno; else if no error, then assume it is

+          * better to notify of the error deleting the event than it is

+          * to continue. */

+         if (ret == 0)

+             ret = errno;

+         gpm_epoll_close(gpmctx);

+     }

+ 

+     return ret;

+ }

+ 

/* Reset the connection before a retry: close the epoll instance, close
 * the socket, then open the socket again.
 *
 * Returns whatever gpm_open_socket() returns. */
static int gpm_retry_socket(struct gpm_ctx *gpmctx)
{
    int ret;

    gpm_epoll_close(gpmctx);
    gpm_close_socket(gpmctx);

    ret = gpm_open_socket(gpmctx);
    return ret;
}

+ 

  /* must be called after the lock has been grabbed */

  static int gpm_send_buffer(struct gpm_ctx *gpmctx,

                             char *buffer, uint32_t length)
@@ -181,8 +357,13 @@ 

      retry = false;

      do {

          do {

+             ret = gpm_epoll_wait(gpmctx, EPOLLOUT);

+             if (ret != 0) {

+                 goto done;

+             }

+ 

              ret = 0;

-             wn = send(gpmctx->fd, &size, sizeof(uint32_t), MSG_NOSIGNAL);

+             wn = write(gpmctx->fd, &size, sizeof(uint32_t));

              if (wn == -1) {

                  ret = errno;

              }
@@ -190,8 +371,7 @@ 

          if (wn != 4) {

              /* reopen and retry once */

              if (retry == false) {

-                 gpm_close_socket(gpmctx);

-                 ret = gpm_open_socket(gpmctx);

+                 ret = gpm_retry_socket(gpmctx);

                  if (ret == 0) {

                      retry = true;

                      continue;
@@ -206,9 +386,14 @@ 

  

      pos = 0;

      while (length > pos) {

-         wn = send(gpmctx->fd, buffer + pos, length - pos, MSG_NOSIGNAL);

+         ret = gpm_epoll_wait(gpmctx, EPOLLOUT);

+         if (ret) {

+             goto done;

+         }

+ 

+         wn = write(gpmctx->fd, buffer + pos, length - pos);

          if (wn == -1) {

-             if (errno == EINTR) {

+             if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {

                  continue;

              }

              ret = errno;
@@ -229,7 +414,7 @@ 

  

  /* must be called after the lock has been grabbed */

  static int gpm_recv_buffer(struct gpm_ctx *gpmctx,

-                            char *buffer, uint32_t *length)

+                            char **buffer, uint32_t *length)

  {

      uint32_t size;

      ssize_t rn;
@@ -237,6 +422,11 @@ 

      int ret;

  

      do {

+         ret = gpm_epoll_wait(gpmctx, EPOLLIN);

+         if (ret) {

+             goto done;

+         }

+ 

          ret = 0;

          rn = read(gpmctx->fd, &size, sizeof(uint32_t));

          if (rn == -1) {
@@ -256,11 +446,22 @@ 

          goto done;

      }

  

+     *buffer = malloc(*length);

+     if (*buffer == NULL) {

+         ret = ENOMEM;

+         goto done;

+     }

+ 

      pos = 0;

      while (*length > pos) {

-         rn = read(gpmctx->fd, buffer + pos, *length - pos);

+         ret = gpm_epoll_wait(gpmctx, EPOLLIN);

+         if (ret) {

+             goto done;

+         }

+ 

+         rn = read(gpmctx->fd, *buffer + pos, *length - pos);

          if (rn == -1) {

-             if (errno == EINTR) {

+             if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {

                  continue;

              }

              ret = errno;
@@ -279,6 +480,7 @@ 

      if (ret) {

          /* on errors we can only close the fd and return */

          gpm_close_socket(gpmctx);

+         gpm_epoll_close(gpmctx);

      }

      return ret;

  }
@@ -312,6 +514,63 @@ 

      return &gpm_global_ctx;

  }

  

+ static int gpm_send_recv_loop(struct gpm_ctx *gpmctx, char *send_buffer,

+                               uint32_t send_length, char** recv_buffer,

+                               uint32_t *recv_length)

+ {

+     int ret;

+     int retry_count;

+ 

+     /* setup timer */

+     ret = gpm_timer_setup(gpmctx, RESPONSE_TIMEOUT);

+     if (ret)

+         return ret;

+ 

+     for (retry_count = 0; retry_count < MAX_TIMEOUT_RETRY; retry_count++) {

+         /* send to proxy */

+         ret = gpm_send_buffer(gpmctx, send_buffer, send_length);

+ 

+         if (ret == 0) {

+             /* No error, continue to recv */

+         } else if (ret == ETIMEDOUT) {

+             /* Close and reopen socket before trying again */

+             ret = gpm_retry_socket(gpmctx);

+             if (ret != 0)

+                 return ret;

+             ret = ETIMEDOUT;

+ 

+             /* RETRY entire send */

+             continue;

+         } else {

+             /* Other error */

+             return ret;

+         }

+ 

+         /* receive answer */

+         ret = gpm_recv_buffer(gpmctx, recv_buffer, recv_length);

+         if (ret == 0) {

+             /* No error */

+             break;

+         } else if (ret == ETIMEDOUT) {

+             /* Close and reopen socket before trying again */

+             ret = gpm_retry_socket(gpmctx);

+ 

+             /* Free buffer and set it to NULL to prevent free(xdr_reply_ctx) */

+             free(recv_buffer);

+             recv_buffer = NULL;

+ 

+             if (ret != 0)

+                 return ret;

+             ret = ETIMEDOUT;

+         } else {

+             /* Other error */

+             return ret;

+         }

+     }

+ 

+     return ret;

+ }

+ 

  OM_uint32 gpm_release_buffer(OM_uint32 *minor_status,

                               gss_buffer_t buffer)

  {
@@ -402,15 +661,20 @@ 

      gp_rpc_msg msg;

      XDR xdr_call_ctx;

      XDR xdr_reply_ctx;

-     char buffer[MAX_RPC_SIZE];

-     uint32_t length;

+     char *send_buffer = NULL;

+     char *recv_buffer = NULL;

+     uint32_t send_length;

+     uint32_t recv_length;

      uint32_t xid;

      bool xdrok;

      bool sockgrab = false;

      int ret;

  

-     xdrmem_create(&xdr_call_ctx, buffer, MAX_RPC_SIZE, XDR_ENCODE);

-     xdrmem_create(&xdr_reply_ctx, buffer, MAX_RPC_SIZE, XDR_DECODE);

+     send_buffer = malloc(MAX_RPC_SIZE);

+     if (send_buffer == NULL)

+         return ENOMEM;

+ 

+     xdrmem_create(&xdr_call_ctx, send_buffer, MAX_RPC_SIZE, XDR_ENCODE);

  

      memset(&msg, 0, sizeof(gp_rpc_msg));

      msg.header.type = GP_RPC_CALL;
@@ -453,22 +717,22 @@ 

          goto done;

      }

  

-     /* send to proxy */

-     ret = gpm_send_buffer(gpmctx, buffer, xdr_getpos(&xdr_call_ctx));

-     if (ret) {

-         goto done;

-     }

+     /* set send_length */

+     send_length = xdr_getpos(&xdr_call_ctx);

  

-     /* receive answer */

-     ret = gpm_recv_buffer(gpmctx, buffer, &length);

-     if (ret) {

+     /* Send request, receive response with timeout */

+     ret = gpm_send_recv_loop(gpmctx, send_buffer, send_length, &recv_buffer,

+                              &recv_length);

+     if (ret)

          goto done;

-     }

  

      /* release the lock */

      gpm_release_sock(gpmctx);

      sockgrab = false;

  

+     /* Create the reply context */

+     xdrmem_create(&xdr_reply_ctx, recv_buffer, recv_length, XDR_DECODE);

+ 

      /* decode header */

      memset(&msg, 0, sizeof(gp_rpc_msg));

      xdrok = xdr_gp_rpc_msg(&xdr_reply_ctx, &msg);
@@ -492,12 +756,21 @@ 

      }

  

  done:

+     gpm_timer_close(gpmctx);

+     gpm_epoll_close(gpmctx);

+ 

      if (sockgrab) {

          gpm_release_sock(gpmctx);

      }

      xdr_free((xdrproc_t)xdr_gp_rpc_msg, (char *)&msg);

      xdr_destroy(&xdr_call_ctx);

-     xdr_destroy(&xdr_reply_ctx);

+ 

+     if (recv_buffer != NULL)

+         xdr_destroy(&xdr_reply_ctx);

+ 

+     free(send_buffer);

+     free(recv_buffer);

+ 

      return ret;

  }

  

@@ -372,9 +372,12 @@ 

      xdrmem_create(&xdr_reply_ctx, reply_buffer, MAX_RPC_SIZE, XDR_ENCODE);

  

      /* decode request */

+     GPDEBUGN(3, "[status] Processing request [%p (%zu)]\n", inbuf, inlen);

      ret = gp_rpc_decode_call(&xdr_call_ctx, &xid, &proc, &arg, &acc, &rej);

      if (!ret) {

          /* execute request */

+         GPDEBUGN(3, "[status] Executing request %d (%s) from [%p (%zu)]\n",

+                  proc, gp_rpc_procname(proc), inbuf, inlen);

          ret = gp_rpc_execute(gpcall, proc, &arg, &res);

          if (ret) {

              acc = GP_RPC_SYSTEM_ERR;
@@ -388,6 +391,9 @@ 

          /* return encoded buffer */

          ret = gp_rpc_return_buffer(&xdr_reply_ctx,

                                     reply_buffer, outbuf, outlen);

+         GPDEBUGN(3, "[status] Returned buffer %d (%s) from [%p (%zu)]: "

+                  "[%p (%zu)]\n", proc, gp_rpc_procname(proc), inbuf, inlen,

+                  *outbuf, *outlen);

      }

      /* free resources */

      gp_rpc_free_xdrs(proc, &arg, &res);

file modified
+12
@@ -487,6 +487,8 @@ 

  

      wbuf = calloc(1, sizeof(struct gp_buffer));

      if (!wbuf) {

+         GPDEBUGN(3, "[status] OOM in gp_socket_send_data: %p (%zu)\n",

+                  buffer, buflen);

          /* too bad, must kill the client connection now */

          gp_conn_free(conn);

          return;
@@ -513,6 +515,8 @@ 

  

      vecs = 0;

  

+     GPDEBUGN(3, "[status] Sending data: %p (%zu)\n", wbuf->data, wbuf->size);

+ 

      if (wbuf->pos == 0) {

          /* first write, send the buffer size as packet header */

          size = wbuf->size | FRAGMENT_BIT;
@@ -535,6 +539,9 @@ 

              gp_socket_schedule_write(vctx, wbuf);

          } else {

              /* error on socket, close and release it */

+             GPDEBUGN(3, "[status] Error %d in gp_socket_write on writing for "

+                      "[%p (%zu:%zu)]\n", errno, wbuf->data, wbuf->pos,

+                      wbuf->size);

              gp_conn_free(wbuf->conn);

              gp_buffer_free(wbuf);

          }
@@ -544,6 +551,8 @@ 

          if (wn < (ssize_t) sizeof(size)) {

              /* don't bother trying to handle sockets that can't

               * buffer even 4 bytes */

+             GPDEBUGN(3, "[status] Sending data [%p (%zu)]: failed with short "

+                      "write of %d\n", wbuf->data, wbuf->size, wn);

              gp_conn_free(wbuf->conn);

              gp_buffer_free(wbuf);

              return;
@@ -551,6 +560,9 @@ 

          wn -= sizeof(size);

      }

  

+     GPDEBUGN(3, "[status] Sending data [%p (%zu)]: successful write of %d\n",

+              wbuf->data, wbuf->size, wn);

+ 

      wbuf->pos += wn;

      if (wbuf->size > wbuf->pos) {

          /* short write, reschedule */

file modified
+5
@@ -319,6 +319,7 @@ 

              break;

  

          case GP_QUERY_OUT:

+             GPDEBUGN(3, "[status] Handling query reply: %p (%zu)\n", q->buffer, q->buflen);

              gp_socket_send_data(vctx, q->conn, q->buffer, q->buflen);

              gp_query_free(q, false);

              break;
@@ -381,7 +382,11 @@ 

          gp_debug_set_conn_id(gp_conn_get_cid(q->conn));

  

          /* handle the client request */

+         GPDEBUGN(3, "[status] Handling query input: %p (%zu)\n", q->buffer,

+                  q->buflen);

          gp_handle_query(t->pool, q);

+         GPDEBUGN(3 ,"[status] Handling query output: %p (%zu)\n", q->buffer,

+                  q->buflen);

  

          /* now get lock on main queue, to play with the reply list */

          /* ======> POOL LOCK */

This converts client socket requests to use O_NONBLOCK in order to
add timeout functionality. The current timeout window is three periods
of 15 seconds each. Upon failure, ETIMEDOUT is returned. If reading a
request times out, the entire request is retried.

This also adds extended logging on the server in level=3, allowing requests
and responses to be tracked through reading, decoding, encoding the reply,
and writing.

rebased

6 years ago

rebased

6 years ago
  • None of your debug statements are wrapped to 78 characters; please fix.
  • Please also reflow your commit bodies to 72 characters; more information on this formatting convention can be found here.
  • The epoll gunk currently in gpm_make_call should be in its own helper function.
  • Non-blocking commit message is misleading; it is send and recv together, not individually, that hit the retry limit.
  • Typo; "will be expose to the caller"
  • Please use less of if (func(foo) == val) and reuse more temp vars (there's usually one lying around in the function already)

re: epoll gunk, are you referring to the gssproxy socket retry or the entire loop? I broke the socket retry logic into another function, but if you want the loop too, that's possible as well.

Otherwise, updated. Sorry about widths.

(there's one remaining if (func(foo) == val), but that's because the call would then clobber ret. Do you want me to introduce another temp variable?)

rebased

6 years ago

Pagure decided to eat my email, so:

re: epoll gunk, are you referring to the gssproxy socket retry or the
entire loop? I broke the socket retry logic into another function, but
if you want the loop too, that's possible as well.

The large loop. Whether that should also include the
gpm_timer_setup() call as well is up to you.

(there's one remaining if (func(foo) == val), but that's because the
call would then clobber ret. Do you want me to introduce another
temp variable?)

That would probably be good - maybe an epoll-specific one, epoll_ret,
or so?

This should look a little better then :)

Moved the loop to gpm_timeout_loop(...), and added epoll_ret.

rebased

6 years ago

please call it gpm_send_recv_loop()

rebased

6 years ago

I think you missed one place to use epoll_ret - ret = epoll_wait(gpmctx->epollfd, events, 2, -1);

rebased

6 years ago

@rharwood: Originally I thought that would have the same problem we're trying to avoid -- now clobbering epoll_ret with the latter epoll_ctl call return value. But I guess not. :) Updated.

Commit 4097daf fixes this pull-request

Pull-Request has been merged by ascheel@redhat.com

6 years ago

Commit d46603c fixes this pull-request

Pull-Request has been merged by ascheel@redhat.com

6 years ago

Commit 4097daf fixes this pull-request

Pull-Request has been merged by ascheel@redhat.com

6 years ago