refcount_t refs;
struct completion done;
+ atomic_t worker_refs;
+ struct completion worker_done;
+
struct hlist_node cpuhp_node;
pid_t task_pid;
raw_spin_unlock_irq(&wqe->lock);
kfree_rcu(worker, rcu);
- io_wq_put(wqe->wq);
+ if (atomic_dec_and_test(&wqe->wq->worker_refs))
+ complete(&wqe->wq->worker_done);
}
static inline bool io_wqe_run_queue(struct io_wqe *wqe)
init_completion(&worker->ref_done);
init_completion(&worker->started);
- refcount_inc(&wq->refs);
+ atomic_inc(&wq->worker_refs);
if (index == IO_WQ_ACCT_BOUND)
pid = io_wq_fork_thread(task_thread_bound, worker);
else
pid = io_wq_fork_thread(task_thread_unbound, worker);
if (pid < 0) {
- io_wq_put(wq);
+ if (atomic_dec_and_test(&wq->worker_refs))
+ complete(&wq->worker_done);
kfree(worker);
return false;
}
{
struct io_wq *wq = data;
char buf[TASK_COMM_LEN];
+ int node;
sprintf(buf, "iou-mgr-%d", wq->task_pid);
set_task_comm(current, buf);
} while (!test_bit(IO_WQ_BIT_EXIT, &wq->state));
io_wq_check_workers(wq);
+
+ rcu_read_lock();
+ for_each_node(node)
+ io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
+ rcu_read_unlock();
+
+ /* we might not ever have created any workers */
+ if (atomic_read(&wq->worker_refs))
+ wait_for_completion(&wq->worker_done);
wq->manager = NULL;
io_wq_put(wq);
do_exit(0);
if (wq->manager)
return 0;
+ reinit_completion(&wq->worker_done);
clear_bit(IO_WQ_BIT_EXIT, &wq->state);
refcount_inc(&wq->refs);
current->flags |= PF_IO_WORKER;
init_completion(&wq->done);
refcount_set(&wq->refs, 1);
+ init_completion(&wq->worker_done);
+ atomic_set(&wq->worker_refs, 0);
+
ret = io_wq_fork_manager(wq);
if (!ret)
return wq;
if (wq->manager)
wake_up_process(wq->manager);
- rcu_read_lock();
- for_each_node(node)
- io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
- rcu_read_unlock();
-
spin_lock_irq(&wq->hash->wait.lock);
for_each_node(node) {
struct io_wqe *wqe = wq->wqes[node];