]> git.proxmox.com Git - mirror_zfs.git/blobdiff - module/zfs/zio.c
Provide macros for setting and getting blkptr birth times
[mirror_zfs.git] / module / zfs / zio.c
index 4ec4bb69152d419ebd315396f420718beb0ac91c..e96bbda35a047e19ee8579239d089eb6cb7043d8 100644 (file)
@@ -6,7 +6,7 @@
  * You may not use this file except in compliance with the License.
  *
  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
- * or http://www.opensolaris.org/os/licensing.
+ * or https://opensource.org/licenses/CDDL-1.0.
  * See the License for the specific language governing permissions
  * and limitations under the License.
  *
  */
 /*
  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
- * Copyright (c) 2011, 2019 by Delphix. All rights reserved.
+ * Copyright (c) 2011, 2022 by Delphix. All rights reserved.
  * Copyright (c) 2011 Nexenta Systems, Inc. All rights reserved.
  * Copyright (c) 2017, Intel Corporation.
+ * Copyright (c) 2019, Klara Inc.
+ * Copyright (c) 2019, Allan Jude
+ * Copyright (c) 2021, Datto, Inc.
  */
 
 #include <sys/sysmacros.h>
@@ -38,6 +41,7 @@
 #include <sys/zio_checksum.h>
 #include <sys/dmu_objset.h>
 #include <sys/arc.h>
+#include <sys/brt.h>
 #include <sys/ddt.h>
 #include <sys/blkptr.h>
 #include <sys/zfeature.h>
 #include <sys/trace_zfs.h>
 #include <sys/abd.h>
 #include <sys/dsl_crypt.h>
-#include <sys/cityhash.h>
+#include <cityhash.h>
 
 /*
  * ==========================================================================
  * I/O type descriptions
  * ==========================================================================
  */
-const char *zio_type_name[ZIO_TYPES] = {
+const char *const zio_type_name[ZIO_TYPES] = {
        /*
         * Note: Linux kernel thread name length is limited
         * so these names will differ from upstream open zfs.
@@ -63,24 +67,24 @@ const char *zio_type_name[ZIO_TYPES] = {
 };
 
 int zio_dva_throttle_enabled = B_TRUE;
-int zio_deadman_log_all = B_FALSE;
+static int zio_deadman_log_all = B_FALSE;
 
 /*
  * ==========================================================================
  * I/O kmem caches
  * ==========================================================================
  */
-kmem_cache_t *zio_cache;
-kmem_cache_t *zio_link_cache;
+static kmem_cache_t *zio_cache;
+static kmem_cache_t *zio_link_cache;
 kmem_cache_t *zio_buf_cache[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];
 kmem_cache_t *zio_data_buf_cache[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];
 #if defined(ZFS_DEBUG) && !defined(_KERNEL)
-uint64_t zio_buf_cache_allocs[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];
-uint64_t zio_buf_cache_frees[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];
+static uint64_t zio_buf_cache_allocs[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];
+static uint64_t zio_buf_cache_frees[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];
 #endif
 
 /* Mark IOs as "slow" if they take longer than 30 seconds */
-int zio_slow_io_ms = (30 * MILLISEC);
+static uint_t zio_slow_io_ms = (30 * MILLISEC);
 
 #define        BP_SPANB(indblkshift, level) \
        (((uint64_t)1) << ((level) * ((indblkshift) - SPA_BLKPTRSHIFT)))
@@ -111,9 +115,15 @@ int zio_slow_io_ms = (30 * MILLISEC);
  * fragmented systems, which may have very few free segments of this size,
  * and may need to load new metaslabs to satisfy 128K allocations.
  */
-int zfs_sync_pass_deferred_free = 2; /* defer frees starting in this pass */
-int zfs_sync_pass_dont_compress = 8; /* don't compress starting in this pass */
-int zfs_sync_pass_rewrite = 2; /* rewrite new bps starting in this pass */
+
+/* defer frees starting in this pass */
+uint_t zfs_sync_pass_deferred_free = 2;
+
+/* don't compress starting in this pass */
+static uint_t zfs_sync_pass_dont_compress = 8;
+
+/* rewrite new bps starting in this pass */
+static uint_t zfs_sync_pass_rewrite = 2;
 
 /*
  * An allocating zio is one that either currently has the DVA allocate
@@ -121,12 +131,17 @@ int zfs_sync_pass_rewrite = 2; /* rewrite new bps starting in this pass */
  */
 #define        IO_IS_ALLOCATING(zio) ((zio)->io_orig_pipeline & ZIO_STAGE_DVA_ALLOCATE)
 
-int zio_requeue_io_start_cut_in_line = 1;
+/*
+ * Enable smaller cores by excluding metadata
+ * allocations as well.
+ */
+int zio_exclude_metadata = 0;
+static int zio_requeue_io_start_cut_in_line = 1;
 
 #ifdef ZFS_DEBUG
-int zio_buf_debug_limit = 16384;
+static const int zio_buf_debug_limit = 16384;
 #else
-int zio_buf_debug_limit = 0;
+static const int zio_buf_debug_limit = 0;
 #endif
 
 static inline void __zio_execute(zio_t *zio);
@@ -137,35 +152,28 @@ void
 zio_init(void)
 {
        size_t c;
-       vmem_t *data_alloc_arena = NULL;
 
        zio_cache = kmem_cache_create("zio_cache",
            sizeof (zio_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
        zio_link_cache = kmem_cache_create("zio_link_cache",
            sizeof (zio_link_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
 
-       /*
-        * For small buffers, we want a cache for each multiple of
-        * SPA_MINBLOCKSIZE.  For larger buffers, we want a cache
-        * for each quarter-power of 2.
-        */
        for (c = 0; c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT; c++) {
                size_t size = (c + 1) << SPA_MINBLOCKSHIFT;
-               size_t p2 = size;
-               size_t align = 0;
-               size_t cflags = (size > zio_buf_debug_limit) ? KMC_NODEBUG : 0;
+               size_t align, cflags, data_cflags;
+               char name[32];
 
-#if defined(_ILP32) && defined(_KERNEL)
                /*
-                * Cache size limited to 1M on 32-bit platforms until ARC
-                * buffers no longer require virtual address space.
+                * Create cache for each half-power of 2 size, starting from
+                * SPA_MINBLOCKSIZE.  It should give us memory space efficiency
+                * of ~7/8, sufficient for transient allocations mostly using
+                * these caches.
                 */
-               if (size > zfs_max_recordsize)
-                       break;
-#endif
-
+               size_t p2 = size;
                while (!ISP2(p2))
                        p2 &= p2 - 1;
+               if (!IS_P2ALIGNED(size, p2 / 2))
+                       continue;
 
 #ifndef _KERNEL
                /*
@@ -176,33 +184,37 @@ zio_init(void)
                 */
                if (arc_watch && !IS_P2ALIGNED(size, PAGESIZE))
                        continue;
-               /*
-                * Here's the problem - on 4K native devices in userland on
-                * Linux using O_DIRECT, buffers must be 4K aligned or I/O
-                * will fail with EINVAL, causing zdb (and others) to coredump.
-                * Since userland probably doesn't need optimized buffer caches,
-                * we just force 4K alignment on everything.
-                */
-               align = 8 * SPA_MINBLOCKSIZE;
-#else
-               if (size < PAGESIZE) {
-                       align = SPA_MINBLOCKSIZE;
-               } else if (IS_P2ALIGNED(size, p2 >> 2)) {
-                       align = PAGESIZE;
-               }
 #endif
 
-               if (align != 0) {
-                       char name[36];
-                       (void) sprintf(name, "zio_buf_%lu", (ulong_t)size);
-                       zio_buf_cache[c] = kmem_cache_create(name, size,
-                           align, NULL, NULL, NULL, NULL, NULL, cflags);
+               if (IS_P2ALIGNED(size, PAGESIZE))
+                       align = PAGESIZE;
+               else
+                       align = 1 << (highbit64(size ^ (size - 1)) - 1);
 
-                       (void) sprintf(name, "zio_data_buf_%lu", (ulong_t)size);
-                       zio_data_buf_cache[c] = kmem_cache_create(name, size,
-                           align, NULL, NULL, NULL, NULL,
-                           data_alloc_arena, cflags);
+               cflags = (zio_exclude_metadata || size > zio_buf_debug_limit) ?
+                   KMC_NODEBUG : 0;
+               data_cflags = KMC_NODEBUG;
+               if (cflags == data_cflags) {
+                       /*
+                        * Resulting kmem caches would be identical.
+                        * Save memory by creating only one.
+                        */
+                       (void) snprintf(name, sizeof (name),
+                           "zio_buf_comb_%lu", (ulong_t)size);
+                       zio_buf_cache[c] = kmem_cache_create(name, size, align,
+                           NULL, NULL, NULL, NULL, NULL, cflags);
+                       zio_data_buf_cache[c] = zio_buf_cache[c];
+                       continue;
                }
+               (void) snprintf(name, sizeof (name), "zio_buf_%lu",
+                   (ulong_t)size);
+               zio_buf_cache[c] = kmem_cache_create(name, size, align,
+                   NULL, NULL, NULL, NULL, NULL, cflags);
+
+               (void) snprintf(name, sizeof (name), "zio_data_buf_%lu",
+                   (ulong_t)size);
+               zio_data_buf_cache[c] = kmem_cache_create(name, size, align,
+                   NULL, NULL, NULL, NULL, NULL, data_cflags);
        }
 
        while (--c != 0) {
@@ -223,37 +235,50 @@ zio_init(void)
 void
 zio_fini(void)
 {
-       size_t c;
-       kmem_cache_t *last_cache = NULL;
-       kmem_cache_t *last_data_cache = NULL;
+       size_t n = SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT;
 
-       for (c = 0; c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT; c++) {
-#ifdef _ILP32
-               /*
-                * Cache size limited to 1M on 32-bit platforms until ARC
-                * buffers no longer require virtual address space.
-                */
-               if (((c + 1) << SPA_MINBLOCKSHIFT) > zfs_max_recordsize)
-                       break;
-#endif
 #if defined(ZFS_DEBUG) && !defined(_KERNEL)
-               if (zio_buf_cache_allocs[c] != zio_buf_cache_frees[c])
+       for (size_t i = 0; i < n; i++) {
+               if (zio_buf_cache_allocs[i] != zio_buf_cache_frees[i])
                        (void) printf("zio_fini: [%d] %llu != %llu\n",
-                           (int)((c + 1) << SPA_MINBLOCKSHIFT),
-                           (long long unsigned)zio_buf_cache_allocs[c],
-                           (long long unsigned)zio_buf_cache_frees[c]);
+                           (int)((i + 1) << SPA_MINBLOCKSHIFT),
+                           (long long unsigned)zio_buf_cache_allocs[i],
+                           (long long unsigned)zio_buf_cache_frees[i]);
+       }
 #endif
-               if (zio_buf_cache[c] != last_cache) {
-                       last_cache = zio_buf_cache[c];
-                       kmem_cache_destroy(zio_buf_cache[c]);
+
+       /*
+        * The same kmem cache can show up multiple times in both zio_buf_cache
+        * and zio_data_buf_cache. Do a wasteful but trivially correct scan to
+        * sort it out.
+        */
+       for (size_t i = 0; i < n; i++) {
+               kmem_cache_t *cache = zio_buf_cache[i];
+               if (cache == NULL)
+                       continue;
+               for (size_t j = i; j < n; j++) {
+                       if (cache == zio_buf_cache[j])
+                               zio_buf_cache[j] = NULL;
+                       if (cache == zio_data_buf_cache[j])
+                               zio_data_buf_cache[j] = NULL;
                }
-               zio_buf_cache[c] = NULL;
+               kmem_cache_destroy(cache);
+       }
 
-               if (zio_data_buf_cache[c] != last_data_cache) {
-                       last_data_cache = zio_data_buf_cache[c];
-                       kmem_cache_destroy(zio_data_buf_cache[c]);
+       for (size_t i = 0; i < n; i++) {
+               kmem_cache_t *cache = zio_data_buf_cache[i];
+               if (cache == NULL)
+                       continue;
+               for (size_t j = i; j < n; j++) {
+                       if (cache == zio_data_buf_cache[j])
+                               zio_data_buf_cache[j] = NULL;
                }
-               zio_data_buf_cache[c] = NULL;
+               kmem_cache_destroy(cache);
+       }
+
+       for (size_t i = 0; i < n; i++) {
+               VERIFY3P(zio_buf_cache[i], ==, NULL);
+               VERIFY3P(zio_data_buf_cache[i], ==, NULL);
        }
 
        kmem_cache_destroy(zio_link_cache);
@@ -270,6 +295,53 @@ zio_fini(void)
  * ==========================================================================
  */
 
+#ifdef ZFS_DEBUG
+static const ulong_t zio_buf_canary = (ulong_t)0xdeadc0dedead210b;
+#endif
+
+/*
+ * Use empty space after the buffer to detect overflows.
+ *
+ * Since zio_init() creates kmem caches only for certain set of buffer sizes,
+ * allocations of different sizes may have some unused space after the data.
+ * Filling part of that space with a known pattern on allocation and checking
+ * it on free should allow us to detect some buffer overflows.
+ */
+static void
+zio_buf_put_canary(ulong_t *p, size_t size, kmem_cache_t **cache, size_t c)
+{
+#ifdef ZFS_DEBUG
+       size_t off = P2ROUNDUP(size, sizeof (ulong_t));
+       ulong_t *canary = p + off / sizeof (ulong_t);
+       size_t asize = (c + 1) << SPA_MINBLOCKSHIFT;
+       if (c + 1 < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT &&
+           cache[c] == cache[c + 1])
+               asize = (c + 2) << SPA_MINBLOCKSHIFT;
+       for (; off < asize; canary++, off += sizeof (ulong_t))
+               *canary = zio_buf_canary;
+#endif
+}
+
+static void
+zio_buf_check_canary(ulong_t *p, size_t size, kmem_cache_t **cache, size_t c)
+{
+#ifdef ZFS_DEBUG
+       size_t off = P2ROUNDUP(size, sizeof (ulong_t));
+       ulong_t *canary = p + off / sizeof (ulong_t);
+       size_t asize = (c + 1) << SPA_MINBLOCKSHIFT;
+       if (c + 1 < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT &&
+           cache[c] == cache[c + 1])
+               asize = (c + 2) << SPA_MINBLOCKSHIFT;
+       for (; off < asize; canary++, off += sizeof (ulong_t)) {
+               if (unlikely(*canary != zio_buf_canary)) {
+                       PANIC("ZIO buffer overflow %p (%zu) + %zu %#lx != %#lx",
+                           p, size, (canary - p) * sizeof (ulong_t),
+                           *canary, zio_buf_canary);
+               }
+       }
+#endif
+}
+
 /*
  * Use zio_buf_alloc to allocate ZFS metadata.  This data will appear in a
  * crashdump if the kernel panics, so use it judiciously.  Obviously, it's
@@ -286,7 +358,9 @@ zio_buf_alloc(size_t size)
        atomic_add_64(&zio_buf_cache_allocs[c], 1);
 #endif
 
-       return (kmem_cache_alloc(zio_buf_cache[c], KM_PUSHPAGE));
+       void *p = kmem_cache_alloc(zio_buf_cache[c], KM_PUSHPAGE);
+       zio_buf_put_canary(p, size, zio_buf_cache, c);
+       return (p);
 }
 
 /*
@@ -302,7 +376,9 @@ zio_data_buf_alloc(size_t size)
 
        VERIFY3U(c, <, SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
 
-       return (kmem_cache_alloc(zio_data_buf_cache[c], KM_PUSHPAGE));
+       void *p = kmem_cache_alloc(zio_data_buf_cache[c], KM_PUSHPAGE);
+       zio_buf_put_canary(p, size, zio_data_buf_cache, c);
+       return (p);
 }
 
 void
@@ -315,6 +391,7 @@ zio_buf_free(void *buf, size_t size)
        atomic_add_64(&zio_buf_cache_frees[c], 1);
 #endif
 
+       zio_buf_check_canary(buf, size, zio_buf_cache, c);
        kmem_cache_free(zio_buf_cache[c], buf);
 }
 
@@ -325,12 +402,14 @@ zio_data_buf_free(void *buf, size_t size)
 
        VERIFY3U(c, <, SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
 
+       zio_buf_check_canary(buf, size, zio_data_buf_cache, c);
        kmem_cache_free(zio_data_buf_cache[c], buf);
 }
 
 static void
 zio_abd_free(void *abd, size_t size)
 {
+       (void) size;
        abd_free((abd_t *)abd);
 }
 
@@ -398,7 +477,8 @@ zio_decompress(zio_t *zio, abd_t *data, uint64_t size)
        if (zio->io_error == 0) {
                void *tmp = abd_borrow_buf(data, size);
                int ret = zio_decompress_data(BP_GET_COMPRESS(zio->io_bp),
-                   zio->io_abd, tmp, zio->io_size, size);
+                   zio->io_abd, tmp, zio->io_size, size,
+                   &zio->io_prop.zp_complevel);
                abd_return_buf_copy(data, tmp, size);
 
                if (zio_injection_enabled && ret == 0)
@@ -448,7 +528,8 @@ zio_decrypt(zio_t *zio, abd_t *data, uint64_t size)
                         */
                        tmp = zio_buf_alloc(lsize);
                        ret = zio_decompress_data(BP_GET_COMPRESS(bp),
-                           zio->io_abd, tmp, zio->io_size, lsize);
+                           zio->io_abd, tmp, zio->io_size, lsize,
+                           &zio->io_prop.zp_complevel);
                        if (ret != 0) {
                                ret = SET_ERROR(EIO);
                                goto error;
@@ -474,8 +555,9 @@ zio_decrypt(zio_t *zio, abd_t *data, uint64_t size)
 
        /*
         * If this is an authenticated block, just check the MAC. It would be
-        * nice to separate this out into its own flag, but for the moment
-        * enum zio_flag is out of bits.
+        * nice to separate this out into its own flag, but when this was done,
+        * we had run out of bits in what is now zio_flag_t. Future cleanup
+        * could make this a flag bit.
         */
        if (BP_IS_AUTHENTICATED(bp)) {
                if (ot == DMU_OT_OBJSET) {
@@ -530,9 +612,10 @@ error:
        if (ret == ECKSUM) {
                zio->io_error = SET_ERROR(EIO);
                if ((zio->io_flags & ZIO_FLAG_SPECULATIVE) == 0) {
-                       spa_log_error(spa, &zio->io_bookmark);
-                       zfs_ereport_post(FM_EREPORT_ZFS_AUTHENTICATION,
-                           spa, NULL, &zio->io_bookmark, zio, 0, 0);
+                       spa_log_error(spa, &zio->io_bookmark,
+                           BP_GET_LOGICAL_BIRTH(zio->io_bp));
+                       (void) zfs_ereport_post(FM_EREPORT_ZFS_AUTHENTICATION,
+                           spa, NULL, &zio->io_bookmark, zio, 0);
                }
        } else {
                zio->io_error = ret;
@@ -585,8 +668,6 @@ zio_unique_parent(zio_t *cio)
 void
 zio_add_child(zio_t *pio, zio_t *cio)
 {
-       zio_link_t *zl = kmem_cache_alloc(zio_link_cache, KM_SLEEP);
-
        /*
         * Logical I/Os can have logical, gang, or vdev children.
         * Gang I/Os can have gang or vdev children.
@@ -595,6 +676,12 @@ zio_add_child(zio_t *pio, zio_t *cio)
         */
        ASSERT3S(cio->io_child_type, <=, pio->io_child_type);
 
+       /* Parent should not have READY stage if child doesn't have it. */
+       IMPLY((cio->io_pipeline & ZIO_STAGE_READY) == 0 &&
+           (cio->io_child_type != ZIO_CHILD_VDEV),
+           (pio->io_pipeline & ZIO_STAGE_READY) == 0);
+
+       zio_link_t *zl = kmem_cache_alloc(zio_link_cache, KM_SLEEP);
        zl->zl_parent = pio;
        zl->zl_child = cio;
 
@@ -603,19 +690,53 @@ zio_add_child(zio_t *pio, zio_t *cio)
 
        ASSERT(pio->io_state[ZIO_WAIT_DONE] == 0);
 
+       uint64_t *countp = pio->io_children[cio->io_child_type];
        for (int w = 0; w < ZIO_WAIT_TYPES; w++)
-               pio->io_children[cio->io_child_type][w] += !cio->io_state[w];
+               countp[w] += !cio->io_state[w];
 
        list_insert_head(&pio->io_child_list, zl);
        list_insert_head(&cio->io_parent_list, zl);
 
-       pio->io_child_count++;
-       cio->io_parent_count++;
-
        mutex_exit(&cio->io_lock);
        mutex_exit(&pio->io_lock);
 }
 
+void
+zio_add_child_first(zio_t *pio, zio_t *cio)
+{
+       /*
+        * Logical I/Os can have logical, gang, or vdev children.
+        * Gang I/Os can have gang or vdev children.
+        * Vdev I/Os can only have vdev children.
+        * The following ASSERT captures all of these constraints.
+        */
+       ASSERT3S(cio->io_child_type, <=, pio->io_child_type);
+
+       /* Parent should not have READY stage if child doesn't have it. */
+       IMPLY((cio->io_pipeline & ZIO_STAGE_READY) == 0 &&
+           (cio->io_child_type != ZIO_CHILD_VDEV),
+           (pio->io_pipeline & ZIO_STAGE_READY) == 0);
+
+       zio_link_t *zl = kmem_cache_alloc(zio_link_cache, KM_SLEEP);
+       zl->zl_parent = pio;
+       zl->zl_child = cio;
+
+       ASSERT(list_is_empty(&cio->io_parent_list));
+       list_insert_head(&cio->io_parent_list, zl);
+
+       mutex_enter(&pio->io_lock);
+
+       ASSERT(pio->io_state[ZIO_WAIT_DONE] == 0);
+
+       uint64_t *countp = pio->io_children[cio->io_child_type];
+       for (int w = 0; w < ZIO_WAIT_TYPES; w++)
+               countp[w] += !cio->io_state[w];
+
+       list_insert_head(&pio->io_child_list, zl);
+
+       mutex_exit(&pio->io_lock);
+}
+
 static void
 zio_remove_child(zio_t *pio, zio_t *cio, zio_link_t *zl)
 {
@@ -628,9 +749,6 @@ zio_remove_child(zio_t *pio, zio_t *cio, zio_link_t *zl)
        list_remove(&pio->io_child_list, zl);
        list_remove(&cio->io_parent_list, zl);
 
-       pio->io_child_count--;
-       cio->io_parent_count--;
-
        mutex_exit(&cio->io_lock);
        mutex_exit(&pio->io_lock);
        kmem_cache_free(zio_link_cache, zl);
@@ -685,7 +803,9 @@ zio_notify_parent(zio_t *pio, zio_t *zio, enum zio_wait_type wait,
 
                /*
                 * If we can tell the caller to execute this parent next, do
-                * so.  Otherwise dispatch the parent zio as its own task.
+                * so. We only do this if the parent's zio type matches the
+                * child's type. Otherwise dispatch the parent zio in its
+                * own taskq.
                 *
                 * Having the caller execute the parent when possible reduces
                 * locking on the zio taskq's, reduces context switch
@@ -704,7 +824,8 @@ zio_notify_parent(zio_t *pio, zio_t *zio, enum zio_wait_type wait,
                 * parent-child relationships, as we do with the "mega zio"
                 * of writes for spa_sync(), and the chain of ZIL blocks.
                 */
-               if (next_to_executep != NULL && *next_to_executep == NULL) {
+               if (next_to_executep != NULL && *next_to_executep == NULL &&
+                   pio->io_type == zio->io_type) {
                        *next_to_executep = pio;
                } else {
                        zio_taskq_dispatch(pio, type, B_FALSE);
@@ -764,7 +885,7 @@ static zio_t *
 zio_create(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
     abd_t *data, uint64_t lsize, uint64_t psize, zio_done_func_t *done,
     void *private, zio_type_t type, zio_priority_t priority,
-    enum zio_flag flags, vdev_t *vd, uint64_t offset,
+    zio_flag_t flags, vdev_t *vd, uint64_t offset,
     const zbookmark_phys_t *zb, enum zio_stage stage,
     enum zio_stage pipeline)
 {
@@ -781,7 +902,7 @@ zio_create(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
        IMPLY(lsize != psize, (flags & ZIO_FLAG_RAW_COMPRESS) != 0);
 
        zio = kmem_cache_alloc(zio_cache, KM_SLEEP);
-       bzero(zio, sizeof (zio_t));
+       memset(zio, 0, sizeof (zio_t));
 
        mutex_init(&zio->io_lock, NULL, MUTEX_NOLOCKDEP, NULL);
        cv_init(&zio->io_cv, NULL, CV_DEFAULT, NULL);
@@ -802,12 +923,14 @@ zio_create(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
                zio->io_child_type = ZIO_CHILD_LOGICAL;
 
        if (bp != NULL) {
-               zio->io_bp = (blkptr_t *)bp;
-               zio->io_bp_copy = *bp;
-               zio->io_bp_orig = *bp;
                if (type != ZIO_TYPE_WRITE ||
-                   zio->io_child_type == ZIO_CHILD_DDT)
+                   zio->io_child_type == ZIO_CHILD_DDT) {
+                       zio->io_bp_copy = *bp;
                        zio->io_bp = &zio->io_bp_copy;  /* so caller can free */
+               } else {
+                       zio->io_bp = (blkptr_t *)bp;
+               }
+               zio->io_bp_orig = *bp;
                if (zio->io_child_type == ZIO_CHILD_LOGICAL)
                        zio->io_logical = zio;
                if (zio->io_child_type > ZIO_CHILD_GANG && BP_IS_GANG(bp))
@@ -829,21 +952,22 @@ zio_create(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
        zio->io_orig_stage = zio->io_stage = stage;
        zio->io_orig_pipeline = zio->io_pipeline = pipeline;
        zio->io_pipeline_trace = ZIO_STAGE_OPEN;
+       zio->io_allocator = ZIO_ALLOCATOR_NONE;
 
-       zio->io_state[ZIO_WAIT_READY] = (stage >= ZIO_STAGE_READY);
+       zio->io_state[ZIO_WAIT_READY] = (stage >= ZIO_STAGE_READY) ||
+           (pipeline & ZIO_STAGE_READY) == 0;
        zio->io_state[ZIO_WAIT_DONE] = (stage >= ZIO_STAGE_DONE);
 
        if (zb != NULL)
                zio->io_bookmark = *zb;
 
        if (pio != NULL) {
-               if (zio->io_metaslab_class == NULL)
-                       zio->io_metaslab_class = pio->io_metaslab_class;
+               zio->io_metaslab_class = pio->io_metaslab_class;
                if (zio->io_logical == NULL)
                        zio->io_logical = pio->io_logical;
                if (zio->io_child_type == ZIO_CHILD_GANG)
                        zio->io_gang_leader = pio->io_gang_leader;
-               zio_add_child(pio, zio);
+               zio_add_child_first(pio, zio);
        }
 
        taskq_init_ent(&zio->io_tqent);
@@ -851,7 +975,7 @@ zio_create(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
        return (zio);
 }
 
-static void
+void
 zio_destroy(zio_t *zio)
 {
        metaslab_trace_fini(&zio->io_alloc_list);
@@ -862,9 +986,13 @@ zio_destroy(zio_t *zio)
        kmem_cache_free(zio_cache, zio);
 }
 
+/*
+ * ZIO intended to be between others.  Provides synchronization at READY
+ * and DONE pipeline stages and calls the respective callbacks.
+ */
 zio_t *
 zio_null(zio_t *pio, spa_t *spa, vdev_t *vd, zio_done_func_t *done,
-    void *private, enum zio_flag flags)
+    void *private, zio_flag_t flags)
 {
        zio_t *zio;
 
@@ -875,41 +1003,132 @@ zio_null(zio_t *pio, spa_t *spa, vdev_t *vd, zio_done_func_t *done,
        return (zio);
 }
 
+/*
+ * ZIO intended to be a root of a tree.  Unlike null ZIO does not have a
+ * READY pipeline stage (is ready on creation), so it should not be used
+ * as child of any ZIO that may need waiting for grandchildren READY stage
+ * (any other ZIO type).
+ */
 zio_t *
-zio_root(spa_t *spa, zio_done_func_t *done, void *private, enum zio_flag flags)
+zio_root(spa_t *spa, zio_done_func_t *done, void *private, zio_flag_t flags)
 {
-       return (zio_null(NULL, spa, NULL, done, private, flags));
+       zio_t *zio;
+
+       zio = zio_create(NULL, spa, 0, NULL, NULL, 0, 0, done, private,
+           ZIO_TYPE_NULL, ZIO_PRIORITY_NOW, flags, NULL, 0, NULL,
+           ZIO_STAGE_OPEN, ZIO_ROOT_PIPELINE);
+
+       return (zio);
 }
 
-static void
-zfs_blkptr_verify(spa_t *spa, const blkptr_t *bp, boolean_t config_held)
+static int
+zfs_blkptr_verify_log(spa_t *spa, const blkptr_t *bp,
+    enum blk_verify_flag blk_verify, const char *fmt, ...)
 {
+       va_list adx;
+       char buf[256];
+
+       va_start(adx, fmt);
+       (void) vsnprintf(buf, sizeof (buf), fmt, adx);
+       va_end(adx);
+
+       zfs_dbgmsg("bad blkptr at %px: "
+           "DVA[0]=%#llx/%#llx "
+           "DVA[1]=%#llx/%#llx "
+           "DVA[2]=%#llx/%#llx "
+           "prop=%#llx "
+           "pad=%#llx,%#llx "
+           "phys_birth=%#llx "
+           "birth=%#llx "
+           "fill=%#llx "
+           "cksum=%#llx/%#llx/%#llx/%#llx",
+           bp,
+           (long long)bp->blk_dva[0].dva_word[0],
+           (long long)bp->blk_dva[0].dva_word[1],
+           (long long)bp->blk_dva[1].dva_word[0],
+           (long long)bp->blk_dva[1].dva_word[1],
+           (long long)bp->blk_dva[2].dva_word[0],
+           (long long)bp->blk_dva[2].dva_word[1],
+           (long long)bp->blk_prop,
+           (long long)bp->blk_pad[0],
+           (long long)bp->blk_pad[1],
+           (long long)BP_GET_PHYSICAL_BIRTH(bp),
+           (long long)BP_GET_LOGICAL_BIRTH(bp),
+           (long long)bp->blk_fill,
+           (long long)bp->blk_cksum.zc_word[0],
+           (long long)bp->blk_cksum.zc_word[1],
+           (long long)bp->blk_cksum.zc_word[2],
+           (long long)bp->blk_cksum.zc_word[3]);
+       switch (blk_verify) {
+       case BLK_VERIFY_HALT:
+               zfs_panic_recover("%s: %s", spa_name(spa), buf);
+               break;
+       case BLK_VERIFY_LOG:
+               zfs_dbgmsg("%s: %s", spa_name(spa), buf);
+               break;
+       case BLK_VERIFY_ONLY:
+               break;
+       }
+
+       return (1);
+}
+
+/*
+ * Verify the block pointer fields contain reasonable values.  This means
+ * it only contains known object types, checksum/compression identifiers,
+ * block sizes within the maximum allowed limits, valid DVAs, etc.
+ *
+ * If everything checks out B_TRUE is returned.  The zfs_blkptr_verify
+ * argument controls the behavior when an invalid field is detected.
+ *
+ * Values for blk_verify_flag:
+ *   BLK_VERIFY_ONLY: evaluate the block
+ *   BLK_VERIFY_LOG: evaluate the block and log problems
+ *   BLK_VERIFY_HALT: call zfs_panic_recover on error
+ *
+ * Values for blk_config_flag:
+ *   BLK_CONFIG_HELD: caller holds SCL_VDEV for writer
+ *   BLK_CONFIG_NEEDED: caller holds no config lock, SCL_VDEV will be
+ *   obtained for reader
+ *   BLK_CONFIG_SKIP: skip checks which require SCL_VDEV, for better
+ *   performance
+ */
+boolean_t
+zfs_blkptr_verify(spa_t *spa, const blkptr_t *bp,
+    enum blk_config_flag blk_config, enum blk_verify_flag blk_verify)
+{
+       int errors = 0;
+
        if (!DMU_OT_IS_VALID(BP_GET_TYPE(bp))) {
-               zfs_panic_recover("blkptr at %p has invalid TYPE %llu",
+               errors += zfs_blkptr_verify_log(spa, bp, blk_verify,
+                   "blkptr at %px has invalid TYPE %llu",
                    bp, (longlong_t)BP_GET_TYPE(bp));
        }
-       if (BP_GET_CHECKSUM(bp) >= ZIO_CHECKSUM_FUNCTIONS ||
-           BP_GET_CHECKSUM(bp) <= ZIO_CHECKSUM_ON) {
-               zfs_panic_recover("blkptr at %p has invalid CHECKSUM %llu",
+       if (BP_GET_CHECKSUM(bp) >= ZIO_CHECKSUM_FUNCTIONS) {
+               errors += zfs_blkptr_verify_log(spa, bp, blk_verify,
+                   "blkptr at %px has invalid CHECKSUM %llu",
                    bp, (longlong_t)BP_GET_CHECKSUM(bp));
        }
-       if (BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_FUNCTIONS ||
-           BP_GET_COMPRESS(bp) <= ZIO_COMPRESS_ON) {
-               zfs_panic_recover("blkptr at %p has invalid COMPRESS %llu",
+       if (BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_FUNCTIONS) {
+               errors += zfs_blkptr_verify_log(spa, bp, blk_verify,
+                   "blkptr at %px has invalid COMPRESS %llu",
                    bp, (longlong_t)BP_GET_COMPRESS(bp));
        }
        if (BP_GET_LSIZE(bp) > SPA_MAXBLOCKSIZE) {
-               zfs_panic_recover("blkptr at %p has invalid LSIZE %llu",
+               errors += zfs_blkptr_verify_log(spa, bp, blk_verify,
+                   "blkptr at %px has invalid LSIZE %llu",
                    bp, (longlong_t)BP_GET_LSIZE(bp));
        }
        if (BP_GET_PSIZE(bp) > SPA_MAXBLOCKSIZE) {
-               zfs_panic_recover("blkptr at %p has invalid PSIZE %llu",
+               errors += zfs_blkptr_verify_log(spa, bp, blk_verify,
+                   "blkptr at %px has invalid PSIZE %llu",
                    bp, (longlong_t)BP_GET_PSIZE(bp));
        }
 
        if (BP_IS_EMBEDDED(bp)) {
                if (BPE_GET_ETYPE(bp) >= NUM_BP_EMBEDDED_TYPES) {
-                       zfs_panic_recover("blkptr at %p has invalid ETYPE %llu",
+                       errors += zfs_blkptr_verify_log(spa, bp, blk_verify,
+                           "blkptr at %px has invalid ETYPE %llu",
                            bp, (longlong_t)BPE_GET_ETYPE(bp));
                }
        }
@@ -919,39 +1138,50 @@ zfs_blkptr_verify(spa_t *spa, const blkptr_t *bp, boolean_t config_held)
         * will be done once the zio is executed in vdev_mirror_map_alloc.
         */
        if (!spa->spa_trust_config)
-               return;
+               return (errors == 0);
 
-       if (!config_held)
-               spa_config_enter(spa, SCL_VDEV, bp, RW_READER);
-       else
+       switch (blk_config) {
+       case BLK_CONFIG_HELD:
                ASSERT(spa_config_held(spa, SCL_VDEV, RW_WRITER));
+               break;
+       case BLK_CONFIG_NEEDED:
+               spa_config_enter(spa, SCL_VDEV, bp, RW_READER);
+               break;
+       case BLK_CONFIG_SKIP:
+               return (errors == 0);
+       default:
+               panic("invalid blk_config %u", blk_config);
+       }
+
        /*
         * Pool-specific checks.
         *
-        * Note: it would be nice to verify that the blk_birth and
-        * BP_PHYSICAL_BIRTH() are not too large.  However, spa_freeze()
-        * allows the birth time of log blocks (and dmu_sync()-ed blocks
-        * that are in the log) to be arbitrarily large.
+        * Note: it would be nice to verify that the logical birth
+        * and physical birth are not too large.  However,
+        * spa_freeze() allows the birth time of log blocks (and
+        * dmu_sync()-ed blocks that are in the log) to be arbitrarily
+        * large.
         */
        for (int i = 0; i < BP_GET_NDVAS(bp); i++) {
-               uint64_t vdevid = DVA_GET_VDEV(&bp->blk_dva[i]);
+               const dva_t *dva = &bp->blk_dva[i];
+               uint64_t vdevid = DVA_GET_VDEV(dva);
 
                if (vdevid >= spa->spa_root_vdev->vdev_children) {
-                       zfs_panic_recover("blkptr at %p DVA %u has invalid "
-                           "VDEV %llu",
+                       errors += zfs_blkptr_verify_log(spa, bp, blk_verify,
+                           "blkptr at %px DVA %u has invalid VDEV %llu",
                            bp, i, (longlong_t)vdevid);
                        continue;
                }
                vdev_t *vd = spa->spa_root_vdev->vdev_child[vdevid];
                if (vd == NULL) {
-                       zfs_panic_recover("blkptr at %p DVA %u has invalid "
-                           "VDEV %llu",
+                       errors += zfs_blkptr_verify_log(spa, bp, blk_verify,
+                           "blkptr at %px DVA %u has invalid VDEV %llu",
                            bp, i, (longlong_t)vdevid);
                        continue;
                }
                if (vd->vdev_ops == &vdev_hole_ops) {
-                       zfs_panic_recover("blkptr at %p DVA %u has hole "
-                           "VDEV %llu",
+                       errors += zfs_blkptr_verify_log(spa, bp, blk_verify,
+                           "blkptr at %px DVA %u has hole VDEV %llu",
                            bp, i, (longlong_t)vdevid);
                        continue;
                }
@@ -963,23 +1193,26 @@ zfs_blkptr_verify(spa_t *spa, const blkptr_t *bp, boolean_t config_held)
                         */
                        continue;
                }
-               uint64_t offset = DVA_GET_OFFSET(&bp->blk_dva[i]);
-               uint64_t asize = DVA_GET_ASIZE(&bp->blk_dva[i]);
-               if (BP_IS_GANG(bp))
-                       asize = vdev_psize_to_asize(vd, SPA_GANGBLOCKSIZE);
+               uint64_t offset = DVA_GET_OFFSET(dva);
+               uint64_t asize = DVA_GET_ASIZE(dva);
+               if (DVA_GET_GANG(dva))
+                       asize = vdev_gang_header_asize(vd);
                if (offset + asize > vd->vdev_asize) {
-                       zfs_panic_recover("blkptr at %p DVA %u has invalid "
-                           "OFFSET %llu",
+                       errors += zfs_blkptr_verify_log(spa, bp, blk_verify,
+                           "blkptr at %px DVA %u has invalid OFFSET %llu",
                            bp, i, (longlong_t)offset);
                }
        }
-       if (!config_held)
+       if (blk_config == BLK_CONFIG_NEEDED)
                spa_config_exit(spa, SCL_VDEV, bp);
+
+       return (errors == 0);
 }
 
 boolean_t
 zfs_dva_valid(spa_t *spa, const dva_t *dva, const blkptr_t *bp)
 {
+       (void) bp;
        uint64_t vdevid = DVA_GET_VDEV(dva);
 
        if (vdevid >= spa->spa_root_vdev->vdev_children)
@@ -999,8 +1232,8 @@ zfs_dva_valid(spa_t *spa, const dva_t *dva, const blkptr_t *bp)
        uint64_t offset = DVA_GET_OFFSET(dva);
        uint64_t asize = DVA_GET_ASIZE(dva);
 
-       if (BP_IS_GANG(bp))
-               asize = vdev_psize_to_asize(vd, SPA_GANGBLOCKSIZE);
+       if (DVA_GET_GANG(dva))
+               asize = vdev_gang_header_asize(vd);
        if (offset + asize > vd->vdev_asize)
                return (B_FALSE);
 
@@ -1010,13 +1243,11 @@ zfs_dva_valid(spa_t *spa, const dva_t *dva, const blkptr_t *bp)
 zio_t *
 zio_read(zio_t *pio, spa_t *spa, const blkptr_t *bp,
     abd_t *data, uint64_t size, zio_done_func_t *done, void *private,
-    zio_priority_t priority, enum zio_flag flags, const zbookmark_phys_t *zb)
+    zio_priority_t priority, zio_flag_t flags, const zbookmark_phys_t *zb)
 {
        zio_t *zio;
 
-       zfs_blkptr_verify(spa, bp, flags & ZIO_FLAG_CONFIG_WRITER);
-
-       zio = zio_create(pio, spa, BP_PHYSICAL_BIRTH(bp), bp,
+       zio = zio_create(pio, spa, BP_GET_BIRTH(bp), bp,
            data, size, size, done, private,
            ZIO_TYPE_READ, priority, flags, NULL, 0, zb,
            ZIO_STAGE_OPEN, (flags & ZIO_FLAG_DDT_CHILD) ?
@@ -1029,9 +1260,8 @@ zio_t *
 zio_write(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp,
     abd_t *data, uint64_t lsize, uint64_t psize, const zio_prop_t *zp,
     zio_done_func_t *ready, zio_done_func_t *children_ready,
-    zio_done_func_t *physdone, zio_done_func_t *done,
-    void *private, zio_priority_t priority, enum zio_flag flags,
-    const zbookmark_phys_t *zb)
+    zio_done_func_t *done, void *private, zio_priority_t priority,
+    zio_flag_t flags, const zbookmark_phys_t *zb)
 {
        zio_t *zio;
 
@@ -1051,7 +1281,6 @@ zio_write(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp,
 
        zio->io_ready = ready;
        zio->io_children_ready = children_ready;
-       zio->io_physdone = physdone;
        zio->io_prop = *zp;
 
        /*
@@ -1073,7 +1302,7 @@ zio_write(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp,
 zio_t *
 zio_rewrite(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp, abd_t *data,
     uint64_t size, zio_done_func_t *done, void *private,
-    zio_priority_t priority, enum zio_flag flags, zbookmark_phys_t *zb)
+    zio_priority_t priority, zio_flag_t flags, zbookmark_phys_t *zb)
 {
        zio_t *zio;
 
@@ -1085,12 +1314,14 @@ zio_rewrite(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp, abd_t *data,
 }
 
 void
-zio_write_override(zio_t *zio, blkptr_t *bp, int copies, boolean_t nopwrite)
+zio_write_override(zio_t *zio, blkptr_t *bp, int copies, boolean_t nopwrite,
+    boolean_t brtwrite)
 {
        ASSERT(zio->io_type == ZIO_TYPE_WRITE);
        ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
        ASSERT(zio->io_stage == ZIO_STAGE_OPEN);
        ASSERT(zio->io_txg == spa_syncing_txg(zio->io_spa));
+       ASSERT(!brtwrite || !nopwrite);
 
        /*
         * We must reset the io_prop to match the values that existed
@@ -1099,6 +1330,7 @@ zio_write_override(zio_t *zio, blkptr_t *bp, int copies, boolean_t nopwrite)
         */
        zio->io_prop.zp_dedup = nopwrite ? B_FALSE : zio->io_prop.zp_dedup;
        zio->io_prop.zp_nopwrite = nopwrite;
+       zio->io_prop.zp_brtwrite = brtwrite;
        zio->io_prop.zp_copies = copies;
        zio->io_bp_override = bp;
 }
@@ -1107,7 +1339,7 @@ void
 zio_free(spa_t *spa, uint64_t txg, const blkptr_t *bp)
 {
 
-       zfs_blkptr_verify(spa, bp, B_FALSE);
+       (void) zfs_blkptr_verify(spa, bp, BLK_CONFIG_NEEDED, BLK_VERIFY_HALT);
 
        /*
         * The check for EMBEDDED is a performance optimization.  We
@@ -1116,7 +1348,6 @@ zio_free(spa_t *spa, uint64_t txg, const blkptr_t *bp)
         */
        if (BP_IS_EMBEDDED(bp))
                return;
-       metaslab_check_free(spa, bp);
 
        /*
         * Frees that are for the currently-syncing txg, are not going to be
@@ -1132,52 +1363,63 @@ zio_free(spa_t *spa, uint64_t txg, const blkptr_t *bp)
            BP_GET_DEDUP(bp) ||
            txg != spa->spa_syncing_txg ||
            (spa_sync_pass(spa) >= zfs_sync_pass_deferred_free &&
-           !spa_feature_is_active(spa, SPA_FEATURE_LOG_SPACEMAP))) {
+           !spa_feature_is_active(spa, SPA_FEATURE_LOG_SPACEMAP)) ||
+           brt_maybe_exists(spa, bp)) {
+               metaslab_check_free(spa, bp);
                bplist_append(&spa->spa_free_bplist[txg & TXG_MASK], bp);
        } else {
-               VERIFY0(zio_wait(zio_free_sync(NULL, spa, txg, bp, 0)));
+               VERIFY3P(zio_free_sync(NULL, spa, txg, bp, 0), ==, NULL);
        }
 }
 
+/*
+ * To improve performance, this function may return NULL if we were able
+ * to do the free immediately.  This avoids the cost of creating a zio
+ * (and linking it to the parent, etc).
+ */
 zio_t *
 zio_free_sync(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
-    enum zio_flag flags)
+    zio_flag_t flags)
 {
-       zio_t *zio;
-       enum zio_stage stage = ZIO_FREE_PIPELINE;
-
        ASSERT(!BP_IS_HOLE(bp));
        ASSERT(spa_syncing_txg(spa) == txg);
 
        if (BP_IS_EMBEDDED(bp))
-               return (zio_null(pio, spa, NULL, NULL, NULL, 0));
+               return (NULL);
 
        metaslab_check_free(spa, bp);
        arc_freed(spa, bp);
        dsl_scan_freed(spa, bp);
 
-       /*
-        * GANG and DEDUP blocks can induce a read (for the gang block header,
-        * or the DDT), so issue them asynchronously so that this thread is
-        * not tied up.
-        */
-       if (BP_IS_GANG(bp) || BP_GET_DEDUP(bp))
-               stage |= ZIO_STAGE_ISSUE_ASYNC;
-
-       zio = zio_create(pio, spa, txg, bp, NULL, BP_GET_PSIZE(bp),
-           BP_GET_PSIZE(bp), NULL, NULL, ZIO_TYPE_FREE, ZIO_PRIORITY_NOW,
-           flags, NULL, 0, NULL, ZIO_STAGE_OPEN, stage);
+       if (BP_IS_GANG(bp) ||
+           BP_GET_DEDUP(bp) ||
+           brt_maybe_exists(spa, bp)) {
+               /*
+                * GANG, DEDUP and BRT blocks can induce a read (for the gang
+                * block header, the DDT or the BRT), so issue them
+                * asynchronously so that this thread is not tied up.
+                */
+               enum zio_stage stage =
+                   ZIO_FREE_PIPELINE | ZIO_STAGE_ISSUE_ASYNC;
 
-       return (zio);
+               return (zio_create(pio, spa, txg, bp, NULL, BP_GET_PSIZE(bp),
+                   BP_GET_PSIZE(bp), NULL, NULL,
+                   ZIO_TYPE_FREE, ZIO_PRIORITY_NOW,
+                   flags, NULL, 0, NULL, ZIO_STAGE_OPEN, stage));
+       } else {
+               metaslab_free(spa, bp, txg, B_FALSE);
+               return (NULL);
+       }
 }
 
 zio_t *
 zio_claim(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
-    zio_done_func_t *done, void *private, enum zio_flag flags)
+    zio_done_func_t *done, void *private, zio_flag_t flags)
 {
        zio_t *zio;
 
-       zfs_blkptr_verify(spa, bp, flags & ZIO_FLAG_CONFIG_WRITER);
+       (void) zfs_blkptr_verify(spa, bp, (flags & ZIO_FLAG_CONFIG_WRITER) ?
+           BLK_CONFIG_HELD : BLK_CONFIG_NEEDED, BLK_VERIFY_HALT);
 
        if (BP_IS_EMBEDDED(bp))
                return (zio_null(pio, spa, NULL, NULL, NULL, 0));
@@ -1194,10 +1436,10 @@ zio_claim(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
         * starts allocating blocks -- so that nothing is allocated twice.
         * If txg == 0 we just verify that the block is claimable.
         */
-       ASSERT3U(spa->spa_uberblock.ub_rootbp.blk_birth, <,
+       ASSERT3U(BP_GET_LOGICAL_BIRTH(&spa->spa_uberblock.ub_rootbp), <,
            spa_min_claim_txg(spa));
        ASSERT(txg == spa_min_claim_txg(spa) || txg == 0);
-       ASSERT(!BP_GET_DEDUP(bp) || !spa_writeable(spa));       /* zdb(1M) */
+       ASSERT(!BP_GET_DEDUP(bp) || !spa_writeable(spa));       /* zdb(8) */
 
        zio = zio_create(pio, spa, txg, bp, NULL, BP_GET_PSIZE(bp),
            BP_GET_PSIZE(bp), done, private, ZIO_TYPE_CLAIM, ZIO_PRIORITY_NOW,
@@ -1209,32 +1451,19 @@ zio_claim(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
 
 zio_t *
 zio_ioctl(zio_t *pio, spa_t *spa, vdev_t *vd, int cmd,
-    zio_done_func_t *done, void *private, enum zio_flag flags)
+    zio_done_func_t *done, void *private, zio_flag_t flags)
 {
-       zio_t *zio;
-       int c;
-
-       if (vd->vdev_children == 0) {
-               zio = zio_create(pio, spa, 0, NULL, NULL, 0, 0, done, private,
-                   ZIO_TYPE_IOCTL, ZIO_PRIORITY_NOW, flags, vd, 0, NULL,
-                   ZIO_STAGE_OPEN, ZIO_IOCTL_PIPELINE);
-
-               zio->io_cmd = cmd;
-       } else {
-               zio = zio_null(pio, spa, NULL, NULL, NULL, flags);
-
-               for (c = 0; c < vd->vdev_children; c++)
-                       zio_nowait(zio_ioctl(zio, spa, vd->vdev_child[c], cmd,
-                           done, private, flags));
-       }
-
+       zio_t *zio = zio_create(pio, spa, 0, NULL, NULL, 0, 0, done, private,
+           ZIO_TYPE_IOCTL, ZIO_PRIORITY_NOW, flags, vd, 0, NULL,
+           ZIO_STAGE_OPEN, ZIO_IOCTL_PIPELINE);
+       zio->io_cmd = cmd;
        return (zio);
 }
 
 zio_t *
 zio_trim(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
     zio_done_func_t *done, void *private, zio_priority_t priority,
-    enum zio_flag flags, enum trim_flag trim_flags)
+    zio_flag_t flags, enum trim_flag trim_flags)
 {
        zio_t *zio;
 
@@ -1254,7 +1483,7 @@ zio_trim(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
 zio_t *
 zio_read_phys(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
     abd_t *data, int checksum, zio_done_func_t *done, void *private,
-    zio_priority_t priority, enum zio_flag flags, boolean_t labels)
+    zio_priority_t priority, zio_flag_t flags, boolean_t labels)
 {
        zio_t *zio;
 
@@ -1275,7 +1504,7 @@ zio_read_phys(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
 zio_t *
 zio_write_phys(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
     abd_t *data, int checksum, zio_done_func_t *done, void *private,
-    zio_priority_t priority, enum zio_flag flags, boolean_t labels)
+    zio_priority_t priority, zio_flag_t flags, boolean_t labels)
 {
        zio_t *zio;
 
@@ -1312,7 +1541,7 @@ zio_write_phys(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
 zio_t *
 zio_vdev_child_io(zio_t *pio, blkptr_t *bp, vdev_t *vd, uint64_t offset,
     abd_t *data, uint64_t size, int type, zio_priority_t priority,
-    enum zio_flag flags, zio_done_func_t *done, void *private)
+    zio_flag_t flags, zio_done_func_t *done, void *private)
 {
        enum zio_stage pipeline = ZIO_VDEV_CHILD_PIPELINE;
        zio_t *zio;
@@ -1371,22 +1600,17 @@ zio_vdev_child_io(zio_t *pio, blkptr_t *bp, vdev_t *vd, uint64_t offset,
                flags &= ~ZIO_FLAG_IO_ALLOCATING;
        }
 
-
        zio = zio_create(pio, pio->io_spa, pio->io_txg, bp, data, size, size,
            done, private, type, priority, flags, vd, offset, &pio->io_bookmark,
            ZIO_STAGE_VDEV_IO_START >> 1, pipeline);
        ASSERT3U(zio->io_child_type, ==, ZIO_CHILD_VDEV);
 
-       zio->io_physdone = pio->io_physdone;
-       if (vd->vdev_ops->vdev_op_leaf && zio->io_logical != NULL)
-               zio->io_logical->io_phys_children++;
-
        return (zio);
 }
 
 zio_t *
 zio_vdev_delegated_io(vdev_t *vd, uint64_t offset, abd_t *data, uint64_t size,
-    zio_type_t type, zio_priority_t priority, enum zio_flag flags,
+    zio_type_t type, zio_priority_t priority, zio_flag_t flags,
     zio_done_func_t *done, void *private)
 {
        zio_t *zio;
@@ -1403,11 +1627,18 @@ zio_vdev_delegated_io(vdev_t *vd, uint64_t offset, abd_t *data, uint64_t size,
 }
 
 void
-zio_flush(zio_t *zio, vdev_t *vd)
+zio_flush(zio_t *pio, vdev_t *vd)
 {
-       zio_nowait(zio_ioctl(zio, zio->io_spa, vd, DKIOCFLUSHWRITECACHE,
-           NULL, NULL,
-           ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_PROPAGATE | ZIO_FLAG_DONT_RETRY));
+       if (vd->vdev_nowritecache)
+               return;
+       if (vd->vdev_children == 0) {
+               zio_nowait(zio_ioctl(pio, vd->vdev_spa, vd,
+                   DKIOCFLUSHWRITECACHE, NULL, NULL, ZIO_FLAG_CANFAIL |
+                   ZIO_FLAG_DONT_PROPAGATE | ZIO_FLAG_DONT_RETRY));
+       } else {
+               for (uint64_t c = 0; c < vd->vdev_children; c++)
+                       zio_flush(pio, vd->vdev_child[c]);
+       }
 }
 
 void
@@ -1430,6 +1661,19 @@ zio_shrink(zio_t *zio, uint64_t size)
        }
 }
 
+/*
+ * Round provided allocation size up to a value that can be allocated
+ * by at least some vdev(s) in the pool with minimum or no additional
+ * padding and without extra space usage on others
+ */
+static uint64_t
+zio_roundup_alloc_size(spa_t *spa, uint64_t size)
+{
+       if (size > spa->spa_min_alloc)
+               return (roundup(size, spa->spa_gcd_alloc));
+       return (spa->spa_min_alloc);
+}
+
 /*
  * ==========================================================================
  * Prepare to read and write logical blocks
@@ -1468,15 +1712,8 @@ zio_read_bp_init(zio_t *zio)
                abd_return_buf_copy(zio->io_abd, data, psize);
        } else {
                ASSERT(!BP_IS_EMBEDDED(bp));
-               ASSERT3P(zio->io_bp, ==, &zio->io_bp_copy);
        }
 
-       if (!DMU_OT_IS_METADATA(BP_GET_TYPE(bp)) && BP_GET_LEVEL(bp) == 0)
-               zio->io_flags |= ZIO_FLAG_DONT_CACHE;
-
-       if (BP_GET_TYPE(bp) == DMU_OT_DDT_ZAP)
-               zio->io_flags |= ZIO_FLAG_DONT_CACHE;
-
        if (BP_GET_DEDUP(bp) && zio->io_child_type == ZIO_CHILD_LOGICAL)
                zio->io_pipeline = ZIO_DDT_READ_PIPELINE;
 
@@ -1495,12 +1732,16 @@ zio_write_bp_init(zio_t *zio)
                blkptr_t *bp = zio->io_bp;
                zio_prop_t *zp = &zio->io_prop;
 
-               ASSERT(bp->blk_birth != zio->io_txg);
-               ASSERT(BP_GET_DEDUP(zio->io_bp_override) == 0);
+               ASSERT(BP_GET_LOGICAL_BIRTH(bp) != zio->io_txg);
 
                *bp = *zio->io_bp_override;
                zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
 
+               if (zp->zp_brtwrite)
+                       return (zio);
+
+               ASSERT(!BP_GET_DEDUP(zio->io_bp_override));
+
                if (BP_IS_EMBEDDED(bp))
                        return (zio);
 
@@ -1552,7 +1793,7 @@ zio_write_compress(zio_t *zio)
        blkptr_t *bp = zio->io_bp;
        uint64_t lsize = zio->io_lsize;
        uint64_t psize = zio->io_size;
-       int pass = 1;
+       uint32_t pass = 1;
 
        /*
         * If our children haven't all reached the ready stage,
@@ -1579,7 +1820,7 @@ zio_write_compress(zio_t *zio)
        ASSERT(zio->io_child_type != ZIO_CHILD_DDT);
        ASSERT(zio->io_bp_override == NULL);
 
-       if (!BP_IS_HOLE(bp) && bp->blk_birth == zio->io_txg) {
+       if (!BP_IS_HOLE(bp) && BP_GET_LOGICAL_BIRTH(bp) == zio->io_txg) {
                /*
                 * We're rewriting an existing block, which means we're
                 * working on behalf of spa_sync().  For spa_sync() to
@@ -1599,18 +1840,23 @@ zio_write_compress(zio_t *zio)
                        compress = ZIO_COMPRESS_OFF;
 
                /* Make sure someone doesn't change their mind on overwrites */
-               ASSERT(BP_IS_EMBEDDED(bp) || MIN(zp->zp_copies + BP_IS_GANG(bp),
-                   spa_max_replication(spa)) == BP_GET_NDVAS(bp));
+               ASSERT(BP_IS_EMBEDDED(bp) || BP_IS_GANG(bp) ||
+                   MIN(zp->zp_copies, spa_max_replication(spa))
+                   == BP_GET_NDVAS(bp));
        }
 
        /* If it's a compressed write that is not raw, compress the buffer. */
        if (compress != ZIO_COMPRESS_OFF &&
            !(zio->io_flags & ZIO_FLAG_RAW_COMPRESS)) {
-               void *cbuf = zio_buf_alloc(lsize);
-               psize = zio_compress_data(compress, zio->io_abd, cbuf, lsize);
-               if (psize == 0 || psize == lsize) {
+               void *cbuf = NULL;
+               psize = zio_compress_data(compress, zio->io_abd, &cbuf, lsize,
+                   zp->zp_complevel);
+               if (psize == 0) {
                        compress = ZIO_COMPRESS_OFF;
-                       zio_buf_free(cbuf, lsize);
+               } else if (psize >= lsize) {
+                       compress = ZIO_COMPRESS_OFF;
+                       if (cbuf != NULL)
+                               zio_buf_free(cbuf, lsize);
                } else if (!zp->zp_dedup && !zp->zp_encrypt &&
                    psize <= BPE_PAYLOAD_SIZE &&
                    zp->zp_level == 0 && !DMU_OT_HAS_FILL(zp->zp_type) &&
@@ -1621,23 +1867,22 @@ zio_write_compress(zio_t *zio)
                        BP_SET_TYPE(bp, zio->io_prop.zp_type);
                        BP_SET_LEVEL(bp, zio->io_prop.zp_level);
                        zio_buf_free(cbuf, lsize);
-                       bp->blk_birth = zio->io_txg;
+                       BP_SET_LOGICAL_BIRTH(bp, zio->io_txg);
                        zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
                        ASSERT(spa_feature_is_active(spa,
                            SPA_FEATURE_EMBEDDED_DATA));
                        return (zio);
                } else {
                        /*
-                        * Round up compressed size up to the ashift
-                        * of the smallest-ashift device, and zero the tail.
-                        * This ensures that the compressed size of the BP
-                        * (and thus compressratio property) are correct,
+                        * Round compressed size up to the minimum allocation
+                        * size of the smallest-ashift device, and zero the
+                        * tail. This ensures that the compressed size of the
+                        * BP (and thus compressratio property) are correct,
                         * in that we charge for the padding used to fill out
                         * the last sector.
                         */
-                       ASSERT3U(spa->spa_min_ashift, >=, SPA_MINBLOCKSHIFT);
-                       size_t rounded = (size_t)P2ROUNDUP(psize,
-                           1ULL << spa->spa_min_ashift);
+                       size_t rounded = (size_t)zio_roundup_alloc_size(spa,
+                           psize);
                        if (rounded >= lsize) {
                                compress = ZIO_COMPRESS_OFF;
                                zio_buf_free(cbuf, lsize);
@@ -1670,9 +1915,27 @@ zio_write_compress(zio_t *zio)
                 * to a hole.
                 */
                psize = zio_compress_data(ZIO_COMPRESS_EMPTY,
-                   zio->io_abd, NULL, lsize);
-               if (psize == 0)
+                   zio->io_abd, NULL, lsize, zp->zp_complevel);
+               if (psize == 0 || psize >= lsize)
                        compress = ZIO_COMPRESS_OFF;
+       } else if (zio->io_flags & ZIO_FLAG_RAW_COMPRESS &&
+           !(zio->io_flags & ZIO_FLAG_RAW_ENCRYPT)) {
+               /*
+                * If we are raw receiving an encrypted dataset we should not
+                * take this codepath because it will change the on-disk block
+                * and decryption will fail.
+                */
+               size_t rounded = MIN((size_t)zio_roundup_alloc_size(spa, psize),
+                   lsize);
+
+               if (rounded != psize) {
+                       abd_t *cdata = abd_alloc_linear(rounded, B_TRUE);
+                       abd_zero_off(cdata, psize, rounded - psize);
+                       abd_copy_off(cdata, zio->io_abd, 0, 0, psize);
+                       psize = rounded;
+                       zio_push_transform(zio, cdata,
+                           psize, rounded, NULL);
+               }
        } else {
                ASSERT3U(psize, !=, 0);
        }
@@ -1685,7 +1948,7 @@ zio_write_compress(zio_t *zio)
         * spa_sync() to allocate new blocks, but force rewrites after that.
         * There should only be a handful of blocks after pass 1 in any case.
         */
-       if (!BP_IS_HOLE(bp) && bp->blk_birth == zio->io_txg &&
+       if (!BP_IS_HOLE(bp) && BP_GET_LOGICAL_BIRTH(bp) == zio->io_txg &&
            BP_GET_PSIZE(bp) == psize &&
            pass >= zfs_sync_pass_rewrite) {
                VERIFY3U(psize, !=, 0);
@@ -1699,7 +1962,7 @@ zio_write_compress(zio_t *zio)
        }
 
        if (psize == 0) {
-               if (zio->io_bp_orig.blk_birth != 0 &&
+               if (BP_GET_LOGICAL_BIRTH(&zio->io_bp_orig) != 0 &&
                    spa_feature_is_active(spa, SPA_FEATURE_HOLE_BIRTH)) {
                        BP_SET_LSIZE(bp, lsize);
                        BP_SET_TYPE(bp, zp->zp_type);
@@ -1792,21 +2055,22 @@ zio_taskq_dispatch(zio_t *zio, zio_taskq_type_t q, boolean_t cutinline)
         * to dispatch the zio to another taskq at the same time.
         */
        ASSERT(taskq_empty_ent(&zio->io_tqent));
-       spa_taskq_dispatch_ent(spa, t, q, (task_func_t *)zio_execute, zio,
-           flags, &zio->io_tqent);
+       spa_taskq_dispatch_ent(spa, t, q, zio_execute, zio, flags,
+           &zio->io_tqent, zio);
 }
 
 static boolean_t
 zio_taskq_member(zio_t *zio, zio_taskq_type_t q)
 {
-       kthread_t *executor = zio->io_executor;
        spa_t *spa = zio->io_spa;
 
+       taskq_t *tq = taskq_of_curthread();
+
        for (zio_type_t t = 0; t < ZIO_TYPES; t++) {
                spa_taskqs_t *tqs = &spa->spa_zio_taskq[t][q];
                uint_t i;
                for (i = 0; i < tqs->stqs_count; i++) {
-                       if (taskq_member(tqs->stqs_taskq[i], executor))
+                       if (tqs->stqs_taskq[i] == tq)
                                return (B_TRUE);
                }
        }
@@ -1817,17 +2081,88 @@ zio_taskq_member(zio_t *zio, zio_taskq_type_t q)
 static zio_t *
 zio_issue_async(zio_t *zio)
 {
+       ASSERT((zio->io_type != ZIO_TYPE_WRITE) || ZIO_HAS_ALLOCATOR(zio));
        zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, B_FALSE);
-
        return (NULL);
 }
 
 void
-zio_interrupt(zio_t *zio)
+zio_interrupt(void *zio)
 {
        zio_taskq_dispatch(zio, ZIO_TASKQ_INTERRUPT, B_FALSE);
 }
 
+void
+zio_delay_interrupt(zio_t *zio)
+{
+       /*
+        * The timeout_generic() function isn't defined in userspace, so
+        * rather than trying to implement the function, the zio delay
+        * functionality has been disabled for userspace builds.
+        */
+
+#ifdef _KERNEL
+       /*
+        * If io_target_timestamp is zero, then no delay has been registered
+        * for this IO, thus jump to the end of this function and "skip" the
+        * delay; issuing it directly to the zio layer.
+        */
+       if (zio->io_target_timestamp != 0) {
+               hrtime_t now = gethrtime();
+
+               if (now >= zio->io_target_timestamp) {
+                       /*
+                        * This IO has already taken longer than the target
+                        * delay to complete, so we don't want to delay it
+                        * any longer; we "miss" the delay and issue it
+                        * directly to the zio layer. This is likely due to
+                        * the target latency being set to a value less than
+                        * the underlying hardware can satisfy (e.g. delay
+                        * set to 1ms, but the disks take 10ms to complete an
+                        * IO request).
+                        */
+
+                       DTRACE_PROBE2(zio__delay__miss, zio_t *, zio,
+                           hrtime_t, now);
+
+                       zio_interrupt(zio);
+               } else {
+                       taskqid_t tid;
+                       hrtime_t diff = zio->io_target_timestamp - now;
+                       clock_t expire_at_tick = ddi_get_lbolt() +
+                           NSEC_TO_TICK(diff);
+
+                       DTRACE_PROBE3(zio__delay__hit, zio_t *, zio,
+                           hrtime_t, now, hrtime_t, diff);
+
+                       if (NSEC_TO_TICK(diff) == 0) {
+                               /* Our delay is less than a jiffy - just spin */
+                               zfs_sleep_until(zio->io_target_timestamp);
+                               zio_interrupt(zio);
+                       } else {
+                               /*
+                                * Use taskq_dispatch_delay() in the place of
+                                * OpenZFS's timeout_generic().
+                                */
+                               tid = taskq_dispatch_delay(system_taskq,
+                                   zio_interrupt, zio, TQ_NOSLEEP,
+                                   expire_at_tick);
+                               if (tid == TASKQID_INVALID) {
+                                       /*
+                                        * Couldn't allocate a task.  Just
+                                        * finish the zio without a delay.
+                                        */
+                                       zio_interrupt(zio);
+                               }
+                       }
+               }
+               return;
+       }
+#endif
+       DTRACE_PROBE1(zio__delay__skip, zio_t *, zio);
+       zio_interrupt(zio);
+}
+
 static void
 zio_deadman_impl(zio_t *pio, int ziodepth)
 {
@@ -1843,20 +2178,26 @@ zio_deadman_impl(zio_t *pio, int ziodepth)
 
                zfs_dbgmsg("slow zio[%d]: zio=%px timestamp=%llu "
                    "delta=%llu queued=%llu io=%llu "
-                   "path=%s last=%llu "
-                   "type=%d priority=%d flags=0x%x "
-                   "stage=0x%x pipeline=0x%x pipeline-trace=0x%x "
-                   "objset=%llu object=%llu level=%llu blkid=%llu "
-                   "offset=%llu size=%llu error=%d",
+                   "path=%s "
+                   "last=%llu type=%d "
+                   "priority=%d flags=0x%llx stage=0x%x "
+                   "pipeline=0x%x pipeline-trace=0x%x "
+                   "objset=%llu object=%llu "
+                   "level=%llu blkid=%llu "
+                   "offset=%llu size=%llu "
+                   "error=%d",
                    ziodepth, pio, pio->io_timestamp,
-                   delta, pio->io_delta, pio->io_delay,
-                   vd ? vd->vdev_path : "NULL", vq ? vq->vq_io_complete_ts : 0,
-                   pio->io_type, pio->io_priority, pio->io_flags,
+                   (u_longlong_t)delta, pio->io_delta, pio->io_delay,
+                   vd ? vd->vdev_path : "NULL",
+                   vq ? vq->vq_io_complete_ts : 0, pio->io_type,
+                   pio->io_priority, (u_longlong_t)pio->io_flags,
                    pio->io_stage, pio->io_pipeline, pio->io_pipeline_trace,
-                   zb->zb_objset, zb->zb_object, zb->zb_level, zb->zb_blkid,
-                   pio->io_offset, pio->io_size, pio->io_error);
-               zfs_ereport_post(FM_EREPORT_ZFS_DEADMAN,
-                   pio->io_spa, vd, zb, pio, 0, 0);
+                   (u_longlong_t)zb->zb_objset, (u_longlong_t)zb->zb_object,
+                   (u_longlong_t)zb->zb_level, (u_longlong_t)zb->zb_blkid,
+                   (u_longlong_t)pio->io_offset, (u_longlong_t)pio->io_size,
+                   pio->io_error);
+               (void) zfs_ereport_post(FM_EREPORT_ZFS_DEADMAN,
+                   pio->io_spa, vd, zb, pio, 0);
 
                if (failmode == ZIO_FAILURE_MODE_CONTINUE &&
                    taskq_empty_ent(&pio->io_tqent)) {
@@ -1877,7 +2218,7 @@ zio_deadman_impl(zio_t *pio, int ziodepth)
  * using the zfs_dbgmsg() interface then post deadman event for the ZED.
  */
 void
-zio_deadman(zio_t *pio, char *tag)
+zio_deadman(zio_t *pio, const char *tag)
 {
        spa_t *spa = pio->io_spa;
        char *name = spa_name(spa);
@@ -1926,7 +2267,7 @@ static zio_pipe_stage_t *zio_pipeline[];
  * it is externally visible.
  */
 void
-zio_execute(zio_t *zio)
+zio_execute(void *zio)
 {
        fstrans_cookie_t cookie;
 
@@ -1940,7 +2281,7 @@ zio_execute(zio_t *zio)
  * enough to allow zio_execute() to be called recursively.  A minimum
  * stack size of 16K is required to avoid needing to re-dispatch the zio.
  */
-boolean_t
+static boolean_t
 zio_execute_stack_check(zio_t *zio)
 {
 #if !defined(HAVE_LARGE_STACKS)
@@ -1955,6 +2296,8 @@ zio_execute_stack_check(zio_t *zio)
            !zio_taskq_member(zio, ZIO_TASKQ_ISSUE) &&
            !zio_taskq_member(zio, ZIO_TASKQ_ISSUE_HIGH))
                return (B_TRUE);
+#else
+       (void) zio;
 #endif /* HAVE_LARGE_STACKS */
 
        return (B_FALSE);
@@ -2034,6 +2377,15 @@ __zio_execute(zio_t *zio)
 int
 zio_wait(zio_t *zio)
 {
+       /*
+        * Some routines, like zio_free_sync(), may return a NULL zio
+        * to avoid the performance overhead of creating and then destroying
+        * an unneeded zio.  For the callers' simplicity, we accept a NULL
+        * zio and ignore it.
+        */
+       if (zio == NULL)
+               return (0);
+
        long timeout = MSEC_TO_TICK(zfs_deadman_ziotime_ms);
        int error;
 
@@ -2044,6 +2396,9 @@ zio_wait(zio_t *zio)
        ASSERT0(zio->io_queued_timestamp);
        zio->io_queued_timestamp = gethrtime();
 
+       if (zio->io_type == ZIO_TYPE_WRITE) {
+               spa_select_allocator(zio);
+       }
        __zio_execute(zio);
 
        mutex_enter(&zio->io_lock);
@@ -2071,10 +2426,16 @@ zio_wait(zio_t *zio)
 void
 zio_nowait(zio_t *zio)
 {
+       /*
+        * See comment in zio_wait().
+        */
+       if (zio == NULL)
+               return;
+
        ASSERT3P(zio->io_executor, ==, NULL);
 
        if (zio->io_child_type == ZIO_CHILD_LOGICAL &&
-           zio_unique_parent(zio) == NULL) {
+           list_is_empty(&zio->io_parent_list)) {
                zio_t *pio;
 
                /*
@@ -2083,15 +2444,16 @@ zio_nowait(zio_t *zio)
                 * will ensure they complete prior to unloading the pool.
                 */
                spa_t *spa = zio->io_spa;
-               kpreempt_disable();
-               pio = spa->spa_async_zio_root[CPU_SEQID];
-               kpreempt_enable();
+               pio = spa->spa_async_zio_root[CPU_SEQID_UNSTABLE];
 
                zio_add_child(pio, zio);
        }
 
        ASSERT0(zio->io_queued_timestamp);
        zio->io_queued_timestamp = gethrtime();
+       if (zio->io_type == ZIO_TYPE_WRITE) {
+               spa_select_allocator(zio);
+       }
        __zio_execute(zio);
 }
 
@@ -2102,15 +2464,17 @@ zio_nowait(zio_t *zio)
  */
 
 static void
-zio_reexecute(zio_t *pio)
+zio_reexecute(void *arg)
 {
-       zio_t *cio, *cio_next;
+       zio_t *pio = arg;
+       zio_t *cio, *cio_next, *gio;
 
        ASSERT(pio->io_child_type == ZIO_CHILD_LOGICAL);
        ASSERT(pio->io_orig_stage == ZIO_STAGE_OPEN);
        ASSERT(pio->io_gang_leader == NULL);
        ASSERT(pio->io_gang_tree == NULL);
 
+       mutex_enter(&pio->io_lock);
        pio->io_flags = pio->io_orig_flags;
        pio->io_stage = pio->io_orig_stage;
        pio->io_pipeline = pio->io_orig_pipeline;
@@ -2118,8 +2482,16 @@ zio_reexecute(zio_t *pio)
        pio->io_flags |= ZIO_FLAG_REEXECUTED;
        pio->io_pipeline_trace = 0;
        pio->io_error = 0;
-       for (int w = 0; w < ZIO_WAIT_TYPES; w++)
-               pio->io_state[w] = 0;
+       pio->io_state[ZIO_WAIT_READY] = (pio->io_stage >= ZIO_STAGE_READY) ||
+           (pio->io_pipeline & ZIO_STAGE_READY) == 0;
+       pio->io_state[ZIO_WAIT_DONE] = (pio->io_stage >= ZIO_STAGE_DONE);
+       zio_link_t *zl = NULL;
+       while ((gio = zio_walk_parents(pio, &zl)) != NULL) {
+               for (int w = 0; w < ZIO_WAIT_TYPES; w++) {
+                       gio->io_children[pio->io_child_type][w] +=
+                           !pio->io_state[w];
+               }
+       }
        for (int c = 0; c < ZIO_CHILD_TYPES; c++)
                pio->io_child_error[c] = 0;
 
@@ -2133,12 +2505,9 @@ zio_reexecute(zio_t *pio)
         * the remainder of pio's io_child_list, from 'cio_next' onward,
         * cannot be affected by any side effects of reexecuting 'cio'.
         */
-       zio_link_t *zl = NULL;
-       mutex_enter(&pio->io_lock);
+       zl = NULL;
        for (cio = zio_walk_children(pio, &zl); cio != NULL; cio = cio_next) {
                cio_next = zio_walk_children(pio, &zl);
-               for (int w = 0; w < ZIO_WAIT_TYPES; w++)
-                       pio->io_children[cio->io_child_type][w]++;
                mutex_exit(&pio->io_lock);
                zio_reexecute(cio);
                mutex_enter(&pio->io_lock);
@@ -2167,8 +2536,8 @@ zio_suspend(spa_t *spa, zio_t *zio, zio_suspend_reason_t reason)
        cmn_err(CE_WARN, "Pool '%s' has encountered an uncorrectable I/O "
            "failure and has been suspended.\n", spa_name(spa));
 
-       zfs_ereport_post(FM_EREPORT_ZFS_IO_FAILURE, spa, NULL,
-           NULL, NULL, 0, 0);
+       (void) zfs_ereport_post(FM_EREPORT_ZFS_IO_FAILURE, spa, NULL,
+           NULL, NULL, 0);
 
        mutex_enter(&spa->spa_suspend_lock);
 
@@ -2291,7 +2660,7 @@ zio_resume_wait(spa_t *spa)
 static void
 zio_gang_issue_func_done(zio_t *zio)
 {
-       abd_put(zio->io_abd);
+       abd_free(zio->io_abd);
 }
 
 static zio_t *
@@ -2335,7 +2704,7 @@ zio_rewrite_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, abd_t *data,
                        zio_checksum_compute(zio, BP_GET_CHECKSUM(bp),
                            buf, BP_GET_PSIZE(bp));
 
-                       abd_put(buf);
+                       abd_free(buf);
                }
                /*
                 * If we are here to damage data for testing purposes,
@@ -2353,20 +2722,26 @@ zio_rewrite_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, abd_t *data,
        return (zio);
 }
 
-/* ARGSUSED */
 static zio_t *
 zio_free_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, abd_t *data,
     uint64_t offset)
 {
-       return (zio_free_sync(pio, pio->io_spa, pio->io_txg, bp,
-           ZIO_GANG_CHILD_FLAGS(pio)));
+       (void) gn, (void) data, (void) offset;
+
+       zio_t *zio = zio_free_sync(pio, pio->io_spa, pio->io_txg, bp,
+           ZIO_GANG_CHILD_FLAGS(pio));
+       if (zio == NULL) {
+               zio = zio_null(pio, pio->io_spa,
+                   NULL, NULL, NULL, ZIO_GANG_CHILD_FLAGS(pio));
+       }
+       return (zio);
 }
 
-/* ARGSUSED */
 static zio_t *
 zio_claim_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, abd_t *data,
     uint64_t offset)
 {
+       (void) gn, (void) data, (void) offset;
        return (zio_claim(pio, pio->io_spa, pio->io_txg, bp,
            NULL, NULL, ZIO_GANG_CHILD_FLAGS(pio)));
 }
@@ -2445,7 +2820,7 @@ zio_gang_tree_assemble_done(zio_t *zio)
        blkptr_t *bp = zio->io_bp;
 
        ASSERT(gio == zio_unique_parent(zio));
-       ASSERT(zio->io_child_count == 0);
+       ASSERT(list_is_empty(&zio->io_child_list));
 
        if (zio->io_error)
                return;
@@ -2458,7 +2833,7 @@ zio_gang_tree_assemble_done(zio_t *zio)
        ASSERT(zio->io_size == SPA_GANGBLOCKSIZE);
        ASSERT(gn->gn_gbh->zg_tail.zec_magic == ZEC_MAGIC);
 
-       abd_put(zio->io_abd);
+       abd_free(zio->io_abd);
 
        for (int g = 0; g < SPA_GBH_NBLKPTRS; g++) {
                blkptr_t *gbp = &gn->gn_gbh->zg_blkptr[g];
@@ -2543,6 +2918,13 @@ zio_gang_issue(zio_t *zio)
        return (zio);
 }
 
+static void
+zio_gang_inherit_allocator(zio_t *pio, zio_t *cio)
+{
+       cio->io_allocator = pio->io_allocator;
+       cio->io_wr_iss_tq = pio->io_wr_iss_tq;
+}
+
 static void
 zio_write_gang_member_ready(zio_t *zio)
 {
@@ -2561,7 +2943,7 @@ zio_write_gang_member_ready(zio_t *zio)
        ASSERT3U(zio->io_prop.zp_copies, ==, gio->io_prop.zp_copies);
        ASSERT3U(zio->io_prop.zp_copies, <=, BP_GET_NDVAS(zio->io_bp));
        ASSERT3U(pio->io_prop.zp_copies, <=, BP_GET_NDVAS(pio->io_bp));
-       ASSERT3U(BP_GET_NDVAS(zio->io_bp), <=, BP_GET_NDVAS(pio->io_bp));
+       VERIFY3U(BP_GET_NDVAS(zio->io_bp), <=, BP_GET_NDVAS(pio->io_bp));
 
        mutex_enter(&pio->io_lock);
        for (int d = 0; d < BP_GET_NDVAS(zio->io_bp); d++) {
@@ -2582,14 +2964,13 @@ zio_write_gang_done(zio_t *zio)
         * check for it here as it is cleared in zio_ready.
         */
        if (zio->io_abd != NULL)
-               abd_put(zio->io_abd);
+               abd_free(zio->io_abd);
 }
 
 static zio_t *
-zio_write_gang_block(zio_t *pio)
+zio_write_gang_block(zio_t *pio, metaslab_class_t *mc)
 {
        spa_t *spa = pio->io_spa;
-       metaslab_class_t *mc = spa_normal_class(spa);
        blkptr_t *bp = pio->io_bp;
        zio_t *gio = pio->io_gang_leader;
        zio_t *zio;
@@ -2600,27 +2981,30 @@ zio_write_gang_block(zio_t *pio)
        uint64_t resid = pio->io_size;
        uint64_t lsize;
        int copies = gio->io_prop.zp_copies;
-       int gbh_copies;
        zio_prop_t zp;
        int error;
        boolean_t has_data = !(pio->io_flags & ZIO_FLAG_NODATA);
 
        /*
-        * encrypted blocks need DVA[2] free so encrypted gang headers can't
-        * have a third copy.
+        * If one copy was requested, store 2 copies of the GBH, so that we
+        * can still traverse all the data (e.g. to free or scrub) even if a
+        * block is damaged.  Note that we can't store 3 copies of the GBH in
+        * all cases, e.g. with encryption, which uses DVA[2] for the IV+salt.
         */
-       gbh_copies = MIN(copies + 1, spa_max_replication(spa));
-       if (gio->io_prop.zp_encrypt && gbh_copies >= SPA_DVAS_PER_BP)
-               gbh_copies = SPA_DVAS_PER_BP - 1;
+       int gbh_copies = copies;
+       if (gbh_copies == 1) {
+               gbh_copies = MIN(2, spa_max_replication(spa));
+       }
 
+       ASSERT(ZIO_HAS_ALLOCATOR(pio));
        int flags = METASLAB_HINTBP_FAVOR | METASLAB_GANG_HEADER;
        if (pio->io_flags & ZIO_FLAG_IO_ALLOCATING) {
                ASSERT(pio->io_priority == ZIO_PRIORITY_ASYNC_WRITE);
                ASSERT(has_data);
 
                flags |= METASLAB_ASYNC_ALLOC;
-               VERIFY(zfs_refcount_held(&mc->mc_alloc_slots[pio->io_allocator],
-                   pio));
+               VERIFY(zfs_refcount_held(&mc->mc_allocator[pio->io_allocator].
+                   mca_alloc_slots, pio));
 
                /*
                 * The logical zio has already placed a reservation for
@@ -2666,7 +3050,7 @@ zio_write_gang_block(zio_t *pio)
 
        gn = zio_gang_node_alloc(gnpp);
        gbh = gn->gn_gbh;
-       bzero(gbh, SPA_GANGBLOCKSIZE);
+       memset(gbh, 0, SPA_GANGBLOCKSIZE);
        gbh_abd = abd_get_from_buf(gbh, SPA_GANGBLOCKSIZE);
 
        /*
@@ -2676,6 +3060,8 @@ zio_write_gang_block(zio_t *pio)
            zio_write_gang_done, NULL, pio->io_priority,
            ZIO_GANG_CHILD_FLAGS(pio), &pio->io_bookmark);
 
+       zio_gang_inherit_allocator(pio, zio);
+
        /*
         * Create and nowait the gang children.
         */
@@ -2686,6 +3072,7 @@ zio_write_gang_block(zio_t *pio)
 
                zp.zp_checksum = gio->io_prop.zp_checksum;
                zp.zp_compress = ZIO_COMPRESS_OFF;
+               zp.zp_complevel = gio->io_prop.zp_complevel;
                zp.zp_type = DMU_OT_NONE;
                zp.zp_level = 0;
                zp.zp_copies = gio->io_prop.zp_copies;
@@ -2694,17 +3081,19 @@ zio_write_gang_block(zio_t *pio)
                zp.zp_nopwrite = B_FALSE;
                zp.zp_encrypt = gio->io_prop.zp_encrypt;
                zp.zp_byteorder = gio->io_prop.zp_byteorder;
-               bzero(zp.zp_salt, ZIO_DATA_SALT_LEN);
-               bzero(zp.zp_iv, ZIO_DATA_IV_LEN);
-               bzero(zp.zp_mac, ZIO_DATA_MAC_LEN);
+               memset(zp.zp_salt, 0, ZIO_DATA_SALT_LEN);
+               memset(zp.zp_iv, 0, ZIO_DATA_IV_LEN);
+               memset(zp.zp_mac, 0, ZIO_DATA_MAC_LEN);
 
                zio_t *cio = zio_write(zio, spa, txg, &gbh->zg_blkptr[g],
                    has_data ? abd_get_offset(pio->io_abd, pio->io_size -
                    resid) : NULL, lsize, lsize, &zp,
-                   zio_write_gang_member_ready, NULL, NULL,
+                   zio_write_gang_member_ready, NULL,
                    zio_write_gang_done, &gn->gn_child[g], pio->io_priority,
                    ZIO_GANG_CHILD_FLAGS(pio), &pio->io_bookmark);
 
+               zio_gang_inherit_allocator(zio, cio);
+
                if (pio->io_flags & ZIO_FLAG_IO_ALLOCATING) {
                        ASSERT(pio->io_priority == ZIO_PRIORITY_ASYNC_WRITE);
                        ASSERT(has_data);
@@ -2725,11 +3114,6 @@ zio_write_gang_block(zio_t *pio)
         */
        pio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
 
-       /*
-        * We didn't allocate this bp, so make sure it doesn't get unmarked.
-        */
-       pio->io_flags &= ~ZIO_FLAG_FASTWRITE;
-
        zio_nowait(zio);
 
        return (pio);
@@ -2760,6 +3144,7 @@ zio_nop_write(zio_t *zio)
        blkptr_t *bp_orig = &zio->io_bp_orig;
        zio_prop_t *zp = &zio->io_prop;
 
+       ASSERT(BP_IS_HOLE(bp));
        ASSERT(BP_GET_LEVEL(bp) == 0);
        ASSERT(!(zio->io_flags & ZIO_FLAG_IO_REWRITE));
        ASSERT(zp->zp_nopwrite);
@@ -2793,8 +3178,7 @@ zio_nop_write(zio_t *zio)
                ASSERT3U(BP_GET_PSIZE(bp), ==, BP_GET_PSIZE(bp_orig));
                ASSERT3U(BP_GET_LSIZE(bp), ==, BP_GET_LSIZE(bp_orig));
                ASSERT(zp->zp_compress != ZIO_COMPRESS_OFF);
-               ASSERT(bcmp(&bp->blk_prop, &bp_orig->blk_prop,
-                   sizeof (uint64_t)) == 0);
+               ASSERT3U(bp->blk_prop, ==, bp_orig->blk_prop);
 
                /*
                 * If we're overwriting a block that is currently on an
@@ -2802,11 +3186,13 @@ zio_nop_write(zio_t *zio)
                 * allow a new block to be allocated on a concrete vdev.
                 */
                spa_config_enter(zio->io_spa, SCL_VDEV, FTAG, RW_READER);
-               vdev_t *tvd = vdev_lookup_top(zio->io_spa,
-                   DVA_GET_VDEV(&bp->blk_dva[0]));
-               if (tvd->vdev_ops == &vdev_indirect_ops) {
-                       spa_config_exit(zio->io_spa, SCL_VDEV, FTAG);
-                       return (zio);
+               for (int d = 0; d < BP_GET_NDVAS(bp_orig); d++) {
+                       vdev_t *tvd = vdev_lookup_top(zio->io_spa,
+                           DVA_GET_VDEV(&bp_orig->blk_dva[d]));
+                       if (tvd->vdev_ops == &vdev_indirect_ops) {
+                               spa_config_exit(zio->io_spa, SCL_VDEV, FTAG);
+                               return (zio);
+                       }
                }
                spa_config_exit(zio->io_spa, SCL_VDEV, FTAG);
 
@@ -2818,6 +3204,35 @@ zio_nop_write(zio_t *zio)
        return (zio);
 }
 
+/*
+ * ==========================================================================
+ * Block Reference Table
+ * ==========================================================================
+ */
+static zio_t *
+zio_brt_free(zio_t *zio)
+{
+       blkptr_t *bp;
+
+       bp = zio->io_bp;
+
+       if (BP_GET_LEVEL(bp) > 0 ||
+           BP_IS_METADATA(bp) ||
+           !brt_maybe_exists(zio->io_spa, bp)) {
+               return (zio);
+       }
+
+       if (!brt_entry_decref(zio->io_spa, bp)) {
+               /*
+                * This isn't the last reference, so we cannot free
+                * the data yet.
+                */
+               zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
+       }
+
+       return (zio);
+}
+
 /*
  * ==========================================================================
  * Dedup
@@ -3125,14 +3540,14 @@ zio_ddt_write(zio_t *zio)
                else
                        ddt_phys_addref(ddp);
        } else if (zio->io_bp_override) {
-               ASSERT(bp->blk_birth == txg);
+               ASSERT(BP_GET_LOGICAL_BIRTH(bp) == txg);
                ASSERT(BP_EQUAL(bp, zio->io_bp_override));
                ddt_phys_fill(ddp, bp);
                ddt_phys_addref(ddp);
        } else {
                cio = zio_write(zio, spa, txg, bp, zio->io_orig_abd,
                    zio->io_orig_size, zio->io_orig_size, zp,
-                   zio_ddt_child_write_ready, NULL, NULL,
+                   zio_ddt_child_write_ready, NULL,
                    zio_ddt_child_write_done, dde, zio->io_priority,
                    ZIO_DDT_CHILD_FLAGS(zio), &zio->io_bookmark);
 
@@ -3142,13 +3557,12 @@ zio_ddt_write(zio_t *zio)
 
        ddt_exit(ddt);
 
-       if (cio)
-               zio_nowait(cio);
+       zio_nowait(cio);
 
        return (zio);
 }
 
-ddt_entry_t *freedde; /* for debugging */
+static ddt_entry_t *freedde; /* for debugging */
 
 static zio_t *
 zio_ddt_free(zio_t *zio)
@@ -3185,13 +3599,14 @@ zio_io_to_allocate(spa_t *spa, int allocator)
 {
        zio_t *zio;
 
-       ASSERT(MUTEX_HELD(&spa->spa_alloc_locks[allocator]));
+       ASSERT(MUTEX_HELD(&spa->spa_allocs[allocator].spaa_lock));
 
-       zio = avl_first(&spa->spa_alloc_trees[allocator]);
+       zio = avl_first(&spa->spa_allocs[allocator].spaa_tree);
        if (zio == NULL)
                return (NULL);
 
        ASSERT(IO_IS_ALLOCATING(zio));
+       ASSERT(ZIO_HAS_ALLOCATOR(zio));
 
        /*
         * Try to place a reservation for this zio. If we're unable to
@@ -3199,11 +3614,11 @@ zio_io_to_allocate(spa_t *spa, int allocator)
         */
        ASSERT3U(zio->io_allocator, ==, allocator);
        if (!metaslab_class_throttle_reserve(zio->io_metaslab_class,
-           zio->io_prop.zp_copies, zio->io_allocator, zio, 0)) {
+           zio->io_prop.zp_copies, allocator, zio, 0)) {
                return (NULL);
        }
 
-       avl_remove(&spa->spa_alloc_trees[allocator], zio);
+       avl_remove(&spa->spa_allocs[allocator].spaa_tree, zio);
        ASSERT3U(zio->io_stage, <, ZIO_STAGE_DVA_ALLOCATE);
 
        return (zio);
@@ -3227,27 +3642,18 @@ zio_dva_throttle(zio_t *zio)
                return (zio);
        }
 
+       ASSERT(zio->io_type == ZIO_TYPE_WRITE);
+       ASSERT(ZIO_HAS_ALLOCATOR(zio));
        ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
-
        ASSERT3U(zio->io_queued_timestamp, >, 0);
        ASSERT(zio->io_stage == ZIO_STAGE_DVA_THROTTLE);
 
-       zbookmark_phys_t *bm = &zio->io_bookmark;
-       /*
-        * We want to try to use as many allocators as possible to help improve
-        * performance, but we also want logically adjacent IOs to be physically
-        * adjacent to improve sequential read performance. We chunk each object
-        * into 2^20 block regions, and then hash based on the objset, object,
-        * level, and region to accomplish both of these goals.
-        */
-       zio->io_allocator = cityhash4(bm->zb_objset, bm->zb_object,
-           bm->zb_level, bm->zb_blkid >> 20) % spa->spa_alloc_count;
-       mutex_enter(&spa->spa_alloc_locks[zio->io_allocator]);
-       ASSERT(zio->io_type == ZIO_TYPE_WRITE);
+       int allocator = zio->io_allocator;
        zio->io_metaslab_class = mc;
-       avl_add(&spa->spa_alloc_trees[zio->io_allocator], zio);
-       nio = zio_io_to_allocate(spa, zio->io_allocator);
-       mutex_exit(&spa->spa_alloc_locks[zio->io_allocator]);
+       mutex_enter(&spa->spa_allocs[allocator].spaa_lock);
+       avl_add(&spa->spa_allocs[allocator].spaa_tree, zio);
+       nio = zio_io_to_allocate(spa, allocator);
+       mutex_exit(&spa->spa_allocs[allocator].spaa_lock);
        return (nio);
 }
 
@@ -3256,9 +3662,9 @@ zio_allocate_dispatch(spa_t *spa, int allocator)
 {
        zio_t *zio;
 
-       mutex_enter(&spa->spa_alloc_locks[allocator]);
+       mutex_enter(&spa->spa_allocs[allocator].spaa_lock);
        zio = zio_io_to_allocate(spa, allocator);
-       mutex_exit(&spa->spa_alloc_locks[allocator]);
+       mutex_exit(&spa->spa_allocs[allocator].spaa_lock);
        if (zio == NULL)
                return;
 
@@ -3287,7 +3693,6 @@ zio_dva_allocate(zio_t *zio)
        ASSERT3U(zio->io_prop.zp_copies, <=, spa_max_replication(spa));
        ASSERT3U(zio->io_size, ==, BP_GET_PSIZE(bp));
 
-       flags |= (zio->io_flags & ZIO_FLAG_FASTWRITE) ? METASLAB_FASTWRITE : 0;
        if (zio->io_flags & ZIO_FLAG_NODATA)
                flags |= METASLAB_DONT_THROTTLE;
        if (zio->io_flags & ZIO_FLAG_GANG_CHILD)
@@ -3306,6 +3711,18 @@ zio_dva_allocate(zio_t *zio)
                zio->io_metaslab_class = mc;
        }
 
+       /*
+        * Try allocating the block in the usual metaslab class.
+        * If that's full, allocate it in the normal class.
+        * If that's full, allocate as a gang block,
+        * and if all are full, the allocation fails (which shouldn't happen).
+        *
+        * Note that we do not fall back on embedded slog (ZIL) space, to
+        * preserve unfragmented slog space, which is critical for decent
+        * sync write performance.  If a log allocation fails, we will fall
+        * back to spa_sync() which is abysmal for performance.
+        */
+       ASSERT(ZIO_HAS_ALLOCATOR(zio));
        error = metaslab_alloc(spa, mc, zio->io_size, bp,
            zio->io_prop.zp_copies, zio->io_txg, NULL, flags,
            &zio->io_alloc_list, zio, zio->io_allocator);
@@ -3325,26 +3742,41 @@ zio_dva_allocate(zio_t *zio)
                            zio->io_prop.zp_copies, zio->io_allocator, zio);
                        zio->io_flags &= ~ZIO_FLAG_IO_ALLOCATING;
 
-                       mc = spa_normal_class(spa);
-                       VERIFY(metaslab_class_throttle_reserve(mc,
+                       VERIFY(metaslab_class_throttle_reserve(
+                           spa_normal_class(spa),
                            zio->io_prop.zp_copies, zio->io_allocator, zio,
                            flags | METASLAB_MUST_RESERVE));
-               } else {
-                       mc = spa_normal_class(spa);
                }
-               zio->io_metaslab_class = mc;
+               zio->io_metaslab_class = mc = spa_normal_class(spa);
+               if (zfs_flags & ZFS_DEBUG_METASLAB_ALLOC) {
+                       zfs_dbgmsg("%s: metaslab allocation failure, "
+                           "trying normal class: zio %px, size %llu, error %d",
+                           spa_name(spa), zio, (u_longlong_t)zio->io_size,
+                           error);
+               }
 
                error = metaslab_alloc(spa, mc, zio->io_size, bp,
                    zio->io_prop.zp_copies, zio->io_txg, NULL, flags,
                    &zio->io_alloc_list, zio, zio->io_allocator);
        }
 
+       if (error == ENOSPC && zio->io_size > SPA_MINBLOCKSIZE) {
+               if (zfs_flags & ZFS_DEBUG_METASLAB_ALLOC) {
+                       zfs_dbgmsg("%s: metaslab allocation failure, "
+                           "trying ganging: zio %px, size %llu, error %d",
+                           spa_name(spa), zio, (u_longlong_t)zio->io_size,
+                           error);
+               }
+               return (zio_write_gang_block(zio, mc));
+       }
        if (error != 0) {
-               zfs_dbgmsg("%s: metaslab allocation failure: zio %px, "
-                   "size %llu, error %d", spa_name(spa), zio, zio->io_size,
-                   error);
-               if (error == ENOSPC && zio->io_size > SPA_MINBLOCKSIZE)
-                       return (zio_write_gang_block(zio));
+               if (error != ENOSPC ||
+                   (zfs_flags & ZFS_DEBUG_METASLAB_ALLOC)) {
+                       zfs_dbgmsg("%s: metaslab allocation failure: zio %px, "
+                           "size %llu, error %d",
+                           spa_name(spa), zio, (u_longlong_t)zio->io_size,
+                           error);
+               }
                zio->io_error = error;
        }
 
@@ -3379,11 +3811,13 @@ zio_dva_claim(zio_t *zio)
 static void
 zio_dva_unallocate(zio_t *zio, zio_gang_node_t *gn, blkptr_t *bp)
 {
-       ASSERT(bp->blk_birth == zio->io_txg || BP_IS_HOLE(bp));
+       ASSERT(BP_GET_LOGICAL_BIRTH(bp) == zio->io_txg || BP_IS_HOLE(bp));
        ASSERT(zio->io_bp_override == NULL);
 
-       if (!BP_IS_HOLE(bp))
-               metaslab_free(zio->io_spa, bp, bp->blk_birth, B_TRUE);
+       if (!BP_IS_HOLE(bp)) {
+               metaslab_free(zio->io_spa, bp, BP_GET_LOGICAL_BIRTH(bp),
+                   B_TRUE);
+       }
 
        if (gn != NULL) {
                for (int g = 0; g < SPA_GBH_NBLKPTRS; g++) {
@@ -3421,19 +3855,21 @@ zio_alloc_zil(spa_t *spa, objset_t *os, uint64_t txg, blkptr_t *new_bp,
         * of, so we just hash the objset ID to pick the allocator to get
         * some parallelism.
         */
+       int flags = METASLAB_ZIL;
+       int allocator = (uint_t)cityhash4(0, 0, 0,
+           os->os_dsl_dataset->ds_object) % spa->spa_alloc_count;
        error = metaslab_alloc(spa, spa_log_class(spa), size, new_bp, 1,
-           txg, NULL, METASLAB_FASTWRITE, &io_alloc_list, NULL,
-           cityhash4(0, 0, 0, os->os_dsl_dataset->ds_object) %
-           spa->spa_alloc_count);
-       if (error == 0) {
-               *slog = TRUE;
-       } else {
+           txg, NULL, flags, &io_alloc_list, NULL, allocator);
+       *slog = (error == 0);
+       if (error != 0) {
+               error = metaslab_alloc(spa, spa_embedded_log_class(spa), size,
+                   new_bp, 1, txg, NULL, flags,
+                   &io_alloc_list, NULL, allocator);
+       }
+       if (error != 0) {
                error = metaslab_alloc(spa, spa_normal_class(spa), size,
-                   new_bp, 1, txg, NULL, METASLAB_FASTWRITE,
-                   &io_alloc_list, NULL, cityhash4(0, 0, 0,
-                   os->os_dsl_dataset->ds_object) % spa->spa_alloc_count);
-               if (error == 0)
-                       *slog = FALSE;
+                   new_bp, 1, txg, NULL, flags,
+                   &io_alloc_list, NULL, allocator);
        }
        metaslab_trace_fini(&io_alloc_list);
 
@@ -3467,7 +3903,8 @@ zio_alloc_zil(spa_t *spa, objset_t *os, uint64_t txg, blkptr_t *new_bp,
                }
        } else {
                zfs_dbgmsg("%s: zil block allocation failure: "
-                   "size %llu, error %d", spa_name(spa), size, error);
+                   "size %llu, error %d", spa_name(spa), (u_longlong_t)size,
+                   error);
        }
 
        return (error);
@@ -3520,7 +3957,7 @@ zio_vdev_io_start(zio_t *zio)
                 * Note: the code can handle other kinds of writes,
                 * but we don't expect them.
                 */
-               if (zio->io_vd->vdev_removing) {
+               if (zio->io_vd->vdev_noalloc) {
                        ASSERT(zio->io_flags &
                            (ZIO_FLAG_PHYSICAL | ZIO_FLAG_SELF_HEAL |
                            ZIO_FLAG_RESILVER | ZIO_FLAG_INDUCE_DAMAGE));
@@ -3590,22 +4027,37 @@ zio_vdev_io_start(zio_t *zio)
         * However, indirect vdevs point off to other vdevs which may have
         * DTL's, so we never bypass them.  The child i/os on concrete vdevs
         * will be properly bypassed instead.
+        *
+        * Leaf DTL_PARTIAL can be empty when a legitimate write comes from
+        * a dRAID spare vdev. For example, when a dRAID spare is first
+        * used, its spare blocks need to be written to but the leaf vdev's
+        * of such blocks can have empty DTL_PARTIAL.
+        *
+        * There seemed no clean way to allow such writes while bypassing
+        * spurious ones. At this point, just avoid all bypassing for dRAID
+        * for correctness.
         */
        if ((zio->io_flags & ZIO_FLAG_IO_REPAIR) &&
            !(zio->io_flags & ZIO_FLAG_SELF_HEAL) &&
            zio->io_txg != 0 && /* not a delegated i/o */
            vd->vdev_ops != &vdev_indirect_ops &&
+           vd->vdev_top->vdev_ops != &vdev_draid_ops &&
            !vdev_dtl_contains(vd, DTL_PARTIAL, zio->io_txg, 1)) {
                ASSERT(zio->io_type == ZIO_TYPE_WRITE);
                zio_vdev_io_bypass(zio);
                return (zio);
        }
 
-       if (vd->vdev_ops->vdev_op_leaf && (zio->io_type == ZIO_TYPE_READ ||
-           zio->io_type == ZIO_TYPE_WRITE || zio->io_type == ZIO_TYPE_TRIM)) {
-
-               if (zio->io_type == ZIO_TYPE_READ && vdev_cache_read(zio))
-                       return (zio);
+       /*
+        * Select the next best leaf I/O to process.  Distributed spares are
+        * excluded since they dispatch the I/O directly to a leaf vdev after
+        * applying the dRAID mapping.
+        */
+       if (vd->vdev_ops->vdev_op_leaf &&
+           vd->vdev_ops != &vdev_draid_spare_ops &&
+           (zio->io_type == ZIO_TYPE_READ ||
+           zio->io_type == ZIO_TYPE_WRITE ||
+           zio->io_type == ZIO_TYPE_TRIM)) {
 
                if ((zio = vdev_queue_io(zio)) == NULL)
                        return (NULL);
@@ -3639,13 +4091,10 @@ zio_vdev_io_done(zio_t *zio)
        if (zio->io_delay)
                zio->io_delay = gethrtime() - zio->io_delay;
 
-       if (vd != NULL && vd->vdev_ops->vdev_op_leaf) {
-
+       if (vd != NULL && vd->vdev_ops->vdev_op_leaf &&
+           vd->vdev_ops != &vdev_draid_spare_ops) {
                vdev_queue_io_done(zio);
 
-               if (zio->io_type == ZIO_TYPE_WRITE)
-                       vdev_cache_write(zio);
-
                if (zio_injection_enabled && zio->io_error == 0)
                        zio->io_error = zio_handle_device_injections(vd, zio,
                            EIO, EILSEQ);
@@ -3664,7 +4113,7 @@ zio_vdev_io_done(zio_t *zio)
 
        ops->vdev_op_io_done(zio);
 
-       if (unexpected_error)
+       if (unexpected_error && vd->vdev_remove_wanted == B_FALSE)
                VERIFY(vdev_probe(vd, zio) == NULL);
 
        return (zio);
@@ -3711,9 +4160,8 @@ zio_vsd_default_cksum_finish(zio_cksum_report_t *zcr,
        zfs_ereport_finish_checksum(zcr, good_buf, zcr->zcr_cbdata, B_FALSE);
 }
 
-/*ARGSUSED*/
 void
-zio_vsd_default_cksum_report(zio_t *zio, zio_cksum_report_t *zcr, void *ignored)
+zio_vsd_default_cksum_report(zio_t *zio, zio_cksum_report_t *zcr)
 {
        void *abd = abd_alloc_sametype(zio->io_abd, zio->io_size);
 
@@ -3756,8 +4204,7 @@ zio_vdev_io_assess(zio_t *zio)
                ASSERT(!(zio->io_flags & ZIO_FLAG_DONT_QUEUE)); /* not a leaf */
                ASSERT(!(zio->io_flags & ZIO_FLAG_IO_BYPASS));  /* not a leaf */
                zio->io_error = 0;
-               zio->io_flags |= ZIO_FLAG_IO_RETRY |
-                   ZIO_FLAG_DONT_CACHE | ZIO_FLAG_DONT_AGGREGATE;
+               zio->io_flags |= ZIO_FLAG_IO_RETRY | ZIO_FLAG_DONT_AGGREGATE;
                zio->io_stage = ZIO_STAGE_VDEV_IO_START >> 1;
                zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE,
                    zio_requeue_io_start_cut_in_line);
@@ -3778,6 +4225,9 @@ zio_vdev_io_assess(zio_t *zio)
         */
        if (zio->io_error == ENXIO && zio->io_type == ZIO_TYPE_WRITE &&
            vd != NULL && !vd->vdev_ops->vdev_op_leaf) {
+               vdev_dbgmsg(vd, "zio_vdev_io_assess(zio=%px) setting "
+                   "cant_write=TRUE due to write failure with ENXIO",
+                   zio);
                vd->vdev_cant_write = B_TRUE;
        }
 
@@ -3794,13 +4244,6 @@ zio_vdev_io_assess(zio_t *zio)
        if (zio->io_error)
                zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
 
-       if (vd != NULL && vd->vdev_ops->vdev_op_leaf &&
-           zio->io_physdone != NULL) {
-               ASSERT(!(zio->io_flags & ZIO_FLAG_DELEGATED));
-               ASSERT(zio->io_child_type == ZIO_CHILD_VDEV);
-               zio->io_physdone(zio->io_logical);
-       }
-
        return (zio);
 }
 
@@ -4042,7 +4485,7 @@ zio_checksum_verify(zio_t *zio)
                if (zio->io_prop.zp_checksum == ZIO_CHECKSUM_OFF)
                        return (zio);
 
-               ASSERT(zio->io_prop.zp_checksum == ZIO_CHECKSUM_LABEL);
+               ASSERT3U(zio->io_prop.zp_checksum, ==, ZIO_CHECKSUM_LABEL);
        }
 
        if ((error = zio_checksum_error(zio, &info)) != 0) {
@@ -4052,10 +4495,9 @@ zio_checksum_verify(zio_t *zio)
                        mutex_enter(&zio->io_vd->vdev_stat_lock);
                        zio->io_vd->vdev_stat.vs_checksum_errors++;
                        mutex_exit(&zio->io_vd->vdev_stat_lock);
-
-                       zfs_ereport_start_checksum(zio->io_spa,
+                       (void) zfs_ereport_start_checksum(zio->io_spa,
                            zio->io_vd, &zio->io_bookmark, zio,
-                           zio->io_offset, zio->io_size, NULL, &info);
+                           zio->io_offset, zio->io_size, &info);
                }
        }
 
@@ -4109,22 +4551,24 @@ zio_ready(zio_t *zio)
        zio_t *pio, *pio_next;
        zio_link_t *zl = NULL;
 
-       if (zio_wait_for_children(zio, ZIO_CHILD_GANG_BIT | ZIO_CHILD_DDT_BIT,
-           ZIO_WAIT_READY)) {
+       if (zio_wait_for_children(zio, ZIO_CHILD_LOGICAL_BIT |
+           ZIO_CHILD_GANG_BIT | ZIO_CHILD_DDT_BIT, ZIO_WAIT_READY)) {
                return (NULL);
        }
 
        if (zio->io_ready) {
                ASSERT(IO_IS_ALLOCATING(zio));
-               ASSERT(bp->blk_birth == zio->io_txg || BP_IS_HOLE(bp) ||
-                   (zio->io_flags & ZIO_FLAG_NOPWRITE));
+               ASSERT(BP_GET_LOGICAL_BIRTH(bp) == zio->io_txg ||
+                   BP_IS_HOLE(bp) || (zio->io_flags & ZIO_FLAG_NOPWRITE));
                ASSERT(zio->io_children[ZIO_CHILD_GANG][ZIO_WAIT_READY] == 0);
 
                zio->io_ready(zio);
        }
 
+#ifdef ZFS_DEBUG
        if (bp != NULL && bp != &zio->io_bp_copy)
                zio->io_bp_copy = *bp;
+#endif
 
        if (zio->io_error != 0) {
                zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
@@ -4133,6 +4577,7 @@ zio_ready(zio_t *zio)
                        ASSERT(IO_IS_ALLOCATING(zio));
                        ASSERT(zio->io_priority == ZIO_PRIORITY_ASYNC_WRITE);
                        ASSERT(zio->io_metaslab_class != NULL);
+                       ASSERT(ZIO_HAS_ALLOCATOR(zio));
 
                        /*
                         * We were unable to allocate anything, unreserve and
@@ -4163,7 +4608,7 @@ zio_ready(zio_t *zio)
        }
 
        if (zio->io_flags & ZIO_FLAG_NODATA) {
-               if (BP_IS_GANG(bp)) {
+               if (bp != NULL && BP_IS_GANG(bp)) {
                        zio->io_flags &= ~ZIO_FLAG_NODATA;
                } else {
                        ASSERT((uintptr_t)zio->io_abd < SPA_MAXBLOCKSIZE);
@@ -4219,6 +4664,7 @@ zio_dva_throttle_done(zio_t *zio)
        }
 
        ASSERT(IO_IS_ALLOCATING(pio));
+       ASSERT(ZIO_HAS_ALLOCATOR(pio));
        ASSERT3P(zio, !=, zio->io_logical);
        ASSERT(zio->io_logical != NULL);
        ASSERT(!(zio->io_flags & ZIO_FLAG_IO_REPAIR));
@@ -4281,12 +4727,12 @@ zio_done(zio_t *zio)
                ASSERT(zio->io_type == ZIO_TYPE_WRITE);
                ASSERT(zio->io_priority == ZIO_PRIORITY_ASYNC_WRITE);
                ASSERT(zio->io_bp != NULL);
+               ASSERT(ZIO_HAS_ALLOCATOR(zio));
 
                metaslab_group_alloc_verify(zio->io_spa, zio->io_bp, zio,
                    zio->io_allocator);
-               VERIFY(zfs_refcount_not_held(
-                   &zio->io_metaslab_class->mc_alloc_slots[zio->io_allocator],
-                   zio));
+               VERIFY(zfs_refcount_not_held(&zio->io_metaslab_class->
+                   mc_allocator[zio->io_allocator].mca_alloc_slots, zio));
        }
 
 
@@ -4297,7 +4743,7 @@ zio_done(zio_t *zio)
        if (zio->io_bp != NULL && !BP_IS_EMBEDDED(zio->io_bp)) {
                ASSERT(zio->io_bp->blk_pad[0] == 0);
                ASSERT(zio->io_bp->blk_pad[1] == 0);
-               ASSERT(bcmp(zio->io_bp, &zio->io_bp_copy,
+               ASSERT(memcmp(zio->io_bp, &zio->io_bp_copy,
                    sizeof (blkptr_t)) == 0 ||
                    (zio->io_bp == zio_unique_parent(zio)->io_bp));
                if (zio->io_type == ZIO_TYPE_WRITE && !BP_IS_HOLE(zio->io_bp) &&
@@ -4331,7 +4777,7 @@ zio_done(zio_t *zio)
                        uint64_t asize = P2ROUNDUP(psize, align);
                        abd_t *adata = zio->io_abd;
 
-                       if (asize != psize) {
+                       if (adata != NULL && asize != psize) {
                                adata = abd_alloc(asize, B_TRUE);
                                abd_copy(adata, zio->io_abd, psize);
                                abd_zero_off(adata, psize, asize - psize);
@@ -4342,7 +4788,7 @@ zio_done(zio_t *zio)
                        zcr->zcr_finish(zcr, adata);
                        zfs_ereport_free_checksum(zcr);
 
-                       if (asize != psize)
+                       if (adata != NULL && asize != psize)
                                abd_free(adata);
                }
        }
@@ -4373,9 +4819,9 @@ zio_done(zio_t *zio)
                                zio->io_vd->vdev_stat.vs_slow_ios++;
                                mutex_exit(&zio->io_vd->vdev_stat_lock);
 
-                               zfs_ereport_post(FM_EREPORT_ZFS_DELAY,
+                               (void) zfs_ereport_post(FM_EREPORT_ZFS_DELAY,
                                    zio->io_spa, zio->io_vd, &zio->io_bookmark,
-                                   zio, 0, 0);
+                                   zio, 0);
                        }
                }
        }
@@ -4389,16 +4835,16 @@ zio_done(zio_t *zio)
                 */
                if (zio->io_error != ECKSUM && zio->io_vd != NULL &&
                    !vdev_is_dead(zio->io_vd)) {
-                       mutex_enter(&zio->io_vd->vdev_stat_lock);
-                       if (zio->io_type == ZIO_TYPE_READ) {
-                               zio->io_vd->vdev_stat.vs_read_errors++;
-                       } else if (zio->io_type == ZIO_TYPE_WRITE) {
-                               zio->io_vd->vdev_stat.vs_write_errors++;
+                       int ret = zfs_ereport_post(FM_EREPORT_ZFS_IO,
+                           zio->io_spa, zio->io_vd, &zio->io_bookmark, zio, 0);
+                       if (ret != EALREADY) {
+                               mutex_enter(&zio->io_vd->vdev_stat_lock);
+                               if (zio->io_type == ZIO_TYPE_READ)
+                                       zio->io_vd->vdev_stat.vs_read_errors++;
+                               else if (zio->io_type == ZIO_TYPE_WRITE)
+                                       zio->io_vd->vdev_stat.vs_write_errors++;
+                               mutex_exit(&zio->io_vd->vdev_stat_lock);
                        }
-                       mutex_exit(&zio->io_vd->vdev_stat_lock);
-
-                       zfs_ereport_post(FM_EREPORT_ZFS_IO, zio->io_spa,
-                           zio->io_vd, &zio->io_bookmark, zio, 0, 0);
                }
 
                if ((zio->io_error == EIO || !(zio->io_flags &
@@ -4408,9 +4854,10 @@ zio_done(zio_t *zio)
                         * For logical I/O requests, tell the SPA to log the
                         * error and generate a logical data ereport.
                         */
-                       spa_log_error(zio->io_spa, &zio->io_bookmark);
-                       zfs_ereport_post(FM_EREPORT_ZFS_DATA, zio->io_spa,
-                           NULL, &zio->io_bookmark, zio, 0, 0);
+                       spa_log_error(zio->io_spa, &zio->io_bookmark,
+                           BP_GET_LOGICAL_BIRTH(zio->io_bp));
+                       (void) zfs_ereport_post(FM_EREPORT_ZFS_DATA,
+                           zio->io_spa, NULL, &zio->io_bookmark, zio, 0);
                }
        }
 
@@ -4546,13 +4993,12 @@ zio_done(zio_t *zio)
                        ASSERT(taskq_empty_ent(&zio->io_tqent));
                        spa_taskq_dispatch_ent(zio->io_spa,
                            ZIO_TYPE_CLAIM, ZIO_TASKQ_ISSUE,
-                           (task_func_t *)zio_reexecute, zio, 0,
-                           &zio->io_tqent);
+                           zio_reexecute, zio, 0, &zio->io_tqent, NULL);
                }
                return (NULL);
        }
 
-       ASSERT(zio->io_child_count == 0);
+       ASSERT(list_is_empty(&zio->io_child_list));
        ASSERT(zio->io_reexecute == 0);
        ASSERT(zio->io_error == 0 || (zio->io_flags & ZIO_FLAG_CANFAIL));
 
@@ -4567,12 +5013,6 @@ zio_done(zio_t *zio)
                zfs_ereport_free_checksum(zcr);
        }
 
-       if (zio->io_flags & ZIO_FLAG_FASTWRITE && zio->io_bp &&
-           !BP_IS_HOLE(zio->io_bp) && !BP_IS_EMBEDDED(zio->io_bp) &&
-           !(zio->io_flags & ZIO_FLAG_NOPWRITE)) {
-               metaslab_fastwrite_unmark(zio->io_spa, zio->io_bp);
-       }
-
        /*
         * It is the responsibility of the done callback to ensure that this
         * particular zio is no longer discoverable for adoption, and as
@@ -4625,6 +5065,7 @@ static zio_pipe_stage_t *zio_pipeline[] = {
        zio_encrypt,
        zio_checksum_generate,
        zio_nop_write,
+       zio_brt_free,
        zio_ddt_read_start,
        zio_ddt_read_done,
        zio_ddt_write,
@@ -4745,7 +5186,7 @@ zbookmark_subtree_completed(const dnode_phys_t *dnp,
 {
        zbookmark_phys_t mod_zb = *subtree_root;
        mod_zb.zb_blkid++;
-       ASSERT(last_block->zb_level == 0);
+       ASSERT0(last_block->zb_level);
 
        /* The objset_phys_t isn't before anything. */
        if (dnp == NULL)
@@ -4771,26 +5212,41 @@ zbookmark_subtree_completed(const dnode_phys_t *dnp,
            last_block) <= 0);
 }
 
+/*
+ * This function is similar to zbookmark_subtree_completed(), but returns true
+ * if subtree_root is equal or ahead of last_block, i.e. still to be done.
+ */
+boolean_t
+zbookmark_subtree_tbd(const dnode_phys_t *dnp,
+    const zbookmark_phys_t *subtree_root, const zbookmark_phys_t *last_block)
+{
+       ASSERT0(last_block->zb_level);
+       if (dnp == NULL)
+               return (B_FALSE);
+       return (zbookmark_compare(dnp->dn_datablkszsec, dnp->dn_indblkshift,
+           1ULL << (DNODE_BLOCK_SHIFT - SPA_MINBLOCKSHIFT), 0, subtree_root,
+           last_block) >= 0);
+}
+
 EXPORT_SYMBOL(zio_type_name);
 EXPORT_SYMBOL(zio_buf_alloc);
 EXPORT_SYMBOL(zio_data_buf_alloc);
 EXPORT_SYMBOL(zio_buf_free);
 EXPORT_SYMBOL(zio_data_buf_free);
 
-/* BEGIN CSTYLED */
 ZFS_MODULE_PARAM(zfs_zio, zio_, slow_io_ms, INT, ZMOD_RW,
        "Max I/O completion time (milliseconds) before marking it as slow");
 
 ZFS_MODULE_PARAM(zfs_zio, zio_, requeue_io_start_cut_in_line, INT, ZMOD_RW,
        "Prioritize requeued I/O");
 
-ZFS_MODULE_PARAM(zfs, zfs_, sync_pass_deferred_free,  INT, ZMOD_RW,
+ZFS_MODULE_PARAM(zfs, zfs_, sync_pass_deferred_free,  UINT, ZMOD_RW,
        "Defer frees starting in this pass");
 
-ZFS_MODULE_PARAM(zfs, zfs_, sync_pass_dont_compress, INT, ZMOD_RW,
+ZFS_MODULE_PARAM(zfs, zfs_, sync_pass_dont_compress, UINT, ZMOD_RW,
        "Don't compress starting in this pass");
 
-ZFS_MODULE_PARAM(zfs, zfs_, sync_pass_rewrite, INT, ZMOD_RW,
+ZFS_MODULE_PARAM(zfs, zfs_, sync_pass_rewrite, UINT, ZMOD_RW,
        "Rewrite new bps starting in this pass");
 
 ZFS_MODULE_PARAM(zfs_zio, zio_, dva_throttle_enabled, INT, ZMOD_RW,
@@ -4798,4 +5254,3 @@ ZFS_MODULE_PARAM(zfs_zio, zio_, dva_throttle_enabled, INT, ZMOD_RW,
 
 ZFS_MODULE_PARAM(zfs_zio, zio_, deadman_log_all, INT, ZMOD_RW,
        "Log all slow ZIOs, not just those with vdevs");
-/* END CSTYLED */