/*****************************************************************************\
 * Copyright (C) 2007-2010 Lawrence Livermore National Security, LLC.
 * Copyright (C) 2007 The Regents of the University of California.
 * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
 * Written by Brian Behlendorf <behlendorf1@llnl.gov>.
 * UCRL-CODE-235197
 *
 * This file is part of the SPL, Solaris Porting Layer.
 * For details, see <http://zfsonlinux.org/>.
 *
 * The SPL is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2 of the License, or (at your
 * option) any later version.
 *
 * The SPL is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with the SPL. If not, see <http://www.gnu.org/licenses/>.
 *****************************************************************************
 * Solaris Porting Layer (SPL) Task Queue Implementation.
\*****************************************************************************/

#include <sys/taskq.h>
#include <sys/kmem.h>

int spl_taskq_thread_bind = 0;
module_param(spl_taskq_thread_bind, int, 0644);
MODULE_PARM_DESC(spl_taskq_thread_bind, "Bind taskq thread to CPU by default");

/* Global system-wide dynamic task queue available for all consumers */
taskq_t *system_taskq;
EXPORT_SYMBOL(system_taskq);

static int
task_km_flags(uint_t flags)
{
        if (flags & TQ_NOSLEEP)
                return KM_NOSLEEP;

        if (flags & TQ_PUSHPAGE)
                return KM_PUSHPAGE;

        return KM_SLEEP;
}

/*
 * NOTE: Must be called with tq->tq_lock held, returns a taskq_ent_t which
 * is not attached to the free, work, or pending taskq lists.
 */
static taskq_ent_t *
task_alloc(taskq_t *tq, uint_t flags)
{
        taskq_ent_t *t;
        int count = 0;

        ASSERT(tq);
        ASSERT(spin_is_locked(&tq->tq_lock));
retry:
        /* Acquire taskq_ent_t's from free list if available */
        if (!list_empty(&tq->tq_free_list) && !(flags & TQ_NEW)) {
                t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);

                ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
                ASSERT(!(t->tqent_flags & TQENT_FLAG_CANCEL));
                ASSERT(!timer_pending(&t->tqent_timer));

                list_del_init(&t->tqent_list);
                return (t);
        }

        /* Free list is empty and memory allocations are prohibited */
        if (flags & TQ_NOALLOC)
                return (NULL);

        /* Hit maximum taskq_ent_t pool size */
        if (tq->tq_nalloc >= tq->tq_maxalloc) {
                if (flags & TQ_NOSLEEP)
                        return (NULL);

                /*
                 * Sleep periodically polling the free list for an available
                 * taskq_ent_t. Dispatching with TQ_SLEEP should always succeed
                 * but we cannot block forever waiting for a taskq_ent_t to
                 * show up in the free list, otherwise a deadlock can happen.
                 *
                 * Therefore, we need to allocate a new task even if the number
                 * of allocated tasks is above tq->tq_maxalloc, but we still
                 * end up delaying the task allocation by one second, thereby
                 * throttling the task dispatch rate.
                 */
                spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
                schedule_timeout(HZ / 100);
                spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
                if (count < 100) {
                        count++;
                        goto retry;
                }
        }

        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
        t = kmem_alloc(sizeof(taskq_ent_t), task_km_flags(flags));
        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);

        if (t) {
                taskq_init_ent(t);
                tq->tq_nalloc++;
        }

        return (t);
}

/*
 * NOTE: Must be called with tq->tq_lock held, expects the taskq_ent_t
 * to already be removed from the free, work, or pending taskq lists.
 */
static void
task_free(taskq_t *tq, taskq_ent_t *t)
{
        ASSERT(tq);
        ASSERT(t);
        ASSERT(spin_is_locked(&tq->tq_lock));
        ASSERT(list_empty(&t->tqent_list));
        ASSERT(!timer_pending(&t->tqent_timer));

        kmem_free(t, sizeof(taskq_ent_t));
        tq->tq_nalloc--;
}

/*
 * NOTE: Must be called with tq->tq_lock held, either destroys the
 * taskq_ent_t if too many exist or moves it to the free list for later use.
 */
static void
task_done(taskq_t *tq, taskq_ent_t *t)
{
        ASSERT(tq);
        ASSERT(t);
        ASSERT(spin_is_locked(&tq->tq_lock));

        /* Wake tasks blocked in taskq_wait_id() */
        wake_up_all(&t->tqent_waitq);

        list_del_init(&t->tqent_list);

        if (tq->tq_nalloc <= tq->tq_minalloc) {
                t->tqent_id = 0;
                t->tqent_func = NULL;
                t->tqent_arg = NULL;
                t->tqent_flags = 0;

                list_add_tail(&t->tqent_list, &tq->tq_free_list);
        } else {
                task_free(tq, t);
        }
}

/*
 * When a delayed task timer expires, remove it from the delay list and
 * add it to the priority list for immediate processing.
 */
static void
task_expire(unsigned long data)
{
        taskq_ent_t *w, *t = (taskq_ent_t *)data;
        taskq_t *tq = t->tqent_taskq;
        struct list_head *l;

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);

        if (t->tqent_flags & TQENT_FLAG_CANCEL) {
                ASSERT(list_empty(&t->tqent_list));
                spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
                return;
        }

        /*
         * The priority list must be maintained in strict task id order
         * from lowest to highest for lowest_id to be easily calculable.
         */
        list_del(&t->tqent_list);
        list_for_each_prev(l, &tq->tq_prio_list) {
                w = list_entry(l, taskq_ent_t, tqent_list);
                if (w->tqent_id < t->tqent_id) {
                        list_add(&t->tqent_list, l);
                        break;
                }
        }
        if (l == &tq->tq_prio_list)
                list_add(&t->tqent_list, &tq->tq_prio_list);

        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        wake_up(&tq->tq_work_waitq);
}

/*
 * Returns the lowest incomplete taskqid_t. The taskqid_t may
 * be queued on the pending list, on the priority list, on the
 * delay list, or on the work list currently being handled, but
 * it is not 100% complete yet.
 */
static taskqid_t
taskq_lowest_id(taskq_t *tq)
{
        taskqid_t lowest_id = tq->tq_next_id;
        taskq_ent_t *t;
        taskq_thread_t *tqt;

        ASSERT(tq);
        ASSERT(spin_is_locked(&tq->tq_lock));

        if (!list_empty(&tq->tq_pend_list)) {
                t = list_entry(tq->tq_pend_list.next, taskq_ent_t, tqent_list);
                lowest_id = MIN(lowest_id, t->tqent_id);
        }

        if (!list_empty(&tq->tq_prio_list)) {
                t = list_entry(tq->tq_prio_list.next, taskq_ent_t, tqent_list);
                lowest_id = MIN(lowest_id, t->tqent_id);
        }

        if (!list_empty(&tq->tq_delay_list)) {
                t = list_entry(tq->tq_delay_list.next, taskq_ent_t, tqent_list);
                lowest_id = MIN(lowest_id, t->tqent_id);
        }

        if (!list_empty(&tq->tq_active_list)) {
                tqt = list_entry(tq->tq_active_list.next, taskq_thread_t,
                    tqt_active_list);
                ASSERT(tqt->tqt_id != 0);
                lowest_id = MIN(lowest_id, tqt->tqt_id);
        }

        return (lowest_id);
}

/*
 * Insert a taskq thread into the active list, keeping the list sorted by
 * the increasing taskqid of the task each thread is servicing.
 */
static void
taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt)
{
        taskq_thread_t *w;
        struct list_head *l;

        ASSERT(tq);
        ASSERT(tqt);
        ASSERT(spin_is_locked(&tq->tq_lock));

        list_for_each_prev(l, &tq->tq_active_list) {
                w = list_entry(l, taskq_thread_t, tqt_active_list);
                if (w->tqt_id < tqt->tqt_id) {
                        list_add(&tqt->tqt_active_list, l);
                        break;
                }
        }
        if (l == &tq->tq_active_list)
                list_add(&tqt->tqt_active_list, &tq->tq_active_list);
}

/*
 * Find and return a task from the given list if it exists. The list
 * must be in lowest to highest task id order.
 */
static taskq_ent_t *
taskq_find_list(taskq_t *tq, struct list_head *lh, taskqid_t id)
{
        struct list_head *l;
        taskq_ent_t *t;

        ASSERT(spin_is_locked(&tq->tq_lock));

        list_for_each(l, lh) {
                t = list_entry(l, taskq_ent_t, tqent_list);

                if (t->tqent_id == id)
                        return (t);

                if (t->tqent_id > id)
                        break;
        }

        return (NULL);
}

/*
 * Find an already dispatched task given the task id regardless of what
 * state it is in. If a task is still pending or executing it will be
 * returned and 'active' set appropriately. If the task has already
 * been run then NULL is returned.
 */
static taskq_ent_t *
taskq_find(taskq_t *tq, taskqid_t id, int *active)
{
        taskq_thread_t *tqt;
        struct list_head *l;
        taskq_ent_t *t;

        ASSERT(spin_is_locked(&tq->tq_lock));
        *active = 0;

        t = taskq_find_list(tq, &tq->tq_delay_list, id);
        if (t)
                return (t);

        t = taskq_find_list(tq, &tq->tq_prio_list, id);
        if (t)
                return (t);

        t = taskq_find_list(tq, &tq->tq_pend_list, id);
        if (t)
                return (t);

        list_for_each(l, &tq->tq_active_list) {
                tqt = list_entry(l, taskq_thread_t, tqt_active_list);
                if (tqt->tqt_id == id) {
                        t = tqt->tqt_task;
                        *active = 1;
                        return (t);
                }
        }

        return (NULL);
}

/*
 * Theory for the taskq_wait_id(), taskq_wait_outstanding(), and
 * taskq_wait() functions below.
 *
 * Taskq waiting is accomplished by tracking the lowest outstanding task
 * id and the next available task id. As tasks are dispatched they are
 * added to the tail of the pending, priority, or delay lists. As worker
 * threads become available the tasks are removed from the heads of these
 * lists and linked to the worker threads. This ensures the lists are
 * kept sorted by lowest to highest task id.
 *
 * Therefore the lowest outstanding task id can be quickly determined by
 * checking the head item from all of these lists. This value is stored
 * with the taskq as the lowest id. It only needs to be recalculated when
 * either the task with the current lowest id completes or is canceled.
 *
 * By blocking until the lowest task id exceeds the passed task id the
 * taskq_wait_outstanding() function can be easily implemented. Similarly,
 * by blocking until the lowest task id matches the next task id taskq_wait()
 * can be implemented.
 *
 * Callers should be aware that when there are multiple worker threads it
 * is possible for larger task ids to complete before smaller ones. Also,
 * when the taskq contains delayed tasks with small task ids, callers may
 * block for a considerable length of time waiting for them to expire and
 * execute.
 */
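
/*
 * Illustrative sketch (not part of the original source) of how the three
 * wait primitives described above differ. Assumes an existing taskq 'tq'
 * and hypothetical task functions 'func_a' and 'func_b'; error handling
 * is omitted.
 *
 *      taskqid_t id_a, id_b;
 *
 *      id_a = taskq_dispatch(tq, func_a, arg_a, TQ_SLEEP);
 *      id_b = taskq_dispatch(tq, func_b, arg_b, TQ_SLEEP);
 *
 *      taskq_wait_id(tq, id_b);           only id_b is guaranteed complete
 *      taskq_wait_outstanding(tq, id_a);  blocks until the lowest outstanding
 *                                         id has passed id_a
 *      taskq_wait(tq);                    blocks until the taskq is empty
 */
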
static int
taskq_wait_id_check(taskq_t *tq, taskqid_t id)
{
        int active = 0;
        int rc;

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
        rc = (taskq_find(tq, id, &active) == NULL);
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        return (rc);
}

/*
 * The taskq_wait_id() function blocks until the passed task id completes.
 * This does not guarantee that all lower task ids have completed.
 */
void
taskq_wait_id(taskq_t *tq, taskqid_t id)
{
        wait_event(tq->tq_wait_waitq, taskq_wait_id_check(tq, id));
}
EXPORT_SYMBOL(taskq_wait_id);

static int
taskq_wait_outstanding_check(taskq_t *tq, taskqid_t id)
{
        int rc;

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
        rc = (id < tq->tq_lowest_id);
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        return (rc);
}

/*
 * The taskq_wait_outstanding() function will block until all tasks with a
 * lower taskqid than the passed 'id' have been completed. Note that all
 * task ids are assigned monotonically at dispatch time. Zero may be
 * passed for the id to indicate that all tasks dispatched up to this point,
 * but not after, should be waited for.
 */
void
taskq_wait_outstanding(taskq_t *tq, taskqid_t id)
{
        wait_event(tq->tq_wait_waitq,
            taskq_wait_outstanding_check(tq, id ? id : tq->tq_next_id - 1));
}
EXPORT_SYMBOL(taskq_wait_outstanding);

static int
taskq_wait_check(taskq_t *tq)
{
        int rc;

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
        rc = (tq->tq_lowest_id == tq->tq_next_id);
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        return (rc);
}

/*
 * The taskq_wait() function will block until the taskq is empty.
 * This means that if a taskq re-dispatches work to itself taskq_wait()
 * callers will block indefinitely.
 */
void
taskq_wait(taskq_t *tq)
{
        wait_event(tq->tq_wait_waitq, taskq_wait_check(tq));
}
EXPORT_SYMBOL(taskq_wait);

int
taskq_member(taskq_t *tq, void *t)
{
        struct list_head *l;
        taskq_thread_t *tqt;

        ASSERT(tq);
        ASSERT(t);

        list_for_each(l, &tq->tq_thread_list) {
                tqt = list_entry(l, taskq_thread_t, tqt_thread_list);
                if (tqt->tqt_thread == (struct task_struct *)t)
                        return (1);
        }

        return (0);
}
EXPORT_SYMBOL(taskq_member);

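/*
 * Illustrative sketch (not part of the original source): taskq_member() is
 * typically used to avoid waiting on a taskq from one of its own worker
 * threads, which would deadlock. 'curthread' is assumed here to be the SPL
 * alias for the current kernel thread; treat the exact usage as an example.
 *
 *      if (!taskq_member(tq, curthread))
 *              taskq_wait(tq);
 */
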
/*
 * Cancel an already dispatched task given the task id. Still pending tasks
 * will be immediately canceled, and if the task is active the function will
 * block until it completes. Preallocated tasks which are canceled must be
 * freed by the caller.
 */
int
taskq_cancel_id(taskq_t *tq, taskqid_t id)
{
        taskq_ent_t *t;
        int active = 0;
        int rc = ENOENT;

        ASSERT(tq);

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
        t = taskq_find(tq, id, &active);
        if (t && !active) {
                list_del_init(&t->tqent_list);
                t->tqent_flags |= TQENT_FLAG_CANCEL;

                /*
                 * When canceling the lowest outstanding task id we
                 * must recalculate the new lowest outstanding id.
                 */
                if (tq->tq_lowest_id == t->tqent_id) {
                        tq->tq_lowest_id = taskq_lowest_id(tq);
                        ASSERT3S(tq->tq_lowest_id, >, t->tqent_id);
                }

                /*
                 * The task_expire() function takes the tq->tq_lock so drop
                 * the lock before synchronously cancelling the timer.
                 */
                if (timer_pending(&t->tqent_timer)) {
                        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
                        del_timer_sync(&t->tqent_timer);
                        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
                }

                if (!(t->tqent_flags & TQENT_FLAG_PREALLOC))
                        task_done(tq, t);

                rc = 0;
        }
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        if (active) {
                taskq_wait_id(tq, id);
                rc = EBUSY;
        }

        return (rc);
}
EXPORT_SYMBOL(taskq_cancel_id);

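/*
 * Illustrative sketch (not part of the original source) of the return value
 * convention above, as implemented by taskq_cancel_id():
 *
 *      int error = taskq_cancel_id(tq, id);
 *
 *      error == 0       ->  cancelled while still queued, callback never ran
 *      error == EBUSY   ->  already running; the call blocked until it finished
 *      error == ENOENT  ->  nothing to cancel, the task already completed
 */
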
taskqid_t
taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
{
        taskq_ent_t *t;
        taskqid_t rc = 0;

        ASSERT(tq);
        ASSERT(func);

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);

        /* Taskq being destroyed and all tasks drained */
        if (!(tq->tq_flags & TQ_ACTIVE))
                goto out;

        /* Do not queue the task unless there is an idle thread for it */
        ASSERT(tq->tq_nactive <= tq->tq_nthreads);
        if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads))
                goto out;

        if ((t = task_alloc(tq, flags)) == NULL)
                goto out;

        spin_lock(&t->tqent_lock);

        /* Queue to the priority list instead of the pending list */
        if (flags & TQ_FRONT)
                list_add_tail(&t->tqent_list, &tq->tq_prio_list);
        else
                list_add_tail(&t->tqent_list, &tq->tq_pend_list);

        t->tqent_id = rc = tq->tq_next_id;
        tq->tq_next_id++;
        t->tqent_func = func;
        t->tqent_arg = arg;
        t->tqent_taskq = tq;
        t->tqent_timer.data = 0;
        t->tqent_timer.function = NULL;
        t->tqent_timer.expires = 0;

        ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));

        spin_unlock(&t->tqent_lock);

        wake_up(&tq->tq_work_waitq);
out:
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
        return (rc);
}
EXPORT_SYMBOL(taskq_dispatch);

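/*
 * Illustrative sketch (not part of the original source): a returned id of 0
 * means the task could not be queued (taskq inactive, TQ_NOQUEUE with no
 * idle thread, or allocation failure). 'my_func', 'my_arg', and the error
 * path are hypothetical names.
 *
 *      taskqid_t id = taskq_dispatch(tq, my_func, my_arg, TQ_SLEEP);
 *      if (id == 0)
 *              handle_dispatch_failure();
 */
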
taskqid_t
taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg,
    uint_t flags, clock_t expire_time)
{
        taskqid_t rc = 0;
        taskq_ent_t *t;

        ASSERT(tq);
        ASSERT(func);

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);

        /* Taskq being destroyed and all tasks drained */
        if (!(tq->tq_flags & TQ_ACTIVE))
                goto out;

        if ((t = task_alloc(tq, flags)) == NULL)
                goto out;

        spin_lock(&t->tqent_lock);

        /* Queue to the delay list for subsequent execution */
        list_add_tail(&t->tqent_list, &tq->tq_delay_list);

        t->tqent_id = rc = tq->tq_next_id;
        tq->tq_next_id++;
        t->tqent_func = func;
        t->tqent_arg = arg;
        t->tqent_taskq = tq;
        t->tqent_timer.data = (unsigned long)t;
        t->tqent_timer.function = task_expire;
        t->tqent_timer.expires = (unsigned long)expire_time;
        add_timer(&t->tqent_timer);

        ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));

        spin_unlock(&t->tqent_lock);
out:
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
        return (rc);
}
EXPORT_SYMBOL(taskq_dispatch_delay);

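/*
 * Illustrative sketch (not part of the original source): 'expire_time' is an
 * absolute timer value (it is assigned directly to tqent_timer.expires), not
 * a relative delay. The use of ddi_get_lbolt() and the five-second offset
 * below are assumptions for the example only.
 *
 *      clock_t when = ddi_get_lbolt() + 5 * HZ;
 *      taskqid_t id = taskq_dispatch_delay(tq, my_func, my_arg,
 *          TQ_SLEEP, when);
 */
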
void
taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
    taskq_ent_t *t)
{
        ASSERT(tq);
        ASSERT(func);
        ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);

        /* Taskq being destroyed and all tasks drained */
        if (!(tq->tq_flags & TQ_ACTIVE)) {
                t->tqent_id = 0;
                goto out;
        }

        spin_lock(&t->tqent_lock);

        /*
         * Mark it as a prealloc'd task. This is important
         * to ensure that we don't free it later.
         */
        t->tqent_flags |= TQENT_FLAG_PREALLOC;

        /* Queue to the priority list instead of the pending list */
        if (flags & TQ_FRONT)
                list_add_tail(&t->tqent_list, &tq->tq_prio_list);
        else
                list_add_tail(&t->tqent_list, &tq->tq_pend_list);

        t->tqent_id = tq->tq_next_id;
        tq->tq_next_id++;
        t->tqent_func = func;
        t->tqent_arg = arg;
        t->tqent_taskq = tq;

        spin_unlock(&t->tqent_lock);

        wake_up(&tq->tq_work_waitq);
out:
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
}
EXPORT_SYMBOL(taskq_dispatch_ent);

int
taskq_empty_ent(taskq_ent_t *t)
{
        return list_empty(&t->tqent_list);
}
EXPORT_SYMBOL(taskq_empty_ent);

void
taskq_init_ent(taskq_ent_t *t)
{
        spin_lock_init(&t->tqent_lock);
        init_waitqueue_head(&t->tqent_waitq);
        init_timer(&t->tqent_timer);
        INIT_LIST_HEAD(&t->tqent_list);
        t->tqent_id = 0;
        t->tqent_func = NULL;
        t->tqent_arg = NULL;
        t->tqent_flags = 0;
        t->tqent_taskq = NULL;
}
EXPORT_SYMBOL(taskq_init_ent);

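/*
 * Illustrative sketch (not part of the original source): a caller may embed
 * a taskq_ent_t in its own structure and dispatch without allocating. The
 * entry must be initialized once and remain live until the task has run.
 * 'my_obj' and its fields are hypothetical.
 *
 *      taskq_init_ent(&my_obj->ent);
 *      ...
 *      taskq_dispatch_ent(tq, my_func, my_obj, TQ_FRONT, &my_obj->ent);
 *      if (my_obj->ent.tqent_id == 0)
 *              handle_dispatch_failure();      (taskq was being destroyed)
 */
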
static int
taskq_thread(void *args)
{
        DECLARE_WAITQUEUE(wait, current);
        sigset_t blocked;
        taskq_thread_t *tqt = args;
        taskq_t *tq;
        taskq_ent_t *t;
        struct list_head *pend_list;

        ASSERT(tqt);
        tq = tqt->tqt_tq;
        current->flags |= PF_NOFREEZE;

        sigfillset(&blocked);
        sigprocmask(SIG_BLOCK, &blocked, NULL);
        flush_signals(current);

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
        tq->tq_nthreads++;
        wake_up(&tq->tq_wait_waitq);
        set_current_state(TASK_INTERRUPTIBLE);

        while (!kthread_should_stop()) {

                if (list_empty(&tq->tq_pend_list) &&
                    list_empty(&tq->tq_prio_list)) {
                        add_wait_queue_exclusive(&tq->tq_work_waitq, &wait);
                        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
                        schedule();
                        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
                        remove_wait_queue(&tq->tq_work_waitq, &wait);
                } else {
                        __set_current_state(TASK_RUNNING);
                }

                if (!list_empty(&tq->tq_prio_list))
                        pend_list = &tq->tq_prio_list;
                else if (!list_empty(&tq->tq_pend_list))
                        pend_list = &tq->tq_pend_list;
                else
                        pend_list = NULL;

                if (pend_list) {
                        t = list_entry(pend_list->next, taskq_ent_t, tqent_list);
                        list_del_init(&t->tqent_list);

                        /* In order to support recursively dispatching a
                         * preallocated taskq_ent_t, tqent_id must be
                         * stored prior to executing tqent_func. */
                        tqt->tqt_id = t->tqent_id;
                        tqt->tqt_task = t;

                        /* We must store a copy of the flags prior to
                         * servicing the task (servicing a prealloc'd task
                         * returns the ownership of the tqent back to
                         * the caller of taskq_dispatch). Thus,
                         * tqent_flags _may_ change within the call. */
                        tqt->tqt_flags = t->tqent_flags;

                        taskq_insert_in_order(tq, tqt);
                        tq->tq_nactive++;
                        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

                        /* Perform the requested task */
                        t->tqent_func(t->tqent_arg);

                        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
                        tq->tq_nactive--;
                        list_del_init(&tqt->tqt_active_list);
                        tqt->tqt_task = NULL;

                        /* For prealloc'd tasks, we don't free anything. */
                        if ((tq->tq_flags & TASKQ_DYNAMIC) ||
                            !(tqt->tqt_flags & TQENT_FLAG_PREALLOC))
                                task_done(tq, t);

                        /* When the current lowest outstanding taskqid is
                         * done calculate the new lowest outstanding id */
                        if (tq->tq_lowest_id == tqt->tqt_id) {
                                tq->tq_lowest_id = taskq_lowest_id(tq);
                                ASSERT3S(tq->tq_lowest_id, >, tqt->tqt_id);
                        }

                        tqt->tqt_id = 0;
                        tqt->tqt_flags = 0;
                        wake_up_all(&tq->tq_wait_waitq);
                }

                set_current_state(TASK_INTERRUPTIBLE);

        }

        __set_current_state(TASK_RUNNING);
        tq->tq_nthreads--;
        list_del_init(&tqt->tqt_thread_list);
        kmem_free(tqt, sizeof(taskq_thread_t));

        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        return (0);
}

taskq_t *
taskq_create(const char *name, int nthreads, pri_t pri,
    int minalloc, int maxalloc, uint_t flags)
{
        static int last_used_cpu = 0;
        taskq_t *tq;
        taskq_thread_t *tqt;
        int rc = 0, i, j = 0;

        ASSERT(name != NULL);
        ASSERT(pri <= maxclsyspri);
        ASSERT(minalloc >= 0);
        ASSERT(maxalloc <= INT_MAX);
        ASSERT(!(flags & (TASKQ_CPR_SAFE | TASKQ_DYNAMIC))); /* Unsupported */

        /* Scale the number of threads using nthreads as a percentage */
        if (flags & TASKQ_THREADS_CPU_PCT) {
                ASSERT(nthreads <= 100);
                ASSERT(nthreads >= 0);
                nthreads = MIN(nthreads, 100);
                nthreads = MAX(nthreads, 0);
                nthreads = MAX((num_online_cpus() * nthreads) / 100, 1);
        }

        tq = kmem_alloc(sizeof(*tq), KM_PUSHPAGE);
        if (tq == NULL)
                return (NULL);

        spin_lock_init(&tq->tq_lock);
        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
        INIT_LIST_HEAD(&tq->tq_thread_list);
        INIT_LIST_HEAD(&tq->tq_active_list);
        tq->tq_name = name;
        tq->tq_nactive = 0;
        tq->tq_nthreads = 0;
        tq->tq_pri = pri;
        tq->tq_minalloc = minalloc;
        tq->tq_maxalloc = maxalloc;
        tq->tq_nalloc = 0;
        tq->tq_flags = (flags | TQ_ACTIVE);
        tq->tq_next_id = 1;
        tq->tq_lowest_id = 1;
        INIT_LIST_HEAD(&tq->tq_free_list);
        INIT_LIST_HEAD(&tq->tq_pend_list);
        INIT_LIST_HEAD(&tq->tq_prio_list);
        INIT_LIST_HEAD(&tq->tq_delay_list);
        init_waitqueue_head(&tq->tq_work_waitq);
        init_waitqueue_head(&tq->tq_wait_waitq);

        if (flags & TASKQ_PREPOPULATE)
                for (i = 0; i < minalloc; i++)
                        task_done(tq, task_alloc(tq, TQ_PUSHPAGE | TQ_NEW));

        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        for (i = 0; i < nthreads; i++) {
                tqt = kmem_alloc(sizeof(*tqt), KM_PUSHPAGE);
                INIT_LIST_HEAD(&tqt->tqt_thread_list);
                INIT_LIST_HEAD(&tqt->tqt_active_list);
                tqt->tqt_tq = tq;
                tqt->tqt_id = 0;

                tqt->tqt_thread = spl_kthread_create(taskq_thread, tqt,
                    "%s/%d", name, i);
                if (tqt->tqt_thread) {
                        list_add(&tqt->tqt_thread_list, &tq->tq_thread_list);
                        if (spl_taskq_thread_bind) {
                                last_used_cpu = (last_used_cpu + 1) % num_online_cpus();
                                kthread_bind(tqt->tqt_thread, last_used_cpu);
                        }
                        set_user_nice(tqt->tqt_thread, PRIO_TO_NICE(pri));
                        wake_up_process(tqt->tqt_thread);
                        j++;
                } else {
                        kmem_free(tqt, sizeof(taskq_thread_t));
                        rc = 1;
                }
        }

        /* Wait for all threads to be started before potential destroy */
        wait_event(tq->tq_wait_waitq, tq->tq_nthreads == j);

        if (rc) {
                taskq_destroy(tq);
                tq = NULL;
        }

        return (tq);
}
EXPORT_SYMBOL(taskq_create);

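/*
 * Illustrative sketch (not part of the original source): creating a private
 * queue sized as a percentage of the online CPUs, dispatching to it, and
 * tearing it down. The queue name, callback, and argument are hypothetical.
 *
 *      taskq_t *tq = taskq_create("my_taskq", 75, minclsyspri,
 *          4, 512, TASKQ_PREPOPULATE | TASKQ_THREADS_CPU_PCT);
 *      if (tq != NULL) {
 *              (void) taskq_dispatch(tq, my_func, my_arg, TQ_SLEEP);
 *              taskq_wait(tq);
 *              taskq_destroy(tq);
 *      }
 */
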
void
taskq_destroy(taskq_t *tq)
{
        struct task_struct *thread;
        taskq_thread_t *tqt;
        taskq_ent_t *t;

        ASSERT(tq);
        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
        tq->tq_flags &= ~TQ_ACTIVE;
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        /* Clearing TQ_ACTIVE prevents new tasks from being added to pending */
        taskq_wait(tq);

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);

        /*
         * Signal each thread to exit and block until it does. Each thread
         * is responsible for removing itself from the list and freeing its
         * taskq_thread_t. This allows for idle threads to opt to remove
         * themselves from the taskq. They can be recreated as needed.
         */
        while (!list_empty(&tq->tq_thread_list)) {
                tqt = list_entry(tq->tq_thread_list.next,
                    taskq_thread_t, tqt_thread_list);
                thread = tqt->tqt_thread;
                spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

                kthread_stop(thread);

                spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
        }

        while (!list_empty(&tq->tq_free_list)) {
                t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);

                ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));

                list_del_init(&t->tqent_list);
                task_free(tq, t);
        }

        ASSERT(tq->tq_nthreads == 0);
        ASSERT(tq->tq_nalloc == 0);
        ASSERT(list_empty(&tq->tq_thread_list));
        ASSERT(list_empty(&tq->tq_active_list));
        ASSERT(list_empty(&tq->tq_free_list));
        ASSERT(list_empty(&tq->tq_pend_list));
        ASSERT(list_empty(&tq->tq_prio_list));
        ASSERT(list_empty(&tq->tq_delay_list));

        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        kmem_free(tq, sizeof(taskq_t));
}
EXPORT_SYMBOL(taskq_destroy);

int
spl_taskq_init(void)
{
        /* Solaris creates a dynamic taskq of up to 64 threads; however, in
         * a Linux environment one thread per core is usually about right */
        system_taskq = taskq_create("spl_system_taskq", num_online_cpus(),
            minclsyspri, 4, 512, TASKQ_PREPOPULATE);
        if (system_taskq == NULL)
                return (1);

        return (0);
}

void
spl_taskq_fini(void)
{
        taskq_destroy(system_taskq);
}