]> git.proxmox.com Git - mirror_zfs.git/blobdiff - module/zfs/metaslab.c
Fix typo/etc in module/zfs/zfs_ctldir.c
[mirror_zfs.git] / module / zfs / metaslab.c
index 879238e7d8ea300347df6afecf01b8142cb10d8a..ec89810b48abeed5f31355537803f2d9634e03ee 100644 (file)
@@ -20,8 +20,9 @@
  */
 /*
  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
- * Copyright (c) 2011, 2016 by Delphix. All rights reserved.
+ * Copyright (c) 2011, 2018 by Delphix. All rights reserved.
  * Copyright (c) 2013 by Saso Kiselkov. All rights reserved.
+ * Copyright (c) 2017, Intel Corporation.
  */
 
 #include <sys/zfs_context.h>
@@ -180,7 +181,6 @@ int metaslab_lba_weighting_enabled = B_TRUE;
  */
 int metaslab_bias_enabled = B_TRUE;
 
-
 /*
  * Enable/disable remapping of indirect DVAs to their concrete vdevs.
  */
@@ -218,11 +218,19 @@ boolean_t metaslab_trace_enabled = B_TRUE;
 uint64_t metaslab_trace_max_entries = 5000;
 #endif
 
+/*
+ * Maximum number of metaslabs per group that can be disabled
+ * simultaneously.
+ */
+int max_disabled_ms = 3;
+
 static uint64_t metaslab_weight(metaslab_t *);
 static void metaslab_set_fragmentation(metaslab_t *);
 static void metaslab_free_impl(vdev_t *, uint64_t, uint64_t, boolean_t);
 static void metaslab_check_free_impl(vdev_t *, uint64_t, uint64_t);
 
+static void metaslab_passivate(metaslab_t *msp, uint64_t weight);
+static uint64_t metaslab_weight_from_range_tree(metaslab_t *msp);
 #ifdef _METASLAB_TRACING
 kmem_cache_t *metaslab_alloc_trace_cache;
 #endif
@@ -243,7 +251,12 @@ metaslab_class_create(spa_t *spa, metaslab_ops_t *ops)
        mc->mc_rotor = NULL;
        mc->mc_ops = ops;
        mutex_init(&mc->mc_lock, NULL, MUTEX_DEFAULT, NULL);
-       refcount_create_tracked(&mc->mc_alloc_slots);
+       mc->mc_alloc_slots = kmem_zalloc(spa->spa_alloc_count *
+           sizeof (zfs_refcount_t), KM_SLEEP);
+       mc->mc_alloc_max_slots = kmem_zalloc(spa->spa_alloc_count *
+           sizeof (uint64_t), KM_SLEEP);
+       for (int i = 0; i < spa->spa_alloc_count; i++)
+               zfs_refcount_create_tracked(&mc->mc_alloc_slots[i]);
 
        return (mc);
 }
@@ -257,7 +270,12 @@ metaslab_class_destroy(metaslab_class_t *mc)
        ASSERT(mc->mc_space == 0);
        ASSERT(mc->mc_dspace == 0);
 
-       refcount_destroy(&mc->mc_alloc_slots);
+       for (int i = 0; i < mc->mc_spa->spa_alloc_count; i++)
+               zfs_refcount_destroy(&mc->mc_alloc_slots[i]);
+       kmem_free(mc->mc_alloc_slots, mc->mc_spa->spa_alloc_count *
+           sizeof (zfs_refcount_t));
+       kmem_free(mc->mc_alloc_max_slots, mc->mc_spa->spa_alloc_count *
+           sizeof (uint64_t));
        mutex_destroy(&mc->mc_lock);
        kmem_free(mc, sizeof (metaslab_class_t));
 }
@@ -288,7 +306,7 @@ metaslab_class_validate(metaslab_class_t *mc)
        return (0);
 }
 
-void
+static void
 metaslab_class_space_update(metaslab_class_t *mc, int64_t alloc_delta,
     int64_t defer_delta, int64_t space_delta, int64_t dspace_delta)
 {
@@ -325,7 +343,8 @@ metaslab_class_get_dspace(metaslab_class_t *mc)
 void
 metaslab_class_histogram_verify(metaslab_class_t *mc)
 {
-       vdev_t *rvd = mc->mc_spa->spa_root_vdev;
+       spa_t *spa = mc->mc_spa;
+       vdev_t *rvd = spa->spa_root_vdev;
        uint64_t *mc_hist;
        int i;
 
@@ -449,6 +468,30 @@ metaslab_compare(const void *x1, const void *x2)
        const metaslab_t *m1 = (const metaslab_t *)x1;
        const metaslab_t *m2 = (const metaslab_t *)x2;
 
+       int sort1 = 0;
+       int sort2 = 0;
+       if (m1->ms_allocator != -1 && m1->ms_primary)
+               sort1 = 1;
+       else if (m1->ms_allocator != -1 && !m1->ms_primary)
+               sort1 = 2;
+       if (m2->ms_allocator != -1 && m2->ms_primary)
+               sort2 = 1;
+       else if (m2->ms_allocator != -1 && !m2->ms_primary)
+               sort2 = 2;
+
+       /*
+        * Sort inactive metaslabs first, then primaries, then secondaries. When
+        * selecting a metaslab to allocate from, an allocator first tries its
+        * primary, then secondary active metaslab. If it doesn't have active
+        * metaslabs, or can't allocate from them, it searches for an inactive
+        * metaslab to activate. If it can't find a suitable one, it will steal
+        * a primary or secondary metaslab from another allocator.
+        */
+       if (sort1 < sort2)
+               return (-1);
+       if (sort1 > sort2)
+               return (1);
+
        int cmp = AVL_CMP(m2->ms_weight, m1->ms_weight);
        if (likely(cmp))
                return (cmp);
@@ -458,45 +501,62 @@ metaslab_compare(const void *x1, const void *x2)
        return (AVL_CMP(m1->ms_start, m2->ms_start));
 }
 
+uint64_t
+metaslab_allocated_space(metaslab_t *msp)
+{
+       return (msp->ms_allocated_space);
+}
+
 /*
  * Verify that the space accounting on disk matches the in-core range_trees.
  */
-void
+static void
 metaslab_verify_space(metaslab_t *msp, uint64_t txg)
 {
        spa_t *spa = msp->ms_group->mg_vd->vdev_spa;
-       uint64_t allocated = 0;
+       uint64_t allocating = 0;
        uint64_t sm_free_space, msp_free_space;
 
        ASSERT(MUTEX_HELD(&msp->ms_lock));
+       ASSERT(!msp->ms_condensing);
 
        if ((zfs_flags & ZFS_DEBUG_METASLAB_VERIFY) == 0)
                return;
 
        /*
         * We can only verify the metaslab space when we're called
-        * from syncing context with a loaded metaslab that has an allocated
-        * space map. Calling this in non-syncing context does not
-        * provide a consistent view of the metaslab since we're performing
-        * allocations in the future.
+        * from syncing context with a loaded metaslab that has an
+        * allocated space map. Calling this in non-syncing context
+        * does not provide a consistent view of the metaslab since
+        * we're performing allocations in the future.
         */
        if (txg != spa_syncing_txg(spa) || msp->ms_sm == NULL ||
            !msp->ms_loaded)
                return;
 
-       sm_free_space = msp->ms_size - space_map_allocated(msp->ms_sm) -
-           space_map_alloc_delta(msp->ms_sm);
+       /*
+        * Even though the smp_alloc field can get negative (e.g.
+        * see vdev_checkpoint_sm), that should never be the case
+        * when it come's to a metaslab's space map.
+        */
+       ASSERT3S(space_map_allocated(msp->ms_sm), >=, 0);
+
+       sm_free_space = msp->ms_size - metaslab_allocated_space(msp);
 
        /*
-        * Account for future allocations since we would have already
-        * deducted that space from the ms_freetree.
+        * Account for future allocations since we would have
+        * already deducted that space from the ms_allocatable.
         */
        for (int t = 0; t < TXG_CONCURRENT_STATES; t++) {
-               allocated +=
+               allocating +=
                    range_tree_space(msp->ms_allocating[(txg + t) & TXG_MASK]);
        }
 
-       msp_free_space = range_tree_space(msp->ms_allocatable) + allocated +
+       ASSERT3U(msp->ms_deferspace, ==,
+           range_tree_space(msp->ms_defer[0]) +
+           range_tree_space(msp->ms_defer[1]));
+
+       msp_free_space = range_tree_space(msp->ms_allocatable) + allocating +
            msp->ms_deferspace + range_tree_space(msp->ms_freed);
 
        VERIFY3U(sm_free_space, ==, msp_free_space);
@@ -591,12 +651,18 @@ metaslab_group_alloc_update(metaslab_group_t *mg)
 }
 
 metaslab_group_t *
-metaslab_group_create(metaslab_class_t *mc, vdev_t *vd)
+metaslab_group_create(metaslab_class_t *mc, vdev_t *vd, int allocators)
 {
        metaslab_group_t *mg;
 
        mg = kmem_zalloc(sizeof (metaslab_group_t), KM_SLEEP);
        mutex_init(&mg->mg_lock, NULL, MUTEX_DEFAULT, NULL);
+       mutex_init(&mg->mg_ms_disabled_lock, NULL, MUTEX_DEFAULT, NULL);
+       cv_init(&mg->mg_ms_disabled_cv, NULL, CV_DEFAULT, NULL);
+       mg->mg_primaries = kmem_zalloc(allocators * sizeof (metaslab_t *),
+           KM_SLEEP);
+       mg->mg_secondaries = kmem_zalloc(allocators * sizeof (metaslab_t *),
+           KM_SLEEP);
        avl_create(&mg->mg_metaslab_tree, metaslab_compare,
            sizeof (metaslab_t), offsetof(struct metaslab, ms_group_node));
        mg->mg_vd = vd;
@@ -604,7 +670,16 @@ metaslab_group_create(metaslab_class_t *mc, vdev_t *vd)
        mg->mg_activation_count = 0;
        mg->mg_initialized = B_FALSE;
        mg->mg_no_free_space = B_TRUE;
-       refcount_create_tracked(&mg->mg_alloc_queue_depth);
+       mg->mg_allocators = allocators;
+
+       mg->mg_alloc_queue_depth = kmem_zalloc(allocators *
+           sizeof (zfs_refcount_t), KM_SLEEP);
+       mg->mg_cur_max_alloc_queue_depth = kmem_zalloc(allocators *
+           sizeof (uint64_t), KM_SLEEP);
+       for (int i = 0; i < allocators; i++) {
+               zfs_refcount_create_tracked(&mg->mg_alloc_queue_depth[i]);
+               mg->mg_cur_max_alloc_queue_depth[i] = 0;
+       }
 
        mg->mg_taskq = taskq_create("metaslab_group_taskq", metaslab_load_pct,
            maxclsyspri, 10, INT_MAX, TASKQ_THREADS_CPU_PCT | TASKQ_DYNAMIC);
@@ -626,8 +701,22 @@ metaslab_group_destroy(metaslab_group_t *mg)
 
        taskq_destroy(mg->mg_taskq);
        avl_destroy(&mg->mg_metaslab_tree);
+       kmem_free(mg->mg_primaries, mg->mg_allocators * sizeof (metaslab_t *));
+       kmem_free(mg->mg_secondaries, mg->mg_allocators *
+           sizeof (metaslab_t *));
        mutex_destroy(&mg->mg_lock);
-       refcount_destroy(&mg->mg_alloc_queue_depth);
+       mutex_destroy(&mg->mg_ms_disabled_lock);
+       cv_destroy(&mg->mg_ms_disabled_cv);
+
+       for (int i = 0; i < mg->mg_allocators; i++) {
+               zfs_refcount_destroy(&mg->mg_alloc_queue_depth[i]);
+               mg->mg_cur_max_alloc_queue_depth[i] = 0;
+       }
+       kmem_free(mg->mg_alloc_queue_depth, mg->mg_allocators *
+           sizeof (zfs_refcount_t));
+       kmem_free(mg->mg_cur_max_alloc_queue_depth, mg->mg_allocators *
+           sizeof (uint64_t));
+
        kmem_free(mg, sizeof (metaslab_group_t));
 }
 
@@ -706,6 +795,22 @@ metaslab_group_passivate(metaslab_group_t *mg)
        taskq_wait_outstanding(mg->mg_taskq, 0);
        spa_config_enter(spa, locks & ~(SCL_ZIO - 1), spa, RW_WRITER);
        metaslab_group_alloc_update(mg);
+       for (int i = 0; i < mg->mg_allocators; i++) {
+               metaslab_t *msp = mg->mg_primaries[i];
+               if (msp != NULL) {
+                       mutex_enter(&msp->ms_lock);
+                       metaslab_passivate(msp,
+                           metaslab_weight_from_range_tree(msp));
+                       mutex_exit(&msp->ms_lock);
+               }
+               msp = mg->mg_secondaries[i];
+               if (msp != NULL) {
+                       mutex_enter(&msp->ms_lock);
+                       metaslab_passivate(msp,
+                           metaslab_weight_from_range_tree(msp));
+                       mutex_exit(&msp->ms_lock);
+               }
+       }
 
        mgprev = mg->mg_prev;
        mgnext = mg->mg_next;
@@ -756,8 +861,10 @@ metaslab_group_histogram_verify(metaslab_group_t *mg)
 
        for (int m = 0; m < vd->vdev_ms_count; m++) {
                metaslab_t *msp = vd->vdev_ms[m];
+               ASSERT(msp != NULL);
 
-               if (msp->ms_sm == NULL)
+               /* skip if not active or not a member */
+               if (msp->ms_sm == NULL || msp->ms_group != mg)
                        continue;
 
                for (i = 0; i < SPACE_MAP_HISTOGRAM_SIZE; i++)
@@ -845,6 +952,17 @@ metaslab_group_remove(metaslab_group_t *mg, metaslab_t *msp)
        mutex_exit(&mg->mg_lock);
 }
 
+static void
+metaslab_group_sort_impl(metaslab_group_t *mg, metaslab_t *msp, uint64_t weight)
+{
+       ASSERT(MUTEX_HELD(&mg->mg_lock));
+       ASSERT(msp->ms_group == mg);
+       avl_remove(&mg->mg_metaslab_tree, msp);
+       msp->ms_weight = weight;
+       avl_add(&mg->mg_metaslab_tree, msp);
+
+}
+
 static void
 metaslab_group_sort(metaslab_group_t *mg, metaslab_t *msp, uint64_t weight)
 {
@@ -856,10 +974,7 @@ metaslab_group_sort(metaslab_group_t *mg, metaslab_t *msp, uint64_t weight)
        ASSERT(MUTEX_HELD(&msp->ms_lock));
 
        mutex_enter(&mg->mg_lock);
-       ASSERT(msp->ms_group == mg);
-       avl_remove(&mg->mg_metaslab_tree, msp);
-       msp->ms_weight = weight;
-       avl_add(&mg->mg_metaslab_tree, msp);
+       metaslab_group_sort_impl(mg, msp, weight);
        mutex_exit(&mg->mg_lock);
 }
 
@@ -882,12 +997,14 @@ metaslab_group_fragmentation(metaslab_group_t *mg)
 
                if (msp->ms_fragmentation == ZFS_FRAG_INVALID)
                        continue;
+               if (msp->ms_group != mg)
+                       continue;
 
                valid_ms++;
                fragmentation += msp->ms_fragmentation;
        }
 
-       if (valid_ms <= vd->vdev_ms_count / 2)
+       if (valid_ms <= mg->mg_vd->vdev_ms_count / 2)
                return (ZFS_FRAG_INVALID);
 
        fragmentation /= valid_ms;
@@ -907,7 +1024,7 @@ metaslab_group_fragmentation(metaslab_group_t *mg)
  */
 static boolean_t
 metaslab_group_allocatable(metaslab_group_t *mg, metaslab_group_t *rotor,
-    uint64_t psize)
+    uint64_t psize, int allocator, int d)
 {
        spa_t *spa = mg->mg_vd->vdev_spa;
        metaslab_class_t *mc = mg->mg_class;
@@ -918,7 +1035,10 @@ metaslab_group_allocatable(metaslab_group_t *mg, metaslab_group_t *rotor,
         * groups to select from. Otherwise, we always consider it eligible
         * for allocations.
         */
-       if (mc != spa_normal_class(spa) || mc->mc_groups <= 1)
+       if ((mc != spa_normal_class(spa) &&
+           mc != spa_special_class(spa) &&
+           mc != spa_dedup_class(spa)) ||
+           mc->mc_groups <= 1)
                return (B_TRUE);
 
        /*
@@ -936,7 +1056,7 @@ metaslab_group_allocatable(metaslab_group_t *mg, metaslab_group_t *rotor,
        if (mg->mg_allocatable) {
                metaslab_group_t *mgp;
                int64_t qdepth;
-               uint64_t qmax = mg->mg_max_alloc_queue_depth;
+               uint64_t qmax = mg->mg_cur_max_alloc_queue_depth[allocator];
 
                if (!mc->mc_alloc_throttle_enabled)
                        return (B_TRUE);
@@ -948,7 +1068,15 @@ metaslab_group_allocatable(metaslab_group_t *mg, metaslab_group_t *rotor,
                if (mg->mg_no_free_space)
                        return (B_FALSE);
 
-               qdepth = refcount_count(&mg->mg_alloc_queue_depth);
+               /*
+                * Relax allocation throttling for ditto blocks.  Due to
+                * random imbalances in allocation it tends to push copies
+                * to one vdev, that looks a bit better at the moment.
+                */
+               qmax = qmax * (4 + d) / 4;
+
+               qdepth = zfs_refcount_count(
+                   &mg->mg_alloc_queue_depth[allocator]);
 
                /*
                 * If this metaslab group is below its qmax or it's
@@ -967,9 +1095,10 @@ metaslab_group_allocatable(metaslab_group_t *mg, metaslab_group_t *rotor,
                 * groups at the same time when we make this check.
                 */
                for (mgp = mg->mg_next; mgp != rotor; mgp = mgp->mg_next) {
-                       qmax = mgp->mg_max_alloc_queue_depth;
-
-                       qdepth = refcount_count(&mgp->mg_alloc_queue_depth);
+                       qmax = mgp->mg_cur_max_alloc_queue_depth[allocator];
+                       qmax = qmax * (4 + d) / 4;
+                       qdepth = zfs_refcount_count(
+                           &mgp->mg_alloc_queue_depth[allocator]);
 
                        /*
                         * If there is another metaslab group that
@@ -1293,10 +1422,207 @@ metaslab_ops_t *zfs_metaslab_ops = &metaslab_ndf_ops;
  * ==========================================================================
  */
 
+static void
+metaslab_aux_histograms_clear(metaslab_t *msp)
+{
+       /*
+        * Auxiliary histograms are only cleared when resetting them,
+        * which can only happen while the metaslab is loaded.
+        */
+       ASSERT(msp->ms_loaded);
+
+       bzero(msp->ms_synchist, sizeof (msp->ms_synchist));
+       for (int t = 0; t < TXG_DEFER_SIZE; t++)
+               bzero(msp->ms_deferhist[t], sizeof (msp->ms_deferhist[t]));
+}
+
+static void
+metaslab_aux_histogram_add(uint64_t *histogram, uint64_t shift,
+    range_tree_t *rt)
+{
+       /*
+        * This is modeled after space_map_histogram_add(), so refer to that
+        * function for implementation details. We want this to work like
+        * the space map histogram, and not the range tree histogram, as we
+        * are essentially constructing a delta that will be later subtracted
+        * from the space map histogram.
+        */
+       int idx = 0;
+       for (int i = shift; i < RANGE_TREE_HISTOGRAM_SIZE; i++) {
+               ASSERT3U(i, >=, idx + shift);
+               histogram[idx] += rt->rt_histogram[i] << (i - idx - shift);
+
+               if (idx < SPACE_MAP_HISTOGRAM_SIZE - 1) {
+                       ASSERT3U(idx + shift, ==, i);
+                       idx++;
+                       ASSERT3U(idx, <, SPACE_MAP_HISTOGRAM_SIZE);
+               }
+       }
+}
+
+/*
+ * Called at every sync pass that the metaslab gets synced.
+ *
+ * The reason is that we want our auxiliary histograms to be updated
+ * wherever the metaslab's space map histogram is updated. This way
+ * we stay consistent on which parts of the metaslab space map's
+ * histogram are currently not available for allocations (e.g because
+ * they are in the defer, freed, and freeing trees).
+ */
+static void
+metaslab_aux_histograms_update(metaslab_t *msp)
+{
+       space_map_t *sm = msp->ms_sm;
+       ASSERT(sm != NULL);
+
+       /*
+        * This is similar to the metaslab's space map histogram updates
+        * that take place in metaslab_sync(). The only difference is that
+        * we only care about segments that haven't made it into the
+        * ms_allocatable tree yet.
+        */
+       if (msp->ms_loaded) {
+               metaslab_aux_histograms_clear(msp);
+
+               metaslab_aux_histogram_add(msp->ms_synchist,
+                   sm->sm_shift, msp->ms_freed);
+
+               for (int t = 0; t < TXG_DEFER_SIZE; t++) {
+                       metaslab_aux_histogram_add(msp->ms_deferhist[t],
+                           sm->sm_shift, msp->ms_defer[t]);
+               }
+       }
+
+       metaslab_aux_histogram_add(msp->ms_synchist,
+           sm->sm_shift, msp->ms_freeing);
+}
+
+/*
+ * Called every time we are done syncing (writing to) the metaslab,
+ * i.e. at the end of each sync pass.
+ * [see the comment in metaslab_impl.h for ms_synchist, ms_deferhist]
+ */
+static void
+metaslab_aux_histograms_update_done(metaslab_t *msp, boolean_t defer_allowed)
+{
+       spa_t *spa = msp->ms_group->mg_vd->vdev_spa;
+       space_map_t *sm = msp->ms_sm;
+
+       if (sm == NULL) {
+               /*
+                * We came here from metaslab_init() when creating/opening a
+                * pool, looking at a metaslab that hasn't had any allocations
+                * yet.
+                */
+               return;
+       }
+
+       /*
+        * This is similar to the actions that we take for the ms_freed
+        * and ms_defer trees in metaslab_sync_done().
+        */
+       uint64_t hist_index = spa_syncing_txg(spa) % TXG_DEFER_SIZE;
+       if (defer_allowed) {
+               bcopy(msp->ms_synchist, msp->ms_deferhist[hist_index],
+                   sizeof (msp->ms_synchist));
+       } else {
+               bzero(msp->ms_deferhist[hist_index],
+                   sizeof (msp->ms_deferhist[hist_index]));
+       }
+       bzero(msp->ms_synchist, sizeof (msp->ms_synchist));
+}
+
+/*
+ * Ensure that the metaslab's weight and fragmentation are consistent
+ * with the contents of the histogram (either the range tree's histogram
+ * or the space map's depending whether the metaslab is loaded).
+ */
+static void
+metaslab_verify_weight_and_frag(metaslab_t *msp)
+{
+       ASSERT(MUTEX_HELD(&msp->ms_lock));
+
+       if ((zfs_flags & ZFS_DEBUG_METASLAB_VERIFY) == 0)
+               return;
+
+       /* see comment in metaslab_verify_unflushed_changes() */
+       if (msp->ms_group == NULL)
+               return;
+
+       /*
+        * Devices being removed always return a weight of 0 and leave
+        * fragmentation and ms_max_size as is - there is nothing for
+        * us to verify here.
+        */
+       vdev_t *vd = msp->ms_group->mg_vd;
+       if (vd->vdev_removing)
+               return;
+
+       /*
+        * If the metaslab is dirty it probably means that we've done
+        * some allocations or frees that have changed our histograms
+        * and thus the weight.
+        */
+       for (int t = 0; t < TXG_SIZE; t++) {
+               if (txg_list_member(&vd->vdev_ms_list, msp, t))
+                       return;
+       }
+
+       /*
+        * This verification checks that our in-memory state is consistent
+        * with what's on disk. If the pool is read-only then there aren't
+        * any changes and we just have the initially-loaded state.
+        */
+       if (!spa_writeable(msp->ms_group->mg_vd->vdev_spa))
+               return;
+
+       /* some extra verification for in-core tree if you can */
+       if (msp->ms_loaded) {
+               range_tree_stat_verify(msp->ms_allocatable);
+               VERIFY(space_map_histogram_verify(msp->ms_sm,
+                   msp->ms_allocatable));
+       }
+
+       uint64_t weight = msp->ms_weight;
+       uint64_t was_active = msp->ms_weight & METASLAB_ACTIVE_MASK;
+       boolean_t space_based = WEIGHT_IS_SPACEBASED(msp->ms_weight);
+       uint64_t frag = msp->ms_fragmentation;
+       uint64_t max_segsize = msp->ms_max_size;
+
+       msp->ms_weight = 0;
+       msp->ms_fragmentation = 0;
+       msp->ms_max_size = 0;
+
+       /*
+        * This function is used for verification purposes. Regardless of
+        * whether metaslab_weight() thinks this metaslab should be active or
+        * not, we want to ensure that the actual weight (and therefore the
+        * value of ms_weight) would be the same if it was to be recalculated
+        * at this point.
+        */
+       msp->ms_weight = metaslab_weight(msp) | was_active;
+
+       VERIFY3U(max_segsize, ==, msp->ms_max_size);
+
+       /*
+        * If the weight type changed then there is no point in doing
+        * verification. Revert fields to their original values.
+        */
+       if ((space_based && !WEIGHT_IS_SPACEBASED(msp->ms_weight)) ||
+           (!space_based && WEIGHT_IS_SPACEBASED(msp->ms_weight))) {
+               msp->ms_fragmentation = frag;
+               msp->ms_weight = weight;
+               return;
+       }
+
+       VERIFY3U(msp->ms_fragmentation, ==, frag);
+       VERIFY3U(msp->ms_weight, ==, weight);
+}
+
 /*
  * Wait for any in-progress metaslab loads to complete.
  */
-void
+static void
 metaslab_load_wait(metaslab_t *msp)
 {
        ASSERT(MUTEX_HELD(&msp->ms_lock));
@@ -1307,59 +1633,124 @@ metaslab_load_wait(metaslab_t *msp)
        }
 }
 
-int
-metaslab_load(metaslab_t *msp)
+static int
+metaslab_load_impl(metaslab_t *msp)
 {
        int error = 0;
-       boolean_t success = B_FALSE;
 
        ASSERT(MUTEX_HELD(&msp->ms_lock));
-       ASSERT(!msp->ms_loaded);
-       ASSERT(!msp->ms_loading);
+       ASSERT(msp->ms_loading);
+       ASSERT(!msp->ms_condensing);
 
-       msp->ms_loading = B_TRUE;
        /*
-        * Nobody else can manipulate a loading metaslab, so it's now safe
-        * to drop the lock.  This way we don't have to hold the lock while
-        * reading the spacemap from disk.
+        * We temporarily drop the lock to unblock other operations while we
+        * are reading the space map. Therefore, metaslab_sync() and
+        * metaslab_sync_done() can run at the same time as we do.
+        *
+        * metaslab_sync() can append to the space map while we are loading.
+        * Therefore we load only entries that existed when we started the
+        * load. Additionally, metaslab_sync_done() has to wait for the load
+        * to complete because there are potential races like metaslab_load()
+        * loading parts of the space map that are currently being appended
+        * by metaslab_sync(). If we didn't, the ms_allocatable would have
+        * entries that metaslab_sync_done() would try to re-add later.
+        *
+        * That's why before dropping the lock we remember the synced length
+        * of the metaslab and read up to that point of the space map,
+        * ignoring entries appended by metaslab_sync() that happen after we
+        * drop the lock.
         */
+       uint64_t length = msp->ms_synced_length;
        mutex_exit(&msp->ms_lock);
 
-       /*
-        * If the space map has not been allocated yet, then treat
-        * all the space in the metaslab as free and add it to ms_allocatable.
-        */
        if (msp->ms_sm != NULL) {
-               error = space_map_load(msp->ms_sm, msp->ms_allocatable,
-                   SM_FREE);
+               error = space_map_load_length(msp->ms_sm, msp->ms_allocatable,
+                   SM_FREE, length);
        } else {
+               /*
+                * The space map has not been allocated yet, so treat
+                * all the space in the metaslab as free and add it to the
+                * ms_allocatable tree.
+                */
                range_tree_add(msp->ms_allocatable,
                    msp->ms_start, msp->ms_size);
        }
 
-       success = (error == 0);
-
+       /*
+        * We need to grab the ms_sync_lock to prevent metaslab_sync() from
+        * changing the ms_sm and the metaslab's range trees while we are
+        * about to use them and populate the ms_allocatable. The ms_lock
+        * is insufficient for this because metaslab_sync() doesn't hold
+        * the ms_lock while writing the ms_checkpointing tree to disk.
+        */
+       mutex_enter(&msp->ms_sync_lock);
        mutex_enter(&msp->ms_lock);
-       msp->ms_loading = B_FALSE;
+       ASSERT(!msp->ms_condensing);
+
+       if (error != 0) {
+               mutex_exit(&msp->ms_sync_lock);
+               return (error);
+       }
 
-       if (success) {
-               ASSERT3P(msp->ms_group, !=, NULL);
-               msp->ms_loaded = B_TRUE;
+       ASSERT3P(msp->ms_group, !=, NULL);
+       msp->ms_loaded = B_TRUE;
 
-               /*
-                * If the metaslab already has a spacemap, then we need to
-                * remove all segments from the defer tree; otherwise, the
-                * metaslab is completely empty and we can skip this.
-                */
-               if (msp->ms_sm != NULL) {
-                       for (int t = 0; t < TXG_DEFER_SIZE; t++) {
-                               range_tree_walk(msp->ms_defer[t],
-                                   range_tree_remove, msp->ms_allocatable);
-                       }
-               }
-               msp->ms_max_size = metaslab_block_maxsize(msp);
+       /*
+        * The ms_allocatable contains the segments that exist in the
+        * ms_defer trees [see ms_synced_length]. Thus we need to remove
+        * them from ms_allocatable as they will be added again in
+        * metaslab_sync_done().
+        */
+       for (int t = 0; t < TXG_DEFER_SIZE; t++) {
+               range_tree_walk(msp->ms_defer[t],
+                   range_tree_remove, msp->ms_allocatable);
        }
+
+       /*
+        * Call metaslab_recalculate_weight_and_sort() now that the
+        * metaslab is loaded so we get the metaslab's real weight.
+        *
+        * Unless this metaslab was created with older software and
+        * has not yet been converted to use segment-based weight, we
+        * expect the new weight to be better or equal to the weight
+        * that the metaslab had while it was not loaded. This is
+        * because the old weight does not take into account the
+        * consolidation of adjacent segments between TXGs. [see
+        * comment for ms_synchist and ms_deferhist[] for more info]
+        */
+       uint64_t weight = msp->ms_weight;
+       metaslab_recalculate_weight_and_sort(msp);
+       if (!WEIGHT_IS_SPACEBASED(weight))
+               ASSERT3U(weight, <=, msp->ms_weight);
+       msp->ms_max_size = metaslab_block_maxsize(msp);
+
+       spa_t *spa = msp->ms_group->mg_vd->vdev_spa;
+       metaslab_verify_space(msp, spa_syncing_txg(spa));
+       mutex_exit(&msp->ms_sync_lock);
+
+       return (0);
+}
+
+int
+metaslab_load(metaslab_t *msp)
+{
+       ASSERT(MUTEX_HELD(&msp->ms_lock));
+
+       /*
+        * There may be another thread loading the same metaslab, if that's
+        * the case just wait until the other thread is done and return.
+        */
+       metaslab_load_wait(msp);
+       if (msp->ms_loaded)
+               return (0);
+       VERIFY(!msp->ms_loading);
+       ASSERT(!msp->ms_condensing);
+
+       msp->ms_loading = B_TRUE;
+       int error = metaslab_load_impl(msp);
+       msp->ms_loading = B_FALSE;
        cv_broadcast(&msp->ms_load_cv);
+
        return (error);
 }
 
@@ -1367,10 +1758,42 @@ void
 metaslab_unload(metaslab_t *msp)
 {
        ASSERT(MUTEX_HELD(&msp->ms_lock));
+
+       metaslab_verify_weight_and_frag(msp);
+
        range_tree_vacate(msp->ms_allocatable, NULL, NULL);
        msp->ms_loaded = B_FALSE;
+
        msp->ms_weight &= ~METASLAB_ACTIVE_MASK;
        msp->ms_max_size = 0;
+
+       /*
+        * We explicitly recalculate the metaslab's weight based on its space
+        * map (as it is now not loaded). We want unload metaslabs to always
+        * have their weights calculated from the space map histograms, while
+        * loaded ones have it calculated from their in-core range tree
+        * [see metaslab_load()]. This way, the weight reflects the information
+        * available in-core, whether it is loaded or not
+        *
+        * If ms_group == NULL means that we came here from metaslab_fini(),
+        * at which point it doesn't make sense for us to do the recalculation
+        * and the sorting.
+        */
+       if (msp->ms_group != NULL)
+               metaslab_recalculate_weight_and_sort(msp);
+}
+
+static void
+metaslab_space_update(vdev_t *vd, metaslab_class_t *mc, int64_t alloc_delta,
+    int64_t defer_delta, int64_t space_delta)
+{
+       vdev_space_update(vd, alloc_delta, defer_delta, space_delta);
+
+       ASSERT3P(vd->vdev_spa->spa_root_vdev, ==, vd->vdev_parent);
+       ASSERT(vd->vdev_ms_count != 0);
+
+       metaslab_class_space_update(mc, alloc_delta, defer_delta, space_delta,
+           vdev_deflated_space(vd, space_delta));
 }
 
 int
@@ -1378,7 +1801,8 @@ metaslab_init(metaslab_group_t *mg, uint64_t id, uint64_t object, uint64_t txg,
     metaslab_t **msp)
 {
        vdev_t *vd = mg->mg_vd;
-       objset_t *mos = vd->vdev_spa->spa_meta_objset;
+       spa_t *spa = vd->vdev_spa;
+       objset_t *mos = spa->spa_meta_objset;
        metaslab_t *ms;
        int error;
 
@@ -1386,13 +1810,23 @@ metaslab_init(metaslab_group_t *mg, uint64_t id, uint64_t object, uint64_t txg,
        mutex_init(&ms->ms_lock, NULL, MUTEX_DEFAULT, NULL);
        mutex_init(&ms->ms_sync_lock, NULL, MUTEX_DEFAULT, NULL);
        cv_init(&ms->ms_load_cv, NULL, CV_DEFAULT, NULL);
+
        ms->ms_id = id;
        ms->ms_start = id << vd->vdev_ms_shift;
        ms->ms_size = 1ULL << vd->vdev_ms_shift;
+       ms->ms_allocator = -1;
+       ms->ms_new = B_TRUE;
 
        /*
         * We only open space map objects that already exist. All others
         * will be opened when we finally allocate an object for it.
+        *
+        * Note:
+        * When called from vdev_expand(), we can't call into the DMU as
+        * we are holding the spa_config_lock as a writer and we would
+        * deadlock [see relevant comment in vdev_metaslab_init()]. in
+        * that case, the object parameter is zero though, so we won't
+        * call into the DMU.
         */
        if (object != 0) {
                error = space_map_open(&ms->ms_sm, mos, object, ms->ms_start,
@@ -1404,19 +1838,23 @@ metaslab_init(metaslab_group_t *mg, uint64_t id, uint64_t object, uint64_t txg,
                }
 
                ASSERT(ms->ms_sm != NULL);
+               ms->ms_allocated_space = space_map_allocated(ms->ms_sm);
        }
 
        /*
-        * We create the main range tree here, but we don't create the
+        * We create the ms_allocatable here, but we don't create the
         * other range trees until metaslab_sync_done().  This serves
         * two purposes: it allows metaslab_sync_done() to detect the
-        * addition of new space; and for debugging, it ensures that we'd
-        * data fault on any attempt to use this metaslab before it's ready.
+        * addition of new space; and for debugging, it ensures that
+        * we'd data fault on any attempt to use this metaslab before
+        * it's ready.
         */
        ms->ms_allocatable = range_tree_create_impl(&rt_avl_ops,
            &ms->ms_allocatable_by_size, metaslab_rangesize_compare, 0);
-       metaslab_group_add(mg, ms);
 
+       ms->ms_trim = range_tree_create(NULL, NULL);
+
+       metaslab_group_add(mg, ms);
        metaslab_set_fragmentation(ms);
 
        /*
@@ -1428,13 +1866,16 @@ metaslab_init(metaslab_group_t *mg, uint64_t id, uint64_t object, uint64_t txg,
         * out this txg. This ensures that we don't attempt to allocate
         * from it before we have initialized it completely.
         */
-       if (txg <= TXG_INITIAL)
+       if (txg <= TXG_INITIAL) {
                metaslab_sync_done(ms, 0);
+               metaslab_space_update(vd, mg->mg_class,
+                   metaslab_allocated_space(ms), 0, 0);
+       }
 
        /*
         * If metaslab_debug_load is set and we're initializing a metaslab
-        * that has an allocated space map object then load the its space
-        * map so that can verify frees.
+        * that has an allocated space map object then load the space map
+        * so that we can verify frees.
         */
        if (metaslab_debug_load && ms->ms_sm != NULL) {
                mutex_enter(&ms->ms_lock);
@@ -1456,16 +1897,19 @@ void
 metaslab_fini(metaslab_t *msp)
 {
        metaslab_group_t *mg = msp->ms_group;
+       vdev_t *vd = mg->mg_vd;
 
        metaslab_group_remove(mg, msp);
 
        mutex_enter(&msp->ms_lock);
        VERIFY(msp->ms_group == NULL);
-       vdev_space_update(mg->mg_vd, -space_map_allocated(msp->ms_sm),
-           0, -msp->ms_size);
+       metaslab_space_update(vd, mg->mg_class,
+           -metaslab_allocated_space(msp), 0, -msp->ms_size);
+
        space_map_close(msp->ms_sm);
 
        metaslab_unload(msp);
+
        range_tree_destroy(msp->ms_allocatable);
        range_tree_destroy(msp->ms_freeing);
        range_tree_destroy(msp->ms_freed);
@@ -1481,10 +1925,17 @@ metaslab_fini(metaslab_t *msp)
 
        range_tree_destroy(msp->ms_checkpointing);
 
+       for (int t = 0; t < TXG_SIZE; t++)
+               ASSERT(!txg_list_member(&vd->vdev_ms_list, msp, t));
+
+       range_tree_vacate(msp->ms_trim, NULL, NULL);
+       range_tree_destroy(msp->ms_trim);
+
        mutex_exit(&msp->ms_lock);
        cv_destroy(&msp->ms_load_cv);
        mutex_destroy(&msp->ms_lock);
        mutex_destroy(&msp->ms_sync_lock);
+       ASSERT3U(msp->ms_allocator, ==, -1);
 
        kmem_free(msp, sizeof (metaslab_t));
 }
@@ -1495,7 +1946,7 @@ metaslab_fini(metaslab_t *msp)
  * This table defines a segment size based fragmentation metric that will
  * allow each metaslab to derive its own fragmentation value. This is done
  * by calculating the space in each bucket of the spacemap histogram and
- * multiplying that by the fragmetation metric in this table. Doing
+ * multiplying that by the fragmentation metric in this table. Doing
  * this for all buckets and dividing it by the total amount of free
  * space in this metaslab (i.e. the total free space in all buckets) gives
  * us the fragmentation metric. This means that a high fragmentation metric
@@ -1530,10 +1981,10 @@ int zfs_frag_table[FRAGMENTATION_TABLE_SIZE] = {
 };
 
 /*
- * Calclate the metaslab's fragmentation metric. A return value
- * of ZFS_FRAG_INVALID means that the metaslab has not been upgraded and does
- * not support this metric. Otherwise, the return value should be in the
- * range [0, 100].
+ * Calculate the metaslab's fragmentation metric and set ms_fragmentation.
+ * Setting this value to ZFS_FRAG_INVALID means that the metaslab has not
+ * been upgraded and does not support this metric. Otherwise, the return
+ * value should be in the range [0, 100].
  */
 static void
 metaslab_set_fragmentation(metaslab_t *msp)
@@ -1626,7 +2077,7 @@ metaslab_space_weight(metaslab_t *msp)
        /*
         * The baseline weight is the metaslab's free space.
         */
-       space = msp->ms_size - space_map_allocated(msp->ms_sm);
+       space = msp->ms_size - metaslab_allocated_space(msp);
 
        if (metaslab_fragmentation_factor_enabled &&
            msp->ms_fragmentation != ZFS_FRAG_INVALID) {
@@ -1730,14 +2181,38 @@ metaslab_weight_from_range_tree(metaslab_t *msp)
 static uint64_t
 metaslab_weight_from_spacemap(metaslab_t *msp)
 {
-       uint64_t weight = 0;
+       space_map_t *sm = msp->ms_sm;
+       ASSERT(!msp->ms_loaded);
+       ASSERT(sm != NULL);
+       ASSERT3U(space_map_object(sm), !=, 0);
+       ASSERT3U(sm->sm_dbuf->db_size, ==, sizeof (space_map_phys_t));
+
+       /*
+        * Create a joint histogram from all the segments that have made
+        * it to the metaslab's space map histogram, that are not yet
+        * available for allocation because they are still in the freeing
+        * pipeline (e.g. freeing, freed, and defer trees). Then subtract
+        * these segments from the space map's histogram to get a more
+        * accurate weight.
+        */
+       uint64_t deferspace_histogram[SPACE_MAP_HISTOGRAM_SIZE] = {0};
+       for (int i = 0; i < SPACE_MAP_HISTOGRAM_SIZE; i++)
+               deferspace_histogram[i] += msp->ms_synchist[i];
+       for (int t = 0; t < TXG_DEFER_SIZE; t++) {
+               for (int i = 0; i < SPACE_MAP_HISTOGRAM_SIZE; i++) {
+                       deferspace_histogram[i] += msp->ms_deferhist[t][i];
+               }
+       }
 
+       uint64_t weight = 0;
        for (int i = SPACE_MAP_HISTOGRAM_SIZE - 1; i >= 0; i--) {
-               if (msp->ms_sm->sm_phys->smp_histogram[i] != 0) {
-                       WEIGHT_SET_COUNT(weight,
-                           msp->ms_sm->sm_phys->smp_histogram[i]);
-                       WEIGHT_SET_INDEX(weight, i +
-                           msp->ms_sm->sm_shift);
+               ASSERT3U(sm->sm_phys->smp_histogram[i], >=,
+                   deferspace_histogram[i]);
+               uint64_t count =
+                   sm->sm_phys->smp_histogram[i] - deferspace_histogram[i];
+               if (count != 0) {
+                       WEIGHT_SET_COUNT(weight, count);
+                       WEIGHT_SET_INDEX(weight, i + sm->sm_shift);
                        WEIGHT_SET_ACTIVE(weight, 0);
                        break;
                }
@@ -1762,7 +2237,7 @@ metaslab_segment_weight(metaslab_t *msp)
        /*
         * The metaslab is completely free.
         */
-       if (space_map_allocated(msp->ms_sm) == 0) {
+       if (metaslab_allocated_space(msp) == 0) {
                int idx = highbit64(msp->ms_size) - 1;
                int max_idx = SPACE_MAP_HISTOGRAM_SIZE + shift - 1;
 
@@ -1784,7 +2259,7 @@ metaslab_segment_weight(metaslab_t *msp)
        /*
         * If the metaslab is fully allocated then just make the weight 0.
         */
-       if (space_map_allocated(msp->ms_sm) == msp->ms_size)
+       if (metaslab_allocated_space(msp) == msp->ms_size)
                return (0);
        /*
         * If the metaslab is already loaded, then use the range tree to
@@ -1864,6 +2339,8 @@ metaslab_weight(metaslab_t *msp)
         */
        if (msp->ms_loaded)
                msp->ms_max_size = metaslab_block_maxsize(msp);
+       else
+               ASSERT0(msp->ms_max_size);
 
        /*
         * Segment-based weighting requires space map histogram support.
@@ -1879,19 +2356,65 @@ metaslab_weight(metaslab_t *msp)
        return (weight);
 }
 
+void
+metaslab_recalculate_weight_and_sort(metaslab_t *msp)
+{
+       /* note: we preserve the mask (e.g. indication of primary, etc..) */
+       uint64_t was_active = msp->ms_weight & METASLAB_ACTIVE_MASK;
+       metaslab_group_sort(msp->ms_group, msp,
+           metaslab_weight(msp) | was_active);
+}
+
 static int
-metaslab_activate(metaslab_t *msp, uint64_t activation_weight)
+metaslab_activate_allocator(metaslab_group_t *mg, metaslab_t *msp,
+    int allocator, uint64_t activation_weight)
+{
+       /*
+        * If we're activating for the claim code, we don't want to actually
+        * set the metaslab up for a specific allocator.
+        */
+       if (activation_weight == METASLAB_WEIGHT_CLAIM)
+               return (0);
+       metaslab_t **arr = (activation_weight == METASLAB_WEIGHT_PRIMARY ?
+           mg->mg_primaries : mg->mg_secondaries);
+
+       ASSERT(MUTEX_HELD(&msp->ms_lock));
+       mutex_enter(&mg->mg_lock);
+       if (arr[allocator] != NULL) {
+               mutex_exit(&mg->mg_lock);
+               return (EEXIST);
+       }
+
+       arr[allocator] = msp;
+       ASSERT3S(msp->ms_allocator, ==, -1);
+       msp->ms_allocator = allocator;
+       msp->ms_primary = (activation_weight == METASLAB_WEIGHT_PRIMARY);
+       mutex_exit(&mg->mg_lock);
+
+       return (0);
+}
+
+static int
+metaslab_activate(metaslab_t *msp, int allocator, uint64_t activation_weight)
 {
        ASSERT(MUTEX_HELD(&msp->ms_lock));
 
        if ((msp->ms_weight & METASLAB_ACTIVE_MASK) == 0) {
-               metaslab_load_wait(msp);
-               if (!msp->ms_loaded) {
-                       int error = metaslab_load(msp);
-                       if (error) {
-                               metaslab_group_sort(msp->ms_group, msp, 0);
-                               return (error);
-                       }
+               int error = metaslab_load(msp);
+               if (error != 0) {
+                       metaslab_group_sort(msp->ms_group, msp, 0);
+                       return (error);
+               }
+               if ((msp->ms_weight & METASLAB_ACTIVE_MASK) != 0) {
+                       /*
+                        * The metaslab was activated for another allocator
+                        * while we were waiting, we should reselect.
+                        */
+                       return (SET_ERROR(EBUSY));
+               }
+               if ((error = metaslab_activate_allocator(msp->ms_group, msp,
+                   allocator, activation_weight)) != 0) {
+                       return (error);
                }
 
                msp->ms_activation_weight = msp->ms_weight;
@@ -1904,6 +2427,34 @@ metaslab_activate(metaslab_t *msp, uint64_t activation_weight)
        return (0);
 }
 
+static void
+metaslab_passivate_allocator(metaslab_group_t *mg, metaslab_t *msp,
+    uint64_t weight)
+{
+       ASSERT(MUTEX_HELD(&msp->ms_lock));
+       if (msp->ms_weight & METASLAB_WEIGHT_CLAIM) {
+               metaslab_group_sort(mg, msp, weight);
+               return;
+       }
+
+       mutex_enter(&mg->mg_lock);
+       ASSERT3P(msp->ms_group, ==, mg);
+       if (msp->ms_primary) {
+               ASSERT3U(0, <=, msp->ms_allocator);
+               ASSERT3U(msp->ms_allocator, <, mg->mg_allocators);
+               ASSERT3P(mg->mg_primaries[msp->ms_allocator], ==, msp);
+               ASSERT(msp->ms_weight & METASLAB_WEIGHT_PRIMARY);
+               mg->mg_primaries[msp->ms_allocator] = NULL;
+       } else {
+               ASSERT(msp->ms_weight & METASLAB_WEIGHT_SECONDARY);
+               ASSERT3P(mg->mg_secondaries[msp->ms_allocator], ==, msp);
+               mg->mg_secondaries[msp->ms_allocator] = NULL;
+       }
+       msp->ms_allocator = -1;
+       metaslab_group_sort_impl(mg, msp, weight);
+       mutex_exit(&mg->mg_lock);
+}
+
 static void
 metaslab_passivate(metaslab_t *msp, uint64_t weight)
 {
@@ -1920,7 +2471,7 @@ metaslab_passivate(metaslab_t *msp, uint64_t weight)
        ASSERT0(weight & METASLAB_ACTIVE_MASK);
 
        msp->ms_activation_weight = 0;
-       metaslab_group_sort(msp->ms_group, msp, weight);
+       metaslab_passivate_allocator(msp->ms_group, msp, weight);
        ASSERT((msp->ms_weight & METASLAB_ACTIVE_MASK) == 0);
 }
 
@@ -1966,9 +2517,7 @@ metaslab_preload(void *arg)
        ASSERT(!MUTEX_HELD(&msp->ms_group->mg_lock));
 
        mutex_enter(&msp->ms_lock);
-       metaslab_load_wait(msp);
-       if (!msp->ms_loaded)
-               (void) metaslab_load(msp);
+       (void) metaslab_load(msp);
        msp->ms_selected_txg = spa_syncing_txg(spa);
        mutex_exit(&msp->ms_lock);
        spl_fstrans_unmark(cookie);
@@ -2094,7 +2643,7 @@ metaslab_condense(metaslab_t *msp, uint64_t txg, dmu_tx_t *tx)
        ASSERT(msp->ms_loaded);
 
 
-       zfs_dbgmsg("condensing: txg %llu, msp[%llu] %p, vdev id %llu, "
+       zfs_dbgmsg("condensing: txg %llu, msp[%llu] %px, vdev id %llu, "
            "spa %s, smp size %llu, segments %lu, forcing condense=%s", txg,
            msp->ms_id, msp, msp->ms_group->mg_vd->vdev_id,
            msp->ms_group->mg_vd->vdev_spa->spa_name,
@@ -2188,6 +2737,7 @@ metaslab_sync(metaslab_t *msp, uint64_t txg)
        ASSERT3P(msp->ms_freeing, !=, NULL);
        ASSERT3P(msp->ms_freed, !=, NULL);
        ASSERT3P(msp->ms_checkpointing, !=, NULL);
+       ASSERT3P(msp->ms_trim, !=, NULL);
 
        /*
         * Normally, we don't want to process a metaslab if there are no
@@ -2204,17 +2754,17 @@ metaslab_sync(metaslab_t *msp, uint64_t txg)
        VERIFY(txg <= spa_final_dirty_txg(spa));
 
        /*
-        * The only state that can actually be changing concurrently with
-        * metaslab_sync() is the metaslab's ms_allocatable.  No other
-        * thread can be modifying this txg's alloc, freeing,
+        * The only state that can actually be changing concurrently
+        * with metaslab_sync() is the metaslab's ms_allocatable. No
+        * other thread can be modifying this txg's alloc, freeing,
         * freed, or space_map_phys_t.  We drop ms_lock whenever we
-        * could call into the DMU, because the DMU can call down to us
-        * (e.g. via zio_free()) at any time.
+        * could call into the DMU, because the DMU can call down to
+        * us (e.g. via zio_free()) at any time.
         *
         * The spa_vdev_remove_thread() can be reading metaslab state
-        * concurrently, and it is locked out by the ms_sync_lock.  Note
-        * that the ms_lock is insufficient for this, because it is dropped
-        * by space_map_write().
+        * concurrently, and it is locked out by the ms_sync_lock.
+        * Note that the ms_lock is insufficient for this, because it
+        * is dropped by space_map_write().
         */
        tx = dmu_tx_create_assigned(spa_get_dsl(spa), txg);
 
@@ -2226,7 +2776,9 @@ metaslab_sync(metaslab_t *msp, uint64_t txg)
 
                VERIFY0(space_map_open(&msp->ms_sm, mos, new_object,
                    msp->ms_start, msp->ms_size, vd->vdev_ashift));
+
                ASSERT(msp->ms_sm != NULL);
+               ASSERT0(metaslab_allocated_space(msp));
        }
 
        if (!range_tree_is_empty(msp->ms_checkpointing) &&
@@ -2274,6 +2826,11 @@ metaslab_sync(metaslab_t *msp, uint64_t txg)
                mutex_enter(&msp->ms_lock);
        }
 
+       msp->ms_allocated_space += range_tree_space(alloctree);
+       ASSERT3U(msp->ms_allocated_space, >=,
+           range_tree_space(msp->ms_freeing));
+       msp->ms_allocated_space -= range_tree_space(msp->ms_freeing);
+
        if (!range_tree_is_empty(msp->ms_checkpointing)) {
                ASSERT(spa_has_checkpoint(spa));
                ASSERT3P(vd->vdev_checkpoint_sm, !=, NULL);
@@ -2287,14 +2844,13 @@ metaslab_sync(metaslab_t *msp, uint64_t txg)
                space_map_write(vd->vdev_checkpoint_sm,
                    msp->ms_checkpointing, SM_FREE, SM_NO_VDEVID, tx);
                mutex_enter(&msp->ms_lock);
-               space_map_update(vd->vdev_checkpoint_sm);
 
                spa->spa_checkpoint_info.sci_dspace +=
                    range_tree_space(msp->ms_checkpointing);
                vd->vdev_stat.vs_checkpoint_space +=
                    range_tree_space(msp->ms_checkpointing);
                ASSERT3U(vd->vdev_stat.vs_checkpoint_space, ==,
-                   -vd->vdev_checkpoint_sm->sm_alloc);
+                   -space_map_allocated(vd->vdev_checkpoint_sm));
 
                range_tree_vacate(msp->ms_checkpointing, NULL, NULL);
        }
@@ -2339,6 +2895,7 @@ metaslab_sync(metaslab_t *msp, uint64_t txg)
         * time we load the space map.
         */
        space_map_histogram_add(msp->ms_sm, msp->ms_freeing, tx);
+       metaslab_aux_histograms_update(msp);
 
        metaslab_group_histogram_add(mg, msp);
        metaslab_group_histogram_verify(mg);
@@ -2346,16 +2903,18 @@ metaslab_sync(metaslab_t *msp, uint64_t txg)
 
        /*
         * For sync pass 1, we avoid traversing this txg's free range tree
-        * and instead will just swap the pointers for freeing and
-        * freed. We can safely do this since the freed_tree is
-        * guaranteed to be empty on the initial pass.
+        * and instead will just swap the pointers for freeing and freed.
+        * We can safely do this since the freed_tree is guaranteed to be
+        * empty on the initial pass.
         */
        if (spa_sync_pass(spa) == 1) {
                range_tree_swap(&msp->ms_freeing, &msp->ms_freed);
+               ASSERT0(msp->ms_allocated_this_txg);
        } else {
                range_tree_vacate(msp->ms_freeing,
                    range_tree_add, msp->ms_freed);
        }
+       msp->ms_allocated_this_txg += range_tree_space(alloctree);
        range_tree_vacate(alloctree, NULL, NULL);
 
        ASSERT0(range_tree_space(msp->ms_allocating[txg & TXG_MASK]));
@@ -2419,7 +2978,7 @@ metaslab_sync_done(metaslab_t *msp, uint64_t txg)
                ASSERT3P(msp->ms_checkpointing, ==, NULL);
                msp->ms_checkpointing = range_tree_create(NULL, NULL);
 
-               vdev_space_update(vd, 0, 0, msp->ms_size);
+               metaslab_space_update(vd, mg->mg_class, 0, 0, msp->ms_size);
        }
        ASSERT0(range_tree_space(msp->ms_freeing));
        ASSERT0(range_tree_space(msp->ms_checkpointing));
@@ -2433,7 +2992,8 @@ metaslab_sync_done(metaslab_t *msp, uint64_t txg)
        }
 
        defer_delta = 0;
-       alloc_delta = space_map_alloc_delta(msp->ms_sm);
+       alloc_delta = msp->ms_allocated_this_txg -
+           range_tree_space(msp->ms_freed);
        if (defer_allowed) {
                defer_delta = range_tree_space(msp->ms_freed) -
                    range_tree_space(*defer_tree);
@@ -2441,7 +3001,8 @@ metaslab_sync_done(metaslab_t *msp, uint64_t txg)
                defer_delta -= range_tree_space(*defer_tree);
        }
 
-       vdev_space_update(vd, alloc_delta + defer_delta, defer_delta, 0);
+       metaslab_space_update(vd, mg->mg_class, alloc_delta + defer_delta,
+           defer_delta, 0);
 
        /*
         * If there's a metaslab_load() in progress, wait for it to complete
@@ -2449,6 +3010,24 @@ metaslab_sync_done(metaslab_t *msp, uint64_t txg)
         */
        metaslab_load_wait(msp);
 
+       /*
+        * When auto-trimming is enabled, free ranges which are added to
+        * ms_allocatable are also be added to ms_trim.  The ms_trim tree is
+        * periodically consumed by the vdev_autotrim_thread() which issues
+        * trims for all ranges and then vacates the tree.  The ms_trim tree
+        * can be discarded at any time with the sole consequence of recent
+        * frees not being trimmed.
+        */
+       if (spa_get_autotrim(spa) == SPA_AUTOTRIM_ON) {
+               range_tree_walk(*defer_tree, range_tree_add, msp->ms_trim);
+               if (!defer_allowed) {
+                       range_tree_walk(msp->ms_freed, range_tree_add,
+                           msp->ms_trim);
+               }
+       } else {
+               range_tree_vacate(msp->ms_trim, NULL, NULL);
+       }
+
        /*
         * Move the frees from the defer_tree back to the free
         * range tree (if it's loaded). Swap the freed_tree and
@@ -2464,7 +3043,8 @@ metaslab_sync_done(metaslab_t *msp, uint64_t txg)
                    msp->ms_loaded ? range_tree_add : NULL,
                    msp->ms_allocatable);
        }
-       space_map_update(msp->ms_sm);
+
+       msp->ms_synced_length = space_map_length(msp->ms_sm);
 
        msp->ms_deferspace += defer_delta;
        ASSERT3S(msp->ms_deferspace, >=, 0);
@@ -2476,24 +3056,37 @@ metaslab_sync_done(metaslab_t *msp, uint64_t txg)
                 */
                vdev_dirty(vd, VDD_METASLAB, msp, txg + 1);
        }
+       metaslab_aux_histograms_update_done(msp, defer_allowed);
+
+       if (msp->ms_new) {
+               msp->ms_new = B_FALSE;
+               mutex_enter(&mg->mg_lock);
+               mg->mg_ms_ready++;
+               mutex_exit(&mg->mg_lock);
+       }
 
        /*
-        * Calculate the new weights before unloading any metaslabs.
-        * This will give us the most accurate weighting.
+        * Re-sort metaslab within its group now that we've adjusted
+        * its allocatable space.
         */
-       metaslab_group_sort(mg, msp, metaslab_weight(msp));
+       metaslab_recalculate_weight_and_sort(msp);
 
        /*
         * If the metaslab is loaded and we've not tried to load or allocate
         * from it in 'metaslab_unload_delay' txgs, then unload it.
         */
        if (msp->ms_loaded &&
+           msp->ms_disabled == 0 &&
            msp->ms_selected_txg + metaslab_unload_delay < txg) {
 
                for (int t = 1; t < TXG_CONCURRENT_STATES; t++) {
                        VERIFY0(range_tree_space(
                            msp->ms_allocating[(txg + t) & TXG_MASK]));
                }
+               if (msp->ms_allocator != -1) {
+                       metaslab_passivate(msp, msp->ms_weight &
+                           ~METASLAB_ACTIVE_MASK);
+               }
 
                if (!metaslab_debug_unload)
                        metaslab_unload(msp);
@@ -2504,6 +3097,7 @@ metaslab_sync_done(metaslab_t *msp, uint64_t txg)
        ASSERT0(range_tree_space(msp->ms_freed));
        ASSERT0(range_tree_space(msp->ms_checkpointing));
 
+       msp->ms_allocated_this_txg = 0;
        mutex_exit(&msp->ms_lock);
 }
 
@@ -2529,21 +3123,25 @@ metaslab_sync_reassess(metaslab_group_t *mg)
        spa_config_exit(spa, SCL_ALLOC, FTAG);
 }
 
-static uint64_t
-metaslab_distance(metaslab_t *msp, dva_t *dva)
+/*
+ * When writing a ditto block (i.e. more than one DVA for a given BP) on
+ * the same vdev as an existing DVA of this BP, then try to allocate it
+ * on a different metaslab than existing DVAs (i.e. a unique metaslab).
+ */
+static boolean_t
+metaslab_is_unique(metaslab_t *msp, dva_t *dva)
 {
-       uint64_t ms_shift = msp->ms_group->mg_vd->vdev_ms_shift;
-       uint64_t offset = DVA_GET_OFFSET(dva) >> ms_shift;
-       uint64_t start = msp->ms_id;
+       uint64_t dva_ms_id;
+
+       if (DVA_GET_ASIZE(dva) == 0)
+               return (B_TRUE);
 
        if (msp->ms_group->mg_vd->vdev_id != DVA_GET_VDEV(dva))
-               return (1ULL << 63);
+               return (B_TRUE);
 
-       if (offset < start)
-               return ((start - offset) << ms_shift);
-       if (offset > start)
-               return ((offset - start) << ms_shift);
-       return (0);
+       dva_ms_id = DVA_GET_OFFSET(dva) >> msp->ms_group->mg_vd->vdev_ms_shift;
+
+       return (msp->ms_id != dva_ms_id);
 }
 
 /*
@@ -2588,7 +3186,8 @@ metaslab_alloc_trace_fini(void)
  */
 static void
 metaslab_trace_add(zio_alloc_list_t *zal, metaslab_group_t *mg,
-    metaslab_t *msp, uint64_t psize, uint32_t dva_id, uint64_t offset)
+    metaslab_t *msp, uint64_t psize, uint32_t dva_id, uint64_t offset,
+    int allocator)
 {
        metaslab_alloc_trace_t *mat;
 
@@ -2622,6 +3221,7 @@ metaslab_trace_add(zio_alloc_list_t *zal, metaslab_group_t *mg,
        mat->mat_dva_id = dva_id;
        mat->mat_offset = offset;
        mat->mat_weight = 0;
+       mat->mat_allocator = allocator;
 
        if (msp != NULL)
                mat->mat_weight = msp->ms_weight;
@@ -2656,7 +3256,7 @@ metaslab_trace_fini(zio_alloc_list_t *zal)
 }
 #else
 
-#define        metaslab_trace_add(zal, mg, msp, psize, id, off)
+#define        metaslab_trace_add(zal, mg, msp, psize, id, off, alloc)
 
 void
 metaslab_alloc_trace_init(void)
@@ -2687,35 +3287,56 @@ metaslab_trace_fini(zio_alloc_list_t *zal)
  */
 
 static void
-metaslab_group_alloc_increment(spa_t *spa, uint64_t vdev, void *tag, int flags)
+metaslab_group_alloc_increment(spa_t *spa, uint64_t vdev, void *tag, int flags,
+    int allocator)
 {
        if (!(flags & METASLAB_ASYNC_ALLOC) ||
-           flags & METASLAB_DONT_THROTTLE)
+           (flags & METASLAB_DONT_THROTTLE))
                return;
 
        metaslab_group_t *mg = vdev_lookup_top(spa, vdev)->vdev_mg;
        if (!mg->mg_class->mc_alloc_throttle_enabled)
                return;
 
-       (void) refcount_add(&mg->mg_alloc_queue_depth, tag);
+       (void) zfs_refcount_add(&mg->mg_alloc_queue_depth[allocator], tag);
+}
+
+static void
+metaslab_group_increment_qdepth(metaslab_group_t *mg, int allocator)
+{
+       uint64_t max = mg->mg_max_alloc_queue_depth;
+       uint64_t cur = mg->mg_cur_max_alloc_queue_depth[allocator];
+       while (cur < max) {
+               if (atomic_cas_64(&mg->mg_cur_max_alloc_queue_depth[allocator],
+                   cur, cur + 1) == cur) {
+                       atomic_inc_64(
+                           &mg->mg_class->mc_alloc_max_slots[allocator]);
+                       return;
+               }
+               cur = mg->mg_cur_max_alloc_queue_depth[allocator];
+       }
 }
 
 void
-metaslab_group_alloc_decrement(spa_t *spa, uint64_t vdev, void *tag, int flags)
+metaslab_group_alloc_decrement(spa_t *spa, uint64_t vdev, void *tag, int flags,
+    int allocator, boolean_t io_complete)
 {
        if (!(flags & METASLAB_ASYNC_ALLOC) ||
-           flags & METASLAB_DONT_THROTTLE)
+           (flags & METASLAB_DONT_THROTTLE))
                return;
 
        metaslab_group_t *mg = vdev_lookup_top(spa, vdev)->vdev_mg;
        if (!mg->mg_class->mc_alloc_throttle_enabled)
                return;
 
-       (void) refcount_remove(&mg->mg_alloc_queue_depth, tag);
+       (void) zfs_refcount_remove(&mg->mg_alloc_queue_depth[allocator], tag);
+       if (io_complete)
+               metaslab_group_increment_qdepth(mg, allocator);
 }
 
 void
-metaslab_group_alloc_verify(spa_t *spa, const blkptr_t *bp, void *tag)
+metaslab_group_alloc_verify(spa_t *spa, const blkptr_t *bp, void *tag,
+    int allocator)
 {
 #ifdef ZFS_DEBUG
        const dva_t *dva = bp->blk_dva;
@@ -2724,7 +3345,8 @@ metaslab_group_alloc_verify(spa_t *spa, const blkptr_t *bp, void *tag)
        for (int d = 0; d < ndvas; d++) {
                uint64_t vdev = DVA_GET_VDEV(&dva[d]);
                metaslab_group_t *mg = vdev_lookup_top(spa, vdev)->vdev_mg;
-               VERIFY(refcount_not_held(&mg->mg_alloc_queue_depth, tag));
+               VERIFY(zfs_refcount_not_held(
+                   &mg->mg_alloc_queue_depth[allocator], tag));
        }
 #endif
 }
@@ -2737,6 +3359,7 @@ metaslab_block_alloc(metaslab_t *msp, uint64_t size, uint64_t txg)
        metaslab_class_t *mc = msp->ms_group->mg_class;
 
        VERIFY(!msp->ms_condensing);
+       VERIFY0(msp->ms_disabled);
 
        start = mc->mc_ops->msop_alloc(msp, size);
        if (start != -1ULL) {
@@ -2747,6 +3370,7 @@ metaslab_block_alloc(metaslab_t *msp, uint64_t size, uint64_t txg)
                VERIFY0(P2PHASE(size, 1ULL << vd->vdev_ashift));
                VERIFY3U(range_tree_space(rt) - size, <=, msp->ms_size);
                range_tree_remove(rt, start, size);
+               range_tree_clear(msp->ms_trim, start, size);
 
                if (range_tree_is_empty(msp->ms_allocating[txg & TXG_MASK]))
                        vdev_dirty(mg->mg_vd, VDD_METASLAB, msp, txg);
@@ -2766,91 +3390,141 @@ metaslab_block_alloc(metaslab_t *msp, uint64_t size, uint64_t txg)
        return (start);
 }
 
+/*
+ * Find the metaslab with the highest weight that is less than what we've
+ * already tried.  In the common case, this means that we will examine each
+ * metaslab at most once. Note that concurrent callers could reorder metaslabs
+ * by activation/passivation once we have dropped the mg_lock. If a metaslab is
+ * activated by another thread, and we fail to allocate from the metaslab we
+ * have selected, we may not try the newly-activated metaslab, and instead
+ * activate another metaslab.  This is not optimal, but generally does not cause
+ * any problems (a possible exception being if every metaslab is completely full
+ * except for the the newly-activated metaslab which we fail to examine).
+ */
+static metaslab_t *
+find_valid_metaslab(metaslab_group_t *mg, uint64_t activation_weight,
+    dva_t *dva, int d, boolean_t want_unique, uint64_t asize, int allocator,
+    zio_alloc_list_t *zal, metaslab_t *search, boolean_t *was_active)
+{
+       avl_index_t idx;
+       avl_tree_t *t = &mg->mg_metaslab_tree;
+       metaslab_t *msp = avl_find(t, search, &idx);
+       if (msp == NULL)
+               msp = avl_nearest(t, idx, AVL_AFTER);
+
+       for (; msp != NULL; msp = AVL_NEXT(t, msp)) {
+               int i;
+               if (!metaslab_should_allocate(msp, asize)) {
+                       metaslab_trace_add(zal, mg, msp, asize, d,
+                           TRACE_TOO_SMALL, allocator);
+                       continue;
+               }
+
+               /*
+                * If the selected metaslab is condensing or disabled,
+                * skip it.
+                */
+               if (msp->ms_condensing || msp->ms_disabled > 0)
+                       continue;
+
+               *was_active = msp->ms_allocator != -1;
+               /*
+                * If we're activating as primary, this is our first allocation
+                * from this disk, so we don't need to check how close we are.
+                * If the metaslab under consideration was already active,
+                * we're getting desperate enough to steal another allocator's
+                * metaslab, so we still don't care about distances.
+                */
+               if (activation_weight == METASLAB_WEIGHT_PRIMARY || *was_active)
+                       break;
+
+               for (i = 0; i < d; i++) {
+                       if (want_unique &&
+                           !metaslab_is_unique(msp, &dva[i]))
+                               break;  /* try another metaslab */
+               }
+               if (i == d)
+                       break;
+       }
+
+       if (msp != NULL) {
+               search->ms_weight = msp->ms_weight;
+               search->ms_start = msp->ms_start + 1;
+               search->ms_allocator = msp->ms_allocator;
+               search->ms_primary = msp->ms_primary;
+       }
+       return (msp);
+}
+
+/* ARGSUSED */
 static uint64_t
 metaslab_group_alloc_normal(metaslab_group_t *mg, zio_alloc_list_t *zal,
-    uint64_t asize, uint64_t txg, uint64_t min_distance, dva_t *dva, int d)
+    uint64_t asize, uint64_t txg, boolean_t want_unique, dva_t *dva,
+    int d, int allocator)
 {
        metaslab_t *msp = NULL;
        uint64_t offset = -1ULL;
        uint64_t activation_weight;
-       uint64_t target_distance;
-       int i;
 
        activation_weight = METASLAB_WEIGHT_PRIMARY;
-       for (i = 0; i < d; i++) {
-               if (DVA_GET_VDEV(&dva[i]) == mg->mg_vd->vdev_id) {
+       for (int i = 0; i < d; i++) {
+               if (activation_weight == METASLAB_WEIGHT_PRIMARY &&
+                   DVA_GET_VDEV(&dva[i]) == mg->mg_vd->vdev_id) {
                        activation_weight = METASLAB_WEIGHT_SECONDARY;
+               } else if (activation_weight == METASLAB_WEIGHT_SECONDARY &&
+                   DVA_GET_VDEV(&dva[i]) == mg->mg_vd->vdev_id) {
+                       activation_weight = METASLAB_WEIGHT_CLAIM;
                        break;
                }
        }
 
+       /*
+        * If we don't have enough metaslabs active to fill the entire array, we
+        * just use the 0th slot.
+        */
+       if (mg->mg_ms_ready < mg->mg_allocators * 3)
+               allocator = 0;
+
+       ASSERT3U(mg->mg_vd->vdev_ms_count, >=, 2);
+
        metaslab_t *search = kmem_alloc(sizeof (*search), KM_SLEEP);
        search->ms_weight = UINT64_MAX;
        search->ms_start = 0;
+       /*
+        * At the end of the metaslab tree are the already-active metaslabs,
+        * first the primaries, then the secondaries. When we resume searching
+        * through the tree, we need to consider ms_allocator and ms_primary so
+        * we start in the location right after where we left off, and don't
+        * accidentally loop forever considering the same metaslabs.
+        */
+       search->ms_allocator = -1;
+       search->ms_primary = B_TRUE;
        for (;;) {
-               boolean_t was_active;
-               avl_tree_t *t = &mg->mg_metaslab_tree;
-               avl_index_t idx;
+               boolean_t was_active = B_FALSE;
 
                mutex_enter(&mg->mg_lock);
 
-               /*
-                * Find the metaslab with the highest weight that is less
-                * than what we've already tried.  In the common case, this
-                * means that we will examine each metaslab at most once.
-                * Note that concurrent callers could reorder metaslabs
-                * by activation/passivation once we have dropped the mg_lock.
-                * If a metaslab is activated by another thread, and we fail
-                * to allocate from the metaslab we have selected, we may
-                * not try the newly-activated metaslab, and instead activate
-                * another metaslab.  This is not optimal, but generally
-                * does not cause any problems (a possible exception being
-                * if every metaslab is completely full except for the
-                * the newly-activated metaslab which we fail to examine).
-                */
-               msp = avl_find(t, search, &idx);
-               if (msp == NULL)
-                       msp = avl_nearest(t, idx, AVL_AFTER);
-               for (; msp != NULL; msp = AVL_NEXT(t, msp)) {
-
-                       if (!metaslab_should_allocate(msp, asize)) {
-                               metaslab_trace_add(zal, mg, msp, asize, d,
-                                   TRACE_TOO_SMALL);
-                               continue;
-                       }
-
-                       /*
-                        * If the selected metaslab is condensing, skip it.
-                        */
-                       if (msp->ms_condensing)
-                               continue;
-
-                       was_active = msp->ms_weight & METASLAB_ACTIVE_MASK;
-                       if (activation_weight == METASLAB_WEIGHT_PRIMARY)
-                               break;
-
-                       target_distance = min_distance +
-                           (space_map_allocated(msp->ms_sm) != 0 ? 0 :
-                           min_distance >> 1);
-
-                       for (i = 0; i < d; i++) {
-                               if (metaslab_distance(msp, &dva[i]) <
-                                   target_distance)
-                                       break;
-                       }
-                       if (i == d)
-                               break;
+               if (activation_weight == METASLAB_WEIGHT_PRIMARY &&
+                   mg->mg_primaries[allocator] != NULL) {
+                       msp = mg->mg_primaries[allocator];
+                       was_active = B_TRUE;
+               } else if (activation_weight == METASLAB_WEIGHT_SECONDARY &&
+                   mg->mg_secondaries[allocator] != NULL) {
+                       msp = mg->mg_secondaries[allocator];
+                       was_active = B_TRUE;
+               } else {
+                       msp = find_valid_metaslab(mg, activation_weight, dva, d,
+                           want_unique, asize, allocator, zal, search,
+                           &was_active);
                }
+
                mutex_exit(&mg->mg_lock);
                if (msp == NULL) {
                        kmem_free(search, sizeof (*search));
                        return (-1ULL);
                }
-               search->ms_weight = msp->ms_weight;
-               search->ms_start = msp->ms_start + 1;
 
                mutex_enter(&msp->ms_lock);
-
                /*
                 * Ensure that the metaslab we have selected is still
                 * capable of handling our request. It's possible that
@@ -2864,18 +3538,33 @@ metaslab_group_alloc_normal(metaslab_group_t *mg, zio_alloc_list_t *zal,
                        continue;
                }
 
-               if ((msp->ms_weight & METASLAB_WEIGHT_SECONDARY) &&
-                   activation_weight == METASLAB_WEIGHT_PRIMARY) {
-                       metaslab_passivate(msp,
-                           msp->ms_weight & ~METASLAB_ACTIVE_MASK);
+               /*
+                * If the metaslab is freshly activated for an allocator that
+                * isn't the one we're allocating from, or if it's a primary and
+                * we're seeking a secondary (or vice versa), we go back and
+                * select a new metaslab.
+                */
+               if (!was_active && (msp->ms_weight & METASLAB_ACTIVE_MASK) &&
+                   (msp->ms_allocator != -1) &&
+                   (msp->ms_allocator != allocator || ((activation_weight ==
+                   METASLAB_WEIGHT_PRIMARY) != msp->ms_primary))) {
                        mutex_exit(&msp->ms_lock);
                        continue;
                }
 
-               if (metaslab_activate(msp, activation_weight) != 0) {
+               if (msp->ms_weight & METASLAB_WEIGHT_CLAIM &&
+                   activation_weight != METASLAB_WEIGHT_CLAIM) {
+                       metaslab_passivate(msp, msp->ms_weight &
+                           ~METASLAB_WEIGHT_CLAIM);
                        mutex_exit(&msp->ms_lock);
                        continue;
                }
+
+               if (metaslab_activate(msp, allocator, activation_weight) != 0) {
+                       mutex_exit(&msp->ms_lock);
+                       continue;
+               }
+
                msp->ms_selected_txg = txg;
 
                /*
@@ -2888,7 +3577,7 @@ metaslab_group_alloc_normal(metaslab_group_t *mg, zio_alloc_list_t *zal,
                if (!metaslab_should_allocate(msp, asize)) {
                        /* Passivate this metaslab and select a new one. */
                        metaslab_trace_add(zal, mg, msp, asize, d,
-                           TRACE_TOO_SMALL);
+                           TRACE_TOO_SMALL, allocator);
                        goto next;
                }
 
@@ -2896,17 +3585,28 @@ metaslab_group_alloc_normal(metaslab_group_t *mg, zio_alloc_list_t *zal,
                /*
                 * If this metaslab is currently condensing then pick again as
                 * we can't manipulate this metaslab until it's committed
-                * to disk.
+                * to disk. If this metaslab is being initialized, we shouldn't
+                * allocate from it since the allocated region might be
+                * overwritten after allocation.
                 */
                if (msp->ms_condensing) {
                        metaslab_trace_add(zal, mg, msp, asize, d,
-                           TRACE_CONDENSING);
+                           TRACE_CONDENSING, allocator);
+                       metaslab_passivate(msp, msp->ms_weight &
+                           ~METASLAB_ACTIVE_MASK);
+                       mutex_exit(&msp->ms_lock);
+                       continue;
+               } else if (msp->ms_disabled > 0) {
+                       metaslab_trace_add(zal, mg, msp, asize, d,
+                           TRACE_DISABLED, allocator);
+                       metaslab_passivate(msp, msp->ms_weight &
+                           ~METASLAB_ACTIVE_MASK);
                        mutex_exit(&msp->ms_lock);
                        continue;
                }
 
                offset = metaslab_block_alloc(msp, asize, txg);
-               metaslab_trace_add(zal, mg, msp, asize, d, offset);
+               metaslab_trace_add(zal, mg, msp, asize, d, offset, allocator);
 
                if (offset != -1ULL) {
                        /* Proactively passivate the metaslab, if needed */
@@ -2953,6 +3653,7 @@ next:
                 * metaslab.
                 */
                ASSERT(!metaslab_should_allocate(msp, asize));
+
                mutex_exit(&msp->ms_lock);
        }
        mutex_exit(&msp->ms_lock);
@@ -2962,19 +3663,20 @@ next:
 
 static uint64_t
 metaslab_group_alloc(metaslab_group_t *mg, zio_alloc_list_t *zal,
-    uint64_t asize, uint64_t txg, uint64_t min_distance, dva_t *dva, int d)
+    uint64_t asize, uint64_t txg, boolean_t want_unique, dva_t *dva,
+    int d, int allocator)
 {
        uint64_t offset;
        ASSERT(mg->mg_initialized);
 
-       offset = metaslab_group_alloc_normal(mg, zal, asize, txg,
-           min_distance, dva, d);
+       offset = metaslab_group_alloc_normal(mg, zal, asize, txg, want_unique,
+           dva, d, allocator);
 
        mutex_enter(&mg->mg_lock);
        if (offset == -1ULL) {
                mg->mg_failed_allocations++;
                metaslab_trace_add(zal, mg, NULL, asize, d,
-                   TRACE_GROUP_FAILURE);
+                   TRACE_GROUP_FAILURE, allocator);
                if (asize == SPA_GANGBLOCKSIZE) {
                        /*
                         * This metaslab group was unable to allocate
@@ -2995,21 +3697,13 @@ metaslab_group_alloc(metaslab_group_t *mg, zio_alloc_list_t *zal,
        return (offset);
 }
 
-/*
- * If we have to write a ditto block (i.e. more than one DVA for a given BP)
- * on the same vdev as an existing DVA of this BP, then try to allocate it
- * at least (vdev_asize / (2 ^ ditto_same_vdev_distance_shift)) away from the
- * existing DVAs.
- */
-int ditto_same_vdev_distance_shift = 3;
-
 /*
  * Allocate a block for the specified i/o.
  */
 int
 metaslab_alloc_dva(spa_t *spa, metaslab_class_t *mc, uint64_t psize,
     dva_t *dva, int d, dva_t *hintdva, uint64_t txg, int flags,
-    zio_alloc_list_t *zal)
+    zio_alloc_list_t *zal, int allocator)
 {
        metaslab_group_t *mg, *fast_mg, *rotor;
        vdev_t *vd;
@@ -3019,9 +3713,14 @@ metaslab_alloc_dva(spa_t *spa, metaslab_class_t *mc, uint64_t psize,
 
        /*
         * For testing, make some blocks above a certain size be gang blocks.
+        * This will result in more split blocks when using device removal,
+        * and a large number of split blocks coupled with ztest-induced
+        * damage can result in extremely long reconstruction times.  This
+        * will also test spilling from special to normal.
         */
-       if (psize >= metaslab_force_ganging && (ddi_get_lbolt() & 3) == 0) {
-               metaslab_trace_add(zal, NULL, NULL, psize, d, TRACE_FORCE_GANG);
+       if (psize >= metaslab_force_ganging && (spa_get_random(100) < 3)) {
+               metaslab_trace_add(zal, NULL, NULL, psize, d, TRACE_FORCE_GANG,
+                   allocator);
                return (SET_ERROR(ENOSPC));
        }
 
@@ -3078,6 +3777,7 @@ metaslab_alloc_dva(spa_t *spa, metaslab_class_t *mc, uint64_t psize,
                } while ((fast_mg = fast_mg->mg_next) != mc->mc_rotor);
 
        } else {
+               ASSERT(mc->mc_rotor != NULL);
                mg = mc->mc_rotor;
        }
 
@@ -3116,12 +3816,12 @@ top:
                 */
                if (allocatable && !GANG_ALLOCATION(flags) && !try_hard) {
                        allocatable = metaslab_group_allocatable(mg, rotor,
-                           psize);
+                           psize, allocator, d);
                }
 
                if (!allocatable) {
                        metaslab_trace_add(zal, mg, NULL, psize, d,
-                           TRACE_NOT_ALLOCATABLE);
+                           TRACE_NOT_ALLOCATABLE, allocator);
                        goto next;
                }
 
@@ -3136,31 +3836,23 @@ top:
                    vd->vdev_state < VDEV_STATE_HEALTHY) &&
                    d == 0 && !try_hard && vd->vdev_children == 0) {
                        metaslab_trace_add(zal, mg, NULL, psize, d,
-                           TRACE_VDEV_ERROR);
+                           TRACE_VDEV_ERROR, allocator);
                        goto next;
                }
 
                ASSERT(mg->mg_class == mc);
 
-               /*
-                * If we don't need to try hard, then require that the
-                * block be 1/8th of the device away from any other DVAs
-                * in this BP.  If we are trying hard, allow any offset
-                * to be used (distance=0).
-                */
-               uint64_t distance = 0;
-               if (!try_hard) {
-                       distance = vd->vdev_asize >>
-                           ditto_same_vdev_distance_shift;
-                       if (distance <= (1ULL << vd->vdev_ms_shift))
-                               distance = 0;
-               }
-
                uint64_t asize = vdev_psize_to_asize(vd, psize);
                ASSERT(P2PHASE(asize, 1ULL << vd->vdev_ashift) == 0);
 
+               /*
+                * If we don't need to try hard, then require that the
+                * block be on an different metaslab from any other DVAs
+                * in this BP (unique=true).  If we are trying hard, then
+                * allow any metaslab to be used (unique=false).
+                */
                uint64_t offset = metaslab_group_alloc(mg, zal, asize, txg,
-                   distance, dva, d);
+                   !try_hard, dva, d, allocator);
 
                if (offset != -1ULL) {
                        /*
@@ -3244,7 +3936,7 @@ next:
 
        bzero(&dva[d], sizeof (dva_t));
 
-       metaslab_trace_add(zal, rotor, NULL, psize, d, TRACE_ENOSPC);
+       metaslab_trace_add(zal, rotor, NULL, psize, d, TRACE_ENOSPC, allocator);
        return (SET_ERROR(ENOSPC));
 }
 
@@ -3545,26 +4237,31 @@ metaslab_free_dva(spa_t *spa, const dva_t *dva, boolean_t checkpoint)
  * the reservation.
  */
 boolean_t
-metaslab_class_throttle_reserve(metaslab_class_t *mc, int slots, zio_t *zio,
-    int flags)
+metaslab_class_throttle_reserve(metaslab_class_t *mc, int slots, int allocator,
+    zio_t *zio, int flags)
 {
        uint64_t available_slots = 0;
        boolean_t slot_reserved = B_FALSE;
+       uint64_t max = mc->mc_alloc_max_slots[allocator];
 
        ASSERT(mc->mc_alloc_throttle_enabled);
        mutex_enter(&mc->mc_lock);
 
-       uint64_t reserved_slots = refcount_count(&mc->mc_alloc_slots);
-       if (reserved_slots < mc->mc_alloc_max_slots)
-               available_slots = mc->mc_alloc_max_slots - reserved_slots;
+       uint64_t reserved_slots =
+           zfs_refcount_count(&mc->mc_alloc_slots[allocator]);
+       if (reserved_slots < max)
+               available_slots = max - reserved_slots;
 
-       if (slots <= available_slots || GANG_ALLOCATION(flags)) {
+       if (slots <= available_slots || GANG_ALLOCATION(flags) ||
+           flags & METASLAB_MUST_RESERVE) {
                /*
                 * We reserve the slots individually so that we can unreserve
                 * them individually when an I/O completes.
                 */
                for (int d = 0; d < slots; d++) {
-                       reserved_slots = refcount_add(&mc->mc_alloc_slots, zio);
+                       reserved_slots =
+                           zfs_refcount_add(&mc->mc_alloc_slots[allocator],
+                           zio);
                }
                zio->io_flags |= ZIO_FLAG_IO_ALLOCATING;
                slot_reserved = B_TRUE;
@@ -3575,12 +4272,14 @@ metaslab_class_throttle_reserve(metaslab_class_t *mc, int slots, zio_t *zio,
 }
 
 void
-metaslab_class_throttle_unreserve(metaslab_class_t *mc, int slots, zio_t *zio)
+metaslab_class_throttle_unreserve(metaslab_class_t *mc, int slots,
+    int allocator, zio_t *zio)
 {
        ASSERT(mc->mc_alloc_throttle_enabled);
        mutex_enter(&mc->mc_lock);
        for (int d = 0; d < slots; d++) {
-               (void) refcount_remove(&mc->mc_alloc_slots, zio);
+               (void) zfs_refcount_remove(&mc->mc_alloc_slots[allocator],
+                   zio);
        }
        mutex_exit(&mc->mc_lock);
 }
@@ -3594,15 +4293,21 @@ metaslab_claim_concrete(vdev_t *vd, uint64_t offset, uint64_t size,
        int error = 0;
 
        if (offset >> vd->vdev_ms_shift >= vd->vdev_ms_count)
-               return (ENXIO);
+               return (SET_ERROR(ENXIO));
 
        ASSERT3P(vd->vdev_ms, !=, NULL);
        msp = vd->vdev_ms[offset >> vd->vdev_ms_shift];
 
        mutex_enter(&msp->ms_lock);
 
-       if ((txg != 0 && spa_writeable(spa)) || !msp->ms_loaded)
-               error = metaslab_activate(msp, METASLAB_WEIGHT_SECONDARY);
+       if ((txg != 0 && spa_writeable(spa)) || !msp->ms_loaded) {
+               error = metaslab_activate(msp, 0, METASLAB_WEIGHT_CLAIM);
+               if (error == EBUSY) {
+                       ASSERT(msp->ms_loaded);
+                       ASSERT(msp->ms_weight & METASLAB_ACTIVE_MASK);
+                       error = 0;
+               }
+       }
 
        if (error == 0 &&
            !range_tree_contains(msp->ms_allocatable, offset, size))
@@ -3619,6 +4324,7 @@ metaslab_claim_concrete(vdev_t *vd, uint64_t offset, uint64_t size,
        VERIFY3U(range_tree_space(msp->ms_allocatable) - size, <=,
            msp->ms_size);
        range_tree_remove(msp->ms_allocatable, offset, size);
+       range_tree_clear(msp->ms_trim, offset, size);
 
        if (spa_writeable(spa)) {       /* don't dirty if we're zdb(1M) */
                if (range_tree_is_empty(msp->ms_allocating[txg & TXG_MASK]))
@@ -3707,10 +4413,10 @@ metaslab_claim_dva(spa_t *spa, const dva_t *dva, uint64_t txg)
 int
 metaslab_alloc(spa_t *spa, metaslab_class_t *mc, uint64_t psize, blkptr_t *bp,
     int ndvas, uint64_t txg, blkptr_t *hintbp, int flags,
-    zio_alloc_list_t *zal, zio_t *zio)
+    zio_alloc_list_t *zal, zio_t *zio, int allocator)
 {
        dva_t *dva = bp->blk_dva;
-       dva_t *hintdva = hintbp->blk_dva;
+       dva_t *hintdva = (hintbp != NULL) ? hintbp->blk_dva : NULL;
        int error = 0;
 
        ASSERT(bp->blk_birth == 0);
@@ -3730,12 +4436,13 @@ metaslab_alloc(spa_t *spa, metaslab_class_t *mc, uint64_t psize, blkptr_t *bp,
 
        for (int d = 0; d < ndvas; d++) {
                error = metaslab_alloc_dva(spa, mc, psize, dva, d, hintdva,
-                   txg, flags, zal);
+                   txg, flags, zal, allocator);
                if (error != 0) {
                        for (d--; d >= 0; d--) {
                                metaslab_unalloc_dva(spa, &dva[d], txg);
                                metaslab_group_alloc_decrement(spa,
-                                   DVA_GET_VDEV(&dva[d]), zio, flags);
+                                   DVA_GET_VDEV(&dva[d]), zio, flags,
+                                   allocator, B_FALSE);
                                bzero(&dva[d], sizeof (dva_t));
                        }
                        spa_config_exit(spa, SCL_ALLOC, FTAG);
@@ -3746,7 +4453,7 @@ metaslab_alloc(spa_t *spa, metaslab_class_t *mc, uint64_t psize, blkptr_t *bp,
                         * based on the newly allocated dva.
                         */
                        metaslab_group_alloc_increment(spa,
-                           DVA_GET_VDEV(&dva[d]), zio, flags);
+                           DVA_GET_VDEV(&dva[d]), zio, flags, allocator);
                }
 
        }
@@ -3830,9 +4537,11 @@ metaslab_claim(spa_t *spa, const blkptr_t *bp, uint64_t txg)
 
        spa_config_enter(spa, SCL_ALLOC, FTAG, RW_READER);
 
-       for (int d = 0; d < ndvas; d++)
-               if ((error = metaslab_claim_dva(spa, &dva[d], txg)) != 0)
+       for (int d = 0; d < ndvas; d++) {
+               error = metaslab_claim_dva(spa, &dva[d], txg);
+               if (error != 0)
                        break;
+       }
 
        spa_config_exit(spa, SCL_ALLOC, FTAG);
 
@@ -3923,14 +4632,17 @@ metaslab_check_free_impl(vdev_t *vd, uint64_t offset, uint64_t size)
        msp = vd->vdev_ms[offset >> vd->vdev_ms_shift];
 
        mutex_enter(&msp->ms_lock);
-       if (msp->ms_loaded)
-               range_tree_verify(msp->ms_allocatable, offset, size);
+       if (msp->ms_loaded) {
+               range_tree_verify_not_present(msp->ms_allocatable,
+                   offset, size);
+       }
 
-       range_tree_verify(msp->ms_freeing, offset, size);
-       range_tree_verify(msp->ms_checkpointing, offset, size);
-       range_tree_verify(msp->ms_freed, offset, size);
+       range_tree_verify_not_present(msp->ms_trim, offset, size);
+       range_tree_verify_not_present(msp->ms_freeing, offset, size);
+       range_tree_verify_not_present(msp->ms_checkpointing, offset, size);
+       range_tree_verify_not_present(msp->ms_freed, offset, size);
        for (int j = 0; j < TXG_DEFER_SIZE; j++)
-               range_tree_verify(msp->ms_defer[j], offset, size);
+               range_tree_verify_not_present(msp->ms_defer[j], offset, size);
        mutex_exit(&msp->ms_lock);
 }
 
@@ -3957,8 +4669,91 @@ metaslab_check_free(spa_t *spa, const blkptr_t *bp)
        spa_config_exit(spa, SCL_VDEV, FTAG);
 }
 
+static void
+metaslab_group_disable_wait(metaslab_group_t *mg)
+{
+       ASSERT(MUTEX_HELD(&mg->mg_ms_disabled_lock));
+       while (mg->mg_disabled_updating) {
+               cv_wait(&mg->mg_ms_disabled_cv, &mg->mg_ms_disabled_lock);
+       }
+}
+
+static void
+metaslab_group_disabled_increment(metaslab_group_t *mg)
+{
+       ASSERT(MUTEX_HELD(&mg->mg_ms_disabled_lock));
+       ASSERT(mg->mg_disabled_updating);
+
+       while (mg->mg_ms_disabled >= max_disabled_ms) {
+               cv_wait(&mg->mg_ms_disabled_cv, &mg->mg_ms_disabled_lock);
+       }
+       mg->mg_ms_disabled++;
+       ASSERT3U(mg->mg_ms_disabled, <=, max_disabled_ms);
+}
+
+/*
+ * Mark the metaslab as disabled to prevent any allocations on this metaslab.
+ * We must also track how many metaslabs are currently disabled within a
+ * metaslab group and limit them to prevent allocation failures from
+ * occurring because all metaslabs are disabled.
+ */
+void
+metaslab_disable(metaslab_t *msp)
+{
+       ASSERT(!MUTEX_HELD(&msp->ms_lock));
+       metaslab_group_t *mg = msp->ms_group;
+
+       mutex_enter(&mg->mg_ms_disabled_lock);
+
+       /*
+        * To keep an accurate count of how many threads have disabled
+        * a specific metaslab group, we only allow one thread to mark
+        * the metaslab group at a time. This ensures that the value of
+        * ms_disabled will be accurate when we decide to mark a metaslab
+        * group as disabled. To do this we force all other threads
+        * to wait till the metaslab's mg_disabled_updating flag is no
+        * longer set.
+        */
+       metaslab_group_disable_wait(mg);
+       mg->mg_disabled_updating = B_TRUE;
+       if (msp->ms_disabled == 0) {
+               metaslab_group_disabled_increment(mg);
+       }
+       mutex_enter(&msp->ms_lock);
+       msp->ms_disabled++;
+       mutex_exit(&msp->ms_lock);
+
+       mg->mg_disabled_updating = B_FALSE;
+       cv_broadcast(&mg->mg_ms_disabled_cv);
+       mutex_exit(&mg->mg_ms_disabled_lock);
+}
+
+void
+metaslab_enable(metaslab_t *msp, boolean_t sync)
+{
+       metaslab_group_t *mg = msp->ms_group;
+       spa_t *spa = mg->mg_vd->vdev_spa;
+
+       /*
+        * Wait for the outstanding IO to be synced to prevent newly
+        * allocated blocks from being overwritten.  This used by
+        * initialize and TRIM which are modifying unallocated space.
+        */
+       if (sync)
+               txg_wait_synced(spa_get_dsl(spa), 0);
+
+       mutex_enter(&mg->mg_ms_disabled_lock);
+       mutex_enter(&msp->ms_lock);
+       if (--msp->ms_disabled == 0) {
+               mg->mg_ms_disabled--;
+               cv_broadcast(&mg->mg_ms_disabled_cv);
+       }
+       mutex_exit(&msp->ms_lock);
+       mutex_exit(&mg->mg_ms_disabled_lock);
+}
+
 #if defined(_KERNEL)
-/* CSTYLED */
+/* BEGIN CSTYLED */
 module_param(metaslab_aliquot, ulong, 0644);
 MODULE_PARM_DESC(metaslab_aliquot,
        "allocation granularity (a.k.a. stripe size)");
@@ -4007,8 +4802,9 @@ module_param(zfs_metaslab_switch_threshold, int, 0644);
 MODULE_PARM_DESC(zfs_metaslab_switch_threshold,
        "segment-based metaslab selection maximum buckets before switching");
 
-/* CSTYLED */
 module_param(metaslab_force_ganging, ulong, 0644);
 MODULE_PARM_DESC(metaslab_force_ganging,
        "blocks larger than this size are forced to be gang blocks");
+/* END CSTYLED */
+
 #endif