/*
 * This file is part of the SPL: Solaris Porting Layer.
 *
 * Copyright (c) 2008 Lawrence Livermore National Security, LLC.
 * Produced at Lawrence Livermore National Laboratory
 * Written by:
 *         Brian Behlendorf <behlendorf1@llnl.gov>,
 *         Herb Wartens <wartens2@llnl.gov>,
 *         Jim Garlick <garlick@llnl.gov>
 * UCRL-CODE-235197
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#include <sys/taskq.h>
#include <sys/kmem.h>

#ifdef DEBUG_SUBSYSTEM
#undef DEBUG_SUBSYSTEM
#endif

#define DEBUG_SUBSYSTEM S_TASKQ

typedef struct spl_task {
        spinlock_t              t_lock;
        struct list_head        t_list;
        taskqid_t               t_id;
        task_func_t             *t_func;
        void                    *t_arg;
} spl_task_t;

/* NOTE: Must be called with tq->tq_lock held, returns an spl_task_t which
 * is not attached to the free, work, or pending taskq lists.
 */
static spl_task_t *
task_alloc(taskq_t *tq, uint_t flags)
{
        spl_task_t *t;
        int count = 0;
        ENTRY;

        ASSERT(tq);
        ASSERT(flags & (TQ_SLEEP | TQ_NOSLEEP));               /* One set */
        ASSERT(!((flags & TQ_SLEEP) && (flags & TQ_NOSLEEP))); /* Not both */
        ASSERT(spin_is_locked(&tq->tq_lock));
retry:
        /* Acquire an spl_task_t from the free list if one is available */
        if (!list_empty(&tq->tq_free_list) && !(flags & TQ_NEW)) {
                t = list_entry(tq->tq_free_list.next, spl_task_t, t_list);
                list_del_init(&t->t_list);
                RETURN(t);
        }

        /* Free list is empty and memory allocations are prohibited */
        if (flags & TQ_NOALLOC)
                RETURN(NULL);

        /* Hit maximum spl_task_t pool size */
        if (tq->tq_nalloc >= tq->tq_maxalloc) {
                if (flags & TQ_NOSLEEP)
                        RETURN(NULL);

                /* Sleep periodically, polling the free list for an available
                 * spl_task_t.  If a full second passes without finding one,
                 * give up and return NULL to the caller. */
                if (flags & TQ_SLEEP) {
                        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
                        schedule_timeout(HZ / 100);
                        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
                        if (count < 100)
                                GOTO(retry, count++);

                        RETURN(NULL);
                }

                /* Unreachable, TQ_SLEEP xor TQ_NOSLEEP */
                SBUG();
        }

        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
        t = kmem_alloc(sizeof(spl_task_t), flags & (TQ_SLEEP | TQ_NOSLEEP));
        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);

        if (t) {
                spin_lock_init(&t->t_lock);
                INIT_LIST_HEAD(&t->t_list);
                t->t_id = 0;
                t->t_func = NULL;
                t->t_arg = NULL;
                tq->tq_nalloc++;
        }

        RETURN(t);
}

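/* Illustrative call pattern for task_alloc() (a sketch only, not compiled;
 * the real call sites appear later in this file).  The caller must hold
 * tq->tq_lock and is responsible for attaching the returned spl_task_t to
 * one of the taskq lists:
 *
 *     spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
 *     t = task_alloc(tq, TQ_SLEEP);
 *     if (t != NULL) {
 *             t->t_func = func;
 *             t->t_arg = arg;
 *             list_add_tail(&t->t_list, &tq->tq_pend_list);
 *     }
 *     spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
 */
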
/* NOTE: Must be called with tq->tq_lock held, expects the spl_task_t
 * to already be removed from the free, work, or pending taskq lists.
 */
static void
task_free(taskq_t *tq, spl_task_t *t)
{
        ENTRY;

        ASSERT(tq);
        ASSERT(t);
        ASSERT(spin_is_locked(&tq->tq_lock));
        ASSERT(list_empty(&t->t_list));

        kmem_free(t, sizeof(spl_task_t));
        tq->tq_nalloc--;

        EXIT;
}

/* NOTE: Must be called with tq->tq_lock held, either destroys the
 * spl_task_t if too many exist or moves it to the free list for later use.
 */
static void
task_done(taskq_t *tq, spl_task_t *t)
{
        ENTRY;
        ASSERT(tq);
        ASSERT(t);
        ASSERT(spin_is_locked(&tq->tq_lock));

        list_del_init(&t->t_list);

        if (tq->tq_nalloc <= tq->tq_minalloc) {
                t->t_id = 0;
                t->t_func = NULL;
                t->t_arg = NULL;
                list_add_tail(&t->t_list, &tq->tq_free_list);
        } else {
                task_free(tq, t);
        }

        EXIT;
}

/* Taskqids are handed out in a monotonically increasing fashion per
 * taskq_t.  We don't handle taskqid wrapping yet, but fortunately it is
 * a 64-bit value so this is probably never going to happen.  The lowest
 * pending taskqid is stored in the taskq_t to make it easy for any
 * taskq_wait()'ers to know if the tasks they're waiting for have
 * completed.  Unfortunately, tq_lowest_id is kept up to date in a pretty
 * brain-dead way; something more clever should be done.
 */
static int
taskq_wait_check(taskq_t *tq, taskqid_t id)
{
        RETURN(tq->tq_lowest_id >= id);
}

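/* Worked example of the bookkeeping above (illustrative only): suppose
 * tasks 7, 8, and 9 are outstanding, so tq_lowest_id == 7.  If task 8
 * finishes first, tq_lowest_id stays at 7 because task 7 has not yet
 * completed.  When task 7 then finishes, tq_lowest_id is recomputed over
 * the pending and work lists and becomes 9, at which point any waiter
 * whose id is <= 9 sees taskq_wait_check() succeed.
 */
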
/* Expected to wait for all previously scheduled tasks to complete.  We do
 * not need to wait for tasks scheduled after this call to complete.  In
 * other words, we do not need to drain the entire taskq. */
void
__taskq_wait_id(taskq_t *tq, taskqid_t id)
{
        ENTRY;
        ASSERT(tq);

        wait_event(tq->tq_wait_waitq, taskq_wait_check(tq, id));

        EXIT;
}
EXPORT_SYMBOL(__taskq_wait_id);

void
__taskq_wait(taskq_t *tq)
{
        taskqid_t id;
        ENTRY;
        ASSERT(tq);

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
        id = tq->tq_next_id;
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        __taskq_wait_id(tq, id);

        EXIT;
}
EXPORT_SYMBOL(__taskq_wait);

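/* Usage sketch for the wait primitives above (illustrative only, not
 * compiled into the module; 'tq' is a placeholder handle and 'id' is a
 * taskqid previously returned by __taskq_dispatch(), defined below).
 * Note that __taskq_wait() snapshots tq_next_id, so it only waits for
 * tasks dispatched before the call:
 *
 *     __taskq_wait_id(tq, id);  // block until tq_lowest_id is >= id,
 *                               //  i.e. all earlier tasks have completed
 *     __taskq_wait(tq);         // block until everything dispatched so
 *                               //  far has completed
 */
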
int
__taskq_member(taskq_t *tq, void *t)
{
        int i;
        ENTRY;

        ASSERT(tq);
        ASSERT(t);

        for (i = 0; i < tq->tq_nthreads; i++)
                if (tq->tq_threads[i] == (struct task_struct *)t)
                        RETURN(1);

        RETURN(0);
}
EXPORT_SYMBOL(__taskq_member);

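/* Usage sketch (illustrative only; 'tq' is a placeholder handle).  A task
 * function can check whether it is already executing on one of a taskq's
 * worker threads, e.g. to avoid blocking on that same taskq:
 *
 *     if (__taskq_member(tq, (void *)current))
 *             // running on one of tq's threads; do not wait on tq here
 */
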
taskqid_t
__taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
{
        spl_task_t *t;
        taskqid_t rc = 0;
        ENTRY;

        ASSERT(tq);
        ASSERT(func);
        if (unlikely(in_atomic() && (flags & TQ_SLEEP))) {
                CERROR("May schedule while atomic: %s/0x%08x/%d\n",
                       current->comm, preempt_count(), current->pid);
                SBUG();
        }

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);

        /* Taskq being destroyed and all tasks drained */
        if (!(tq->tq_flags & TQ_ACTIVE))
                GOTO(out, rc = 0);

        /* Do not queue the task unless there is an idle thread for it */
        ASSERT(tq->tq_nactive <= tq->tq_nthreads);
        if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads))
                GOTO(out, rc = 0);

        if ((t = task_alloc(tq, flags)) == NULL)
                GOTO(out, rc = 0);

        spin_lock(&t->t_lock);
        list_add_tail(&t->t_list, &tq->tq_pend_list);
        t->t_id = rc = tq->tq_next_id;
        tq->tq_next_id++;
        t->t_func = func;
        t->t_arg = arg;
        spin_unlock(&t->t_lock);

        wake_up(&tq->tq_work_waitq);
out:
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
        RETURN(rc);
}
EXPORT_SYMBOL(__taskq_dispatch);

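/* Dispatch sketch (illustrative only; 'tq', 'my_func', and 'my_arg' are
 * placeholders).  A return value of 0 means the task was not queued, which
 * with TQ_NOSLEEP can simply mean no free spl_task_t was available:
 *
 *     taskqid_t id;
 *
 *     id = __taskq_dispatch(tq, my_func, my_arg, TQ_SLEEP);
 *     if (id == 0)
 *             // taskq inactive or allocation failed; handle the error
 *
 *     // From atomic context TQ_SLEEP would trip the in_atomic() check
 *     // above, so TQ_NOSLEEP must be used instead.
 *     id = __taskq_dispatch(tq, my_func, my_arg, TQ_NOSLEEP);
 */
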
/* NOTE: Must be called with tq->tq_lock held.  Returns the lowest
 * outstanding taskqid, or tq_next_id when no tasks remain on the pending
 * or work lists so waiters are not left behind a stale cached value. */
static taskqid_t
taskq_lowest_id(taskq_t *tq)
{
        taskqid_t lowest_id = tq->tq_next_id;
        spl_task_t *t;
        ENTRY;

        ASSERT(tq);
        ASSERT(spin_is_locked(&tq->tq_lock));

        list_for_each_entry(t, &tq->tq_pend_list, t_list)
                if (t->t_id < lowest_id)
                        lowest_id = t->t_id;

        list_for_each_entry(t, &tq->tq_work_list, t_list)
                if (t->t_id < lowest_id)
                        lowest_id = t->t_id;

        RETURN(lowest_id);
}

static int
taskq_thread(void *args)
{
        DECLARE_WAITQUEUE(wait, current);
        sigset_t blocked;
        taskqid_t id;
        taskq_t *tq = args;
        spl_task_t *t;
        ENTRY;

        ASSERT(tq);
        current->flags |= PF_NOFREEZE;

        sigfillset(&blocked);
        sigprocmask(SIG_BLOCK, &blocked, NULL);
        flush_signals(current);

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
        tq->tq_nthreads++;
        wake_up(&tq->tq_wait_waitq);
        set_current_state(TASK_INTERRUPTIBLE);

        while (!kthread_should_stop()) {
                add_wait_queue(&tq->tq_work_waitq, &wait);
                if (list_empty(&tq->tq_pend_list)) {
                        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
                        schedule();
                        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
                } else {
                        __set_current_state(TASK_RUNNING);
                }

                remove_wait_queue(&tq->tq_work_waitq, &wait);
                if (!list_empty(&tq->tq_pend_list)) {
                        t = list_entry(tq->tq_pend_list.next, spl_task_t, t_list);
                        list_del_init(&t->t_list);
                        list_add_tail(&t->t_list, &tq->tq_work_list);
                        tq->tq_nactive++;
                        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

                        /* Perform the requested task */
                        t->t_func(t->t_arg);

                        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
                        tq->tq_nactive--;
                        id = t->t_id;
                        task_done(tq, t);

                        /* Update the lowest remaining taskqid yet to run */
                        if (tq->tq_lowest_id == id) {
                                tq->tq_lowest_id = taskq_lowest_id(tq);
                                ASSERT(tq->tq_lowest_id > id);
                        }

                        wake_up_all(&tq->tq_wait_waitq);
                }

                set_current_state(TASK_INTERRUPTIBLE);
        }

        __set_current_state(TASK_RUNNING);
        tq->tq_nthreads--;
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        RETURN(0);
}

taskq_t *
__taskq_create(const char *name, int nthreads, pri_t pri,
               int minalloc, int maxalloc, uint_t flags)
{
        taskq_t *tq;
        struct task_struct *t;
        int rc = 0, i, j = 0;
        ENTRY;

        ASSERT(name != NULL);
        ASSERT(pri <= maxclsyspri);
        ASSERT(minalloc >= 0);
        ASSERT(maxalloc <= INT_MAX);
        ASSERT(!(flags & (TASKQ_CPR_SAFE | TASKQ_DYNAMIC))); /* Unsupported */

        tq = kmem_alloc(sizeof(*tq), KM_SLEEP);
        if (tq == NULL)
                RETURN(NULL);

        tq->tq_threads = kmem_alloc(nthreads * sizeof(t), KM_SLEEP);
        if (tq->tq_threads == NULL) {
                kmem_free(tq, sizeof(*tq));
                RETURN(NULL);
        }

        spin_lock_init(&tq->tq_lock);
        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
        tq->tq_name = name;
        tq->tq_nactive = 0;
        tq->tq_nthreads = 0;
        tq->tq_pri = pri;
        tq->tq_minalloc = minalloc;
        tq->tq_maxalloc = maxalloc;
        tq->tq_nalloc = 0;
        tq->tq_flags = (flags | TQ_ACTIVE);
        tq->tq_next_id = 1;
        tq->tq_lowest_id = 1;
        INIT_LIST_HEAD(&tq->tq_free_list);
        INIT_LIST_HEAD(&tq->tq_work_list);
        INIT_LIST_HEAD(&tq->tq_pend_list);
        init_waitqueue_head(&tq->tq_work_waitq);
        init_waitqueue_head(&tq->tq_wait_waitq);

        if (flags & TASKQ_PREPOPULATE)
                for (i = 0; i < minalloc; i++)
                        task_done(tq, task_alloc(tq, TQ_SLEEP | TQ_NEW));

        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        for (i = 0; i < nthreads; i++) {
                t = kthread_create(taskq_thread, tq, "%s/%d", name, i);
                if (t) {
                        tq->tq_threads[i] = t;
                        kthread_bind(t, i % num_online_cpus());
                        set_user_nice(t, PRIO_TO_NICE(pri));
                        wake_up_process(t);
                        j++;
                } else {
                        tq->tq_threads[i] = NULL;
                        rc = 1;
                }
        }

        /* Wait for all threads to be started before potential destroy */
        wait_event(tq->tq_wait_waitq, tq->tq_nthreads == j);

        if (rc) {
                __taskq_destroy(tq);
                tq = NULL;
        }

        RETURN(tq);
}
EXPORT_SYMBOL(__taskq_create);

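/* Lifecycle sketch (illustrative only; the name and sizing values are
 * arbitrary).  Consumers normally reach these entry points through the
 * taskq_create()/taskq_destroy() wrappers typically provided by
 * sys/taskq.h rather than calling the __taskq_*() symbols directly:
 *
 *     taskq_t *tq;
 *
 *     tq = __taskq_create("example_taskq", 4, maxclsyspri,
 *                         50, INT_MAX, TASKQ_PREPOPULATE);
 *     if (tq == NULL)
 *             // handle allocation failure
 *
 *     // ... __taskq_dispatch() work to it, __taskq_wait() for it ...
 *
 *     __taskq_destroy(tq);
 */
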
void
__taskq_destroy(taskq_t *tq)
{
        spl_task_t *t;
        int i, nthreads;
        ENTRY;

        ASSERT(tq);
        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
        tq->tq_flags &= ~TQ_ACTIVE;
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        /* With TQ_ACTIVE cleared no new tasks can be added to the pending
         * list, so it is safe to drain what remains. */
        __taskq_wait(tq);

        nthreads = tq->tq_nthreads;
        for (i = 0; i < nthreads; i++)
                if (tq->tq_threads[i])
                        kthread_stop(tq->tq_threads[i]);

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);

        while (!list_empty(&tq->tq_free_list)) {
                t = list_entry(tq->tq_free_list.next, spl_task_t, t_list);
                list_del_init(&t->t_list);
                task_free(tq, t);
        }

        ASSERT(tq->tq_nthreads == 0);
        ASSERT(tq->tq_nalloc == 0);
        ASSERT(list_empty(&tq->tq_free_list));
        ASSERT(list_empty(&tq->tq_work_list));
        ASSERT(list_empty(&tq->tq_pend_list));

        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
        kmem_free(tq->tq_threads, nthreads * sizeof(spl_task_t *));
        kmem_free(tq, sizeof(taskq_t));

        EXIT;
}
EXPORT_SYMBOL(__taskq_destroy);