/*
 * This file is part of the SPL: Solaris Porting Layer.
 *
 * Copyright (c) 2008 Lawrence Livermore National Security, LLC.
 * Produced at Lawrence Livermore National Laboratory
 * Written by:
 *         Brian Behlendorf <behlendorf1@llnl.gov>,
 *         Herb Wartens <wartens2@llnl.gov>,
 *         Jim Garlick <garlick@llnl.gov>
 * UCRL-CODE-235197
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#include <sys/taskq.h>
#include <sys/kmem.h>

#ifdef DEBUG_SUBSYSTEM
#undef DEBUG_SUBSYSTEM
#endif

#define DEBUG_SUBSYSTEM S_TASKQ

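/*
 * Illustrative usage (a minimal sketch, not part of the original source):
 * the task function, queue name, and allocation limits below are
 * hypothetical and only show how the exported __taskq_* interfaces in this
 * file fit together.  A caller creates a queue, dispatches work, optionally
 * waits for a specific taskqid (or for everything dispatched so far via
 * __taskq_wait()), and finally destroys the queue.
 *
 *   static void my_task(void *arg) { ... }
 *
 *   taskq_t *tq = __taskq_create("example", 4, maxclsyspri, 50, INT_MAX,
 *                                TASKQ_PREPOPULATE);
 *   taskqid_t id = __taskq_dispatch(tq, my_task, arg, TQ_SLEEP);
 *   __taskq_wait_id(tq, id);
 *   __taskq_destroy(tq);
 */
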
typedef struct spl_task {
        spinlock_t              t_lock;
        struct list_head        t_list;
        taskqid_t               t_id;
        task_func_t             *t_func;
        void                    *t_arg;
} spl_task_t;

/* NOTE: Must be called with tq->tq_lock held; returns an spl_task_t which
 * is not attached to the free, work, or pending taskq lists.
 */
static spl_task_t *
task_alloc(taskq_t *tq, uint_t flags)
{
        spl_task_t *t;
        int count = 0;
        ENTRY;

        ASSERT(tq);
        ASSERT(flags & (TQ_SLEEP | TQ_NOSLEEP));               /* One set */
        ASSERT(!((flags & TQ_SLEEP) && (flags & TQ_NOSLEEP))); /* Not both */
        ASSERT(spin_is_locked(&tq->tq_lock));
retry:
        /* Acquire spl_task_t's from the free list if available */
        if (!list_empty(&tq->tq_free_list) && !(flags & TQ_NEW)) {
                t = list_entry(tq->tq_free_list.next, spl_task_t, t_list);
                list_del_init(&t->t_list);
                RETURN(t);
        }

        /* Free list is empty and memory allocations are prohibited */
        if (flags & TQ_NOALLOC)
                RETURN(NULL);

        /* Hit maximum spl_task_t pool size */
        if (tq->tq_nalloc >= tq->tq_maxalloc) {
                if (flags & TQ_NOSLEEP)
                        RETURN(NULL);

                /* Sleep periodically, polling the free list for an available
                 * spl_task_t.  If a full second passes and we still have not
                 * found one, give up and return NULL to the caller. */
                if (flags & TQ_SLEEP) {
                        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
                        schedule_timeout(HZ / 100);
                        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
                        if (count < 100)
                                GOTO(retry, count++);

                        RETURN(NULL);
                }

                /* Unreachable, TQ_SLEEP xor TQ_NOSLEEP */
                SBUG();
        }

        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
        t = kmem_alloc(sizeof(spl_task_t), flags & (TQ_SLEEP | TQ_NOSLEEP));
        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);

        if (t) {
                spin_lock_init(&t->t_lock);
                INIT_LIST_HEAD(&t->t_list);
                t->t_id = 0;
                t->t_func = NULL;
                t->t_arg = NULL;
                tq->tq_nalloc++;
        }

        RETURN(t);
}

/* NOTE: Must be called with tq->tq_lock held; expects the spl_task_t
 * to already be removed from the free, work, or pending taskq lists.
 */
static void
task_free(taskq_t *tq, spl_task_t *t)
{
        ENTRY;

        ASSERT(tq);
        ASSERT(t);
        ASSERT(spin_is_locked(&tq->tq_lock));
        ASSERT(list_empty(&t->t_list));

        kmem_free(t, sizeof(spl_task_t));
        tq->tq_nalloc--;

        EXIT;
}

/* NOTE: Must be called with tq->tq_lock held; either destroys the
 * spl_task_t if too many exist or moves it to the free list for later use.
 */
static void
task_done(taskq_t *tq, spl_task_t *t)
{
        ENTRY;
        ASSERT(tq);
        ASSERT(t);
        ASSERT(spin_is_locked(&tq->tq_lock));

        list_del_init(&t->t_list);

        if (tq->tq_nalloc <= tq->tq_minalloc) {
                t->t_id = 0;
                t->t_func = NULL;
                t->t_arg = NULL;
                list_add_tail(&t->t_list, &tq->tq_free_list);
        } else {
                task_free(tq, t);
        }

        EXIT;
}

/* Taskqid's are handed out in a monotonically increasing fashion per
 * taskq_t.  We don't handle taskqid wrapping yet, but fortunately it is
 * a 64-bit value so this is probably never going to happen.  The lowest
 * pending taskqid is stored in the taskq_t to make it easy for any
 * taskq_wait()'ers to know if the tasks they're waiting for have
 * completed.  Unfortunately, tq_lowest_id is kept up to date in
 * a pretty brain-dead way; something more clever should be done.
 */
static int
taskq_wait_check(taskq_t *tq, taskqid_t id)
{
        RETURN(tq->tq_lowest_id >= id);
}

/* Expected to wait for all previously scheduled tasks to complete.  We do
 * not need to wait for tasks scheduled after this call to complete.  In
 * other words, we do not need to drain the entire taskq. */
void
__taskq_wait_id(taskq_t *tq, taskqid_t id)
{
        ENTRY;
        ASSERT(tq);

        wait_event(tq->tq_wait_waitq, taskq_wait_check(tq, id));

        EXIT;
}
EXPORT_SYMBOL(__taskq_wait_id);

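/* Wait for every task dispatched prior to this call: tq_next_id is
 * snapshotted under the lock and we then wait until tq_lowest_id catches
 * up to it, so tasks dispatched afterwards are not waited on. */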
void
__taskq_wait(taskq_t *tq)
{
        taskqid_t id;
        ENTRY;
        ASSERT(tq);

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
        id = tq->tq_next_id;
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        __taskq_wait_id(tq, id);

        EXIT;
}
EXPORT_SYMBOL(__taskq_wait);

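/* Returns non-zero if the given thread belongs to this taskq's worker
 * thread pool, zero otherwise. */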
int
__taskq_member(taskq_t *tq, void *t)
{
        int i;
        ENTRY;

        ASSERT(tq);
        ASSERT(t);

        for (i = 0; i < tq->tq_nthreads; i++)
                if (tq->tq_threads[i] == (struct task_struct *)t)
                        RETURN(1);

        RETURN(0);
}
EXPORT_SYMBOL(__taskq_member);

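/* Dispatch func(arg) to the taskq.  On success the assigned taskqid is
 * returned; 0 is returned if the taskq is being destroyed, if TQ_NOQUEUE
 * was passed and no thread is idle, or if a spl_task_t could not be
 * allocated. */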
taskqid_t
__taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
{
        spl_task_t *t;
        taskqid_t rc = 0;
        ENTRY;

        ASSERT(tq);
        ASSERT(func);
        if (unlikely(in_atomic() && (flags & TQ_SLEEP))) {
                CERROR("May schedule while atomic: %s/0x%08x/%d\n",
                       current->comm, preempt_count(), current->pid);
                SBUG();
        }

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);

        /* Taskq being destroyed and all tasks drained */
        if (!(tq->tq_flags & TQ_ACTIVE))
                GOTO(out, rc = 0);

        /* Do not queue the task unless there is an idle thread for it */
        ASSERT(tq->tq_nactive <= tq->tq_nthreads);
        if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads))
                GOTO(out, rc = 0);

        if ((t = task_alloc(tq, flags)) == NULL)
                GOTO(out, rc = 0);

        spin_lock(&t->t_lock);
        list_add_tail(&t->t_list, &tq->tq_pend_list);
        t->t_id = rc = tq->tq_next_id;
        tq->tq_next_id++;
        t->t_func = func;
        t->t_arg = arg;
        spin_unlock(&t->t_lock);

        wake_up(&tq->tq_work_waitq);
out:
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
        RETURN(rc);
}
EXPORT_SYMBOL(__taskq_dispatch);

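/* Returns the lowest taskqid still outstanding on the pending or work
 * lists (or ~0 if both are empty); used by taskq_thread() to keep
 * tq_lowest_id current for any taskq_wait()'ers. */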
/* NOTE: Must be called with tq->tq_lock held */
static taskqid_t
taskq_lowest_id(taskq_t *tq)
{
        taskqid_t lowest_id = ~0;
        spl_task_t *t;
        ENTRY;

        ASSERT(tq);
        ASSERT(spin_is_locked(&tq->tq_lock));

        list_for_each_entry(t, &tq->tq_pend_list, t_list)
                if (t->t_id < lowest_id)
                        lowest_id = t->t_id;

        list_for_each_entry(t, &tq->tq_work_list, t_list)
                if (t->t_id < lowest_id)
                        lowest_id = t->t_id;

        RETURN(lowest_id);
}

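/* Worker thread body: each thread sleeps on tq_work_waitq until a task is
 * queued, moves the oldest pending spl_task_t to the work list, drops the
 * taskq lock while running t_func(t_arg), then retires the task and
 * updates tq_lowest_id so taskq_wait()'ers can make progress. */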
static int
taskq_thread(void *args)
{
        DECLARE_WAITQUEUE(wait, current);
        sigset_t blocked;
        taskqid_t id;
        taskq_t *tq = args;
        spl_task_t *t;
        ENTRY;

        ASSERT(tq);
        current->flags |= PF_NOFREEZE;

        sigfillset(&blocked);
        sigprocmask(SIG_BLOCK, &blocked, NULL);
        flush_signals(current);

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
        tq->tq_nthreads++;
        wake_up(&tq->tq_wait_waitq);
        set_current_state(TASK_INTERRUPTIBLE);

        while (!kthread_should_stop()) {

                add_wait_queue(&tq->tq_work_waitq, &wait);
                if (list_empty(&tq->tq_pend_list)) {
                        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
                        schedule();
                        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
                } else {
                        __set_current_state(TASK_RUNNING);
                }

                remove_wait_queue(&tq->tq_work_waitq, &wait);
                if (!list_empty(&tq->tq_pend_list)) {
                        t = list_entry(tq->tq_pend_list.next, spl_task_t, t_list);
                        list_del_init(&t->t_list);
                        list_add_tail(&t->t_list, &tq->tq_work_list);
                        tq->tq_nactive++;
                        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

                        /* Perform the requested task */
                        t->t_func(t->t_arg);

                        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
                        tq->tq_nactive--;
                        id = t->t_id;
                        task_done(tq, t);

                        /* Update the lowest remaining taskqid yet to run */
                        if (tq->tq_lowest_id == id) {
                                tq->tq_lowest_id = taskq_lowest_id(tq);
                                ASSERT(tq->tq_lowest_id > id);
                        }

                        wake_up_all(&tq->tq_wait_waitq);
                }

                set_current_state(TASK_INTERRUPTIBLE);
        }

        __set_current_state(TASK_RUNNING);
        tq->tq_nthreads--;
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        RETURN(0);
}

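/* Create a taskq with nthreads worker threads.  The threads are bound
 * round-robin across the online CPUs and niced according to pri, and the
 * free list is prepopulated with minalloc spl_task_t's when
 * TASKQ_PREPOPULATE is set.  Returns NULL on allocation failure or if any
 * worker thread cannot be created. */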
taskq_t *
__taskq_create(const char *name, int nthreads, pri_t pri,
               int minalloc, int maxalloc, uint_t flags)
{
        taskq_t *tq;
        struct task_struct *t;
        int rc = 0, i, j = 0;
        ENTRY;

        ASSERT(name != NULL);
        ASSERT(pri <= maxclsyspri);
        ASSERT(minalloc >= 0);
        ASSERT(maxalloc <= INT_MAX);
        ASSERT(!(flags & (TASKQ_CPR_SAFE | TASKQ_DYNAMIC))); /* Unsupported */

        tq = kmem_alloc(sizeof(*tq), KM_SLEEP);
        if (tq == NULL)
                RETURN(NULL);

        tq->tq_threads = kmem_alloc(nthreads * sizeof(t), KM_SLEEP);
        if (tq->tq_threads == NULL) {
                kmem_free(tq, sizeof(*tq));
                RETURN(NULL);
        }

        spin_lock_init(&tq->tq_lock);
        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
        tq->tq_name = name;
        tq->tq_nactive = 0;
        tq->tq_nthreads = 0;
        tq->tq_pri = pri;
        tq->tq_minalloc = minalloc;
        tq->tq_maxalloc = maxalloc;
        tq->tq_nalloc = 0;
        tq->tq_flags = (flags | TQ_ACTIVE);
        tq->tq_next_id = 1;
        tq->tq_lowest_id = 1;
        INIT_LIST_HEAD(&tq->tq_free_list);
        INIT_LIST_HEAD(&tq->tq_work_list);
        INIT_LIST_HEAD(&tq->tq_pend_list);
        init_waitqueue_head(&tq->tq_work_waitq);
        init_waitqueue_head(&tq->tq_wait_waitq);

        if (flags & TASKQ_PREPOPULATE)
                for (i = 0; i < minalloc; i++)
                        task_done(tq, task_alloc(tq, TQ_SLEEP | TQ_NEW));

        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        for (i = 0; i < nthreads; i++) {
                t = kthread_create(taskq_thread, tq, "%s/%d", name, i);
                if (t) {
                        tq->tq_threads[i] = t;
                        kthread_bind(t, i % num_online_cpus());
                        set_user_nice(t, PRIO_TO_NICE(pri));
                        wake_up_process(t);
                        j++;
                } else {
                        tq->tq_threads[i] = NULL;
                        rc = 1;
                }
        }

        /* Wait for all threads to be started before potential destroy */
        wait_event(tq->tq_wait_waitq, tq->tq_nthreads == j);

        if (rc) {
                __taskq_destroy(tq);
                tq = NULL;
        }

        RETURN(tq);
}
EXPORT_SYMBOL(__taskq_create);

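/* Destroy the taskq: mark it inactive so no new tasks can be dispatched,
 * wait for all previously dispatched tasks to complete, stop the worker
 * threads, and release every spl_task_t and the taskq itself. */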
void
__taskq_destroy(taskq_t *tq)
{
        spl_task_t *t;
        int i, nthreads;
        ENTRY;

        ASSERT(tq);
        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
        tq->tq_flags &= ~TQ_ACTIVE;
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        /* Clearing TQ_ACTIVE prevents new tasks from being added to pending */
        __taskq_wait(tq);

        nthreads = tq->tq_nthreads;
        for (i = 0; i < nthreads; i++)
                if (tq->tq_threads[i])
                        kthread_stop(tq->tq_threads[i]);

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);

        while (!list_empty(&tq->tq_free_list)) {
                t = list_entry(tq->tq_free_list.next, spl_task_t, t_list);
                list_del_init(&t->t_list);
                task_free(tq, t);
        }

        ASSERT(tq->tq_nthreads == 0);
        ASSERT(tq->tq_nalloc == 0);
        ASSERT(list_empty(&tq->tq_free_list));
        ASSERT(list_empty(&tq->tq_work_list));
        ASSERT(list_empty(&tq->tq_pend_list));

        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
        kmem_free(tq->tq_threads, nthreads * sizeof(spl_task_t *));
        kmem_free(tq, sizeof(taskq_t));

        EXIT;
}
EXPORT_SYMBOL(__taskq_destroy);