/*****************************************************************************\
 *  Copyright (C) 2007-2010 Lawrence Livermore National Security, LLC.
 *  Copyright (C) 2007 The Regents of the University of California.
 *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
 *  Written by Brian Behlendorf <behlendorf1@llnl.gov>.
 *  UCRL-CODE-235197
 *
 *  This file is part of the SPL, Solaris Porting Layer.
 *  For details, see <http://zfsonlinux.org/>.
 *
 *  The SPL is free software; you can redistribute it and/or modify it
 *  under the terms of the GNU General Public License as published by the
 *  Free Software Foundation; either version 2 of the License, or (at your
 *  option) any later version.
 *
 *  The SPL is distributed in the hope that it will be useful, but WITHOUT
 *  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 *  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 *  for more details.
 *
 *  You should have received a copy of the GNU General Public License along
 *  with the SPL.  If not, see <http://www.gnu.org/licenses/>.
 *****************************************************************************
 *  Solaris Porting Layer (SPL) Task Queue Implementation.
\*****************************************************************************/

#include <sys/taskq.h>
#include <sys/kmem.h>
#include <spl-debug.h>

#ifdef SS_DEBUG_SUBSYS
#undef SS_DEBUG_SUBSYS
#endif

#define SS_DEBUG_SUBSYS SS_TASKQ

/* Global system-wide dynamic task queue available for all consumers */
taskq_t *system_taskq;
EXPORT_SYMBOL(system_taskq);

static int
task_km_flags(uint_t flags)
{
	if (flags & TQ_NOSLEEP)
		return KM_NOSLEEP;

	if (flags & TQ_PUSHPAGE)
		return KM_PUSHPAGE;

	return KM_SLEEP;
}

/*
 * NOTE: Must be called with tq->tq_lock held, returns a taskq_ent_t
 * which is not attached to the free, work, or pending taskq lists.
 */
static taskq_ent_t *
task_alloc(taskq_t *tq, uint_t flags)
{
	taskq_ent_t *t;
	int count = 0;
	SENTRY;

	ASSERT(tq);
	ASSERT(spin_is_locked(&tq->tq_lock));
retry:
	/* Acquire taskq_ent_t's from free list if available */
	if (!list_empty(&tq->tq_free_list) && !(flags & TQ_NEW)) {
		t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);

		ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
		ASSERT(!(t->tqent_flags & TQENT_FLAG_CANCEL));
		ASSERT(!timer_pending(&t->tqent_timer));

		list_del_init(&t->tqent_list);
		SRETURN(t);
	}

	/* Free list is empty and memory allocations are prohibited */
	if (flags & TQ_NOALLOC)
		SRETURN(NULL);

	/* Hit maximum taskq_ent_t pool size */
	if (tq->tq_nalloc >= tq->tq_maxalloc) {
		if (flags & TQ_NOSLEEP)
			SRETURN(NULL);

		/*
		 * Sleep periodically polling the free list for an available
		 * taskq_ent_t. Dispatching with TQ_SLEEP should always succeed
		 * but we cannot block forever waiting for a taskq_ent_t to
		 * show up in the free list, otherwise a deadlock can happen.
		 *
		 * Therefore, we need to allocate a new task even if the number
		 * of allocated tasks is above tq->tq_maxalloc, but we still
		 * end up delaying the task allocation by one second, thereby
		 * throttling the task dispatch rate.
		 */
		spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
		schedule_timeout(HZ / 100);
		spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
		if (count < 100)
			SGOTO(retry, count++);
	}

	spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
	t = kmem_alloc(sizeof(taskq_ent_t), task_km_flags(flags));
	spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);

	if (t) {
		taskq_init_ent(t);
		tq->tq_nalloc++;
	}

	SRETURN(t);
}

/*
 * NOTE: Must be called with tq->tq_lock held, expects the taskq_ent_t
 * to already be removed from the free, work, or pending taskq lists.
 */
static void
task_free(taskq_t *tq, taskq_ent_t *t)
{
	SENTRY;

	ASSERT(tq);
	ASSERT(t);
	ASSERT(spin_is_locked(&tq->tq_lock));
	ASSERT(list_empty(&t->tqent_list));
	ASSERT(!timer_pending(&t->tqent_timer));

	kmem_free(t, sizeof(taskq_ent_t));
	tq->tq_nalloc--;

	SEXIT;
}

/*
 * NOTE: Must be called with tq->tq_lock held, either destroys the
 * taskq_ent_t if too many exist or moves it to the free list for later use.
 */
static void
task_done(taskq_t *tq, taskq_ent_t *t)
{
	SENTRY;
	ASSERT(tq);
	ASSERT(t);
	ASSERT(spin_is_locked(&tq->tq_lock));

	/* Wake tasks blocked in taskq_wait_id() */
	wake_up_all(&t->tqent_waitq);

	list_del_init(&t->tqent_list);

	if (tq->tq_nalloc <= tq->tq_minalloc) {
		t->tqent_id = 0;
		t->tqent_func = NULL;
		t->tqent_arg = NULL;
		t->tqent_flags = 0;

		list_add_tail(&t->tqent_list, &tq->tq_free_list);
	} else {
		task_free(tq, t);
	}

	SEXIT;
}

/*
 * When a delayed task timer expires remove it from the delay list and
 * add it to the priority list in order for immediate processing.
 */
static void
task_expire(unsigned long data)
{
	taskq_ent_t *w, *t = (taskq_ent_t *)data;
	taskq_t *tq = t->tqent_taskq;
	struct list_head *l;

	spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);

	if (t->tqent_flags & TQENT_FLAG_CANCEL) {
		ASSERT(list_empty(&t->tqent_list));
		spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
		return;
	}

	/*
	 * The priority list must be maintained in strict task id order
	 * from lowest to highest for lowest_id to be easily calculable.
	 */
	list_del(&t->tqent_list);
	list_for_each_prev(l, &tq->tq_prio_list) {
		w = list_entry(l, taskq_ent_t, tqent_list);
		if (w->tqent_id < t->tqent_id) {
			list_add(&t->tqent_list, l);
			break;
		}
	}
	if (l == &tq->tq_prio_list)
		list_add(&t->tqent_list, &tq->tq_prio_list);

	spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

	wake_up(&tq->tq_work_waitq);
}

/*
 * Returns the lowest incomplete taskqid_t.  The taskqid_t may
 * be queued on the pending list, on the priority list, on the
 * delay list, or on the work list currently being handled, but
 * it is not 100% complete yet.
 */
static taskqid_t
taskq_lowest_id(taskq_t *tq)
{
	taskqid_t lowest_id = tq->tq_next_id;
	taskq_ent_t *t;
	taskq_thread_t *tqt;
	SENTRY;

	ASSERT(tq);
	ASSERT(spin_is_locked(&tq->tq_lock));

	if (!list_empty(&tq->tq_pend_list)) {
		t = list_entry(tq->tq_pend_list.next, taskq_ent_t, tqent_list);
		lowest_id = MIN(lowest_id, t->tqent_id);
	}

	if (!list_empty(&tq->tq_prio_list)) {
		t = list_entry(tq->tq_prio_list.next, taskq_ent_t, tqent_list);
		lowest_id = MIN(lowest_id, t->tqent_id);
	}

	if (!list_empty(&tq->tq_delay_list)) {
		t = list_entry(tq->tq_delay_list.next, taskq_ent_t, tqent_list);
		lowest_id = MIN(lowest_id, t->tqent_id);
	}

	if (!list_empty(&tq->tq_active_list)) {
		tqt = list_entry(tq->tq_active_list.next, taskq_thread_t,
		    tqt_active_list);
		ASSERT(tqt->tqt_id != 0);
		lowest_id = MIN(lowest_id, tqt->tqt_id);
	}

	SRETURN(lowest_id);
}

/*
 * Insert a task into a list keeping the list sorted by increasing taskqid.
 */
static void
taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt)
{
	taskq_thread_t *w;
	struct list_head *l;

	SENTRY;
	ASSERT(tq);
	ASSERT(tqt);
	ASSERT(spin_is_locked(&tq->tq_lock));

	list_for_each_prev(l, &tq->tq_active_list) {
		w = list_entry(l, taskq_thread_t, tqt_active_list);
		if (w->tqt_id < tqt->tqt_id) {
			list_add(&tqt->tqt_active_list, l);
			break;
		}
	}
	if (l == &tq->tq_active_list)
		list_add(&tqt->tqt_active_list, &tq->tq_active_list);

	SEXIT;
}

/*
 * Find and return a task from the given list if it exists.  The list
 * must be in lowest to highest task id order.
 */
static taskq_ent_t *
taskq_find_list(taskq_t *tq, struct list_head *lh, taskqid_t id)
{
	struct list_head *l;
	taskq_ent_t *t;
	SENTRY;

	ASSERT(spin_is_locked(&tq->tq_lock));

	list_for_each(l, lh) {
		t = list_entry(l, taskq_ent_t, tqent_list);

		if (t->tqent_id == id)
			SRETURN(t);

		if (t->tqent_id > id)
			break;
	}

	SRETURN(NULL);
}

/*
 * Find an already dispatched task given the task id regardless of what
 * state it is in.  If a task is still pending or executing it will be
 * returned and 'active' set appropriately.  If the task has already
 * been run then NULL is returned.
 */
static taskq_ent_t *
taskq_find(taskq_t *tq, taskqid_t id, int *active)
{
	taskq_thread_t *tqt;
	struct list_head *l;
	taskq_ent_t *t;
	SENTRY;

	ASSERT(spin_is_locked(&tq->tq_lock));
	*active = 0;

	t = taskq_find_list(tq, &tq->tq_delay_list, id);
	if (t)
		SRETURN(t);

	t = taskq_find_list(tq, &tq->tq_prio_list, id);
	if (t)
		SRETURN(t);

	t = taskq_find_list(tq, &tq->tq_pend_list, id);
	if (t)
		SRETURN(t);

	list_for_each(l, &tq->tq_active_list) {
		tqt = list_entry(l, taskq_thread_t, tqt_active_list);
		if (tqt->tqt_id == id) {
			t = tqt->tqt_task;
			*active = 1;
			SRETURN(t);
		}
	}

	SRETURN(NULL);
}

static int
taskq_wait_id_check(taskq_t *tq, taskqid_t id)
{
	int active = 0;
	int rc;

	spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
	rc = (taskq_find(tq, id, &active) == NULL);
	spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

	return (rc);
}

/*
 * The taskq_wait_id() function blocks until the passed task id completes.
 * This does not guarantee that all lower task ids have completed.
 */
void
taskq_wait_id(taskq_t *tq, taskqid_t id)
{
	wait_event(tq->tq_wait_waitq, taskq_wait_id_check(tq, id));
}
EXPORT_SYMBOL(taskq_wait_id);

/*
 * The taskq_wait() function will block until all previously submitted
 * tasks have been completed.  A previously submitted task is defined as
 * a task with a lower task id than the current task queue id.  Note that
 * all task id's are assigned monotonically at dispatch time.
 *
 * Waiting for all previous tasks to complete is accomplished by tracking
 * the lowest outstanding task id.  As tasks are dispatched they are added
 * to the tail of the pending, priority, or delay lists.  And as worker
 * threads become available the tasks are removed from the heads of these
 * lists and linked to the worker threads.  This ensures the lists are
 * kept in lowest to highest task id order.
 *
 * Therefore the lowest outstanding task id can be quickly determined by
 * checking the head item from all of these lists.  This value is stored
 * with the task queue as the lowest id.  It only needs to be recalculated
 * when either the task with the current lowest id completes or is canceled.
 *
 * By blocking until the lowest task id exceeds the current task id when
 * the function was called we ensure all previous tasks have completed.
 *
 * NOTE: When there are multiple worker threads it is possible for larger
 * task ids to complete before smaller ones.  Conversely when the task
 * queue contains delay tasks with small task ids, you may block for a
 * considerable length of time waiting for them to expire and execute.
 */
static int
taskq_wait_check(taskq_t *tq, taskqid_t id)
{
	int rc;

	spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
	rc = (id < tq->tq_lowest_id);
	spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

	SRETURN(rc);
}

void
taskq_wait_all(taskq_t *tq, taskqid_t id)
{
	wait_event(tq->tq_wait_waitq, taskq_wait_check(tq, id));
}
EXPORT_SYMBOL(taskq_wait_all);

void
taskq_wait(taskq_t *tq)
{
	taskqid_t id;
	SENTRY;
	ASSERT(tq);

	/* Wait for the largest outstanding taskqid */
	spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
	id = tq->tq_next_id - 1;
	spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

	taskq_wait_all(tq, id);

	SEXIT;
}
EXPORT_SYMBOL(taskq_wait);

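/*
 * Illustrative usage sketch of the wait interfaces above.  This is not part
 * of the original implementation; my_cb() and my_data are hypothetical
 * caller-provided names and error handling is elided.
 */
#if 0	/* example only, not compiled */
static void
my_cb(void *arg)
{
	/* ... process the dispatched argument ... */
}

static void
my_drain_example(taskq_t *tq, void *my_data)
{
	taskqid_t id;

	id = taskq_dispatch(tq, my_cb, my_data, TQ_SLEEP);
	if (id == 0)
		return;			/* dispatch failed */

	taskq_wait_id(tq, id);		/* wait for this one task only */
	taskq_wait(tq);			/* or drain everything dispatched so far */
}
#endif
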
int
taskq_member(taskq_t *tq, void *t)
{
	struct list_head *l;
	taskq_thread_t *tqt;
	SENTRY;

	ASSERT(tq);
	ASSERT(t);

	list_for_each(l, &tq->tq_thread_list) {
		tqt = list_entry(l, taskq_thread_t, tqt_thread_list);
		if (tqt->tqt_thread == (struct task_struct *)t)
			SRETURN(1);
	}

	SRETURN(0);
}
EXPORT_SYMBOL(taskq_member);

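/*
 * Illustrative sketch of taskq_member(): a caller that needs to flush a
 * queue can first check whether it is already running on one of that
 * queue's threads, since waiting on itself would deadlock.  The surrounding
 * function and the use of 'current' as the thread handle are assumptions
 * made for this example.
 */
#if 0	/* example only, not compiled */
static void
my_flush_example(taskq_t *tq)
{
	if (taskq_member(tq, current))
		return;		/* already on tq; waiting would deadlock */

	taskq_wait(tq);
}
#endif
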
/*
 * Cancel an already dispatched task given the task id.  Still pending tasks
 * will be immediately canceled, and if the task is active the function will
 * block until it completes.  Preallocated tasks which are canceled must be
 * freed by the caller.
 */
int
taskq_cancel_id(taskq_t *tq, taskqid_t id)
{
	taskq_ent_t *t;
	int active = 0;
	int rc = ENOENT;
	SENTRY;

	ASSERT(tq);

	spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
	t = taskq_find(tq, id, &active);
	if (t && !active) {
		list_del_init(&t->tqent_list);
		t->tqent_flags |= TQENT_FLAG_CANCEL;

		/*
		 * When canceling the lowest outstanding task id we
		 * must recalculate the new lowest outstanding id.
		 */
		if (tq->tq_lowest_id == t->tqent_id) {
			tq->tq_lowest_id = taskq_lowest_id(tq);
			ASSERT3S(tq->tq_lowest_id, >, t->tqent_id);
		}

		/*
		 * The task_expire() function takes the tq->tq_lock so drop
		 * the lock before synchronously cancelling the timer.
		 */
		if (timer_pending(&t->tqent_timer)) {
			spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
			del_timer_sync(&t->tqent_timer);
			spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
		}

		if (!(t->tqent_flags & TQENT_FLAG_PREALLOC))
			task_done(tq, t);

		rc = 0;
	}
	spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

	if (active) {
		taskq_wait_id(tq, id);
		rc = EBUSY;
	}

	SRETURN(rc);
}
EXPORT_SYMBOL(taskq_cancel_id);

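/*
 * Illustrative sketch of taskq_cancel_id() return handling, matching the
 * semantics implemented above.  The surrounding function is hypothetical.
 */
#if 0	/* example only, not compiled */
static void
my_cancel_example(taskq_t *tq, taskqid_t id)
{
	int rc;

	rc = taskq_cancel_id(tq, id);
	switch (rc) {
	case 0:		/* task was still queued and has been canceled */
		break;
	case ENOENT:	/* task already completed (or id never dispatched) */
		break;
	case EBUSY:	/* task was running; it has now finished */
		break;
	}
}
#endif
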
taskqid_t
taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
{
	taskq_ent_t *t;
	taskqid_t rc = 0;
	SENTRY;

	ASSERT(tq);
	ASSERT(func);

	spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);

	/* Taskq being destroyed and all tasks drained */
	if (!(tq->tq_flags & TQ_ACTIVE))
		SGOTO(out, rc = 0);

	/* Do not queue the task unless there is an idle thread for it */
	ASSERT(tq->tq_nactive <= tq->tq_nthreads);
	if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads))
		SGOTO(out, rc = 0);

	if ((t = task_alloc(tq, flags)) == NULL)
		SGOTO(out, rc = 0);

	spin_lock(&t->tqent_lock);

	/* Queue to the priority list instead of the pending list */
	if (flags & TQ_FRONT)
		list_add_tail(&t->tqent_list, &tq->tq_prio_list);
	else
		list_add_tail(&t->tqent_list, &tq->tq_pend_list);

	t->tqent_id = rc = tq->tq_next_id;
	tq->tq_next_id++;
	t->tqent_func = func;
	t->tqent_arg = arg;
	t->tqent_taskq = tq;
	t->tqent_timer.data = 0;
	t->tqent_timer.function = NULL;
	t->tqent_timer.expires = 0;

	ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));

	spin_unlock(&t->tqent_lock);

	wake_up(&tq->tq_work_waitq);
out:
	spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
	SRETURN(rc);
}
EXPORT_SYMBOL(taskq_dispatch);

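/*
 * Illustrative dispatch sketch: queue a task normally, and queue an urgent
 * task at the front of the queue with TQ_FRONT.  my_func(), my_urgent_func()
 * and their argument are hypothetical caller-provided names.
 */
#if 0	/* example only, not compiled */
static void my_func(void *arg);
static void my_urgent_func(void *arg);

static void
my_dispatch_example(taskq_t *tq, void *arg)
{
	taskqid_t id;

	id = taskq_dispatch(tq, my_func, arg, TQ_SLEEP);
	if (id == 0)
		return;		/* queue inactive or allocation failed */

	/* TQ_FRONT places the entry on the priority list */
	(void) taskq_dispatch(tq, my_urgent_func, arg, TQ_SLEEP | TQ_FRONT);
}
#endif
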
taskqid_t
taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg,
    uint_t flags, clock_t expire_time)
{
	taskq_ent_t *t;
	taskqid_t rc = 0;
	SENTRY;

	ASSERT(tq);
	ASSERT(func);

	spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);

	/* Taskq being destroyed and all tasks drained */
	if (!(tq->tq_flags & TQ_ACTIVE))
		SGOTO(out, rc = 0);

	if ((t = task_alloc(tq, flags)) == NULL)
		SGOTO(out, rc = 0);

	spin_lock(&t->tqent_lock);

	/* Queue to the delay list for subsequent execution */
	list_add_tail(&t->tqent_list, &tq->tq_delay_list);

	t->tqent_id = rc = tq->tq_next_id;
	tq->tq_next_id++;
	t->tqent_func = func;
	t->tqent_arg = arg;
	t->tqent_taskq = tq;
	t->tqent_timer.data = (unsigned long)t;
	t->tqent_timer.function = task_expire;
	t->tqent_timer.expires = (unsigned long)expire_time;
	add_timer(&t->tqent_timer);

	ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));

	spin_unlock(&t->tqent_lock);
out:
	spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
	SRETURN(rc);
}
EXPORT_SYMBOL(taskq_dispatch_delay);

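/*
 * Illustrative delayed-dispatch sketch.  The expire_time argument is an
 * absolute time in jiffies; here it is assumed the caller derives it from
 * ddi_get_lbolt() so the task runs roughly five seconds from now.
 * my_timeout_func() is a hypothetical name.
 */
#if 0	/* example only, not compiled */
static void my_timeout_func(void *arg);

static taskqid_t
my_delay_example(taskq_t *tq, void *arg)
{
	/* returns 0 if the queue is inactive or allocation failed */
	return (taskq_dispatch_delay(tq, my_timeout_func, arg, TQ_SLEEP,
	    ddi_get_lbolt() + 5 * HZ));
}
#endif
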
void
taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
    taskq_ent_t *t)
{
	SENTRY;

	ASSERT(tq);
	ASSERT(func);
	ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));

	spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);

	/* Taskq being destroyed and all tasks drained */
	if (!(tq->tq_flags & TQ_ACTIVE)) {
		t->tqent_id = 0;
		goto out;
	}

	spin_lock(&t->tqent_lock);

	/*
	 * Mark it as a prealloc'd task.  This is important
	 * to ensure that we don't free it later.
	 */
	t->tqent_flags |= TQENT_FLAG_PREALLOC;

	/* Queue to the priority list instead of the pending list */
	if (flags & TQ_FRONT)
		list_add_tail(&t->tqent_list, &tq->tq_prio_list);
	else
		list_add_tail(&t->tqent_list, &tq->tq_pend_list);

	t->tqent_id = tq->tq_next_id;
	tq->tq_next_id++;
	t->tqent_func = func;
	t->tqent_arg = arg;
	t->tqent_taskq = tq;

	spin_unlock(&t->tqent_lock);

	wake_up(&tq->tq_work_waitq);
out:
	spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
	SEXIT;
}
EXPORT_SYMBOL(taskq_dispatch_ent);

int
taskq_empty_ent(taskq_ent_t *t)
{
	return list_empty(&t->tqent_list);
}
EXPORT_SYMBOL(taskq_empty_ent);

void
taskq_init_ent(taskq_ent_t *t)
{
	spin_lock_init(&t->tqent_lock);
	init_waitqueue_head(&t->tqent_waitq);
	init_timer(&t->tqent_timer);
	INIT_LIST_HEAD(&t->tqent_list);
	t->tqent_id = 0;
	t->tqent_func = NULL;
	t->tqent_arg = NULL;
	t->tqent_flags = 0;
	t->tqent_taskq = NULL;
}
EXPORT_SYMBOL(taskq_init_ent);

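/*
 * Illustrative sketch of the preallocated-entry interface: embed a
 * taskq_ent_t in a caller-owned structure, initialize it once with
 * taskq_init_ent(), and dispatch it with taskq_dispatch_ent() so no
 * allocation is needed at dispatch time.  struct my_obj and my_obj_func()
 * are hypothetical names.
 */
#if 0	/* example only, not compiled */
struct my_obj {
	taskq_ent_t	mo_tqent;
	int		mo_state;
};

static void my_obj_func(void *arg);

static void
my_ent_example(taskq_t *tq, struct my_obj *obj)
{
	taskq_init_ent(&obj->mo_tqent);

	/* Only re-dispatch once the entry is no longer queued */
	if (taskq_empty_ent(&obj->mo_tqent))
		taskq_dispatch_ent(tq, my_obj_func, obj, 0, &obj->mo_tqent);
}
#endif
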
static int
taskq_thread(void *args)
{
	DECLARE_WAITQUEUE(wait, current);
	sigset_t blocked;
	taskq_thread_t *tqt = args;
	taskq_t *tq;
	taskq_ent_t *t;
	struct list_head *pend_list;
	SENTRY;

	ASSERT(tqt);
	tq = tqt->tqt_tq;
	current->flags |= PF_NOFREEZE;

	sigfillset(&blocked);
	sigprocmask(SIG_BLOCK, &blocked, NULL);
	flush_signals(current);

	spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
	tq->tq_nthreads++;
	wake_up(&tq->tq_wait_waitq);
	set_current_state(TASK_INTERRUPTIBLE);

	while (!kthread_should_stop()) {

		if (list_empty(&tq->tq_pend_list) &&
		    list_empty(&tq->tq_prio_list)) {
			add_wait_queue_exclusive(&tq->tq_work_waitq, &wait);
			spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
			schedule();
			spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
			remove_wait_queue(&tq->tq_work_waitq, &wait);
		} else {
			__set_current_state(TASK_RUNNING);
		}

		if (!list_empty(&tq->tq_prio_list))
			pend_list = &tq->tq_prio_list;
		else if (!list_empty(&tq->tq_pend_list))
			pend_list = &tq->tq_pend_list;
		else
			pend_list = NULL;

		if (pend_list) {
			t = list_entry(pend_list->next, taskq_ent_t, tqent_list);
			list_del_init(&t->tqent_list);

			/* In order to support recursively dispatching a
			 * preallocated taskq_ent_t, tqent_id must be
			 * stored prior to executing tqent_func. */
			tqt->tqt_id = t->tqent_id;
			tqt->tqt_task = t;

			/* We must store a copy of the flags prior to
			 * servicing the task (servicing a prealloc'd task
			 * returns the ownership of the tqent back to
			 * the caller of taskq_dispatch). Thus,
			 * tqent_flags _may_ change within the call. */
			tqt->tqt_flags = t->tqent_flags;

			taskq_insert_in_order(tq, tqt);
			tq->tq_nactive++;
			spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

			/* Perform the requested task */
			t->tqent_func(t->tqent_arg);

			spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
			tq->tq_nactive--;
			list_del_init(&tqt->tqt_active_list);
			tqt->tqt_task = NULL;

			/* For prealloc'd tasks, we don't free anything. */
			if ((tq->tq_flags & TASKQ_DYNAMIC) ||
			    !(tqt->tqt_flags & TQENT_FLAG_PREALLOC))
				task_done(tq, t);

			/* When the current lowest outstanding taskqid is
			 * done calculate the new lowest outstanding id */
			if (tq->tq_lowest_id == tqt->tqt_id) {
				tq->tq_lowest_id = taskq_lowest_id(tq);
				ASSERT3S(tq->tq_lowest_id, >, tqt->tqt_id);
			}

			tqt->tqt_id = 0;
			tqt->tqt_flags = 0;
			wake_up_all(&tq->tq_wait_waitq);
		}

		set_current_state(TASK_INTERRUPTIBLE);

	}

	__set_current_state(TASK_RUNNING);
	tq->tq_nthreads--;
	list_del_init(&tqt->tqt_thread_list);
	kmem_free(tqt, sizeof(taskq_thread_t));

	spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

	SRETURN(0);
}

taskq_t *
taskq_create(const char *name, int nthreads, pri_t pri,
    int minalloc, int maxalloc, uint_t flags)
{
	taskq_t *tq;
	taskq_thread_t *tqt;
	int rc = 0, i, j = 0;
	SENTRY;

	ASSERT(name != NULL);
	ASSERT(pri <= maxclsyspri);
	ASSERT(minalloc >= 0);
	ASSERT(maxalloc <= INT_MAX);
	ASSERT(!(flags & (TASKQ_CPR_SAFE | TASKQ_DYNAMIC))); /* Unsupported */

	/* Scale the number of threads using nthreads as a percentage */
	if (flags & TASKQ_THREADS_CPU_PCT) {
		ASSERT(nthreads <= 100);
		ASSERT(nthreads >= 0);
		nthreads = MIN(nthreads, 100);
		nthreads = MAX(nthreads, 0);
		nthreads = MAX((num_online_cpus() * nthreads) / 100, 1);
	}

	tq = kmem_alloc(sizeof(*tq), KM_PUSHPAGE);
	if (tq == NULL)
		SRETURN(NULL);

	spin_lock_init(&tq->tq_lock);
	spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
	INIT_LIST_HEAD(&tq->tq_thread_list);
	INIT_LIST_HEAD(&tq->tq_active_list);
	tq->tq_name = name;
	tq->tq_nactive = 0;
	tq->tq_nthreads = 0;
	tq->tq_pri = pri;
	tq->tq_minalloc = minalloc;
	tq->tq_maxalloc = maxalloc;
	tq->tq_nalloc = 0;
	tq->tq_flags = (flags | TQ_ACTIVE);
	tq->tq_next_id = 1;
	tq->tq_lowest_id = 1;
	INIT_LIST_HEAD(&tq->tq_free_list);
	INIT_LIST_HEAD(&tq->tq_pend_list);
	INIT_LIST_HEAD(&tq->tq_prio_list);
	INIT_LIST_HEAD(&tq->tq_delay_list);
	init_waitqueue_head(&tq->tq_work_waitq);
	init_waitqueue_head(&tq->tq_wait_waitq);

	if (flags & TASKQ_PREPOPULATE)
		for (i = 0; i < minalloc; i++)
			task_done(tq, task_alloc(tq, TQ_PUSHPAGE | TQ_NEW));

	spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

	for (i = 0; i < nthreads; i++) {
		tqt = kmem_alloc(sizeof(*tqt), KM_PUSHPAGE);
		INIT_LIST_HEAD(&tqt->tqt_thread_list);
		INIT_LIST_HEAD(&tqt->tqt_active_list);
		tqt->tqt_tq = tq;
		tqt->tqt_id = 0;

		tqt->tqt_thread = kthread_create(taskq_thread, tqt,
		    "%s/%d", name, i);
		if (tqt->tqt_thread) {
			list_add(&tqt->tqt_thread_list, &tq->tq_thread_list);
			kthread_bind(tqt->tqt_thread, i % num_online_cpus());
			set_user_nice(tqt->tqt_thread, PRIO_TO_NICE(pri));
			wake_up_process(tqt->tqt_thread);
			j++;
		} else {
			kmem_free(tqt, sizeof(taskq_thread_t));
			rc = 1;
		}
	}

	/* Wait for all threads to be started before potential destroy */
	wait_event(tq->tq_wait_waitq, tq->tq_nthreads == j);

	if (rc) {
		taskq_destroy(tq);
		tq = NULL;
	}

	SRETURN(tq);
}
EXPORT_SYMBOL(taskq_create);

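/*
 * Illustrative create/destroy sketch.  The queue name, thread count, and
 * minalloc/maxalloc values are arbitrary example choices; with
 * TASKQ_THREADS_CPU_PCT the nthreads argument would instead be interpreted
 * as a percentage of the online CPUs, as implemented above.
 */
#if 0	/* example only, not compiled */
static void
my_taskq_example(void)
{
	taskq_t *tq;

	tq = taskq_create("my_taskq", num_online_cpus(), minclsyspri,
	    4, 512, TASKQ_PREPOPULATE);
	if (tq == NULL)
		return;

	/* ... dispatch work, taskq_wait(tq), etc ... */

	taskq_destroy(tq);
}
#endif
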
void
taskq_destroy(taskq_t *tq)
{
	struct task_struct *thread;
	taskq_thread_t *tqt;
	taskq_ent_t *t;
	SENTRY;

	ASSERT(tq);
	spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
	tq->tq_flags &= ~TQ_ACTIVE;
	spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

	/* TQ_ACTIVE cleared prevents new tasks being added to pending */
	taskq_wait(tq);

	spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);

	/*
	 * Signal each thread to exit and block until it does.  Each thread
	 * is responsible for removing itself from the list and freeing its
	 * taskq_thread_t.  This allows for idle threads to opt to remove
	 * themselves from the taskq.  They can be recreated as needed.
	 */
	while (!list_empty(&tq->tq_thread_list)) {
		tqt = list_entry(tq->tq_thread_list.next,
		    taskq_thread_t, tqt_thread_list);
		thread = tqt->tqt_thread;
		spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

		kthread_stop(thread);

		spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
	}

	while (!list_empty(&tq->tq_free_list)) {
		t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);

		ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));

		list_del_init(&t->tqent_list);
		task_free(tq, t);
	}

	ASSERT(tq->tq_nthreads == 0);
	ASSERT(tq->tq_nalloc == 0);
	ASSERT(list_empty(&tq->tq_thread_list));
	ASSERT(list_empty(&tq->tq_active_list));
	ASSERT(list_empty(&tq->tq_free_list));
	ASSERT(list_empty(&tq->tq_pend_list));
	ASSERT(list_empty(&tq->tq_prio_list));
	ASSERT(list_empty(&tq->tq_delay_list));

	spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

	kmem_free(tq, sizeof(taskq_t));

	SEXIT;
}
EXPORT_SYMBOL(taskq_destroy);

int
spl_taskq_init(void)
{
	SENTRY;

	/* Solaris creates a dynamic taskq of up to 64 threads, however in
	 * a Linux environment 1 thread per-core is usually about right */
	system_taskq = taskq_create("spl_system_taskq", num_online_cpus(),
	    minclsyspri, 4, 512, TASKQ_PREPOPULATE);
	if (system_taskq == NULL)
		SRETURN(1);

	SRETURN(0);
}

void
spl_taskq_fini(void)
{
	SENTRY;
	taskq_destroy(system_taskq);
	SEXIT;
}