* GNU GPL, version 2 or (at your option) any later version.
*/
#include "qemu/osdep.h"
-#include "qemu-common.h"
#include "qemu/queue.h"
#include "qemu/thread.h"
#include "qemu/coroutine.h"
QEMUBH *completion_bh;
QemuMutex lock;
QemuCond worker_stopped;
- QemuSemaphore sem;
- int max_threads;
+ QemuCond request_cond;
QEMUBH *new_thread_bh;
/* The following variables are only accessed from one AioContext. */
int idle_threads;
int new_threads; /* backlog of threads we need to create */
int pending_threads; /* threads created but not running yet */
- bool stopping;
+ int min_threads;
+ int max_threads;
};
static void *worker_thread(void *opaque)
pool->pending_threads--;
do_spawn_thread(pool);
- while (!pool->stopping) {
+ while (pool->cur_threads <= pool->max_threads) {
ThreadPoolElement *req;
int ret;
- do {
+ if (QTAILQ_EMPTY(&pool->request_list)) {
pool->idle_threads++;
- qemu_mutex_unlock(&pool->lock);
- ret = qemu_sem_timedwait(&pool->sem, 10000);
- qemu_mutex_lock(&pool->lock);
+ ret = qemu_cond_timedwait(&pool->request_cond, &pool->lock, 10000);
pool->idle_threads--;
- } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
- if (ret == -1 || pool->stopping) {
- break;
+ if (ret == 0 &&
+ QTAILQ_EMPTY(&pool->request_list) &&
+ pool->cur_threads > pool->min_threads) {
+ /* Timed out + no work to do + no need for warm threads = exit. */
+ break;
+ }
+ /*
+ * Even if there was some work to do, check if there aren't
+ * too many worker threads before picking it up.
+ */
+ continue;
}
req = QTAILQ_FIRST(&pool->request_list);
smp_wmb();
req->state = THREAD_DONE;
- qemu_mutex_lock(&pool->lock);
-
qemu_bh_schedule(pool->completion_bh);
+ qemu_mutex_lock(&pool->lock);
}
pool->cur_threads--;
qemu_cond_signal(&pool->worker_stopped);
qemu_mutex_unlock(&pool->lock);
+
+ /*
+ * Wake up another thread, in case we got a wakeup but decided
+ * to exit due to pool->cur_threads > pool->max_threads.
+ */
+ qemu_cond_signal(&pool->request_cond);
return NULL;
}
trace_thread_pool_cancel(elem, elem->common.opaque);
- qemu_mutex_lock(&pool->lock);
- if (elem->state == THREAD_QUEUED &&
- /* No thread has yet started working on elem. we can try to "steal"
- * the item from the worker if we can get a signal from the
- * semaphore. Because this is non-blocking, we can do it with
- * the lock taken and ensure that elem will remain THREAD_QUEUED.
- */
- qemu_sem_timedwait(&pool->sem, 0) == 0) {
+ QEMU_LOCK_GUARD(&pool->lock);
+ if (elem->state == THREAD_QUEUED) {
QTAILQ_REMOVE(&pool->request_list, elem, reqs);
qemu_bh_schedule(pool->completion_bh);
elem->ret = -ECANCELED;
}
- qemu_mutex_unlock(&pool->lock);
}
static AioContext *thread_pool_get_aio_context(BlockAIOCB *acb)
}
QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
qemu_mutex_unlock(&pool->lock);
- qemu_sem_post(&pool->sem);
+ qemu_cond_signal(&pool->request_cond);
return &req->common;
}
thread_pool_submit_aio(pool, func, arg, NULL, NULL);
}
+void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
+{
+ qemu_mutex_lock(&pool->lock);
+
+ pool->min_threads = ctx->thread_pool_min;
+ pool->max_threads = ctx->thread_pool_max;
+
+ /*
+ * We either have to:
+ * - Increase the number available of threads until over the min_threads
+ * threshold.
+ * - Bump the worker threads so that they exit, until under the max_threads
+ * threshold.
+ * - Do nothing. The current number of threads fall in between the min and
+ * max thresholds. We'll let the pool manage itself.
+ */
+ for (int i = pool->cur_threads; i < pool->min_threads; i++) {
+ spawn_thread(pool);
+ }
+
+ for (int i = pool->cur_threads; i > pool->max_threads; i--) {
+ qemu_cond_signal(&pool->request_cond);
+ }
+
+ qemu_mutex_unlock(&pool->lock);
+}
+
static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
{
if (!ctx) {
pool->completion_bh = aio_bh_new(ctx, thread_pool_completion_bh, pool);
qemu_mutex_init(&pool->lock);
qemu_cond_init(&pool->worker_stopped);
- qemu_sem_init(&pool->sem, 0);
- pool->max_threads = 64;
+ qemu_cond_init(&pool->request_cond);
pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
QLIST_INIT(&pool->head);
QTAILQ_INIT(&pool->request_list);
+
+ thread_pool_update_params(pool, ctx);
}
ThreadPool *thread_pool_new(AioContext *ctx)
pool->new_threads = 0;
/* Wait for worker threads to terminate */
- pool->stopping = true;
+ pool->max_threads = 0;
+ qemu_cond_broadcast(&pool->request_cond);
while (pool->cur_threads > 0) {
- qemu_sem_post(&pool->sem);
qemu_cond_wait(&pool->worker_stopped, &pool->lock);
}
qemu_mutex_unlock(&pool->lock);
qemu_bh_delete(pool->completion_bh);
- qemu_sem_destroy(&pool->sem);
+ qemu_cond_destroy(&pool->request_cond);
qemu_cond_destroy(&pool->worker_stopped);
qemu_mutex_destroy(&pool->lock);
g_free(pool);