/*****************************************************************************\
 * Copyright (C) 2007-2010 Lawrence Livermore National Security, LLC.
 * Copyright (C) 2007 The Regents of the University of California.
 * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
 * Written by Brian Behlendorf <behlendorf1@llnl.gov>.
 * UCRL-CODE-235197
 *
 * This file is part of the SPL, Solaris Porting Layer.
 * For details, see <http://github.com/behlendorf/spl/>.
 *
 * The SPL is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2 of the License, or (at your
 * option) any later version.
 *
 * The SPL is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with the SPL. If not, see <http://www.gnu.org/licenses/>.
 *****************************************************************************
 * Solaris Porting Layer (SPL) Task Queue Implementation.
\*****************************************************************************/

#include <sys/taskq.h>
#include <sys/kmem.h>
#include <spl-debug.h>

#ifdef SS_DEBUG_SUBSYS
#undef SS_DEBUG_SUBSYS
#endif

#define SS_DEBUG_SUBSYS SS_TASKQ

/* Global system-wide dynamic task queue available for all consumers */
taskq_t *system_taskq;
EXPORT_SYMBOL(system_taskq);

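/*
 * Illustrative usage sketch (not part of the implementation). Consumers
 * normally reach this queue through the taskq_dispatch()/taskq_wait_id()
 * wrappers declared in <sys/taskq.h>; my_work() and my_arg below are
 * hypothetical names used only for illustration:
 *
 *     taskqid_t id = taskq_dispatch(system_taskq, my_work, my_arg, TQ_SLEEP);
 *     if (id != 0)
 *             taskq_wait_id(system_taskq, id);
 */
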
/*
 * NOTE: Must be called with tq->tq_lock held, returns a list_t which
 * is not attached to the free, work, or pending taskq lists.
 */
static taskq_ent_t *
task_alloc(taskq_t *tq, uint_t flags)
{
        taskq_ent_t *t;
        int count = 0;
        SENTRY;

        ASSERT(tq);
        ASSERT(flags & (TQ_SLEEP | TQ_NOSLEEP));               /* One set */
        ASSERT(!((flags & TQ_SLEEP) && (flags & TQ_NOSLEEP))); /* Not both */
        ASSERT(spin_is_locked(&tq->tq_lock));
retry:
        /* Acquire taskq_ent_t's from free list if available */
        if (!list_empty(&tq->tq_free_list) && !(flags & TQ_NEW)) {
                t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);

                ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));

                list_del_init(&t->tqent_list);
                SRETURN(t);
        }

        /* Free list is empty and memory allocations are prohibited */
        if (flags & TQ_NOALLOC)
                SRETURN(NULL);

        /* Hit maximum taskq_ent_t pool size */
        if (tq->tq_nalloc >= tq->tq_maxalloc) {
                if (flags & TQ_NOSLEEP)
                        SRETURN(NULL);

                /*
                 * Sleep periodically polling the free list for an available
                 * taskq_ent_t. Dispatching with TQ_SLEEP should always succeed
                 * but we cannot block forever waiting for a taskq_ent_t to
                 * show up in the free list, otherwise a deadlock can happen.
                 *
                 * Therefore, we need to allocate a new task even if the number
                 * of allocated tasks is above tq->tq_maxalloc, but we still
                 * end up delaying the task allocation by up to one second
                 * (100 polls of the free list), thereby throttling the task
                 * dispatch rate.
                 */
                spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
                schedule_timeout(HZ / 100);
                spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
                if (count < 100)
                        SGOTO(retry, count++);
        }

        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
        t = kmem_alloc(sizeof(taskq_ent_t), flags & (TQ_SLEEP | TQ_NOSLEEP));
        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);

        if (t) {
                taskq_init_ent(t);
                tq->tq_nalloc++;
        }

        SRETURN(t);
}

/*
 * NOTE: Must be called with tq->tq_lock held, expects the taskq_ent_t
 * to already be removed from the free, work, or pending taskq lists.
 */
static void
task_free(taskq_t *tq, taskq_ent_t *t)
{
        SENTRY;

        ASSERT(tq);
        ASSERT(t);
        ASSERT(spin_is_locked(&tq->tq_lock));
        ASSERT(list_empty(&t->tqent_list));

        kmem_free(t, sizeof(taskq_ent_t));
        tq->tq_nalloc--;

        SEXIT;
}

/*
 * NOTE: Must be called with tq->tq_lock held, either destroys the
 * taskq_ent_t if too many exist or moves it to the free list for later use.
 */
static void
task_done(taskq_t *tq, taskq_ent_t *t)
{
        SENTRY;
        ASSERT(tq);
        ASSERT(t);
        ASSERT(spin_is_locked(&tq->tq_lock));

        list_del_init(&t->tqent_list);

        if (tq->tq_nalloc <= tq->tq_minalloc) {
                t->tqent_id = 0;
                t->tqent_func = NULL;
                t->tqent_arg = NULL;
                t->tqent_flags = 0;

                list_add_tail(&t->tqent_list, &tq->tq_free_list);
        } else {
                task_free(tq, t);
        }

        SEXIT;
}

/*
 * As tasks are submitted to the task queue they are assigned a
 * monotonically increasing taskqid and added to the tail of the pending
 * list. As worker threads become available the tasks are removed from
 * the head of the pending or priority list, giving preference to the
 * priority list. The tasks are then removed from their respective
 * list, and the taskq_thread servicing the task is added to the active
 * list, preserving the order using the serviced task's taskqid.
 * Finally, as tasks complete the taskq_thread servicing the task is
 * removed from the active list. This means that the pending task and
 * active taskq_thread lists are always kept sorted by taskqid. Thus the
 * lowest outstanding incomplete taskqid can be determined simply by
 * checking the min taskqid for each head item on the pending, priority,
 * and active taskq_thread list. This value is stored in
 * tq->tq_lowest_id and only updated to the new lowest id when the
 * previous lowest id completes. All taskqids lower than
 * tq->tq_lowest_id must have completed. It is also possible that larger
 * taskqids have completed because they may be processed in parallel by
 * several worker threads. However, this is not a problem because the
 * behavior of taskq_wait_id() is to block until all previously
 * submitted taskqids have completed.
 *
 * XXX: taskqid_t wrapping is not handled. However, taskqid_t's are
 * 64-bit values so even if a taskq is processing 2^24 (16,777,216)
 * taskqid_t's per second it will still take 2^40 seconds, 34,865 years,
 * before the wrap occurs. I can live with that for now.
 */
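
/*
 * Returns non-zero once every task with a taskqid less than or equal to
 * 'id' has completed, i.e. once 'id' is below tq->tq_lowest_id. For
 * example, if tasks 1 and 2 have completed but task 3 is still being
 * serviced, tq_lowest_id is 3: a wait on id 2 may return while a wait
 * on id 3 must continue to block.
 */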
static int
taskq_wait_check(taskq_t *tq, taskqid_t id)
{
        int rc;

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
        rc = (id < tq->tq_lowest_id);
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        SRETURN(rc);
}

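/*
 * Block until taskqid 'id', and every task dispatched before it, has
 * completed.
 */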
void
__taskq_wait_id(taskq_t *tq, taskqid_t id)
{
        SENTRY;
        ASSERT(tq);

        wait_event(tq->tq_wait_waitq, taskq_wait_check(tq, id));

        SEXIT;
}
EXPORT_SYMBOL(__taskq_wait_id);

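/*
 * Block until every task dispatched to the taskq prior to this call has
 * completed. This is done by waiting on the most recently assigned
 * taskqid (tq_next_id - 1).
 */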
void
__taskq_wait(taskq_t *tq)
{
        taskqid_t id;
        SENTRY;
        ASSERT(tq);

        /* Wait for the largest outstanding taskqid */
        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
        id = tq->tq_next_id - 1;
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        __taskq_wait_id(tq, id);

        SEXIT;
}
EXPORT_SYMBOL(__taskq_wait);

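/*
 * Returns non-zero if the thread 't' is one of the worker threads
 * belonging to taskq 'tq'.
 */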
int
__taskq_member(taskq_t *tq, void *t)
{
        struct list_head *l;
        taskq_thread_t *tqt;
        SENTRY;

        ASSERT(tq);
        ASSERT(t);

        list_for_each(l, &tq->tq_thread_list) {
                tqt = list_entry(l, taskq_thread_t, tqt_thread_list);
                if (tqt->tqt_thread == (struct task_struct *)t)
                        SRETURN(1);
        }

        SRETURN(0);
}
EXPORT_SYMBOL(__taskq_member);

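/*
 * Allocate a taskq_ent_t for 'func'/'arg', queue it on the pending list
 * (or the priority list when TQ_FRONT is set), and wake a worker thread.
 * Returns the assigned taskqid on success, or 0 if the taskq is being
 * destroyed, no entry could be allocated, or TQ_NOQUEUE was passed and
 * no idle worker thread is available.
 */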
taskqid_t
__taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
{
        taskq_ent_t *t;
        taskqid_t rc = 0;
        SENTRY;

        ASSERT(tq);
        ASSERT(func);

        /* Solaris assumes TQ_SLEEP if not passed explicitly */
        if (!(flags & (TQ_SLEEP | TQ_NOSLEEP)))
                flags |= TQ_SLEEP;

        if (unlikely(in_atomic() && (flags & TQ_SLEEP)))
                PANIC("May schedule while atomic: %s/0x%08x/%d\n",
                    current->comm, preempt_count(), current->pid);

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);

        /* Taskq being destroyed and all tasks drained */
        if (!(tq->tq_flags & TQ_ACTIVE))
                SGOTO(out, rc = 0);

        /* Do not queue the task unless there is an idle thread for it */
        ASSERT(tq->tq_nactive <= tq->tq_nthreads);
        if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads))
                SGOTO(out, rc = 0);

        if ((t = task_alloc(tq, flags)) == NULL)
                SGOTO(out, rc = 0);

        spin_lock(&t->tqent_lock);

        /* Queue to the priority list instead of the pending list */
        if (flags & TQ_FRONT)
                list_add_tail(&t->tqent_list, &tq->tq_prio_list);
        else
                list_add_tail(&t->tqent_list, &tq->tq_pend_list);

        t->tqent_id = rc = tq->tq_next_id;
        tq->tq_next_id++;
        t->tqent_func = func;
        t->tqent_arg = arg;

        ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));

        spin_unlock(&t->tqent_lock);

        wake_up(&tq->tq_work_waitq);
out:
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
        SRETURN(rc);
}
EXPORT_SYMBOL(__taskq_dispatch);

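/*
 * Dispatch a caller-preallocated taskq_ent_t. No memory is allocated,
 * so the dispatch itself cannot fail; however, if the taskq is being
 * destroyed the entry is not queued and t->tqent_id is set to 0.
 */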
void
__taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
    taskq_ent_t *t)
{
        SENTRY;

        ASSERT(tq);
        ASSERT(func);
        ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);

        /* Taskq being destroyed and all tasks drained */
        if (!(tq->tq_flags & TQ_ACTIVE)) {
                t->tqent_id = 0;
                goto out;
        }

        spin_lock(&t->tqent_lock);

        /*
         * Mark it as a prealloc'd task. This is important
         * to ensure that we don't free it later.
         */
        t->tqent_flags |= TQENT_FLAG_PREALLOC;

        /* Queue to the priority list instead of the pending list */
        if (flags & TQ_FRONT)
                list_add_tail(&t->tqent_list, &tq->tq_prio_list);
        else
                list_add_tail(&t->tqent_list, &tq->tq_pend_list);

        t->tqent_id = tq->tq_next_id;
        tq->tq_next_id++;
        t->tqent_func = func;
        t->tqent_arg = arg;

        spin_unlock(&t->tqent_lock);

        wake_up(&tq->tq_work_waitq);
out:
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
        SEXIT;
}
EXPORT_SYMBOL(__taskq_dispatch_ent);

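/* Returns non-zero if the preallocated entry is not currently queued */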
int
__taskq_empty_ent(taskq_ent_t *t)
{
        return list_empty(&t->tqent_list);
}
EXPORT_SYMBOL(__taskq_empty_ent);

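/*
 * Initialize a caller-provided taskq_ent_t before it is first passed to
 * taskq_dispatch_ent().
 */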
void
__taskq_init_ent(taskq_ent_t *t)
{
        spin_lock_init(&t->tqent_lock);
        INIT_LIST_HEAD(&t->tqent_list);
        t->tqent_id = 0;
        t->tqent_func = NULL;
        t->tqent_arg = NULL;
        t->tqent_flags = 0;
}
EXPORT_SYMBOL(__taskq_init_ent);

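/*
 * Illustrative sketch of the preallocated-entry pattern above (not part
 * of the implementation). A caller that cannot tolerate allocation
 * failure can embed a taskq_ent_t in its own structure and dispatch it
 * directly; my_obj and my_work below are hypothetical names:
 *
 *     taskq_init_ent(&my_obj->ent);
 *     taskq_dispatch_ent(tq, my_work, my_obj, TQ_SLEEP, &my_obj->ent);
 *
 * A tqent_id of 0 after the call indicates the taskq was being destroyed
 * and the entry was not queued. Because the entry is marked
 * TQENT_FLAG_PREALLOC, ownership returns to the caller once my_work()
 * has been serviced and it is never freed by the taskq.
 */
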
/*
 * Returns the lowest incomplete taskqid_t. The taskqid_t may
 * be queued on the pending list, on the priority list, or on
 * the work list currently being handled, but it is not 100%
 * complete yet.
 */
static taskqid_t
taskq_lowest_id(taskq_t *tq)
{
        taskqid_t lowest_id = tq->tq_next_id;
        taskq_ent_t *t;
        taskq_thread_t *tqt;
        SENTRY;

        ASSERT(tq);
        ASSERT(spin_is_locked(&tq->tq_lock));

        if (!list_empty(&tq->tq_pend_list)) {
                t = list_entry(tq->tq_pend_list.next, taskq_ent_t, tqent_list);
                lowest_id = MIN(lowest_id, t->tqent_id);
        }

        if (!list_empty(&tq->tq_prio_list)) {
                t = list_entry(tq->tq_prio_list.next, taskq_ent_t, tqent_list);
                lowest_id = MIN(lowest_id, t->tqent_id);
        }

        if (!list_empty(&tq->tq_active_list)) {
                tqt = list_entry(tq->tq_active_list.next, taskq_thread_t,
                    tqt_active_list);
                ASSERT(tqt->tqt_id != 0);
                lowest_id = MIN(lowest_id, tqt->tqt_id);
        }

        SRETURN(lowest_id);
}

/*
 * Insert a taskq_thread_t into the active list, keeping the list sorted
 * by the increasing taskqid of the task each thread is servicing.
 */
static void
taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt)
{
        taskq_thread_t *w;
        struct list_head *l;

        SENTRY;
        ASSERT(tq);
        ASSERT(tqt);
        ASSERT(spin_is_locked(&tq->tq_lock));

        list_for_each_prev(l, &tq->tq_active_list) {
                w = list_entry(l, taskq_thread_t, tqt_active_list);
                if (w->tqt_id < tqt->tqt_id) {
                        list_add(&tqt->tqt_active_list, l);
                        break;
                }
        }
        if (l == &tq->tq_active_list)
                list_add(&tqt->tqt_active_list, &tq->tq_active_list);

        SEXIT;
}

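/*
 * Worker thread main loop. Each thread sleeps until work appears on the
 * priority or pending list, removes one entry at a time (priority list
 * first), records itself on the active list, and services the task with
 * the taskq lock dropped. On completion it updates tq_lowest_id if it
 * was servicing the lowest outstanding taskqid and wakes any waiters.
 */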
static int
taskq_thread(void *args)
{
        DECLARE_WAITQUEUE(wait, current);
        sigset_t blocked;
        taskq_thread_t *tqt = args;
        taskq_t *tq;
        taskq_ent_t *t;
        struct list_head *pend_list;
        SENTRY;

        ASSERT(tqt);
        tq = tqt->tqt_tq;
        current->flags |= PF_NOFREEZE;

        /* Disable the direct memory reclaim path */
        if (tq->tq_flags & TASKQ_NORECLAIM)
                current->flags |= PF_MEMALLOC;

        sigfillset(&blocked);
        sigprocmask(SIG_BLOCK, &blocked, NULL);
        flush_signals(current);

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
        tq->tq_nthreads++;
        wake_up(&tq->tq_wait_waitq);
        set_current_state(TASK_INTERRUPTIBLE);

        while (!kthread_should_stop()) {

                if (list_empty(&tq->tq_pend_list) &&
                    list_empty(&tq->tq_prio_list)) {
                        add_wait_queue_exclusive(&tq->tq_work_waitq, &wait);
                        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
                        schedule();
                        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
                        remove_wait_queue(&tq->tq_work_waitq, &wait);
                } else {
                        __set_current_state(TASK_RUNNING);
                }

                if (!list_empty(&tq->tq_prio_list))
                        pend_list = &tq->tq_prio_list;
                else if (!list_empty(&tq->tq_pend_list))
                        pend_list = &tq->tq_pend_list;
                else
                        pend_list = NULL;

                if (pend_list) {
                        t = list_entry(pend_list->next, taskq_ent_t, tqent_list);
                        list_del_init(&t->tqent_list);

                        /* In order to support recursively dispatching a
                         * preallocated taskq_ent_t, tqent_id must be
                         * stored prior to executing tqent_func. */
                        tqt->tqt_id = t->tqent_id;

                        /* We must store a copy of the flags prior to
                         * servicing the task (servicing a prealloc'd task
                         * returns the ownership of the tqent back to
                         * the caller of taskq_dispatch). Thus,
                         * tqent_flags _may_ change within the call. */
                        tqt->tqt_flags = t->tqent_flags;

                        taskq_insert_in_order(tq, tqt);
                        tq->tq_nactive++;
                        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

                        /* Perform the requested task */
                        t->tqent_func(t->tqent_arg);

                        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
                        tq->tq_nactive--;
                        list_del_init(&tqt->tqt_active_list);

                        /* For prealloc'd tasks, we don't free anything. */
                        if ((tq->tq_flags & TASKQ_DYNAMIC) ||
                            !(tqt->tqt_flags & TQENT_FLAG_PREALLOC))
                                task_done(tq, t);

                        /* When the current lowest outstanding taskqid is
                         * done, calculate the new lowest outstanding id */
                        if (tq->tq_lowest_id == tqt->tqt_id) {
                                tq->tq_lowest_id = taskq_lowest_id(tq);
                                ASSERT3S(tq->tq_lowest_id, >, tqt->tqt_id);
                        }

                        tqt->tqt_id = 0;
                        tqt->tqt_flags = 0;
                        wake_up_all(&tq->tq_wait_waitq);
                }

                set_current_state(TASK_INTERRUPTIBLE);
        }

        __set_current_state(TASK_RUNNING);
        tq->tq_nthreads--;
        list_del_init(&tqt->tqt_thread_list);
        kmem_free(tqt, sizeof(taskq_thread_t));

        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        SRETURN(0);
}

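/*
 * Create a taskq with 'nthreads' worker threads (or, when
 * TASKQ_THREADS_CPU_PCT is set, with 'nthreads' interpreted as a
 * percentage of the online CPUs) running at priority 'pri'. Between
 * 'minalloc' and 'maxalloc' taskq_ent_t's are cached, and
 * TASKQ_PREPOPULATE preallocates 'minalloc' of them up front. Returns
 * the new taskq, or NULL if it or its threads could not be created.
 */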
taskq_t *
__taskq_create(const char *name, int nthreads, pri_t pri,
    int minalloc, int maxalloc, uint_t flags)
{
        taskq_t *tq;
        taskq_thread_t *tqt;
        int rc = 0, i, j = 0;
        SENTRY;

        ASSERT(name != NULL);
        ASSERT(pri <= maxclsyspri);
        ASSERT(minalloc >= 0);
        ASSERT(maxalloc <= INT_MAX);
        ASSERT(!(flags & (TASKQ_CPR_SAFE | TASKQ_DYNAMIC))); /* Unsupported */

        /* Scale the number of threads using nthreads as a percentage */
        if (flags & TASKQ_THREADS_CPU_PCT) {
                ASSERT(nthreads <= 100);
                ASSERT(nthreads >= 0);
                nthreads = MIN(nthreads, 100);
                nthreads = MAX(nthreads, 0);
                nthreads = MAX((num_online_cpus() * nthreads) / 100, 1);
        }

        tq = kmem_alloc(sizeof(*tq), KM_SLEEP);
        if (tq == NULL)
                SRETURN(NULL);

        spin_lock_init(&tq->tq_lock);
        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
        INIT_LIST_HEAD(&tq->tq_thread_list);
        INIT_LIST_HEAD(&tq->tq_active_list);
        tq->tq_name = name;
        tq->tq_nactive = 0;
        tq->tq_nthreads = 0;
        tq->tq_pri = pri;
        tq->tq_minalloc = minalloc;
        tq->tq_maxalloc = maxalloc;
        tq->tq_nalloc = 0;
        tq->tq_flags = (flags | TQ_ACTIVE);
        tq->tq_next_id = 1;
        tq->tq_lowest_id = 1;
        INIT_LIST_HEAD(&tq->tq_free_list);
        INIT_LIST_HEAD(&tq->tq_pend_list);
        INIT_LIST_HEAD(&tq->tq_prio_list);
        init_waitqueue_head(&tq->tq_work_waitq);
        init_waitqueue_head(&tq->tq_wait_waitq);

        if (flags & TASKQ_PREPOPULATE)
                for (i = 0; i < minalloc; i++)
                        task_done(tq, task_alloc(tq, TQ_SLEEP | TQ_NEW));

        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        for (i = 0; i < nthreads; i++) {
                tqt = kmem_alloc(sizeof(*tqt), KM_SLEEP);
                INIT_LIST_HEAD(&tqt->tqt_thread_list);
                INIT_LIST_HEAD(&tqt->tqt_active_list);
                tqt->tqt_tq = tq;
                tqt->tqt_id = 0;

                tqt->tqt_thread = kthread_create(taskq_thread, tqt,
                    "%s/%d", name, i);
                if (tqt->tqt_thread) {
                        list_add(&tqt->tqt_thread_list, &tq->tq_thread_list);
                        kthread_bind(tqt->tqt_thread, i % num_online_cpus());
                        set_user_nice(tqt->tqt_thread, PRIO_TO_NICE(pri));
                        wake_up_process(tqt->tqt_thread);
                        j++;
                } else {
                        kmem_free(tqt, sizeof(taskq_thread_t));
                        rc = 1;
                }
        }

        /* Wait for all threads to be started before potential destroy */
        wait_event(tq->tq_wait_waitq, tq->tq_nthreads == j);

        if (rc) {
                __taskq_destroy(tq);
                tq = NULL;
        }

        SRETURN(tq);
}
EXPORT_SYMBOL(__taskq_create);

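/*
 * Destroy a taskq. New dispatches are rejected once TQ_ACTIVE is
 * cleared, all outstanding tasks are drained, every worker thread is
 * stopped, the cached taskq_ent_t's are freed, and finally the taskq
 * itself is released.
 */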
void
__taskq_destroy(taskq_t *tq)
{
        struct task_struct *thread;
        taskq_thread_t *tqt;
        taskq_ent_t *t;
        SENTRY;

        ASSERT(tq);
        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
        tq->tq_flags &= ~TQ_ACTIVE;
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        /* TQ_ACTIVE is cleared, so no new tasks may be added to pending */
        __taskq_wait(tq);

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);

        /*
         * Signal each thread to exit and block until it does. Each thread
         * is responsible for removing itself from the list and freeing its
         * taskq_thread_t. This allows for idle threads to opt to remove
         * themselves from the taskq. They can be recreated as needed.
         */
        while (!list_empty(&tq->tq_thread_list)) {
                tqt = list_entry(tq->tq_thread_list.next,
                    taskq_thread_t, tqt_thread_list);
                thread = tqt->tqt_thread;
                spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

                kthread_stop(thread);

                spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
        }

        while (!list_empty(&tq->tq_free_list)) {
                t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);

                ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));

                list_del_init(&t->tqent_list);
                task_free(tq, t);
        }

        ASSERT(tq->tq_nthreads == 0);
        ASSERT(tq->tq_nalloc == 0);
        ASSERT(list_empty(&tq->tq_thread_list));
        ASSERT(list_empty(&tq->tq_active_list));
        ASSERT(list_empty(&tq->tq_free_list));
        ASSERT(list_empty(&tq->tq_pend_list));
        ASSERT(list_empty(&tq->tq_prio_list));

        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        kmem_free(tq, sizeof(taskq_t));

        SEXIT;
}
EXPORT_SYMBOL(__taskq_destroy);

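/* Create the global system taskq declared above for use by all consumers */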
int
spl_taskq_init(void)
{
        SENTRY;

        /* Solaris creates a dynamic taskq of up to 64 threads; in a Linux
         * environment one thread per core is usually about right */
        system_taskq = taskq_create("spl_system_taskq", num_online_cpus(),
            minclsyspri, 4, 512, TASKQ_PREPOPULATE);
        if (system_taskq == NULL)
                SRETURN(1);

        SRETURN(0);
}

void
spl_taskq_fini(void)
{
        SENTRY;
        taskq_destroy(system_taskq);
        SEXIT;
}