]> git.proxmox.com Git - mirror_zfs.git/commitdiff
ZIL: Second attempt to reduce scope of zl_issuer_lock.
authorAlexander Motin <mav@FreeBSD.org>
Fri, 25 Aug 2023 00:08:49 +0000 (20:08 -0400)
committerGitHub <noreply@github.com>
Fri, 25 Aug 2023 00:08:49 +0000 (17:08 -0700)
The previous patch #14841 appeared to have significant flaw, causing
deadlocks if zl_get_data callback got blocked waiting for TXG sync.  I
already handled some of such cases in the original patch, but issue
 #14982 shown cases that were impossible to solve in that design.

This patch fixes the problem by postponing log blocks allocation till
the very end, just before the zios issue, leaving nothing blocking after
that point to cause deadlocks.  Before that point though any sleeps are
now allowed, not causing sync thread blockage.  This require slightly
more complicated lwb state machine to allocate blocks and issue zios
in proper order.  But with removal of special early issue workarounds
the new code is much cleaner now, and should even be more efficient.

Since this patch uses null zios between write, I've found that null
zios do not wait for logical children ready status in zio_ready(),
that makes parent write to proceed prematurely, producing incorrect
log blocks.  Added ZIO_CHILD_LOGICAL_BIT to zio_wait_for_children()
fixes it.

Reviewed-by: Rob Norris <rob.norris@klarasystems.com>
Reviewed-by: Mark Maybee <mark.maybee@delphix.com>
Reviewed-by: George Wilson <george.wilson@delphix.com>
Signed-off-by: Alexander Motin <mav@FreeBSD.org>
Sponsored by: iXsystems, Inc.
Closes #15122

cmd/ztest.c
include/sys/zil_impl.h
module/zfs/zfs_vnops.c
module/zfs/zil.c
module/zfs/zio.c
module/zfs/zvol.c

index b6b99bfff6db7fa8b8adbaa8d728ca8e1664fe2f..398c519cfc35ed5fc67332b2283009052a2b1b19 100644 (file)
@@ -2412,7 +2412,6 @@ ztest_get_data(void *arg, uint64_t arg2, lr_write_t *lr, char *buf,
        int error;
 
        ASSERT3P(lwb, !=, NULL);
-       ASSERT3P(zio, !=, NULL);
        ASSERT3U(size, !=, 0);
 
        ztest_object_lock(zd, object, RL_READER);
@@ -2446,6 +2445,7 @@ ztest_get_data(void *arg, uint64_t arg2, lr_write_t *lr, char *buf,
                    DMU_READ_NO_PREFETCH);
                ASSERT0(error);
        } else {
+               ASSERT3P(zio, !=, NULL);
                size = doi.doi_data_block_size;
                if (ISP2(size)) {
                        offset = P2ALIGN(offset, size);
index b58dad9695a69e303fcad5dbfa290bc7fd41c381..f780ad3d61bc1012792633eddcb4639180994353 100644 (file)
@@ -38,14 +38,22 @@ extern "C" {
 /*
  * Possible states for a given lwb structure.
  *
- * An lwb will start out in the "closed" state, and then transition to
- * the "opened" state via a call to zil_lwb_write_open(). When
- * transitioning from "closed" to "opened" the zilog's "zl_issuer_lock"
- * must be held.
+ * An lwb will start out in the "new" state, and transition to the "opened"
+ * state via a call to zil_lwb_write_open() on first itx assignment.  When
+ * transitioning from "new" to "opened" the zilog's "zl_issuer_lock" must be
+ * held.
  *
- * After the lwb is "opened", it can transition into the "issued" state
- * via zil_lwb_write_close(). Again, the zilog's "zl_issuer_lock" must
- * be held when making this transition.
+ * After the lwb is "opened", it can be assigned number of itxs and transition
+ * into the "closed" state via zil_lwb_write_close() when full or on timeout.
+ * When transitioning from "opened" to "closed" the zilog's "zl_issuer_lock"
+ * must be held.  New lwb allocation also takes "zl_lock" to protect the list.
+ *
+ * After the lwb is "closed", it can transition into the "ready" state via
+ * zil_lwb_write_issue().  "zl_lock" must be held when making this transition.
+ * Since it is done by the same thread, "zl_issuer_lock" is not needed.
+ *
+ * When lwb in "ready" state receives its block pointer, it can transition to
+ * "issued". "zl_lock" must be held when making this transition.
  *
  * After the lwb's write zio completes, it transitions into the "write
  * done" state via zil_lwb_write_done(); and then into the "flush done"
@@ -62,17 +70,20 @@ extern "C" {
  *
  * Additionally, correctness when reading an lwb's state is often
  * achieved by exploiting the fact that these state transitions occur in
- * this specific order; i.e. "closed" to "opened" to "issued" to "done".
+ * this specific order; i.e. "new" to "opened" to "closed" to "ready" to
+ * "issued" to "write_done" and finally "flush_done".
  *
- * Thus, if an lwb is in the "closed" or "opened" state, holding the
+ * Thus, if an lwb is in the "new" or "opened" state, holding the
  * "zl_issuer_lock" will prevent a concurrent thread from transitioning
- * that lwb to the "issued" state. Likewise, if an lwb is already in the
- * "issued" state, holding the "zl_lock" will prevent a concurrent
- * thread from transitioning that lwb to the "write done" state.
+ * that lwb to the "closed" state. Likewise, if an lwb is already in the
+ * "ready" state, holding the "zl_lock" will prevent a concurrent thread
+ * from transitioning that lwb to the "issued" state.
  */
 typedef enum {
-    LWB_STATE_CLOSED,
+    LWB_STATE_NEW,
     LWB_STATE_OPENED,
+    LWB_STATE_CLOSED,
+    LWB_STATE_READY,
     LWB_STATE_ISSUED,
     LWB_STATE_WRITE_DONE,
     LWB_STATE_FLUSH_DONE,
@@ -91,17 +102,21 @@ typedef enum {
 typedef struct lwb {
        zilog_t         *lwb_zilog;     /* back pointer to log struct */
        blkptr_t        lwb_blk;        /* on disk address of this log blk */
+       boolean_t       lwb_slim;       /* log block has slim format */
        boolean_t       lwb_slog;       /* lwb_blk is on SLOG device */
-       boolean_t       lwb_indirect;   /* do not postpone zil_lwb_commit() */
+       int             lwb_error;      /* log block allocation error */
+       int             lwb_nmax;       /* max bytes in the buffer */
        int             lwb_nused;      /* # used bytes in buffer */
        int             lwb_nfilled;    /* # filled bytes in buffer */
        int             lwb_sz;         /* size of block and buffer */
        lwb_state_t     lwb_state;      /* the state of this lwb */
        char            *lwb_buf;       /* log write buffer */
+       zio_t           *lwb_child_zio; /* parent zio for children */
        zio_t           *lwb_write_zio; /* zio for the lwb buffer */
        zio_t           *lwb_root_zio;  /* root zio for lwb write and flushes */
        hrtime_t        lwb_issued_timestamp; /* when was the lwb issued? */
        uint64_t        lwb_issued_txg; /* the txg when the write is issued */
+       uint64_t        lwb_alloc_txg;  /* the txg when lwb_blk is allocated */
        uint64_t        lwb_max_txg;    /* highest txg in this lwb */
        list_node_t     lwb_node;       /* zilog->zl_lwb_list linkage */
        list_node_t     lwb_issue_node; /* linkage of lwbs ready for issue */
index fceb56dfc029ac463161d6222a12c2d1b12a92fa..f8d13075d5c00bdc23e2ef5306f0676bff17f674 100644 (file)
@@ -839,7 +839,6 @@ zfs_get_data(void *arg, uint64_t gen, lr_write_t *lr, char *buf,
        uint64_t zp_gen;
 
        ASSERT3P(lwb, !=, NULL);
-       ASSERT3P(zio, !=, NULL);
        ASSERT3U(size, !=, 0);
 
        /*
@@ -889,6 +888,7 @@ zfs_get_data(void *arg, uint64_t gen, lr_write_t *lr, char *buf,
                }
                ASSERT(error == 0 || error == ENOENT);
        } else { /* indirect write */
+               ASSERT3P(zio, !=, NULL);
                /*
                 * Have to lock the whole block to ensure when it's
                 * written out and its checksum is being calculated
index 567787a19b66e2b3ed7106e12ebc34f24c193dc9..f2d279e36a963b1a8a8ce3a9ffb47e68a7c1e165 100644 (file)
@@ -151,7 +151,6 @@ static kmem_cache_t *zil_lwb_cache;
 static kmem_cache_t *zil_zcw_cache;
 
 static void zil_lwb_commit(zilog_t *zilog, lwb_t *lwb, itx_t *itx);
-static void zil_lwb_write_issue(zilog_t *zilog, lwb_t *lwb);
 static itx_t *zil_itx_clone(itx_t *oitx);
 
 static int
@@ -760,33 +759,52 @@ zil_lwb_vdev_compare(const void *x1, const void *x2)
        return (TREE_CMP(v1, v2));
 }
 
+/*
+ * Allocate a new lwb.  We may already have a block pointer for it, in which
+ * case we get size and version from there.  Or we may not yet, in which case
+ * we choose them here and later make the block allocation match.
+ */
 static lwb_t *
-zil_alloc_lwb(zilog_t *zilog, blkptr_t *bp, boolean_t slog, uint64_t txg)
+zil_alloc_lwb(zilog_t *zilog, int sz, blkptr_t *bp, boolean_t slog,
+    uint64_t txg, lwb_state_t state)
 {
        lwb_t *lwb;
 
        lwb = kmem_cache_alloc(zil_lwb_cache, KM_SLEEP);
        lwb->lwb_zilog = zilog;
-       lwb->lwb_blk = *bp;
+       if (bp) {
+               lwb->lwb_blk = *bp;
+               lwb->lwb_slim = (BP_GET_CHECKSUM(bp) == ZIO_CHECKSUM_ZILOG2);
+               sz = BP_GET_LSIZE(bp);
+       } else {
+               BP_ZERO(&lwb->lwb_blk);
+               lwb->lwb_slim = (spa_version(zilog->zl_spa) >=
+                   SPA_VERSION_SLIM_ZIL);
+       }
        lwb->lwb_slog = slog;
-       lwb->lwb_indirect = B_FALSE;
-       if (BP_GET_CHECKSUM(bp) == ZIO_CHECKSUM_ZILOG2) {
+       lwb->lwb_error = 0;
+       if (lwb->lwb_slim) {
+               lwb->lwb_nmax = sz;
                lwb->lwb_nused = lwb->lwb_nfilled = sizeof (zil_chain_t);
-               lwb->lwb_sz = BP_GET_LSIZE(bp);
        } else {
+               lwb->lwb_nmax = sz - sizeof (zil_chain_t);
                lwb->lwb_nused = lwb->lwb_nfilled = 0;
-               lwb->lwb_sz = BP_GET_LSIZE(bp) - sizeof (zil_chain_t);
        }
-       lwb->lwb_state = LWB_STATE_CLOSED;
-       lwb->lwb_buf = zio_buf_alloc(BP_GET_LSIZE(bp));
+       lwb->lwb_sz = sz;
+       lwb->lwb_state = state;
+       lwb->lwb_buf = zio_buf_alloc(sz);
+       lwb->lwb_child_zio = NULL;
        lwb->lwb_write_zio = NULL;
        lwb->lwb_root_zio = NULL;
        lwb->lwb_issued_timestamp = 0;
        lwb->lwb_issued_txg = 0;
-       lwb->lwb_max_txg = txg;
+       lwb->lwb_alloc_txg = txg;
+       lwb->lwb_max_txg = 0;
 
        mutex_enter(&zilog->zl_lock);
        list_insert_tail(&zilog->zl_lwb_list, lwb);
+       if (state != LWB_STATE_NEW)
+               zilog->zl_last_lwb_opened = lwb;
        mutex_exit(&zilog->zl_lock);
 
        return (lwb);
@@ -800,10 +818,12 @@ zil_free_lwb(zilog_t *zilog, lwb_t *lwb)
        VERIFY(list_is_empty(&lwb->lwb_waiters));
        VERIFY(list_is_empty(&lwb->lwb_itxs));
        ASSERT(avl_is_empty(&lwb->lwb_vdev_tree));
+       ASSERT3P(lwb->lwb_child_zio, ==, NULL);
        ASSERT3P(lwb->lwb_write_zio, ==, NULL);
        ASSERT3P(lwb->lwb_root_zio, ==, NULL);
+       ASSERT3U(lwb->lwb_alloc_txg, <=, spa_syncing_txg(zilog->zl_spa));
        ASSERT3U(lwb->lwb_max_txg, <=, spa_syncing_txg(zilog->zl_spa));
-       ASSERT(lwb->lwb_state == LWB_STATE_CLOSED ||
+       ASSERT(lwb->lwb_state == LWB_STATE_NEW ||
            lwb->lwb_state == LWB_STATE_FLUSH_DONE);
 
        /*
@@ -954,7 +974,7 @@ zil_create(zilog_t *zilog)
         * Allocate a log write block (lwb) for the first log block.
         */
        if (error == 0)
-               lwb = zil_alloc_lwb(zilog, &blk, slog, txg);
+               lwb = zil_alloc_lwb(zilog, 0, &blk, slog, txg, LWB_STATE_NEW);
 
        /*
         * If we just allocated the first log block, commit our transaction
@@ -1041,7 +1061,8 @@ zil_destroy(zilog_t *zilog, boolean_t keep_first)
                while ((lwb = list_remove_head(&zilog->zl_lwb_list)) != NULL) {
                        if (lwb->lwb_buf != NULL)
                                zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
-                       zio_free(zilog->zl_spa, txg, &lwb->lwb_blk);
+                       if (!BP_IS_HOLE(&lwb->lwb_blk))
+                               zio_free(zilog->zl_spa, txg, &lwb->lwb_blk);
                        zil_free_lwb(zilog, lwb);
                }
        } else if (!keep_first) {
@@ -1269,21 +1290,21 @@ zil_commit_waiter_link_lwb(zil_commit_waiter_t *zcw, lwb_t *lwb)
 {
        /*
         * The lwb_waiters field of the lwb is protected by the zilog's
-        * zl_lock, thus it must be held when calling this function.
+        * zl_issuer_lock while the lwb is open and zl_lock otherwise.
+        * zl_issuer_lock also protects leaving the open state.
+        * zcw_lwb setting is protected by zl_issuer_lock and state !=
+        * flush_done, which transition is protected by zl_lock.
         */
-       ASSERT(MUTEX_HELD(&lwb->lwb_zilog->zl_lock));
+       ASSERT(MUTEX_HELD(&lwb->lwb_zilog->zl_issuer_lock));
+       IMPLY(lwb->lwb_state != LWB_STATE_OPENED,
+           MUTEX_HELD(&lwb->lwb_zilog->zl_lock));
+       ASSERT3S(lwb->lwb_state, !=, LWB_STATE_NEW);
+       ASSERT3S(lwb->lwb_state, !=, LWB_STATE_FLUSH_DONE);
 
-       mutex_enter(&zcw->zcw_lock);
        ASSERT(!list_link_active(&zcw->zcw_node));
-       ASSERT3P(zcw->zcw_lwb, ==, NULL);
-       ASSERT3P(lwb, !=, NULL);
-       ASSERT(lwb->lwb_state == LWB_STATE_OPENED ||
-           lwb->lwb_state == LWB_STATE_ISSUED ||
-           lwb->lwb_state == LWB_STATE_WRITE_DONE);
-
        list_insert_tail(&lwb->lwb_waiters, zcw);
+       ASSERT3P(zcw->zcw_lwb, ==, NULL);
        zcw->zcw_lwb = lwb;
-       mutex_exit(&zcw->zcw_lock);
 }
 
 /*
@@ -1294,11 +1315,9 @@ zil_commit_waiter_link_lwb(zil_commit_waiter_t *zcw, lwb_t *lwb)
 static void
 zil_commit_waiter_link_nolwb(zil_commit_waiter_t *zcw, list_t *nolwb)
 {
-       mutex_enter(&zcw->zcw_lock);
        ASSERT(!list_link_active(&zcw->zcw_node));
-       ASSERT3P(zcw->zcw_lwb, ==, NULL);
        list_insert_tail(nolwb, zcw);
-       mutex_exit(&zcw->zcw_lock);
+       ASSERT3P(zcw->zcw_lwb, ==, NULL);
 }
 
 void
@@ -1484,7 +1503,7 @@ zil_lwb_flush_wait_all(zilog_t *zilog, uint64_t txg)
        mutex_enter(&zilog->zl_lock);
        mutex_enter(&zilog->zl_lwb_io_lock);
        lwb_t *lwb = list_head(&zilog->zl_lwb_list);
-       while (lwb != NULL && lwb->lwb_max_txg <= txg) {
+       while (lwb != NULL) {
                if (lwb->lwb_issued_txg <= txg) {
                        ASSERT(lwb->lwb_state != LWB_STATE_ISSUED);
                        ASSERT(lwb->lwb_state != LWB_STATE_WRITE_DONE);
@@ -1527,14 +1546,6 @@ zil_lwb_write_done(zio_t *zio)
 
        ASSERT3S(spa_config_held(spa, SCL_STATE, RW_READER), !=, 0);
 
-       ASSERT(BP_GET_COMPRESS(zio->io_bp) == ZIO_COMPRESS_OFF);
-       ASSERT(BP_GET_TYPE(zio->io_bp) == DMU_OT_INTENT_LOG);
-       ASSERT(BP_GET_LEVEL(zio->io_bp) == 0);
-       ASSERT(BP_GET_BYTEORDER(zio->io_bp) == ZFS_HOST_BYTEORDER);
-       ASSERT(!BP_IS_GANG(zio->io_bp));
-       ASSERT(!BP_IS_HOLE(zio->io_bp));
-       ASSERT(BP_GET_FILL(zio->io_bp) == 0);
-
        abd_free(zio->io_abd);
        zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
        lwb->lwb_buf = NULL;
@@ -1542,6 +1553,7 @@ zil_lwb_write_done(zio_t *zio)
        mutex_enter(&zilog->zl_lock);
        ASSERT3S(lwb->lwb_state, ==, LWB_STATE_ISSUED);
        lwb->lwb_state = LWB_STATE_WRITE_DONE;
+       lwb->lwb_child_zio = NULL;
        lwb->lwb_write_zio = NULL;
        nlwb = list_next(&zilog->zl_lwb_list, lwb);
        mutex_exit(&zilog->zl_lock);
@@ -1606,116 +1618,75 @@ zil_lwb_write_done(zio_t *zio)
        }
 }
 
+/*
+ * Build the zio dependency chain, which is used to preserve the ordering of
+ * lwb completions that is required by the semantics of the ZIL. Each new lwb
+ * zio becomes a parent of the previous lwb zio, such that the new lwb's zio
+ * cannot complete until the previous lwb's zio completes.
+ *
+ * This is required by the semantics of zil_commit(): the commit waiters
+ * attached to the lwbs will be woken in the lwb zio's completion callback,
+ * so this zio dependency graph ensures the waiters are woken in the correct
+ * order (the same order the lwbs were created).
+ */
 static void
 zil_lwb_set_zio_dependency(zilog_t *zilog, lwb_t *lwb)
 {
-       lwb_t *last_lwb_opened = zilog->zl_last_lwb_opened;
-
-       ASSERT(MUTEX_HELD(&zilog->zl_issuer_lock));
        ASSERT(MUTEX_HELD(&zilog->zl_lock));
 
+       lwb_t *prev_lwb = list_prev(&zilog->zl_lwb_list, lwb);
+       if (prev_lwb == NULL ||
+           prev_lwb->lwb_state == LWB_STATE_FLUSH_DONE)
+               return;
+
        /*
-        * The zilog's "zl_last_lwb_opened" field is used to build the
-        * lwb/zio dependency chain, which is used to preserve the
-        * ordering of lwb completions that is required by the semantics
-        * of the ZIL. Each new lwb zio becomes a parent of the
-        * "previous" lwb zio, such that the new lwb's zio cannot
-        * complete until the "previous" lwb's zio completes.
+        * If the previous lwb's write hasn't already completed, we also want
+        * to order the completion of the lwb write zios (above, we only order
+        * the completion of the lwb root zios). This is required because of
+        * how we can defer the DKIOCFLUSHWRITECACHE commands for each lwb.
+        *
+        * When the DKIOCFLUSHWRITECACHE commands are deferred, the previous
+        * lwb will rely on this lwb to flush the vdevs written to by that
+        * previous lwb. Thus, we need to ensure this lwb doesn't issue the
+        * flush until after the previous lwb's write completes. We ensure
+        * this ordering by setting the zio parent/child relationship here.
         *
-        * This is required by the semantics of zil_commit(); the commit
-        * waiters attached to the lwbs will be woken in the lwb zio's
-        * completion callback, so this zio dependency graph ensures the
-        * waiters are woken in the correct order (the same order the
-        * lwbs were created).
+        * Without this relationship on the lwb's write zio, it's possible
+        * for this lwb's write to complete prior to the previous lwb's write
+        * completing; and thus, the vdevs for the previous lwb would be
+        * flushed prior to that lwb's data being written to those vdevs (the
+        * vdevs are flushed in the lwb write zio's completion handler,
+        * zil_lwb_write_done()).
         */
-       if (last_lwb_opened != NULL &&
-           last_lwb_opened->lwb_state != LWB_STATE_FLUSH_DONE) {
-               ASSERT(last_lwb_opened->lwb_state == LWB_STATE_OPENED ||
-                   last_lwb_opened->lwb_state == LWB_STATE_ISSUED ||
-                   last_lwb_opened->lwb_state == LWB_STATE_WRITE_DONE);
-
-               ASSERT3P(last_lwb_opened->lwb_root_zio, !=, NULL);
-               zio_add_child(lwb->lwb_root_zio,
-                   last_lwb_opened->lwb_root_zio);
-
-               /*
-                * If the previous lwb's write hasn't already completed,
-                * we also want to order the completion of the lwb write
-                * zios (above, we only order the completion of the lwb
-                * root zios). This is required because of how we can
-                * defer the DKIOCFLUSHWRITECACHE commands for each lwb.
-                *
-                * When the DKIOCFLUSHWRITECACHE commands are deferred,
-                * the previous lwb will rely on this lwb to flush the
-                * vdevs written to by that previous lwb. Thus, we need
-                * to ensure this lwb doesn't issue the flush until
-                * after the previous lwb's write completes. We ensure
-                * this ordering by setting the zio parent/child
-                * relationship here.
-                *
-                * Without this relationship on the lwb's write zio,
-                * it's possible for this lwb's write to complete prior
-                * to the previous lwb's write completing; and thus, the
-                * vdevs for the previous lwb would be flushed prior to
-                * that lwb's data being written to those vdevs (the
-                * vdevs are flushed in the lwb write zio's completion
-                * handler, zil_lwb_write_done()).
-                */
-               if (last_lwb_opened->lwb_state != LWB_STATE_WRITE_DONE) {
-                       ASSERT(last_lwb_opened->lwb_state == LWB_STATE_OPENED ||
-                           last_lwb_opened->lwb_state == LWB_STATE_ISSUED);
-
-                       ASSERT3P(last_lwb_opened->lwb_write_zio, !=, NULL);
-                       zio_add_child(lwb->lwb_write_zio,
-                           last_lwb_opened->lwb_write_zio);
-               }
+       if (prev_lwb->lwb_state == LWB_STATE_ISSUED) {
+               ASSERT3P(prev_lwb->lwb_write_zio, !=, NULL);
+               zio_add_child(lwb->lwb_write_zio, prev_lwb->lwb_write_zio);
+       } else {
+               ASSERT3S(prev_lwb->lwb_state, ==, LWB_STATE_WRITE_DONE);
        }
+
+       ASSERT3P(prev_lwb->lwb_root_zio, !=, NULL);
+       zio_add_child(lwb->lwb_root_zio, prev_lwb->lwb_root_zio);
 }
 
 
 /*
  * This function's purpose is to "open" an lwb such that it is ready to
- * accept new itxs being committed to it. To do this, the lwb's zio
- * structures are created, and linked to the lwb. This function is
- * idempotent; if the passed in lwb has already been opened, this
- * function is essentially a no-op.
+ * accept new itxs being committed to it. This function is idempotent; if
+ * the passed in lwb has already been opened, it is essentially a no-op.
  */
 static void
 zil_lwb_write_open(zilog_t *zilog, lwb_t *lwb)
 {
-       zbookmark_phys_t zb;
-       zio_priority_t prio;
-
        ASSERT(MUTEX_HELD(&zilog->zl_issuer_lock));
-       ASSERT3P(lwb, !=, NULL);
-       EQUIV(lwb->lwb_root_zio == NULL, lwb->lwb_state == LWB_STATE_CLOSED);
-       EQUIV(lwb->lwb_root_zio != NULL, lwb->lwb_state == LWB_STATE_OPENED);
 
-       if (lwb->lwb_root_zio != NULL)
+       if (lwb->lwb_state != LWB_STATE_NEW) {
+               ASSERT3S(lwb->lwb_state, ==, LWB_STATE_OPENED);
                return;
-
-       lwb->lwb_root_zio = zio_root(zilog->zl_spa,
-           zil_lwb_flush_vdevs_done, lwb, ZIO_FLAG_CANFAIL);
-
-       abd_t *lwb_abd = abd_get_from_buf(lwb->lwb_buf,
-           BP_GET_LSIZE(&lwb->lwb_blk));
-
-       if (!lwb->lwb_slog || zilog->zl_cur_used <= zil_slog_bulk)
-               prio = ZIO_PRIORITY_SYNC_WRITE;
-       else
-               prio = ZIO_PRIORITY_ASYNC_WRITE;
-
-       SET_BOOKMARK(&zb, lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_OBJSET],
-           ZB_ZIL_OBJECT, ZB_ZIL_LEVEL,
-           lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_SEQ]);
-
-       lwb->lwb_write_zio = zio_rewrite(lwb->lwb_root_zio, zilog->zl_spa, 0,
-           &lwb->lwb_blk, lwb_abd, BP_GET_LSIZE(&lwb->lwb_blk),
-           zil_lwb_write_done, lwb, prio, ZIO_FLAG_CANFAIL, &zb);
+       }
 
        mutex_enter(&zilog->zl_lock);
        lwb->lwb_state = LWB_STATE_OPENED;
-       zil_lwb_set_zio_dependency(zilog, lwb);
        zilog->zl_last_lwb_opened = lwb;
        mutex_exit(&zilog->zl_lock);
 }
@@ -1752,57 +1723,21 @@ static uint_t zil_maxblocksize = SPA_OLD_MAXBLOCKSIZE;
  * Has to be called under zl_issuer_lock to chain more lwbs.
  */
 static lwb_t *
-zil_lwb_write_close(zilog_t *zilog, lwb_t *lwb, list_t *ilwbs)
+zil_lwb_write_close(zilog_t *zilog, lwb_t *lwb, lwb_state_t state)
 {
-       lwb_t *nlwb = NULL;
-       zil_chain_t *zilc;
-       spa_t *spa = zilog->zl_spa;
-       blkptr_t *bp;
-       dmu_tx_t *tx;
-       uint64_t txg;
-       uint64_t zil_blksz;
-       int i, error;
-       boolean_t slog;
+       int i;
 
        ASSERT(MUTEX_HELD(&zilog->zl_issuer_lock));
-       ASSERT3P(lwb->lwb_root_zio, !=, NULL);
-       ASSERT3P(lwb->lwb_write_zio, !=, NULL);
        ASSERT3S(lwb->lwb_state, ==, LWB_STATE_OPENED);
+       lwb->lwb_state = LWB_STATE_CLOSED;
 
        /*
-        * If this lwb includes indirect writes, we have to commit before
-        * creating the transaction, otherwise we may end up in dead lock.
-        */
-       if (lwb->lwb_indirect) {
-               for (itx_t *itx = list_head(&lwb->lwb_itxs); itx;
-                   itx = list_next(&lwb->lwb_itxs, itx))
-                       zil_lwb_commit(zilog, lwb, itx);
-               lwb->lwb_nused = lwb->lwb_nfilled;
-       }
-
-       /*
-        * Allocate the next block and save its address in this block
-        * before writing it in order to establish the log chain.
+        * If there was an allocation failure then returned NULL will trigger
+        * zil_commit_writer_stall() at the caller.  This is inherently racy,
+        * since allocation may not have happened yet.
         */
-
-       tx = dmu_tx_create(zilog->zl_os);
-
-       /*
-        * Since we are not going to create any new dirty data, and we
-        * can even help with clearing the existing dirty data, we
-        * should not be subject to the dirty data based delays. We
-        * use TXG_NOTHROTTLE to bypass the delay mechanism.
-        */
-       VERIFY0(dmu_tx_assign(tx, TXG_WAIT | TXG_NOTHROTTLE));
-
-       dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
-       txg = dmu_tx_get_txg(tx);
-
-       mutex_enter(&zilog->zl_lwb_io_lock);
-       lwb->lwb_issued_txg = txg;
-       zilog->zl_lwb_inflight[txg & TXG_MASK]++;
-       zilog->zl_lwb_max_issued_txg = MAX(txg, zilog->zl_lwb_max_issued_txg);
-       mutex_exit(&zilog->zl_lwb_io_lock);
+       if (lwb->lwb_error != 0)
+               return (NULL);
 
        /*
         * Log blocks are pre-allocated. Here we select the size of the next
@@ -1820,7 +1755,7 @@ zil_lwb_write_close(zilog_t *zilog, lwb_t *lwb, list_t *ilwbs)
         * the maximum block size because we can exhaust the available
         * pool log space.
         */
-       zil_blksz = zilog->zl_cur_used + sizeof (zil_chain_t);
+       uint64_t zil_blksz = zilog->zl_cur_used + sizeof (zil_chain_t);
        for (i = 0; zil_blksz > zil_block_buckets[i].limit; i++)
                continue;
        zil_blksz = MIN(zil_block_buckets[i].blksz, zilog->zl_max_block_size);
@@ -1832,94 +1767,149 @@ zil_lwb_write_close(zilog_t *zilog, lwb_t *lwb, list_t *ilwbs)
            uint64_t, zilog->zl_prev_blks[zilog->zl_prev_rotor]);
        zilog->zl_prev_rotor = (zilog->zl_prev_rotor + 1) & (ZIL_PREV_BLKS - 1);
 
-       if (BP_GET_CHECKSUM(&lwb->lwb_blk) == ZIO_CHECKSUM_ZILOG2)
-               zilc = (zil_chain_t *)lwb->lwb_buf;
-       else
-               zilc = (zil_chain_t *)(lwb->lwb_buf + lwb->lwb_sz);
-       bp = &zilc->zc_next_blk;
-       BP_ZERO(bp);
-       error = zio_alloc_zil(spa, zilog->zl_os, txg, bp, zil_blksz, &slog);
-       if (error == 0) {
-               ASSERT3U(bp->blk_birth, ==, txg);
-               bp->blk_cksum = lwb->lwb_blk.blk_cksum;
-               bp->blk_cksum.zc_word[ZIL_ZC_SEQ]++;
+       return (zil_alloc_lwb(zilog, zil_blksz, NULL, 0, 0, state));
+}
 
-               /*
-                * Allocate a new log write block (lwb).
-                */
-               nlwb = zil_alloc_lwb(zilog, bp, slog, txg);
-       }
+/*
+ * Finalize previously closed block and issue the write zio.
+ */
+static void
+zil_lwb_write_issue(zilog_t *zilog, lwb_t *lwb)
+{
+       spa_t *spa = zilog->zl_spa;
+       zil_chain_t *zilc;
+       boolean_t slog;
+       zbookmark_phys_t zb;
+       zio_priority_t prio;
+       int error;
 
-       lwb->lwb_state = LWB_STATE_ISSUED;
+       ASSERT3S(lwb->lwb_state, ==, LWB_STATE_CLOSED);
 
-       dmu_tx_commit(tx);
+       /* Actually fill the lwb with the data. */
+       for (itx_t *itx = list_head(&lwb->lwb_itxs); itx;
+           itx = list_next(&lwb->lwb_itxs, itx))
+               zil_lwb_commit(zilog, lwb, itx);
+       lwb->lwb_nused = lwb->lwb_nfilled;
+
+       lwb->lwb_root_zio = zio_root(spa, zil_lwb_flush_vdevs_done, lwb,
+           ZIO_FLAG_CANFAIL);
 
        /*
-        * We need to acquire the config lock for the lwb to issue it later.
-        * However, if we already have a queue of closed parent lwbs already
-        * holding the config lock (but not yet issued), we can't block here
-        * waiting on the lock or we will deadlock.  In that case we must
-        * first issue to parent IOs before waiting on the lock.
+        * The lwb is now ready to be issued, but it can be only if it already
+        * got its block pointer allocated or the allocation has failed.
+        * Otherwise leave it as-is, relying on some other thread to issue it
+        * after allocating its block pointer via calling zil_lwb_write_issue()
+        * for the previous lwb(s) in the chain.
         */
-       if (ilwbs && !list_is_empty(ilwbs)) {
-               if (!spa_config_tryenter(spa, SCL_STATE, lwb, RW_READER)) {
-                       lwb_t *tlwb;
-                       while ((tlwb = list_remove_head(ilwbs)) != NULL)
-                               zil_lwb_write_issue(zilog, tlwb);
-                       spa_config_enter(spa, SCL_STATE, lwb, RW_READER);
+       mutex_enter(&zilog->zl_lock);
+       lwb->lwb_state = LWB_STATE_READY;
+       if (BP_IS_HOLE(&lwb->lwb_blk) && lwb->lwb_error == 0) {
+               mutex_exit(&zilog->zl_lock);
+               return;
+       }
+       mutex_exit(&zilog->zl_lock);
+
+next_lwb:
+       if (lwb->lwb_slim)
+               zilc = (zil_chain_t *)lwb->lwb_buf;
+       else
+               zilc = (zil_chain_t *)(lwb->lwb_buf + lwb->lwb_nmax);
+       int wsz = lwb->lwb_sz;
+       if (lwb->lwb_error == 0) {
+               abd_t *lwb_abd = abd_get_from_buf(lwb->lwb_buf, lwb->lwb_sz);
+               if (!lwb->lwb_slog || zilog->zl_cur_used <= zil_slog_bulk)
+                       prio = ZIO_PRIORITY_SYNC_WRITE;
+               else
+                       prio = ZIO_PRIORITY_ASYNC_WRITE;
+               SET_BOOKMARK(&zb, lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_OBJSET],
+                   ZB_ZIL_OBJECT, ZB_ZIL_LEVEL,
+                   lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_SEQ]);
+               lwb->lwb_write_zio = zio_rewrite(lwb->lwb_root_zio, spa, 0,
+                   &lwb->lwb_blk, lwb_abd, lwb->lwb_sz, zil_lwb_write_done,
+                   lwb, prio, ZIO_FLAG_CANFAIL, &zb);
+               zil_lwb_add_block(lwb, &lwb->lwb_blk);
+
+               if (lwb->lwb_slim) {
+                       /* For Slim ZIL only write what is used. */
+                       wsz = P2ROUNDUP_TYPED(lwb->lwb_nused, ZIL_MIN_BLKSZ,
+                           int);
+                       ASSERT3S(wsz, <=, lwb->lwb_sz);
+                       zio_shrink(lwb->lwb_write_zio, wsz);
+                       wsz = lwb->lwb_write_zio->io_size;
                }
+               memset(lwb->lwb_buf + lwb->lwb_nused, 0, wsz - lwb->lwb_nused);
+               zilc->zc_pad = 0;
+               zilc->zc_nused = lwb->lwb_nused;
+               zilc->zc_eck.zec_cksum = lwb->lwb_blk.blk_cksum;
        } else {
-               spa_config_enter(spa, SCL_STATE, lwb, RW_READER);
+               /*
+                * We can't write the lwb if there was an allocation failure,
+                * so create a null zio instead just to maintain dependencies.
+                */
+               lwb->lwb_write_zio = zio_null(lwb->lwb_root_zio, spa, NULL,
+                   zil_lwb_write_done, lwb, ZIO_FLAG_CANFAIL);
+               lwb->lwb_write_zio->io_error = lwb->lwb_error;
        }
-
-       if (ilwbs)
-               list_insert_tail(ilwbs, lwb);
+       if (lwb->lwb_child_zio)
+               zio_add_child(lwb->lwb_write_zio, lwb->lwb_child_zio);
 
        /*
-        * If there was an allocation failure then nlwb will be null which
-        * forces a txg_wait_synced().
+        * Open transaction to allocate the next block pointer.
         */
-       return (nlwb);
-}
+       dmu_tx_t *tx = dmu_tx_create(zilog->zl_os);
+       VERIFY0(dmu_tx_assign(tx, TXG_WAIT | TXG_NOTHROTTLE));
+       dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
+       uint64_t txg = dmu_tx_get_txg(tx);
 
-/*
- * Finalize previously closed block and issue the write zio.
- * Does not require locking.
- */
-static void
-zil_lwb_write_issue(zilog_t *zilog, lwb_t *lwb)
-{
-       zil_chain_t *zilc;
-       int wsz;
-
-       /* Actually fill the lwb with the data if not yet. */
-       if (!lwb->lwb_indirect) {
-               for (itx_t *itx = list_head(&lwb->lwb_itxs); itx;
-                   itx = list_next(&lwb->lwb_itxs, itx))
-                       zil_lwb_commit(zilog, lwb, itx);
-               lwb->lwb_nused = lwb->lwb_nfilled;
+       /*
+        * Allocate next the block pointer unless we are already in error.
+        */
+       lwb_t *nlwb = list_next(&zilog->zl_lwb_list, lwb);
+       blkptr_t *bp = &zilc->zc_next_blk;
+       BP_ZERO(bp);
+       error = lwb->lwb_error;
+       if (error == 0) {
+               error = zio_alloc_zil(spa, zilog->zl_os, txg, bp, nlwb->lwb_sz,
+                   &slog);
+       }
+       if (error == 0) {
+               ASSERT3U(bp->blk_birth, ==, txg);
+               BP_SET_CHECKSUM(bp, nlwb->lwb_slim ? ZIO_CHECKSUM_ZILOG2 :
+                   ZIO_CHECKSUM_ZILOG);
+               bp->blk_cksum = lwb->lwb_blk.blk_cksum;
+               bp->blk_cksum.zc_word[ZIL_ZC_SEQ]++;
        }
 
-       if (BP_GET_CHECKSUM(&lwb->lwb_blk) == ZIO_CHECKSUM_ZILOG2) {
-               /* For Slim ZIL only write what is used. */
-               wsz = P2ROUNDUP_TYPED(lwb->lwb_nused, ZIL_MIN_BLKSZ, int);
-               ASSERT3S(wsz, <=, lwb->lwb_sz);
-               zio_shrink(lwb->lwb_write_zio, wsz);
-               wsz = lwb->lwb_write_zio->io_size;
+       /*
+        * Reduce TXG open time by incrementing inflight counter and committing
+        * the transaciton.  zil_sync() will wait for it to return to zero.
+        */
+       mutex_enter(&zilog->zl_lwb_io_lock);
+       lwb->lwb_issued_txg = txg;
+       zilog->zl_lwb_inflight[txg & TXG_MASK]++;
+       zilog->zl_lwb_max_issued_txg = MAX(txg, zilog->zl_lwb_max_issued_txg);
+       mutex_exit(&zilog->zl_lwb_io_lock);
+       dmu_tx_commit(tx);
 
-               zilc = (zil_chain_t *)lwb->lwb_buf;
-       } else {
-               wsz = lwb->lwb_sz;
-               zilc = (zil_chain_t *)(lwb->lwb_buf + lwb->lwb_sz);
-       }
-       zilc->zc_pad = 0;
-       zilc->zc_nused = lwb->lwb_nused;
-       zilc->zc_eck.zec_cksum = lwb->lwb_blk.blk_cksum;
+       spa_config_enter(spa, SCL_STATE, lwb, RW_READER);
 
        /*
-        * clear unused data for security
+        * We've completed all potentially blocking operations.  Update the
+        * nlwb and allow it proceed without possible lock order reversals.
         */
-       memset(lwb->lwb_buf + lwb->lwb_nused, 0, wsz - lwb->lwb_nused);
+       mutex_enter(&zilog->zl_lock);
+       zil_lwb_set_zio_dependency(zilog, lwb);
+       lwb->lwb_state = LWB_STATE_ISSUED;
+
+       if (nlwb) {
+               nlwb->lwb_blk = *bp;
+               nlwb->lwb_error = error;
+               nlwb->lwb_slog = slog;
+               nlwb->lwb_alloc_txg = txg;
+               if (nlwb->lwb_state != LWB_STATE_READY)
+                       nlwb = NULL;
+       }
+       mutex_exit(&zilog->zl_lock);
 
        if (lwb->lwb_slog) {
                ZIL_STAT_BUMP(zilog, zil_itx_metaslab_slog_count);
@@ -1938,11 +1928,19 @@ zil_lwb_write_issue(zilog_t *zilog, lwb_t *lwb)
                ZIL_STAT_INCR(zilog, zil_itx_metaslab_normal_alloc,
                    BP_GET_LSIZE(&lwb->lwb_blk));
        }
-       ASSERT(spa_config_held(zilog->zl_spa, SCL_STATE, RW_READER));
-       zil_lwb_add_block(lwb, &lwb->lwb_blk);
        lwb->lwb_issued_timestamp = gethrtime();
        zio_nowait(lwb->lwb_root_zio);
        zio_nowait(lwb->lwb_write_zio);
+       if (lwb->lwb_child_zio)
+               zio_nowait(lwb->lwb_child_zio);
+
+       /*
+        * If nlwb was ready when we gave it the block pointer,
+        * it is on us to issue it and possibly following ones.
+        */
+       lwb = nlwb;
+       if (lwb)
+               goto next_lwb;
 }
 
 /*
@@ -2014,10 +2012,7 @@ zil_lwb_assign(zilog_t *zilog, lwb_t *lwb, itx_t *itx, list_t *ilwbs)
         * For more details, see the comment above zil_commit().
         */
        if (lr->lrc_txtype == TX_COMMIT) {
-               mutex_enter(&zilog->zl_lock);
                zil_commit_waiter_link_lwb(itx->itx_private, lwb);
-               itx->itx_private = NULL;
-               mutex_exit(&zilog->zl_lock);
                list_insert_tail(&lwb->lwb_itxs, itx);
                return (lwb);
        }
@@ -2036,17 +2031,17 @@ cont:
         * If this record won't fit in the current log block, start a new one.
         * For WR_NEED_COPY optimize layout for minimal number of chunks.
         */
-       lwb_sp = lwb->lwb_sz - lwb->lwb_nused;
+       lwb_sp = lwb->lwb_nmax - lwb->lwb_nused;
        max_log_data = zil_max_log_data(zilog, sizeof (lr_write_t));
        if (reclen > lwb_sp || (reclen + dlen > lwb_sp &&
            lwb_sp < zil_max_waste_space(zilog) &&
            (dlen % max_log_data == 0 ||
            lwb_sp < reclen + dlen % max_log_data))) {
-               lwb = zil_lwb_write_close(zilog, lwb, ilwbs);
+               list_insert_tail(ilwbs, lwb);
+               lwb = zil_lwb_write_close(zilog, lwb, LWB_STATE_OPENED);
                if (lwb == NULL)
                        return (NULL);
-               zil_lwb_write_open(zilog, lwb);
-               lwb_sp = lwb->lwb_sz - lwb->lwb_nused;
+               lwb_sp = lwb->lwb_nmax - lwb->lwb_nused;
 
                /*
                 * There must be enough space in the new, empty log block to
@@ -2084,7 +2079,7 @@ cont:
        clr->lrc_seq = ++zilog->zl_lr_seq;
 
        lwb->lwb_nused += reclen + dnow;
-       ASSERT3U(lwb->lwb_nused, <=, lwb->lwb_sz);
+       ASSERT3U(lwb->lwb_nused, <=, lwb->lwb_nmax);
        ASSERT0(P2PHASE(lwb->lwb_nused, sizeof (uint64_t)));
 
        zil_lwb_add_txg(lwb, lr->lrc_txg);
@@ -2096,22 +2091,9 @@ cont:
                goto cont;
        }
 
-       /*
-        * We have to really issue all queued LWBs before we may have to
-        * wait for a txg sync.  Otherwise we may end up in a dead lock.
-        */
-       if (lr->lrc_txtype == TX_WRITE) {
-               boolean_t frozen = lr->lrc_txg > spa_freeze_txg(zilog->zl_spa);
-               if (frozen || itx->itx_wr_state == WR_INDIRECT) {
-                       lwb_t *tlwb;
-                       while ((tlwb = list_remove_head(ilwbs)) != NULL)
-                               zil_lwb_write_issue(zilog, tlwb);
-               }
-               if (itx->itx_wr_state == WR_INDIRECT)
-                       lwb->lwb_indirect = B_TRUE;
-               if (frozen)
-                       txg_wait_synced(zilog->zl_dmu_pool, lr->lrc_txg);
-       }
+       if (lr->lrc_txtype == TX_WRITE &&
+           lr->lrc_txg > spa_freeze_txg(zilog->zl_spa))
+               txg_wait_synced(zilog->zl_dmu_pool, lr->lrc_txg);
 
        return (lwb);
 }
@@ -2174,26 +2156,24 @@ zil_lwb_commit(zilog_t *zilog, lwb_t *lwb, itx_t *itx)
                                ZIL_STAT_BUMP(zilog, zil_itx_indirect_count);
                                ZIL_STAT_INCR(zilog, zil_itx_indirect_bytes,
                                    lrw->lr_length);
+                               if (lwb->lwb_child_zio == NULL) {
+                                       lwb->lwb_child_zio = zio_root(
+                                           zilog->zl_spa, NULL, NULL,
+                                           ZIO_FLAG_CANFAIL);
+                               }
                        }
 
                        /*
-                        * We pass in the "lwb_write_zio" rather than
-                        * "lwb_root_zio" so that the "lwb_write_zio"
-                        * becomes the parent of any zio's created by
-                        * the "zl_get_data" callback. The vdevs are
-                        * flushed after the "lwb_write_zio" completes,
-                        * so we want to make sure that completion
-                        * callback waits for these additional zio's,
-                        * such that the vdevs used by those zio's will
-                        * be included in the lwb's vdev tree, and those
-                        * vdevs will be properly flushed. If we passed
-                        * in "lwb_root_zio" here, then these additional
-                        * vdevs may not be flushed; e.g. if these zio's
-                        * completed after "lwb_write_zio" completed.
+                        * The "lwb_child_zio" we pass in will become a child of
+                        * "lwb_write_zio", when one is created, so one will be
+                        * a parent of any zio's created by the "zl_get_data".
+                        * This way "lwb_write_zio" will first wait for children
+                        * block pointers before own writing, and then for their
+                        * writing completion before the vdev cache flushing.
                         */
                        error = zilog->zl_get_data(itx->itx_private,
                            itx->itx_gen, lrwb, dbuf, lwb,
-                           lwb->lwb_write_zio);
+                           lwb->lwb_child_zio);
                        if (dbuf != NULL && error == 0) {
                                /* Zero any padding bytes in the last block. */
                                memset((char *)dbuf + lrwb->lr_length, 0,
@@ -2226,12 +2206,8 @@ zil_lwb_commit(zilog_t *zilog, lwb_t *lwb, itx_t *itx)
                                    error);
                                zfs_fallthrough;
                        case EIO:
-                               if (lwb->lwb_indirect) {
-                                       txg_wait_synced(zilog->zl_dmu_pool,
-                                           lr->lrc_txg);
-                               } else {
-                                       lwb->lwb_write_zio->io_error = error;
-                               }
+                               txg_wait_synced(zilog->zl_dmu_pool,
+                                   lr->lrc_txg);
                                zfs_fallthrough;
                        case ENOENT:
                                zfs_fallthrough;
@@ -2675,7 +2651,6 @@ zil_prune_commit_list(zilog_t *zilog)
                        zil_commit_waiter_skip(itx->itx_private);
                } else {
                        zil_commit_waiter_link_lwb(itx->itx_private, last_lwb);
-                       itx->itx_private = NULL;
                }
 
                mutex_exit(&zilog->zl_lock);
@@ -2753,10 +2728,9 @@ zil_process_commit_list(zilog_t *zilog, zil_commit_waiter_t *zcw, list_t *ilwbs)
                 * have already been created (zl_lwb_list not empty).
                 */
                zil_commit_activate_saxattr_feature(zilog);
-               ASSERT3S(lwb->lwb_state, !=, LWB_STATE_ISSUED);
-               ASSERT3S(lwb->lwb_state, !=, LWB_STATE_WRITE_DONE);
-               ASSERT3S(lwb->lwb_state, !=, LWB_STATE_FLUSH_DONE);
-               first = (lwb->lwb_state != LWB_STATE_OPENED) &&
+               ASSERT(lwb->lwb_state == LWB_STATE_NEW ||
+                   lwb->lwb_state == LWB_STATE_OPENED);
+               first = (lwb->lwb_state == LWB_STATE_NEW) &&
                    ((plwb = list_prev(&zilog->zl_lwb_list, lwb)) == NULL ||
                    plwb->lwb_state == LWB_STATE_FLUSH_DONE);
        }
@@ -2880,37 +2854,32 @@ zil_process_commit_list(zilog_t *zilog, zil_commit_waiter_t *zcw, list_t *ilwbs)
        } else {
                ASSERT(list_is_empty(&nolwb_waiters));
                ASSERT3P(lwb, !=, NULL);
-               ASSERT3S(lwb->lwb_state, !=, LWB_STATE_ISSUED);
-               ASSERT3S(lwb->lwb_state, !=, LWB_STATE_WRITE_DONE);
-               ASSERT3S(lwb->lwb_state, !=, LWB_STATE_FLUSH_DONE);
+               ASSERT(lwb->lwb_state == LWB_STATE_NEW ||
+                   lwb->lwb_state == LWB_STATE_OPENED);
 
                /*
                 * At this point, the ZIL block pointed at by the "lwb"
-                * variable is in one of the following states: "closed"
-                * or "open".
+                * variable is in "new" or "opened" state.
                 *
-                * If it's "closed", then no itxs have been committed to
-                * it, so there's no point in issuing its zio (i.e. it's
-                * "empty").
+                * If it's "new", then no itxs have been committed to it, so
+                * there's no point in issuing its zio (i.e. it's "empty").
                 *
-                * If it's "open", then it contains one or more itxs that
+                * If it's "opened", then it contains one or more itxs that
                 * eventually need to be committed to stable storage. In
                 * this case we intentionally do not issue the lwb's zio
                 * to disk yet, and instead rely on one of the following
                 * two mechanisms for issuing the zio:
                 *
-                * 1. Ideally, there will be more ZIL activity occurring
-                * on the system, such that this function will be
-                * immediately called again (not necessarily by the same
-                * thread) and this lwb's zio will be issued via
-                * zil_lwb_assign(). This way, the lwb is guaranteed to
-                * be "full" when it is issued to disk, and we'll make
-                * use of the lwb's size the best we can.
+                * 1. Ideally, there will be more ZIL activity occurring on
+                * the system, such that this function will be immediately
+                * called again by different thread and this lwb will be
+                * closed by zil_lwb_assign().  This way, the lwb will be
+                * "full" when it is issued to disk, and we'll make use of
+                * the lwb's size the best we can.
                 *
                 * 2. If there isn't sufficient ZIL activity occurring on
-                * the system, such that this lwb's zio isn't issued via
-                * zil_lwb_assign(), zil_commit_waiter() will issue the
-                * lwb's zio. If this occurs, the lwb is not guaranteed
+                * the system, zil_commit_waiter() will close it and issue
+                * the zio.  If this occurs, the lwb is not guaranteed
                 * to be "full" by the time its zio is issued, and means
                 * the size of the lwb was "too large" given the amount
                 * of ZIL activity occurring on the system at that time.
@@ -2940,8 +2909,11 @@ zil_process_commit_list(zilog_t *zilog, zil_commit_waiter_t *zcw, list_t *ilwbs)
                        hrtime_t sleep = zilog->zl_last_lwb_latency *
                            zfs_commit_timeout_pct / 100;
                        if (sleep < zil_min_commit_timeout ||
-                           lwb->lwb_sz - lwb->lwb_nused < lwb->lwb_sz / 8) {
-                               lwb = zil_lwb_write_close(zilog, lwb, ilwbs);
+                           lwb->lwb_nmax - lwb->lwb_nused <
+                           lwb->lwb_nmax / 8) {
+                               list_insert_tail(ilwbs, lwb);
+                               lwb = zil_lwb_write_close(zilog, lwb,
+                                   LWB_STATE_NEW);
                                zilog->zl_cur_used = 0;
                                if (lwb == NULL) {
                                        while ((lwb = list_remove_head(ilwbs))
@@ -3024,7 +2996,7 @@ zil_commit_waiter_timeout(zilog_t *zilog, zil_commit_waiter_t *zcw)
 
        lwb_t *lwb = zcw->zcw_lwb;
        ASSERT3P(lwb, !=, NULL);
-       ASSERT3S(lwb->lwb_state, !=, LWB_STATE_CLOSED);
+       ASSERT3S(lwb->lwb_state, !=, LWB_STATE_NEW);
 
        /*
         * If the lwb has already been issued by another thread, we can
@@ -3033,9 +3005,7 @@ zil_commit_waiter_timeout(zilog_t *zilog, zil_commit_waiter_t *zcw)
         * do this prior to acquiring the zl_issuer_lock, to avoid
         * acquiring it when it's not necessary to do so.
         */
-       if (lwb->lwb_state == LWB_STATE_ISSUED ||
-           lwb->lwb_state == LWB_STATE_WRITE_DONE ||
-           lwb->lwb_state == LWB_STATE_FLUSH_DONE)
+       if (lwb->lwb_state != LWB_STATE_OPENED)
                return;
 
        /*
@@ -3058,8 +3028,8 @@ zil_commit_waiter_timeout(zilog_t *zilog, zil_commit_waiter_t *zcw)
         * wind up with a use-after-free error below.
         */
        if (zcw->zcw_done) {
-               lwb = NULL;
-               goto out;
+               mutex_exit(&zilog->zl_issuer_lock);
+               return;
        }
 
        ASSERT3P(lwb, ==, zcw->zcw_lwb);
@@ -3070,28 +3040,33 @@ zil_commit_waiter_timeout(zilog_t *zilog, zil_commit_waiter_t *zcw)
         * second time while holding the lock.
         *
         * We don't need to hold the zl_lock since the lwb cannot transition
-        * from OPENED to ISSUED while we hold the zl_issuer_lock. The lwb
-        * _can_ transition from ISSUED to DONE, but it's OK to race with
+        * from OPENED to CLOSED while we hold the zl_issuer_lock. The lwb
+        * _can_ transition from CLOSED to DONE, but it's OK to race with
         * that transition since we treat the lwb the same, whether it's in
-        * the ISSUED or DONE states.
+        * the CLOSED, ISSUED or DONE states.
         *
         * The important thing, is we treat the lwb differently depending on
-        * if it's ISSUED or OPENED, and block any other threads that might
-        * attempt to issue this lwb. For that reason we hold the
+        * if it's OPENED or CLOSED, and block any other threads that might
+        * attempt to close/issue this lwb. For that reason we hold the
         * zl_issuer_lock when checking the lwb_state; we must not call
-        * zil_lwb_write_close() if the lwb had already been issued.
+        * zil_lwb_write_close() if the lwb had already been closed/issued.
         *
         * See the comment above the lwb_state_t structure definition for
         * more details on the lwb states, and locking requirements.
         */
-       if (lwb->lwb_state == LWB_STATE_ISSUED ||
-           lwb->lwb_state == LWB_STATE_WRITE_DONE ||
-           lwb->lwb_state == LWB_STATE_FLUSH_DONE) {
-               lwb = NULL;
-               goto out;
+       if (lwb->lwb_state != LWB_STATE_OPENED) {
+               mutex_exit(&zilog->zl_issuer_lock);
+               return;
        }
 
-       ASSERT3S(lwb->lwb_state, ==, LWB_STATE_OPENED);
+       /*
+        * We do not need zcw_lock once we hold zl_issuer_lock and know lwb
+        * is still open.  But we have to drop it to avoid a deadlock in case
+        * callback of zio issued by zil_lwb_write_issue() try to get it,
+        * while zil_lwb_write_issue() is blocked on attempt to issue next
+        * lwb it found in LWB_STATE_READY state.
+        */
+       mutex_exit(&zcw->zcw_lock);
 
        /*
         * As described in the comments above zil_commit_waiter() and
@@ -3099,9 +3074,9 @@ zil_commit_waiter_timeout(zilog_t *zilog, zil_commit_waiter_t *zcw)
         * since we've reached the commit waiter's timeout and it still
         * hasn't been issued.
         */
-       lwb_t *nlwb = zil_lwb_write_close(zilog, lwb, NULL);
+       lwb_t *nlwb = zil_lwb_write_close(zilog, lwb, LWB_STATE_NEW);
 
-       ASSERT3S(lwb->lwb_state, !=, LWB_STATE_OPENED);
+       ASSERT3S(lwb->lwb_state, ==, LWB_STATE_CLOSED);
 
        /*
         * Since the lwb's zio hadn't been issued by the time this thread
@@ -3124,34 +3099,15 @@ zil_commit_waiter_timeout(zilog_t *zilog, zil_commit_waiter_t *zcw)
                 * "next" lwb on-disk. When this occurs, the ZIL write
                 * pipeline must be stalled; see the comment within the
                 * zil_commit_writer_stall() function for more details.
-                *
-                * We must drop the commit waiter's lock prior to
-                * calling zil_commit_writer_stall() or else we can wind
-                * up with the following deadlock:
-                *
-                * - This thread is waiting for the txg to sync while
-                *   holding the waiter's lock; txg_wait_synced() is
-                *   used within txg_commit_writer_stall().
-                *
-                * - The txg can't sync because it is waiting for this
-                *   lwb's zio callback to call dmu_tx_commit().
-                *
-                * - The lwb's zio callback can't call dmu_tx_commit()
-                *   because it's blocked trying to acquire the waiter's
-                *   lock, which occurs prior to calling dmu_tx_commit()
                 */
-               mutex_exit(&zcw->zcw_lock);
                zil_lwb_write_issue(zilog, lwb);
-               lwb = NULL;
                zil_commit_writer_stall(zilog);
-               mutex_enter(&zcw->zcw_lock);
-       }
-
-out:
-       mutex_exit(&zilog->zl_issuer_lock);
-       if (lwb)
+               mutex_exit(&zilog->zl_issuer_lock);
+       } else {
+               mutex_exit(&zilog->zl_issuer_lock);
                zil_lwb_write_issue(zilog, lwb);
-       ASSERT(MUTEX_HELD(&zcw->zcw_lock));
+       }
+       mutex_enter(&zcw->zcw_lock);
 }
 
 /*
@@ -3216,7 +3172,7 @@ zil_commit_waiter(zilog_t *zilog, zil_commit_waiter_t *zcw)
                 * where it's "zcw_lwb" field is NULL, and it hasn't yet
                 * been skipped, so it's "zcw_done" field is still B_FALSE.
                 */
-               IMPLY(lwb != NULL, lwb->lwb_state != LWB_STATE_CLOSED);
+               IMPLY(lwb != NULL, lwb->lwb_state != LWB_STATE_NEW);
 
                if (lwb != NULL && lwb->lwb_state == LWB_STATE_OPENED) {
                        ASSERT3B(timedout, ==, B_FALSE);
@@ -3264,6 +3220,8 @@ zil_commit_waiter(zilog_t *zilog, zil_commit_waiter_t *zcw)
                         */
 
                        IMPLY(lwb != NULL,
+                           lwb->lwb_state == LWB_STATE_CLOSED ||
+                           lwb->lwb_state == LWB_STATE_READY ||
                            lwb->lwb_state == LWB_STATE_ISSUED ||
                            lwb->lwb_state == LWB_STATE_WRITE_DONE ||
                            lwb->lwb_state == LWB_STATE_FLUSH_DONE);
@@ -3618,10 +3576,11 @@ zil_sync(zilog_t *zilog, dmu_tx_t *tx)
        while ((lwb = list_head(&zilog->zl_lwb_list)) != NULL) {
                zh->zh_log = lwb->lwb_blk;
                if (lwb->lwb_state != LWB_STATE_FLUSH_DONE ||
-                   lwb->lwb_max_txg > txg)
+                   lwb->lwb_alloc_txg > txg || lwb->lwb_max_txg > txg)
                        break;
                list_remove(&zilog->zl_lwb_list, lwb);
-               zio_free(spa, txg, &lwb->lwb_blk);
+               if (!BP_IS_HOLE(&lwb->lwb_blk))
+                       zio_free(spa, txg, &lwb->lwb_blk);
                zil_free_lwb(zilog, lwb);
 
                /*
@@ -3825,17 +3784,18 @@ zil_close(zilog_t *zilog)
        }
 
        mutex_enter(&zilog->zl_lock);
+       txg = zilog->zl_dirty_max_txg;
        lwb = list_tail(&zilog->zl_lwb_list);
-       if (lwb == NULL)
-               txg = zilog->zl_dirty_max_txg;
-       else
-               txg = MAX(zilog->zl_dirty_max_txg, lwb->lwb_max_txg);
+       if (lwb != NULL) {
+               txg = MAX(txg, lwb->lwb_alloc_txg);
+               txg = MAX(txg, lwb->lwb_max_txg);
+       }
        mutex_exit(&zilog->zl_lock);
 
        /*
         * zl_lwb_max_issued_txg may be larger than lwb_max_txg. It depends
         * on the time when the dmu_tx transaction is assigned in
-        * zil_lwb_write_close().
+        * zil_lwb_write_issue().
         */
        mutex_enter(&zilog->zl_lwb_io_lock);
        txg = MAX(zilog->zl_lwb_max_issued_txg, txg);
@@ -3864,8 +3824,7 @@ zil_close(zilog_t *zilog)
        lwb = list_remove_head(&zilog->zl_lwb_list);
        if (lwb != NULL) {
                ASSERT(list_is_empty(&zilog->zl_lwb_list));
-               ASSERT3S(lwb->lwb_state, !=, LWB_STATE_ISSUED);
-
+               ASSERT3S(lwb->lwb_state, ==, LWB_STATE_NEW);
                zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
                zil_free_lwb(zilog, lwb);
        }
@@ -3986,7 +3945,7 @@ zil_suspend(const char *osname, void **cookiep)
 
        /*
         * We need to use zil_commit_impl to ensure we wait for all
-        * LWB_STATE_OPENED and LWB_STATE_ISSUED lwbs to be committed
+        * LWB_STATE_OPENED, _CLOSED and _READY lwbs to be committed
         * to disk before proceeding. If we used zil_commit instead, it
         * would just call txg_wait_synced(), because zl_suspend is set.
         * txg_wait_synced() doesn't wait for these lwb's to be
index b5627109900c1372629f959c1481feebd0b91ce0..7458b416c2921c9500c2bd0a50f7f2f225c04ea3 100644 (file)
@@ -4466,8 +4466,8 @@ zio_ready(zio_t *zio)
        zio_t *pio, *pio_next;
        zio_link_t *zl = NULL;
 
-       if (zio_wait_for_children(zio, ZIO_CHILD_GANG_BIT | ZIO_CHILD_DDT_BIT,
-           ZIO_WAIT_READY)) {
+       if (zio_wait_for_children(zio, ZIO_CHILD_LOGICAL_BIT |
+           ZIO_CHILD_GANG_BIT | ZIO_CHILD_DDT_BIT, ZIO_WAIT_READY)) {
                return (NULL);
        }
 
index bbef53e4e4797f208bffe3bd63ae193aeb69fe0a..cc11fd80618b52059827665eb9da4f642d5be346 100644 (file)
@@ -698,7 +698,6 @@ zvol_get_data(void *arg, uint64_t arg2, lr_write_t *lr, char *buf,
        int error;
 
        ASSERT3P(lwb, !=, NULL);
-       ASSERT3P(zio, !=, NULL);
        ASSERT3U(size, !=, 0);
 
        zgd = kmem_zalloc(sizeof (zgd_t), KM_SLEEP);
@@ -717,6 +716,7 @@ zvol_get_data(void *arg, uint64_t arg2, lr_write_t *lr, char *buf,
                error = dmu_read_by_dnode(zv->zv_dn, offset, size, buf,
                    DMU_READ_NO_PREFETCH);
        } else { /* indirect write */
+               ASSERT3P(zio, !=, NULL);
                /*
                 * Have to lock the whole block to ensure when it's written out
                 * and its checksum is being calculated that no one can change