// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/tracehook.h>

#include "io-wq.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)
enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_FIXED	= 8,	/* static idle worker */
	IO_WORKER_F_BOUND	= 16,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
};

enum {
	IO_WQE_FLAG_STALLED	= 1,	/* stalled on hash */
};
/*
 * One for each thread in a wqe pool
 */
struct io_worker {
	refcount_t ref;
	unsigned flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wqe *wqe;

	struct io_wq_work *cur_work;
	spinlock_t lock;

	struct completion ref_done;

	struct rcu_head rcu;
};
#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)
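/*
 * Hashed work encodes its bucket number in the upper bits of work->flags
 * (shifted by IO_WQ_HASH_SHIFT, see io_get_work_hash()/io_wq_hash_work()),
 * giving 64 buckets on 64-bit kernels and 32 on 32-bit ones.
 */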
struct io_wqe_acct {
	unsigned nr_workers;
	unsigned max_workers;
	int index;
	atomic_t nr_running;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
};

/*
 * Per-node worker thread pool
 */
struct io_wqe {
	struct {
		raw_spinlock_t lock;
		struct io_wq_work_list work_list;
		unsigned flags;
	} ____cacheline_aligned_in_smp;

	int node;
	struct io_wqe_acct acct[2];

	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct wait_queue_entry wait;

	struct io_wq *wq;
	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];

	cpumask_var_t cpu_mask;
};
/*
 * Per io_wq state
 */
struct io_wq {
	unsigned long state;

	free_work_fn *free_work;
	io_wq_work_fn *do_work;

	struct io_wq_hash *hash;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	struct task_struct *task;

	struct io_wqe *wqes[];
};
static enum cpuhp_state io_wq_online;
struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};
static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}
static inline struct io_wqe_acct *io_get_acct(struct io_wqe *wqe, bool bound)
{
	return &wqe->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}
static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
						   struct io_wq_work *work)
{
	return io_get_acct(wqe, !(work->flags & IO_WQ_WORK_UNBOUND));
}
static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
{
	return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND);
}
static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}
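/*
 * Worker teardown handshake: an exiting worker drops its own reference and
 * then waits on ref_done, which is only completed once the last outside
 * io_worker_get() reference has been released. Only then is it safe to
 * unlink the worker from the wqe lists and free it via RCU.
 */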
static void io_worker_exit(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	unsigned flags;

	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
	wait_for_completion(&worker->ref_done);

	current->flags &= ~PF_IO_WORKER;
	flags = worker->flags;
	worker->flags = 0;
	if (flags & IO_WORKER_F_RUNNING)
		atomic_dec(&acct->nr_running);

	raw_spin_lock_irq(&wqe->lock);
	if (flags & IO_WORKER_F_FREE)
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	acct->nr_workers--;
	raw_spin_unlock_irq(&wqe->lock);

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wqe->wq);
	do_exit(0);
}
static inline bool io_wqe_run_queue(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	if (!wq_list_empty(&wqe->work_list) &&
	    !(wqe->flags & IO_WQE_FLAG_STALLED))
		return true;

	return false;
}
/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list));
	if (is_a_nulls(n))
		return false;

	worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
	if (io_worker_get(worker)) {
		wake_up_process(worker->task);
		io_worker_release(worker);
		return true;
	}

	return false;
}
/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	bool ret;

	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	if (unlikely(!acct->max_workers))
		pr_warn_once("io-wq is not configured for unbound workers");

	rcu_read_lock();
	ret = io_wqe_activate_free_worker(wqe);
	rcu_read_unlock();

	if (!ret && acct->nr_workers < acct->max_workers) {
		atomic_inc(&acct->nr_running);
		atomic_inc(&wqe->wq->worker_refs);
		create_io_worker(wqe->wq, wqe, acct->index);
	}
}
static void io_wqe_inc_running(struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);

	atomic_inc(&acct->nr_running);
}
struct create_worker_data {
	struct callback_head work;
	struct io_wqe *wqe;
	int index;
};
static void create_worker_cb(struct callback_head *cb)
{
	struct create_worker_data *cwd;
	struct io_wq *wq;

	cwd = container_of(cb, struct create_worker_data, work);
	wq = cwd->wqe->wq;
	create_io_worker(wq, cwd->wqe, cwd->index);
	kfree(cwd);
}
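/*
 * Worker creation has to be able to sleep (thread creation, GFP_KERNEL
 * allocations), but it may be requested from a context that cannot, e.g.
 * when the last running worker is about to schedule out. The request is
 * therefore packaged up and punted as task_work to the io_wq owner task
 * (wq->task), which runs create_worker_cb() in a sleepable context.
 */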
static void io_queue_worker_create(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	struct create_worker_data *cwd;
	struct io_wq *wq = wqe->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;

	cwd = kmalloc(sizeof(*cwd), GFP_ATOMIC);
	if (cwd) {
		init_task_work(&cwd->work, create_worker_cb);
		cwd->wqe = wqe;
		cwd->index = acct->index;
		if (!task_work_add(wq->task, &cwd->work, TWA_SIGNAL))
			return;

		kfree(cwd);
	}
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
}
static void io_wqe_dec_running(struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;

	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe)) {
		atomic_inc(&acct->nr_running);
		atomic_inc(&wqe->wq->worker_refs);
		io_queue_worker_create(wqe, acct);
	}
}
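/*
 * Note that nr_running and the wq worker refcount are charged up front
 * here; if the deferred creation fails or races with io_wq exit,
 * io_queue_worker_create()/create_io_worker() undo both again.
 */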
/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
			     struct io_wq_work *work)
	__must_hold(wqe->lock)
{
	bool worker_bound, work_bound;

	BUILD_BUG_ON((IO_WQ_ACCT_UNBOUND ^ IO_WQ_ACCT_BOUND) != 1);

	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
	}

	/*
	 * If worker is moving from bound to unbound (or vice versa), then
	 * ensure we update the running accounting.
	 */
	worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
	work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
	if (worker_bound != work_bound) {
		int index = work_bound ? IO_WQ_ACCT_UNBOUND : IO_WQ_ACCT_BOUND;

		io_wqe_dec_running(worker);
		worker->flags ^= IO_WORKER_F_BOUND;
		wqe->acct[index].nr_workers--;
		wqe->acct[index ^ 1].nr_workers++;
		io_wqe_inc_running(worker);
	}
}
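/*
 * The BUILD_BUG_ON above is what makes the "index ^ 1" trick valid: the
 * bound and unbound accounting slots must differ only in bit 0, so
 * flipping that bit moves the worker from its old class to the new one.
 */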
/*
 * No work, worker going to sleep. Move to freelist, and unuse mm if we
 * have one attached. Dropping the mm may potentially sleep, so we drop
 * the lock in that case and return success. Since the caller has to
 * retry the loop in that case (we changed task state), we don't regrab
 * the lock if we return success.
 */
static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	}
}
static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return work->flags >> IO_WQ_HASH_SHIFT;
}
static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
{
	struct io_wq *wq = wqe->wq;

	spin_lock(&wq->hash->wait.lock);
	if (list_empty(&wqe->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wqe->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wqe->wait.entry);
		}
	}
	spin_unlock(&wq->hash->wait.lock);
}
static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;

	wq_list_for_each(node, prev, &wqe->work_list) {
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!io_wq_is_hashed(work)) {
			wq_list_del(&wqe->work_list, node, prev);
			return work;
		}

		hash = io_get_work_hash(work);
		/* all items with this hash lie in [work, tail] */
		tail = wqe->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
			wqe->hash_tail[hash] = NULL;
			wq_list_cut(&wqe->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		raw_spin_unlock(&wqe->lock);
		io_wait_on_hash(wqe, stall_hash);
		raw_spin_lock(&wqe->lock);
	}

	return NULL;
}
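/*
 * If every pending item hashed to a bucket that is already executing
 * somewhere, stall_hash records the first such bucket and the wqe is
 * parked on the hash waitqueue above; clearing the bucket bit on
 * completion (see io_worker_handle_work()) wakes it back up.
 */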
static bool io_flush_signals(void)
{
	if (unlikely(test_thread_flag(TIF_NOTIFY_SIGNAL))) {
		__set_current_state(TASK_RUNNING);
		tracehook_notify_signal();
		return true;
	}
	return false;
}
static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	spin_lock_irq(&worker->lock);
	worker->cur_work = work;
	spin_unlock_irq(&worker->lock);
}
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
static void io_worker_handle_work(struct io_worker *worker)
	__releases(wqe->lock)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);

	do {
		struct io_wq_work *work;
get_next:
		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(wqe);
		if (work)
			__io_worker_busy(wqe, worker, work);
		else if (!wq_list_empty(&wqe->work_list))
			wqe->flags |= IO_WQE_FLAG_STALLED;

		raw_spin_unlock_irq(&wqe->lock);
		if (!work)
			break;
		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int hash = io_get_work_hash(work);

			next_hashed = wq_next_work(work);

			if (unlikely(do_kill) && (work->flags & IO_WQ_WORK_UNBOUND))
				work->flags |= IO_WQ_WORK_CANCEL;
			wq->do_work(work);
			io_assign_current_work(worker, NULL);

			linked = wq->free_work(work);
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wqe_enqueue(wqe, linked);

			if (hash != -1U && !next_hashed) {
				clear_bit(hash, &wq->hash->map);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
				raw_spin_lock_irq(&wqe->lock);
				wqe->flags &= ~IO_WQE_FLAG_STALLED;
				/* skip unnecessary unlock-lock wqe->lock */
				if (!work)
					goto get_next;
				raw_spin_unlock_irq(&wqe->lock);
			}
		} while (work);

		raw_spin_lock_irq(&wqe->lock);
	} while (1);
}
static int io_wqe_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	char buf[TASK_COMM_LEN];

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);
loop:
		raw_spin_lock_irq(&wqe->lock);
		if (io_wqe_run_queue(wqe)) {
			io_worker_handle_work(worker);
			goto loop;
		}
		__io_worker_idle(wqe, worker);
		raw_spin_unlock_irq(&wqe->lock);
		if (io_flush_signals())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			break;
		}
		if (ret)
			continue;
		/* timed out, exit unless we're the fixed worker */
		if (!(worker->flags & IO_WORKER_F_FIXED))
			break;
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		raw_spin_lock_irq(&wqe->lock);
		io_worker_handle_work(worker);
	}

	io_worker_exit(worker);
	return 0;
}
/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->pf_io_worker;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (worker->flags & IO_WORKER_F_RUNNING)
		return;
	worker->flags |= IO_WORKER_F_RUNNING;
	io_wqe_inc_running(worker);
}
/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->pf_io_worker;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (!(worker->flags & IO_WORKER_F_RUNNING))
		return;

	worker->flags &= ~IO_WORKER_F_RUNNING;

	raw_spin_lock_irq(&worker->wqe->lock);
	io_wqe_dec_running(worker);
	raw_spin_unlock_irq(&worker->wqe->lock);
}
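/*
 * io_wq_worker_running() and io_wq_worker_sleeping() are the scheduler
 * hooks for PF_IO_WORKER tasks: they are invoked on schedule-in and
 * schedule-out so that nr_running tracks how many workers can actually
 * make progress, which in turn drives on-demand worker creation.
 */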
static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
	struct io_wqe_acct *acct = &wqe->acct[index];
	struct io_worker *worker;
	struct task_struct *tsk;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker)
		goto fail;

	refcount_set(&worker->ref, 1);
	worker->nulls_node.pprev = NULL;
	worker->wqe = wqe;
	spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	if (IS_ERR(tsk)) {
		kfree(worker);
fail:
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
		return;
	}

	tsk->pf_io_worker = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
	tsk->flags |= PF_NO_SETAFFINITY;

	raw_spin_lock_irq(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;
	if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
		worker->flags |= IO_WORKER_F_FIXED;
	acct->nr_workers++;
	raw_spin_unlock_irq(&wqe->lock);
	wake_up_new_task(tsk);
}
/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wqe *wqe,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}
static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	set_notify_signal(worker->task);
	wake_up_process(worker->task);
	return false;
}
static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}
static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
{
	struct io_wq *wq = wqe->wq;

	do {
		work->flags |= IO_WQ_WORK_CANCEL;
		wq->do_work(work);
		work = wq->free_work(work);
	} while (work);
}
static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wq_work *tail;
	unsigned int hash;

	if (!io_wq_is_hashed(work)) {
append:
		wq_list_add_tail(&work->list, &wqe->work_list);
		return;
	}

	hash = io_get_work_hash(work);
	tail = wqe->hash_tail[hash];
	wqe->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
}
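/*
 * hash_tail[] tracks the last queued item for each hash bucket, and new
 * work with the same hash is inserted right after it. That keeps all
 * items of a bucket contiguous in the work list, which is what allows
 * io_get_next_work() to cut the whole [work, tail] span out at once.
 */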
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	int work_flags;
	unsigned long flags;

	/*
	 * If io-wq is exiting for this task, cancel the work item rather
	 * than queueing it for a worker that will never run it.
	 */
	if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state)) {
		io_run_cancel(work, wqe);
		return;
	}

	work_flags = work->flags;
	raw_spin_lock_irqsave(&wqe->lock, flags);
	io_wqe_insert_work(wqe, work);
	wqe->flags &= ~IO_WQE_FLAG_STALLED;
	raw_spin_unlock_irqrestore(&wqe->lock, flags);

	if ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))
		io_wqe_wake_worker(wqe, acct);
}
void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];

	io_wqe_enqueue(wqe, work);
}
/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
}
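/*
 * Illustrative only (the caller lives outside this file): a submitter
 * that wants buffered writes to the same file serialized, while writes
 * to different files still run concurrently, hashes on the inode, e.g.
 *
 *	io_wq_hash_work(&req->work, file_inode(req->file));
 */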
static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;
	unsigned long flags;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	spin_lock_irqsave(&worker->lock, flags);
	if (worker->cur_work &&
	    match->fn(worker->cur_work, match->data)) {
		set_notify_signal(worker->task);
		match->nr_running++;
	}
	spin_unlock_irqrestore(&worker->lock, flags);

	return match->nr_running && !match->cancel_all;
}
static inline void io_wqe_remove_pending(struct io_wqe *wqe,
					 struct io_wq_work *work,
					 struct io_wq_work_node *prev)
{
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wqe->hash_tail[hash] = prev_work;
		else
			wqe->hash_tail[hash] = NULL;
	}

	wq_list_del(&wqe->work_list, &work->list, prev);
}
static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;
	unsigned long flags;

retry:
	raw_spin_lock_irqsave(&wqe->lock, flags);
	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wqe_remove_pending(wqe, work, prev);
		raw_spin_unlock_irqrestore(&wqe->lock, flags);
		io_run_cancel(work, wqe);
		match->nr_pending++;
		if (!match->cancel_all)
			return;

		/* not safe to continue after unlock */
		goto retry;
	}
	raw_spin_unlock_irqrestore(&wqe->lock, flags);
}
static void io_wqe_cancel_running_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	rcu_read_lock();
	io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
	rcu_read_unlock();
}
enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};
	int node;

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_pending_work(wqe, &match);
		if (match.nr_pending && !match.cancel_all)
			return IO_WQ_CANCEL_OK;
	}

	/*
	 * Now check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_running_work(wqe, &match);
		if (match.nr_running && !match.cancel_all)
			return IO_WQ_CANCEL_RUNNING;
	}

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}
static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			    int sync, void *key)
{
	struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);

	list_del_init(&wait->entry);

	rcu_read_lock();
	io_wqe_activate_free_worker(wqe);
	rcu_read_unlock();
	return 1;
}
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret, node;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
		return ERR_PTR(-EINVAL);
	if (WARN_ON_ONCE(!bounded))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(struct_size(wq, wqes, nr_node_ids), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);
	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret)
		goto err_wq;

	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;
	wq->free_work = data->free_work;
	wq->do_work = data->do_work;

	ret = -ENOMEM;
	for_each_node(node) {
		struct io_wqe *wqe;
		int alloc_node = node;

		if (!node_online(alloc_node))
			alloc_node = NUMA_NO_NODE;
		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
		if (!wqe)
			goto err;
		if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL))
			goto err;
		cpumask_copy(wqe->cpu_mask, cpumask_of_node(node));
		wq->wqes[node] = wqe;
		wqe->node = alloc_node;
		wqe->acct[IO_WQ_ACCT_BOUND].index = IO_WQ_ACCT_BOUND;
		wqe->acct[IO_WQ_ACCT_UNBOUND].index = IO_WQ_ACCT_UNBOUND;
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
		wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
					task_rlimit(current, RLIMIT_NPROC);
		atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
		wqe->wait.func = io_wqe_hash_wake;
		INIT_LIST_HEAD(&wqe->wait.entry);
		wqe->wq = wq;
		raw_spin_lock_init(&wqe->lock);
		INIT_WQ_LIST(&wqe->work_list);
		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
		INIT_LIST_HEAD(&wqe->all_list);
	}

	wq->task = get_task_struct(data->task);
	atomic_set(&wq->worker_refs, 1);
	init_completion(&wq->worker_done);
	return wq;
err:
	io_wq_put_hash(data->hash);
	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	for_each_node(node) {
		if (!wq->wqes[node])
			continue;
		free_cpumask_var(wq->wqes[node]->cpu_mask);
		kfree(wq->wqes[node]);
	}
err_wq:
	kfree(wq);
	return ERR_PTR(ret);
}
static bool io_task_work_match(struct callback_head *cb, void *data)
{
	struct create_worker_data *cwd;

	if (cb->func != create_worker_cb)
		return false;
	cwd = container_of(cb, struct create_worker_data, work);
	return cwd->wqe->wq == data;
}
void io_wq_exit_start(struct io_wq *wq)
{
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
}
static void io_wq_exit_workers(struct io_wq *wq)
{
	struct callback_head *cb;
	int node;

	if (!wq->task)
		return;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
		struct create_worker_data *cwd;

		cwd = container_of(cb, struct create_worker_data, work);
		atomic_dec(&cwd->wqe->acct[cwd->index].nr_running);
		io_worker_ref_put(wq);
		kfree(cwd);
	}

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
	}
	rcu_read_unlock();
	io_worker_ref_put(wq);
	wait_for_completion(&wq->worker_done);

	for_each_node(node) {
		spin_lock_irq(&wq->hash->wait.lock);
		list_del_init(&wq->wqes[node]->wait.entry);
		spin_unlock_irq(&wq->hash->wait.lock);
	}
	put_task_struct(wq->task);
	wq->task = NULL;
}
static void io_wq_destroy(struct io_wq *wq)
{
	int node;

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);

	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];
		struct io_cb_cancel_data match = {
			.fn		= io_wq_work_match_all,
			.cancel_all	= true,
		};
		io_wqe_cancel_pending_work(wqe, &match);
		free_cpumask_var(wqe->cpu_mask);
		kfree(wqe);
	}
	io_wq_put_hash(wq->hash);
	kfree(wq);
}
void io_wq_put_and_exit(struct io_wq *wq)
{
	WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));

	io_wq_exit_workers(wq);
	io_wq_destroy(wq);
}
struct online_data {
	unsigned int cpu;
	bool online;
};

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct online_data *od = data;

	if (od->online)
		cpumask_set_cpu(od->cpu, worker->wqe->cpu_mask);
	else
		cpumask_clear_cpu(od->cpu, worker->wqe->cpu_mask);
	return false;
}
static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
{
	struct online_data od = {
		.cpu = cpu,
		.online = online
	};
	int i;

	rcu_read_lock();
	for_each_node(i)
		io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, &od);
	rcu_read_unlock();
	return 0;
}
static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, true);
}

static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, false);
}
int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
{
	int i;

	rcu_read_lock();
	for_each_node(i) {
		struct io_wqe *wqe = wq->wqes[i];

		if (mask)
			cpumask_copy(wqe->cpu_mask, mask);
		else
			cpumask_copy(wqe->cpu_mask, cpumask_of_node(i));
	}
	rcu_read_unlock();

	return 0;
}
static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, io_wq_cpu_offline);
	if (ret < 0)
		return ret;

	io_wq_online = ret;
	return 0;
}

subsys_initcall(io_wq_init);