/*
* As tasks are submitted to the task queue they are assigned a
- * monotonically increasing taskqid and added to the tail of the
- * pending list. As worker threads become available the tasks are
- * removed from the head of the pending list and added to the tail
- * of the work list. Finally, as tasks complete they are removed
- * from the work list. This means that the pending and work lists
- * are always kept sorted by taskqid. Thus the lowest outstanding
+ * monotonically increasing taskqid and added to the tail of the pending
+ * list. As worker threads become available, the tasks are removed from
+ * the head of the pending or priority list, giving preference to the
+ * priority list. The tasks are then added to the work list, preserving
+ * the ordering by taskqid. Finally, as tasks complete, they are removed
+ * from the work list. This means that the pending, priority, and work
+ * lists are always kept sorted by taskqid. Thus the lowest outstanding
* incomplete taskqid can be determined simply by checking the min
- * taskqid for each head item on the pending and work list. This
- * value is stored in tq->tq_lowest_id and only updated to the new
- * lowest id when the previous lowest id completes. All taskqids
- * lower than tq->tq_lowest_id must have completed. It is also
- * possible larger taskqid's have completed because they may be
- * processed in parallel by several worker threads. However, this
- * is not a problem because the behavior of taskq_wait_id() is to
- * block until all previously submitted taskqid's have completed.
+ * taskqid for each head item on the pending, priority, and work lists.
+ * This value is stored in tq->tq_lowest_id and only updated to the new
+ * lowest id when the previous lowest id completes. All taskqids lower
+ * than tq->tq_lowest_id must have completed. It is also possible that
+ * larger taskqids have completed because they may be processed in
+ * parallel by several worker threads. However, this is not a problem
+ * because the behavior of taskq_wait_id() is to block until all
+ * previously submitted taskqids have completed.
*
* XXX: Taskqid_t wrapping is not handled. However, taskqid_t's are
* 64-bit values so even if a taskq is processing 2^24 (16,777,216)
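An illustration of the bookkeeping described above: if taskqid 9 heads the work list while taskqids 10 and 12 head the pending and priority lists, tq_lowest_id is 9 and stays there until task 9 completes, even if tasks 10 and 12 are picked up by other workers and finish first.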
GOTO(out, rc = 0);
spin_lock(&t->t_lock);
- list_add_tail(&t->t_list, &tq->tq_pend_list);
+
+ /* Queue to the priority list instead of the pending list */
+ if (flags & TQ_FRONT)
+ list_add_tail(&t->t_list, &tq->tq_prio_list);
+ else
+ list_add_tail(&t->t_list, &tq->tq_pend_list);
+
t->t_id = rc = tq->tq_next_id;
tq->tq_next_id++;
t->t_func = func;
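A sketch of how a caller might use the new flag, assuming the usual SPL entry points taskq_dispatch() (returning a taskqid, or 0 on failure) and the taskq_wait_id() behavior described in the comment above; urgent_func and arg are illustrative names:

	taskqid_t id;

	/*
	 * TQ_FRONT queues the task on the priority list, ahead of
	 * anything already waiting on the pending list.
	 */
	id = taskq_dispatch(tq, urgent_func, arg, TQ_SLEEP | TQ_FRONT);
	if (id != 0)
		taskq_wait_id(tq, id);	/* all taskqids <= id are complete */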
/*
* Returns the lowest incomplete taskqid_t. The taskqid_t may
- * be queued on the pending list or may be on the work list
- * currently being handled, but it is not 100% complete yet.
+ * be queued on the pending list, on the priority list, or on
+ * the work list currently being handled, but it is not 100%
+ * complete yet.
*/
static taskqid_t
taskq_lowest_id(taskq_t *tq)
lowest_id = MIN(lowest_id, t->t_id);
}
+ if (!list_empty(&tq->tq_prio_list)) {
+ t = list_entry(tq->tq_prio_list.next, spl_task_t, t_list);
+ lowest_id = MIN(lowest_id, t->t_id);
+ }
+
if (!list_empty(&tq->tq_work_list)) {
t = list_entry(tq->tq_work_list.next, spl_task_t, t_list);
lowest_id = MIN(lowest_id, t->t_id);
RETURN(lowest_id);
}
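Because all three lists are kept sorted by taskqid, only the head element of each needs to be examined; the minimum of those (at most three) heads is the lowest outstanding id, so the cost of the lookup does not grow with the number of queued tasks.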
+/*
+ * Insert a task into the work list, keeping the list sorted by
+ * increasing taskqid.
+ */
+static void
+taskq_insert_in_order(taskq_t *tq, spl_task_t *t)
+{
+ spl_task_t *w;
+ struct list_head *l;
+
+ ENTRY;
+ ASSERT(tq);
+ ASSERT(t);
+ ASSERT(spin_is_locked(&tq->tq_lock));
+
+ list_for_each_prev(l, &tq->tq_work_list) {
+ w = list_entry(l, spl_task_t, t_list);
+ if (w->t_id < t->t_id) {
+ list_add(&t->t_list, l);
+ break;
+ }
+ }
+ if (l == &tq->tq_work_list)
+ list_add(&t->t_list, &tq->tq_work_list);
+
+ EXIT;
+}
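Walking the work list backwards from the tail makes the common case, where the newly dequeued task carries a larger taskqid than everything already in flight, a single comparison; a longer walk is only needed when a TQ_FRONT task has jumped ahead of older pending work. The following is a minimal userspace sketch (not SPL code) of the same tail-first ordered insert, using a plain sentinel-based doubly linked list in place of the kernel's struct list_head:

#include <stdio.h>

struct node {
	unsigned long	 id;
	struct node	*prev;
	struct node	*next;
};

/* Link 'n' immediately after 'pos'. */
static void
insert_after(struct node *pos, struct node *n)
{
	n->prev = pos;
	n->next = pos->next;
	pos->next->prev = n;
	pos->next = n;
}

/* Keep the list (headed by the sentinel 'list') sorted by increasing id. */
static void
insert_in_order(struct node *list, struct node *n)
{
	struct node *pos;

	/* Search backwards from the tail for the first smaller id. */
	for (pos = list->prev; pos != list; pos = pos->prev)
		if (pos->id < n->id)
			break;

	insert_after(pos, n);	/* pos == list inserts at the head */
}

int
main(void)
{
	struct node list = { 0, &list, &list };
	struct node a = { 5 }, b = { 7 }, c = { 6 };
	struct node *pos;

	insert_in_order(&list, &a);
	insert_in_order(&list, &b);
	insert_in_order(&list, &c);	/* arrives out of order */

	for (pos = list.next; pos != &list; pos = pos->next)
		printf("%lu\n", pos->id);	/* prints 5, 6, 7 */

	return (0);
}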
+
static int
taskq_thread(void *args)
{
taskqid_t id;
taskq_t *tq = args;
spl_task_t *t;
+ struct list_head *pend_list;
ENTRY;
ASSERT(tq);
while (!kthread_should_stop()) {
add_wait_queue(&tq->tq_work_waitq, &wait);
- if (list_empty(&tq->tq_pend_list)) {
+ if (list_empty(&tq->tq_pend_list) &&
+ list_empty(&tq->tq_prio_list)) {
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
schedule();
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
}
remove_wait_queue(&tq->tq_work_waitq, &wait);
- if (!list_empty(&tq->tq_pend_list)) {
- t = list_entry(tq->tq_pend_list.next,spl_task_t,t_list);
+
+ if (!list_empty(&tq->tq_prio_list))
+ pend_list = &tq->tq_prio_list;
+ else if (!list_empty(&tq->tq_pend_list))
+ pend_list = &tq->tq_pend_list;
+ else
+ pend_list = NULL;
+
+ if (pend_list) {
+ t = list_entry(pend_list->next, spl_task_t, t_list);
list_del_init(&t->t_list);
- list_add_tail(&t->t_list, &tq->tq_work_list);
+ taskq_insert_in_order(tq, t);
tq->tq_nactive++;
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
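Note the switch from list_add_tail() to taskq_insert_in_order() above: a worker may dequeue a TQ_FRONT task whose taskqid is larger than tasks still waiting on the pending list, so tasks no longer begin executing in strictly increasing taskqid order, and blindly appending to the tail would break the sorted work list that taskq_lowest_id() relies on.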
INIT_LIST_HEAD(&tq->tq_free_list);
INIT_LIST_HEAD(&tq->tq_work_list);
INIT_LIST_HEAD(&tq->tq_pend_list);
+ INIT_LIST_HEAD(&tq->tq_prio_list);
init_waitqueue_head(&tq->tq_work_waitq);
init_waitqueue_head(&tq->tq_wait_waitq);
ASSERT(list_empty(&tq->tq_free_list));
ASSERT(list_empty(&tq->tq_work_list));
ASSERT(list_empty(&tq->tq_pend_list));
+ ASSERT(list_empty(&tq->tq_prio_list));
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
kmem_free(tq->tq_threads, nthreads * sizeof(spl_task_t *));