nthreads = MAX((mp_ncpus * nthreads) / 100, 1);
tq = kmem_alloc(sizeof (*tq), KM_SLEEP);
+ tq->tq_nthreads = nthreads;
tq->tq_queue = taskqueue_create(name, M_WAITOK,
    taskqueue_thread_enqueue, &tq->tq_queue);
taskqueue_set_callback(tq->tq_queue, TASKQUEUE_CALLBACK_TYPE_INIT,

@@ ... @@ taskq_destroy(taskq_t *tq)
kmem_free(tq, sizeof (*tq));
}
+static void taskq_sync_assign(void *arg);
+
+typedef struct taskq_sync_arg {
+ kthread_t *tqa_thread;
+ kcondvar_t tqa_cv;
+ kmutex_t tqa_lock;
+ int tqa_ready;
+} taskq_sync_arg_t;
+
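+/*
+ * Rendezvous task used by taskq_create_synced() below: record the
+ * executing thread in tqa_thread, signal the creator, then park until
+ * released. tqa_ready tracks the handshake: 0 = task not yet running,
+ * 1 = running and parked, 2 = released. Because no task returns until
+ * the creator has seen all of them parked, each task is pinned to a
+ * distinct pool thread.
+ */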
+static void
+taskq_sync_assign(void *arg)
+{
+ taskq_sync_arg_t *tqa = arg;
+
+ mutex_enter(&tqa->tqa_lock);
+ tqa->tqa_thread = curthread;
+ tqa->tqa_ready = 1;
+ cv_signal(&tqa->tqa_cv);
+ while (tqa->tqa_ready == 1)
+  cv_wait(&tqa->tqa_cv, &tqa->tqa_lock);
+ mutex_exit(&tqa->tqa_lock);
+}
+
+/*
+ * Create a taskq with a specified number of pool threads. Allocate
+ * and return, via *ktpp, an array of nthreads kthread_t pointers, one
+ * for each thread in the pool. The array is not ordered and must be
+ * freed by the caller.
+ */
+taskq_t *
+taskq_create_synced(const char *name, int nthreads, pri_t pri,
+ int minalloc, int maxalloc, uint_t flags, kthread_t ***ktpp)
+{
+ taskq_t *tq;
+ taskq_sync_arg_t *tqs = kmem_zalloc(sizeof (*tqs) * nthreads, KM_SLEEP);
+ kthread_t **kthreads = kmem_zalloc(sizeof (*kthreads) * nthreads,
+  KM_SLEEP);
+
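+ /*
+  * Clear any flags that would let the thread count vary: the
+  * handshake below requires the pool to hold exactly nthreads
+  * dedicated threads.
+  */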
+ flags &= ~(TASKQ_DYNAMIC | TASKQ_THREADS_CPU_PCT | TASKQ_DC_BATCH);
+
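+ /*
+  * As written, the pri, minalloc, and maxalloc arguments are not
+  * used: the pool is created at minclsyspri with nthreads
+  * prepopulated entries.
+  */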
+ tq = taskq_create(name, nthreads, minclsyspri, nthreads, INT_MAX,
+  flags | TASKQ_PREPOPULATE);
+ VERIFY(tq != NULL);
+ VERIFY(tq->tq_nthreads == nthreads);
+
+ /* spawn all syncthreads */
+ for (int i = 0; i < nthreads; i++) {
+  cv_init(&tqs[i].tqa_cv, NULL, CV_DEFAULT, NULL);
+  mutex_init(&tqs[i].tqa_lock, NULL, MUTEX_DEFAULT, NULL);
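+  /*
+   * TQ_FRONT queues the rendezvous task ahead of any work
+   * already dispatched to this taskq.
+   */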
+  (void) taskq_dispatch(tq, taskq_sync_assign,
+   &tqs[i], TQ_FRONT);
+ }
+
+ /* wait on all syncthreads to start */
+ for (int i = 0; i < nthreads; i++) {
+  mutex_enter(&tqs[i].tqa_lock);
+  while (tqs[i].tqa_ready == 0)
+   cv_wait(&tqs[i].tqa_cv, &tqs[i].tqa_lock);
+  mutex_exit(&tqs[i].tqa_lock);
+ }
+
+ /* let all syncthreads resume and finish */
+ for (int i = 0; i < nthreads; i++) {
+  mutex_enter(&tqs[i].tqa_lock);
+  tqs[i].tqa_ready = 2;
+  cv_broadcast(&tqs[i].tqa_cv);
+  mutex_exit(&tqs[i].tqa_lock);
+ }
+ taskq_wait(tq);
+
+ for (int i = 0; i < nthreads; i++) {
+  kthreads[i] = tqs[i].tqa_thread;
+  mutex_destroy(&tqs[i].tqa_lock);
+  cv_destroy(&tqs[i].tqa_cv);
+ }
+ kmem_free(tqs, sizeof (*tqs) * nthreads);
+
+ *ktpp = kthreads;
+ return (tq);
+}
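+
+/*
+ * Usage sketch (illustrative only, not part of this change; the taskq
+ * name and thread count are hypothetical):
+ *
+ *     kthread_t **kthreads;
+ *     taskq_t *tq = taskq_create_synced("z_example", 8, minclsyspri,
+ *         0, INT_MAX, 0, &kthreads);
+ *     ... use tq; kthreads[0..7] identify the pool threads ...
+ *     kmem_free(kthreads, sizeof (*kthreads) * 8);
+ *     taskq_destroy(tq);
+ */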
+
int
taskq_member(taskq_t *tq, kthread_t *thread)
{