return;
}
+static void work_queue_item_remove(struct work_queue *wq,
+ struct work_queue_item *item)
+{
+ assert(item && item->data);
+
+ /* call private data deletion callback if needed */
+ if (wq->spec.del_item_data)
+ wq->spec.del_item_data(wq, item->data);
+
+ work_queue_item_dequeue(wq, item);
+
+ work_queue_item_free(item);
+
+ return;
+}
+
/* create new work queue */
struct work_queue *work_queue_new(struct thread_master *m,
const char *queue_name)
new = XCALLOC(MTYPE_WORK_QUEUE, sizeof(struct work_queue));
- if (new == NULL)
- return new;
-
new->name = XSTRDUP(MTYPE_WORK_QUEUE_NAME, queue_name);
new->master = m;
SET_FLAG(new->flags, WQ_UNPLUGGED);
return new;
}
-void work_queue_free(struct work_queue *wq)
+void work_queue_free_original(struct work_queue *wq)
{
if (wq->thread != NULL)
thread_cancel(wq->thread);
+ while (!work_queue_empty(wq)) {
+ struct work_queue_item *item = work_queue_last_item(wq);
+
+ work_queue_item_remove(wq, item);
+ }
+
listnode_delete(work_queues, wq);
XFREE(MTYPE_WORK_QUEUE_NAME, wq->name);
return;
}
+void work_queue_free_and_null(struct work_queue **wq)
+{
+ work_queue_free_original(*wq);
+ *wq = NULL;
+}
+
bool work_queue_is_scheduled(struct work_queue *wq)
{
return (wq->thread != NULL);
static int work_queue_schedule(struct work_queue *wq, unsigned int delay)
{
/* if appropriate, schedule work queue thread */
- if (CHECK_FLAG(wq->flags, WQ_UNPLUGGED) && (wq->thread == NULL) &&
- !work_queue_empty(wq)) {
+ if (CHECK_FLAG(wq->flags, WQ_UNPLUGGED) && (wq->thread == NULL)
+ && !work_queue_empty(wq)) {
wq->thread = NULL;
thread_add_timer_msec(wq->master, work_queue_run, wq, delay,
&wq->thread);
assert(wq);
- if (!(item = work_queue_item_new(wq))) {
- zlog_err("%s: unable to get new queue item", __func__);
- return;
- }
+ item = work_queue_item_new(wq);
item->data = data;
work_queue_item_enqueue(wq, item);
return;
}
-static void work_queue_item_remove(struct work_queue *wq,
- struct work_queue_item *item)
-{
- assert(item && item->data);
-
- /* call private data deletion callback if needed */
- if (wq->spec.del_item_data)
- wq->spec.del_item_data(wq, item->data);
-
- work_queue_item_dequeue(wq, item);
-
- work_queue_item_free(item);
-
- return;
-}
-
-static void work_queue_item_requeue(struct work_queue *wq, struct work_queue_item *item)
+static void work_queue_item_requeue(struct work_queue *wq,
+ struct work_queue_item *item)
{
work_queue_item_dequeue(wq, item);
char yielded = 0;
wq = THREAD_ARG(thread);
- wq->thread = NULL;
assert(wq);
+ wq->thread = NULL;
+
/* calculate cycle granularity:
* list iteration == 1 run
* listnode processing == 1 cycle
if (wq->cycles.granularity == 0)
wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
- STAILQ_FOREACH_SAFE(item, &wq->items, wq, titem) {
+ STAILQ_FOREACH_SAFE (item, &wq->items, wq, titem) {
assert(item && item->data);
/* dont run items which are past their allowed retries */