t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);
ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
+ ASSERT(!(t->tqent_flags & TQENT_FLAG_CANCEL));
+ ASSERT(!timer_pending(&t->tqent_timer));
list_del_init(&t->tqent_list);
SRETURN(t);
ASSERT(t);
ASSERT(spin_is_locked(&tq->tq_lock));
ASSERT(list_empty(&t->tqent_list));
+ ASSERT(!timer_pending(&t->tqent_timer));
kmem_free(t, sizeof(taskq_ent_t));
tq->tq_nalloc--;
ASSERT(t);
ASSERT(spin_is_locked(&tq->tq_lock));
+ /* Wake tasks blocked in taskq_wait_id() */
+ wake_up_all(&t->tqent_waitq);
+
list_del_init(&t->tqent_list);
if (tq->tq_nalloc <= tq->tq_minalloc) {
}
/*
- * As tasks are submitted to the task queue they are assigned a
- * monotonically increasing taskqid and added to the tail of the pending
- * list. As worker threads become available the tasks are removed from
- * the head of the pending or priority list, giving preference to the
- * priority list. The tasks are then removed from their respective
- * list, and the taskq_thread servicing the task is added to the active
- * list, preserving the order using the serviced task's taskqid.
- * Finally, as tasks complete the taskq_thread servicing the task is
- * removed from the active list. This means that the pending task and
- * active taskq_thread lists are always kept sorted by taskqid. Thus the
- * lowest outstanding incomplete taskqid can be determined simply by
- * checking the min taskqid for each head item on the pending, priority,
- * and active taskq_thread list. This value is stored in
- * tq->tq_lowest_id and only updated to the new lowest id when the
- * previous lowest id completes. All taskqids lower than
- * tq->tq_lowest_id must have completed. It is also possible larger
- * taskqid's have completed because they may be processed in parallel by
- * several worker threads. However, this is not a problem because the
- * behavior of taskq_wait_id() is to block until all previously
- * submitted taskqid's have completed.
- *
- * XXX: Taskqid_t wrapping is not handled. However, taskqid_t's are
- * 64-bit values so even if a taskq is processing 2^24 (16,777,216)
- * taskqid_ts per second it will still take 2^40 seconds, 34,865 years,
- * before the wrap occurs. I can live with that for now.
+ * When a delayed task timer expires, remove it from the delay list and
+ * add it to the priority list for immediate processing.
*/
-static int
-taskq_wait_check(taskq_t *tq, taskqid_t id)
+static void
+task_expire(unsigned long data)
{
- int rc;
+ taskq_ent_t *w, *t = (taskq_ent_t *)data;
+ taskq_t *tq = t->tqent_taskq;
+ struct list_head *l;
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
- rc = (id < tq->tq_lowest_id);
+
+ if (t->tqent_flags & TQENT_FLAG_CANCEL) {
+ ASSERT(list_empty(&t->tqent_list));
+ spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+ return;
+ }
+
+ /*
+ * The priority list must be maintained in strict task id order
+ * from lowest to highest for lowest_id to be easily calculable.
+ */
+ list_del(&t->tqent_list);
+ list_for_each_prev(l, &tq->tq_prio_list) {
+ w = list_entry(l, taskq_ent_t, tqent_list);
+ if (w->tqent_id < t->tqent_id) {
+ list_add(&t->tqent_list, l);
+ break;
+ }
+ }
+ if (l == &tq->tq_prio_list)
+ list_add(&t->tqent_list, &tq->tq_prio_list);
+
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
- SRETURN(rc);
+ wake_up(&tq->tq_work_waitq);
+}
+
+/*
+ * Returns the lowest incomplete taskqid_t. The taskqid_t may be
+ * queued on the pending list, the priority list, or the delay list,
+ * or it may be currently being serviced by a worker thread, but in
+ * all cases it has not yet completed.
+ */
+static taskqid_t
+taskq_lowest_id(taskq_t *tq)
+{
+ taskqid_t lowest_id = tq->tq_next_id;
+ taskq_ent_t *t;
+ taskq_thread_t *tqt;
+ SENTRY;
+
+ ASSERT(tq);
+ ASSERT(spin_is_locked(&tq->tq_lock));
+
+ if (!list_empty(&tq->tq_pend_list)) {
+ t = list_entry(tq->tq_pend_list.next, taskq_ent_t, tqent_list);
+ lowest_id = MIN(lowest_id, t->tqent_id);
+ }
+
+ if (!list_empty(&tq->tq_prio_list)) {
+ t = list_entry(tq->tq_prio_list.next, taskq_ent_t, tqent_list);
+ lowest_id = MIN(lowest_id, t->tqent_id);
+ }
+
+ if (!list_empty(&tq->tq_delay_list)) {
+ t = list_entry(tq->tq_delay_list.next, taskq_ent_t, tqent_list);
+ lowest_id = MIN(lowest_id, t->tqent_id);
+ }
+
+ if (!list_empty(&tq->tq_active_list)) {
+ tqt = list_entry(tq->tq_active_list.next, taskq_thread_t,
+ tqt_active_list);
+ ASSERT(tqt->tqt_id != 0);
+ lowest_id = MIN(lowest_id, tqt->tqt_id);
+ }
+
+ SRETURN(lowest_id);
+}
+
+/*
+ * Insert a task into a list keeping the list sorted by increasing taskqid.
+ */
+static void
+taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt)
+{
+ taskq_thread_t *w;
+ struct list_head *l;
+
+ SENTRY;
+ ASSERT(tq);
+ ASSERT(tqt);
+ ASSERT(spin_is_locked(&tq->tq_lock));
+
+ list_for_each_prev(l, &tq->tq_active_list) {
+ w = list_entry(l, taskq_thread_t, tqt_active_list);
+ if (w->tqt_id < tqt->tqt_id) {
+ list_add(&tqt->tqt_active_list, l);
+ break;
+ }
+ }
+ if (l == &tq->tq_active_list)
+ list_add(&tqt->tqt_active_list, &tq->tq_active_list);
+
+ SEXIT;
+}
+
+/*
+ * Find and return a task from the given list if it exists. The list
+ * must be in lowest to highest task id order.
+ */
+static taskq_ent_t *
+taskq_find_list(taskq_t *tq, struct list_head *lh, taskqid_t id)
+{
+ struct list_head *l;
+ taskq_ent_t *t;
+ SENTRY;
+
+ ASSERT(spin_is_locked(&tq->tq_lock));
+
+ list_for_each(l, lh) {
+ t = list_entry(l, taskq_ent_t, tqent_list);
+
+ if (t->tqent_id == id)
+ SRETURN(t);
+
+ if (t->tqent_id > id)
+ break;
+ }
+
+ SRETURN(NULL);
}
+/*
+ * Find an already dispatched task given the task id regardless of what
+ * state it is in. If a task is still pending or executing it will be
+ * returned and 'active' set appropriately. If the task has already
+ * been run then NULL is returned.
+ */
+static taskq_ent_t *
+taskq_find(taskq_t *tq, taskqid_t id, int *active)
+{
+ taskq_thread_t *tqt;
+ struct list_head *l;
+ taskq_ent_t *t;
+ SENTRY;
+
+ ASSERT(spin_is_locked(&tq->tq_lock));
+ *active = 0;
+
+ t = taskq_find_list(tq, &tq->tq_delay_list, id);
+ if (t)
+ SRETURN(t);
+
+ t = taskq_find_list(tq, &tq->tq_prio_list, id);
+ if (t)
+ SRETURN(t);
+
+ t = taskq_find_list(tq, &tq->tq_pend_list, id);
+ if (t)
+ SRETURN(t);
+
+ list_for_each(l, &tq->tq_active_list) {
+ tqt = list_entry(l, taskq_thread_t, tqt_active_list);
+ if (tqt->tqt_id == id) {
+ t = tqt->tqt_task;
+ *active = 1;
+ SRETURN(t);
+ }
+ }
+
+ SRETURN(NULL);
+}
+
+/*
+ * The taskq_wait_id() function blocks until the passed task id completes.
+ * This does not guarantee that all lower task ids have completed.
+ */
void
taskq_wait_id(taskq_t *tq, taskqid_t id)
{
+ DEFINE_WAIT(wait);
+ taskq_ent_t *t;
+ int active = 0;
SENTRY;
+
ASSERT(tq);
+ ASSERT(id > 0);
- wait_event(tq->tq_wait_waitq, taskq_wait_check(tq, id));
+ spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
+ t = taskq_find(tq, id, &active);
+ if (t)
+ prepare_to_wait(&t->tqent_waitq, &wait, TASK_UNINTERRUPTIBLE);
+ spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+
+ /*
+ * We rely on the kernel's autoremove_wake_function() to
+ * remove us from the wait queue in the context of wake_up().
+ * Once woken the taskq_ent_t pointer must never be accessed.
+ */
+ if (t) {
+ t = NULL;
+ schedule();
+ __set_current_state(TASK_RUNNING);
+ }
SEXIT;
}
EXPORT_SYMBOL(taskq_wait_id);
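+
+/*
+ * Illustrative sketch only (my_func and my_arg are hypothetical, not
+ * part of this change): a caller waits on one specific dispatched task.
+ *
+ *	taskqid_t id = taskq_dispatch(tq, my_func, my_arg, TQ_SLEEP);
+ *	if (id != 0)
+ *		taskq_wait_id(tq, id);
+ *
+ * On return only 'id' is guaranteed complete; lower ids may still be
+ * pending or executing.
+ */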
+/*
+ * The taskq_wait() function will block until all previously submitted
+ * tasks have been completed. A previously submitted task is defined as
+ * a task with a lower task id than the current task queue id. Note that
+ * all task ids are assigned monotonically at dispatch time.
+ *
+ * Waiting for all previous tasks to complete is accomplished by tracking
+ * the lowest outstanding task id. As tasks are dispatched they are
+ * added to the tail of the pending, priority, or delay lists. As
+ * worker threads become available the tasks are removed from the heads
+ * of these lists and linked to the worker threads. This ensures the
+ * lists are kept in lowest to highest task id order.
+ *
+ * Therefore the lowest outstanding task id can be quickly determined by
+ * checking the head item from all of these lists. This value is stored
+ * with the task queue as the lowest id. It only needs to be recalculated
+ * when either the task with the current lowest id completes or is canceled.
+ *
+ * By blocking until the lowest outstanding task id exceeds the task
+ * id passed to this function we ensure all previous tasks have completed.
+ *
+ * NOTE: When there are multiple worker threads it is possible for
+ * larger task ids to complete before smaller ones. Conversely, when
+ * the task queue contains delayed tasks with small task ids, you may
+ * block for a considerable length of time waiting for them to expire
+ * and execute.
+ */
+static int
+taskq_wait_check(taskq_t *tq, taskqid_t id)
+{
+ int rc;
+
+ spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
+ rc = (id < tq->tq_lowest_id);
+ spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+
+ SRETURN(rc);
+}
+
+void
+taskq_wait_all(taskq_t *tq, taskqid_t id)
+{
+ wait_event(tq->tq_wait_waitq, taskq_wait_check(tq, id));
+}
+EXPORT_SYMBOL(taskq_wait_all);
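+
+/*
+ * Illustrative sketch only: waiting on everything dispatched so far
+ * amounts to waiting on the id preceding the next id to be assigned
+ * (taskq_wait() below does exactly this under tq_lock):
+ *
+ *	id = tq->tq_next_id - 1;
+ *	taskq_wait_all(tq, id);
+ */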
+
void
taskq_wait(taskq_t *tq)
{
id = tq->tq_next_id - 1;
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
- taskq_wait_id(tq, id);
+ taskq_wait_all(tq, id);
SEXIT;
}
EXPORT_SYMBOL(taskq_member);
+/*
+ * Cancel an already dispatched task given the task id. Tasks which
+ * are still pending will be canceled immediately, and if the task is
+ * active the function will block until it completes. Preallocated
+ * tasks which are canceled must be freed by the caller.
+ */
+int
+taskq_cancel_id(taskq_t *tq, taskqid_t id)
+{
+ taskq_ent_t *t;
+ int active = 0;
+ int rc = ENOENT;
+ SENTRY;
+
+ ASSERT(tq);
+
+ spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
+ t = taskq_find(tq, id, &active);
+ if (t && !active) {
+ list_del_init(&t->tqent_list);
+ t->tqent_flags |= TQENT_FLAG_CANCEL;
+
+ /*
+ * When canceling the lowest outstanding task id we
+ * must recalculate the new lowest outstanding id.
+ */
+ if (tq->tq_lowest_id == t->tqent_id) {
+ tq->tq_lowest_id = taskq_lowest_id(tq);
+ ASSERT3S(tq->tq_lowest_id, >, t->tqent_id);
+ }
+
+ /*
+ * The task_expire() function takes the tq->tq_lock so drop
+ * the lock before synchronously cancelling the timer.
+ */
+ if (timer_pending(&t->tqent_timer)) {
+ spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+ del_timer_sync(&t->tqent_timer);
+ spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
+ }
+
+ if (!(t->tqent_flags & TQENT_FLAG_PREALLOC))
+ task_done(tq, t);
+
+ rc = 0;
+ }
+ spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+
+ if (active) {
+ taskq_wait_id(tq, id);
+ rc = EBUSY;
+ }
+
+ SRETURN(rc);
+}
+EXPORT_SYMBOL(taskq_cancel_id);
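+
+/*
+ * Illustrative sketch only (my_id is hypothetical): interpreting the
+ * taskq_cancel_id() return value per the semantics implemented above.
+ *
+ *	error = taskq_cancel_id(tq, my_id);
+ *
+ * error == 0:      task was pending and has been canceled,
+ * error == ENOENT: task already completed or was never dispatched,
+ * error == EBUSY:  task was active; the call blocked until it finished.
+ */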
+
taskqid_t
taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
{
tq->tq_next_id++;
t->tqent_func = func;
t->tqent_arg = arg;
+ t->tqent_taskq = tq;
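+ /* Not a delayed dispatch; leave the entry's timer disarmed */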
+ t->tqent_timer.data = 0;
+ t->tqent_timer.function = NULL;
+ t->tqent_timer.expires = 0;
ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
}
EXPORT_SYMBOL(taskq_dispatch);
+taskqid_t
+taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg,
+ uint_t flags, clock_t expire_time)
+{
+ taskq_ent_t *t;
+ taskqid_t rc = 0;
+ SENTRY;
+
+ ASSERT(tq);
+ ASSERT(func);
+
+ spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
+
+ /* Taskq being destroyed and all tasks drained */
+ if (!(tq->tq_flags & TQ_ACTIVE))
+ SGOTO(out, rc = 0);
+
+ if ((t = task_alloc(tq, flags)) == NULL)
+ SGOTO(out, rc = 0);
+
+ spin_lock(&t->tqent_lock);
+
+ /* Queue to the delay list for subsequent execution */
+ list_add_tail(&t->tqent_list, &tq->tq_delay_list);
+
+ t->tqent_id = rc = tq->tq_next_id;
+ tq->tq_next_id++;
+ t->tqent_func = func;
+ t->tqent_arg = arg;
+ t->tqent_taskq = tq;
+ t->tqent_timer.data = (unsigned long)t;
+ t->tqent_timer.function = task_expire;
+ t->tqent_timer.expires = (unsigned long)expire_time;
+ add_timer(&t->tqent_timer);
+
+ ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
+
+ spin_unlock(&t->tqent_lock);
+out:
+ spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+ SRETURN(rc);
+}
+EXPORT_SYMBOL(taskq_dispatch_delay);
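+
+/*
+ * Illustrative sketch only (my_func and my_arg are hypothetical): the
+ * expire time is an absolute expiration in jiffies, so dispatching a
+ * task to run roughly five seconds from now might look like:
+ *
+ *	id = taskq_dispatch_delay(tq, my_func, my_arg, TQ_SLEEP,
+ *	    ddi_get_lbolt() + 5 * HZ);
+ *
+ * A return value of 0 indicates the task could not be dispatched.
+ */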
+
void
taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
taskq_ent_t *t)
tq->tq_next_id++;
t->tqent_func = func;
t->tqent_arg = arg;
+ t->tqent_taskq = tq;
spin_unlock(&t->tqent_lock);
taskq_init_ent(taskq_ent_t *t)
{
spin_lock_init(&t->tqent_lock);
+ init_waitqueue_head(&t->tqent_waitq);
+ init_timer(&t->tqent_timer);
INIT_LIST_HEAD(&t->tqent_list);
t->tqent_id = 0;
t->tqent_func = NULL;
t->tqent_arg = NULL;
t->tqent_flags = 0;
+ t->tqent_taskq = NULL;
}
EXPORT_SYMBOL(taskq_init_ent);
-/*
- * Returns the lowest incomplete taskqid_t. The taskqid_t may
- * be queued on the pending list, on the priority list, or on
- * the work list currently being handled, but it is not 100%
- * complete yet.
- */
-static taskqid_t
-taskq_lowest_id(taskq_t *tq)
-{
- taskqid_t lowest_id = tq->tq_next_id;
- taskq_ent_t *t;
- taskq_thread_t *tqt;
- SENTRY;
-
- ASSERT(tq);
- ASSERT(spin_is_locked(&tq->tq_lock));
-
- if (!list_empty(&tq->tq_pend_list)) {
- t = list_entry(tq->tq_pend_list.next, taskq_ent_t, tqent_list);
- lowest_id = MIN(lowest_id, t->tqent_id);
- }
-
- if (!list_empty(&tq->tq_prio_list)) {
- t = list_entry(tq->tq_prio_list.next, taskq_ent_t, tqent_list);
- lowest_id = MIN(lowest_id, t->tqent_id);
- }
-
- if (!list_empty(&tq->tq_active_list)) {
- tqt = list_entry(tq->tq_active_list.next, taskq_thread_t,
- tqt_active_list);
- ASSERT(tqt->tqt_id != 0);
- lowest_id = MIN(lowest_id, tqt->tqt_id);
- }
-
- SRETURN(lowest_id);
-}
-
-/*
- * Insert a task into a list keeping the list sorted by increasing taskqid.
- */
-static void
-taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt)
-{
- taskq_thread_t *w;
- struct list_head *l;
-
- SENTRY;
- ASSERT(tq);
- ASSERT(tqt);
- ASSERT(spin_is_locked(&tq->tq_lock));
-
- list_for_each_prev(l, &tq->tq_active_list) {
- w = list_entry(l, taskq_thread_t, tqt_active_list);
- if (w->tqt_id < tqt->tqt_id) {
- list_add(&tqt->tqt_active_list, l);
- break;
- }
- }
- if (l == &tq->tq_active_list)
- list_add(&tqt->tqt_active_list, &tq->tq_active_list);
-
- SEXIT;
-}
-
static int
taskq_thread(void *args)
{
* preallocated taskq_ent_t, tqent_id must be
* stored prior to executing tqent_func. */
tqt->tqt_id = t->tqent_id;
+ tqt->tqt_task = t;
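+ /* Cache the entry so taskq_find() can match active tasks */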
/* We must store a copy of the flags prior to
* servicing the task (servicing a prealloc'd task
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
tq->tq_nactive--;
list_del_init(&tqt->tqt_active_list);
+ tqt->tqt_task = NULL;
/* For prealloc'd tasks, we don't free anything. */
if ((tq->tq_flags & TASKQ_DYNAMIC) ||
INIT_LIST_HEAD(&tq->tq_free_list);
INIT_LIST_HEAD(&tq->tq_pend_list);
INIT_LIST_HEAD(&tq->tq_prio_list);
+ INIT_LIST_HEAD(&tq->tq_delay_list);
init_waitqueue_head(&tq->tq_work_waitq);
init_waitqueue_head(&tq->tq_wait_waitq);
ASSERT(list_empty(&tq->tq_free_list));
ASSERT(list_empty(&tq->tq_pend_list));
ASSERT(list_empty(&tq->tq_prio_list));
+ ASSERT(list_empty(&tq->tq_delay_list));
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);