]> git.proxmox.com Git - mirror_spl-debian.git/blobdiff - module/spl/spl-taskq.c
Ensure a minimum of one slab is reclaimed
[mirror_spl-debian.git] / module / spl / spl-taskq.c
index 5960761f4a41406fba99e2b7635c2fee1ee61ad0..67a8345725d5352e4d51a4688fdccb4314127789 100644 (file)
-/*
- *  This file is part of the SPL: Solaris Porting Layer.
- *
- *  Copyright (c) 2008 Lawrence Livermore National Security, LLC.
- *  Produced at Lawrence Livermore National Laboratory
- *  Written by:
- *          Brian Behlendorf <behlendorf1@llnl.gov>,
- *          Herb Wartens <wartens2@llnl.gov>,
- *          Jim Garlick <garlick@llnl.gov>
+/*****************************************************************************\
+ *  Copyright (C) 2007-2010 Lawrence Livermore National Security, LLC.
+ *  Copyright (C) 2007 The Regents of the University of California.
+ *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
+ *  Written by Brian Behlendorf <behlendorf1@llnl.gov>.
  *  UCRL-CODE-235197
  *
- *  This is free software; you can redistribute it and/or modify it
- *  under the terms of the GNU General Public License as published by
- *  the Free Software Foundation; either version 2 of the License, or
- *  (at your option) any later version.
+ *  This file is part of the SPL, Solaris Porting Layer.
+ *  For details, see <http://github.com/behlendorf/spl/>.
  *
- *  This is distributed in the hope that it will be useful, but WITHOUT
+ *  The SPL is free software; you can redistribute it and/or modify it
+ *  under the terms of the GNU General Public License as published by the
+ *  Free Software Foundation; either version 2 of the License, or (at your
+ *  option) any later version.
+ *
+ *  The SPL is distributed in the hope that it will be useful, but WITHOUT
  *  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  *  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  *  for more details.
  *
  *  You should have received a copy of the GNU General Public License along
- *  with this program; if not, write to the Free Software Foundation, Inc.,
- *  51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA.
- */
+ *  with the SPL.  If not, see <http://www.gnu.org/licenses/>.
+ *****************************************************************************
+ *  Solaris Porting Layer (SPL) Task Queue Implementation.
+\*****************************************************************************/
 
 #include <sys/taskq.h>
 #include <sys/kmem.h>
+#include <spl-debug.h>
 
-#ifdef DEBUG_SUBSYSTEM
-#undef DEBUG_SUBSYSTEM
+#ifdef SS_DEBUG_SUBSYS
+#undef SS_DEBUG_SUBSYS
 #endif
 
-#define DEBUG_SUBSYSTEM S_TASKQ
+#define SS_DEBUG_SUBSYS SS_TASKQ
 
 /* Global system-wide dynamic task queue available for all consumers */
 taskq_t *system_taskq;
 EXPORT_SYMBOL(system_taskq);
 
-typedef struct spl_task {
-        spinlock_t              t_lock;
-        struct list_head        t_list;
-        taskqid_t               t_id;
-        task_func_t             *t_func;
-        void                    *t_arg;
-} spl_task_t;
-
-/* NOTE: Must be called with tq->tq_lock held, returns a list_t which
+/*
+ * NOTE: Must be called with tq->tq_lock held, returns a list_t which
  * is not attached to the free, work, or pending taskq lists.
  */
-static spl_task_t *
+static taskq_ent_t *
 task_alloc(taskq_t *tq, uint_t flags)
 {
-        spl_task_t *t;
+        taskq_ent_t *t;
         int count = 0;
-        ENTRY;
+        SENTRY;
 
         ASSERT(tq);
         ASSERT(flags & (TQ_SLEEP | TQ_NOSLEEP));               /* One set */
         ASSERT(!((flags & TQ_SLEEP) && (flags & TQ_NOSLEEP))); /* Not both */
         ASSERT(spin_is_locked(&tq->tq_lock));
 retry:
-        /* Acquire spl_task_t's from free list if available */
+        /* Acquire taskq_ent_t's from free list if available */
         if (!list_empty(&tq->tq_free_list) && !(flags & TQ_NEW)) {
-                t = list_entry(tq->tq_free_list.next, spl_task_t, t_list);
-                list_del_init(&t->t_list);
-                RETURN(t);
+                t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);
+
+                ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
+
+                list_del_init(&t->tqent_list);
+                SRETURN(t);
         }
 
         /* Free list is empty and memory allocations are prohibited */
         if (flags & TQ_NOALLOC)
-                RETURN(NULL);
+                SRETURN(NULL);
 
-        /* Hit maximum spl_task_t pool size */
+        /* Hit maximum taskq_ent_t pool size */
         if (tq->tq_nalloc >= tq->tq_maxalloc) {
                 if (flags & TQ_NOSLEEP)
-                        RETURN(NULL);
-
-                /* Sleep periodically polling the free list for an available
-                 * spl_task_t.  If a full second passes and we have not found
-                 * one gives up and return a NULL to the caller. */
-                if (flags & TQ_SLEEP) {
-                        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
-                        schedule_timeout(HZ / 100);
-                        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
-                        if (count < 100)
-                                GOTO(retry, count++);
-
-                        RETURN(NULL);
-                }
-
-                /* Unreachable, TQ_SLEEP or TQ_NOSLEEP */
-                SBUG();
+                        SRETURN(NULL);
+
+                /*
+                 * Sleep periodically polling the free list for an available
+                 * taskq_ent_t. Dispatching with TQ_SLEEP should always succeed
+                 * but we cannot block forever waiting for an taskq_entq_t to
+                 * show up in the free list, otherwise a deadlock can happen.
+                 *
+                 * Therefore, we need to allocate a new task even if the number
+                 * of allocated tasks is above tq->tq_maxalloc, but we still
+                 * end up delaying the task allocation by one second, thereby
+                 * throttling the task dispatch rate.
+                 */
+                 spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+                 schedule_timeout(HZ / 100);
+                 spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
+                 if (count < 100)
+                        SGOTO(retry, count++);
         }
 
-       spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
-        t = kmem_alloc(sizeof(spl_task_t), flags & (TQ_SLEEP | TQ_NOSLEEP));
+        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+        t = kmem_alloc(sizeof(taskq_ent_t), flags & (TQ_SLEEP | TQ_NOSLEEP));
         spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
 
-       if (t) {
-               spin_lock_init(&t->t_lock);
-                INIT_LIST_HEAD(&t->t_list);
-               t->t_id = 0;
-               t->t_func = NULL;
-               t->t_arg = NULL;
-               tq->tq_nalloc++;
-       }
+        if (t) {
+                taskq_init_ent(t);
+                tq->tq_nalloc++;
+        }
 
-        RETURN(t);
+        SRETURN(t);
 }
 
-/* NOTE: Must be called with tq->tq_lock held, expects the spl_task_t
+/*
+ * NOTE: Must be called with tq->tq_lock held, expects the taskq_ent_t
  * to already be removed from the free, work, or pending taskq lists.
  */
 static void
-task_free(taskq_t *tq, spl_task_t *t)
+task_free(taskq_t *tq, taskq_ent_t *t)
 {
-        ENTRY;
+        SENTRY;
 
         ASSERT(tq);
         ASSERT(t);
        ASSERT(spin_is_locked(&tq->tq_lock));
-       ASSERT(list_empty(&t->t_list));
+       ASSERT(list_empty(&t->tqent_list));
 
-        kmem_free(t, sizeof(spl_task_t));
+        kmem_free(t, sizeof(taskq_ent_t));
         tq->tq_nalloc--;
 
-       EXIT;
+       SEXIT;
 }
 
-/* NOTE: Must be called with tq->tq_lock held, either destroys the
- * spl_task_t if too many exist or moves it to the free list for later use.
+/*
+ * NOTE: Must be called with tq->tq_lock held, either destroys the
+ * taskq_ent_t if too many exist or moves it to the free list for later use.
  */
 static void
-task_done(taskq_t *tq, spl_task_t *t)
+task_done(taskq_t *tq, taskq_ent_t *t)
 {
-       ENTRY;
+       SENTRY;
        ASSERT(tq);
        ASSERT(t);
        ASSERT(spin_is_locked(&tq->tq_lock));
 
-       list_del_init(&t->t_list);
+       list_del_init(&t->tqent_list);
 
         if (tq->tq_nalloc <= tq->tq_minalloc) {
-               t->t_id = 0;
-               t->t_func = NULL;
-               t->t_arg = NULL;
-                list_add_tail(&t->t_list, &tq->tq_free_list);
+               t->tqent_id = 0;
+               t->tqent_func = NULL;
+               t->tqent_arg = NULL;
+               t->tqent_flags = 0;
+
+                list_add_tail(&t->tqent_list, &tq->tq_free_list);
        } else {
                task_free(tq, t);
        }
 
-        EXIT;
+        SEXIT;
 }
 
-/* Taskqid's are handed out in a monotonically increasing fashion per
- * taskq_t.  We don't handle taskqid wrapping yet, but fortunately it is
- * a 64-bit value so this is probably never going to happen.  The lowest
- * pending taskqid is stored in the taskq_t to make it easy for any
- * taskq_wait()'ers to know if the tasks they're waiting for have
- * completed.  Unfortunately, tq_task_lowest is kept up to date is
- * a pretty brain dead way, something more clever should be done.
+/*
+ * As tasks are submitted to the task queue they are assigned a
+ * monotonically increasing taskqid and added to the tail of the pending
+ * list.  As worker threads become available the tasks are removed from
+ * the head of the pending or priority list, giving preference to the
+ * priority list.  The tasks are then removed from their respective
+ * list, and the taskq_thread servicing the task is added to the active
+ * list, preserving the order using the serviced task's taskqid.
+ * Finally, as tasks complete the taskq_thread servicing the task is
+ * removed from the active list.  This means that the pending task and
+ * active taskq_thread lists are always kept sorted by taskqid. Thus the
+ * lowest outstanding incomplete taskqid can be determined simply by
+ * checking the min taskqid for each head item on the pending, priority,
+ * and active taskq_thread list. This value is stored in
+ * tq->tq_lowest_id and only updated to the new lowest id when the
+ * previous lowest id completes.  All taskqids lower than
+ * tq->tq_lowest_id must have completed.  It is also possible larger
+ * taskqid's have completed because they may be processed in parallel by
+ * several worker threads.  However, this is not a problem because the
+ * behavior of taskq_wait_id() is to block until all previously
+ * submitted taskqid's have completed.
+ *
+ * XXX: Taskqid_t wrapping is not handled.  However, taskqid_t's are
+ * 64-bit values so even if a taskq is processing 2^24 (16,777,216)
+ * taskqid_ts per second it will still take 2^40 seconds, 34,865 years,
+ * before the wrap occurs.  I can live with that for now.
  */
 static int
 taskq_wait_check(taskq_t *tq, taskqid_t id)
@@ -170,21 +187,18 @@ taskq_wait_check(taskq_t *tq, taskqid_t id)
        rc = (id < tq->tq_lowest_id);
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
 
-       RETURN(rc);
+       SRETURN(rc);
 }
 
-/* Expected to wait for all previously scheduled tasks to complete.  We do
- * not need to wait for tasked scheduled after this call to complete.  In
- * other words we do not need to drain the entire taskq. */
 void
 __taskq_wait_id(taskq_t *tq, taskqid_t id)
 {
-       ENTRY;
+       SENTRY;
        ASSERT(tq);
 
        wait_event(tq->tq_wait_waitq, taskq_wait_check(tq, id));
 
-       EXIT;
+       SEXIT;
 }
 EXPORT_SYMBOL(__taskq_wait_id);
 
@@ -192,7 +206,7 @@ void
 __taskq_wait(taskq_t *tq)
 {
        taskqid_t id;
-       ENTRY;
+       SENTRY;
        ASSERT(tq);
 
        /* Wait for the largest outstanding taskqid */
@@ -202,7 +216,7 @@ __taskq_wait(taskq_t *tq)
 
        __taskq_wait_id(tq, id);
 
-       EXIT;
+       SEXIT;
 
 }
 EXPORT_SYMBOL(__taskq_wait);
@@ -210,84 +224,207 @@ EXPORT_SYMBOL(__taskq_wait);
 int
 __taskq_member(taskq_t *tq, void *t)
 {
-        int i;
-        ENTRY;
+       struct list_head *l;
+       taskq_thread_t *tqt;
+        SENTRY;
 
        ASSERT(tq);
         ASSERT(t);
 
-        for (i = 0; i < tq->tq_nthreads; i++)
-                if (tq->tq_threads[i] == (struct task_struct *)t)
-                        RETURN(1);
+       list_for_each(l, &tq->tq_thread_list) {
+               tqt = list_entry(l, taskq_thread_t, tqt_thread_list);
+               if (tqt->tqt_thread == (struct task_struct *)t)
+                       SRETURN(1);
+       }
 
-        RETURN(0);
+        SRETURN(0);
 }
 EXPORT_SYMBOL(__taskq_member);
 
 taskqid_t
 __taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
 {
-        spl_task_t *t;
+        taskq_ent_t *t;
        taskqid_t rc = 0;
-        ENTRY;
+        SENTRY;
 
         ASSERT(tq);
         ASSERT(func);
-        if (unlikely(in_atomic() && (flags & TQ_SLEEP))) {
-               CERROR("May schedule while atomic: %s/0x%08x/%d\n",
-                       current->comm, preempt_count(), current->pid);
-               SBUG();
-       }
+
+       /* Solaris assumes TQ_SLEEP if not passed explicitly */
+       if (!(flags & (TQ_SLEEP | TQ_NOSLEEP)))
+               flags |= TQ_SLEEP;
+
+       if (unlikely(in_atomic() && (flags & TQ_SLEEP)))
+               PANIC("May schedule while atomic: %s/0x%08x/%d\n",
+                   current->comm, preempt_count(), current->pid);
 
         spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
 
        /* Taskq being destroyed and all tasks drained */
        if (!(tq->tq_flags & TQ_ACTIVE))
-               GOTO(out, rc = 0);
+               SGOTO(out, rc = 0);
 
        /* Do not queue the task unless there is idle thread for it */
        ASSERT(tq->tq_nactive <= tq->tq_nthreads);
        if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads))
-               GOTO(out, rc = 0);
+               SGOTO(out, rc = 0);
 
         if ((t = task_alloc(tq, flags)) == NULL)
-               GOTO(out, rc = 0);
+               SGOTO(out, rc = 0);
+
+       spin_lock(&t->tqent_lock);
 
-       spin_lock(&t->t_lock);
-       list_add_tail(&t->t_list, &tq->tq_pend_list);
-       t->t_id = rc = tq->tq_next_id;
+       /* Queue to the priority list instead of the pending list */
+       if (flags & TQ_FRONT)
+               list_add_tail(&t->tqent_list, &tq->tq_prio_list);
+       else
+               list_add_tail(&t->tqent_list, &tq->tq_pend_list);
+
+       t->tqent_id = rc = tq->tq_next_id;
        tq->tq_next_id++;
-        t->t_func = func;
-        t->t_arg = arg;
-       spin_unlock(&t->t_lock);
+        t->tqent_func = func;
+        t->tqent_arg = arg;
+
+       ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
+
+       spin_unlock(&t->tqent_lock);
 
        wake_up(&tq->tq_work_waitq);
 out:
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
-       RETURN(rc);
+       SRETURN(rc);
 }
 EXPORT_SYMBOL(__taskq_dispatch);
 
-/* NOTE: Must be called with tq->tq_lock held */
+void
+__taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
+   taskq_ent_t *t)
+{
+       SENTRY;
+
+       ASSERT(tq);
+       ASSERT(func);
+       ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));
+
+       spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
+
+       /* Taskq being destroyed and all tasks drained */
+       if (!(tq->tq_flags & TQ_ACTIVE)) {
+               t->tqent_id = 0;
+               goto out;
+       }
+
+       spin_lock(&t->tqent_lock);
+
+       /*
+        * Mark it as a prealloc'd task.  This is important
+        * to ensure that we don't free it later.
+        */
+       t->tqent_flags |= TQENT_FLAG_PREALLOC;
+
+       /* Queue to the priority list instead of the pending list */
+       if (flags & TQ_FRONT)
+               list_add_tail(&t->tqent_list, &tq->tq_prio_list);
+       else
+               list_add_tail(&t->tqent_list, &tq->tq_pend_list);
+
+       t->tqent_id = tq->tq_next_id;
+       tq->tq_next_id++;
+       t->tqent_func = func;
+       t->tqent_arg = arg;
+
+       spin_unlock(&t->tqent_lock);
+
+       wake_up(&tq->tq_work_waitq);
+out:
+       spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+       SEXIT;
+}
+EXPORT_SYMBOL(__taskq_dispatch_ent);
+
+int
+__taskq_empty_ent(taskq_ent_t *t)
+{
+       return list_empty(&t->tqent_list);
+}
+EXPORT_SYMBOL(__taskq_empty_ent);
+
+void
+__taskq_init_ent(taskq_ent_t *t)
+{
+       spin_lock_init(&t->tqent_lock);
+       INIT_LIST_HEAD(&t->tqent_list);
+       t->tqent_id = 0;
+       t->tqent_func = NULL;
+       t->tqent_arg = NULL;
+       t->tqent_flags = 0;
+}
+EXPORT_SYMBOL(__taskq_init_ent);
+
+/*
+ * Returns the lowest incomplete taskqid_t.  The taskqid_t may
+ * be queued on the pending list, on the priority list,  or on
+ * the work list currently being handled, but it is not 100%
+ * complete yet.
+ */
 static taskqid_t
 taskq_lowest_id(taskq_t *tq)
 {
        taskqid_t lowest_id = tq->tq_next_id;
-        spl_task_t *t;
-       ENTRY;
+        taskq_ent_t *t;
+       taskq_thread_t *tqt;
+       SENTRY;
 
        ASSERT(tq);
        ASSERT(spin_is_locked(&tq->tq_lock));
 
-       list_for_each_entry(t, &tq->tq_pend_list, t_list)
-               if (t->t_id < lowest_id)
-                       lowest_id = t->t_id;
+       if (!list_empty(&tq->tq_pend_list)) {
+               t = list_entry(tq->tq_pend_list.next, taskq_ent_t, tqent_list);
+               lowest_id = MIN(lowest_id, t->tqent_id);
+       }
+
+       if (!list_empty(&tq->tq_prio_list)) {
+               t = list_entry(tq->tq_prio_list.next, taskq_ent_t, tqent_list);
+               lowest_id = MIN(lowest_id, t->tqent_id);
+       }
 
-       list_for_each_entry(t, &tq->tq_work_list, t_list)
-               if (t->t_id < lowest_id)
-                       lowest_id = t->t_id;
+       if (!list_empty(&tq->tq_active_list)) {
+               tqt = list_entry(tq->tq_active_list.next, taskq_thread_t,
+                                tqt_active_list);
+               ASSERT(tqt->tqt_id != 0);
+               lowest_id = MIN(lowest_id, tqt->tqt_id);
+       }
 
-       RETURN(lowest_id);
+       SRETURN(lowest_id);
+}
+
+/*
+ * Insert a task into a list keeping the list sorted by increasing
+ * taskqid.
+ */
+static void
+taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt)
+{
+       taskq_thread_t *w;
+       struct list_head *l;
+
+       SENTRY;
+       ASSERT(tq);
+       ASSERT(tqt);
+       ASSERT(spin_is_locked(&tq->tq_lock));
+
+       list_for_each_prev(l, &tq->tq_active_list) {
+               w = list_entry(l, taskq_thread_t, tqt_active_list);
+               if (w->tqt_id < tqt->tqt_id) {
+                       list_add(&tqt->tqt_active_list, l);
+                       break;
+               }
+       }
+       if (l == &tq->tq_active_list)
+               list_add(&tqt->tqt_active_list, &tq->tq_active_list);
+
+       SEXIT;
 }
 
 static int
@@ -295,14 +432,20 @@ taskq_thread(void *args)
 {
         DECLARE_WAITQUEUE(wait, current);
         sigset_t blocked;
-       taskqid_t id;
-        taskq_t *tq = args;
-        spl_task_t *t;
-       ENTRY;
+       taskq_thread_t *tqt = args;
+        taskq_t *tq;
+        taskq_ent_t *t;
+       struct list_head *pend_list;
+       SENTRY;
 
-        ASSERT(tq);
+        ASSERT(tqt);
+       tq = tqt->tqt_tq;
         current->flags |= PF_NOFREEZE;
 
+       /* Disable the direct memory reclaim path */
+       if (tq->tq_flags & TASKQ_NORECLAIM)
+               current->flags |= PF_MEMALLOC;
+
         sigfillset(&blocked);
         sigprocmask(SIG_BLOCK, &blocked, NULL);
         flush_signals(current);
@@ -314,38 +457,66 @@ taskq_thread(void *args)
 
         while (!kthread_should_stop()) {
 
-               add_wait_queue(&tq->tq_work_waitq, &wait);
-               if (list_empty(&tq->tq_pend_list)) {
+               if (list_empty(&tq->tq_pend_list) &&
+                   list_empty(&tq->tq_prio_list)) {
+                       add_wait_queue_exclusive(&tq->tq_work_waitq, &wait);
                        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
                        schedule();
                        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
+                       remove_wait_queue(&tq->tq_work_waitq, &wait);
                } else {
                        __set_current_state(TASK_RUNNING);
                }
 
-               remove_wait_queue(&tq->tq_work_waitq, &wait);
-                if (!list_empty(&tq->tq_pend_list)) {
-                        t = list_entry(tq->tq_pend_list.next,spl_task_t,t_list);
-                        list_del_init(&t->t_list);
-                       list_add_tail(&t->t_list, &tq->tq_work_list);
+
+               if (!list_empty(&tq->tq_prio_list))
+                       pend_list = &tq->tq_prio_list;
+               else if (!list_empty(&tq->tq_pend_list))
+                       pend_list = &tq->tq_pend_list;
+               else
+                       pend_list = NULL;
+
+               if (pend_list) {
+                        t = list_entry(pend_list->next, taskq_ent_t, tqent_list);
+                        list_del_init(&t->tqent_list);
+
+                       /* In order to support recursively dispatching a
+                        * preallocated taskq_ent_t, tqent_id must be
+                        * stored prior to executing tqent_func. */
+                       tqt->tqt_id = t->tqent_id;
+
+                       /* We must store a copy of the flags prior to
+                        * servicing the task (servicing a prealloc'd task
+                        * returns the ownership of the tqent back to
+                        * the caller of taskq_dispatch). Thus,
+                        * tqent_flags _may_ change within the call. */
+                       tqt->tqt_flags = t->tqent_flags;
+
+                       taskq_insert_in_order(tq, tqt);
                         tq->tq_nactive++;
                        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
 
                        /* Perform the requested task */
-                        t->t_func(t->t_arg);
+                        t->tqent_func(t->tqent_arg);
 
                        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
                         tq->tq_nactive--;
-                       id = t->t_id;
-                        task_done(tq, t);
+                       list_del_init(&tqt->tqt_active_list);
+
+                       /* For prealloc'd tasks, we don't free anything. */
+                       if ((tq->tq_flags & TASKQ_DYNAMIC) ||
+                           !(tqt->tqt_flags & TQENT_FLAG_PREALLOC))
+                               task_done(tq, t);
 
                        /* When the current lowest outstanding taskqid is
                         * done calculate the new lowest outstanding id */
-                       if (tq->tq_lowest_id == id) {
+                       if (tq->tq_lowest_id == tqt->tqt_id) {
                                tq->tq_lowest_id = taskq_lowest_id(tq);
-                               ASSERT(tq->tq_lowest_id > id);
+                               ASSERT3S(tq->tq_lowest_id, >, tqt->tqt_id);
                        }
 
+                       tqt->tqt_id = 0;
+                       tqt->tqt_flags = 0;
                         wake_up_all(&tq->tq_wait_waitq);
                }
 
@@ -355,9 +526,12 @@ taskq_thread(void *args)
 
        __set_current_state(TASK_RUNNING);
         tq->tq_nthreads--;
+       list_del_init(&tqt->tqt_thread_list);
+       kmem_free(tqt, sizeof(taskq_thread_t));
+
         spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
 
-       RETURN(0);
+       SRETURN(0);
 }
 
 taskq_t *
@@ -365,9 +539,9 @@ __taskq_create(const char *name, int nthreads, pri_t pri,
                int minalloc, int maxalloc, uint_t flags)
 {
         taskq_t *tq;
-        struct task_struct *t;
+       taskq_thread_t *tqt;
         int rc = 0, i, j = 0;
-        ENTRY;
+        SENTRY;
 
         ASSERT(name != NULL);
         ASSERT(pri <= maxclsyspri);
@@ -375,18 +549,23 @@ __taskq_create(const char *name, int nthreads, pri_t pri,
         ASSERT(maxalloc <= INT_MAX);
         ASSERT(!(flags & (TASKQ_CPR_SAFE | TASKQ_DYNAMIC))); /* Unsupported */
 
+       /* Scale the number of threads using nthreads as a percentage */
+       if (flags & TASKQ_THREADS_CPU_PCT) {
+               ASSERT(nthreads <= 100);
+               ASSERT(nthreads >= 0);
+               nthreads = MIN(nthreads, 100);
+               nthreads = MAX(nthreads, 0);
+               nthreads = MAX((num_online_cpus() * nthreads) / 100, 1);
+       }
+
         tq = kmem_alloc(sizeof(*tq), KM_SLEEP);
         if (tq == NULL)
-                RETURN(NULL);
-
-        tq->tq_threads = kmem_alloc(nthreads * sizeof(t), KM_SLEEP);
-        if (tq->tq_threads == NULL) {
-                kmem_free(tq, sizeof(*tq));
-                RETURN(NULL);
-        }
+                SRETURN(NULL);
 
         spin_lock_init(&tq->tq_lock);
         spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
+        INIT_LIST_HEAD(&tq->tq_thread_list);
+        INIT_LIST_HEAD(&tq->tq_active_list);
         tq->tq_name      = name;
         tq->tq_nactive   = 0;
        tq->tq_nthreads  = 0;
@@ -398,8 +577,8 @@ __taskq_create(const char *name, int nthreads, pri_t pri,
        tq->tq_next_id   = 1;
        tq->tq_lowest_id = 1;
         INIT_LIST_HEAD(&tq->tq_free_list);
-        INIT_LIST_HEAD(&tq->tq_work_list);
         INIT_LIST_HEAD(&tq->tq_pend_list);
+        INIT_LIST_HEAD(&tq->tq_prio_list);
         init_waitqueue_head(&tq->tq_work_waitq);
         init_waitqueue_head(&tq->tq_wait_waitq);
 
@@ -409,19 +588,26 @@ __taskq_create(const char *name, int nthreads, pri_t pri,
 
         spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
 
-        for (i = 0; i < nthreads; i++) {
-                t = kthread_create(taskq_thread, tq, "%s/%d", name, i);
-                if (t) {
-                        tq->tq_threads[i] = t;
-                        kthread_bind(t, i % num_online_cpus());
-                        set_user_nice(t, PRIO_TO_NICE(pri));
-                        wake_up_process(t);
+       for (i = 0; i < nthreads; i++) {
+               tqt = kmem_alloc(sizeof(*tqt), KM_SLEEP);
+               INIT_LIST_HEAD(&tqt->tqt_thread_list);
+               INIT_LIST_HEAD(&tqt->tqt_active_list);
+               tqt->tqt_tq = tq;
+               tqt->tqt_id = 0;
+
+               tqt->tqt_thread = kthread_create(taskq_thread, tqt,
+                                                "%s/%d", name, i);
+               if (tqt->tqt_thread) {
+                       list_add(&tqt->tqt_thread_list, &tq->tq_thread_list);
+                       kthread_bind(tqt->tqt_thread, i % num_online_cpus());
+                       set_user_nice(tqt->tqt_thread, PRIO_TO_NICE(pri));
+                       wake_up_process(tqt->tqt_thread);
                        j++;
-                } else {
-                        tq->tq_threads[i] = NULL;
-                        rc = 1;
-                }
-        }
+               } else {
+                       kmem_free(tqt, sizeof(taskq_thread_t));
+                       rc = 1;
+               }
+       }
 
         /* Wait for all threads to be started before potential destroy */
        wait_event(tq->tq_wait_waitq, tq->tq_nthreads == j);
@@ -431,16 +617,17 @@ __taskq_create(const char *name, int nthreads, pri_t pri,
                 tq = NULL;
         }
 
-        RETURN(tq);
+        SRETURN(tq);
 }
 EXPORT_SYMBOL(__taskq_create);
 
 void
 __taskq_destroy(taskq_t *tq)
 {
-       spl_task_t *t;
-       int i, nthreads;
-       ENTRY;
+       struct task_struct *thread;
+       taskq_thread_t *tqt;
+       taskq_ent_t *t;
+       SENTRY;
 
        ASSERT(tq);
        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
@@ -450,52 +637,69 @@ __taskq_destroy(taskq_t *tq)
        /* TQ_ACTIVE cleared prevents new tasks being added to pending */
         __taskq_wait(tq);
 
-       nthreads = tq->tq_nthreads;
-       for (i = 0; i < nthreads; i++)
-               if (tq->tq_threads[i])
-                       kthread_stop(tq->tq_threads[i]);
-
         spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
 
+       /*
+        * Signal each thread to exit and block until it does.  Each thread
+        * is responsible for removing itself from the list and freeing its
+        * taskq_thread_t.  This allows for idle threads to opt to remove
+        * themselves from the taskq.  They can be recreated as needed.
+        */
+       while (!list_empty(&tq->tq_thread_list)) {
+               tqt = list_entry(tq->tq_thread_list.next,
+                                taskq_thread_t, tqt_thread_list);
+               thread = tqt->tqt_thread;
+               spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+
+               kthread_stop(thread);
+
+               spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
+       }
+
         while (!list_empty(&tq->tq_free_list)) {
-               t = list_entry(tq->tq_free_list.next, spl_task_t, t_list);
-               list_del_init(&t->t_list);
+               t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);
+
+               ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
+
+               list_del_init(&t->tqent_list);
                 task_free(tq, t);
         }
 
         ASSERT(tq->tq_nthreads == 0);
         ASSERT(tq->tq_nalloc == 0);
+        ASSERT(list_empty(&tq->tq_thread_list));
+        ASSERT(list_empty(&tq->tq_active_list));
         ASSERT(list_empty(&tq->tq_free_list));
-        ASSERT(list_empty(&tq->tq_work_list));
         ASSERT(list_empty(&tq->tq_pend_list));
+        ASSERT(list_empty(&tq->tq_prio_list));
 
         spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
-        kmem_free(tq->tq_threads, nthreads * sizeof(spl_task_t *));
+
         kmem_free(tq, sizeof(taskq_t));
 
-       EXIT;
+       SEXIT;
 }
 EXPORT_SYMBOL(__taskq_destroy);
 
 int
 spl_taskq_init(void)
 {
-        ENTRY;
+        SENTRY;
 
        /* Solaris creates a dynamic taskq of up to 64 threads, however in
         * a Linux environment 1 thread per-core is usually about right */
         system_taskq = taskq_create("spl_system_taskq", num_online_cpus(),
                                    minclsyspri, 4, 512, TASKQ_PREPOPULATE);
        if (system_taskq == NULL)
-               RETURN(1);
+               SRETURN(1);
 
-        RETURN(0);
+        SRETURN(0);
 }
 
 void
 spl_taskq_fini(void)
 {
-        ENTRY;
+        SENTRY;
        taskq_destroy(system_taskq);
-        EXIT;
+        SEXIT;
 }