/*
 * This file is part of the SPL: Solaris Porting Layer.
 *
 * Copyright (c) 2008 Lawrence Livermore National Security, LLC.
 * Produced at Lawrence Livermore National Laboratory
 * Written by:
 *         Brian Behlendorf <behlendorf1@llnl.gov>,
 *         Herb Wartens <wartens2@llnl.gov>,
 *         Jim Garlick <garlick@llnl.gov>
 * UCRL-CODE-235197
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#include <sys/taskq.h>

#ifdef DEBUG_SUBSYSTEM
#undef DEBUG_SUBSYSTEM
#endif

#define DEBUG_SUBSYSTEM S_TASKQ

/* NOTE: Must be called with tq->tq_lock held, returns a task_t which
 * is not attached to the free, work, or pending taskq lists.
 */
static task_t *
task_alloc(taskq_t *tq, uint_t flags)
{
        task_t *t;
        int count = 0;
        ENTRY;

        ASSERT(tq);
        ASSERT(flags & (TQ_SLEEP | TQ_NOSLEEP));               /* One set */
        ASSERT(!((flags & TQ_SLEEP) && (flags & TQ_NOSLEEP))); /* Not both */
        ASSERT(spin_is_locked(&tq->tq_lock));
retry:
        /* Acquire task_t's from the free list if available */
        if (!list_empty(&tq->tq_free_list) && !(flags & TQ_NEW)) {
                t = list_entry(tq->tq_free_list.next, task_t, t_list);
                list_del_init(&t->t_list);
                RETURN(t);
        }

        /* Free list is empty and memory allocations are prohibited */
        if (flags & TQ_NOALLOC)
                RETURN(NULL);

        /* Hit maximum task_t pool size */
        if (tq->tq_nalloc >= tq->tq_maxalloc) {
                if (flags & TQ_NOSLEEP)
                        RETURN(NULL);

                /* Sleep periodically, polling the free list for an available
                 * task_t.  If a full second passes and we still have not
                 * found one, give up and return NULL to the caller. */
                if (flags & TQ_SLEEP) {
                        spin_unlock_irq(&tq->tq_lock);
                        /* A non-running state is needed here or
                         * schedule_timeout() returns immediately */
                        set_current_state(TASK_UNINTERRUPTIBLE);
                        schedule_timeout(HZ / 100);
                        spin_lock_irq(&tq->tq_lock);
                        if (count < 100)
                                GOTO(retry, count++);

                        RETURN(NULL);
                }

                /* Unreachable, TQ_SLEEP xor TQ_NOSLEEP */
                SBUG();
        }

        spin_unlock_irq(&tq->tq_lock);
        t = kmem_alloc(sizeof(task_t), flags & (TQ_SLEEP | TQ_NOSLEEP));
        spin_lock_irq(&tq->tq_lock);

        if (t) {
                spin_lock_init(&t->t_lock);
                INIT_LIST_HEAD(&t->t_list);
                t->t_id = 0;
                t->t_func = NULL;
                t->t_arg = NULL;
                tq->tq_nalloc++;
        }

        RETURN(t);
}

/* NOTE: Must be called with tq->tq_lock held, expects the task_t
 * to already be removed from the free, work, or pending taskq lists.
 */
static void
task_free(taskq_t *tq, task_t *t)
{
        ENTRY;

        ASSERT(tq);
        ASSERT(t);
        ASSERT(spin_is_locked(&tq->tq_lock));
        ASSERT(list_empty(&t->t_list));

        kmem_free(t, sizeof(task_t));
        tq->tq_nalloc--;

        EXIT;
}

/* NOTE: Must be called with tq->tq_lock held, either destroys the
 * task_t if too many exist or moves it to the free list for later use.
 */
static void
task_done(taskq_t *tq, task_t *t)
{
        ENTRY;
        ASSERT(tq);
        ASSERT(t);
        ASSERT(spin_is_locked(&tq->tq_lock));

        list_del_init(&t->t_list);

        if (tq->tq_nalloc <= tq->tq_minalloc) {
                t->t_id = 0;
                t->t_func = NULL;
                t->t_arg = NULL;
                list_add_tail(&t->t_list, &tq->tq_free_list);
        } else {
                task_free(tq, t);
        }

        EXIT;
}

/* Taskqids are handed out in a monotonically increasing fashion per
 * taskq_t.  We don't handle taskqid wrapping yet, but fortunately it is
 * a 64-bit value so this is probably never going to happen.  The lowest
 * pending taskqid is stored in the taskq_t to make it easy for any
 * taskq_wait()'ers to know if the tasks they're waiting for have
 * completed.  Unfortunately, tq_lowest_id is kept up to date in
 * a pretty brain-dead way; something more clever should be done.
 */
static int
taskq_wait_check(taskq_t *tq, taskqid_t id)
{
        RETURN(tq->tq_lowest_id >= id);
}

/* Expected to wait for all previously scheduled tasks to complete.  We do
 * not need to wait for tasks scheduled after this call to complete.  In
 * other words we do not need to drain the entire taskq. */
void
__taskq_wait_id(taskq_t *tq, taskqid_t id)
{
        ENTRY;
        ASSERT(tq);

        wait_event(tq->tq_wait_waitq, taskq_wait_check(tq, id));

        EXIT;
}
EXPORT_SYMBOL(__taskq_wait_id);

void
__taskq_wait(taskq_t *tq)
{
        taskqid_t id;
        ENTRY;
        ASSERT(tq);

        spin_lock_irq(&tq->tq_lock);
        id = tq->tq_next_id;
        spin_unlock_irq(&tq->tq_lock);

        __taskq_wait_id(tq, id);

        EXIT;
}
EXPORT_SYMBOL(__taskq_wait);
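
/* Usage sketch (an illustrative addition, not part of the original source):
 * a consumer that needs to flush its own work follows its dispatches with
 * __taskq_wait(), which waits for every task queued before the call but not
 * for tasks queued afterwards.  The names my_func, my_arg1, and my_arg2
 * below are hypothetical.
 *
 *      (void) __taskq_dispatch(tq, my_func, my_arg1, TQ_SLEEP);
 *      (void) __taskq_dispatch(tq, my_func, my_arg2, TQ_SLEEP);
 *      __taskq_wait(tq);       - both my_func invocations have completed
 */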

int
__taskq_member(taskq_t *tq, void *t)
{
        int i;
        ENTRY;

        ASSERT(tq);
        ASSERT(t);

        for (i = 0; i < tq->tq_nthreads; i++)
                if (tq->tq_threads[i] == (struct task_struct *)t)
                        RETURN(1);

        RETURN(0);
}
EXPORT_SYMBOL(__taskq_member);
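
/* Usage sketch (an illustrative addition): because the membership test above
 * compares raw thread pointers, a task function can assert that it is running
 * on one of the queue's worker threads by passing the current task_struct.
 * The names my_tq and my_func are hypothetical.
 *
 *      static void my_func(void *arg)
 *      {
 *              ASSERT(__taskq_member(my_tq, (void *)current));
 *              ...
 *      }
 */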

taskqid_t
__taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
{
        task_t *t;
        taskqid_t rc = 0;
        ENTRY;

        ASSERT(tq);
        ASSERT(func);
        if (unlikely(in_atomic() && (flags & TQ_SLEEP))) {
                CERROR("May schedule while atomic: %s/0x%08x/%d\n",
                       current->comm, preempt_count(), current->pid);
                SBUG();
        }

        spin_lock_irq(&tq->tq_lock);

        /* Taskq being destroyed and all tasks drained */
        if (!(tq->tq_flags & TQ_ACTIVE))
                GOTO(out, rc = 0);

        /* Do not queue the task unless there is an idle thread for it */
        ASSERT(tq->tq_nactive <= tq->tq_nthreads);
        if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads))
                GOTO(out, rc = 0);

        if ((t = task_alloc(tq, flags)) == NULL)
                GOTO(out, rc = 0);

        spin_lock(&t->t_lock);
        list_add_tail(&t->t_list, &tq->tq_pend_list);
        t->t_id = rc = tq->tq_next_id;
        tq->tq_next_id++;
        t->t_func = func;
        t->t_arg = arg;
        spin_unlock(&t->t_lock);

        wake_up(&tq->tq_work_waitq);
out:
        spin_unlock_irq(&tq->tq_lock);
        RETURN(rc);
}
EXPORT_SYMBOL(__taskq_dispatch);
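
/* Usage sketch (an illustrative addition): the dispatch flags are forwarded
 * to task_alloc() and ultimately to kmem_alloc(), so TQ_SLEEP may block and
 * must not be used from atomic context (the in_atomic() check above will
 * SBUG).  Callers holding a spinlock or running in interrupt context should
 * pass TQ_NOSLEEP and handle a zero return.  my_func and my_arg are
 * hypothetical names.
 *
 *      taskqid_t id = __taskq_dispatch(tq, my_func, my_arg, TQ_NOSLEEP);
 *      if (id == 0)
 *              - the taskq is shutting down or no task_t could be allocated;
 *                the caller must retry later or do the work itself
 */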

/* NOTE: Must be called with tq->tq_lock held */
static taskqid_t
taskq_lowest_id(taskq_t *tq)
{
        taskqid_t lowest_id = ~0;
        task_t *t;
        ENTRY;

        ASSERT(tq);
        ASSERT(spin_is_locked(&tq->tq_lock));

        list_for_each_entry(t, &tq->tq_pend_list, t_list)
                if (t->t_id < lowest_id)
                        lowest_id = t->t_id;

        list_for_each_entry(t, &tq->tq_work_list, t_list)
                if (t->t_id < lowest_id)
                        lowest_id = t->t_id;

        RETURN(lowest_id);
}
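
/* Worked example (added for clarity): if the pending list holds ids {7, 9}
 * and the work list holds ids {5, 8}, taskq_lowest_id() returns 5.  Once
 * tq_lowest_id is updated to 5, taskq_wait_check() releases waiters on any
 * id <= 5 (everything dispatched before id 5 has completed), while waiters
 * on id 6 and above keep sleeping until task 5 finishes.
 */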

static int
taskq_thread(void *args)
{
        DECLARE_WAITQUEUE(wait, current);
        sigset_t blocked;
        taskqid_t id;
        taskq_t *tq = args;
        task_t *t;
        ENTRY;

        ASSERT(tq);
        current->flags |= PF_NOFREEZE;

        sigfillset(&blocked);
        sigprocmask(SIG_BLOCK, &blocked, NULL);
        flush_signals(current);

        spin_lock_irq(&tq->tq_lock);
        tq->tq_nthreads++;
        wake_up(&tq->tq_wait_waitq);
        set_current_state(TASK_INTERRUPTIBLE);

        while (!kthread_should_stop()) {
                add_wait_queue(&tq->tq_work_waitq, &wait);
                if (list_empty(&tq->tq_pend_list)) {
                        spin_unlock_irq(&tq->tq_lock);
                        schedule();
                        spin_lock_irq(&tq->tq_lock);
                } else {
                        __set_current_state(TASK_RUNNING);
                }

                remove_wait_queue(&tq->tq_work_waitq, &wait);
                if (!list_empty(&tq->tq_pend_list)) {
                        t = list_entry(tq->tq_pend_list.next, task_t, t_list);
                        list_del_init(&t->t_list);
                        list_add_tail(&t->t_list, &tq->tq_work_list);
                        tq->tq_nactive++;
                        spin_unlock_irq(&tq->tq_lock);

                        /* Perform the requested task */
                        t->t_func(t->t_arg);

                        spin_lock_irq(&tq->tq_lock);
                        tq->tq_nactive--;
                        id = t->t_id;
                        task_done(tq, t);

                        /* Update the lowest remaining taskqid yet to run */
                        if (tq->tq_lowest_id == id) {
                                tq->tq_lowest_id = taskq_lowest_id(tq);
                                ASSERT(tq->tq_lowest_id > id);
                        }

                        wake_up_all(&tq->tq_wait_waitq);
                }

                set_current_state(TASK_INTERRUPTIBLE);
        }

        __set_current_state(TASK_RUNNING);
        tq->tq_nthreads--;
        spin_unlock_irq(&tq->tq_lock);

        RETURN(0);
}

taskq_t *
__taskq_create(const char *name, int nthreads, pri_t pri,
               int minalloc, int maxalloc, uint_t flags)
{
        taskq_t *tq;
        struct task_struct *t;
        int rc = 0, i, j = 0;
        ENTRY;

        ASSERT(name != NULL);
        ASSERT(pri <= maxclsyspri);
        ASSERT(minalloc >= 0);
        ASSERT(maxalloc <= INT_MAX);
        ASSERT(!(flags & (TASKQ_CPR_SAFE | TASKQ_DYNAMIC))); /* Unsupported */

        tq = kmem_alloc(sizeof(*tq), KM_SLEEP);
        if (tq == NULL)
                RETURN(NULL);

        tq->tq_threads = kmem_alloc(nthreads * sizeof(t), KM_SLEEP);
        if (tq->tq_threads == NULL) {
                kmem_free(tq, sizeof(*tq));
                RETURN(NULL);
        }

        spin_lock_init(&tq->tq_lock);
        spin_lock_irq(&tq->tq_lock);
        tq->tq_name = name;
        tq->tq_nactive = 0;
        tq->tq_nthreads = 0;
        tq->tq_pri = pri;
        tq->tq_minalloc = minalloc;
        tq->tq_maxalloc = maxalloc;
        tq->tq_nalloc = 0;
        tq->tq_flags = (flags | TQ_ACTIVE);
        tq->tq_next_id = 1;
        tq->tq_lowest_id = 1;
        INIT_LIST_HEAD(&tq->tq_free_list);
        INIT_LIST_HEAD(&tq->tq_work_list);
        INIT_LIST_HEAD(&tq->tq_pend_list);
        init_waitqueue_head(&tq->tq_work_waitq);
        init_waitqueue_head(&tq->tq_wait_waitq);

        if (flags & TASKQ_PREPOPULATE)
                for (i = 0; i < minalloc; i++)
                        task_done(tq, task_alloc(tq, TQ_SLEEP | TQ_NEW));

        spin_unlock_irq(&tq->tq_lock);

        for (i = 0; i < nthreads; i++) {
                t = kthread_create(taskq_thread, tq, "%s/%d", name, i);
                /* kthread_create() returns an ERR_PTR() on failure, not NULL */
                if (!IS_ERR(t)) {
                        tq->tq_threads[i] = t;
                        kthread_bind(t, i % num_online_cpus());
                        set_user_nice(t, PRIO_TO_NICE(pri));
                        wake_up_process(t);
                        j++;
                } else {
                        tq->tq_threads[i] = NULL;
                        rc = 1;
                }
        }

        /* Wait for all threads to be started before a potential destroy */
        wait_event(tq->tq_wait_waitq, tq->tq_nthreads == j);

        if (rc) {
                __taskq_destroy(tq);
                tq = NULL;
        }

        RETURN(tq);
}
EXPORT_SYMBOL(__taskq_create);
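
/* Usage sketch (an illustrative addition, not part of the original source):
 * a typical consumer creates a queue, dispatches work to it, drains it, and
 * destroys it.  The names example_func, example_count, and example_usage are
 * hypothetical; consumers normally reach these routines through the
 * unprefixed taskq_*() names declared in sys/taskq.h.
 *
 *      static atomic_t example_count = ATOMIC_INIT(0);
 *
 *      static void example_func(void *arg)
 *      {
 *              atomic_inc((atomic_t *)arg);
 *      }
 *
 *      static void example_usage(void)
 *      {
 *              taskq_t *tq;
 *              int i;
 *
 *              tq = __taskq_create("example", 4, maxclsyspri, 50, INT_MAX,
 *                                  TASKQ_PREPOPULATE);
 *              if (tq == NULL)
 *                      return;
 *
 *              for (i = 0; i < 16; i++)
 *                      (void) __taskq_dispatch(tq, example_func,
 *                                              &example_count, TQ_SLEEP);
 *
 *              __taskq_wait(tq);       - all 16 tasks have completed
 *              __taskq_destroy(tq);
 *      }
 */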

void
__taskq_destroy(taskq_t *tq)
{
        task_t *t;
        int i, nthreads;
        ENTRY;

        ASSERT(tq);
        spin_lock_irq(&tq->tq_lock);
        tq->tq_flags &= ~TQ_ACTIVE;
        spin_unlock_irq(&tq->tq_lock);

        /* With TQ_ACTIVE cleared no new tasks may be queued, so wait for
         * everything already dispatched to drain before stopping threads */
        __taskq_wait(tq);

        nthreads = tq->tq_nthreads;
        for (i = 0; i < nthreads; i++)
                if (tq->tq_threads[i])
                        kthread_stop(tq->tq_threads[i]);

        spin_lock_irq(&tq->tq_lock);

        while (!list_empty(&tq->tq_free_list)) {
                t = list_entry(tq->tq_free_list.next, task_t, t_list);
                list_del_init(&t->t_list);
                task_free(tq, t);
        }

        ASSERT(tq->tq_nthreads == 0);
        ASSERT(tq->tq_nalloc == 0);
        ASSERT(list_empty(&tq->tq_free_list));
        ASSERT(list_empty(&tq->tq_work_list));
        ASSERT(list_empty(&tq->tq_pend_list));

        spin_unlock_irq(&tq->tq_lock);
        kmem_free(tq->tq_threads, nthreads * sizeof(task_t *));
        kmem_free(tq, sizeof(taskq_t));

        EXIT;
}
EXPORT_SYMBOL(__taskq_destroy);