]> git.proxmox.com Git - qemu.git/blobdiff - thread-pool.c
rng-egd: remove redundant free
[qemu.git] / thread-pool.c
index a0aecd08fe529098e65ca88622f5aeb951bf4288..3735fd34bc9ffc4a0a993897eb0632bf7b508f2e 100644 (file)
@@ -23,8 +23,7 @@
 #include "block/block_int.h"
 #include "qemu/event_notifier.h"
 #include "block/thread-pool.h"
-
-typedef struct ThreadPool ThreadPool;
+#include "qemu/main-loop.h"
 
 static void do_spawn_thread(ThreadPool *pool);
 
@@ -59,8 +58,10 @@ struct ThreadPoolElement {
 
 struct ThreadPool {
     EventNotifier notifier;
+    AioContext *ctx;
     QemuMutex lock;
     QemuCond check_cancel;
+    QemuCond worker_stopped;
     QemuSemaphore sem;
     int max_threads;
     QEMUBH *new_thread_bh;
@@ -75,11 +76,9 @@ struct ThreadPool {
     int new_threads;     /* backlog of threads we need to create */
     int pending_threads; /* threads created but not running yet */
     int pending_cancellations; /* whether we need a cond_broadcast */
+    bool stopping;
 };
 
-/* Currently there is only one thread pool instance. */
-static ThreadPool global_pool;
-
 static void *worker_thread(void *opaque)
 {
     ThreadPool *pool = opaque;
@@ -88,7 +87,7 @@ static void *worker_thread(void *opaque)
     pool->pending_threads--;
     do_spawn_thread(pool);
 
-    while (1) {
+    while (!pool->stopping) {
         ThreadPoolElement *req;
         int ret;
 
@@ -99,7 +98,7 @@ static void *worker_thread(void *opaque)
             qemu_mutex_lock(&pool->lock);
             pool->idle_threads--;
         } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
-        if (ret == -1) {
+        if (ret == -1 || pool->stopping) {
             break;
         }
 
@@ -124,6 +123,7 @@ static void *worker_thread(void *opaque)
     }
 
     pool->cur_threads--;
+    qemu_cond_signal(&pool->worker_stopped);
     qemu_mutex_unlock(&pool->lock);
     return NULL;
 }
@@ -198,12 +198,6 @@ restart:
     }
 }
 
-static int thread_pool_active(EventNotifier *notifier)
-{
-    ThreadPool *pool = container_of(notifier, ThreadPool, notifier);
-    return !QLIST_EMPTY(&pool->head);
-}
-
 static void thread_pool_cancel(BlockDriverAIOCB *acb)
 {
     ThreadPoolElement *elem = (ThreadPoolElement *)acb;
@@ -237,10 +231,10 @@ static const AIOCBInfo thread_pool_aiocb_info = {
     .cancel             = thread_pool_cancel,
 };
 
-BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
+BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool,
+        ThreadPoolFunc *func, void *arg,
         BlockDriverCompletionFunc *cb, void *opaque)
 {
-    ThreadPool *pool = &global_pool;
     ThreadPoolElement *req;
 
     req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
@@ -276,18 +270,19 @@ static void thread_pool_co_cb(void *opaque, int ret)
     qemu_coroutine_enter(co->co, NULL);
 }
 
-int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg)
+int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func,
+                                       void *arg)
 {
     ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
     assert(qemu_in_coroutine());
-    thread_pool_submit_aio(func, arg, thread_pool_co_cb, &tpc);
+    thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc);
     qemu_coroutine_yield();
     return tpc.ret;
 }
 
-void thread_pool_submit(ThreadPoolFunc *func, void *arg)
+void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
 {
-    thread_pool_submit_aio(func, arg, NULL, NULL);
+    thread_pool_submit_aio(pool, func, arg, NULL, NULL);
 }
 
 static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
@@ -298,8 +293,10 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
 
     memset(pool, 0, sizeof(*pool));
     event_notifier_init(&pool->notifier, false);
+    pool->ctx = ctx;
     qemu_mutex_init(&pool->lock);
     qemu_cond_init(&pool->check_cancel);
+    qemu_cond_init(&pool->worker_stopped);
     qemu_sem_init(&pool->sem, 0);
     pool->max_threads = 64;
     pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
@@ -307,13 +304,45 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
     QLIST_INIT(&pool->head);
     QTAILQ_INIT(&pool->request_list);
 
-    aio_set_event_notifier(ctx, &pool->notifier, event_notifier_ready,
-                           thread_pool_active);
+    aio_set_event_notifier(ctx, &pool->notifier, event_notifier_ready);
 }
 
-static void thread_pool_init(void)
+ThreadPool *thread_pool_new(AioContext *ctx)
 {
-    thread_pool_init_one(&global_pool, NULL);
+    ThreadPool *pool = g_new(ThreadPool, 1);
+    thread_pool_init_one(pool, ctx);
+    return pool;
 }
 
-block_init(thread_pool_init)
+void thread_pool_free(ThreadPool *pool)
+{
+    if (!pool) {
+        return;
+    }
+
+    assert(QLIST_EMPTY(&pool->head));
+
+    qemu_mutex_lock(&pool->lock);
+
+    /* Stop new threads from spawning */
+    qemu_bh_delete(pool->new_thread_bh);
+    pool->cur_threads -= pool->new_threads;
+    pool->new_threads = 0;
+
+    /* Wait for worker threads to terminate */
+    pool->stopping = true;
+    while (pool->cur_threads > 0) {
+        qemu_sem_post(&pool->sem);
+        qemu_cond_wait(&pool->worker_stopped, &pool->lock);
+    }
+
+    qemu_mutex_unlock(&pool->lock);
+
+    aio_set_event_notifier(pool->ctx, &pool->notifier, NULL);
+    qemu_sem_destroy(&pool->sem);
+    qemu_cond_destroy(&pool->check_cancel);
+    qemu_cond_destroy(&pool->worker_stopped);
+    qemu_mutex_destroy(&pool->lock);
+    event_notifier_cleanup(&pool->notifier);
+    g_free(pool);
+}