]> git.proxmox.com Git - mirror_zfs.git/commitdiff
Decrease contention on dn_struct_rwlock
authorPaul Dagnelie <pcd@delphix.com>
Mon, 8 Jul 2019 20:18:50 +0000 (13:18 -0700)
committerBrian Behlendorf <behlendorf1@llnl.gov>
Mon, 8 Jul 2019 20:18:50 +0000 (13:18 -0700)
Currently, sequential async write workloads spend a lot of time
contending on the dn_struct_rwlock. This lock is responsible for
protecting the entire block tree below it; this naturally results
in some serialization during heavy write workloads. This can be
resolved by having per-dbuf locking, which will allow multiple
writers in the same object at the same time.

We introduce a new rwlock, the db_rwlock. This lock is responsible
for protecting the contents of the dbuf that it is a part of; when
reading a block pointer from a dbuf, you hold the lock as a reader.
When writing data to a dbuf, you hold it as a writer. This allows
multiple threads to write to different parts of a file at the same
time.

Reviewed by: Brad Lewis <brad.lewis@delphix.com>
Reviewed by: Matt Ahrens matt@delphix.com
Reviewed by: George Wilson george.wilson@delphix.com
Reviewed-by: Brian Behlendorf <behlendorf1@llnl.gov>
Signed-off-by: Paul Dagnelie <pcd@delphix.com>
External-issue: DLPX-52564
External-issue: DLPX-53085
External-issue: DLPX-57384
Closes #8946

include/sys/dbuf.h
include/sys/dmu_zfetch.h
module/zfs/dbuf.c
module/zfs/dmu.c
module/zfs/dmu_zfetch.c
module/zfs/dnode.c
module/zfs/dnode_sync.c

index a2f6062655faa0be21aa09ba23718ba04d8f4bda..53f9020f92f6f00213102a543a2c133547d2b6d9 100644 (file)
@@ -108,6 +108,12 @@ typedef enum override_states {
        DR_OVERRIDDEN
 } override_states_t;
 
+typedef enum db_lock_type {
+       DLT_NONE,
+       DLT_PARENT,
+       DLT_OBJSET
+} db_lock_type_t;
+
 typedef struct dbuf_dirty_record {
        /* link on our parents dirty list */
        list_node_t dr_dirty_node;
@@ -217,6 +223,22 @@ typedef struct dmu_buf_impl {
         */
        uint8_t db_level;
 
+       /*
+        * Protects db_buf's contents if they contain an indirect block or data
+        * block of the meta-dnode. We use this lock to protect the structure of
+        * the block tree. This means that when modifying this dbuf's data, we
+        * grab its rwlock. When modifying its parent's data (including the
+        * blkptr to this dbuf), we grab the parent's rwlock. The lock ordering
+        * for this lock is:
+        * 1) dn_struct_rwlock
+        * 2) db_rwlock
+        * We don't currently grab multiple dbufs' db_rwlocks at once.
+        */
+       krwlock_t db_rwlock;
+
+       /* buffer holding our data */
+       arc_buf_t *db_buf;
+
        /* db_mtx protects the members below */
        kmutex_t db_mtx;
 
@@ -232,9 +254,6 @@ typedef struct dmu_buf_impl {
         */
        zfs_refcount_t db_holds;
 
-       /* buffer holding our data */
-       arc_buf_t *db_buf;
-
        kcondvar_t db_changed;
        dbuf_dirty_record_t *db_data_pending;
 
@@ -335,6 +354,8 @@ void dbuf_destroy(dmu_buf_impl_t *db);
 void dbuf_unoverride(dbuf_dirty_record_t *dr);
 void dbuf_sync_list(list_t *list, int level, dmu_tx_t *tx);
 void dbuf_release_bp(dmu_buf_impl_t *db);
+db_lock_type_t dmu_buf_lock_parent(dmu_buf_impl_t *db, krw_t rw, void *tag);
+void dmu_buf_unlock_parent(dmu_buf_impl_t *db, db_lock_type_t type, void *tag);
 
 void dbuf_free_range(struct dnode *dn, uint64_t start, uint64_t end,
     struct dmu_tx *);
index 8125d07062398c6865747deba4ebdeec126894b2..1e539086d4c8258795ecb5bbc80a609b964cefee 100644 (file)
@@ -24,7 +24,7 @@
  */
 
 /*
- * Copyright (c) 2014 by Delphix. All rights reserved.
+ * Copyright (c) 2014, 2017 by Delphix. All rights reserved.
  */
 
 #ifndef        _DMU_ZFETCH_H
@@ -66,7 +66,8 @@ void          zfetch_fini(void);
 
 void           dmu_zfetch_init(zfetch_t *, struct dnode *);
 void           dmu_zfetch_fini(zfetch_t *);
-void           dmu_zfetch(zfetch_t *, uint64_t, uint64_t, boolean_t);
+void           dmu_zfetch(zfetch_t *, uint64_t, uint64_t, boolean_t,
+    boolean_t);
 
 
 #ifdef __cplusplus
index abfae29ade540068666f821bf71f72fa6b7777fe..31b9b1481b86fccbfe3bc93f73f9674b2f4a6c29 100644 (file)
@@ -21,7 +21,7 @@
 /*
  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  * Copyright 2011 Nexenta Systems, Inc.  All rights reserved.
- * Copyright (c) 2012, 2018 by Delphix. All rights reserved.
+ * Copyright (c) 2012, 2019 by Delphix. All rights reserved.
  * Copyright (c) 2013 by Saso Kiselkov. All rights reserved.
  * Copyright (c) 2014 Spectra Logic Corporation, All rights reserved.
  */
@@ -287,6 +287,7 @@ dbuf_cons(void *vdb, void *unused, int kmflag)
        bzero(db, sizeof (dmu_buf_impl_t));
 
        mutex_init(&db->db_mtx, NULL, MUTEX_DEFAULT, NULL);
+       rw_init(&db->db_rwlock, NULL, RW_DEFAULT, NULL);
        cv_init(&db->db_changed, NULL, CV_DEFAULT, NULL);
        multilist_link_init(&db->db_cache_link);
        zfs_refcount_create(&db->db_holds);
@@ -300,6 +301,7 @@ dbuf_dest(void *vdb, void *unused)
 {
        dmu_buf_impl_t *db = vdb;
        mutex_destroy(&db->db_mtx);
+       rw_destroy(&db->db_rwlock);
        cv_destroy(&db->db_changed);
        ASSERT(!multilist_link_active(&db->db_cache_link));
        zfs_refcount_destroy(&db->db_holds);
@@ -1014,10 +1016,10 @@ dbuf_verify(dmu_buf_impl_t *db)
                            db->db.db_object);
                        /*
                         * dnode_grow_indblksz() can make this fail if we don't
-                        * have the struct_rwlock.  XXX indblksz no longer
+                        * have the parent's rwlock.  XXX indblksz no longer
                         * grows.  safe to do this now?
                         */
-                       if (RW_WRITE_HELD(&dn->dn_struct_rwlock)) {
+                       if (RW_LOCK_HELD(&db->db_parent->db_rwlock)) {
                                ASSERT3P(db->db_blkptr, ==,
                                    ((blkptr_t *)db->db_parent->db.db_data +
                                    db->db_blkid % epb));
@@ -1177,6 +1179,44 @@ dbuf_whichblock(const dnode_t *dn, const int64_t level, const uint64_t offset)
        }
 }
 
+/*
+ * This function is used to lock the parent of the provided dbuf. This should be
+ * used when modifying or reading db_blkptr.
+ */
+db_lock_type_t
+dmu_buf_lock_parent(dmu_buf_impl_t *db, krw_t rw, void *tag)
+{
+       enum db_lock_type ret = DLT_NONE;
+       if (db->db_parent != NULL) {
+               rw_enter(&db->db_parent->db_rwlock, rw);
+               ret = DLT_PARENT;
+       } else if (dmu_objset_ds(db->db_objset) != NULL) {
+               rrw_enter(&dmu_objset_ds(db->db_objset)->ds_bp_rwlock, rw,
+                   tag);
+               ret = DLT_OBJSET;
+       }
+       /*
+        * We only return a DLT_NONE lock when it's the top-most indirect block
+        * of the meta-dnode of the MOS.
+        */
+       return (ret);
+}
+
+/*
+ * We need to pass the lock type in because it's possible that the block will
+ * move from being the topmost indirect block in a dnode (and thus, have no
+ * parent) to not the top-most via an indirection increase. This would cause a
+ * panic if we didn't pass the lock type in.
+ */
+void
+dmu_buf_unlock_parent(dmu_buf_impl_t *db, db_lock_type_t type, void *tag)
+{
+       if (type == DLT_PARENT)
+               rw_exit(&db->db_parent->db_rwlock);
+       else if (type == DLT_OBJSET)
+               rrw_exit(&dmu_objset_ds(db->db_objset)->ds_bp_rwlock, tag);
+}
+
 static void
 dbuf_read_done(zio_t *zio, const zbookmark_phys_t *zb, const blkptr_t *bp,
     arc_buf_t *buf, void *vdb)
@@ -1273,8 +1313,13 @@ dbuf_read_verify_dnode_crypt(dmu_buf_impl_t *db, uint32_t flags)
        return (err);
 }
 
+/*
+ * Drops db_mtx and the parent lock specified by dblt and tag before
+ * returning.
+ */
 static int
-dbuf_read_impl(dmu_buf_impl_t *db, zio_t *zio, uint32_t flags)
+dbuf_read_impl(dmu_buf_impl_t *db, zio_t *zio, uint32_t flags,
+    db_lock_type_t dblt, void *tag)
 {
        dnode_t *dn;
        zbookmark_phys_t zb;
@@ -1284,11 +1329,11 @@ dbuf_read_impl(dmu_buf_impl_t *db, zio_t *zio, uint32_t flags)
        DB_DNODE_ENTER(db);
        dn = DB_DNODE(db);
        ASSERT(!zfs_refcount_is_zero(&db->db_holds));
-       /* We need the struct_rwlock to prevent db_blkptr from changing. */
-       ASSERT(RW_LOCK_HELD(&dn->dn_struct_rwlock));
        ASSERT(MUTEX_HELD(&db->db_mtx));
        ASSERT(db->db_state == DB_UNCACHED);
        ASSERT(db->db_buf == NULL);
+       ASSERT(db->db_parent == NULL ||
+           RW_LOCK_HELD(&db->db_parent->db_rwlock));
 
        if (db->db_blkid == DMU_BONUS_BLKID) {
                /*
@@ -1316,6 +1361,7 @@ dbuf_read_impl(dmu_buf_impl_t *db, zio_t *zio, uint32_t flags)
                DB_DNODE_EXIT(db);
                db->db_state = DB_CACHED;
                mutex_exit(&db->db_mtx);
+               dmu_buf_unlock_parent(db, dblt, tag);
                return (0);
        }
 
@@ -1356,6 +1402,7 @@ dbuf_read_impl(dmu_buf_impl_t *db, zio_t *zio, uint32_t flags)
                DB_DNODE_EXIT(db);
                db->db_state = DB_CACHED;
                mutex_exit(&db->db_mtx);
+               dmu_buf_unlock_parent(db, dblt, tag);
                return (0);
        }
 
@@ -1387,12 +1434,14 @@ dbuf_read_impl(dmu_buf_impl_t *db, zio_t *zio, uint32_t flags)
                    "object set %llu", dmu_objset_id(db->db_objset));
                DB_DNODE_EXIT(db);
                mutex_exit(&db->db_mtx);
+               dmu_buf_unlock_parent(db, dblt, tag);
                return (SET_ERROR(EIO));
        }
 
        err = dbuf_read_verify_dnode_crypt(db, flags);
        if (err != 0) {
                DB_DNODE_EXIT(db);
+               dmu_buf_unlock_parent(db, dblt, tag);
                mutex_exit(&db->db_mtx);
                return (err);
        }
@@ -1412,11 +1461,18 @@ dbuf_read_impl(dmu_buf_impl_t *db, zio_t *zio, uint32_t flags)
 
        if ((flags & DB_RF_NO_DECRYPT) && BP_IS_PROTECTED(db->db_blkptr))
                zio_flags |= ZIO_FLAG_RAW;
-
-       err = arc_read(zio, db->db_objset->os_spa, db->db_blkptr,
+       /*
+        * The zio layer will copy the provided blkptr later, but we need to
+        * do this now so that we can release the parent's rwlock. We have to
+        * do that now so that if dbuf_read_done is called synchronously (on
+        * an l1 cache hit) we don't acquire the db_mtx while holding the
+        * parent's rwlock, which would be a lock ordering violation.
+        */
+       blkptr_t bp = *db->db_blkptr;
+       dmu_buf_unlock_parent(db, dblt, tag);
+       (void) arc_read(zio, db->db_objset->os_spa, &bp,
            dbuf_read_done, db, ZIO_PRIORITY_SYNC_READ, zio_flags,
            &aflags, &zb);
-
        return (err);
 }
 
@@ -1514,8 +1570,6 @@ dbuf_read(dmu_buf_impl_t *db, zio_t *zio, uint32_t flags)
 
        DB_DNODE_ENTER(db);
        dn = DB_DNODE(db);
-       if ((flags & DB_RF_HAVESTRUCT) == 0)
-               rw_enter(&dn->dn_struct_rwlock, RW_READER);
 
        prefetch = db->db_level == 0 && db->db_blkid != DMU_BONUS_BLKID &&
            (flags & DB_RF_NOPREFETCH) == 0 && dn != NULL &&
@@ -1552,30 +1606,33 @@ dbuf_read(dmu_buf_impl_t *db, zio_t *zio, uint32_t flags)
                        dbuf_set_data(db, db->db_buf);
                }
                mutex_exit(&db->db_mtx);
-               if (err == 0 && prefetch)
-                       dmu_zfetch(&dn->dn_zfetch, db->db_blkid, 1, B_TRUE);
-               if ((flags & DB_RF_HAVESTRUCT) == 0)
-                       rw_exit(&dn->dn_struct_rwlock);
+               if (err == 0 && prefetch) {
+                       dmu_zfetch(&dn->dn_zfetch, db->db_blkid, 1, B_TRUE,
+                           flags & DB_RF_HAVESTRUCT);
+               }
                DB_DNODE_EXIT(db);
                DBUF_STAT_BUMP(hash_hits);
        } else if (db->db_state == DB_UNCACHED) {
                spa_t *spa = dn->dn_objset->os_spa;
                boolean_t need_wait = B_FALSE;
 
+               db_lock_type_t dblt = dmu_buf_lock_parent(db, RW_READER, FTAG);
+
                if (zio == NULL &&
                    db->db_blkptr != NULL && !BP_IS_HOLE(db->db_blkptr)) {
                        zio = zio_root(spa, NULL, NULL, ZIO_FLAG_CANFAIL);
                        need_wait = B_TRUE;
                }
-               err = dbuf_read_impl(db, zio, flags);
-
-               /* dbuf_read_impl has dropped db_mtx for us */
-
-               if (!err && prefetch)
-                       dmu_zfetch(&dn->dn_zfetch, db->db_blkid, 1, B_TRUE);
+               err = dbuf_read_impl(db, zio, flags, dblt, FTAG);
+               /*
+                * dbuf_read_impl has dropped db_mtx and our parent's rwlock
+                * for us
+                */
+               if (!err && prefetch) {
+                       dmu_zfetch(&dn->dn_zfetch, db->db_blkid, 1, B_TRUE,
+                           flags & DB_RF_HAVESTRUCT);
+               }
 
-               if ((flags & DB_RF_HAVESTRUCT) == 0)
-                       rw_exit(&dn->dn_struct_rwlock);
                DB_DNODE_EXIT(db);
                DBUF_STAT_BUMP(hash_misses);
 
@@ -1600,10 +1657,10 @@ dbuf_read(dmu_buf_impl_t *db, zio_t *zio, uint32_t flags)
                 * occurred and the dbuf went to UNCACHED.
                 */
                mutex_exit(&db->db_mtx);
-               if (prefetch)
-                       dmu_zfetch(&dn->dn_zfetch, db->db_blkid, 1, B_TRUE);
-               if ((flags & DB_RF_HAVESTRUCT) == 0)
-                       rw_exit(&dn->dn_struct_rwlock);
+               if (prefetch) {
+                       dmu_zfetch(&dn->dn_zfetch, db->db_blkid, 1, B_TRUE,
+                           flags & DB_RF_HAVESTRUCT);
+               }
                DB_DNODE_EXIT(db);
                DBUF_STAT_BUMP(hash_misses);
 
@@ -1785,7 +1842,9 @@ dbuf_free_range(dnode_t *dn, uint64_t start_blkid, uint64_t end_blkid,
                if (db->db_state == DB_CACHED) {
                        ASSERT(db->db.db_data != NULL);
                        arc_release(db->db_buf, db);
+                       rw_enter(&db->db_rwlock, RW_WRITER);
                        bzero(db->db.db_data, db->db.db_size);
+                       rw_exit(&db->db_rwlock);
                        arc_buf_freeze(db->db_buf);
                }
 
@@ -1809,15 +1868,6 @@ dbuf_new_size(dmu_buf_impl_t *db, int size, dmu_tx_t *tx)
        DB_DNODE_ENTER(db);
        dn = DB_DNODE(db);
 
-       /* XXX does *this* func really need the lock? */
-       ASSERT(RW_WRITE_HELD(&dn->dn_struct_rwlock));
-
-       /*
-        * This call to dmu_buf_will_dirty() with the dn_struct_rwlock held
-        * is OK, because there can be no other references to the db
-        * when we are changing its size, so no concurrent DB_FILL can
-        * be happening.
-        */
        /*
         * XXX we should be doing a dbuf_read, checking the return
         * value and returning that up to our callers
@@ -1894,8 +1944,8 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
        dnode_t *dn;
        objset_t *os;
        dbuf_dirty_record_t **drp, *dr;
-       int drop_struct_lock = FALSE;
        int txgoff = tx->tx_txg & TXG_MASK;
+       boolean_t drop_struct_rwlock = B_FALSE;
 
        ASSERT(tx->tx_txg != 0);
        ASSERT(!zfs_refcount_is_zero(&db->db_holds));
@@ -2098,15 +2148,21 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
                return (dr);
        }
 
-       /*
-        * The dn_struct_rwlock prevents db_blkptr from changing
-        * due to a write from syncing context completing
-        * while we are running, so we want to acquire it before
-        * looking at db_blkptr.
-        */
        if (!RW_WRITE_HELD(&dn->dn_struct_rwlock)) {
                rw_enter(&dn->dn_struct_rwlock, RW_READER);
-               drop_struct_lock = TRUE;
+               drop_struct_rwlock = B_TRUE;
+       }
+
+       /*
+        * If we are overwriting a dedup BP, then unless it is snapshotted,
+        * when we get to syncing context we will need to decrement its
+        * refcount in the DDT.  Prefetch the relevant DDT block so that
+        * syncing context won't have to wait for the i/o.
+        */
+       if (db->db_blkptr != NULL) {
+               db_lock_type_t dblt = dmu_buf_lock_parent(db, RW_READER, FTAG);
+               ddt_prefetch(os->os_spa, db->db_blkptr);
+               dmu_buf_unlock_parent(db, dblt, FTAG);
        }
 
        /*
@@ -2119,19 +2175,12 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
            dn->dn_next_nlevels[(tx->tx_txg-1) & TXG_MASK] > db->db_level ||
            dn->dn_next_nlevels[(tx->tx_txg-2) & TXG_MASK] > db->db_level);
 
-       /*
-        * If we are overwriting a dedup BP, then unless it is snapshotted,
-        * when we get to syncing context we will need to decrement its
-        * refcount in the DDT.  Prefetch the relevant DDT block so that
-        * syncing context won't have to wait for the i/o.
-        */
-       ddt_prefetch(os->os_spa, db->db_blkptr);
 
        if (db->db_level == 0) {
                ASSERT(!db->db_objset->os_raw_receive ||
                    dn->dn_maxblkid >= db->db_blkid);
                dnode_new_blkid(dn, db->db_blkid, tx,
-                   drop_struct_lock, B_FALSE);
+                   drop_struct_rwlock, B_FALSE);
                ASSERT(dn->dn_maxblkid >= db->db_blkid);
        }
 
@@ -2142,15 +2191,14 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
 
                if (db->db_parent == NULL || db->db_parent == dn->dn_dbuf) {
                        int epbs = dn->dn_indblkshift - SPA_BLKPTRSHIFT;
-
-                       parent = dbuf_hold_level(dn, db->db_level+1,
+                       parent = dbuf_hold_level(dn, db->db_level + 1,
                            db->db_blkid >> epbs, FTAG);
                        ASSERT(parent != NULL);
                        parent_held = TRUE;
                }
-               if (drop_struct_lock)
+               if (drop_struct_rwlock)
                        rw_exit(&dn->dn_struct_rwlock);
-               ASSERT3U(db->db_level+1, ==, parent->db_level);
+               ASSERT3U(db->db_level + 1, ==, parent->db_level);
                di = dbuf_dirty(parent, tx);
                if (parent_held)
                        dbuf_rele(parent, FTAG);
@@ -2171,14 +2219,14 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
                }
                mutex_exit(&db->db_mtx);
        } else {
-               ASSERT(db->db_level+1 == dn->dn_nlevels);
+               ASSERT(db->db_level + 1 == dn->dn_nlevels);
                ASSERT(db->db_blkid < dn->dn_nblkptr);
                ASSERT(db->db_parent == NULL || db->db_parent == dn->dn_dbuf);
                mutex_enter(&dn->dn_mtx);
                ASSERT(!list_link_active(&dr->dr_dirty_node));
                list_insert_tail(&dn->dn_dirty_records[txgoff], dr);
                mutex_exit(&dn->dn_mtx);
-               if (drop_struct_lock)
+               if (drop_struct_rwlock)
                        rw_exit(&dn->dn_struct_rwlock);
        }
 
@@ -2767,10 +2815,12 @@ dbuf_findbp(dnode_t *dn, int level, uint64_t blkid, int fail_sparse,
                        *parentp = NULL;
                        return (err);
                }
+               rw_enter(&(*parentp)->db_rwlock, RW_READER);
                *bpp = ((blkptr_t *)(*parentp)->db.db_data) +
                    (blkid & ((1ULL << epbs) - 1));
                if (blkid > (dn->dn_phys->dn_maxblkid >> (level * epbs)))
                        ASSERT(BP_IS_HOLE(*bpp));
+               rw_exit(&(*parentp)->db_rwlock);
                return (0);
        } else {
                /* the block is referenced from the dnode */
@@ -3060,7 +3110,7 @@ dbuf_prefetch(dnode_t *dn, int64_t level, uint64_t blkid, zio_priority_t prio,
        if (blkid > dn->dn_maxblkid)
                return;
 
-       if (dnode_block_freed(dn, blkid))
+       if (level == 0 && dnode_block_freed(dn, blkid))
                return;
 
        /*
@@ -3215,7 +3265,9 @@ dbuf_hold_copy(struct dbuf_hold_arg *dh)
                    DBUF_GET_BUFC_TYPE(db), db->db.db_size));
        }
 
+       rw_enter(&db->db_rwlock, RW_WRITER);
        bcopy(data->b_data, db->db.db_data, arc_buf_size(data));
+       rw_exit(&db->db_rwlock);
 }
 
 /*
@@ -3406,7 +3458,6 @@ int
 dbuf_spill_set_blksz(dmu_buf_t *db_fake, uint64_t blksz, dmu_tx_t *tx)
 {
        dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
-       dnode_t *dn;
 
        if (db->db_blkid != DMU_SPILL_BLKID)
                return (SET_ERROR(ENOTSUP));
@@ -3415,12 +3466,7 @@ dbuf_spill_set_blksz(dmu_buf_t *db_fake, uint64_t blksz, dmu_tx_t *tx)
        ASSERT3U(blksz, <=, spa_maxblocksize(dmu_objset_spa(db->db_objset)));
        blksz = P2ROUNDUP(blksz, SPA_MINBLOCKSIZE);
 
-       DB_DNODE_ENTER(db);
-       dn = DB_DNODE(db);
-       rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
        dbuf_new_size(db, blksz, tx);
-       rw_exit(&dn->dn_struct_rwlock);
-       DB_DNODE_EXIT(db);
 
        return (0);
 }
@@ -4183,9 +4229,9 @@ dbuf_write_ready(zio_t *zio, arc_buf_t *buf, void *vdb)
 
        mutex_exit(&db->db_mtx);
 
-       rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
+       db_lock_type_t dblt = dmu_buf_lock_parent(db, RW_WRITER, FTAG);
        *db->db_blkptr = *bp;
-       rw_exit(&dn->dn_struct_rwlock);
+       dmu_buf_unlock_parent(db, dblt, FTAG);
 }
 
 /* ARGSUSED */
@@ -4226,9 +4272,9 @@ dbuf_write_children_ready(zio_t *zio, arc_buf_t *buf, void *vdb)
                 * anybody from reading the blocks we're about to
                 * zero out.
                 */
-               rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
+               rw_enter(&db->db_rwlock, RW_WRITER);
                bzero(db->db.db_data, db->db.db_size);
-               rw_exit(&dn->dn_struct_rwlock);
+               rw_exit(&db->db_rwlock);
        }
        DB_DNODE_EXIT(db);
 }
@@ -4419,7 +4465,7 @@ dbuf_remap_impl_callback(uint64_t vdev, uint64_t offset, uint64_t size,
 }
 
 static void
-dbuf_remap_impl(dnode_t *dn, blkptr_t *bp, dmu_tx_t *tx)
+dbuf_remap_impl(dnode_t *dn, blkptr_t *bp, krwlock_t *rw, dmu_tx_t *tx)
 {
        blkptr_t bp_copy = *bp;
        spa_t *spa = dmu_objset_spa(dn->dn_objset);
@@ -4433,14 +4479,16 @@ dbuf_remap_impl(dnode_t *dn, blkptr_t *bp, dmu_tx_t *tx)
        if (spa_remap_blkptr(spa, &bp_copy, dbuf_remap_impl_callback,
            &drica)) {
                /*
-                * The struct_rwlock prevents dbuf_read_impl() from
+                * The db_rwlock prevents dbuf_read_impl() from
                 * dereferencing the BP while we are changing it.  To
                 * avoid lock contention, only grab it when we are actually
                 * changing the BP.
                 */
-               rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
+               if (rw != NULL)
+                       rw_enter(rw, RW_WRITER);
                *bp = bp_copy;
-               rw_exit(&dn->dn_struct_rwlock);
+               if (rw != NULL)
+                       rw_exit(rw);
        }
 }
 
@@ -4459,7 +4507,7 @@ dbuf_remap(dnode_t *dn, dmu_buf_impl_t *db, dmu_tx_t *tx)
        if (db->db_level > 0) {
                blkptr_t *bp = db->db.db_data;
                for (int i = 0; i < db->db.db_size >> SPA_BLKPTRSHIFT; i++) {
-                       dbuf_remap_impl(dn, &bp[i], tx);
+                       dbuf_remap_impl(dn, &bp[i], &db->db_rwlock, tx);
                }
        } else if (db->db.db_object == DMU_META_DNODE_OBJECT) {
                dnode_phys_t *dnp = db->db.db_data;
@@ -4468,7 +4516,10 @@ dbuf_remap(dnode_t *dn, dmu_buf_impl_t *db, dmu_tx_t *tx)
                for (int i = 0; i < db->db.db_size >> DNODE_SHIFT;
                    i += dnp[i].dn_extra_slots + 1) {
                        for (int j = 0; j < dnp[i].dn_nblkptr; j++) {
-                               dbuf_remap_impl(dn, &dnp[i].dn_blkptr[j], tx);
+                               krwlock_t *lock = (dn->dn_dbuf == NULL ? NULL :
+                                   &dn->dn_dbuf->db_rwlock);
+                               dbuf_remap_impl(dn, &dnp[i].dn_blkptr[j], lock,
+                                   tx);
                        }
                }
        }
index 0722f5c5e7f0e92158acd98114059f184b9eb0a8..955588fb7b6afee0295df217e833416aaad9abac 100644 (file)
@@ -158,8 +158,8 @@ dmu_buf_hold_noread_by_dnode(dnode_t *dn, uint64_t offset,
        uint64_t blkid;
        dmu_buf_impl_t *db;
 
-       blkid = dbuf_whichblock(dn, 0, offset);
        rw_enter(&dn->dn_struct_rwlock, RW_READER);
+       blkid = dbuf_whichblock(dn, 0, offset);
        db = dbuf_hold(dn, blkid, tag);
        rw_exit(&dn->dn_struct_rwlock);
 
@@ -183,8 +183,8 @@ dmu_buf_hold_noread(objset_t *os, uint64_t object, uint64_t offset,
        err = dnode_hold(os, object, FTAG, &dn);
        if (err)
                return (err);
-       blkid = dbuf_whichblock(dn, 0, offset);
        rw_enter(&dn->dn_struct_rwlock, RW_READER);
+       blkid = dbuf_whichblock(dn, 0, offset);
        db = dbuf_hold(dn, blkid, tag);
        rw_exit(&dn->dn_struct_rwlock);
        dnode_rele(dn, FTAG);
@@ -549,7 +549,7 @@ dmu_buf_hold_array_by_dnode(dnode_t *dn, uint64_t offset, uint64_t length,
        if ((flags & DMU_READ_NO_PREFETCH) == 0 &&
            DNODE_META_IS_CACHEABLE(dn) && length <= zfetch_array_rd_sz) {
                dmu_zfetch(&dn->dn_zfetch, blkid, nblks,
-                   read && DNODE_IS_CACHEABLE(dn));
+                   read && DNODE_IS_CACHEABLE(dn), B_TRUE);
        }
        rw_exit(&dn->dn_struct_rwlock);
 
@@ -681,7 +681,6 @@ dmu_prefetch(objset_t *os, uint64_t object, int64_t level, uint64_t offset,
        if (err != 0)
                return;
 
-       rw_enter(&dn->dn_struct_rwlock, RW_READER);
        /*
         * offset + len - 1 is the last byte we want to prefetch for, and offset
         * is the first.  Then dbuf_whichblk(dn, level, off + len - 1) is the
@@ -689,6 +688,7 @@ dmu_prefetch(objset_t *os, uint64_t object, int64_t level, uint64_t offset,
         * offset)  is the first.  Then the number we need to prefetch is the
         * last - first + 1.
         */
+       rw_enter(&dn->dn_struct_rwlock, RW_READER);
        if (level > 0 || dn->dn_datablkshift != 0) {
                nblks = dbuf_whichblock(dn, level, offset + len - 1) -
                    dbuf_whichblock(dn, level, offset) + 1;
@@ -701,7 +701,6 @@ dmu_prefetch(objset_t *os, uint64_t object, int64_t level, uint64_t offset,
                for (int i = 0; i < nblks; i++)
                        dbuf_prefetch(dn, level, blkid + i, pri, 0);
        }
-
        rw_exit(&dn->dn_struct_rwlock);
 
        dnode_rele(dn, FTAG);
index 364e4d7aa86751ccf6501c72b7747b684d074a5b..ee771c9fa9a05ec76cde69e550f5a9aa05cece9d 100644 (file)
@@ -24,7 +24,7 @@
  */
 
 /*
- * Copyright (c) 2013, 2015 by Delphix. All rights reserved.
+ * Copyright (c) 2013, 2017 by Delphix. All rights reserved.
  */
 
 #include <sys/zfs_context.h>
@@ -205,7 +205,8 @@ dmu_zfetch_stream_create(zfetch_t *zf, uint64_t blkid)
  *   TRUE -- prefetch predicted data blocks plus following indirect blocks.
  */
 void
-dmu_zfetch(zfetch_t *zf, uint64_t blkid, uint64_t nblks, boolean_t fetch_data)
+dmu_zfetch(zfetch_t *zf, uint64_t blkid, uint64_t nblks, boolean_t fetch_data,
+    boolean_t have_lock)
 {
        zstream_t *zs;
        int64_t pf_start, ipf_start, ipf_istart, ipf_iend;
@@ -236,6 +237,8 @@ dmu_zfetch(zfetch_t *zf, uint64_t blkid, uint64_t nblks, boolean_t fetch_data)
                return;
 
 retry:
+       if (!have_lock)
+               rw_enter(&zf->zf_dnode->dn_struct_rwlock, RW_READER);
        rw_enter(&zf->zf_rwlock, rw);
 
        /*
@@ -260,6 +263,10 @@ retry:
                                        /* Already prefetched this before. */
                                        mutex_exit(&zs->zs_lock);
                                        rw_exit(&zf->zf_rwlock);
+                                       if (!have_lock) {
+                                               rw_exit(&zf->zf_dnode->
+                                                   dn_struct_rwlock);
+                                       }
                                        return;
                                }
                                break;
@@ -276,12 +283,16 @@ retry:
                ZFETCHSTAT_BUMP(zfetchstat_misses);
                if (rw == RW_READER && !rw_tryupgrade(&zf->zf_rwlock)) {
                        rw_exit(&zf->zf_rwlock);
+                       if (!have_lock)
+                               rw_exit(&zf->zf_dnode->dn_struct_rwlock);
                        rw = RW_WRITER;
                        goto retry;
                }
 
                dmu_zfetch_stream_create(zf, end_of_access_blkid);
                rw_exit(&zf->zf_rwlock);
+               if (!have_lock)
+                       rw_exit(&zf->zf_dnode->dn_struct_rwlock);
                return;
        }
 
@@ -361,6 +372,8 @@ retry:
                dbuf_prefetch(zf->zf_dnode, 1, iblk,
                    ZIO_PRIORITY_ASYNC_READ, ARC_FLAG_PREDICTIVE_PREFETCH);
        }
+       if (!have_lock)
+               rw_exit(&zf->zf_dnode->dn_struct_rwlock);
        ZFETCHSTAT_BUMP(zfetchstat_hits);
 }
 
index c06f614e1993673983dc8fe664835a1ac5f2e0e3..4d654e9e72521ba8815f2632b87c5fc1963ff6c0 100644 (file)
@@ -1331,7 +1331,6 @@ dnode_hold_impl(objset_t *os, uint64_t object, int flag, int slots,
        }
 
        blk = dbuf_whichblock(mdn, 0, object * sizeof (dnode_phys_t));
-
        db = dbuf_hold(mdn, blk, FTAG);
        if (drop_struct_lock)
                rw_exit(&mdn->dn_struct_rwlock);
@@ -1742,10 +1741,11 @@ dnode_set_blksz(dnode_t *dn, uint64_t size, int ibs, dmu_tx_t *tx)
 
        /* resize the old block */
        err = dbuf_hold_impl(dn, 0, 0, TRUE, FALSE, FTAG, &db);
-       if (err == 0)
+       if (err == 0) {
                dbuf_new_size(db, size, tx);
-       else if (err != ENOENT)
+       } else if (err != ENOENT) {
                goto fail;
+       }
 
        dnode_setdblksz(dn, size);
        dnode_setdirty(dn, tx);
@@ -1983,7 +1983,6 @@ dnode_free_range(dnode_t *dn, uint64_t off, uint64_t len, dmu_tx_t *tx)
        int trunc = FALSE;
        int epbs;
 
-       rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
        blksz = dn->dn_datablksz;
        blkshift = dn->dn_datablkshift;
        epbs = dn->dn_indblkshift - SPA_BLKPTRSHIFT;
@@ -2000,7 +1999,7 @@ dnode_free_range(dnode_t *dn, uint64_t off, uint64_t len, dmu_tx_t *tx)
                head = P2NPHASE(off, blksz);
                blkoff = P2PHASE(off, blksz);
                if ((off >> blkshift) > dn->dn_maxblkid)
-                       goto out;
+                       return;
        } else {
                ASSERT(dn->dn_maxblkid == 0);
                if (off == 0 && len >= blksz) {
@@ -2009,12 +2008,15 @@ dnode_free_range(dnode_t *dn, uint64_t off, uint64_t len, dmu_tx_t *tx)
                         */
                        blkid = 0;
                        nblks = 1;
-                       if (dn->dn_nlevels > 1)
+                       if (dn->dn_nlevels > 1) {
+                               rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
                                dnode_dirty_l1(dn, 0, tx);
+                               rw_exit(&dn->dn_struct_rwlock);
+                       }
                        goto done;
                } else if (off >= blksz) {
                        /* Freeing past end-of-data */
-                       goto out;
+                       return;
                } else {
                        /* Freeing part of the block. */
                        head = blksz - off;
@@ -2024,19 +2026,26 @@ dnode_free_range(dnode_t *dn, uint64_t off, uint64_t len, dmu_tx_t *tx)
        }
        /* zero out any partial block data at the start of the range */
        if (head) {
+               int res;
                ASSERT3U(blkoff + head, ==, blksz);
                if (len < head)
                        head = len;
-               if (dbuf_hold_impl(dn, 0, dbuf_whichblock(dn, 0, off),
-                   TRUE, FALSE, FTAG, &db) == 0) {
+               rw_enter(&dn->dn_struct_rwlock, RW_READER);
+               res = dbuf_hold_impl(dn, 0, dbuf_whichblock(dn, 0, off),
+                   TRUE, FALSE, FTAG, &db);
+               rw_exit(&dn->dn_struct_rwlock);
+               if (res == 0) {
                        caddr_t data;
+                       boolean_t dirty;
 
+                       db_lock_type_t dblt = dmu_buf_lock_parent(db, RW_READER,
+                           FTAG);
                        /* don't dirty if it isn't on disk and isn't dirty */
-                       if (db->db_last_dirty ||
-                           (db->db_blkptr && !BP_IS_HOLE(db->db_blkptr))) {
-                               rw_exit(&dn->dn_struct_rwlock);
+                       dirty = db->db_last_dirty ||
+                           (db->db_blkptr && !BP_IS_HOLE(db->db_blkptr));
+                       dmu_buf_unlock_parent(db, dblt, FTAG);
+                       if (dirty) {
                                dmu_buf_will_dirty(&db->db, tx);
-                               rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
                                data = db->db.db_data;
                                bzero(data + blkoff, head);
                        }
@@ -2048,11 +2057,11 @@ dnode_free_range(dnode_t *dn, uint64_t off, uint64_t len, dmu_tx_t *tx)
 
        /* If the range was less than one block, we're done */
        if (len == 0)
-               goto out;
+               return;
 
        /* If the remaining range is past end of file, we're done */
        if ((off >> blkshift) > dn->dn_maxblkid)
-               goto out;
+               return;
 
        ASSERT(ISP2(blksz));
        if (trunc)
@@ -2063,16 +2072,23 @@ dnode_free_range(dnode_t *dn, uint64_t off, uint64_t len, dmu_tx_t *tx)
        ASSERT0(P2PHASE(off, blksz));
        /* zero out any partial block data at the end of the range */
        if (tail) {
+               int res;
                if (len < tail)
                        tail = len;
-               if (dbuf_hold_impl(dn, 0, dbuf_whichblock(dn, 0, off+len),
-                   TRUE, FALSE, FTAG, &db) == 0) {
+               rw_enter(&dn->dn_struct_rwlock, RW_READER);
+               res = dbuf_hold_impl(dn, 0, dbuf_whichblock(dn, 0, off+len),
+                   TRUE, FALSE, FTAG, &db);
+               rw_exit(&dn->dn_struct_rwlock);
+               if (res == 0) {
+                       boolean_t dirty;
                        /* don't dirty if not on disk and not dirty */
-                       if (db->db_last_dirty ||
-                           (db->db_blkptr && !BP_IS_HOLE(db->db_blkptr))) {
-                               rw_exit(&dn->dn_struct_rwlock);
+                       db_lock_type_t type = dmu_buf_lock_parent(db, RW_READER,
+                           FTAG);
+                       dirty = db->db_last_dirty ||
+                           (db->db_blkptr && !BP_IS_HOLE(db->db_blkptr));
+                       dmu_buf_unlock_parent(db, type, FTAG);
+                       if (dirty) {
                                dmu_buf_will_dirty(&db->db, tx);
-                               rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
                                bzero(db->db.db_data, tail);
                        }
                        dbuf_rele(db, FTAG);
@@ -2082,7 +2098,7 @@ dnode_free_range(dnode_t *dn, uint64_t off, uint64_t len, dmu_tx_t *tx)
 
        /* If the range did not include a full block, we are done */
        if (len == 0)
-               goto out;
+               return;
 
        ASSERT(IS_P2ALIGNED(off, blksz));
        ASSERT(trunc || IS_P2ALIGNED(len, blksz));
@@ -2112,6 +2128,7 @@ dnode_free_range(dnode_t *dn, uint64_t off, uint64_t len, dmu_tx_t *tx)
         *    amount of space if we copy the freed BPs into deadlists.
         */
        if (dn->dn_nlevels > 1) {
+               rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
                uint64_t first, last;
 
                first = blkid >> epbs;
@@ -2156,6 +2173,7 @@ dnode_free_range(dnode_t *dn, uint64_t off, uint64_t len, dmu_tx_t *tx)
 
                        dnode_dirty_l1(dn, i, tx);
                }
+               rw_exit(&dn->dn_struct_rwlock);
        }
 
 done:
@@ -2178,9 +2196,6 @@ done:
 
        dbuf_free_range(dn, blkid, blkid + nblks - 1, tx);
        dnode_setdirty(dn, tx);
-out:
-
-       rw_exit(&dn->dn_struct_rwlock);
 }
 
 static boolean_t
@@ -2289,6 +2304,8 @@ dnode_next_offset_level(dnode_t *dn, int flags, uint64_t *offset,
        boolean_t hole;
        int i, inc, error, span;
 
+       ASSERT(RW_LOCK_HELD(&dn->dn_struct_rwlock));
+
        hole = ((flags & DNODE_FIND_HOLE) != 0);
        inc = (flags & DNODE_FIND_BACKWARDS) ? -1 : 1;
        ASSERT(txg == 0 || !hole);
@@ -2321,9 +2338,9 @@ dnode_next_offset_level(dnode_t *dn, int flags, uint64_t *offset,
                        return (error);
                }
                data = db->db.db_data;
+               rw_enter(&db->db_rwlock, RW_READER);
        }
 
-
        if (db != NULL && txg != 0 && (db->db_blkptr == NULL ||
            db->db_blkptr->blk_birth <= txg ||
            BP_IS_HOLE(db->db_blkptr))) {
@@ -2396,8 +2413,10 @@ dnode_next_offset_level(dnode_t *dn, int flags, uint64_t *offset,
                        error = SET_ERROR(ESRCH);
        }
 
-       if (db)
+       if (db != NULL) {
+               rw_exit(&db->db_rwlock);
                dbuf_rele(db, FTAG);
+       }
 
        return (error);
 }
index d3acf1baaeaa3f2e7f690ae8ab265c327d451d40..4e002d326a90b5743069271dadf8705ee1d489d2 100644 (file)
@@ -51,7 +51,6 @@ dnode_increase_indirection(dnode_t *dn, dmu_tx_t *tx)
 
        /* this dnode can't be paged out because it's dirty */
        ASSERT(dn->dn_phys->dn_type != DMU_OT_NONE);
-       ASSERT(RW_WRITE_HELD(&dn->dn_struct_rwlock));
        ASSERT(new_level > 1 && dn->dn_phys->dn_nlevels > 0);
 
        db = dbuf_hold_level(dn, dn->dn_phys->dn_nlevels, 0, FTAG);
@@ -61,8 +60,24 @@ dnode_increase_indirection(dnode_t *dn, dmu_tx_t *tx)
        dprintf("os=%p obj=%llu, increase to %d\n", dn->dn_objset,
            dn->dn_object, dn->dn_phys->dn_nlevels);
 
+       /*
+        * Lock ordering requires that we hold the children's db_mutexes (by
+        * calling dbuf_find()) before holding the parent's db_rwlock.  The lock
+        * order is imposed by dbuf_read's steps of "grab the lock to protect
+        * db_parent, get db_parent, hold db_parent's db_rwlock".
+        */
+       dmu_buf_impl_t *children[DN_MAX_NBLKPTR];
+       ASSERT3U(nblkptr, <=, DN_MAX_NBLKPTR);
+       for (i = 0; i < nblkptr; i++) {
+               children[i] =
+                   dbuf_find(dn->dn_objset, dn->dn_object, old_toplvl, i);
+       }
+
        /* transfer dnode's block pointers to new indirect block */
        (void) dbuf_read(db, NULL, DB_RF_MUST_SUCCEED|DB_RF_HAVESTRUCT);
+       if (dn->dn_dbuf != NULL)
+               rw_enter(&dn->dn_dbuf->db_rwlock, RW_WRITER);
+       rw_enter(&db->db_rwlock, RW_WRITER);
        ASSERT(db->db.db_data);
        ASSERT(arc_released(db->db_buf));
        ASSERT3U(sizeof (blkptr_t) * nblkptr, <=, db->db.db_size);
@@ -72,8 +87,7 @@ dnode_increase_indirection(dnode_t *dn, dmu_tx_t *tx)
 
        /* set dbuf's parent pointers to new indirect buf */
        for (i = 0; i < nblkptr; i++) {
-               dmu_buf_impl_t *child =
-                   dbuf_find(dn->dn_objset, dn->dn_object, old_toplvl, i);
+               dmu_buf_impl_t *child = children[i];
 
                if (child == NULL)
                        continue;
@@ -106,6 +120,10 @@ dnode_increase_indirection(dnode_t *dn, dmu_tx_t *tx)
 
        bzero(dn->dn_phys->dn_blkptr, sizeof (blkptr_t) * nblkptr);
 
+       rw_exit(&db->db_rwlock);
+       if (dn->dn_dbuf != NULL)
+               rw_exit(&dn->dn_dbuf->db_rwlock);
+
        dbuf_rele(db, FTAG);
 
        rw_exit(&dn->dn_struct_rwlock);
@@ -182,7 +200,7 @@ free_verify(dmu_buf_impl_t *db, uint64_t start, uint64_t end, dmu_tx_t *tx)
                ASSERT(db->db_level == 1);
 
                rw_enter(&dn->dn_struct_rwlock, RW_READER);
-               err = dbuf_hold_impl(dn, db->db_level-1,
+               err = dbuf_hold_impl(dn, db->db_level - 1,
                    (db->db_blkid << epbs) + i, TRUE, FALSE, FTAG, &child);
                rw_exit(&dn->dn_struct_rwlock);
                if (err == ENOENT)
@@ -280,7 +298,9 @@ free_children(dmu_buf_impl_t *db, uint64_t blkid, uint64_t nblks,
         * ancestor of the first or last block to be freed.  The first and
         * last L1 indirect blocks are always dirtied by dnode_free_range().
         */
+       db_lock_type_t dblt = dmu_buf_lock_parent(db, RW_READER, FTAG);
        VERIFY(BP_GET_FILL(db->db_blkptr) == 0 || db->db_dirtycnt > 0);
+       dmu_buf_unlock_parent(db, dblt, FTAG);
 
        dbuf_release_bp(db);
        bp = db->db.db_data;
@@ -306,7 +326,9 @@ free_children(dmu_buf_impl_t *db, uint64_t blkid, uint64_t nblks,
 
        if (db->db_level == 1) {
                FREE_VERIFY(db, start, end, tx);
-               free_blocks(dn, bp, end-start+1, tx);
+               rw_enter(&db->db_rwlock, RW_WRITER);
+               free_blocks(dn, bp, end - start + 1, tx);
+               rw_exit(&db->db_rwlock);
        } else {
                for (uint64_t id = start; id <= end; id++, bp++) {
                        if (BP_IS_HOLE(bp))
@@ -323,10 +345,12 @@ free_children(dmu_buf_impl_t *db, uint64_t blkid, uint64_t nblks,
        }
 
        if (free_indirects) {
+               rw_enter(&db->db_rwlock, RW_WRITER);
                for (i = 0, bp = db->db.db_data; i < 1 << epbs; i++, bp++)
                        ASSERT(BP_IS_HOLE(bp));
                bzero(db->db.db_data, db->db.db_size);
                free_blocks(dn, db->db_blkptr, 1, tx);
+               rw_exit(&db->db_rwlock);
        }
 
        DB_DNODE_EXIT(db);
@@ -378,7 +402,6 @@ dnode_sync_free_range_impl(dnode_t *dn, uint64_t blkid, uint64_t nblks,
                        VERIFY0(dbuf_hold_impl(dn, dnlevel - 1, i,
                            TRUE, FALSE, FTAG, &db));
                        rw_exit(&dn->dn_struct_rwlock);
-
                        free_children(db, blkid, nblks, free_indirects, tx);
                        dbuf_rele(db, FTAG);
                }