git.proxmox.com Git - mirror_spl-debian.git/commitdiff
taskq delay/cancel functionality
author    Brian Behlendorf <behlendorf1@llnl.gov>
          Thu, 6 Dec 2012 20:38:19 +0000 (12:38 -0800)
committer Brian Behlendorf <behlendorf1@llnl.gov>
          Wed, 12 Dec 2012 17:54:07 +0000 (09:54 -0800)
Add the ability to dispatch a delayed task to a taskq.  The desired
behavior is for the task to be queued but not executed by a worker
thread until the expiration time is reached.  To achieve this, two
new functions were added.

* taskq_dispatch_delay() -

  This function behaves exactly like taskq_dispatch(), however it
takes an additional 'expire_time' argument.  The caller should pass
the desired time at which the task should be executed as an absolute
value in jiffies.  The task is guaranteed not to run before this time;
it may run slightly later if all the worker threads are busy.
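
  As a rough caller sketch, not part of this change ('tq', 'my_func',
and 'my_arg' are placeholder names, and the expiration time is built
from the kernel's jiffies counter):

    taskqid_t id;

    /* Run my_func(my_arg) no sooner than five seconds from now. */
    id = taskq_dispatch_delay(tq, my_func, my_arg, TQ_SLEEP,
        jiffies + msecs_to_jiffies(5000));
    if (id == 0) {
            /* Dispatch failed: the taskq is being destroyed or the
             * task entry could not be allocated. */
    }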

* taskq_cancel_id() -

  Given a task id, attempt to cancel the task before it gets executed.
This is primarily useful for canceling delayed tasks but can be used
to cancel any previously dispatched task.  There are three possible
return values:

  0      - The task was found and canceled before it was executed.
  ENOENT - The task was not found; either it had already run or an
           invalid task id was supplied by the caller.
  EBUSY  - The task is currently executing and may not be canceled.
           This function will block until the task has been completed.
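
  A rough usage sketch, again with placeholder names, showing how a
caller might handle each return value:

    int error;

    error = taskq_cancel_id(tq, id);
    switch (error) {
    case 0:
            /* Canceled before it ran; a prealloc'd taskq_ent_t must
             * still be freed by the caller. */
            break;
    case ENOENT:
            /* The task already ran, or the id was invalid. */
            break;
    case EBUSY:
            /* The task was executing; the call blocked until it
             * completed. */
            break;
    }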

* taskq_wait_all() -

  The taskq_wait_id() function was renamed taskq_wait_all() to more
clearly reflect its actual behavior.  It is currently only used by
the splat taskq regression tests.

* taskq_wait_id() -

  Historically, the only difference between this function and
taskq_wait() was that you passed the task id.  In both functions you
would block until ALL lower task ids had executed.  This was
semantically correct but could be very slow, particularly if delayed
tasks had been submitted.

  To better accommodate delayed tasks this function was reimplemented.
It will now block only until the passed task id has completed.
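
  To illustrate the distinction between the three wait calls after
this change (a sketch, where 'id' is a previously dispatched task id):

    /* Block only until the single task 'id' has completed. */
    taskq_wait_id(tq, id);

    /* Block until 'id' and ALL lower task ids have completed. */
    taskq_wait_all(tq, id);

    /* Block until everything dispatched so far has completed. */
    taskq_wait(tq);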

This is actually a fairly low risk change for a few reasons.

* Only new ZFS callers will make use of the new interfaces and
  very little common code was changed to support the new functions.

* The existing taskq_wait() implementation was not changed, only
  slightly refactored.

* The newly optimized taskq_wait_id() implementation was never
  used by ZFS, so we can't accidentally introduce a new bug there.

NOTE: This functionality does not exist in the Illumos taskqs.

Signed-off-by: Brian Behlendorf <behlendorf1@llnl.gov>
include/sys/taskq.h
module/spl/spl-taskq.c
module/splat/splat-taskq.c

index 84b5632080c7dbec3581c27c7fcf4a743fd32e65..3839de28859662c1108202f46a7d4adc44502848 100644 (file)
 #define TASKQ_THREADS_CPU_PCT  0x00000008
 #define TASKQ_DC_BATCH         0x00000010
 
-typedef unsigned long taskqid_t;
-typedef void (task_func_t)(void *);
-
-typedef struct taskq_ent {
-       spinlock_t              tqent_lock;
-       struct list_head        tqent_list;
-       taskqid_t               tqent_id;
-       task_func_t             *tqent_func;
-       void                    *tqent_arg;
-       uintptr_t               tqent_flags;
-} taskq_ent_t;
-
-#define TQENT_FLAG_PREALLOC    0x1
-
 /*
  * Flags for taskq_dispatch. TQ_SLEEP/TQ_NOSLEEP should be same as
  * KM_SLEEP/KM_NOSLEEP.  TQ_NOQUEUE/TQ_NOALLOC are set particularly
@@ -69,6 +55,9 @@ typedef struct taskq_ent {
 #define TQ_FRONT               0x08000000
 #define TQ_ACTIVE              0x80000000
 
+typedef unsigned long taskqid_t;
+typedef void (task_func_t)(void *);
+
 typedef struct taskq {
        spinlock_t              tq_lock;       /* protects taskq_t */
        unsigned long           tq_lock_flags; /* interrupt state */
@@ -87,16 +76,33 @@ typedef struct taskq {
        struct list_head        tq_free_list;  /* free task_t's */
        struct list_head        tq_pend_list;  /* pending task_t's */
        struct list_head        tq_prio_list;  /* priority pending task_t's */
+       struct list_head        tq_delay_list; /* delayed task_t's */
        wait_queue_head_t       tq_work_waitq; /* new work waitq */
        wait_queue_head_t       tq_wait_waitq; /* wait waitq */
 } taskq_t;
 
+typedef struct taskq_ent {
+       spinlock_t              tqent_lock;
+       wait_queue_head_t       tqent_waitq;
+       struct timer_list       tqent_timer;
+       struct list_head        tqent_list;
+       taskqid_t               tqent_id;
+       task_func_t             *tqent_func;
+       void                    *tqent_arg;
+       taskq_t                 *tqent_taskq;
+       uintptr_t               tqent_flags;
+} taskq_ent_t;
+
+#define TQENT_FLAG_PREALLOC     0x1
+#define TQENT_FLAG_CANCEL       0x2
+
 typedef struct taskq_thread {
        struct list_head        tqt_thread_list;
        struct list_head        tqt_active_list;
        struct task_struct      *tqt_thread;
        taskq_t                 *tqt_tq;
        taskqid_t               tqt_id;
+       taskq_ent_t             *tqt_task;
        uintptr_t               tqt_flags;
 } taskq_thread_t;
 
@@ -104,6 +110,8 @@ typedef struct taskq_thread {
 extern taskq_t *system_taskq;
 
 extern taskqid_t taskq_dispatch(taskq_t *, task_func_t, void *, uint_t);
+extern taskqid_t taskq_dispatch_delay(taskq_t *, task_func_t, void *,
+    uint_t, clock_t);
 extern void taskq_dispatch_ent(taskq_t *, task_func_t, void *, uint_t,
     taskq_ent_t *);
 extern int taskq_empty_ent(taskq_ent_t *);
@@ -111,7 +119,9 @@ extern void taskq_init_ent(taskq_ent_t *);
 extern taskq_t *taskq_create(const char *, int, pri_t, int, int, uint_t);
 extern void taskq_destroy(taskq_t *);
 extern void taskq_wait_id(taskq_t *, taskqid_t);
+extern void taskq_wait_all(taskq_t *, taskqid_t);
 extern void taskq_wait(taskq_t *);
+extern int taskq_cancel_id(taskq_t *, taskqid_t);
 extern int taskq_member(taskq_t *, void *);
 
 #define taskq_create_proc(name, nthreads, pri, min, max, proc, flags) \
index 2007cf084e7ba9bcd1b6b6440ff2ac4a72ca56b6..c9ae0a50b65edd186560c784fd345bf16ae930f2 100644 (file)
@@ -69,6 +69,8 @@ retry:
                t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);
 
                ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
+               ASSERT(!(t->tqent_flags & TQENT_FLAG_CANCEL));
+               ASSERT(!timer_pending(&t->tqent_timer));
 
                list_del_init(&t->tqent_list);
                SRETURN(t);
@@ -126,6 +128,7 @@ task_free(taskq_t *tq, taskq_ent_t *t)
        ASSERT(t);
        ASSERT(spin_is_locked(&tq->tq_lock));
        ASSERT(list_empty(&t->tqent_list));
+       ASSERT(!timer_pending(&t->tqent_timer));
 
        kmem_free(t, sizeof(taskq_ent_t));
        tq->tq_nalloc--;
@@ -145,6 +148,9 @@ task_done(taskq_t *tq, taskq_ent_t *t)
        ASSERT(t);
        ASSERT(spin_is_locked(&tq->tq_lock));
 
+       /* Wake tasks blocked in taskq_wait_id() */
+       wake_up_all(&t->tqent_waitq);
+
        list_del_init(&t->tqent_list);
 
        if (tq->tq_nalloc <= tq->tq_minalloc) {
@@ -162,56 +168,261 @@ task_done(taskq_t *tq, taskq_ent_t *t)
 }
 
 /*
- * As tasks are submitted to the task queue they are assigned a
- * monotonically increasing taskqid and added to the tail of the pending
- * list.  As worker threads become available the tasks are removed from
- * the head of the pending or priority list, giving preference to the
- * priority list.  The tasks are then removed from their respective
- * list, and the taskq_thread servicing the task is added to the active
- * list, preserving the order using the serviced task's taskqid.
- * Finally, as tasks complete the taskq_thread servicing the task is
- * removed from the active list.  This means that the pending task and
- * active taskq_thread lists are always kept sorted by taskqid. Thus the
- * lowest outstanding incomplete taskqid can be determined simply by
- * checking the min taskqid for each head item on the pending, priority,
- * and active taskq_thread list. This value is stored in
- * tq->tq_lowest_id and only updated to the new lowest id when the
- * previous lowest id completes.  All taskqids lower than
- * tq->tq_lowest_id must have completed.  It is also possible larger
- * taskqid's have completed because they may be processed in parallel by
- * several worker threads.  However, this is not a problem because the
- * behavior of taskq_wait_id() is to block until all previously
- * submitted taskqid's have completed.
- *
- * XXX: Taskqid_t wrapping is not handled.  However, taskqid_t's are
- * 64-bit values so even if a taskq is processing 2^24 (16,777,216)
- * taskqid_ts per second it will still take 2^40 seconds, 34,865 years,
- * before the wrap occurs.  I can live with that for now.
+ * When a delayed task timer expires remove it from the delay list and
+ * add it to the priority list in order for immediate processing.
  */
-static int
-taskq_wait_check(taskq_t *tq, taskqid_t id)
+static void
+task_expire(unsigned long data)
 {
-       int rc;
+       taskq_ent_t *w, *t = (taskq_ent_t *)data;
+       taskq_t *tq = t->tqent_taskq;
+       struct list_head *l;
 
        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
-       rc = (id < tq->tq_lowest_id);
+
+       if (t->tqent_flags & TQENT_FLAG_CANCEL) {
+               ASSERT(list_empty(&t->tqent_list));
+               spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+               return;
+       }
+
+       /*
+        * The priority list must be maintained in strict task id order
+        * from lowest to highest for lowest_id to be easily calculable.
+        */
+       list_del(&t->tqent_list);
+       list_for_each_prev(l, &tq->tq_prio_list) {
+               w = list_entry(l, taskq_ent_t, tqent_list);
+               if (w->tqent_id < t->tqent_id) {
+                       list_add(&t->tqent_list, l);
+                       break;
+               }
+       }
+       if (l == &tq->tq_prio_list)
+               list_add(&t->tqent_list, &tq->tq_prio_list);
+
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
 
-       SRETURN(rc);
+       wake_up(&tq->tq_work_waitq);
+}
+
+/*
+ * Returns the lowest incomplete taskqid_t.  The taskqid_t may
+ * be queued on the pending list, on the priority list, on the
+ * delay list, or on the work list currently being handled, but
+ * it is not 100% complete yet.
+ */
+static taskqid_t
+taskq_lowest_id(taskq_t *tq)
+{
+       taskqid_t lowest_id = tq->tq_next_id;
+       taskq_ent_t *t;
+       taskq_thread_t *tqt;
+       SENTRY;
+
+       ASSERT(tq);
+       ASSERT(spin_is_locked(&tq->tq_lock));
+
+       if (!list_empty(&tq->tq_pend_list)) {
+               t = list_entry(tq->tq_pend_list.next, taskq_ent_t, tqent_list);
+               lowest_id = MIN(lowest_id, t->tqent_id);
+       }
+
+       if (!list_empty(&tq->tq_prio_list)) {
+               t = list_entry(tq->tq_prio_list.next, taskq_ent_t, tqent_list);
+               lowest_id = MIN(lowest_id, t->tqent_id);
+       }
+
+       if (!list_empty(&tq->tq_delay_list)) {
+               t = list_entry(tq->tq_delay_list.next, taskq_ent_t, tqent_list);
+               lowest_id = MIN(lowest_id, t->tqent_id);
+       }
+
+       if (!list_empty(&tq->tq_active_list)) {
+               tqt = list_entry(tq->tq_active_list.next, taskq_thread_t,
+                   tqt_active_list);
+               ASSERT(tqt->tqt_id != 0);
+               lowest_id = MIN(lowest_id, tqt->tqt_id);
+       }
+
+       SRETURN(lowest_id);
+}
+
+/*
+ * Insert a task into a list keeping the list sorted by increasing taskqid.
+ */
+static void
+taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt)
+{
+       taskq_thread_t *w;
+       struct list_head *l;
+
+       SENTRY;
+       ASSERT(tq);
+       ASSERT(tqt);
+       ASSERT(spin_is_locked(&tq->tq_lock));
+
+       list_for_each_prev(l, &tq->tq_active_list) {
+               w = list_entry(l, taskq_thread_t, tqt_active_list);
+               if (w->tqt_id < tqt->tqt_id) {
+                       list_add(&tqt->tqt_active_list, l);
+                       break;
+               }
+       }
+       if (l == &tq->tq_active_list)
+               list_add(&tqt->tqt_active_list, &tq->tq_active_list);
+
+       SEXIT;
+}
+
+/*
+ * Find and return a task from the given list if it exists.  The list
+ * must be in lowest to highest task id order.
+ */
+static taskq_ent_t *
+taskq_find_list(taskq_t *tq, struct list_head *lh, taskqid_t id)
+{
+       struct list_head *l;
+       taskq_ent_t *t;
+       SENTRY;
+
+       ASSERT(spin_is_locked(&tq->tq_lock));
+
+       list_for_each(l, lh) {
+               t = list_entry(l, taskq_ent_t, tqent_list);
+
+               if (t->tqent_id == id)
+                       SRETURN(t);
+
+               if (t->tqent_id > id)
+                       break;
+       }
+
+       SRETURN(NULL);
 }
 
+/*
+ * Find an already dispatched task given the task id regardless of what
+ * state it is in.  If a task is still pending or executing it will be
+ * returned and 'active' set appropriately.  If the task has already
+ * been run then NULL is returned.
+ */
+static taskq_ent_t *
+taskq_find(taskq_t *tq, taskqid_t id, int *active)
+{
+       taskq_thread_t *tqt;
+       struct list_head *l;
+       taskq_ent_t *t;
+       SENTRY;
+
+       ASSERT(spin_is_locked(&tq->tq_lock));
+       *active = 0;
+
+       t = taskq_find_list(tq, &tq->tq_delay_list, id);
+       if (t)
+               SRETURN(t);
+
+       t = taskq_find_list(tq, &tq->tq_prio_list, id);
+       if (t)
+               SRETURN(t);
+
+       t = taskq_find_list(tq, &tq->tq_pend_list, id);
+       if (t)
+               SRETURN(t);
+
+       list_for_each(l, &tq->tq_active_list) {
+               tqt = list_entry(l, taskq_thread_t, tqt_active_list);
+               if (tqt->tqt_id == id) {
+                       t = tqt->tqt_task;
+                       *active = 1;
+                       SRETURN(t);
+               }
+       }
+
+       SRETURN(NULL);
+}
+
+/*
+ * The taskq_wait_id() function blocks until the passed task id completes.
+ * This does not guarantee that all lower task id's have completed.
+ */
 void
 taskq_wait_id(taskq_t *tq, taskqid_t id)
 {
+       DEFINE_WAIT(wait);
+       taskq_ent_t *t;
+       int active = 0;
        SENTRY;
+
        ASSERT(tq);
+       ASSERT(id > 0);
 
-       wait_event(tq->tq_wait_waitq, taskq_wait_check(tq, id));
+       spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
+       t = taskq_find(tq, id, &active);
+       if (t)
+               prepare_to_wait(&t->tqent_waitq, &wait, TASK_UNINTERRUPTIBLE);
+       spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+
+       /*
+        * We rely on the kernel's autoremove_wake_function() to
+        * remove us from the wait queue in the context of wake_up().
+        * Once woken the taskq_ent_t pointer must never be accessed.
+        */
+       if (t) {
+               t = NULL;
+               schedule();
+               __set_current_state(TASK_RUNNING);
+       }
 
        SEXIT;
 }
 EXPORT_SYMBOL(taskq_wait_id);
 
+/*
+ * The taskq_wait() function will block until all previously submitted
+ * tasks have been completed.  A previously submitted task is defined as
+ * a task with a lower task id than the current task queue id.  Note that
+ * all task id's are assigned monotonically at dispatch time.
+ *
+ * Waiting for all previous tasks to complete is accomplished by tracking
+ * the lowest outstanding task id.  As tasks are dispatched they are
+ * added to the tail of the pending, priority, or delay lists.  As
+ * worker threads become available the tasks are removed from the heads
+ * of these lists and linked to the worker threads.  This ensures the
+ * lists are kept in lowest to highest task id order.
+ *
+ * Therefore the lowest outstanding task id can be quickly determined by
+ * checking the head item from all of these lists.  This value is stored
+ * with the task queue as the lowest id.  It only needs to be recalculated
+ * when either the task with the current lowest id completes or is canceled.
+ *
+ * By blocking until the lowest task id exceeds the current task id when
+ * the function was called we ensure all previous tasks have completed.
+ *
+ * NOTE: When there are multiple worker threads it is possible for larger
+ * task ids to complete before smaller ones.  Conversely when the task
+ * queue contains delay tasks with small task ids, you may block for a
+ * considerable length of time waiting for them to expire and execute.
+ */
+static int
+taskq_wait_check(taskq_t *tq, taskqid_t id)
+{
+       int rc;
+
+       spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
+       rc = (id < tq->tq_lowest_id);
+       spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+
+       SRETURN(rc);
+}
+
+void
+taskq_wait_all(taskq_t *tq, taskqid_t id)
+{
+       wait_event(tq->tq_wait_waitq, taskq_wait_check(tq, id));
+}
+EXPORT_SYMBOL(taskq_wait_all);
+
 void
 taskq_wait(taskq_t *tq)
 {
@@ -224,7 +435,7 @@ taskq_wait(taskq_t *tq)
        id = tq->tq_next_id - 1;
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
 
-       taskq_wait_id(tq, id);
+       taskq_wait_all(tq, id);
 
        SEXIT;
 
@@ -251,6 +462,63 @@ taskq_member(taskq_t *tq, void *t)
 }
 EXPORT_SYMBOL(taskq_member);
 
+/*
+ * Cancel an already dispatched task given the task id.  Still pending tasks
+ * will be immediately canceled, and if the task is active the function will
+ * block until it completes.  Preallocated tasks which are canceled must be
+ * freed by the caller.
+ */
+int
+taskq_cancel_id(taskq_t *tq, taskqid_t id)
+{
+       taskq_ent_t *t;
+       int active = 0;
+       int rc = ENOENT;
+       SENTRY;
+
+       ASSERT(tq);
+
+       spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
+       t = taskq_find(tq, id, &active);
+       if (t && !active) {
+               list_del_init(&t->tqent_list);
+               t->tqent_flags |= TQENT_FLAG_CANCEL;
+
+               /*
+                * When canceling the lowest outstanding task id we
+                * must recalculate the new lowest outstanding id.
+                */
+               if (tq->tq_lowest_id == t->tqent_id) {
+                       tq->tq_lowest_id = taskq_lowest_id(tq);
+                       ASSERT3S(tq->tq_lowest_id, >, t->tqent_id);
+               }
+
+               /*
+                * The task_expire() function takes the tq->tq_lock so
+                * drop the lock before synchronously cancelling the timer.
+                */
+               if (timer_pending(&t->tqent_timer)) {
+                       spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+                       del_timer_sync(&t->tqent_timer);
+                       spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
+               }
+
+               if (!(t->tqent_flags & TQENT_FLAG_PREALLOC))
+                       task_done(tq, t);
+
+               rc = 0;
+       }
+       spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+
+       if (active) {
+               taskq_wait_id(tq, id);
+               rc = EBUSY;
+       }
+
+       SRETURN(rc);
+}
+EXPORT_SYMBOL(taskq_cancel_id);
+
 taskqid_t
 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
 {
@@ -287,6 +555,10 @@ taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
        tq->tq_next_id++;
        t->tqent_func = func;
        t->tqent_arg = arg;
+       t->tqent_taskq = tq;
+       t->tqent_timer.data = 0;
+       t->tqent_timer.function = NULL;
+       t->tqent_timer.expires = 0;
 
        ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
 
@@ -299,6 +571,50 @@ out:
 }
 EXPORT_SYMBOL(taskq_dispatch);
 
+taskqid_t
+taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg,
+    uint_t flags, clock_t expire_time)
+{
+       taskq_ent_t *t;
+       taskqid_t rc = 0;
+       SENTRY;
+
+       ASSERT(tq);
+       ASSERT(func);
+
+       spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
+
+       /* Taskq being destroyed and all tasks drained */
+       if (!(tq->tq_flags & TQ_ACTIVE))
+               SGOTO(out, rc = 0);
+
+       if ((t = task_alloc(tq, flags)) == NULL)
+               SGOTO(out, rc = 0);
+
+       spin_lock(&t->tqent_lock);
+
+       /* Queue to the delay list for subsequent execution */
+       list_add_tail(&t->tqent_list, &tq->tq_delay_list);
+
+       t->tqent_id = rc = tq->tq_next_id;
+       tq->tq_next_id++;
+       t->tqent_func = func;
+       t->tqent_arg = arg;
+       t->tqent_taskq = tq;
+       t->tqent_timer.data = (unsigned long)t;
+       t->tqent_timer.function = task_expire;
+       t->tqent_timer.expires = (unsigned long)expire_time;
+       add_timer(&t->tqent_timer);
+
+       ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
+
+       spin_unlock(&t->tqent_lock);
+out:
+       spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+       SRETURN(rc);
+}
+EXPORT_SYMBOL(taskq_dispatch_delay);
+
 void
 taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
    taskq_ent_t *t)
@@ -335,6 +651,7 @@ taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
        tq->tq_next_id++;
        t->tqent_func = func;
        t->tqent_arg = arg;
+       t->tqent_taskq = tq;
 
        spin_unlock(&t->tqent_lock);
 
@@ -356,78 +673,17 @@ void
 taskq_init_ent(taskq_ent_t *t)
 {
        spin_lock_init(&t->tqent_lock);
+       init_waitqueue_head(&t->tqent_waitq);
+       init_timer(&t->tqent_timer);
        INIT_LIST_HEAD(&t->tqent_list);
        t->tqent_id = 0;
        t->tqent_func = NULL;
        t->tqent_arg = NULL;
        t->tqent_flags = 0;
+       t->tqent_taskq = NULL;
 }
 EXPORT_SYMBOL(taskq_init_ent);
 
-/*
- * Returns the lowest incomplete taskqid_t.  The taskqid_t may
- * be queued on the pending list, on the priority list,  or on
- * the work list currently being handled, but it is not 100%
- * complete yet.
- */
-static taskqid_t
-taskq_lowest_id(taskq_t *tq)
-{
-       taskqid_t lowest_id = tq->tq_next_id;
-       taskq_ent_t *t;
-       taskq_thread_t *tqt;
-       SENTRY;
-
-       ASSERT(tq);
-       ASSERT(spin_is_locked(&tq->tq_lock));
-
-       if (!list_empty(&tq->tq_pend_list)) {
-               t = list_entry(tq->tq_pend_list.next, taskq_ent_t, tqent_list);
-               lowest_id = MIN(lowest_id, t->tqent_id);
-       }
-
-       if (!list_empty(&tq->tq_prio_list)) {
-               t = list_entry(tq->tq_prio_list.next, taskq_ent_t, tqent_list);
-               lowest_id = MIN(lowest_id, t->tqent_id);
-       }
-
-       if (!list_empty(&tq->tq_active_list)) {
-               tqt = list_entry(tq->tq_active_list.next, taskq_thread_t,
-                   tqt_active_list);
-               ASSERT(tqt->tqt_id != 0);
-               lowest_id = MIN(lowest_id, tqt->tqt_id);
-       }
-
-       SRETURN(lowest_id);
-}
-
-/*
- * Insert a task into a list keeping the list sorted by increasing taskqid.
- */
-static void
-taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt)
-{
-       taskq_thread_t *w;
-       struct list_head *l;
-
-       SENTRY;
-       ASSERT(tq);
-       ASSERT(tqt);
-       ASSERT(spin_is_locked(&tq->tq_lock));
-
-       list_for_each_prev(l, &tq->tq_active_list) {
-               w = list_entry(l, taskq_thread_t, tqt_active_list);
-               if (w->tqt_id < tqt->tqt_id) {
-                       list_add(&tqt->tqt_active_list, l);
-                       break;
-               }
-       }
-       if (l == &tq->tq_active_list)
-               list_add(&tqt->tqt_active_list, &tq->tq_active_list);
-
-       SEXIT;
-}
-
 static int
 taskq_thread(void *args)
 {
@@ -481,6 +737,7 @@ taskq_thread(void *args)
                         * preallocated taskq_ent_t, tqent_id must be
                         * stored prior to executing tqent_func. */
                        tqt->tqt_id = t->tqent_id;
+                       tqt->tqt_task = t;
 
                        /* We must store a copy of the flags prior to
                         * servicing the task (servicing a prealloc'd task
@@ -499,6 +756,7 @@ taskq_thread(void *args)
                        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
                        tq->tq_nactive--;
                        list_del_init(&tqt->tqt_active_list);
+                       tqt->tqt_task = NULL;
 
                        /* For prealloc'd tasks, we don't free anything. */
                        if ((tq->tq_flags & TASKQ_DYNAMIC) ||
@@ -576,6 +834,7 @@ taskq_create(const char *name, int nthreads, pri_t pri,
        INIT_LIST_HEAD(&tq->tq_free_list);
        INIT_LIST_HEAD(&tq->tq_pend_list);
        INIT_LIST_HEAD(&tq->tq_prio_list);
+       INIT_LIST_HEAD(&tq->tq_delay_list);
        init_waitqueue_head(&tq->tq_work_waitq);
        init_waitqueue_head(&tq->tq_wait_waitq);
 
@@ -669,6 +928,7 @@ taskq_destroy(taskq_t *tq)
        ASSERT(list_empty(&tq->tq_free_list));
        ASSERT(list_empty(&tq->tq_pend_list));
        ASSERT(list_empty(&tq->tq_prio_list));
+       ASSERT(list_empty(&tq->tq_delay_list));
 
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
 
index b94930cc9bc0a62cc2498ff6252b11b7e36170cb..7fad4627e5b995b9cb7c3cb1eda652b28b68907f 100644 (file)
@@ -548,10 +548,10 @@ splat_taskq_test4(struct file *file, void *arg)
  * next pending task as soon as it completes its current task.  This
  * means that tasks do not strictly complete in order in which they
  * were dispatched (increasing task id).  This is fine but we need to
- * verify that taskq_wait_id() blocks until the passed task id and all
+ * verify that taskq_wait_all() blocks until the passed task id and all
  * lower task ids complete.  We do this by dispatching the following
  * specific sequence of tasks each of which block for N time units.
- * We then use taskq_wait_id() to unblock at specific task id and
+ * We then use taskq_wait_all() to unblock at specific task id and
 * verify that only the expected task ids have completed and in the
  * correct order.  The two cases of interest are:
  *
@@ -562,17 +562,17 @@ splat_taskq_test4(struct file *file, void *arg)
  *
  * The following table shows each task id and how they will be
  * scheduled.  Each row represents one time unit and each column
- * one of the three worker threads.  The places taskq_wait_id()
+ * one of the three worker threads.  The places taskq_wait_all()
  * must unblock for a specific id are identified as well as the
  * task ids which must have completed and their order.
  *
- *       +-----+       <--- taskq_wait_id(tq, 8) unblocks
+ *       +-----+       <--- taskq_wait_all(tq, 8) unblocks
  *       |     |            Required Completion Order: 1,2,4,5,3,8,6,7
  * +-----+     |
  * |     |     |
  * |     |     +-----+
  * |     |     |  8  |
- * |     |     +-----+ <--- taskq_wait_id(tq, 3) unblocks
+ * |     |     +-----+ <--- taskq_wait_all(tq, 3) unblocks
  * |     |  7  |     |      Required Completion Order: 1,2,4,5,3
  * |     +-----+     |
  * |  6  |     |     |
@@ -712,13 +712,13 @@ splat_taskq_test5_impl(struct file *file, void *arg, boolean_t prealloc)
 
        splat_vprint(file, SPLAT_TASKQ_TEST5_NAME, "Taskq '%s' "
                     "waiting for taskqid %d completion\n", tq_arg.name, 3);
-       taskq_wait_id(tq, 3);
+       taskq_wait_all(tq, 3);
        if ((rc = splat_taskq_test_order(&tq_arg, order1)))
                goto out;
 
        splat_vprint(file, SPLAT_TASKQ_TEST5_NAME, "Taskq '%s' "
                     "waiting for taskqid %d completion\n", tq_arg.name, 8);
-       taskq_wait_id(tq, 8);
+       taskq_wait_all(tq, 8);
        rc = splat_taskq_test_order(&tq_arg, order2);
 
 out:
@@ -874,7 +874,7 @@ splat_taskq_test6_impl(struct file *file, void *arg, boolean_t prealloc)
        splat_vprint(file, SPLAT_TASKQ_TEST6_NAME, "Taskq '%s' "
                     "waiting for taskqid %d completion\n", tq_arg.name,
                     SPLAT_TASKQ_ORDER_MAX);
-       taskq_wait_id(tq, SPLAT_TASKQ_ORDER_MAX);
+       taskq_wait_all(tq, SPLAT_TASKQ_ORDER_MAX);
        rc = splat_taskq_test_order(&tq_arg, order);
 
 out:
@@ -975,7 +975,7 @@ splat_taskq_test7_impl(struct file *file, void *arg, boolean_t prealloc)
        if (tq_arg.flag == 0) {
                splat_vprint(file, SPLAT_TASKQ_TEST7_NAME,
                             "Taskq '%s' waiting\n", tq_arg.name);
-               taskq_wait_id(tq, SPLAT_TASKQ_DEPTH_MAX);
+               taskq_wait_all(tq, SPLAT_TASKQ_DEPTH_MAX);
        }
 
        splat_vprint(file, SPLAT_TASKQ_TEST7_NAME,