]> git.proxmox.com Git - mirror_frr.git/commitdiff
lib: mt-safe cancel, round deux
authorQuentin Young <qlyoung@cumulusnetworks.com>
Fri, 9 Jun 2017 03:40:27 +0000 (03:40 +0000)
committerQuentin Young <qlyoung@cumulusnetworks.com>
Fri, 16 Jun 2017 19:56:58 +0000 (19:56 +0000)
* Update pollfds copy as well as the original
* Keep array count for copy in thread_master
* Remove last remnants of POLLHUP in .events field
* Remove unused snmpcount (lolwut)
* Improve docs
* Add missing do_thread_cancel() call in thread_cancel_event()
* Change thread_fetch() to always enter poll() to avoid starving i/o
* Remember to free up cancel_req when destroying thread_master
* Fix dereference of null pointer
* Fix dead store to timeval
* Fix missing condition for condition variable :-)

Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
lib/thread.c
lib/thread.h

index fad80ffc3262848267cb534f50ee6cc69a87d5b6..3f6945ca02ebc22e605fd8d42714f8bbc22ed0bc 100644 (file)
@@ -384,6 +384,7 @@ thread_master_create (void)
   rv->owner = pthread_self();
   rv->cancel_req = list_new ();
   rv->cancel_req->del = cancelreq_del;
+  rv->canceled = true;
   pipe (rv->io_pipe);
   set_nonblocking (rv->io_pipe[0]);
   set_nonblocking (rv->io_pipe[1]);
@@ -542,6 +543,7 @@ thread_master_free (struct thread_master *m)
   pthread_mutex_destroy (&m->mtx);
   close (m->io_pipe[0]);
   close (m->io_pipe[1]);
+  list_delete (m->cancel_req);
 
   XFREE (MTYPE_THREAD_MASTER, m->handler.pfds);
   XFREE (MTYPE_THREAD_MASTER, m->handler.copy);
@@ -646,7 +648,7 @@ thread_get (struct thread_master *m, u_char type,
 
 static int
 fd_poll (struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
-         nfds_t count, struct timeval *timer_wait)
+         nfds_t count, const struct timeval *timer_wait)
 {
   /* If timer_wait is null here, that means poll() should block indefinitely,
    * unless the thread_master has overriden it by setting ->selectpoll_timeout.
@@ -875,24 +877,57 @@ funcname_thread_add_event (struct thread_master *m,
 
 /* Thread cancellation ------------------------------------------------------ */
 
+/**
+ * NOT's out the .events field of pollfd corresponding to the given file
+ * descriptor. The event to be NOT'd is passed in the 'state' parameter.
+ *
+ * This needs to happen for both copies of pollfd's. See 'thread_fetch'
+ * implementation for details.
+ *
+ * @param master
+ * @param fd
+ * @param state the event to cancel. One or more (OR'd together) of the
+ * following:
+ *   - POLLIN
+ *   - POLLOUT
+ */
 static void
-thread_cancel_read_or_write (struct thread *thread, short int state)
+thread_cancel_rw (struct thread_master *master, int fd, short state)
 {
-  for (nfds_t i = 0; i < thread->master->handler.pfdcount; ++i)
-    if (thread->master->handler.pfds[i].fd == thread->u.fd)
-      {
-        thread->master->handler.pfds[i].events &= ~(state);
+  /* Cancel POLLHUP too just in case some bozo set it */
+  state |= POLLHUP;
 
-        /* remove thread fds from pfd list */
-        if (thread->master->handler.pfds[i].events == 0)
-          {
-            memmove(thread->master->handler.pfds+i,
-                    thread->master->handler.pfds+i+1,
-                    (thread->master->handler.pfdsize-i-1) * sizeof(struct pollfd));
-            thread->master->handler.pfdcount--;
-            return;
-          }
-      }
+  /* find the index of corresponding pollfd */
+  nfds_t i;
+
+  for (i = 0; i < master->handler.pfdcount; i++)
+    if (master->handler.pfds[i].fd == fd)
+      break;
+
+  /* NOT out event. */
+  master->handler.pfds[i].events &= ~(state);
+
+  /* If all events are canceled, delete / resize the pollfd array. */
+  if (master->handler.pfds[i].events == 0)
+    {
+      memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
+              (master->handler.pfdcount - i - 1) * sizeof (struct pollfd));
+      master->handler.pfdcount--;
+    }
+
+  /* If we have the same pollfd in the copy, perform the same operations,
+   * otherwise return. */
+  if (i >= master->handler.copycount)
+    return;
+
+  master->handler.copy[i].events &= ~(state);
+
+  if (master->handler.copy[i].events == 0)
+    {
+      memmove(master->handler.copy + i, master->handler.copy + i + 1,
+              (master->handler.copycount - i - 1) * sizeof (struct pollfd));
+      master->handler.copycount--;
+    }
 }
 
 /**
@@ -966,21 +1001,21 @@ do_thread_cancel (struct thread_master *master)
       switch (thread->type)
         {
         case THREAD_READ:
-          thread_cancel_read_or_write (thread, POLLIN | POLLHUP);
-          thread_array = thread->master->read;
+          thread_cancel_rw (master, thread->u.fd, POLLIN);
+          thread_array = master->read;
           break;
         case THREAD_WRITE:
-          thread_cancel_read_or_write (thread, POLLOUT | POLLHUP);
-          thread_array = thread->master->write;
+          thread_cancel_rw (master, thread->u.fd, POLLOUT);
+          thread_array = master->write;
           break;
         case THREAD_TIMER:
-          queue = thread->master->timer;
+          queue = master->timer;
           break;
         case THREAD_EVENT:
-          list = &thread->master->event;
+          list = &master->event;
           break;
         case THREAD_READY:
-          list = &thread->master->ready;
+          list = &master->ready;
           break;
         default:
           continue;
@@ -1015,6 +1050,7 @@ do_thread_cancel (struct thread_master *master)
   list_delete_all_node (master->cancel_req);
 
   /* Wake up any threads which may be blocked in thread_cancel_async() */
+  master->canceled = true;
   pthread_cond_broadcast (&master->cancel_cond);
 }
 
@@ -1036,6 +1072,7 @@ thread_cancel_event (struct thread_master *master, void *arg)
     struct cancel_req *cr = XCALLOC (MTYPE_TMP, sizeof (struct cancel_req));
     cr->eventobj = arg;
     listnode_add (master->cancel_req, cr);
+    do_thread_cancel(master);
   }
   pthread_mutex_unlock (&master->mtx);
 }
@@ -1065,35 +1102,56 @@ thread_cancel (struct thread *thread)
 /**
  * Asynchronous cancellation.
  *
- * Called with a pointer to a thread, this function posts a cancellation
- * request and blocks until it is serviced.
+ * Called with either a struct thread ** or void * to an event argument,
+ * this function posts the correct cancellation request and blocks until it is
+ * serviced.
  *
  * If the thread is currently running, execution blocks until it completes.
  *
+ * The last two parameters are mutually exclusive, i.e. if you pass one the
+ * other must be NULL.
+ *
+ * When the cancellation procedure executes on the target thread_master, the
+ * thread * provided is checked for nullity. If it is null, the thread is
+ * assumed to no longer exist and the cancellation request is a no-op. Thus
+ * users of this API must pass a back-reference when scheduling the original
+ * task.
+ *
  * MT-Safe
  *
- * @param thread the thread to cancel
+ * @param master the thread master with the relevant event / task
+ * @param thread pointer to thread to cancel
+ * @param eventobj the event
  */
 void
-thread_cancel_async (struct thread_master *master, struct thread **thread, void *eventobj)
+thread_cancel_async (struct thread_master *master, struct thread **thread,
+                     void *eventobj)
 {
   assert (!(thread && eventobj) && (thread || eventobj));
   assert (master->owner != pthread_self());
 
   pthread_mutex_lock (&master->mtx);
   {
-    if (*thread) {
-      struct cancel_req *cr = XCALLOC (MTYPE_TMP, sizeof (struct cancel_req));
-      cr->threadref = thread;
-      listnode_add (master->cancel_req, cr);
-    }
-    else if (eventobj) {
-      struct cancel_req *cr = XCALLOC (MTYPE_TMP, sizeof (struct cancel_req));
-      cr->eventobj = eventobj;
-      listnode_add (master->cancel_req, cr);
-    }
+    master->canceled = false;
+
+    if (thread)
+      {
+        struct cancel_req *cr =
+          XCALLOC (MTYPE_TMP, sizeof (struct cancel_req));
+        cr->threadref = thread;
+        listnode_add (master->cancel_req, cr);
+      }
+    else if (eventobj)
+      {
+        struct cancel_req *cr =
+          XCALLOC (MTYPE_TMP, sizeof (struct cancel_req));
+        cr->eventobj = eventobj;
+        listnode_add (master->cancel_req, cr);
+      }
     AWAKEN (master);
-    pthread_cond_wait (&master->cancel_cond, &master->mtx);
+
+    while (!master->canceled)
+      pthread_cond_wait (&master->cancel_cond, &master->mtx);
   }
   pthread_mutex_unlock (&master->mtx);
 }
@@ -1143,13 +1201,22 @@ thread_process_io_helper (struct thread_master *m, struct thread *thread,
   return 1;
 }
 
+/**
+ * Process I/O events.
+ *
+ * Walks through file descriptor array looking for those pollfds whose .revents
+ * field has something interesting. Deletes any invalid file descriptors.
+ *
+ * @param m the thread master
+ * @param num the number of active file descriptors (return value of poll())
+ */
 static void
-thread_process_io (struct thread_master *m, struct pollfd *pfds,
-        unsigned int num, unsigned int count)
+thread_process_io (struct thread_master *m, unsigned int num)
 {
   unsigned int ready = 0;
+  struct pollfd *pfds = m->handler.copy;
 
-  for (nfds_t i = 0; i < count && ready < num ; ++i)
+  for (nfds_t i = 0; i < m->handler.copycount && ready < num ; ++i)
     {
       /* no event for current fd? immediately continue */
       if (pfds[i].revents == 0)
@@ -1178,8 +1245,9 @@ thread_process_io (struct thread_master *m, struct pollfd *pfds,
           m->handler.pfdcount--;
 
           memmove (pfds + i, pfds + i + 1,
-                   (count - i - 1) * sizeof(struct pollfd));
-          count--;
+                   (m->handler.copycount - i - 1) * sizeof(struct pollfd));
+          m->handler.copycount--;
+
           i--;
         }
     }
@@ -1231,14 +1299,14 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
 {
   struct thread *thread = NULL;
   struct timeval now;
-  struct timeval timer_val = { .tv_sec = 0, .tv_usec = 0 };
-  struct timeval *timer_wait = &timer_val;
-  unsigned int count;
+  struct timeval zerotime = { 0, 0 };
+  struct timeval timer_val;
+  struct timeval *timer_wait;
 
   int num = 0;
 
   do {
-    /* 1. Handle signals if any */
+    /* Handle signals if any */
     if (m->handle_signals)
       quagga_sigevent_process ();
 
@@ -1254,41 +1322,41 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
     /* If there are no tasks on the ready queue, we will poll() until a timer
      * expires or we receive I/O, whichever comes first. The strategy for doing
      * this is to set the poll() timeout to the time remaining until the next
-     * timer expires. */
-    if (m->ready.count == 0)
-      {
-        timer_wait = thread_timer_wait (m->timer, &timer_val);
-
-        /* If negative timeout, we wish to poll() indefinitely. */
-        if (timer_wait && timer_wait->tv_sec < 0)
-          {
-            timerclear(&timer_val);
-            timer_wait = &timer_val;
-          }
-
-        /* Calculate number of file descriptors and make a temporary copy */
-        count = m->handler.pfdcount + m->handler.pfdcountsnmp;
-        memcpy (m->handler.copy, m->handler.pfds, count * sizeof (struct pollfd));
-
-        pthread_mutex_unlock (&m->mtx);
-        {
-          num = fd_poll (m, m->handler.copy, m->handler.pfdsize, count, timer_wait);
-        }
-        pthread_mutex_lock (&m->mtx);
-
-        /* Handle any errors received in poll() */
-        if (num < 0)
-          {
-            if (errno == EINTR)
-              {
-                pthread_mutex_unlock (&m->mtx);
-                continue; /* loop around to signal handler */
-              }
-            zlog_warn ("poll() error: %s", safe_strerror (errno));
-            pthread_mutex_unlock (&m->mtx);
-            return NULL;
-          }
-      }
+     * timer expires. We need to hit poll() at least once per loop to avoid
+     * starvation by events. */
+
+     /* timer_wait will be NULL if there are no pending timers */
+     timer_wait = thread_timer_wait (m->timer, &timer_val);
+
+     /* If negative timeout, we wish to perform a nonblocking poll() */
+     if (timer_wait && !timercmp (timer_wait, &zerotime, >))
+       timer_wait = &zerotime;
+
+     /* Copy pollfd array + # active pollfds in it. Not necessary to copy
+      * the array size as this is fixed. */
+     m->handler.copycount = m->handler.pfdcount;
+     memcpy (m->handler.copy, m->handler.pfds,
+             m->handler.copycount * sizeof (struct pollfd));
+
+     pthread_mutex_unlock (&m->mtx);
+     {
+       num = fd_poll (m, m->handler.copy, m->handler.pfdsize,
+                      m->handler.copycount, timer_wait);
+     }
+     pthread_mutex_lock (&m->mtx);
+
+     /* Handle any errors received in poll() */
+     if (num < 0)
+       {
+         if (errno == EINTR)
+           {
+             pthread_mutex_unlock (&m->mtx);
+             continue; /* loop around to signal handler */
+           }
+         zlog_warn ("poll() error: %s", safe_strerror (errno));
+         pthread_mutex_unlock (&m->mtx);
+         return NULL;
+       }
 
     /* Since we could have received more cancellation requests during poll(), process those */
     do_thread_cancel (m);
@@ -1299,7 +1367,7 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
 
     /* Post I/O to ready queue. */
     if (num > 0)
-      thread_process_io (m, m->handler.copy, num, count);
+      thread_process_io (m, num);
 
     /* If we have a ready task, break the loop and return it to the caller */
     if ((thread = thread_trim_head (&m->ready)))
index d49d325f750d6b1c6af041a11c40e621e08580f2..e48068b174e5a38f9986dcc036edc078af645106 100644 (file)
@@ -47,16 +47,19 @@ struct pqueue;
 
 struct fd_handler
 {
-  /* number of pfd stored in pfds */
-  nfds_t pfdcount;
-  /* number of pfd stored in pfds + number of snmp pfd */
-  nfds_t pfdcountsnmp;
-  /* number of pfd that fit in the allocated space of pfds */
+  /* number of pfd that fit in the allocated space of pfds. This is a constant
+   * and is the same for both pfds and copy. */
   nfds_t pfdsize;
+
   /* file descriptors to monitor for i/o */
   struct pollfd *pfds;
+  /* number of pollfds stored in pfds */
+  nfds_t pfdcount;
+
   /* chunk used for temp copy of pollfds */
   struct pollfd *copy;
+  /* number of pollfds stored in copy */
+  nfds_t copycount;
 };
 
 struct cancel_req {
@@ -75,6 +78,7 @@ struct thread_master
   struct thread_list ready;
   struct thread_list unuse;
   struct list *cancel_req;
+  bool canceled;
   pthread_cond_t cancel_cond;
   int io_pipe[2];
   int fd_limit;