-/*****************************************************************************\
+/*
* Copyright (C) 2007-2010 Lawrence Livermore National Security, LLC.
* Copyright (C) 2007 The Regents of the University of California.
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
*
* You should have received a copy of the GNU General Public License along
* with the SPL. If not, see <http://www.gnu.org/licenses/>.
- *****************************************************************************
+ *
* Solaris Porting Layer (SPL) Task Queue Implementation.
-\*****************************************************************************/
+ */
#include <sys/taskq.h>
#include <sys/kmem.h>
+#include <sys/tsd.h>
int spl_taskq_thread_bind = 0;
module_param(spl_taskq_thread_bind, int, 0644);
int spl_taskq_thread_priority = 1;
module_param(spl_taskq_thread_priority, int, 0644);
MODULE_PARM_DESC(spl_taskq_thread_priority,
- "Allow non-default priority for taskq threads");
+ "Allow non-default priority for taskq threads");
int spl_taskq_thread_sequential = 4;
module_param(spl_taskq_thread_sequential, int, 0644);
MODULE_PARM_DESC(spl_taskq_thread_sequential,
- "Create new taskq threads after N sequential tasks");
+ "Create new taskq threads after N sequential tasks");
/* Global system-wide dynamic task queue available for all consumers */
taskq_t *system_taskq;
EXPORT_SYMBOL(system_taskq);
+/* Global dynamic task queue for long-delay tasks */
+taskq_t *system_delay_taskq;
+EXPORT_SYMBOL(system_delay_taskq);
/* Private dedicated taskq for creating new taskq threads on demand. */
static taskq_t *dynamic_taskq;
static taskq_thread_t *taskq_thread_create(taskq_t *);
+/* List of all taskqs */
+LIST_HEAD(tq_list);
+DECLARE_RWSEM(tq_list_sem);
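+/*
+ * Thread-specific data key; each taskq thread stores a pointer to its
+ * taskq here (set in taskq_thread()), which lets taskq_member() answer
+ * membership queries without taking tq_lock.
+ */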
+static uint_t taskq_tsd;
+
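+/*
+ * Map TQ_* dispatch flags onto the KM_* flags used when allocating a
+ * taskq_ent_t; e.g. a TQ_NOSLEEP dispatch must use a KM_NOSLEEP
+ * allocation which cannot block.
+ */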
static int
task_km_flags(uint_t flags)
{
if (flags & TQ_NOSLEEP)
- return KM_NOSLEEP;
+ return (KM_NOSLEEP);
if (flags & TQ_PUSHPAGE)
- return KM_PUSHPAGE;
+ return (KM_PUSHPAGE);
- return KM_SLEEP;
+ return (KM_SLEEP);
+}
+
+/*
+ * taskq_find_by_name - Find the largest instance number of a named
+ * taskq, or -1 if no taskq with that name exists.
+ */
+static int
+taskq_find_by_name(const char *name)
+{
+ struct list_head *tql;
+ taskq_t *tq;
+
+ list_for_each_prev(tql, &tq_list) {
+ tq = list_entry(tql, taskq_t, tq_taskqs);
+ if (strcmp(name, tq->tq_name) == 0)
+ return (tq->tq_instance);
+ }
+ return (-1);
}
/*
* is not attached to the free, work, or pending taskq lists.
*/
static taskq_ent_t *
-task_alloc(taskq_t *tq, uint_t flags)
+task_alloc(taskq_t *tq, uint_t flags, unsigned long *irqflags)
{
taskq_ent_t *t;
int count = 0;
* end up delaying the task allocation by one second, thereby
* throttling the task dispatch rate.
*/
- spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+ spin_unlock_irqrestore(&tq->tq_lock, *irqflags);
schedule_timeout(HZ / 100);
- spin_lock_irqsave_nested(&tq->tq_lock, tq->tq_lock_flags,
+ spin_lock_irqsave_nested(&tq->tq_lock, *irqflags,
tq->tq_lock_class);
if (count < 100) {
count++;
}
}
- spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
- t = kmem_alloc(sizeof(taskq_ent_t), task_km_flags(flags));
- spin_lock_irqsave_nested(&tq->tq_lock, tq->tq_lock_flags,
- tq->tq_lock_class);
+ spin_unlock_irqrestore(&tq->tq_lock, *irqflags);
+ t = kmem_alloc(sizeof (taskq_ent_t), task_km_flags(flags));
+ spin_lock_irqsave_nested(&tq->tq_lock, *irqflags, tq->tq_lock_class);
if (t) {
taskq_init_ent(t);
ASSERT(list_empty(&t->tqent_list));
ASSERT(!timer_pending(&t->tqent_timer));
- kmem_free(t, sizeof(taskq_ent_t));
+ kmem_free(t, sizeof (taskq_ent_t));
tq->tq_nalloc--;
}
list_del_init(&t->tqent_list);
if (tq->tq_nalloc <= tq->tq_minalloc) {
- t->tqent_id = 0;
+ t->tqent_id = TASKQID_INVALID;
t->tqent_func = NULL;
t->tqent_arg = NULL;
t->tqent_flags = 0;
taskq_ent_t *w, *t = (taskq_ent_t *)data;
taskq_t *tq = t->tqent_taskq;
struct list_head *l;
+ unsigned long flags;
- spin_lock_irqsave_nested(&tq->tq_lock, tq->tq_lock_flags,
- tq->tq_lock_class);
+ spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
if (t->tqent_flags & TQENT_FLAG_CANCEL) {
ASSERT(list_empty(&t->tqent_list));
- spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+ spin_unlock_irqrestore(&tq->tq_lock, flags);
return;
}
+ t->tqent_birth = jiffies;
/*
* The priority list must be maintained in strict task id order
* from lowest to highest for lowest_id to be easily calculable.
if (l == &tq->tq_prio_list)
list_add(&t->tqent_list, &tq->tq_prio_list);
- spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+ spin_unlock_irqrestore(&tq->tq_lock, flags);
wake_up(&tq->tq_work_waitq);
}
if (!list_empty(&tq->tq_active_list)) {
tqt = list_entry(tq->tq_active_list.next, taskq_thread_t,
tqt_active_list);
- ASSERT(tqt->tqt_id != 0);
+ ASSERT(tqt->tqt_id != TASKQID_INVALID);
lowest_id = MIN(lowest_id, tqt->tqt_id);
}
/*
* Find an already dispatched task given the task id regardless of what
- * state it is in. If a task is still pending or executing it will be
- * returned and 'active' set appropriately. If the task has already
- * been run then NULL is returned.
+ * state it is in. If a task is still pending it will be returned.
+ * If a task is executing, then -EBUSY will be returned instead.
+ * If the task has already been run then NULL is returned.
*/
static taskq_ent_t *
-taskq_find(taskq_t *tq, taskqid_t id, int *active)
+taskq_find(taskq_t *tq, taskqid_t id)
{
taskq_thread_t *tqt;
struct list_head *l;
taskq_ent_t *t;
ASSERT(spin_is_locked(&tq->tq_lock));
- *active = 0;
t = taskq_find_list(tq, &tq->tq_delay_list, id);
if (t)
list_for_each(l, &tq->tq_active_list) {
tqt = list_entry(l, taskq_thread_t, tqt_active_list);
if (tqt->tqt_id == id) {
- t = tqt->tqt_task;
- *active = 1;
- return (t);
+ /*
+ * Instead of returning tqt_task, we just return a
+ * non-NULL value to prevent misuse, since tqt_task
+ * only has two valid fields.
+ */
+ return (ERR_PTR(-EBUSY));
}
}
static int
taskq_wait_id_check(taskq_t *tq, taskqid_t id)
{
- int active = 0;
int rc;
+ unsigned long flags;
- spin_lock_irqsave_nested(&tq->tq_lock, tq->tq_lock_flags,
- tq->tq_lock_class);
- rc = (taskq_find(tq, id, &active) == NULL);
- spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+ spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
+ rc = (taskq_find(tq, id) == NULL);
+ spin_unlock_irqrestore(&tq->tq_lock, flags);
return (rc);
}
taskq_wait_outstanding_check(taskq_t *tq, taskqid_t id)
{
int rc;
+ unsigned long flags;
- spin_lock_irqsave_nested(&tq->tq_lock, tq->tq_lock_flags,
- tq->tq_lock_class);
+ spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
rc = (id < tq->tq_lowest_id);
- spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+ spin_unlock_irqrestore(&tq->tq_lock, flags);
return (rc);
}
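+
+/*
+ * Block until every task with an id less than or equal to 'id' has
+ * completed; an id of 0 waits for everything dispatched so far, i.e.
+ * tq_next_id - 1. Completion is detected when tq_lowest_id advances
+ * past 'id'.
+ */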
void
taskq_wait_outstanding(taskq_t *tq, taskqid_t id)
{
- wait_event(tq->tq_wait_waitq,
- taskq_wait_outstanding_check(tq, id ? id : tq->tq_next_id - 1));
+ id = id ? id : tq->tq_next_id - 1;
+ wait_event(tq->tq_wait_waitq, taskq_wait_outstanding_check(tq, id));
}
EXPORT_SYMBOL(taskq_wait_outstanding);
taskq_wait_check(taskq_t *tq)
{
int rc;
+ unsigned long flags;
- spin_lock_irqsave_nested(&tq->tq_lock, tq->tq_lock_flags,
- tq->tq_lock_class);
+ spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
rc = (tq->tq_lowest_id == tq->tq_next_id);
- spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+ spin_unlock_irqrestore(&tq->tq_lock, flags);
return (rc);
}
}
EXPORT_SYMBOL(taskq_wait);
-static int
-taskq_member_impl(taskq_t *tq, void *t)
-{
- struct list_head *l;
- taskq_thread_t *tqt;
- int found = 0;
-
- ASSERT(tq);
- ASSERT(t);
- ASSERT(spin_is_locked(&tq->tq_lock));
-
- list_for_each(l, &tq->tq_thread_list) {
- tqt = list_entry(l, taskq_thread_t, tqt_thread_list);
- if (tqt->tqt_thread == (struct task_struct *)t) {
- found = 1;
- break;
- }
- }
- return (found);
-}
-
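+/*
+ * Return non-zero when thread t is a member of taskq tq. Each taskq
+ * thread records its taskq in thread-specific data on startup (see the
+ * tsd_set() calls in taskq_thread()) and clears it on exit, so the
+ * membership test becomes a single TSD lookup instead of a walk of
+ * tq_thread_list under tq_lock.
+ */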
int
-taskq_member(taskq_t *tq, void *t)
+taskq_member(taskq_t *tq, kthread_t *t)
{
- int found;
-
- spin_lock_irqsave_nested(&tq->tq_lock, tq->tq_lock_flags,
- tq->tq_lock_class);
- found = taskq_member_impl(tq, t);
- spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
-
- return (found);
+ return (tq == (taskq_t *)tsd_get_by_thread(taskq_tsd, t));
}
EXPORT_SYMBOL(taskq_member);
taskq_cancel_id(taskq_t *tq, taskqid_t id)
{
taskq_ent_t *t;
- int active = 0;
int rc = ENOENT;
+ unsigned long flags;
ASSERT(tq);
- spin_lock_irqsave_nested(&tq->tq_lock, tq->tq_lock_flags,
- tq->tq_lock_class);
- t = taskq_find(tq, id, &active);
- if (t && !active) {
+ spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
+ t = taskq_find(tq, id);
+ if (t && t != ERR_PTR(-EBUSY)) {
list_del_init(&t->tqent_list);
t->tqent_flags |= TQENT_FLAG_CANCEL;
* drop the lock before synchronously cancelling the timer.
*/
if (timer_pending(&t->tqent_timer)) {
- spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+ spin_unlock_irqrestore(&tq->tq_lock, flags);
del_timer_sync(&t->tqent_timer);
- spin_lock_irqsave_nested(&tq->tq_lock,
- tq->tq_lock_flags, tq->tq_lock_class);
+ spin_lock_irqsave_nested(&tq->tq_lock, flags,
+ tq->tq_lock_class);
}
if (!(t->tqent_flags & TQENT_FLAG_PREALLOC))
rc = 0;
}
- spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+ spin_unlock_irqrestore(&tq->tq_lock, flags);
- if (active) {
+ if (t == ERR_PTR(-EBUSY)) {
taskq_wait_id(tq, id);
rc = EBUSY;
}
taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
{
taskq_ent_t *t;
- taskqid_t rc = 0;
+ taskqid_t rc = TASKQID_INVALID;
+ unsigned long irqflags;
ASSERT(tq);
ASSERT(func);
- spin_lock_irqsave_nested(&tq->tq_lock, tq->tq_lock_flags,
- tq->tq_lock_class);
+ spin_lock_irqsave_nested(&tq->tq_lock, irqflags, tq->tq_lock_class);
/* Taskq being destroyed and all tasks drained */
if (!(tq->tq_flags & TASKQ_ACTIVE))
/* Do not queue the task unless there is an idle thread for it */
ASSERT(tq->tq_nactive <= tq->tq_nthreads);
- if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads))
- goto out;
+ if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads)) {
+ /* Dynamic taskq may be able to spawn another thread */
+ if (!(tq->tq_flags & TASKQ_DYNAMIC) ||
+     taskq_thread_spawn(tq) == 0)
+ goto out;
+ }
- if ((t = task_alloc(tq, flags)) == NULL)
+ if ((t = task_alloc(tq, flags, &irqflags)) == NULL)
goto out;
spin_lock(&t->tqent_lock);
+ /* Queue to the front of the list to enforce TQ_NOQUEUE semantics */
+ if (flags & TQ_NOQUEUE)
+ list_add(&t->tqent_list, &tq->tq_prio_list);
/* Queue to the priority list instead of the pending list */
- if (flags & TQ_FRONT)
+ else if (flags & TQ_FRONT)
list_add_tail(&t->tqent_list, &tq->tq_prio_list);
else
list_add_tail(&t->tqent_list, &tq->tq_pend_list);
t->tqent_timer.data = 0;
t->tqent_timer.function = NULL;
t->tqent_timer.expires = 0;
+ t->tqent_birth = jiffies;
ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
wake_up(&tq->tq_work_waitq);
out:
/* Spawn additional taskq threads if required. */
- if (tq->tq_nactive == tq->tq_nthreads)
+ if (!(flags & TQ_NOQUEUE) && tq->tq_nactive == tq->tq_nthreads)
(void) taskq_thread_spawn(tq);
- spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+ spin_unlock_irqrestore(&tq->tq_lock, irqflags);
return (rc);
}
EXPORT_SYMBOL(taskq_dispatch);
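+
+/*
+ * Usage sketch (my_func and my_arg are hypothetical): dispatch a task
+ * and wait for it, falling back to a direct call when dispatch fails:
+ *
+ *	taskqid_t id = taskq_dispatch(tq, my_func, my_arg, TQ_SLEEP);
+ *	if (id == TASKQID_INVALID)
+ *		my_func(my_arg);
+ *	else
+ *		taskq_wait_id(tq, id);
+ */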
taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg,
uint_t flags, clock_t expire_time)
{
- taskqid_t rc = 0;
+ taskqid_t rc = TASKQID_INVALID;
taskq_ent_t *t;
+ unsigned long irqflags;
ASSERT(tq);
ASSERT(func);
- spin_lock_irqsave_nested(&tq->tq_lock, tq->tq_lock_flags,
- tq->tq_lock_class);
+ spin_lock_irqsave_nested(&tq->tq_lock, irqflags, tq->tq_lock_class);
/* Taskq being destroyed and all tasks drained */
if (!(tq->tq_flags & TASKQ_ACTIVE))
goto out;
- if ((t = task_alloc(tq, flags)) == NULL)
+ if ((t = task_alloc(tq, flags, &irqflags)) == NULL)
goto out;
spin_lock(&t->tqent_lock);
/* Spawn additional taskq threads if required. */
if (tq->tq_nactive == tq->tq_nthreads)
(void) taskq_thread_spawn(tq);
- spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+ spin_unlock_irqrestore(&tq->tq_lock, irqflags);
return (rc);
}
EXPORT_SYMBOL(taskq_dispatch_delay);
void
taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
- taskq_ent_t *t)
+ taskq_ent_t *t)
{
+ unsigned long irqflags;
ASSERT(tq);
ASSERT(func);
- spin_lock_irqsave_nested(&tq->tq_lock, tq->tq_lock_flags,
+ spin_lock_irqsave_nested(&tq->tq_lock, irqflags,
tq->tq_lock_class);
/* Taskq being destroyed and all tasks drained */
if (!(tq->tq_flags & TASKQ_ACTIVE)) {
- t->tqent_id = 0;
+ t->tqent_id = TASKQID_INVALID;
goto out;
}
+ if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads)) {
+ /* Dynamic taskq may be able to spawn another thread */
+ if (!(tq->tq_flags & TASKQ_DYNAMIC) ||
+     taskq_thread_spawn(tq) == 0)
+ goto out2;
+ flags |= TQ_FRONT;
+ }
+
spin_lock(&t->tqent_lock);
/*
t->tqent_func = func;
t->tqent_arg = arg;
t->tqent_taskq = tq;
+ t->tqent_birth = jiffies;
spin_unlock(&t->tqent_lock);
/* Spawn additional taskq threads if required. */
if (tq->tq_nactive == tq->tq_nthreads)
(void) taskq_thread_spawn(tq);
- spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+out2:
+ spin_unlock_irqrestore(&tq->tq_lock, irqflags);
}
EXPORT_SYMBOL(taskq_dispatch_ent);
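+
+/*
+ * Usage sketch (my_func and my_arg are hypothetical): the caller
+ * supplies a preallocated taskq_ent_t, typically embedded in a larger
+ * structure, so this dispatch path performs no allocation. The entry
+ * must remain valid until the task has run:
+ *
+ *	taskq_ent_t ent;
+ *
+ *	taskq_init_ent(&ent);
+ *	taskq_dispatch_ent(tq, my_func, my_arg, TQ_SLEEP, &ent);
+ */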
int
taskq_empty_ent(taskq_ent_t *t)
{
- return list_empty(&t->tqent_list);
+ return (list_empty(&t->tqent_list));
}
EXPORT_SYMBOL(taskq_empty_ent);
taskq_thread_spawn_task(void *arg)
{
taskq_t *tq = (taskq_t *)arg;
+ unsigned long flags;
- (void) taskq_thread_create(tq);
-
- spin_lock_irqsave_nested(&tq->tq_lock, tq->tq_lock_flags,
- tq->tq_lock_class);
- tq->tq_nspawn--;
- spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+ if (taskq_thread_create(tq) == NULL) {
+ /* restore spawning count if failed */
+ spin_lock_irqsave_nested(&tq->tq_lock, flags,
+     tq->tq_lock_class);
+ tq->tq_nspawn--;
+ spin_unlock_irqrestore(&tq->tq_lock, flags);
+ }
}
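+
+/*
+ * tq_nspawn accounting: taskq_thread_spawn() increments the count when
+ * it queues taskq_thread_spawn_task() on dynamic_taskq; the count is
+ * decremented either by the new thread itself in taskq_thread() or
+ * above when thread creation fails. taskq_destroy() waits for it to
+ * drop to zero before tearing down the queue.
+ */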
/*
(tq->tq_nactive == 0) && /* No threads are handling tasks */
(tq->tq_nthreads > 1) && /* More than 1 thread is running */
(!taskq_next_ent(tq)) && /* There are no pending tasks */
- (spl_taskq_thread_dynamic));/* Dynamic taskqs are allowed */
+ (spl_taskq_thread_dynamic)); /* Dynamic taskqs are allowed */
}
static int
taskq_t *tq;
taskq_ent_t *t;
int seq_tasks = 0;
+ unsigned long flags;
+ taskq_ent_t dup_task = {};
ASSERT(tqt);
ASSERT(tqt->tqt_tq);
tq = tqt->tqt_tq;
current->flags |= PF_NOFREEZE;
- #if defined(PF_MEMALLOC_NOIO)
- (void) memalloc_noio_save();
- #endif
+ (void) spl_fstrans_mark();
sigfillset(&blocked);
sigprocmask(SIG_BLOCK, &blocked, NULL);
flush_signals(current);
- spin_lock_irqsave_nested(&tq->tq_lock, tq->tq_lock_flags,
- tq->tq_lock_class);
+ tsd_set(taskq_tsd, tq);
+ spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
+ /*
+ * If we were dynamically spawned, decrease the spawning count. We
+ * could also have been created during taskq_create(), in which case
+ * the decrement is spurious, but harmless: taskq_create() resets
+ * tq_nspawn after all of its threads have started.
+ */
+ if (tq->tq_flags & TASKQ_DYNAMIC)
+ tq->tq_nspawn--;
/* Immediately exit if more threads than allowed were created. */
if (tq->tq_nthreads >= tq->tq_maxthreads)
}
add_wait_queue_exclusive(&tq->tq_work_waitq, &wait);
- spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+ spin_unlock_irqrestore(&tq->tq_lock, flags);
schedule();
seq_tasks = 0;
- spin_lock_irqsave_nested(&tq->tq_lock,
- tq->tq_lock_flags, tq->tq_lock_class);
+ spin_lock_irqsave_nested(&tq->tq_lock, flags,
+ tq->tq_lock_class);
remove_wait_queue(&tq->tq_work_waitq, &wait);
} else {
__set_current_state(TASK_RUNNING);
if ((t = taskq_next_ent(tq)) != NULL) {
list_del_init(&t->tqent_list);
- /* In order to support recursively dispatching a
- * preallocated taskq_ent_t, tqent_id must be
- * stored prior to executing tqent_func. */
+ /*
+ * A TQENT_FLAG_PREALLOC task may be reused or freed
+ * during the task function call. Store tqent_id and
+ * tqent_flags here.
+ *
+ * Also use an on-stack taskq_ent_t for the tqt_task
+ * assignment in this case; we populate only the two
+ * fields consumed by its sole user, the taskq proc file.
+ */
tqt->tqt_id = t->tqent_id;
- tqt->tqt_task = t;
-
- /* We must store a copy of the flags prior to
- * servicing the task (servicing a prealloc'd task
- * returns the ownership of the tqent back to
- * the caller of taskq_dispatch). Thus,
- * tqent_flags _may_ change within the call. */
tqt->tqt_flags = t->tqent_flags;
+ if (t->tqent_flags & TQENT_FLAG_PREALLOC) {
+ dup_task.tqent_func = t->tqent_func;
+ dup_task.tqent_arg = t->tqent_arg;
+ t = &dup_task;
+ }
+ tqt->tqt_task = t;
+
taskq_insert_in_order(tq, tqt);
tq->tq_nactive++;
- spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+ spin_unlock_irqrestore(&tq->tq_lock, flags);
/* Perform the requested task */
t->tqent_func(t->tqent_arg);
- spin_lock_irqsave_nested(&tq->tq_lock,
- tq->tq_lock_flags, tq->tq_lock_class);
+ spin_lock_irqsave_nested(&tq->tq_lock, flags,
+ tq->tq_lock_class);
tq->tq_nactive--;
list_del_init(&tqt->tqt_active_list);
tqt->tqt_task = NULL;
if (!(tqt->tqt_flags & TQENT_FLAG_PREALLOC))
task_done(tq, t);
- /* When the current lowest outstanding taskqid is
- * done calculate the new lowest outstanding id */
+ /*
+ * When the current lowest outstanding taskqid is
+ * done, calculate the new lowest outstanding id.
+ */
if (tq->tq_lowest_id == tqt->tqt_id) {
tq->tq_lowest_id = taskq_lowest_id(tq);
ASSERT3S(tq->tq_lowest_id, >, tqt->tqt_id);
taskq_thread_spawn(tq))
seq_tasks = 0;
- tqt->tqt_id = 0;
+ tqt->tqt_id = TASKQID_INVALID;
tqt->tqt_flags = 0;
wake_up_all(&tq->tq_wait_waitq);
} else {
list_del_init(&tqt->tqt_thread_list);
error:
kmem_free(tqt, sizeof (taskq_thread_t));
- spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+ spin_unlock_irqrestore(&tq->tq_lock, flags);
+
+ tsd_set(taskq_tsd, NULL);
return (0);
}
INIT_LIST_HEAD(&tqt->tqt_thread_list);
INIT_LIST_HEAD(&tqt->tqt_active_list);
tqt->tqt_tq = tq;
- tqt->tqt_id = 0;
+ tqt->tqt_id = TASKQID_INVALID;
tqt->tqt_thread = spl_kthread_create(taskq_thread, tqt,
"%s", tq->tq_name);
taskq_t *tq;
taskq_thread_t *tqt;
int count = 0, rc = 0, i;
+ unsigned long irqflags;
ASSERT(name != NULL);
ASSERT(minalloc >= 0);
spin_lock_init(&tq->tq_lock);
INIT_LIST_HEAD(&tq->tq_thread_list);
INIT_LIST_HEAD(&tq->tq_active_list);
- tq->tq_name = strdup(name);
- tq->tq_nactive = 0;
- tq->tq_nthreads = 0;
- tq->tq_nspawn = 0;
+ tq->tq_name = strdup(name);
+ tq->tq_nactive = 0;
+ tq->tq_nthreads = 0;
+ tq->tq_nspawn = 0;
tq->tq_maxthreads = nthreads;
- tq->tq_pri = pri;
- tq->tq_minalloc = minalloc;
- tq->tq_maxalloc = maxalloc;
- tq->tq_nalloc = 0;
- tq->tq_flags = (flags | TASKQ_ACTIVE);
- tq->tq_next_id = 1;
- tq->tq_lowest_id = 1;
+ tq->tq_pri = pri;
+ tq->tq_minalloc = minalloc;
+ tq->tq_maxalloc = maxalloc;
+ tq->tq_nalloc = 0;
+ tq->tq_flags = (flags | TASKQ_ACTIVE);
+ tq->tq_next_id = TASKQID_INITIAL;
+ tq->tq_lowest_id = TASKQID_INITIAL;
INIT_LIST_HEAD(&tq->tq_free_list);
INIT_LIST_HEAD(&tq->tq_pend_list);
INIT_LIST_HEAD(&tq->tq_prio_list);
init_waitqueue_head(&tq->tq_work_waitq);
init_waitqueue_head(&tq->tq_wait_waitq);
tq->tq_lock_class = TQ_LOCK_GENERAL;
+ INIT_LIST_HEAD(&tq->tq_taskqs);
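+ /*
+ * With TASKQ_PREPOPULATE, allocate tq_minalloc entries up front and
+ * park them on the free list via task_done(), so early dispatches
+ * can be satisfied without calling kmem_alloc().
+ */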
if (flags & TASKQ_PREPOPULATE) {
- spin_lock_irqsave_nested(&tq->tq_lock, tq->tq_lock_flags,
+ spin_lock_irqsave_nested(&tq->tq_lock, irqflags,
tq->tq_lock_class);
for (i = 0; i < minalloc; i++)
- task_done(tq, task_alloc(tq, TQ_PUSHPAGE | TQ_NEW));
+ task_done(tq, task_alloc(tq, TQ_PUSHPAGE | TQ_NEW,
+ &irqflags));
- spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+ spin_unlock_irqrestore(&tq->tq_lock, irqflags);
}
if ((flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic)
/* Wait for all threads to be started before potential destroy */
wait_event(tq->tq_wait_waitq, tq->tq_nthreads == count);
+ /*
+ * The initial threads of a TASKQ_DYNAMIC taskq decrement tq_nspawn
+ * in taskq_thread() even though they were not dynamically spawned,
+ * so reset the count to 0.
+ */
+ tq->tq_nspawn = 0;
if (rc) {
taskq_destroy(tq);
tq = NULL;
+ } else {
+ down_write(&tq_list_sem);
+ tq->tq_instance = taskq_find_by_name(name) + 1;
+ list_add_tail(&tq->tq_taskqs, &tq_list);
+ up_write(&tq_list_sem);
}
return (tq);
struct task_struct *thread;
taskq_thread_t *tqt;
taskq_ent_t *t;
+ unsigned long flags;
ASSERT(tq);
- spin_lock_irqsave_nested(&tq->tq_lock, tq->tq_lock_flags,
- tq->tq_lock_class);
+ spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
tq->tq_flags &= ~TASKQ_ACTIVE;
- spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+ spin_unlock_irqrestore(&tq->tq_lock, flags);
/*
* When TASKQ_ACTIVE is clear new tasks may not be added nor may
taskq_wait(tq);
- spin_lock_irqsave_nested(&tq->tq_lock, tq->tq_lock_flags,
- tq->tq_lock_class);
+ /* remove taskq from global list used by the kstats */
+ down_write(&tq_list_sem);
+ list_del(&tq->tq_taskqs);
+ up_write(&tq_list_sem);
+
+ spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
+ /* wait for spawning threads to insert themselves into the list */
+ while (tq->tq_nspawn) {
+ spin_unlock_irqrestore(&tq->tq_lock, flags);
+ schedule_timeout_interruptible(1);
+ spin_lock_irqsave_nested(&tq->tq_lock, flags,
+     tq->tq_lock_class);
+ }
/*
* Signal each thread to exit and block until it does. Each thread
tqt = list_entry(tq->tq_thread_list.next,
taskq_thread_t, tqt_thread_list);
thread = tqt->tqt_thread;
- spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+ spin_unlock_irqrestore(&tq->tq_lock, flags);
kthread_stop(thread);
- spin_lock_irqsave_nested(&tq->tq_lock, tq->tq_lock_flags,
+ spin_lock_irqsave_nested(&tq->tq_lock, flags,
tq->tq_lock_class);
}
ASSERT(list_empty(&tq->tq_prio_list));
ASSERT(list_empty(&tq->tq_delay_list));
- spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+ spin_unlock_irqrestore(&tq->tq_lock, flags);
strfree(tq->tq_name);
kmem_free(tq, sizeof (taskq_t));
}
EXPORT_SYMBOL(taskq_destroy);
+
+static unsigned int spl_taskq_kick = 0;
+
+/*
+ * 2.6.36 API Change
+ * module_param_cb is introduced to take kernel_param_ops and
+ * module_param_call is marked as obsolete. Also set and get operations
+ * were changed to take a 'const struct kernel_param *'.
+ */
+static int
+#ifdef module_param_cb
+param_set_taskq_kick(const char *val, const struct kernel_param *kp)
+#else
+param_set_taskq_kick(const char *val, struct kernel_param *kp)
+#endif
+{
+ int ret;
+ taskq_t *tq;
+ taskq_ent_t *t;
+ unsigned long flags;
+
+ ret = param_set_uint(val, kp);
+ if (ret < 0 || !spl_taskq_kick)
+ return (ret);
+ /* reset value */
+ spl_taskq_kick = 0;
+
+ down_read(&tq_list_sem);
+ list_for_each_entry(tq, &tq_list, tq_taskqs) {
+ spin_lock_irqsave_nested(&tq->tq_lock, flags,
+ tq->tq_lock_class);
+ /* Check if the first pending task is older than 5 seconds */
+ t = taskq_next_ent(tq);
+ if (t && time_after(jiffies, t->tqent_birth + 5*HZ)) {
+ (void) taskq_thread_spawn(tq);
+ printk(KERN_INFO "spl: Kicked taskq %s/%d\n",
+ tq->tq_name, tq->tq_instance);
+ }
+ spin_unlock_irqrestore(&tq->tq_lock, flags);
+ }
+ up_read(&tq_list_sem);
+ return (ret);
+}
+
+#ifdef module_param_cb
+static const struct kernel_param_ops param_ops_taskq_kick = {
+ .set = param_set_taskq_kick,
+ .get = param_get_uint,
+};
+module_param_cb(spl_taskq_kick, &param_ops_taskq_kick, &spl_taskq_kick, 0644);
+#else
+module_param_call(spl_taskq_kick, param_set_taskq_kick, param_get_uint,
+ &spl_taskq_kick, 0644);
+#endif
+MODULE_PARM_DESC(spl_taskq_kick,
+ "Write nonzero to kick stuck taskqs to spawn more threads");
+
int
spl_taskq_init(void)
{
+ tsd_create(&taskq_tsd, NULL);
+
system_taskq = taskq_create("spl_system_taskq", MAX(boot_ncpus, 64),
maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE|TASKQ_DYNAMIC);
if (system_taskq == NULL)
return (1);
+ system_delay_taskq = taskq_create("spl_delay_taskq", MAX(boot_ncpus, 4),
+ maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE|TASKQ_DYNAMIC);
+ if (system_delay_taskq == NULL) {
+ taskq_destroy(system_taskq);
+ return (1);
+ }
+
dynamic_taskq = taskq_create("spl_dynamic_taskq", 1,
maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE);
if (dynamic_taskq == NULL) {
taskq_destroy(system_taskq);
+ taskq_destroy(system_delay_taskq);
return (1);
}
- /* This is used to annotate tq_lock, so
- * taskq_dispatch -> taskq_thread_spawn -> taskq_dispatch
+ /*
+ * This is used to annotate tq_lock, so
+ * taskq_dispatch -> taskq_thread_spawn -> taskq_dispatch
* does not trigger a lockdep warning re: possible recursive locking
*/
dynamic_taskq->tq_lock_class = TQ_LOCK_DYNAMIC;
taskq_destroy(dynamic_taskq);
dynamic_taskq = NULL;
+ taskq_destroy(system_delay_taskq);
+ system_delay_taskq = NULL;
+
taskq_destroy(system_taskq);
system_taskq = NULL;
+
+ tsd_destroy(&taskq_tsd);
}