modules/spl/spl-taskq.c
/*
 * This file is part of the SPL: Solaris Porting Layer.
 *
 * Copyright (c) 2008 Lawrence Livermore National Security, LLC.
 * Produced at Lawrence Livermore National Laboratory
 * Written by:
 *         Brian Behlendorf <behlendorf1@llnl.gov>,
 *         Herb Wartens <wartens2@llnl.gov>,
 *         Jim Garlick <garlick@llnl.gov>
 * UCRL-CODE-235197
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#include <sys/taskq.h>

#ifdef DEBUG_SUBSYSTEM
#undef DEBUG_SUBSYSTEM
#endif

#define DEBUG_SUBSYSTEM S_TASKQ

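/*
 * Usage sketch (illustrative only): consumers normally reach this code
 * through the taskq_*() wrappers that sys/taskq.h is expected to provide
 * around the __taskq_*() symbols exported below.  The names my_func and
 * my_arg are placeholders for a task_func_t callback and its argument.
 *
 *   taskq_t *tq = taskq_create("example", 4, maxclsyspri, 50, INT_MAX,
 *                              TASKQ_PREPOPULATE);
 *   taskqid_t id = taskq_dispatch(tq, my_func, my_arg, TQ_SLEEP);
 *
 *   taskq_wait_id(tq, id);   (block until task 'id' has run)
 *   taskq_wait(tq);          (block until all tasks dispatched so far ran)
 *   taskq_destroy(tq);
 */
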
/* NOTE: Must be called with tq->tq_lock held.  Returns a task_t whose
 * t_list is not attached to the free, work, or pending taskq lists.
 */
static task_t *
task_alloc(taskq_t *tq, uint_t flags)
{
        task_t *t;
        int count = 0;
        ENTRY;

        ASSERT(tq);
        ASSERT(flags & (TQ_SLEEP | TQ_NOSLEEP));               /* One set */
        ASSERT(!((flags & TQ_SLEEP) && (flags & TQ_NOSLEEP))); /* Not both */
        ASSERT(spin_is_locked(&tq->tq_lock));
retry:
        /* Acquire a task_t from the free list if one is available */
        if (!list_empty(&tq->tq_free_list) && !(flags & TQ_NEW)) {
                t = list_entry(tq->tq_free_list.next, task_t, t_list);
                list_del_init(&t->t_list);
                RETURN(t);
        }

        /* Free list is empty and memory allocations are prohibited */
        if (flags & TQ_NOALLOC)
                RETURN(NULL);

        /* Hit maximum task_t pool size */
        if (tq->tq_nalloc >= tq->tq_maxalloc) {
                if (flags & TQ_NOSLEEP)
                        RETURN(NULL);

                /* Sleep periodically, polling the free list for an available
                 * task_t.  If a full second passes and we still have not
                 * found one, give up and return NULL to the caller. */
                if (flags & TQ_SLEEP) {
                        spin_unlock_irq(&tq->tq_lock);
                        schedule_timeout(HZ / 100);
                        spin_lock_irq(&tq->tq_lock);
                        if (count < 100)
                                GOTO(retry, count++);

                        RETURN(NULL);
                }

                /* Unreachable, TQ_SLEEP xor TQ_NOSLEEP */
                SBUG();
        }

        spin_unlock_irq(&tq->tq_lock);
        t = kmem_alloc(sizeof(task_t), flags & (TQ_SLEEP | TQ_NOSLEEP));
        spin_lock_irq(&tq->tq_lock);

        if (t) {
                spin_lock_init(&t->t_lock);
                INIT_LIST_HEAD(&t->t_list);
                t->t_id = 0;
                t->t_func = NULL;
                t->t_arg = NULL;
                tq->tq_nalloc++;
        }

        RETURN(t);
}

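/* Together, task_alloc() above and task_free()/task_done() below implement
 * a small task_t cache: completed task_t's are returned to tq_free_list
 * while the total allocated (tq_nalloc) does not exceed tq_minalloc, and
 * task_alloc() refuses to grow the pool beyond tq_maxalloc. */
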
/* NOTE: Must be called with tq->tq_lock held.  Expects the task_t
 * to already be removed from the free, work, or pending taskq lists.
 */
static void
task_free(taskq_t *tq, task_t *t)
{
        ENTRY;

        ASSERT(tq);
        ASSERT(t);
        ASSERT(spin_is_locked(&tq->tq_lock));
        ASSERT(list_empty(&t->t_list));

        kmem_free(t, sizeof(task_t));
        tq->tq_nalloc--;

        EXIT;
}

/* NOTE: Must be called with tq->tq_lock held.  Either destroys the
 * task_t if too many exist or moves it to the free list for later use.
 */
static void
task_done(taskq_t *tq, task_t *t)
{
        ENTRY;
        ASSERT(tq);
        ASSERT(t);
        ASSERT(spin_is_locked(&tq->tq_lock));

        list_del_init(&t->t_list);

        if (tq->tq_nalloc <= tq->tq_minalloc) {
                t->t_id = 0;
                t->t_func = NULL;
                t->t_arg = NULL;
                list_add_tail(&t->t_list, &tq->tq_free_list);
        } else {
                task_free(tq, t);
        }

        EXIT;
}

/* Taskqids are handed out in a monotonically increasing fashion per
 * taskq_t.  We don't handle taskqid wrapping yet, but fortunately it is
 * a 64-bit value so this is probably never going to happen.  The lowest
 * pending taskqid is stored in the taskq_t to make it easy for any
 * taskq_wait()'ers to know if the tasks they're waiting for have
 * completed.  Unfortunately, tq_lowest_id is kept up to date in a
 * pretty brain-dead way; something more clever should be done.
 */
static int
taskq_wait_check(taskq_t *tq, taskqid_t id)
{
        RETURN(tq->tq_lowest_id >= id);
}

/* Expected to wait for all previously scheduled tasks to complete.  We do
 * not need to wait for tasks scheduled after this call to complete.  In
 * other words, we do not need to drain the entire taskq. */
void
__taskq_wait_id(taskq_t *tq, taskqid_t id)
{
        ENTRY;
        ASSERT(tq);

        wait_event(tq->tq_wait_waitq, taskq_wait_check(tq, id));

        EXIT;
}
EXPORT_SYMBOL(__taskq_wait_id);

void
__taskq_wait(taskq_t *tq)
{
        taskqid_t id;
        ENTRY;
        ASSERT(tq);

        spin_lock_irq(&tq->tq_lock);
        id = tq->tq_next_id;
        spin_unlock_irq(&tq->tq_lock);

        __taskq_wait_id(tq, id);

        EXIT;
}
EXPORT_SYMBOL(__taskq_wait);

int
__taskq_member(taskq_t *tq, void *t)
{
        int i;
        ENTRY;

        ASSERT(tq);
        ASSERT(t);

        for (i = 0; i < tq->tq_nthreads; i++)
                if (tq->tq_threads[i] == (struct task_struct *)t)
                        RETURN(1);

        RETURN(0);
}
EXPORT_SYMBOL(__taskq_member);

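/* Queue a single task for execution.  On success the task's non-zero
 * taskqid is returned; 0 is returned if the taskq is being destroyed,
 * if TQ_NOQUEUE was passed and no thread is idle, or if a task_t could
 * not be obtained.  The task is appended to the pending list and the
 * worker threads waiting on tq_work_waitq are woken to service it. */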
taskqid_t
__taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
{
        task_t *t;
        taskqid_t rc = 0;
        ENTRY;

        ASSERT(tq);
        ASSERT(func);
        if (unlikely(in_atomic() && (flags & TQ_SLEEP))) {
                CERROR("May schedule while atomic: %s/0x%08x/%d\n",
                       current->comm, preempt_count(), current->pid);
                SBUG();
        }

        spin_lock_irq(&tq->tq_lock);

        /* Taskq being destroyed and all tasks drained */
        if (!(tq->tq_flags & TQ_ACTIVE))
                GOTO(out, rc = 0);

        /* Do not queue the task unless there is an idle thread for it */
        ASSERT(tq->tq_nactive <= tq->tq_nthreads);
        if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads))
                GOTO(out, rc = 0);

        if ((t = task_alloc(tq, flags)) == NULL)
                GOTO(out, rc = 0);

        spin_lock(&t->t_lock);
        list_add_tail(&t->t_list, &tq->tq_pend_list);
        t->t_id = rc = tq->tq_next_id;
        tq->tq_next_id++;
        t->t_func = func;
        t->t_arg = arg;
        spin_unlock(&t->t_lock);

        wake_up(&tq->tq_work_waitq);
out:
        spin_unlock_irq(&tq->tq_lock);
        RETURN(rc);
}
EXPORT_SYMBOL(__taskq_dispatch);

/* NOTE: Must be called with tq->tq_lock held */
static taskqid_t
taskq_lowest_id(taskq_t *tq)
{
        taskqid_t lowest_id = ~0;
        task_t *t;
        ENTRY;

        ASSERT(tq);
        ASSERT(spin_is_locked(&tq->tq_lock));

        list_for_each_entry(t, &tq->tq_pend_list, t_list)
                if (t->t_id < lowest_id)
                        lowest_id = t->t_id;

        list_for_each_entry(t, &tq->tq_work_list, t_list)
                if (t->t_id < lowest_id)
                        lowest_id = t->t_id;

        RETURN(lowest_id);
}

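/* Worker thread servicing a taskq.  Each thread blocks all signals,
 * registers itself on tq_work_waitq, and sleeps while the pending list
 * is empty.  When work is available the task_t is moved from the pending
 * list to the work list, its function is run with the lock dropped, and
 * task_done() then recycles or frees it.  After each task tq_lowest_id
 * is recomputed if needed and any taskq_wait()'ers are woken via
 * tq_wait_waitq. */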
static int
taskq_thread(void *args)
{
        DECLARE_WAITQUEUE(wait, current);
        sigset_t blocked;
        taskqid_t id;
        taskq_t *tq = args;
        task_t *t;
        ENTRY;

        ASSERT(tq);
        current->flags |= PF_NOFREEZE;

        sigfillset(&blocked);
        sigprocmask(SIG_BLOCK, &blocked, NULL);
        flush_signals(current);

        spin_lock_irq(&tq->tq_lock);
        tq->tq_nthreads++;
        wake_up(&tq->tq_wait_waitq);
        set_current_state(TASK_INTERRUPTIBLE);

        while (!kthread_should_stop()) {

                add_wait_queue(&tq->tq_work_waitq, &wait);
                if (list_empty(&tq->tq_pend_list)) {
                        spin_unlock_irq(&tq->tq_lock);
                        schedule();
                        spin_lock_irq(&tq->tq_lock);
                } else {
                        __set_current_state(TASK_RUNNING);
                }

                remove_wait_queue(&tq->tq_work_waitq, &wait);
                if (!list_empty(&tq->tq_pend_list)) {
                        t = list_entry(tq->tq_pend_list.next, task_t, t_list);
                        list_del_init(&t->t_list);
                        list_add_tail(&t->t_list, &tq->tq_work_list);
                        tq->tq_nactive++;
                        spin_unlock_irq(&tq->tq_lock);

                        /* Perform the requested task */
                        t->t_func(t->t_arg);

                        spin_lock_irq(&tq->tq_lock);
                        tq->tq_nactive--;
                        id = t->t_id;
                        task_done(tq, t);

                        /* Update the lowest remaining taskqid yet to run */
                        if (tq->tq_lowest_id == id) {
                                tq->tq_lowest_id = taskq_lowest_id(tq);
                                ASSERT(tq->tq_lowest_id > id);
                        }

                        wake_up_all(&tq->tq_wait_waitq);
                }

                set_current_state(TASK_INTERRUPTIBLE);
        }

        __set_current_state(TASK_RUNNING);
        tq->tq_nthreads--;
        spin_unlock_irq(&tq->tq_lock);

        RETURN(0);
}

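/* Create a taskq with 'nthreads' service threads.  The task_t pool is
 * bounded by minalloc/maxalloc and is prepopulated with minalloc entries
 * when TASKQ_PREPOPULATE is passed.  Threads are bound round-robin to the
 * online CPUs and reniced according to 'pri'.  If any thread fails to
 * start, the partially constructed taskq is torn down and NULL is
 * returned. */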
taskq_t *
__taskq_create(const char *name, int nthreads, pri_t pri,
               int minalloc, int maxalloc, uint_t flags)
{
        taskq_t *tq;
        struct task_struct *t;
        int rc = 0, i, j = 0;
        ENTRY;

        ASSERT(name != NULL);
        ASSERT(pri <= maxclsyspri);
        ASSERT(minalloc >= 0);
        ASSERT(maxalloc <= INT_MAX);
        ASSERT(!(flags & (TASKQ_CPR_SAFE | TASKQ_DYNAMIC))); /* Unsupported */

        tq = kmem_alloc(sizeof(*tq), KM_SLEEP);
        if (tq == NULL)
                RETURN(NULL);

        tq->tq_threads = kmem_alloc(nthreads * sizeof(t), KM_SLEEP);
        if (tq->tq_threads == NULL) {
                kmem_free(tq, sizeof(*tq));
                RETURN(NULL);
        }

        spin_lock_init(&tq->tq_lock);
        spin_lock_irq(&tq->tq_lock);
        tq->tq_name = name;
        tq->tq_nactive = 0;
        tq->tq_nthreads = 0;
        tq->tq_pri = pri;
        tq->tq_minalloc = minalloc;
        tq->tq_maxalloc = maxalloc;
        tq->tq_nalloc = 0;
        tq->tq_flags = (flags | TQ_ACTIVE);
        tq->tq_next_id = 1;
        tq->tq_lowest_id = 1;
        INIT_LIST_HEAD(&tq->tq_free_list);
        INIT_LIST_HEAD(&tq->tq_work_list);
        INIT_LIST_HEAD(&tq->tq_pend_list);
        init_waitqueue_head(&tq->tq_work_waitq);
        init_waitqueue_head(&tq->tq_wait_waitq);

        if (flags & TASKQ_PREPOPULATE)
                for (i = 0; i < minalloc; i++)
                        task_done(tq, task_alloc(tq, TQ_SLEEP | TQ_NEW));

        spin_unlock_irq(&tq->tq_lock);

        for (i = 0; i < nthreads; i++) {
                t = kthread_create(taskq_thread, tq, "%s/%d", name, i);
                if (t) {
                        tq->tq_threads[i] = t;
                        kthread_bind(t, i % num_online_cpus());
                        set_user_nice(t, PRIO_TO_NICE(pri));
                        wake_up_process(t);
                        j++;
                } else {
                        tq->tq_threads[i] = NULL;
                        rc = 1;
                }
        }

        /* Wait for all threads to be started before potential destroy */
        wait_event(tq->tq_wait_waitq, tq->tq_nthreads == j);

        if (rc) {
                __taskq_destroy(tq);
                tq = NULL;
        }

        RETURN(tq);
}
EXPORT_SYMBOL(__taskq_create);

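/* Destroy a taskq.  Clearing TQ_ACTIVE prevents new tasks from being
 * dispatched, then all previously dispatched tasks are waited on, the
 * service threads are stopped, and any task_t's cached on the free list
 * are released before the taskq itself is freed. */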
void
__taskq_destroy(taskq_t *tq)
{
        task_t *t;
        int i, nthreads;
        ENTRY;

        ASSERT(tq);
        spin_lock_irq(&tq->tq_lock);
        tq->tq_flags &= ~TQ_ACTIVE;
        spin_unlock_irq(&tq->tq_lock);

        /* Clearing TQ_ACTIVE prevents new tasks being added to pending */
        __taskq_wait(tq);

        nthreads = tq->tq_nthreads;
        for (i = 0; i < nthreads; i++)
                if (tq->tq_threads[i])
                        kthread_stop(tq->tq_threads[i]);

        spin_lock_irq(&tq->tq_lock);

        while (!list_empty(&tq->tq_free_list)) {
                t = list_entry(tq->tq_free_list.next, task_t, t_list);
                list_del_init(&t->t_list);
                task_free(tq, t);
        }

        ASSERT(tq->tq_nthreads == 0);
        ASSERT(tq->tq_nalloc == 0);
        ASSERT(list_empty(&tq->tq_free_list));
        ASSERT(list_empty(&tq->tq_work_list));
        ASSERT(list_empty(&tq->tq_pend_list));

        spin_unlock_irq(&tq->tq_lock);
        kmem_free(tq->tq_threads, nthreads * sizeof(struct task_struct *));
        kmem_free(tq, sizeof(taskq_t));

        EXIT;
}
EXPORT_SYMBOL(__taskq_destroy);