]> git.proxmox.com Git - mirror_frr.git/blobdiff - lib/workqueue.c
isisd: implemented the 'sequence-number-skipped' notification
[mirror_frr.git] / lib / workqueue.c
index 1789c6b7dbc163636bf94a2842712db4e3abfcf7..24ef24c7744930fb8978ad40d41112c4a0fad91c 100644 (file)
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  * General Public License for more details.
  *
- * You should have received a copy of the GNU General Public License
- * along with Quagga; see the file COPYING.  If not, write to the Free
- * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
- * 02111-1307, USA.
+ * You should have received a copy of the GNU General Public License along
+ * with this program; see the file COPYING; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
  */
 
 #include <zebra.h>
@@ -58,6 +57,22 @@ static void work_queue_item_free(struct work_queue_item *item)
        return;
 }
 
+static void work_queue_item_remove(struct work_queue *wq,
+                                  struct work_queue_item *item)
+{
+       assert(item && item->data);
+
+       /* call private data deletion callback if needed */
+       if (wq->spec.del_item_data)
+               wq->spec.del_item_data(wq, item->data);
+
+       work_queue_item_dequeue(wq, item);
+
+       work_queue_item_free(item);
+
+       return;
+}
+
 /* create new work queue */
 struct work_queue *work_queue_new(struct thread_master *m,
                                  const char *queue_name)
@@ -66,40 +81,35 @@ struct work_queue *work_queue_new(struct thread_master *m,
 
        new = XCALLOC(MTYPE_WORK_QUEUE, sizeof(struct work_queue));
 
-       if (new == NULL)
-               return new;
-
        new->name = XSTRDUP(MTYPE_WORK_QUEUE_NAME, queue_name);
        new->master = m;
        SET_FLAG(new->flags, WQ_UNPLUGGED);
 
-       if ((new->items = list_new()) == NULL) {
-               XFREE(MTYPE_WORK_QUEUE_NAME, new->name);
-               XFREE(MTYPE_WORK_QUEUE, new);
-
-               return NULL;
-       }
-
-       new->items->del = (void (*)(void *))work_queue_item_free;
+       STAILQ_INIT(&new->items);
 
        listnode_add(work_queues, new);
 
        new->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
 
-       /* Default values, can be overriden by caller */
+       /* Default values, can be overridden by caller */
        new->spec.hold = WORK_QUEUE_DEFAULT_HOLD;
        new->spec.yield = THREAD_YIELD_TIME_SLOT;
+       new->spec.retry = WORK_QUEUE_DEFAULT_RETRY;
 
        return new;
 }
 
-void work_queue_free(struct work_queue *wq)
+void work_queue_free_original(struct work_queue *wq)
 {
        if (wq->thread != NULL)
                thread_cancel(wq->thread);
 
-       /* list_delete frees items via callback */
-       list_delete(wq->items);
+       while (!work_queue_empty(wq)) {
+               struct work_queue_item *item = work_queue_last_item(wq);
+
+               work_queue_item_remove(wq, item);
+       }
+
        listnode_delete(work_queues, wq);
 
        XFREE(MTYPE_WORK_QUEUE_NAME, wq->name);
@@ -107,6 +117,12 @@ void work_queue_free(struct work_queue *wq)
        return;
 }
 
+void work_queue_free_and_null(struct work_queue **wq)
+{
+       work_queue_free_original(*wq);
+       *wq = NULL;
+}
+
 bool work_queue_is_scheduled(struct work_queue *wq)
 {
        return (wq->thread != NULL);
@@ -116,9 +132,19 @@ static int work_queue_schedule(struct work_queue *wq, unsigned int delay)
 {
        /* if appropriate, schedule work queue thread */
        if (CHECK_FLAG(wq->flags, WQ_UNPLUGGED) && (wq->thread == NULL)
-           && (listcount(wq->items) > 0)) {
-               wq->thread = thread_add_background(wq->master, work_queue_run,
-                                                  wq, delay);
+           && !work_queue_empty(wq)) {
+               wq->thread = NULL;
+
+               /* Schedule timer if there's a delay, otherwise just schedule
+                * as an 'event'
+                */
+               if (delay > 0)
+                       thread_add_timer_msec(wq->master, work_queue_run, wq,
+                                             delay, &wq->thread);
+               else
+                       thread_add_event(wq->master, work_queue_run, wq, 0,
+                                        &wq->thread);
+
                /* set thread yield time, if needed */
                if (wq->thread && wq->spec.yield != THREAD_YIELD_TIME_SLOT)
                        thread_set_yield_time(wq->thread, wq->spec.yield);
@@ -133,39 +159,23 @@ void work_queue_add(struct work_queue *wq, void *data)
 
        assert(wq);
 
-       if (!(item = work_queue_item_new(wq))) {
-               zlog_err("%s: unable to get new queue item", __func__);
-               return;
-       }
+       item = work_queue_item_new(wq);
 
        item->data = data;
-       listnode_add(wq->items, item);
+       work_queue_item_enqueue(wq, item);
 
        work_queue_schedule(wq, wq->spec.hold);
 
        return;
 }
 
-static void work_queue_item_remove(struct work_queue *wq, struct listnode *ln)
+static void work_queue_item_requeue(struct work_queue *wq,
+                                   struct work_queue_item *item)
 {
-       struct work_queue_item *item = listgetdata(ln);
-
-       assert(item && item->data);
-
-       /* call private data deletion callback if needed */
-       if (wq->spec.del_item_data)
-               wq->spec.del_item_data(wq, item->data);
+       work_queue_item_dequeue(wq, item);
 
-       list_delete_node(wq->items, ln);
-       work_queue_item_free(item);
-
-       return;
-}
-
-static void work_queue_item_requeue(struct work_queue *wq, struct listnode *ln)
-{
-       LISTNODE_DETACH(wq->items, ln);
-       LISTNODE_ATTACH(wq->items, ln); /* attach to end of list */
+       /* attach to end of list */
+       work_queue_item_enqueue(wq, item);
 }
 
 DEFUN (show_work_queues,
@@ -177,21 +187,21 @@ DEFUN (show_work_queues,
        struct listnode *node;
        struct work_queue *wq;
 
-       vty_out(vty, "%c %8s %5s %8s %8s %21s%s", ' ', "List", "(ms) ",
-               "Q. Runs", "Yields", "Cycle Counts   ", VTY_NEWLINE);
-       vty_out(vty, "%c %8s %5s %8s %8s %7s %6s %8s %6s %s%s", 'P', "Items",
+       vty_out(vty, "%c %8s %5s %8s %8s %21s\n", ' ', "List", "(ms) ",
+               "Q. Runs", "Yields", "Cycle Counts   ");
+       vty_out(vty, "%c %8s %5s %8s %8s %7s %6s %8s %6s %s\n", 'P', "Items",
                "Hold", "Total", "Total", "Best", "Gran.", "Total", "Avg.",
-               "Name", VTY_NEWLINE);
+               "Name");
 
        for (ALL_LIST_ELEMENTS_RO(work_queues, node, wq)) {
-               vty_out(vty, "%c %8d %5d %8ld %8ld %7d %6d %8ld %6u %s%s",
+               vty_out(vty, "%c %8d %5d %8ld %8ld %7d %6d %8ld %6u %s\n",
                        (CHECK_FLAG(wq->flags, WQ_UNPLUGGED) ? ' ' : 'P'),
-                       listcount(wq->items), wq->spec.hold, wq->runs,
+                       work_queue_item_count(wq), wq->spec.hold, wq->runs,
                        wq->yields, wq->cycles.best, wq->cycles.granularity,
                        wq->cycles.total,
                        (wq->runs) ? (unsigned int)(wq->cycles.total / wq->runs)
                                   : 0,
-                       wq->name, VTY_NEWLINE);
+                       wq->name);
        }
 
        return CMD_SUCCESS;
@@ -233,16 +243,16 @@ void work_queue_unplug(struct work_queue *wq)
 int work_queue_run(struct thread *thread)
 {
        struct work_queue *wq;
-       struct work_queue_item *item;
-       wq_item_status ret;
+       struct work_queue_item *item, *titem;
+       wq_item_status ret = WQ_SUCCESS;
        unsigned int cycles = 0;
-       struct listnode *node, *nnode;
        char yielded = 0;
 
        wq = THREAD_ARG(thread);
-       wq->thread = NULL;
 
-       assert(wq && wq->items);
+       assert(wq);
+
+       wq->thread = NULL;
 
        /* calculate cycle granularity:
         * list iteration == 1 run
@@ -266,7 +276,7 @@ int work_queue_run(struct thread *thread)
        if (wq->cycles.granularity == 0)
                wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
 
-       for (ALL_LIST_ELEMENTS(wq->items, node, nnode, item)) {
+       STAILQ_FOREACH_SAFE (item, &wq->items, wq, titem) {
                assert(item && item->data);
 
                /* dont run items which are past their allowed retries */
@@ -274,7 +284,7 @@ int work_queue_run(struct thread *thread)
                        /* run error handler, if any */
                        if (wq->spec.errorfunc)
                                wq->spec.errorfunc(wq, item->data);
-                       work_queue_item_remove(wq, node);
+                       work_queue_item_remove(wq, item);
                        continue;
                }
 
@@ -298,7 +308,7 @@ int work_queue_run(struct thread *thread)
                }
                case WQ_REQUEUE: {
                        item->ran--;
-                       work_queue_item_requeue(wq, node);
+                       work_queue_item_requeue(wq, item);
                        /* If a single node is being used with a meta-queue
                         * (e.g., zebra),
                         * update the next node as we don't want to exit the
@@ -309,8 +319,8 @@ int work_queue_run(struct thread *thread)
                         * will kick in
                         * to terminate the thread when time has exceeded.
                         */
-                       if (nnode == NULL)
-                               nnode = node;
+                       if (titem == NULL)
+                               titem = item;
                        break;
                }
                case WQ_RETRY_NOW:
@@ -320,10 +330,10 @@ int work_queue_run(struct thread *thread)
                        if (wq->spec.errorfunc)
                                wq->spec.errorfunc(wq, item);
                }
-               /* fall through here is deliberate */
+               /* fallthru */
                case WQ_SUCCESS:
                default: {
-                       work_queue_item_remove(wq, node);
+                       work_queue_item_remove(wq, item);
                        break;
                }
                }
@@ -376,9 +386,14 @@ stats:
 #endif
 
        /* Is the queue done yet? If it is, call the completion callback. */
-       if (listcount(wq->items) > 0)
-               work_queue_schedule(wq, 0);
-       else if (wq->spec.completion_func)
+       if (!work_queue_empty(wq)) {
+               if (ret == WQ_RETRY_LATER ||
+                   ret == WQ_QUEUE_BLOCKED)
+                       work_queue_schedule(wq, wq->spec.retry);
+               else
+                       work_queue_schedule(wq, 0);
+
+       } else if (wq->spec.completion_func)
                wq->spec.completion_func(wq);
 
        return 0;