]> git.proxmox.com Git - mirror_ubuntu-bionic-kernel.git/blobdiff - zfs/module/zfs/metaslab.c
UBUNTU: SAUCE: (noup) Update spl to 0.7.3-1ubuntu1, zfs to 0.7.3-1ubuntu1
[mirror_ubuntu-bionic-kernel.git] / zfs / module / zfs / metaslab.c
index 59bcefd346c0e41a0cb68cfad9f8879416fa7dae..5e413c06518b0dda6b9899822432d655850cd41e 100644 (file)
@@ -20,7 +20,7 @@
  */
 /*
  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
- * Copyright (c) 2011, 2014 by Delphix. All rights reserved.
+ * Copyright (c) 2011, 2016 by Delphix. All rights reserved.
  * Copyright (c) 2013 by Saso Kiselkov. All rights reserved.
  */
 
 
 #define        WITH_DF_BLOCK_ALLOCATOR
 
-/*
- * Allow allocations to switch to gang blocks quickly. We do this to
- * avoid having to load lots of space_maps in a given txg. There are,
- * however, some cases where we want to avoid "fast" ganging and instead
- * we want to do an exhaustive search of all metaslabs on this device.
- * Currently we don't allow any gang, slog, or dump device related allocations
- * to "fast" gang.
- */
-#define        CAN_FASTGANG(flags) \
-       (!((flags) & (METASLAB_GANG_CHILD | METASLAB_GANG_HEADER | \
-       METASLAB_GANG_AVOID)))
-
-#define        METASLAB_WEIGHT_PRIMARY         (1ULL << 63)
-#define        METASLAB_WEIGHT_SECONDARY       (1ULL << 62)
-#define        METASLAB_ACTIVE_MASK            \
-       (METASLAB_WEIGHT_PRIMARY | METASLAB_WEIGHT_SECONDARY)
+#define        GANG_ALLOCATION(flags) \
+       ((flags) & (METASLAB_GANG_CHILD | METASLAB_GANG_HEADER))
 
 /*
  * Metaslab granularity, in bytes. This is roughly similar to what would be
@@ -66,7 +52,7 @@ uint64_t metaslab_gang_bang = SPA_MAXBLOCKSIZE + 1;   /* force gang blocks */
 /*
  * The in-core space map representation is more compact than its on-disk form.
  * The zfs_condense_pct determines how much more compact the in-core
- * space_map representation must be before we compact it on-disk.
+ * space map representation must be before we compact it on-disk.
  * Values should be greater than or equal to 100.
  */
 int zfs_condense_pct = 200;
@@ -134,12 +120,12 @@ int metaslab_debug_unload = 0;
  * an allocation of this size then it switches to using more
  * aggressive strategy (i.e search by size rather than offset).
  */
-uint64_t metaslab_df_alloc_threshold = SPA_MAXBLOCKSIZE;
+uint64_t metaslab_df_alloc_threshold = SPA_OLD_MAXBLOCKSIZE;
 
 /*
  * The minimum free space, in percent, which must be available
  * in a space map to continue allocations in a first-fit fashion.
- * Once the space_map's free space drops below this level we dynamically
+ * Once the space map's free space drops below this level we dynamically
  * switch to using best-fit allocations.
  */
 int metaslab_df_free_pct = 4;
@@ -181,7 +167,45 @@ int metaslab_lba_weighting_enabled = B_TRUE;
  */
 int metaslab_bias_enabled = B_TRUE;
 
-static uint64_t metaslab_fragmentation(metaslab_t *);
+
+/*
+ * Enable/disable segment-based metaslab selection.
+ */
+int zfs_metaslab_segment_weight_enabled = B_TRUE;
+
+/*
+ * When using segment-based metaslab selection, we will continue
+ * allocating from the active metaslab until we have exhausted
+ * zfs_metaslab_switch_threshold of its buckets.
+ */
+int zfs_metaslab_switch_threshold = 2;
+
+/*
+ * Internal switch to enable/disable the metaslab allocation tracing
+ * facility.
+ */
+#ifdef _METASLAB_TRACING
+boolean_t metaslab_trace_enabled = B_TRUE;
+#endif
+
+/*
+ * Maximum entries that the metaslab allocation tracing facility will keep
+ * in a given list when running in non-debug mode. We limit the number
+ * of entries in non-debug mode to prevent us from using up too much memory.
+ * The limit should be sufficiently large that we don't expect any allocation
+ * to every exceed this value. In debug mode, the system will panic if this
+ * limit is ever reached allowing for further investigation.
+ */
+#ifdef _METASLAB_TRACING
+uint64_t metaslab_trace_max_entries = 5000;
+#endif
+
+static uint64_t metaslab_weight(metaslab_t *);
+static void metaslab_set_fragmentation(metaslab_t *);
+
+#ifdef _METASLAB_TRACING
+kmem_cache_t *metaslab_alloc_trace_cache;
+#endif
 
 /*
  * ==========================================================================
@@ -198,7 +222,8 @@ metaslab_class_create(spa_t *spa, metaslab_ops_t *ops)
        mc->mc_spa = spa;
        mc->mc_rotor = NULL;
        mc->mc_ops = ops;
-       mutex_init(&mc->mc_fastwrite_lock, NULL, MUTEX_DEFAULT, NULL);
+       mutex_init(&mc->mc_lock, NULL, MUTEX_DEFAULT, NULL);
+       refcount_create_tracked(&mc->mc_alloc_slots);
 
        return (mc);
 }
@@ -212,7 +237,8 @@ metaslab_class_destroy(metaslab_class_t *mc)
        ASSERT(mc->mc_space == 0);
        ASSERT(mc->mc_dspace == 0);
 
-       mutex_destroy(&mc->mc_fastwrite_lock);
+       refcount_destroy(&mc->mc_alloc_slots);
+       mutex_destroy(&mc->mc_lock);
        kmem_free(mc, sizeof (metaslab_class_t));
 }
 
@@ -387,47 +413,90 @@ metaslab_class_expandable_space(metaslab_class_t *mc)
                        continue;
                }
 
-               space += tvd->vdev_max_asize - tvd->vdev_asize;
+               /*
+                * Calculate if we have enough space to add additional
+                * metaslabs. We report the expandable space in terms
+                * of the metaslab size since that's the unit of expansion.
+                */
+               space += P2ALIGN(tvd->vdev_max_asize - tvd->vdev_asize,
+                   1ULL << tvd->vdev_ms_shift);
        }
        spa_config_exit(mc->mc_spa, SCL_VDEV, FTAG);
        return (space);
 }
 
-/*
- * ==========================================================================
- * Metaslab groups
- * ==========================================================================
- */
 static int
 metaslab_compare(const void *x1, const void *x2)
 {
-       const metaslab_t *m1 = x1;
-       const metaslab_t *m2 = x2;
+       const metaslab_t *m1 = (const metaslab_t *)x1;
+       const metaslab_t *m2 = (const metaslab_t *)x2;
+
+       int cmp = AVL_CMP(m2->ms_weight, m1->ms_weight);
+       if (likely(cmp))
+               return (cmp);
+
+       IMPLY(AVL_CMP(m1->ms_start, m2->ms_start) == 0, m1 == m2);
+
+       return (AVL_CMP(m1->ms_start, m2->ms_start));
+}
+
+/*
+ * Verify that the space accounting on disk matches the in-core range_trees.
+ */
+void
+metaslab_verify_space(metaslab_t *msp, uint64_t txg)
+{
+       spa_t *spa = msp->ms_group->mg_vd->vdev_spa;
+       uint64_t allocated = 0;
+       uint64_t sm_free_space, msp_free_space;
+       int t;
+
+       ASSERT(MUTEX_HELD(&msp->ms_lock));
 
-       if (m1->ms_weight < m2->ms_weight)
-               return (1);
-       if (m1->ms_weight > m2->ms_weight)
-               return (-1);
+       if ((zfs_flags & ZFS_DEBUG_METASLAB_VERIFY) == 0)
+               return;
 
        /*
-        * If the weights are identical, use the offset to force uniqueness.
+        * We can only verify the metaslab space when we're called
+        * from syncing context with a loaded metaslab that has an allocated
+        * space map. Calling this in non-syncing context does not
+        * provide a consistent view of the metaslab since we're performing
+        * allocations in the future.
         */
-       if (m1->ms_start < m2->ms_start)
-               return (-1);
-       if (m1->ms_start > m2->ms_start)
-               return (1);
+       if (txg != spa_syncing_txg(spa) || msp->ms_sm == NULL ||
+           !msp->ms_loaded)
+               return;
 
-       ASSERT3P(m1, ==, m2);
+       sm_free_space = msp->ms_size - space_map_allocated(msp->ms_sm) -
+           space_map_alloc_delta(msp->ms_sm);
 
-       return (0);
+       /*
+        * Account for future allocations since we would have already
+        * deducted that space from the ms_freetree.
+        */
+       for (t = 0; t < TXG_CONCURRENT_STATES; t++) {
+               allocated +=
+                   range_tree_space(msp->ms_alloctree[(txg + t) & TXG_MASK]);
+       }
+
+       msp_free_space = range_tree_space(msp->ms_tree) + allocated +
+           msp->ms_deferspace + range_tree_space(msp->ms_freedtree);
+
+       VERIFY3U(sm_free_space, ==, msp_free_space);
 }
 
+/*
+ * ==========================================================================
+ * Metaslab groups
+ * ==========================================================================
+ */
 /*
  * Update the allocatable flag and the metaslab group's capacity.
  * The allocatable flag is set to true if the capacity is below
- * the zfs_mg_noalloc_threshold. If a metaslab group transitions
- * from allocatable to non-allocatable or vice versa then the metaslab
- * group's class is updated to reflect the transition.
+ * the zfs_mg_noalloc_threshold or has a fragmentation value that is
+ * greater than zfs_mg_fragmentation_threshold. If a metaslab group
+ * transitions from allocatable to non-allocatable or vice versa then the
+ * metaslab group's class is updated to reflect the transition.
  */
 static void
 metaslab_group_alloc_update(metaslab_group_t *mg)
@@ -436,22 +505,45 @@ metaslab_group_alloc_update(metaslab_group_t *mg)
        metaslab_class_t *mc = mg->mg_class;
        vdev_stat_t *vs = &vd->vdev_stat;
        boolean_t was_allocatable;
+       boolean_t was_initialized;
 
        ASSERT(vd == vd->vdev_top);
 
        mutex_enter(&mg->mg_lock);
        was_allocatable = mg->mg_allocatable;
+       was_initialized = mg->mg_initialized;
 
        mg->mg_free_capacity = ((vs->vs_space - vs->vs_alloc) * 100) /
            (vs->vs_space + 1);
 
+       mutex_enter(&mc->mc_lock);
+
+       /*
+        * If the metaslab group was just added then it won't
+        * have any space until we finish syncing out this txg.
+        * At that point we will consider it initialized and available
+        * for allocations.  We also don't consider non-activated
+        * metaslab groups (e.g. vdevs that are in the middle of being removed)
+        * to be initialized, because they can't be used for allocation.
+        */
+       mg->mg_initialized = metaslab_group_initialized(mg);
+       if (!was_initialized && mg->mg_initialized) {
+               mc->mc_groups++;
+       } else if (was_initialized && !mg->mg_initialized) {
+               ASSERT3U(mc->mc_groups, >, 0);
+               mc->mc_groups--;
+       }
+       if (mg->mg_initialized)
+               mg->mg_no_free_space = B_FALSE;
+
        /*
         * A metaslab group is considered allocatable if it has plenty
         * of free space or is not heavily fragmented. We only take
         * fragmentation into account if the metaslab group has a valid
         * fragmentation metric (i.e. a value between 0 and 100).
         */
-       mg->mg_allocatable = (mg->mg_free_capacity > zfs_mg_noalloc_threshold &&
+       mg->mg_allocatable = (mg->mg_activation_count > 0 &&
+           mg->mg_free_capacity > zfs_mg_noalloc_threshold &&
            (mg->mg_fragmentation == ZFS_FRAG_INVALID ||
            mg->mg_fragmentation <= zfs_mg_fragmentation_threshold));
 
@@ -474,6 +566,7 @@ metaslab_group_alloc_update(metaslab_group_t *mg)
                mc->mc_alloc_groups--;
        else if (!was_allocatable && mg->mg_allocatable)
                mc->mc_alloc_groups++;
+       mutex_exit(&mc->mc_lock);
 
        mutex_exit(&mg->mg_lock);
 }
@@ -490,6 +583,9 @@ metaslab_group_create(metaslab_class_t *mc, vdev_t *vd)
        mg->mg_vd = vd;
        mg->mg_class = mc;
        mg->mg_activation_count = 0;
+       mg->mg_initialized = B_FALSE;
+       mg->mg_no_free_space = B_TRUE;
+       refcount_create_tracked(&mg->mg_alloc_queue_depth);
 
        mg->mg_taskq = taskq_create("metaslab_group_taskq", metaslab_load_pct,
            maxclsyspri, 10, INT_MAX, TASKQ_THREADS_CPU_PCT | TASKQ_DYNAMIC);
@@ -512,6 +608,7 @@ metaslab_group_destroy(metaslab_group_t *mg)
        taskq_destroy(mg->mg_taskq);
        avl_destroy(&mg->mg_metaslab_tree);
        mutex_destroy(&mg->mg_lock);
+       refcount_destroy(&mg->mg_alloc_queue_depth);
        kmem_free(mg, sizeof (metaslab_group_t));
 }
 
@@ -581,6 +678,15 @@ metaslab_group_passivate(metaslab_group_t *mg)
        mg->mg_next = NULL;
 }
 
+boolean_t
+metaslab_group_initialized(metaslab_group_t *mg)
+{
+       vdev_t *vd = mg->mg_vd;
+       vdev_stat_t *vs = &vd->vdev_stat;
+
+       return (vs->vs_space != 0 && mg->mg_activation_count > 0);
+}
+
 uint64_t
 metaslab_group_get_space(metaslab_group_t *mg)
 {
@@ -753,30 +859,97 @@ metaslab_group_fragmentation(metaslab_group_t *mg)
  * group should avoid allocations if its free capacity is less than the
  * zfs_mg_noalloc_threshold or its fragmentation metric is greater than
  * zfs_mg_fragmentation_threshold and there is at least one metaslab group
- * that can still handle allocations.
+ * that can still handle allocations. If the allocation throttle is enabled
+ * then we skip allocations to devices that have reached their maximum
+ * allocation queue depth unless the selected metaslab group is the only
+ * eligible group remaining.
  */
 static boolean_t
-metaslab_group_allocatable(metaslab_group_t *mg)
+metaslab_group_allocatable(metaslab_group_t *mg, metaslab_group_t *rotor,
+    uint64_t psize)
 {
-       vdev_t *vd = mg->mg_vd;
-       spa_t *spa = vd->vdev_spa;
+       spa_t *spa = mg->mg_vd->vdev_spa;
        metaslab_class_t *mc = mg->mg_class;
 
        /*
-        * We use two key metrics to determine if a metaslab group is
-        * considered allocatable -- free space and fragmentation. If
-        * the free space is greater than the free space threshold and
-        * the fragmentation is less than the fragmentation threshold then
-        * consider the group allocatable. There are two case when we will
-        * not consider these key metrics. The first is if the group is
-        * associated with a slog device and the second is if all groups
-        * in this metaslab class have already been consider ineligible
+        * We can only consider skipping this metaslab group if it's
+        * in the normal metaslab class and there are other metaslab
+        * groups to select from. Otherwise, we always consider it eligible
         * for allocations.
         */
-       return ((mg->mg_free_capacity > zfs_mg_noalloc_threshold &&
-           (mg->mg_fragmentation == ZFS_FRAG_INVALID ||
-           mg->mg_fragmentation <= zfs_mg_fragmentation_threshold)) ||
-           mc != spa_normal_class(spa) || mc->mc_alloc_groups == 0);
+       if (mc != spa_normal_class(spa) || mc->mc_groups <= 1)
+               return (B_TRUE);
+
+       /*
+        * If the metaslab group's mg_allocatable flag is set (see comments
+        * in metaslab_group_alloc_update() for more information) and
+        * the allocation throttle is disabled then allow allocations to this
+        * device. However, if the allocation throttle is enabled then
+        * check if we have reached our allocation limit (mg_alloc_queue_depth)
+        * to determine if we should allow allocations to this metaslab group.
+        * If all metaslab groups are no longer considered allocatable
+        * (mc_alloc_groups == 0) or we're trying to allocate the smallest
+        * gang block size then we allow allocations on this metaslab group
+        * regardless of the mg_allocatable or throttle settings.
+        */
+       if (mg->mg_allocatable) {
+               metaslab_group_t *mgp;
+               int64_t qdepth;
+               uint64_t qmax = mg->mg_max_alloc_queue_depth;
+
+               if (!mc->mc_alloc_throttle_enabled)
+                       return (B_TRUE);
+
+               /*
+                * If this metaslab group does not have any free space, then
+                * there is no point in looking further.
+                */
+               if (mg->mg_no_free_space)
+                       return (B_FALSE);
+
+               qdepth = refcount_count(&mg->mg_alloc_queue_depth);
+
+               /*
+                * If this metaslab group is below its qmax or it's
+                * the only allocatable metasable group, then attempt
+                * to allocate from it.
+                */
+               if (qdepth < qmax || mc->mc_alloc_groups == 1)
+                       return (B_TRUE);
+               ASSERT3U(mc->mc_alloc_groups, >, 1);
+
+               /*
+                * Since this metaslab group is at or over its qmax, we
+                * need to determine if there are metaslab groups after this
+                * one that might be able to handle this allocation. This is
+                * racy since we can't hold the locks for all metaslab
+                * groups at the same time when we make this check.
+                */
+               for (mgp = mg->mg_next; mgp != rotor; mgp = mgp->mg_next) {
+                       qmax = mgp->mg_max_alloc_queue_depth;
+
+                       qdepth = refcount_count(&mgp->mg_alloc_queue_depth);
+
+                       /*
+                        * If there is another metaslab group that
+                        * might be able to handle the allocation, then
+                        * we return false so that we skip this group.
+                        */
+                       if (qdepth < qmax && !mgp->mg_no_free_space)
+                               return (B_FALSE);
+               }
+
+               /*
+                * We didn't find another group to handle the allocation
+                * so we can't skip this metaslab group even though
+                * we are at or over our qmax.
+                */
+               return (B_TRUE);
+
+       } else if (mc->mc_alloc_groups == 0 || psize == SPA_MINBLOCKSIZE) {
+               return (B_TRUE);
+       }
+       return (B_FALSE);
 }
 
 /*
@@ -797,18 +970,11 @@ metaslab_rangesize_compare(const void *x1, const void *x2)
        uint64_t rs_size1 = r1->rs_end - r1->rs_start;
        uint64_t rs_size2 = r2->rs_end - r2->rs_start;
 
-       if (rs_size1 < rs_size2)
-               return (-1);
-       if (rs_size1 > rs_size2)
-               return (1);
+       int cmp = AVL_CMP(rs_size1, rs_size2);
+       if (likely(cmp))
+               return (cmp);
 
-       if (r1->rs_start < r2->rs_start)
-               return (-1);
-
-       if (r1->rs_start > r2->rs_start)
-               return (1);
-
-       return (0);
+       return (AVL_CMP(r1->rs_start, r2->rs_start));
 }
 
 /*
@@ -892,7 +1058,7 @@ static range_tree_ops_t metaslab_rt_ops = {
 
 /*
  * ==========================================================================
- * Metaslab block operations
+ * Common allocator routines
  * ==========================================================================
  */
 
@@ -911,31 +1077,22 @@ metaslab_block_maxsize(metaslab_t *msp)
        return (rs->rs_end - rs->rs_start);
 }
 
-uint64_t
-metaslab_block_alloc(metaslab_t *msp, uint64_t size)
+static range_seg_t *
+metaslab_block_find(avl_tree_t *t, uint64_t start, uint64_t size)
 {
-       uint64_t start;
-       range_tree_t *rt = msp->ms_tree;
-
-       VERIFY(!msp->ms_condensing);
+       range_seg_t *rs, rsearch;
+       avl_index_t where;
 
-       start = msp->ms_ops->msop_alloc(msp, size);
-       if (start != -1ULL) {
-               vdev_t *vd = msp->ms_group->mg_vd;
+       rsearch.rs_start = start;
+       rsearch.rs_end = start + size;
 
-               VERIFY0(P2PHASE(start, 1ULL << vd->vdev_ashift));
-               VERIFY0(P2PHASE(size, 1ULL << vd->vdev_ashift));
-               VERIFY3U(range_tree_space(rt) - size, <=, msp->ms_size);
-               range_tree_remove(rt, start, size);
+       rs = avl_find(t, &rsearch, &where);
+       if (rs == NULL) {
+               rs = avl_nearest(t, where, AVL_AFTER);
        }
-       return (start);
-}
 
-/*
- * ==========================================================================
- * Common allocator routines
- * ==========================================================================
- */
+       return (rs);
+}
 
 #if defined(WITH_FF_BLOCK_ALLOCATOR) || \
     defined(WITH_DF_BLOCK_ALLOCATOR) || \
@@ -949,15 +1106,7 @@ static uint64_t
 metaslab_block_picker(avl_tree_t *t, uint64_t *cursor, uint64_t size,
     uint64_t align)
 {
-       range_seg_t *rs, rsearch;
-       avl_index_t where;
-
-       rsearch.rs_start = *cursor;
-       rsearch.rs_end = *cursor + size;
-
-       rs = avl_find(t, &rsearch, &where);
-       if (rs == NULL)
-               rs = avl_nearest(t, where, AVL_AFTER);
+       range_seg_t *rs = metaslab_block_find(t, *cursor, size);
 
        while (rs != NULL) {
                uint64_t offset = P2ROUNDUP(rs->rs_start, align);
@@ -1199,6 +1348,7 @@ metaslab_load(metaslab_t *msp)
 {
        int error = 0;
        int t;
+       boolean_t success = B_FALSE;
 
        ASSERT(MUTEX_HELD(&msp->ms_lock));
        ASSERT(!msp->ms_loaded);
@@ -1216,14 +1366,18 @@ metaslab_load(metaslab_t *msp)
        else
                range_tree_add(msp->ms_tree, msp->ms_start, msp->ms_size);
 
-       msp->ms_loaded = (error == 0);
+       success = (error == 0);
        msp->ms_loading = B_FALSE;
 
-       if (msp->ms_loaded) {
+       if (success) {
+               ASSERT3P(msp->ms_group, !=, NULL);
+               msp->ms_loaded = B_TRUE;
+
                for (t = 0; t < TXG_DEFER_SIZE; t++) {
                        range_tree_walk(msp->ms_defertree[t],
                            range_tree_remove, msp->ms_tree);
                }
+               msp->ms_max_size = metaslab_block_maxsize(msp);
        }
        cv_broadcast(&msp->ms_load_cv);
        return (error);
@@ -1236,6 +1390,7 @@ metaslab_unload(metaslab_t *msp)
        range_tree_vacate(msp->ms_tree, NULL, NULL);
        msp->ms_loaded = B_FALSE;
        msp->ms_weight &= ~METASLAB_ACTIVE_MASK;
+       msp->ms_max_size = 0;
 }
 
 int
@@ -1272,7 +1427,7 @@ metaslab_init(metaslab_group_t *mg, uint64_t id, uint64_t object, uint64_t txg,
 
        /*
         * We create the main range tree here, but we don't create the
-        * alloctree and freetree until metaslab_sync_done().  This serves
+        * other range trees until metaslab_sync_done().  This serves
         * two purposes: it allows metaslab_sync_done() to detect the
         * addition of new space; and for debugging, it ensures that we'd
         * data fault on any attempt to use this metaslab before it's ready.
@@ -1280,21 +1435,23 @@ metaslab_init(metaslab_group_t *mg, uint64_t id, uint64_t object, uint64_t txg,
        ms->ms_tree = range_tree_create(&metaslab_rt_ops, ms, &ms->ms_lock);
        metaslab_group_add(mg, ms);
 
-       ms->ms_fragmentation = metaslab_fragmentation(ms);
-       ms->ms_ops = mg->mg_class->mc_ops;
+       metaslab_set_fragmentation(ms);
 
        /*
         * If we're opening an existing pool (txg == 0) or creating
         * a new one (txg == TXG_INITIAL), all space is available now.
         * If we're adding space to an existing pool, the new space
         * does not become available until after this txg has synced.
+        * The metaslab's weight will also be initialized when we sync
+        * out this txg. This ensures that we don't attempt to allocate
+        * from it before we have initialized it completely.
         */
        if (txg <= TXG_INITIAL)
                metaslab_sync_done(ms, 0);
 
        /*
         * If metaslab_debug_load is set and we're initializing a metaslab
-        * that has an allocated space_map object then load the its space
+        * that has an allocated space map object then load the its space
         * map so that can verify frees.
         */
        if (metaslab_debug_load && ms->ms_sm != NULL) {
@@ -1323,7 +1480,6 @@ metaslab_fini(metaslab_t *msp)
        metaslab_group_remove(mg, msp);
 
        mutex_enter(&msp->ms_lock);
-
        VERIFY(msp->ms_group == NULL);
        vdev_space_update(mg->mg_vd, -space_map_allocated(msp->ms_sm),
            0, -msp->ms_size);
@@ -1331,10 +1487,11 @@ metaslab_fini(metaslab_t *msp)
 
        metaslab_unload(msp);
        range_tree_destroy(msp->ms_tree);
+       range_tree_destroy(msp->ms_freeingtree);
+       range_tree_destroy(msp->ms_freedtree);
 
        for (t = 0; t < TXG_SIZE; t++) {
                range_tree_destroy(msp->ms_alloctree[t]);
-               range_tree_destroy(msp->ms_freetree[t]);
        }
 
        for (t = 0; t < TXG_DEFER_SIZE; t++) {
@@ -1396,8 +1553,8 @@ int zfs_frag_table[FRAGMENTATION_TABLE_SIZE] = {
  * not support this metric. Otherwise, the return value should be in the
  * range [0, 100].
  */
-static uint64_t
-metaslab_fragmentation(metaslab_t *msp)
+static void
+metaslab_set_fragmentation(metaslab_t *msp)
 {
        spa_t *spa = msp->ms_group->mg_vd->vdev_spa;
        uint64_t fragmentation = 0;
@@ -1406,37 +1563,50 @@ metaslab_fragmentation(metaslab_t *msp)
            SPA_FEATURE_SPACEMAP_HISTOGRAM);
        int i;
 
-       if (!feature_enabled)
-               return (ZFS_FRAG_INVALID);
+       if (!feature_enabled) {
+               msp->ms_fragmentation = ZFS_FRAG_INVALID;
+               return;
+       }
 
        /*
         * A null space map means that the entire metaslab is free
         * and thus is not fragmented.
         */
-       if (msp->ms_sm == NULL)
-               return (0);
+       if (msp->ms_sm == NULL) {
+               msp->ms_fragmentation = 0;
+               return;
+       }
 
        /*
-        * If this metaslab's space_map has not been upgraded, flag it
+        * If this metaslab's space map has not been upgraded, flag it
         * so that we upgrade next time we encounter it.
         */
        if (msp->ms_sm->sm_dbuf->db_size != sizeof (space_map_phys_t)) {
+               uint64_t txg = spa_syncing_txg(spa);
                vdev_t *vd = msp->ms_group->mg_vd;
 
-               if (spa_writeable(vd->vdev_spa)) {
-                       uint64_t txg = spa_syncing_txg(spa);
-
+               /*
+                * If we've reached the final dirty txg, then we must
+                * be shutting down the pool. We don't want to dirty
+                * any data past this point so skip setting the condense
+                * flag. We can retry this action the next time the pool
+                * is imported.
+                */
+               if (spa_writeable(spa) && txg < spa_final_dirty_txg(spa)) {
                        msp->ms_condense_wanted = B_TRUE;
                        vdev_dirty(vd, VDD_METASLAB, msp, txg + 1);
                        spa_dbgmsg(spa, "txg %llu, requesting force condense: "
-                           "msp %p, vd %p", txg, msp, vd);
+                           "ms_id %llu, vdev_id %llu", txg, msp->ms_id,
+                           vd->vdev_id);
                }
-               return (ZFS_FRAG_INVALID);
+               msp->ms_fragmentation = ZFS_FRAG_INVALID;
+               return;
        }
 
        for (i = 0; i < SPACE_MAP_HISTOGRAM_SIZE; i++) {
                uint64_t space = 0;
                uint8_t shift = msp->ms_sm->sm_shift;
+
                int idx = MIN(shift - SPA_MINBLOCKSHIFT + i,
                    FRAGMENTATION_TABLE_SIZE - 1);
 
@@ -1453,7 +1623,8 @@ metaslab_fragmentation(metaslab_t *msp)
        if (total > 0)
                fragmentation /= total;
        ASSERT3U(fragmentation, <=, 100);
-       return (fragmentation);
+
+       msp->ms_fragmentation = fragmentation;
 }
 
 /*
@@ -1462,30 +1633,20 @@ metaslab_fragmentation(metaslab_t *msp)
  * the LBA range, and whether the metaslab is loaded.
  */
 static uint64_t
-metaslab_weight(metaslab_t *msp)
+metaslab_space_weight(metaslab_t *msp)
 {
        metaslab_group_t *mg = msp->ms_group;
        vdev_t *vd = mg->mg_vd;
        uint64_t weight, space;
 
        ASSERT(MUTEX_HELD(&msp->ms_lock));
-
-       /*
-        * This vdev is in the process of being removed so there is nothing
-        * for us to do here.
-        */
-       if (vd->vdev_removing) {
-               ASSERT0(space_map_allocated(msp->ms_sm));
-               ASSERT0(vd->vdev_ms_shift);
-               return (0);
-       }
+       ASSERT(!vd->vdev_removing);
 
        /*
         * The baseline weight is the metaslab's free space.
         */
        space = msp->ms_size - space_map_allocated(msp->ms_sm);
 
-       msp->ms_fragmentation = metaslab_fragmentation(msp);
        if (metaslab_fragmentation_factor_enabled &&
            msp->ms_fragmentation != ZFS_FRAG_INVALID) {
                /*
@@ -1534,117 +1695,339 @@ metaslab_weight(metaslab_t *msp)
                weight |= (msp->ms_weight & METASLAB_ACTIVE_MASK);
        }
 
+       WEIGHT_SET_SPACEBASED(weight);
        return (weight);
 }
 
-static int
-metaslab_activate(metaslab_t *msp, uint64_t activation_weight)
+/*
+ * Return the weight of the specified metaslab, according to the segment-based
+ * weighting algorithm. The metaslab must be loaded. This function can
+ * be called within a sync pass since it relies only on the metaslab's
+ * range tree which is always accurate when the metaslab is loaded.
+ */
+static uint64_t
+metaslab_weight_from_range_tree(metaslab_t *msp)
 {
-       ASSERT(MUTEX_HELD(&msp->ms_lock));
-
-       if ((msp->ms_weight & METASLAB_ACTIVE_MASK) == 0) {
-               metaslab_load_wait(msp);
-               if (!msp->ms_loaded) {
-                       int error = metaslab_load(msp);
-                       if (error) {
-                               metaslab_group_sort(msp->ms_group, msp, 0);
-                               return (error);
-                       }
-               }
+       uint64_t weight = 0;
+       uint32_t segments = 0;
+       int i;
 
-               metaslab_group_sort(msp->ms_group, msp,
-                   msp->ms_weight | activation_weight);
-       }
        ASSERT(msp->ms_loaded);
-       ASSERT(msp->ms_weight & METASLAB_ACTIVE_MASK);
 
-       return (0);
+       for (i = RANGE_TREE_HISTOGRAM_SIZE - 1; i >= SPA_MINBLOCKSHIFT; i--) {
+               uint8_t shift = msp->ms_group->mg_vd->vdev_ashift;
+               int max_idx = SPACE_MAP_HISTOGRAM_SIZE + shift - 1;
+
+               segments <<= 1;
+               segments += msp->ms_tree->rt_histogram[i];
+
+               /*
+                * The range tree provides more precision than the space map
+                * and must be downgraded so that all values fit within the
+                * space map's histogram. This allows us to compare loaded
+                * vs. unloaded metaslabs to determine which metaslab is
+                * considered "best".
+                */
+               if (i > max_idx)
+                       continue;
+
+               if (segments != 0) {
+                       WEIGHT_SET_COUNT(weight, segments);
+                       WEIGHT_SET_INDEX(weight, i);
+                       WEIGHT_SET_ACTIVE(weight, 0);
+                       break;
+               }
+       }
+       return (weight);
 }
 
-static void
-metaslab_passivate(metaslab_t *msp, uint64_t size)
+/*
+ * Calculate the weight based on the on-disk histogram. This should only
+ * be called after a sync pass has completely finished since the on-disk
+ * information is updated in metaslab_sync().
+ */
+static uint64_t
+metaslab_weight_from_spacemap(metaslab_t *msp)
 {
-       /*
-        * If size < SPA_MINBLOCKSIZE, then we will not allocate from
-        * this metaslab again.  In that case, it had better be empty,
-        * or we would be leaving space on the table.
-        */
-       ASSERT(size >= SPA_MINBLOCKSIZE || range_tree_space(msp->ms_tree) == 0);
-       metaslab_group_sort(msp->ms_group, msp, MIN(msp->ms_weight, size));
-       ASSERT((msp->ms_weight & METASLAB_ACTIVE_MASK) == 0);
+       uint64_t weight = 0;
+       int i;
+
+       for (i = SPACE_MAP_HISTOGRAM_SIZE - 1; i >= 0; i--) {
+               if (msp->ms_sm->sm_phys->smp_histogram[i] != 0) {
+                       WEIGHT_SET_COUNT(weight,
+                           msp->ms_sm->sm_phys->smp_histogram[i]);
+                       WEIGHT_SET_INDEX(weight, i +
+                           msp->ms_sm->sm_shift);
+                       WEIGHT_SET_ACTIVE(weight, 0);
+                       break;
+               }
+       }
+       return (weight);
 }
 
-static void
-metaslab_preload(void *arg)
+/*
+ * Compute a segment-based weight for the specified metaslab. The weight
+ * is determined by highest bucket in the histogram. The information
+ * for the highest bucket is encoded into the weight value.
+ */
+static uint64_t
+metaslab_segment_weight(metaslab_t *msp)
 {
-       metaslab_t *msp = arg;
-       spa_t *spa = msp->ms_group->mg_vd->vdev_spa;
-       fstrans_cookie_t cookie = spl_fstrans_mark();
-
-       ASSERT(!MUTEX_HELD(&msp->ms_group->mg_lock));
+       metaslab_group_t *mg = msp->ms_group;
+       uint64_t weight = 0;
+       uint8_t shift = mg->mg_vd->vdev_ashift;
 
-       mutex_enter(&msp->ms_lock);
-       metaslab_load_wait(msp);
-       if (!msp->ms_loaded)
-               (void) metaslab_load(msp);
+       ASSERT(MUTEX_HELD(&msp->ms_lock));
 
        /*
-        * Set the ms_access_txg value so that we don't unload it right away.
+        * The metaslab is completely free.
         */
-       msp->ms_access_txg = spa_syncing_txg(spa) + metaslab_unload_delay + 1;
-       mutex_exit(&msp->ms_lock);
-       spl_fstrans_unmark(cookie);
-}
+       if (space_map_allocated(msp->ms_sm) == 0) {
+               int idx = highbit64(msp->ms_size) - 1;
+               int max_idx = SPACE_MAP_HISTOGRAM_SIZE + shift - 1;
 
-static void
-metaslab_group_preload(metaslab_group_t *mg)
-{
-       spa_t *spa = mg->mg_vd->vdev_spa;
-       metaslab_t *msp;
-       avl_tree_t *t = &mg->mg_metaslab_tree;
-       int m = 0;
+               if (idx < max_idx) {
+                       WEIGHT_SET_COUNT(weight, 1ULL);
+                       WEIGHT_SET_INDEX(weight, idx);
+               } else {
+                       WEIGHT_SET_COUNT(weight, 1ULL << (idx - max_idx));
+                       WEIGHT_SET_INDEX(weight, max_idx);
+               }
+               WEIGHT_SET_ACTIVE(weight, 0);
+               ASSERT(!WEIGHT_IS_SPACEBASED(weight));
 
-       if (spa_shutting_down(spa) || !metaslab_preload_enabled) {
-               taskq_wait_outstanding(mg->mg_taskq, 0);
-               return;
+               return (weight);
        }
 
-       mutex_enter(&mg->mg_lock);
+       ASSERT3U(msp->ms_sm->sm_dbuf->db_size, ==, sizeof (space_map_phys_t));
+
        /*
-        * Load the next potential metaslabs
+        * If the metaslab is fully allocated then just make the weight 0.
+        */
+       if (space_map_allocated(msp->ms_sm) == msp->ms_size)
+               return (0);
+       /*
+        * If the metaslab is already loaded, then use the range tree to
+        * determine the weight. Otherwise, we rely on the space map information
+        * to generate the weight.
         */
-       msp = avl_first(t);
-       while (msp != NULL) {
-               metaslab_t *msp_next = AVL_NEXT(t, msp);
+       if (msp->ms_loaded) {
+               weight = metaslab_weight_from_range_tree(msp);
+       } else {
+               weight = metaslab_weight_from_spacemap(msp);
+       }
 
-               /*
-                * We preload only the maximum number of metaslabs specified
-                * by metaslab_preload_limit. If a metaslab is being forced
-                * to condense then we preload it too. This will ensure
-                * that force condensing happens in the next txg.
-                */
-               if (++m > metaslab_preload_limit && !msp->ms_condense_wanted) {
-                       msp = msp_next;
-                       continue;
+       /*
+        * If the metaslab was active the last time we calculated its weight
+        * then keep it active. We want to consume the entire region that
+        * is associated with this weight.
+        */
+       if (msp->ms_activation_weight != 0 && weight != 0)
+               WEIGHT_SET_ACTIVE(weight, WEIGHT_GET_ACTIVE(msp->ms_weight));
+       return (weight);
+}
+
+/*
+ * Determine if we should attempt to allocate from this metaslab. If the
+ * metaslab has a maximum size then we can quickly determine if the desired
+ * allocation size can be satisfied. Otherwise, if we're using segment-based
+ * weighting then we can determine the maximum allocation that this metaslab
+ * can accommodate based on the index encoded in the weight. If we're using
+ * space-based weights then rely on the entire weight (excluding the weight
+ * type bit).
+ */
+boolean_t
+metaslab_should_allocate(metaslab_t *msp, uint64_t asize)
+{
+       boolean_t should_allocate;
+
+       if (msp->ms_max_size != 0)
+               return (msp->ms_max_size >= asize);
+
+       if (!WEIGHT_IS_SPACEBASED(msp->ms_weight)) {
+               /*
+                * The metaslab segment weight indicates segments in the
+                * range [2^i, 2^(i+1)), where i is the index in the weight.
+                * Since the asize might be in the middle of the range, we
+                * should attempt the allocation if asize < 2^(i+1).
+                */
+               should_allocate = (asize <
+                   1ULL << (WEIGHT_GET_INDEX(msp->ms_weight) + 1));
+       } else {
+               should_allocate = (asize <=
+                   (msp->ms_weight & ~METASLAB_WEIGHT_TYPE));
+       }
+       return (should_allocate);
+}
+static uint64_t
+metaslab_weight(metaslab_t *msp)
+{
+       vdev_t *vd = msp->ms_group->mg_vd;
+       spa_t *spa = vd->vdev_spa;
+       uint64_t weight;
+
+       ASSERT(MUTEX_HELD(&msp->ms_lock));
+
+       /*
+        * This vdev is in the process of being removed so there is nothing
+        * for us to do here.
+        */
+       if (vd->vdev_removing) {
+               ASSERT0(space_map_allocated(msp->ms_sm));
+               ASSERT0(vd->vdev_ms_shift);
+               return (0);
+       }
+
+       metaslab_set_fragmentation(msp);
+
+       /*
+        * Update the maximum size if the metaslab is loaded. This will
+        * ensure that we get an accurate maximum size if newly freed space
+        * has been added back into the free tree.
+        */
+       if (msp->ms_loaded)
+               msp->ms_max_size = metaslab_block_maxsize(msp);
+
+       /*
+        * Segment-based weighting requires space map histogram support.
+        */
+       if (zfs_metaslab_segment_weight_enabled &&
+           spa_feature_is_enabled(spa, SPA_FEATURE_SPACEMAP_HISTOGRAM) &&
+           (msp->ms_sm == NULL || msp->ms_sm->sm_dbuf->db_size ==
+           sizeof (space_map_phys_t))) {
+               weight = metaslab_segment_weight(msp);
+       } else {
+               weight = metaslab_space_weight(msp);
+       }
+       return (weight);
+}
+
+static int
+metaslab_activate(metaslab_t *msp, uint64_t activation_weight)
+{
+       ASSERT(MUTEX_HELD(&msp->ms_lock));
+
+       if ((msp->ms_weight & METASLAB_ACTIVE_MASK) == 0) {
+               metaslab_load_wait(msp);
+               if (!msp->ms_loaded) {
+                       int error = metaslab_load(msp);
+                       if (error) {
+                               metaslab_group_sort(msp->ms_group, msp, 0);
+                               return (error);
+                       }
                }
 
+               msp->ms_activation_weight = msp->ms_weight;
+               metaslab_group_sort(msp->ms_group, msp,
+                   msp->ms_weight | activation_weight);
+       }
+       ASSERT(msp->ms_loaded);
+       ASSERT(msp->ms_weight & METASLAB_ACTIVE_MASK);
+
+       return (0);
+}
+
+static void
+metaslab_passivate(metaslab_t *msp, uint64_t weight)
+{
+       ASSERTV(uint64_t size = weight & ~METASLAB_WEIGHT_TYPE);
+
+       /*
+        * If size < SPA_MINBLOCKSIZE, then we will not allocate from
+        * this metaslab again.  In that case, it had better be empty,
+        * or we would be leaving space on the table.
+        */
+       ASSERT(size >= SPA_MINBLOCKSIZE ||
+           range_tree_space(msp->ms_tree) == 0);
+       ASSERT0(weight & METASLAB_ACTIVE_MASK);
+
+       msp->ms_activation_weight = 0;
+       metaslab_group_sort(msp->ms_group, msp, weight);
+       ASSERT((msp->ms_weight & METASLAB_ACTIVE_MASK) == 0);
+}
+
+/*
+ * Segment-based metaslabs are activated once and remain active until
+ * we either fail an allocation attempt (similar to space-based metaslabs)
+ * or have exhausted the free space in zfs_metaslab_switch_threshold
+ * buckets since the metaslab was activated. This function checks to see
+ * if we've exhaused the zfs_metaslab_switch_threshold buckets in the
+ * metaslab and passivates it proactively. This will allow us to select a
+ * metaslab with a larger contiguous region, if any, remaining within this
+ * metaslab group. If we're in sync pass > 1, then we continue using this
+ * metaslab so that we don't dirty more block and cause more sync passes.
+ */
+void
+metaslab_segment_may_passivate(metaslab_t *msp)
+{
+       spa_t *spa = msp->ms_group->mg_vd->vdev_spa;
+       uint64_t weight;
+       int activation_idx, current_idx;
+
+       if (WEIGHT_IS_SPACEBASED(msp->ms_weight) || spa_sync_pass(spa) > 1)
+               return;
+
+       /*
+        * Since we are in the middle of a sync pass, the most accurate
+        * information that is accessible to us is the in-core range tree
+        * histogram; calculate the new weight based on that information.
+        */
+       weight = metaslab_weight_from_range_tree(msp);
+       activation_idx = WEIGHT_GET_INDEX(msp->ms_activation_weight);
+       current_idx = WEIGHT_GET_INDEX(weight);
+
+       if (current_idx <= activation_idx - zfs_metaslab_switch_threshold)
+               metaslab_passivate(msp, weight);
+}
+
+static void
+metaslab_preload(void *arg)
+{
+       metaslab_t *msp = arg;
+       spa_t *spa = msp->ms_group->mg_vd->vdev_spa;
+       fstrans_cookie_t cookie = spl_fstrans_mark();
+
+       ASSERT(!MUTEX_HELD(&msp->ms_group->mg_lock));
+
+       mutex_enter(&msp->ms_lock);
+       metaslab_load_wait(msp);
+       if (!msp->ms_loaded)
+               (void) metaslab_load(msp);
+       msp->ms_selected_txg = spa_syncing_txg(spa);
+       mutex_exit(&msp->ms_lock);
+       spl_fstrans_unmark(cookie);
+}
+
+static void
+metaslab_group_preload(metaslab_group_t *mg)
+{
+       spa_t *spa = mg->mg_vd->vdev_spa;
+       metaslab_t *msp;
+       avl_tree_t *t = &mg->mg_metaslab_tree;
+       int m = 0;
+
+       if (spa_shutting_down(spa) || !metaslab_preload_enabled) {
+               taskq_wait_outstanding(mg->mg_taskq, 0);
+               return;
+       }
+
+       mutex_enter(&mg->mg_lock);
+       /*
+        * Load the next potential metaslabs
+        */
+       for (msp = avl_first(t); msp != NULL; msp = AVL_NEXT(t, msp)) {
                /*
-                * We must drop the metaslab group lock here to preserve
-                * lock ordering with the ms_lock (when grabbing both
-                * the mg_lock and the ms_lock, the ms_lock must be taken
-                * first).  As a result, it is possible that the ordering
-                * of the metaslabs within the avl tree may change before
-                * we reacquire the lock. The metaslab cannot be removed from
-                * the tree while we're in syncing context so it is safe to
-                * drop the mg_lock here. If the metaslabs are reordered
-                * nothing will break -- we just may end up loading a
-                * less than optimal one.
+                * We preload only the maximum number of metaslabs specified
+                * by metaslab_preload_limit. If a metaslab is being forced
+                * to condense then we preload it too. This will ensure
+                * that force condensing happens in the next txg.
                 */
-               mutex_exit(&mg->mg_lock);
+               if (++m > metaslab_preload_limit && !msp->ms_condense_wanted) {
+                       continue;
+               }
+
                VERIFY(taskq_dispatch(mg->mg_taskq, metaslab_preload,
-                   msp, TQ_SLEEP) != 0);
-               mutex_enter(&mg->mg_lock);
-               msp = msp_next;
+                   msp, TQ_SLEEP) != TASKQID_INVALID);
        }
        mutex_exit(&mg->mg_lock);
 }
@@ -1687,7 +2070,7 @@ metaslab_should_condense(metaslab_t *msp)
        range_seg_t *rs;
        uint64_t size, entries, segsz, object_size, optimal_size, record_size;
        dmu_object_info_t doi;
-       uint64_t vdev_blocksize = 1 << msp->ms_group->mg_vd->vdev_ashift;
+       uint64_t vdev_blocksize = 1ULL << msp->ms_group->mg_vd->vdev_ashift;
 
        ASSERT(MUTEX_HELD(&msp->ms_lock));
        ASSERT(msp->ms_loaded);
@@ -1732,7 +2115,6 @@ static void
 metaslab_condense(metaslab_t *msp, uint64_t txg, dmu_tx_t *tx)
 {
        spa_t *spa = msp->ms_group->mg_vd->vdev_spa;
-       range_tree_t *freetree = msp->ms_freetree[txg & TXG_MASK];
        range_tree_t *condense_tree;
        space_map_t *sm = msp->ms_sm;
        int t;
@@ -1742,10 +2124,11 @@ metaslab_condense(metaslab_t *msp, uint64_t txg, dmu_tx_t *tx)
        ASSERT(msp->ms_loaded);
 
 
-       spa_dbgmsg(spa, "condensing: txg %llu, msp[%llu] %p, "
-           "smp size %llu, segments %lu, forcing condense=%s", txg,
-           msp->ms_id, msp, space_map_length(msp->ms_sm),
-           avl_numnodes(&msp->ms_tree->rt_root),
+       spa_dbgmsg(spa, "condensing: txg %llu, msp[%llu] %p, vdev id %llu, "
+           "spa %s, smp size %llu, segments %lu, forcing condense=%s", txg,
+           msp->ms_id, msp, msp->ms_group->mg_vd->vdev_id,
+           msp->ms_group->mg_vd->vdev_spa->spa_name,
+           space_map_length(msp->ms_sm), avl_numnodes(&msp->ms_tree->rt_root),
            msp->ms_condense_wanted ? "TRUE" : "FALSE");
 
        msp->ms_condense_wanted = B_FALSE;
@@ -1763,9 +2146,9 @@ metaslab_condense(metaslab_t *msp, uint64_t txg, dmu_tx_t *tx)
        /*
         * Remove what's been freed in this txg from the condense_tree.
         * Since we're in sync_pass 1, we know that all the frees from
-        * this txg are in the freetree.
+        * this txg are in the freeingtree.
         */
-       range_tree_walk(freetree, range_tree_remove, condense_tree);
+       range_tree_walk(msp->ms_freeingtree, range_tree_remove, condense_tree);
 
        for (t = 0; t < TXG_DEFER_SIZE; t++) {
                range_tree_walk(msp->ms_defertree[t],
@@ -1793,7 +2176,7 @@ metaslab_condense(metaslab_t *msp, uint64_t txg, dmu_tx_t *tx)
        mutex_enter(&msp->ms_lock);
 
        /*
-        * While we would ideally like to create a space_map representation
+        * While we would ideally like to create a space map representation
         * that consists only of allocation records, doing so can be
         * prohibitively expensive because the in-core free tree can be
         * large, and therefore computationally expensive to subtract
@@ -1821,9 +2204,6 @@ metaslab_sync(metaslab_t *msp, uint64_t txg)
        spa_t *spa = vd->vdev_spa;
        objset_t *mos = spa_meta_objset(spa);
        range_tree_t *alloctree = msp->ms_alloctree[txg & TXG_MASK];
-       range_tree_t **freetree = &msp->ms_freetree[txg & TXG_MASK];
-       range_tree_t **freed_tree =
-           &msp->ms_freetree[TXG_CLEAN(txg) & TXG_MASK];
        dmu_tx_t *tx;
        uint64_t object = space_map_object(msp->ms_sm);
 
@@ -1832,31 +2212,35 @@ metaslab_sync(metaslab_t *msp, uint64_t txg)
        /*
         * This metaslab has just been added so there's no work to do now.
         */
-       if (*freetree == NULL) {
+       if (msp->ms_freeingtree == NULL) {
                ASSERT3P(alloctree, ==, NULL);
                return;
        }
 
        ASSERT3P(alloctree, !=, NULL);
-       ASSERT3P(*freetree, !=, NULL);
-       ASSERT3P(*freed_tree, !=, NULL);
+       ASSERT3P(msp->ms_freeingtree, !=, NULL);
+       ASSERT3P(msp->ms_freedtree, !=, NULL);
 
        /*
         * Normally, we don't want to process a metaslab if there
         * are no allocations or frees to perform. However, if the metaslab
-        * is being forced to condense we need to let it through.
+        * is being forced to condense and it's loaded, we need to let it
+        * through.
         */
        if (range_tree_space(alloctree) == 0 &&
-           range_tree_space(*freetree) == 0 &&
-           !msp->ms_condense_wanted)
+           range_tree_space(msp->ms_freeingtree) == 0 &&
+           !(msp->ms_loaded && msp->ms_condense_wanted))
                return;
 
+
+       VERIFY(txg <= spa_final_dirty_txg(spa));
+
        /*
         * The only state that can actually be changing concurrently with
         * metaslab_sync() is the metaslab's ms_tree.  No other thread can
-        * be modifying this txg's alloctree, freetree, freed_tree, or
+        * be modifying this txg's alloctree, freeingtree, freedtree, or
         * space_map_phys_t. Therefore, we only hold ms_lock to satify
-        * space_map ASSERTs. We drop it whenever we call into the DMU,
+        * space map ASSERTs. We drop it whenever we call into the DMU,
         * because the DMU can call down to us (e.g. via zio_free()) at
         * any time.
         */
@@ -1878,8 +2262,8 @@ metaslab_sync(metaslab_t *msp, uint64_t txg)
        mutex_enter(&msp->ms_lock);
 
        /*
-        * Note: metaslab_condense() clears the space_map's histogram.
-        * Therefore we muse verify and remove this histogram before
+        * Note: metaslab_condense() clears the space map's histogram.
+        * Therefore we must verify and remove this histogram before
         * condensing.
         */
        metaslab_group_histogram_verify(mg);
@@ -1891,10 +2275,12 @@ metaslab_sync(metaslab_t *msp, uint64_t txg)
                metaslab_condense(msp, txg, tx);
        } else {
                space_map_write(msp->ms_sm, alloctree, SM_ALLOC, tx);
-               space_map_write(msp->ms_sm, *freetree, SM_FREE, tx);
+               space_map_write(msp->ms_sm, msp->ms_freeingtree, SM_FREE, tx);
        }
 
        if (msp->ms_loaded) {
+               int t;
+
                /*
                 * When the space map is loaded, we have an accruate
                 * histogram in the range tree. This gives us an opportunity
@@ -1903,35 +2289,59 @@ metaslab_sync(metaslab_t *msp, uint64_t txg)
                 */
                space_map_histogram_clear(msp->ms_sm);
                space_map_histogram_add(msp->ms_sm, msp->ms_tree, tx);
-       } else {
+
                /*
-                * Since the space map is not loaded we simply update the
-                * exisiting histogram with what was freed in this txg. This
-                * means that the on-disk histogram may not have an accurate
-                * view of the free space but it's close enough to allow
-                * us to make allocation decisions.
+                * Since we've cleared the histogram we need to add back
+                * any free space that has already been processed, plus
+                * any deferred space. This allows the on-disk histogram
+                * to accurately reflect all free space even if some space
+                * is not yet available for allocation (i.e. deferred).
                 */
-               space_map_histogram_add(msp->ms_sm, *freetree, tx);
+               space_map_histogram_add(msp->ms_sm, msp->ms_freedtree, tx);
+
+               /*
+                * Add back any deferred free space that has not been
+                * added back into the in-core free tree yet. This will
+                * ensure that we don't end up with a space map histogram
+                * that is completely empty unless the metaslab is fully
+                * allocated.
+                */
+               for (t = 0; t < TXG_DEFER_SIZE; t++) {
+                       space_map_histogram_add(msp->ms_sm,
+                           msp->ms_defertree[t], tx);
+               }
        }
+
+       /*
+        * Always add the free space from this sync pass to the space
+        * map histogram. We want to make sure that the on-disk histogram
+        * accounts for all free space. If the space map is not loaded,
+        * then we will lose some accuracy but will correct it the next
+        * time we load the space map.
+        */
+       space_map_histogram_add(msp->ms_sm, msp->ms_freeingtree, tx);
+
        metaslab_group_histogram_add(mg, msp);
        metaslab_group_histogram_verify(mg);
        metaslab_class_histogram_verify(mg->mg_class);
 
        /*
         * For sync pass 1, we avoid traversing this txg's free range tree
-        * and instead will just swap the pointers for freetree and
-        * freed_tree. We can safely do this since the freed_tree is
+        * and instead will just swap the pointers for freeingtree and
+        * freedtree. We can safely do this since the freed_tree is
         * guaranteed to be empty on the initial pass.
         */
        if (spa_sync_pass(spa) == 1) {
-               range_tree_swap(freetree, freed_tree);
+               range_tree_swap(&msp->ms_freeingtree, &msp->ms_freedtree);
        } else {
-               range_tree_vacate(*freetree, range_tree_add, *freed_tree);
+               range_tree_vacate(msp->ms_freeingtree,
+                   range_tree_add, msp->ms_freedtree);
        }
        range_tree_vacate(alloctree, NULL, NULL);
 
        ASSERT0(range_tree_space(msp->ms_alloctree[txg & TXG_MASK]));
-       ASSERT0(range_tree_space(msp->ms_freetree[txg & TXG_MASK]));
+       ASSERT0(range_tree_space(msp->ms_alloctree[TXG_CLEAN(txg) & TXG_MASK]));
+       ASSERT0(range_tree_space(msp->ms_freeingtree));
 
        mutex_exit(&msp->ms_lock);
 
@@ -1952,9 +2362,11 @@ metaslab_sync_done(metaslab_t *msp, uint64_t txg)
 {
        metaslab_group_t *mg = msp->ms_group;
        vdev_t *vd = mg->mg_vd;
-       range_tree_t **freed_tree;
+       spa_t *spa = vd->vdev_spa;
        range_tree_t **defer_tree;
        int64_t alloc_delta, defer_delta;
+       uint64_t free_space;
+       boolean_t defer_allowed = B_TRUE;
        int t;
 
        ASSERT(!vd->vdev_ishole);
@@ -1963,20 +2375,24 @@ metaslab_sync_done(metaslab_t *msp, uint64_t txg)
 
        /*
         * If this metaslab is just becoming available, initialize its
-        * alloctrees, freetrees, and defertree and add its capacity to
-        * the vdev.
+        * range trees and add its capacity to the vdev.
         */
-       if (msp->ms_freetree[TXG_CLEAN(txg) & TXG_MASK] == NULL) {
+       if (msp->ms_freedtree == NULL) {
                for (t = 0; t < TXG_SIZE; t++) {
                        ASSERT(msp->ms_alloctree[t] == NULL);
-                       ASSERT(msp->ms_freetree[t] == NULL);
 
                        msp->ms_alloctree[t] = range_tree_create(NULL, msp,
                            &msp->ms_lock);
-                       msp->ms_freetree[t] = range_tree_create(NULL, msp,
-                           &msp->ms_lock);
                }
 
+               ASSERT3P(msp->ms_freeingtree, ==, NULL);
+               msp->ms_freeingtree = range_tree_create(NULL, msp,
+                   &msp->ms_lock);
+
+               ASSERT3P(msp->ms_freedtree, ==, NULL);
+               msp->ms_freedtree = range_tree_create(NULL, msp,
+                   &msp->ms_lock);
+
                for (t = 0; t < TXG_DEFER_SIZE; t++) {
                        ASSERT(msp->ms_defertree[t] == NULL);
 
@@ -1987,18 +2403,25 @@ metaslab_sync_done(metaslab_t *msp, uint64_t txg)
                vdev_space_update(vd, 0, 0, msp->ms_size);
        }
 
-       freed_tree = &msp->ms_freetree[TXG_CLEAN(txg) & TXG_MASK];
        defer_tree = &msp->ms_defertree[txg % TXG_DEFER_SIZE];
 
+       free_space = metaslab_class_get_space(spa_normal_class(spa)) -
+           metaslab_class_get_alloc(spa_normal_class(spa));
+       if (free_space <= spa_get_slop_space(spa)) {
+               defer_allowed = B_FALSE;
+       }
+
+       defer_delta = 0;
        alloc_delta = space_map_alloc_delta(msp->ms_sm);
-       defer_delta = range_tree_space(*freed_tree) -
-           range_tree_space(*defer_tree);
+       if (defer_allowed) {
+               defer_delta = range_tree_space(msp->ms_freedtree) -
+                   range_tree_space(*defer_tree);
+       } else {
+               defer_delta -= range_tree_space(*defer_tree);
+       }
 
        vdev_space_update(vd, alloc_delta + defer_delta, defer_delta, 0);
 
-       ASSERT0(range_tree_space(msp->ms_alloctree[txg & TXG_MASK]));
-       ASSERT0(range_tree_space(msp->ms_freetree[txg & TXG_MASK]));
-
        /*
         * If there's a metaslab_load() in progress, wait for it to complete
         * so that we have a consistent view of the in-core space map.
@@ -2013,7 +2436,12 @@ metaslab_sync_done(metaslab_t *msp, uint64_t txg)
         */
        range_tree_vacate(*defer_tree,
            msp->ms_loaded ? range_tree_add : NULL, msp->ms_tree);
-       range_tree_swap(freed_tree, defer_tree);
+       if (defer_allowed) {
+               range_tree_swap(&msp->ms_freedtree, defer_tree);
+       } else {
+               range_tree_vacate(msp->ms_freedtree,
+                   msp->ms_loaded ? range_tree_add : NULL, msp->ms_tree);
+       }
 
        space_map_update(msp->ms_sm);
 
@@ -2028,7 +2456,19 @@ metaslab_sync_done(metaslab_t *msp, uint64_t txg)
                vdev_dirty(vd, VDD_METASLAB, msp, txg + 1);
        }
 
-       if (msp->ms_loaded && msp->ms_access_txg < txg) {
+       /*
+        * Calculate the new weights before unloading any metaslabs.
+        * This will give us the most accurate weighting.
+        */
+       metaslab_group_sort(mg, msp, metaslab_weight(msp));
+
+       /*
+        * If the metaslab is loaded and we've not tried to load or allocate
+        * from it in 'metaslab_unload_delay' txgs, then unload it.
+        */
+       if (msp->ms_loaded &&
+           msp->ms_selected_txg + metaslab_unload_delay < txg) {
+
                for (t = 1; t < TXG_CONCURRENT_STATES; t++) {
                        VERIFY0(range_tree_space(
                            msp->ms_alloctree[(txg + t) & TXG_MASK]));
@@ -2038,7 +2478,6 @@ metaslab_sync_done(metaslab_t *msp, uint64_t txg)
                        metaslab_unload(msp);
        }
 
-       metaslab_group_sort(mg, msp, metaslab_weight(msp));
        mutex_exit(&msp->ms_lock);
 }
 
@@ -2071,14 +2510,238 @@ metaslab_distance(metaslab_t *msp, dva_t *dva)
        return (0);
 }
 
+/*
+ * ==========================================================================
+ * Metaslab allocation tracing facility
+ * ==========================================================================
+ */
+#ifdef _METASLAB_TRACING
+kstat_t *metaslab_trace_ksp;
+kstat_named_t metaslab_trace_over_limit;
+
+void
+metaslab_alloc_trace_init(void)
+{
+       ASSERT(metaslab_alloc_trace_cache == NULL);
+       metaslab_alloc_trace_cache = kmem_cache_create(
+           "metaslab_alloc_trace_cache", sizeof (metaslab_alloc_trace_t),
+           0, NULL, NULL, NULL, NULL, NULL, 0);
+       metaslab_trace_ksp = kstat_create("zfs", 0, "metaslab_trace_stats",
+           "misc", KSTAT_TYPE_NAMED, 1, KSTAT_FLAG_VIRTUAL);
+       if (metaslab_trace_ksp != NULL) {
+               metaslab_trace_ksp->ks_data = &metaslab_trace_over_limit;
+               kstat_named_init(&metaslab_trace_over_limit,
+                   "metaslab_trace_over_limit", KSTAT_DATA_UINT64);
+               kstat_install(metaslab_trace_ksp);
+       }
+}
+
+void
+metaslab_alloc_trace_fini(void)
+{
+       if (metaslab_trace_ksp != NULL) {
+               kstat_delete(metaslab_trace_ksp);
+               metaslab_trace_ksp = NULL;
+       }
+       kmem_cache_destroy(metaslab_alloc_trace_cache);
+       metaslab_alloc_trace_cache = NULL;
+}
+
+/*
+ * Add an allocation trace element to the allocation tracing list.
+ */
+static void
+metaslab_trace_add(zio_alloc_list_t *zal, metaslab_group_t *mg,
+    metaslab_t *msp, uint64_t psize, uint32_t dva_id, uint64_t offset)
+{
+       metaslab_alloc_trace_t *mat;
+
+       if (!metaslab_trace_enabled)
+               return;
+
+       /*
+        * When the tracing list reaches its maximum we remove
+        * the second element in the list before adding a new one.
+        * By removing the second element we preserve the original
+        * entry as a clue to what allocations steps have already been
+        * performed.
+        */
+       if (zal->zal_size == metaslab_trace_max_entries) {
+               metaslab_alloc_trace_t *mat_next;
+#ifdef DEBUG
+               panic("too many entries in allocation list");
+#endif
+               atomic_inc_64(&metaslab_trace_over_limit.value.ui64);
+               zal->zal_size--;
+               mat_next = list_next(&zal->zal_list, list_head(&zal->zal_list));
+               list_remove(&zal->zal_list, mat_next);
+               kmem_cache_free(metaslab_alloc_trace_cache, mat_next);
+       }
+
+       mat = kmem_cache_alloc(metaslab_alloc_trace_cache, KM_SLEEP);
+       list_link_init(&mat->mat_list_node);
+       mat->mat_mg = mg;
+       mat->mat_msp = msp;
+       mat->mat_size = psize;
+       mat->mat_dva_id = dva_id;
+       mat->mat_offset = offset;
+       mat->mat_weight = 0;
+
+       if (msp != NULL)
+               mat->mat_weight = msp->ms_weight;
+
+       /*
+        * The list is part of the zio so locking is not required. Only
+        * a single thread will perform allocations for a given zio.
+        */
+       list_insert_tail(&zal->zal_list, mat);
+       zal->zal_size++;
+
+       ASSERT3U(zal->zal_size, <=, metaslab_trace_max_entries);
+}
+
+void
+metaslab_trace_init(zio_alloc_list_t *zal)
+{
+       list_create(&zal->zal_list, sizeof (metaslab_alloc_trace_t),
+           offsetof(metaslab_alloc_trace_t, mat_list_node));
+       zal->zal_size = 0;
+}
+
+void
+metaslab_trace_fini(zio_alloc_list_t *zal)
+{
+       metaslab_alloc_trace_t *mat;
+
+       while ((mat = list_remove_head(&zal->zal_list)) != NULL)
+               kmem_cache_free(metaslab_alloc_trace_cache, mat);
+       list_destroy(&zal->zal_list);
+       zal->zal_size = 0;
+}
+#else
+
+#define        metaslab_trace_add(zal, mg, msp, psize, id, off)
+
+void
+metaslab_alloc_trace_init(void)
+{
+}
+
+void
+metaslab_alloc_trace_fini(void)
+{
+}
+
+void
+metaslab_trace_init(zio_alloc_list_t *zal)
+{
+}
+
+void
+metaslab_trace_fini(zio_alloc_list_t *zal)
+{
+}
+
+#endif /* _METASLAB_TRACING */
+
+/*
+ * ==========================================================================
+ * Metaslab block operations
+ * ==========================================================================
+ */
+
+static void
+metaslab_group_alloc_increment(spa_t *spa, uint64_t vdev, void *tag, int flags)
+{
+       metaslab_group_t *mg;
+
+       if (!(flags & METASLAB_ASYNC_ALLOC) ||
+           flags & METASLAB_DONT_THROTTLE)
+               return;
+
+       mg = vdev_lookup_top(spa, vdev)->vdev_mg;
+       if (!mg->mg_class->mc_alloc_throttle_enabled)
+               return;
+
+       (void) refcount_add(&mg->mg_alloc_queue_depth, tag);
+}
+
+void
+metaslab_group_alloc_decrement(spa_t *spa, uint64_t vdev, void *tag, int flags)
+{
+       metaslab_group_t *mg;
+
+       if (!(flags & METASLAB_ASYNC_ALLOC) ||
+           flags & METASLAB_DONT_THROTTLE)
+               return;
+
+       mg = vdev_lookup_top(spa, vdev)->vdev_mg;
+       if (!mg->mg_class->mc_alloc_throttle_enabled)
+               return;
+
+       (void) refcount_remove(&mg->mg_alloc_queue_depth, tag);
+}
+
+void
+metaslab_group_alloc_verify(spa_t *spa, const blkptr_t *bp, void *tag)
+{
+#ifdef ZFS_DEBUG
+       const dva_t *dva = bp->blk_dva;
+       int ndvas = BP_GET_NDVAS(bp);
+       int d;
+
+       for (d = 0; d < ndvas; d++) {
+               uint64_t vdev = DVA_GET_VDEV(&dva[d]);
+               metaslab_group_t *mg = vdev_lookup_top(spa, vdev)->vdev_mg;
+               VERIFY(refcount_not_held(&mg->mg_alloc_queue_depth, tag));
+       }
+#endif
+}
+
 static uint64_t
-metaslab_group_alloc(metaslab_group_t *mg, uint64_t psize, uint64_t asize,
-    uint64_t txg, uint64_t min_distance, dva_t *dva, int d)
+metaslab_block_alloc(metaslab_t *msp, uint64_t size, uint64_t txg)
+{
+       uint64_t start;
+       range_tree_t *rt = msp->ms_tree;
+       metaslab_class_t *mc = msp->ms_group->mg_class;
+
+       VERIFY(!msp->ms_condensing);
+
+       start = mc->mc_ops->msop_alloc(msp, size);
+       if (start != -1ULL) {
+               metaslab_group_t *mg = msp->ms_group;
+               vdev_t *vd = mg->mg_vd;
+
+               VERIFY0(P2PHASE(start, 1ULL << vd->vdev_ashift));
+               VERIFY0(P2PHASE(size, 1ULL << vd->vdev_ashift));
+               VERIFY3U(range_tree_space(rt) - size, <=, msp->ms_size);
+               range_tree_remove(rt, start, size);
+
+               if (range_tree_space(msp->ms_alloctree[txg & TXG_MASK]) == 0)
+                       vdev_dirty(mg->mg_vd, VDD_METASLAB, msp, txg);
+
+               range_tree_add(msp->ms_alloctree[txg & TXG_MASK], start, size);
+
+               /* Track the last successful allocation */
+               msp->ms_alloc_txg = txg;
+               metaslab_verify_space(msp, txg);
+       }
+
+       /*
+        * Now that we've attempted the allocation we need to update the
+        * metaslab's maximum block size since it may have changed.
+        */
+       msp->ms_max_size = metaslab_block_maxsize(msp);
+       return (start);
+}
+
+static uint64_t
+metaslab_group_alloc_normal(metaslab_group_t *mg, zio_alloc_list_t *zal,
+    uint64_t asize, uint64_t txg, uint64_t min_distance, dva_t *dva, int d)
 {
-       spa_t *spa = mg->mg_vd->vdev_spa;
        metaslab_t *msp = NULL;
+       metaslab_t *search;
        uint64_t offset = -1ULL;
-       avl_tree_t *t = &mg->mg_metaslab_tree;
        uint64_t activation_weight;
        uint64_t target_distance;
        int i;
@@ -2091,20 +2754,39 @@ metaslab_group_alloc(metaslab_group_t *mg, uint64_t psize, uint64_t asize,
                }
        }
 
+       search = kmem_alloc(sizeof (*search), KM_SLEEP);
+       search->ms_weight = UINT64_MAX;
+       search->ms_start = 0;
        for (;;) {
                boolean_t was_active;
+               avl_tree_t *t = &mg->mg_metaslab_tree;
+               avl_index_t idx;
 
                mutex_enter(&mg->mg_lock);
-               for (msp = avl_first(t); msp; msp = AVL_NEXT(t, msp)) {
-                       if (msp->ms_weight < asize) {
-                               spa_dbgmsg(spa, "%s: failed to meet weight "
-                                   "requirement: vdev %llu, txg %llu, mg %p, "
-                                   "msp %p, psize %llu, asize %llu, "
-                                   "weight %llu", spa_name(spa),
-                                   mg->mg_vd->vdev_id, txg,
-                                   mg, msp, psize, asize, msp->ms_weight);
-                               mutex_exit(&mg->mg_lock);
-                               return (-1ULL);
+
+               /*
+                * Find the metaslab with the highest weight that is less
+                * than what we've already tried.  In the common case, this
+                * means that we will examine each metaslab at most once.
+                * Note that concurrent callers could reorder metaslabs
+                * by activation/passivation once we have dropped the mg_lock.
+                * If a metaslab is activated by another thread, and we fail
+                * to allocate from the metaslab we have selected, we may
+                * not try the newly-activated metaslab, and instead activate
+                * another metaslab.  This is not optimal, but generally
+                * does not cause any problems (a possible exception being
+                * if every metaslab is completely full except for the
+                * the newly-activated metaslab which we fail to examine).
+                */
+               msp = avl_find(t, search, &idx);
+               if (msp == NULL)
+                       msp = avl_nearest(t, idx, AVL_AFTER);
+               for (; msp != NULL; msp = AVL_NEXT(t, msp)) {
+
+                       if (!metaslab_should_allocate(msp, asize)) {
+                               metaslab_trace_add(zal, mg, msp, asize, d,
+                                   TRACE_TOO_SMALL);
+                               continue;
                        }
 
                        /*
@@ -2121,16 +2803,21 @@ metaslab_group_alloc(metaslab_group_t *mg, uint64_t psize, uint64_t asize,
                            (space_map_allocated(msp->ms_sm) != 0 ? 0 :
                            min_distance >> 1);
 
-                       for (i = 0; i < d; i++)
+                       for (i = 0; i < d; i++) {
                                if (metaslab_distance(msp, &dva[i]) <
                                    target_distance)
                                        break;
+                       }
                        if (i == d)
                                break;
                }
                mutex_exit(&mg->mg_lock);
-               if (msp == NULL)
+               if (msp == NULL) {
+                       kmem_free(search, sizeof (*search));
                        return (-1ULL);
+               }
+               search->ms_weight = msp->ms_weight;
+               search->ms_start = msp->ms_start + 1;
 
                mutex_enter(&msp->ms_lock);
 
@@ -2138,11 +2825,11 @@ metaslab_group_alloc(metaslab_group_t *mg, uint64_t psize, uint64_t asize,
                 * Ensure that the metaslab we have selected is still
                 * capable of handling our request. It's possible that
                 * another thread may have changed the weight while we
-                * were blocked on the metaslab lock.
+                * were blocked on the metaslab lock. We check the
+                * active status first to see if we need to reselect
+                * a new metaslab.
                 */
-               if (msp->ms_weight < asize || (was_active &&
-                   !(msp->ms_weight & METASLAB_ACTIVE_MASK) &&
-                   activation_weight == METASLAB_WEIGHT_PRIMARY)) {
+               if (was_active && !(msp->ms_weight & METASLAB_ACTIVE_MASK)) {
                        mutex_exit(&msp->ms_lock);
                        continue;
                }
@@ -2159,6 +2846,22 @@ metaslab_group_alloc(metaslab_group_t *mg, uint64_t psize, uint64_t asize,
                        mutex_exit(&msp->ms_lock);
                        continue;
                }
+               msp->ms_selected_txg = txg;
+
+               /*
+                * Now that we have the lock, recheck to see if we should
+                * continue to use this metaslab for this allocation. The
+                * the metaslab is now loaded so metaslab_should_allocate() can
+                * accurately determine if the allocation attempt should
+                * proceed.
+                */
+               if (!metaslab_should_allocate(msp, asize)) {
+                       /* Passivate this metaslab and select a new one. */
+                       metaslab_trace_add(zal, mg, msp, asize, d,
+                           TRACE_TOO_SMALL);
+                       goto next;
+               }
+
 
                /*
                 * If this metaslab is currently condensing then pick again as
@@ -2166,55 +2869,131 @@ metaslab_group_alloc(metaslab_group_t *mg, uint64_t psize, uint64_t asize,
                 * to disk.
                 */
                if (msp->ms_condensing) {
+                       metaslab_trace_add(zal, mg, msp, asize, d,
+                           TRACE_CONDENSING);
                        mutex_exit(&msp->ms_lock);
                        continue;
                }
 
-               if ((offset = metaslab_block_alloc(msp, asize)) != -1ULL)
+               offset = metaslab_block_alloc(msp, asize, txg);
+               metaslab_trace_add(zal, mg, msp, asize, d, offset);
+
+               if (offset != -1ULL) {
+                       /* Proactively passivate the metaslab, if needed */
+                       metaslab_segment_may_passivate(msp);
                        break;
+               }
+next:
+               ASSERT(msp->ms_loaded);
 
-               metaslab_passivate(msp, metaslab_block_maxsize(msp));
+               /*
+                * We were unable to allocate from this metaslab so determine
+                * a new weight for this metaslab. Now that we have loaded
+                * the metaslab we can provide a better hint to the metaslab
+                * selector.
+                *
+                * For space-based metaslabs, we use the maximum block size.
+                * This information is only available when the metaslab
+                * is loaded and is more accurate than the generic free
+                * space weight that was calculated by metaslab_weight().
+                * This information allows us to quickly compare the maximum
+                * available allocation in the metaslab to the allocation
+                * size being requested.
+                *
+                * For segment-based metaslabs, determine the new weight
+                * based on the highest bucket in the range tree. We
+                * explicitly use the loaded segment weight (i.e. the range
+                * tree histogram) since it contains the space that is
+                * currently available for allocation and is accurate
+                * even within a sync pass.
+                */
+               if (WEIGHT_IS_SPACEBASED(msp->ms_weight)) {
+                       uint64_t weight = metaslab_block_maxsize(msp);
+                       WEIGHT_SET_SPACEBASED(weight);
+                       metaslab_passivate(msp, weight);
+               } else {
+                       metaslab_passivate(msp,
+                           metaslab_weight_from_range_tree(msp));
+               }
+
+               /*
+                * We have just failed an allocation attempt, check
+                * that metaslab_should_allocate() agrees. Otherwise,
+                * we may end up in an infinite loop retrying the same
+                * metaslab.
+                */
+               ASSERT(!metaslab_should_allocate(msp, asize));
                mutex_exit(&msp->ms_lock);
        }
+       mutex_exit(&msp->ms_lock);
+       kmem_free(search, sizeof (*search));
+       return (offset);
+}
 
-       if (range_tree_space(msp->ms_alloctree[txg & TXG_MASK]) == 0)
-               vdev_dirty(mg->mg_vd, VDD_METASLAB, msp, txg);
-
-       range_tree_add(msp->ms_alloctree[txg & TXG_MASK], offset, asize);
-       msp->ms_access_txg = txg + metaslab_unload_delay;
+static uint64_t
+metaslab_group_alloc(metaslab_group_t *mg, zio_alloc_list_t *zal,
+    uint64_t asize, uint64_t txg, uint64_t min_distance, dva_t *dva, int d)
+{
+       uint64_t offset;
+       ASSERT(mg->mg_initialized);
 
-       mutex_exit(&msp->ms_lock);
+       offset = metaslab_group_alloc_normal(mg, zal, asize, txg,
+           min_distance, dva, d);
 
+       mutex_enter(&mg->mg_lock);
+       if (offset == -1ULL) {
+               mg->mg_failed_allocations++;
+               metaslab_trace_add(zal, mg, NULL, asize, d,
+                   TRACE_GROUP_FAILURE);
+               if (asize == SPA_GANGBLOCKSIZE) {
+                       /*
+                        * This metaslab group was unable to allocate
+                        * the minimum gang block size so it must be out of
+                        * space. We must notify the allocation throttle
+                        * to start skipping allocation attempts to this
+                        * metaslab group until more space becomes available.
+                        * Note: this failure cannot be caused by the
+                        * allocation throttle since the allocation throttle
+                        * is only responsible for skipping devices and
+                        * not failing block allocations.
+                        */
+                       mg->mg_no_free_space = B_TRUE;
+               }
+       }
+       mg->mg_allocations++;
+       mutex_exit(&mg->mg_lock);
        return (offset);
 }
 
+/*
+ * If we have to write a ditto block (i.e. more than one DVA for a given BP)
+ * on the same vdev as an existing DVA of this BP, then try to allocate it
+ * at least (vdev_asize / (2 ^ ditto_same_vdev_distance_shift)) away from the
+ * existing DVAs.
+ */
+int ditto_same_vdev_distance_shift = 3;
+
 /*
  * Allocate a block for the specified i/o.
  */
 static int
 metaslab_alloc_dva(spa_t *spa, metaslab_class_t *mc, uint64_t psize,
-    dva_t *dva, int d, dva_t *hintdva, uint64_t txg, int flags)
+    dva_t *dva, int d, dva_t *hintdva, uint64_t txg, int flags,
+    zio_alloc_list_t *zal)
 {
        metaslab_group_t *mg, *fast_mg, *rotor;
        vdev_t *vd;
-       int dshift = 3;
-       int all_zero;
-       int zio_lock = B_FALSE;
-       boolean_t allocatable;
-       uint64_t offset = -1ULL;
-       uint64_t asize;
-       uint64_t distance;
+       boolean_t try_hard = B_FALSE;
 
        ASSERT(!DVA_IS_VALID(&dva[d]));
 
        /*
         * For testing, make some blocks above a certain size be gang blocks.
         */
-       if (psize >= metaslab_gang_bang && (ddi_get_lbolt() & 3) == 0)
+       if (psize >= metaslab_gang_bang && (ddi_get_lbolt() & 3) == 0) {
+               metaslab_trace_add(zal, NULL, NULL, psize, d, TRACE_FORCE_GANG);
                return (SET_ERROR(ENOSPC));
-
-       if (flags & METASLAB_FASTWRITE)
-               mutex_enter(&mc->mc_fastwrite_lock);
+       }
 
        /*
         * Start at the rotor and loop through all mgs until we find something.
@@ -2280,16 +3059,18 @@ metaslab_alloc_dva(spa_t *spa, metaslab_class_t *mc, uint64_t psize,
 
        rotor = mg;
 top:
-       all_zero = B_TRUE;
        do {
-               ASSERT(mg->mg_activation_count == 1);
+               boolean_t allocatable;
+               uint64_t offset;
+               uint64_t distance, asize;
 
+               ASSERT(mg->mg_activation_count == 1);
                vd = mg->mg_vd;
 
                /*
                 * Don't allocate from faulted devices.
                 */
-               if (zio_lock) {
+               if (try_hard) {
                        spa_config_enter(spa, SCL_ZIO, FTAG, RW_READER);
                        allocatable = vdev_allocatable(vd);
                        spa_config_exit(spa, SCL_ZIO, FTAG);
@@ -2299,45 +3080,59 @@ top:
 
                /*
                 * Determine if the selected metaslab group is eligible
-                * for allocations. If we're ganging or have requested
-                * an allocation for the smallest gang block size
-                * then we don't want to avoid allocating to the this
-                * metaslab group. If we're in this condition we should
-                * try to allocate from any device possible so that we
-                * don't inadvertently return ENOSPC and suspend the pool
+                * for allocations. If we're ganging then don't allow
+                * this metaslab group to skip allocations since that would
+                * inadvertently return ENOSPC and suspend the pool
                 * even though space is still available.
                 */
-               if (allocatable && CAN_FASTGANG(flags) &&
-                   psize > SPA_GANGBLOCKSIZE)
-                       allocatable = metaslab_group_allocatable(mg);
+               if (allocatable && !GANG_ALLOCATION(flags) && !try_hard) {
+                       allocatable = metaslab_group_allocatable(mg, rotor,
+                           psize);
+               }
 
-               if (!allocatable)
+               if (!allocatable) {
+                       metaslab_trace_add(zal, mg, NULL, psize, d,
+                           TRACE_NOT_ALLOCATABLE);
                        goto next;
+               }
+
+               ASSERT(mg->mg_initialized);
 
                /*
-                * Avoid writing single-copy data to a failing vdev
-                * unless the user instructs us that it is okay.
+                * Avoid writing single-copy data to a failing,
+                * non-redundant vdev, unless we've already tried all
+                * other vdevs.
                 */
                if ((vd->vdev_stat.vs_write_errors > 0 ||
                    vd->vdev_state < VDEV_STATE_HEALTHY) &&
-                   d == 0 && dshift == 3 && vd->vdev_children == 0) {
-                       all_zero = B_FALSE;
+                   d == 0 && !try_hard && vd->vdev_children == 0) {
+                       metaslab_trace_add(zal, mg, NULL, psize, d,
+                           TRACE_VDEV_ERROR);
                        goto next;
                }
 
                ASSERT(mg->mg_class == mc);
 
-               distance = vd->vdev_asize >> dshift;
-               if (distance <= (1ULL << vd->vdev_ms_shift))
-                       distance = 0;
-               else
-                       all_zero = B_FALSE;
+               /*
+                * If we don't need to try hard, then require that the
+                * block be 1/8th of the device away from any other DVAs
+                * in this BP.  If we are trying hard, allow any offset
+                * to be used (distance=0).
+                */
+               distance = 0;
+               if (!try_hard) {
+                       distance = vd->vdev_asize >>
+                           ditto_same_vdev_distance_shift;
+                       if (distance <= (1ULL << vd->vdev_ms_shift))
+                               distance = 0;
+               }
 
                asize = vdev_psize_to_asize(vd, psize);
                ASSERT(P2PHASE(asize, 1ULL << vd->vdev_ashift) == 0);
 
-               offset = metaslab_group_alloc(mg, psize, asize, txg, distance,
+               offset = metaslab_group_alloc(mg, zal, asize, txg, distance,
                    dva, d);
+
                if (offset != -1ULL) {
                        /*
                         * If we've just selected this metaslab group,
@@ -2394,13 +3189,13 @@ top:
 
                        DVA_SET_VDEV(&dva[d], vd->vdev_id);
                        DVA_SET_OFFSET(&dva[d], offset);
-                       DVA_SET_GANG(&dva[d], !!(flags & METASLAB_GANG_HEADER));
+                       DVA_SET_GANG(&dva[d],
+                           ((flags & METASLAB_GANG_HEADER) ? 1 : 0));
                        DVA_SET_ASIZE(&dva[d], asize);
 
                        if (flags & METASLAB_FASTWRITE) {
                                atomic_add_64(&vd->vdev_pending_fastwrite,
                                    psize);
-                               mutex_exit(&mc->mc_fastwrite_lock);
                        }
 
                        return (0);
@@ -2410,23 +3205,17 @@ next:
                mc->mc_aliquot = 0;
        } while ((mg = mg->mg_next) != rotor);
 
-       if (!all_zero) {
-               dshift++;
-               ASSERT(dshift < 64);
-               goto top;
-       }
-
-       if (!allocatable && !zio_lock) {
-               dshift = 3;
-               zio_lock = B_TRUE;
+       /*
+        * If we haven't tried hard, do so now.
+        */
+       if (!try_hard) {
+               try_hard = B_TRUE;
                goto top;
        }
 
        bzero(&dva[d], sizeof (dva_t));
 
-       if (flags & METASLAB_FASTWRITE)
-               mutex_exit(&mc->mc_fastwrite_lock);
-
+       metaslab_trace_add(zal, rotor, NULL, psize, d, TRACE_ENOSPC);
        return (SET_ERROR(ENOSPC));
 }
 
@@ -2473,11 +3262,12 @@ metaslab_free_dva(spa_t *spa, const dva_t *dva, uint64_t txg, boolean_t now)
                VERIFY0(P2PHASE(offset, 1ULL << vd->vdev_ashift));
                VERIFY0(P2PHASE(size, 1ULL << vd->vdev_ashift));
                range_tree_add(msp->ms_tree, offset, size);
+               msp->ms_max_size = metaslab_block_maxsize(msp);
        } else {
-               if (range_tree_space(msp->ms_freetree[txg & TXG_MASK]) == 0)
+               VERIFY3U(txg, ==, spa->spa_syncing_txg);
+               if (range_tree_space(msp->ms_freeingtree) == 0)
                        vdev_dirty(vd, VDD_METASLAB, msp, txg);
-               range_tree_add(msp->ms_freetree[txg & TXG_MASK],
-                   offset, size);
+               range_tree_add(msp->ms_freeingtree, offset, size);
        }
 
        mutex_exit(&msp->ms_lock);
@@ -2540,9 +3330,63 @@ metaslab_claim_dva(spa_t *spa, const dva_t *dva, uint64_t txg)
        return (0);
 }
 
+/*
+ * Reserve some allocation slots. The reservation system must be called
+ * before we call into the allocator. If there aren't any available slots
+ * then the I/O will be throttled until an I/O completes and its slots are
+ * freed up. The function returns true if it was successful in placing
+ * the reservation.
+ */
+boolean_t
+metaslab_class_throttle_reserve(metaslab_class_t *mc, int slots, zio_t *zio,
+    int flags)
+{
+       uint64_t available_slots = 0;
+       uint64_t reserved_slots;
+       boolean_t slot_reserved = B_FALSE;
+
+       ASSERT(mc->mc_alloc_throttle_enabled);
+       mutex_enter(&mc->mc_lock);
+
+       reserved_slots = refcount_count(&mc->mc_alloc_slots);
+       if (reserved_slots < mc->mc_alloc_max_slots)
+               available_slots = mc->mc_alloc_max_slots - reserved_slots;
+
+       if (slots <= available_slots || GANG_ALLOCATION(flags)) {
+               int d;
+
+               /*
+                * We reserve the slots individually so that we can unreserve
+                * them individually when an I/O completes.
+                */
+               for (d = 0; d < slots; d++) {
+                       reserved_slots = refcount_add(&mc->mc_alloc_slots, zio);
+               }
+               zio->io_flags |= ZIO_FLAG_IO_ALLOCATING;
+               slot_reserved = B_TRUE;
+       }
+
+       mutex_exit(&mc->mc_lock);
+       return (slot_reserved);
+}
+
+void
+metaslab_class_throttle_unreserve(metaslab_class_t *mc, int slots, zio_t *zio)
+{
+       int d;
+
+       ASSERT(mc->mc_alloc_throttle_enabled);
+       mutex_enter(&mc->mc_lock);
+       for (d = 0; d < slots; d++) {
+               (void) refcount_remove(&mc->mc_alloc_slots, zio);
+       }
+       mutex_exit(&mc->mc_lock);
+}
+
 int
 metaslab_alloc(spa_t *spa, metaslab_class_t *mc, uint64_t psize, blkptr_t *bp,
-    int ndvas, uint64_t txg, blkptr_t *hintbp, int flags)
+    int ndvas, uint64_t txg, blkptr_t *hintbp, int flags,
+    zio_alloc_list_t *zal, zio_t *zio)
 {
        dva_t *dva = bp->blk_dva;
        dva_t *hintdva = hintbp->blk_dva;
@@ -2561,25 +3405,36 @@ metaslab_alloc(spa_t *spa, metaslab_class_t *mc, uint64_t psize, blkptr_t *bp,
        ASSERT(ndvas > 0 && ndvas <= spa_max_replication(spa));
        ASSERT(BP_GET_NDVAS(bp) == 0);
        ASSERT(hintbp == NULL || ndvas <= BP_GET_NDVAS(hintbp));
+       ASSERT3P(zal, !=, NULL);
 
        for (d = 0; d < ndvas; d++) {
                error = metaslab_alloc_dva(spa, mc, psize, dva, d, hintdva,
-                   txg, flags);
+                   txg, flags, zal);
                if (error != 0) {
                        for (d--; d >= 0; d--) {
                                metaslab_free_dva(spa, &dva[d], txg, B_TRUE);
+                               metaslab_group_alloc_decrement(spa,
+                                   DVA_GET_VDEV(&dva[d]), zio, flags);
                                bzero(&dva[d], sizeof (dva_t));
                        }
                        spa_config_exit(spa, SCL_ALLOC, FTAG);
                        return (error);
+               } else {
+                       /*
+                        * Update the metaslab group's queue depth
+                        * based on the newly allocated dva.
+                        */
+                       metaslab_group_alloc_increment(spa,
+                           DVA_GET_VDEV(&dva[d]), zio, flags);
                }
+
        }
        ASSERT(error == 0);
        ASSERT(BP_GET_NDVAS(bp) == ndvas);
 
        spa_config_exit(spa, SCL_ALLOC, FTAG);
 
-       BP_SET_BIRTH(bp, txg, txg);
+       BP_SET_BIRTH(bp, txg, 0);
 
        return (0);
 }
@@ -2700,8 +3555,8 @@ metaslab_check_free(spa_t *spa, const blkptr_t *bp)
                if (msp->ms_loaded)
                        range_tree_verify(msp->ms_tree, offset, size);
 
-               for (j = 0; j < TXG_SIZE; j++)
-                       range_tree_verify(msp->ms_freetree[j], offset, size);
+               range_tree_verify(msp->ms_freeingtree, offset, size);
+               range_tree_verify(msp->ms_freedtree, offset, size);
                for (j = 0; j < TXG_DEFER_SIZE; j++)
                        range_tree_verify(msp->ms_defertree[j], offset, size);
        }
@@ -2709,37 +3564,52 @@ metaslab_check_free(spa_t *spa, const blkptr_t *bp)
 }
 
 #if defined(_KERNEL) && defined(HAVE_SPL)
+/* CSTYLED */
 module_param(metaslab_aliquot, ulong, 0644);
-module_param(metaslab_debug_load, int, 0644);
-module_param(metaslab_debug_unload, int, 0644);
-module_param(metaslab_preload_enabled, int, 0644);
-module_param(zfs_mg_noalloc_threshold, int, 0644);
-module_param(zfs_mg_fragmentation_threshold, int, 0644);
-module_param(zfs_metaslab_fragmentation_threshold, int, 0644);
-module_param(metaslab_fragmentation_factor_enabled, int, 0644);
-module_param(metaslab_lba_weighting_enabled, int, 0644);
-module_param(metaslab_bias_enabled, int, 0644);
-
 MODULE_PARM_DESC(metaslab_aliquot,
        "allocation granularity (a.k.a. stripe size)");
+
+module_param(metaslab_debug_load, int, 0644);
 MODULE_PARM_DESC(metaslab_debug_load,
        "load all metaslabs when pool is first opened");
+
+module_param(metaslab_debug_unload, int, 0644);
 MODULE_PARM_DESC(metaslab_debug_unload,
        "prevent metaslabs from being unloaded");
+
+module_param(metaslab_preload_enabled, int, 0644);
 MODULE_PARM_DESC(metaslab_preload_enabled,
        "preload potential metaslabs during reassessment");
 
+module_param(zfs_mg_noalloc_threshold, int, 0644);
 MODULE_PARM_DESC(zfs_mg_noalloc_threshold,
        "percentage of free space for metaslab group to allow allocation");
+
+module_param(zfs_mg_fragmentation_threshold, int, 0644);
 MODULE_PARM_DESC(zfs_mg_fragmentation_threshold,
        "fragmentation for metaslab group to allow allocation");
 
+module_param(zfs_metaslab_fragmentation_threshold, int, 0644);
 MODULE_PARM_DESC(zfs_metaslab_fragmentation_threshold,
        "fragmentation for metaslab to allow allocation");
+
+module_param(metaslab_fragmentation_factor_enabled, int, 0644);
 MODULE_PARM_DESC(metaslab_fragmentation_factor_enabled,
        "use the fragmentation metric to prefer less fragmented metaslabs");
+
+module_param(metaslab_lba_weighting_enabled, int, 0644);
 MODULE_PARM_DESC(metaslab_lba_weighting_enabled,
        "prefer metaslabs with lower LBAs");
+
+module_param(metaslab_bias_enabled, int, 0644);
 MODULE_PARM_DESC(metaslab_bias_enabled,
        "enable metaslab group biasing");
+
+module_param(zfs_metaslab_segment_weight_enabled, int, 0644);
+MODULE_PARM_DESC(zfs_metaslab_segment_weight_enabled,
+       "enable segment-based metaslab selection");
+
+module_param(zfs_metaslab_switch_threshold, int, 0644);
+MODULE_PARM_DESC(zfs_metaslab_switch_threshold,
+       "segment-based metaslab selection maximum buckets before switching");
 #endif /* _KERNEL && HAVE_SPL */