thread-pool: avoid passing the pool parameter every time
author    Emanuele Giuseppe Esposito <eesposit@redhat.com>
          Fri, 3 Feb 2023 13:17:31 +0000 (08:17 -0500)
committer Kevin Wolf <kwolf@redhat.com>
          Tue, 25 Apr 2023 11:17:28 +0000 (13:17 +0200)
thread_pool_submit_aio() is always called on a pool taken from
qemu_get_current_aio_context(), and that is the only intended
use: each pool runs only in the thread that submits work to it;
it cannot run anywhere else.

Therefore, simplify the thread_pool_submit* API and remove the
ThreadPool parameter from those functions.

Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
Message-Id: <20230203131731.851116-5-eesposit@redhat.com>
Reviewed-by: Kevin Wolf <kwolf@redhat.com>
Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
Signed-off-by: Kevin Wolf <kwolf@redhat.com>
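
At a call site, the change amounts to dropping the explicit pool lookup.
A minimal before/after sketch (my_worker, my_done and my_data are
hypothetical names, not taken from this patch):

    /* Before: every caller fetched the pool of its own AioContext. */
    ThreadPool *pool = aio_get_thread_pool(qemu_get_current_aio_context());
    thread_pool_submit_aio(pool, my_worker, my_data, my_done, my_data);

    /* After: the pool is looked up internally from the submitting
     * thread's AioContext, so the parameter disappears. */
    thread_pool_submit_aio(my_worker, my_data, my_done, my_data);
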
12 files changed:
backends/tpm/tpm_backend.c
block/file-posix.c
block/file-win32.c
block/qcow2-threads.c
hw/9pfs/coth.c
hw/ppc/spapr_nvdimm.c
hw/virtio/virtio-pmem.c
include/block/thread-pool.h
scsi/pr-manager.c
scsi/qemu-pr-helper.c
tests/unit/test-thread-pool.c
util/thread-pool.c

diff --git a/backends/tpm/tpm_backend.c b/backends/tpm/tpm_backend.c
index 375587e743d022cac2dbea6affcf96dce4385a60..485a20b9e09f66ffc7879ac6f9d6fca6dcb26d19 100644
@@ -100,8 +100,6 @@ bool tpm_backend_had_startup_error(TPMBackend *s)
 
 void tpm_backend_deliver_request(TPMBackend *s, TPMBackendCmd *cmd)
 {
-    ThreadPool *pool = aio_get_thread_pool(qemu_get_aio_context());
-
     if (s->cmd != NULL) {
         error_report("There is a TPM request pending");
         return;
@@ -109,7 +107,7 @@ void tpm_backend_deliver_request(TPMBackend *s, TPMBackendCmd *cmd)
 
     s->cmd = cmd;
     object_ref(OBJECT(s));
-    thread_pool_submit_aio(pool, tpm_backend_worker_thread, s,
+    thread_pool_submit_aio(tpm_backend_worker_thread, s,
                            tpm_backend_request_completed, s);
 }
 
diff --git a/block/file-posix.c b/block/file-posix.c
index 173b3b165307dd8b2de3fb752946f94ebed201b8..c7b723368e53ecfc6a404a37facf648c59139bae 100644
@@ -2042,9 +2042,7 @@ out:
 
 static int coroutine_fn raw_thread_pool_submit(ThreadPoolFunc func, void *arg)
 {
-    /* @bs can be NULL, bdrv_get_aio_context() returns the main context then */
-    ThreadPool *pool = aio_get_thread_pool(qemu_get_current_aio_context());
-    return thread_pool_submit_co(pool, func, arg);
+    return thread_pool_submit_co(func, arg);
 }
 
 /*
diff --git a/block/file-win32.c b/block/file-win32.c
index 0aedb0875cec2cc34cae277790ff73940bbafbc1..48b790d91739429e6f93ce6e9cbb5d5957850ba3 100644
@@ -153,7 +153,6 @@ static BlockAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile,
         BlockCompletionFunc *cb, void *opaque, int type)
 {
     RawWin32AIOData *acb = g_new(RawWin32AIOData, 1);
-    ThreadPool *pool;
 
     acb->bs = bs;
     acb->hfile = hfile;
@@ -168,8 +167,7 @@ static BlockAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile,
     acb->aio_offset = offset;
 
     trace_file_paio_submit(acb, opaque, offset, count, type);
-    pool = aio_get_thread_pool(qemu_get_current_aio_context());
-    return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
+    return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
 }
 
 int qemu_ftruncate64(int fd, int64_t length)
diff --git a/block/qcow2-threads.c b/block/qcow2-threads.c
index 6d2e6b7bf412f3b902e02766fbb77203389b463c..d6071a1eae4a087c90b7c8eb287e8bcae8ec4905 100644
@@ -43,7 +43,6 @@ qcow2_co_process(BlockDriverState *bs, ThreadPoolFunc *func, void *arg)
 {
     int ret;
     BDRVQcow2State *s = bs->opaque;
-    ThreadPool *pool = aio_get_thread_pool(qemu_get_current_aio_context());
 
     qemu_co_mutex_lock(&s->lock);
     while (s->nb_threads >= QCOW2_MAX_THREADS) {
@@ -52,7 +51,7 @@ qcow2_co_process(BlockDriverState *bs, ThreadPoolFunc *func, void *arg)
     s->nb_threads++;
     qemu_co_mutex_unlock(&s->lock);
 
-    ret = thread_pool_submit_co(pool, func, arg);
+    ret = thread_pool_submit_co(func, arg);
 
     qemu_co_mutex_lock(&s->lock);
     s->nb_threads--;
diff --git a/hw/9pfs/coth.c b/hw/9pfs/coth.c
index 2802d41cce20114090f593afe22c19ba07bd8ce6..598f46add9936c83ea3594e601329a634313576f 100644
@@ -41,6 +41,5 @@ static int coroutine_enter_func(void *arg)
 void co_run_in_worker_bh(void *opaque)
 {
     Coroutine *co = opaque;
-    thread_pool_submit_aio(aio_get_thread_pool(qemu_get_aio_context()),
-                           coroutine_enter_func, co, coroutine_enter_cb, co);
+    thread_pool_submit_aio(coroutine_enter_func, co, coroutine_enter_cb, co);
 }
diff --git a/hw/ppc/spapr_nvdimm.c b/hw/ppc/spapr_nvdimm.c
index 04a64cada339f1e82432516cf6538947a7723541..a8688243a6fd6336be3631080b5fd4964c983b7c 100644
@@ -496,7 +496,6 @@ static int spapr_nvdimm_flush_post_load(void *opaque, int version_id)
 {
     SpaprNVDIMMDevice *s_nvdimm = (SpaprNVDIMMDevice *)opaque;
     SpaprNVDIMMDeviceFlushState *state;
-    ThreadPool *pool = aio_get_thread_pool(qemu_get_aio_context());
     HostMemoryBackend *backend = MEMORY_BACKEND(PC_DIMM(s_nvdimm)->hostmem);
     bool is_pmem = object_property_get_bool(OBJECT(backend), "pmem", NULL);
     bool pmem_override = object_property_get_bool(OBJECT(s_nvdimm),
@@ -517,7 +516,7 @@ static int spapr_nvdimm_flush_post_load(void *opaque, int version_id)
     }
 
     QLIST_FOREACH(state, &s_nvdimm->pending_nvdimm_flush_states, node) {
-        thread_pool_submit_aio(pool, flush_worker_cb, state,
+        thread_pool_submit_aio(flush_worker_cb, state,
                                spapr_nvdimm_flush_completion_cb, state);
     }
 
@@ -664,7 +663,6 @@ static target_ulong h_scm_flush(PowerPCCPU *cpu, SpaprMachineState *spapr,
     PCDIMMDevice *dimm;
     HostMemoryBackend *backend = NULL;
     SpaprNVDIMMDeviceFlushState *state;
-    ThreadPool *pool = aio_get_thread_pool(qemu_get_aio_context());
     int fd;
 
     if (!drc || !drc->dev ||
@@ -699,7 +697,7 @@ static target_ulong h_scm_flush(PowerPCCPU *cpu, SpaprMachineState *spapr,
 
         state->drcidx = drc_index;
 
-        thread_pool_submit_aio(pool, flush_worker_cb, state,
+        thread_pool_submit_aio(flush_worker_cb, state,
                                spapr_nvdimm_flush_completion_cb, state);
 
         continue_token = state->continue_token;
diff --git a/hw/virtio/virtio-pmem.c b/hw/virtio/virtio-pmem.c
index dff402f08f8642be1d1f2d0e7fc5d2ba7611ac03..c3512c2dae3f5d17cbd4ba193d0190b2b8abdfdb 100644
@@ -70,7 +70,6 @@ static void virtio_pmem_flush(VirtIODevice *vdev, VirtQueue *vq)
     VirtIODeviceRequest *req_data;
     VirtIOPMEM *pmem = VIRTIO_PMEM(vdev);
     HostMemoryBackend *backend = MEMORY_BACKEND(pmem->memdev);
-    ThreadPool *pool = aio_get_thread_pool(qemu_get_aio_context());
 
     trace_virtio_pmem_flush_request();
     req_data = virtqueue_pop(vq, sizeof(VirtIODeviceRequest));
@@ -88,7 +87,7 @@ static void virtio_pmem_flush(VirtIODevice *vdev, VirtQueue *vq)
     req_data->fd   = memory_region_get_fd(&backend->mr);
     req_data->pmem = pmem;
     req_data->vdev = vdev;
-    thread_pool_submit_aio(pool, worker_cb, req_data, done_cb, req_data);
+    thread_pool_submit_aio(worker_cb, req_data, done_cb, req_data);
 }
 
 static void virtio_pmem_get_config(VirtIODevice *vdev, uint8_t *config)
diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
index c408bde74c9a1e8ccdaf16ff03ce665bbc9c2042..948ff5f30c31de452361578dec9fa252817b4a69 100644
@@ -33,12 +33,10 @@ void thread_pool_free(ThreadPool *pool);
  * thread_pool_submit* API: submit I/O requests in the thread's
  * current AioContext.
  */
-BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
-        ThreadPoolFunc *func, void *arg,
-        BlockCompletionFunc *cb, void *opaque);
-int coroutine_fn thread_pool_submit_co(ThreadPool *pool,
-        ThreadPoolFunc *func, void *arg);
-void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg);
+BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
+                                   BlockCompletionFunc *cb, void *opaque);
+int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
+void thread_pool_submit(ThreadPoolFunc *func, void *arg);
 
 void thread_pool_update_params(ThreadPool *pool, struct AioContext *ctx);
 
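With the simplified declarations above, a coroutine submits work without
ever naming a pool. A short sketch (do_work and submit_example are
hypothetical; only thread_pool_submit_co() comes from this header):

    static int do_work(void *opaque)
    {
        /* Runs in a worker thread of the pool; the return value is
         * handed back to the submitting coroutine. */
        return 0;
    }

    static int coroutine_fn submit_example(void *arg)
    {
        /* No pool argument: the pool of the current thread's
         * AioContext is used implicitly, matching the assertion in
         * util/thread-pool.c below. */
        return thread_pool_submit_co(do_work, arg);
    }
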
diff --git a/scsi/pr-manager.c b/scsi/pr-manager.c
index 2098d7e759cc81543fcaa412dbf51e636ae004ee..fb5fc297309c98ff9e8d6cb21559a468d7d5c0f6 100644
@@ -51,7 +51,6 @@ static int pr_manager_worker(void *opaque)
 int coroutine_fn pr_manager_execute(PRManager *pr_mgr, AioContext *ctx, int fd,
                                     struct sg_io_hdr *hdr)
 {
-    ThreadPool *pool = aio_get_thread_pool(ctx);
     PRManagerData data = {
         .pr_mgr = pr_mgr,
         .fd     = fd,
@@ -62,7 +61,7 @@ int coroutine_fn pr_manager_execute(PRManager *pr_mgr, AioContext *ctx, int fd,
 
     /* The matching object_unref is in pr_manager_worker.  */
     object_ref(OBJECT(pr_mgr));
-    return thread_pool_submit_co(pool, pr_manager_worker, &data);
+    return thread_pool_submit_co(pr_manager_worker, &data);
 }
 
 bool pr_manager_is_connected(PRManager *pr_mgr)
diff --git a/scsi/qemu-pr-helper.c b/scsi/qemu-pr-helper.c
index 199227a556e6de3a77a998b32c1f29b73ca1119f..e9b3ad259a81582d5becb7928e95a9a34c972361 100644
@@ -180,7 +180,6 @@ static int do_sgio_worker(void *opaque)
 static int do_sgio(int fd, const uint8_t *cdb, uint8_t *sense,
                     uint8_t *buf, int *sz, int dir)
 {
-    ThreadPool *pool = aio_get_thread_pool(qemu_get_aio_context());
     int r;
 
     PRHelperSGIOData data = {
@@ -192,7 +191,7 @@ static int do_sgio(int fd, const uint8_t *cdb, uint8_t *sense,
         .dir = dir,
     };
 
-    r = thread_pool_submit_co(pool, do_sgio_worker, &data);
+    r = thread_pool_submit_co(do_sgio_worker, &data);
     *sz = data.sz;
     return r;
 }
diff --git a/tests/unit/test-thread-pool.c b/tests/unit/test-thread-pool.c
index 6020e65d6986f6af76e33793a4c97ec5b8f820aa..448fbf7e5f5663738e9ae5b0a73872140918b42e 100644
@@ -8,7 +8,6 @@
 #include "qemu/main-loop.h"
 
 static AioContext *ctx;
-static ThreadPool *pool;
 static int active;
 
 typedef struct {
@@ -47,7 +46,7 @@ static void done_cb(void *opaque, int ret)
 static void test_submit(void)
 {
     WorkerTestData data = { .n = 0 };
-    thread_pool_submit(pool, worker_cb, &data);
+    thread_pool_submit(worker_cb, &data);
     while (data.n == 0) {
         aio_poll(ctx, true);
     }
@@ -57,7 +56,7 @@ static void test_submit(void)
 static void test_submit_aio(void)
 {
     WorkerTestData data = { .n = 0, .ret = -EINPROGRESS };
-    data.aiocb = thread_pool_submit_aio(pool, worker_cb, &data,
+    data.aiocb = thread_pool_submit_aio(worker_cb, &data,
                                         done_cb, &data);
 
     /* The callbacks are not called until after the first wait.  */
@@ -78,7 +77,7 @@ static void co_test_cb(void *opaque)
     active = 1;
     data->n = 0;
     data->ret = -EINPROGRESS;
-    thread_pool_submit_co(pool, worker_cb, data);
+    thread_pool_submit_co(worker_cb, data);
 
     /* The test continues in test_submit_co, after qemu_coroutine_enter... */
 
@@ -122,7 +121,7 @@ static void test_submit_many(void)
     for (i = 0; i < 100; i++) {
         data[i].n = 0;
         data[i].ret = -EINPROGRESS;
-        thread_pool_submit_aio(pool, worker_cb, &data[i], done_cb, &data[i]);
+        thread_pool_submit_aio(worker_cb, &data[i], done_cb, &data[i]);
     }
 
     active = 100;
@@ -150,7 +149,7 @@ static void do_test_cancel(bool sync)
     for (i = 0; i < 100; i++) {
         data[i].n = 0;
         data[i].ret = -EINPROGRESS;
-        data[i].aiocb = thread_pool_submit_aio(pool, long_cb, &data[i],
+        data[i].aiocb = thread_pool_submit_aio(long_cb, &data[i],
                                                done_cb, &data[i]);
     }
 
@@ -235,7 +234,6 @@ int main(int argc, char **argv)
 {
     qemu_init_main_loop(&error_abort);
     ctx = qemu_get_current_aio_context();
-    pool = aio_get_thread_pool(ctx);
 
     g_test_init(&argc, &argv, NULL);
     g_test_add_func("/thread-pool/submit", test_submit);
diff --git a/util/thread-pool.c b/util/thread-pool.c
index a70abb8a59f131f35969ff4482c89727493052f1..0d97888df01a118761e35da9f07722eb3a53ad3c 100644
@@ -241,11 +241,12 @@ static const AIOCBInfo thread_pool_aiocb_info = {
     .get_aio_context    = thread_pool_get_aio_context,
 };
 
-BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
-        ThreadPoolFunc *func, void *arg,
-        BlockCompletionFunc *cb, void *opaque)
+BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
+                                   BlockCompletionFunc *cb, void *opaque)
 {
     ThreadPoolElement *req;
+    AioContext *ctx = qemu_get_current_aio_context();
+    ThreadPool *pool = aio_get_thread_pool(ctx);
 
     /* Assert that the thread submitting work is the same running the pool */
     assert(pool->ctx == qemu_get_current_aio_context());
@@ -283,19 +284,18 @@ static void thread_pool_co_cb(void *opaque, int ret)
     aio_co_wake(co->co);
 }
 
-int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func,
-                                       void *arg)
+int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg)
 {
     ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
     assert(qemu_in_coroutine());
-    thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc);
+    thread_pool_submit_aio(func, arg, thread_pool_co_cb, &tpc);
     qemu_coroutine_yield();
     return tpc.ret;
 }
 
-void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
+void thread_pool_submit(ThreadPoolFunc *func, void *arg)
 {
-    thread_pool_submit_aio(pool, func, arg, NULL, NULL);
+    thread_pool_submit_aio(func, arg, NULL, NULL);
 }
 
 void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)