// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/mm.h>
#include <linux/sched/mm.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/tracehook.h>

#include "io-wq.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)
enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_FIXED	= 8,	/* static idle worker */
	IO_WORKER_F_BOUND	= 16,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
};

enum {
	IO_WQE_FLAG_STALLED	= 1,	/* stalled on hash */
};
/*
 * One for each thread in a wqe pool
 */
struct io_worker {
	refcount_t ref;
	unsigned flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wqe *wqe;

	struct io_wq_work *cur_work;
	spinlock_t lock;

	struct completion ref_done;

	struct rcu_head rcu;
};
#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)
struct io_wqe_acct {
	unsigned nr_workers;
	unsigned max_workers;
	int index;
	atomic_t nr_running;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
};

/*
 * Per-node worker thread pool
 */
struct io_wqe {
	struct {
		raw_spinlock_t lock;
		struct io_wq_work_list work_list;
		unsigned flags;
	} ____cacheline_aligned_in_smp;

	int node;
	struct io_wqe_acct acct[2];

	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct wait_queue_entry wait;

	struct io_wq *wq;
	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
};
/*
 * Per io_wq state
 */
struct io_wq {
	struct io_wqe **wqes;
	unsigned long state;

	free_work_fn *free_work;
	io_wq_work_fn *do_work;

	struct io_wq_hash *hash;

	refcount_t refs;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	struct task_struct *task;
};
static enum cpuhp_state io_wq_online;
struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};
static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}
static inline struct io_wqe_acct *io_get_acct(struct io_wqe *wqe, bool bound)
{
	return &wqe->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}

static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
						   struct io_wq_work *work)
{
	return io_get_acct(wqe, !(work->flags & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
{
	return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND);
}
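
/*
 * Note on accounting: each wqe tracks bounded and unbounded work
 * separately. Bounded workers are capped by the 'bounded' argument to
 * io_wq_create(); unbounded workers are capped by RLIMIT_NPROC. A worker
 * is charged to whichever class matches the work it is running, and
 * __io_worker_busy() moves the charge when a worker switches class.
 */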
static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}
static void io_worker_exit(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	unsigned flags;

	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
	wait_for_completion(&worker->ref_done);

	preempt_disable();
	current->flags &= ~PF_IO_WORKER;
	flags = worker->flags;
	worker->flags = 0;
	if (flags & IO_WORKER_F_RUNNING)
		atomic_dec(&acct->nr_running);
	preempt_enable();

	raw_spin_lock_irq(&wqe->lock);
	if (flags & IO_WORKER_F_FREE)
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	acct->nr_workers--;
	raw_spin_unlock_irq(&wqe->lock);

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wqe->wq);
	do_exit(0);
}
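
/*
 * Returns true if the wqe has runnable work: the list is non-empty and
 * we are not currently stalled waiting on a hashed-work bucket.
 */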
static inline bool io_wqe_run_queue(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	if (!wq_list_empty(&wqe->work_list) &&
	    !(wqe->flags & IO_WQE_FLAG_STALLED))
		return true;

	return false;
}
/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list));
	if (is_a_nulls(n))
		return false;

	worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
	if (io_worker_get(worker)) {
		wake_up_process(worker->task);
		io_worker_release(worker);
		return true;
	}

	return false;
}
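
/*
 * The free list is an RCU hlist_nulls list: io_wqe_activate_free_worker()
 * may run under just rcu_read_lock(), and io_worker_get() guards against
 * racing with a worker that is concurrently exiting.
 */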
/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	bool ret;

	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't set up with any unbounded workers.
	 */
	WARN_ON_ONCE(!acct->max_workers);

	rcu_read_lock();
	ret = io_wqe_activate_free_worker(wqe);
	rcu_read_unlock();

	if (!ret && acct->nr_workers < acct->max_workers) {
		atomic_inc(&acct->nr_running);
		atomic_inc(&wqe->wq->worker_refs);
		create_io_worker(wqe->wq, wqe, acct->index);
	}
}
static void io_wqe_inc_running(struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);

	atomic_inc(&acct->nr_running);
}
struct create_worker_data {
	struct callback_head work;
	struct io_wqe *wqe;
	int index;
};
static void create_worker_cb(struct callback_head *cb)
{
	struct create_worker_data *cwd;
	struct io_wq *wq;

	cwd = container_of(cb, struct create_worker_data, work);
	wq = cwd->wqe->wq;
	create_io_worker(wq, cwd->wqe, cwd->index);
	kfree(cwd);
}
static void io_queue_worker_create(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	struct create_worker_data *cwd;
	struct io_wq *wq = wqe->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;

	cwd = kmalloc(sizeof(*cwd), GFP_ATOMIC);
	if (cwd) {
		init_task_work(&cwd->work, create_worker_cb);
		cwd->wqe = wqe;
		cwd->index = acct->index;
		if (!task_work_add(wq->task, &cwd->work, TWA_SIGNAL))
			return;

		kfree(cwd);
	}
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
}
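
/*
 * Sleep-side accounting hook: if the last running worker is about to
 * sleep while work is still queued, defer creation of a replacement
 * worker to task_work, since this path holds wqe->lock and cannot
 * block (which is also why io_queue_worker_create() uses GFP_ATOMIC).
 */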
static void io_wqe_dec_running(struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;

	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe)) {
		atomic_inc(&acct->nr_running);
		atomic_inc(&wqe->wq->worker_refs);
		io_queue_worker_create(wqe, acct);
	}
}
/*
 * Worker will start processing some work. Remove it from the free list
 * if it's currently on it, and update bound/unbound accounting if needed.
 */
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
			     struct io_wq_work *work)
	__must_hold(wqe->lock)
{
	bool worker_bound, work_bound;

	BUILD_BUG_ON((IO_WQ_ACCT_UNBOUND ^ IO_WQ_ACCT_BOUND) != 1);

	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
	}

	/*
	 * If worker is moving from bound to unbound (or vice versa), then
	 * ensure we update the running accounting.
	 */
	worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
	work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
	if (worker_bound != work_bound) {
		int index = work_bound ? IO_WQ_ACCT_UNBOUND : IO_WQ_ACCT_BOUND;

		io_wqe_dec_running(worker);
		worker->flags ^= IO_WORKER_F_BOUND;
		wqe->acct[index].nr_workers--;
		wqe->acct[index ^ 1].nr_workers++;
		io_wqe_inc_running(worker);
	}
}
/*
 * No work, worker going to sleep. Move to the free list if it isn't
 * already there.
 */
static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	}
}
static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return work->flags >> IO_WQ_HASH_SHIFT;
}
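
/*
 * The hash bucket we want is owned by another wqe. Register this wqe on
 * the shared hash wait queue; the bit is re-checked under the waitqueue
 * lock so that a concurrent clear_bit()/wake_up() cannot be missed.
 */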
static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
{
	struct io_wq *wq = wqe->wq;

	spin_lock(&wq->hash->wait.lock);
	if (list_empty(&wqe->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wqe->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wqe->wait.entry);
		}
	}
	spin_unlock(&wq->hash->wait.lock);
}
static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;

	wq_list_for_each(node, prev, &wqe->work_list) {
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!io_wq_is_hashed(work)) {
			wq_list_del(&wqe->work_list, node, prev);
			return work;
		}

		hash = io_get_work_hash(work);
		/* all items with this hash lie in [work, tail] */
		tail = wqe->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
			wqe->hash_tail[hash] = NULL;
			wq_list_cut(&wqe->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		raw_spin_unlock(&wqe->lock);
		io_wait_on_hash(wqe, stall_hash);
		raw_spin_lock(&wqe->lock);
	}

	return NULL;
}
static bool io_flush_signals(void)
{
	if (unlikely(test_thread_flag(TIF_NOTIFY_SIGNAL))) {
		__set_current_state(TASK_RUNNING);
		tracehook_notify_signal();
		return true;
	}

	return false;
}
static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		io_flush_signals();
		cond_resched();
	}

	spin_lock_irq(&worker->lock);
	worker->cur_work = work;
	spin_unlock_irq(&worker->lock);
}
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
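
/*
 * Core worker loop: pull the next runnable item off the wqe list and run
 * the whole dependent link it heads before looking for more work. Entered
 * with wqe->lock held, returns with it released.
 */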
static void io_worker_handle_work(struct io_worker *worker)
	__releases(wqe->lock)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);

	do {
		struct io_wq_work *work;
get_next:
		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(wqe);
		if (work)
			__io_worker_busy(wqe, worker, work);
		else if (!wq_list_empty(&wqe->work_list))
			wqe->flags |= IO_WQE_FLAG_STALLED;

		raw_spin_unlock_irq(&wqe->lock);
		if (!work)
			break;
		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int hash = io_get_work_hash(work);

			next_hashed = wq_next_work(work);

			if (unlikely(do_kill) && (work->flags & IO_WQ_WORK_UNBOUND))
				work->flags |= IO_WQ_WORK_CANCEL;
			wq->do_work(work);
			io_assign_current_work(worker, NULL);

			linked = wq->free_work(work);
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wqe_enqueue(wqe, linked);

			if (hash != -1U && !next_hashed) {
				clear_bit(hash, &wq->hash->map);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
				raw_spin_lock_irq(&wqe->lock);
				wqe->flags &= ~IO_WQE_FLAG_STALLED;
				/* skip unnecessary unlock-lock wqe->lock */
				if (!work)
					goto get_next;
				raw_spin_unlock_irq(&wqe->lock);
			}
		} while (work);

		raw_spin_lock_irq(&wqe->lock);
	} while (1);
}
static int io_wqe_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	char buf[TASK_COMM_LEN];

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);
loop:
		raw_spin_lock_irq(&wqe->lock);
		if (io_wqe_run_queue(wqe)) {
			io_worker_handle_work(worker);
			goto loop;
		}
		__io_worker_idle(wqe, worker);
		raw_spin_unlock_irq(&wqe->lock);
		if (io_flush_signals())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			break;
		}
		if (ret)
			continue;
		/* timed out, exit unless we're the fixed worker */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
		    !(worker->flags & IO_WORKER_F_FIXED))
			break;
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		raw_spin_lock_irq(&wqe->lock);
		if (!wq_list_empty(&wqe->work_list))
			io_worker_handle_work(worker);
		else
			raw_spin_unlock_irq(&wqe->lock);
	}

	io_worker_exit(worker);
	return 0;
}
/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->pf_io_worker;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (worker->flags & IO_WORKER_F_RUNNING)
		return;
	worker->flags |= IO_WORKER_F_RUNNING;
	io_wqe_inc_running(worker);
}
/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->pf_io_worker;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (!(worker->flags & IO_WORKER_F_RUNNING))
		return;

	worker->flags &= ~IO_WORKER_F_RUNNING;

	raw_spin_lock_irq(&worker->wqe->lock);
	io_wqe_dec_running(worker);
	raw_spin_unlock_irq(&worker->wqe->lock);
}
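
/*
 * Start a new worker thread for the given accounting class. The caller
 * has already charged acct->nr_running and taken a wq worker reference;
 * the failure paths below undo both.
 */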
static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
	struct io_wqe_acct *acct = &wqe->acct[index];
	struct io_worker *worker;
	struct task_struct *tsk;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker)
		goto fail;

	refcount_set(&worker->ref, 1);
	worker->nulls_node.pprev = NULL;
	worker->wqe = wqe;
	spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	if (IS_ERR(tsk)) {
		kfree(worker);
fail:
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
		return;
	}

	tsk->pf_io_worker = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, cpumask_of_node(wqe->node));
	tsk->flags |= PF_NO_SETAFFINITY;

	raw_spin_lock_irq(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;
	if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
		worker->flags |= IO_WORKER_F_FIXED;
	acct->nr_workers++;
	raw_spin_unlock_irq(&wqe->lock);
	wake_up_new_task(tsk);
}
/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wqe *wqe,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}
static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	set_notify_signal(worker->task);
	wake_up_process(worker->task);
	return false;
}
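
/* Matches any work item; used with cancel_all to flush pending work. */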
static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}
static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
{
	struct io_wq *wq = wqe->wq;

	do {
		work->flags |= IO_WQ_WORK_CANCEL;
		wq->do_work(work);
		work = wq->free_work(work);
	} while (work);
}
static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wq_work *tail;
	unsigned int hash;

	if (!io_wq_is_hashed(work)) {
append:
		wq_list_add_tail(&work->list, &wqe->work_list);
		return;
	}

	hash = io_get_work_hash(work);
	tail = wqe->hash_tail[hash];
	wqe->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
}
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	int work_flags;
	unsigned long flags;

	if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state)) {
		io_run_cancel(work, wqe);
		return;
	}

	work_flags = work->flags;
	raw_spin_lock_irqsave(&wqe->lock, flags);
	io_wqe_insert_work(wqe, work);
	wqe->flags &= ~IO_WQE_FLAG_STALLED;
	raw_spin_unlock_irqrestore(&wqe->lock, flags);

	if ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))
		io_wqe_wake_worker(wqe, acct);
}
void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];

	io_wqe_enqueue(wqe, work);
}
/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
}
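
/*
 * Illustrative only (req->work and req->file are hypothetical): a caller
 * could serialize all writes to one file by hashing on the inode before
 * queueing the work item:
 *
 *	io_wq_hash_work(&req->work, file_inode(req->file));
 *	io_wq_enqueue(wq, &req->work);
 *
 * Two items hashed to the same bucket then never run in parallel.
 */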
static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;
	unsigned long flags;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	spin_lock_irqsave(&worker->lock, flags);
	if (worker->cur_work &&
	    match->fn(worker->cur_work, match->data)) {
		set_notify_signal(worker->task);
		match->nr_running++;
	}
	spin_unlock_irqrestore(&worker->lock, flags);

	return match->nr_running && !match->cancel_all;
}
static inline void io_wqe_remove_pending(struct io_wqe *wqe,
					 struct io_wq_work *work,
					 struct io_wq_work_node *prev)
{
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wqe->hash_tail[hash] = prev_work;
		else
			wqe->hash_tail[hash] = NULL;
	}

	wq_list_del(&wqe->work_list, &work->list, prev);
}
static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;
	unsigned long flags;

retry:
	raw_spin_lock_irqsave(&wqe->lock, flags);
	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wqe_remove_pending(wqe, work, prev);
		raw_spin_unlock_irqrestore(&wqe->lock, flags);
		io_run_cancel(work, wqe);
		match->nr_pending++;
		if (!match->cancel_all)
			return;

		/* not safe to continue after unlock */
		goto retry;
	}
	raw_spin_unlock_irqrestore(&wqe->lock, flags);
}
static void io_wqe_cancel_running_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	rcu_read_lock();
	io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
	rcu_read_unlock();
}
enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};
	int node;

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_pending_work(wqe, &match);
		if (match.nr_pending && !match.cancel_all)
			return IO_WQ_CANCEL_OK;
	}

	/*
	 * Now check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempted to signal cancellation. The
	 * completion will run normally in this case.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_running_work(wqe, &match);
		if (match.nr_running && !match.cancel_all)
			return IO_WQ_CANCEL_RUNNING;
	}

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}
static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			    int sync, void *key)
{
	struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);

	list_del_init(&wait->entry);

	rcu_read_lock();
	io_wqe_activate_free_worker(wqe);
	rcu_read_unlock();
	return 1;
}
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret = -ENOMEM, node;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);

	wq->wqes = kcalloc(nr_node_ids, sizeof(struct io_wqe *), GFP_KERNEL);
	if (!wq->wqes)
		goto err_wq;

	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret)
		goto err_wqes;

	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;
	wq->free_work = data->free_work;
	wq->do_work = data->do_work;

	ret = -ENOMEM;
	for_each_node(node) {
		struct io_wqe *wqe;
		int alloc_node = node;

		if (!node_online(alloc_node))
			alloc_node = NUMA_NO_NODE;
		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
		if (!wqe)
			goto err;
		wq->wqes[node] = wqe;
		wqe->node = alloc_node;
		wqe->acct[IO_WQ_ACCT_BOUND].index = IO_WQ_ACCT_BOUND;
		wqe->acct[IO_WQ_ACCT_UNBOUND].index = IO_WQ_ACCT_UNBOUND;
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
		wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
					task_rlimit(current, RLIMIT_NPROC);
		atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
		wqe->wait.func = io_wqe_hash_wake;
		INIT_LIST_HEAD(&wqe->wait.entry);
		wqe->wq = wq;
		raw_spin_lock_init(&wqe->lock);
		INIT_WQ_LIST(&wqe->work_list);
		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
		INIT_LIST_HEAD(&wqe->all_list);
	}

	wq->task = get_task_struct(data->task);
	refcount_set(&wq->refs, 1);
	atomic_set(&wq->worker_refs, 1);
	init_completion(&wq->worker_done);
	return wq;
err:
	io_wq_put_hash(data->hash);
	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	for_each_node(node)
		kfree(wq->wqes[node]);
err_wqes:
	kfree(wq->wqes);
err_wq:
	kfree(wq);
	return ERR_PTR(ret);
}
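
/*
 * Illustrative setup sketch (my_free_work/my_do_work are hypothetical
 * callbacks, not part of this file):
 *
 *	struct io_wq_data data = {
 *		.hash		= hash,
 *		.task		= current,
 *		.free_work	= my_free_work,
 *		.do_work	= my_do_work,
 *	};
 *	struct io_wq *wq = io_wq_create(32, &data);
 *
 * Here 32 caps the bounded-work worker count; the unbounded side is
 * capped by RLIMIT_NPROC instead.
 */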
static bool io_task_work_match(struct callback_head *cb, void *data)
{
	struct create_worker_data *cwd;

	if (cb->func != create_worker_cb)
		return false;
	cwd = container_of(cb, struct create_worker_data, work);
	return cwd->wqe->wq == data;
}
static void io_wq_exit_workers(struct io_wq *wq)
{
	struct callback_head *cb;
	int node;

	set_bit(IO_WQ_BIT_EXIT, &wq->state);

	if (!wq->task)
		return;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
		struct create_worker_data *cwd;

		cwd = container_of(cb, struct create_worker_data, work);
		atomic_dec(&cwd->wqe->acct[cwd->index].nr_running);
		io_worker_ref_put(wq);
		kfree(cwd);
	}

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
		spin_lock_irq(&wq->hash->wait.lock);
		list_del_init(&wq->wqes[node]->wait.entry);
		spin_unlock_irq(&wq->hash->wait.lock);
	}
	rcu_read_unlock();
	io_worker_ref_put(wq);
	wait_for_completion(&wq->worker_done);
	put_task_struct(wq->task);
	wq->task = NULL;
}
static void io_wq_destroy(struct io_wq *wq)
{
	int node;

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);

	io_wq_exit_workers(wq);

	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];
		struct io_cb_cancel_data match = {
			.fn		= io_wq_work_match_all,
			.cancel_all	= true,
		};
		io_wqe_cancel_pending_work(wqe, &match);
		kfree(wqe);
	}
	io_wq_put_hash(wq->hash);
	kfree(wq->wqes);
	kfree(wq);
}
void io_wq_put(struct io_wq *wq)
{
	if (refcount_dec_and_test(&wq->refs))
		io_wq_destroy(wq);
}

void io_wq_put_and_exit(struct io_wq *wq)
{
	io_wq_exit_workers(wq);
	io_wq_put(wq);
}
static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	set_cpus_allowed_ptr(worker->task, cpumask_of_node(worker->wqe->node));
	return false;
}
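
/* CPU hotplug: when a CPU comes online, re-apply every worker's NUMA affinity. */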
static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);
	int i;

	rcu_read_lock();
	for_each_node(i)
		io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, NULL);
	rcu_read_unlock();
	return 0;
}
static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, NULL);
	if (ret < 0)
		return ret;
	io_wq_online = ret;
	return 0;
}

subsys_initcall(io_wq_init);