]> git.proxmox.com Git - mirror_zfs.git/blobdiff - module/zfs/dbuf.c
Update to onnv_147
[mirror_zfs.git] / module / zfs / dbuf.c
index 42ae439972e4ce25ebd2ef38692af468bca7e0ac..9c4e0296db2bdab35fca237db669679c66cebae7 100644 (file)
@@ -217,6 +217,22 @@ dbuf_evict_user(dmu_buf_impl_t *db)
        db->db_evict_func = NULL;
 }
 
+boolean_t
+dbuf_is_metadata(dmu_buf_impl_t *db)
+{
+       if (db->db_level > 0) {
+               return (B_TRUE);
+       } else {
+               boolean_t is_metadata;
+
+               DB_DNODE_ENTER(db);
+               is_metadata = dmu_ot[DB_DNODE(db)->dn_type].ot_metadata;
+               DB_DNODE_EXIT(db);
+
+               return (is_metadata);
+       }
+}
+
 void
 dbuf_evict(dmu_buf_impl_t *db)
 {
@@ -281,7 +297,7 @@ dbuf_fini(void)
 static void
 dbuf_verify(dmu_buf_impl_t *db)
 {
-       dnode_t *dn = db->db_dnode;
+       dnode_t *dn;
        dbuf_dirty_record_t *dr;
 
        ASSERT(MUTEX_HELD(&db->db_mtx));
@@ -290,6 +306,8 @@ dbuf_verify(dmu_buf_impl_t *db)
                return;
 
        ASSERT(db->db_objset != NULL);
+       DB_DNODE_ENTER(db);
+       dn = DB_DNODE(db);
        if (dn == NULL) {
                ASSERT(db->db_parent == NULL);
                ASSERT(db->db_blkptr == NULL);
@@ -297,8 +315,9 @@ dbuf_verify(dmu_buf_impl_t *db)
                ASSERT3U(db->db.db_object, ==, dn->dn_object);
                ASSERT3P(db->db_objset, ==, dn->dn_objset);
                ASSERT3U(db->db_level, <, dn->dn_nlevels);
-               ASSERT(db->db_blkid == DMU_BONUS_BLKID || db->db_blkid ==
-                   DMU_SPILL_BLKID || list_head(&dn->dn_dbufs));
+               ASSERT(db->db_blkid == DMU_BONUS_BLKID ||
+                   db->db_blkid == DMU_SPILL_BLKID ||
+                   !list_is_empty(&dn->dn_dbufs));
        }
        if (db->db_blkid == DMU_BONUS_BLKID) {
                ASSERT(dn != NULL);
@@ -355,7 +374,7 @@ dbuf_verify(dmu_buf_impl_t *db)
                         * have the struct_rwlock.  XXX indblksz no longer
                         * grows.  safe to do this now?
                         */
-                       if (RW_WRITE_HELD(&db->db_dnode->dn_struct_rwlock)) {
+                       if (RW_WRITE_HELD(&dn->dn_struct_rwlock)) {
                                ASSERT3P(db->db_blkptr, ==,
                                    ((blkptr_t *)db->db_parent->db.db_data +
                                    db->db_blkid % epb));
@@ -380,6 +399,7 @@ dbuf_verify(dmu_buf_impl_t *db)
                        }
                }
        }
+       DB_DNODE_EXIT(db);
 }
 #endif
 
@@ -424,8 +444,11 @@ dbuf_loan_arcbuf(dmu_buf_impl_t *db)
        mutex_enter(&db->db_mtx);
        if (arc_released(db->db_buf) || refcount_count(&db->db_holds) > 1) {
                int blksz = db->db.db_size;
+               spa_t *spa;
+
                mutex_exit(&db->db_mtx);
-               abuf = arc_loan_buf(db->db_dnode->dn_objset->os_spa, blksz);
+               DB_GET_SPA(&spa, db);
+               abuf = arc_loan_buf(spa, blksz);
                bcopy(db->db.db_data, abuf->b_data, blksz);
        } else {
                abuf = db->db_buf;
@@ -484,11 +507,14 @@ dbuf_read_done(zio_t *zio, arc_buf_t *buf, void *vdb)
 static void
 dbuf_read_impl(dmu_buf_impl_t *db, zio_t *zio, uint32_t *flags)
 {
-       dnode_t *dn = db->db_dnode;
+       dnode_t *dn;
+       spa_t *spa;
        zbookmark_t zb;
        uint32_t aflags = ARC_NOWAIT;
        arc_buf_t *pbuf;
 
+       DB_DNODE_ENTER(db);
+       dn = DB_DNODE(db);
        ASSERT(!refcount_is_zero(&db->db_holds));
        /* We need the struct_rwlock to prevent db_blkptr from changing. */
        ASSERT(RW_LOCK_HELD(&dn->dn_struct_rwlock));
@@ -506,6 +532,7 @@ dbuf_read_impl(dmu_buf_impl_t *db, zio_t *zio, uint32_t *flags)
                        bzero(db->db.db_data, DN_MAX_BONUSLEN);
                if (bonuslen)
                        bcopy(DN_BONUS(dn->dn_phys), db->db.db_data, bonuslen);
+               DB_DNODE_EXIT(db);
                dbuf_update_data(db);
                db->db_state = DB_CACHED;
                mutex_exit(&db->db_mtx);
@@ -524,6 +551,7 @@ dbuf_read_impl(dmu_buf_impl_t *db, zio_t *zio, uint32_t *flags)
 
                dbuf_set_data(db, arc_buf_alloc(dn->dn_objset->os_spa,
                    db->db.db_size, db, type));
+               DB_DNODE_EXIT(db);
                bzero(db->db.db_data, db->db.db_size);
                db->db_state = DB_CACHED;
                *flags |= DB_RF_CACHED;
@@ -531,6 +559,9 @@ dbuf_read_impl(dmu_buf_impl_t *db, zio_t *zio, uint32_t *flags)
                return;
        }
 
+       spa = dn->dn_objset->os_spa;
+       DB_DNODE_EXIT(db);
+
        db->db_state = DB_READ;
        mutex_exit(&db->db_mtx);
 
@@ -549,7 +580,7 @@ dbuf_read_impl(dmu_buf_impl_t *db, zio_t *zio, uint32_t *flags)
        else
                pbuf = db->db_objset->os_phys_buf;
 
-       (void) dsl_read(zio, dn->dn_objset->os_spa, db->db_blkptr, pbuf,
+       (void) dsl_read(zio, spa, db->db_blkptr, pbuf,
            dbuf_read_done, db, ZIO_PRIORITY_SYNC_READ,
            (*flags & DB_RF_CANFAIL) ? ZIO_FLAG_CANFAIL : ZIO_FLAG_MUSTSUCCEED,
            &aflags, &zb);
@@ -563,6 +594,7 @@ dbuf_read(dmu_buf_impl_t *db, zio_t *zio, uint32_t flags)
        int err = 0;
        int havepzio = (zio != NULL);
        int prefetch;
+       dnode_t *dn;
 
        /*
         * We don't have to hold the mutex to check db_state because it
@@ -573,46 +605,51 @@ dbuf_read(dmu_buf_impl_t *db, zio_t *zio, uint32_t flags)
        if (db->db_state == DB_NOFILL)
                return (EIO);
 
+       DB_DNODE_ENTER(db);
+       dn = DB_DNODE(db);
        if ((flags & DB_RF_HAVESTRUCT) == 0)
-               rw_enter(&db->db_dnode->dn_struct_rwlock, RW_READER);
+               rw_enter(&dn->dn_struct_rwlock, RW_READER);
 
        prefetch = db->db_level == 0 && db->db_blkid != DMU_BONUS_BLKID &&
-           (flags & DB_RF_NOPREFETCH) == 0 && db->db_dnode != NULL &&
+           (flags & DB_RF_NOPREFETCH) == 0 && dn != NULL &&
            DBUF_IS_CACHEABLE(db);
 
        mutex_enter(&db->db_mtx);
        if (db->db_state == DB_CACHED) {
                mutex_exit(&db->db_mtx);
                if (prefetch)
-                       dmu_zfetch(&db->db_dnode->dn_zfetch, db->db.db_offset,
+                       dmu_zfetch(&dn->dn_zfetch, db->db.db_offset,
                            db->db.db_size, TRUE);
                if ((flags & DB_RF_HAVESTRUCT) == 0)
-                       rw_exit(&db->db_dnode->dn_struct_rwlock);
+                       rw_exit(&dn->dn_struct_rwlock);
+               DB_DNODE_EXIT(db);
        } else if (db->db_state == DB_UNCACHED) {
-               if (zio == NULL) {
-                       zio = zio_root(db->db_dnode->dn_objset->os_spa,
-                           NULL, NULL, ZIO_FLAG_CANFAIL);
-               }
+               spa_t *spa = dn->dn_objset->os_spa;
+
+               if (zio == NULL)
+                       zio = zio_root(spa, NULL, NULL, ZIO_FLAG_CANFAIL);
                dbuf_read_impl(db, zio, &flags);
 
                /* dbuf_read_impl has dropped db_mtx for us */
 
                if (prefetch)
-                       dmu_zfetch(&db->db_dnode->dn_zfetch, db->db.db_offset,
+                       dmu_zfetch(&dn->dn_zfetch, db->db.db_offset,
                            db->db.db_size, flags & DB_RF_CACHED);
 
                if ((flags & DB_RF_HAVESTRUCT) == 0)
-                       rw_exit(&db->db_dnode->dn_struct_rwlock);
+                       rw_exit(&dn->dn_struct_rwlock);
+               DB_DNODE_EXIT(db);
 
                if (!havepzio)
                        err = zio_wait(zio);
        } else {
                mutex_exit(&db->db_mtx);
                if (prefetch)
-                       dmu_zfetch(&db->db_dnode->dn_zfetch, db->db.db_offset,
+                       dmu_zfetch(&dn->dn_zfetch, db->db.db_offset,
                            db->db.db_size, TRUE);
                if ((flags & DB_RF_HAVESTRUCT) == 0)
-                       rw_exit(&db->db_dnode->dn_struct_rwlock);
+                       rw_exit(&dn->dn_struct_rwlock);
+               DB_DNODE_EXIT(db);
 
                mutex_enter(&db->db_mtx);
                if ((flags & DB_RF_NEVERWAIT) == 0) {
@@ -642,11 +679,12 @@ dbuf_noread(dmu_buf_impl_t *db)
                cv_wait(&db->db_changed, &db->db_mtx);
        if (db->db_state == DB_UNCACHED) {
                arc_buf_contents_t type = DBUF_GET_BUFC_TYPE(db);
+               spa_t *spa;
 
                ASSERT(db->db_buf == NULL);
                ASSERT(db->db.db_data == NULL);
-               dbuf_set_data(db, arc_buf_alloc(db->db_dnode->dn_objset->os_spa,
-                   db->db.db_size, db, type));
+               DB_GET_SPA(&spa, db);
+               dbuf_set_data(db, arc_buf_alloc(spa, db->db.db_size, db, type));
                db->db_state = DB_FILL;
        } else if (db->db_state == DB_NOFILL) {
                dbuf_set_data(db, NULL);
@@ -687,7 +725,7 @@ dbuf_fix_old_data(dmu_buf_impl_t *db, uint64_t txg)
        /*
         * If the last dirty record for this dbuf has not yet synced
         * and its referencing the dbuf data, either:
-        *      reset the reference to point to a new copy,
+        *      reset the reference to point to a new copy,
         * or (if there a no active holders)
         *      just null out the current db_data pointer.
         */
@@ -700,8 +738,10 @@ dbuf_fix_old_data(dmu_buf_impl_t *db, uint64_t txg)
        } else if (refcount_count(&db->db_holds) > db->db_dirtycnt) {
                int size = db->db.db_size;
                arc_buf_contents_t type = DBUF_GET_BUFC_TYPE(db);
-               dr->dt.dl.dr_data = arc_buf_alloc(
-                   db->db_dnode->dn_objset->os_spa, size, db, type);
+               spa_t *spa;
+
+               DB_GET_SPA(&spa, db);
+               dr->dt.dl.dr_data = arc_buf_alloc(spa, size, db, type);
                bcopy(db->db.db_data, dr->dt.dl.dr_data->b_data, size);
        } else {
                dbuf_set_data(db, NULL);
@@ -726,9 +766,12 @@ dbuf_unoverride(dbuf_dirty_record_t *dr)
        ASSERT(db->db_data_pending != dr);
 
        /* free this block */
-       if (!BP_IS_HOLE(bp))
-               zio_free(db->db_dnode->dn_objset->os_spa, txg, bp);
+       if (!BP_IS_HOLE(bp)) {
+               spa_t *spa;
 
+               DB_GET_SPA(&spa, db);
+               zio_free(spa, txg, bp);
+       }
        dr->dt.dl.dr_override_state = DR_NOT_OVERRIDDEN;
        /*
         * Release the already-written buffer, so we leave it in
@@ -865,10 +908,15 @@ dbuf_block_freeable(dmu_buf_impl_t *db)
        else if (db->db_blkptr)
                birth_txg = db->db_blkptr->blk_birth;
 
-       /* If we don't exist or are in a snapshot, we can't be freed */
+       /*
+        * If we don't exist or are in a snapshot, we can't be freed.
+        * Don't pass the bp to dsl_dataset_block_freeable() since we
+        * are holding the db_mtx lock and might deadlock if we are
+        * prefetching a dedup-ed block.
+        */
        if (birth_txg)
                return (ds == NULL ||
-                   dsl_dataset_block_freeable(ds, db->db_blkptr, birth_txg));
+                   dsl_dataset_block_freeable(ds, NULL, birth_txg));
        else
                return (FALSE);
 }
@@ -879,11 +927,15 @@ dbuf_new_size(dmu_buf_impl_t *db, int size, dmu_tx_t *tx)
        arc_buf_t *buf, *obuf;
        int osize = db->db.db_size;
        arc_buf_contents_t type = DBUF_GET_BUFC_TYPE(db);
+       dnode_t *dn;
 
        ASSERT(db->db_blkid != DMU_BONUS_BLKID);
 
+       DB_DNODE_ENTER(db);
+       dn = DB_DNODE(db);
+
        /* XXX does *this* func really need the lock? */
-       ASSERT(RW_WRITE_HELD(&db->db_dnode->dn_struct_rwlock));
+       ASSERT(RW_WRITE_HELD(&dn->dn_struct_rwlock));
 
        /*
         * This call to dbuf_will_dirty() with the dn_struct_rwlock held
@@ -898,7 +950,7 @@ dbuf_new_size(dmu_buf_impl_t *db, int size, dmu_tx_t *tx)
        dbuf_will_dirty(db, tx);
 
        /* create the data buffer for the new block */
-       buf = arc_buf_alloc(db->db_dnode->dn_objset->os_spa, size, db, type);
+       buf = arc_buf_alloc(dn->dn_objset->os_spa, size, db, type);
 
        /* copy old block data to the new block */
        obuf = db->db_buf;
@@ -918,15 +970,17 @@ dbuf_new_size(dmu_buf_impl_t *db, int size, dmu_tx_t *tx)
        }
        mutex_exit(&db->db_mtx);
 
-       dnode_willuse_space(db->db_dnode, size-osize, tx);
+       dnode_willuse_space(dn, size-osize, tx);
+       DB_DNODE_EXIT(db);
 }
 
 void
 dbuf_release_bp(dmu_buf_impl_t *db)
 {
-       objset_t *os = db->db_dnode->dn_objset;
+       objset_t *os;
        zbookmark_t zb;
 
+       DB_GET_OBJSET(&os, db);
        ASSERT(dsl_pool_sync_context(dmu_objset_pool(os)));
        ASSERT(arc_released(os->os_phys_buf) ||
            list_link_active(&os->os_dsl_dataset->ds_synced_link));
@@ -944,8 +998,8 @@ dbuf_release_bp(dmu_buf_impl_t *db)
 dbuf_dirty_record_t *
 dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
 {
-       dnode_t *dn = db->db_dnode;
-       objset_t *os = dn->dn_objset;
+       dnode_t *dn;
+       objset_t *os;
        dbuf_dirty_record_t **drp, *dr;
        int drop_struct_lock = FALSE;
        boolean_t do_free_accounting = B_FALSE;
@@ -955,6 +1009,8 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
        ASSERT(!refcount_is_zero(&db->db_holds));
        DMU_TX_DIRTY_BUF(tx, db);
 
+       DB_DNODE_ENTER(db);
+       dn = DB_DNODE(db);
        /*
         * Shouldn't dirty a regular buffer in syncing context.  Private
         * objects may be dirtied in syncing context, but only if they
@@ -1009,6 +1065,8 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
        while ((dr = *drp) != NULL && dr->dr_txg > tx->tx_txg)
                drp = &dr->dr_next;
        if (dr && dr->dr_txg == tx->tx_txg) {
+               DB_DNODE_EXIT(db);
+
                if (db->db_level == 0 && db->db_blkid != DMU_BONUS_BLKID) {
                        /*
                         * If this buffer has already been written out,
@@ -1044,6 +1102,7 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
         * we already dirtied it in open context.  Hence we must make
         * this assertion only if we're not already dirty.
         */
+       os = dn->dn_objset;
        ASSERT(!dmu_tx_is_syncing(tx) || DMU_OBJECT_IS_SPECIAL(dn->dn_object) ||
            os->os_dsl_dataset == NULL || BP_IS_HOLE(os->os_rootbp));
        ASSERT(db->db.db_size != 0);
@@ -1132,6 +1191,7 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
                list_insert_tail(&dn->dn_dirty_records[txgoff], dr);
                mutex_exit(&dn->dn_mtx);
                dnode_setdirty(dn, tx);
+               DB_DNODE_EXIT(db);
                return (dr);
        } else if (do_free_accounting) {
                blkptr_t *bp = db->db_blkptr;
@@ -1145,6 +1205,7 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
                 * db_blkptr, but since this is just a guess,
                 * it's OK if we get an odd answer.
                 */
+               ddt_prefetch(os->os_spa, bp);
                dnode_willuse_space(dn, -willfree, tx);
        }
 
@@ -1193,8 +1254,7 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
        } else {
                ASSERT(db->db_level+1 == dn->dn_nlevels);
                ASSERT(db->db_blkid < dn->dn_nblkptr);
-               ASSERT(db->db_parent == NULL ||
-                   db->db_parent == db->db_dnode->dn_dbuf);
+               ASSERT(db->db_parent == NULL || db->db_parent == dn->dn_dbuf);
                mutex_enter(&dn->dn_mtx);
                ASSERT(!list_link_active(&dr->dr_dirty_node));
                list_insert_tail(&dn->dn_dirty_records[txgoff], dr);
@@ -1204,13 +1264,14 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
        }
 
        dnode_setdirty(dn, tx);
+       DB_DNODE_EXIT(db);
        return (dr);
 }
 
 static int
 dbuf_undirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
 {
-       dnode_t *dn = db->db_dnode;
+       dnode_t *dn;
        uint64_t txg = tx->tx_txg;
        dbuf_dirty_record_t *dr, **drp;
 
@@ -1231,6 +1292,9 @@ dbuf_undirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
        ASSERT(dr->dr_txg == txg);
        ASSERT(dr->dr_dbuf == db);
 
+       DB_DNODE_ENTER(db);
+       dn = DB_DNODE(db);
+
        /*
         * If this buffer is currently held, we cannot undirty
         * it, since one of the current holders may be in the
@@ -1243,6 +1307,7 @@ dbuf_undirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
                mutex_enter(&dn->dn_mtx);
                dnode_clear_range(dn, db->db_blkid, 1, tx);
                mutex_exit(&dn->dn_mtx);
+               DB_DNODE_EXIT(db);
                return (0);
        }
 
@@ -1264,6 +1329,7 @@ dbuf_undirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
                list_remove(&dn->dn_dirty_records[txg & TXG_MASK], dr);
                mutex_exit(&dn->dn_mtx);
        }
+       DB_DNODE_EXIT(db);
 
        if (db->db_level == 0) {
                if (db->db_state != DB_NOFILL) {
@@ -1309,8 +1375,10 @@ dbuf_will_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
        ASSERT(tx->tx_txg != 0);
        ASSERT(!refcount_is_zero(&db->db_holds));
 
-       if (RW_WRITE_HELD(&db->db_dnode->dn_struct_rwlock))
+       DB_DNODE_ENTER(db);
+       if (RW_WRITE_HELD(&DB_DNODE(db)->dn_struct_rwlock))
                rf |= DB_RF_HAVESTRUCT;
+       DB_DNODE_EXIT(db);
        (void) dbuf_read(db, NULL, rf);
        (void) dbuf_dirty(db, tx);
 }
@@ -1372,7 +1440,6 @@ void
 dbuf_assign_arcbuf(dmu_buf_impl_t *db, arc_buf_t *buf, dmu_tx_t *tx)
 {
        ASSERT(!refcount_is_zero(&db->db_holds));
-       ASSERT(db->db_dnode->dn_object != DMU_META_DNODE_OBJECT);
        ASSERT(db->db_blkid != DMU_BONUS_BLKID);
        ASSERT(db->db_level == 0);
        ASSERT(DBUF_GET_BUFC_TYPE(db) == ARC_BUFC_DATA);
@@ -1436,7 +1503,7 @@ dbuf_assign_arcbuf(dmu_buf_impl_t *db, arc_buf_t *buf, dmu_tx_t *tx)
  * in this case.  For callers from the DMU we will usually see:
  *     dbuf_clear()->arc_buf_evict()->dbuf_do_evict()->dbuf_destroy()
  * For the arc callback, we will usually see:
- *     dbuf_do_evict()->dbuf_clear();dbuf_destroy()
+ *     dbuf_do_evict()->dbuf_clear();dbuf_destroy()
  * Sometimes, though, we will get a mix of these two:
  *     DMU: dbuf_clear()->arc_buf_evict()
  *     ARC: dbuf_do_evict()->dbuf_destroy()
@@ -1444,9 +1511,9 @@ dbuf_assign_arcbuf(dmu_buf_impl_t *db, arc_buf_t *buf, dmu_tx_t *tx)
 void
 dbuf_clear(dmu_buf_impl_t *db)
 {
-       dnode_t *dn = db->db_dnode;
+       dnode_t *dn;
        dmu_buf_impl_t *parent = db->db_parent;
-       dmu_buf_impl_t *dndb = dn->dn_dbuf;
+       dmu_buf_impl_t *dndb;
        int dbuf_gone = FALSE;
 
        ASSERT(MUTEX_HELD(&db->db_mtx));
@@ -1470,10 +1537,26 @@ dbuf_clear(dmu_buf_impl_t *db)
        db->db_state = DB_EVICTING;
        db->db_blkptr = NULL;
 
+       DB_DNODE_ENTER(db);
+       dn = DB_DNODE(db);
+       dndb = dn->dn_dbuf;
        if (db->db_blkid != DMU_BONUS_BLKID && MUTEX_HELD(&dn->dn_dbufs_mtx)) {
                list_remove(&dn->dn_dbufs, db);
+               (void) atomic_dec_32_nv(&dn->dn_dbufs_count);
+               membar_producer();
+               DB_DNODE_EXIT(db);
+               /*
+                * Decrementing the dbuf count means that the hold corresponding
+                * to the removed dbuf is no longer discounted in dnode_move(),
+                * so the dnode cannot be moved until after we release the hold.
+                * The membar_producer() ensures visibility of the decremented
+                * value in dnode_move(), since DB_DNODE_EXIT doesn't actually
+                * release any lock.
+                */
                dnode_rele(dn, db);
-               db->db_dnode = NULL;
+               db->db_dnode_handle = NULL;
+       } else {
+               DB_DNODE_EXIT(db);
        }
 
        if (db->db_buf)
@@ -1483,7 +1566,7 @@ dbuf_clear(dmu_buf_impl_t *db)
                mutex_exit(&db->db_mtx);
 
        /*
-        * If this dbuf is referened from an indirect dbuf,
+        * If this dbuf is referenced from an indirect dbuf,
         * decrement the ref count on the indirect dbuf.
         */
        if (parent && parent != dndb)
@@ -1575,7 +1658,7 @@ dbuf_create(dnode_t *dn, uint8_t level, uint64_t blkid,
        db->db_blkid = blkid;
        db->db_last_dirty = NULL;
        db->db_dirtycnt = 0;
-       db->db_dnode = dn;
+       db->db_dnode_handle = dn->dn_handle;
        db->db_parent = parent;
        db->db_blkptr = blkptr;
 
@@ -1632,6 +1715,7 @@ dbuf_create(dnode_t *dn, uint8_t level, uint64_t blkid,
        ASSERT(dn->dn_object == DMU_META_DNODE_OBJECT ||
            refcount_count(&dn->dn_holds) > 0);
        (void) refcount_add(&dn->dn_holds, db);
+       (void) atomic_inc_32_nv(&dn->dn_dbufs_count);
 
        dprintf_dbuf(db, "db=%p\n", db);
 
@@ -1671,15 +1755,24 @@ dbuf_destroy(dmu_buf_impl_t *db)
                 * If this dbuf is still on the dn_dbufs list,
                 * remove it from that list.
                 */
-               if (db->db_dnode) {
-                       dnode_t *dn = db->db_dnode;
+               if (db->db_dnode_handle != NULL) {
+                       dnode_t *dn;
 
+                       DB_DNODE_ENTER(db);
+                       dn = DB_DNODE(db);
                        mutex_enter(&dn->dn_dbufs_mtx);
                        list_remove(&dn->dn_dbufs, db);
+                       (void) atomic_dec_32_nv(&dn->dn_dbufs_count);
                        mutex_exit(&dn->dn_dbufs_mtx);
-
+                       DB_DNODE_EXIT(db);
+                       /*
+                        * Decrementing the dbuf count means that the hold
+                        * corresponding to the removed dbuf is no longer
+                        * discounted in dnode_move(), so the dnode cannot be
+                        * moved until after we release the hold.
+                        */
                        dnode_rele(dn, db);
-                       db->db_dnode = NULL;
+                       db->db_dnode_handle = NULL;
                }
                dbuf_hash_remove(db);
        }
@@ -1710,17 +1803,13 @@ dbuf_prefetch(dnode_t *dn, uint64_t blkid)
 
        /* dbuf_find() returns with db_mtx held */
        if (db = dbuf_find(dn, 0, blkid)) {
-               if (refcount_count(&db->db_holds) > 0) {
-                       /*
-                        * This dbuf is active.  We assume that it is
-                        * already CACHED, or else about to be either
-                        * read or filled.
-                        */
-                       mutex_exit(&db->db_mtx);
-                       return;
-               }
+               /*
+                * This dbuf is already in the cache.  We assume that
+                * it is already CACHED, or else about to be either
+                * read or filled.
+                */
                mutex_exit(&db->db_mtx);
-               db = NULL;
+               return;
        }
 
        if (dbuf_findbp(dn, 0, blkid, TRUE, &db, &bp) == 0) {
@@ -1818,7 +1907,7 @@ top:
                        arc_buf_contents_t type = DBUF_GET_BUFC_TYPE(db);
 
                        dbuf_set_data(db,
-                           arc_buf_alloc(db->db_dnode->dn_objset->os_spa,
+                           arc_buf_alloc(dn->dn_objset->os_spa,
                            db->db.db_size, db, type));
                        bcopy(dr->dt.dl.dr_data->b_data, db->db.db_data,
                            db->db.db_size);
@@ -1834,7 +1923,7 @@ top:
        if (parent)
                dbuf_rele(parent, NULL);
 
-       ASSERT3P(db->db_dnode, ==, dn);
+       ASSERT3P(DB_DNODE(db), ==, dn);
        ASSERT3U(db->db_blkid, ==, blkid);
        ASSERT3U(db->db_level, ==, level);
        *dbp = db;
@@ -1871,6 +1960,8 @@ int
 dbuf_spill_set_blksz(dmu_buf_t *db_fake, uint64_t blksz, dmu_tx_t *tx)
 {
        dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
+       dnode_t *dn;
+
        if (db->db_blkid != DMU_SPILL_BLKID)
                return (ENOTSUP);
        if (blksz == 0)
@@ -1880,9 +1971,12 @@ dbuf_spill_set_blksz(dmu_buf_t *db_fake, uint64_t blksz, dmu_tx_t *tx)
        else
                blksz = P2ROUNDUP(blksz, SPA_MINBLOCKSIZE);
 
-       rw_enter(&db->db_dnode->dn_struct_rwlock, RW_WRITER);
+       DB_DNODE_ENTER(db);
+       dn = DB_DNODE(db);
+       rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
        dbuf_new_size(db, blksz, tx);
-       rw_exit(&db->db_dnode->dn_struct_rwlock);
+       rw_exit(&dn->dn_struct_rwlock);
+       DB_DNODE_EXIT(db);
 
        return (0);
 }
@@ -1901,6 +1995,13 @@ dbuf_add_ref(dmu_buf_impl_t *db, void *tag)
        ASSERT(holds > 1);
 }
 
+/*
+ * If you call dbuf_rele() you had better not be referencing the dnode handle
+ * unless you have some other direct or indirect hold on the dnode. (An indirect
+ * hold is a hold on one of the dnode's dbufs, including the bonus buffer.)
+ * Without that, the dbuf_rele() could lead to a dnode_rele() followed by the
+ * dnode's parent dbuf evicting its dnode handles.
+ */
 #pragma weak dmu_buf_rele = dbuf_rele
 void
 dbuf_rele(dmu_buf_impl_t *db, void *tag)
@@ -1921,6 +2022,11 @@ dbuf_rele_and_unlock(dmu_buf_impl_t *db, void *tag)
        ASSERT(MUTEX_HELD(&db->db_mtx));
        DBUF_VERIFY(db);
 
+       /*
+        * Remove the reference to the dbuf before removing its hold on the
+        * dnode so we can guarantee in dnode_move() that a referenced bonus
+        * buffer has a corresponding dnode hold.
+        */
        holds = refcount_remove(&db->db_holds, tag);
        ASSERT(holds >= 0);
 
@@ -1938,7 +2044,20 @@ dbuf_rele_and_unlock(dmu_buf_impl_t *db, void *tag)
        if (holds == 0) {
                if (db->db_blkid == DMU_BONUS_BLKID) {
                        mutex_exit(&db->db_mtx);
-                       dnode_rele(db->db_dnode, db);
+
+                       /*
+                        * If the dnode moves here, we cannot cross this barrier
+                        * until the move completes.
+                        */
+                       DB_DNODE_ENTER(db);
+                       (void) atomic_dec_32_nv(&DB_DNODE(db)->dn_dbufs_count);
+                       DB_DNODE_EXIT(db);
+                       /*
+                        * The bonus buffer's dnode hold is no longer discounted
+                        * in dnode_move(). The dnode cannot move until after
+                        * the dnode_rele().
+                        */
+                       dnode_rele(DB_DNODE(db), db);
                } else if (db->db_buf == NULL) {
                        /*
                         * This is a special case: we never associated this
@@ -2089,7 +2208,7 @@ static void
 dbuf_sync_indirect(dbuf_dirty_record_t *dr, dmu_tx_t *tx)
 {
        dmu_buf_impl_t *db = dr->dr_dbuf;
-       dnode_t *dn = db->db_dnode;
+       dnode_t *dn;
        zio_t *zio;
 
        ASSERT(dmu_tx_is_syncing(tx));
@@ -2107,10 +2226,13 @@ dbuf_sync_indirect(dbuf_dirty_record_t *dr, dmu_tx_t *tx)
                mutex_enter(&db->db_mtx);
        }
        ASSERT3U(db->db_state, ==, DB_CACHED);
-       ASSERT3U(db->db.db_size, ==, 1<<dn->dn_phys->dn_indblkshift);
        ASSERT(db->db_buf != NULL);
 
+       DB_DNODE_ENTER(db);
+       dn = DB_DNODE(db);
+       ASSERT3U(db->db.db_size, ==, 1<<dn->dn_phys->dn_indblkshift);
        dbuf_check_blkptr(dn, db);
+       DB_DNODE_EXIT(db);
 
        db->db_data_pending = dr;
 
@@ -2130,8 +2252,8 @@ dbuf_sync_leaf(dbuf_dirty_record_t *dr, dmu_tx_t *tx)
 {
        arc_buf_t **datap = &dr->dt.dl.dr_data;
        dmu_buf_impl_t *db = dr->dr_dbuf;
-       dnode_t *dn = db->db_dnode;
-       objset_t *os = dn->dn_objset;
+       dnode_t *dn;
+       objset_t *os;
        uint64_t txg = tx->tx_txg;
 
        ASSERT(dmu_tx_is_syncing(tx));
@@ -2154,6 +2276,9 @@ dbuf_sync_leaf(dbuf_dirty_record_t *dr, dmu_tx_t *tx)
        }
        DBUF_VERIFY(db);
 
+       DB_DNODE_ENTER(db);
+       dn = DB_DNODE(db);
+
        if (db->db_blkid == DMU_SPILL_BLKID) {
                mutex_enter(&dn->dn_mtx);
                dn->dn_phys->dn_flags |= DNODE_FLAG_SPILL_BLKPTR;
@@ -2173,6 +2298,8 @@ dbuf_sync_leaf(dbuf_dirty_record_t *dr, dmu_tx_t *tx)
                ASSERT3U(db->db_level, ==, 0);
                ASSERT3U(dn->dn_phys->dn_bonuslen, <=, DN_MAX_BONUSLEN);
                bcopy(*datap, DN_BONUS(dn->dn_phys), dn->dn_phys->dn_bonuslen);
+               DB_DNODE_EXIT(db);
+
                if (*datap != db->db.db_data) {
                        zio_buf_free(*datap, DN_MAX_BONUSLEN);
                        arc_space_return(DN_MAX_BONUSLEN, ARC_SPACE_OTHER);
@@ -2191,6 +2318,8 @@ dbuf_sync_leaf(dbuf_dirty_record_t *dr, dmu_tx_t *tx)
                return;
        }
 
+       os = dn->dn_objset;
+
        /*
         * This function may have dropped the db_mtx lock allowing a dmu_sync
         * operation to sneak in. As a result, we need to ensure that we
@@ -2200,7 +2329,7 @@ dbuf_sync_leaf(dbuf_dirty_record_t *dr, dmu_tx_t *tx)
        dbuf_check_blkptr(dn, db);
 
        /*
-        * If this buffer is in the middle of an immdiate write,
+        * If this buffer is in the middle of an immediate write,
         * wait for the synchronous IO to complete.
         */
        while (dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC) {
@@ -2237,10 +2366,20 @@ dbuf_sync_leaf(dbuf_dirty_record_t *dr, dmu_tx_t *tx)
        dbuf_write(dr, *datap, tx);
 
        ASSERT(!list_link_active(&dr->dr_dirty_node));
-       if (dn->dn_object == DMU_META_DNODE_OBJECT)
+       if (dn->dn_object == DMU_META_DNODE_OBJECT) {
                list_insert_tail(&dn->dn_dirty_records[txg&TXG_MASK], dr);
-       else
+               DB_DNODE_EXIT(db);
+       } else {
+               /*
+                * Although zio_nowait() does not "wait for an IO", it does
+                * initiate the IO. If this is an empty write it seems plausible
+                * that the IO could actually be completed before the nowait
+                * returns. We need to DB_DNODE_EXIT() first in case
+                * zio_nowait() invalidates the dbuf.
+                */
+               DB_DNODE_EXIT(db);
                zio_nowait(dr->dr_zio);
+       }
 }
 
 void
@@ -2274,9 +2413,9 @@ static void
 dbuf_write_ready(zio_t *zio, arc_buf_t *buf, void *vdb)
 {
        dmu_buf_impl_t *db = vdb;
+       dnode_t *dn;
        blkptr_t *bp = zio->io_bp;
        blkptr_t *bp_orig = &zio->io_bp_orig;
-       dnode_t *dn = db->db_dnode;
        spa_t *spa = zio->io_spa;
        int64_t delta;
        uint64_t fill = 0;
@@ -2284,12 +2423,15 @@ dbuf_write_ready(zio_t *zio, arc_buf_t *buf, void *vdb)
 
        ASSERT(db->db_blkptr == bp);
 
+       DB_DNODE_ENTER(db);
+       dn = DB_DNODE(db);
        delta = bp_get_dsize_sync(spa, bp) - bp_get_dsize_sync(spa, bp_orig);
        dnode_diduse_space(dn, delta - zio->io_prev_space_delta);
        zio->io_prev_space_delta = delta;
 
        if (BP_IS_HOLE(bp)) {
                ASSERT(bp->blk_fill == 0);
+               DB_DNODE_EXIT(db);
                return;
        }
 
@@ -2303,7 +2445,6 @@ dbuf_write_ready(zio_t *zio, arc_buf_t *buf, void *vdb)
 
 #ifdef ZFS_DEBUG
        if (db->db_blkid == DMU_SPILL_BLKID) {
-               dnode_t *dn = db->db_dnode;
                ASSERT(dn->dn_phys->dn_flags & DNODE_FLAG_SPILL_BLKPTR);
                ASSERT(!(BP_IS_HOLE(db->db_blkptr)) &&
                    db->db_blkptr == &dn->dn_phys->dn_spill);
@@ -2336,6 +2477,7 @@ dbuf_write_ready(zio_t *zio, arc_buf_t *buf, void *vdb)
                        fill += ibp->blk_fill;
                }
        }
+       DB_DNODE_EXIT(db);
 
        bp->blk_fill = fill;
 
@@ -2349,8 +2491,6 @@ dbuf_write_done(zio_t *zio, arc_buf_t *buf, void *vdb)
        dmu_buf_impl_t *db = vdb;
        blkptr_t *bp = zio->io_bp;
        blkptr_t *bp_orig = &zio->io_bp_orig;
-       dnode_t *dn = db->db_dnode;
-       objset_t *os = dn->dn_objset;
        uint64_t txg = zio->io_txg;
        dbuf_dirty_record_t **drp, *dr;
 
@@ -2360,8 +2500,13 @@ dbuf_write_done(zio_t *zio, arc_buf_t *buf, void *vdb)
        if (zio->io_flags & ZIO_FLAG_IO_REWRITE) {
                ASSERT(BP_EQUAL(bp, bp_orig));
        } else {
-               dsl_dataset_t *ds = os->os_dsl_dataset;
-               dmu_tx_t *tx = os->os_synctx;
+               objset_t *os;
+               dsl_dataset_t *ds;
+               dmu_tx_t *tx;
+
+               DB_GET_OBJSET(&os, db);
+               ds = os->os_dsl_dataset;
+               tx = os->os_synctx;
 
                (void) dsl_dataset_block_kill(ds, bp_orig, tx, B_TRUE);
                dsl_dataset_block_born(ds, bp, tx);
@@ -2382,10 +2527,14 @@ dbuf_write_done(zio_t *zio, arc_buf_t *buf, void *vdb)
 
 #ifdef ZFS_DEBUG
        if (db->db_blkid == DMU_SPILL_BLKID) {
-               dnode_t *dn = db->db_dnode;
+               dnode_t *dn;
+
+               DB_DNODE_ENTER(db);
+               dn = DB_DNODE(db);
                ASSERT(dn->dn_phys->dn_flags & DNODE_FLAG_SPILL_BLKPTR);
                ASSERT(!(BP_IS_HOLE(db->db_blkptr)) &&
                    db->db_blkptr == &dn->dn_phys->dn_spill);
+               DB_DNODE_EXIT(db);
        }
 #endif
 
@@ -2400,6 +2549,10 @@ dbuf_write_done(zio_t *zio, arc_buf_t *buf, void *vdb)
                                arc_set_callback(db->db_buf, dbuf_do_evict, db);
                }
        } else {
+               dnode_t *dn;
+
+               DB_DNODE_ENTER(db);
+               dn = DB_DNODE(db);
                ASSERT(list_head(&dr->dt.di.dr_children) == NULL);
                ASSERT3U(db->db.db_size, ==, 1<<dn->dn_phys->dn_indblkshift);
                if (!BP_IS_HOLE(db->db_blkptr)) {
@@ -2411,6 +2564,7 @@ dbuf_write_done(zio_t *zio, arc_buf_t *buf, void *vdb)
                            >> (db->db_level * epbs), >=, db->db_blkid);
                        arc_set_callback(db->db_buf, dbuf_do_evict, db);
                }
+               DB_DNODE_EXIT(db);
                mutex_destroy(&dr->dt.di.dr_mtx);
                list_destroy(&dr->dt.di.dr_children);
        }
@@ -2466,8 +2620,8 @@ static void
 dbuf_write(dbuf_dirty_record_t *dr, arc_buf_t *data, dmu_tx_t *tx)
 {
        dmu_buf_impl_t *db = dr->dr_dbuf;
-       dnode_t *dn = db->db_dnode;
-       objset_t *os = dn->dn_objset;
+       dnode_t *dn;
+       objset_t *os;
        dmu_buf_impl_t *parent = db->db_parent;
        uint64_t txg = tx->tx_txg;
        zbookmark_t zb;
@@ -2475,6 +2629,10 @@ dbuf_write(dbuf_dirty_record_t *dr, arc_buf_t *data, dmu_tx_t *tx)
        zio_t *zio;
        int wp_flag = 0;
 
+       DB_DNODE_ENTER(db);
+       dn = DB_DNODE(db);
+       os = dn->dn_objset;
+
        if (db->db_state != DB_NOFILL) {
                if (db->db_level > 0 || dn->dn_type == DMU_OT_DNODE) {
                        /*
@@ -2519,6 +2677,7 @@ dbuf_write(dbuf_dirty_record_t *dr, arc_buf_t *data, dmu_tx_t *tx)
        wp_flag |= (db->db_state == DB_NOFILL) ? WP_NOFILL : 0;
 
        dmu_write_policy(os, dn, db->db_level, wp_flag, &zp);
+       DB_DNODE_EXIT(db);
 
        if (db->db_level == 0 && dr->dt.dl.dr_override_state == DR_OVERRIDDEN) {
                ASSERT(db->db_state != DB_NOFILL);