]> git.proxmox.com Git - mirror_zfs.git/blobdiff - module/zfs/zil.c
OpenZFS 8997 - ztest assertion failure in zil_lwb_write_issue
[mirror_zfs.git] / module / zfs / zil.c
index 3688c8a1615af3874062c4fe9dcafde0307e604d..a2bbdcb9aa5d72cbd0b007723b152499f0390e82 100644 (file)
@@ -20,7 +20,8 @@
  */
 /*
  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
- * Copyright (c) 2013 by Delphix. All rights reserved.
+ * Copyright (c) 2011, 2017 by Delphix. All rights reserved.
+ * Copyright (c) 2014 Integros [integros.com]
  */
 
 /* Portions Copyright 2010 Robert Milkowski */
@@ -39,6 +40,8 @@
 #include <sys/dmu_tx.h>
 #include <sys/dsl_pool.h>
 #include <sys/metaslab.h>
+#include <sys/trace_zil.h>
+#include <sys/abd.h>
 
 /*
  * The zfs intent log (ZIL) saves transaction records of system calls
  * See zil.h for more information about these fields.
  */
 zil_stats_t zil_stats = {
-       { "zil_commit_count",              KSTAT_DATA_UINT64 },
-       { "zil_commit_writer_count",       KSTAT_DATA_UINT64 },
-       { "zil_itx_count",                 KSTAT_DATA_UINT64 },
-       { "zil_itx_indirect_count",        KSTAT_DATA_UINT64 },
-       { "zil_itx_indirect_bytes",        KSTAT_DATA_UINT64 },
-       { "zil_itx_copied_count",          KSTAT_DATA_UINT64 },
-       { "zil_itx_copied_bytes",          KSTAT_DATA_UINT64 },
-       { "zil_itx_needcopy_count",        KSTAT_DATA_UINT64 },
-       { "zil_itx_needcopy_bytes",        KSTAT_DATA_UINT64 },
-       { "zil_itx_metaslab_normal_count", KSTAT_DATA_UINT64 },
-       { "zil_itx_metaslab_normal_bytes", KSTAT_DATA_UINT64 },
-       { "zil_itx_metaslab_slog_count",   KSTAT_DATA_UINT64 },
-       { "zil_itx_metaslab_slog_bytes",   KSTAT_DATA_UINT64 },
+       { "zil_commit_count",                   KSTAT_DATA_UINT64 },
+       { "zil_commit_writer_count",            KSTAT_DATA_UINT64 },
+       { "zil_itx_count",                      KSTAT_DATA_UINT64 },
+       { "zil_itx_indirect_count",             KSTAT_DATA_UINT64 },
+       { "zil_itx_indirect_bytes",             KSTAT_DATA_UINT64 },
+       { "zil_itx_copied_count",               KSTAT_DATA_UINT64 },
+       { "zil_itx_copied_bytes",               KSTAT_DATA_UINT64 },
+       { "zil_itx_needcopy_count",             KSTAT_DATA_UINT64 },
+       { "zil_itx_needcopy_bytes",             KSTAT_DATA_UINT64 },
+       { "zil_itx_metaslab_normal_count",      KSTAT_DATA_UINT64 },
+       { "zil_itx_metaslab_normal_bytes",      KSTAT_DATA_UINT64 },
+       { "zil_itx_metaslab_slog_count",        KSTAT_DATA_UINT64 },
+       { "zil_itx_metaslab_slog_bytes",        KSTAT_DATA_UINT64 },
 };
 
 static kstat_t *zil_ksp;
@@ -99,6 +102,13 @@ int zil_replay_disable = 0;
  */
 int zfs_nocacheflush = 0;
 
+/*
+ * Limit SLOG write size per commit executed with synchronous priority.
+ * Any writes above that will be executed with lower (asynchronous) priority
+ * to limit potential SLOG device abuse by single active ZIL writer.
+ */
+unsigned long zil_slog_bulk = 768 * 1024;
+
 static kmem_cache_t *zil_lwb_cache;
 
 static void zil_async_to_sync(zilog_t *zilog, uint64_t foid);
@@ -106,33 +116,17 @@ static void zil_async_to_sync(zilog_t *zilog, uint64_t foid);
 #define        LWB_EMPTY(lwb) ((BP_GET_LSIZE(&lwb->lwb_blk) - \
     sizeof (zil_chain_t)) == (lwb->lwb_sz - lwb->lwb_nused))
 
-
-/*
- * ziltest is by and large an ugly hack, but very useful in
- * checking replay without tedious work.
- * When running ziltest we want to keep all itx's and so maintain
- * a single list in the zl_itxg[] that uses a high txg: ZILTEST_TXG
- * We subtract TXG_CONCURRENT_STATES to allow for common code.
- */
-#define        ZILTEST_TXG (UINT64_MAX - TXG_CONCURRENT_STATES)
-
 static int
 zil_bp_compare(const void *x1, const void *x2)
 {
        const dva_t *dva1 = &((zil_bp_node_t *)x1)->zn_dva;
        const dva_t *dva2 = &((zil_bp_node_t *)x2)->zn_dva;
 
-       if (DVA_GET_VDEV(dva1) < DVA_GET_VDEV(dva2))
-               return (-1);
-       if (DVA_GET_VDEV(dva1) > DVA_GET_VDEV(dva2))
-               return (1);
-
-       if (DVA_GET_OFFSET(dva1) < DVA_GET_OFFSET(dva2))
-               return (-1);
-       if (DVA_GET_OFFSET(dva1) > DVA_GET_OFFSET(dva2))
-               return (1);
+       int cmp = AVL_CMP(DVA_GET_VDEV(dva1), DVA_GET_VDEV(dva2));
+       if (likely(cmp))
+               return (cmp);
 
-       return (0);
+       return (AVL_CMP(DVA_GET_OFFSET(dva1), DVA_GET_OFFSET(dva2)));
 }
 
 static void
@@ -159,14 +153,19 @@ int
 zil_bp_tree_add(zilog_t *zilog, const blkptr_t *bp)
 {
        avl_tree_t *t = &zilog->zl_bp_tree;
-       const dva_t *dva = BP_IDENTITY(bp);
+       const dva_t *dva;
        zil_bp_node_t *zn;
        avl_index_t where;
 
+       if (BP_IS_EMBEDDED(bp))
+               return (0);
+
+       dva = BP_IDENTITY(bp);
+
        if (avl_find(t, dva, &where) != NULL)
                return (SET_ERROR(EEXIST));
 
-       zn = kmem_alloc(sizeof (zil_bp_node_t), KM_PUSHPAGE);
+       zn = kmem_alloc(sizeof (zil_bp_node_t), KM_SLEEP);
        zn->zn_dva = *dva;
        avl_insert(t, zn, where);
 
@@ -198,9 +197,9 @@ zil_read_log_block(zilog_t *zilog, const blkptr_t *bp, blkptr_t *nbp, void *dst,
     char **end)
 {
        enum zio_flag zio_flags = ZIO_FLAG_CANFAIL;
-       uint32_t aflags = ARC_WAIT;
+       arc_flags_t aflags = ARC_FLAG_WAIT;
        arc_buf_t *abuf = NULL;
-       zbookmark_t zb;
+       zbookmark_phys_t zb;
        int error;
 
        if (zilog->zl_header->zh_claim_txg == 0)
@@ -237,6 +236,7 @@ zil_read_log_block(zilog_t *zilog, const blkptr_t *bp, blkptr_t *nbp, void *dst,
                            sizeof (cksum)) || BP_IS_HOLE(&zilc->zc_next_blk)) {
                                error = SET_ERROR(ECKSUM);
                        } else {
+                               ASSERT3U(len, <=, SPA_OLD_MAXBLOCKSIZE);
                                bcopy(lr, dst, len);
                                *end = (char *)dst + len;
                                *nbp = zilc->zc_next_blk;
@@ -251,13 +251,15 @@ zil_read_log_block(zilog_t *zilog, const blkptr_t *bp, blkptr_t *nbp, void *dst,
                            (zilc->zc_nused > (size - sizeof (*zilc)))) {
                                error = SET_ERROR(ECKSUM);
                        } else {
+                               ASSERT3U(zilc->zc_nused, <=,
+                                   SPA_OLD_MAXBLOCKSIZE);
                                bcopy(lr, dst, zilc->zc_nused);
                                *end = (char *)dst + zilc->zc_nused;
                                *nbp = zilc->zc_next_blk;
                        }
                }
 
-               VERIFY(arc_buf_remove_ref(abuf, &abuf));
+               arc_buf_destroy(abuf, &abuf);
        }
 
        return (error);
@@ -271,9 +273,9 @@ zil_read_log_data(zilog_t *zilog, const lr_write_t *lr, void *wbuf)
 {
        enum zio_flag zio_flags = ZIO_FLAG_CANFAIL;
        const blkptr_t *bp = &lr->lr_blkptr;
-       uint32_t aflags = ARC_WAIT;
+       arc_flags_t aflags = ARC_FLAG_WAIT;
        arc_buf_t *abuf = NULL;
-       zbookmark_t zb;
+       zbookmark_phys_t zb;
        int error;
 
        if (BP_IS_HOLE(bp)) {
@@ -294,7 +296,7 @@ zil_read_log_data(zilog_t *zilog, const lr_write_t *lr, void *wbuf)
        if (error == 0) {
                if (wbuf != NULL)
                        bcopy(abuf->b_data, wbuf, arc_buf_size(abuf));
-               (void) arc_buf_remove_ref(abuf, &abuf);
+               arc_buf_destroy(abuf, &abuf);
        }
 
        return (error);
@@ -319,7 +321,7 @@ zil_parse(zilog_t *zilog, zil_parse_blk_func_t *parse_blk_func,
        char *lrbuf, *lrp;
        int error = 0;
 
-       bzero(&next_blk, sizeof(blkptr_t));
+       bzero(&next_blk, sizeof (blkptr_t));
 
        /*
         * Old logs didn't record the maximum zh_claim_lr_seq.
@@ -336,7 +338,7 @@ zil_parse(zilog_t *zilog, zil_parse_blk_func_t *parse_blk_func,
         * If the log has been claimed, stop if we encounter a sequence
         * number greater than the highest claimed sequence number.
         */
-       lrbuf = zio_buf_alloc(SPA_MAXBLOCKSIZE);
+       lrbuf = zio_buf_alloc(SPA_OLD_MAXBLOCKSIZE);
        zil_bp_tree_init(zilog);
 
        for (blk = zh->zh_log; !BP_IS_HOLE(&blk); blk = next_blk) {
@@ -383,7 +385,7 @@ done:
            (max_blk_seq == claim_blk_seq && max_lr_seq == claim_lr_seq));
 
        zil_bp_tree_fini(zilog);
-       zio_buf_free(lrbuf, SPA_MAXBLOCKSIZE);
+       zio_buf_free(lrbuf, SPA_OLD_MAXBLOCKSIZE);
 
        return (error);
 }
@@ -395,7 +397,8 @@ zil_claim_log_block(zilog_t *zilog, blkptr_t *bp, void *tx, uint64_t first_txg)
         * Claim log block if not already committed and not already claimed.
         * If tx == NULL, just verify that the block is claimable.
         */
-       if (bp->blk_birth < first_txg || zil_bp_tree_add(zilog, bp) != 0)
+       if (BP_IS_HOLE(bp) || bp->blk_birth < first_txg ||
+           zil_bp_tree_add(zilog, bp) != 0)
                return (0);
 
        return (zio_wait(zio_claim(NULL, zilog->zl_spa,
@@ -445,21 +448,24 @@ zil_free_log_record(zilog_t *zilog, lr_t *lrc, void *tx, uint64_t claim_txg)
         * If we previously claimed it, we need to free it.
         */
        if (claim_txg != 0 && lrc->lrc_txtype == TX_WRITE &&
-           bp->blk_birth >= claim_txg && zil_bp_tree_add(zilog, bp) == 0)
+           bp->blk_birth >= claim_txg && zil_bp_tree_add(zilog, bp) == 0 &&
+           !BP_IS_HOLE(bp))
                zio_free(zilog->zl_spa, dmu_tx_get_txg(tx), bp);
 
        return (0);
 }
 
 static lwb_t *
-zil_alloc_lwb(zilog_t *zilog, blkptr_t *bp, uint64_t txg, boolean_t fastwrite)
+zil_alloc_lwb(zilog_t *zilog, blkptr_t *bp, boolean_t slog, uint64_t txg,
+    boolean_t fastwrite)
 {
        lwb_t *lwb;
 
-       lwb = kmem_cache_alloc(zil_lwb_cache, KM_PUSHPAGE);
+       lwb = kmem_cache_alloc(zil_lwb_cache, KM_SLEEP);
        lwb->lwb_zilog = zilog;
        lwb->lwb_blk = *bp;
        lwb->lwb_fastwrite = fastwrite;
+       lwb->lwb_slog = slog;
        lwb->lwb_buf = zio_buf_alloc(BP_GET_LSIZE(bp));
        lwb->lwb_max_txg = txg;
        lwb->lwb_zio = NULL;
@@ -489,7 +495,7 @@ zilog_dirty(zilog_t *zilog, uint64_t txg)
        dsl_pool_t *dp = zilog->zl_dmu_pool;
        dsl_dataset_t *ds = dmu_objset_ds(zilog->zl_os);
 
-       if (dsl_dataset_is_snapshot(ds))
+       if (ds->ds_is_snapshot)
                panic("dirtying snapshot!");
 
        if (txg_list_add(&dp->dp_dirty_zilogs, zilog, txg)) {
@@ -498,6 +504,27 @@ zilog_dirty(zilog_t *zilog, uint64_t txg)
        }
 }
 
+/*
+ * Determine if the zil is dirty in the specified txg. Callers wanting to
+ * ensure that the dirty state does not change must hold the itxg_lock for
+ * the specified txg. Holding the lock will ensure that the zil cannot be
+ * dirtied (zil_itx_assign) or cleaned (zil_clean) while we check its current
+ * state.
+ */
+boolean_t
+zilog_is_dirty_in_txg(zilog_t *zilog, uint64_t txg)
+{
+       dsl_pool_t *dp = zilog->zl_dmu_pool;
+
+       if (txg_list_member(&dp->dp_dirty_zilogs, zilog, txg & TXG_MASK))
+               return (B_TRUE);
+       return (B_FALSE);
+}
+
+/*
+ * Determine if the zil is dirty. The zil is considered dirty if it has
+ * any pending itx records that have not been cleaned by zil_clean().
+ */
 boolean_t
 zilog_is_dirty(zilog_t *zilog)
 {
@@ -524,6 +551,7 @@ zil_create(zilog_t *zilog)
        blkptr_t blk;
        int error = 0;
        boolean_t fastwrite = FALSE;
+       boolean_t slog = FALSE;
 
        /*
         * Wait for any previous destroy to complete.
@@ -538,7 +566,7 @@ zil_create(zilog_t *zilog)
        /*
         * Allocate an initial log block if:
         *    - there isn't one already
-        *    - the existing block is the wrong endianess
+        *    - the existing block is the wrong endianness
         */
        if (BP_IS_HOLE(&blk) || BP_SHOULD_BYTESWAP(&blk)) {
                tx = dmu_tx_create(zilog->zl_os);
@@ -552,7 +580,7 @@ zil_create(zilog_t *zilog)
                }
 
                error = zio_alloc_zil(zilog->zl_spa, txg, &blk,
-                   ZIL_MIN_BLKSZ, B_TRUE);
+                   ZIL_MIN_BLKSZ, &slog);
                fastwrite = TRUE;
 
                if (error == 0)
@@ -563,7 +591,7 @@ zil_create(zilog_t *zilog)
         * Allocate a log write buffer (lwb) for the first log block.
         */
        if (error == 0)
-               lwb = zil_alloc_lwb(zilog, &blk, txg, fastwrite);
+               lwb = zil_alloc_lwb(zilog, &blk, slog, txg, fastwrite);
 
        /*
         * If we just allocated the first log block, commit our transaction
@@ -649,7 +677,7 @@ zil_destroy_sync(zilog_t *zilog, dmu_tx_t *tx)
 }
 
 int
-zil_claim(const char *osname, void *txarg)
+zil_claim(dsl_pool_t *dp, dsl_dataset_t *ds, void *txarg)
 {
        dmu_tx_t *tx = txarg;
        uint64_t first_txg = dmu_tx_get_txg(tx);
@@ -658,9 +686,18 @@ zil_claim(const char *osname, void *txarg)
        objset_t *os;
        int error;
 
-       error = dmu_objset_own(osname, DMU_OST_ANY, B_FALSE, FTAG, &os);
+       error = dmu_objset_own_obj(dp, ds->ds_object,
+           DMU_OST_ANY, B_FALSE, FTAG, &os);
        if (error != 0) {
-               cmn_err(CE_WARN, "can't open objset for %s", osname);
+               /*
+                * EBUSY indicates that the objset is inconsistent, in which
+                * case it can not have a ZIL.
+                */
+               if (error != EBUSY) {
+                       cmn_err(CE_WARN, "can't open objset for %llu, error %u",
+                           (unsigned long long)ds->ds_object, error);
+               }
+
                return (0);
        }
 
@@ -706,8 +743,9 @@ zil_claim(const char *osname, void *txarg)
  * Checksum errors are ok as they indicate the end of the chain.
  * Any other error (no device or read failure) returns an error.
  */
+/* ARGSUSED */
 int
-zil_check_log_chain(const char *osname, void *tx)
+zil_check_log_chain(dsl_pool_t *dp, dsl_dataset_t *ds, void *tx)
 {
        zilog_t *zilog;
        objset_t *os;
@@ -716,9 +754,10 @@ zil_check_log_chain(const char *osname, void *tx)
 
        ASSERT(tx == NULL);
 
-       error = dmu_objset_hold(osname, FTAG, &os);
+       error = dmu_objset_from_ds(ds, &os);
        if (error != 0) {
-               cmn_err(CE_WARN, "can't open objset for %s", osname);
+               cmn_err(CE_WARN, "can't open objset %llu, error %d",
+                   (unsigned long long)ds->ds_object, error);
                return (0);
        }
 
@@ -741,10 +780,8 @@ zil_check_log_chain(const char *osname, void *tx)
                        valid = vdev_log_state_valid(vd);
                spa_config_exit(os->os_spa, SCL_STATE, FTAG);
 
-               if (!valid) {
-                       dmu_objset_rele(os, FTAG);
+               if (!valid)
                        return (0);
-               }
        }
 
        /*
@@ -757,8 +794,6 @@ zil_check_log_chain(const char *osname, void *tx)
        error = zil_parse(zilog, zil_claim_log_block, zil_claim_log_record, tx,
            zilog->zl_header->zh_claim_txg ? -1ULL : spa_first_txg(os->os_spa));
 
-       dmu_objset_rele(os, FTAG);
-
        return ((error == ECKSUM || error == ENOENT) ? 0 : error);
 }
 
@@ -768,12 +803,7 @@ zil_vdev_compare(const void *x1, const void *x2)
        const uint64_t v1 = ((zil_vdev_node_t *)x1)->zv_vdev;
        const uint64_t v2 = ((zil_vdev_node_t *)x2)->zv_vdev;
 
-       if (v1 < v2)
-               return (-1);
-       if (v1 > v2)
-               return (1);
-
-       return (0);
+       return (AVL_CMP(v1, v2));
 }
 
 void
@@ -799,7 +829,7 @@ zil_add_block(zilog_t *zilog, const blkptr_t *bp)
        for (i = 0; i < ndvas; i++) {
                zvsearch.zv_vdev = DVA_GET_VDEV(&bp->blk_dva[i]);
                if (avl_find(t, &zvsearch, &where) == NULL) {
-                       zv = kmem_alloc(sizeof (*zv), KM_PUSHPAGE);
+                       zv = kmem_alloc(sizeof (*zv), KM_SLEEP);
                        zv->zv_vdev = zvsearch.zv_vdev;
                        avl_insert(t, zv, where);
                }
@@ -861,7 +891,7 @@ zil_lwb_write_done(zio_t *zio)
        ASSERT(BP_GET_BYTEORDER(zio->io_bp) == ZFS_HOST_BYTEORDER);
        ASSERT(!BP_IS_GANG(zio->io_bp));
        ASSERT(!BP_IS_HOLE(zio->io_bp));
-       ASSERT(zio->io_bp->blk_fill == 0);
+       ASSERT(BP_GET_FILL(zio->io_bp) == 0);
 
        /*
         * Ensure the lwb buffer pointer is cleared before releasing
@@ -871,6 +901,7 @@ zil_lwb_write_done(zio_t *zio)
         * one in zil_commit_writer(). zil_sync() will only remove
         * the lwb if lwb_buf is null.
         */
+       abd_put(zio->io_abd);
        zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
        mutex_enter(&zilog->zl_lock);
        lwb->lwb_zio = NULL;
@@ -893,7 +924,8 @@ zil_lwb_write_done(zio_t *zio)
 static void
 zil_lwb_write_init(zilog_t *zilog, lwb_t *lwb)
 {
-       zbookmark_t zb;
+       zbookmark_phys_t zb;
+       zio_priority_t prio;
 
        SET_BOOKMARK(&zb, lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_OBJSET],
            ZB_ZIL_OBJECT, ZB_ZIL_LEVEL,
@@ -907,13 +939,19 @@ zil_lwb_write_init(zilog_t *zilog, lwb_t *lwb)
        /* Lock so zil_sync() doesn't fastwrite_unmark after zio is created */
        mutex_enter(&zilog->zl_lock);
        if (lwb->lwb_zio == NULL) {
+               abd_t *lwb_abd = abd_get_from_buf(lwb->lwb_buf,
+                   BP_GET_LSIZE(&lwb->lwb_blk));
                if (!lwb->lwb_fastwrite) {
                        metaslab_fastwrite_mark(zilog->zl_spa, &lwb->lwb_blk);
                        lwb->lwb_fastwrite = 1;
                }
+               if (!lwb->lwb_slog || zilog->zl_cur_used <= zil_slog_bulk)
+                       prio = ZIO_PRIORITY_SYNC_WRITE;
+               else
+                       prio = ZIO_PRIORITY_ASYNC_WRITE;
                lwb->lwb_zio = zio_rewrite(zilog->zl_root_zio, zilog->zl_spa,
-                   0, &lwb->lwb_blk, lwb->lwb_buf, BP_GET_LSIZE(&lwb->lwb_blk),
-                   zil_lwb_write_done, lwb, ZIO_PRIORITY_LOG_WRITE,
+                   0, &lwb->lwb_blk, lwb_abd, BP_GET_LSIZE(&lwb->lwb_blk),
+                   zil_lwb_write_done, lwb, prio,
                    ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_PROPAGATE |
                    ZIO_FLAG_FASTWRITE, &zb);
        }
@@ -925,7 +963,7 @@ zil_lwb_write_init(zilog_t *zilog, lwb_t *lwb)
  *
  * These must be a multiple of 4KB. Note only the amount used (again
  * aligned to 4KB) actually gets written. However, we can't always just
- * allocate SPA_MAXBLOCKSIZE as the slog space could be exhausted.
+ * allocate SPA_OLD_MAXBLOCKSIZE as the slog space could be exhausted.
  */
 uint64_t zil_block_buckets[] = {
     4096,              /* non TX_WRITE */
@@ -934,15 +972,6 @@ uint64_t zil_block_buckets[] = {
     UINT64_MAX
 };
 
-/*
- * Use the slog as long as the current commit size is less than the
- * limit or the total list size is less than 2X the limit.  Limit
- * checking is disabled by setting zil_slog_limit to UINT64_MAX.
- */
-unsigned long zil_slog_limit = 1024 * 1024;
-#define        USE_SLOG(zilog) (((zilog)->zl_cur_used < zil_slog_limit) || \
-       ((zilog)->zl_itx_list_sz < (zil_slog_limit << 1)))
-
 /*
  * Start a log block write and advance to the next log block.
  * Calls are serialized.
@@ -958,7 +987,7 @@ zil_lwb_write_start(zilog_t *zilog, lwb_t *lwb)
        uint64_t txg;
        uint64_t zil_blksz, wsz;
        int i, error;
-       boolean_t use_slog;
+       boolean_t slog;
 
        if (BP_GET_CHECKSUM(&lwb->lwb_blk) == ZIO_CHECKSUM_ZILOG2) {
                zilc = (zil_chain_t *)lwb->lwb_buf;
@@ -980,7 +1009,15 @@ zil_lwb_write_start(zilog_t *zilog, lwb_t *lwb)
         * to clean up in the event of allocation failure or I/O failure.
         */
        tx = dmu_tx_create(zilog->zl_os);
-       VERIFY(dmu_tx_assign(tx, TXG_WAIT) == 0);
+
+       /*
+        * Since we are not going to create any new dirty data, and we
+        * can even help with clearing the existing dirty data, we
+        * should not be subject to the dirty data based delays. We
+        * use TXG_NOTHROTTLE to bypass the delay mechanism.
+        */
+       VERIFY0(dmu_tx_assign(tx, TXG_WAIT | TXG_NOTHROTTLE));
+
        dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
        txg = dmu_tx_get_txg(tx);
 
@@ -1007,23 +1044,18 @@ zil_lwb_write_start(zilog_t *zilog, lwb_t *lwb)
                continue;
        zil_blksz = zil_block_buckets[i];
        if (zil_blksz == UINT64_MAX)
-               zil_blksz = SPA_MAXBLOCKSIZE;
+               zil_blksz = SPA_OLD_MAXBLOCKSIZE;
        zilog->zl_prev_blks[zilog->zl_prev_rotor] = zil_blksz;
        for (i = 0; i < ZIL_PREV_BLKS; i++)
                zil_blksz = MAX(zil_blksz, zilog->zl_prev_blks[i]);
        zilog->zl_prev_rotor = (zilog->zl_prev_rotor + 1) & (ZIL_PREV_BLKS - 1);
 
        BP_ZERO(bp);
-       use_slog = USE_SLOG(zilog);
-       error = zio_alloc_zil(spa, txg, bp, zil_blksz,
-           USE_SLOG(zilog));
-       if (use_slog)
-       {
+       error = zio_alloc_zil(spa, txg, bp, zil_blksz, &slog);
+       if (slog) {
                ZIL_STAT_BUMP(zil_itx_metaslab_slog_count);
                ZIL_STAT_INCR(zil_itx_metaslab_slog_bytes, lwb->lwb_nused);
-       }
-       else
-       {
+       } else {
                ZIL_STAT_BUMP(zil_itx_metaslab_normal_count);
                ZIL_STAT_INCR(zil_itx_metaslab_normal_bytes, lwb->lwb_nused);
        }
@@ -1035,7 +1067,7 @@ zil_lwb_write_start(zilog_t *zilog, lwb_t *lwb)
                /*
                 * Allocate a new log write buffer (lwb).
                 */
-               nlwb = zil_alloc_lwb(zilog, bp, txg, TRUE);
+               nlwb = zil_alloc_lwb(zilog, bp, slog, txg, TRUE);
 
                /* Record the block for later vdev flushing */
                zil_add_block(zilog, &lwb->lwb_blk);
@@ -1072,47 +1104,53 @@ zil_lwb_write_start(zilog_t *zilog, lwb_t *lwb)
 static lwb_t *
 zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb)
 {
-       lr_t *lrc = &itx->itx_lr; /* common log record */
-       lr_write_t *lrw = (lr_write_t *)lrc;
+       lr_t *lrcb, *lrc;
+       lr_write_t *lrwb, *lrw;
        char *lr_buf;
-       uint64_t txg = lrc->lrc_txg;
-       uint64_t reclen = lrc->lrc_reclen;
-       uint64_t dlen = 0;
+       uint64_t dlen, dnow, lwb_sp, reclen, txg;
 
        if (lwb == NULL)
                return (NULL);
 
        ASSERT(lwb->lwb_buf != NULL);
-       ASSERT(zilog_is_dirty(zilog) ||
-           spa_freeze_txg(zilog->zl_spa) != UINT64_MAX);
 
-       if (lrc->lrc_txtype == TX_WRITE && itx->itx_wr_state == WR_NEED_COPY)
+       lrc = &itx->itx_lr;             /* Common log record inside itx. */
+       lrw = (lr_write_t *)lrc;        /* Write log record inside itx. */
+       if (lrc->lrc_txtype == TX_WRITE && itx->itx_wr_state == WR_NEED_COPY) {
                dlen = P2ROUNDUP_TYPED(
                    lrw->lr_length, sizeof (uint64_t), uint64_t);
-
+       } else {
+               dlen = 0;
+       }
+       reclen = lrc->lrc_reclen;
        zilog->zl_cur_used += (reclen + dlen);
+       txg = lrc->lrc_txg;
 
        zil_lwb_write_init(zilog, lwb);
 
+cont:
        /*
         * If this record won't fit in the current log block, start a new one.
+        * For WR_NEED_COPY optimize layout for minimal number of chunks.
         */
-       if (lwb->lwb_nused + reclen + dlen > lwb->lwb_sz) {
+       lwb_sp = lwb->lwb_sz - lwb->lwb_nused;
+       if (reclen > lwb_sp || (reclen + dlen > lwb_sp &&
+           lwb_sp < ZIL_MAX_WASTE_SPACE && (dlen % ZIL_MAX_LOG_DATA == 0 ||
+           lwb_sp < reclen + dlen % ZIL_MAX_LOG_DATA))) {
                lwb = zil_lwb_write_start(zilog, lwb);
                if (lwb == NULL)
                        return (NULL);
                zil_lwb_write_init(zilog, lwb);
                ASSERT(LWB_EMPTY(lwb));
-               if (lwb->lwb_nused + reclen + dlen > lwb->lwb_sz) {
-                       txg_wait_synced(zilog->zl_dmu_pool, txg);
-                       return (lwb);
-               }
+               lwb_sp = lwb->lwb_sz - lwb->lwb_nused;
+               ASSERT3U(reclen + MIN(dlen, sizeof (uint64_t)), <=, lwb_sp);
        }
 
+       dnow = MIN(dlen, lwb_sp - reclen);
        lr_buf = lwb->lwb_buf + lwb->lwb_nused;
        bcopy(lrc, lr_buf, reclen);
-       lrc = (lr_t *)lr_buf;
-       lrw = (lr_write_t *)lrc;
+       lrcb = (lr_t *)lr_buf;          /* Like lrc, but inside lwb. */
+       lrwb = (lr_write_t *)lrcb;      /* Like lrw, but inside lwb. */
 
        ZIL_STAT_BUMP(zil_itx_count);
 
@@ -1129,20 +1167,24 @@ zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb)
                        char *dbuf;
                        int error;
 
-                       if (dlen) {
-                               ASSERT(itx->itx_wr_state == WR_NEED_COPY);
+                       if (itx->itx_wr_state == WR_NEED_COPY) {
                                dbuf = lr_buf + reclen;
-                               lrw->lr_common.lrc_reclen += dlen;
+                               lrcb->lrc_reclen += dnow;
+                               if (lrwb->lr_length > dnow)
+                                       lrwb->lr_length = dnow;
+                               lrw->lr_offset += dnow;
+                               lrw->lr_length -= dnow;
                                ZIL_STAT_BUMP(zil_itx_needcopy_count);
-                               ZIL_STAT_INCR(zil_itx_needcopy_bytes, lrw->lr_length);
+                               ZIL_STAT_INCR(zil_itx_needcopy_bytes, dnow);
                        } else {
                                ASSERT(itx->itx_wr_state == WR_INDIRECT);
                                dbuf = NULL;
                                ZIL_STAT_BUMP(zil_itx_indirect_count);
-                               ZIL_STAT_INCR(zil_itx_indirect_bytes, lrw->lr_length);
+                               ZIL_STAT_INCR(zil_itx_indirect_bytes,
+                                   lrw->lr_length);
                        }
                        error = zilog->zl_get_data(
-                           itx->itx_private, lrw, dbuf, lwb->lwb_zio);
+                           itx->itx_private, lrwb, dbuf, lwb->lwb_zio);
                        if (error == EIO) {
                                txg_wait_synced(zilog->zl_dmu_pool, txg);
                                return (lwb);
@@ -1161,29 +1203,38 @@ zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb)
         * equal to the itx sequence number because not all transactions
         * are synchronous, and sometimes spa_sync() gets there first.
         */
-       lrc->lrc_seq = ++zilog->zl_lr_seq; /* we are single threaded */
-       lwb->lwb_nused += reclen + dlen;
+       lrcb->lrc_seq = ++zilog->zl_lr_seq; /* we are single threaded */
+       lwb->lwb_nused += reclen + dnow;
        lwb->lwb_max_txg = MAX(lwb->lwb_max_txg, txg);
        ASSERT3U(lwb->lwb_nused, <=, lwb->lwb_sz);
        ASSERT0(P2PHASE(lwb->lwb_nused, sizeof (uint64_t)));
 
+       dlen -= dnow;
+       if (dlen > 0) {
+               zilog->zl_cur_used += reclen;
+               goto cont;
+       }
+
        return (lwb);
 }
 
 itx_t *
 zil_itx_create(uint64_t txtype, size_t lrsize)
 {
+       size_t itxsize;
        itx_t *itx;
 
        lrsize = P2ROUNDUP_TYPED(lrsize, sizeof (uint64_t), size_t);
+       itxsize = offsetof(itx_t, itx_lr) + lrsize;
 
-       itx = kmem_alloc(offsetof(itx_t, itx_lr) + lrsize,
-           KM_PUSHPAGE | KM_NODEBUG);
+       itx = zio_data_buf_alloc(itxsize);
        itx->itx_lr.lrc_txtype = txtype;
        itx->itx_lr.lrc_reclen = lrsize;
-       itx->itx_sod = lrsize; /* if write & WR_NEED_COPY will be increased */
        itx->itx_lr.lrc_seq = 0;        /* defensive */
        itx->itx_sync = B_TRUE;         /* default is synchronous */
+       itx->itx_callback = NULL;
+       itx->itx_callback_data = NULL;
+       itx->itx_size = itxsize;
 
        return (itx);
 }
@@ -1191,7 +1242,7 @@ zil_itx_create(uint64_t txtype, size_t lrsize)
 void
 zil_itx_destroy(itx_t *itx)
 {
-       kmem_free(itx, offsetof(itx_t, itx_lr) + itx->itx_lr.lrc_reclen);
+       zio_data_buf_free(itx, itx->itx_size);
 }
 
 /*
@@ -1209,9 +1260,10 @@ zil_itxg_clean(itxs_t *itxs)
 
        list = &itxs->i_sync_list;
        while ((itx = list_head(list)) != NULL) {
+               if (itx->itx_callback != NULL)
+                       itx->itx_callback(itx->itx_callback_data);
                list_remove(list, itx);
-               kmem_free(itx, offsetof(itx_t, itx_lr) +
-                   itx->itx_lr.lrc_reclen);
+               zil_itx_destroy(itx);
        }
 
        cookie = NULL;
@@ -1219,9 +1271,10 @@ zil_itxg_clean(itxs_t *itxs)
        while ((ian = avl_destroy_nodes(t, &cookie)) != NULL) {
                list = &ian->ia_list;
                while ((itx = list_head(list)) != NULL) {
+                       if (itx->itx_callback != NULL)
+                               itx->itx_callback(itx->itx_callback_data);
                        list_remove(list, itx);
-                       kmem_free(itx, offsetof(itx_t, itx_lr) +
-                           itx->itx_lr.lrc_reclen);
+                       zil_itx_destroy(itx);
                }
                list_destroy(list);
                kmem_free(ian, sizeof (itx_async_node_t));
@@ -1237,12 +1290,7 @@ zil_aitx_compare(const void *x1, const void *x2)
        const uint64_t o1 = ((itx_async_node_t *)x1)->ia_foid;
        const uint64_t o2 = ((itx_async_node_t *)x2)->ia_foid;
 
-       if (o1 < o2)
-               return (-1);
-       if (o1 > o2)
-               return (1);
-
-       return (0);
+       return (AVL_CMP(o1, o2));
 }
 
 /*
@@ -1285,9 +1333,10 @@ zil_remove_async(zilog_t *zilog, uint64_t oid)
                mutex_exit(&itxg->itxg_lock);
        }
        while ((itx = list_head(&clean_list)) != NULL) {
+               if (itx->itx_callback != NULL)
+                       itx->itx_callback(itx->itx_callback_data);
                list_remove(&clean_list, itx);
-               kmem_free(itx, offsetof(itx_t, itx_lr) +
-                   itx->itx_lr.lrc_reclen);
+               zil_itx_destroy(itx);
        }
        list_destroy(&clean_list);
 }
@@ -1330,13 +1379,13 @@ zil_itx_assign(zilog_t *zilog, itx_t *itx, dmu_tx_t *tx)
                         * this itxg. Save the itxs for release below.
                         * This should be rare.
                         */
-                       atomic_add_64(&zilog->zl_itx_list_sz, -itxg->itxg_sod);
-                       itxg->itxg_sod = 0;
+                       zfs_dbgmsg("zil_itx_assign: missed itx cleanup for "
+                           "txg %llu", itxg->itxg_txg);
                        clean = itxg->itxg_itxs;
                }
-               ASSERT(itxg->itxg_sod == 0);
                itxg->itxg_txg = txg;
-               itxs = itxg->itxg_itxs = kmem_zalloc(sizeof (itxs_t), KM_PUSHPAGE);
+               itxs = itxg->itxg_itxs = kmem_zalloc(sizeof (itxs_t),
+                   KM_SLEEP);
 
                list_create(&itxs->i_sync_list, sizeof (itx_t),
                    offsetof(itx_t, itx_node));
@@ -1346,17 +1395,17 @@ zil_itx_assign(zilog_t *zilog, itx_t *itx, dmu_tx_t *tx)
        }
        if (itx->itx_sync) {
                list_insert_tail(&itxs->i_sync_list, itx);
-               atomic_add_64(&zilog->zl_itx_list_sz, itx->itx_sod);
-               itxg->itxg_sod += itx->itx_sod;
        } else {
                avl_tree_t *t = &itxs->i_async_tree;
-               uint64_t foid = ((lr_ooo_t *)&itx->itx_lr)->lr_foid;
+               uint64_t foid =
+                   LR_FOID_GET_OBJ(((lr_ooo_t *)&itx->itx_lr)->lr_foid);
                itx_async_node_t *ian;
                avl_index_t where;
 
                ian = avl_find(t, &foid, &where);
                if (ian == NULL) {
-                       ian = kmem_alloc(sizeof (itx_async_node_t), KM_PUSHPAGE);
+                       ian = kmem_alloc(sizeof (itx_async_node_t),
+                           KM_SLEEP);
                        list_create(&ian->ia_list, sizeof (itx_t),
                            offsetof(itx_t, itx_node));
                        ian->ia_foid = foid;
@@ -1395,8 +1444,6 @@ zil_clean(zilog_t *zilog, uint64_t synced_txg)
        ASSERT3U(itxg->itxg_txg, <=, synced_txg);
        ASSERT(itxg->itxg_txg != 0);
        ASSERT(zilog->zl_clean_taskq != NULL);
-       atomic_add_64(&zilog->zl_itx_list_sz, -itxg->itxg_sod);
-       itxg->itxg_sod = 0;
        clean_me = itxg->itxg_itxs;
        itxg->itxg_itxs = NULL;
        itxg->itxg_txg = 0;
@@ -1420,13 +1467,17 @@ zil_get_commit_list(zilog_t *zilog)
 {
        uint64_t otxg, txg;
        list_t *commit_list = &zilog->zl_itx_commit_list;
-       uint64_t push_sod = 0;
 
        if (spa_freeze_txg(zilog->zl_spa) != UINT64_MAX) /* ziltest support */
                otxg = ZILTEST_TXG;
        else
                otxg = spa_last_synced_txg(zilog->zl_spa) + 1;
 
+       /*
+        * This is inherently racy, since there is nothing to prevent
+        * the last synced txg from changing. That's okay since we'll
+        * only commit things in the future.
+        */
        for (txg = otxg; txg < (otxg + TXG_CONCURRENT_STATES); txg++) {
                itxg_t *itxg = &zilog->zl_itxg[txg & TXG_MASK];
 
@@ -1436,13 +1487,20 @@ zil_get_commit_list(zilog_t *zilog)
                        continue;
                }
 
+               /*
+                * If we're adding itx records to the zl_itx_commit_list,
+                * then the zil better be dirty in this "txg". We can assert
+                * that here since we're holding the itxg_lock which will
+                * prevent spa_sync from cleaning it. Once we add the itxs
+                * to the zl_itx_commit_list we must commit it to disk even
+                * if it's unnecessary (i.e. the txg was synced).
+                */
+               ASSERT(zilog_is_dirty_in_txg(zilog, txg) ||
+                   spa_freeze_txg(zilog->zl_spa) != UINT64_MAX);
                list_move_tail(commit_list, &itxg->itxg_itxs->i_sync_list);
-               push_sod += itxg->itxg_sod;
-               itxg->itxg_sod = 0;
 
                mutex_exit(&itxg->itxg_lock);
        }
-       atomic_add_64(&zilog->zl_itx_list_sz, -push_sod);
 }
 
 /*
@@ -1461,6 +1519,10 @@ zil_async_to_sync(zilog_t *zilog, uint64_t foid)
        else
                otxg = spa_last_synced_txg(zilog->zl_spa) + 1;
 
+       /*
+        * This is inherently racy, since there is nothing to prevent
+        * the last synced txg from changing.
+        */
        for (txg = otxg; txg < (otxg + TXG_CONCURRENT_STATES); txg++) {
                itxg_t *itxg = &zilog->zl_itxg[txg & TXG_MASK];
 
@@ -1530,15 +1592,19 @@ zil_commit_writer(zilog_t *zilog)
        }
 
        DTRACE_PROBE1(zil__cw1, zilog_t *, zilog);
-       while ((itx = list_head(&zilog->zl_itx_commit_list))) {
+       for (itx = list_head(&zilog->zl_itx_commit_list); itx != NULL;
+           itx = list_next(&zilog->zl_itx_commit_list, itx)) {
                txg = itx->itx_lr.lrc_txg;
-               ASSERT(txg);
+               ASSERT3U(txg, !=, 0);
 
+               /*
+                * This is inherently racy and may result in us writing
+                * out a log block for a txg that was just synced. This is
+                * ok since we'll end cleaning up that log block the next
+                * time we call zil_sync().
+                */
                if (txg > spa_last_synced_txg(spa) || txg > spa_freeze_txg(spa))
                        lwb = zil_lwb_commit(zilog, itx, lwb);
-               list_remove(&zilog->zl_itx_commit_list, itx);
-               kmem_free(itx, offsetof(itx_t, itx_lr)
-                   + itx->itx_lr.lrc_reclen);
        }
        DTRACE_PROBE1(zil__cw2, zilog_t *, zilog);
 
@@ -1560,6 +1626,16 @@ zil_commit_writer(zilog_t *zilog)
        if (error || lwb == NULL)
                txg_wait_synced(zilog->zl_dmu_pool, 0);
 
+       while ((itx = list_head(&zilog->zl_itx_commit_list))) {
+               txg = itx->itx_lr.lrc_txg;
+               ASSERT(txg);
+
+               if (itx->itx_callback != NULL)
+                       itx->itx_callback(itx->itx_callback_data);
+               list_remove(&zilog->zl_itx_commit_list, itx);
+               zil_itx_destroy(itx);
+       }
+
        mutex_enter(&zilog->zl_lock);
 
        /*
@@ -1727,7 +1803,7 @@ zil_init(void)
            sizeof (struct lwb), 0, NULL, NULL, NULL, NULL, NULL, 0);
 
        zil_ksp = kstat_create("zfs", 0, "zil", "misc",
-           KSTAT_TYPE_NAMED, sizeof(zil_stats) / sizeof(kstat_named_t),
+           KSTAT_TYPE_NAMED, sizeof (zil_stats) / sizeof (kstat_named_t),
            KSTAT_FLAG_VIRTUAL);
 
        if (zil_ksp != NULL) {
@@ -1765,7 +1841,7 @@ zil_alloc(objset_t *os, zil_header_t *zh_phys)
        zilog_t *zilog;
        int i;
 
-       zilog = kmem_zalloc(sizeof (zilog_t), KM_PUSHPAGE);
+       zilog = kmem_zalloc(sizeof (zilog_t), KM_SLEEP);
 
        zilog->zl_header = zh_phys;
        zilog->zl_os = os;
@@ -1857,7 +1933,7 @@ zil_open(objset_t *os, zil_get_data_t *get_data)
        ASSERT(list_is_empty(&zilog->zl_lwb_list));
 
        zilog->zl_get_data = get_data;
-       zilog->zl_clean_taskq = taskq_create("zil_clean", 1, minclsyspri,
+       zilog->zl_clean_taskq = taskq_create("zil_clean", 1, defclsyspri,
            2, 2, TASKQ_PREPOPULATE);
 
        return (zilog);
@@ -1887,7 +1963,11 @@ zil_close(zilog_t *zilog)
        mutex_exit(&zilog->zl_lock);
        if (txg)
                txg_wait_synced(zilog->zl_dmu_pool, txg);
-       ASSERT(!zilog_is_dirty(zilog));
+
+       if (zilog_is_dirty(zilog))
+               zfs_dbgmsg("zil (%p) is dirty, txg %llu", zilog, txg);
+       if (txg < spa_freeze_txg(zilog->zl_spa))
+               VERIFY(!zilog_is_dirty(zilog));
 
        taskq_destroy(zilog->zl_clean_taskq);
        zilog->zl_clean_taskq = NULL;
@@ -2047,7 +2127,7 @@ typedef struct zil_replay_arg {
 static int
 zil_replay_error(zilog_t *zilog, lr_t *lr, int error)
 {
-       char name[MAXNAMELEN];
+       char name[ZFS_MAX_DATASET_NAME_LEN];
 
        zilog->zl_replaying_seq--;      /* didn't actually replay this one */
 
@@ -2091,7 +2171,7 @@ zil_replay_log_record(zilog_t *zilog, lr_t *lr, void *zra, uint64_t claim_txg)
         */
        if (TX_OOO(txtype)) {
                error = dmu_object_info(zilog->zl_os,
-                   ((lr_ooo_t *)lr)->lr_foid, NULL);
+                   LR_FOID_GET_OBJ(((lr_ooo_t *)lr)->lr_foid), NULL);
                if (error == ENOENT || error == EEXIST)
                        return (0);
        }
@@ -2171,7 +2251,7 @@ zil_replay(objset_t *os, void *arg, zil_replay_func_t replay_func[TX_MAX_TYPE])
        zr.zr_replay = replay_func;
        zr.zr_arg = arg;
        zr.zr_byteswap = BP_SHOULD_BYTESWAP(&zh->zh_log);
-       zr.zr_lr = vmem_alloc(2 * SPA_MAXBLOCKSIZE, KM_PUSHPAGE);
+       zr.zr_lr = vmem_alloc(2 * SPA_MAXBLOCKSIZE, KM_SLEEP);
 
        /*
         * Wait for in-progress removes to sync before starting replay.
@@ -2219,12 +2299,38 @@ zil_vdev_offline(const char *osname, void *arg)
 }
 
 #if defined(_KERNEL) && defined(HAVE_SPL)
+EXPORT_SYMBOL(zil_alloc);
+EXPORT_SYMBOL(zil_free);
+EXPORT_SYMBOL(zil_open);
+EXPORT_SYMBOL(zil_close);
+EXPORT_SYMBOL(zil_replay);
+EXPORT_SYMBOL(zil_replaying);
+EXPORT_SYMBOL(zil_destroy);
+EXPORT_SYMBOL(zil_destroy_sync);
+EXPORT_SYMBOL(zil_itx_create);
+EXPORT_SYMBOL(zil_itx_destroy);
+EXPORT_SYMBOL(zil_itx_assign);
+EXPORT_SYMBOL(zil_commit);
+EXPORT_SYMBOL(zil_vdev_offline);
+EXPORT_SYMBOL(zil_claim);
+EXPORT_SYMBOL(zil_check_log_chain);
+EXPORT_SYMBOL(zil_sync);
+EXPORT_SYMBOL(zil_clean);
+EXPORT_SYMBOL(zil_suspend);
+EXPORT_SYMBOL(zil_resume);
+EXPORT_SYMBOL(zil_add_block);
+EXPORT_SYMBOL(zil_bp_tree_add);
+EXPORT_SYMBOL(zil_set_sync);
+EXPORT_SYMBOL(zil_set_logbias);
+
+/* BEGIN CSTYLED */
 module_param(zil_replay_disable, int, 0644);
 MODULE_PARM_DESC(zil_replay_disable, "Disable intent logging replay");
 
 module_param(zfs_nocacheflush, int, 0644);
 MODULE_PARM_DESC(zfs_nocacheflush, "Disable cache flushes");
 
-module_param(zil_slog_limit, ulong, 0644);
-MODULE_PARM_DESC(zil_slog_limit, "Max commit bytes to separate log device");
+module_param(zil_slog_bulk, ulong, 0644);
+MODULE_PARM_DESC(zil_slog_bulk, "Limit in bytes slog sync writes per commit");
+/* END CSTYLED */
 #endif