]> git.proxmox.com Git - mirror_zfs.git/blobdiff - module/zfs/zil.c
Encryption patch follow-up
[mirror_zfs.git] / module / zfs / zil.c
index b69a7bf56eaf63265fd3e92abf50c42759ee8ce8..e90e583ae06eef8735d44b7a726aa344600f2ead 100644 (file)
@@ -20,7 +20,8 @@
  */
 /*
  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
- * Copyright (c) 2013 by Delphix. All rights reserved.
+ * Copyright (c) 2011, 2017 by Delphix. All rights reserved.
+ * Copyright (c) 2014 Integros [integros.com]
  */
 
 /* Portions Copyright 2010 Robert Milkowski */
@@ -39,6 +40,8 @@
 #include <sys/dmu_tx.h>
 #include <sys/dsl_pool.h>
 #include <sys/metaslab.h>
+#include <sys/trace_zil.h>
+#include <sys/abd.h>
 
 /*
  * The zfs intent log (ZIL) saves transaction records of system calls
@@ -99,6 +102,13 @@ int zil_replay_disable = 0;
  */
 int zfs_nocacheflush = 0;
 
+/*
+ * Limit SLOG write size per commit executed with synchronous priority.
+ * Any writes above that will be executed with lower (asynchronous) priority
+ * to limit potential SLOG device abuse by single active ZIL writer.
+ */
+unsigned long zil_slog_bulk = 768 * 1024;
+
 static kmem_cache_t *zil_lwb_cache;
 
 static void zil_async_to_sync(zilog_t *zilog, uint64_t foid);
@@ -106,33 +116,17 @@ static void zil_async_to_sync(zilog_t *zilog, uint64_t foid);
 #define        LWB_EMPTY(lwb) ((BP_GET_LSIZE(&lwb->lwb_blk) - \
     sizeof (zil_chain_t)) == (lwb->lwb_sz - lwb->lwb_nused))
 
-
-/*
- * ziltest is by and large an ugly hack, but very useful in
- * checking replay without tedious work.
- * When running ziltest we want to keep all itx's and so maintain
- * a single list in the zl_itxg[] that uses a high txg: ZILTEST_TXG
- * We subtract TXG_CONCURRENT_STATES to allow for common code.
- */
-#define        ZILTEST_TXG (UINT64_MAX - TXG_CONCURRENT_STATES)
-
 static int
 zil_bp_compare(const void *x1, const void *x2)
 {
        const dva_t *dva1 = &((zil_bp_node_t *)x1)->zn_dva;
        const dva_t *dva2 = &((zil_bp_node_t *)x2)->zn_dva;
 
-       if (DVA_GET_VDEV(dva1) < DVA_GET_VDEV(dva2))
-               return (-1);
-       if (DVA_GET_VDEV(dva1) > DVA_GET_VDEV(dva2))
-               return (1);
-
-       if (DVA_GET_OFFSET(dva1) < DVA_GET_OFFSET(dva2))
-               return (-1);
-       if (DVA_GET_OFFSET(dva1) > DVA_GET_OFFSET(dva2))
-               return (1);
+       int cmp = AVL_CMP(DVA_GET_VDEV(dva1), DVA_GET_VDEV(dva2));
+       if (likely(cmp))
+               return (cmp);
 
-       return (0);
+       return (AVL_CMP(DVA_GET_OFFSET(dva1), DVA_GET_OFFSET(dva2)));
 }
 
 static void
@@ -159,14 +153,19 @@ int
 zil_bp_tree_add(zilog_t *zilog, const blkptr_t *bp)
 {
        avl_tree_t *t = &zilog->zl_bp_tree;
-       const dva_t *dva = BP_IDENTITY(bp);
+       const dva_t *dva;
        zil_bp_node_t *zn;
        avl_index_t where;
 
+       if (BP_IS_EMBEDDED(bp))
+               return (0);
+
+       dva = BP_IDENTITY(bp);
+
        if (avl_find(t, dva, &where) != NULL)
                return (SET_ERROR(EEXIST));
 
-       zn = kmem_alloc(sizeof (zil_bp_node_t), KM_PUSHPAGE);
+       zn = kmem_alloc(sizeof (zil_bp_node_t), KM_SLEEP);
        zn->zn_dva = *dva;
        avl_insert(t, zn, where);
 
@@ -194,13 +193,13 @@ zil_init_log_chain(zilog_t *zilog, blkptr_t *bp)
  * Read a log block and make sure it's valid.
  */
 static int
-zil_read_log_block(zilog_t *zilog, const blkptr_t *bp, blkptr_t *nbp, void *dst,
-    char **end)
+zil_read_log_block(zilog_t *zilog, boolean_t decrypt, const blkptr_t *bp,
+    blkptr_t *nbp, void *dst, char **end)
 {
        enum zio_flag zio_flags = ZIO_FLAG_CANFAIL;
-       uint32_t aflags = ARC_WAIT;
+       arc_flags_t aflags = ARC_FLAG_WAIT;
        arc_buf_t *abuf = NULL;
-       zbookmark_t zb;
+       zbookmark_phys_t zb;
        int error;
 
        if (zilog->zl_header->zh_claim_txg == 0)
@@ -209,11 +208,14 @@ zil_read_log_block(zilog_t *zilog, const blkptr_t *bp, blkptr_t *nbp, void *dst,
        if (!(zilog->zl_header->zh_flags & ZIL_CLAIM_LR_SEQ_VALID))
                zio_flags |= ZIO_FLAG_SPECULATIVE;
 
+       if (!decrypt)
+               zio_flags |= ZIO_FLAG_RAW;
+
        SET_BOOKMARK(&zb, bp->blk_cksum.zc_word[ZIL_ZC_OBJSET],
            ZB_ZIL_OBJECT, ZB_ZIL_LEVEL, bp->blk_cksum.zc_word[ZIL_ZC_SEQ]);
 
-       error = arc_read(NULL, zilog->zl_spa, bp, arc_getbuf_func, &abuf,
-           ZIO_PRIORITY_SYNC_READ, zio_flags, &aflags, &zb);
+       error = arc_read(NULL, zilog->zl_spa, bp, arc_getbuf_func,
+           &abuf, ZIO_PRIORITY_SYNC_READ, zio_flags, &aflags, &zb);
 
        if (error == 0) {
                zio_cksum_t cksum = bp->blk_cksum;
@@ -237,6 +239,7 @@ zil_read_log_block(zilog_t *zilog, const blkptr_t *bp, blkptr_t *nbp, void *dst,
                            sizeof (cksum)) || BP_IS_HOLE(&zilc->zc_next_blk)) {
                                error = SET_ERROR(ECKSUM);
                        } else {
+                               ASSERT3U(len, <=, SPA_OLD_MAXBLOCKSIZE);
                                bcopy(lr, dst, len);
                                *end = (char *)dst + len;
                                *nbp = zilc->zc_next_blk;
@@ -251,13 +254,15 @@ zil_read_log_block(zilog_t *zilog, const blkptr_t *bp, blkptr_t *nbp, void *dst,
                            (zilc->zc_nused > (size - sizeof (*zilc)))) {
                                error = SET_ERROR(ECKSUM);
                        } else {
+                               ASSERT3U(zilc->zc_nused, <=,
+                                   SPA_OLD_MAXBLOCKSIZE);
                                bcopy(lr, dst, zilc->zc_nused);
                                *end = (char *)dst + zilc->zc_nused;
                                *nbp = zilc->zc_next_blk;
                        }
                }
 
-               VERIFY(arc_buf_remove_ref(abuf, &abuf));
+               arc_buf_destroy(abuf, &abuf);
        }
 
        return (error);
@@ -271,9 +276,9 @@ zil_read_log_data(zilog_t *zilog, const lr_write_t *lr, void *wbuf)
 {
        enum zio_flag zio_flags = ZIO_FLAG_CANFAIL;
        const blkptr_t *bp = &lr->lr_blkptr;
-       uint32_t aflags = ARC_WAIT;
+       arc_flags_t aflags = ARC_FLAG_WAIT;
        arc_buf_t *abuf = NULL;
-       zbookmark_t zb;
+       zbookmark_phys_t zb;
        int error;
 
        if (BP_IS_HOLE(bp)) {
@@ -285,6 +290,14 @@ zil_read_log_data(zilog_t *zilog, const lr_write_t *lr, void *wbuf)
        if (zilog->zl_header->zh_claim_txg == 0)
                zio_flags |= ZIO_FLAG_SPECULATIVE | ZIO_FLAG_SCRUB;
 
+       /*
+        * If we are not using the resulting data, we are just checking that
+        * it hasn't been corrupted so we don't need to waste CPU time
+        * decompressing and decrypting it.
+        */
+       if (wbuf == NULL)
+               zio_flags |= ZIO_FLAG_RAW;
+
        SET_BOOKMARK(&zb, dmu_objset_id(zilog->zl_os), lr->lr_foid,
            ZB_ZIL_LEVEL, lr->lr_offset / BP_GET_LSIZE(bp));
 
@@ -294,7 +307,7 @@ zil_read_log_data(zilog_t *zilog, const lr_write_t *lr, void *wbuf)
        if (error == 0) {
                if (wbuf != NULL)
                        bcopy(abuf->b_data, wbuf, arc_buf_size(abuf));
-               (void) arc_buf_remove_ref(abuf, &abuf);
+               arc_buf_destroy(abuf, &abuf);
        }
 
        return (error);
@@ -305,7 +318,8 @@ zil_read_log_data(zilog_t *zilog, const lr_write_t *lr, void *wbuf)
  */
 int
 zil_parse(zilog_t *zilog, zil_parse_blk_func_t *parse_blk_func,
-    zil_parse_lr_func_t *parse_lr_func, void *arg, uint64_t txg)
+    zil_parse_lr_func_t *parse_lr_func, void *arg, uint64_t txg,
+    boolean_t decrypt)
 {
        const zil_header_t *zh = zilog->zl_header;
        boolean_t claimed = !!zh->zh_claim_txg;
@@ -336,7 +350,7 @@ zil_parse(zilog_t *zilog, zil_parse_blk_func_t *parse_blk_func,
         * If the log has been claimed, stop if we encounter a sequence
         * number greater than the highest claimed sequence number.
         */
-       lrbuf = zio_buf_alloc(SPA_MAXBLOCKSIZE);
+       lrbuf = zio_buf_alloc(SPA_OLD_MAXBLOCKSIZE);
        zil_bp_tree_init(zilog);
 
        for (blk = zh->zh_log; !BP_IS_HOLE(&blk); blk = next_blk) {
@@ -346,7 +360,9 @@ zil_parse(zilog_t *zilog, zil_parse_blk_func_t *parse_blk_func,
 
                if (blk_seq > claim_blk_seq)
                        break;
-               if ((error = parse_blk_func(zilog, &blk, arg, txg)) != 0)
+
+               error = parse_blk_func(zilog, &blk, arg, txg);
+               if (error != 0)
                        break;
                ASSERT3U(max_blk_seq, <, blk_seq);
                max_blk_seq = blk_seq;
@@ -355,7 +371,8 @@ zil_parse(zilog_t *zilog, zil_parse_blk_func_t *parse_blk_func,
                if (max_lr_seq == claim_lr_seq && max_blk_seq == claim_blk_seq)
                        break;
 
-               error = zil_read_log_block(zilog, &blk, &next_blk, lrbuf, &end);
+               error = zil_read_log_block(zilog, decrypt, &blk, &next_blk,
+                   lrbuf, &end);
                if (error != 0)
                        break;
 
@@ -365,7 +382,9 @@ zil_parse(zilog_t *zilog, zil_parse_blk_func_t *parse_blk_func,
                        ASSERT3U(reclen, >=, sizeof (lr_t));
                        if (lr->lrc_seq > claim_lr_seq)
                                goto done;
-                       if ((error = parse_lr_func(zilog, lr, arg, txg)) != 0)
+
+                       error = parse_lr_func(zilog, lr, arg, txg);
+                       if (error != 0)
                                goto done;
                        ASSERT3U(max_lr_seq, <, lr->lrc_seq);
                        max_lr_seq = lr->lrc_seq;
@@ -380,10 +399,11 @@ done:
        zilog->zl_parse_lr_count = lr_count;
 
        ASSERT(!claimed || !(zh->zh_flags & ZIL_CLAIM_LR_SEQ_VALID) ||
-           (max_blk_seq == claim_blk_seq && max_lr_seq == claim_lr_seq));
+           (max_blk_seq == claim_blk_seq && max_lr_seq == claim_lr_seq) ||
+           (decrypt && error == EIO));
 
        zil_bp_tree_fini(zilog);
-       zio_buf_free(lrbuf, SPA_MAXBLOCKSIZE);
+       zio_buf_free(lrbuf, SPA_OLD_MAXBLOCKSIZE);
 
        return (error);
 }
@@ -395,7 +415,8 @@ zil_claim_log_block(zilog_t *zilog, blkptr_t *bp, void *tx, uint64_t first_txg)
         * Claim log block if not already committed and not already claimed.
         * If tx == NULL, just verify that the block is claimable.
         */
-       if (bp->blk_birth < first_txg || zil_bp_tree_add(zilog, bp) != 0)
+       if (BP_IS_HOLE(bp) || bp->blk_birth < first_txg ||
+           zil_bp_tree_add(zilog, bp) != 0)
                return (0);
 
        return (zio_wait(zio_claim(NULL, zilog->zl_spa,
@@ -420,9 +441,12 @@ zil_claim_log_record(zilog_t *zilog, lr_t *lrc, void *tx, uint64_t first_txg)
         * waited for all writes to be stable first), so it is semantically
         * correct to declare this the end of the log.
         */
-       if (lr->lr_blkptr.blk_birth >= first_txg &&
-           (error = zil_read_log_data(zilog, lr, NULL)) != 0)
-               return (error);
+       if (lr->lr_blkptr.blk_birth >= first_txg) {
+               error = zil_read_log_data(zilog, lr, NULL);
+               if (error != 0)
+                       return (error);
+       }
+
        return (zil_claim_log_block(zilog, &lr->lr_blkptr, tx, first_txg));
 }
 
@@ -445,21 +469,24 @@ zil_free_log_record(zilog_t *zilog, lr_t *lrc, void *tx, uint64_t claim_txg)
         * If we previously claimed it, we need to free it.
         */
        if (claim_txg != 0 && lrc->lrc_txtype == TX_WRITE &&
-           bp->blk_birth >= claim_txg && zil_bp_tree_add(zilog, bp) == 0)
+           bp->blk_birth >= claim_txg && zil_bp_tree_add(zilog, bp) == 0 &&
+           !BP_IS_HOLE(bp))
                zio_free(zilog->zl_spa, dmu_tx_get_txg(tx), bp);
 
        return (0);
 }
 
 static lwb_t *
-zil_alloc_lwb(zilog_t *zilog, blkptr_t *bp, uint64_t txg, boolean_t fastwrite)
+zil_alloc_lwb(zilog_t *zilog, blkptr_t *bp, boolean_t slog, uint64_t txg,
+    boolean_t fastwrite)
 {
        lwb_t *lwb;
 
-       lwb = kmem_cache_alloc(zil_lwb_cache, KM_PUSHPAGE);
+       lwb = kmem_cache_alloc(zil_lwb_cache, KM_SLEEP);
        lwb->lwb_zilog = zilog;
        lwb->lwb_blk = *bp;
        lwb->lwb_fastwrite = fastwrite;
+       lwb->lwb_slog = slog;
        lwb->lwb_buf = zio_buf_alloc(BP_GET_LSIZE(bp));
        lwb->lwb_max_txg = txg;
        lwb->lwb_zio = NULL;
@@ -489,7 +516,7 @@ zilog_dirty(zilog_t *zilog, uint64_t txg)
        dsl_pool_t *dp = zilog->zl_dmu_pool;
        dsl_dataset_t *ds = dmu_objset_ds(zilog->zl_os);
 
-       if (dsl_dataset_is_snapshot(ds))
+       if (ds->ds_is_snapshot)
                panic("dirtying snapshot!");
 
        if (txg_list_add(&dp->dp_dirty_zilogs, zilog, txg)) {
@@ -498,6 +525,27 @@ zilog_dirty(zilog_t *zilog, uint64_t txg)
        }
 }
 
+/*
+ * Determine if the zil is dirty in the specified txg. Callers wanting to
+ * ensure that the dirty state does not change must hold the itxg_lock for
+ * the specified txg. Holding the lock will ensure that the zil cannot be
+ * dirtied (zil_itx_assign) or cleaned (zil_clean) while we check its current
+ * state.
+ */
+boolean_t
+zilog_is_dirty_in_txg(zilog_t *zilog, uint64_t txg)
+{
+       dsl_pool_t *dp = zilog->zl_dmu_pool;
+
+       if (txg_list_member(&dp->dp_dirty_zilogs, zilog, txg & TXG_MASK))
+               return (B_TRUE);
+       return (B_FALSE);
+}
+
+/*
+ * Determine if the zil is dirty. The zil is considered dirty if it has
+ * any pending itx records that have not been cleaned by zil_clean().
+ */
 boolean_t
 zilog_is_dirty(zilog_t *zilog)
 {
@@ -524,6 +572,7 @@ zil_create(zilog_t *zilog)
        blkptr_t blk;
        int error = 0;
        boolean_t fastwrite = FALSE;
+       boolean_t slog = FALSE;
 
        /*
         * Wait for any previous destroy to complete.
@@ -538,7 +587,7 @@ zil_create(zilog_t *zilog)
        /*
         * Allocate an initial log block if:
         *    - there isn't one already
-        *    - the existing block is the wrong endianess
+        *    - the existing block is the wrong endianness
         */
        if (BP_IS_HOLE(&blk) || BP_SHOULD_BYTESWAP(&blk)) {
                tx = dmu_tx_create(zilog->zl_os);
@@ -551,8 +600,8 @@ zil_create(zilog_t *zilog)
                        BP_ZERO(&blk);
                }
 
-               error = zio_alloc_zil(zilog->zl_spa, txg, &blk,
-                   ZIL_MIN_BLKSZ, B_TRUE);
+               error = zio_alloc_zil(zilog->zl_spa, zilog->zl_os, txg, &blk,
+                   ZIL_MIN_BLKSZ, &slog);
                fastwrite = TRUE;
 
                if (error == 0)
@@ -563,7 +612,7 @@ zil_create(zilog_t *zilog)
         * Allocate a log write buffer (lwb) for the first log block.
         */
        if (error == 0)
-               lwb = zil_alloc_lwb(zilog, &blk, txg, fastwrite);
+               lwb = zil_alloc_lwb(zilog, &blk, slog, txg, fastwrite);
 
        /*
         * If we just allocated the first log block, commit our transaction
@@ -645,11 +694,11 @@ zil_destroy_sync(zilog_t *zilog, dmu_tx_t *tx)
 {
        ASSERT(list_is_empty(&zilog->zl_lwb_list));
        (void) zil_parse(zilog, zil_free_log_block,
-           zil_free_log_record, tx, zilog->zl_header->zh_claim_txg);
+           zil_free_log_record, tx, zilog->zl_header->zh_claim_txg, B_FALSE);
 }
 
 int
-zil_claim(const char *osname, void *txarg)
+zil_claim(dsl_pool_t *dp, dsl_dataset_t *ds, void *txarg)
 {
        dmu_tx_t *tx = txarg;
        uint64_t first_txg = dmu_tx_get_txg(tx);
@@ -658,9 +707,18 @@ zil_claim(const char *osname, void *txarg)
        objset_t *os;
        int error;
 
-       error = dmu_objset_own(osname, DMU_OST_ANY, B_FALSE, FTAG, &os);
+       error = dmu_objset_own_obj(dp, ds->ds_object,
+           DMU_OST_ANY, B_FALSE, B_FALSE, FTAG, &os);
        if (error != 0) {
-               cmn_err(CE_WARN, "can't open objset for %s", osname);
+               /*
+                * EBUSY indicates that the objset is inconsistent, in which
+                * case it can not have a ZIL.
+                */
+               if (error != EBUSY) {
+                       cmn_err(CE_WARN, "can't open objset for %llu, error %u",
+                           (unsigned long long)ds->ds_object, error);
+               }
+
                return (0);
        }
 
@@ -671,8 +729,10 @@ zil_claim(const char *osname, void *txarg)
                if (!BP_IS_HOLE(&zh->zh_log))
                        zio_free_zil(zilog->zl_spa, first_txg, &zh->zh_log);
                BP_ZERO(&zh->zh_log);
+               if (os->os_encrypted)
+                       os->os_next_write_raw = B_TRUE;
                dsl_dataset_dirty(dmu_objset_ds(os), tx);
-               dmu_objset_disown(os, FTAG);
+               dmu_objset_disown(os, B_FALSE, FTAG);
                return (0);
        }
 
@@ -686,7 +746,7 @@ zil_claim(const char *osname, void *txarg)
        ASSERT3U(zh->zh_claim_txg, <=, first_txg);
        if (zh->zh_claim_txg == 0 && !BP_IS_HOLE(&zh->zh_log)) {
                (void) zil_parse(zilog, zil_claim_log_block,
-                   zil_claim_log_record, tx, first_txg);
+                   zil_claim_log_record, tx, first_txg, B_FALSE);
                zh->zh_claim_txg = first_txg;
                zh->zh_claim_blk_seq = zilog->zl_parse_blk_seq;
                zh->zh_claim_lr_seq = zilog->zl_parse_lr_seq;
@@ -697,7 +757,7 @@ zil_claim(const char *osname, void *txarg)
        }
 
        ASSERT3U(first_txg, ==, (spa_last_synced_txg(zilog->zl_spa) + 1));
-       dmu_objset_disown(os, FTAG);
+       dmu_objset_disown(os, B_FALSE, FTAG);
        return (0);
 }
 
@@ -706,8 +766,9 @@ zil_claim(const char *osname, void *txarg)
  * Checksum errors are ok as they indicate the end of the chain.
  * Any other error (no device or read failure) returns an error.
  */
+/* ARGSUSED */
 int
-zil_check_log_chain(const char *osname, void *tx)
+zil_check_log_chain(dsl_pool_t *dp, dsl_dataset_t *ds, void *tx)
 {
        zilog_t *zilog;
        objset_t *os;
@@ -716,9 +777,10 @@ zil_check_log_chain(const char *osname, void *tx)
 
        ASSERT(tx == NULL);
 
-       error = dmu_objset_hold(osname, FTAG, &os);
+       error = dmu_objset_from_ds(ds, &os);
        if (error != 0) {
-               cmn_err(CE_WARN, "can't open objset for %s", osname);
+               cmn_err(CE_WARN, "can't open objset %llu, error %d",
+                   (unsigned long long)ds->ds_object, error);
                return (0);
        }
 
@@ -741,10 +803,8 @@ zil_check_log_chain(const char *osname, void *tx)
                        valid = vdev_log_state_valid(vd);
                spa_config_exit(os->os_spa, SCL_STATE, FTAG);
 
-               if (!valid) {
-                       dmu_objset_rele(os, FTAG);
+               if (!valid)
                        return (0);
-               }
        }
 
        /*
@@ -755,9 +815,8 @@ zil_check_log_chain(const char *osname, void *tx)
         * which will update spa_max_claim_txg.  See spa_load() for details.
         */
        error = zil_parse(zilog, zil_claim_log_block, zil_claim_log_record, tx,
-           zilog->zl_header->zh_claim_txg ? -1ULL : spa_first_txg(os->os_spa));
-
-       dmu_objset_rele(os, FTAG);
+           zilog->zl_header->zh_claim_txg ? -1ULL : spa_first_txg(os->os_spa),
+           B_FALSE);
 
        return ((error == ECKSUM || error == ENOENT) ? 0 : error);
 }
@@ -768,12 +827,7 @@ zil_vdev_compare(const void *x1, const void *x2)
        const uint64_t v1 = ((zil_vdev_node_t *)x1)->zv_vdev;
        const uint64_t v2 = ((zil_vdev_node_t *)x2)->zv_vdev;
 
-       if (v1 < v2)
-               return (-1);
-       if (v1 > v2)
-               return (1);
-
-       return (0);
+       return (AVL_CMP(v1, v2));
 }
 
 void
@@ -799,7 +853,7 @@ zil_add_block(zilog_t *zilog, const blkptr_t *bp)
        for (i = 0; i < ndvas; i++) {
                zvsearch.zv_vdev = DVA_GET_VDEV(&bp->blk_dva[i]);
                if (avl_find(t, &zvsearch, &where) == NULL) {
-                       zv = kmem_alloc(sizeof (*zv), KM_PUSHPAGE);
+                       zv = kmem_alloc(sizeof (*zv), KM_SLEEP);
                        zv->zv_vdev = zvsearch.zv_vdev;
                        avl_insert(t, zv, where);
                }
@@ -861,7 +915,7 @@ zil_lwb_write_done(zio_t *zio)
        ASSERT(BP_GET_BYTEORDER(zio->io_bp) == ZFS_HOST_BYTEORDER);
        ASSERT(!BP_IS_GANG(zio->io_bp));
        ASSERT(!BP_IS_HOLE(zio->io_bp));
-       ASSERT(zio->io_bp->blk_fill == 0);
+       ASSERT(BP_GET_FILL(zio->io_bp) == 0);
 
        /*
         * Ensure the lwb buffer pointer is cleared before releasing
@@ -871,6 +925,7 @@ zil_lwb_write_done(zio_t *zio)
         * one in zil_commit_writer(). zil_sync() will only remove
         * the lwb if lwb_buf is null.
         */
+       abd_put(zio->io_abd);
        zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
        mutex_enter(&zilog->zl_lock);
        lwb->lwb_zio = NULL;
@@ -893,7 +948,8 @@ zil_lwb_write_done(zio_t *zio)
 static void
 zil_lwb_write_init(zilog_t *zilog, lwb_t *lwb)
 {
-       zbookmark_t zb;
+       zbookmark_phys_t zb;
+       zio_priority_t prio;
 
        SET_BOOKMARK(&zb, lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_OBJSET],
            ZB_ZIL_OBJECT, ZB_ZIL_LEVEL,
@@ -907,13 +963,19 @@ zil_lwb_write_init(zilog_t *zilog, lwb_t *lwb)
        /* Lock so zil_sync() doesn't fastwrite_unmark after zio is created */
        mutex_enter(&zilog->zl_lock);
        if (lwb->lwb_zio == NULL) {
+               abd_t *lwb_abd = abd_get_from_buf(lwb->lwb_buf,
+                   BP_GET_LSIZE(&lwb->lwb_blk));
                if (!lwb->lwb_fastwrite) {
                        metaslab_fastwrite_mark(zilog->zl_spa, &lwb->lwb_blk);
                        lwb->lwb_fastwrite = 1;
                }
+               if (!lwb->lwb_slog || zilog->zl_cur_used <= zil_slog_bulk)
+                       prio = ZIO_PRIORITY_SYNC_WRITE;
+               else
+                       prio = ZIO_PRIORITY_ASYNC_WRITE;
                lwb->lwb_zio = zio_rewrite(zilog->zl_root_zio, zilog->zl_spa,
-                   0, &lwb->lwb_blk, lwb->lwb_buf, BP_GET_LSIZE(&lwb->lwb_blk),
-                   zil_lwb_write_done, lwb, ZIO_PRIORITY_SYNC_WRITE,
+                   0, &lwb->lwb_blk, lwb_abd, BP_GET_LSIZE(&lwb->lwb_blk),
+                   zil_lwb_write_done, lwb, prio,
                    ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_PROPAGATE |
                    ZIO_FLAG_FASTWRITE, &zb);
        }
@@ -925,7 +987,7 @@ zil_lwb_write_init(zilog_t *zilog, lwb_t *lwb)
  *
  * These must be a multiple of 4KB. Note only the amount used (again
  * aligned to 4KB) actually gets written. However, we can't always just
- * allocate SPA_MAXBLOCKSIZE as the slog space could be exhausted.
+ * allocate SPA_OLD_MAXBLOCKSIZE as the slog space could be exhausted.
  */
 uint64_t zil_block_buckets[] = {
     4096,              /* non TX_WRITE */
@@ -934,15 +996,6 @@ uint64_t zil_block_buckets[] = {
     UINT64_MAX
 };
 
-/*
- * Use the slog as long as the current commit size is less than the
- * limit or the total list size is less than 2X the limit.  Limit
- * checking is disabled by setting zil_slog_limit to UINT64_MAX.
- */
-unsigned long zil_slog_limit = 1024 * 1024;
-#define        USE_SLOG(zilog) (((zilog)->zl_cur_used < zil_slog_limit) || \
-       ((zilog)->zl_itx_list_sz < (zil_slog_limit << 1)))
-
 /*
  * Start a log block write and advance to the next log block.
  * Calls are serialized.
@@ -958,7 +1011,7 @@ zil_lwb_write_start(zilog_t *zilog, lwb_t *lwb)
        uint64_t txg;
        uint64_t zil_blksz, wsz;
        int i, error;
-       boolean_t use_slog;
+       boolean_t slog;
 
        if (BP_GET_CHECKSUM(&lwb->lwb_blk) == ZIO_CHECKSUM_ZILOG2) {
                zilc = (zil_chain_t *)lwb->lwb_buf;
@@ -980,7 +1033,24 @@ zil_lwb_write_start(zilog_t *zilog, lwb_t *lwb)
         * to clean up in the event of allocation failure or I/O failure.
         */
        tx = dmu_tx_create(zilog->zl_os);
-       VERIFY(dmu_tx_assign(tx, TXG_WAIT) == 0);
+
+       /*
+        * Since we are not going to create any new dirty data and we can even
+        * help with clearing the existing dirty data, we should not be subject
+        * to the dirty data based delays.
+        * We (ab)use TXG_WAITED to bypass the delay mechanism.
+        * One side effect from using TXG_WAITED is that dmu_tx_assign() can
+        * fail if the pool is suspended.  Those are dramatic circumstances,
+        * so we return NULL to signal that the normal ZIL processing is not
+        * possible and txg_wait_synced() should be used to ensure that the data
+        * is on disk.
+        */
+       error = dmu_tx_assign(tx, TXG_WAITED);
+       if (error != 0) {
+               ASSERT3S(error, ==, EIO);
+               dmu_tx_abort(tx);
+               return (NULL);
+       }
        dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
        txg = dmu_tx_get_txg(tx);
 
@@ -1007,17 +1077,15 @@ zil_lwb_write_start(zilog_t *zilog, lwb_t *lwb)
                continue;
        zil_blksz = zil_block_buckets[i];
        if (zil_blksz == UINT64_MAX)
-               zil_blksz = SPA_MAXBLOCKSIZE;
+               zil_blksz = SPA_OLD_MAXBLOCKSIZE;
        zilog->zl_prev_blks[zilog->zl_prev_rotor] = zil_blksz;
        for (i = 0; i < ZIL_PREV_BLKS; i++)
                zil_blksz = MAX(zil_blksz, zilog->zl_prev_blks[i]);
        zilog->zl_prev_rotor = (zilog->zl_prev_rotor + 1) & (ZIL_PREV_BLKS - 1);
 
        BP_ZERO(bp);
-       use_slog = USE_SLOG(zilog);
-       error = zio_alloc_zil(spa, txg, bp, zil_blksz,
-           USE_SLOG(zilog));
-       if (use_slog) {
+       error = zio_alloc_zil(spa, zilog->zl_os, txg, bp, zil_blksz, &slog);
+       if (slog) {
                ZIL_STAT_BUMP(zil_itx_metaslab_slog_count);
                ZIL_STAT_INCR(zil_itx_metaslab_slog_bytes, lwb->lwb_nused);
        } else {
@@ -1032,7 +1100,7 @@ zil_lwb_write_start(zilog_t *zilog, lwb_t *lwb)
                /*
                 * Allocate a new log write buffer (lwb).
                 */
-               nlwb = zil_alloc_lwb(zilog, bp, txg, TRUE);
+               nlwb = zil_alloc_lwb(zilog, bp, slog, txg, TRUE);
 
                /* Record the block for later vdev flushing */
                zil_add_block(zilog, &lwb->lwb_blk);
@@ -1069,47 +1137,53 @@ zil_lwb_write_start(zilog_t *zilog, lwb_t *lwb)
 static lwb_t *
 zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb)
 {
-       lr_t *lrc = &itx->itx_lr; /* common log record */
-       lr_write_t *lrw = (lr_write_t *)lrc;
+       lr_t *lrcb, *lrc;
+       lr_write_t *lrwb, *lrw;
        char *lr_buf;
-       uint64_t txg = lrc->lrc_txg;
-       uint64_t reclen = lrc->lrc_reclen;
-       uint64_t dlen = 0;
+       uint64_t dlen, dnow, lwb_sp, reclen, txg;
 
        if (lwb == NULL)
                return (NULL);
 
        ASSERT(lwb->lwb_buf != NULL);
-       ASSERT(zilog_is_dirty(zilog) ||
-           spa_freeze_txg(zilog->zl_spa) != UINT64_MAX);
 
-       if (lrc->lrc_txtype == TX_WRITE && itx->itx_wr_state == WR_NEED_COPY)
+       lrc = &itx->itx_lr;             /* Common log record inside itx. */
+       lrw = (lr_write_t *)lrc;        /* Write log record inside itx. */
+       if (lrc->lrc_txtype == TX_WRITE && itx->itx_wr_state == WR_NEED_COPY) {
                dlen = P2ROUNDUP_TYPED(
                    lrw->lr_length, sizeof (uint64_t), uint64_t);
-
+       } else {
+               dlen = 0;
+       }
+       reclen = lrc->lrc_reclen;
        zilog->zl_cur_used += (reclen + dlen);
+       txg = lrc->lrc_txg;
 
        zil_lwb_write_init(zilog, lwb);
 
+cont:
        /*
         * If this record won't fit in the current log block, start a new one.
+        * For WR_NEED_COPY optimize layout for minimal number of chunks.
         */
-       if (lwb->lwb_nused + reclen + dlen > lwb->lwb_sz) {
+       lwb_sp = lwb->lwb_sz - lwb->lwb_nused;
+       if (reclen > lwb_sp || (reclen + dlen > lwb_sp &&
+           lwb_sp < ZIL_MAX_WASTE_SPACE && (dlen % ZIL_MAX_LOG_DATA == 0 ||
+           lwb_sp < reclen + dlen % ZIL_MAX_LOG_DATA))) {
                lwb = zil_lwb_write_start(zilog, lwb);
                if (lwb == NULL)
                        return (NULL);
                zil_lwb_write_init(zilog, lwb);
                ASSERT(LWB_EMPTY(lwb));
-               if (lwb->lwb_nused + reclen + dlen > lwb->lwb_sz) {
-                       txg_wait_synced(zilog->zl_dmu_pool, txg);
-                       return (lwb);
-               }
+               lwb_sp = lwb->lwb_sz - lwb->lwb_nused;
+               ASSERT3U(reclen + MIN(dlen, sizeof (uint64_t)), <=, lwb_sp);
        }
 
+       dnow = MIN(dlen, lwb_sp - reclen);
        lr_buf = lwb->lwb_buf + lwb->lwb_nused;
        bcopy(lrc, lr_buf, reclen);
-       lrc = (lr_t *)lr_buf;
-       lrw = (lr_write_t *)lrc;
+       lrcb = (lr_t *)lr_buf;          /* Like lrc, but inside lwb. */
+       lrwb = (lr_write_t *)lrcb;      /* Like lrw, but inside lwb. */
 
        ZIL_STAT_BUMP(zil_itx_count);
 
@@ -1126,10 +1200,13 @@ zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb)
                        char *dbuf;
                        int error;
 
-                       if (dlen) {
-                               ASSERT(itx->itx_wr_state == WR_NEED_COPY);
+                       if (itx->itx_wr_state == WR_NEED_COPY) {
                                dbuf = lr_buf + reclen;
-                               lrw->lr_common.lrc_reclen += dlen;
+                               lrcb->lrc_reclen += dnow;
+                               if (lrwb->lr_length > dnow)
+                                       lrwb->lr_length = dnow;
+                               lrw->lr_offset += dnow;
+                               lrw->lr_length -= dnow;
                                ZIL_STAT_BUMP(zil_itx_needcopy_count);
                                ZIL_STAT_INCR(zil_itx_needcopy_bytes,
                                    lrw->lr_length);
@@ -1141,7 +1218,7 @@ zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb)
                                    lrw->lr_length);
                        }
                        error = zilog->zl_get_data(
-                           itx->itx_private, lrw, dbuf, lwb->lwb_zio);
+                           itx->itx_private, lrwb, dbuf, lwb->lwb_zio);
                        if (error == EIO) {
                                txg_wait_synced(zilog->zl_dmu_pool, txg);
                                return (lwb);
@@ -1160,12 +1237,18 @@ zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb)
         * equal to the itx sequence number because not all transactions
         * are synchronous, and sometimes spa_sync() gets there first.
         */
-       lrc->lrc_seq = ++zilog->zl_lr_seq; /* we are single threaded */
-       lwb->lwb_nused += reclen + dlen;
+       lrcb->lrc_seq = ++zilog->zl_lr_seq; /* we are single threaded */
+       lwb->lwb_nused += reclen + dnow;
        lwb->lwb_max_txg = MAX(lwb->lwb_max_txg, txg);
        ASSERT3U(lwb->lwb_nused, <=, lwb->lwb_sz);
        ASSERT0(P2PHASE(lwb->lwb_nused, sizeof (uint64_t)));
 
+       dlen -= dnow;
+       if (dlen > 0) {
+               zilog->zl_cur_used += reclen;
+               goto cont;
+       }
+
        return (lwb);
 }
 
@@ -1176,11 +1259,9 @@ zil_itx_create(uint64_t txtype, size_t lrsize)
 
        lrsize = P2ROUNDUP_TYPED(lrsize, sizeof (uint64_t), size_t);
 
-       itx = kmem_alloc(offsetof(itx_t, itx_lr) + lrsize,
-           KM_PUSHPAGE | KM_NODEBUG);
+       itx = zio_data_buf_alloc(offsetof(itx_t, itx_lr) + lrsize);
        itx->itx_lr.lrc_txtype = txtype;
        itx->itx_lr.lrc_reclen = lrsize;
-       itx->itx_sod = lrsize; /* if write & WR_NEED_COPY will be increased */
        itx->itx_lr.lrc_seq = 0;        /* defensive */
        itx->itx_sync = B_TRUE;         /* default is synchronous */
        itx->itx_callback = NULL;
@@ -1192,7 +1273,7 @@ zil_itx_create(uint64_t txtype, size_t lrsize)
 void
 zil_itx_destroy(itx_t *itx)
 {
-       kmem_free(itx, offsetof(itx_t, itx_lr) + itx->itx_lr.lrc_reclen);
+       zio_data_buf_free(itx, offsetof(itx_t, itx_lr)+itx->itx_lr.lrc_reclen);
 }
 
 /*
@@ -1213,8 +1294,7 @@ zil_itxg_clean(itxs_t *itxs)
                if (itx->itx_callback != NULL)
                        itx->itx_callback(itx->itx_callback_data);
                list_remove(list, itx);
-               kmem_free(itx, offsetof(itx_t, itx_lr) +
-                   itx->itx_lr.lrc_reclen);
+               zil_itx_destroy(itx);
        }
 
        cookie = NULL;
@@ -1225,8 +1305,7 @@ zil_itxg_clean(itxs_t *itxs)
                        if (itx->itx_callback != NULL)
                                itx->itx_callback(itx->itx_callback_data);
                        list_remove(list, itx);
-                       kmem_free(itx, offsetof(itx_t, itx_lr) +
-                           itx->itx_lr.lrc_reclen);
+                       zil_itx_destroy(itx);
                }
                list_destroy(list);
                kmem_free(ian, sizeof (itx_async_node_t));
@@ -1242,12 +1321,7 @@ zil_aitx_compare(const void *x1, const void *x2)
        const uint64_t o1 = ((itx_async_node_t *)x1)->ia_foid;
        const uint64_t o2 = ((itx_async_node_t *)x2)->ia_foid;
 
-       if (o1 < o2)
-               return (-1);
-       if (o1 > o2)
-               return (1);
-
-       return (0);
+       return (AVL_CMP(o1, o2));
 }
 
 /*
@@ -1293,8 +1367,7 @@ zil_remove_async(zilog_t *zilog, uint64_t oid)
                if (itx->itx_callback != NULL)
                        itx->itx_callback(itx->itx_callback_data);
                list_remove(&clean_list, itx);
-               kmem_free(itx, offsetof(itx_t, itx_lr) +
-                   itx->itx_lr.lrc_reclen);
+               zil_itx_destroy(itx);
        }
        list_destroy(&clean_list);
 }
@@ -1337,14 +1410,13 @@ zil_itx_assign(zilog_t *zilog, itx_t *itx, dmu_tx_t *tx)
                         * this itxg. Save the itxs for release below.
                         * This should be rare.
                         */
-                       atomic_add_64(&zilog->zl_itx_list_sz, -itxg->itxg_sod);
-                       itxg->itxg_sod = 0;
+                       zfs_dbgmsg("zil_itx_assign: missed itx cleanup for "
+                           "txg %llu", itxg->itxg_txg);
                        clean = itxg->itxg_itxs;
                }
-               ASSERT(itxg->itxg_sod == 0);
                itxg->itxg_txg = txg;
                itxs = itxg->itxg_itxs = kmem_zalloc(sizeof (itxs_t),
-                   KM_PUSHPAGE);
+                   KM_SLEEP);
 
                list_create(&itxs->i_sync_list, sizeof (itx_t),
                    offsetof(itx_t, itx_node));
@@ -1354,18 +1426,17 @@ zil_itx_assign(zilog_t *zilog, itx_t *itx, dmu_tx_t *tx)
        }
        if (itx->itx_sync) {
                list_insert_tail(&itxs->i_sync_list, itx);
-               atomic_add_64(&zilog->zl_itx_list_sz, itx->itx_sod);
-               itxg->itxg_sod += itx->itx_sod;
        } else {
                avl_tree_t *t = &itxs->i_async_tree;
-               uint64_t foid = ((lr_ooo_t *)&itx->itx_lr)->lr_foid;
+               uint64_t foid =
+                   LR_FOID_GET_OBJ(((lr_ooo_t *)&itx->itx_lr)->lr_foid);
                itx_async_node_t *ian;
                avl_index_t where;
 
                ian = avl_find(t, &foid, &where);
                if (ian == NULL) {
                        ian = kmem_alloc(sizeof (itx_async_node_t),
-                           KM_PUSHPAGE);
+                           KM_SLEEP);
                        list_create(&ian->ia_list, sizeof (itx_t),
                            offsetof(itx_t, itx_node));
                        ian->ia_foid = foid;
@@ -1404,8 +1475,6 @@ zil_clean(zilog_t *zilog, uint64_t synced_txg)
        ASSERT3U(itxg->itxg_txg, <=, synced_txg);
        ASSERT(itxg->itxg_txg != 0);
        ASSERT(zilog->zl_clean_taskq != NULL);
-       atomic_add_64(&zilog->zl_itx_list_sz, -itxg->itxg_sod);
-       itxg->itxg_sod = 0;
        clean_me = itxg->itxg_itxs;
        itxg->itxg_itxs = NULL;
        itxg->itxg_txg = 0;
@@ -1429,13 +1498,17 @@ zil_get_commit_list(zilog_t *zilog)
 {
        uint64_t otxg, txg;
        list_t *commit_list = &zilog->zl_itx_commit_list;
-       uint64_t push_sod = 0;
 
        if (spa_freeze_txg(zilog->zl_spa) != UINT64_MAX) /* ziltest support */
                otxg = ZILTEST_TXG;
        else
                otxg = spa_last_synced_txg(zilog->zl_spa) + 1;
 
+       /*
+        * This is inherently racy, since there is nothing to prevent
+        * the last synced txg from changing. That's okay since we'll
+        * only commit things in the future.
+        */
        for (txg = otxg; txg < (otxg + TXG_CONCURRENT_STATES); txg++) {
                itxg_t *itxg = &zilog->zl_itxg[txg & TXG_MASK];
 
@@ -1445,13 +1518,20 @@ zil_get_commit_list(zilog_t *zilog)
                        continue;
                }
 
+               /*
+                * If we're adding itx records to the zl_itx_commit_list,
+                * then the zil better be dirty in this "txg". We can assert
+                * that here since we're holding the itxg_lock which will
+                * prevent spa_sync from cleaning it. Once we add the itxs
+                * to the zl_itx_commit_list we must commit it to disk even
+                * if it's unnecessary (i.e. the txg was synced).
+                */
+               ASSERT(zilog_is_dirty_in_txg(zilog, txg) ||
+                   spa_freeze_txg(zilog->zl_spa) != UINT64_MAX);
                list_move_tail(commit_list, &itxg->itxg_itxs->i_sync_list);
-               push_sod += itxg->itxg_sod;
-               itxg->itxg_sod = 0;
 
                mutex_exit(&itxg->itxg_lock);
        }
-       atomic_add_64(&zilog->zl_itx_list_sz, -push_sod);
 }
 
 /*
@@ -1470,6 +1550,10 @@ zil_async_to_sync(zilog_t *zilog, uint64_t foid)
        else
                otxg = spa_last_synced_txg(zilog->zl_spa) + 1;
 
+       /*
+        * This is inherently racy, since there is nothing to prevent
+        * the last synced txg from changing.
+        */
        for (txg = otxg; txg < (otxg + TXG_CONCURRENT_STATES); txg++) {
                itxg_t *itxg = &zilog->zl_itxg[txg & TXG_MASK];
 
@@ -1542,8 +1626,14 @@ zil_commit_writer(zilog_t *zilog)
        for (itx = list_head(&zilog->zl_itx_commit_list); itx != NULL;
            itx = list_next(&zilog->zl_itx_commit_list, itx)) {
                txg = itx->itx_lr.lrc_txg;
-               ASSERT(txg);
+               ASSERT3U(txg, !=, 0);
 
+               /*
+                * This is inherently racy and may result in us writing
+                * out a log block for a txg that was just synced. This is
+                * ok since we'll end cleaning up that log block the next
+                * time we call zil_sync().
+                */
                if (txg > spa_last_synced_txg(spa) || txg > spa_freeze_txg(spa))
                        lwb = zil_lwb_commit(zilog, itx, lwb);
        }
@@ -1574,8 +1664,7 @@ zil_commit_writer(zilog_t *zilog)
                if (itx->itx_callback != NULL)
                        itx->itx_callback(itx->itx_callback_data);
                list_remove(&zilog->zl_itx_commit_list, itx);
-               kmem_free(itx, offsetof(itx_t, itx_lr)
-                   + itx->itx_lr.lrc_reclen);
+               zil_itx_destroy(itx);
        }
 
        mutex_enter(&zilog->zl_lock);
@@ -1783,7 +1872,7 @@ zil_alloc(objset_t *os, zil_header_t *zh_phys)
        zilog_t *zilog;
        int i;
 
-       zilog = kmem_zalloc(sizeof (zilog_t), KM_PUSHPAGE);
+       zilog = kmem_zalloc(sizeof (zilog_t), KM_SLEEP);
 
        zilog->zl_header = zh_phys;
        zilog->zl_os = os;
@@ -1875,7 +1964,7 @@ zil_open(objset_t *os, zil_get_data_t *get_data)
        ASSERT(list_is_empty(&zilog->zl_lwb_list));
 
        zilog->zl_get_data = get_data;
-       zilog->zl_clean_taskq = taskq_create("zil_clean", 1, minclsyspri,
+       zilog->zl_clean_taskq = taskq_create("zil_clean", 1, defclsyspri,
            2, 2, TASKQ_PREPOPULATE);
 
        return (zilog);
@@ -1905,7 +1994,11 @@ zil_close(zilog_t *zilog)
        mutex_exit(&zilog->zl_lock);
        if (txg)
                txg_wait_synced(zilog->zl_dmu_pool, txg);
-       ASSERT(!zilog_is_dirty(zilog));
+
+       if (zilog_is_dirty(zilog))
+               zfs_dbgmsg("zil (%p) is dirty, txg %llu", zilog, txg);
+       if (txg < spa_freeze_txg(zilog->zl_spa))
+               VERIFY(!zilog_is_dirty(zilog));
 
        taskq_destroy(zilog->zl_clean_taskq);
        zilog->zl_clean_taskq = NULL;
@@ -2022,6 +2115,21 @@ zil_suspend(const char *osname, void **cookiep)
                return (0);
        }
 
+       /*
+        * The ZIL has work to do. Ensure that the associated encryption
+        * key will remain mapped while we are committing the log by
+        * grabbing a reference to it. If the key isn't loaded we have no
+        * choice but to return an error until the wrapping key is loaded.
+        */
+       if (os->os_encrypted && spa_keystore_create_mapping(os->os_spa,
+           dmu_objset_ds(os), FTAG) != 0) {
+               zilog->zl_suspend--;
+               mutex_exit(&zilog->zl_lock);
+               dsl_dataset_long_rele(dmu_objset_ds(os), suspend_tag);
+               dsl_dataset_rele(dmu_objset_ds(os), suspend_tag);
+               return (SET_ERROR(EBUSY));
+       }
+
        zilog->zl_suspending = B_TRUE;
        mutex_exit(&zilog->zl_lock);
 
@@ -2034,6 +2142,20 @@ zil_suspend(const char *osname, void **cookiep)
        cv_broadcast(&zilog->zl_cv_suspend);
        mutex_exit(&zilog->zl_lock);
 
+       if (os->os_encrypted) {
+               /*
+                * Encrypted datasets need to wait for all data to be
+                * synced out before removing the mapping.
+                *
+                * XXX: Depending on the number of datasets with
+                * outstanding ZIL data on a given log device, this
+                * might cause spa_offline_log() to take a long time.
+                */
+               txg_wait_synced(zilog->zl_dmu_pool, zilog->zl_destroy_txg);
+               VERIFY0(spa_keystore_remove_mapping(os->os_spa,
+                   dmu_objset_id(os), FTAG));
+       }
+
        if (cookiep == NULL)
                zil_resume(os);
        else
@@ -2065,7 +2187,7 @@ typedef struct zil_replay_arg {
 static int
 zil_replay_error(zilog_t *zilog, lr_t *lr, int error)
 {
-       char name[MAXNAMELEN];
+       char name[ZFS_MAX_DATASET_NAME_LEN];
 
        zilog->zl_replaying_seq--;      /* didn't actually replay this one */
 
@@ -2109,7 +2231,7 @@ zil_replay_log_record(zilog_t *zilog, lr_t *lr, void *zra, uint64_t claim_txg)
         */
        if (TX_OOO(txtype)) {
                error = dmu_object_info(zilog->zl_os,
-                   ((lr_ooo_t *)lr)->lr_foid, NULL);
+                   LR_FOID_GET_OBJ(((lr_ooo_t *)lr)->lr_foid), NULL);
                if (error == ENOENT || error == EEXIST)
                        return (0);
        }
@@ -2189,7 +2311,7 @@ zil_replay(objset_t *os, void *arg, zil_replay_func_t replay_func[TX_MAX_TYPE])
        zr.zr_replay = replay_func;
        zr.zr_arg = arg;
        zr.zr_byteswap = BP_SHOULD_BYTESWAP(&zh->zh_log);
-       zr.zr_lr = vmem_alloc(2 * SPA_MAXBLOCKSIZE, KM_PUSHPAGE);
+       zr.zr_lr = vmem_alloc(2 * SPA_MAXBLOCKSIZE, KM_SLEEP);
 
        /*
         * Wait for in-progress removes to sync before starting replay.
@@ -2200,7 +2322,7 @@ zil_replay(objset_t *os, void *arg, zil_replay_func_t replay_func[TX_MAX_TYPE])
        zilog->zl_replay_time = ddi_get_lbolt();
        ASSERT(zilog->zl_replay_blks == 0);
        (void) zil_parse(zilog, zil_incr_blks, zil_replay_log_record, &zr,
-           zh->zh_claim_txg);
+           zh->zh_claim_txg, B_TRUE);
        vmem_free(zr.zr_lr, 2 * SPA_MAXBLOCKSIZE);
 
        zil_destroy(zilog, B_FALSE);
@@ -2237,12 +2359,38 @@ zil_vdev_offline(const char *osname, void *arg)
 }
 
 #if defined(_KERNEL) && defined(HAVE_SPL)
+EXPORT_SYMBOL(zil_alloc);
+EXPORT_SYMBOL(zil_free);
+EXPORT_SYMBOL(zil_open);
+EXPORT_SYMBOL(zil_close);
+EXPORT_SYMBOL(zil_replay);
+EXPORT_SYMBOL(zil_replaying);
+EXPORT_SYMBOL(zil_destroy);
+EXPORT_SYMBOL(zil_destroy_sync);
+EXPORT_SYMBOL(zil_itx_create);
+EXPORT_SYMBOL(zil_itx_destroy);
+EXPORT_SYMBOL(zil_itx_assign);
+EXPORT_SYMBOL(zil_commit);
+EXPORT_SYMBOL(zil_vdev_offline);
+EXPORT_SYMBOL(zil_claim);
+EXPORT_SYMBOL(zil_check_log_chain);
+EXPORT_SYMBOL(zil_sync);
+EXPORT_SYMBOL(zil_clean);
+EXPORT_SYMBOL(zil_suspend);
+EXPORT_SYMBOL(zil_resume);
+EXPORT_SYMBOL(zil_add_block);
+EXPORT_SYMBOL(zil_bp_tree_add);
+EXPORT_SYMBOL(zil_set_sync);
+EXPORT_SYMBOL(zil_set_logbias);
+
+/* BEGIN CSTYLED */
 module_param(zil_replay_disable, int, 0644);
 MODULE_PARM_DESC(zil_replay_disable, "Disable intent logging replay");
 
 module_param(zfs_nocacheflush, int, 0644);
 MODULE_PARM_DESC(zfs_nocacheflush, "Disable cache flushes");
 
-module_param(zil_slog_limit, ulong, 0644);
-MODULE_PARM_DESC(zil_slog_limit, "Max commit bytes to separate log device");
+module_param(zil_slog_bulk, ulong, 0644);
+MODULE_PARM_DESC(zil_slog_bulk, "Limit in bytes slog sync writes per commit");
+/* END CSTYLED */
 #endif