+From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
+From: Jens Axboe <axboe@kernel.dk>
+Date: Tue, 31 Aug 2021 13:57:32 -0600
+Subject: [PATCH] io-wq: split bounded and unbounded work into separate lists
+
+We've got a few issues that all boil down to the fact that we have one
+list of pending work items, yet two different types of workers to
+serve them. This causes some oddities around workers switching type and
+even hashed work vs regular work on the same bounded list.
+
+Just separate them out cleanly, similarly to how we already do
+accounting of what is running. That provides a clean separation and
+removes some corner cases that can cause stalls when handling IO
+that is punted to io-wq.
+
+Fixes: ecc53c48c13d ("io-wq: check max_worker limits if a worker transitions bound state")
+Signed-off-by: Jens Axboe <axboe@kernel.dk>
+[backport]
+Signed-off-by: Fabian Ebner <f.ebner@proxmox.com>
+---
+ fs/io-wq.c | 156 +++++++++++++++++++++++------------------------------
+ 1 file changed, 68 insertions(+), 88 deletions(-)
+
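+Backport note (kept below the cut line so `git am' ignores it): the core
+of this change is that the pending work list and the "stalled on hash"
+state move from struct io_wqe into struct io_wqe_acct, so each worker
+type (bound/unbound) owns its own queue and every enqueue/dequeue path
+first resolves the account for a given work item or worker. As a rough
+sketch of that lookup, assuming the io_get_acct() helpers in the target
+tree match their upstream counterparts (illustration only, not part of
+the applied diff):
+
+	/* one account per worker type, indexed by IO_WQ_ACCT_{BOUND,UNBOUND} */
+	static inline struct io_wqe_acct *io_get_acct(struct io_wqe *wqe, bool bound)
+	{
+		return &wqe->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
+	}
+
+	/* work without IO_WQ_WORK_UNBOUND set is served by bounded workers */
+	static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
+							   struct io_wq_work *work)
+	{
+		return io_get_acct(wqe, !(work->flags & IO_WQ_WORK_UNBOUND));
+	}
+
+With the work list and IO_ACCT_STALLED_BIT split out per account, a
+stall on hashed bounded work can no longer hold up unbounded work queued
+behind it on the same list, which is the stall the commit message
+refers to.
+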
+diff --git a/fs/io-wq.c b/fs/io-wq.c
+index 33678185f3bc..2496d8781ea1 100644
+--- a/fs/io-wq.c
++++ b/fs/io-wq.c
+@@ -34,7 +34,7 @@ enum {
+ };
+
+ enum {
+- IO_WQE_FLAG_STALLED = 1, /* stalled on hash */
++ IO_ACCT_STALLED_BIT = 0, /* stalled on hash */
+ };
+
+ /*
+@@ -73,25 +73,24 @@ struct io_wqe_acct {
+ unsigned max_workers;
+ int index;
+ atomic_t nr_running;
++ struct io_wq_work_list work_list;
++ unsigned long flags;
+ };
+
+ enum {
+ IO_WQ_ACCT_BOUND,
+ IO_WQ_ACCT_UNBOUND,
++ IO_WQ_ACCT_NR,
+ };
+
+ /*
+ * Per-node worker thread pool
+ */
+ struct io_wqe {
+- struct {
+- raw_spinlock_t lock;
+- struct io_wq_work_list work_list;
+- unsigned flags;
+- } ____cacheline_aligned_in_smp;
++ raw_spinlock_t lock;
++ struct io_wqe_acct acct[2];
+
+ int node;
+- struct io_wqe_acct acct[2];
+
+ struct hlist_nulls_head free_list;
+ struct list_head all_list;
+@@ -196,11 +195,10 @@ static void io_worker_exit(struct io_worker *worker)
+ do_exit(0);
+ }
+
+-static inline bool io_wqe_run_queue(struct io_wqe *wqe)
+- __must_hold(wqe->lock)
++static inline bool io_acct_run_queue(struct io_wqe_acct *acct)
+ {
+- if (!wq_list_empty(&wqe->work_list) &&
+- !(wqe->flags & IO_WQE_FLAG_STALLED))
++ if (!wq_list_empty(&acct->work_list) &&
++ !test_bit(IO_ACCT_STALLED_BIT, &acct->flags))
+ return true;
+ return false;
+ }
+@@ -209,7 +207,8 @@ static inline bool io_wqe_run_queue(struct io_wqe *wqe)
+ * Check head of free list for an available worker. If one isn't available,
+ * caller must create one.
+ */
+-static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
++static bool io_wqe_activate_free_worker(struct io_wqe *wqe,
++ struct io_wqe_acct *acct)
+ __must_hold(RCU)
+ {
+ struct hlist_nulls_node *n;
+@@ -223,6 +222,10 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
+ hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) {
+ if (!io_worker_get(worker))
+ continue;
++ if (io_wqe_get_acct(worker) != acct) {
++ io_worker_release(worker);
++ continue;
++ }
+ if (wake_up_process(worker->task)) {
+ io_worker_release(worker);
+ return true;
+@@ -341,7 +344,7 @@ static void io_wqe_dec_running(struct io_worker *worker)
+ if (!(worker->flags & IO_WORKER_F_UP))
+ return;
+
+- if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe)) {
++ if (atomic_dec_and_test(&acct->nr_running) && io_acct_run_queue(acct)) {
+ atomic_inc(&acct->nr_running);
+ atomic_inc(&wqe->wq->worker_refs);
+ io_queue_worker_create(wqe, worker, acct);
+@@ -356,29 +359,10 @@ static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
+ struct io_wq_work *work)
+ __must_hold(wqe->lock)
+ {
+- bool worker_bound, work_bound;
+-
+- BUILD_BUG_ON((IO_WQ_ACCT_UNBOUND ^ IO_WQ_ACCT_BOUND) != 1);
+-
+ if (worker->flags & IO_WORKER_F_FREE) {
+ worker->flags &= ~IO_WORKER_F_FREE;
+ hlist_nulls_del_init_rcu(&worker->nulls_node);
+ }
+-
+- /*
+- * If worker is moving from bound to unbound (or vice versa), then
+- * ensure we update the running accounting.
+- */
+- worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
+- work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
+- if (worker_bound != work_bound) {
+- int index = work_bound ? IO_WQ_ACCT_UNBOUND : IO_WQ_ACCT_BOUND;
+- io_wqe_dec_running(worker);
+- worker->flags ^= IO_WORKER_F_BOUND;
+- wqe->acct[index].nr_workers--;
+- wqe->acct[index ^ 1].nr_workers++;
+- io_wqe_inc_running(worker);
+- }
+ }
+
+ /*
+@@ -417,44 +401,23 @@ static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
+ spin_unlock(&wq->hash->wait.lock);
+ }
+
+-/*
+- * We can always run the work if the worker is currently the same type as
+- * the work (eg both are bound, or both are unbound). If they are not the
+- * same, only allow it if incrementing the worker count would be allowed.
+- */
+-static bool io_worker_can_run_work(struct io_worker *worker,
+- struct io_wq_work *work)
+-{
+- struct io_wqe_acct *acct;
+-
+- if (!(worker->flags & IO_WORKER_F_BOUND) !=
+- !(work->flags & IO_WQ_WORK_UNBOUND))
+- return true;
+-
+- /* not the same type, check if we'd go over the limit */
+- acct = io_work_get_acct(worker->wqe, work);
+- return acct->nr_workers < acct->max_workers;
+-}
+-
+-static struct io_wq_work *io_get_next_work(struct io_wqe *wqe,
++static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
+ struct io_worker *worker)
+ __must_hold(wqe->lock)
+ {
+ struct io_wq_work_node *node, *prev;
+ struct io_wq_work *work, *tail;
+ unsigned int stall_hash = -1U;
++ struct io_wqe *wqe = worker->wqe;
+
+- wq_list_for_each(node, prev, &wqe->work_list) {
++ wq_list_for_each(node, prev, &acct->work_list) {
+ unsigned int hash;
+
+ work = container_of(node, struct io_wq_work, list);
+
+- if (!io_worker_can_run_work(worker, work))
+- break;
+-
+ /* not hashed, can run anytime */
+ if (!io_wq_is_hashed(work)) {
+- wq_list_del(&wqe->work_list, node, prev);
++ wq_list_del(&acct->work_list, node, prev);
+ return work;
+ }
+
+@@ -465,7 +428,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe,
+ /* hashed, can run if not already running */
+ if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
+ wqe->hash_tail[hash] = NULL;
+- wq_list_cut(&wqe->work_list, &tail->list, prev);
++ wq_list_cut(&acct->work_list, &tail->list, prev);
+ return work;
+ }
+ if (stall_hash == -1U)
+@@ -479,7 +442,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe,
+ * Set this before dropping the lock to avoid racing with new
+ * work being added and clearing the stalled bit.
+ */
+- wqe->flags |= IO_WQE_FLAG_STALLED;
++ set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
+ raw_spin_unlock(&wqe->lock);
+ io_wait_on_hash(wqe, stall_hash);
+ raw_spin_lock(&wqe->lock);
+@@ -516,6 +479,7 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
+ static void io_worker_handle_work(struct io_worker *worker)
+ __releases(wqe->lock)
+ {
++ struct io_wqe_acct *acct = io_wqe_get_acct(worker);
+ struct io_wqe *wqe = worker->wqe;
+ struct io_wq *wq = wqe->wq;
+ bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);
+@@ -530,7 +494,7 @@ static void io_worker_handle_work(struct io_worker *worker)
+ * can't make progress, any work completion or insertion will
+ * clear the stalled flag.
+ */
+- work = io_get_next_work(wqe, worker);
++ work = io_get_next_work(acct, worker);
+ if (work)
+ __io_worker_busy(wqe, worker, work);
+
+@@ -564,10 +528,10 @@ static void io_worker_handle_work(struct io_worker *worker)
+
+ if (hash != -1U && !next_hashed) {
+ clear_bit(hash, &wq->hash->map);
++ clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
+ if (wq_has_sleeper(&wq->hash->wait))
+ wake_up(&wq->hash->wait);
+ raw_spin_lock_irq(&wqe->lock);
+- wqe->flags &= ~IO_WQE_FLAG_STALLED;
+ /* skip unnecessary unlock-lock wqe->lock */
+ if (!work)
+ goto get_next;
+@@ -582,6 +546,7 @@ static void io_worker_handle_work(struct io_worker *worker)
+ static int io_wqe_worker(void *data)
+ {
+ struct io_worker *worker = data;
++ struct io_wqe_acct *acct = io_wqe_get_acct(worker);
+ struct io_wqe *wqe = worker->wqe;
+ struct io_wq *wq = wqe->wq;
+ char buf[TASK_COMM_LEN];
+@@ -597,7 +562,7 @@ static int io_wqe_worker(void *data)
+ set_current_state(TASK_INTERRUPTIBLE);
+ loop:
+ raw_spin_lock_irq(&wqe->lock);
+- if (io_wqe_run_queue(wqe)) {
++ if (io_acct_run_queue(acct)) {
+ io_worker_handle_work(worker);
+ goto loop;
+ }
+@@ -623,7 +588,7 @@ static int io_wqe_worker(void *data)
+
+ if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
+ raw_spin_lock_irq(&wqe->lock);
+- if (!wq_list_empty(&wqe->work_list))
++ if (!wq_list_empty(&acct->work_list))
+ io_worker_handle_work(worker);
+ else
+ raw_spin_unlock_irq(&wqe->lock);
+@@ -769,12 +734,13 @@ static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
+
+ static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
+ {
++ struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
+ unsigned int hash;
+ struct io_wq_work *tail;
+
+ if (!io_wq_is_hashed(work)) {
+ append:
+- wq_list_add_tail(&work->list, &wqe->work_list);
++ wq_list_add_tail(&work->list, &acct->work_list);
+ return;
+ }
+
+@@ -784,7 +750,7 @@ static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
+ if (!tail)
+ goto append;
+
+- wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
++ wq_list_add_after(&work->list, &tail->list, &acct->work_list);
+ }
+
+ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
+@@ -806,10 +772,10 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
+
+ raw_spin_lock_irqsave(&wqe->lock, flags);
+ io_wqe_insert_work(wqe, work);
+- wqe->flags &= ~IO_WQE_FLAG_STALLED;
++ clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
+
+ rcu_read_lock();
+- do_create = !io_wqe_activate_free_worker(wqe);
++ do_create = !io_wqe_activate_free_worker(wqe, acct);
+ rcu_read_unlock();
+
+ raw_spin_unlock_irqrestore(&wqe->lock, flags);
+@@ -862,6 +828,7 @@ static inline void io_wqe_remove_pending(struct io_wqe *wqe,
+ struct io_wq_work *work,
+ struct io_wq_work_node *prev)
+ {
++ struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
+ unsigned int hash = io_get_work_hash(work);
+ struct io_wq_work *prev_work = NULL;
+
+@@ -873,7 +840,7 @@ static inline void io_wqe_remove_pending(struct io_wqe *wqe,
+ else
+ wqe->hash_tail[hash] = NULL;
+ }
+- wq_list_del(&wqe->work_list, &work->list, prev);
++ wq_list_del(&acct->work_list, &work->list, prev);
+ }
+
+ static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
+@@ -882,22 +849,27 @@ static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
+ struct io_wq_work_node *node, *prev;
+ struct io_wq_work *work;
+ unsigned long flags;
++ int i;
+
+ retry:
+ raw_spin_lock_irqsave(&wqe->lock, flags);
+- wq_list_for_each(node, prev, &wqe->work_list) {
+- work = container_of(node, struct io_wq_work, list);
+- if (!match->fn(work, match->data))
+- continue;
+- io_wqe_remove_pending(wqe, work, prev);
+- raw_spin_unlock_irqrestore(&wqe->lock, flags);
+- io_run_cancel(work, wqe);
+- match->nr_pending++;
+- if (!match->cancel_all)
+- return;
++ for (i = 0; i < IO_WQ_ACCT_NR; i++) {
++ struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);
+
+- /* not safe to continue after unlock */
+- goto retry;
++ wq_list_for_each(node, prev, &acct->work_list) {
++ work = container_of(node, struct io_wq_work, list);
++ if (!match->fn(work, match->data))
++ continue;
++ io_wqe_remove_pending(wqe, work, prev);
++ raw_spin_unlock_irqrestore(&wqe->lock, flags);
++ io_run_cancel(work, wqe);
++ match->nr_pending++;
++ if (!match->cancel_all)
++ return;
++
++ /* not safe to continue after unlock */
++ goto retry;
++ }
+ }
+ raw_spin_unlock_irqrestore(&wqe->lock, flags);
+ }
+@@ -958,18 +930,24 @@ static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
+ int sync, void *key)
+ {
+ struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
++ int i;
+
+ list_del_init(&wait->entry);
+
+ rcu_read_lock();
+- io_wqe_activate_free_worker(wqe);
++ for (i = 0; i < IO_WQ_ACCT_NR; i++) {
++ struct io_wqe_acct *acct = &wqe->acct[i];
++
++ if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
++ io_wqe_activate_free_worker(wqe, acct);
++ }
+ rcu_read_unlock();
+ return 1;
+ }
+
+ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
+ {
+- int ret = -ENOMEM, node;
++ int ret, node, i;
+ struct io_wq *wq;
+
+ if (WARN_ON_ONCE(!data->free_work || !data->do_work))
+@@ -1006,18 +984,20 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
+ goto err;
+ wq->wqes[node] = wqe;
+ wqe->node = alloc_node;
+- wqe->acct[IO_WQ_ACCT_BOUND].index = IO_WQ_ACCT_BOUND;
+- wqe->acct[IO_WQ_ACCT_UNBOUND].index = IO_WQ_ACCT_UNBOUND;
+ wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
+- atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
+ wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
+ task_rlimit(current, RLIMIT_NPROC);
+- atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
+- wqe->wait.func = io_wqe_hash_wake;
+ INIT_LIST_HEAD(&wqe->wait.entry);
++ wqe->wait.func = io_wqe_hash_wake;
++ for (i = 0; i < IO_WQ_ACCT_NR; i++) {
++ struct io_wqe_acct *acct = &wqe->acct[i];
++
++ acct->index = i;
++ atomic_set(&acct->nr_running, 0);
++ INIT_WQ_LIST(&acct->work_list);
++ }
+ wqe->wq = wq;
+ raw_spin_lock_init(&wqe->lock);
+- INIT_WQ_LIST(&wqe->work_list);
+ INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
+ INIT_LIST_HEAD(&wqe->all_list);
+ }