/*
 *  This file is part of the SPL: Solaris Porting Layer.
 *
 *  Copyright (c) 2008 Lawrence Livermore National Security, LLC.
 *  Produced at Lawrence Livermore National Laboratory
 *  Written by:
 *          Brian Behlendorf <behlendorf1@llnl.gov>,
 *          Herb Wartens <wartens2@llnl.gov>,
 *          Jim Garlick <garlick@llnl.gov>
 *
 *  This is free software; you can redistribute it and/or modify it
 *  under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This is distributed in the hope that it will be useful, but WITHOUT
 *  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 *  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 *  for more details.
 *
 *  You should have received a copy of the GNU General Public License along
 *  with this program; if not, write to the Free Software Foundation, Inc.,
 *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */
#include <sys/taskq.h>

#ifdef DEBUG_SUBSYSTEM
#undef DEBUG_SUBSYSTEM
#endif

#define DEBUG_SUBSYSTEM S_TASKQ
/* NOTE: Must be called with tq->tq_lock held, returns a task_t which
 * is not attached to the free, work, or pending taskq lists.
 */
static task_t *
task_alloc(taskq_t *tq, uint_t flags)
{
        task_t *t;
        int count = 0;

        ASSERT(tq);
        ASSERT(flags & (TQ_SLEEP | TQ_NOSLEEP));               /* One set */
        ASSERT(!((flags & TQ_SLEEP) && (flags & TQ_NOSLEEP))); /* Not both */
        ASSERT(spin_is_locked(&tq->tq_lock));
retry:
        /* Acquire task_t's from the free list if available */
        if (!list_empty(&tq->tq_free_list) && !(flags & TQ_NEW)) {
                t = list_entry(tq->tq_free_list.next, task_t, t_list);
                list_del_init(&t->t_list);
                RETURN(t);
        }

        /* Free list is empty and memory allocs are prohibited */
        if (flags & TQ_NOALLOC)
                RETURN(NULL);

        /* Hit maximum task_t pool size */
        if (tq->tq_nalloc >= tq->tq_maxalloc) {
                if (flags & TQ_NOSLEEP)
                        RETURN(NULL);

                /* Sleep periodically, polling the free list for an available
                 * task_t.  If a full second passes and we still have not
                 * found one, give up and return NULL to the caller. */
                if (flags & TQ_SLEEP) {
                        spin_unlock_irq(&tq->tq_lock);
                        set_current_state(TASK_UNINTERRUPTIBLE);
                        schedule_timeout(HZ / 100);
                        spin_lock_irq(&tq->tq_lock);
                        if (count < 100) {
                                count++;
                                goto retry;
                        }

                        RETURN(NULL);
                }

                /* Unreachable, TQ_SLEEP xor TQ_NOSLEEP */
                SBUG();
        }

        /* Drop the lock around the allocation, kmem_alloc() may sleep */
        spin_unlock_irq(&tq->tq_lock);
        t = kmem_alloc(sizeof(task_t), flags & (TQ_SLEEP | TQ_NOSLEEP));
        spin_lock_irq(&tq->tq_lock);

        if (t != NULL) {
                spin_lock_init(&t->t_lock);
                INIT_LIST_HEAD(&t->t_list);
                t->t_id = 0;
                t->t_func = NULL;
                t->t_arg = NULL;
                tq->tq_nalloc++;
        }

        RETURN(t);
}
/* NOTE: Must be called with tq->tq_lock held, expects the task_t
 * to already be removed from the free, work, or pending taskq lists.
 */
static void
task_free(taskq_t *tq, task_t *t)
{
        ASSERT(tq);
        ASSERT(t);
        ASSERT(spin_is_locked(&tq->tq_lock));
        ASSERT(list_empty(&t->t_list));

        kmem_free(t, sizeof(task_t));
        tq->tq_nalloc--;
}
/* NOTE: Must be called with tq->tq_lock held, either destroys the
 * task_t if too many exist or moves it to the free list for later use.
 */
static void
task_done(taskq_t *tq, task_t *t)
{
        ASSERT(tq);
        ASSERT(t);
        ASSERT(spin_is_locked(&tq->tq_lock));

        list_del_init(&t->t_list);

        if (tq->tq_nalloc <= tq->tq_minalloc) {
                t->t_id = 0;
                t->t_func = NULL;
                t->t_arg = NULL;
                list_add_tail(&t->t_list, &tq->tq_free_list);
        } else {
                task_free(tq, t);
        }
}
/* Taskqid's are handed out in a monotonically increasing fashion per
 * taskq_t.  We don't handle taskqid wrapping yet, but fortunately it is
 * a 64-bit value so this is probably never going to happen.  The lowest
 * pending taskqid is stored in the taskq_t to make it easy for any
 * taskq_wait()'ers to know if the tasks they're waiting for have
 * completed.  Unfortunately, tq_lowest_id is kept up to date in
 * a pretty brain dead way, something more clever should be done.
 * (See the example sketch following __taskq_wait() below.)
 */
static int
taskq_wait_check(taskq_t *tq, taskqid_t id)
{
        RETURN(tq->tq_lowest_id >= id);
}
/* Expected to wait for all previously scheduled tasks to complete.  We do
 * not need to wait for tasks scheduled after this call to complete.  In
 * other words we do not need to drain the entire taskq. */
void
__taskq_wait_id(taskq_t *tq, taskqid_t id)
{
        ASSERT(tq);

        wait_event(tq->tq_wait_waitq, taskq_wait_check(tq, id));
}
EXPORT_SYMBOL(__taskq_wait_id);
void
__taskq_wait(taskq_t *tq)
{
        taskqid_t id;

        ASSERT(tq);

        /* The next id to be handed out is an upper bound on everything
         * already dispatched, so wait for tq_lowest_id to reach it. */
        spin_lock_irq(&tq->tq_lock);
        id = tq->tq_next_id;
        spin_unlock_irq(&tq->tq_lock);

        __taskq_wait_id(tq, id);
}
EXPORT_SYMBOL(__taskq_wait);
int
__taskq_member(taskq_t *tq, void *t)
{
        int i;

        ASSERT(tq);
        ASSERT(t);

        /* Compare against each of the taskq's worker threads */
        for (i = 0; i < tq->tq_nthreads; i++)
                if (tq->tq_threads[i] == (struct task_struct *)t)
                        RETURN(1);

        RETURN(0);
}
EXPORT_SYMBOL(__taskq_member);
taskqid_t
__taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
{
        task_t *t;
        taskqid_t rc = 0;

        ASSERT(tq);
        ASSERT(func);

        if (unlikely(in_atomic() && (flags & TQ_SLEEP))) {
                CERROR("May schedule while atomic: %s/0x%08x/%d\n",
                       current->comm, preempt_count(), current->pid);
                SBUG();
        }

        spin_lock_irq(&tq->tq_lock);

        /* Taskq being destroyed and all tasks drained */
        if (!(tq->tq_flags & TQ_ACTIVE))
                goto out;

        /* Do not queue the task unless there is an idle thread for it */
        ASSERT(tq->tq_nactive <= tq->tq_nthreads);
        if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads))
                goto out;

        if ((t = task_alloc(tq, flags)) == NULL)
                goto out;

        spin_lock(&t->t_lock);
        list_add_tail(&t->t_list, &tq->tq_pend_list);
        t->t_id = rc = tq->tq_next_id;
        tq->tq_next_id++;
        t->t_func = func;
        t->t_arg = arg;
        spin_unlock(&t->t_lock);

        wake_up(&tq->tq_work_waitq);
out:
        spin_unlock_irq(&tq->tq_lock);
        RETURN(rc);
}
EXPORT_SYMBOL(__taskq_dispatch);
/* NOTE: Must be called with tq->tq_lock held.  Returns the lowest
 * incomplete taskqid_t by scanning both the pending and work lists. */
static taskqid_t
taskq_lowest_id(taskq_t *tq)
{
        taskqid_t lowest_id = ~0;
        task_t *t;

        ASSERT(tq);
        ASSERT(spin_is_locked(&tq->tq_lock));

        list_for_each_entry(t, &tq->tq_pend_list, t_list)
                if (t->t_id < lowest_id)
                        lowest_id = t->t_id;

        list_for_each_entry(t, &tq->tq_work_list, t_list)
                if (t->t_id < lowest_id)
                        lowest_id = t->t_id;

        RETURN(lowest_id);
}
static int
taskq_thread(void *args)
{
        DECLARE_WAITQUEUE(wait, current);
        sigset_t blocked;
        taskqid_t id;
        taskq_t *tq = args;
        task_t *t;

        ASSERT(tq);

        /* Taskq threads must never be frozen for a suspend */
        current->flags |= PF_NOFREEZE;

        /* Block all signals, these are service threads */
        sigfillset(&blocked);
        sigprocmask(SIG_BLOCK, &blocked, NULL);
        flush_signals(current);

        /* Announce this thread as started, __taskq_create() waits on it */
        spin_lock_irq(&tq->tq_lock);
        tq->tq_nthreads++;
        wake_up(&tq->tq_wait_waitq);
        set_current_state(TASK_INTERRUPTIBLE);

        while (!kthread_should_stop()) {

                add_wait_queue(&tq->tq_work_waitq, &wait);
                if (list_empty(&tq->tq_pend_list)) {
                        spin_unlock_irq(&tq->tq_lock);
                        schedule();
                        spin_lock_irq(&tq->tq_lock);
                } else {
                        __set_current_state(TASK_RUNNING);
                }

                remove_wait_queue(&tq->tq_work_waitq, &wait);
                if (!list_empty(&tq->tq_pend_list)) {
                        t = list_entry(tq->tq_pend_list.next, task_t, t_list);
                        list_del_init(&t->t_list);
                        list_add_tail(&t->t_list, &tq->tq_work_list);
                        tq->tq_nactive++;
                        spin_unlock_irq(&tq->tq_lock);

                        /* Perform the requested task */
                        t->t_func(t->t_arg);

                        spin_lock_irq(&tq->tq_lock);
                        tq->tq_nactive--;
                        id = t->t_id;
                        task_done(tq, t);

                        /* Update the lowest remaining taskqid yet to run */
                        if (tq->tq_lowest_id == id) {
                                tq->tq_lowest_id = taskq_lowest_id(tq);
                                ASSERT(tq->tq_lowest_id > id);
                        }

                        wake_up_all(&tq->tq_wait_waitq);
                }

                set_current_state(TASK_INTERRUPTIBLE);
        }

        __set_current_state(TASK_RUNNING);
        tq->tq_nthreads--;
        spin_unlock_irq(&tq->tq_lock);

        RETURN(0);
}
taskq_t *
__taskq_create(const char *name, int nthreads, pri_t pri,
               int minalloc, int maxalloc, uint_t flags)
{
        taskq_t *tq;
        struct task_struct *t;
        int rc = 0, i, j = 0;

        ASSERT(name != NULL);
        ASSERT(pri <= maxclsyspri);
        ASSERT(minalloc >= 0);
        ASSERT(maxalloc <= INT_MAX);
        ASSERT(!(flags & (TASKQ_CPR_SAFE | TASKQ_DYNAMIC))); /* Unsupported */

        tq = kmem_alloc(sizeof(*tq), KM_SLEEP);
        if (tq == NULL)
                RETURN(NULL);

        tq->tq_threads = kmem_alloc(nthreads * sizeof(t), KM_SLEEP);
        if (tq->tq_threads == NULL) {
                kmem_free(tq, sizeof(*tq));
                RETURN(NULL);
        }

        spin_lock_init(&tq->tq_lock);
        spin_lock_irq(&tq->tq_lock);
        tq->tq_nactive = 0;
        tq->tq_nthreads = 0;
        tq->tq_nalloc = 0;
        tq->tq_minalloc = minalloc;
        tq->tq_maxalloc = maxalloc;
        tq->tq_flags = (flags | TQ_ACTIVE);
        tq->tq_next_id = 1;
        tq->tq_lowest_id = 1;
        INIT_LIST_HEAD(&tq->tq_free_list);
        INIT_LIST_HEAD(&tq->tq_work_list);
        INIT_LIST_HEAD(&tq->tq_pend_list);
        init_waitqueue_head(&tq->tq_work_waitq);
        init_waitqueue_head(&tq->tq_wait_waitq);

        if (flags & TASKQ_PREPOPULATE)
                for (i = 0; i < minalloc; i++)
                        task_done(tq, task_alloc(tq, TQ_SLEEP | TQ_NEW));

        spin_unlock_irq(&tq->tq_lock);

        for (i = 0; i < nthreads; i++) {
                t = kthread_create(taskq_thread, tq, "%s/%d", name, i);
                if (t != NULL) {
                        tq->tq_threads[i] = t;
                        kthread_bind(t, i % num_online_cpus());
                        set_user_nice(t, PRIO_TO_NICE(pri));
                        wake_up_process(t);
                        j++;
                } else {
                        tq->tq_threads[i] = NULL;
                        rc = 1;
                }
        }

        /* Wait for all threads to be started before potential destroy */
        wait_event(tq->tq_wait_waitq, tq->tq_nthreads == j);

        if (rc) {
                __taskq_destroy(tq);
                tq = NULL;
        }

        RETURN(tq);
}
EXPORT_SYMBOL(__taskq_create);
void
__taskq_destroy(taskq_t *tq)
{
        task_t *t;
        int i, nthreads;

        ASSERT(tq);

        spin_lock_irq(&tq->tq_lock);
        tq->tq_flags &= ~TQ_ACTIVE;
        spin_unlock_irq(&tq->tq_lock);

        /* TQ_ACTIVE cleared prevents new tasks being added to pending,
         * so wait for any remaining queued tasks to complete. */
        __taskq_wait(tq);

        nthreads = tq->tq_nthreads;
        for (i = 0; i < nthreads; i++)
                if (tq->tq_threads[i])
                        kthread_stop(tq->tq_threads[i]);

        spin_lock_irq(&tq->tq_lock);

        while (!list_empty(&tq->tq_free_list)) {
                t = list_entry(tq->tq_free_list.next, task_t, t_list);
                list_del_init(&t->t_list);
                task_free(tq, t);
        }

        ASSERT(tq->tq_nthreads == 0);
        ASSERT(tq->tq_nalloc == 0);
        ASSERT(list_empty(&tq->tq_free_list));
        ASSERT(list_empty(&tq->tq_work_list));
        ASSERT(list_empty(&tq->tq_pend_list));

        spin_unlock_irq(&tq->tq_lock);
        kmem_free(tq->tq_threads, nthreads * sizeof(struct task_struct *));
        kmem_free(tq, sizeof(taskq_t));
}
EXPORT_SYMBOL(__taskq_destroy);
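/*
 * Example (an illustrative sketch, excluded from the build): the full
 * lifecycle of a taskq.  The queue name, thread count, and allocation
 * bounds are arbitrary; example_func() is hypothetical.
 */
#if 0
static void example_lifecycle(void)
{
        taskq_t *tq;

        /* Four worker threads, 4-32 task_t's, free list prepopulated */
        tq = __taskq_create("example", 4, maxclsyspri, 4, 32,
                            TASKQ_PREPOPULATE);
        if (tq == NULL)
                return;

        /* Returns 0 only if the taskq is already shutting down */
        (void) __taskq_dispatch(tq, example_func, NULL, TQ_SLEEP);

        /* Drain everything dispatched above, then tear down */
        __taskq_wait(tq);
        __taskq_destroy(tq);
}
#endif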