// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/mm.h>
#include <linux/sched/mm.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/kthread.h>
#include <linux/rculist_nulls.h>
#include <linux/fs_struct.h>
#include <linux/task_work.h>
#include <linux/blk-cgroup.h>
#include <linux/audit.h>
#include <linux/cpu.h>

#include "../kernel/sched/sched.h"
#include "io-wq.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)

enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_FIXED	= 8,	/* static idle worker */
	IO_WORKER_F_BOUND	= 16,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
	IO_WQ_BIT_ERROR		= 1,	/* error on setup */
};

enum {
	IO_WQE_FLAG_STALLED	= 1,	/* stalled on hash */
};

/*
 * One for each thread in a wqe pool
 */
struct io_worker {
	refcount_t ref;
	unsigned flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wqe *wqe;

	struct io_wq_work *cur_work;
	spinlock_t lock;

	struct rcu_head rcu;
	struct mm_struct *mm;
#ifdef CONFIG_BLK_CGROUP
	struct cgroup_subsys_state *blkcg_css;
#endif
	const struct cred *cur_creds;
	const struct cred *saved_creds;
	struct files_struct *restore_files;
	struct nsproxy *restore_nsproxy;
	struct fs_struct *restore_fs;
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

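/*
 * Per-class (bound vs unbound) worker accounting for a wqe: how many
 * workers exist, the cap on how many may be created, and how many are
 * currently running.
 */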
struct io_wqe_acct {
	unsigned nr_workers;
	unsigned max_workers;
	atomic_t nr_running;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
};

/*
 * Per-node worker thread pool
 */
struct io_wqe {
	struct {
		raw_spinlock_t lock;
		struct io_wq_work_list work_list;
		unsigned long hash_map;
		unsigned flags;
	} ____cacheline_aligned_in_smp;

	int node;
	struct io_wqe_acct acct[2];

	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct io_wq *wq;
	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
};

/*
 * Per io_wq state
 */
struct io_wq {
	struct io_wqe **wqes;
	unsigned long state;

	free_work_fn *free_work;
	io_wq_work_fn *do_work;

	struct task_struct *manager;
	struct user_struct *user;
	refcount_t refs;
	struct completion done;

	struct hlist_node cpuhp_node;

	refcount_t use_refs;
};

static enum cpuhp_state io_wq_online;

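/*
 * Worker reference protocol: io_worker_exit() drops the worker's own
 * reference and sleeps until any short-lived references taken via
 * io_worker_get() are gone; the final io_worker_release() wakes the
 * exiting task.
 */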
static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		wake_up_process(worker->task);
}

/*
 * Note: drops the wqe->lock if returning true! The caller must re-acquire
 * the lock in that case. Some callers need to restart handling if this
 * happens, so we can't just re-acquire the lock on behalf of the caller.
 */
static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker)
{
	bool dropped_lock = false;

	if (worker->saved_creds) {
		revert_creds(worker->saved_creds);
		worker->cur_creds = worker->saved_creds = NULL;
	}

	if (current->files != worker->restore_files) {
		__acquire(&wqe->lock);
		raw_spin_unlock_irq(&wqe->lock);
		dropped_lock = true;

		task_lock(current);
		current->files = worker->restore_files;
		current->nsproxy = worker->restore_nsproxy;
		task_unlock(current);
	}

	if (current->fs != worker->restore_fs)
		current->fs = worker->restore_fs;

	/*
	 * If we have an active mm, we need to drop the wq lock before unusing
	 * it. If we do, return true and let the caller retry the idle loop.
	 */
	if (worker->mm) {
		if (!dropped_lock) {
			__acquire(&wqe->lock);
			raw_spin_unlock_irq(&wqe->lock);
			dropped_lock = true;
		}
		__set_current_state(TASK_RUNNING);
		kthread_unuse_mm(worker->mm);
		mmput(worker->mm);
		worker->mm = NULL;
	}

#ifdef CONFIG_BLK_CGROUP
	if (worker->blkcg_css) {
		kthread_associate_blkcg(NULL);
		worker->blkcg_css = NULL;
	}
#endif
	if (current->signal->rlim[RLIMIT_FSIZE].rlim_cur != RLIM_INFINITY)
		current->signal->rlim[RLIMIT_FSIZE].rlim_cur = RLIM_INFINITY;
	return dropped_lock;
}

static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
						   struct io_wq_work *work)
{
	if (work->flags & IO_WQ_WORK_UNBOUND)
		return &wqe->acct[IO_WQ_ACCT_UNBOUND];

	return &wqe->acct[IO_WQ_ACCT_BOUND];
}

static inline struct io_wqe_acct *io_wqe_get_acct(struct io_wqe *wqe,
						  struct io_worker *worker)
{
	if (worker->flags & IO_WORKER_F_BOUND)
		return &wqe->acct[IO_WQ_ACCT_BOUND];

	return &wqe->acct[IO_WQ_ACCT_UNBOUND];
}

static void io_worker_exit(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);

	/*
	 * If we're not at zero, someone else is holding a brief reference
	 * to the worker. Wait for that to go away.
	 */
	set_current_state(TASK_INTERRUPTIBLE);
	if (!refcount_dec_and_test(&worker->ref))
		schedule();
	__set_current_state(TASK_RUNNING);

	preempt_disable();
	current->flags &= ~PF_IO_WORKER;
	if (worker->flags & IO_WORKER_F_RUNNING)
		atomic_dec(&acct->nr_running);
	if (!(worker->flags & IO_WORKER_F_BOUND))
		atomic_dec(&wqe->wq->user->processes);
	worker->flags = 0;
	preempt_enable();

	raw_spin_lock_irq(&wqe->lock);
	hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	if (__io_worker_unuse(wqe, worker)) {
		__release(&wqe->lock);
		raw_spin_lock_irq(&wqe->lock);
	}
	acct->nr_workers--;
	raw_spin_unlock_irq(&wqe->lock);

	kfree_rcu(worker, rcu);
	if (refcount_dec_and_test(&wqe->wq->refs))
		complete(&wqe->wq->done);
}

static inline bool io_wqe_run_queue(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	if (!wq_list_empty(&wqe->work_list) &&
	    !(wqe->flags & IO_WQE_FLAG_STALLED))
		return true;
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must wake up the wq manager to create one.
 */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list));
	if (is_a_nulls(n))
		return false;

	worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
	if (io_worker_get(worker)) {
		wake_up_process(worker->task);
		io_worker_release(worker);
		return true;
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, wake up the manager to create one.
 */
static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	bool ret;

	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	WARN_ON_ONCE(!acct->max_workers);

	rcu_read_lock();
	ret = io_wqe_activate_free_worker(wqe);
	rcu_read_unlock();

	if (!ret && acct->nr_workers < acct->max_workers)
		wake_up_process(wqe->wq->manager);
}

static void io_wqe_inc_running(struct io_wqe *wqe, struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);

	atomic_inc(&acct->nr_running);
}

static void io_wqe_dec_running(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);

	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe))
		io_wqe_wake_worker(wqe, acct);
}

static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker)
{
	allow_kernel_signal(SIGINT);

	current->flags |= PF_IO_WORKER;

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
	worker->restore_files = current->files;
	worker->restore_nsproxy = current->nsproxy;
	worker->restore_fs = current->fs;
	io_wqe_inc_running(wqe, worker);
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
			     struct io_wq_work *work)
	__must_hold(wqe->lock)
{
	bool worker_bound, work_bound;

	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
	}

	/*
	 * If worker is moving from bound to unbound (or vice versa), then
	 * ensure we update the running accounting.
	 */
	worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
	work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
	if (worker_bound != work_bound) {
		io_wqe_dec_running(wqe, worker);
		if (work_bound) {
			worker->flags |= IO_WORKER_F_BOUND;
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers--;
			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers++;
			atomic_dec(&wqe->wq->user->processes);
		} else {
			worker->flags &= ~IO_WORKER_F_BOUND;
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers++;
			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers--;
			atomic_inc(&wqe->wq->user->processes);
		}
		io_wqe_inc_running(wqe, worker);
	}
}

/*
 * No work, worker going to sleep. Move to freelist, and unuse mm if we
 * have one attached. Dropping the mm may potentially sleep, so we drop
 * the lock in that case and return success. Since the caller has to
 * retry the loop in that case (we changed task state), we don't regrab
 * the lock if we return success.
 */
static bool __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	}

	return __io_worker_unuse(wqe, worker);
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return work->flags >> IO_WQ_HASH_SHIFT;
}

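/*
 * Pick the next runnable work item. Hashed work is serialized per hash:
 * a set bit in wqe->hash_map means that hash is already running, and when
 * a hash is started the whole contiguous [work, tail] chain for it is
 * spliced off the list in one go.
 */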
static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int hash;

	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!io_wq_is_hashed(work)) {
			wq_list_del(&wqe->work_list, node, prev);
			return work;
		}

		/* hashed, can run if not already running */
		hash = io_get_work_hash(work);
		if (!(wqe->hash_map & BIT(hash))) {
			wqe->hash_map |= BIT(hash);
			/* all items with this hash lie in [work, tail] */
			tail = wqe->hash_tail[hash];
			wqe->hash_tail[hash] = NULL;
			wq_list_cut(&wqe->work_list, &tail->list, prev);
			return work;
		}
	}

	return NULL;
}

static void io_wq_switch_mm(struct io_worker *worker, struct io_wq_work *work)
{
	if (worker->mm) {
		kthread_unuse_mm(worker->mm);
		mmput(worker->mm);
		worker->mm = NULL;
	}

	if (mmget_not_zero(work->identity->mm)) {
		kthread_use_mm(work->identity->mm);
		worker->mm = work->identity->mm;
		return;
	}

	/* failed grabbing mm, ensure work gets cancelled */
	work->flags |= IO_WQ_WORK_CANCEL;
}

static inline void io_wq_switch_blkcg(struct io_worker *worker,
				      struct io_wq_work *work)
{
#ifdef CONFIG_BLK_CGROUP
	if (!(work->flags & IO_WQ_WORK_BLKCG))
		return;
	if (work->identity->blkcg_css != worker->blkcg_css) {
		kthread_associate_blkcg(work->identity->blkcg_css);
		worker->blkcg_css = work->identity->blkcg_css;
	}
#endif
}

static void io_wq_switch_creds(struct io_worker *worker,
			       struct io_wq_work *work)
{
	const struct cred *old_creds = override_creds(work->identity->creds);

	worker->cur_creds = work->identity->creds;
	if (worker->saved_creds)
		put_cred(old_creds); /* creds set by previous switch */
	else
		worker->saved_creds = old_creds;
}

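/*
 * Adopt the submitting task's context recorded in work->identity (files,
 * fs, mm, creds, blkcg, RLIMIT_FSIZE, audit ids) before running the work,
 * switching only the pieces the work's flags say it needs.
 */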
static void io_impersonate_work(struct io_worker *worker,
				struct io_wq_work *work)
{
	if ((work->flags & IO_WQ_WORK_FILES) &&
	    current->files != work->identity->files) {
		task_lock(current);
		current->files = work->identity->files;
		current->nsproxy = work->identity->nsproxy;
		task_unlock(current);
		if (!work->identity->files) {
			/* failed grabbing files, ensure work gets cancelled */
			work->flags |= IO_WQ_WORK_CANCEL;
		}
	}
	if ((work->flags & IO_WQ_WORK_FS) && current->fs != work->identity->fs)
		current->fs = work->identity->fs;
	if ((work->flags & IO_WQ_WORK_MM) && work->identity->mm != worker->mm)
		io_wq_switch_mm(worker, work);
	if ((work->flags & IO_WQ_WORK_CREDS) &&
	    worker->cur_creds != work->identity->creds)
		io_wq_switch_creds(worker, work);
	if (work->flags & IO_WQ_WORK_FSIZE)
		current->signal->rlim[RLIMIT_FSIZE].rlim_cur = work->identity->fsize;
	else if (current->signal->rlim[RLIMIT_FSIZE].rlim_cur != RLIM_INFINITY)
		current->signal->rlim[RLIMIT_FSIZE].rlim_cur = RLIM_INFINITY;
	io_wq_switch_blkcg(worker, work);
#ifdef CONFIG_AUDIT
	current->loginuid = work->identity->loginuid;
	current->sessionid = work->identity->sessionid;
#endif
}

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		/* flush pending signals before assigning new work */
		if (signal_pending(current))
			flush_signals(current);
		cond_resched();
	}

#ifdef CONFIG_AUDIT
	current->loginuid = KUIDT_INIT(AUDIT_UID_UNSET);
	current->sessionid = AUDIT_SID_UNSET;
#endif

	spin_lock_irq(&worker->lock);
	worker->cur_work = work;
	spin_unlock_irq(&worker->lock);
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);

static void io_worker_handle_work(struct io_worker *worker)
	__releases(wqe->lock)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	do {
		struct io_wq_work *work;
get_next:
		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(wqe);
		if (work)
			__io_worker_busy(wqe, worker, work);
		else if (!wq_list_empty(&wqe->work_list))
			wqe->flags |= IO_WQE_FLAG_STALLED;

		raw_spin_unlock_irq(&wqe->lock);
		if (!work)
			break;
		io_assign_current_work(worker, work);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *old_work, *next_hashed, *linked;
			unsigned int hash = io_get_work_hash(work);

			next_hashed = wq_next_work(work);
			io_impersonate_work(worker, work);

			old_work = work;
			linked = wq->do_work(work);

			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			wq->free_work(old_work);

			if (linked)
				io_wqe_enqueue(wqe, linked);

			if (hash != -1U && !next_hashed) {
				raw_spin_lock_irq(&wqe->lock);
				wqe->hash_map &= ~BIT_ULL(hash);
				wqe->flags &= ~IO_WQE_FLAG_STALLED;
				/* skip unnecessary unlock-lock wqe->lock */
				if (!work)
					goto get_next;
				raw_spin_unlock_irq(&wqe->lock);
			}
		} while (work);

		raw_spin_lock_irq(&wqe->lock);
	} while (1);
}

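/*
 * Worker thread main loop: process queued work while any is available,
 * otherwise idle on the free list and exit after WORKER_IDLE_TIMEOUT
 * unless this is the node's fixed worker.
 */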
static int io_wqe_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	io_worker_start(wqe, worker);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		set_current_state(TASK_INTERRUPTIBLE);
loop:
		raw_spin_lock_irq(&wqe->lock);
		if (io_wqe_run_queue(wqe)) {
			__set_current_state(TASK_RUNNING);
			io_worker_handle_work(worker);
			goto loop;
		}
		/* drops the lock on success, retry */
		if (__io_worker_idle(wqe, worker)) {
			__release(&wqe->lock);
			goto loop;
		}
		raw_spin_unlock_irq(&wqe->lock);
		if (signal_pending(current))
			flush_signals(current);
		if (schedule_timeout(WORKER_IDLE_TIMEOUT))
			continue;
		/* timed out, exit unless we're the fixed worker */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
		    !(worker->flags & IO_WORKER_F_FIXED))
			break;
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		raw_spin_lock_irq(&wqe->lock);
		if (!wq_list_empty(&wqe->work_list))
			io_worker_handle_work(worker);
		else
			raw_spin_unlock_irq(&wqe->lock);
	}

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = kthread_data(tsk);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (worker->flags & IO_WORKER_F_RUNNING)
		return;
	worker->flags |= IO_WORKER_F_RUNNING;
	io_wqe_inc_running(wqe, worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or have the manager
 * set one up.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = kthread_data(tsk);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (!(worker->flags & IO_WORKER_F_RUNNING))
		return;

	worker->flags &= ~IO_WORKER_F_RUNNING;

	raw_spin_lock_irq(&wqe->lock);
	io_wqe_dec_running(wqe, worker);
	raw_spin_unlock_irq(&wqe->lock);
}

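/*
 * Create a worker kthread for the given accounting class (bound or
 * unbound), bound to the wqe's NUMA node, and add it to the free and
 * all-worker lists. Called from the manager thread.
 */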
static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
	struct io_wqe_acct *acct = &wqe->acct[index];
	struct io_worker *worker;

	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker)
		return false;

	refcount_set(&worker->ref, 1);
	worker->nulls_node.pprev = NULL;
	worker->wqe = wqe;
	spin_lock_init(&worker->lock);

	worker->task = kthread_create_on_node(io_wqe_worker, worker, wqe->node,
				"io_wqe_worker-%d/%d", index, wqe->node);
	if (IS_ERR(worker->task)) {
		kfree(worker);
		return false;
	}
	kthread_bind_mask(worker->task, cpumask_of_node(wqe->node));

	raw_spin_lock_irq(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;
	if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
		worker->flags |= IO_WORKER_F_FIXED;
	acct->nr_workers++;
	raw_spin_unlock_irq(&wqe->lock);

	if (index == IO_WQ_ACCT_UNBOUND)
		atomic_inc(&wq->user->processes);

	refcount_inc(&wq->refs);
	wake_up_process(worker->task);
	return true;
}

static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = &wqe->acct[index];

	/* if we have available workers or no work, no need */
	if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe))
		return false;
	return acct->nr_workers < acct->max_workers;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wqe *wqe,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	wake_up_process(worker->task);
	return false;
}

/*
 * Manager thread. Tasked with creating new workers, if we need them.
 */
static int io_wq_manager(void *data)
{
	struct io_wq *wq = data;
	int node;

	/* create fixed workers */
	refcount_set(&wq->refs, 1);
	for_each_node(node) {
		if (!node_online(node))
			continue;
		if (create_io_worker(wq, wq->wqes[node], IO_WQ_ACCT_BOUND))
			continue;
		set_bit(IO_WQ_BIT_ERROR, &wq->state);
		set_bit(IO_WQ_BIT_EXIT, &wq->state);
		goto out;
	}

	complete(&wq->done);

	while (!kthread_should_stop()) {
		if (current->task_works)
			task_work_run();

		for_each_node(node) {
			struct io_wqe *wqe = wq->wqes[node];
			bool fork_worker[2] = { false, false };

			if (!node_online(node))
				continue;

			raw_spin_lock_irq(&wqe->lock);
			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
				fork_worker[IO_WQ_ACCT_BOUND] = true;
			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
				fork_worker[IO_WQ_ACCT_UNBOUND] = true;
			raw_spin_unlock_irq(&wqe->lock);
			if (fork_worker[IO_WQ_ACCT_BOUND])
				create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
			if (fork_worker[IO_WQ_ACCT_UNBOUND])
				create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
		}
		set_current_state(TASK_INTERRUPTIBLE);
		schedule_timeout(HZ);
	}

	if (current->task_works)
		task_work_run();

out:
	if (refcount_dec_and_test(&wq->refs)) {
		complete(&wq->done);
		return 0;
	}
	/* if ERROR is set and we get here, we have workers to wake */
	if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
		rcu_read_lock();
		for_each_node(node)
			io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
		rcu_read_unlock();
	}
	return 0;
}

static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct,
			    struct io_wq_work *work)
{
	bool free_worker;

	if (!(work->flags & IO_WQ_WORK_UNBOUND))
		return true;
	if (atomic_read(&acct->nr_running))
		return true;

	rcu_read_lock();
	free_worker = !hlist_nulls_empty(&wqe->free_list);
	rcu_read_unlock();
	if (free_worker)
		return true;

	if (atomic_read(&wqe->wq->user->processes) >= acct->max_workers &&
	    !(capable(CAP_SYS_RESOURCE) || capable(CAP_SYS_ADMIN)))
		return false;

	return true;
}

static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
{
	struct io_wq *wq = wqe->wq;

	do {
		struct io_wq_work *old_work = work;

		work->flags |= IO_WQ_WORK_CANCEL;
		work = wq->do_work(work);
		wq->free_work(old_work);
	} while (work);
}

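/*
 * Queue work on a wqe. wqe->hash_tail[] remembers the last queued item for
 * each hash so new hashed work is chained right after existing work with
 * the same hash, keeping each hash's items contiguous for
 * io_get_next_work() to grab as a batch.
 */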
static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
{
	unsigned int hash;
	struct io_wq_work *tail;

	if (!io_wq_is_hashed(work)) {
append:
		wq_list_add_tail(&work->list, &wqe->work_list);
		return;
	}

	hash = io_get_work_hash(work);
	tail = wqe->hash_tail[hash];
	wqe->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	int work_flags;
	unsigned long flags;

	/*
	 * Do early check to see if we need a new unbound worker, and if we do,
	 * if we're allowed to do so. This isn't 100% accurate as there's a
	 * gap between this check and incrementing the value, but that's OK.
	 * It's close enough to not be an issue, fork() has the same delay.
	 */
	if (unlikely(!io_wq_can_queue(wqe, acct, work))) {
		io_run_cancel(work, wqe);
		return;
	}

	work_flags = work->flags;
	raw_spin_lock_irqsave(&wqe->lock, flags);
	io_wqe_insert_work(wqe, work);
	wqe->flags &= ~IO_WQE_FLAG_STALLED;
	raw_spin_unlock_irqrestore(&wqe->lock, flags);

	if ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))
		io_wqe_wake_worker(wqe, acct);
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];

	io_wqe_enqueue(wqe, work);
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
}
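
/*
 * Illustrative use (a sketch, not taken from this file): a submitter that
 * wants writes to one inode serialized would hash the work before queueing
 * it, e.g. with hypothetical req/file variables:
 *
 *	io_wq_hash_work(&req->work, file_inode(req->file));
 *	io_wq_enqueue(wq, &req->work);
 */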

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;
	unsigned long flags;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	spin_lock_irqsave(&worker->lock, flags);
	if (worker->cur_work &&
	    !(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL) &&
	    match->fn(worker->cur_work, match->data)) {
		send_sig(SIGINT, worker->task, 1);
		match->nr_running++;
	}
	spin_unlock_irqrestore(&worker->lock, flags);

	return match->nr_running && !match->cancel_all;
}

static inline void io_wqe_remove_pending(struct io_wqe *wqe,
					 struct io_wq_work *work,
					 struct io_wq_work_node *prev)
{
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wqe->hash_tail[hash] = prev_work;
		else
			wqe->hash_tail[hash] = NULL;
	}
	wq_list_del(&wqe->work_list, &work->list, prev);
}

static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;
	unsigned long flags;

retry:
	raw_spin_lock_irqsave(&wqe->lock, flags);
	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wqe_remove_pending(wqe, work, prev);
		raw_spin_unlock_irqrestore(&wqe->lock, flags);
		io_run_cancel(work, wqe);
		match->nr_pending++;
		if (!match->cancel_all)
			return;

		/* not safe to continue after unlock */
		goto retry;
	}
	raw_spin_unlock_irqrestore(&wqe->lock, flags);
}

static void io_wqe_cancel_running_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	rcu_read_lock();
	io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
	rcu_read_unlock();
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};
	int node;

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_pending_work(wqe, &match);
		if (match.nr_pending && !match.cancel_all)
			return IO_WQ_CANCEL_OK;
	}

	/*
	 * Now check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_running_work(wqe, &match);
		if (match.nr_running && !match.cancel_all)
			return IO_WQ_CANCEL_RUNNING;
	}

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}

struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret = -ENOMEM, node;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);

	wq->wqes = kcalloc(nr_node_ids, sizeof(struct io_wqe *), GFP_KERNEL);
	if (!wq->wqes)
		goto err_wq;

	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret)
		goto err_wqes;

	wq->free_work = data->free_work;
	wq->do_work = data->do_work;

	/* caller must already hold a reference to this */
	wq->user = data->user;

	ret = -ENOMEM;
	for_each_node(node) {
		struct io_wqe *wqe;
		int alloc_node = node;

		if (!node_online(alloc_node))
			alloc_node = NUMA_NO_NODE;
		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
		if (!wqe)
			goto err;
		wq->wqes[node] = wqe;
		wqe->node = alloc_node;
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
		if (wq->user) {
			wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
					task_rlimit(current, RLIMIT_NPROC);
		}
		atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
		wqe->wq = wq;
		raw_spin_lock_init(&wqe->lock);
		INIT_WQ_LIST(&wqe->work_list);
		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
		INIT_LIST_HEAD(&wqe->all_list);
	}

	init_completion(&wq->done);

	wq->manager = kthread_create(io_wq_manager, wq, "io_wq_manager");
	if (!IS_ERR(wq->manager)) {
		wake_up_process(wq->manager);
		wait_for_completion(&wq->done);
		if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
			ret = -ENOMEM;
			goto err;
		}
		refcount_set(&wq->use_refs, 1);
		reinit_completion(&wq->done);
		return wq;
	}

	ret = PTR_ERR(wq->manager);
	complete(&wq->done);
err:
	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	for_each_node(node)
		kfree(wq->wqes[node]);
err_wqes:
	kfree(wq->wqes);
err_wq:
	kfree(wq);
	return ERR_PTR(ret);
}

bool io_wq_get(struct io_wq *wq, struct io_wq_data *data)
{
	if (data->free_work != wq->free_work || data->do_work != wq->do_work)
		return false;

	return refcount_inc_not_zero(&wq->use_refs);
}

static void __io_wq_destroy(struct io_wq *wq)
{
	int node;

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);

	set_bit(IO_WQ_BIT_EXIT, &wq->state);
	if (wq->manager)
		kthread_stop(wq->manager);

	rcu_read_lock();
	for_each_node(node)
		io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
	rcu_read_unlock();

	wait_for_completion(&wq->done);

	for_each_node(node)
		kfree(wq->wqes[node]);
	kfree(wq->wqes);
	kfree(wq);
}

void io_wq_destroy(struct io_wq *wq)
{
	if (refcount_dec_and_test(&wq->use_refs))
		__io_wq_destroy(wq);
}

struct task_struct *io_wq_get_task(struct io_wq *wq)
{
	return wq->manager;
}

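/*
 * CPU hotplug callback helper: re-pin a worker to the CPUs of its wqe's
 * NUMA node when a CPU comes online.
 */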
static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct task_struct *task = worker->task;
	struct rq_flags rf;
	struct rq *rq;

	rq = task_rq_lock(task, &rf);
	do_set_cpus_allowed(task, cpumask_of_node(worker->wqe->node));
	task->flags |= PF_NO_SETAFFINITY;
	task_rq_unlock(rq, task, &rf);
	return false;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);
	int i;

	rcu_read_lock();
	for_each_node(i)
		io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, NULL);
	rcu_read_unlock();
	return 0;
}

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, NULL);
	if (ret < 0)
		return ret;
	io_wq_online = ret;
	return 0;
}
subsys_initcall(io_wq_init);