]> git.proxmox.com Git - mirror_zfs-debian.git/blobdiff - module/zfs/zio.c
Imported Upstream version 0.6.4.2
[mirror_zfs-debian.git] / module / zfs / zio.c
index 0bf0f6e127c9b0210599f5b2f6a20f9a027d1751..066f04f1864c6c8a6ccc59c70573ebecf38f28db 100644 (file)
@@ -20,6 +20,8 @@
  */
 /*
  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2011, 2014 by Delphix. All rights reserved.
+ * Copyright (c) 2011 Nexenta Systems, Inc. All rights reserved.
  */
 
 #include <sys/zfs_context.h>
 #include <sys/dmu_objset.h>
 #include <sys/arc.h>
 #include <sys/ddt.h>
-
-/*
- * ==========================================================================
- * I/O priority table
- * ==========================================================================
- */
-uint8_t zio_priority_table[ZIO_PRIORITY_TABLE_SIZE] = {
-       0,      /* ZIO_PRIORITY_NOW             */
-       0,      /* ZIO_PRIORITY_SYNC_READ       */
-       0,      /* ZIO_PRIORITY_SYNC_WRITE      */
-       0,      /* ZIO_PRIORITY_LOG_WRITE       */
-       1,      /* ZIO_PRIORITY_CACHE_FILL      */
-       1,      /* ZIO_PRIORITY_AGG             */
-       4,      /* ZIO_PRIORITY_FREE            */
-       4,      /* ZIO_PRIORITY_ASYNC_WRITE     */
-       6,      /* ZIO_PRIORITY_ASYNC_READ      */
-       10,     /* ZIO_PRIORITY_RESILVER        */
-       20,     /* ZIO_PRIORITY_SCRUB           */
-       2,      /* ZIO_PRIORITY_DDT_PREFETCH    */
-};
+#include <sys/blkptr.h>
+#include <sys/zfeature.h>
 
 /*
  * ==========================================================================
  * I/O type descriptions
  * ==========================================================================
  */
-char *zio_type_name[ZIO_TYPES] = {
-       "zio_null", "zio_read", "zio_write", "zio_free", "zio_claim",
-       "zio_ioctl"
+const char *zio_type_name[ZIO_TYPES] = {
+       "z_null", "z_rd", "z_wr", "z_fr", "z_cl", "z_ioctl"
 };
 
 /*
@@ -74,10 +57,23 @@ kmem_cache_t *zio_cache;
 kmem_cache_t *zio_link_cache;
 kmem_cache_t *zio_buf_cache[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];
 kmem_cache_t *zio_data_buf_cache[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];
+int zio_delay_max = ZIO_DELAY_MAX;
 
-#ifdef _KERNEL
-extern vmem_t *zio_alloc_arena;
-#endif
+/*
+ * The following actions directly effect the spa's sync-to-convergence logic.
+ * The values below define the sync pass when we start performing the action.
+ * Care should be taken when changing these values as they directly impact
+ * spa_sync() performance. Tuning these values may introduce subtle performance
+ * pathologies and should only be done in the context of performance analysis.
+ * These tunables will eventually be removed and replaced with #defines once
+ * enough analysis has been done to determine optimal values.
+ *
+ * The 'zfs_sync_pass_deferred_free' pass must be greater than 1 to ensure that
+ * regular blocks are not deferred.
+ */
+int zfs_sync_pass_deferred_free = 2; /* defer frees starting in this pass */
+int zfs_sync_pass_dont_compress = 5; /* don't compress starting in this pass */
+int zfs_sync_pass_rewrite = 2; /* rewrite new bps starting in this pass */
 
 /*
  * An allocating zio is one that either currently has the DVA allocate
@@ -85,7 +81,7 @@ extern vmem_t *zio_alloc_arena;
  */
 #define        IO_IS_ALLOCATING(zio) ((zio)->io_orig_pipeline & ZIO_STAGE_DVA_ALLOCATE)
 
-boolean_t      zio_requeue_io_start_cut_in_line = B_TRUE;
+int zio_requeue_io_start_cut_in_line = 1;
 
 #ifdef ZFS_DEBUG
 int zio_buf_debug_limit = 16384;
@@ -93,15 +89,14 @@ int zio_buf_debug_limit = 16384;
 int zio_buf_debug_limit = 0;
 #endif
 
+static inline void __zio_execute(zio_t *zio);
+
 void
 zio_init(void)
 {
        size_t c;
        vmem_t *data_alloc_arena = NULL;
 
-#ifdef _KERNEL
-       data_alloc_arena = zio_alloc_arena;
-#endif
        zio_cache = kmem_cache_create("zio_cache",
            sizeof (zio_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
        zio_link_cache = kmem_cache_create("zio_link_cache",
@@ -117,15 +112,26 @@ zio_init(void)
                size_t size = (c + 1) << SPA_MINBLOCKSHIFT;
                size_t p2 = size;
                size_t align = 0;
+               size_t cflags = (size > zio_buf_debug_limit) ? KMC_NODEBUG : 0;
 
                while (p2 & (p2 - 1))
                        p2 &= p2 - 1;
 
+#ifndef _KERNEL
+               /*
+                * If we are using watchpoints, put each buffer on its own page,
+                * to eliminate the performance overhead of trapping to the
+                * kernel when modifying a non-watched buffer that shares the
+                * page with a watched buffer.
+                */
+               if (arc_watch && !IS_P2ALIGNED(size, PAGESIZE))
+                       continue;
+#endif
                if (size <= 4 * SPA_MINBLOCKSIZE) {
                        align = SPA_MINBLOCKSIZE;
-               } else if (P2PHASE(size, PAGESIZE) == 0) {
+               } else if (IS_P2ALIGNED(size, PAGESIZE)) {
                        align = PAGESIZE;
-               } else if (P2PHASE(size, p2 >> 2) == 0) {
+               } else if (IS_P2ALIGNED(size, p2 >> 2)) {
                        align = p2 >> 2;
                }
 
@@ -133,13 +139,12 @@ zio_init(void)
                        char name[36];
                        (void) sprintf(name, "zio_buf_%lu", (ulong_t)size);
                        zio_buf_cache[c] = kmem_cache_create(name, size,
-                           align, NULL, NULL, NULL, NULL, NULL,
-                           size > zio_buf_debug_limit ? KMC_NODEBUG : 0);
+                           align, NULL, NULL, NULL, NULL, NULL, cflags);
 
                        (void) sprintf(name, "zio_data_buf_%lu", (ulong_t)size);
                        zio_data_buf_cache[c] = kmem_cache_create(name, size,
-                           align, NULL, NULL, NULL, NULL, data_alloc_arena,
-                           size > zio_buf_debug_limit ? KMC_NODEBUG : 0);
+                           align, NULL, NULL, NULL, NULL,
+                           data_alloc_arena, cflags);
                }
        }
 
@@ -154,6 +159,8 @@ zio_init(void)
        }
 
        zio_inject_init();
+
+       lz4_init();
 }
 
 void
@@ -181,6 +188,8 @@ zio_fini(void)
        kmem_cache_destroy(zio_cache);
 
        zio_inject_fini();
+
+       lz4_fini();
 }
 
 /*
@@ -200,7 +209,7 @@ zio_buf_alloc(size_t size)
 {
        size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
 
-       ASSERT(c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
+       ASSERT3U(c, <, SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
 
        return (kmem_cache_alloc(zio_buf_cache[c], KM_PUSHPAGE));
 }
@@ -305,7 +314,7 @@ zio_decompress(zio_t *zio, void *data, uint64_t size)
        if (zio->io_error == 0 &&
            zio_decompress_data(BP_GET_COMPRESS(zio->io_bp),
            zio->io_data, data, zio->io_size, size) != 0)
-               zio->io_error = EIO;
+               zio->io_error = SET_ERROR(EIO);
 }
 
 /*
@@ -436,7 +445,8 @@ zio_wait_for_children(zio_t *zio, enum zio_child child, enum zio_wait_type wait)
        return (waiting);
 }
 
-static void
+__attribute__((always_inline))
+static inline void
 zio_notify_parent(zio_t *pio, zio_t *zio, enum zio_wait_type wait)
 {
        uint64_t *countp = &pio->io_children[zio->io_child_type][wait];
@@ -447,10 +457,13 @@ zio_notify_parent(zio_t *pio, zio_t *zio, enum zio_wait_type wait)
                *errorp = zio_worst_error(*errorp, zio->io_error);
        pio->io_reexecute |= zio->io_reexecute;
        ASSERT3U(*countp, >, 0);
-       if (--*countp == 0 && pio->io_stall == countp) {
+
+       (*countp)--;
+
+       if (*countp == 0 && pio->io_stall == countp) {
                pio->io_stall = NULL;
                mutex_exit(&pio->io_lock);
-               zio_execute(pio);
+               __zio_execute(pio);
        } else {
                mutex_exit(&pio->io_lock);
        }
@@ -471,8 +484,8 @@ zio_inherit_child_errors(zio_t *zio, enum zio_child c)
 static zio_t *
 zio_create(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
     void *data, uint64_t size, zio_done_func_t *done, void *private,
-    zio_type_t type, int priority, enum zio_flag flags,
-    vdev_t *vd, uint64_t offset, const zbookmark_t *zb,
+    zio_type_t type, zio_priority_t priority, enum zio_flag flags,
+    vdev_t *vd, uint64_t offset, const zbookmark_phys_t *zb,
     enum zio_stage stage, enum zio_stage pipeline)
 {
        zio_t *zio;
@@ -546,6 +559,8 @@ zio_create(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
                zio_add_child(pio, zio);
        }
 
+       taskq_init_ent(&zio->io_tqent);
+
        return (zio);
 }
 
@@ -581,7 +596,7 @@ zio_root(spa_t *spa, zio_done_func_t *done, void *private, enum zio_flag flags)
 zio_t *
 zio_read(zio_t *pio, spa_t *spa, const blkptr_t *bp,
     void *data, uint64_t size, zio_done_func_t *done, void *private,
-    int priority, enum zio_flag flags, const zbookmark_t *zb)
+    zio_priority_t priority, enum zio_flag flags, const zbookmark_phys_t *zb)
 {
        zio_t *zio;
 
@@ -597,8 +612,9 @@ zio_read(zio_t *pio, spa_t *spa, const blkptr_t *bp,
 zio_t *
 zio_write(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp,
     void *data, uint64_t size, const zio_prop_t *zp,
-    zio_done_func_t *ready, zio_done_func_t *done, void *private,
-    int priority, enum zio_flag flags, const zbookmark_t *zb)
+    zio_done_func_t *ready, zio_done_func_t *physdone, zio_done_func_t *done,
+    void *private,
+    zio_priority_t priority, enum zio_flag flags, const zbookmark_phys_t *zb)
 {
        zio_t *zio;
 
@@ -606,12 +622,10 @@ zio_write(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp,
            zp->zp_checksum < ZIO_CHECKSUM_FUNCTIONS &&
            zp->zp_compress >= ZIO_COMPRESS_OFF &&
            zp->zp_compress < ZIO_COMPRESS_FUNCTIONS &&
-           zp->zp_type < DMU_OT_NUMTYPES &&
+           DMU_OT_IS_VALID(zp->zp_type) &&
            zp->zp_level < 32 &&
            zp->zp_copies > 0 &&
-           zp->zp_copies <= spa_max_replication(spa) &&
-           zp->zp_dedup <= 1 &&
-           zp->zp_dedup_verify <= 1);
+           zp->zp_copies <= spa_max_replication(spa));
 
        zio = zio_create(pio, spa, txg, bp, data, size, done, private,
            ZIO_TYPE_WRITE, priority, flags, NULL, 0, zb,
@@ -619,15 +633,26 @@ zio_write(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp,
            ZIO_DDT_CHILD_WRITE_PIPELINE : ZIO_WRITE_PIPELINE);
 
        zio->io_ready = ready;
+       zio->io_physdone = physdone;
        zio->io_prop = *zp;
 
+       /*
+        * Data can be NULL if we are going to call zio_write_override() to
+        * provide the already-allocated BP.  But we may need the data to
+        * verify a dedup hit (if requested).  In this case, don't try to
+        * dedup (just take the already-allocated BP verbatim).
+        */
+       if (data == NULL && zio->io_prop.zp_dedup_verify) {
+               zio->io_prop.zp_dedup = zio->io_prop.zp_dedup_verify = B_FALSE;
+       }
+
        return (zio);
 }
 
 zio_t *
 zio_rewrite(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp, void *data,
-    uint64_t size, zio_done_func_t *done, void *private, int priority,
-    enum zio_flag flags, zbookmark_t *zb)
+    uint64_t size, zio_done_func_t *done, void *private,
+    zio_priority_t priority, enum zio_flag flags, zbookmark_phys_t *zb)
 {
        zio_t *zio;
 
@@ -639,13 +664,20 @@ zio_rewrite(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp, void *data,
 }
 
 void
-zio_write_override(zio_t *zio, blkptr_t *bp, int copies)
+zio_write_override(zio_t *zio, blkptr_t *bp, int copies, boolean_t nopwrite)
 {
        ASSERT(zio->io_type == ZIO_TYPE_WRITE);
        ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
        ASSERT(zio->io_stage == ZIO_STAGE_OPEN);
        ASSERT(zio->io_txg == spa_syncing_txg(zio->io_spa));
 
+       /*
+        * We must reset the io_prop to match the values that existed
+        * when the bp was first written by dmu_sync() keeping in mind
+        * that nopwrite and dedup are mutually exclusive.
+        */
+       zio->io_prop.zp_dedup = nopwrite ? B_FALSE : zio->io_prop.zp_dedup;
+       zio->io_prop.zp_nopwrite = nopwrite;
        zio->io_prop.zp_copies = copies;
        zio->io_bp_override = bp;
 }
@@ -653,7 +685,29 @@ zio_write_override(zio_t *zio, blkptr_t *bp, int copies)
 void
 zio_free(spa_t *spa, uint64_t txg, const blkptr_t *bp)
 {
-       bplist_append(&spa->spa_free_bplist[txg & TXG_MASK], bp);
+
+       /*
+        * The check for EMBEDDED is a performance optimization.  We
+        * process the free here (by ignoring it) rather than
+        * putting it on the list and then processing it in zio_free_sync().
+        */
+       if (BP_IS_EMBEDDED(bp))
+               return;
+       metaslab_check_free(spa, bp);
+
+       /*
+        * Frees that are for the currently-syncing txg, are not going to be
+        * deferred, and which will not need to do a read (i.e. not GANG or
+        * DEDUP), can be processed immediately.  Otherwise, put them on the
+        * in-memory list for later processing.
+        */
+       if (BP_IS_GANG(bp) || BP_GET_DEDUP(bp) ||
+           txg != spa->spa_syncing_txg ||
+           spa_sync_pass(spa) >= zfs_sync_pass_deferred_free) {
+               bplist_append(&spa->spa_free_bplist[txg & TXG_MASK], bp);
+       } else {
+               VERIFY0(zio_wait(zio_free_sync(NULL, spa, txg, bp, 0)));
+       }
 }
 
 zio_t *
@@ -661,17 +715,29 @@ zio_free_sync(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
     enum zio_flag flags)
 {
        zio_t *zio;
-
-       dprintf_bp(bp, "freeing in txg %llu, pass %u",
-           (longlong_t)txg, spa->spa_sync_pass);
+       enum zio_stage stage = ZIO_FREE_PIPELINE;
 
        ASSERT(!BP_IS_HOLE(bp));
        ASSERT(spa_syncing_txg(spa) == txg);
-       ASSERT(spa_sync_pass(spa) <= SYNC_PASS_DEFERRED_FREE);
+       ASSERT(spa_sync_pass(spa) < zfs_sync_pass_deferred_free);
+
+       if (BP_IS_EMBEDDED(bp))
+               return (zio_null(pio, spa, NULL, NULL, NULL, 0));
+
+       metaslab_check_free(spa, bp);
+       arc_freed(spa, bp);
+
+       /*
+        * GANG and DEDUP blocks can induce a read (for the gang block header,
+        * or the DDT), so issue them asynchronously so that this thread is
+        * not tied up.
+        */
+       if (BP_IS_GANG(bp) || BP_GET_DEDUP(bp))
+               stage |= ZIO_STAGE_ISSUE_ASYNC;
 
        zio = zio_create(pio, spa, txg, bp, NULL, BP_GET_PSIZE(bp),
-           NULL, NULL, ZIO_TYPE_FREE, ZIO_PRIORITY_FREE, flags,
-           NULL, 0, NULL, ZIO_STAGE_OPEN, ZIO_FREE_PIPELINE);
+           NULL, NULL, ZIO_TYPE_FREE, ZIO_PRIORITY_NOW, flags,
+           NULL, 0, NULL, ZIO_STAGE_OPEN, stage);
 
        return (zio);
 }
@@ -682,6 +748,11 @@ zio_claim(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
 {
        zio_t *zio;
 
+       dprintf_bp(bp, "claiming in txg %llu", txg);
+
+       if (BP_IS_EMBEDDED(bp))
+               return (zio_null(pio, spa, NULL, NULL, NULL, 0));
+
        /*
         * A claim is an allocation of a specific block.  Claims are needed
         * to support immediate writes in the intent log.  The issue is that
@@ -707,14 +778,14 @@ zio_claim(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
 
 zio_t *
 zio_ioctl(zio_t *pio, spa_t *spa, vdev_t *vd, int cmd,
-    zio_done_func_t *done, void *private, int priority, enum zio_flag flags)
+    zio_done_func_t *done, void *private, enum zio_flag flags)
 {
        zio_t *zio;
        int c;
 
        if (vd->vdev_children == 0) {
                zio = zio_create(pio, spa, 0, NULL, NULL, 0, done, private,
-                   ZIO_TYPE_IOCTL, priority, flags, vd, 0, NULL,
+                   ZIO_TYPE_IOCTL, ZIO_PRIORITY_NOW, flags, vd, 0, NULL,
                    ZIO_STAGE_OPEN, ZIO_IOCTL_PIPELINE);
 
                zio->io_cmd = cmd;
@@ -723,7 +794,7 @@ zio_ioctl(zio_t *pio, spa_t *spa, vdev_t *vd, int cmd,
 
                for (c = 0; c < vd->vdev_children; c++)
                        zio_nowait(zio_ioctl(zio, spa, vd->vdev_child[c], cmd,
-                           done, private, priority, flags));
+                           done, private, flags));
        }
 
        return (zio);
@@ -732,7 +803,7 @@ zio_ioctl(zio_t *pio, spa_t *spa, vdev_t *vd, int cmd,
 zio_t *
 zio_read_phys(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
     void *data, int checksum, zio_done_func_t *done, void *private,
-    int priority, enum zio_flag flags, boolean_t labels)
+    zio_priority_t priority, enum zio_flag flags, boolean_t labels)
 {
        zio_t *zio;
 
@@ -742,8 +813,8 @@ zio_read_phys(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
        ASSERT3U(offset + size, <=, vd->vdev_psize);
 
        zio = zio_create(pio, vd->vdev_spa, 0, NULL, data, size, done, private,
-           ZIO_TYPE_READ, priority, flags, vd, offset, NULL,
-           ZIO_STAGE_OPEN, ZIO_READ_PHYS_PIPELINE);
+           ZIO_TYPE_READ, priority, flags | ZIO_FLAG_PHYSICAL, vd, offset,
+           NULL, ZIO_STAGE_OPEN, ZIO_READ_PHYS_PIPELINE);
 
        zio->io_prop.zp_checksum = checksum;
 
@@ -753,7 +824,7 @@ zio_read_phys(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
 zio_t *
 zio_write_phys(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
     void *data, int checksum, zio_done_func_t *done, void *private,
-    int priority, enum zio_flag flags, boolean_t labels)
+    zio_priority_t priority, enum zio_flag flags, boolean_t labels)
 {
        zio_t *zio;
 
@@ -763,8 +834,8 @@ zio_write_phys(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
        ASSERT3U(offset + size, <=, vd->vdev_psize);
 
        zio = zio_create(pio, vd->vdev_spa, 0, NULL, data, size, done, private,
-           ZIO_TYPE_WRITE, priority, flags, vd, offset, NULL,
-           ZIO_STAGE_OPEN, ZIO_WRITE_PHYS_PIPELINE);
+           ZIO_TYPE_WRITE, priority, flags | ZIO_FLAG_PHYSICAL, vd, offset,
+           NULL, ZIO_STAGE_OPEN, ZIO_WRITE_PHYS_PIPELINE);
 
        zio->io_prop.zp_checksum = checksum;
 
@@ -788,8 +859,8 @@ zio_write_phys(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
  */
 zio_t *
 zio_vdev_child_io(zio_t *pio, blkptr_t *bp, vdev_t *vd, uint64_t offset,
-       void *data, uint64_t size, int type, int priority, enum zio_flag flags,
-       zio_done_func_t *done, void *private)
+       void *data, uint64_t size, int type, zio_priority_t priority,
+       enum zio_flag flags, zio_done_func_t *done, void *private)
 {
        enum zio_stage pipeline = ZIO_VDEV_CHILD_PIPELINE;
        zio_t *zio;
@@ -824,12 +895,16 @@ zio_vdev_child_io(zio_t *pio, blkptr_t *bp, vdev_t *vd, uint64_t offset,
            done, private, type, priority, flags, vd, offset, &pio->io_bookmark,
            ZIO_STAGE_VDEV_IO_START >> 1, pipeline);
 
+       zio->io_physdone = pio->io_physdone;
+       if (vd->vdev_ops->vdev_op_leaf && zio->io_logical != NULL)
+               zio->io_logical->io_phys_children++;
+
        return (zio);
 }
 
 zio_t *
 zio_vdev_delegated_io(vdev_t *vd, uint64_t offset, void *data, uint64_t size,
-       int type, int priority, enum zio_flag flags,
+       int type, zio_priority_t priority, enum zio_flag flags,
        zio_done_func_t *done, void *private)
 {
        zio_t *zio;
@@ -838,7 +913,7 @@ zio_vdev_delegated_io(vdev_t *vd, uint64_t offset, void *data, uint64_t size,
 
        zio = zio_create(NULL, vd->vdev_spa, 0, NULL,
            data, size, done, private, type, priority,
-           flags | ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_RETRY,
+           flags | ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_RETRY | ZIO_FLAG_DELEGATED,
            vd, offset, NULL,
            ZIO_STAGE_VDEV_IO_START >> 1, ZIO_VDEV_CHILD_PIPELINE);
 
@@ -849,7 +924,7 @@ void
 zio_flush(zio_t *zio, vdev_t *vd)
 {
        zio_nowait(zio_ioctl(zio, zio->io_spa, vd, DKIOCFLUSHWRITECACHE,
-           NULL, NULL, ZIO_PRIORITY_NOW,
+           NULL, NULL,
            ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_PROPAGATE | ZIO_FLAG_DONT_RETRY));
 }
 
@@ -884,13 +959,21 @@ zio_read_bp_init(zio_t *zio)
        if (BP_GET_COMPRESS(bp) != ZIO_COMPRESS_OFF &&
            zio->io_child_type == ZIO_CHILD_LOGICAL &&
            !(zio->io_flags & ZIO_FLAG_RAW)) {
-               uint64_t psize = BP_GET_PSIZE(bp);
+               uint64_t psize =
+                   BP_IS_EMBEDDED(bp) ? BPE_GET_PSIZE(bp) : BP_GET_PSIZE(bp);
                void *cbuf = zio_buf_alloc(psize);
 
                zio_push_transform(zio, cbuf, psize, psize, zio_decompress);
        }
 
-       if (!dmu_ot[BP_GET_TYPE(bp)].ot_metadata && BP_GET_LEVEL(bp) == 0)
+       if (BP_IS_EMBEDDED(bp) && BPE_GET_ETYPE(bp) == BP_EMBEDDED_TYPE_DATA) {
+               zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
+               decode_embedded_bp_compressed(bp, zio->io_data);
+       } else {
+               ASSERT(!BP_IS_EMBEDDED(bp));
+       }
+
+       if (!DMU_OT_IS_METADATA(BP_GET_TYPE(bp)) && BP_GET_LEVEL(bp) == 0)
                zio->io_flags |= ZIO_FLAG_DONT_CACHE;
 
        if (BP_GET_TYPE(bp) == DMU_OT_DDT_ZAP)
@@ -933,6 +1016,22 @@ zio_write_bp_init(zio_t *zio)
                *bp = *zio->io_bp_override;
                zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
 
+               if (BP_IS_EMBEDDED(bp))
+                       return (ZIO_PIPELINE_CONTINUE);
+
+               /*
+                * If we've been overridden and nopwrite is set then
+                * set the flag accordingly to indicate that a nopwrite
+                * has already occurred.
+                */
+               if (!BP_IS_HOLE(bp) && zp->zp_nopwrite) {
+                       ASSERT(!zp->zp_dedup);
+                       zio->io_flags |= ZIO_FLAG_NOPWRITE;
+                       return (ZIO_PIPELINE_CONTINUE);
+               }
+
+               ASSERT(!zp->zp_nopwrite);
+
                if (BP_IS_HOLE(bp) || !zp->zp_dedup)
                        return (ZIO_PIPELINE_CONTINUE);
 
@@ -944,11 +1043,9 @@ zio_write_bp_init(zio_t *zio)
                        zio->io_pipeline |= ZIO_STAGE_DDT_WRITE;
                        return (ZIO_PIPELINE_CONTINUE);
                }
-               zio->io_bp_override = NULL;
-               BP_ZERO(bp);
        }
 
-       if (bp->blk_birth == zio->io_txg) {
+       if (!BP_IS_HOLE(bp) && bp->blk_birth == zio->io_txg) {
                /*
                 * We're rewriting an existing block, which means we're
                 * working on behalf of spa_sync().  For spa_sync() to
@@ -964,11 +1061,11 @@ zio_write_bp_init(zio_t *zio)
                ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
                ASSERT(!BP_GET_DEDUP(bp));
 
-               if (pass > SYNC_PASS_DONT_COMPRESS)
+               if (pass >= zfs_sync_pass_dont_compress)
                        compress = ZIO_COMPRESS_OFF;
 
                /* Make sure someone doesn't change their mind on overwrites */
-               ASSERT(MIN(zp->zp_copies + BP_IS_GANG(bp),
+               ASSERT(BP_IS_EMBEDDED(bp) || MIN(zp->zp_copies + BP_IS_GANG(bp),
                    spa_max_replication(spa)) == BP_GET_NDVAS(bp));
        }
 
@@ -978,9 +1075,38 @@ zio_write_bp_init(zio_t *zio)
                if (psize == 0 || psize == lsize) {
                        compress = ZIO_COMPRESS_OFF;
                        zio_buf_free(cbuf, lsize);
+               } else if (!zp->zp_dedup && psize <= BPE_PAYLOAD_SIZE &&
+                   zp->zp_level == 0 && !DMU_OT_HAS_FILL(zp->zp_type) &&
+                   spa_feature_is_enabled(spa, SPA_FEATURE_EMBEDDED_DATA)) {
+                       encode_embedded_bp_compressed(bp,
+                           cbuf, compress, lsize, psize);
+                       BPE_SET_ETYPE(bp, BP_EMBEDDED_TYPE_DATA);
+                       BP_SET_TYPE(bp, zio->io_prop.zp_type);
+                       BP_SET_LEVEL(bp, zio->io_prop.zp_level);
+                       zio_buf_free(cbuf, lsize);
+                       bp->blk_birth = zio->io_txg;
+                       zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
+                       ASSERT(spa_feature_is_active(spa,
+                           SPA_FEATURE_EMBEDDED_DATA));
+                       return (ZIO_PIPELINE_CONTINUE);
                } else {
-                       ASSERT(psize < lsize);
-                       zio_push_transform(zio, cbuf, psize, lsize, NULL);
+                       /*
+                        * Round up compressed size to MINBLOCKSIZE and
+                        * zero the tail.
+                        */
+                       size_t rounded =
+                           P2ROUNDUP(psize, (size_t)SPA_MINBLOCKSIZE);
+                       if (rounded > psize) {
+                               bzero((char *)cbuf + psize, rounded - psize);
+                               psize = rounded;
+                       }
+                       if (psize == lsize) {
+                               compress = ZIO_COMPRESS_OFF;
+                               zio_buf_free(cbuf, lsize);
+                       } else {
+                               zio_push_transform(zio, cbuf,
+                                   psize, lsize, NULL);
+                       }
                }
        }
 
@@ -992,8 +1118,9 @@ zio_write_bp_init(zio_t *zio)
         * spa_sync() to allocate new blocks, but force rewrites after that.
         * There should only be a handful of blocks after pass 1 in any case.
         */
-       if (bp->blk_birth == zio->io_txg && BP_GET_PSIZE(bp) == psize &&
-           pass > SYNC_PASS_REWRITE) {
+       if (!BP_IS_HOLE(bp) && bp->blk_birth == zio->io_txg &&
+           BP_GET_PSIZE(bp) == psize &&
+           pass >= zfs_sync_pass_rewrite) {
                enum zio_stage gang_stages = zio->io_pipeline & ZIO_GANG_STAGES;
                ASSERT(psize != 0);
                zio->io_pipeline = ZIO_REWRITE_PIPELINE | gang_stages;
@@ -1004,15 +1131,22 @@ zio_write_bp_init(zio_t *zio)
        }
 
        if (psize == 0) {
+               if (zio->io_bp_orig.blk_birth != 0 &&
+                   spa_feature_is_active(spa, SPA_FEATURE_HOLE_BIRTH)) {
+                       BP_SET_LSIZE(bp, lsize);
+                       BP_SET_TYPE(bp, zp->zp_type);
+                       BP_SET_LEVEL(bp, zp->zp_level);
+                       BP_SET_BIRTH(bp, zio->io_txg, 0);
+               }
                zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
        } else {
                ASSERT(zp->zp_checksum != ZIO_CHECKSUM_GANG_HEADER);
                BP_SET_LSIZE(bp, lsize);
+               BP_SET_TYPE(bp, zp->zp_type);
+               BP_SET_LEVEL(bp, zp->zp_level);
                BP_SET_PSIZE(bp, psize);
                BP_SET_COMPRESS(bp, compress);
                BP_SET_CHECKSUM(bp, zp->zp_checksum);
-               BP_SET_TYPE(bp, zp->zp_type);
-               BP_SET_LEVEL(bp, zp->zp_level);
                BP_SET_DEDUP(bp, zp->zp_dedup);
                BP_SET_BYTEORDER(bp, ZFS_HOST_BYTEORDER);
                if (zp->zp_dedup) {
@@ -1020,6 +1154,11 @@ zio_write_bp_init(zio_t *zio)
                        ASSERT(!(zio->io_flags & ZIO_FLAG_IO_REWRITE));
                        zio->io_pipeline = ZIO_DDT_WRITE_PIPELINE;
                }
+               if (zp->zp_nopwrite) {
+                       ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
+                       ASSERT(!(zio->io_flags & ZIO_FLAG_IO_REWRITE));
+                       zio->io_pipeline |= ZIO_STAGE_NOP_WRITE;
+               }
        }
 
        return (ZIO_PIPELINE_CONTINUE);
@@ -1045,11 +1184,11 @@ zio_free_bp_init(zio_t *zio)
  */
 
 static void
-zio_taskq_dispatch(zio_t *zio, enum zio_taskq_type q, boolean_t cutinline)
+zio_taskq_dispatch(zio_t *zio, zio_taskq_type_t q, boolean_t cutinline)
 {
        spa_t *spa = zio->io_spa;
        zio_type_t t = zio->io_type;
-       int flags = TQ_SLEEP | (cutinline ? TQ_FRONT : 0);
+       int flags = (cutinline ? TQ_FRONT : 0);
 
        /*
         * If we're a config writer or a probe, the normal issue and
@@ -1066,27 +1205,40 @@ zio_taskq_dispatch(zio_t *zio, enum zio_taskq_type q, boolean_t cutinline)
                t = ZIO_TYPE_NULL;
 
        /*
-        * If this is a high priority I/O, then use the high priority taskq.
+        * If this is a high priority I/O, then use the high priority taskq if
+        * available.
         */
        if (zio->io_priority == ZIO_PRIORITY_NOW &&
-           spa->spa_zio_taskq[t][q + 1] != NULL)
+           spa->spa_zio_taskq[t][q + 1].stqs_count != 0)
                q++;
 
        ASSERT3U(q, <, ZIO_TASKQ_TYPES);
-       (void) taskq_dispatch(spa->spa_zio_taskq[t][q],
-           (task_func_t *)zio_execute, zio, flags);
+
+       /*
+        * NB: We are assuming that the zio can only be dispatched
+        * to a single taskq at a time.  It would be a grievous error
+        * to dispatch the zio to another taskq at the same time.
+        */
+       ASSERT(taskq_empty_ent(&zio->io_tqent));
+       spa_taskq_dispatch_ent(spa, t, q, (task_func_t *)zio_execute, zio,
+           flags, &zio->io_tqent);
 }
 
 static boolean_t
-zio_taskq_member(zio_t *zio, enum zio_taskq_type q)
+zio_taskq_member(zio_t *zio, zio_taskq_type_t q)
 {
        kthread_t *executor = zio->io_executor;
        spa_t *spa = zio->io_spa;
        zio_type_t t;
 
-       for (t = 0; t < ZIO_TYPES; t++)
-               if (taskq_member(spa->spa_zio_taskq[t][q], executor))
-                       return (B_TRUE);
+       for (t = 0; t < ZIO_TYPES; t++) {
+               spa_taskqs_t *tqs = &spa->spa_zio_taskq[t][q];
+               uint_t i;
+               for (i = 0; i < tqs->stqs_count; i++) {
+                       if (taskq_member(tqs->stqs_taskq[i], executor))
+                               return (B_TRUE);
+               }
+       }
 
        return (B_FALSE);
 }
@@ -1113,21 +1265,42 @@ zio_interrupt(zio_t *zio)
  * vdev-level caching or aggregation; (5) the I/O is deferred
  * due to vdev-level queueing; (6) the I/O is handed off to
  * another thread.  In all cases, the pipeline stops whenever
- * there's no CPU work; it never burns a thread in cv_wait().
+ * there's no CPU work; it never burns a thread in cv_wait_io().
  *
  * There's no locking on io_stage because there's no legitimate way
  * for multiple threads to be attempting to process the same I/O.
  */
 static zio_pipe_stage_t *zio_pipeline[];
 
+/*
+ * zio_execute() is a wrapper around the static function
+ * __zio_execute() so that we can force  __zio_execute() to be
+ * inlined.  This reduces stack overhead which is important
+ * because __zio_execute() is called recursively in several zio
+ * code paths.  zio_execute() itself cannot be inlined because
+ * it is externally visible.
+ */
 void
 zio_execute(zio_t *zio)
+{
+       fstrans_cookie_t cookie;
+
+       cookie = spl_fstrans_mark();
+       __zio_execute(zio);
+       spl_fstrans_unmark(cookie);
+}
+
+__attribute__((always_inline))
+static inline void
+__zio_execute(zio_t *zio)
 {
        zio->io_executor = curthread;
 
        while (zio->io_stage < ZIO_STAGE_DONE) {
                enum zio_stage pipeline = zio->io_pipeline;
                enum zio_stage stage = zio->io_stage;
+               dsl_pool_t *dp;
+               boolean_t cut;
                int rv;
 
                ASSERT(!MUTEX_HELD(&zio->io_lock));
@@ -1140,6 +1313,10 @@ zio_execute(zio_t *zio)
 
                ASSERT(stage <= ZIO_STAGE_DONE);
 
+               dp = spa_get_dsl(zio->io_spa);
+               cut = (stage == ZIO_STAGE_VDEV_IO_START) ?
+                   zio_requeue_io_start_cut_in_line : B_FALSE;
+
                /*
                 * If we are in interrupt context and this pipeline stage
                 * will grab a config lock that is held across I/O,
@@ -1151,14 +1328,27 @@ zio_execute(zio_t *zio)
                 */
                if ((stage & ZIO_BLOCKING_STAGES) && zio->io_vd == NULL &&
                    zio_taskq_member(zio, ZIO_TASKQ_INTERRUPT)) {
-                       boolean_t cut = (stage == ZIO_STAGE_VDEV_IO_START) ?
-                           zio_requeue_io_start_cut_in_line : B_FALSE;
+                       zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, cut);
+                       return;
+               }
+
+               /*
+                * If we executing in the context of the tx_sync_thread,
+                * or we are performing pool initialization outside of a
+                * zio_taskq[ZIO_TASKQ_ISSUE|ZIO_TASKQ_ISSUE_HIGH] context.
+                * Then issue the zio asynchronously to minimize stack usage
+                * for these deep call paths.
+                */
+               if ((dp && curthread == dp->dp_tx.tx_sync_thread) ||
+                   (dp && spa_is_initializing(dp->dp_spa) &&
+                   !zio_taskq_member(zio, ZIO_TASKQ_ISSUE) &&
+                   !zio_taskq_member(zio, ZIO_TASKQ_ISSUE_HIGH))) {
                        zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, cut);
                        return;
                }
 
                zio->io_stage = stage;
-               rv = zio_pipeline[highbit(stage) - 1](zio);
+               rv = zio_pipeline[highbit64(stage) - 1](zio);
 
                if (rv == ZIO_PIPELINE_STOP)
                        return;
@@ -1167,6 +1357,7 @@ zio_execute(zio_t *zio)
        }
 }
 
+
 /*
  * ==========================================================================
  * Initiate I/O, either sync or async
@@ -1182,11 +1373,11 @@ zio_wait(zio_t *zio)
 
        zio->io_waiter = curthread;
 
-       zio_execute(zio);
+       __zio_execute(zio);
 
        mutex_enter(&zio->io_lock);
        while (zio->io_executor != NULL)
-               cv_wait(&zio->io_cv, &zio->io_lock);
+               cv_wait_io(&zio->io_cv, &zio->io_lock);
        mutex_exit(&zio->io_lock);
 
        error = zio->io_error;
@@ -1202,17 +1393,22 @@ zio_nowait(zio_t *zio)
 
        if (zio->io_child_type == ZIO_CHILD_LOGICAL &&
            zio_unique_parent(zio) == NULL) {
+               zio_t *pio;
+
                /*
                 * This is a logical async I/O with no parent to wait for it.
                 * We add it to the spa_async_root_zio "Godfather" I/O which
                 * will ensure they complete prior to unloading the pool.
                 */
                spa_t *spa = zio->io_spa;
+               kpreempt_disable();
+               pio = spa->spa_async_zio_root[CPU_SEQID];
+               kpreempt_enable();
 
-               zio_add_child(spa->spa_async_zio_root, zio);
+               zio_add_child(pio, zio);
        }
 
-       zio_execute(zio);
+       __zio_execute(zio);
 }
 
 /*
@@ -1236,6 +1432,7 @@ zio_reexecute(zio_t *pio)
        pio->io_stage = pio->io_orig_stage;
        pio->io_pipeline = pio->io_orig_pipeline;
        pio->io_reexecute = 0;
+       pio->io_flags |= ZIO_FLAG_REEXECUTED;
        pio->io_error = 0;
        for (w = 0; w < ZIO_WAIT_TYPES; w++)
                pio->io_state[w] = 0;
@@ -1267,7 +1464,7 @@ zio_reexecute(zio_t *pio)
         * responsibility of the caller to wait on him.
         */
        if (!(pio->io_flags & ZIO_FLAG_GODFATHER))
-               zio_execute(pio);
+               __zio_execute(pio);
 }
 
 void
@@ -1278,6 +1475,9 @@ zio_suspend(spa_t *spa, zio_t *zio)
                    "failure and the failure mode property for this pool "
                    "is set to panic.", spa_name(spa));
 
+       cmn_err(CE_WARN, "Pool '%s' has encountered an uncorrectable I/O "
+           "failure and has been suspended.\n", spa_name(spa));
+
        zfs_ereport_post(FM_EREPORT_ZFS_IO_FAILURE, spa, NULL, NULL, 0, 0);
 
        mutex_enter(&spa->spa_suspend_lock);
@@ -1634,11 +1834,11 @@ static void
 zio_write_gang_member_ready(zio_t *zio)
 {
        zio_t *pio = zio_unique_parent(zio);
-       zio_t *gio = zio->io_gang_leader;
        dva_t *cdva = zio->io_bp->blk_dva;
        dva_t *pdva = pio->io_bp->blk_dva;
        uint64_t asize;
        int d;
+       ASSERTV(zio_t *gio = zio->io_gang_leader);
 
        if (BP_IS_HOLE(zio->io_bp))
                return;
@@ -1716,12 +1916,13 @@ zio_write_gang_block(zio_t *pio)
                zp.zp_type = DMU_OT_NONE;
                zp.zp_level = 0;
                zp.zp_copies = gio->io_prop.zp_copies;
-               zp.zp_dedup = 0;
-               zp.zp_dedup_verify = 0;
+               zp.zp_dedup = B_FALSE;
+               zp.zp_dedup_verify = B_FALSE;
+               zp.zp_nopwrite = B_FALSE;
 
                zio_nowait(zio_write(zio, spa, txg, &gbh->zg_blkptr[g],
                    (char *)pio->io_data + (pio->io_size - resid), lsize, &zp,
-                   zio_write_gang_member_ready, NULL, &gn->gn_child[g],
+                   zio_write_gang_member_ready, NULL, NULL, &gn->gn_child[g],
                    pio->io_priority, ZIO_GANG_CHILD_FLAGS(pio),
                    &pio->io_bookmark));
        }
@@ -1731,11 +1932,72 @@ zio_write_gang_block(zio_t *pio)
         */
        pio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
 
+       /*
+        * We didn't allocate this bp, so make sure it doesn't get unmarked.
+        */
+       pio->io_flags &= ~ZIO_FLAG_FASTWRITE;
+
        zio_nowait(zio);
 
        return (ZIO_PIPELINE_CONTINUE);
 }
 
+/*
+ * The zio_nop_write stage in the pipeline determines if allocating
+ * a new bp is necessary.  By leveraging a cryptographically secure checksum,
+ * such as SHA256, we can compare the checksums of the new data and the old
+ * to determine if allocating a new block is required.  The nopwrite
+ * feature can handle writes in either syncing or open context (i.e. zil
+ * writes) and as a result is mutually exclusive with dedup.
+ */
+static int
+zio_nop_write(zio_t *zio)
+{
+       blkptr_t *bp = zio->io_bp;
+       blkptr_t *bp_orig = &zio->io_bp_orig;
+       zio_prop_t *zp = &zio->io_prop;
+
+       ASSERT(BP_GET_LEVEL(bp) == 0);
+       ASSERT(!(zio->io_flags & ZIO_FLAG_IO_REWRITE));
+       ASSERT(zp->zp_nopwrite);
+       ASSERT(!zp->zp_dedup);
+       ASSERT(zio->io_bp_override == NULL);
+       ASSERT(IO_IS_ALLOCATING(zio));
+
+       /*
+        * Check to see if the original bp and the new bp have matching
+        * characteristics (i.e. same checksum, compression algorithms, etc).
+        * If they don't then just continue with the pipeline which will
+        * allocate a new bp.
+        */
+       if (BP_IS_HOLE(bp_orig) ||
+           !zio_checksum_table[BP_GET_CHECKSUM(bp)].ci_dedup ||
+           BP_GET_CHECKSUM(bp) != BP_GET_CHECKSUM(bp_orig) ||
+           BP_GET_COMPRESS(bp) != BP_GET_COMPRESS(bp_orig) ||
+           BP_GET_DEDUP(bp) != BP_GET_DEDUP(bp_orig) ||
+           zp->zp_copies != BP_GET_NDVAS(bp_orig))
+               return (ZIO_PIPELINE_CONTINUE);
+
+       /*
+        * If the checksums match then reset the pipeline so that we
+        * avoid allocating a new bp and issuing any I/O.
+        */
+       if (ZIO_CHECKSUM_EQUAL(bp->blk_cksum, bp_orig->blk_cksum)) {
+               ASSERT(zio_checksum_table[zp->zp_checksum].ci_dedup);
+               ASSERT3U(BP_GET_PSIZE(bp), ==, BP_GET_PSIZE(bp_orig));
+               ASSERT3U(BP_GET_LSIZE(bp), ==, BP_GET_LSIZE(bp_orig));
+               ASSERT(zp->zp_compress != ZIO_COMPRESS_OFF);
+               ASSERT(bcmp(&bp->blk_prop, &bp_orig->blk_prop,
+                   sizeof (uint64_t)) == 0);
+
+               *bp = *bp_orig;
+               zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
+               zio->io_flags |= ZIO_FLAG_NOPWRITE;
+       }
+
+       return (ZIO_PIPELINE_CONTINUE);
+}
+
 /*
  * ==========================================================================
  * Dedup
@@ -1876,7 +2138,7 @@ zio_ddt_collision(zio_t *zio, ddt_t *ddt, ddt_entry_t *dde)
 
                        ddt_exit(ddt);
 
-                       error = arc_read_nolock(NULL, spa, &blk,
+                       error = arc_read(NULL, spa, &blk,
                            arc_getbuf_func, &abuf, ZIO_PRIORITY_SYNC_READ,
                            ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE,
                            &aflags, &zio->io_bookmark);
@@ -1885,8 +2147,8 @@ zio_ddt_collision(zio_t *zio, ddt_t *ddt, ddt_entry_t *dde)
                                if (arc_buf_size(abuf) != zio->io_orig_size ||
                                    bcmp(abuf->b_data, zio->io_orig_data,
                                    zio->io_orig_size) != 0)
-                                       error = EEXIST;
-                               VERIFY(arc_buf_remove_ref(abuf, &abuf) == 1);
+                                       error = SET_ERROR(EEXIST);
+                               VERIFY(arc_buf_remove_ref(abuf, &abuf));
                        }
 
                        ddt_enter(ddt);
@@ -1949,12 +2211,12 @@ static void
 zio_ddt_ditto_write_done(zio_t *zio)
 {
        int p = DDT_PHYS_DITTO;
-       zio_prop_t *zp = &zio->io_prop;
        blkptr_t *bp = zio->io_bp;
        ddt_t *ddt = ddt_select(zio->io_spa, bp);
        ddt_entry_t *dde = zio->io_private;
        ddt_phys_t *ddp = &dde->dde_phys[p];
        ddt_key_t *ddk = &dde->dde_key;
+       ASSERTV(zio_prop_t *zp = &zio->io_prop);
 
        ddt_enter(ddt);
 
@@ -2010,7 +2272,7 @@ zio_ddt_write(zio_t *zio)
                        zio->io_stage = ZIO_STAGE_OPEN;
                        BP_ZERO(bp);
                } else {
-                       zp->zp_dedup = 0;
+                       zp->zp_dedup = B_FALSE;
                }
                zio->io_pipeline = ZIO_WRITE_PIPELINE;
                ddt_exit(ddt);
@@ -2044,7 +2306,7 @@ zio_ddt_write(zio_t *zio)
                }
 
                dio = zio_write(zio, spa, txg, bp, zio->io_orig_data,
-                   zio->io_orig_size, &czp, NULL,
+                   zio->io_orig_size, &czp, NULL, NULL,
                    zio_ddt_ditto_write_done, dde, zio->io_priority,
                    ZIO_DDT_CHILD_FLAGS(zio), &zio->io_bookmark);
 
@@ -2066,7 +2328,7 @@ zio_ddt_write(zio_t *zio)
                ddt_phys_addref(ddp);
        } else {
                cio = zio_write(zio, spa, txg, bp, zio->io_orig_data,
-                   zio->io_orig_size, zp, zio_ddt_child_write_ready,
+                   zio->io_orig_size, zp, zio_ddt_child_write_ready, NULL,
                    zio_ddt_child_write_done, dde, zio->io_priority,
                    ZIO_DDT_CHILD_FLAGS(zio), &zio->io_bookmark);
 
@@ -2100,8 +2362,11 @@ zio_ddt_free(zio_t *zio)
 
        ddt_enter(ddt);
        freedde = dde = ddt_lookup(ddt, bp, B_TRUE);
-       ddp = ddt_phys_select(dde, bp);
-       ddt_phys_decref(ddp);
+       if (dde) {
+               ddp = ddt_phys_select(dde, bp);
+               if (ddp)
+                       ddt_phys_decref(ddp);
+       }
        ddt_exit(ddt);
 
        return (ZIO_PIPELINE_CONTINUE);
@@ -2119,6 +2384,7 @@ zio_dva_allocate(zio_t *zio)
        metaslab_class_t *mc = spa_normal_class(spa);
        blkptr_t *bp = zio->io_bp;
        int error;
+       int flags = 0;
 
        if (zio->io_gang_leader == NULL) {
                ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
@@ -2126,15 +2392,27 @@ zio_dva_allocate(zio_t *zio)
        }
 
        ASSERT(BP_IS_HOLE(bp));
-       ASSERT3U(BP_GET_NDVAS(bp), ==, 0);
+       ASSERT0(BP_GET_NDVAS(bp));
        ASSERT3U(zio->io_prop.zp_copies, >, 0);
        ASSERT3U(zio->io_prop.zp_copies, <=, spa_max_replication(spa));
        ASSERT3U(zio->io_size, ==, BP_GET_PSIZE(bp));
 
+       /*
+        * The dump device does not support gang blocks so allocation on
+        * behalf of the dump device (i.e. ZIO_FLAG_NODATA) must avoid
+        * the "fast" gang feature.
+        */
+       flags |= (zio->io_flags & ZIO_FLAG_NODATA) ? METASLAB_GANG_AVOID : 0;
+       flags |= (zio->io_flags & ZIO_FLAG_GANG_CHILD) ?
+           METASLAB_GANG_CHILD : 0;
+       flags |= (zio->io_flags & ZIO_FLAG_FASTWRITE) ? METASLAB_FASTWRITE : 0;
        error = metaslab_alloc(spa, mc, zio->io_size, bp,
-           zio->io_prop.zp_copies, zio->io_txg, NULL, 0);
+           zio->io_prop.zp_copies, zio->io_txg, NULL, flags);
 
        if (error) {
+               spa_dbgmsg(spa, "%s: metaslab allocation failure: zio %p, "
+                   "size %llu, error %d", spa_name(spa), zio, zio->io_size,
+                   error);
                if (error == ENOSPC && zio->io_size > SPA_MINBLOCKSIZE)
                        return (zio_write_gang_block(zio));
                zio->io_error = error;
@@ -2191,20 +2469,29 @@ zio_dva_unallocate(zio_t *zio, zio_gang_node_t *gn, blkptr_t *bp)
  * Try to allocate an intent log block.  Return 0 on success, errno on failure.
  */
 int
-zio_alloc_zil(spa_t *spa, uint64_t txg, blkptr_t *new_bp, blkptr_t *old_bp,
-    uint64_t size, boolean_t use_slog)
+zio_alloc_zil(spa_t *spa, uint64_t txg, blkptr_t *new_bp, uint64_t size,
+    boolean_t use_slog)
 {
        int error = 1;
 
        ASSERT(txg > spa_syncing_txg(spa));
 
-       if (use_slog)
+       /*
+        * ZIL blocks are always contiguous (i.e. not gang blocks) so we
+        * set the METASLAB_GANG_AVOID flag so that they don't "fast gang"
+        * when allocating them.
+        */
+       if (use_slog) {
                error = metaslab_alloc(spa, spa_log_class(spa), size,
-                   new_bp, 1, txg, old_bp, METASLAB_HINTBP_AVOID);
+                   new_bp, 1, txg, NULL,
+                   METASLAB_FASTWRITE | METASLAB_GANG_AVOID);
+       }
 
-       if (error)
+       if (error) {
                error = metaslab_alloc(spa, spa_normal_class(spa), size,
-                   new_bp, 1, txg, old_bp, METASLAB_HINTBP_AVOID);
+                   new_bp, 1, txg, NULL,
+                   METASLAB_FASTWRITE);
+       }
 
        if (error == 0) {
                BP_SET_LSIZE(new_bp, size);
@@ -2281,7 +2568,9 @@ zio_vdev_io_start(zio_t *zio)
 
        align = 1ULL << vd->vdev_top->vdev_ashift;
 
-       if (P2PHASE(zio->io_size, align) != 0) {
+       if (!(zio->io_flags & ZIO_FLAG_PHYSICAL) &&
+           P2PHASE(zio->io_size, align) != 0) {
+               /* Transform logical writes to be a full physical block size. */
                uint64_t asize = P2ROUNDUP(zio->io_size, align);
                char *abuf = zio_buf_alloc(asize);
                ASSERT(vd == vd->vdev_top);
@@ -2292,8 +2581,22 @@ zio_vdev_io_start(zio_t *zio)
                zio_push_transform(zio, abuf, asize, asize, zio_subblock);
        }
 
-       ASSERT(P2PHASE(zio->io_offset, align) == 0);
-       ASSERT(P2PHASE(zio->io_size, align) == 0);
+       /*
+        * If this is not a physical io, make sure that it is properly aligned
+        * before proceeding.
+        */
+       if (!(zio->io_flags & ZIO_FLAG_PHYSICAL)) {
+               ASSERT0(P2PHASE(zio->io_offset, align));
+               ASSERT0(P2PHASE(zio->io_size, align));
+       } else {
+               /*
+                * For physical writes, we allow 512b aligned writes and assume
+                * the device will perform a read-modify-write as necessary.
+                */
+               ASSERT0(P2PHASE(zio->io_offset, SPA_MINBLOCKSIZE));
+               ASSERT0(P2PHASE(zio->io_size, SPA_MINBLOCKSIZE));
+       }
+
        VERIFY(zio->io_type != ZIO_TYPE_WRITE || spa_writeable(spa));
 
        /*
@@ -2321,14 +2624,14 @@ zio_vdev_io_start(zio_t *zio)
        if (vd->vdev_ops->vdev_op_leaf &&
            (zio->io_type == ZIO_TYPE_READ || zio->io_type == ZIO_TYPE_WRITE)) {
 
-               if (zio->io_type == ZIO_TYPE_READ && vdev_cache_read(zio) == 0)
+               if (zio->io_type == ZIO_TYPE_READ && vdev_cache_read(zio))
                        return (ZIO_PIPELINE_CONTINUE);
 
                if ((zio = vdev_queue_io(zio)) == NULL)
                        return (ZIO_PIPELINE_STOP);
 
                if (!vdev_accessible(vd, zio)) {
-                       zio->io_error = ENXIO;
+                       zio->io_error = SET_ERROR(ENXIO);
                        zio_interrupt(zio);
                        return (ZIO_PIPELINE_STOP);
                }
@@ -2365,7 +2668,7 @@ zio_vdev_io_done(zio_t *zio)
 
                if (zio->io_error) {
                        if (!vdev_accessible(vd, zio)) {
-                               zio->io_error = ENXIO;
+                               zio->io_error = SET_ERROR(ENXIO);
                        } else {
                                unexpected_error = B_TRUE;
                        }
@@ -2450,19 +2753,27 @@ zio_vdev_io_assess(zio_t *zio)
         */
        if (zio->io_error && vd != NULL && vd->vdev_ops->vdev_op_leaf &&
            !vdev_accessible(vd, zio))
-               zio->io_error = ENXIO;
+               zio->io_error = SET_ERROR(ENXIO);
 
        /*
         * If we can't write to an interior vdev (mirror or RAID-Z),
         * set vdev_cant_write so that we stop trying to allocate from it.
         */
        if (zio->io_error == ENXIO && zio->io_type == ZIO_TYPE_WRITE &&
-           vd != NULL && !vd->vdev_ops->vdev_op_leaf)
+           vd != NULL && !vd->vdev_ops->vdev_op_leaf) {
                vd->vdev_cant_write = B_TRUE;
+       }
 
        if (zio->io_error)
                zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
 
+       if (vd != NULL && vd->vdev_ops->vdev_op_leaf &&
+           zio->io_physdone != NULL) {
+               ASSERT(!(zio->io_flags & ZIO_FLAG_DELEGATED));
+               ASSERT(zio->io_child_type == ZIO_CHILD_VDEV);
+               zio->io_physdone(zio->io_logical);
+       }
+
        return (ZIO_PIPELINE_CONTINUE);
 }
 
@@ -2573,7 +2884,7 @@ zio_checksum_verified(zio_t *zio)
 /*
  * ==========================================================================
  * Error rank.  Error are ranked in the order 0, ENXIO, ECKSUM, EIO, other.
- * An error of 0 indictes success.  ENXIO indicates whole-device failure,
+ * An error of 0 indicates success.  ENXIO indicates whole-device failure,
  * which may be transient (e.g. unplugged) or permament.  ECKSUM and EIO
  * indicate errors that are specific to one I/O, and most likely permanent.
  * Any other error is presumed to be worse because we weren't expecting it.
@@ -2613,7 +2924,8 @@ zio_ready(zio_t *zio)
 
        if (zio->io_ready) {
                ASSERT(IO_IS_ALLOCATING(zio));
-               ASSERT(bp->blk_birth == zio->io_txg || BP_IS_HOLE(bp));
+               ASSERT(bp->blk_birth == zio->io_txg || BP_IS_HOLE(bp) ||
+                   (zio->io_flags & ZIO_FLAG_NOPWRITE));
                ASSERT(zio->io_children[ZIO_CHILD_GANG][ZIO_WAIT_READY] == 0);
 
                zio->io_ready(zio);
@@ -2661,11 +2973,6 @@ zio_ready(zio_t *zio)
 static int
 zio_done(zio_t *zio)
 {
-       spa_t *spa = zio->io_spa;
-       zio_t *lio = zio->io_logical;
-       blkptr_t *bp = zio->io_bp;
-       vdev_t *vd = zio->io_vd;
-       uint64_t psize = zio->io_size;
        zio_t *pio, *pio_next;
        int c, w;
 
@@ -2683,19 +2990,24 @@ zio_done(zio_t *zio)
                for (w = 0; w < ZIO_WAIT_TYPES; w++)
                        ASSERT(zio->io_children[c][w] == 0);
 
-       if (bp != NULL) {
-               ASSERT(bp->blk_pad[0] == 0);
-               ASSERT(bp->blk_pad[1] == 0);
-               ASSERT(bcmp(bp, &zio->io_bp_copy, sizeof (blkptr_t)) == 0 ||
-                   (bp == zio_unique_parent(zio)->io_bp));
-               if (zio->io_type == ZIO_TYPE_WRITE && !BP_IS_HOLE(bp) &&
+       if (zio->io_bp != NULL && !BP_IS_EMBEDDED(zio->io_bp)) {
+               ASSERT(zio->io_bp->blk_pad[0] == 0);
+               ASSERT(zio->io_bp->blk_pad[1] == 0);
+               ASSERT(bcmp(zio->io_bp, &zio->io_bp_copy,
+                   sizeof (blkptr_t)) == 0 ||
+                   (zio->io_bp == zio_unique_parent(zio)->io_bp));
+               if (zio->io_type == ZIO_TYPE_WRITE && !BP_IS_HOLE(zio->io_bp) &&
                    zio->io_bp_override == NULL &&
                    !(zio->io_flags & ZIO_FLAG_IO_REPAIR)) {
-                       ASSERT(!BP_SHOULD_BYTESWAP(bp));
-                       ASSERT3U(zio->io_prop.zp_copies, <=, BP_GET_NDVAS(bp));
-                       ASSERT(BP_COUNT_GANG(bp) == 0 ||
-                           (BP_COUNT_GANG(bp) == BP_GET_NDVAS(bp)));
+                       ASSERT(!BP_SHOULD_BYTESWAP(zio->io_bp));
+                       ASSERT3U(zio->io_prop.zp_copies, <=,
+                           BP_GET_NDVAS(zio->io_bp));
+                       ASSERT(BP_COUNT_GANG(zio->io_bp) == 0 ||
+                           (BP_COUNT_GANG(zio->io_bp) ==
+                           BP_GET_NDVAS(zio->io_bp)));
                }
+               if (zio->io_flags & ZIO_FLAG_NOPWRITE)
+                       VERIFY(BP_EQUAL(zio->io_bp, &zio->io_bp_orig));
        }
 
        /*
@@ -2713,13 +3025,13 @@ zio_done(zio_t *zio)
                while (zio->io_cksum_report != NULL) {
                        zio_cksum_report_t *zcr = zio->io_cksum_report;
                        uint64_t align = zcr->zcr_align;
-                       uint64_t asize = P2ROUNDUP(psize, align);
+                       uint64_t asize = P2ROUNDUP(zio->io_size, align);
                        char *abuf = zio->io_data;
 
-                       if (asize != psize) {
+                       if (asize != zio->io_size) {
                                abuf = zio_buf_alloc(asize);
-                               bcopy(zio->io_data, abuf, psize);
-                               bzero(abuf + psize, asize - psize);
+                               bcopy(zio->io_data, abuf, zio->io_size);
+                               bzero(abuf+zio->io_size, asize-zio->io_size);
                        }
 
                        zio->io_cksum_report = zcr->zcr_next;
@@ -2727,14 +3039,25 @@ zio_done(zio_t *zio)
                        zcr->zcr_finish(zcr, abuf);
                        zfs_ereport_free_checksum(zcr);
 
-                       if (asize != psize)
+                       if (asize != zio->io_size)
                                zio_buf_free(abuf, asize);
                }
        }
 
        zio_pop_transforms(zio);        /* note: may set zio->io_error */
 
-       vdev_stat_update(zio, psize);
+       vdev_stat_update(zio, zio->io_size);
+
+       /*
+        * If this I/O is attached to a particular vdev is slow, exceeding
+        * 30 seconds to complete, post an error described the I/O delay.
+        * We ignore these errors if the device is currently unavailable.
+        */
+       if (zio->io_delay >= MSEC_TO_TICK(zio_delay_max)) {
+               if (zio->io_vd != NULL && !vdev_is_dead(zio->io_vd))
+                       zfs_ereport_post(FM_EREPORT_ZFS_DELAY, zio->io_spa,
+                           zio->io_vd, zio, 0, 0);
+       }
 
        if (zio->io_error) {
                /*
@@ -2743,28 +3066,30 @@ zio_done(zio_t *zio)
                 * at the block level.  We ignore these errors if the
                 * device is currently unavailable.
                 */
-               if (zio->io_error != ECKSUM && vd != NULL && !vdev_is_dead(vd))
-                       zfs_ereport_post(FM_EREPORT_ZFS_IO, spa, vd, zio, 0, 0);
+               if (zio->io_error != ECKSUM && zio->io_vd != NULL &&
+                       !vdev_is_dead(zio->io_vd))
+                       zfs_ereport_post(FM_EREPORT_ZFS_IO, zio->io_spa,
+                                               zio->io_vd, zio, 0, 0);
 
                if ((zio->io_error == EIO || !(zio->io_flags &
                    (ZIO_FLAG_SPECULATIVE | ZIO_FLAG_DONT_PROPAGATE))) &&
-                   zio == lio) {
+                   zio == zio->io_logical) {
                        /*
                         * For logical I/O requests, tell the SPA to log the
                         * error and generate a logical data ereport.
                         */
-                       spa_log_error(spa, zio);
-                       zfs_ereport_post(FM_EREPORT_ZFS_DATA, spa, NULL, zio,
-                           0, 0);
+                       spa_log_error(zio->io_spa, zio);
+                       zfs_ereport_post(FM_EREPORT_ZFS_DATA, zio->io_spa,
+                           NULL, zio, 0, 0);
                }
        }
 
-       if (zio->io_error && zio == lio) {
+       if (zio->io_error && zio == zio->io_logical) {
                /*
                 * Determine whether zio should be reexecuted.  This will
                 * propagate all the way to the root via zio_notify_parent().
                 */
-               ASSERT(vd == NULL && bp != NULL);
+               ASSERT(zio->io_vd == NULL && zio->io_bp != NULL);
                ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
 
                if (IO_IS_ALLOCATING(zio) &&
@@ -2779,8 +3104,8 @@ zio_done(zio_t *zio)
                    zio->io_type == ZIO_TYPE_FREE) &&
                    !(zio->io_flags & ZIO_FLAG_SCAN_THREAD) &&
                    zio->io_error == ENXIO &&
-                   spa_load_state(spa) == SPA_LOAD_NONE &&
-                   spa_get_failmode(spa) != ZIO_FAILURE_MODE_CONTINUE)
+                   spa_load_state(zio->io_spa) == SPA_LOAD_NONE &&
+                   spa_get_failmode(zio->io_spa) != ZIO_FAILURE_MODE_CONTINUE)
                        zio->io_reexecute |= ZIO_REEXECUTE_SUSPEND;
 
                if (!(zio->io_flags & ZIO_FLAG_CANFAIL) && !zio->io_reexecute)
@@ -2805,8 +3130,8 @@ zio_done(zio_t *zio)
 
        if ((zio->io_error || zio->io_reexecute) &&
            IO_IS_ALLOCATING(zio) && zio->io_gang_leader == zio &&
-           !(zio->io_flags & ZIO_FLAG_IO_REWRITE))
-               zio_dva_unallocate(zio, zio->io_gang_tree, bp);
+           !(zio->io_flags & (ZIO_FLAG_IO_REWRITE | ZIO_FLAG_NOPWRITE)))
+               zio_dva_unallocate(zio, zio->io_gang_tree, zio->io_bp);
 
        zio_gang_tree_free(&zio->io_gang_tree);
 
@@ -2871,15 +3196,17 @@ zio_done(zio_t *zio)
                         * We'd fail again if we reexecuted now, so suspend
                         * until conditions improve (e.g. device comes online).
                         */
-                       zio_suspend(spa, zio);
+                       zio_suspend(zio->io_spa, zio);
                } else {
                        /*
                         * Reexecution is potentially a huge amount of work.
                         * Hand it off to the otherwise-unused claim taskq.
                         */
-                       (void) taskq_dispatch(
-                           spa->spa_zio_taskq[ZIO_TYPE_CLAIM][ZIO_TASKQ_ISSUE],
-                           (task_func_t *)zio_reexecute, zio, TQ_SLEEP);
+                       ASSERT(taskq_empty_ent(&zio->io_tqent));
+                       spa_taskq_dispatch_ent(zio->io_spa,
+                           ZIO_TYPE_CLAIM, ZIO_TASKQ_ISSUE,
+                           (task_func_t *)zio_reexecute, zio, 0,
+                           &zio->io_tqent);
                }
                return (ZIO_PIPELINE_STOP);
        }
@@ -2899,6 +3226,12 @@ zio_done(zio_t *zio)
                zfs_ereport_free_checksum(zcr);
        }
 
+       if (zio->io_flags & ZIO_FLAG_FASTWRITE && zio->io_bp &&
+           !BP_IS_HOLE(zio->io_bp) && !BP_IS_EMBEDDED(zio->io_bp) &&
+           !(zio->io_flags & ZIO_FLAG_NOPWRITE)) {
+               metaslab_fastwrite_unmark(zio->io_spa, zio->io_bp);
+       }
+
        /*
         * It is the responsibility of the done callback to ensure that this
         * particular zio is no longer discoverable for adoption, and as
@@ -2942,6 +3275,7 @@ static zio_pipe_stage_t *zio_pipeline[] = {
        zio_issue_async,
        zio_write_bp_init,
        zio_checksum_generate,
+       zio_nop_write,
        zio_ddt_read_start,
        zio_ddt_read_done,
        zio_ddt_write,
@@ -2958,3 +3292,64 @@ static zio_pipe_stage_t *zio_pipeline[] = {
        zio_checksum_verify,
        zio_done
 };
+
+/* dnp is the dnode for zb1->zb_object */
+boolean_t
+zbookmark_is_before(const dnode_phys_t *dnp, const zbookmark_phys_t *zb1,
+    const zbookmark_phys_t *zb2)
+{
+       uint64_t zb1nextL0, zb2thisobj;
+
+       ASSERT(zb1->zb_objset == zb2->zb_objset);
+       ASSERT(zb2->zb_level == 0);
+
+       /* The objset_phys_t isn't before anything. */
+       if (dnp == NULL)
+               return (B_FALSE);
+
+       zb1nextL0 = (zb1->zb_blkid + 1) <<
+           ((zb1->zb_level) * (dnp->dn_indblkshift - SPA_BLKPTRSHIFT));
+
+       zb2thisobj = zb2->zb_object ? zb2->zb_object :
+           zb2->zb_blkid << (DNODE_BLOCK_SHIFT - DNODE_SHIFT);
+
+       if (zb1->zb_object == DMU_META_DNODE_OBJECT) {
+               uint64_t nextobj = zb1nextL0 *
+                   (dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT) >> DNODE_SHIFT;
+               return (nextobj <= zb2thisobj);
+       }
+
+       if (zb1->zb_object < zb2thisobj)
+               return (B_TRUE);
+       if (zb1->zb_object > zb2thisobj)
+               return (B_FALSE);
+       if (zb2->zb_object == DMU_META_DNODE_OBJECT)
+               return (B_FALSE);
+       return (zb1nextL0 <= zb2->zb_blkid);
+}
+
+#if defined(_KERNEL) && defined(HAVE_SPL)
+EXPORT_SYMBOL(zio_type_name);
+EXPORT_SYMBOL(zio_buf_alloc);
+EXPORT_SYMBOL(zio_data_buf_alloc);
+EXPORT_SYMBOL(zio_buf_free);
+EXPORT_SYMBOL(zio_data_buf_free);
+
+module_param(zio_delay_max, int, 0644);
+MODULE_PARM_DESC(zio_delay_max, "Max zio millisec delay before posting event");
+
+module_param(zio_requeue_io_start_cut_in_line, int, 0644);
+MODULE_PARM_DESC(zio_requeue_io_start_cut_in_line, "Prioritize requeued I/O");
+
+module_param(zfs_sync_pass_deferred_free, int, 0644);
+MODULE_PARM_DESC(zfs_sync_pass_deferred_free,
+       "Defer frees starting in this pass");
+
+module_param(zfs_sync_pass_dont_compress, int, 0644);
+MODULE_PARM_DESC(zfs_sync_pass_dont_compress,
+       "Don't compress starting in this pass");
+
+module_param(zfs_sync_pass_rewrite, int, 0644);
+MODULE_PARM_DESC(zfs_sync_pass_rewrite,
+       "Rewrite new bps starting in this pass");
+#endif