]> git.proxmox.com Git - mirror_spl-debian.git/commitdiff
Fix taskq code to not drop tasks when TQ_SLEEP is used.
authorRicardo M. Correia <ricardo.correia@oracle.com>
Mon, 2 Aug 2010 09:24:01 +0000 (09:24 +0000)
committerBrian Behlendorf <behlendorf1@llnl.gov>
Mon, 2 Aug 2010 18:20:31 +0000 (11:20 -0700)
When TQ_SLEEP is used, taskq_dispatch() should always succeed even if the
number of pending tasks is above tq->tq_maxalloc. This semantic is similar
to KM_SLEEP in kmem allocations, which also always succeed.

However, we cannot block forever otherwise there is a risk of deadlock.
Therefore, we still allow the number of pending tasks to go above
tq->tq_maxalloc with TQ_SLEEP, but we may sleep up to 1 second per task
dispatch, thereby throttling the task dispatch rate.

One of the existing splat tests was also augmented to test for this scenario.
The test would fail with the previous implementation but now it succeeds.

Signed-off-by: Brian Behlendorf <behlendorf1@llnl.gov>
module/spl/spl-taskq.c
module/splat/splat-taskq.c

index 201cb59490104ca3e4d73bf63b7a65a9ddde420e..5a17f1ccf56f5c9904c83c6e60b80ca823d4c4ed 100644 (file)
@@ -78,35 +78,36 @@ retry:
                 if (flags & TQ_NOSLEEP)
                         SRETURN(NULL);
 
-                /* Sleep periodically polling the free list for an available
-                 * spl_task_t.  If a full second passes and we have not found
-                 * one gives up and return a NULL to the caller. */
-                if (flags & TQ_SLEEP) {
-                        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
-                        schedule_timeout(HZ / 100);
-                        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
-                        if (count < 100)
-                                SGOTO(retry, count++);
-
-                        SRETURN(NULL);
-                }
-
-                /* Unreachable, Neither TQ_SLEEP or TQ_NOSLEEP set */
-                PANIC("Neither TQ_SLEEP or TQ_NOSLEEP set");
+                /*
+                 * Sleep periodically polling the free list for an available
+                 * spl_task_t. Dispatching with TQ_SLEEP should always succeed
+                 * but we cannot block forever waiting for an spl_taskq_t to
+                 * show up in the free list, otherwise a deadlock can happen.
+                 *
+                 * Therefore, we need to allocate a new task even if the number
+                 * of allocated tasks is above tq->tq_maxalloc, but we still
+                 * end up delaying the task allocation by one second, thereby
+                 * throttling the task dispatch rate.
+                 */
+                 spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+                 schedule_timeout(HZ / 100);
+                 spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
+                 if (count < 100)
+                        SGOTO(retry, count++);
         }
 
-       spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
         t = kmem_alloc(sizeof(spl_task_t), flags & (TQ_SLEEP | TQ_NOSLEEP));
         spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
 
-       if (t) {
-               spin_lock_init(&t->t_lock);
+        if (t) {
+                spin_lock_init(&t->t_lock);
                 INIT_LIST_HEAD(&t->t_list);
-               t->t_id = 0;
-               t->t_func = NULL;
-               t->t_arg = NULL;
-               tq->tq_nalloc++;
-       }
+                t->t_id = 0;
+                t->t_func = NULL;
+                t->t_arg = NULL;
+                tq->tq_nalloc++;
+        }
 
         SRETURN(t);
 }
index d4540f37a78e3a9ff4412fc7ea03b53d6a7baace..a7c537236ca946b189392b27d11beee8b7dfff37 100644 (file)
@@ -296,6 +296,10 @@ splat_taskq_test3(struct file *file, void *arg)
  * Then use taskq_wait() to block until all the tasks complete, then
  * cross check that all the tasks ran by checking tg_arg->count which
  * is incremented in the task function.  Finally cleanup the taskq.
+ *
+ * First we try with a large 'maxalloc' value, then we try with a small one.
+ * We should not drop tasks when TQ_SLEEP is used in taskq_dispatch(), even
+ * if the number of pending tasks is above maxalloc.
  */
 static void
 splat_taskq_test4_func(void *arg)
@@ -307,16 +311,18 @@ splat_taskq_test4_func(void *arg)
 }
 
 static int
-splat_taskq_test4(struct file *file, void *arg)
+splat_taskq_test4_common(struct file *file, void *arg, int minalloc,
+                         int maxalloc, int nr_tasks)
 {
        taskq_t *tq;
        splat_taskq_arg_t tq_arg;
        int i, j, rc = 0;
 
-       splat_vprint(file, SPLAT_TASKQ_TEST4_NAME, "Taskq '%s' creating\n",
-                    SPLAT_TASKQ_TEST4_NAME);
+       splat_vprint(file, SPLAT_TASKQ_TEST4_NAME, "Taskq '%s' creating "
+                    "(%d/%d/%d)\n", SPLAT_TASKQ_TEST4_NAME, minalloc, maxalloc,
+                    nr_tasks);
        if ((tq = taskq_create(SPLAT_TASKQ_TEST4_NAME, 1, maxclsyspri,
-                              50, INT_MAX, TASKQ_PREPOPULATE)) == NULL) {
+                              minalloc, maxalloc, TASKQ_PREPOPULATE)) == NULL) {
                splat_vprint(file, SPLAT_TASKQ_TEST4_NAME,
                             "Taskq '%s' create failed\n",
                             SPLAT_TASKQ_TEST4_NAME);
@@ -326,7 +332,7 @@ splat_taskq_test4(struct file *file, void *arg)
        tq_arg.file = file;
        tq_arg.name = SPLAT_TASKQ_TEST4_NAME;
 
-       for (i = 1; i <= 1024; i *= 2) {
+       for (i = 1; i <= nr_tasks; i *= 2) {
                atomic_set(&tq_arg.count, 0);
                splat_vprint(file, SPLAT_TASKQ_TEST4_NAME,
                             "Taskq '%s' function '%s' dispatched %d times\n",
@@ -364,6 +370,19 @@ out:
        return rc;
 }
 
+static int splat_taskq_test4(struct file *file, void *arg)
+{
+       int rc;
+
+       rc = splat_taskq_test4_common(file, arg, 50, INT_MAX, 1024);
+       if (rc)
+               return rc;
+
+       rc = splat_taskq_test4_common(file, arg, 1, 1, 32);
+
+       return rc;
+}
+
 /*
  * Create a taskq and dispatch a specific sequence of tasks carefully
  * crafted to validate the order in which tasks are processed.  When