module/spl/spl-taskq.c
1 /*
2 * Copyright (C) 2007-2010 Lawrence Livermore National Security, LLC.
3 * Copyright (C) 2007 The Regents of the University of California.
4 * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
5 * Written by Brian Behlendorf <behlendorf1@llnl.gov>.
6 * UCRL-CODE-235197
7 *
8 * This file is part of the SPL, Solaris Porting Layer.
9 * For details, see <http://zfsonlinux.org/>.
10 *
11 * The SPL is free software; you can redistribute it and/or modify it
12 * under the terms of the GNU General Public License as published by the
13 * Free Software Foundation; either version 2 of the License, or (at your
14 * option) any later version.
15 *
16 * The SPL is distributed in the hope that it will be useful, but WITHOUT
17 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
19 * for more details.
20 *
21 * You should have received a copy of the GNU General Public License along
22 * with the SPL. If not, see <http://www.gnu.org/licenses/>.
23 *
24 * Solaris Porting Layer (SPL) Task Queue Implementation.
25 */
26
27 #include <sys/taskq.h>
28 #include <sys/kmem.h>
29 #include <sys/tsd.h>
30
31 int spl_taskq_thread_bind = 0;
32 module_param(spl_taskq_thread_bind, int, 0644);
33 MODULE_PARM_DESC(spl_taskq_thread_bind, "Bind taskq thread to CPU by default");
34
35
36 int spl_taskq_thread_dynamic = 1;
37 module_param(spl_taskq_thread_dynamic, int, 0644);
38 MODULE_PARM_DESC(spl_taskq_thread_dynamic, "Allow dynamic taskq threads");
39
40 int spl_taskq_thread_priority = 1;
41 module_param(spl_taskq_thread_priority, int, 0644);
42 MODULE_PARM_DESC(spl_taskq_thread_priority,
43 "Allow non-default priority for taskq threads");
44
45 int spl_taskq_thread_sequential = 4;
46 module_param(spl_taskq_thread_sequential, int, 0644);
47 MODULE_PARM_DESC(spl_taskq_thread_sequential,
48 "Create new taskq threads after N sequential tasks");
49
50 /* Global system-wide dynamic task queue available for all consumers */
51 taskq_t *system_taskq;
52 EXPORT_SYMBOL(system_taskq);
53
54 /* Private dedicated taskq for creating new taskq threads on demand. */
55 static taskq_t *dynamic_taskq;
56 static taskq_thread_t *taskq_thread_create(taskq_t *);
57
58 /* List of all taskqs */
59 LIST_HEAD(tq_list);
60 DECLARE_RWSEM(tq_list_sem);
61 static uint_t taskq_tsd;
62
63 static int
64 task_km_flags(uint_t flags)
65 {
66 if (flags & TQ_NOSLEEP)
67 return (KM_NOSLEEP);
68
69 if (flags & TQ_PUSHPAGE)
70 return (KM_PUSHPAGE);
71
72 return (KM_SLEEP);
73 }
74
75 /*
76 * taskq_find_by_name - Find the largest instance number of a named taskq.
77 */
78 static int
79 taskq_find_by_name(const char *name)
80 {
81 struct list_head *tql;
82 taskq_t *tq;
83
84 list_for_each_prev(tql, &tq_list) {
85 tq = list_entry(tql, taskq_t, tq_taskqs);
86 if (strcmp(name, tq->tq_name) == 0)
87 return tq->tq_instance;
88 }
89 return (-1);
90 }
91
92 /*
93 * NOTE: Must be called with tq->tq_lock held, returns a taskq_ent_t
94 * which is not attached to the free, work, or pending taskq lists.
95 */
96 static taskq_ent_t *
97 task_alloc(taskq_t *tq, uint_t flags, unsigned long *irqflags)
98 {
99 taskq_ent_t *t;
100 int count = 0;
101
102 ASSERT(tq);
103 ASSERT(spin_is_locked(&tq->tq_lock));
104 retry:
105 /* Acquire taskq_ent_t's from free list if available */
106 if (!list_empty(&tq->tq_free_list) && !(flags & TQ_NEW)) {
107 t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);
108
109 ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
110 ASSERT(!(t->tqent_flags & TQENT_FLAG_CANCEL));
111 ASSERT(!timer_pending(&t->tqent_timer));
112
113 list_del_init(&t->tqent_list);
114 return (t);
115 }
116
117 /* Free list is empty and memory allocations are prohibited */
118 if (flags & TQ_NOALLOC)
119 return (NULL);
120
121 /* Hit maximum taskq_ent_t pool size */
122 if (tq->tq_nalloc >= tq->tq_maxalloc) {
123 if (flags & TQ_NOSLEEP)
124 return (NULL);
125
126 /*
127 * Sleep periodically polling the free list for an available
128 * taskq_ent_t. Dispatching with TQ_SLEEP should always succeed
129 * but we cannot block forever waiting for a taskq_ent_t to
130 * show up in the free list, otherwise a deadlock can happen.
131 *
132 * Therefore, we need to allocate a new task even if the number
133 * of allocated tasks is above tq->tq_maxalloc, but we still
134 * end up delaying the task allocation by one second, thereby
135 * throttling the task dispatch rate.
136 */
137 spin_unlock_irqrestore(&tq->tq_lock, *irqflags);
138 schedule_timeout(HZ / 100);
139 spin_lock_irqsave_nested(&tq->tq_lock, *irqflags,
140 tq->tq_lock_class);
141 if (count < 100) {
142 count++;
143 goto retry;
144 }
145 }
146
147 spin_unlock_irqrestore(&tq->tq_lock, *irqflags);
148 t = kmem_alloc(sizeof (taskq_ent_t), task_km_flags(flags));
149 spin_lock_irqsave_nested(&tq->tq_lock, *irqflags, tq->tq_lock_class);
150
151 if (t) {
152 taskq_init_ent(t);
153 tq->tq_nalloc++;
154 }
155
156 return (t);
157 }
158
159 /*
160 * NOTE: Must be called with tq->tq_lock held, expects the taskq_ent_t
161 * to already be removed from the free, work, or pending taskq lists.
162 */
163 static void
164 task_free(taskq_t *tq, taskq_ent_t *t)
165 {
166 ASSERT(tq);
167 ASSERT(t);
168 ASSERT(spin_is_locked(&tq->tq_lock));
169 ASSERT(list_empty(&t->tqent_list));
170 ASSERT(!timer_pending(&t->tqent_timer));
171
172 kmem_free(t, sizeof (taskq_ent_t));
173 tq->tq_nalloc--;
174 }
175
176 /*
177 * NOTE: Must be called with tq->tq_lock held, either destroys the
178 * taskq_ent_t if too many exist or moves it to the free list for later use.
179 */
180 static void
181 task_done(taskq_t *tq, taskq_ent_t *t)
182 {
183 ASSERT(tq);
184 ASSERT(t);
185 ASSERT(spin_is_locked(&tq->tq_lock));
186
187 /* Wake tasks blocked in taskq_wait_id() */
188 wake_up_all(&t->tqent_waitq);
189
190 list_del_init(&t->tqent_list);
191
192 if (tq->tq_nalloc <= tq->tq_minalloc) {
193 t->tqent_id = 0;
194 t->tqent_func = NULL;
195 t->tqent_arg = NULL;
196 t->tqent_flags = 0;
197
198 list_add_tail(&t->tqent_list, &tq->tq_free_list);
199 } else {
200 task_free(tq, t);
201 }
202 }
203
204 /*
205 * When a delayed task timer expires, remove it from the delay list and
206 * add it to the priority list for immediate processing.
207 */
208 static void
209 task_expire(unsigned long data)
210 {
211 taskq_ent_t *w, *t = (taskq_ent_t *)data;
212 taskq_t *tq = t->tqent_taskq;
213 struct list_head *l;
214 unsigned long flags;
215
216 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
217
218 if (t->tqent_flags & TQENT_FLAG_CANCEL) {
219 ASSERT(list_empty(&t->tqent_list));
220 spin_unlock_irqrestore(&tq->tq_lock, flags);
221 return;
222 }
223
224 /*
225 * The priority list must be maintained in strict task id order
226 * from lowest to highest for lowest_id to be easily calculable.
227 */
228 list_del(&t->tqent_list);
229 list_for_each_prev(l, &tq->tq_prio_list) {
230 w = list_entry(l, taskq_ent_t, tqent_list);
231 if (w->tqent_id < t->tqent_id) {
232 list_add(&t->tqent_list, l);
233 break;
234 }
235 }
236 if (l == &tq->tq_prio_list)
237 list_add(&t->tqent_list, &tq->tq_prio_list);
238
239 spin_unlock_irqrestore(&tq->tq_lock, flags);
240
241 wake_up(&tq->tq_work_waitq);
242 }
243
244 /*
245 * Returns the lowest incomplete taskqid_t. The taskqid_t may
246 * be queued on the pending list, on the priority list, on the
247 * delay list, or on the work list currently being handled, but
248 * it has not yet fully completed.
249 */
250 static taskqid_t
251 taskq_lowest_id(taskq_t *tq)
252 {
253 taskqid_t lowest_id = tq->tq_next_id;
254 taskq_ent_t *t;
255 taskq_thread_t *tqt;
256
257 ASSERT(tq);
258 ASSERT(spin_is_locked(&tq->tq_lock));
259
260 if (!list_empty(&tq->tq_pend_list)) {
261 t = list_entry(tq->tq_pend_list.next, taskq_ent_t, tqent_list);
262 lowest_id = MIN(lowest_id, t->tqent_id);
263 }
264
265 if (!list_empty(&tq->tq_prio_list)) {
266 t = list_entry(tq->tq_prio_list.next, taskq_ent_t, tqent_list);
267 lowest_id = MIN(lowest_id, t->tqent_id);
268 }
269
270 if (!list_empty(&tq->tq_delay_list)) {
271 t = list_entry(tq->tq_delay_list.next, taskq_ent_t, tqent_list);
272 lowest_id = MIN(lowest_id, t->tqent_id);
273 }
274
275 if (!list_empty(&tq->tq_active_list)) {
276 tqt = list_entry(tq->tq_active_list.next, taskq_thread_t,
277 tqt_active_list);
278 ASSERT(tqt->tqt_id != 0);
279 lowest_id = MIN(lowest_id, tqt->tqt_id);
280 }
281
282 return (lowest_id);
283 }
284
285 /*
286 * Insert a task into a list keeping the list sorted by increasing taskqid.
287 */
288 static void
289 taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt)
290 {
291 taskq_thread_t *w;
292 struct list_head *l;
293
294 ASSERT(tq);
295 ASSERT(tqt);
296 ASSERT(spin_is_locked(&tq->tq_lock));
297
298 list_for_each_prev(l, &tq->tq_active_list) {
299 w = list_entry(l, taskq_thread_t, tqt_active_list);
300 if (w->tqt_id < tqt->tqt_id) {
301 list_add(&tqt->tqt_active_list, l);
302 break;
303 }
304 }
305 if (l == &tq->tq_active_list)
306 list_add(&tqt->tqt_active_list, &tq->tq_active_list);
307 }
308
309 /*
310 * Find and return a task from the given list if it exists. The list
311 * must be in lowest to highest task id order.
312 */
313 static taskq_ent_t *
314 taskq_find_list(taskq_t *tq, struct list_head *lh, taskqid_t id)
315 {
316 struct list_head *l;
317 taskq_ent_t *t;
318
319 ASSERT(spin_is_locked(&tq->tq_lock));
320
321 list_for_each(l, lh) {
322 t = list_entry(l, taskq_ent_t, tqent_list);
323
324 if (t->tqent_id == id)
325 return (t);
326
327 if (t->tqent_id > id)
328 break;
329 }
330
331 return (NULL);
332 }
333
334 /*
335 * Find an already dispatched task given the task id regardless of what
336 * state it is in. If a task is still pending or executing it will be
337 * returned and 'active' set appropriately. If the task has already
338 * been run then NULL is returned.
339 */
340 static taskq_ent_t *
341 taskq_find(taskq_t *tq, taskqid_t id, int *active)
342 {
343 taskq_thread_t *tqt;
344 struct list_head *l;
345 taskq_ent_t *t;
346
347 ASSERT(spin_is_locked(&tq->tq_lock));
348 *active = 0;
349
350 t = taskq_find_list(tq, &tq->tq_delay_list, id);
351 if (t)
352 return (t);
353
354 t = taskq_find_list(tq, &tq->tq_prio_list, id);
355 if (t)
356 return (t);
357
358 t = taskq_find_list(tq, &tq->tq_pend_list, id);
359 if (t)
360 return (t);
361
362 list_for_each(l, &tq->tq_active_list) {
363 tqt = list_entry(l, taskq_thread_t, tqt_active_list);
364 if (tqt->tqt_id == id) {
365 t = tqt->tqt_task;
366 *active = 1;
367 return (t);
368 }
369 }
370
371 return (NULL);
372 }
373
374 /*
375 * Theory for the taskq_wait_id(), taskq_wait_outstanding(), and
376 * taskq_wait() functions below.
377 *
378 * Taskq waiting is accomplished by tracking the lowest outstanding task
379 * id and the next available task id. As tasks are dispatched they are
380 * added to the tail of the pending, priority, or delay lists. As worker
381 * threads become available the tasks are removed from the heads of these
382 * lists and linked to the worker threads. This ensures the lists are
383 * kept sorted by lowest to highest task id.
384 *
385 * Therefore the lowest outstanding task id can be quickly determined by
386 * checking the head item from all of these lists. This value is stored
387 * with the taskq as the lowest id. It only needs to be recalculated when
388 * either the task with the current lowest id completes or is canceled.
389 *
390 * By blocking until the lowest task id exceeds the passed task id the
391 * taskq_wait_outstanding() function can be easily implemented. Similarly,
392 * by blocking until the lowest task id matches the next task id taskq_wait()
393 * can be implemented.
394 *
395 * Callers should be aware that when there are multiple worker threads it
396 * is possible for larger task ids to complete before smaller ones. Also
397 * when the taskq contains delay tasks with small task ids callers may
398 * block for a considerable length of time waiting for them to expire and
399 * execute.
400 */
401 static int
402 taskq_wait_id_check(taskq_t *tq, taskqid_t id)
403 {
404 int active = 0;
405 int rc;
406 unsigned long flags;
407
408 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
409 rc = (taskq_find(tq, id, &active) == NULL);
410 spin_unlock_irqrestore(&tq->tq_lock, flags);
411
412 return (rc);
413 }
414
415 /*
416 * The taskq_wait_id() function blocks until the passed task id completes.
417 * This does not guarantee that all lower task ids have completed.
418 */
419 void
420 taskq_wait_id(taskq_t *tq, taskqid_t id)
421 {
422 wait_event(tq->tq_wait_waitq, taskq_wait_id_check(tq, id));
423 }
424 EXPORT_SYMBOL(taskq_wait_id);
425
426 static int
427 taskq_wait_outstanding_check(taskq_t *tq, taskqid_t id)
428 {
429 int rc;
430 unsigned long flags;
431
432 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
433 rc = (id < tq->tq_lowest_id);
434 spin_unlock_irqrestore(&tq->tq_lock, flags);
435
436 return (rc);
437 }
438
439 /*
440 * The taskq_wait_outstanding() function will block until all tasks with a
441 * lower taskqid than the passed 'id' have been completed. Note that all
442 * task ids are assigned monotonically at dispatch time. Zero may be
443 * passed for the id to indicate that all tasks dispatched up to this point,
444 * but not after, should be waited for.
445 */
446 void
447 taskq_wait_outstanding(taskq_t *tq, taskqid_t id)
448 {
449 wait_event(tq->tq_wait_waitq,
450 taskq_wait_outstanding_check(tq, id ? id : tq->tq_next_id - 1));
451 }
452 EXPORT_SYMBOL(taskq_wait_outstanding);
453
454 static int
455 taskq_wait_check(taskq_t *tq)
456 {
457 int rc;
458 unsigned long flags;
459
460 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
461 rc = (tq->tq_lowest_id == tq->tq_next_id);
462 spin_unlock_irqrestore(&tq->tq_lock, flags);
463
464 return (rc);
465 }
466
467 /*
468 * The taskq_wait() function will block until the taskq is empty.
469 * This means that if a taskq re-dispatches work to itself taskq_wait()
470 * callers will block indefinitely.
471 */
472 void
473 taskq_wait(taskq_t *tq)
474 {
475 wait_event(tq->tq_wait_waitq, taskq_wait_check(tq));
476 }
477 EXPORT_SYMBOL(taskq_wait);
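
/*
 * Example (illustrative sketch): how the wait primitives above differ.
 * The taskq 'tq' and the task function example_func() are hypothetical:
 *
 *	taskqid_t id = taskq_dispatch(tq, example_func, NULL, TQ_SLEEP);
 *
 *	taskq_wait_id(tq, id);		wait for this specific task only
 *	taskq_wait_outstanding(tq, id);	wait for tasks with lower ids
 *	taskq_wait(tq);			wait until the taskq is empty
 */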
478
479 int
480 taskq_member(taskq_t *tq, kthread_t *t)
481 {
482 return (tq == (taskq_t *)tsd_get_by_thread(taskq_tsd, t));
483 }
484 EXPORT_SYMBOL(taskq_member);
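
/*
 * Example (illustrative sketch): taskq_member() is commonly used to avoid
 * waiting on a taskq from one of its own worker threads, which would
 * deadlock. Here 'tq' is hypothetical and curthread is the SPL alias for
 * the current thread:
 *
 *	if (!taskq_member(tq, curthread))
 *		taskq_wait(tq);
 */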
485
486 /*
487 * Cancel an already dispatched task given the task id. Still pending tasks
488 * will be immediately canceled, and if the task is active the function will
489 * block until it completes. Preallocated tasks which are canceled must be
490 * freed by the caller.
491 */
492 int
493 taskq_cancel_id(taskq_t *tq, taskqid_t id)
494 {
495 taskq_ent_t *t;
496 int active = 0;
497 int rc = ENOENT;
498 unsigned long flags;
499
500 ASSERT(tq);
501
502 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
503 t = taskq_find(tq, id, &active);
504 if (t && !active) {
505 list_del_init(&t->tqent_list);
506 t->tqent_flags |= TQENT_FLAG_CANCEL;
507
508 /*
509 * When canceling the lowest outstanding task id we
510 * must recalculate the new lowest outstanding id.
511 */
512 if (tq->tq_lowest_id == t->tqent_id) {
513 tq->tq_lowest_id = taskq_lowest_id(tq);
514 ASSERT3S(tq->tq_lowest_id, >, t->tqent_id);
515 }
516
517 /*
518 * The task_expire() function takes the tq->tq_lock, so drop
519 * the lock before synchronously cancelling the timer.
520 */
521 if (timer_pending(&t->tqent_timer)) {
522 spin_unlock_irqrestore(&tq->tq_lock, flags);
523 del_timer_sync(&t->tqent_timer);
524 spin_lock_irqsave_nested(&tq->tq_lock, flags,
525 tq->tq_lock_class);
526 }
527
528 if (!(t->tqent_flags & TQENT_FLAG_PREALLOC))
529 task_done(tq, t);
530
531 rc = 0;
532 }
533 spin_unlock_irqrestore(&tq->tq_lock, flags);
534
535 if (active) {
536 taskq_wait_id(tq, id);
537 rc = EBUSY;
538 }
539
540 return (rc);
541 }
542 EXPORT_SYMBOL(taskq_cancel_id);
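
/*
 * Example (illustrative sketch): interpreting the taskq_cancel_id() return
 * value for a previously dispatched id, where 'tq' and 'id' are assumed:
 *
 *	int error = taskq_cancel_id(tq, id);
 *
 *	error == 0	the task was still pending and has been canceled
 *	error == ENOENT	no incomplete task with that id was found
 *	error == EBUSY	the task was already running; the call blocked
 *			until it completed
 */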
543
544 static int taskq_thread_spawn(taskq_t *tq);
545
546 taskqid_t
547 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
548 {
549 taskq_ent_t *t;
550 taskqid_t rc = 0;
551 unsigned long irqflags;
552
553 ASSERT(tq);
554 ASSERT(func);
555
556 spin_lock_irqsave_nested(&tq->tq_lock, irqflags, tq->tq_lock_class);
557
558 /* Taskq being destroyed and all tasks drained */
559 if (!(tq->tq_flags & TASKQ_ACTIVE))
560 goto out;
561
562 /* Do not queue the task unless there is an idle thread for it */
563 ASSERT(tq->tq_nactive <= tq->tq_nthreads);
564 if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads))
565 goto out;
566
567 if ((t = task_alloc(tq, flags, &irqflags)) == NULL)
568 goto out;
569
570 spin_lock(&t->tqent_lock);
571
572 /* Queue to the priority list instead of the pending list */
573 if (flags & TQ_FRONT)
574 list_add_tail(&t->tqent_list, &tq->tq_prio_list);
575 else
576 list_add_tail(&t->tqent_list, &tq->tq_pend_list);
577
578 t->tqent_id = rc = tq->tq_next_id;
579 tq->tq_next_id++;
580 t->tqent_func = func;
581 t->tqent_arg = arg;
582 t->tqent_taskq = tq;
583 t->tqent_timer.data = 0;
584 t->tqent_timer.function = NULL;
585 t->tqent_timer.expires = 0;
586
587 ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
588
589 spin_unlock(&t->tqent_lock);
590
591 wake_up(&tq->tq_work_waitq);
592 out:
593 /* Spawn additional taskq threads if required. */
594 if (tq->tq_nactive == tq->tq_nthreads)
595 (void) taskq_thread_spawn(tq);
596
597 spin_unlock_irqrestore(&tq->tq_lock, irqflags);
598 return (rc);
599 }
600 EXPORT_SYMBOL(taskq_dispatch);
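
/*
 * Example (illustrative sketch): dispatching work with taskq_dispatch().
 * The names example_func and 'tq' are hypothetical; a return value of 0
 * means the dispatch failed (e.g. TQ_NOSLEEP and no free taskq_ent_t):
 *
 *	static void
 *	example_func(void *arg)
 *	{
 *		kmem_free(arg, sizeof (int));
 *	}
 *
 *	int *arg = kmem_alloc(sizeof (int), KM_SLEEP);
 *	taskqid_t id = taskq_dispatch(tq, example_func, arg, TQ_SLEEP);
 *	if (id == 0)
 *		kmem_free(arg, sizeof (int));	dispatch failed, reclaim arg
 */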
601
602 taskqid_t
603 taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg,
604 uint_t flags, clock_t expire_time)
605 {
606 taskqid_t rc = 0;
607 taskq_ent_t *t;
608 unsigned long irqflags;
609
610 ASSERT(tq);
611 ASSERT(func);
612
613 spin_lock_irqsave_nested(&tq->tq_lock, irqflags, tq->tq_lock_class);
614
615 /* Taskq being destroyed and all tasks drained */
616 if (!(tq->tq_flags & TASKQ_ACTIVE))
617 goto out;
618
619 if ((t = task_alloc(tq, flags, &irqflags)) == NULL)
620 goto out;
621
622 spin_lock(&t->tqent_lock);
623
624 /* Queue to the delay list for subsequent execution */
625 list_add_tail(&t->tqent_list, &tq->tq_delay_list);
626
627 t->tqent_id = rc = tq->tq_next_id;
628 tq->tq_next_id++;
629 t->tqent_func = func;
630 t->tqent_arg = arg;
631 t->tqent_taskq = tq;
632 t->tqent_timer.data = (unsigned long)t;
633 t->tqent_timer.function = task_expire;
634 t->tqent_timer.expires = (unsigned long)expire_time;
635 add_timer(&t->tqent_timer);
636
637 ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
638
639 spin_unlock(&t->tqent_lock);
640 out:
641 /* Spawn additional taskq threads if required. */
642 if (tq->tq_nactive == tq->tq_nthreads)
643 (void) taskq_thread_spawn(tq);
644 spin_unlock_irqrestore(&tq->tq_lock, irqflags);
645 return (rc);
646 }
647 EXPORT_SYMBOL(taskq_dispatch_delay);
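
/*
 * Example (illustrative sketch): scheduling a task roughly one second in
 * the future. The expire time is an absolute value in jiffies to match the
 * add_timer() usage above; 'tq' and example_func() are hypothetical:
 *
 *	clock_t expire = ddi_get_lbolt() + HZ;
 *	taskqid_t id = taskq_dispatch_delay(tq, example_func, NULL,
 *	    TQ_SLEEP, expire);
 *	if (id != 0)
 *		(void) taskq_cancel_id(tq, id);	optionally cancel before expiry
 */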
648
649 void
650 taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
651 taskq_ent_t *t)
652 {
653 unsigned long irqflags;
654 ASSERT(tq);
655 ASSERT(func);
656
657 spin_lock_irqsave_nested(&tq->tq_lock, irqflags,
658 tq->tq_lock_class);
659
660 /* Taskq being destroyed and all tasks drained */
661 if (!(tq->tq_flags & TASKQ_ACTIVE)) {
662 t->tqent_id = 0;
663 goto out;
664 }
665
666 spin_lock(&t->tqent_lock);
667
668 /*
669 * Mark it as a prealloc'd task. This is important
670 * to ensure that we don't free it later.
671 */
672 t->tqent_flags |= TQENT_FLAG_PREALLOC;
673
674 /* Queue to the priority list instead of the pending list */
675 if (flags & TQ_FRONT)
676 list_add_tail(&t->tqent_list, &tq->tq_prio_list);
677 else
678 list_add_tail(&t->tqent_list, &tq->tq_pend_list);
679
680 t->tqent_id = tq->tq_next_id;
681 tq->tq_next_id++;
682 t->tqent_func = func;
683 t->tqent_arg = arg;
684 t->tqent_taskq = tq;
685
686 spin_unlock(&t->tqent_lock);
687
688 wake_up(&tq->tq_work_waitq);
689 out:
690 /* Spawn additional taskq threads if required. */
691 if (tq->tq_nactive == tq->tq_nthreads)
692 (void) taskq_thread_spawn(tq);
693 spin_unlock_irqrestore(&tq->tq_lock, irqflags);
694 }
695 EXPORT_SYMBOL(taskq_dispatch_ent);
696
697 int
698 taskq_empty_ent(taskq_ent_t *t)
699 {
700 return (list_empty(&t->tqent_list));
701 }
702 EXPORT_SYMBOL(taskq_empty_ent);
703
704 void
705 taskq_init_ent(taskq_ent_t *t)
706 {
707 spin_lock_init(&t->tqent_lock);
708 init_waitqueue_head(&t->tqent_waitq);
709 init_timer(&t->tqent_timer);
710 INIT_LIST_HEAD(&t->tqent_list);
711 t->tqent_id = 0;
712 t->tqent_func = NULL;
713 t->tqent_arg = NULL;
714 t->tqent_flags = 0;
715 t->tqent_taskq = NULL;
716 }
717 EXPORT_SYMBOL(taskq_init_ent);
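
/*
 * Example (illustrative sketch): embedding a preallocated taskq_ent_t in a
 * caller-owned structure and dispatching it with taskq_dispatch_ent(), which
 * does not allocate and therefore cannot fail for lack of a free entry while
 * the taskq is active. The structure and names are hypothetical:
 *
 *	struct example_obj {
 *		taskq_ent_t	eo_ent;
 *		int		eo_data;
 *	};
 *
 *	struct example_obj *obj = kmem_zalloc(sizeof (*obj), KM_SLEEP);
 *	taskq_init_ent(&obj->eo_ent);
 *	taskq_dispatch_ent(tq, example_func, obj, TQ_SLEEP, &obj->eo_ent);
 */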
718
719 /*
720 * Return the next pending task, preference is given to tasks on the
721 * priority list which were dispatched with TQ_FRONT.
722 */
723 static taskq_ent_t *
724 taskq_next_ent(taskq_t *tq)
725 {
726 struct list_head *list;
727
728 ASSERT(spin_is_locked(&tq->tq_lock));
729
730 if (!list_empty(&tq->tq_prio_list))
731 list = &tq->tq_prio_list;
732 else if (!list_empty(&tq->tq_pend_list))
733 list = &tq->tq_pend_list;
734 else
735 return (NULL);
736
737 return (list_entry(list->next, taskq_ent_t, tqent_list));
738 }
739
740 /*
741 * Spawns a new thread for the specified taskq.
742 */
743 static void
744 taskq_thread_spawn_task(void *arg)
745 {
746 taskq_t *tq = (taskq_t *)arg;
747 unsigned long flags;
748
749 (void) taskq_thread_create(tq);
750
751 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
752 tq->tq_nspawn--;
753 spin_unlock_irqrestore(&tq->tq_lock, flags);
754 }
755
756 /*
757 * Spawn additional threads for dynamic taskqs (TASKQ_DYNAMIC) when the
758 * current number of threads is insufficient to handle the pending tasks.
759 * These new threads must be created by the dedicated dynamic_taskq to
760 * avoid deadlocks between thread creation and memory reclaim. The
761 * system_taskq, which is also a dynamic taskq, cannot be safely used for this.
762 */
763 static int
764 taskq_thread_spawn(taskq_t *tq)
765 {
766 int spawning = 0;
767
768 if (!(tq->tq_flags & TASKQ_DYNAMIC))
769 return (0);
770
771 if ((tq->tq_nthreads + tq->tq_nspawn < tq->tq_maxthreads) &&
772 (tq->tq_flags & TASKQ_ACTIVE)) {
773 spawning = (++tq->tq_nspawn);
774 taskq_dispatch(dynamic_taskq, taskq_thread_spawn_task,
775 tq, TQ_NOSLEEP);
776 }
777
778 return (spawning);
779 }
780
781 /*
782 * Threads in a dynamic taskq should only exit once it has been completely
783 * drained and no other threads are actively servicing tasks. This prevents
784 * threads from being created and destroyed more than is required.
785 *
786 * The first thread in the thread list is treated as the primary thread.
787 * There is nothing special about the primary thread, but in order to
788 * avoid all of the taskq pids changing we opt to make it long running.
789 */
790 static int
791 taskq_thread_should_stop(taskq_t *tq, taskq_thread_t *tqt)
792 {
793 ASSERT(spin_is_locked(&tq->tq_lock));
794
795 if (!(tq->tq_flags & TASKQ_DYNAMIC))
796 return (0);
797
798 if (list_first_entry(&(tq->tq_thread_list), taskq_thread_t,
799 tqt_thread_list) == tqt)
800 return (0);
801
802 return
803 ((tq->tq_nspawn == 0) && /* No threads are being spawned */
804 (tq->tq_nactive == 0) && /* No threads are handling tasks */
805 (tq->tq_nthreads > 1) && /* More than 1 thread is running */
806 (!taskq_next_ent(tq)) && /* There are no pending tasks */
807 (spl_taskq_thread_dynamic)); /* Dynamic taskqs are allowed */
808 }
809
810 static int
811 taskq_thread(void *args)
812 {
813 DECLARE_WAITQUEUE(wait, current);
814 sigset_t blocked;
815 taskq_thread_t *tqt = args;
816 taskq_t *tq;
817 taskq_ent_t *t;
818 int seq_tasks = 0;
819 unsigned long flags;
820
821 ASSERT(tqt);
822 ASSERT(tqt->tqt_tq);
823 tq = tqt->tqt_tq;
824 current->flags |= PF_NOFREEZE;
825
826 (void) spl_fstrans_mark();
827
828 sigfillset(&blocked);
829 sigprocmask(SIG_BLOCK, &blocked, NULL);
830 flush_signals(current);
831
832 tsd_set(taskq_tsd, tq);
833 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
834
835 /* Immediately exit if more threads than allowed were created. */
836 if (tq->tq_nthreads >= tq->tq_maxthreads)
837 goto error;
838
839 tq->tq_nthreads++;
840 list_add_tail(&tqt->tqt_thread_list, &tq->tq_thread_list);
841 wake_up(&tq->tq_wait_waitq);
842 set_current_state(TASK_INTERRUPTIBLE);
843
844 while (!kthread_should_stop()) {
845
846 if (list_empty(&tq->tq_pend_list) &&
847 list_empty(&tq->tq_prio_list)) {
848
849 if (taskq_thread_should_stop(tq, tqt)) {
850 wake_up_all(&tq->tq_wait_waitq);
851 break;
852 }
853
854 add_wait_queue_exclusive(&tq->tq_work_waitq, &wait);
855 spin_unlock_irqrestore(&tq->tq_lock, flags);
856
857 schedule();
858 seq_tasks = 0;
859
860 spin_lock_irqsave_nested(&tq->tq_lock, flags,
861 tq->tq_lock_class);
862 remove_wait_queue(&tq->tq_work_waitq, &wait);
863 } else {
864 __set_current_state(TASK_RUNNING);
865 }
866
867 if ((t = taskq_next_ent(tq)) != NULL) {
868 list_del_init(&t->tqent_list);
869
870 /*
871 * In order to support recursively dispatching a
872 * preallocated taskq_ent_t, tqent_id must be
873 * stored prior to executing tqent_func.
874 */
875 tqt->tqt_id = t->tqent_id;
876 tqt->tqt_task = t;
877
878 /*
879 * We must store a copy of the flags prior to
880 * servicing the task (servicing a prealloc'd task
881 * returns the ownership of the tqent back to
882 * the caller of taskq_dispatch_ent()). Thus,
883 * tqent_flags _may_ change within the call.
884 */
885 tqt->tqt_flags = t->tqent_flags;
886
887 taskq_insert_in_order(tq, tqt);
888 tq->tq_nactive++;
889 spin_unlock_irqrestore(&tq->tq_lock, flags);
890
891 /* Perform the requested task */
892 t->tqent_func(t->tqent_arg);
893
894 spin_lock_irqsave_nested(&tq->tq_lock, flags,
895 tq->tq_lock_class);
896 tq->tq_nactive--;
897 list_del_init(&tqt->tqt_active_list);
898 tqt->tqt_task = NULL;
899
900 /* For prealloc'd tasks, we don't free anything. */
901 if (!(tqt->tqt_flags & TQENT_FLAG_PREALLOC))
902 task_done(tq, t);
903
904 /*
905 * When the current lowest outstanding taskqid is
906 * done calculate the new lowest outstanding id
907 */
908 if (tq->tq_lowest_id == tqt->tqt_id) {
909 tq->tq_lowest_id = taskq_lowest_id(tq);
910 ASSERT3S(tq->tq_lowest_id, >, tqt->tqt_id);
911 }
912
913 /* Spawn additional taskq threads if required. */
914 if ((++seq_tasks) > spl_taskq_thread_sequential &&
915 taskq_thread_spawn(tq))
916 seq_tasks = 0;
917
918 tqt->tqt_id = 0;
919 tqt->tqt_flags = 0;
920 wake_up_all(&tq->tq_wait_waitq);
921 } else {
922 if (taskq_thread_should_stop(tq, tqt))
923 break;
924 }
925
926 set_current_state(TASK_INTERRUPTIBLE);
927
928 }
929
930 __set_current_state(TASK_RUNNING);
931 tq->tq_nthreads--;
932 list_del_init(&tqt->tqt_thread_list);
933 error:
934 kmem_free(tqt, sizeof (taskq_thread_t));
935 spin_unlock_irqrestore(&tq->tq_lock, flags);
936
937 tsd_set(taskq_tsd, NULL);
938
939 return (0);
940 }
941
942 static taskq_thread_t *
943 taskq_thread_create(taskq_t *tq)
944 {
945 static int last_used_cpu = 0;
946 taskq_thread_t *tqt;
947
948 tqt = kmem_alloc(sizeof (*tqt), KM_PUSHPAGE);
949 INIT_LIST_HEAD(&tqt->tqt_thread_list);
950 INIT_LIST_HEAD(&tqt->tqt_active_list);
951 tqt->tqt_tq = tq;
952 tqt->tqt_id = 0;
953
954 tqt->tqt_thread = spl_kthread_create(taskq_thread, tqt,
955 "%s", tq->tq_name);
956 if (tqt->tqt_thread == NULL) {
957 kmem_free(tqt, sizeof (taskq_thread_t));
958 return (NULL);
959 }
960
961 if (spl_taskq_thread_bind) {
962 last_used_cpu = (last_used_cpu + 1) % num_online_cpus();
963 kthread_bind(tqt->tqt_thread, last_used_cpu);
964 }
965
966 if (spl_taskq_thread_priority)
967 set_user_nice(tqt->tqt_thread, PRIO_TO_NICE(tq->tq_pri));
968
969 wake_up_process(tqt->tqt_thread);
970
971 return (tqt);
972 }
973
974 taskq_t *
975 taskq_create(const char *name, int nthreads, pri_t pri,
976 int minalloc, int maxalloc, uint_t flags)
977 {
978 taskq_t *tq;
979 taskq_thread_t *tqt;
980 int count = 0, rc = 0, i;
981 unsigned long irqflags;
982
983 ASSERT(name != NULL);
984 ASSERT(minalloc >= 0);
985 ASSERT(maxalloc <= INT_MAX);
986 ASSERT(!(flags & (TASKQ_CPR_SAFE))); /* Unsupported */
987
988 /* Scale the number of threads using nthreads as a percentage */
989 if (flags & TASKQ_THREADS_CPU_PCT) {
990 ASSERT(nthreads <= 100);
991 ASSERT(nthreads >= 0);
992 nthreads = MIN(nthreads, 100);
993 nthreads = MAX(nthreads, 0);
994 nthreads = MAX((num_online_cpus() * nthreads) / 100, 1);
995 }
996
997 tq = kmem_alloc(sizeof (*tq), KM_PUSHPAGE);
998 if (tq == NULL)
999 return (NULL);
1000
1001 spin_lock_init(&tq->tq_lock);
1002 INIT_LIST_HEAD(&tq->tq_thread_list);
1003 INIT_LIST_HEAD(&tq->tq_active_list);
1004 tq->tq_name = strdup(name);
1005 tq->tq_nactive = 0;
1006 tq->tq_nthreads = 0;
1007 tq->tq_nspawn = 0;
1008 tq->tq_maxthreads = nthreads;
1009 tq->tq_pri = pri;
1010 tq->tq_minalloc = minalloc;
1011 tq->tq_maxalloc = maxalloc;
1012 tq->tq_nalloc = 0;
1013 tq->tq_flags = (flags | TASKQ_ACTIVE);
1014 tq->tq_next_id = 1;
1015 tq->tq_lowest_id = 1;
1016 INIT_LIST_HEAD(&tq->tq_free_list);
1017 INIT_LIST_HEAD(&tq->tq_pend_list);
1018 INIT_LIST_HEAD(&tq->tq_prio_list);
1019 INIT_LIST_HEAD(&tq->tq_delay_list);
1020 init_waitqueue_head(&tq->tq_work_waitq);
1021 init_waitqueue_head(&tq->tq_wait_waitq);
1022 tq->tq_lock_class = TQ_LOCK_GENERAL;
1023 INIT_LIST_HEAD(&tq->tq_taskqs);
1024
1025 if (flags & TASKQ_PREPOPULATE) {
1026 spin_lock_irqsave_nested(&tq->tq_lock, irqflags,
1027 tq->tq_lock_class);
1028
1029 for (i = 0; i < minalloc; i++)
1030 task_done(tq, task_alloc(tq, TQ_PUSHPAGE | TQ_NEW,
1031 &irqflags));
1032
1033 spin_unlock_irqrestore(&tq->tq_lock, irqflags);
1034 }
1035
1036 if ((flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic)
1037 nthreads = 1;
1038
1039 for (i = 0; i < nthreads; i++) {
1040 tqt = taskq_thread_create(tq);
1041 if (tqt == NULL)
1042 rc = 1;
1043 else
1044 count++;
1045 }
1046
1047 /* Wait for all threads to be started before potential destroy */
1048 wait_event(tq->tq_wait_waitq, tq->tq_nthreads == count);
1049
1050 if (rc) {
1051 taskq_destroy(tq);
1052 tq = NULL;
1053 } else {
1054 down_write(&tq_list_sem);
1055 tq->tq_instance = taskq_find_by_name(name) + 1;
1056 list_add_tail(&tq->tq_taskqs, &tq_list);
1057 up_write(&tq_list_sem);
1058 }
1059
1060 return (tq);
1061 }
1062 EXPORT_SYMBOL(taskq_create);
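
/*
 * Example (illustrative sketch): creating a private taskq, dispatching to
 * it, and tearing it down; the name, thread count, and sizing are arbitrary
 * and example_func() is hypothetical:
 *
 *	taskq_t *tq = taskq_create("example_taskq", 4, maxclsyspri,
 *	    4, INT_MAX, TASKQ_PREPOPULATE | TASKQ_DYNAMIC);
 *	if (tq != NULL) {
 *		(void) taskq_dispatch(tq, example_func, NULL, TQ_SLEEP);
 *		taskq_wait(tq);
 *		taskq_destroy(tq);
 *	}
 */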
1063
1064 void
1065 taskq_destroy(taskq_t *tq)
1066 {
1067 struct task_struct *thread;
1068 taskq_thread_t *tqt;
1069 taskq_ent_t *t;
1070 unsigned long flags;
1071
1072 ASSERT(tq);
1073 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
1074 tq->tq_flags &= ~TASKQ_ACTIVE;
1075 spin_unlock_irqrestore(&tq->tq_lock, flags);
1076
1077 /*
1078 * When TASKQ_ACTIVE is clear new tasks may not be added, nor may
1079 * new worker threads be spawned for a dynamic taskq.
1080 */
1081 if (dynamic_taskq != NULL)
1082 taskq_wait_outstanding(dynamic_taskq, 0);
1083
1084 taskq_wait(tq);
1085
1086 /* remove taskq from global list used by the kstats */
1087 down_write(&tq_list_sem);
1088 list_del(&tq->tq_taskqs);
1089 up_write(&tq_list_sem);
1090
1091 spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
1092
1093 /*
1094 * Signal each thread to exit and block until it does. Each thread
1095 * is responsible for removing itself from the list and freeing its
1096 * taskq_thread_t. This allows for idle threads to opt to remove
1097 * themselves from the taskq. They can be recreated as needed.
1098 */
1099 while (!list_empty(&tq->tq_thread_list)) {
1100 tqt = list_entry(tq->tq_thread_list.next,
1101 taskq_thread_t, tqt_thread_list);
1102 thread = tqt->tqt_thread;
1103 spin_unlock_irqrestore(&tq->tq_lock, flags);
1104
1105 kthread_stop(thread);
1106
1107 spin_lock_irqsave_nested(&tq->tq_lock, flags,
1108 tq->tq_lock_class);
1109 }
1110
1111 while (!list_empty(&tq->tq_free_list)) {
1112 t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);
1113
1114 ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
1115
1116 list_del_init(&t->tqent_list);
1117 task_free(tq, t);
1118 }
1119
1120 ASSERT0(tq->tq_nthreads);
1121 ASSERT0(tq->tq_nalloc);
1122 ASSERT0(tq->tq_nspawn);
1123 ASSERT(list_empty(&tq->tq_thread_list));
1124 ASSERT(list_empty(&tq->tq_active_list));
1125 ASSERT(list_empty(&tq->tq_free_list));
1126 ASSERT(list_empty(&tq->tq_pend_list));
1127 ASSERT(list_empty(&tq->tq_prio_list));
1128 ASSERT(list_empty(&tq->tq_delay_list));
1129
1130 spin_unlock_irqrestore(&tq->tq_lock, flags);
1131
1132 strfree(tq->tq_name);
1133 kmem_free(tq, sizeof (taskq_t));
1134 }
1135 EXPORT_SYMBOL(taskq_destroy);
1136
1137 int
1138 spl_taskq_init(void)
1139 {
1140 tsd_create(&taskq_tsd, NULL);
1141
1142 system_taskq = taskq_create("spl_system_taskq", MAX(boot_ncpus, 64),
1143 maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE|TASKQ_DYNAMIC);
1144 if (system_taskq == NULL)
1145 return (1);
1146
1147 dynamic_taskq = taskq_create("spl_dynamic_taskq", 1,
1148 maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE);
1149 if (dynamic_taskq == NULL) {
1150 taskq_destroy(system_taskq);
1151 return (1);
1152 }
1153
1154 /*
1155 * This is used to annotate tq_lock, so
1156 * taskq_dispatch -> taskq_thread_spawn -> taskq_dispatch
1157 * does not trigger a lockdep warning re: possible recursive locking
1158 */
1159 dynamic_taskq->tq_lock_class = TQ_LOCK_DYNAMIC;
1160
1161 return (0);
1162 }
1163
1164 void
1165 spl_taskq_fini(void)
1166 {
1167 taskq_destroy(dynamic_taskq);
1168 dynamic_taskq = NULL;
1169
1170 taskq_destroy(system_taskq);
1171 system_taskq = NULL;
1172
1173 tsd_destroy(&taskq_tsd);
1174 }