]> git.proxmox.com Git - mirror_zfs.git/blobdiff - module/zfs/zil.c
Fix the spelling of deferred
[mirror_zfs.git] / module / zfs / zil.c
index d9ae1f413e944b8524ca3ca67faf4e609b5f590d..ff14a98b6b256d9674ab28d32d4f42f3ef568056 100644 (file)
  */
 /*
  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
- * Copyright (c) 2011, 2017 by Delphix. All rights reserved.
+ * Copyright (c) 2011, 2018 by Delphix. All rights reserved.
  * Copyright (c) 2014 Integros [integros.com]
+ * Copyright (c) 2018 Datto Inc.
  */
 
 /* Portions Copyright 2010 Robert Milkowski */
 
 #include <sys/zfs_context.h>
 #include <sys/spa.h>
+#include <sys/spa_impl.h>
 #include <sys/dmu.h>
 #include <sys/zap.h>
 #include <sys/arc.h>
 #include <sys/stat.h>
-#include <sys/resource.h>
 #include <sys/zil.h>
 #include <sys/zil_impl.h>
 #include <sys/dsl_dataset.h>
@@ -117,11 +118,12 @@ static kstat_t *zil_ksp;
 int zil_replay_disable = 0;
 
 /*
- * Tunable parameter for debugging or performance analysis.  Setting
- * zfs_nocacheflush will cause corruption on power loss if a volatile
- * out-of-order write cache is enabled.
+ * Disable the DKIOCFLUSHWRITECACHE commands that are normally sent to
+ * the disk(s) by the ZIL after an LWB write has completed. Setting this
+ * will cause ZIL corruption on power loss if a volatile out-of-order
+ * write cache is enabled.
  */
-int zfs_nocacheflush = 0;
+int zil_nocacheflush = 0;
 
 /*
  * Limit SLOG write size per commit executed with synchronous priority.
@@ -430,6 +432,35 @@ done:
        return (error);
 }
 
+/* ARGSUSED */
+static int
+zil_clear_log_block(zilog_t *zilog, blkptr_t *bp, void *tx, uint64_t first_txg)
+{
+       ASSERT(!BP_IS_HOLE(bp));
+
+       /*
+        * As we call this function from the context of a rewind to a
+        * checkpoint, each ZIL block whose txg is later than the txg
+        * that we rewind to is invalid. Thus, we return -1 so
+        * zil_parse() doesn't attempt to read it.
+        */
+       if (bp->blk_birth >= first_txg)
+               return (-1);
+
+       if (zil_bp_tree_add(zilog, bp) != 0)
+               return (0);
+
+       zio_free(zilog->zl_spa, first_txg, bp);
+       return (0);
+}
+
+/* ARGSUSED */
+static int
+zil_noop_log_record(zilog_t *zilog, lr_t *lrc, void *tx, uint64_t first_txg)
+{
+       return (0);
+}
+
 static int
 zil_claim_log_block(zilog_t *zilog, blkptr_t *bp, void *tx, uint64_t first_txg)
 {
@@ -476,7 +507,7 @@ zil_claim_log_record(zilog_t *zilog, lr_t *lrc, void *tx, uint64_t first_txg)
 static int
 zil_free_log_block(zilog_t *zilog, blkptr_t *bp, void *tx, uint64_t claim_txg)
 {
-       zio_free_zil(zilog->zl_spa, dmu_tx_get_txg(tx), bp);
+       zio_free(zilog->zl_spa, dmu_tx_get_txg(tx), bp);
 
        return (0);
 }
@@ -557,7 +588,7 @@ zil_free_lwb(zilog_t *zilog, lwb_t *lwb)
        ASSERT3P(lwb->lwb_root_zio, ==, NULL);
        ASSERT3U(lwb->lwb_max_txg, <=, spa_syncing_txg(zilog->zl_spa));
        ASSERT(lwb->lwb_state == LWB_STATE_CLOSED ||
-           lwb->lwb_state == LWB_STATE_DONE);
+           lwb->lwb_state == LWB_STATE_FLUSH_DONE);
 
        /*
         * Clear the zilog's field to indicate this lwb is no longer
@@ -662,7 +693,7 @@ zil_create(zilog_t *zilog)
                txg = dmu_tx_get_txg(tx);
 
                if (!BP_IS_HOLE(&blk)) {
-                       zio_free_zil(zilog->zl_spa, txg, &blk);
+                       zio_free(zilog->zl_spa, txg, &blk);
                        BP_ZERO(&blk);
                }
 
@@ -690,7 +721,8 @@ zil_create(zilog_t *zilog)
                txg_wait_synced(zilog->zl_dmu_pool, txg);
        }
 
-       ASSERT(bcmp(&blk, &zh->zh_log, sizeof (blk)) == 0);
+       ASSERT(error != 0 || bcmp(&blk, &zh->zh_log, sizeof (blk)) == 0);
+       IMPLY(error == 0, lwb != NULL);
 
        return (lwb);
 }
@@ -767,8 +799,8 @@ int
 zil_claim(dsl_pool_t *dp, dsl_dataset_t *ds, void *txarg)
 {
        dmu_tx_t *tx = txarg;
-       uint64_t first_txg = dmu_tx_get_txg(tx);
        zilog_t *zilog;
+       uint64_t first_txg;
        zil_header_t *zh;
        objset_t *os;
        int error;
@@ -790,10 +822,43 @@ zil_claim(dsl_pool_t *dp, dsl_dataset_t *ds, void *txarg)
 
        zilog = dmu_objset_zil(os);
        zh = zil_header_in_syncing_context(zilog);
+       ASSERT3U(tx->tx_txg, ==, spa_first_txg(zilog->zl_spa));
+       first_txg = spa_min_claim_txg(zilog->zl_spa);
 
-       if (spa_get_log_state(zilog->zl_spa) == SPA_LOG_CLEAR) {
-               if (!BP_IS_HOLE(&zh->zh_log))
-                       zio_free_zil(zilog->zl_spa, first_txg, &zh->zh_log);
+       /*
+        * If the spa_log_state is not set to be cleared, check whether
+        * the current uberblock is a checkpoint one and if the current
+        * header has been claimed before moving on.
+        *
+        * If the current uberblock is a checkpointed uberblock then
+        * one of the following scenarios took place:
+        *
+        * 1] We are currently rewinding to the checkpoint of the pool.
+        * 2] We crashed in the middle of a checkpoint rewind but we
+        *    did manage to write the checkpointed uberblock to the
+        *    vdev labels, so when we tried to import the pool again
+        *    the checkpointed uberblock was selected from the import
+        *    procedure.
+        *
+        * In both cases we want to zero out all the ZIL blocks, except
+        * the ones that have been claimed at the time of the checkpoint
+        * (their zh_claim_txg != 0). The reason is that these blocks
+        * may be corrupted since we may have reused their locations on
+        * disk after we took the checkpoint.
+        *
+        * We could try to set spa_log_state to SPA_LOG_CLEAR earlier
+        * when we first figure out whether the current uberblock is
+        * checkpointed or not. Unfortunately, that would discard all
+        * the logs, including the ones that are claimed, and we would
+        * leak space.
+        */
+       if (spa_get_log_state(zilog->zl_spa) == SPA_LOG_CLEAR ||
+           (zilog->zl_spa->spa_uberblock.ub_checkpoint_txg != 0 &&
+           zh->zh_claim_txg == 0)) {
+               if (!BP_IS_HOLE(&zh->zh_log)) {
+                       (void) zil_parse(zilog, zil_clear_log_block,
+                           zil_noop_log_record, tx, first_txg, B_FALSE);
+               }
                BP_ZERO(&zh->zh_log);
                if (os->os_encrypted)
                        os->os_next_write_raw[tx->tx_txg & TXG_MASK] = B_TRUE;
@@ -802,6 +867,12 @@ zil_claim(dsl_pool_t *dp, dsl_dataset_t *ds, void *txarg)
                return (0);
        }
 
+       /*
+        * If we are not rewinding and opening the pool normally, then
+        * the min_claim_txg should be equal to the first txg of the pool.
+        */
+       ASSERT3U(first_txg, ==, spa_first_txg(zilog->zl_spa));
+
        /*
         * Claim all log blocks if we haven't already done so, and remember
         * the highest claimed sequence number.  This ensures that if we can
@@ -855,16 +926,17 @@ zil_check_log_chain(dsl_pool_t *dp, dsl_dataset_t *ds, void *tx)
        zilog = dmu_objset_zil(os);
        bp = (blkptr_t *)&zilog->zl_header->zh_log;
 
-       /*
-        * Check the first block and determine if it's on a log device
-        * which may have been removed or faulted prior to loading this
-        * pool.  If so, there's no point in checking the rest of the log
-        * as its content should have already been synced to the pool.
-        */
        if (!BP_IS_HOLE(bp)) {
                vdev_t *vd;
                boolean_t valid = B_TRUE;
 
+               /*
+                * Check the first block and determine if it's on a log device
+                * which may have been removed or faulted prior to loading this
+                * pool.  If so, there's no point in checking the rest of the
+                * log as its content should have already been synced to the
+                * pool.
+                */
                spa_config_enter(os->os_spa, SCL_STATE, FTAG, RW_READER);
                vd = vdev_lookup_top(os->os_spa, DVA_GET_VDEV(&bp->blk_dva[0]));
                if (vd->vdev_islog && vdev_is_dead(vd))
@@ -873,6 +945,18 @@ zil_check_log_chain(dsl_pool_t *dp, dsl_dataset_t *ds, void *tx)
 
                if (!valid)
                        return (0);
+
+               /*
+                * Check whether the current uberblock is checkpointed (e.g.
+                * we are rewinding) and whether the current header has been
+                * claimed or not. If it hasn't then skip verifying it. We
+                * do this because its ZIL blocks may be part of the pool's
+                * state before the rewind, which is no longer valid.
+                */
+               zil_header_t *zh = zil_header_in_syncing_context(zilog);
+               if (zilog->zl_spa->spa_uberblock.ub_checkpoint_txg != 0 &&
+                   zh->zh_claim_txg == 0)
+                       return (0);
        }
 
        /*
@@ -883,8 +967,8 @@ zil_check_log_chain(dsl_pool_t *dp, dsl_dataset_t *ds, void *tx)
         * which will update spa_max_claim_txg.  See spa_load() for details.
         */
        error = zil_parse(zilog, zil_claim_log_block, zil_claim_log_record, tx,
-           zilog->zl_header->zh_claim_txg ? -1ULL : spa_first_txg(os->os_spa),
-           B_FALSE);
+           zilog->zl_header->zh_claim_txg ? -1ULL :
+           spa_min_claim_txg(os->os_spa), B_FALSE);
 
        return ((error == ECKSUM || error == ENOENT) ? 0 : error);
 }
@@ -927,7 +1011,8 @@ zil_commit_waiter_link_lwb(zil_commit_waiter_t *zcw, lwb_t *lwb)
        ASSERT3P(zcw->zcw_lwb, ==, NULL);
        ASSERT3P(lwb, !=, NULL);
        ASSERT(lwb->lwb_state == LWB_STATE_OPENED ||
-           lwb->lwb_state == LWB_STATE_ISSUED);
+           lwb->lwb_state == LWB_STATE_ISSUED ||
+           lwb->lwb_state == LWB_STATE_WRITE_DONE);
 
        list_insert_tail(&lwb->lwb_waiters, zcw);
        zcw->zcw_lwb = lwb;
@@ -958,7 +1043,7 @@ zil_lwb_add_block(lwb_t *lwb, const blkptr_t *bp)
        int ndvas = BP_GET_NDVAS(bp);
        int i;
 
-       if (zfs_nocacheflush)
+       if (zil_nocacheflush)
                return;
 
        mutex_enter(&lwb->lwb_vdev_lock);
@@ -973,6 +1058,42 @@ zil_lwb_add_block(lwb_t *lwb, const blkptr_t *bp)
        mutex_exit(&lwb->lwb_vdev_lock);
 }
 
+static void
+zil_lwb_flush_defer(lwb_t *lwb, lwb_t *nlwb)
+{
+       avl_tree_t *src = &lwb->lwb_vdev_tree;
+       avl_tree_t *dst = &nlwb->lwb_vdev_tree;
+       void *cookie = NULL;
+       zil_vdev_node_t *zv;
+
+       ASSERT3S(lwb->lwb_state, ==, LWB_STATE_WRITE_DONE);
+       ASSERT3S(nlwb->lwb_state, !=, LWB_STATE_WRITE_DONE);
+       ASSERT3S(nlwb->lwb_state, !=, LWB_STATE_FLUSH_DONE);
+
+       /*
+        * While 'lwb' is at a point in its lifetime where lwb_vdev_tree does
+        * not need the protection of lwb_vdev_lock (it will only be modified
+        * while holding zilog->zl_lock) as its writes and those of its
+        * children have all completed.  The younger 'nlwb' may be waiting on
+        * future writes to additional vdevs.
+        */
+       mutex_enter(&nlwb->lwb_vdev_lock);
+       /*
+        * Tear down the 'lwb' vdev tree, ensuring that entries which do not
+        * exist in 'nlwb' are moved to it, freeing any would-be duplicates.
+        */
+       while ((zv = avl_destroy_nodes(src, &cookie)) != NULL) {
+               avl_index_t where;
+
+               if (avl_find(dst, zv, &where) == NULL) {
+                       avl_insert(dst, zv, where);
+               } else {
+                       kmem_free(zv, sizeof (*zv));
+               }
+       }
+       mutex_exit(&nlwb->lwb_vdev_lock);
+}
+
 void
 zil_lwb_add_txg(lwb_t *lwb, uint64_t txg)
 {
@@ -980,9 +1101,13 @@ zil_lwb_add_txg(lwb_t *lwb, uint64_t txg)
 }
 
 /*
- * This function is a called after all VDEVs associated with a given lwb
+ * This function is a called after all vdevs associated with a given lwb
  * write have completed their DKIOCFLUSHWRITECACHE command; or as soon
- * as the lwb write completes, if "zfs_nocacheflush" is set.
+ * as the lwb write completes, if "zil_nocacheflush" is set. Further,
+ * all "previous" lwb's will have completed before this function is
+ * called; i.e. this function is called for all previous lwbs before
+ * it's called for "this" lwb (enforced via zio the dependencies
+ * configured in zil_lwb_set_zio_dependency()).
  *
  * The intention is for this function to be called as soon as the
  * contents of an lwb are considered "stable" on disk, and will survive
@@ -1020,7 +1145,9 @@ zil_lwb_flush_vdevs_done(zio_t *zio)
        zilog->zl_last_lwb_latency = gethrtime() - lwb->lwb_issued_timestamp;
 
        lwb->lwb_root_zio = NULL;
-       lwb->lwb_state = LWB_STATE_DONE;
+
+       ASSERT3S(lwb->lwb_state, ==, LWB_STATE_WRITE_DONE);
+       lwb->lwb_state = LWB_STATE_FLUSH_DONE;
 
        if (zilog->zl_last_lwb_opened == lwb) {
                /*
@@ -1066,14 +1193,17 @@ zil_lwb_flush_vdevs_done(zio_t *zio)
 }
 
 /*
- * This is called when an lwb write completes. This means, this specific
- * lwb was written to disk, and all dependent lwb have also been
- * written to disk.
- *
- * At this point, a DKIOCFLUSHWRITECACHE command hasn't been issued to
- * the VDEVs involved in writing out this specific lwb. The lwb will be
- * "done" once zil_lwb_flush_vdevs_done() is called, which occurs in the
- * zio completion callback for the lwb's root zio.
+ * This is called when an lwb's write zio completes. The callback's
+ * purpose is to issue the DKIOCFLUSHWRITECACHE commands for the vdevs
+ * in the lwb's lwb_vdev_tree. The tree will contain the vdevs involved
+ * in writing out this specific lwb's data, and in the case that cache
+ * flushes have been deferred, vdevs involved in writing the data for
+ * previous lwbs. The writes corresponding to all the vdevs in the
+ * lwb_vdev_tree will have completed by the time this is called, due to
+ * the zio dependencies configured in zil_lwb_set_zio_dependency(),
+ * which takes deferred flushes into account. The lwb will be "done"
+ * once zil_lwb_flush_vdevs_done() is called, which occurs in the zio
+ * completion callback for the lwb's root zio.
  */
 static void
 zil_lwb_write_done(zio_t *zio)
@@ -1084,6 +1214,7 @@ zil_lwb_write_done(zio_t *zio)
        avl_tree_t *t = &lwb->lwb_vdev_tree;
        void *cookie = NULL;
        zil_vdev_node_t *zv;
+       lwb_t *nlwb;
 
        ASSERT3S(spa_config_held(spa, SCL_STATE, RW_READER), !=, 0);
 
@@ -1097,11 +1228,12 @@ zil_lwb_write_done(zio_t *zio)
 
        abd_put(zio->io_abd);
 
-       ASSERT3S(lwb->lwb_state, ==, LWB_STATE_ISSUED);
-
        mutex_enter(&zilog->zl_lock);
+       ASSERT3S(lwb->lwb_state, ==, LWB_STATE_ISSUED);
+       lwb->lwb_state = LWB_STATE_WRITE_DONE;
        lwb->lwb_write_zio = NULL;
        lwb->lwb_fastwrite = FALSE;
+       nlwb = list_next(&zilog->zl_lwb_list, lwb);
        mutex_exit(&zilog->zl_lock);
 
        if (avl_numnodes(t) == 0)
@@ -1120,6 +1252,27 @@ zil_lwb_write_done(zio_t *zio)
                return;
        }
 
+       /*
+        * If this lwb does not have any threads waiting for it to
+        * complete, we want to defer issuing the DKIOCFLUSHWRITECACHE
+        * command to the vdevs written to by "this" lwb, and instead
+        * rely on the "next" lwb to handle the DKIOCFLUSHWRITECACHE
+        * command for those vdevs. Thus, we merge the vdev tree of
+        * "this" lwb with the vdev tree of the "next" lwb in the list,
+        * and assume the "next" lwb will handle flushing the vdevs (or
+        * deferring the flush(s) again).
+        *
+        * This is a useful performance optimization, especially for
+        * workloads with lots of async write activity and few sync
+        * write and/or fsync activity, as it has the potential to
+        * coalesce multiple flush commands to a vdev into one.
+        */
+       if (list_head(&lwb->lwb_waiters) == NULL && nlwb != NULL) {
+               zil_lwb_flush_defer(lwb, nlwb);
+               ASSERT(avl_is_empty(&lwb->lwb_vdev_tree));
+               return;
+       }
+
        while ((zv = avl_destroy_nodes(t, &cookie)) != NULL) {
                vdev_t *vd = vdev_lookup_top(spa, zv->zv_vdev);
                if (vd != NULL)
@@ -1128,6 +1281,73 @@ zil_lwb_write_done(zio_t *zio)
        }
 }
 
+static void
+zil_lwb_set_zio_dependency(zilog_t *zilog, lwb_t *lwb)
+{
+       lwb_t *last_lwb_opened = zilog->zl_last_lwb_opened;
+
+       ASSERT(MUTEX_HELD(&zilog->zl_issuer_lock));
+       ASSERT(MUTEX_HELD(&zilog->zl_lock));
+
+       /*
+        * The zilog's "zl_last_lwb_opened" field is used to build the
+        * lwb/zio dependency chain, which is used to preserve the
+        * ordering of lwb completions that is required by the semantics
+        * of the ZIL. Each new lwb zio becomes a parent of the
+        * "previous" lwb zio, such that the new lwb's zio cannot
+        * complete until the "previous" lwb's zio completes.
+        *
+        * This is required by the semantics of zil_commit(); the commit
+        * waiters attached to the lwbs will be woken in the lwb zio's
+        * completion callback, so this zio dependency graph ensures the
+        * waiters are woken in the correct order (the same order the
+        * lwbs were created).
+        */
+       if (last_lwb_opened != NULL &&
+           last_lwb_opened->lwb_state != LWB_STATE_FLUSH_DONE) {
+               ASSERT(last_lwb_opened->lwb_state == LWB_STATE_OPENED ||
+                   last_lwb_opened->lwb_state == LWB_STATE_ISSUED ||
+                   last_lwb_opened->lwb_state == LWB_STATE_WRITE_DONE);
+
+               ASSERT3P(last_lwb_opened->lwb_root_zio, !=, NULL);
+               zio_add_child(lwb->lwb_root_zio,
+                   last_lwb_opened->lwb_root_zio);
+
+               /*
+                * If the previous lwb's write hasn't already completed,
+                * we also want to order the completion of the lwb write
+                * zios (above, we only order the completion of the lwb
+                * root zios). This is required because of how we can
+                * defer the DKIOCFLUSHWRITECACHE commands for each lwb.
+                *
+                * When the DKIOCFLUSHWRITECACHE commands are deferred,
+                * the previous lwb will rely on this lwb to flush the
+                * vdevs written to by that previous lwb. Thus, we need
+                * to ensure this lwb doesn't issue the flush until
+                * after the previous lwb's write completes. We ensure
+                * this ordering by setting the zio parent/child
+                * relationship here.
+                *
+                * Without this relationship on the lwb's write zio,
+                * it's possible for this lwb's write to complete prior
+                * to the previous lwb's write completing; and thus, the
+                * vdevs for the previous lwb would be flushed prior to
+                * that lwb's data being written to those vdevs (the
+                * vdevs are flushed in the lwb write zio's completion
+                * handler, zil_lwb_write_done()).
+                */
+               if (last_lwb_opened->lwb_state != LWB_STATE_WRITE_DONE) {
+                       ASSERT(last_lwb_opened->lwb_state == LWB_STATE_OPENED ||
+                           last_lwb_opened->lwb_state == LWB_STATE_ISSUED);
+
+                       ASSERT3P(last_lwb_opened->lwb_write_zio, !=, NULL);
+                       zio_add_child(lwb->lwb_write_zio,
+                           last_lwb_opened->lwb_write_zio);
+               }
+       }
+}
+
+
 /*
  * This function's purpose is to "open" an lwb such that it is ready to
  * accept new itxs being committed to it. To do this, the lwb's zio
@@ -1179,30 +1399,7 @@ zil_lwb_write_open(zilog_t *zilog, lwb_t *lwb)
 
                lwb->lwb_state = LWB_STATE_OPENED;
 
-               /*
-                * The zilog's "zl_last_lwb_opened" field is used to
-                * build the lwb/zio dependency chain, which is used to
-                * preserve the ordering of lwb completions that is
-                * required by the semantics of the ZIL. Each new lwb
-                * zio becomes a parent of the "previous" lwb zio, such
-                * that the new lwb's zio cannot complete until the
-                * "previous" lwb's zio completes.
-                *
-                * This is required by the semantics of zil_commit();
-                * the commit waiters attached to the lwbs will be woken
-                * in the lwb zio's completion callback, so this zio
-                * dependency graph ensures the waiters are woken in the
-                * correct order (the same order the lwbs were created).
-                */
-               lwb_t *last_lwb_opened = zilog->zl_last_lwb_opened;
-               if (last_lwb_opened != NULL &&
-                   last_lwb_opened->lwb_state != LWB_STATE_DONE) {
-                       ASSERT(last_lwb_opened->lwb_state == LWB_STATE_OPENED ||
-                           last_lwb_opened->lwb_state == LWB_STATE_ISSUED);
-                       ASSERT3P(last_lwb_opened->lwb_root_zio, !=, NULL);
-                       zio_add_child(lwb->lwb_root_zio,
-                           last_lwb_opened->lwb_root_zio);
-               }
+               zil_lwb_set_zio_dependency(zilog, lwb);
                zilog->zl_last_lwb_opened = lwb;
        }
        mutex_exit(&zilog->zl_lock);
@@ -1928,7 +2125,8 @@ zil_prune_commit_list(zilog_t *zilog)
                mutex_enter(&zilog->zl_lock);
 
                lwb_t *last_lwb = zilog->zl_last_lwb_opened;
-               if (last_lwb == NULL || last_lwb->lwb_state == LWB_STATE_DONE) {
+               if (last_lwb == NULL ||
+                   last_lwb->lwb_state == LWB_STATE_FLUSH_DONE) {
                        /*
                         * All of the itxs this waiter was waiting on
                         * must have already completed (or there were
@@ -2011,7 +2209,8 @@ zil_process_commit_list(zilog_t *zilog)
                lwb = zil_create(zilog);
        } else {
                ASSERT3S(lwb->lwb_state, !=, LWB_STATE_ISSUED);
-               ASSERT3S(lwb->lwb_state, !=, LWB_STATE_DONE);
+               ASSERT3S(lwb->lwb_state, !=, LWB_STATE_WRITE_DONE);
+               ASSERT3S(lwb->lwb_state, !=, LWB_STATE_FLUSH_DONE);
        }
 
        while ((itx = list_head(&zilog->zl_itx_commit_list)) != NULL) {
@@ -2133,7 +2332,8 @@ zil_process_commit_list(zilog_t *zilog)
                ASSERT(list_is_empty(&nolwb_waiters));
                ASSERT3P(lwb, !=, NULL);
                ASSERT3S(lwb->lwb_state, !=, LWB_STATE_ISSUED);
-               ASSERT3S(lwb->lwb_state, !=, LWB_STATE_DONE);
+               ASSERT3S(lwb->lwb_state, !=, LWB_STATE_WRITE_DONE);
+               ASSERT3S(lwb->lwb_state, !=, LWB_STATE_FLUSH_DONE);
 
                /*
                 * At this point, the ZIL block pointed at by the "lwb"
@@ -2256,7 +2456,8 @@ zil_commit_waiter_timeout(zilog_t *zilog, zil_commit_waiter_t *zcw)
         * acquiring it when it's not necessary to do so.
         */
        if (lwb->lwb_state == LWB_STATE_ISSUED ||
-           lwb->lwb_state == LWB_STATE_DONE)
+           lwb->lwb_state == LWB_STATE_WRITE_DONE ||
+           lwb->lwb_state == LWB_STATE_FLUSH_DONE)
                return;
 
        /*
@@ -2304,7 +2505,8 @@ zil_commit_waiter_timeout(zilog_t *zilog, zil_commit_waiter_t *zcw)
         * more details on the lwb states, and locking requirements.
         */
        if (lwb->lwb_state == LWB_STATE_ISSUED ||
-           lwb->lwb_state == LWB_STATE_DONE)
+           lwb->lwb_state == LWB_STATE_WRITE_DONE ||
+           lwb->lwb_state == LWB_STATE_FLUSH_DONE)
                goto out;
 
        ASSERT3S(lwb->lwb_state, ==, LWB_STATE_OPENED);
@@ -2317,7 +2519,7 @@ zil_commit_waiter_timeout(zilog_t *zilog, zil_commit_waiter_t *zcw)
         */
        lwb_t *nlwb = zil_lwb_write_issue(zilog, lwb);
 
-       ASSERT3S(lwb->lwb_state, !=, LWB_STATE_OPENED);
+       IMPLY(nlwb != NULL, lwb->lwb_state != LWB_STATE_OPENED);
 
        /*
         * Since the lwb's zio hadn't been issued by the time this thread
@@ -2477,7 +2679,8 @@ zil_commit_waiter(zilog_t *zilog, zil_commit_waiter_t *zcw)
 
                        IMPLY(lwb != NULL,
                            lwb->lwb_state == LWB_STATE_ISSUED ||
-                           lwb->lwb_state == LWB_STATE_DONE);
+                           lwb->lwb_state == LWB_STATE_WRITE_DONE ||
+                           lwb->lwb_state == LWB_STATE_FLUSH_DONE);
                        cv_wait(&zcw->zcw_cv, &zcw->zcw_lock);
                }
        }
@@ -3029,7 +3232,7 @@ zil_close(zilog_t *zilog)
                txg_wait_synced(zilog->zl_dmu_pool, txg);
 
        if (zilog_is_dirty(zilog))
-               zfs_dbgmsg("zil (%p) is dirty, txg %llu", zilog, txg);
+               zfs_dbgmsg("zil (%px) is dirty, txg %llu", zilog, txg);
        if (txg < spa_freeze_txg(zilog->zl_spa))
                VERIFY(!zilog_is_dirty(zilog));
 
@@ -3154,13 +3357,13 @@ zil_suspend(const char *osname, void **cookiep)
         * grabbing a reference to it. If the key isn't loaded we have no
         * choice but to return an error until the wrapping key is loaded.
         */
-       if (os->os_encrypted && spa_keystore_create_mapping(os->os_spa,
-           dmu_objset_ds(os), FTAG) != 0) {
+       if (os->os_encrypted &&
+           dsl_dataset_create_key_mapping(dmu_objset_ds(os)) != 0) {
                zilog->zl_suspend--;
                mutex_exit(&zilog->zl_lock);
                dsl_dataset_long_rele(dmu_objset_ds(os), suspend_tag);
                dsl_dataset_rele(dmu_objset_ds(os), suspend_tag);
-               return (SET_ERROR(EBUSY));
+               return (SET_ERROR(EACCES));
        }
 
        zilog->zl_suspending = B_TRUE;
@@ -3172,13 +3375,13 @@ zil_suspend(const char *osname, void **cookiep)
         * to disk before proceeding. If we used zil_commit instead, it
         * would just call txg_wait_synced(), because zl_suspend is set.
         * txg_wait_synced() doesn't wait for these lwb's to be
-        * LWB_STATE_DONE before returning.
+        * LWB_STATE_FLUSH_DONE before returning.
         */
        zil_commit_impl(zilog, 0);
 
        /*
-        * Now that we've ensured all lwb's are LWB_STATE_DONE, we use
-        * txg_wait_synced() to ensure the data from the zilog has
+        * Now that we've ensured all lwb's are LWB_STATE_FLUSH_DONE, we
+        * use txg_wait_synced() to ensure the data from the zilog has
         * migrated to the main pool before calling zil_destroy().
         */
        txg_wait_synced(zilog->zl_dmu_pool, 0);
@@ -3190,19 +3393,8 @@ zil_suspend(const char *osname, void **cookiep)
        cv_broadcast(&zilog->zl_cv_suspend);
        mutex_exit(&zilog->zl_lock);
 
-       if (os->os_encrypted) {
-               /*
-                * Encrypted datasets need to wait for all data to be
-                * synced out before removing the mapping.
-                *
-                * XXX: Depending on the number of datasets with
-                * outstanding ZIL data on a given log device, this
-                * might cause spa_offline_log() to take a long time.
-                */
-               txg_wait_synced(zilog->zl_dmu_pool, zilog->zl_destroy_txg);
-               VERIFY0(spa_keystore_remove_mapping(os->os_spa,
-                   dmu_objset_id(os), FTAG));
-       }
+       if (os->os_encrypted)
+               dsl_dataset_remove_key_mapping(dmu_objset_ds(os));
 
        if (cookiep == NULL)
                zil_resume(os);
@@ -3396,17 +3588,20 @@ zil_replaying(zilog_t *zilog, dmu_tx_t *tx)
 
 /* ARGSUSED */
 int
-zil_vdev_offline(const char *osname, void *arg)
+zil_reset(const char *osname, void *arg)
 {
        int error;
 
        error = zil_suspend(osname, NULL);
+       /* EACCES means crypto key not loaded */
+       if ((error == EACCES) || (error == EBUSY))
+               return (SET_ERROR(error));
        if (error != 0)
                return (SET_ERROR(EEXIST));
        return (0);
 }
 
-#if defined(_KERNEL) && defined(HAVE_SPL)
+#if defined(_KERNEL)
 EXPORT_SYMBOL(zil_alloc);
 EXPORT_SYMBOL(zil_free);
 EXPORT_SYMBOL(zil_open);
@@ -3419,7 +3614,6 @@ EXPORT_SYMBOL(zil_itx_create);
 EXPORT_SYMBOL(zil_itx_destroy);
 EXPORT_SYMBOL(zil_itx_assign);
 EXPORT_SYMBOL(zil_commit);
-EXPORT_SYMBOL(zil_vdev_offline);
 EXPORT_SYMBOL(zil_claim);
 EXPORT_SYMBOL(zil_check_log_chain);
 EXPORT_SYMBOL(zil_sync);
@@ -3438,8 +3632,8 @@ MODULE_PARM_DESC(zfs_commit_timeout_pct, "ZIL block open timeout percentage");
 module_param(zil_replay_disable, int, 0644);
 MODULE_PARM_DESC(zil_replay_disable, "Disable intent logging replay");
 
-module_param(zfs_nocacheflush, int, 0644);
-MODULE_PARM_DESC(zfs_nocacheflush, "Disable cache flushes");
+module_param(zil_nocacheflush, int, 0644);
+MODULE_PARM_DESC(zil_nocacheflush, "Disable ZIL cache flushes");
 
 module_param(zil_slog_bulk, ulong, 0644);
 MODULE_PARM_DESC(zil_slog_bulk, "Limit in bytes slog sync writes per commit");