/*****************************************************************************\
 *  Copyright (C) 2007-2010 Lawrence Livermore National Security, LLC.
 *  Copyright (C) 2007 The Regents of the University of California.
 *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
 *  Written by Brian Behlendorf <behlendorf1@llnl.gov>.
 *
 *  This file is part of the SPL, Solaris Porting Layer.
 *  For details, see <http://zfsonlinux.org/>.
 *
 *  The SPL is free software; you can redistribute it and/or modify it
 *  under the terms of the GNU General Public License as published by the
 *  Free Software Foundation; either version 2 of the License, or (at your
 *  option) any later version.
 *
 *  The SPL is distributed in the hope that it will be useful, but WITHOUT
 *  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 *  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 *  for more details.
 *
 *  You should have received a copy of the GNU General Public License along
 *  with the SPL.  If not, see <http://www.gnu.org/licenses/>.
 *****************************************************************************
 *  Solaris Porting Layer (SPL) Task Queue Implementation.
\*****************************************************************************/

#include <sys/taskq.h>

int spl_taskq_thread_bind = 0;
module_param(spl_taskq_thread_bind, int, 0644);
MODULE_PARM_DESC(spl_taskq_thread_bind, "Bind taskq thread to CPU by default");

/* Global system-wide dynamic task queue available for all consumers */
taskq_t *system_taskq;
EXPORT_SYMBOL(system_taskq);

static int
task_km_flags(uint_t flags)
{
        if (flags & TQ_NOSLEEP)
                return (KM_NOSLEEP);

        if (flags & TQ_PUSHPAGE)
                return (KM_PUSHPAGE);

        return (KM_SLEEP);
}

/*
 * NOTE: Must be called with tq->tq_lock held, returns a list_t which
 * is not attached to the free, work, or pending taskq lists.
 */
static taskq_ent_t *
task_alloc(taskq_t *tq, uint_t flags)
{
        taskq_ent_t *t;
        int count = 0;

        ASSERT(tq);
        ASSERT(spin_is_locked(&tq->tq_lock));
retry:
        /* Acquire taskq_ent_t's from free list if available */
        if (!list_empty(&tq->tq_free_list) && !(flags & TQ_NEW)) {
                t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);

                ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
                ASSERT(!(t->tqent_flags & TQENT_FLAG_CANCEL));
                ASSERT(!timer_pending(&t->tqent_timer));

                list_del_init(&t->tqent_list);
                return (t);
        }

        /* Free list is empty and memory allocations are prohibited */
        if (flags & TQ_NOALLOC)
                return (NULL);

        /* Hit maximum taskq_ent_t pool size */
        if (tq->tq_nalloc >= tq->tq_maxalloc) {
                if (flags & TQ_NOSLEEP)
                        return (NULL);

                /*
                 * Sleep periodically polling the free list for an available
                 * taskq_ent_t.  Dispatching with TQ_SLEEP should always
                 * succeed, but we cannot block forever waiting for a
                 * taskq_ent_t to show up in the free list, otherwise a
                 * deadlock can happen.
                 *
                 * Therefore, we need to allocate a new task even if the
                 * number of allocated tasks is above tq->tq_maxalloc, but
                 * we still end up delaying the task allocation by one
                 * second, thereby throttling the task dispatch rate.
                 */
                spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
                schedule_timeout(HZ / 100);
                spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
                if (count < 100) {
                        count++;
                        goto retry;
                }
        }

        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
        t = kmem_alloc(sizeof (taskq_ent_t), task_km_flags(flags));
        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);

        if (t) {
                taskq_init_ent(t);
                tq->tq_nalloc++;
        }

        return (t);
}

/*
 * NOTE: Must be called with tq->tq_lock held, expects the taskq_ent_t
 * to already be removed from the free, work, or pending taskq lists.
 */
static void
task_free(taskq_t *tq, taskq_ent_t *t)
{
        ASSERT(tq);
        ASSERT(t);
        ASSERT(spin_is_locked(&tq->tq_lock));
        ASSERT(list_empty(&t->tqent_list));
        ASSERT(!timer_pending(&t->tqent_timer));

        kmem_free(t, sizeof (taskq_ent_t));
        tq->tq_nalloc--;
}

/*
 * NOTE: Must be called with tq->tq_lock held, either destroys the
 * taskq_ent_t if too many exist or moves it to the free list for later use.
 */
static void
task_done(taskq_t *tq, taskq_ent_t *t)
{
        ASSERT(tq);
        ASSERT(t);
        ASSERT(spin_is_locked(&tq->tq_lock));

        /* Wake tasks blocked in taskq_wait_id() */
        wake_up_all(&t->tqent_waitq);

        list_del_init(&t->tqent_list);

        if (tq->tq_nalloc <= tq->tq_minalloc) {
                t->tqent_id = 0;
                t->tqent_func = NULL;
                t->tqent_arg = NULL;
                t->tqent_flags = 0;

                list_add_tail(&t->tqent_list, &tq->tq_free_list);
        } else {
                task_free(tq, t);
        }
}

/*
 * When a delayed task timer expires remove it from the delay list and
 * add it to the priority list in order for immediate processing.
 */
static void
task_expire(unsigned long data)
{
        taskq_ent_t *w, *t = (taskq_ent_t *)data;
        taskq_t *tq = t->tqent_taskq;
        struct list_head *l;

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);

        if (t->tqent_flags & TQENT_FLAG_CANCEL) {
                ASSERT(list_empty(&t->tqent_list));
                spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
                return;
        }

        /*
         * The priority list must be maintained in strict task id order
         * from lowest to highest for lowest_id to be easily calculable.
         */
        list_del(&t->tqent_list);
        list_for_each_prev(l, &tq->tq_prio_list) {
                w = list_entry(l, taskq_ent_t, tqent_list);
                if (w->tqent_id < t->tqent_id) {
                        list_add(&t->tqent_list, l);
                        break;
                }
        }
        if (l == &tq->tq_prio_list)
                list_add(&t->tqent_list, &tq->tq_prio_list);

        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        wake_up(&tq->tq_work_waitq);
}

/*
 * Returns the lowest incomplete taskqid_t.  The taskqid_t may
 * be queued on the pending list, on the priority list, on the
 * delay list, or on the work list currently being handled, but
 * it is not 100% complete yet.
 */
static taskqid_t
taskq_lowest_id(taskq_t *tq)
{
        taskqid_t lowest_id = tq->tq_next_id;
        taskq_ent_t *t;
        taskq_thread_t *tqt;

        ASSERT(tq);
        ASSERT(spin_is_locked(&tq->tq_lock));

        if (!list_empty(&tq->tq_pend_list)) {
                t = list_entry(tq->tq_pend_list.next, taskq_ent_t, tqent_list);
                lowest_id = MIN(lowest_id, t->tqent_id);
        }

        if (!list_empty(&tq->tq_prio_list)) {
                t = list_entry(tq->tq_prio_list.next, taskq_ent_t, tqent_list);
                lowest_id = MIN(lowest_id, t->tqent_id);
        }

        if (!list_empty(&tq->tq_delay_list)) {
                t = list_entry(tq->tq_delay_list.next, taskq_ent_t, tqent_list);
                lowest_id = MIN(lowest_id, t->tqent_id);
        }

        if (!list_empty(&tq->tq_active_list)) {
                tqt = list_entry(tq->tq_active_list.next, taskq_thread_t,
                    tqt_active_list);
                ASSERT(tqt->tqt_id != 0);
                lowest_id = MIN(lowest_id, tqt->tqt_id);
        }

        return (lowest_id);
}

/*
 * Insert a task into a list keeping the list sorted by increasing taskqid.
 */
static void
taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt)
{
        taskq_thread_t *w;
        struct list_head *l;

        ASSERT(tq);
        ASSERT(tqt);
        ASSERT(spin_is_locked(&tq->tq_lock));

        list_for_each_prev(l, &tq->tq_active_list) {
                w = list_entry(l, taskq_thread_t, tqt_active_list);
                if (w->tqt_id < tqt->tqt_id) {
                        list_add(&tqt->tqt_active_list, l);
                        break;
                }
        }
        if (l == &tq->tq_active_list)
                list_add(&tqt->tqt_active_list, &tq->tq_active_list);
}

/*
 * Find and return a task from the given list if it exists.  The list
 * must be in lowest to highest task id order.
 */
static taskq_ent_t *
taskq_find_list(taskq_t *tq, struct list_head *lh, taskqid_t id)
{
        struct list_head *l;
        taskq_ent_t *t;

        ASSERT(spin_is_locked(&tq->tq_lock));

        list_for_each(l, lh) {
                t = list_entry(l, taskq_ent_t, tqent_list);

                if (t->tqent_id == id)
                        return (t);

                if (t->tqent_id > id)
                        break;
        }

        return (NULL);
}

/*
 * Find an already dispatched task given the task id regardless of what
 * state it is in.  If a task is still pending or executing it will be
 * returned and 'active' set appropriately.  If the task has already
 * been run then NULL is returned.
 */
static taskq_ent_t *
taskq_find(taskq_t *tq, taskqid_t id, int *active)
{
        taskq_thread_t *tqt;
        struct list_head *l;
        taskq_ent_t *t;

        ASSERT(spin_is_locked(&tq->tq_lock));
        *active = 0;

        t = taskq_find_list(tq, &tq->tq_delay_list, id);
        if (t)
                return (t);

        t = taskq_find_list(tq, &tq->tq_prio_list, id);
        if (t)
                return (t);

        t = taskq_find_list(tq, &tq->tq_pend_list, id);
        if (t)
                return (t);

        list_for_each(l, &tq->tq_active_list) {
                tqt = list_entry(l, taskq_thread_t, tqt_active_list);
                if (tqt->tqt_id == id) {
                        t = tqt->tqt_task;
                        *active = 1;
                        return (t);
                }
        }

        return (NULL);
}

static int
taskq_wait_id_check(taskq_t *tq, taskqid_t id)
{
        int active = 0;
        int rc;

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
        rc = (taskq_find(tq, id, &active) == NULL);
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        return (rc);
}

/*
 * The taskq_wait_id() function blocks until the passed task id completes.
 * This does not guarantee that all lower task ids have completed.
 */
void
taskq_wait_id(taskq_t *tq, taskqid_t id)
{
        wait_event(tq->tq_wait_waitq, taskq_wait_id_check(tq, id));
}
EXPORT_SYMBOL(taskq_wait_id);

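/*
 * Illustrative sketch (not part of the original source): waiting on one
 * specific dispatched task.  The example_* names are hypothetical; only
 * taskq_dispatch() and taskq_wait_id() come from this file.
 */
#if 0
static void
example_func(void *arg)
{
        /* Runs asynchronously in a taskq worker thread */
}

static void
example_wait_one(taskq_t *tq, void *arg)
{
        taskqid_t id;

        id = taskq_dispatch(tq, example_func, arg, TQ_SLEEP);
        if (id == 0)
                return;                 /* Dispatch failed, nothing to wait on */

        /* Blocks until this id completes; later ids may still be running */
        taskq_wait_id(tq, id);
}
#endif
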
/*
 * The taskq_wait() function will block until all previously submitted
 * tasks have been completed.  A previously submitted task is defined as
 * a task with a lower task id than the current task queue id.  Note that
 * all task id's are assigned monotonically at dispatch time.
 *
 * Waiting for all previous tasks to complete is accomplished by tracking
 * the lowest outstanding task id.  As tasks are dispatched they are added
 * to the tail of the pending, priority, or delay lists.  As worker threads
 * become available the tasks are removed from the heads of these lists and
 * linked to the worker threads.  This ensures the lists are kept in lowest
 * to highest task id order.
 *
 * Therefore the lowest outstanding task id can be quickly determined by
 * checking the head item from all of these lists.  This value is stored
 * with the task queue as the lowest id.  It only needs to be recalculated
 * when either the task with the current lowest id completes or is canceled.
 *
 * By blocking until the lowest task id exceeds the task id captured when
 * the function was called, we ensure all previous tasks have completed.
 *
 * NOTE: When there are multiple worker threads it is possible for larger
 * task ids to complete before smaller ones.  Conversely when the task
 * queue contains delay tasks with small task ids, you may block for a
 * considerable length of time waiting for them to expire and execute.
 */
static int
taskq_wait_check(taskq_t *tq, taskqid_t id)
{
        int rc;

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
        rc = (id < tq->tq_lowest_id);
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        return (rc);
}

void
taskq_wait_all(taskq_t *tq, taskqid_t id)
{
        wait_event(tq->tq_wait_waitq, taskq_wait_check(tq, id));
}
EXPORT_SYMBOL(taskq_wait_all);

void
taskq_wait(taskq_t *tq)
{
        taskqid_t id;

        ASSERT(tq);

        /* Wait for the largest outstanding taskqid */
        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
        id = tq->tq_next_id - 1;
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        taskq_wait_all(tq, id);
}
EXPORT_SYMBOL(taskq_wait);

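/*
 * Illustrative sketch (not part of the original source): draining a queue
 * before tearing down a shared resource.  example_worker and the loop count
 * are hypothetical.
 */
#if 0
static void
example_worker(void *arg)
{
        /* Runs asynchronously in a taskq worker thread */
}

static void
example_drain(taskq_t *tq)
{
        int i;

        for (i = 0; i < 16; i++)
                (void) taskq_dispatch(tq, example_worker, NULL, TQ_SLEEP);

        /*
         * Blocks until tq_lowest_id passes every id dispatched above,
         * i.e. all previously submitted tasks have completed.
         */
        taskq_wait(tq);
}
#endif
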
int
taskq_member(taskq_t *tq, void *t)
{
        struct list_head *l;
        taskq_thread_t *tqt;
        int found = 0;

        ASSERT(tq);
        ASSERT(t);

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
        list_for_each(l, &tq->tq_thread_list) {
                tqt = list_entry(l, taskq_thread_t, tqt_thread_list);
                if (tqt->tqt_thread == (struct task_struct *)t) {
                        found = 1;
                        break;
                }
        }
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        return (found);
}
EXPORT_SYMBOL(taskq_member);

/*
 * Cancel an already dispatched task given the task id.  Still pending tasks
 * will be immediately canceled, and if the task is active the function will
 * block until it completes.  Preallocated tasks which are canceled must be
 * freed by the caller.
 */
int
taskq_cancel_id(taskq_t *tq, taskqid_t id)
{
        taskq_ent_t *t;
        int active = 0;
        int rc = ENOENT;

        ASSERT(tq);

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
        t = taskq_find(tq, id, &active);
        if (t && !active) {
                list_del_init(&t->tqent_list);
                t->tqent_flags |= TQENT_FLAG_CANCEL;

                /*
                 * When canceling the lowest outstanding task id we
                 * must recalculate the new lowest outstanding id.
                 */
                if (tq->tq_lowest_id == t->tqent_id) {
                        tq->tq_lowest_id = taskq_lowest_id(tq);
                        ASSERT3S(tq->tq_lowest_id, >, t->tqent_id);
                }

                /*
                 * The task_expire() function takes the tq->tq_lock so drop
                 * the lock before synchronously cancelling the timer.
                 */
                if (timer_pending(&t->tqent_timer)) {
                        spin_unlock_irqrestore(&tq->tq_lock,
                            tq->tq_lock_flags);
                        del_timer_sync(&t->tqent_timer);
                        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
                }

                if (!(t->tqent_flags & TQENT_FLAG_PREALLOC))
                        task_done(tq, t);

                rc = 0;
        }
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        if (active) {
                taskq_wait_id(tq, id);
                rc = EBUSY;
        }

        return (rc);
}
EXPORT_SYMBOL(taskq_cancel_id);

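/*
 * Illustrative sketch (not part of the original source): canceling a pending
 * task by id.  A zero return is assumed to mean the task was still pending
 * and has been removed; a non-zero return means it either never existed or
 * had already started (in which case the call blocked until it finished).
 */
#if 0
static void
example_cancel(taskq_t *tq, taskqid_t id)
{
        if (taskq_cancel_id(tq, id) != 0) {
                /* Task already ran, or was running and has now completed */
        }
}
#endif
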
taskqid_t
taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
{
        taskq_ent_t *t;
        taskqid_t rc = 0;

        ASSERT(tq);
        ASSERT(func);

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);

        /* Taskq being destroyed and all tasks drained */
        if (!(tq->tq_flags & TQ_ACTIVE))
                goto out;

        /* Do not queue the task unless there is an idle thread for it */
        ASSERT(tq->tq_nactive <= tq->tq_nthreads);
        if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads))
                goto out;

        if ((t = task_alloc(tq, flags)) == NULL)
                goto out;

        spin_lock(&t->tqent_lock);

        /* Queue to the priority list instead of the pending list */
        if (flags & TQ_FRONT)
                list_add_tail(&t->tqent_list, &tq->tq_prio_list);
        else
                list_add_tail(&t->tqent_list, &tq->tq_pend_list);

        t->tqent_id = rc = tq->tq_next_id;
        tq->tq_next_id++;
        t->tqent_func = func;
        t->tqent_arg = arg;
        t->tqent_taskq = tq;
        t->tqent_timer.data = 0;
        t->tqent_timer.function = NULL;
        t->tqent_timer.expires = 0;

        ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));

        spin_unlock(&t->tqent_lock);

        wake_up(&tq->tq_work_waitq);
out:
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
        return (rc);
}
EXPORT_SYMBOL(taskq_dispatch);

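/*
 * Illustrative sketch (not part of the original source): the common dispatch
 * patterns.  TQ_SLEEP may block in task_alloc() and should always succeed;
 * TQ_NOSLEEP never sleeps and may return 0, which the caller must handle.
 * example_func is hypothetical.
 */
#if 0
static void
example_dispatch(taskq_t *tq, void *arg)
{
        /* From process context: may sleep waiting for a free taskq_ent_t */
        (void) taskq_dispatch(tq, example_func, arg, TQ_SLEEP);

        /* From atomic context: never sleeps, may fail */
        if (taskq_dispatch(tq, example_func, arg, TQ_NOSLEEP) == 0) {
                /* No taskq_ent_t available; fall back or retry later */
        }

        /* Jump the queue: lands on tq_prio_list instead of tq_pend_list */
        (void) taskq_dispatch(tq, example_func, arg, TQ_SLEEP | TQ_FRONT);
}
#endif
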
taskqid_t
taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg,
    uint_t flags, clock_t expire_time)
{
        taskq_ent_t *t;
        taskqid_t rc = 0;

        ASSERT(tq);
        ASSERT(func);

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);

        /* Taskq being destroyed and all tasks drained */
        if (!(tq->tq_flags & TQ_ACTIVE))
                goto out;

        if ((t = task_alloc(tq, flags)) == NULL)
                goto out;

        spin_lock(&t->tqent_lock);

        /* Queue to the delay list for subsequent execution */
        list_add_tail(&t->tqent_list, &tq->tq_delay_list);

        t->tqent_id = rc = tq->tq_next_id;
        tq->tq_next_id++;
        t->tqent_func = func;
        t->tqent_arg = arg;
        t->tqent_taskq = tq;
        t->tqent_timer.data = (unsigned long)t;
        t->tqent_timer.function = task_expire;
        t->tqent_timer.expires = (unsigned long)expire_time;
        add_timer(&t->tqent_timer);

        ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));

        spin_unlock(&t->tqent_lock);
out:
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
        return (rc);
}
EXPORT_SYMBOL(taskq_dispatch_delay);

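/*
 * Illustrative sketch (not part of the original source): expire_time is an
 * absolute expiry stamped straight into tqent_timer.expires, so it is
 * expressed in jiffies.  The one second delay below is arbitrary and
 * example_func is hypothetical.
 */
#if 0
static void
example_dispatch_delayed(taskq_t *tq, void *arg)
{
        taskqid_t id;

        id = taskq_dispatch_delay(tq, example_func, arg, TQ_SLEEP,
            jiffies + HZ);
        if (id == 0)
                return;                 /* Allocation failed or taskq inactive */

        /* Until the timer fires, the id can still be taskq_cancel_id()'d */
}
#endif
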
void
taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
    taskq_ent_t *t)
{
        ASSERT(tq);
        ASSERT(func);
        ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);

        /* Taskq being destroyed and all tasks drained */
        if (!(tq->tq_flags & TQ_ACTIVE)) {
                t->tqent_id = 0;
                goto out;
        }

        spin_lock(&t->tqent_lock);

        /*
         * Mark it as a prealloc'd task.  This is important
         * to ensure that we don't free it later.
         */
        t->tqent_flags |= TQENT_FLAG_PREALLOC;

        /* Queue to the priority list instead of the pending list */
        if (flags & TQ_FRONT)
                list_add_tail(&t->tqent_list, &tq->tq_prio_list);
        else
                list_add_tail(&t->tqent_list, &tq->tq_pend_list);

        t->tqent_id = tq->tq_next_id;
        tq->tq_next_id++;
        t->tqent_func = func;
        t->tqent_arg = arg;
        t->tqent_taskq = tq;

        spin_unlock(&t->tqent_lock);

        wake_up(&tq->tq_work_waitq);
out:
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
}
EXPORT_SYMBOL(taskq_dispatch_ent);

int
taskq_empty_ent(taskq_ent_t *t)
{
        return (list_empty(&t->tqent_list));
}
EXPORT_SYMBOL(taskq_empty_ent);

void
taskq_init_ent(taskq_ent_t *t)
{
        spin_lock_init(&t->tqent_lock);
        init_waitqueue_head(&t->tqent_waitq);
        init_timer(&t->tqent_timer);
        INIT_LIST_HEAD(&t->tqent_list);
        t->tqent_id = 0;
        t->tqent_func = NULL;
        t->tqent_arg = NULL;
        t->tqent_flags = 0;
        t->tqent_taskq = NULL;
}
EXPORT_SYMBOL(taskq_init_ent);

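/*
 * Illustrative sketch (not part of the original source): dispatching a
 * caller-owned, preallocated taskq_ent_t.  The entry is embedded in a
 * hypothetical example_obj_t; because TQENT_FLAG_PREALLOC is set, the taskq
 * never frees it and ownership returns to the caller once the function runs.
 */
#if 0
typedef struct example_obj {
        taskq_ent_t     eo_tqent;
        int             eo_data;
} example_obj_t;

static void
example_obj_func(void *arg)
{
        example_obj_t *eo = arg;

        /* eo and eo->eo_tqent belong to the caller; nothing is freed here */
        eo->eo_data++;
}

static void
example_dispatch_prealloc(taskq_t *tq, example_obj_t *eo)
{
        taskq_init_ent(&eo->eo_tqent);
        taskq_dispatch_ent(tq, example_obj_func, eo, TQ_SLEEP, &eo->eo_tqent);
}
#endif
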
static int
taskq_thread(void *args)
{
        DECLARE_WAITQUEUE(wait, current);
        sigset_t blocked;
        taskq_thread_t *tqt = args;
        taskq_t *tq;
        taskq_ent_t *t;
        struct list_head *pend_list;

        ASSERT(tqt);
        tq = tqt->tqt_tq;
        current->flags |= PF_NOFREEZE;

        sigfillset(&blocked);
        sigprocmask(SIG_BLOCK, &blocked, NULL);
        flush_signals(current);

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
        tq->tq_nthreads++;
        wake_up(&tq->tq_wait_waitq);
        set_current_state(TASK_INTERRUPTIBLE);

        while (!kthread_should_stop()) {

                if (list_empty(&tq->tq_pend_list) &&
                    list_empty(&tq->tq_prio_list)) {
                        add_wait_queue_exclusive(&tq->tq_work_waitq, &wait);
                        spin_unlock_irqrestore(&tq->tq_lock,
                            tq->tq_lock_flags);
                        schedule();
                        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
                        remove_wait_queue(&tq->tq_work_waitq, &wait);
                } else {
                        __set_current_state(TASK_RUNNING);
                }

                if (!list_empty(&tq->tq_prio_list))
                        pend_list = &tq->tq_prio_list;
                else if (!list_empty(&tq->tq_pend_list))
                        pend_list = &tq->tq_pend_list;
                else
                        pend_list = NULL;

                if (pend_list) {
                        t = list_entry(pend_list->next, taskq_ent_t,
                            tqent_list);
                        list_del_init(&t->tqent_list);

                        /*
                         * In order to support recursively dispatching a
                         * preallocated taskq_ent_t, tqent_id must be
                         * stored prior to executing tqent_func.
                         */
                        tqt->tqt_id = t->tqent_id;
                        tqt->tqt_task = t;

                        /*
                         * We must store a copy of the flags prior to
                         * servicing the task (servicing a prealloc'd task
                         * returns the ownership of the tqent back to
                         * the caller of taskq_dispatch).  Thus,
                         * tqent_flags _may_ change within the call.
                         */
                        tqt->tqt_flags = t->tqent_flags;

                        taskq_insert_in_order(tq, tqt);
                        tq->tq_nactive++;
                        spin_unlock_irqrestore(&tq->tq_lock,
                            tq->tq_lock_flags);

                        /* Perform the requested task */
                        t->tqent_func(t->tqent_arg);

                        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
                        tq->tq_nactive--;
                        list_del_init(&tqt->tqt_active_list);
                        tqt->tqt_task = NULL;

                        /* For prealloc'd tasks, we don't free anything. */
                        if ((tq->tq_flags & TASKQ_DYNAMIC) ||
                            !(tqt->tqt_flags & TQENT_FLAG_PREALLOC))
                                task_done(tq, t);

                        /*
                         * When the current lowest outstanding taskqid is
                         * done, calculate the new lowest outstanding id.
                         */
                        if (tq->tq_lowest_id == tqt->tqt_id) {
                                tq->tq_lowest_id = taskq_lowest_id(tq);
                                ASSERT3S(tq->tq_lowest_id, >, tqt->tqt_id);
                        }

                        tqt->tqt_id = 0;
                        tqt->tqt_flags = 0;
                        wake_up_all(&tq->tq_wait_waitq);
                }

                set_current_state(TASK_INTERRUPTIBLE);
        }

        __set_current_state(TASK_RUNNING);
        tq->tq_nthreads--;
        list_del_init(&tqt->tqt_thread_list);
        kmem_free(tqt, sizeof (taskq_thread_t));

        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        return (0);
}

taskq_t *
taskq_create(const char *name, int nthreads, pri_t pri,
    int minalloc, int maxalloc, uint_t flags)
{
        static int last_used_cpu = 0;
        taskq_t *tq;
        taskq_thread_t *tqt;
        int rc = 0, i, j = 0;

        ASSERT(name != NULL);
        ASSERT(pri <= maxclsyspri);
        ASSERT(minalloc >= 0);
        ASSERT(maxalloc <= INT_MAX);
        ASSERT(!(flags & (TASKQ_CPR_SAFE | TASKQ_DYNAMIC))); /* Unsupported */

        /* Scale the number of threads using nthreads as a percentage */
        if (flags & TASKQ_THREADS_CPU_PCT) {
                ASSERT(nthreads <= 100);
                ASSERT(nthreads >= 0);
                nthreads = MIN(nthreads, 100);
                nthreads = MAX(nthreads, 0);
                nthreads = MAX((num_online_cpus() * nthreads) / 100, 1);
        }

        tq = kmem_alloc(sizeof (*tq), KM_PUSHPAGE);
        if (tq == NULL)
                return (NULL);

        spin_lock_init(&tq->tq_lock);
        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
        INIT_LIST_HEAD(&tq->tq_thread_list);
        INIT_LIST_HEAD(&tq->tq_active_list);
        tq->tq_name = name;
        tq->tq_nactive = 0;
        tq->tq_nthreads = 0;
        tq->tq_pri = pri;
        tq->tq_minalloc = minalloc;
        tq->tq_maxalloc = maxalloc;
        tq->tq_nalloc = 0;
        tq->tq_flags = (flags | TQ_ACTIVE);
        tq->tq_next_id = 1;
        tq->tq_lowest_id = 1;
        INIT_LIST_HEAD(&tq->tq_free_list);
        INIT_LIST_HEAD(&tq->tq_pend_list);
        INIT_LIST_HEAD(&tq->tq_prio_list);
        INIT_LIST_HEAD(&tq->tq_delay_list);
        init_waitqueue_head(&tq->tq_work_waitq);
        init_waitqueue_head(&tq->tq_wait_waitq);

        if (flags & TASKQ_PREPOPULATE)
                for (i = 0; i < minalloc; i++)
                        task_done(tq, task_alloc(tq, TQ_PUSHPAGE | TQ_NEW));

        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        for (i = 0; i < nthreads; i++) {
                tqt = kmem_alloc(sizeof (*tqt), KM_PUSHPAGE);
                INIT_LIST_HEAD(&tqt->tqt_thread_list);
                INIT_LIST_HEAD(&tqt->tqt_active_list);
                tqt->tqt_tq = tq;
                tqt->tqt_id = 0;

                tqt->tqt_thread = spl_kthread_create(taskq_thread, tqt,
                    "%s/%d", name, i);
                if (tqt->tqt_thread) {
                        list_add(&tqt->tqt_thread_list, &tq->tq_thread_list);
                        if (spl_taskq_thread_bind) {
                                last_used_cpu = (last_used_cpu + 1) %
                                    num_online_cpus();
                                kthread_bind(tqt->tqt_thread, last_used_cpu);
                        }
                        set_user_nice(tqt->tqt_thread, PRIO_TO_NICE(pri));
                        wake_up_process(tqt->tqt_thread);
                        j++;
                } else {
                        kmem_free(tqt, sizeof (taskq_thread_t));
                        rc = 1;
                }
        }

        /* Wait for all threads to be started before potential destroy */
        wait_event(tq->tq_wait_waitq, tq->tq_nthreads == j);

        if (rc) {
                taskq_destroy(tq);
                tq = NULL;
        }

        return (tq);
}
EXPORT_SYMBOL(taskq_create);

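/*
 * Illustrative sketch (not part of the original source): creating a small
 * private queue and tearing it down.  The name, thread count, and alloc
 * limits are arbitrary; minclsyspri and TASKQ_PREPOPULATE mirror the
 * system_taskq setup at the bottom of this file.
 */
#if 0
static taskq_t *example_tq;

static int
example_setup(void)
{
        example_tq = taskq_create("example_taskq", 2, minclsyspri,
            4, 32, TASKQ_PREPOPULATE);
        if (example_tq == NULL)
                return (1);

        return (0);
}

static void
example_teardown(void)
{
        /* Waits for outstanding tasks, stops the threads, frees the taskq */
        taskq_destroy(example_tq);
}
#endif
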
void
taskq_destroy(taskq_t *tq)
{
        struct task_struct *thread;
        taskq_thread_t *tqt;
        taskq_ent_t *t;

        ASSERT(tq);
        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
        tq->tq_flags &= ~TQ_ACTIVE;
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        /* TQ_ACTIVE cleared prevents new tasks being added to pending */
        taskq_wait(tq);

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);

        /*
         * Signal each thread to exit and block until it does.  Each thread
         * is responsible for removing itself from the list and freeing its
         * taskq_thread_t.  This allows for idle threads to opt to remove
         * themselves from the taskq.  They can be recreated as needed.
         */
        while (!list_empty(&tq->tq_thread_list)) {
                tqt = list_entry(tq->tq_thread_list.next,
                    taskq_thread_t, tqt_thread_list);
                thread = tqt->tqt_thread;
                spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

                kthread_stop(thread);

                spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
        }

        while (!list_empty(&tq->tq_free_list)) {
                t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);

                ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));

                list_del_init(&t->tqent_list);
                task_free(tq, t);
        }

        ASSERT(tq->tq_nthreads == 0);
        ASSERT(tq->tq_nalloc == 0);
        ASSERT(list_empty(&tq->tq_thread_list));
        ASSERT(list_empty(&tq->tq_active_list));
        ASSERT(list_empty(&tq->tq_free_list));
        ASSERT(list_empty(&tq->tq_pend_list));
        ASSERT(list_empty(&tq->tq_prio_list));
        ASSERT(list_empty(&tq->tq_delay_list));

        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        kmem_free(tq, sizeof (taskq_t));
}
EXPORT_SYMBOL(taskq_destroy);

int
spl_taskq_init(void)
{
        /*
         * Solaris creates a dynamic taskq of up to 64 threads, however in
         * a Linux environment one thread per core is usually about right.
         */
        system_taskq = taskq_create("spl_system_taskq", num_online_cpus(),
            minclsyspri, 4, 512, TASKQ_PREPOPULATE);
        if (system_taskq == NULL)
                return (1);

        return (0);
}

void
spl_taskq_fini(void)
{
        taskq_destroy(system_taskq);
}