git.proxmox.com Git - mirror_zfs-debian.git/blobdiff - module/zfs/dbuf.c
New upstream version 0.7.2
index 483067cc7ba45101c4b497d1bd131816c7270c4f..dc2c00495b54c550668584815802659f8b08441e 100644
@@ -21,7 +21,7 @@
 /*
  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  * Copyright 2011 Nexenta Systems, Inc.  All rights reserved.
- * Copyright (c) 2012, 2015 by Delphix. All rights reserved.
+ * Copyright (c) 2012, 2017 by Delphix. All rights reserved.
  * Copyright (c) 2013 by Saso Kiselkov. All rights reserved.
  * Copyright (c) 2014 Spectra Logic Corporation, All rights reserved.
  */
 #include <sys/blkptr.h>
 #include <sys/range_tree.h>
 #include <sys/trace_dbuf.h>
+#include <sys/callb.h>
+#include <sys/abd.h>
 
 struct dbuf_hold_impl_data {
        /* Function arguments */
        dnode_t *dh_dn;
        uint8_t dh_level;
        uint64_t dh_blkid;
-       int dh_fail_sparse;
+       boolean_t dh_fail_sparse;
+       boolean_t dh_fail_uncached;
        void *dh_tag;
        dmu_buf_impl_t **dh_dbp;
        /* Local variables */
@@ -65,31 +68,94 @@ struct dbuf_hold_impl_data {
 };
 
 static void __dbuf_hold_impl_init(struct dbuf_hold_impl_data *dh,
-    dnode_t *dn, uint8_t level, uint64_t blkid, int fail_sparse,
-    void *tag, dmu_buf_impl_t **dbp, int depth);
+    dnode_t *dn, uint8_t level, uint64_t blkid, boolean_t fail_sparse,
+    boolean_t fail_uncached, void *tag, dmu_buf_impl_t **dbp, int depth);
 static int __dbuf_hold_impl(struct dbuf_hold_impl_data *dh);
 
-/*
- * Number of times that zfs_free_range() took the slow path while doing
- * a zfs receive.  A nonzero value indicates a potential performance problem.
- */
-uint64_t zfs_free_range_recv_miss;
+uint_t zfs_dbuf_evict_key;
 
-static void dbuf_destroy(dmu_buf_impl_t *db);
 static boolean_t dbuf_undirty(dmu_buf_impl_t *db, dmu_tx_t *tx);
 static void dbuf_write(dbuf_dirty_record_t *dr, arc_buf_t *data, dmu_tx_t *tx);
 
-#ifndef __lint
 extern inline void dmu_buf_init_user(dmu_buf_user_t *dbu,
-    dmu_buf_evict_func_t *evict_func, dmu_buf_t **clear_on_evict_dbufp);
-#endif /* ! __lint */
+    dmu_buf_evict_func_t *evict_func_sync,
+    dmu_buf_evict_func_t *evict_func_async,
+    dmu_buf_t **clear_on_evict_dbufp);
 
 /*
  * Global data structures and functions for the dbuf cache.
  */
-static kmem_cache_t *dbuf_cache;
+static kmem_cache_t *dbuf_kmem_cache;
 static taskq_t *dbu_evict_taskq;
 
+static kthread_t *dbuf_cache_evict_thread;
+static kmutex_t dbuf_evict_lock;
+static kcondvar_t dbuf_evict_cv;
+static boolean_t dbuf_evict_thread_exit;
+
+/*
+ * LRU cache of dbufs. The dbuf cache maintains a list of dbufs that
+ * are not currently held but have been recently released. These dbufs
+ * are not eligible for arc eviction until they are aged out of the cache.
+ * Dbufs are added to the dbuf cache once the last hold is released. If a
+ * dbuf is later accessed and still exists in the dbuf cache, then it will
+ * be removed from the cache and later re-added to the head of the cache.
+ * Dbufs that are aged out of the cache will be immediately destroyed and
+ * become eligible for arc eviction.
+ */
+static multilist_t *dbuf_cache;
+static refcount_t dbuf_cache_size;
+unsigned long  dbuf_cache_max_bytes = 100 * 1024 * 1024;
+
+/* Cap the size of the dbuf cache to a log2 fraction of the ARC size. */
+int dbuf_cache_max_shift = 5;
+
+/*
+ * The dbuf cache uses a three-stage eviction policy:
+ *     - A low water marker designates when the dbuf eviction thread
+ *     should stop evicting from the dbuf cache.
+ *     - When we reach the maximum size (aka mid water mark), we
+ *     signal the eviction thread to run.
+ *     - The high water mark indicates when the eviction thread
+ *     is unable to keep up with the incoming load and eviction must
+ *     happen in the context of the calling thread.
+ *
+ * The dbuf cache:
+ *                                                 (max size)
+ *                                      low water   mid water   hi water
+ * +----------------------------------------+----------+----------+
+ * |                                        |          |          |
+ * |                                        |          |          |
+ * |                                        |          |          |
+ * |                                        |          |          |
+ * +----------------------------------------+----------+----------+
+ *                                        stop        signal     evict
+ *                                      evicting     eviction   directly
+ *                                                    thread
+ *
+ * The high and low water marks indicate the operating range for the eviction
+ * thread. The low water mark is, by default, 90% of the total size of the
+ * cache and the high water mark is at 110% (both of these percentages can be
+ * changed by setting dbuf_cache_lowater_pct and dbuf_cache_hiwater_pct,
+ * respectively). The eviction thread will try to ensure that the cache remains
+ * within this range by waking up every second and checking if the cache is
+ * above the low water mark. The thread can also be woken up by callers adding
+ * elements into the cache if the cache is larger than the mid water (i.e. max
+ * cache size). Once the eviction thread is woken up and eviction is required,
+ * it will continue evicting buffers until it's able to reduce the cache size
+ * to the low water mark. If the cache size continues to grow and hits the high
+ * water mark, then callers adding elements to the cache will begin to evict
+ * directly from the cache until the cache is no longer above the high water
+ * mark.
+ */
+
+/*
+ * The percentage above and below the maximum cache size.
+ */
+uint_t dbuf_cache_hiwater_pct = 10;
+uint_t dbuf_cache_lowater_pct = 10;
+
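
A quick worked example of the water marks defined by these tunables (a
standalone sketch, not upstream code; it just mirrors the arithmetic of
dbuf_cache_above_hiwater()/dbuf_cache_above_lowater() later in this diff):

	/*
	 * With the defaults above (dbuf_cache_max_bytes = 100 MiB,
	 * hiwater/lowater pct = 10) this prints low=94371840,
	 * mid=104857600, high=115343360.
	 */
	#include <stdint.h>
	#include <stdio.h>

	int
	main(void)
	{
		uint64_t max = 100ULL * 1024 * 1024;	/* dbuf_cache_max_bytes */
		uint64_t low = max - (max * 10) / 100;	/* evict thread stops */
		uint64_t mid = max;			/* eviction signaled */
		uint64_t high = max + (max * 10) / 100;	/* callers evict */

		printf("low=%llu mid=%llu high=%llu\n",
		    (unsigned long long)low, (unsigned long long)mid,
		    (unsigned long long)high);
		return (0);
	}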
 /* ARGSUSED */
 static int
 dbuf_cons(void *vdb, void *unused, int kmflag)
@@ -99,7 +165,9 @@ dbuf_cons(void *vdb, void *unused, int kmflag)
 
        mutex_init(&db->db_mtx, NULL, MUTEX_DEFAULT, NULL);
        cv_init(&db->db_changed, NULL, CV_DEFAULT, NULL);
+       multilist_link_init(&db->db_cache_link);
        refcount_create(&db->db_holds);
 
        return (0);
 }
@@ -111,6 +179,7 @@ dbuf_dest(void *vdb, void *unused)
        dmu_buf_impl_t *db = vdb;
        mutex_destroy(&db->db_mtx);
        cv_destroy(&db->db_changed);
+       ASSERT(!multilist_link_active(&db->db_cache_link));
        refcount_destroy(&db->db_holds);
 }
 
@@ -140,8 +209,6 @@ dbuf_hash(void *os, uint64_t obj, uint8_t lvl, uint64_t blkid)
        return (crc);
 }
 
-#define        DBUF_HASH(os, obj, level, blkid) dbuf_hash(os, obj, level, blkid);
-
 #define        DBUF_EQUAL(dbuf, os, obj, level, blkid)         \
        ((dbuf)->db.db_object == (obj) &&               \
        (dbuf)->db_objset == (os) &&                    \
@@ -156,7 +223,7 @@ dbuf_find(objset_t *os, uint64_t obj, uint8_t level, uint64_t blkid)
        uint64_t idx;
        dmu_buf_impl_t *db;
 
-       hv = DBUF_HASH(os, obj, level, blkid);
+       hv = dbuf_hash(os, obj, level, blkid);
        idx = hv & h->hash_table_mask;
 
        mutex_enter(DBUF_HASH_MUTEX(h, idx));
@@ -209,7 +276,7 @@ dbuf_hash_insert(dmu_buf_impl_t *db)
        dmu_buf_impl_t *dbf;
 
        blkid = db->db_blkid;
-       hv = DBUF_HASH(os, obj, level, blkid);
+       hv = dbuf_hash(os, obj, level, blkid);
        idx = hv & h->hash_table_mask;
 
        mutex_enter(DBUF_HASH_MUTEX(h, idx));
@@ -228,7 +295,7 @@ dbuf_hash_insert(dmu_buf_impl_t *db)
        db->db_hash_next = h->hash_table[idx];
        h->hash_table[idx] = db;
        mutex_exit(DBUF_HASH_MUTEX(h, idx));
-       atomic_add_64(&dbuf_hash_count, 1);
+       atomic_inc_64(&dbuf_hash_count);
 
        return (NULL);
 }
@@ -243,12 +310,12 @@ dbuf_hash_remove(dmu_buf_impl_t *db)
        uint64_t hv, idx;
        dmu_buf_impl_t *dbf, **dbp;
 
-       hv = DBUF_HASH(db->db_objset, db->db.db_object,
+       hv = dbuf_hash(db->db_objset, db->db.db_object,
            db->db_level, db->db_blkid);
        idx = hv & h->hash_table_mask;
 
        /*
-        * We musn't hold db_mtx to maintain lock ordering:
+        * We mustn't hold db_mtx to maintain lock ordering:
         * DBUF_HASH_MUTEX > db_mtx.
         */
        ASSERT(refcount_is_zero(&db->db_holds));
@@ -264,11 +331,9 @@ dbuf_hash_remove(dmu_buf_impl_t *db)
        *dbp = db->db_hash_next;
        db->db_hash_next = NULL;
        mutex_exit(DBUF_HASH_MUTEX(h, idx));
-       atomic_add_64(&dbuf_hash_count, -1);
+       atomic_dec_64(&dbuf_hash_count);
 }
 
-static arc_evict_func_t dbuf_do_evict;
-
 typedef enum {
        DBVU_EVICTING,
        DBVU_NOT_EVICTING
@@ -330,11 +395,24 @@ dbuf_evict_user(dmu_buf_impl_t *db)
 #endif
 
        /*
-        * Invoke the callback from a taskq to avoid lock order reversals
-        * and limit stack depth.
+        * There are two eviction callbacks - one that we call synchronously
+        * and one that we invoke via a taskq.  The async one is useful for
+        * avoiding lock order reversals and limiting stack depth.
+        *
+        * Note that if we have a sync callback but no async callback,
+        * it's likely that the sync callback will free the structure
+        * containing the dbu.  In that case we need to take care to not
+        * dereference dbu after calling the sync evict func.
         */
-       taskq_dispatch_ent(dbu_evict_taskq, dbu->dbu_evict_func, dbu, 0,
-           &dbu->dbu_tqent);
+       boolean_t has_async = (dbu->dbu_evict_func_async != NULL);
+
+       if (dbu->dbu_evict_func_sync != NULL)
+               dbu->dbu_evict_func_sync(dbu);
+
+       if (has_async) {
+               taskq_dispatch_ent(dbu_evict_taskq, dbu->dbu_evict_func_async,
+                   dbu, 0, &dbu->dbu_tqent);
+       }
 }
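
As a hedged illustration of the two-callback scheme just described: only the
four-argument dmu_buf_init_user() signature is taken from the declaration
earlier in this diff; the my_user_t layout and both callback bodies are
hypothetical.

	typedef struct my_user {
		dmu_buf_user_t	mu_dbu;	/* first member, so dbu == my_user_t * */
		dmu_buf_t	*mu_db;	/* cleared on evict via last argument */
	} my_user_t;

	static void
	my_evict_sync(void *dbu)
	{
		/*
		 * Runs synchronously in the evicting thread; keep it short,
		 * and do not free the structure here since the async
		 * callback below still needs it.
		 */
	}

	static void
	my_evict_async(void *dbu)
	{
		/* Runs later from dbu_evict_taskq; a safe place to free. */
		kmem_free(dbu, sizeof (my_user_t));
	}

	static void
	my_attach(my_user_t *mu)
	{
		dmu_buf_init_user(&mu->mu_dbu, my_evict_sync, my_evict_async,
		    &mu->mu_db);
	}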
 
 boolean_t
@@ -356,17 +434,182 @@ dbuf_is_metadata(dmu_buf_impl_t *db)
        }
 }
 
-void
-dbuf_evict(dmu_buf_impl_t *db)
+
+/*
+ * This function *must* return indices evenly distributed between all
+ * sublists of the multilist. This is needed due to how the dbuf eviction
+ * code is laid out; dbuf_evict_thread() assumes dbufs are evenly
+ * distributed between all sublists and uses this assumption when
+ * deciding which sublist to evict from and how much to evict from it.
+ */
+unsigned int
+dbuf_cache_multilist_index_func(multilist_t *ml, void *obj)
 {
-       ASSERT(MUTEX_HELD(&db->db_mtx));
-       ASSERT(db->db_buf == NULL);
-       ASSERT(db->db_data_pending == NULL);
+       dmu_buf_impl_t *db = obj;
 
-       dbuf_clear(db);
-       dbuf_destroy(db);
+       /*
+        * The assumption here is that the hash value for a given
+        * dmu_buf_impl_t will remain constant throughout its lifetime
+        * (i.e. its objset, object, level and blkid fields don't change).
+        * Thus, we don't need to store the dbuf's sublist index
+        * on insertion, as this index can be recalculated on removal.
+        *
+        * Also, the low order bits of the hash value are thought to be
+        * distributed evenly. Otherwise, in the case that the multilist
+        * has a power of two number of sublists, each sublist's usage
+        * would not be evenly distributed.
+        */
+       return (dbuf_hash(db->db_objset, db->db.db_object,
+           db->db_level, db->db_blkid) %
+           multilist_get_num_sublists(ml));
 }
 
+static inline boolean_t
+dbuf_cache_above_hiwater(void)
+{
+       uint64_t dbuf_cache_hiwater_bytes =
+           (dbuf_cache_max_bytes * dbuf_cache_hiwater_pct) / 100;
+
+       return (refcount_count(&dbuf_cache_size) >
+           dbuf_cache_max_bytes + dbuf_cache_hiwater_bytes);
+}
+
+static inline boolean_t
+dbuf_cache_above_lowater(void)
+{
+       uint64_t dbuf_cache_lowater_bytes =
+           (dbuf_cache_max_bytes * dbuf_cache_lowater_pct) / 100;
+
+       return (refcount_count(&dbuf_cache_size) >
+           dbuf_cache_max_bytes - dbuf_cache_lowater_bytes);
+}
+
+/*
+ * Evict the oldest eligible dbuf from the dbuf cache.
+ */
+static void
+dbuf_evict_one(void)
+{
+       int idx = multilist_get_random_index(dbuf_cache);
+       multilist_sublist_t *mls = multilist_sublist_lock(dbuf_cache, idx);
+       dmu_buf_impl_t *db;
+       ASSERT(!MUTEX_HELD(&dbuf_evict_lock));
+
+       /*
+        * Set the thread's tsd to indicate that it's processing evictions.
+        * Once a thread stops evicting from the dbuf cache it will
+        * reset its tsd to NULL.
+        */
+       ASSERT3P(tsd_get(zfs_dbuf_evict_key), ==, NULL);
+       (void) tsd_set(zfs_dbuf_evict_key, (void *)B_TRUE);
+
+       db = multilist_sublist_tail(mls);
+       while (db != NULL && mutex_tryenter(&db->db_mtx) == 0) {
+               db = multilist_sublist_prev(mls, db);
+       }
+
+       DTRACE_PROBE2(dbuf__evict__one, dmu_buf_impl_t *, db,
+           multilist_sublist_t *, mls);
+
+       if (db != NULL) {
+               multilist_sublist_remove(mls, db);
+               multilist_sublist_unlock(mls);
+               (void) refcount_remove_many(&dbuf_cache_size,
+                   db->db.db_size, db);
+               dbuf_destroy(db);
+       } else {
+               multilist_sublist_unlock(mls);
+       }
+       (void) tsd_set(zfs_dbuf_evict_key, NULL);
+}
+
+/*
+ * The dbuf evict thread is responsible for aging out dbufs from the
+ * cache. Once the cache has reached its maximum size, dbufs are removed
+ * and destroyed. The eviction thread will continue running until the size
+ * of the dbuf cache is at or below the low water mark. Once a dbuf is aged
+ * out of the cache it is destroyed and becomes eligible for arc eviction.
+ */
+static void
+dbuf_evict_thread(void)
+{
+       callb_cpr_t cpr;
+
+       CALLB_CPR_INIT(&cpr, &dbuf_evict_lock, callb_generic_cpr, FTAG);
+
+       mutex_enter(&dbuf_evict_lock);
+       while (!dbuf_evict_thread_exit) {
+               while (!dbuf_cache_above_lowater() && !dbuf_evict_thread_exit) {
+                       CALLB_CPR_SAFE_BEGIN(&cpr);
+                       (void) cv_timedwait_sig_hires(&dbuf_evict_cv,
+                           &dbuf_evict_lock, SEC2NSEC(1), MSEC2NSEC(1), 0);
+                       CALLB_CPR_SAFE_END(&cpr, &dbuf_evict_lock);
+               }
+               mutex_exit(&dbuf_evict_lock);
+
+               /*
+                * Keep evicting as long as we're above the low water mark
+                * for the cache. We do this without holding the locks to
+                * minimize lock contention.
+                */
+               while (dbuf_cache_above_lowater() && !dbuf_evict_thread_exit) {
+                       dbuf_evict_one();
+               }
+
+               mutex_enter(&dbuf_evict_lock);
+       }
+
+       dbuf_evict_thread_exit = B_FALSE;
+       cv_broadcast(&dbuf_evict_cv);
+       CALLB_CPR_EXIT(&cpr);   /* drops dbuf_evict_lock */
+       thread_exit();
+}
+
+/*
+ * Wake up the dbuf eviction thread if the dbuf cache is at its max size.
+ * If the dbuf cache is at its high water mark, then evict a dbuf from the
+ * dbuf cache using the caller's context.
+ */
+static void
+dbuf_evict_notify(void)
+{
+       /*
+        * We use thread specific data to track when a thread has
+        * started processing evictions. This allows us to avoid deeply
+        * nested stacks that would have a call flow similar to this:
+        *
+        * dbuf_rele()-->dbuf_rele_and_unlock()-->dbuf_evict_notify()
+        *      ^                                               |
+        *      |                                               |
+        *      +-----dbuf_destroy()<--dbuf_evict_one()<--------+
+        *
+        * The dbuf_evict_thread will always have its tsd set until
+        * that thread exits. All other threads will only set their tsd
+        * if they are participating in the eviction process. This only
+        * happens if the eviction thread is unable to process evictions
+        * fast enough. To keep the dbuf cache size in check, other threads
+        * can evict from the dbuf cache directly. Those threads will set
+        * their tsd values so that we ensure that they only evict one dbuf
+        * from the dbuf cache.
+        */
+       if (tsd_get(zfs_dbuf_evict_key) != NULL)
+               return;
+
+       /*
+        * We check if we should evict without holding the dbuf_evict_lock,
+        * because it's OK to occasionally make the wrong decision here,
+        * and grabbing the lock results in massive lock contention.
+        */
+       if (refcount_count(&dbuf_cache_size) > dbuf_cache_max_bytes) {
+               if (dbuf_cache_above_hiwater())
+                       dbuf_evict_one();
+               cv_signal(&dbuf_evict_cv);
+       }
+}
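
The tsd guard above can be sketched outside the kernel with POSIX
thread-specific data; the pthread names below stand in for the illumos
tsd_get()/tsd_set() calls and zfs_dbuf_evict_key used in this diff, and the
rest is illustrative:

	#include <pthread.h>
	#include <stddef.h>

	/* pthread_key_create(&evict_key, NULL) is assumed to run at init. */
	static pthread_key_t evict_key;

	static void
	evict_notify_sketch(void (*evict_one_fn)(void))
	{
		if (pthread_getspecific(evict_key) != NULL)
			return;		/* nested call: already evicting */

		(void) pthread_setspecific(evict_key, (void *)1);
		evict_one_fn();		/* may re-enter via dbuf_rele() */
		(void) pthread_setspecific(evict_key, NULL);
	}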
+
 void
 dbuf_init(void)
 {
@@ -401,7 +644,7 @@ retry:
                goto retry;
        }
 
-       dbuf_cache = kmem_cache_create("dmu_buf_impl_t",
+       dbuf_kmem_cache = kmem_cache_create("dmu_buf_impl_t",
            sizeof (dmu_buf_impl_t),
            0, dbuf_cons, dbuf_dest, NULL, NULL, NULL, 0);
 
@@ -410,11 +653,30 @@ retry:
 
        dbuf_stats_init(h);
 
+       /*
+        * Setup the parameters for the dbuf cache. We cap the size of the
+        * dbuf cache to 1/32nd (default) of the size of the ARC.
+        */
+       dbuf_cache_max_bytes = MIN(dbuf_cache_max_bytes,
+           arc_max_bytes() >> dbuf_cache_max_shift);
+
        /*
         * All entries are queued via taskq_dispatch_ent(), so min/maxalloc
         * configuration is not required.
         */
        dbu_evict_taskq = taskq_create("dbu_evict", 1, defclsyspri, 0, 0, 0);
+
+       dbuf_cache = multilist_create(sizeof (dmu_buf_impl_t),
+           offsetof(dmu_buf_impl_t, db_cache_link),
+           dbuf_cache_multilist_index_func);
+       refcount_create(&dbuf_cache_size);
+
+       tsd_create(&zfs_dbuf_evict_key, NULL);
+       dbuf_evict_thread_exit = B_FALSE;
+       mutex_init(&dbuf_evict_lock, NULL, MUTEX_DEFAULT, NULL);
+       cv_init(&dbuf_evict_cv, NULL, CV_DEFAULT, NULL);
+       dbuf_cache_evict_thread = thread_create(NULL, 0, dbuf_evict_thread,
+           NULL, 0, &p0, TS_RUN, minclsyspri);
 }
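
To make the cap arithmetic above concrete (the ARC size is an assumed
example, not from this diff): with arc_max_bytes() returning 4 GiB and the
default dbuf_cache_max_shift of 5,

	uint64_t arc_max = 4ULL << 30;			/* assumed 4 GiB ARC */
	uint64_t cap = arc_max >> 5;			/* 128 MiB */
	uint64_t max = MIN(100ULL << 20, cap);		/* stays at 100 MiB */

so the 100 MiB default stands; a 1 GiB ARC would instead cap the dbuf cache
at 32 MiB.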
 
 void
@@ -436,8 +698,23 @@ dbuf_fini(void)
 #else
        kmem_free(h->hash_table, (h->hash_table_mask + 1) * sizeof (void *));
 #endif
-       kmem_cache_destroy(dbuf_cache);
+       kmem_cache_destroy(dbuf_kmem_cache);
        taskq_destroy(dbu_evict_taskq);
+
+       mutex_enter(&dbuf_evict_lock);
+       dbuf_evict_thread_exit = B_TRUE;
+       while (dbuf_evict_thread_exit) {
+               cv_signal(&dbuf_evict_cv);
+               cv_wait(&dbuf_evict_cv, &dbuf_evict_lock);
+       }
+       mutex_exit(&dbuf_evict_lock);
+       tsd_destroy(&zfs_dbuf_evict_key);
+
+       mutex_destroy(&dbuf_evict_lock);
+       cv_destroy(&dbuf_evict_cv);
+
+       refcount_destroy(&dbuf_cache_size);
+       multilist_destroy(dbuf_cache);
 }
 
 /*
@@ -476,7 +753,6 @@ dbuf_verify(dmu_buf_impl_t *db)
                ASSERT3U(db->db.db_offset, ==, DMU_BONUS_BLKID);
        } else if (db->db_blkid == DMU_SPILL_BLKID) {
                ASSERT(dn != NULL);
-               ASSERT3U(db->db.db_size, >=, dn->dn_bonuslen);
                ASSERT0(db->db.db_offset);
        } else {
                ASSERT3U(db->db.db_offset, ==, db->db_blkid * db->db.db_size);
@@ -517,7 +793,7 @@ dbuf_verify(dmu_buf_impl_t *db)
                } else {
                        /* db is pointed to by an indirect block */
                        ASSERTV(int epb = db->db_parent->db.db_size >>
-                               SPA_BLKPTRSHIFT);
+                           SPA_BLKPTRSHIFT);
                        ASSERT3U(db->db_parent->db_level, ==, db->db_level+1);
                        ASSERT3U(db->db_parent->db.db_object, ==,
                            db->db.db_object);
@@ -541,13 +817,50 @@ dbuf_verify(dmu_buf_impl_t *db)
                 * If the blkptr isn't set but they have nonzero data,
                 * it had better be dirty, otherwise we'll lose that
                 * data when we evict this buffer.
+                *
+                * There is an exception to this rule for indirect blocks; in
+                * this case, if the indirect block is a hole, we fill in a few
+                * fields on each of the child blocks (importantly, birth time)
+                * to prevent hole birth times from being lost when you
+                * partially fill in a hole.
                 */
                if (db->db_dirtycnt == 0) {
-                       ASSERTV(uint64_t *buf = db->db.db_data);
-                       int i;
+                       if (db->db_level == 0) {
+                               uint64_t *buf = db->db.db_data;
+                               int i;
 
-                       for (i = 0; i < db->db.db_size >> 3; i++) {
-                               ASSERT(buf[i] == 0);
+                               for (i = 0; i < db->db.db_size >> 3; i++) {
+                                       ASSERT(buf[i] == 0);
+                               }
+                       } else {
+                               int i;
+                               blkptr_t *bps = db->db.db_data;
+                               ASSERT3U(1 << DB_DNODE(db)->dn_indblkshift, ==,
+                                   db->db.db_size);
+                               /*
+                                * We want to verify that all the blkptrs in the
+                                * indirect block are holes, but we may have
+                                * automatically set up a few fields for them.
+                                * We iterate through each blkptr and verify
+                                * they only have those fields set.
+                                */
+                               for (i = 0;
+                                   i < db->db.db_size / sizeof (blkptr_t);
+                                   i++) {
+                                       blkptr_t *bp = &bps[i];
+                                       ASSERT(ZIO_CHECKSUM_IS_ZERO(
+                                           &bp->blk_cksum));
+                                       ASSERT(
+                                           DVA_IS_EMPTY(&bp->blk_dva[0]) &&
+                                           DVA_IS_EMPTY(&bp->blk_dva[1]) &&
+                                           DVA_IS_EMPTY(&bp->blk_dva[2]));
+                                       ASSERT0(bp->blk_fill);
+                                       ASSERT0(bp->blk_pad[0]);
+                                       ASSERT0(bp->blk_pad[1]);
+                                       ASSERT(!BP_IS_EMBEDDED(bp));
+                                       ASSERT(BP_IS_HOLE(bp));
+                                       ASSERT0(bp->blk_phys_birth);
+                               }
                        }
                }
        }
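
The "few fields" the exception above refers to are exactly the ones the read
path fills in later in this diff (see dbuf_read_impl()); a hedged sketch of
such a hole child, where child_lsize and parent_bp are hypothetical
stand-ins:

	blkptr_t bp;
	bzero(&bp, sizeof (bp));
	BP_SET_LSIZE(&bp, child_lsize);		/* dn_datablksz at level 1 */
	BP_SET_TYPE(&bp, BP_GET_TYPE(parent_bp));
	BP_SET_LEVEL(&bp, BP_GET_LEVEL(parent_bp) - 1);
	BP_SET_BIRTH(&bp, parent_bp->blk_birth, 0);	/* logical birth only */
	ASSERT(BP_IS_HOLE(&bp));		/* checksum, DVAs, fill: zero */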
@@ -560,7 +873,7 @@ dbuf_clear_data(dmu_buf_impl_t *db)
 {
        ASSERT(MUTEX_HELD(&db->db_mtx));
        dbuf_evict_user(db);
-       db->db_buf = NULL;
+       ASSERT3P(db->db_buf, ==, NULL);
        db->db.db_data = NULL;
        if (db->db_state != DB_NOFILL)
                db->db_state = DB_UNCACHED;
@@ -575,8 +888,6 @@ dbuf_set_data(dmu_buf_impl_t *db, arc_buf_t *buf)
        db->db_buf = buf;
        ASSERT(buf->b_data != NULL);
        db->db.db_data = buf->b_data;
-       if (!arc_released(buf))
-               arc_set_callback(buf, dbuf_do_evict, db);
 }
 
 /*
@@ -587,28 +898,65 @@ dbuf_loan_arcbuf(dmu_buf_impl_t *db)
 {
        arc_buf_t *abuf;
 
+       ASSERT(db->db_blkid != DMU_BONUS_BLKID);
        mutex_enter(&db->db_mtx);
        if (arc_released(db->db_buf) || refcount_count(&db->db_holds) > 1) {
                int blksz = db->db.db_size;
                spa_t *spa = db->db_objset->os_spa;
 
                mutex_exit(&db->db_mtx);
-               abuf = arc_loan_buf(spa, blksz);
+               abuf = arc_loan_buf(spa, B_FALSE, blksz);
                bcopy(db->db.db_data, abuf->b_data, blksz);
        } else {
                abuf = db->db_buf;
                arc_loan_inuse_buf(abuf, db);
+               db->db_buf = NULL;
                dbuf_clear_data(db);
                mutex_exit(&db->db_mtx);
        }
        return (abuf);
 }
 
+/*
+ * Calculate which level n block references the data at the level 0 offset
+ * provided.
+ */
 uint64_t
-dbuf_whichblock(dnode_t *dn, uint64_t offset)
+dbuf_whichblock(const dnode_t *dn, const int64_t level, const uint64_t offset)
 {
-       if (dn->dn_datablkshift) {
-               return (offset >> dn->dn_datablkshift);
+       if (dn->dn_datablkshift != 0 && dn->dn_indblkshift != 0) {
+               /*
+                * The level n blkid is equal to the level 0 blkid divided by
+                * the number of level 0s in a level n block.
+                *
+                * The level 0 blkid is offset >> datablkshift =
+                * offset / 2^datablkshift.
+                *
+                * The number of level 0s in a level n is the number of block
+                * pointers in an indirect block, raised to the power of level.
+                * This is 2^(indblkshift - SPA_BLKPTRSHIFT)^level =
+                * 2^(level*(indblkshift - SPA_BLKPTRSHIFT)).
+                *
+                * Thus, the level n blkid is: offset /
+                * ((2^datablkshift)*(2^(level*(indblkshift - SPA_BLKPTRSHIFT)))
+                * = offset / 2^(datablkshift + level *
+                *   (indblkshift - SPA_BLKPTRSHIFT))
+                * = offset >> (datablkshift + level *
+                *   (indblkshift - SPA_BLKPTRSHIFT))
+                */
+
+               const unsigned exp = dn->dn_datablkshift +
+                   level * (dn->dn_indblkshift - SPA_BLKPTRSHIFT);
+
+               if (exp >= 8 * sizeof (offset)) {
+                       /* This only happens on the highest indirection level */
+                       ASSERT3U(level, ==, dn->dn_nlevels - 1);
+                       return (0);
+               }
+
+               ASSERT3U(exp, <, 8 * sizeof (offset));
+
+               return (offset >> exp);
        } else {
                ASSERT3U(offset, <, dn->dn_datablksz);
                return (0);
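
A worked example of the shift derivation above, with an assumed geometry of
128 KiB data blocks (datablkshift = 17) and 128 KiB indirect blocks
(indblkshift = 17, i.e. 2^(17-7) = 1024 block pointers per indirect):

	const unsigned spa_blkptrshift = 7;	/* SPA_BLKPTRSHIFT */
	uint64_t offset = 1ULL << 30;		/* byte offset 1 GiB */
	unsigned exp = 17 + 1 * (17 - spa_blkptrshift);	/* level 1: 27 */
	uint64_t l0_blkid = offset >> 17;	/* 8192 */
	uint64_t l1_blkid = offset >> exp;	/* 8 */

This is consistent: 8192 level-0 blocks divided by 1024 pointers per
indirect block is 8.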
@@ -642,7 +990,7 @@ dbuf_read_done(zio_t *zio, arc_buf_t *buf, void *vdb)
        } else {
                ASSERT(db->db_blkid != DMU_BONUS_BLKID);
                ASSERT3P(db->db_buf, ==, NULL);
-               VERIFY(arc_buf_remove_ref(buf, db));
+               arc_buf_destroy(buf, db);
                db->db_state = DB_UNCACHED;
        }
        cv_broadcast(&db->db_changed);
@@ -650,7 +998,7 @@ dbuf_read_done(zio_t *zio, arc_buf_t *buf, void *vdb)
 }
 
 static int
-dbuf_read_impl(dmu_buf_impl_t *db, zio_t *zio, uint32_t *flags)
+dbuf_read_impl(dmu_buf_impl_t *db, zio_t *zio, uint32_t flags)
 {
        dnode_t *dn;
        zbookmark_phys_t zb;
@@ -667,13 +1015,18 @@ dbuf_read_impl(dmu_buf_impl_t *db, zio_t *zio, uint32_t *flags)
        ASSERT(db->db_buf == NULL);
 
        if (db->db_blkid == DMU_BONUS_BLKID) {
+               /*
+                * The bonus length stored in the dnode may be less than
+                * the maximum available space in the bonus buffer.
+                */
                int bonuslen = MIN(dn->dn_bonuslen, dn->dn_phys->dn_bonuslen);
+               int max_bonuslen = DN_SLOTS_TO_BONUSLEN(dn->dn_num_slots);
 
                ASSERT3U(bonuslen, <=, db->db.db_size);
-               db->db.db_data = zio_buf_alloc(DN_MAX_BONUSLEN);
-               arc_space_consume(DN_MAX_BONUSLEN, ARC_SPACE_OTHER);
-               if (bonuslen < DN_MAX_BONUSLEN)
-                       bzero(db->db.db_data, DN_MAX_BONUSLEN);
+               db->db.db_data = kmem_alloc(max_bonuslen, KM_SLEEP);
+               arc_space_consume(max_bonuslen, ARC_SPACE_BONUS);
+               if (bonuslen < max_bonuslen)
+                       bzero(db->db.db_data, max_bonuslen);
                if (bonuslen)
                        bcopy(DN_BONUS(dn->dn_phys), db->db.db_data, bonuslen);
                DB_DNODE_EXIT(db);
@@ -692,12 +1045,33 @@ dbuf_read_impl(dmu_buf_impl_t *db, zio_t *zio, uint32_t *flags)
            BP_IS_HOLE(db->db_blkptr)))) {
                arc_buf_contents_t type = DBUF_GET_BUFC_TYPE(db);
 
-               DB_DNODE_EXIT(db);
-               dbuf_set_data(db, arc_buf_alloc(db->db_objset->os_spa,
-                   db->db.db_size, db, type));
+               dbuf_set_data(db, arc_alloc_buf(db->db_objset->os_spa, db, type,
+                   db->db.db_size));
                bzero(db->db.db_data, db->db.db_size);
+
+               if (db->db_blkptr != NULL && db->db_level > 0 &&
+                   BP_IS_HOLE(db->db_blkptr) &&
+                   db->db_blkptr->blk_birth != 0) {
+                       blkptr_t *bps = db->db.db_data;
+                       int i;
+                       for (i = 0; i < ((1 <<
+                           DB_DNODE(db)->dn_indblkshift) / sizeof (blkptr_t));
+                           i++) {
+                               blkptr_t *bp = &bps[i];
+                               ASSERT3U(BP_GET_LSIZE(db->db_blkptr), ==,
+                                   1 << dn->dn_indblkshift);
+                               BP_SET_LSIZE(bp,
+                                   BP_GET_LEVEL(db->db_blkptr) == 1 ?
+                                   dn->dn_datablksz :
+                                   BP_GET_LSIZE(db->db_blkptr));
+                               BP_SET_TYPE(bp, BP_GET_TYPE(db->db_blkptr));
+                               BP_SET_LEVEL(bp,
+                                   BP_GET_LEVEL(db->db_blkptr) - 1);
+                               BP_SET_BIRTH(bp, db->db_blkptr->blk_birth, 0);
+                       }
+               }
+               DB_DNODE_EXIT(db);
                db->db_state = DB_CACHED;
-               *flags |= DB_RF_CACHED;
                mutex_exit(&db->db_mtx);
                return (0);
        }
@@ -709,8 +1083,6 @@ dbuf_read_impl(dmu_buf_impl_t *db, zio_t *zio, uint32_t *flags)
 
        if (DBUF_IS_L2CACHEABLE(db))
                aflags |= ARC_FLAG_L2CACHE;
-       if (DBUF_IS_L2COMPRESSIBLE(db))
-               aflags |= ARC_FLAG_L2COMPRESS;
 
        SET_BOOKMARK(&zb, db->db_objset->os_dsl_dataset ?
            db->db_objset->os_dsl_dataset->ds_object : DMU_META_OBJSET,
@@ -720,19 +1092,79 @@ dbuf_read_impl(dmu_buf_impl_t *db, zio_t *zio, uint32_t *flags)
 
        err = arc_read(zio, db->db_objset->os_spa, db->db_blkptr,
            dbuf_read_done, db, ZIO_PRIORITY_SYNC_READ,
-           (*flags & DB_RF_CANFAIL) ? ZIO_FLAG_CANFAIL : ZIO_FLAG_MUSTSUCCEED,
+           (flags & DB_RF_CANFAIL) ? ZIO_FLAG_CANFAIL : ZIO_FLAG_MUSTSUCCEED,
            &aflags, &zb);
-       if (aflags & ARC_FLAG_CACHED)
-               *flags |= DB_RF_CACHED;
 
-       return (SET_ERROR(err));
+       return (err);
+}
+
+/*
+ * This is our just-in-time copy function.  It makes a copy of buffers that
+ * have been modified in a previous transaction group before we access them in
+ * the current active group.
+ *
+ * This function is used in three places: when we are dirtying a buffer for the
+ * first time in a txg, when we are freeing a range in a dnode that includes
+ * this buffer, and when we are accessing a buffer which was received
+ * compressed and later referenced in a WRITE_BYREF record.
+ *
+ * Note that when we are called from dbuf_free_range() we do not put a hold on
+ * the buffer, we just traverse the active dbuf list for the dnode.
+ */
+static void
+dbuf_fix_old_data(dmu_buf_impl_t *db, uint64_t txg)
+{
+       dbuf_dirty_record_t *dr = db->db_last_dirty;
+
+       ASSERT(MUTEX_HELD(&db->db_mtx));
+       ASSERT(db->db.db_data != NULL);
+       ASSERT(db->db_level == 0);
+       ASSERT(db->db.db_object != DMU_META_DNODE_OBJECT);
+
+       if (dr == NULL ||
+           (dr->dt.dl.dr_data !=
+           ((db->db_blkid  == DMU_BONUS_BLKID) ? db->db.db_data : db->db_buf)))
+               return;
+
+       /*
+        * If the last dirty record for this dbuf has not yet synced
+        * and it's referencing the dbuf data, either:
+        *      reset the reference to point to a new copy,
+        * or (if there are no active holders)
+        *      just null out the current db_data pointer.
+        */
+       ASSERT(dr->dr_txg >= txg - 2);
+       if (db->db_blkid == DMU_BONUS_BLKID) {
+               dnode_t *dn = DB_DNODE(db);
+               int bonuslen = DN_SLOTS_TO_BONUSLEN(dn->dn_num_slots);
+               dr->dt.dl.dr_data = kmem_alloc(bonuslen, KM_SLEEP);
+               arc_space_consume(bonuslen, ARC_SPACE_BONUS);
+               bcopy(db->db.db_data, dr->dt.dl.dr_data, bonuslen);
+       } else if (refcount_count(&db->db_holds) > db->db_dirtycnt) {
+               int size = arc_buf_size(db->db_buf);
+               arc_buf_contents_t type = DBUF_GET_BUFC_TYPE(db);
+               spa_t *spa = db->db_objset->os_spa;
+               enum zio_compress compress_type =
+                   arc_get_compression(db->db_buf);
+
+               if (compress_type == ZIO_COMPRESS_OFF) {
+                       dr->dt.dl.dr_data = arc_alloc_buf(spa, db, type, size);
+               } else {
+                       ASSERT3U(type, ==, ARC_BUFC_DATA);
+                       dr->dt.dl.dr_data = arc_alloc_compressed_buf(spa, db,
+                           size, arc_buf_lsize(db->db_buf), compress_type);
+               }
+               bcopy(db->db.db_data, dr->dt.dl.dr_data->b_data, size);
+       } else {
+               db->db_buf = NULL;
+               dbuf_clear_data(db);
+       }
 }
 
 int
 dbuf_read(dmu_buf_impl_t *db, zio_t *zio, uint32_t flags)
 {
        int err = 0;
-       boolean_t havepzio = (zio != NULL);
        boolean_t prefetch;
        dnode_t *dn;
 
@@ -756,32 +1188,45 @@ dbuf_read(dmu_buf_impl_t *db, zio_t *zio, uint32_t flags)
 
        mutex_enter(&db->db_mtx);
        if (db->db_state == DB_CACHED) {
+               /*
+                * If the arc buf is compressed, we need to decompress it to
+                * read the data. This could happen during the "zfs receive" of
+                * a stream which is compressed and deduplicated.
+                */
+               if (db->db_buf != NULL &&
+                   arc_get_compression(db->db_buf) != ZIO_COMPRESS_OFF) {
+                       dbuf_fix_old_data(db,
+                           spa_syncing_txg(dmu_objset_spa(db->db_objset)));
+                       err = arc_decompress(db->db_buf);
+                       dbuf_set_data(db, db->db_buf);
+               }
                mutex_exit(&db->db_mtx);
                if (prefetch)
-                       dmu_zfetch(&dn->dn_zfetch, db->db.db_offset,
-                           db->db.db_size, TRUE);
+                       dmu_zfetch(&dn->dn_zfetch, db->db_blkid, 1, B_TRUE);
                if ((flags & DB_RF_HAVESTRUCT) == 0)
                        rw_exit(&dn->dn_struct_rwlock);
                DB_DNODE_EXIT(db);
        } else if (db->db_state == DB_UNCACHED) {
                spa_t *spa = dn->dn_objset->os_spa;
+               boolean_t need_wait = B_FALSE;
 
-               if (zio == NULL)
+               if (zio == NULL &&
+                   db->db_blkptr != NULL && !BP_IS_HOLE(db->db_blkptr)) {
                        zio = zio_root(spa, NULL, NULL, ZIO_FLAG_CANFAIL);
-
-               err = dbuf_read_impl(db, zio, &flags);
+                       need_wait = B_TRUE;
+               }
+               err = dbuf_read_impl(db, zio, flags);
 
                /* dbuf_read_impl has dropped db_mtx for us */
 
                if (!err && prefetch)
-                       dmu_zfetch(&dn->dn_zfetch, db->db.db_offset,
-                           db->db.db_size, flags & DB_RF_CACHED);
+                       dmu_zfetch(&dn->dn_zfetch, db->db_blkid, 1, B_TRUE);
 
                if ((flags & DB_RF_HAVESTRUCT) == 0)
                        rw_exit(&dn->dn_struct_rwlock);
                DB_DNODE_EXIT(db);
 
-               if (!err && !havepzio)
+               if (!err && need_wait)
                        err = zio_wait(zio);
        } else {
                /*
@@ -794,8 +1239,7 @@ dbuf_read(dmu_buf_impl_t *db, zio_t *zio, uint32_t flags)
                 */
                mutex_exit(&db->db_mtx);
                if (prefetch)
-                       dmu_zfetch(&dn->dn_zfetch, db->db.db_offset,
-                           db->db.db_size, TRUE);
+                       dmu_zfetch(&dn->dn_zfetch, db->db_blkid, 1, B_TRUE);
                if ((flags & DB_RF_HAVESTRUCT) == 0)
                        rw_exit(&dn->dn_struct_rwlock);
                DB_DNODE_EXIT(db);
@@ -817,7 +1261,6 @@ dbuf_read(dmu_buf_impl_t *db, zio_t *zio, uint32_t flags)
                mutex_exit(&db->db_mtx);
        }
 
-       ASSERT(err || havepzio || db->db_state == DB_CACHED);
        return (err);
 }
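
The need_wait logic above implies two calling conventions for dbuf_read();
a hedged sketch (error handling and surrounding context elided, not taken
from this diff):

	/* 1. No caller zio: when a physical read is needed, dbuf_read()
	 *    creates its own root zio and zio_wait()s before returning. */
	error = dbuf_read(db, NULL, DB_RF_CANFAIL);

	/* 2. Caller-supplied zio: several reads batched under one root
	 *    zio, with a single wait at the end. */
	zio_t *rio = zio_root(spa, NULL, NULL, ZIO_FLAG_CANFAIL);
	for (int i = 0; i < ndbufs; i++)
		error = dbuf_read(dbs[i], rio, DB_RF_CANFAIL);
	error = zio_wait(rio);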
 
@@ -835,7 +1278,7 @@ dbuf_noread(dmu_buf_impl_t *db)
 
                ASSERT(db->db_buf == NULL);
                ASSERT(db->db.db_data == NULL);
-               dbuf_set_data(db, arc_buf_alloc(spa, db->db.db_size, db, type));
+               dbuf_set_data(db, arc_alloc_buf(spa, db, type, db->db.db_size));
                db->db_state = DB_FILL;
        } else if (db->db_state == DB_NOFILL) {
                dbuf_clear_data(db);
@@ -845,59 +1288,6 @@ dbuf_noread(dmu_buf_impl_t *db)
        mutex_exit(&db->db_mtx);
 }
 
-/*
- * This is our just-in-time copy function.  It makes a copy of
- * buffers, that have been modified in a previous transaction
- * group, before we modify them in the current active group.
- *
- * This function is used in two places: when we are dirtying a
- * buffer for the first time in a txg, and when we are freeing
- * a range in a dnode that includes this buffer.
- *
- * Note that when we are called from dbuf_free_range() we do
- * not put a hold on the buffer, we just traverse the active
- * dbuf list for the dnode.
- */
-static void
-dbuf_fix_old_data(dmu_buf_impl_t *db, uint64_t txg)
-{
-       dbuf_dirty_record_t *dr = db->db_last_dirty;
-
-       ASSERT(MUTEX_HELD(&db->db_mtx));
-       ASSERT(db->db.db_data != NULL);
-       ASSERT(db->db_level == 0);
-       ASSERT(db->db.db_object != DMU_META_DNODE_OBJECT);
-
-       if (dr == NULL ||
-           (dr->dt.dl.dr_data !=
-           ((db->db_blkid  == DMU_BONUS_BLKID) ? db->db.db_data : db->db_buf)))
-               return;
-
-       /*
-        * If the last dirty record for this dbuf has not yet synced
-        * and its referencing the dbuf data, either:
-        *      reset the reference to point to a new copy,
-        * or (if there a no active holders)
-        *      just null out the current db_data pointer.
-        */
-       ASSERT(dr->dr_txg >= txg - 2);
-       if (db->db_blkid == DMU_BONUS_BLKID) {
-               /* Note that the data bufs here are zio_bufs */
-               dr->dt.dl.dr_data = zio_buf_alloc(DN_MAX_BONUSLEN);
-               arc_space_consume(DN_MAX_BONUSLEN, ARC_SPACE_OTHER);
-               bcopy(db->db.db_data, dr->dt.dl.dr_data, DN_MAX_BONUSLEN);
-       } else if (refcount_count(&db->db_holds) > db->db_dirtycnt) {
-               int size = db->db.db_size;
-               arc_buf_contents_t type = DBUF_GET_BUFC_TYPE(db);
-               spa_t *spa = db->db_objset->os_spa;
-
-               dr->dt.dl.dr_data = arc_buf_alloc(spa, size, db, type);
-               bcopy(db->db.db_data, dr->dt.dl.dr_data->b_data, size);
-       } else {
-               dbuf_clear_data(db);
-       }
-}
-
 void
 dbuf_unoverride(dbuf_dirty_record_t *dr)
 {
@@ -906,6 +1296,11 @@ dbuf_unoverride(dbuf_dirty_record_t *dr)
        uint64_t txg = dr->dr_txg;
 
        ASSERT(MUTEX_HELD(&db->db_mtx));
+       /*
+        * This assert is valid because dmu_sync() expects to be called by
+        * a zilog's get_data while holding a range lock.  This call only
+        * comes from dbuf_dirty() callers who must also hold a range lock.
+        */
        ASSERT(dr->dt.dl.dr_override_state != DR_IN_DMU_SYNC);
        ASSERT(db->db_level == 0);
 
@@ -937,9 +1332,6 @@ dbuf_unoverride(dbuf_dirty_record_t *dr)
  * Evict (if it's unreferenced) or clear (if it's referenced) any level-0
  * data blocks in the free range, so that any future readers will find
  * empty blocks.
- *
- * This is a no-op if the dataset is in the middle of an incremental
- * receive; see comment below for details.
  */
 void
 dbuf_free_range(dnode_t *dn, uint64_t start_blkid, uint64_t end_blkid,
@@ -949,10 +1341,9 @@ dbuf_free_range(dnode_t *dn, uint64_t start_blkid, uint64_t end_blkid,
        dmu_buf_impl_t *db, *db_next;
        uint64_t txg = tx->tx_txg;
        avl_index_t where;
-       boolean_t freespill =
-           (start_blkid == DMU_SPILL_BLKID || end_blkid == DMU_SPILL_BLKID);
 
-       if (end_blkid > dn->dn_maxblkid && !freespill)
+       if (end_blkid > dn->dn_maxblkid &&
+           !(start_blkid == DMU_SPILL_BLKID || end_blkid == DMU_SPILL_BLKID))
                end_blkid = dn->dn_maxblkid;
        dprintf_dnode(dn, "start=%llu end=%llu\n", start_blkid, end_blkid);
 
@@ -962,28 +1353,9 @@ dbuf_free_range(dnode_t *dn, uint64_t start_blkid, uint64_t end_blkid,
        db_search->db_state = DB_SEARCH;
 
        mutex_enter(&dn->dn_dbufs_mtx);
-       if (start_blkid >= dn->dn_unlisted_l0_blkid && !freespill) {
-               /* There can't be any dbufs in this range; no need to search. */
-#ifdef DEBUG
-               db = avl_find(&dn->dn_dbufs, db_search, &where);
-               ASSERT3P(db, ==, NULL);
-               db = avl_nearest(&dn->dn_dbufs, where, AVL_AFTER);
-               ASSERT(db == NULL || db->db_level > 0);
-#endif
-               goto out;
-       } else if (dmu_objset_is_receiving(dn->dn_objset)) {
-               /*
-                * If we are receiving, we expect there to be no dbufs in
-                * the range to be freed, because receive modifies each
-                * block at most once, and in offset order.  If this is
-                * not the case, it can lead to performance problems,
-                * so note that we unexpectedly took the slow path.
-                */
-               atomic_inc_64(&zfs_free_range_recv_miss);
-       }
-
        db = avl_find(&dn->dn_dbufs, db_search, &where);
        ASSERT3P(db, ==, NULL);
+
        db = avl_nearest(&dn->dn_dbufs, where, AVL_AFTER);
 
        for (; db != NULL; db = db_next) {
@@ -1017,7 +1389,7 @@ dbuf_free_range(dnode_t *dn, uint64_t start_blkid, uint64_t end_blkid,
                }
                if (refcount_count(&db->db_holds) == 0) {
                        ASSERT(db->db_buf);
-                       dbuf_clear(db);
+                       dbuf_destroy(db);
                        continue;
                }
                /* The dbuf is referenced */
@@ -1056,46 +1428,10 @@ dbuf_free_range(dnode_t *dn, uint64_t start_blkid, uint64_t end_blkid,
                mutex_exit(&db->db_mtx);
        }
 
-out:
        kmem_free(db_search, sizeof (dmu_buf_impl_t));
        mutex_exit(&dn->dn_dbufs_mtx);
 }
 
-static int
-dbuf_block_freeable(dmu_buf_impl_t *db)
-{
-       dsl_dataset_t *ds = db->db_objset->os_dsl_dataset;
-       uint64_t birth_txg = 0;
-
-       /*
-        * We don't need any locking to protect db_blkptr:
-        * If it's syncing, then db_last_dirty will be set
-        * so we'll ignore db_blkptr.
-        *
-        * This logic ensures that only block births for
-        * filled blocks are considered.
-        */
-       ASSERT(MUTEX_HELD(&db->db_mtx));
-       if (db->db_last_dirty && (db->db_blkptr == NULL ||
-           !BP_IS_HOLE(db->db_blkptr))) {
-               birth_txg = db->db_last_dirty->dr_txg;
-       } else if (db->db_blkptr != NULL && !BP_IS_HOLE(db->db_blkptr)) {
-               birth_txg = db->db_blkptr->blk_birth;
-       }
-
-       /*
-        * If this block don't exist or is in a snapshot, it can't be freed.
-        * Don't pass the bp to dsl_dataset_block_freeable() since we
-        * are holding the db_mtx lock and might deadlock if we are
-        * prefetching a dedup-ed block.
-        */
-       if (birth_txg != 0)
-               return (ds == NULL ||
-                   dsl_dataset_block_freeable(ds, NULL, birth_txg));
-       else
-               return (B_FALSE);
-}
-
 void
 dbuf_new_size(dmu_buf_impl_t *db, int size, dmu_tx_t *tx)
 {
@@ -1125,7 +1461,7 @@ dbuf_new_size(dmu_buf_impl_t *db, int size, dmu_tx_t *tx)
        dmu_buf_will_dirty(&db->db, tx);
 
        /* create the data buffer for the new block */
-       buf = arc_buf_alloc(dn->dn_objset->os_spa, size, db, type);
+       buf = arc_alloc_buf(dn->dn_objset->os_spa, db, type, size);
 
        /* copy old block data to the new block */
        obuf = db->db_buf;
@@ -1136,7 +1472,7 @@ dbuf_new_size(dmu_buf_impl_t *db, int size, dmu_tx_t *tx)
 
        mutex_enter(&db->db_mtx);
        dbuf_set_data(db, buf);
-       VERIFY(arc_buf_remove_ref(obuf, db));
+       arc_buf_destroy(obuf, db);
        db->db.db_size = size;
 
        if (db->db_level == 0) {
@@ -1145,7 +1481,7 @@ dbuf_new_size(dmu_buf_impl_t *db, int size, dmu_tx_t *tx)
        }
        mutex_exit(&db->db_mtx);
 
-       dnode_willuse_space(dn, size-osize, tx);
+       dmu_objset_willuse_space(dn->dn_objset, size - osize, tx);
        DB_DNODE_EXIT(db);
 }
 
@@ -1162,6 +1498,32 @@ dbuf_release_bp(dmu_buf_impl_t *db)
        (void) arc_release(db->db_buf, db);
 }
 
+/*
+ * We already have a dirty record for this TXG, and we are being
+ * dirtied again.
+ */
+static void
+dbuf_redirty(dbuf_dirty_record_t *dr)
+{
+       dmu_buf_impl_t *db = dr->dr_dbuf;
+
+       ASSERT(MUTEX_HELD(&db->db_mtx));
+
+       if (db->db_level == 0 && db->db_blkid != DMU_BONUS_BLKID) {
+               /*
+                * If this buffer has already been written out,
+                * we now need to reset its state.
+                */
+               dbuf_unoverride(dr);
+               if (db->db.db_object != DMU_META_DNODE_OBJECT &&
+                   db->db_state != DB_NOFILL) {
+                       /* Already released on initial dirty, so just thaw. */
+                       ASSERT(arc_released(db->db_buf));
+                       arc_buf_thaw(db->db_buf);
+               }
+       }
+}
+
 dbuf_dirty_record_t *
 dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
 {
@@ -1169,7 +1531,6 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
        objset_t *os;
        dbuf_dirty_record_t **drp, *dr;
        int drop_struct_lock = FALSE;
-       boolean_t do_free_accounting = B_FALSE;
        int txgoff = tx->tx_txg & TXG_MASK;
 
        ASSERT(tx->tx_txg != 0);
@@ -1183,10 +1544,18 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
         * objects may be dirtied in syncing context, but only if they
         * were already pre-dirtied in open context.
         */
+#ifdef DEBUG
+       if (dn->dn_objset->os_dsl_dataset != NULL) {
+               rrw_enter(&dn->dn_objset->os_dsl_dataset->ds_bp_rwlock,
+                   RW_READER, FTAG);
+       }
        ASSERT(!dmu_tx_is_syncing(tx) ||
            BP_IS_HOLE(dn->dn_objset->os_rootbp) ||
            DMU_OBJECT_IS_SPECIAL(dn->dn_object) ||
            dn->dn_objset->os_dsl_dataset == NULL);
+       if (dn->dn_objset->os_dsl_dataset != NULL)
+               rrw_exit(&dn->dn_objset->os_dsl_dataset->ds_bp_rwlock, FTAG);
+#endif
        /*
         * We make this assert for private objects as well, but after we
         * check if we're already dirty.  They are allowed to re-dirty
@@ -1211,12 +1580,21 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
         * Don't set dirtyctx to SYNC if we're just modifying this as we
         * initialize the objset.
         */
-       if (dn->dn_dirtyctx == DN_UNDIRTIED &&
-           !BP_IS_HOLE(dn->dn_objset->os_rootbp)) {
-               dn->dn_dirtyctx =
-                   (dmu_tx_is_syncing(tx) ? DN_DIRTY_SYNC : DN_DIRTY_OPEN);
-               ASSERT(dn->dn_dirtyctx_firstset == NULL);
-               dn->dn_dirtyctx_firstset = kmem_alloc(1, KM_SLEEP);
+       if (dn->dn_dirtyctx == DN_UNDIRTIED) {
+               if (dn->dn_objset->os_dsl_dataset != NULL) {
+                       rrw_enter(&dn->dn_objset->os_dsl_dataset->ds_bp_rwlock,
+                           RW_READER, FTAG);
+               }
+               if (!BP_IS_HOLE(dn->dn_objset->os_rootbp)) {
+                       dn->dn_dirtyctx = (dmu_tx_is_syncing(tx) ?
+                           DN_DIRTY_SYNC : DN_DIRTY_OPEN);
+                       ASSERT(dn->dn_dirtyctx_firstset == NULL);
+                       dn->dn_dirtyctx_firstset = kmem_alloc(1, KM_SLEEP);
+               }
+               if (dn->dn_objset->os_dsl_dataset != NULL) {
+                       rrw_exit(&dn->dn_objset->os_dsl_dataset->ds_bp_rwlock,
+                           FTAG);
+               }
        }
        mutex_exit(&dn->dn_mtx);
 
@@ -1234,16 +1612,7 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
        if (dr && dr->dr_txg == tx->tx_txg) {
                DB_DNODE_EXIT(db);
 
-               if (db->db_level == 0 && db->db_blkid != DMU_BONUS_BLKID) {
-                       /*
-                        * If this buffer has already been written out,
-                        * we now need to reset its state.
-                        */
-                       dbuf_unoverride(dr);
-                       if (db->db.db_object != DMU_META_DNODE_OBJECT &&
-                           db->db_state != DB_NOFILL)
-                               arc_buf_thaw(db->db_buf);
-               }
+               dbuf_redirty(dr);
                mutex_exit(&db->db_mtx);
                return (dr);
        }
@@ -1256,11 +1625,6 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
            (dmu_tx_is_syncing(tx) ? DN_DIRTY_SYNC : DN_DIRTY_OPEN));
 
        ASSERT3U(dn->dn_nlevels, >, db->db_level);
-       ASSERT((dn->dn_phys->dn_nlevels == 0 && db->db_level == 0) ||
-           dn->dn_phys->dn_nlevels > db->db_level ||
-           dn->dn_next_nlevels[txgoff] > db->db_level ||
-           dn->dn_next_nlevels[(tx->tx_txg-1) & TXG_MASK] > db->db_level ||
-           dn->dn_next_nlevels[(tx->tx_txg-2) & TXG_MASK] > db->db_level);
 
        /*
         * We should only be dirtying in syncing context if it's the
@@ -1270,22 +1634,21 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
         * this assertion only if we're not already dirty.
         */
        os = dn->dn_objset;
+       VERIFY3U(tx->tx_txg, <=, spa_final_dirty_txg(os->os_spa));
+#ifdef DEBUG
+       if (dn->dn_objset->os_dsl_dataset != NULL)
+               rrw_enter(&os->os_dsl_dataset->ds_bp_rwlock, RW_READER, FTAG);
        ASSERT(!dmu_tx_is_syncing(tx) || DMU_OBJECT_IS_SPECIAL(dn->dn_object) ||
            os->os_dsl_dataset == NULL || BP_IS_HOLE(os->os_rootbp));
+       if (dn->dn_objset->os_dsl_dataset != NULL)
+               rrw_exit(&os->os_dsl_dataset->ds_bp_rwlock, FTAG);
+#endif
        ASSERT(db->db.db_size != 0);
 
        dprintf_dbuf(db, "size=%llx\n", (u_longlong_t)db->db.db_size);
 
        if (db->db_blkid != DMU_BONUS_BLKID) {
-               /*
-                * Update the accounting.
-                * Note: we delay "free accounting" until after we drop
-                * the db_mtx.  This keeps us from grabbing other locks
-                * (and possibly deadlocking) in bp_get_dsize() while
-                * also holding the db_mtx.
-                */
-               dnode_willuse_space(dn, db->db.db_size, tx);
-               do_free_accounting = dbuf_block_freeable(db);
+               dmu_objset_willuse_space(os, db->db.db_size, tx);
        }
 
        /*
@@ -1320,7 +1683,7 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
                }
                dr->dt.dl.dr_data = data_old;
        } else {
-               mutex_init(&dr->dt.di.dr_mtx, NULL, MUTEX_DEFAULT, NULL);
+               mutex_init(&dr->dt.di.dr_mtx, NULL, MUTEX_NOLOCKDEP, NULL);
                list_create(&dr->dt.di.dr_children,
                    sizeof (dbuf_dirty_record_t),
                    offsetof(dbuf_dirty_record_t, dr_dirty_node));
@@ -1366,27 +1729,37 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
                dnode_setdirty(dn, tx);
                DB_DNODE_EXIT(db);
                return (dr);
-       } else if (do_free_accounting) {
-               blkptr_t *bp = db->db_blkptr;
-               int64_t willfree = (bp && !BP_IS_HOLE(bp)) ?
-                   bp_get_dsize(os->os_spa, bp) : db->db.db_size;
-               /*
-                * This is only a guess -- if the dbuf is dirty
-                * in a previous txg, we don't know how much
-                * space it will use on disk yet.  We should
-                * really have the struct_rwlock to access
-                * db_blkptr, but since this is just a guess,
-                * it's OK if we get an odd answer.
-                */
-               ddt_prefetch(os->os_spa, bp);
-               dnode_willuse_space(dn, -willfree, tx);
        }
 
+       /*
+        * The dn_struct_rwlock prevents db_blkptr from changing
+        * due to a write from syncing context completing
+        * while we are running, so we want to acquire it before
+        * looking at db_blkptr.
+        */
        if (!RW_WRITE_HELD(&dn->dn_struct_rwlock)) {
                rw_enter(&dn->dn_struct_rwlock, RW_READER);
                drop_struct_lock = TRUE;
        }
 
+       /*
+        * We need to hold the dn_struct_rwlock to make this assertion,
+        * because it protects dn_phys / dn_next_nlevels from changing.
+        */
+       ASSERT((dn->dn_phys->dn_nlevels == 0 && db->db_level == 0) ||
+           dn->dn_phys->dn_nlevels > db->db_level ||
+           dn->dn_next_nlevels[txgoff] > db->db_level ||
+           dn->dn_next_nlevels[(tx->tx_txg-1) & TXG_MASK] > db->db_level ||
+           dn->dn_next_nlevels[(tx->tx_txg-2) & TXG_MASK] > db->db_level);
+
+       /*
+        * If we are overwriting a dedup BP, then unless it is snapshotted,
+        * when we get to syncing context we will need to decrement its
+        * refcount in the DDT.  Prefetch the relevant DDT block so that
+        * syncing context won't have to wait for the i/o.
+        */
+       ddt_prefetch(os->os_spa, db->db_blkptr);
+
        if (db->db_level == 0) {
                dnode_new_blkid(dn, db->db_blkid, tx, drop_struct_lock);
                ASSERT(dn->dn_maxblkid >= db->db_blkid);
@@ -1518,7 +1891,7 @@ dbuf_undirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
                ASSERT(db->db_buf != NULL);
                ASSERT(dr->dt.dl.dr_data != NULL);
                if (dr->dt.dl.dr_data != db->db_buf)
-                       VERIFY(arc_buf_remove_ref(dr->dt.dl.dr_data, db));
+                       arc_buf_destroy(dr->dt.dl.dr_data, db);
        }
 
        kmem_free(dr, sizeof (dbuf_dirty_record_t));
@@ -1527,12 +1900,8 @@ dbuf_undirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
        db->db_dirtycnt -= 1;
 
        if (refcount_remove(&db->db_holds, (void *)(uintptr_t)txg) == 0) {
-               arc_buf_t *buf = db->db_buf;
-
-               ASSERT(db->db_state == DB_NOFILL || arc_released(buf));
-               dbuf_clear_data(db);
-               VERIFY(arc_buf_remove_ref(buf, db));
-               dbuf_evict(db);
+               ASSERT(db->db_state == DB_NOFILL || arc_released(db->db_buf));
+               dbuf_destroy(db);
                return (B_TRUE);
        }
 
@@ -1544,10 +1913,35 @@ dmu_buf_will_dirty(dmu_buf_t *db_fake, dmu_tx_t *tx)
 {
        dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
        int rf = DB_RF_MUST_SUCCEED | DB_RF_NOPREFETCH;
+       dbuf_dirty_record_t *dr;
 
        ASSERT(tx->tx_txg != 0);
        ASSERT(!refcount_is_zero(&db->db_holds));
 
+       /*
+        * Quick check for dirtiness.  For already dirty blocks, this
+        * reduces runtime of this function by >90%, and overall performance
+        * by 50% for some workloads (e.g. file deletion with indirect blocks
+        * cached).
+        */
+       mutex_enter(&db->db_mtx);
+
+       for (dr = db->db_last_dirty;
+           dr != NULL && dr->dr_txg >= tx->tx_txg; dr = dr->dr_next) {
+               /*
+                * It's possible that it is already dirty but not cached,
+                * because there are some calls to dbuf_dirty() that don't
+                * go through dmu_buf_will_dirty().
+                */
+               if (dr->dr_txg == tx->tx_txg && db->db_state == DB_CACHED) {
+                       /* This dbuf is already dirty and cached. */
+                       dbuf_redirty(dr);
+                       mutex_exit(&db->db_mtx);
+                       return;
+               }
+       }
+       mutex_exit(&db->db_mtx);
+
        DB_DNODE_ENTER(db);
        if (RW_WRITE_HELD(&DB_DNODE(db)->dn_struct_rwlock))
                rf |= DB_RF_HAVESTRUCT;
@@ -1615,6 +2009,11 @@ dmu_buf_write_embedded(dmu_buf_t *dbuf, void *data,
        struct dirty_leaf *dl;
        dmu_object_type_t type;
 
+       if (etype == BP_EMBEDDED_TYPE_DATA) {
+               ASSERT(spa_feature_is_active(dmu_objset_spa(db->db_objset),
+                   SPA_FEATURE_EMBEDDED_DATA));
+       }
+
        DB_DNODE_ENTER(db);
        type = DB_DNODE(db)->dn_type;
        DB_DNODE_EXIT(db);
@@ -1647,9 +2046,9 @@ dbuf_assign_arcbuf(dmu_buf_impl_t *db, arc_buf_t *buf, dmu_tx_t *tx)
        ASSERT(!refcount_is_zero(&db->db_holds));
        ASSERT(db->db_blkid != DMU_BONUS_BLKID);
        ASSERT(db->db_level == 0);
-       ASSERT(DBUF_GET_BUFC_TYPE(db) == ARC_BUFC_DATA);
+       ASSERT3U(dbuf_is_metadata(db), ==, arc_is_metadata(buf));
        ASSERT(buf != NULL);
-       ASSERT(arc_buf_size(buf) == db->db.db_size);
+       ASSERT(arc_buf_lsize(buf) == db->db.db_size);
        ASSERT(tx->tx_txg != 0);
 
        arc_return_buf(buf, db);
@@ -1667,7 +2066,7 @@ dbuf_assign_arcbuf(dmu_buf_impl_t *db, arc_buf_t *buf, dmu_tx_t *tx)
                mutex_exit(&db->db_mtx);
                (void) dbuf_dirty(db, tx);
                bcopy(buf->b_data, db->db.db_data, db->db.db_size);
-               VERIFY(arc_buf_remove_ref(buf, db));
+               arc_buf_destroy(buf, db);
                xuio_stat_wbuf_copied();
                return;
        }
@@ -1685,10 +2084,10 @@ dbuf_assign_arcbuf(dmu_buf_impl_t *db, arc_buf_t *buf, dmu_tx_t *tx)
                                arc_release(db->db_buf, db);
                        }
                        dr->dt.dl.dr_data = buf;
-                       VERIFY(arc_buf_remove_ref(db->db_buf, db));
+                       arc_buf_destroy(db->db_buf, db);
                } else if (dr == NULL || dr->dt.dl.dr_data != db->db_buf) {
                        arc_release(db->db_buf, db);
-                       VERIFY(arc_buf_remove_ref(db->db_buf, db));
+                       arc_buf_destroy(db->db_buf, db);
                }
                db->db_buf = NULL;
        }
@@ -1700,59 +2099,64 @@ dbuf_assign_arcbuf(dmu_buf_impl_t *db, arc_buf_t *buf, dmu_tx_t *tx)
        dmu_buf_fill_done(&db->db, tx);
 }
 
-/*
- * "Clear" the contents of this dbuf.  This will mark the dbuf
- * EVICTING and clear *most* of its references.  Unfortunately,
- * when we are not holding the dn_dbufs_mtx, we can't clear the
- * entry in the dn_dbufs list.  We have to wait until dbuf_destroy()
- * in this case.  For callers from the DMU we will usually see:
- *     dbuf_clear()->arc_clear_callback()->dbuf_do_evict()->dbuf_destroy()
- * For the arc callback, we will usually see:
- *     dbuf_do_evict()->dbuf_clear();dbuf_destroy()
- * Sometimes, though, we will get a mix of these two:
- *     DMU: dbuf_clear()->arc_clear_callback()
- *     ARC: dbuf_do_evict()->dbuf_destroy()
- *
- * This routine will dissociate the dbuf from the arc, by calling
- * arc_clear_callback(), but will not evict the data from the ARC.
- */
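+/*
+ * Destroy this dbuf: detach its ARC buffer, unlink it from its dnode and
+ * from the dbuf hash table, pull it off the LRU dbuf cache if present,
+ * and free it.  Called with db_mtx held and no remaining holds; this
+ * subsumes the old dbuf_clear()/dbuf_destroy() pair.
+ */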
 void
-dbuf_clear(dmu_buf_impl_t *db)
+dbuf_destroy(dmu_buf_impl_t *db)
 {
        dnode_t *dn;
        dmu_buf_impl_t *parent = db->db_parent;
        dmu_buf_impl_t *dndb;
-       boolean_t dbuf_gone = B_FALSE;
 
        ASSERT(MUTEX_HELD(&db->db_mtx));
        ASSERT(refcount_is_zero(&db->db_holds));
 
-       dbuf_evict_user(db);
+       if (db->db_buf != NULL) {
+               arc_buf_destroy(db->db_buf, db);
+               db->db_buf = NULL;
+       }
 
-       if (db->db_state == DB_CACHED) {
+       if (db->db_blkid == DMU_BONUS_BLKID) {
+               int slots = DB_DNODE(db)->dn_num_slots;
+               int bonuslen = DN_SLOTS_TO_BONUSLEN(slots);
                ASSERT(db->db.db_data != NULL);
-               if (db->db_blkid == DMU_BONUS_BLKID) {
-                       zio_buf_free(db->db.db_data, DN_MAX_BONUSLEN);
-                       arc_space_return(DN_MAX_BONUSLEN, ARC_SPACE_OTHER);
-               }
-               db->db.db_data = NULL;
+               kmem_free(db->db.db_data, bonuslen);
+               arc_space_return(bonuslen, ARC_SPACE_BONUS);
                db->db_state = DB_UNCACHED;
        }
 
+       dbuf_clear_data(db);
+
+       if (multilist_link_active(&db->db_cache_link)) {
+               multilist_remove(dbuf_cache, db);
+               (void) refcount_remove_many(&dbuf_cache_size,
+                   db->db.db_size, db);
+       }
+
        ASSERT(db->db_state == DB_UNCACHED || db->db_state == DB_NOFILL);
        ASSERT(db->db_data_pending == NULL);
 
        db->db_state = DB_EVICTING;
        db->db_blkptr = NULL;
 
+       /*
+        * Now that db_state is DB_EVICTING, nobody else can find this via
+        * the hash table.  We can now drop db_mtx, which allows us to
+        * acquire the dn_dbufs_mtx.
+        */
+       mutex_exit(&db->db_mtx);
+
        DB_DNODE_ENTER(db);
        dn = DB_DNODE(db);
        dndb = dn->dn_dbuf;
-       if (db->db_blkid != DMU_BONUS_BLKID && MUTEX_HELD(&dn->dn_dbufs_mtx)) {
+       if (db->db_blkid != DMU_BONUS_BLKID) {
+               boolean_t needlock = !MUTEX_HELD(&dn->dn_dbufs_mtx);
+               if (needlock)
+                       mutex_enter(&dn->dn_dbufs_mtx);
                avl_remove(&dn->dn_dbufs, db);
                atomic_dec_32(&dn->dn_dbufs_count);
                membar_producer();
                DB_DNODE_EXIT(db);
+               if (needlock)
+                       mutex_exit(&dn->dn_dbufs_mtx);
                /*
                 * Decrementing the dbuf count means that the hold corresponding
                 * to the removed dbuf is no longer discounted in dnode_move(),
@@ -1763,15 +2167,25 @@ dbuf_clear(dmu_buf_impl_t *db)
                 */
                dnode_rele(dn, db);
                db->db_dnode_handle = NULL;
+
+               dbuf_hash_remove(db);
        } else {
                DB_DNODE_EXIT(db);
        }
 
-       if (db->db_buf)
-               dbuf_gone = arc_clear_callback(db->db_buf);
+       ASSERT(refcount_is_zero(&db->db_holds));
+
+       db->db_parent = NULL;
 
-       if (!dbuf_gone)
-               mutex_exit(&db->db_mtx);
+       ASSERT(db->db_buf == NULL);
+       ASSERT(db->db.db_data == NULL);
+       ASSERT(db->db_hash_next == NULL);
+       ASSERT(db->db_blkptr == NULL);
+       ASSERT(db->db_data_pending == NULL);
+       ASSERT(!multilist_link_active(&db->db_cache_link));
+
+       kmem_cache_free(dbuf_kmem_cache, db);
+       arc_space_return(sizeof (dmu_buf_impl_t), ARC_SPACE_DBUF);
 
        /*
         * If this dbuf is referenced from an indirect dbuf,
@@ -1781,6 +2195,12 @@ dbuf_clear(dmu_buf_impl_t *db)
                dbuf_rele(parent, db);
 }
 
+/*
+ * Note: While bpp will always be updated if the function returns success,
+ * parentp will not be updated if the dnode does not have dn_dbuf filled in;
+ * this happens when the dnode is the meta-dnode, or a userused or groupused
+ * object.
+ */
 __attribute__((always_inline))
 static inline int
 dbuf_findbp(dnode_t *dn, int level, uint64_t blkid, int fail_sparse,
@@ -1797,7 +2217,7 @@ dbuf_findbp(dnode_t *dn, int level, uint64_t blkid, int fail_sparse,
                mutex_enter(&dn->dn_mtx);
                if (dn->dn_have_spill &&
                    (dn->dn_phys->dn_flags & DNODE_FLAG_SPILL_BLKPTR))
-                       *bpp = &dn->dn_phys->dn_spill;
+                       *bpp = DN_SPILL_BLKPTR(dn->dn_phys);
                else
                        *bpp = NULL;
                dbuf_add_ref(dn->dn_dbuf, NULL);
@@ -1806,29 +2226,47 @@ dbuf_findbp(dnode_t *dn, int level, uint64_t blkid, int fail_sparse,
                return (0);
        }
 
-       if (dn->dn_phys->dn_nlevels == 0)
-               nlevels = 1;
-       else
-               nlevels = dn->dn_phys->dn_nlevels;
-
+       nlevels =
+           (dn->dn_phys->dn_nlevels == 0) ? 1 : dn->dn_phys->dn_nlevels;
        epbs = dn->dn_indblkshift - SPA_BLKPTRSHIFT;
 
        ASSERT3U(level * epbs, <, 64);
        ASSERT(RW_LOCK_HELD(&dn->dn_struct_rwlock));
+       /*
+        * This assertion shouldn't trip as long as the max indirect block size
+        * is less than 1M.  The reason for this is that up to that point,
+        * the number of levels required to address an entire object with blocks
+        * of size SPA_MINBLOCKSIZE satisfies nlevels * epbs + 1 <= 64.  In
+        * other words, if N * epbs + 1 > 64, then if (N-1) * epbs + 1 > 55
+        * (i.e. we can address the entire object), objects will all use at most
+        * N-1 levels and the assertion won't overflow.  However, once epbs is
+        * 13, 4 * 13 + 1 = 53, but 5 * 13 + 1 = 66.  Then, 4 levels will not be
+        * enough to address an entire object, so objects will have 5 levels,
+        * but then this assertion will overflow.
+        *
+        * All this is to say that if we ever increase DN_MAX_INDBLKSHIFT, we
+        * need to redo this logic to handle overflows.
+        */
+       ASSERT(level >= nlevels ||
+           ((nlevels - level - 1) * epbs) +
+           highbit64(dn->dn_phys->dn_nblkptr) <= 64);
        if (level >= nlevels ||
-           (blkid > (dn->dn_phys->dn_maxblkid >> (level * epbs)))) {
+           blkid >= ((uint64_t)dn->dn_phys->dn_nblkptr <<
+           ((nlevels - level - 1) * epbs)) ||
+           (fail_sparse &&
+           blkid > (dn->dn_phys->dn_maxblkid >> (level * epbs)))) {
                /* the buffer has no parent yet */
                return (SET_ERROR(ENOENT));
        } else if (level < nlevels-1) {
                /* this block is referenced from an indirect block */
                int err;
                if (dh == NULL) {
-                       err = dbuf_hold_impl(dn, level+1, blkid >> epbs,
-                                       fail_sparse, NULL, parentp);
+                       err = dbuf_hold_impl(dn, level+1,
+                           blkid >> epbs, fail_sparse, FALSE, NULL, parentp);
                } else {
                        __dbuf_hold_impl_init(dh + 1, dn, dh->dh_level + 1,
-                                       blkid >> epbs, fail_sparse, NULL,
-                                       parentp, dh->dh_depth + 1);
+                           blkid >> epbs, fail_sparse, FALSE, NULL,
+                           parentp, dh->dh_depth + 1);
                        err = __dbuf_hold_impl(dh + 1);
                }
                if (err)
@@ -1842,6 +2280,8 @@ dbuf_findbp(dnode_t *dn, int level, uint64_t blkid, int fail_sparse,
                }
                *bpp = ((blkptr_t *)(*parentp)->db.db_data) +
                    (blkid & ((1ULL << epbs) - 1));
+               if (blkid > (dn->dn_phys->dn_maxblkid >> (level * epbs)))
+                       ASSERT(BP_IS_HOLE(*bpp));
                return (0);
        } else {
                /* the block is referenced from the dnode */
@@ -1867,7 +2307,7 @@ dbuf_create(dnode_t *dn, uint8_t level, uint64_t blkid,
        ASSERT(RW_LOCK_HELD(&dn->dn_struct_rwlock));
        ASSERT(dn->dn_type != DMU_OT_NONE);
 
-       db = kmem_cache_alloc(dbuf_cache, KM_SLEEP);
+       db = kmem_cache_alloc(dbuf_kmem_cache, KM_SLEEP);
 
        db->db_objset = os;
        db->db.db_object = dn->dn_object;
@@ -1886,13 +2326,13 @@ dbuf_create(dnode_t *dn, uint8_t level, uint64_t blkid,
 
        if (blkid == DMU_BONUS_BLKID) {
                ASSERT3P(parent, ==, dn->dn_dbuf);
-               db->db.db_size = DN_MAX_BONUSLEN -
+               db->db.db_size = DN_SLOTS_TO_BONUSLEN(dn->dn_num_slots) -
                    (dn->dn_nblkptr-1) * sizeof (blkptr_t);
                ASSERT3U(db->db.db_size, >=, dn->dn_bonuslen);
                db->db.db_offset = DMU_BONUS_BLKID;
                db->db_state = DB_UNCACHED;
                /* the bonus dbuf is not placed in the hash table */
-               arc_space_consume(sizeof (dmu_buf_impl_t), ARC_SPACE_OTHER);
+               arc_space_consume(sizeof (dmu_buf_impl_t), ARC_SPACE_DBUF);
                return (db);
        } else if (blkid == DMU_SPILL_BLKID) {
                db->db.db_size = (blkptr != NULL) ?
@@ -1916,17 +2356,15 @@ dbuf_create(dnode_t *dn, uint8_t level, uint64_t blkid,
        db->db_state = DB_EVICTING;
        if ((odb = dbuf_hash_insert(db)) != NULL) {
                /* someone else inserted it first */
-               kmem_cache_free(dbuf_cache, db);
+               kmem_cache_free(dbuf_kmem_cache, db);
                mutex_exit(&dn->dn_dbufs_mtx);
                return (odb);
        }
        avl_add(&dn->dn_dbufs, db);
-       if (db->db_level == 0 && db->db_blkid >=
-           dn->dn_unlisted_l0_blkid)
-               dn->dn_unlisted_l0_blkid = db->db_blkid + 1;
+
        db->db_state = DB_UNCACHED;
        mutex_exit(&dn->dn_dbufs_mtx);
-       arc_space_consume(sizeof (dmu_buf_impl_t), ARC_SPACE_OTHER);
+       arc_space_consume(sizeof (dmu_buf_impl_t), ARC_SPACE_DBUF);
 
        if (parent && parent != dn->dn_dbuf)
                dbuf_add_ref(parent, db);
@@ -1941,112 +2379,240 @@ dbuf_create(dnode_t *dn, uint8_t level, uint64_t blkid,
        return (db);
 }
 
-static int
-dbuf_do_evict(void *private)
-{
-       dmu_buf_impl_t *db = private;
+typedef struct dbuf_prefetch_arg {
+       spa_t *dpa_spa; /* The spa to issue the prefetch in. */
+       zbookmark_phys_t dpa_zb; /* The target block to prefetch. */
+       int dpa_epbs; /* Entries (blkptr_t's) Per Block Shift. */
+       int dpa_curlevel; /* The current level that we're reading */
+       dnode_t *dpa_dnode; /* The dnode associated with the prefetch */
+       zio_priority_t dpa_prio; /* The priority I/Os should be issued at. */
+       zio_t *dpa_zio; /* The parent zio_t for all prefetches. */
+       arc_flags_t dpa_aflags; /* Flags to pass to the final prefetch. */
+} dbuf_prefetch_arg_t;
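+
+/*
+ * A dbuf_prefetch_arg_t is allocated once per dbuf_prefetch() call.  It is
+ * handed from one dbuf_prefetch_indirect_done() callback to the next as
+ * the chain walks down the indirect levels, and is freed either when the
+ * final prefetch is issued or when a hole or I/O error ends the walk.
+ */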
 
-       if (!MUTEX_HELD(&db->db_mtx))
-               mutex_enter(&db->db_mtx);
+/*
+ * Actually issue the prefetch read for the block given.
+ */
+static void
+dbuf_issue_final_prefetch(dbuf_prefetch_arg_t *dpa, blkptr_t *bp)
+{
+       arc_flags_t aflags;
+       if (BP_IS_HOLE(bp) || BP_IS_EMBEDDED(bp))
+               return;
 
-       ASSERT(refcount_is_zero(&db->db_holds));
+       aflags = dpa->dpa_aflags | ARC_FLAG_NOWAIT | ARC_FLAG_PREFETCH;
 
-       if (db->db_state != DB_EVICTING) {
-               ASSERT(db->db_state == DB_CACHED);
-               DBUF_VERIFY(db);
-               db->db_buf = NULL;
-               dbuf_evict(db);
-       } else {
-               mutex_exit(&db->db_mtx);
-               dbuf_destroy(db);
-       }
-       return (0);
+       ASSERT3U(dpa->dpa_curlevel, ==, BP_GET_LEVEL(bp));
+       ASSERT3U(dpa->dpa_curlevel, ==, dpa->dpa_zb.zb_level);
+       ASSERT(dpa->dpa_zio != NULL);
+       (void) arc_read(dpa->dpa_zio, dpa->dpa_spa, bp, NULL, NULL,
+           dpa->dpa_prio, ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE,
+           &aflags, &dpa->dpa_zb);
 }
 
+/*
+ * Called when an indirect block above our prefetch target is read in.  This
+ * will either read in the next indirect block down the tree or issue the actual
+ * prefetch if the next block down is our target.
+ */
 static void
-dbuf_destroy(dmu_buf_impl_t *db)
+dbuf_prefetch_indirect_done(zio_t *zio, arc_buf_t *abuf, void *private)
 {
-       ASSERT(refcount_is_zero(&db->db_holds));
+       dbuf_prefetch_arg_t *dpa = private;
+       uint64_t nextblkid;
+       blkptr_t *bp;
 
-       if (db->db_blkid != DMU_BONUS_BLKID) {
-               /*
-                * If this dbuf is still on the dn_dbufs list,
-                * remove it from that list.
-                */
-               if (db->db_dnode_handle != NULL) {
-                       dnode_t *dn;
+       ASSERT3S(dpa->dpa_zb.zb_level, <, dpa->dpa_curlevel);
+       ASSERT3S(dpa->dpa_curlevel, >, 0);
 
-                       DB_DNODE_ENTER(db);
-                       dn = DB_DNODE(db);
-                       mutex_enter(&dn->dn_dbufs_mtx);
-                       avl_remove(&dn->dn_dbufs, db);
-                       atomic_dec_32(&dn->dn_dbufs_count);
-                       mutex_exit(&dn->dn_dbufs_mtx);
-                       DB_DNODE_EXIT(db);
-                       /*
-                        * Decrementing the dbuf count means that the hold
-                        * corresponding to the removed dbuf is no longer
-                        * discounted in dnode_move(), so the dnode cannot be
-                        * moved until after we release the hold.
-                        */
-                       dnode_rele(dn, db);
-                       db->db_dnode_handle = NULL;
+       /*
+        * The dpa_dnode is only valid if we are called with a NULL
+        * zio. This indicates that the arc_read() returned without
+        * first calling zio_read() to issue a physical read. Once
+        * a physical read is made the dpa_dnode must be invalidated
+        * as the locks guarding it may have been dropped. If the
+        * dpa_dnode is still valid, then we want to add it to the dbuf
+        * cache. To do so, we must hold the dbuf associated with the block
+        * we just prefetched, read its contents so that we associate it
+        * with an arc_buf_t, and then release it.
+        */
+       if (zio != NULL) {
+               ASSERT3S(BP_GET_LEVEL(zio->io_bp), ==, dpa->dpa_curlevel);
+               if (zio->io_flags & ZIO_FLAG_RAW) {
+                       ASSERT3U(BP_GET_PSIZE(zio->io_bp), ==, zio->io_size);
+               } else {
+                       ASSERT3U(BP_GET_LSIZE(zio->io_bp), ==, zio->io_size);
                }
-               dbuf_hash_remove(db);
-       }
-       db->db_parent = NULL;
-       db->db_buf = NULL;
+               ASSERT3P(zio->io_spa, ==, dpa->dpa_spa);
+
+               dpa->dpa_dnode = NULL;
+       } else if (dpa->dpa_dnode != NULL) {
+               uint64_t curblkid = dpa->dpa_zb.zb_blkid >>
+                   (dpa->dpa_epbs * (dpa->dpa_curlevel -
+                   dpa->dpa_zb.zb_level));
+               dmu_buf_impl_t *db = dbuf_hold_level(dpa->dpa_dnode,
+                   dpa->dpa_curlevel, curblkid, FTAG);
+               (void) dbuf_read(db, NULL,
+                   DB_RF_MUST_SUCCEED | DB_RF_NOPREFETCH | DB_RF_HAVESTRUCT);
+               dbuf_rele(db, FTAG);
+       }
+
+       dpa->dpa_curlevel--;
+
+       nextblkid = dpa->dpa_zb.zb_blkid >>
+           (dpa->dpa_epbs * (dpa->dpa_curlevel - dpa->dpa_zb.zb_level));
+       bp = ((blkptr_t *)abuf->b_data) +
+           P2PHASE(nextblkid, 1ULL << dpa->dpa_epbs);
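+       /*
+        * For illustration, with the default 128K indirect blocks
+        * (epbs == 10) and a level-0 target at zb_blkid == 0x12345: once
+        * dpa_curlevel drops from 2 to 1, nextblkid == 0x12345 >> 10 ==
+        * 0x48, and bp points at entry P2PHASE(0x48, 1024) == 0x48 of the
+        * level-2 block just read.
+        */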
+       if (BP_IS_HOLE(bp) || (zio != NULL && zio->io_error != 0)) {
+               kmem_free(dpa, sizeof (*dpa));
+       } else if (dpa->dpa_curlevel == dpa->dpa_zb.zb_level) {
+               ASSERT3U(nextblkid, ==, dpa->dpa_zb.zb_blkid);
+               dbuf_issue_final_prefetch(dpa, bp);
+               kmem_free(dpa, sizeof (*dpa));
+       } else {
+               arc_flags_t iter_aflags = ARC_FLAG_NOWAIT;
+               zbookmark_phys_t zb;
 
-       ASSERT(db->db.db_data == NULL);
-       ASSERT(db->db_hash_next == NULL);
-       ASSERT(db->db_blkptr == NULL);
-       ASSERT(db->db_data_pending == NULL);
+               ASSERT3U(dpa->dpa_curlevel, ==, BP_GET_LEVEL(bp));
+
+               SET_BOOKMARK(&zb, dpa->dpa_zb.zb_objset,
+                   dpa->dpa_zb.zb_object, dpa->dpa_curlevel, nextblkid);
+
+               (void) arc_read(dpa->dpa_zio, dpa->dpa_spa,
+                   bp, dbuf_prefetch_indirect_done, dpa, dpa->dpa_prio,
+                   ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE,
+                   &iter_aflags, &zb);
+       }
 
-       kmem_cache_free(dbuf_cache, db);
-       arc_space_return(sizeof (dmu_buf_impl_t), ARC_SPACE_OTHER);
+       arc_buf_destroy(abuf, private);
 }
 
+/*
+ * Issue prefetch reads for the given block on the given level.  If the indirect
+ * blocks above that block are not in memory, we will read them in
+ * asynchronously.  As a result, this call never blocks waiting for a read to
+ * complete.
+ */
 void
-dbuf_prefetch(dnode_t *dn, uint64_t blkid, zio_priority_t prio)
+dbuf_prefetch(dnode_t *dn, int64_t level, uint64_t blkid, zio_priority_t prio,
+    arc_flags_t aflags)
 {
-       dmu_buf_impl_t *db = NULL;
-       blkptr_t *bp = NULL;
+       blkptr_t bp;
+       int epbs, nlevels, curlevel;
+       uint64_t curblkid;
+       dmu_buf_impl_t *db;
+       zio_t *pio;
+       dbuf_prefetch_arg_t *dpa;
+       dsl_dataset_t *ds;
 
        ASSERT(blkid != DMU_BONUS_BLKID);
        ASSERT(RW_LOCK_HELD(&dn->dn_struct_rwlock));
 
+       if (blkid > dn->dn_maxblkid)
+               return;
+
        if (dnode_block_freed(dn, blkid))
                return;
 
-       /* dbuf_find() returns with db_mtx held */
-       if ((db = dbuf_find(dn->dn_objset, dn->dn_object, 0, blkid))) {
+       /*
+        * This dnode hasn't been written to disk yet, so there's nothing to
+        * prefetch.
+        */
+       nlevels = dn->dn_phys->dn_nlevels;
+       if (level >= nlevels || dn->dn_phys->dn_nblkptr == 0)
+               return;
+
+       epbs = dn->dn_phys->dn_indblkshift - SPA_BLKPTRSHIFT;
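+       /*
+        * blkid << (epbs * level) is the first level-0 block covered by
+        * this level-'level' block; if that lies beyond dn_maxblkid,
+        * there is nothing on disk to prefetch.
+        */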
+       if (dn->dn_phys->dn_maxblkid < blkid << (epbs * level))
+               return;
+
+       db = dbuf_find(dn->dn_objset, dn->dn_object,
+           level, blkid);
+       if (db != NULL) {
+               mutex_exit(&db->db_mtx);
                /*
-                * This dbuf is already in the cache.  We assume that
-                * it is already CACHED, or else about to be either
-                * read or filled.
+                * This dbuf already exists.  It is either CACHED, or
+                * (we assume) about to be read or filled.
                 */
-               mutex_exit(&db->db_mtx);
                return;
        }
 
-       if (dbuf_findbp(dn, 0, blkid, TRUE, &db, &bp, NULL) == 0) {
-               if (bp && !BP_IS_HOLE(bp) && !BP_IS_EMBEDDED(bp)) {
-                       dsl_dataset_t *ds = dn->dn_objset->os_dsl_dataset;
-                       arc_flags_t aflags =
-                           ARC_FLAG_NOWAIT | ARC_FLAG_PREFETCH;
-                       zbookmark_phys_t zb;
+       /*
+        * Find the closest ancestor (indirect block) of the target block
+        * that is present in the cache.  In this indirect block, we will
+        * find the bp that is at curlevel, curblkid.
+        */
+       curlevel = level;
+       curblkid = blkid;
+       while (curlevel < nlevels - 1) {
+               int parent_level = curlevel + 1;
+               uint64_t parent_blkid = curblkid >> epbs;
+               dmu_buf_impl_t *db;
+
+               if (dbuf_hold_impl(dn, parent_level, parent_blkid,
+                   FALSE, TRUE, FTAG, &db) == 0) {
+                       blkptr_t *bpp = db->db_buf->b_data;
+                       bp = bpp[P2PHASE(curblkid, 1 << epbs)];
+                       dbuf_rele(db, FTAG);
+                       break;
+               }
+
+               curlevel = parent_level;
+               curblkid = parent_blkid;
+       }
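+
+       /*
+        * For example, with epbs == 10 and nlevels == 3, prefetching the
+        * level-0 block 0x12345 first tries the L1 indirect at blkid 0x48
+        * (0x12345 >> 10); if that dbuf is not cached, it tries the L2
+        * indirect at blkid 0 (0x48 >> 10); if that also misses, the loop
+        * exits with curlevel == nlevels - 1 and the bp is taken from
+        * dn_phys->dn_blkptr[] below.
+        */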
 
-                       SET_BOOKMARK(&zb, ds ? ds->ds_object : DMU_META_OBJSET,
-                           dn->dn_object, 0, blkid);
+       if (curlevel == nlevels - 1) {
+               /* No cached indirect blocks found. */
+               ASSERT3U(curblkid, <, dn->dn_phys->dn_nblkptr);
+               bp = dn->dn_phys->dn_blkptr[curblkid];
+       }
+       if (BP_IS_HOLE(&bp))
+               return;
 
-                       (void) arc_read(NULL, dn->dn_objset->os_spa,
-                           bp, NULL, NULL, prio,
-                           ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE,
-                           &aflags, &zb);
-               }
-               if (db)
-                       dbuf_rele(db, NULL);
+       ASSERT3U(curlevel, ==, BP_GET_LEVEL(&bp));
+
+       pio = zio_root(dmu_objset_spa(dn->dn_objset), NULL, NULL,
+           ZIO_FLAG_CANFAIL);
+
+       dpa = kmem_zalloc(sizeof (*dpa), KM_SLEEP);
+       ds = dn->dn_objset->os_dsl_dataset;
+       SET_BOOKMARK(&dpa->dpa_zb, ds != NULL ? ds->ds_object : DMU_META_OBJSET,
+           dn->dn_object, level, blkid);
+       dpa->dpa_curlevel = curlevel;
+       dpa->dpa_prio = prio;
+       dpa->dpa_aflags = aflags;
+       dpa->dpa_spa = dn->dn_objset->os_spa;
+       dpa->dpa_dnode = dn;
+       dpa->dpa_epbs = epbs;
+       dpa->dpa_zio = pio;
+
+       /*
+        * If we have the indirect just above us, no need to do the asynchronous
+        * prefetch chain; we'll just run the last step ourselves.  If we're at
+        * a higher level, though, we want to issue the prefetches for all the
+        * indirect blocks asynchronously, so we can go on with whatever we were
+        * doing.
+        */
+       if (curlevel == level) {
+               ASSERT3U(curblkid, ==, blkid);
+               dbuf_issue_final_prefetch(dpa, &bp);
+               kmem_free(dpa, sizeof (*dpa));
+       } else {
+               arc_flags_t iter_aflags = ARC_FLAG_NOWAIT;
+               zbookmark_phys_t zb;
+
+               SET_BOOKMARK(&zb, ds != NULL ? ds->ds_object : DMU_META_OBJSET,
+                   dn->dn_object, curlevel, curblkid);
+               (void) arc_read(dpa->dpa_zio, dpa->dpa_spa,
+                   &bp, dbuf_prefetch_indirect_done, dpa, prio,
+                   ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE,
+                   &iter_aflags, &zb);
        }
+       /*
+        * We use pio here instead of dpa_zio since dpa may already have
+        * been freed.
+        */
+       zio_nowait(pio);
 }
 
 #define        DBUF_HOLD_IMPL_MAX_DEPTH        20
@@ -2066,7 +2632,7 @@ __dbuf_hold_impl(struct dbuf_hold_impl_data *dh)
        ASSERT3U(dh->dh_dn->dn_nlevels, >, dh->dh_level);
 
        *(dh->dh_dbp) = NULL;
-top:
+
        /* dbuf_find() returns with db_mtx held */
        dh->dh_db = dbuf_find(dh->dh_dn->dn_objset, dh->dh_dn->dn_object,
            dh->dh_level, dh->dh_blkid);
@@ -2074,10 +2640,12 @@ top:
        if (dh->dh_db == NULL) {
                dh->dh_bp = NULL;
 
+               if (dh->dh_fail_uncached)
+                       return (SET_ERROR(ENOENT));
+
                ASSERT3P(dh->dh_parent, ==, NULL);
                dh->dh_err = dbuf_findbp(dh->dh_dn, dh->dh_level, dh->dh_blkid,
-                                       dh->dh_fail_sparse, &dh->dh_parent,
-                                       &dh->dh_bp, dh);
+                   dh->dh_fail_sparse, &dh->dh_parent, &dh->dh_bp, dh);
                if (dh->dh_fail_sparse) {
                        if (dh->dh_err == 0 &&
                            dh->dh_bp && BP_IS_HOLE(dh->dh_bp))
@@ -2091,22 +2659,17 @@ top:
                if (dh->dh_err && dh->dh_err != ENOENT)
                        return (dh->dh_err);
                dh->dh_db = dbuf_create(dh->dh_dn, dh->dh_level, dh->dh_blkid,
-                                       dh->dh_parent, dh->dh_bp);
+                   dh->dh_parent, dh->dh_bp);
        }
 
-       if (dh->dh_db->db_buf && refcount_is_zero(&dh->dh_db->db_holds)) {
-               arc_buf_add_ref(dh->dh_db->db_buf, dh->dh_db);
-               if (dh->dh_db->db_buf->b_data == NULL) {
-                       dbuf_clear(dh->dh_db);
-                       if (dh->dh_parent) {
-                               dbuf_rele(dh->dh_parent, NULL);
-                               dh->dh_parent = NULL;
-                       }
-                       goto top;
-               }
-               ASSERT3P(dh->dh_db->db.db_data, ==, dh->dh_db->db_buf->b_data);
+       if (dh->dh_fail_uncached && dh->dh_db->db_state != DB_CACHED) {
+               mutex_exit(&dh->dh_db->db_mtx);
+               return (SET_ERROR(ENOENT));
        }
 
+       if (dh->dh_db->db_buf != NULL)
+               ASSERT3P(dh->dh_db->db.db_data, ==, dh->dh_db->db_buf->b_data);
+
        ASSERT(dh->dh_db->db_buf == NULL || arc_referenced(dh->dh_db->db_buf));
 
        /*
@@ -2124,13 +2687,19 @@ top:
                        dh->dh_type = DBUF_GET_BUFC_TYPE(dh->dh_db);
 
                        dbuf_set_data(dh->dh_db,
-                           arc_buf_alloc(dh->dh_dn->dn_objset->os_spa,
-                           dh->dh_db->db.db_size, dh->dh_db, dh->dh_type));
+                           arc_alloc_buf(dh->dh_dn->dn_objset->os_spa,
+                           dh->dh_db, dh->dh_type, dh->dh_db->db.db_size));
                        bcopy(dh->dh_dr->dt.dl.dr_data->b_data,
                            dh->dh_db->db.db_data, dh->dh_db->db.db_size);
                }
        }
 
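+       /*
+        * Taking a new hold on a dbuf that was idling on the LRU dbuf
+        * cache reactivates it: remove it from the cache and return its
+        * size to the cache accounting.
+        */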
+       if (multilist_link_active(&dh->dh_db->db_cache_link)) {
+               ASSERT(refcount_is_zero(&dh->dh_db->db_holds));
+               multilist_remove(dbuf_cache, dh->dh_db);
+               (void) refcount_remove_many(&dbuf_cache_size,
+                   dh->dh_db->db.db_size, dh->dh_db);
+       }
        (void) refcount_add(&dh->dh_db->db_holds, dh->dh_tag);
        DBUF_VERIFY(dh->dh_db);
        mutex_exit(&dh->dh_db->db_mtx);
@@ -2154,15 +2723,17 @@ top:
  * on the stack for 20 levels of recursion.
  */
 int
-dbuf_hold_impl(dnode_t *dn, uint8_t level, uint64_t blkid, int fail_sparse,
+dbuf_hold_impl(dnode_t *dn, uint8_t level, uint64_t blkid,
+    boolean_t fail_sparse, boolean_t fail_uncached,
     void *tag, dmu_buf_impl_t **dbp)
 {
        struct dbuf_hold_impl_data *dh;
        int error;
 
-       dh = kmem_zalloc(sizeof (struct dbuf_hold_impl_data) *
+       dh = kmem_alloc(sizeof (struct dbuf_hold_impl_data) *
            DBUF_HOLD_IMPL_MAX_DEPTH, KM_SLEEP);
-       __dbuf_hold_impl_init(dh, dn, level, blkid, fail_sparse, tag, dbp, 0);
+       __dbuf_hold_impl_init(dh, dn, level, blkid, fail_sparse,
+           fail_uncached, tag, dbp, 0);
 
        error = __dbuf_hold_impl(dh);
 
@@ -2174,31 +2745,41 @@ dbuf_hold_impl(dnode_t *dn, uint8_t level, uint64_t blkid, int fail_sparse,
 
 static void
 __dbuf_hold_impl_init(struct dbuf_hold_impl_data *dh,
-    dnode_t *dn, uint8_t level, uint64_t blkid, int fail_sparse,
+    dnode_t *dn, uint8_t level, uint64_t blkid,
+    boolean_t fail_sparse, boolean_t fail_uncached,
     void *tag, dmu_buf_impl_t **dbp, int depth)
 {
        dh->dh_dn = dn;
        dh->dh_level = level;
        dh->dh_blkid = blkid;
+
        dh->dh_fail_sparse = fail_sparse;
+       dh->dh_fail_uncached = fail_uncached;
+
        dh->dh_tag = tag;
        dh->dh_dbp = dbp;
+
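+       /*
+        * dbuf_hold_impl() allocates this structure with kmem_alloc()
+        * rather than kmem_zalloc(), so every scratch field must be
+        * initialized explicitly here.
+        */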
+       dh->dh_db = NULL;
+       dh->dh_parent = NULL;
+       dh->dh_bp = NULL;
+       dh->dh_err = 0;
+       dh->dh_dr = NULL;
+       dh->dh_type = 0;
+
        dh->dh_depth = depth;
 }
 
 dmu_buf_impl_t *
 dbuf_hold(dnode_t *dn, uint64_t blkid, void *tag)
 {
-       dmu_buf_impl_t *db;
-       int err = dbuf_hold_impl(dn, 0, blkid, FALSE, tag, &db);
-       return (err ? NULL : db);
+       return (dbuf_hold_level(dn, 0, blkid, tag));
 }
 
 dmu_buf_impl_t *
 dbuf_hold_level(dnode_t *dn, int level, uint64_t blkid, void *tag)
 {
        dmu_buf_impl_t *db;
-       int err = dbuf_hold_impl(dn, level, blkid, FALSE, tag, &db);
+       int err = dbuf_hold_impl(dn, level, blkid, FALSE, FALSE, tag, &db);
        return (err ? NULL : db);
 }
 
@@ -2244,7 +2825,8 @@ dbuf_rm_spill(dnode_t *dn, dmu_tx_t *tx)
 void
 dbuf_add_ref(dmu_buf_impl_t *db, void *tag)
 {
-       VERIFY(refcount_add(&db->db_holds, tag) > 1);
+       int64_t holds = refcount_add(&db->db_holds, tag);
+       VERIFY3S(holds, >, 1);
 }
 
 #pragma weak dmu_buf_try_add_ref = dbuf_try_add_ref
@@ -2315,8 +2897,10 @@ dbuf_rele_and_unlock(dmu_buf_impl_t *db, void *tag)
         * We can't freeze indirects if there is a possibility that they
         * may be modified in the current syncing context.
         */
-       if (db->db_buf && holds == (db->db_level == 0 ? db->db_dirtycnt : 0))
+       if (db->db_buf != NULL &&
+           holds == (db->db_level == 0 ? db->db_dirtycnt : 0)) {
                arc_buf_freeze(db->db_buf);
+       }
 
        if (holds == db->db_dirtycnt &&
            db->db_level == 0 && db->db_user_immediate_evict)
@@ -2361,55 +2945,44 @@ dbuf_rele_and_unlock(dmu_buf_impl_t *db, void *tag)
                         */
                        ASSERT(db->db_state == DB_UNCACHED ||
                            db->db_state == DB_NOFILL);
-                       dbuf_evict(db);
+                       dbuf_destroy(db);
                } else if (arc_released(db->db_buf)) {
-                       arc_buf_t *buf = db->db_buf;
                        /*
                         * This dbuf has anonymous data associated with it.
                         */
-                       dbuf_clear_data(db);
-                       VERIFY(arc_buf_remove_ref(buf, db));
-                       dbuf_evict(db);
+                       dbuf_destroy(db);
                } else {
-                       VERIFY(!arc_buf_remove_ref(db->db_buf, db));
+                       boolean_t do_arc_evict = B_FALSE;
+                       blkptr_t bp;
+                       spa_t *spa = dmu_objset_spa(db->db_objset);
+
+                       if (!DBUF_IS_CACHEABLE(db) &&
+                           db->db_blkptr != NULL &&
+                           !BP_IS_HOLE(db->db_blkptr) &&
+                           !BP_IS_EMBEDDED(db->db_blkptr)) {
+                               do_arc_evict = B_TRUE;
+                               bp = *db->db_blkptr;
+                       }
 
-                       /*
-                        * A dbuf will be eligible for eviction if either the
-                        * 'primarycache' property is set or a duplicate
-                        * copy of this buffer is already cached in the arc.
-                        *
-                        * In the case of the 'primarycache' a buffer
-                        * is considered for eviction if it matches the
-                        * criteria set in the property.
-                        *
-                        * To decide if our buffer is considered a
-                        * duplicate, we must call into the arc to determine
-                        * if multiple buffers are referencing the same
-                        * block on-disk. If so, then we simply evict
-                        * ourselves.
-                        */
-                       if (!DBUF_IS_CACHEABLE(db)) {
-                               if (db->db_blkptr != NULL &&
-                                   !BP_IS_HOLE(db->db_blkptr) &&
-                                   !BP_IS_EMBEDDED(db->db_blkptr)) {
-                                       spa_t *spa =
-                                           dmu_objset_spa(db->db_objset);
-                                       blkptr_t bp = *db->db_blkptr;
-                                       dbuf_clear(db);
-                                       arc_freed(spa, &bp);
-                               } else {
-                                       dbuf_clear(db);
-                               }
-                       } else if (db->db_pending_evict ||
-                           arc_buf_eviction_needed(db->db_buf)) {
-                               dbuf_clear(db);
-                       } else {
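+                       /*
+                        * The last hold has been dropped: either destroy
+                        * the dbuf immediately (uncacheable or eviction
+                        * pending), or park it on the LRU dbuf cache,
+                        * where dbuf_evict_notify() gives the eviction
+                        * thread a chance to trim the cache back under
+                        * dbuf_cache_max_bytes.
+                        */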
+                       if (!DBUF_IS_CACHEABLE(db) ||
+                           db->db_pending_evict) {
+                               dbuf_destroy(db);
+                       } else if (!multilist_link_active(&db->db_cache_link)) {
+                               multilist_insert(dbuf_cache, db);
+                               (void) refcount_add_many(&dbuf_cache_size,
+                                   db->db.db_size, db);
                                mutex_exit(&db->db_mtx);
+
+                               dbuf_evict_notify();
                        }
+
+                       if (do_arc_evict)
+                               arc_freed(spa, &bp);
                }
        } else {
                mutex_exit(&db->db_mtx);
        }
 }
 
 #pragma weak dmu_buf_refcount = dbuf_refcount
@@ -2473,24 +3046,33 @@ dmu_buf_user_evict_wait()
        taskq_wait(dbu_evict_taskq);
 }
 
-boolean_t
-dmu_buf_freeable(dmu_buf_t *dbuf)
+blkptr_t *
+dmu_buf_get_blkptr(dmu_buf_t *db)
 {
-       boolean_t res = B_FALSE;
-       dmu_buf_impl_t *db = (dmu_buf_impl_t *)dbuf;
+       dmu_buf_impl_t *dbi = (dmu_buf_impl_t *)db;
+       return (dbi->db_blkptr);
+}
 
-       if (db->db_blkptr)
-               res = dsl_dataset_block_freeable(db->db_objset->os_dsl_dataset,
-                   db->db_blkptr, db->db_blkptr->blk_birth);
+objset_t *
+dmu_buf_get_objset(dmu_buf_t *db)
+{
+       dmu_buf_impl_t *dbi = (dmu_buf_impl_t *)db;
+       return (dbi->db_objset);
+}
 
-       return (res);
+dnode_t *
+dmu_buf_dnode_enter(dmu_buf_t *db)
+{
+       dmu_buf_impl_t *dbi = (dmu_buf_impl_t *)db;
+       DB_DNODE_ENTER(dbi);
+       return (DB_DNODE(dbi));
 }
 
-blkptr_t *
-dmu_buf_get_blkptr(dmu_buf_t *db)
+void
+dmu_buf_dnode_exit(dmu_buf_t *db)
 {
        dmu_buf_impl_t *dbi = (dmu_buf_impl_t *)db;
-       return (dbi->db_blkptr);
+       DB_DNODE_EXIT(dbi);
 }
 
 static void
@@ -2503,7 +3085,7 @@ dbuf_check_blkptr(dnode_t *dn, dmu_buf_impl_t *db)
                return;
 
        if (db->db_blkid == DMU_SPILL_BLKID) {
-               db->db_blkptr = &dn->dn_phys->dn_spill;
+               db->db_blkptr = DN_SPILL_BLKPTR(dn->dn_phys);
                BP_ZERO(db->db_blkptr);
                return;
        }
@@ -2526,8 +3108,8 @@ dbuf_check_blkptr(dnode_t *dn, dmu_buf_impl_t *db)
                if (parent == NULL) {
                        mutex_exit(&db->db_mtx);
                        rw_enter(&dn->dn_struct_rwlock, RW_READER);
-                       (void) dbuf_hold_impl(dn, db->db_level+1,
-                           db->db_blkid >> epbs, FALSE, db, &parent);
+                       parent = dbuf_hold_level(dn, db->db_level + 1,
+                           db->db_blkid >> epbs, db);
                        rw_exit(&dn->dn_struct_rwlock);
                        mutex_enter(&db->db_mtx);
                        db->db_parent = parent;
@@ -2659,13 +3241,16 @@ dbuf_sync_leaf(dbuf_dirty_record_t *dr, dmu_tx_t *tx)
 
                ASSERT(*datap != NULL);
                ASSERT0(db->db_level);
-               ASSERT3U(dn->dn_phys->dn_bonuslen, <=, DN_MAX_BONUSLEN);
+               ASSERT3U(dn->dn_phys->dn_bonuslen, <=,
+                   DN_SLOTS_TO_BONUSLEN(dn->dn_phys->dn_extra_slots + 1));
                bcopy(*datap, DN_BONUS(dn->dn_phys), dn->dn_phys->dn_bonuslen);
                DB_DNODE_EXIT(db);
 
                if (*datap != db->db.db_data) {
-                       zio_buf_free(*datap, DN_MAX_BONUSLEN);
-                       arc_space_return(DN_MAX_BONUSLEN, ARC_SPACE_OTHER);
+                       int slots = DB_DNODE(db)->dn_num_slots;
+                       int bonuslen = DN_SLOTS_TO_BONUSLEN(slots);
+                       kmem_free(*datap, bonuslen);
+                       arc_space_return(bonuslen, ARC_SPACE_BONUS);
                }
                db->db_data_pending = NULL;
                drp = &db->db_last_dirty;
@@ -2721,10 +3306,19 @@ dbuf_sync_leaf(dbuf_dirty_record_t *dr, dmu_tx_t *tx)
                 * objects only modified in the syncing context (e.g.
                  * DMU_OT_DNODE blocks).
                 */
-               int blksz = arc_buf_size(*datap);
+               int psize = arc_buf_size(*datap);
                arc_buf_contents_t type = DBUF_GET_BUFC_TYPE(db);
-               *datap = arc_buf_alloc(os->os_spa, blksz, db, type);
-               bcopy(db->db.db_data, (*datap)->b_data, blksz);
+               enum zio_compress compress_type = arc_get_compression(*datap);
+
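+               /*
+                * Preserve the source buffer's compression so that the
+                * psize-byte bcopy() below copies the same physical
+                * (possibly compressed) representation.
+                */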
+               if (compress_type == ZIO_COMPRESS_OFF) {
+                       *datap = arc_alloc_buf(os->os_spa, db, type, psize);
+               } else {
+                       ASSERT3U(type, ==, ARC_BUFC_DATA);
+                       int lsize = arc_buf_lsize(*datap);
+                       *datap = arc_alloc_compressed_buf(os->os_spa, db,
+                           psize, lsize, compress_type);
+               }
+               bcopy(db->db.db_data, (*datap)->b_data, psize);
        }
        db->db_data_pending = dr;
 
@@ -2792,7 +3386,8 @@ dbuf_write_ready(zio_t *zio, arc_buf_t *buf, void *vdb)
        uint64_t fill = 0;
        int i;
 
-       ASSERT3P(db->db_blkptr, ==, bp);
+       ASSERT3P(db->db_blkptr, !=, NULL);
+       ASSERT3P(&db->db_data_pending->dr_bp_copy, ==, bp);
 
        DB_DNODE_ENTER(db);
        dn = DB_DNODE(db);
@@ -2814,8 +3409,8 @@ dbuf_write_ready(zio_t *zio, arc_buf_t *buf, void *vdb)
 #ifdef ZFS_DEBUG
        if (db->db_blkid == DMU_SPILL_BLKID) {
                ASSERT(dn->dn_phys->dn_flags & DNODE_FLAG_SPILL_BLKPTR);
-               ASSERT(!(BP_IS_HOLE(db->db_blkptr)) &&
-                   db->db_blkptr == &dn->dn_phys->dn_spill);
+               ASSERT(!(BP_IS_HOLE(bp)) &&
+                   db->db_blkptr == DN_SPILL_BLKPTR(dn->dn_phys));
        }
 #endif
 
@@ -2827,11 +3422,17 @@ dbuf_write_ready(zio_t *zio, arc_buf_t *buf, void *vdb)
                mutex_exit(&dn->dn_mtx);
 
                if (dn->dn_type == DMU_OT_DNODE) {
-                       dnode_phys_t *dnp = db->db.db_data;
-                       for (i = db->db.db_size >> DNODE_SHIFT; i > 0;
-                           i--, dnp++) {
-                               if (dnp->dn_type != DMU_OT_NONE)
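+                       /*
+                        * Large dnodes occupy dn_extra_slots additional
+                        * DNODE_MIN_SIZE slots; step through the block one
+                        * slot at a time so that each dnode is counted
+                        * exactly once.
+                        */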
+                       i = 0;
+                       while (i < db->db.db_size) {
+                               dnode_phys_t *dnp =
+                                   (void *)(((char *)db->db.db_data) + i);
+
+                               i += DNODE_MIN_SIZE;
+                               if (dnp->dn_type != DMU_OT_NONE) {
                                        fill++;
+                                       i += dnp->dn_extra_slots *
+                                           DNODE_MIN_SIZE;
+                               }
                        }
                } else {
                        if (BP_IS_HOLE(bp)) {
@@ -2855,6 +3456,55 @@ dbuf_write_ready(zio_t *zio, arc_buf_t *buf, void *vdb)
                bp->blk_fill = fill;
 
        mutex_exit(&db->db_mtx);
+
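+       /*
+        * The zio was handed dr_bp_copy to write into (see dbuf_write()),
+        * so copy the finished bp back to its in-core home, which
+        * typically lives inside the parent's indirect block.
+        */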
+       rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
+       *db->db_blkptr = *bp;
+       rw_exit(&dn->dn_struct_rwlock);
+}
+
+/* ARGSUSED */
+/*
+ * This function gets called just prior to running through the compression
+ * stage of the zio pipeline.  If this indirect block consists of only
+ * holes, we want it to be compressed away to a hole.  To do that, we must
+ * zero out any information about the holes that this indirect block
+ * points to before we try to compress it.
+ */
+static void
+dbuf_write_children_ready(zio_t *zio, arc_buf_t *buf, void *vdb)
+{
+       dmu_buf_impl_t *db = vdb;
+       dnode_t *dn;
+       blkptr_t *bp;
+       unsigned int epbs, i;
+
+       ASSERT3U(db->db_level, >, 0);
+       DB_DNODE_ENTER(db);
+       dn = DB_DNODE(db);
+       epbs = dn->dn_phys->dn_indblkshift - SPA_BLKPTRSHIFT;
+       ASSERT3U(epbs, <, 31);
+
+       /* Determine if all our children are holes */
+       for (i = 0, bp = db->db.db_data; i < 1ULL << epbs; i++, bp++) {
+               if (!BP_IS_HOLE(bp))
+                       break;
+       }
+
+       /*
+        * If all the children are holes, then zero them all out so that
+        * this indirect block may be compressed away.
+        */
+       if (i == 1ULL << epbs) {
+               /*
+                * We only found holes. Grab the rwlock to prevent
+                * anybody from reading the blocks we're about to
+                * zero out.
+                */
+               rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
+               bzero(db->db.db_data, db->db.db_size);
+               rw_exit(&dn->dn_struct_rwlock);
+       }
+       DB_DNODE_EXIT(db);
 }
 
 /*
@@ -2935,7 +3585,7 @@ dbuf_write_done(zio_t *zio, arc_buf_t *buf, void *vdb)
                dn = DB_DNODE(db);
                ASSERT(dn->dn_phys->dn_flags & DNODE_FLAG_SPILL_BLKPTR);
                ASSERT(!(BP_IS_HOLE(db->db_blkptr)) &&
-                   db->db_blkptr == &dn->dn_phys->dn_spill);
+                   db->db_blkptr == DN_SPILL_BLKPTR(dn->dn_phys));
                DB_DNODE_EXIT(db);
        }
 #endif
@@ -2945,10 +3595,7 @@ dbuf_write_done(zio_t *zio, arc_buf_t *buf, void *vdb)
                ASSERT(dr->dt.dl.dr_override_state == DR_NOT_OVERRIDDEN);
                if (db->db_state != DB_NOFILL) {
                        if (dr->dt.dl.dr_data != db->db_buf)
-                               VERIFY(arc_buf_remove_ref(dr->dt.dl.dr_data,
-                                   db));
-                       else if (!arc_released(db->db_buf))
-                               arc_set_callback(db->db_buf, dbuf_do_evict, db);
+                               arc_buf_destroy(dr->dt.dl.dr_data, db);
                }
        } else {
                dnode_t *dn;
@@ -2964,8 +3611,6 @@ dbuf_write_done(zio_t *zio, arc_buf_t *buf, void *vdb)
                            dn->dn_phys->dn_maxblkid >> (db->db_level * epbs));
                        ASSERT3U(BP_GET_LSIZE(db->db_blkptr), ==,
                            db->db.db_size);
-                       if (!arc_released(db->db_buf))
-                               arc_set_callback(db->db_buf, dbuf_do_evict, db);
                }
                DB_DNODE_EXIT(db);
                mutex_destroy(&dr->dt.di.dr_mtx);
@@ -3017,6 +3662,9 @@ dbuf_write_override_done(zio_t *zio)
        mutex_exit(&db->db_mtx);
 
        dbuf_write_done(zio, NULL, db);
+
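+       /*
+        * Release the borrowed abd that dbuf_write() wrapped around the
+        * override buffer with abd_get_from_buf().
+        */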
+       if (zio->io_abd != NULL)
+               abd_put(zio->io_abd);
 }
 
 /* Issue I/O to commit a dirty buffer to disk. */
@@ -3033,6 +3681,8 @@ dbuf_write(dbuf_dirty_record_t *dr, arc_buf_t *data, dmu_tx_t *tx)
        zio_t *zio;
        int wp_flag = 0;
 
+       ASSERT(dmu_tx_is_syncing(tx));
+
        DB_DNODE_ENTER(db);
        dn = DB_DNODE(db);
        os = dn->dn_objset;
@@ -3091,17 +3741,27 @@ dbuf_write(dbuf_dirty_record_t *dr, arc_buf_t *data, dmu_tx_t *tx)
        dmu_write_policy(os, dn, db->db_level, wp_flag, &zp);
        DB_DNODE_EXIT(db);
 
+       /*
+        * We copy the blkptr now (rather than when we instantiate the dirty
+        * record), because its value can change between open context and
+        * syncing context. We do not need to hold dn_struct_rwlock to read
+        * db_blkptr because we are in syncing context.
+        */
+       dr->dr_bp_copy = *db->db_blkptr;
+
        if (db->db_level == 0 &&
            dr->dt.dl.dr_override_state == DR_OVERRIDDEN) {
                /*
                 * The BP for this block has been provided by open context
                 * (by dmu_sync() or dmu_buf_write_embedded()).
                 */
-               void *contents = (data != NULL) ? data->b_data : NULL;
+               abd_t *contents = (data != NULL) ?
+                   abd_get_from_buf(data->b_data, arc_buf_size(data)) : NULL;
 
                dr->dr_zio = zio_write(zio, os->os_spa, txg,
-                   db->db_blkptr, contents, db->db.db_size, &zp,
-                   dbuf_write_override_ready, NULL, dbuf_write_override_done,
+                   &dr->dr_bp_copy, contents, db->db.db_size, db->db.db_size,
+                   &zp, dbuf_write_override_ready, NULL, NULL,
+                   dbuf_write_override_done,
                    dr, ZIO_PRIORITY_ASYNC_WRITE, ZIO_FLAG_MUSTSUCCEED, &zb);
                mutex_enter(&db->db_mtx);
                dr->dt.dl.dr_override_state = DR_NOT_OVERRIDDEN;
@@ -3109,26 +3769,39 @@ dbuf_write(dbuf_dirty_record_t *dr, arc_buf_t *data, dmu_tx_t *tx)
                    dr->dt.dl.dr_copies, dr->dt.dl.dr_nopwrite);
                mutex_exit(&db->db_mtx);
        } else if (db->db_state == DB_NOFILL) {
-               ASSERT(zp.zp_checksum == ZIO_CHECKSUM_OFF);
+               ASSERT(zp.zp_checksum == ZIO_CHECKSUM_OFF ||
+                   zp.zp_checksum == ZIO_CHECKSUM_NOPARITY);
                dr->dr_zio = zio_write(zio, os->os_spa, txg,
-                   db->db_blkptr, NULL, db->db.db_size, &zp,
-                   dbuf_write_nofill_ready, NULL, dbuf_write_nofill_done, db,
+                   &dr->dr_bp_copy, NULL, db->db.db_size, db->db.db_size, &zp,
+                   dbuf_write_nofill_ready, NULL, NULL,
+                   dbuf_write_nofill_done, db,
                    ZIO_PRIORITY_ASYNC_WRITE,
                    ZIO_FLAG_MUSTSUCCEED | ZIO_FLAG_NODATA, &zb);
        } else {
+               arc_done_func_t *children_ready_cb = NULL;
                ASSERT(arc_released(data));
+
+               /*
+                * For indirect blocks, we want to setup the children
+                * ready callback so that we can properly handle an indirect
+                * block that only contains holes.
+                */
+               if (db->db_level != 0)
+                       children_ready_cb = dbuf_write_children_ready;
+
                dr->dr_zio = arc_write(zio, os->os_spa, txg,
-                   db->db_blkptr, data, DBUF_IS_L2CACHEABLE(db),
-                   DBUF_IS_L2COMPRESSIBLE(db), &zp, dbuf_write_ready,
-                   dbuf_write_physdone, dbuf_write_done, db,
-                   ZIO_PRIORITY_ASYNC_WRITE, ZIO_FLAG_MUSTSUCCEED, &zb);
+                   &dr->dr_bp_copy, data, DBUF_IS_L2CACHEABLE(db),
+                   &zp, dbuf_write_ready,
+                   children_ready_cb, dbuf_write_physdone,
+                   dbuf_write_done, db, ZIO_PRIORITY_ASYNC_WRITE,
+                   ZIO_FLAG_MUSTSUCCEED, &zb);
        }
 }
 
 #if defined(_KERNEL) && defined(HAVE_SPL)
 EXPORT_SYMBOL(dbuf_find);
 EXPORT_SYMBOL(dbuf_is_metadata);
-EXPORT_SYMBOL(dbuf_evict);
+EXPORT_SYMBOL(dbuf_destroy);
 EXPORT_SYMBOL(dbuf_loan_arcbuf);
 EXPORT_SYMBOL(dbuf_whichblock);
 EXPORT_SYMBOL(dbuf_read);
@@ -3143,7 +3816,6 @@ EXPORT_SYMBOL(dmu_buf_will_fill);
 EXPORT_SYMBOL(dmu_buf_fill_done);
 EXPORT_SYMBOL(dmu_buf_rele);
 EXPORT_SYMBOL(dbuf_assign_arcbuf);
-EXPORT_SYMBOL(dbuf_clear);
 EXPORT_SYMBOL(dbuf_prefetch);
 EXPORT_SYMBOL(dbuf_hold_impl);
 EXPORT_SYMBOL(dbuf_hold);
@@ -3159,6 +3831,25 @@ EXPORT_SYMBOL(dbuf_sync_list);
 EXPORT_SYMBOL(dmu_buf_set_user);
 EXPORT_SYMBOL(dmu_buf_set_user_ie);
 EXPORT_SYMBOL(dmu_buf_get_user);
-EXPORT_SYMBOL(dmu_buf_freeable);
 EXPORT_SYMBOL(dmu_buf_get_blkptr);
+
+/* BEGIN CSTYLED */
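+/*
+ * These parameters may be tuned at runtime (mode 0644), e.g. via
+ * /sys/module/zfs/parameters/dbuf_cache_max_bytes on Linux.
+ */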
+module_param(dbuf_cache_max_bytes, ulong, 0644);
+MODULE_PARM_DESC(dbuf_cache_max_bytes,
+       "Maximum size in bytes of the dbuf cache.");
+
+module_param(dbuf_cache_hiwater_pct, uint, 0644);
+MODULE_PARM_DESC(dbuf_cache_hiwater_pct,
+       "Percentage over dbuf_cache_max_bytes when dbufs must be evicted "
+       "directly.");
+
+module_param(dbuf_cache_lowater_pct, uint, 0644);
+MODULE_PARM_DESC(dbuf_cache_lowater_pct,
+       "Percentage below dbuf_cache_max_bytes when the evict thread stops "
+       "evicting dbufs.");
+
+module_param(dbuf_cache_max_shift, int, 0644);
+MODULE_PARM_DESC(dbuf_cache_max_shift,
+       "Cap the size of the dbuf cache to a log2 fraction of arc size.");
+/* END CSTYLED */
 #endif