]> git.proxmox.com Git - mirror_zfs.git/blobdiff - module/zfs/zio.c
OpenZFS 6531 - Provide mechanism to artificially limit disk performance
[mirror_zfs.git] / module / zfs / zio.c
index 7cc3d4c9a4c28621ef0f11704a3e8ae1b48e3e3e..4063703adf2dd576c9b5bd1bb93fb98e7eb02e30 100644 (file)
  */
 /*
  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
- * Copyright (c) 2013 by Delphix. All rights reserved.
+ * Copyright (c) 2011, 2015 by Delphix. All rights reserved.
  * Copyright (c) 2011 Nexenta Systems, Inc. All rights reserved.
  */
 
+#include <sys/sysmacros.h>
 #include <sys/zfs_context.h>
 #include <sys/fm/fs/zfs.h>
 #include <sys/spa.h>
 #include <sys/dmu_objset.h>
 #include <sys/arc.h>
 #include <sys/ddt.h>
+#include <sys/blkptr.h>
+#include <sys/zfeature.h>
+#include <sys/time.h>
+#include <sys/trace_zio.h>
 
 /*
  * ==========================================================================
@@ -53,14 +58,16 @@ const char *zio_type_name[ZIO_TYPES] = {
  */
 kmem_cache_t *zio_cache;
 kmem_cache_t *zio_link_cache;
-kmem_cache_t *zio_vdev_cache;
 kmem_cache_t *zio_buf_cache[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];
 kmem_cache_t *zio_data_buf_cache[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];
-int zio_bulk_flags = 0;
 int zio_delay_max = ZIO_DELAY_MAX;
 
-extern int zfs_mg_alloc_failures;
+#define        ZIO_PIPELINE_CONTINUE           0x100
+#define        ZIO_PIPELINE_STOP               0x101
 
+#define        BP_SPANB(indblkshift, level) \
+       (((uint64_t)1) << ((level) * ((indblkshift) - SPA_BLKPTRSHIFT)))
+#define        COMPARE_META_LEVEL      0x80000000ul
 /*
  * The following actions directly effect the spa's sync-to-convergence logic.
  * The values below define the sync pass when we start performing the action.
@@ -93,60 +100,38 @@ int zio_buf_debug_limit = 0;
 
 static inline void __zio_execute(zio_t *zio);
 
-static int
-zio_cons(void *arg, void *unused, int kmflag)
-{
-       zio_t *zio = arg;
-
-       bzero(zio, sizeof (zio_t));
-
-       mutex_init(&zio->io_lock, NULL, MUTEX_DEFAULT, NULL);
-       cv_init(&zio->io_cv, NULL, CV_DEFAULT, NULL);
-
-       list_create(&zio->io_parent_list, sizeof (zio_link_t),
-           offsetof(zio_link_t, zl_parent_node));
-       list_create(&zio->io_child_list, sizeof (zio_link_t),
-           offsetof(zio_link_t, zl_child_node));
-
-       return (0);
-}
-
-static void
-zio_dest(void *arg, void *unused)
-{
-       zio_t *zio = arg;
-
-       mutex_destroy(&zio->io_lock);
-       cv_destroy(&zio->io_cv);
-       list_destroy(&zio->io_parent_list);
-       list_destroy(&zio->io_child_list);
-}
-
 void
 zio_init(void)
 {
        size_t c;
        vmem_t *data_alloc_arena = NULL;
 
-       zio_cache = kmem_cache_create("zio_cache", sizeof (zio_t), 0,
-           zio_cons, zio_dest, NULL, NULL, NULL, KMC_KMEM);
+       zio_cache = kmem_cache_create("zio_cache",
+           sizeof (zio_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
        zio_link_cache = kmem_cache_create("zio_link_cache",
-           sizeof (zio_link_t), 0, NULL, NULL, NULL, NULL, NULL, KMC_KMEM);
-       zio_vdev_cache = kmem_cache_create("zio_vdev_cache", sizeof(vdev_io_t),
-           PAGESIZE, NULL, NULL, NULL, NULL, NULL, KMC_VMEM);
+           sizeof (zio_link_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
 
        /*
         * For small buffers, we want a cache for each multiple of
-        * SPA_MINBLOCKSIZE.  For medium-size buffers, we want a cache
-        * for each quarter-power of 2.  For large buffers, we want
-        * a cache for each multiple of PAGESIZE.
+        * SPA_MINBLOCKSIZE.  For larger buffers, we want a cache
+        * for each quarter-power of 2.
         */
        for (c = 0; c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT; c++) {
                size_t size = (c + 1) << SPA_MINBLOCKSHIFT;
                size_t p2 = size;
                size_t align = 0;
+               size_t cflags = (size > zio_buf_debug_limit) ? KMC_NODEBUG : 0;
 
-               while (p2 & (p2 - 1))
+#ifdef _ILP32
+               /*
+                * Cache size limited to 1M on 32-bit platforms until ARC
+                * buffers no longer require virtual address space.
+                */
+               if (size > zfs_max_recordsize)
+                       break;
+#endif
+
+               while (!ISP2(p2))
                        p2 &= p2 - 1;
 
 #ifndef _KERNEL
@@ -161,35 +146,20 @@ zio_init(void)
 #endif
                if (size <= 4 * SPA_MINBLOCKSIZE) {
                        align = SPA_MINBLOCKSIZE;
-               } else if (IS_P2ALIGNED(size, PAGESIZE)) {
-                       align = PAGESIZE;
                } else if (IS_P2ALIGNED(size, p2 >> 2)) {
-                       align = p2 >> 2;
+                       align = MIN(p2 >> 2, PAGESIZE);
                }
 
                if (align != 0) {
                        char name[36];
-                       int flags = zio_bulk_flags;
-
-                       /*
-                        * The smallest buffers (512b) are heavily used and
-                        * experience a lot of churn.  The slabs allocated
-                        * for them are also relatively small (32K).  Thus
-                        * in over to avoid expensive calls to vmalloc() we
-                        * make an exception to the usual slab allocation
-                        * policy and force these buffers to be kmem backed.
-                        */
-                       if (size == (1 << SPA_MINBLOCKSHIFT))
-                               flags |= KMC_KMEM;
-
                        (void) sprintf(name, "zio_buf_%lu", (ulong_t)size);
                        zio_buf_cache[c] = kmem_cache_create(name, size,
-                           align, NULL, NULL, NULL, NULL, NULL, flags);
+                           align, NULL, NULL, NULL, NULL, NULL, cflags);
 
                        (void) sprintf(name, "zio_data_buf_%lu", (ulong_t)size);
                        zio_data_buf_cache[c] = kmem_cache_create(name, size,
                            align, NULL, NULL, NULL, NULL,
-                           data_alloc_arena, flags);
+                           data_alloc_arena, cflags);
                }
        }
 
@@ -203,13 +173,6 @@ zio_init(void)
                        zio_data_buf_cache[c - 1] = zio_data_buf_cache[c];
        }
 
-       /*
-        * The zio write taskqs have 1 thread per cpu, allow 1/2 of the taskqs
-        * to fail 3 times per txg or 8 failures, whichever is greater.
-        */
-       if (zfs_mg_alloc_failures == 0)
-               zfs_mg_alloc_failures = MAX((3 * max_ncpus / 2), 8);
-
        zio_inject_init();
 
        lz4_init();
@@ -223,6 +186,14 @@ zio_fini(void)
        kmem_cache_t *last_data_cache = NULL;
 
        for (c = 0; c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT; c++) {
+#ifdef _ILP32
+               /*
+                * Cache size limited to 1M on 32-bit platforms until ARC
+                * buffers no longer require virtual address space.
+                */
+               if (((c + 1) << SPA_MINBLOCKSHIFT) > zfs_max_recordsize)
+                       break;
+#endif
                if (zio_buf_cache[c] != last_cache) {
                        last_cache = zio_buf_cache[c];
                        kmem_cache_destroy(zio_buf_cache[c]);
@@ -236,7 +207,6 @@ zio_fini(void)
                zio_data_buf_cache[c] = NULL;
        }
 
-       kmem_cache_destroy(zio_vdev_cache);
        kmem_cache_destroy(zio_link_cache);
        kmem_cache_destroy(zio_cache);
 
@@ -262,9 +232,9 @@ zio_buf_alloc(size_t size)
 {
        size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
 
-       ASSERT(c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
+       VERIFY3U(c, <, SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
 
-       return (kmem_cache_alloc(zio_buf_cache[c], KM_PUSHPAGE | KM_NODEBUG));
+       return (kmem_cache_alloc(zio_buf_cache[c], KM_PUSHPAGE));
 }
 
 /*
@@ -278,48 +248,43 @@ zio_data_buf_alloc(size_t size)
 {
        size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
 
-       ASSERT(c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
+       VERIFY3U(c, <, SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
 
-       return (kmem_cache_alloc(zio_data_buf_cache[c],
-           KM_PUSHPAGE | KM_NODEBUG));
+       return (kmem_cache_alloc(zio_data_buf_cache[c], KM_PUSHPAGE));
 }
 
-void
-zio_buf_free(void *buf, size_t size)
+/*
+ * Use zio_buf_alloc_flags when specific allocation flags are needed.  e.g.
+ * passing KM_NOSLEEP when it is acceptable for an allocation to fail.
+ */
+void *
+zio_buf_alloc_flags(size_t size, int flags)
 {
        size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
 
-       ASSERT(c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
+       VERIFY3U(c, <, SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
 
-       kmem_cache_free(zio_buf_cache[c], buf);
+       return (kmem_cache_alloc(zio_buf_cache[c], flags));
 }
 
 void
-zio_data_buf_free(void *buf, size_t size)
+zio_buf_free(void *buf, size_t size)
 {
        size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
 
-       ASSERT(c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
+       VERIFY3U(c, <, SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
 
-       kmem_cache_free(zio_data_buf_cache[c], buf);
-}
-
-/*
- * Dedicated I/O buffers to ensure that memory fragmentation never prevents
- * or significantly delays the issuing of a zio.   These buffers are used
- * to aggregate I/O and could be used for raidz stripes.
- */
-void *
-zio_vdev_alloc(void)
-{
-       return (kmem_cache_alloc(zio_vdev_cache, KM_PUSHPAGE));
+       kmem_cache_free(zio_buf_cache[c], buf);
 }
 
 void
-zio_vdev_free(void *buf)
+zio_data_buf_free(void *buf, size_t size)
 {
-       kmem_cache_free(zio_vdev_cache, buf);
+       size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
+
+       VERIFY3U(c, <, SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
 
+       kmem_cache_free(zio_data_buf_cache[c], buf);
 }
 
 /*
@@ -331,7 +296,7 @@ static void
 zio_push_transform(zio_t *zio, void *data, uint64_t size, uint64_t bufsize,
        zio_transform_func_t *transform)
 {
-       zio_transform_t *zt = kmem_alloc(sizeof (zio_transform_t), KM_PUSHPAGE);
+       zio_transform_t *zt = kmem_alloc(sizeof (zio_transform_t), KM_SLEEP);
 
        zt->zt_orig_data = zio->io_data;
        zt->zt_orig_size = zio->io_size;
@@ -446,7 +411,7 @@ zio_unique_parent(zio_t *cio)
 void
 zio_add_child(zio_t *pio, zio_t *cio)
 {
-       zio_link_t *zl = kmem_cache_alloc(zio_link_cache, KM_PUSHPAGE);
+       zio_link_t *zl = kmem_cache_alloc(zio_link_cache, KM_SLEEP);
        int w;
 
        /*
@@ -557,7 +522,7 @@ static zio_t *
 zio_create(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
     void *data, uint64_t size, zio_done_func_t *done, void *private,
     zio_type_t type, zio_priority_t priority, enum zio_flag flags,
-    vdev_t *vd, uint64_t offset, const zbookmark_t *zb,
+    vdev_t *vd, uint64_t offset, const zbookmark_phys_t *zb,
     enum zio_stage stage, enum zio_stage pipeline)
 {
        zio_t *zio;
@@ -570,7 +535,16 @@ zio_create(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
        ASSERT(!bp || !(flags & ZIO_FLAG_CONFIG_WRITER));
        ASSERT(vd || stage == ZIO_STAGE_OPEN);
 
-       zio = kmem_cache_alloc(zio_cache, KM_PUSHPAGE);
+       zio = kmem_cache_alloc(zio_cache, KM_SLEEP);
+       bzero(zio, sizeof (zio_t));
+
+       mutex_init(&zio->io_lock, NULL, MUTEX_NOLOCKDEP, NULL);
+       cv_init(&zio->io_cv, NULL, CV_DEFAULT, NULL);
+
+       list_create(&zio->io_parent_list, sizeof (zio_link_t),
+           offsetof(zio_link_t, zl_parent_node));
+       list_create(&zio->io_child_list, sizeof (zio_link_t),
+           offsetof(zio_link_t, zl_child_node));
 
        if (vd != NULL)
                zio->io_child_type = ZIO_CHILD_VDEV;
@@ -582,7 +556,6 @@ zio_create(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
                zio->io_child_type = ZIO_CHILD_LOGICAL;
 
        if (bp != NULL) {
-               zio->io_logical = NULL;
                zio->io_bp = (blkptr_t *)bp;
                zio->io_bp_copy = *bp;
                zio->io_bp_orig = *bp;
@@ -593,55 +566,21 @@ zio_create(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
                        zio->io_logical = zio;
                if (zio->io_child_type > ZIO_CHILD_GANG && BP_IS_GANG(bp))
                        pipeline |= ZIO_GANG_STAGES;
-       } else {
-               zio->io_logical = NULL;
-               zio->io_bp = NULL;
-               bzero(&zio->io_bp_copy, sizeof (blkptr_t));
-               bzero(&zio->io_bp_orig, sizeof (blkptr_t));
        }
 
        zio->io_spa = spa;
        zio->io_txg = txg;
-       zio->io_ready = NULL;
-       zio->io_physdone = NULL;
        zio->io_done = done;
        zio->io_private = private;
-       zio->io_prev_space_delta = 0;
        zio->io_type = type;
        zio->io_priority = priority;
        zio->io_vd = vd;
-       zio->io_vsd = NULL;
-       zio->io_vsd_ops = NULL;
        zio->io_offset = offset;
-       zio->io_timestamp = 0;
-       zio->io_delta = 0;
-       zio->io_delay = 0;
        zio->io_orig_data = zio->io_data = data;
        zio->io_orig_size = zio->io_size = size;
        zio->io_orig_flags = zio->io_flags = flags;
        zio->io_orig_stage = zio->io_stage = stage;
        zio->io_orig_pipeline = zio->io_pipeline = pipeline;
-       bzero(&zio->io_prop, sizeof (zio_prop_t));
-       zio->io_cmd = 0;
-       zio->io_reexecute = 0;
-       zio->io_bp_override = NULL;
-       zio->io_walk_link = NULL;
-       zio->io_transform_stack = NULL;
-       zio->io_error = 0;
-       zio->io_child_count = 0;
-       zio->io_phys_children = 0;
-       zio->io_parent_count = 0;
-       zio->io_stall = NULL;
-       zio->io_gang_leader = NULL;
-       zio->io_gang_tree = NULL;
-       zio->io_executor = NULL;
-       zio->io_waiter = NULL;
-       zio->io_cksum_report = NULL;
-       zio->io_ena = 0;
-       bzero(zio->io_child_error, sizeof (int) * ZIO_CHILD_TYPES);
-       bzero(zio->io_children,
-           sizeof (uint64_t) * ZIO_CHILD_TYPES * ZIO_WAIT_TYPES);
-       bzero(&zio->io_bookmark, sizeof (zbookmark_t));
 
        zio->io_state[ZIO_WAIT_READY] = (stage >= ZIO_STAGE_READY);
        zio->io_state[ZIO_WAIT_DONE] = (stage >= ZIO_STAGE_DONE);
@@ -665,6 +604,10 @@ zio_create(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
 static void
 zio_destroy(zio_t *zio)
 {
+       list_destroy(&zio->io_parent_list);
+       list_destroy(&zio->io_child_list);
+       mutex_destroy(&zio->io_lock);
+       cv_destroy(&zio->io_cv);
        kmem_cache_free(zio_cache, zio);
 }
 
@@ -687,13 +630,101 @@ zio_root(spa_t *spa, zio_done_func_t *done, void *private, enum zio_flag flags)
        return (zio_null(NULL, spa, NULL, done, private, flags));
 }
 
+void
+zfs_blkptr_verify(spa_t *spa, const blkptr_t *bp)
+{
+       int i;
+
+       if (!DMU_OT_IS_VALID(BP_GET_TYPE(bp))) {
+               zfs_panic_recover("blkptr at %p has invalid TYPE %llu",
+                   bp, (longlong_t)BP_GET_TYPE(bp));
+       }
+       if (BP_GET_CHECKSUM(bp) >= ZIO_CHECKSUM_FUNCTIONS ||
+           BP_GET_CHECKSUM(bp) <= ZIO_CHECKSUM_ON) {
+               zfs_panic_recover("blkptr at %p has invalid CHECKSUM %llu",
+                   bp, (longlong_t)BP_GET_CHECKSUM(bp));
+       }
+       if (BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_FUNCTIONS ||
+           BP_GET_COMPRESS(bp) <= ZIO_COMPRESS_ON) {
+               zfs_panic_recover("blkptr at %p has invalid COMPRESS %llu",
+                   bp, (longlong_t)BP_GET_COMPRESS(bp));
+       }
+       if (BP_GET_LSIZE(bp) > SPA_MAXBLOCKSIZE) {
+               zfs_panic_recover("blkptr at %p has invalid LSIZE %llu",
+                   bp, (longlong_t)BP_GET_LSIZE(bp));
+       }
+       if (BP_GET_PSIZE(bp) > SPA_MAXBLOCKSIZE) {
+               zfs_panic_recover("blkptr at %p has invalid PSIZE %llu",
+                   bp, (longlong_t)BP_GET_PSIZE(bp));
+       }
+
+       if (BP_IS_EMBEDDED(bp)) {
+               if (BPE_GET_ETYPE(bp) > NUM_BP_EMBEDDED_TYPES) {
+                       zfs_panic_recover("blkptr at %p has invalid ETYPE %llu",
+                           bp, (longlong_t)BPE_GET_ETYPE(bp));
+               }
+       }
+
+       /*
+        * Pool-specific checks.
+        *
+        * Note: it would be nice to verify that the blk_birth and
+        * BP_PHYSICAL_BIRTH() are not too large.  However, spa_freeze()
+        * allows the birth time of log blocks (and dmu_sync()-ed blocks
+        * that are in the log) to be arbitrarily large.
+        */
+       for (i = 0; i < BP_GET_NDVAS(bp); i++) {
+               uint64_t vdevid = DVA_GET_VDEV(&bp->blk_dva[i]);
+               vdev_t *vd;
+               uint64_t offset, asize;
+               if (vdevid >= spa->spa_root_vdev->vdev_children) {
+                       zfs_panic_recover("blkptr at %p DVA %u has invalid "
+                           "VDEV %llu",
+                           bp, i, (longlong_t)vdevid);
+                       continue;
+               }
+               vd = spa->spa_root_vdev->vdev_child[vdevid];
+               if (vd == NULL) {
+                       zfs_panic_recover("blkptr at %p DVA %u has invalid "
+                           "VDEV %llu",
+                           bp, i, (longlong_t)vdevid);
+                       continue;
+               }
+               if (vd->vdev_ops == &vdev_hole_ops) {
+                       zfs_panic_recover("blkptr at %p DVA %u has hole "
+                           "VDEV %llu",
+                           bp, i, (longlong_t)vdevid);
+                       continue;
+               }
+               if (vd->vdev_ops == &vdev_missing_ops) {
+                       /*
+                        * "missing" vdevs are valid during import, but we
+                        * don't have their detailed info (e.g. asize), so
+                        * we can't perform any more checks on them.
+                        */
+                       continue;
+               }
+               offset = DVA_GET_OFFSET(&bp->blk_dva[i]);
+               asize = DVA_GET_ASIZE(&bp->blk_dva[i]);
+               if (BP_IS_GANG(bp))
+                       asize = vdev_psize_to_asize(vd, SPA_GANGBLOCKSIZE);
+               if (offset + asize > vd->vdev_asize) {
+                       zfs_panic_recover("blkptr at %p DVA %u has invalid "
+                           "OFFSET %llu",
+                           bp, i, (longlong_t)offset);
+               }
+       }
+}
+
 zio_t *
 zio_read(zio_t *pio, spa_t *spa, const blkptr_t *bp,
     void *data, uint64_t size, zio_done_func_t *done, void *private,
-    zio_priority_t priority, enum zio_flag flags, const zbookmark_t *zb)
+    zio_priority_t priority, enum zio_flag flags, const zbookmark_phys_t *zb)
 {
        zio_t *zio;
 
+       zfs_blkptr_verify(spa, bp);
+
        zio = zio_create(pio, spa, BP_PHYSICAL_BIRTH(bp), bp,
            data, size, done, private,
            ZIO_TYPE_READ, priority, flags, NULL, 0, zb,
@@ -708,7 +739,7 @@ zio_write(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp,
     void *data, uint64_t size, const zio_prop_t *zp,
     zio_done_func_t *ready, zio_done_func_t *physdone, zio_done_func_t *done,
     void *private,
-    zio_priority_t priority, enum zio_flag flags, const zbookmark_t *zb)
+    zio_priority_t priority, enum zio_flag flags, const zbookmark_phys_t *zb)
 {
        zio_t *zio;
 
@@ -730,13 +761,23 @@ zio_write(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp,
        zio->io_physdone = physdone;
        zio->io_prop = *zp;
 
+       /*
+        * Data can be NULL if we are going to call zio_write_override() to
+        * provide the already-allocated BP.  But we may need the data to
+        * verify a dedup hit (if requested).  In this case, don't try to
+        * dedup (just take the already-allocated BP verbatim).
+        */
+       if (data == NULL && zio->io_prop.zp_dedup_verify) {
+               zio->io_prop.zp_dedup = zio->io_prop.zp_dedup_verify = B_FALSE;
+       }
+
        return (zio);
 }
 
 zio_t *
 zio_rewrite(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp, void *data,
     uint64_t size, zio_done_func_t *done, void *private,
-    zio_priority_t priority, enum zio_flag flags, zbookmark_t *zb)
+    zio_priority_t priority, enum zio_flag flags, zbookmark_phys_t *zb)
 {
        zio_t *zio;
 
@@ -769,6 +810,14 @@ zio_write_override(zio_t *zio, blkptr_t *bp, int copies, boolean_t nopwrite)
 void
 zio_free(spa_t *spa, uint64_t txg, const blkptr_t *bp)
 {
+
+       /*
+        * The check for EMBEDDED is a performance optimization.  We
+        * process the free here (by ignoring it) rather than
+        * putting it on the list and then processing it in zio_free_sync().
+        */
+       if (BP_IS_EMBEDDED(bp))
+               return;
        metaslab_check_free(spa, bp);
 
        /*
@@ -793,13 +842,13 @@ zio_free_sync(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
        zio_t *zio;
        enum zio_stage stage = ZIO_FREE_PIPELINE;
 
-       dprintf_bp(bp, "freeing in txg %llu, pass %u",
-           (longlong_t)txg, spa->spa_sync_pass);
-
        ASSERT(!BP_IS_HOLE(bp));
        ASSERT(spa_syncing_txg(spa) == txg);
        ASSERT(spa_sync_pass(spa) < zfs_sync_pass_deferred_free);
 
+       if (BP_IS_EMBEDDED(bp))
+               return (zio_null(pio, spa, NULL, NULL, NULL, 0));
+
        metaslab_check_free(spa, bp);
        arc_freed(spa, bp);
 
@@ -824,6 +873,11 @@ zio_claim(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
 {
        zio_t *zio;
 
+       dprintf_bp(bp, "claiming in txg %llu", txg);
+
+       if (BP_IS_EMBEDDED(bp))
+               return (zio_null(pio, spa, NULL, NULL, NULL, 0));
+
        /*
         * A claim is an allocation of a specific block.  Claims are needed
         * to support immediate writes in the intent log.  The issue is that
@@ -884,8 +938,8 @@ zio_read_phys(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
        ASSERT3U(offset + size, <=, vd->vdev_psize);
 
        zio = zio_create(pio, vd->vdev_spa, 0, NULL, data, size, done, private,
-           ZIO_TYPE_READ, priority, flags, vd, offset, NULL,
-           ZIO_STAGE_OPEN, ZIO_READ_PHYS_PIPELINE);
+           ZIO_TYPE_READ, priority, flags | ZIO_FLAG_PHYSICAL, vd, offset,
+           NULL, ZIO_STAGE_OPEN, ZIO_READ_PHYS_PIPELINE);
 
        zio->io_prop.zp_checksum = checksum;
 
@@ -905,8 +959,8 @@ zio_write_phys(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
        ASSERT3U(offset + size, <=, vd->vdev_psize);
 
        zio = zio_create(pio, vd->vdev_spa, 0, NULL, data, size, done, private,
-           ZIO_TYPE_WRITE, priority, flags, vd, offset, NULL,
-           ZIO_STAGE_OPEN, ZIO_WRITE_PHYS_PIPELINE);
+           ZIO_TYPE_WRITE, priority, flags | ZIO_FLAG_PHYSICAL, vd, offset,
+           NULL, ZIO_STAGE_OPEN, ZIO_WRITE_PHYS_PIPELINE);
 
        zio->io_prop.zp_checksum = checksum;
 
@@ -1030,12 +1084,20 @@ zio_read_bp_init(zio_t *zio)
        if (BP_GET_COMPRESS(bp) != ZIO_COMPRESS_OFF &&
            zio->io_child_type == ZIO_CHILD_LOGICAL &&
            !(zio->io_flags & ZIO_FLAG_RAW)) {
-               uint64_t psize = BP_GET_PSIZE(bp);
+               uint64_t psize =
+                   BP_IS_EMBEDDED(bp) ? BPE_GET_PSIZE(bp) : BP_GET_PSIZE(bp);
                void *cbuf = zio_buf_alloc(psize);
 
                zio_push_transform(zio, cbuf, psize, psize, zio_decompress);
        }
 
+       if (BP_IS_EMBEDDED(bp) && BPE_GET_ETYPE(bp) == BP_EMBEDDED_TYPE_DATA) {
+               zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
+               decode_embedded_bp_compressed(bp, zio->io_data);
+       } else {
+               ASSERT(!BP_IS_EMBEDDED(bp));
+       }
+
        if (!DMU_OT_IS_METADATA(BP_GET_TYPE(bp)) && BP_GET_LEVEL(bp) == 0)
                zio->io_flags |= ZIO_FLAG_DONT_CACHE;
 
@@ -1079,6 +1141,9 @@ zio_write_bp_init(zio_t *zio)
                *bp = *zio->io_bp_override;
                zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
 
+               if (BP_IS_EMBEDDED(bp))
+                       return (ZIO_PIPELINE_CONTINUE);
+
                /*
                 * If we've been overridden and nopwrite is set then
                 * set the flag accordingly to indicate that a nopwrite
@@ -1107,7 +1172,7 @@ zio_write_bp_init(zio_t *zio)
                BP_ZERO(bp);
        }
 
-       if (bp->blk_birth == zio->io_txg) {
+       if (!BP_IS_HOLE(bp) && bp->blk_birth == zio->io_txg) {
                /*
                 * We're rewriting an existing block, which means we're
                 * working on behalf of spa_sync().  For spa_sync() to
@@ -1127,7 +1192,7 @@ zio_write_bp_init(zio_t *zio)
                        compress = ZIO_COMPRESS_OFF;
 
                /* Make sure someone doesn't change their mind on overwrites */
-               ASSERT(MIN(zp->zp_copies + BP_IS_GANG(bp),
+               ASSERT(BP_IS_EMBEDDED(bp) || MIN(zp->zp_copies + BP_IS_GANG(bp),
                    spa_max_replication(spa)) == BP_GET_NDVAS(bp));
        }
 
@@ -1137,9 +1202,45 @@ zio_write_bp_init(zio_t *zio)
                if (psize == 0 || psize == lsize) {
                        compress = ZIO_COMPRESS_OFF;
                        zio_buf_free(cbuf, lsize);
+               } else if (!zp->zp_dedup && psize <= BPE_PAYLOAD_SIZE &&
+                   zp->zp_level == 0 && !DMU_OT_HAS_FILL(zp->zp_type) &&
+                   spa_feature_is_enabled(spa, SPA_FEATURE_EMBEDDED_DATA)) {
+                       encode_embedded_bp_compressed(bp,
+                           cbuf, compress, lsize, psize);
+                       BPE_SET_ETYPE(bp, BP_EMBEDDED_TYPE_DATA);
+                       BP_SET_TYPE(bp, zio->io_prop.zp_type);
+                       BP_SET_LEVEL(bp, zio->io_prop.zp_level);
+                       zio_buf_free(cbuf, lsize);
+                       bp->blk_birth = zio->io_txg;
+                       zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
+                       ASSERT(spa_feature_is_active(spa,
+                           SPA_FEATURE_EMBEDDED_DATA));
+                       return (ZIO_PIPELINE_CONTINUE);
                } else {
-                       ASSERT(psize < lsize);
-                       zio_push_transform(zio, cbuf, psize, lsize, NULL);
+                       /*
+                        * Round up compressed size up to the ashift
+                        * of the smallest-ashift device, and zero the tail.
+                        * This ensures that the compressed size of the BP
+                        * (and thus compressratio property) are correct,
+                        * in that we charge for the padding used to fill out
+                        * the last sector.
+                        */
+                       size_t rounded;
+
+                       ASSERT3U(spa->spa_min_ashift, >=, SPA_MINBLOCKSHIFT);
+
+                       rounded = (size_t)P2ROUNDUP(psize,
+                           1ULL << spa->spa_min_ashift);
+                       if (rounded >= lsize) {
+                               compress = ZIO_COMPRESS_OFF;
+                               zio_buf_free(cbuf, lsize);
+                               psize = lsize;
+                       } else {
+                               bzero((char *)cbuf + psize, rounded - psize);
+                               psize = rounded;
+                               zio_push_transform(zio, cbuf,
+                                   psize, lsize, NULL);
+                       }
                }
        }
 
@@ -1151,7 +1252,8 @@ zio_write_bp_init(zio_t *zio)
         * spa_sync() to allocate new blocks, but force rewrites after that.
         * There should only be a handful of blocks after pass 1 in any case.
         */
-       if (bp->blk_birth == zio->io_txg && BP_GET_PSIZE(bp) == psize &&
+       if (!BP_IS_HOLE(bp) && bp->blk_birth == zio->io_txg &&
+           BP_GET_PSIZE(bp) == psize &&
            pass >= zfs_sync_pass_rewrite) {
                enum zio_stage gang_stages = zio->io_pipeline & ZIO_GANG_STAGES;
                ASSERT(psize != 0);
@@ -1163,15 +1265,22 @@ zio_write_bp_init(zio_t *zio)
        }
 
        if (psize == 0) {
+               if (zio->io_bp_orig.blk_birth != 0 &&
+                   spa_feature_is_active(spa, SPA_FEATURE_HOLE_BIRTH)) {
+                       BP_SET_LSIZE(bp, lsize);
+                       BP_SET_TYPE(bp, zp->zp_type);
+                       BP_SET_LEVEL(bp, zp->zp_level);
+                       BP_SET_BIRTH(bp, zio->io_txg, 0);
+               }
                zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
        } else {
                ASSERT(zp->zp_checksum != ZIO_CHECKSUM_GANG_HEADER);
                BP_SET_LSIZE(bp, lsize);
+               BP_SET_TYPE(bp, zp->zp_type);
+               BP_SET_LEVEL(bp, zp->zp_level);
                BP_SET_PSIZE(bp, psize);
                BP_SET_COMPRESS(bp, compress);
                BP_SET_CHECKSUM(bp, zp->zp_checksum);
-               BP_SET_TYPE(bp, zp->zp_type);
-               BP_SET_LEVEL(bp, zp->zp_level);
                BP_SET_DEDUP(bp, zp->zp_dedup);
                BP_SET_BYTEORDER(bp, ZFS_HOST_BYTEORDER);
                if (zp->zp_dedup) {
@@ -1282,6 +1391,76 @@ zio_interrupt(zio_t *zio)
        zio_taskq_dispatch(zio, ZIO_TASKQ_INTERRUPT, B_FALSE);
 }
 
+void
+zio_delay_interrupt(zio_t *zio)
+{
+       /*
+        * The timeout_generic() function isn't defined in userspace, so
+        * rather than trying to implement the function, the zio delay
+        * functionality has been disabled for userspace builds.
+        */
+
+#ifdef _KERNEL
+       /*
+        * If io_target_timestamp is zero, then no delay has been registered
+        * for this IO, thus jump to the end of this function and "skip" the
+        * delay; issuing it directly to the zio layer.
+        */
+       if (zio->io_target_timestamp != 0) {
+               hrtime_t now = gethrtime();
+
+               if (now >= zio->io_target_timestamp) {
+                       /*
+                        * This IO has already taken longer than the target
+                        * delay to complete, so we don't want to delay it
+                        * any longer; we "miss" the delay and issue it
+                        * directly to the zio layer. This is likely due to
+                        * the target latency being set to a value less than
+                        * the underlying hardware can satisfy (e.g. delay
+                        * set to 1ms, but the disks take 10ms to complete an
+                        * IO request).
+                        */
+
+                       DTRACE_PROBE2(zio__delay__miss, zio_t *, zio,
+                           hrtime_t, now);
+
+                       zio_interrupt(zio);
+               } else {
+                       taskqid_t tid;
+                       hrtime_t diff = zio->io_target_timestamp - now;
+                       clock_t expire_at_tick = ddi_get_lbolt() +
+                           NSEC_TO_TICK(diff);
+
+                       DTRACE_PROBE3(zio__delay__hit, zio_t *, zio,
+                           hrtime_t, now, hrtime_t, diff);
+
+                       if (NSEC_TO_TICK(diff) == 0) {
+                               /* Our delay is less than a jiffy - just spin */
+                               zfs_sleep_until(zio->io_target_timestamp);
+                       } else {
+                               /*
+                                * Use taskq_dispatch_delay() in the place of
+                                * OpenZFS's timeout_generic().
+                                */
+                               tid = taskq_dispatch_delay(system_taskq,
+                                   (task_func_t *) zio_interrupt,
+                                   zio, TQ_NOSLEEP, expire_at_tick);
+                               if (!tid) {
+                                       /*
+                                        * Couldn't allocate a task.  Just
+                                        * finish the zio without a delay.
+                                        */
+                                       zio_interrupt(zio);
+                               }
+                       }
+               }
+               return;
+       }
+#endif
+       DTRACE_PROBE1(zio__delay__skip, zio_t *, zio);
+       zio_interrupt(zio);
+}
+
 /*
  * Execute the I/O pipeline until one of the following occurs:
  * (1) the I/O completes; (2) the pipeline stalls waiting for
@@ -1308,7 +1487,36 @@ static zio_pipe_stage_t *zio_pipeline[];
 void
 zio_execute(zio_t *zio)
 {
+       fstrans_cookie_t cookie;
+
+       cookie = spl_fstrans_mark();
        __zio_execute(zio);
+       spl_fstrans_unmark(cookie);
+}
+
+/*
+ * Used to determine if in the current context the stack is sized large
+ * enough to allow zio_execute() to be called recursively.  A minimum
+ * stack size of 16K is required to avoid needing to re-dispatch the zio.
+ */
+boolean_t
+zio_execute_stack_check(zio_t *zio)
+{
+#if !defined(HAVE_LARGE_STACKS)
+       dsl_pool_t *dp = spa_get_dsl(zio->io_spa);
+
+       /* Executing in txg_sync_thread() context. */
+       if (dp && curthread == dp->dp_tx.tx_sync_thread)
+               return (B_TRUE);
+
+       /* Pool initialization outside of zio_taskq context. */
+       if (dp && spa_is_initializing(dp->dp_spa) &&
+           !zio_taskq_member(zio, ZIO_TASKQ_ISSUE) &&
+           !zio_taskq_member(zio, ZIO_TASKQ_ISSUE_HIGH))
+               return (B_TRUE);
+#endif /* HAVE_LARGE_STACKS */
+
+       return (B_FALSE);
 }
 
 __attribute__((always_inline))
@@ -1320,8 +1528,6 @@ __zio_execute(zio_t *zio)
        while (zio->io_stage < ZIO_STAGE_DONE) {
                enum zio_stage pipeline = zio->io_pipeline;
                enum zio_stage stage = zio->io_stage;
-               dsl_pool_t *dp;
-               boolean_t cut;
                int rv;
 
                ASSERT(!MUTEX_HELD(&zio->io_lock));
@@ -1334,10 +1540,6 @@ __zio_execute(zio_t *zio)
 
                ASSERT(stage <= ZIO_STAGE_DONE);
 
-               dp = spa_get_dsl(zio->io_spa);
-               cut = (stage == ZIO_STAGE_VDEV_IO_START) ?
-                   zio_requeue_io_start_cut_in_line : B_FALSE;
-
                /*
                 * If we are in interrupt context and this pipeline stage
                 * will grab a config lock that is held across I/O,
@@ -1349,29 +1551,25 @@ __zio_execute(zio_t *zio)
                 */
                if ((stage & ZIO_BLOCKING_STAGES) && zio->io_vd == NULL &&
                    zio_taskq_member(zio, ZIO_TASKQ_INTERRUPT)) {
+                       boolean_t cut = (stage == ZIO_STAGE_VDEV_IO_START) ?
+                           zio_requeue_io_start_cut_in_line : B_FALSE;
                        zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, cut);
                        return;
                }
 
-#ifdef _KERNEL
                /*
-                * If we executing in the context of the tx_sync_thread,
-                * or we are performing pool initialization outside of a
-                * zio_taskq[ZIO_TASKQ_ISSUE|ZIO_TASKQ_ISSUE_HIGH] context.
-                * Then issue the zio asynchronously to minimize stack usage
-                * for these deep call paths.
+                * If the current context doesn't have large enough stacks
+                * the zio must be issued asynchronously to prevent overflow.
                 */
-               if ((dp && curthread == dp->dp_tx.tx_sync_thread) ||
-                   (dp && spa_is_initializing(dp->dp_spa) &&
-                   !zio_taskq_member(zio, ZIO_TASKQ_ISSUE) &&
-                   !zio_taskq_member(zio, ZIO_TASKQ_ISSUE_HIGH))) {
+               if (zio_execute_stack_check(zio)) {
+                       boolean_t cut = (stage == ZIO_STAGE_VDEV_IO_START) ?
+                           zio_requeue_io_start_cut_in_line : B_FALSE;
                        zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, cut);
                        return;
                }
-#endif
 
                zio->io_stage = stage;
-               rv = zio_pipeline[highbit(stage) - 1](zio);
+               rv = zio_pipeline[highbit64(stage) - 1](zio);
 
                if (rv == ZIO_PIPELINE_STOP)
                        return;
@@ -1416,14 +1614,19 @@ zio_nowait(zio_t *zio)
 
        if (zio->io_child_type == ZIO_CHILD_LOGICAL &&
            zio_unique_parent(zio) == NULL) {
+               zio_t *pio;
+
                /*
                 * This is a logical async I/O with no parent to wait for it.
                 * We add it to the spa_async_root_zio "Godfather" I/O which
                 * will ensure they complete prior to unloading the pool.
                 */
                spa_t *spa = zio->io_spa;
+               kpreempt_disable();
+               pio = spa->spa_async_zio_root[CPU_SEQID];
+               kpreempt_enable();
 
-               zio_add_child(spa->spa_async_zio_root, zio);
+               zio_add_child(pio, zio);
        }
 
        __zio_execute(zio);
@@ -1698,7 +1901,7 @@ zio_gang_node_alloc(zio_gang_node_t **gnpp)
 
        ASSERT(*gnpp == NULL);
 
-       gn = kmem_zalloc(sizeof (*gn), KM_PUSHPAGE);
+       gn = kmem_zalloc(sizeof (*gn), KM_SLEEP);
        gn->gn_gbh = zio_buf_alloc(SPA_GANGBLOCKSIZE);
        *gnpp = gn;
 
@@ -1852,11 +2055,11 @@ static void
 zio_write_gang_member_ready(zio_t *zio)
 {
        zio_t *pio = zio_unique_parent(zio);
-       ASSERTV(zio_t *gio = zio->io_gang_leader;)
        dva_t *cdva = zio->io_bp->blk_dva;
        dva_t *pdva = pio->io_bp->blk_dva;
        uint64_t asize;
        int d;
+       ASSERTV(zio_t *gio = zio->io_gang_leader);
 
        if (BP_IS_HOLE(zio->io_bp))
                return;
@@ -2148,7 +2351,7 @@ zio_ddt_collision(zio_t *zio, ddt_t *ddt, ddt_entry_t *dde)
 
                if (ddp->ddp_phys_birth != 0) {
                        arc_buf_t *abuf = NULL;
-                       uint32_t aflags = ARC_WAIT;
+                       arc_flags_t aflags = ARC_FLAG_WAIT;
                        blkptr_t blk = *zio->io_bp;
                        int error;
 
@@ -2544,6 +2747,18 @@ zio_free_zil(spa_t *spa, uint64_t txg, blkptr_t *bp)
  * Read and write to physical devices
  * ==========================================================================
  */
+
+
+/*
+ * Issue an I/O to the underlying vdev. Typically the issue pipeline
+ * stops after this stage and will resume upon I/O completion.
+ * However, there are instances where the vdev layer may need to
+ * continue the pipeline when an I/O was not issued. Since the I/O
+ * that was sent to the vdev layer might be different than the one
+ * currently active in the pipeline (see vdev_queue_io()), we explicitly
+ * force the underlying vdev layers to call either zio_execute() or
+ * zio_interrupt() to ensure that the pipeline continues with the correct I/O.
+ */
 static int
 zio_vdev_io_start(zio_t *zio)
 {
@@ -2551,6 +2766,8 @@ zio_vdev_io_start(zio_t *zio)
        uint64_t align;
        spa_t *spa = zio->io_spa;
 
+       zio->io_delay = 0;
+
        ASSERT(zio->io_error == 0);
        ASSERT(zio->io_child_error[ZIO_CHILD_VDEV] == 0);
 
@@ -2561,7 +2778,8 @@ zio_vdev_io_start(zio_t *zio)
                /*
                 * The mirror_ops handle multiple DVAs in a single BP.
                 */
-               return (vdev_mirror_ops.vdev_op_io_start(zio));
+               vdev_mirror_ops.vdev_op_io_start(zio);
+               return (ZIO_PIPELINE_STOP);
        }
 
        /*
@@ -2569,7 +2787,7 @@ zio_vdev_io_start(zio_t *zio)
         * can quickly react to certain workloads.  In particular, we care
         * about non-scrubbing, top-level reads and writes with the following
         * characteristics:
-        *      - synchronous writes of user data to non-slog devices
+        *      - synchronous writes of user data to non-slog devices
         *      - any reads of user data
         * When these conditions are met, adjust the timestamp of spa_last_io
         * which allows the scan thread to adjust its workload accordingly.
@@ -2586,7 +2804,9 @@ zio_vdev_io_start(zio_t *zio)
 
        align = 1ULL << vd->vdev_top->vdev_ashift;
 
-       if (P2PHASE(zio->io_size, align) != 0) {
+       if (!(zio->io_flags & ZIO_FLAG_PHYSICAL) &&
+           P2PHASE(zio->io_size, align) != 0) {
+               /* Transform logical writes to be a full physical block size. */
                uint64_t asize = P2ROUNDUP(zio->io_size, align);
                char *abuf = zio_buf_alloc(asize);
                ASSERT(vd == vd->vdev_top);
@@ -2597,8 +2817,22 @@ zio_vdev_io_start(zio_t *zio)
                zio_push_transform(zio, abuf, asize, asize, zio_subblock);
        }
 
-       ASSERT(P2PHASE(zio->io_offset, align) == 0);
-       ASSERT(P2PHASE(zio->io_size, align) == 0);
+       /*
+        * If this is not a physical io, make sure that it is properly aligned
+        * before proceeding.
+        */
+       if (!(zio->io_flags & ZIO_FLAG_PHYSICAL)) {
+               ASSERT0(P2PHASE(zio->io_offset, align));
+               ASSERT0(P2PHASE(zio->io_size, align));
+       } else {
+               /*
+                * For physical writes, we allow 512b aligned writes and assume
+                * the device will perform a read-modify-write as necessary.
+                */
+               ASSERT0(P2PHASE(zio->io_offset, SPA_MINBLOCKSIZE));
+               ASSERT0(P2PHASE(zio->io_size, SPA_MINBLOCKSIZE));
+       }
+
        VERIFY(zio->io_type != ZIO_TYPE_WRITE || spa_writeable(spa));
 
        /*
@@ -2626,7 +2860,7 @@ zio_vdev_io_start(zio_t *zio)
        if (vd->vdev_ops->vdev_op_leaf &&
            (zio->io_type == ZIO_TYPE_READ || zio->io_type == ZIO_TYPE_WRITE)) {
 
-               if (zio->io_type == ZIO_TYPE_READ && vdev_cache_read(zio) == 0)
+               if (zio->io_type == ZIO_TYPE_READ && vdev_cache_read(zio))
                        return (ZIO_PIPELINE_CONTINUE);
 
                if ((zio = vdev_queue_io(zio)) == NULL)
@@ -2639,7 +2873,9 @@ zio_vdev_io_start(zio_t *zio)
                }
        }
 
-       return (vd->vdev_ops->vdev_op_io_start(zio));
+       zio->io_delay = gethrtime();
+       vd->vdev_ops->vdev_op_io_start(zio);
+       return (ZIO_PIPELINE_STOP);
 }
 
 static int
@@ -2654,6 +2890,9 @@ zio_vdev_io_done(zio_t *zio)
 
        ASSERT(zio->io_type == ZIO_TYPE_READ || zio->io_type == ZIO_TYPE_WRITE);
 
+       if (zio->io_delay)
+               zio->io_delay = gethrtime() - zio->io_delay;
+
        if (vd != NULL && vd->vdev_ops->vdev_op_leaf) {
 
                vdev_queue_io_done(zio);
@@ -2864,7 +3103,8 @@ zio_checksum_verify(zio_t *zio)
 
        if ((error = zio_checksum_error(zio, &info)) != 0) {
                zio->io_error = error;
-               if (!(zio->io_flags & ZIO_FLAG_SPECULATIVE)) {
+               if (error == ECKSUM &&
+                   !(zio->io_flags & ZIO_FLAG_SPECULATIVE)) {
                        zfs_ereport_start_checksum(zio->io_spa,
                            zio->io_vd, zio, zio->io_offset,
                            zio->io_size, NULL, &info);
@@ -2886,7 +3126,7 @@ zio_checksum_verified(zio_t *zio)
 /*
  * ==========================================================================
  * Error rank.  Error are ranked in the order 0, ENXIO, ECKSUM, EIO, other.
- * An error of 0 indictes success.  ENXIO indicates whole-device failure,
+ * An error of 0 indicates success.  ENXIO indicates whole-device failure,
  * which may be transient (e.g. unplugged) or permament.  ECKSUM and EIO
  * indicate errors that are specific to one I/O, and most likely permanent.
  * Any other error is presumed to be worse because we weren't expecting it.
@@ -2992,18 +3232,21 @@ zio_done(zio_t *zio)
                for (w = 0; w < ZIO_WAIT_TYPES; w++)
                        ASSERT(zio->io_children[c][w] == 0);
 
-       if (zio->io_bp != NULL) {
+       if (zio->io_bp != NULL && !BP_IS_EMBEDDED(zio->io_bp)) {
                ASSERT(zio->io_bp->blk_pad[0] == 0);
                ASSERT(zio->io_bp->blk_pad[1] == 0);
-               ASSERT(bcmp(zio->io_bp, &zio->io_bp_copy, sizeof (blkptr_t)) == 0 ||
+               ASSERT(bcmp(zio->io_bp, &zio->io_bp_copy,
+                   sizeof (blkptr_t)) == 0 ||
                    (zio->io_bp == zio_unique_parent(zio)->io_bp));
                if (zio->io_type == ZIO_TYPE_WRITE && !BP_IS_HOLE(zio->io_bp) &&
                    zio->io_bp_override == NULL &&
                    !(zio->io_flags & ZIO_FLAG_IO_REPAIR)) {
                        ASSERT(!BP_SHOULD_BYTESWAP(zio->io_bp));
-                       ASSERT3U(zio->io_prop.zp_copies, <=, BP_GET_NDVAS(zio->io_bp));
+                       ASSERT3U(zio->io_prop.zp_copies, <=,
+                           BP_GET_NDVAS(zio->io_bp));
                        ASSERT(BP_COUNT_GANG(zio->io_bp) == 0 ||
-                           (BP_COUNT_GANG(zio->io_bp) == BP_GET_NDVAS(zio->io_bp)));
+                           (BP_COUNT_GANG(zio->io_bp) ==
+                           BP_GET_NDVAS(zio->io_bp)));
                }
                if (zio->io_flags & ZIO_FLAG_NOPWRITE)
                        VERIFY(BP_EQUAL(zio->io_bp, &zio->io_bp_orig));
@@ -3030,7 +3273,7 @@ zio_done(zio_t *zio)
                        if (asize != zio->io_size) {
                                abuf = zio_buf_alloc(asize);
                                bcopy(zio->io_data, abuf, zio->io_size);
-                               bzero(abuf + zio->io_size, asize - zio->io_size);
+                               bzero(abuf+zio->io_size, asize-zio->io_size);
                        }
 
                        zio->io_cksum_report = zcr->zcr_next;
@@ -3052,10 +3295,10 @@ zio_done(zio_t *zio)
         * 30 seconds to complete, post an error described the I/O delay.
         * We ignore these errors if the device is currently unavailable.
         */
-       if (zio->io_delay >= MSEC_TO_TICK(zio_delay_max)) {
+       if (zio->io_delay >= MSEC2NSEC(zio_delay_max)) {
                if (zio->io_vd != NULL && !vdev_is_dead(zio->io_vd))
                        zfs_ereport_post(FM_EREPORT_ZFS_DELAY, zio->io_spa,
-                                         zio->io_vd, zio, 0, 0);
+                           zio->io_vd, zio, 0, 0);
        }
 
        if (zio->io_error) {
@@ -3078,8 +3321,8 @@ zio_done(zio_t *zio)
                         * error and generate a logical data ereport.
                         */
                        spa_log_error(zio->io_spa, zio);
-                       zfs_ereport_post(FM_EREPORT_ZFS_DATA, zio->io_spa, NULL, zio,
-                           0, 0);
+                       zfs_ereport_post(FM_EREPORT_ZFS_DATA, zio->io_spa,
+                           NULL, zio, 0, 0);
                }
        }
 
@@ -3226,7 +3469,8 @@ zio_done(zio_t *zio)
        }
 
        if (zio->io_flags & ZIO_FLAG_FASTWRITE && zio->io_bp &&
-           !BP_IS_HOLE(zio->io_bp) && !(zio->io_flags & ZIO_FLAG_NOPWRITE)) {
+           !BP_IS_HOLE(zio->io_bp) && !BP_IS_EMBEDDED(zio->io_bp) &&
+           !(zio->io_flags & ZIO_FLAG_NOPWRITE)) {
                metaslab_fastwrite_unmark(zio->io_spa, zio->io_bp);
        }
 
@@ -3291,61 +3535,138 @@ static zio_pipe_stage_t *zio_pipeline[] = {
        zio_done
 };
 
-/* dnp is the dnode for zb1->zb_object */
-boolean_t
-zbookmark_is_before(const dnode_phys_t *dnp, const zbookmark_t *zb1,
-    const zbookmark_t *zb2)
-{
-       uint64_t zb1nextL0, zb2thisobj;
 
-       ASSERT(zb1->zb_objset == zb2->zb_objset);
-       ASSERT(zb2->zb_level == 0);
 
+
+/*
+ * Compare two zbookmark_phys_t's to see which we would reach first in a
+ * pre-order traversal of the object tree.
+ *
+ * This is simple in every case aside from the meta-dnode object. For all other
+ * objects, we traverse them in order (object 1 before object 2, and so on).
+ * However, all of these objects are traversed while traversing object 0, since
+ * the data it points to is the list of objects.  Thus, we need to convert to a
+ * canonical representation so we can compare meta-dnode bookmarks to
+ * non-meta-dnode bookmarks.
+ *
+ * We do this by calculating "equivalents" for each field of the zbookmark.
+ * zbookmarks outside of the meta-dnode use their own object and level, and
+ * calculate the level 0 equivalent (the first L0 blkid that is contained in the
+ * blocks this bookmark refers to) by multiplying their blkid by their span
+ * (the number of L0 blocks contained within one block at their level).
+ * zbookmarks inside the meta-dnode calculate their object equivalent
+ * (which is L0equiv * dnodes per data block), use 0 for their L0equiv, and use
+ * level + 1<<31 (any value larger than a level could ever be) for their level.
+ * This causes them to always compare before a bookmark in their object
+ * equivalent, compare appropriately to bookmarks in other objects, and to
+ * compare appropriately to other bookmarks in the meta-dnode.
+ */
+int
+zbookmark_compare(uint16_t dbss1, uint8_t ibs1, uint16_t dbss2, uint8_t ibs2,
+    const zbookmark_phys_t *zb1, const zbookmark_phys_t *zb2)
+{
        /*
-        * A bookmark in the deadlist is considered to be after
-        * everything else.
+        * These variables represent the "equivalent" values for the zbookmark,
+        * after converting zbookmarks inside the meta dnode to their
+        * normal-object equivalents.
         */
-       if (zb2->zb_object == DMU_DEADLIST_OBJECT)
-               return (B_TRUE);
-
-       /* The objset_phys_t isn't before anything. */
-       if (dnp == NULL)
-               return (B_FALSE);
+       uint64_t zb1obj, zb2obj;
+       uint64_t zb1L0, zb2L0;
+       uint64_t zb1level, zb2level;
 
-       zb1nextL0 = (zb1->zb_blkid + 1) <<
-           ((zb1->zb_level) * (dnp->dn_indblkshift - SPA_BLKPTRSHIFT));
+       if (zb1->zb_object == zb2->zb_object &&
+           zb1->zb_level == zb2->zb_level &&
+           zb1->zb_blkid == zb2->zb_blkid)
+               return (0);
 
-       zb2thisobj = zb2->zb_object ? zb2->zb_object :
-           zb2->zb_blkid << (DNODE_BLOCK_SHIFT - DNODE_SHIFT);
+       /*
+        * BP_SPANB calculates the span in blocks.
+        */
+       zb1L0 = (zb1->zb_blkid) * BP_SPANB(ibs1, zb1->zb_level);
+       zb2L0 = (zb2->zb_blkid) * BP_SPANB(ibs2, zb2->zb_level);
 
        if (zb1->zb_object == DMU_META_DNODE_OBJECT) {
-               uint64_t nextobj = zb1nextL0 *
-                   (dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT) >> DNODE_SHIFT;
-               return (nextobj <= zb2thisobj);
+               zb1obj = zb1L0 * (dbss1 << (SPA_MINBLOCKSHIFT - DNODE_SHIFT));
+               zb1L0 = 0;
+               zb1level = zb1->zb_level + COMPARE_META_LEVEL;
+       } else {
+               zb1obj = zb1->zb_object;
+               zb1level = zb1->zb_level;
        }
 
-       if (zb1->zb_object < zb2thisobj)
-               return (B_TRUE);
-       if (zb1->zb_object > zb2thisobj)
-               return (B_FALSE);
-       if (zb2->zb_object == DMU_META_DNODE_OBJECT)
+       if (zb2->zb_object == DMU_META_DNODE_OBJECT) {
+               zb2obj = zb2L0 * (dbss2 << (SPA_MINBLOCKSHIFT - DNODE_SHIFT));
+               zb2L0 = 0;
+               zb2level = zb2->zb_level + COMPARE_META_LEVEL;
+       } else {
+               zb2obj = zb2->zb_object;
+               zb2level = zb2->zb_level;
+       }
+
+       /* Now that we have a canonical representation, do the comparison. */
+       if (zb1obj != zb2obj)
+               return (zb1obj < zb2obj ? -1 : 1);
+       else if (zb1L0 != zb2L0)
+               return (zb1L0 < zb2L0 ? -1 : 1);
+       else if (zb1level != zb2level)
+               return (zb1level > zb2level ? -1 : 1);
+       /*
+        * This can (theoretically) happen if the bookmarks have the same object
+        * and level, but different blkids, if the block sizes are not the same.
+        * There is presently no way to change the indirect block sizes
+        */
+       return (0);
+}
+
+/*
+ *  This function checks the following: given that last_block is the place that
+ *  our traversal stopped last time, does that guarantee that we've visited
+ *  every node under subtree_root?  Therefore, we can't just use the raw output
+ *  of zbookmark_compare.  We have to pass in a modified version of
+ *  subtree_root; by incrementing the block id, and then checking whether
+ *  last_block is before or equal to that, we can tell whether or not having
+ *  visited last_block implies that all of subtree_root's children have been
+ *  visited.
+ */
+boolean_t
+zbookmark_subtree_completed(const dnode_phys_t *dnp,
+    const zbookmark_phys_t *subtree_root, const zbookmark_phys_t *last_block)
+{
+       zbookmark_phys_t mod_zb = *subtree_root;
+       mod_zb.zb_blkid++;
+       ASSERT(last_block->zb_level == 0);
+
+       /* The objset_phys_t isn't before anything. */
+       if (dnp == NULL)
                return (B_FALSE);
-       return (zb1nextL0 <= zb2->zb_blkid);
+
+       /*
+        * We pass in 1ULL << (DNODE_BLOCK_SHIFT - SPA_MINBLOCKSHIFT) for the
+        * data block size in sectors, because that variable is only used if
+        * the bookmark refers to a block in the meta-dnode.  Since we don't
+        * know without examining it what object it refers to, and there's no
+        * harm in passing in this value in other cases, we always pass it in.
+        *
+        * We pass in 0 for the indirect block size shift because zb2 must be
+        * level 0.  The indirect block size is only used to calculate the span
+        * of the bookmark, but since the bookmark must be level 0, the span is
+        * always 1, so the math works out.
+        *
+        * If you make changes to how the zbookmark_compare code works, be sure
+        * to make sure that this code still works afterwards.
+        */
+       return (zbookmark_compare(dnp->dn_datablkszsec, dnp->dn_indblkshift,
+           1ULL << (DNODE_BLOCK_SHIFT - SPA_MINBLOCKSHIFT), 0, &mod_zb,
+           last_block) <= 0);
 }
 
 #if defined(_KERNEL) && defined(HAVE_SPL)
-/* Fault injection */
-EXPORT_SYMBOL(zio_injection_enabled);
-EXPORT_SYMBOL(zio_inject_fault);
-EXPORT_SYMBOL(zio_inject_list_next);
-EXPORT_SYMBOL(zio_clear_fault);
-EXPORT_SYMBOL(zio_handle_fault_injection);
-EXPORT_SYMBOL(zio_handle_device_injection);
-EXPORT_SYMBOL(zio_handle_label_injection);
 EXPORT_SYMBOL(zio_type_name);
-
-module_param(zio_bulk_flags, int, 0644);
-MODULE_PARM_DESC(zio_bulk_flags, "Additional flags to pass to bulk buffers");
+EXPORT_SYMBOL(zio_buf_alloc);
+EXPORT_SYMBOL(zio_data_buf_alloc);
+EXPORT_SYMBOL(zio_buf_alloc_flags);
+EXPORT_SYMBOL(zio_buf_free);
+EXPORT_SYMBOL(zio_data_buf_free);
 
 module_param(zio_delay_max, int, 0644);
 MODULE_PARM_DESC(zio_delay_max, "Max zio millisec delay before posting event");
@@ -3355,13 +3676,13 @@ MODULE_PARM_DESC(zio_requeue_io_start_cut_in_line, "Prioritize requeued I/O");
 
 module_param(zfs_sync_pass_deferred_free, int, 0644);
 MODULE_PARM_DESC(zfs_sync_pass_deferred_free,
-    "defer frees starting in this pass");
+       "Defer frees starting in this pass");
 
 module_param(zfs_sync_pass_dont_compress, int, 0644);
 MODULE_PARM_DESC(zfs_sync_pass_dont_compress,
-    "don't compress starting in this pass");
+       "Don't compress starting in this pass");
 
 module_param(zfs_sync_pass_rewrite, int, 0644);
 MODULE_PARM_DESC(zfs_sync_pass_rewrite,
-    "rewrite new bps starting in this pass");
+       "Rewrite new bps starting in this pass");
 #endif