]> git.proxmox.com Git - mirror_qemu.git/commitdiff
util/event-loop-base: Introduce options to set the thread pool size
authorNicolas Saenz Julienne <nsaenzju@redhat.com>
Mon, 25 Apr 2022 07:57:23 +0000 (09:57 +0200)
committerStefan Hajnoczi <stefanha@redhat.com>
Mon, 9 May 2022 09:43:23 +0000 (10:43 +0100)
The thread pool regulates itself: when idle, it kills threads until
empty, when in demand, it creates new threads until full. This behaviour
doesn't play well with latency sensitive workloads where the price of
creating a new thread is too high. For example, when paired with qemu's
'-mlock', or using safety features like SafeStack, creating a new thread
has been measured take multiple milliseconds.

In order to mitigate this let's introduce a new 'EventLoopBase'
property to set the thread pool size. The threads will be created during
the pool's initialization or upon updating the property's value, remain
available during its lifetime regardless of demand, and destroyed upon
freeing it. A properly characterized workload will then be able to
configure the pool to avoid any latency spikes.

Signed-off-by: Nicolas Saenz Julienne <nsaenzju@redhat.com>
Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
Acked-by: Markus Armbruster <armbru@redhat.com>
Message-id: 20220425075723.20019-4-nsaenzju@redhat.com
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
event-loop-base.c
include/block/aio.h
include/block/thread-pool.h
include/sysemu/event-loop-base.h
iothread.c
qapi/qom.json
util/aio-posix.c
util/async.c
util/main-loop.c
util/thread-pool.c

index e7f99a6ec8fd306766d4472aa2a2dead0bf659bc..d5be4dc6fcf1609423459b706c6705f32ad61c01 100644 (file)
@@ -14,6 +14,7 @@
 #include "qemu/osdep.h"
 #include "qom/object_interfaces.h"
 #include "qapi/error.h"
+#include "block/thread-pool.h"
 #include "sysemu/event-loop-base.h"
 
 typedef struct {
@@ -21,9 +22,22 @@ typedef struct {
     ptrdiff_t offset; /* field's byte offset in EventLoopBase struct */
 } EventLoopBaseParamInfo;
 
+static void event_loop_base_instance_init(Object *obj)
+{
+    EventLoopBase *base = EVENT_LOOP_BASE(obj);
+
+    base->thread_pool_max = THREAD_POOL_MAX_THREADS_DEFAULT;
+}
+
 static EventLoopBaseParamInfo aio_max_batch_info = {
     "aio-max-batch", offsetof(EventLoopBase, aio_max_batch),
 };
+static EventLoopBaseParamInfo thread_pool_min_info = {
+    "thread-pool-min", offsetof(EventLoopBase, thread_pool_min),
+};
+static EventLoopBaseParamInfo thread_pool_max_info = {
+    "thread-pool-max", offsetof(EventLoopBase, thread_pool_max),
+};
 
 static void event_loop_base_get_param(Object *obj, Visitor *v,
         const char *name, void *opaque, Error **errp)
@@ -95,12 +109,21 @@ static void event_loop_base_class_init(ObjectClass *klass, void *class_data)
                               event_loop_base_get_param,
                               event_loop_base_set_param,
                               NULL, &aio_max_batch_info);
+    object_class_property_add(klass, "thread-pool-min", "int",
+                              event_loop_base_get_param,
+                              event_loop_base_set_param,
+                              NULL, &thread_pool_min_info);
+    object_class_property_add(klass, "thread-pool-max", "int",
+                              event_loop_base_get_param,
+                              event_loop_base_set_param,
+                              NULL, &thread_pool_max_info);
 }
 
 static const TypeInfo event_loop_base_info = {
     .name = TYPE_EVENT_LOOP_BASE,
     .parent = TYPE_OBJECT,
     .instance_size = sizeof(EventLoopBase),
+    .instance_init = event_loop_base_instance_init,
     .class_size = sizeof(EventLoopBaseClass),
     .class_init = event_loop_base_class_init,
     .abstract = true,
index 5634173b1233e8d8fee3c798014270ecb73c6d04..d128558f1d565e49bd32e29159293bbb7dfb5ac9 100644 (file)
@@ -192,6 +192,8 @@ struct AioContext {
     QSLIST_HEAD(, Coroutine) scheduled_coroutines;
     QEMUBH *co_schedule_bh;
 
+    int thread_pool_min;
+    int thread_pool_max;
     /* Thread pool for performing work and receiving completion callbacks.
      * Has its own locking.
      */
@@ -769,4 +771,12 @@ void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
 void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch,
                                 Error **errp);
 
+/**
+ * aio_context_set_thread_pool_params:
+ * @ctx: the aio context
+ * @min: min number of threads to have readily available in the thread pool
+ * @min: max number of threads the thread pool can contain
+ */
+void aio_context_set_thread_pool_params(AioContext *ctx, int64_t min,
+                                        int64_t max, Error **errp);
 #endif
index 7dd7d730a004af57dfeeae4c58ac21afe984cde1..2020bcc92df07451b5b156cb8a3de86d17327ccc 100644 (file)
@@ -20,6 +20,8 @@
 
 #include "block/block.h"
 
+#define THREAD_POOL_MAX_THREADS_DEFAULT         64
+
 typedef int ThreadPoolFunc(void *opaque);
 
 typedef struct ThreadPool ThreadPool;
@@ -33,5 +35,6 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
 int coroutine_fn thread_pool_submit_co(ThreadPool *pool,
         ThreadPoolFunc *func, void *arg);
 void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg);
+void thread_pool_update_params(ThreadPool *pool, struct AioContext *ctx);
 
 #endif
index fced4c9fea0e9911441465ac1495c72a84cf1f73..2748bf6ae1f827db44793c3e62e826a5b67b85ae 100644 (file)
@@ -33,5 +33,9 @@ struct EventLoopBase {
 
     /* AioContext AIO engine parameters */
     int64_t aio_max_batch;
+
+    /* AioContext thread pool parameters */
+    int64_t thread_pool_min;
+    int64_t thread_pool_max;
 };
 #endif
index 8fa2f3bfb80fad75ef6614a46f2afaa6cc895dfb..529194a566d3ced2f7902f6541c2007ab69512e7 100644 (file)
@@ -174,6 +174,9 @@ static void iothread_set_aio_context_params(EventLoopBase *base, Error **errp)
     aio_context_set_aio_params(iothread->ctx,
                                iothread->parent_obj.aio_max_batch,
                                errp);
+
+    aio_context_set_thread_pool_params(iothread->ctx, base->thread_pool_min,
+                                       base->thread_pool_max, errp);
 }
 
 
index 7d4a2ac1b9dd4ab47215edfe9b73b656f38cc115..6a653c6636137055e9cce2567c16e0dba808259a 100644 (file)
 #                 0 means that the engine will use its default.
 #                 (default: 0)
 #
+# @thread-pool-min: minimum number of threads reserved in the thread pool
+#                   (default:0)
+#
+# @thread-pool-max: maximum number of threads the thread pool can contain
+#                   (default:64)
+#
 # Since: 7.1
 ##
 { 'struct': 'EventLoopBaseProperties',
-  'data': { '*aio-max-batch': 'int' } }
+  'data': { '*aio-max-batch': 'int',
+            '*thread-pool-min': 'int',
+            '*thread-pool-max': 'int' } }
 
 ##
 # @IothreadProperties:
index be0182a3c64f7e2413a925d91f88902820e4fbc7..731f3826c062df3df4caac5f45d26d203489ee04 100644 (file)
@@ -15,6 +15,7 @@
 
 #include "qemu/osdep.h"
 #include "block/block.h"
+#include "block/thread-pool.h"
 #include "qemu/main-loop.h"
 #include "qemu/rcu.h"
 #include "qemu/rcu_queue.h"
index 2ea1172f3ee7e5218e490014173e6b6a3dbcf302..554ba70cca2186138748077987873592d2676c30 100644 (file)
@@ -563,6 +563,9 @@ AioContext *aio_context_new(Error **errp)
 
     ctx->aio_max_batch = 0;
 
+    ctx->thread_pool_min = 0;
+    ctx->thread_pool_max = THREAD_POOL_MAX_THREADS_DEFAULT;
+
     return ctx;
 fail:
     g_source_destroy(&ctx->source);
@@ -696,3 +699,20 @@ void qemu_set_current_aio_context(AioContext *ctx)
     assert(!get_my_aiocontext());
     set_my_aiocontext(ctx);
 }
+
+void aio_context_set_thread_pool_params(AioContext *ctx, int64_t min,
+                                        int64_t max, Error **errp)
+{
+
+    if (min > max || !max || min > INT_MAX || max > INT_MAX) {
+        error_setg(errp, "bad thread-pool-min/thread-pool-max values");
+        return;
+    }
+
+    ctx->thread_pool_min = min;
+    ctx->thread_pool_max = max;
+
+    if (ctx->thread_pool) {
+        thread_pool_update_params(ctx->thread_pool, ctx);
+    }
+}
index e30f034815647b6d0025d162d0f28158ce357b4a..f00a25451bdca2bcca781fda20e2c43266f38e14 100644 (file)
@@ -30,6 +30,7 @@
 #include "sysemu/replay.h"
 #include "qemu/main-loop.h"
 #include "block/aio.h"
+#include "block/thread-pool.h"
 #include "qemu/error-report.h"
 #include "qemu/queue.h"
 #include "qemu/compiler.h"
@@ -187,12 +188,20 @@ int qemu_init_main_loop(Error **errp)
 
 static void main_loop_update_params(EventLoopBase *base, Error **errp)
 {
+    ERRP_GUARD();
+
     if (!qemu_aio_context) {
         error_setg(errp, "qemu aio context not ready");
         return;
     }
 
     aio_context_set_aio_params(qemu_aio_context, base->aio_max_batch, errp);
+    if (*errp) {
+        return;
+    }
+
+    aio_context_set_thread_pool_params(qemu_aio_context, base->thread_pool_min,
+                                       base->thread_pool_max, errp);
 }
 
 MainLoop *mloop;
index d763cea505b68575af8e1b39cd95d02caa73801a..196835b4d358653eb0324fcf64d47841d27a1e5c 100644 (file)
@@ -58,7 +58,6 @@ struct ThreadPool {
     QemuMutex lock;
     QemuCond worker_stopped;
     QemuSemaphore sem;
-    int max_threads;
     QEMUBH *new_thread_bh;
 
     /* The following variables are only accessed from one AioContext. */
@@ -71,8 +70,27 @@ struct ThreadPool {
     int new_threads;     /* backlog of threads we need to create */
     int pending_threads; /* threads created but not running yet */
     bool stopping;
+    int min_threads;
+    int max_threads;
 };
 
+static inline bool back_to_sleep(ThreadPool *pool, int ret)
+{
+    /*
+     * The semaphore timed out, we should exit the loop except when:
+     *  - There is work to do, we raced with the signal.
+     *  - The max threads threshold just changed, we raced with the signal.
+     *  - The thread pool forces a minimum number of readily available threads.
+     */
+    if (ret == -1 && (!QTAILQ_EMPTY(&pool->request_list) ||
+            pool->cur_threads > pool->max_threads ||
+            pool->cur_threads <= pool->min_threads)) {
+            return true;
+    }
+
+    return false;
+}
+
 static void *worker_thread(void *opaque)
 {
     ThreadPool *pool = opaque;
@@ -91,8 +109,9 @@ static void *worker_thread(void *opaque)
             ret = qemu_sem_timedwait(&pool->sem, 10000);
             qemu_mutex_lock(&pool->lock);
             pool->idle_threads--;
-        } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
-        if (ret == -1 || pool->stopping) {
+        } while (back_to_sleep(pool, ret));
+        if (ret == -1 || pool->stopping ||
+            pool->cur_threads > pool->max_threads) {
             break;
         }
 
@@ -294,6 +313,33 @@ void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
     thread_pool_submit_aio(pool, func, arg, NULL, NULL);
 }
 
+void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
+{
+    qemu_mutex_lock(&pool->lock);
+
+    pool->min_threads = ctx->thread_pool_min;
+    pool->max_threads = ctx->thread_pool_max;
+
+    /*
+     * We either have to:
+     *  - Increase the number available of threads until over the min_threads
+     *    threshold.
+     *  - Decrease the number of available threads until under the max_threads
+     *    threshold.
+     *  - Do nothing. The current number of threads fall in between the min and
+     *    max thresholds. We'll let the pool manage itself.
+     */
+    for (int i = pool->cur_threads; i < pool->min_threads; i++) {
+        spawn_thread(pool);
+    }
+
+    for (int i = pool->cur_threads; i > pool->max_threads; i--) {
+        qemu_sem_post(&pool->sem);
+    }
+
+    qemu_mutex_unlock(&pool->lock);
+}
+
 static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
 {
     if (!ctx) {
@@ -306,11 +352,12 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
     qemu_mutex_init(&pool->lock);
     qemu_cond_init(&pool->worker_stopped);
     qemu_sem_init(&pool->sem, 0);
-    pool->max_threads = 64;
     pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
 
     QLIST_INIT(&pool->head);
     QTAILQ_INIT(&pool->request_list);
+
+    thread_pool_update_params(pool, ctx);
 }
 
 ThreadPool *thread_pool_new(AioContext *ctx)