]> git.proxmox.com Git - mirror_zfs.git/blobdiff - module/os/freebsd/spl/spl_taskq.c
Improve ZFS objset sync parallelism
[mirror_zfs.git] / module / os / freebsd / spl / spl_taskq.c
index 842b80ade1fbd020c83dec21faf741a81ce09650..6912b220a94e315a0979600d8e8ec746d9c506e1 100644 (file)
@@ -220,6 +220,7 @@ taskq_create_impl(const char *name, int nthreads, pri_t pri,
                nthreads = MAX((mp_ncpus * nthreads) / 100, 1);
 
        tq = kmem_alloc(sizeof (*tq), KM_SLEEP);
+       tq->tq_nthreads = nthreads;
        tq->tq_queue = taskqueue_create(name, M_WAITOK,
            taskqueue_thread_enqueue, &tq->tq_queue);
        taskqueue_set_callback(tq->tq_queue, TASKQUEUE_CALLBACK_TYPE_INIT,
@@ -254,6 +255,87 @@ taskq_destroy(taskq_t *tq)
        kmem_free(tq, sizeof (*tq));
 }
 
+static void taskq_sync_assign(void *arg);
+
+typedef struct taskq_sync_arg {
+       kthread_t       *tqa_thread;
+       kcondvar_t      tqa_cv;
+       kmutex_t        tqa_lock;
+       int             tqa_ready;
+} taskq_sync_arg_t;
+
+static void
+taskq_sync_assign(void *arg)
+{
+       taskq_sync_arg_t *tqa = arg;
+
+       mutex_enter(&tqa->tqa_lock);
+       tqa->tqa_thread = curthread;
+       tqa->tqa_ready = 1;
+       cv_signal(&tqa->tqa_cv);
+       while (tqa->tqa_ready == 1)
+               cv_wait(&tqa->tqa_cv, &tqa->tqa_lock);
+       mutex_exit(&tqa->tqa_lock);
+}
+
+/*
+ * Create a taskq with a specified number of pool threads. Allocate
+ * and return an array of nthreads kthread_t pointers, one for each
+ * thread in the pool. The array is not ordered and must be freed
+ * by the caller.
+ */
+taskq_t *
+taskq_create_synced(const char *name, int nthreads, pri_t pri,
+    int minalloc, int maxalloc, uint_t flags, kthread_t ***ktpp)
+{
+       taskq_t *tq;
+       taskq_sync_arg_t *tqs = kmem_zalloc(sizeof (*tqs) * nthreads, KM_SLEEP);
+       kthread_t **kthreads = kmem_zalloc(sizeof (*kthreads) * nthreads,
+           KM_SLEEP);
+
+       flags &= ~(TASKQ_DYNAMIC | TASKQ_THREADS_CPU_PCT | TASKQ_DC_BATCH);
+
+       tq = taskq_create(name, nthreads, minclsyspri, nthreads, INT_MAX,
+           flags | TASKQ_PREPOPULATE);
+       VERIFY(tq != NULL);
+       VERIFY(tq->tq_nthreads == nthreads);
+
+       /* spawn all syncthreads */
+       for (int i = 0; i < nthreads; i++) {
+               cv_init(&tqs[i].tqa_cv, NULL, CV_DEFAULT, NULL);
+               mutex_init(&tqs[i].tqa_lock, NULL, MUTEX_DEFAULT, NULL);
+               (void) taskq_dispatch(tq, taskq_sync_assign,
+                   &tqs[i], TQ_FRONT);
+       }
+
+       /* wait on all syncthreads to start */
+       for (int i = 0; i < nthreads; i++) {
+               mutex_enter(&tqs[i].tqa_lock);
+               while (tqs[i].tqa_ready == 0)
+                       cv_wait(&tqs[i].tqa_cv, &tqs[i].tqa_lock);
+               mutex_exit(&tqs[i].tqa_lock);
+       }
+
+       /* let all syncthreads resume, finish */
+       for (int i = 0; i < nthreads; i++) {
+               mutex_enter(&tqs[i].tqa_lock);
+               tqs[i].tqa_ready = 2;
+               cv_broadcast(&tqs[i].tqa_cv);
+               mutex_exit(&tqs[i].tqa_lock);
+       }
+       taskq_wait(tq);
+
+       for (int i = 0; i < nthreads; i++) {
+               kthreads[i] = tqs[i].tqa_thread;
+               mutex_destroy(&tqs[i].tqa_lock);
+               cv_destroy(&tqs[i].tqa_cv);
+       }
+       kmem_free(tqs, sizeof (*tqs) * nthreads);
+
+       *ktpp = kthreads;
+       return (tq);
+}
+
 int
 taskq_member(taskq_t *tq, kthread_t *thread)
 {