]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mds/CInode.cc
import ceph pacific 16.2.5
[ceph.git] / ceph / src / mds / CInode.cc
index 3c1ca791c60ae88df571c6b83cfd9f5a530c240b..4e47fdf7869aa26dd91f1dee0febbb9226b29f9b 100644 (file)
@@ -16,7 +16,6 @@
 #include "common/errno.h"
 
 #include <string>
-#include <stdio.h>
 
 #include "CInode.h"
 #include "CDir.h"
 #include "mds/MDSContinuation.h"
 #include "mds/InoTable.h"
 #include "cephfs_features.h"
+#include "osdc/Objecter.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_mds
 #undef dout_prefix
-#define dout_prefix *_dout << "mds." << mdcache->mds->get_nodeid() << ".cache.ino(" << inode.ino << ") "
+#define dout_prefix *_dout << "mds." << mdcache->mds->get_nodeid() << ".cache.ino(" << ino() << ") "
+
+void CInodeCommitOperation::update(ObjectOperation &op, inode_backtrace_t &bt) {
+  using ceph::encode;
+
+  op.priority = priority;
+  op.create(false);
+
+  bufferlist parent_bl;
+  encode(bt, parent_bl);
+  op.setxattr("parent", parent_bl);
 
+  // for the old pool there is no need to update the layout
+  if (!update_layout)
+    return;
+
+  bufferlist layout_bl;
+  encode(_layout, layout_bl, _features);
+  op.setxattr("layout", layout_bl);
+}
 
 class CInodeIOContext : public MDSIOContextBase
 {
@@ -100,7 +118,6 @@ std::string_view CInode::pin_name(int p) const
     case PIN_DIRTYRSTAT: return "dirtyrstat";
     case PIN_DIRTYPARENT: return "dirtyparent";
     case PIN_DIRWAITER: return "dirwaiter";
-    case PIN_SCRUBQUEUE: return "scrubqueue";
     default: return generic_pin_name(p);
   }
 }
@@ -108,7 +125,7 @@ std::string_view CInode::pin_name(int p) const
 //int cinode_pins[CINODE_NUM_PINS];  // counts
 ostream& CInode::print_db_line_prefix(ostream& out)
 {
-  return out << ceph_clock_now() << " mds." << mdcache->mds->get_nodeid() << ".cache.ino(" << inode.ino << ") ";
+  return out << ceph_clock_now() << " mds." << mdcache->mds->get_nodeid() << ".cache.ino(" << ino() << ") ";
 }
 
 /*
@@ -127,7 +144,7 @@ ostream& operator<<(ostream& out, const CInode& in)
   string path;
   in.make_path_string(path, true);
 
-  out << "[inode " << in.inode.ino;
+  out << "[inode " << in.ino();
   out << " [" 
       << (in.is_multiversion() ? "...":"")
       << in.first << "," << in.last << "]";
@@ -165,51 +182,54 @@ ostream& operator<<(ostream& out, const CInode& in)
     out << " snaprealm=" << in.snaprealm;
 
   if (in.state_test(CInode::STATE_AMBIGUOUSAUTH)) out << " AMBIGAUTH";
-  if (in.state_test(CInode::STATE_NEEDSRECOVER)) out << " needsrecover";
-  if (in.state_test(CInode::STATE_RECOVERING)) out << " recovering";
-  if (in.state_test(CInode::STATE_DIRTYPARENT)) out << " dirtyparent";
-  if (in.state_test(CInode::STATE_MISSINGOBJS)) out << " missingobjs";
+  if (in.state_test(CInode::STATE_NEEDSRECOVER)) out << " NEEDSRECOVER";
+  if (in.state_test(CInode::STATE_RECOVERING)) out << " RECOVERING";
+  if (in.state_test(CInode::STATE_DIRTYPARENT)) out << " DIRTYPARENT";
+  if (in.state_test(CInode::STATE_MISSINGOBJS)) out << " MISSINGOBJS";
+  if (in.is_ephemeral_dist()) out << " DISTEPHEMERALPIN";
+  if (in.is_ephemeral_rand()) out << " RANDEPHEMERALPIN";
   if (in.is_freezing_inode()) out << " FREEZING=" << in.auth_pin_freeze_allowance;
   if (in.is_frozen_inode()) out << " FROZEN";
   if (in.is_frozen_auth_pin()) out << " FROZEN_AUTHPIN";
 
-  const CInode::mempool_inode *pi = in.get_projected_inode();
+  const auto& pi = in.get_projected_inode();
   if (pi->is_truncating())
     out << " truncating(" << pi->truncate_from << " to " << pi->truncate_size << ")";
 
-  if (in.inode.is_dir()) {
-    out << " " << in.inode.dirstat;
+  if (in.is_dir()) {
+    out << " " << in.get_inode()->dirstat;
     if (g_conf()->mds_debug_scatterstat && in.is_projected()) {
-      const CInode::mempool_inode *pi = in.get_projected_inode();
       out << "->" << pi->dirstat;
     }
   } else {
-    out << " s=" << in.inode.size;
-    if (in.inode.nlink != 1)
-      out << " nl=" << in.inode.nlink;
+    out << " s=" << in.get_inode()->size;
+    if (in.get_inode()->nlink != 1)
+      out << " nl=" << in.get_inode()->nlink;
   }
 
   // rstat
-  out << " " << in.inode.rstat;
-  if (!(in.inode.rstat == in.inode.accounted_rstat))
-    out << "/" << in.inode.accounted_rstat;
+  out << " " << in.get_inode()->rstat;
+  if (!(in.get_inode()->rstat == in.get_inode()->accounted_rstat))
+    out << "/" << in.get_inode()->accounted_rstat;
   if (g_conf()->mds_debug_scatterstat && in.is_projected()) {
-    const CInode::mempool_inode *pi = in.get_projected_inode();
     out << "->" << pi->rstat;
     if (!(pi->rstat == pi->accounted_rstat))
       out << "/" << pi->accounted_rstat;
   }
 
+  if (in.is_any_old_inodes()) {
+    out << " old_inodes=" << in.get_old_inodes()->size();
+  }
+
   if (!in.client_need_snapflush.empty())
     out << " need_snapflush=" << in.client_need_snapflush;
 
-
   // locks
   if (!in.authlock.is_sync_and_unlocked())
     out << " " << in.authlock;
   if (!in.linklock.is_sync_and_unlocked())
     out << " " << in.linklock;
-  if (in.inode.is_dir()) {
+  if (in.get_inode()->is_dir()) {
     if (!in.dirfragtreelock.is_sync_and_unlocked())
       out << " " << in.dirfragtreelock;
     if (!in.snaplock.is_sync_and_unlocked())
@@ -230,8 +250,8 @@ ostream& operator<<(ostream& out, const CInode& in)
     out << " " << in.versionlock;
 
   // hack: spit out crap on which clients have caps
-  if (in.inode.client_ranges.size())
-    out << " cr=" << in.inode.client_ranges;
+  if (in.get_inode()->client_ranges.size())
+    out << " cr=" << in.get_inode()->client_ranges;
 
   if (!in.get_client_caps().empty()) {
     out << " caps={";
@@ -270,8 +290,8 @@ ostream& operator<<(ostream& out, const CInode& in)
     in.print_pin_set(out);
   }
 
-  if (in.inode.export_pin != MDS_RANK_NONE) {
-    out << " export_pin=" << in.inode.export_pin;
+  if (in.get_inode()->export_pin != MDS_RANK_NONE) {
+    out << " export_pin=" << in.get_inode()->export_pin;
   }
   if (in.state_test(CInode::STATE_DISTEPHEMERALPIN)) {
     out << " distepin";
@@ -285,19 +305,8 @@ ostream& operator<<(ostream& out, const CInode& in)
   return out;
 }
 
-ostream& operator<<(ostream& out, const CInode::scrub_stamp_info_t& si)
-{
-  out << "{scrub_start_version: " << si.scrub_start_version
-      << ", scrub_start_stamp: " << si.scrub_start_stamp
-      << ", last_scrub_version: " << si.last_scrub_version
-      << ", last_scrub_stamp: " << si.last_scrub_stamp;
-  return out;
-}
-
-CInode::CInode(MDCache *c, bool auth, snapid_t f, snapid_t l)
-  :
-    mdcache(c),
-    first(f), last(l),
+CInode::CInode(MDCache *c, bool auth, snapid_t f, snapid_t l) :
+    mdcache(c), first(f), last(l),
     item_dirty(this),
     item_caps(this),
     item_open_file(this),
@@ -317,7 +326,8 @@ CInode::CInode(MDCache *c, bool auth, snapid_t f, snapid_t l)
     flocklock(this, &flocklock_type),
     policylock(this, &policylock_type)
 {
-  if (auth) state_set(STATE_AUTH);
+  if (auth)
+    state_set(STATE_AUTH);
 }
 
 void CInode::print(ostream& out)
@@ -334,7 +344,7 @@ void CInode::add_need_snapflush(CInode *snapin, snapid_t snapid, client_t client
 
     // FIXME: this is non-optimal, as we'll block freezes/migrations for potentially
     // long periods waiting for clients to flush their snaps.
-    auth_pin(this);   // pin head inode...
+    auth_pin(this);   // pin head get_inode()->..
   }
 
   auto &clients = client_need_snapflush[snapid];
@@ -420,88 +430,75 @@ void CInode::clear_dirty_rstat()
   }
 }
 
-CInode::projected_inode &CInode::project_inode(bool xattr, bool snap)
+CInode::projected_inode CInode::project_inode(const MutationRef& mut,
+                                             bool xattr, bool snap)
 {
-  auto &pi = projected_nodes.empty() ?
-    projected_nodes.emplace_back(inode) :
-    projected_nodes.emplace_back(projected_nodes.back().inode);
+  if (mut && mut->is_projected(this)) {
+    ceph_assert(!xattr && !snap);
+    auto _inode = std::const_pointer_cast<mempool_inode>(projected_nodes.back().inode);
+    return projected_inode(std::move(_inode), xattr_map_ptr());
+  }
+
+  auto pi = allocate_inode(*get_projected_inode());
 
   if (scrub_infop && scrub_infop->last_scrub_dirty) {
-    pi.inode.last_scrub_stamp = scrub_infop->last_scrub_stamp;
-    pi.inode.last_scrub_version = scrub_infop->last_scrub_version;
+    pi->last_scrub_stamp = scrub_infop->last_scrub_stamp;
+    pi->last_scrub_version = scrub_infop->last_scrub_version;
     scrub_infop->last_scrub_dirty = false;
     scrub_maybe_delete_info();
   }
 
+  const auto& ox = get_projected_xattrs();
+  xattr_map_ptr px;
   if (xattr) {
-    pi.xattrs.reset(new mempool_xattr_map(*get_projected_xattrs()));
-    ++num_projected_xattrs;
+    px = allocate_xattr_map();
+    if (ox)
+      *px = *ox;
   }
 
+  sr_t* ps = projected_inode::UNDEF_SRNODE;
   if (snap) {
-    project_snaprealm();
+    ps = prepare_new_srnode(0);
+    ++num_projected_srnodes;
   }
 
-  dout(15) << __func__ << " " << pi.inode.ino << dendl;
-  return pi;
+  projected_nodes.emplace_back(pi, xattr ? px : ox , ps);
+  if (mut)
+    mut->add_projected_node(this);
+  dout(15) << __func__ << " " << pi->ino << dendl;
+  return projected_inode(std::move(pi), std::move(px), ps);
 }
 
-void CInode::pop_and_dirty_projected_inode(LogSegment *ls
+void CInode::pop_and_dirty_projected_inode(LogSegment *ls, const MutationRef& mut)
 {
   ceph_assert(!projected_nodes.empty());
-  auto& front = projected_nodes.front();
-
-  dout(15) << __func__ << " " << front.inode.ino
-          << " v" << front.inode.version << dendl;
+  auto front = std::move(projected_nodes.front());
+  dout(15) << __func__ << " v" << front.inode->version << dendl;
 
-  int64_t old_pool = inode.layout.pool_id;
-  bool pin_update = inode.export_pin != front.inode.export_pin;
-  bool dist_update = inode.export_ephemeral_distributed_pin
-                     != front.inode.export_ephemeral_distributed_pin;
-
-  mark_dirty(front.inode.version, ls);
-
-  inode = std::move(front.inode);
-
-  if (pin_update)
-    maybe_export_pin(true);
-  if (dist_update)
-    maybe_ephemeral_dist_children(true);
+  projected_nodes.pop_front();
+  if (mut)
+    mut->remove_projected_node(this);
 
-  if (inode.is_backtrace_updated())
-    mark_dirty_parent(ls, old_pool != inode.layout.pool_id);
+  bool pool_updated = get_inode()->layout.pool_id != front.inode->layout.pool_id;
+  bool pin_updated = (get_inode()->export_pin != front.inode->export_pin) ||
+                    (get_inode()->export_ephemeral_distributed_pin !=
+                     front.inode->export_ephemeral_distributed_pin);
 
-  if (front.xattrs) {
-    --num_projected_xattrs;
-    xattrs = *front.xattrs;
-  }
+  reset_inode(std::move(front.inode));
+  if (front.xattrs != get_xattrs())
+    reset_xattrs(std::move(front.xattrs));
 
-  if (projected_nodes.front().snapnode != projected_inode::UNDEF_SRNODE) {
-    pop_projected_snaprealm(projected_nodes.front().snapnode, false);
+  if (front.snapnode != projected_inode::UNDEF_SRNODE) {
     --num_projected_srnodes;
+    pop_projected_snaprealm(front.snapnode, false);
   }
 
-  projected_nodes.pop_front();
-}
+  mark_dirty(ls);
+  if (get_inode()->is_backtrace_updated())
+    mark_dirty_parent(ls, pool_updated);
 
-CInode::mempool_xattr_map *CInode::get_projected_xattrs()
-{
-  if (num_projected_xattrs > 0) {
-    for (auto it = projected_nodes.rbegin(); it != projected_nodes.rend(); ++it)
-      if (it->xattrs)
-       return it->xattrs.get();
-  }
-  return &xattrs;
-}
-
-CInode::mempool_xattr_map *CInode::get_previous_projected_xattrs()
-{
-  if (num_projected_xattrs > 0) {
-    for (auto it = ++projected_nodes.rbegin(); it != projected_nodes.rend(); ++it)
-      if (it->xattrs)
-       return it->xattrs.get();
-  }
-  return &xattrs;
+  if (pin_updated)
+    maybe_export_pin(true);
 }
 
 sr_t *CInode::prepare_new_srnode(snapid_t snapid)
@@ -511,21 +508,6 @@ sr_t *CInode::prepare_new_srnode(snapid_t snapid)
 
   if (cur_srnode) {
     new_srnode = new sr_t(*cur_srnode);
-    if (!new_srnode->past_parents.empty()) {
-      // convert past_parents to past_parent_snaps
-      ceph_assert(snaprealm);
-      auto& snaps = snaprealm->get_snaps();
-      for (auto p : snaps) {
-       if (p >= new_srnode->current_parent_since)
-         break;
-       if (!new_srnode->snaps.count(p))
-         new_srnode->past_parent_snaps.insert(p);
-      }
-      new_srnode->seq = snaprealm->get_newest_seq();
-      new_srnode->past_parents.clear();
-    }
-    if (snaprealm)
-      snaprealm->past_parents_dirty = false;
   } else {
     if (snapid == 0)
       snapid = mdcache->get_global_snaprealm()->get_newest_seq();
@@ -654,33 +636,17 @@ void CInode::pop_projected_snaprealm(sr_t *next_snaprealm, bool early)
   if (next_snaprealm) {
     dout(10) << __func__ << (early ? " (early) " : " ")
             << next_snaprealm << " seq " << next_snaprealm->seq << dendl;
-    bool invalidate_cached_snaps = false;
-    if (!snaprealm) {
+    if (!snaprealm)
       open_snaprealm();
-    } else if (!snaprealm->srnode.past_parents.empty()) {
-      invalidate_cached_snaps = true;
-      // re-open past parents
-      snaprealm->close_parents();
 
-      dout(10) << " realm " << *snaprealm << " past_parents " << snaprealm->srnode.past_parents
-              << " -> " << next_snaprealm->past_parents << dendl;
-    }
     auto old_flags = snaprealm->srnode.flags;
     snaprealm->srnode = *next_snaprealm;
     delete next_snaprealm;
 
     if ((snaprealm->srnode.flags ^ old_flags) & sr_t::PARENT_GLOBAL) {
-      snaprealm->close_parents();
       snaprealm->adjust_parent();
     }
 
-    // we should be able to open these up (or have them already be open).
-    bool ok = snaprealm->_open_parents(NULL);
-    ceph_assert(ok);
-
-    if (invalidate_cached_snaps)
-      snaprealm->invalidate_cached_snaps();
-
     if (snaprealm->parent)
       dout(10) << " realm " << *snaprealm << " parent " << *snaprealm->parent << dendl;
   } else {
@@ -695,9 +661,11 @@ void CInode::pop_projected_snaprealm(sr_t *next_snaprealm, bool early)
 
 // dirfrags
 
+InodeStoreBase::inode_const_ptr InodeStoreBase::empty_inode = InodeStoreBase::allocate_inode();
+
 __u32 InodeStoreBase::hash_dentry_name(std::string_view dn)
 {
-  int which = inode.dir_layout.dl_dir_hash;
+  int which = inode->dir_layout.dl_dir_hash;
   if (!which)
     which = CEPH_STR_HASH_LINUX;
   ceph_assert(ceph_str_hash_valid(which));
@@ -837,7 +805,7 @@ CDir *CInode::add_dirfrag(CDir *dir)
     dir->get(CDir::PIN_STICKY);
   }
 
-  maybe_pin();
+  maybe_export_pin();
 
   return dir;
 }
@@ -1062,7 +1030,7 @@ void CInode::make_path(filepath& fp, bool projected) const
 void CInode::name_stray_dentry(string& dname)
 {
   char s[20];
-  snprintf(s, sizeof(s), "%llx", (unsigned long long)inode.ino.val);
+  snprintf(s, sizeof(s), "%llx", (unsigned long long)ino().val);
   dname = s;
 }
 
@@ -1072,16 +1040,16 @@ version_t CInode::pre_dirty()
   CDentry* _cdentry = get_projected_parent_dn(); 
   if (_cdentry) {
     pv = _cdentry->pre_dirty(get_projected_version());
-    dout(10) << "pre_dirty " << pv << " (current v " << inode.version << ")" << dendl;
+    dout(10) << "pre_dirty " << pv << " (current v " << get_inode()->version << ")" << dendl;
   } else {
     ceph_assert(is_base());
     pv = get_projected_version() + 1;
   }
   // force update backtrace for old format inode (see mempool_inode::decode)
-  if (inode.backtrace_version == 0 && !projected_nodes.empty()) {
-    mempool_inode &pi = projected_nodes.back().inode;
-    if (pi.backtrace_version == 0)
-      pi.update_backtrace(pv);
+  if (get_inode()->backtrace_version == 0 && !projected_nodes.empty()) {
+    auto pi = _get_projected_inode();
+    if (pi->backtrace_version == 0)
+      pi->update_backtrace(pv);
   }
   return pv;
 }
@@ -1099,7 +1067,7 @@ void CInode::_mark_dirty(LogSegment *ls)
     ls->dirty_inodes.push_back(&item_dirty);
 }
 
-void CInode::mark_dirty(version_t pv, LogSegment *ls) {
+void CInode::mark_dirty(LogSegment *ls) {
   
   dout(10) << __func__ << " " << *this << dendl;
 
@@ -1114,13 +1082,11 @@ void CInode::mark_dirty(version_t pv, LogSegment *ls) {
   ceph_assert(is_auth());
   
   // touch my private version
-  ceph_assert(inode.version < pv);
-  inode.version = pv;
   _mark_dirty(ls);
 
   // mark dentry too
   if (parent)
-    parent->mark_dirty(pv, ls);
+    parent->mark_dirty(get_version(), ls);
 }
 
 
@@ -1183,7 +1149,7 @@ void CInode::store(MDSContext *fin)
   m.write_full(bl);
 
   object_t oid = CInode::get_object_name(ino(), frag_t(), ".inode");
-  object_locator_t oloc(mdcache->mds->mdsmap->get_metadata_pool());
+  object_locator_t oloc(mdcache->mds->get_metadata_pool());
 
   Context *newfin =
     new C_OnFinisher(new C_IO_Inode_Stored(this, get_version(), fin),
@@ -1242,7 +1208,7 @@ struct C_IO_Inode_Fetched : public CInodeIOContext {
   Context *fin;
   C_IO_Inode_Fetched(CInode *i, Context *f) : CInodeIOContext(i), fin(f) {}
   void finish(int r) override {
-    // Ignore 'r', because we fetch from two places, so r is usually ENOENT
+    // Ignore 'r', because we fetch from two places, so r is usually CEPHFS_ENOENT
     in->_fetched(bl, bl2, fin);
   }
   void print(ostream& out) const override {
@@ -1258,7 +1224,7 @@ void CInode::fetch(MDSContext *fin)
   C_GatherBuilder gather(g_ceph_context, new C_OnFinisher(c, mdcache->mds->finisher));
 
   object_t oid = CInode::get_object_name(ino(), frag_t(), "");
-  object_locator_t oloc(mdcache->mds->mdsmap->get_metadata_pool());
+  object_locator_t oloc(mdcache->mds->get_metadata_pool());
 
   // Old on-disk format: inode stored in xattr of a dirfrag
   ObjectOperation rd;
@@ -1282,7 +1248,7 @@ void CInode::_fetched(bufferlist& bl, bufferlist& bl2, Context *fin)
     p = bl.cbegin();
   } else {
     derr << "No data while reading inode " << ino() << dendl;
-    fin->complete(-ENOENT);
+    fin->complete(-CEPHFS_ENOENT);
     return;
   }
 
@@ -1296,22 +1262,22 @@ void CInode::_fetched(bufferlist& bl, bufferlist& bl2, Context *fin)
     if (magic != CEPH_FS_ONDISK_MAGIC) {
       dout(0) << "on disk magic '" << magic << "' != my magic '" << CEPH_FS_ONDISK_MAGIC
               << "'" << dendl;
-      fin->complete(-EINVAL);
+      fin->complete(-CEPHFS_EINVAL);
     } else {
       decode_store(p);
       dout(10) << "_fetched " << *this << dendl;
       fin->complete(0);
     }
   } catch (buffer::error &err) {
-    derr << "Corrupt inode " << ino() << ": " << err << dendl;
-    fin->complete(-EINVAL);
+    derr << "Corrupt inode " << ino() << ": " << err.what() << dendl;
+    fin->complete(-CEPHFS_EINVAL);
     return;
   }
 }
 
 void CInode::build_backtrace(int64_t pool, inode_backtrace_t& bt)
 {
-  bt.ino = inode.ino;
+  bt.ino = ino();
   bt.ancestors.clear();
   bt.pool = pool;
 
@@ -1319,14 +1285,15 @@ void CInode::build_backtrace(int64_t pool, inode_backtrace_t& bt)
   CDentry *pdn = get_parent_dn();
   while (pdn) {
     CInode *diri = pdn->get_dir()->get_inode();
-    bt.ancestors.push_back(inode_backpointer_t(diri->ino(), pdn->get_name(), in->inode.version));
+    bt.ancestors.push_back(inode_backpointer_t(diri->ino(), pdn->get_name(), in->get_inode()->version));
     in = diri;
     pdn = in->get_parent_dn();
   }
-  for (auto &p : inode.old_pools) {
+  bt.old_pools.reserve(get_inode()->old_pools.size());
+  for (auto &p : get_inode()->old_pools) {
     // don't add our own pool id to old_pools to avoid looping (e.g. setlayout 0, 1, 0)
     if (p != pool)
-      bt.old_pools.insert(p);
+      bt.old_pools.push_back(p);
   }
 }
 
@@ -1342,7 +1309,33 @@ struct C_IO_Inode_StoredBacktrace : public CInodeIOContext {
   }
 };
 
-void CInode::store_backtrace(MDSContext *fin, int op_prio)
+
+void CInode::_commit_ops(int r, C_GatherBuilder &gather_bld,
+                         std::vector<CInodeCommitOperation> &ops_vec,
+                         inode_backtrace_t &bt)
+{
+  dout(10) << __func__ << dendl;
+
+  if (r < 0) {
+    mdcache->mds->handle_write_error_with_lock(r);
+    return;
+  }
+
+  SnapContext snapc;
+  object_t oid = get_object_name(ino(), frag_t(), "");
+
+  for (auto &op : ops_vec) {
+    ObjectOperation obj_op;
+    object_locator_t oloc(op.get_pool());
+    op.update(obj_op, bt);
+    mdcache->mds->objecter->mutate(oid, oloc, obj_op, snapc,
+                                   ceph::real_clock::now(),
+                                   0, gather_bld.new_sub());
+  }
+}
+
+void CInode::_store_backtrace(std::vector<CInodeCommitOperation> &ops_vec,
+                              inode_backtrace_t &bt, int op_prio)
 {
   dout(10) << __func__ << " on " << *this << dendl;
   ceph_assert(is_dirty_parent());
@@ -1353,77 +1346,68 @@ void CInode::store_backtrace(MDSContext *fin, int op_prio)
   auth_pin(this);
 
   const int64_t pool = get_backtrace_pool();
-  inode_backtrace_t bt;
   build_backtrace(pool, bt);
-  bufferlist parent_bl;
-  using ceph::encode;
-  encode(bt, parent_bl);
-
-  ObjectOperation op;
-  op.priority = op_prio;
-  op.create(false);
-  op.setxattr("parent", parent_bl);
-
-  bufferlist layout_bl;
-  encode(inode.layout, layout_bl, mdcache->mds->mdsmap->get_up_features());
-  op.setxattr("layout", layout_bl);
 
-  SnapContext snapc;
-  object_t oid = get_object_name(ino(), frag_t(), "");
-  object_locator_t oloc(pool);
-  Context *fin2 = new C_OnFinisher(
-    new C_IO_Inode_StoredBacktrace(this, inode.backtrace_version, fin),
-    mdcache->mds->finisher);
+  ops_vec.emplace_back(op_prio, pool, get_inode()->layout,
+                       mdcache->mds->mdsmap->get_up_features());
 
-  if (!state_test(STATE_DIRTYPOOL) || inode.old_pools.empty()) {
+  if (!state_test(STATE_DIRTYPOOL) || get_inode()->old_pools.empty()) {
     dout(20) << __func__ << ": no dirtypool or no old pools" << dendl;
-    mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
-                                  ceph::real_clock::now(),
-                                  0, fin2);
     return;
   }
 
-  C_GatherBuilder gather(g_ceph_context, fin2);
-  mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
-                                ceph::real_clock::now(),
-                                0, gather.new_sub());
-
   // In the case where DIRTYPOOL is set, we update all old pools backtraces
   // such that anyone reading them will see the new pool ID in
   // inode_backtrace_t::pool and go read everything else from there.
-  for (const auto &p : inode.old_pools) {
+  for (const auto &p : get_inode()->old_pools) {
     if (p == pool)
       continue;
 
     dout(20) << __func__ << ": updating old pool " << p << dendl;
 
-    ObjectOperation op;
-    op.priority = op_prio;
-    op.create(false);
-    op.setxattr("parent", parent_bl);
-
-    object_locator_t oloc(p);
-    mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
-                                  ceph::real_clock::now(),
-                                  0, gather.new_sub());
+    ops_vec.emplace_back(op_prio, p);
   }
+}
+
+void CInode::store_backtrace(MDSContext *fin, int op_prio)
+{
+  std::vector<CInodeCommitOperation> ops_vec;
+  inode_backtrace_t bt;
+  auto version = get_inode()->backtrace_version;
+
+  _store_backtrace(ops_vec, bt, op_prio);
+
+  C_GatherBuilder gather(g_ceph_context,
+                        new C_OnFinisher(
+                          new C_IO_Inode_StoredBacktrace(this, version, fin),
+                          mdcache->mds->finisher));
+  _commit_ops(0, gather, ops_vec, bt);
+  ceph_assert(gather.has_subs());
   gather.activate();
 }
 
+void CInode::store_backtrace(CInodeCommitOperations &op, int op_prio)
+{
+  op.version = get_inode()->backtrace_version;
+  op.in = this;
+
+  _store_backtrace(op.ops_vec, op.bt, op_prio);
+}
+
 void CInode::_stored_backtrace(int r, version_t v, Context *fin)
 {
-  if (r == -ENOENT) {
+  if (r == -CEPHFS_ENOENT) {
     const int64_t pool = get_backtrace_pool();
     bool exists = mdcache->mds->objecter->with_osdmap(
         [pool](const OSDMap &osd_map) {
           return osd_map.have_pg_pool(pool);
         });
 
-    // This ENOENT is because the pool doesn't exist (the user deleted it
+    // This CEPHFS_ENOENT is because the pool doesn't exist (the user deleted it
     // out from under us), so the backtrace can never be written, so pretend
     // to succeed so that the user can proceed to e.g. delete the file.
     if (!exists) {
-      dout(4) << __func__ << " got ENOENT: a data pool was deleted "
+      dout(4) << __func__ << " got CEPHFS_ENOENT: a data pool was deleted "
                  "beneath us!" << dendl;
       r = 0;
     }
@@ -1444,7 +1428,7 @@ void CInode::_stored_backtrace(int r, version_t v, Context *fin)
   dout(10) << __func__ << " v " << v <<  dendl;
 
   auth_unpin(this);
-  if (v == inode.backtrace_version)
+  if (v == get_inode()->backtrace_version)
     clear_dirty_parent();
   if (fin)
     fin->complete(0);
@@ -1452,7 +1436,7 @@ void CInode::_stored_backtrace(int r, version_t v, Context *fin)
 
 void CInode::fetch_backtrace(Context *fin, bufferlist *backtrace)
 {
-  mdcache->fetch_backtrace(inode.ino, get_backtrace_pool(), *backtrace, fin);
+  mdcache->fetch_backtrace(ino(), get_backtrace_pool(), *backtrace, fin);
 }
 
 void CInode::mark_dirty_parent(LogSegment *ls, bool dirty_pool)
@@ -1495,7 +1479,7 @@ void CInode::verify_diri_backtrace(bufferlist &bl, int err)
     if (backtrace.ancestors.empty() ||
        backtrace.ancestors[0].dname != pdn->get_name() ||
        backtrace.ancestors[0].dirino != pdn->get_dir()->ino())
-      err = -EINVAL;
+      err = -CEPHFS_EINVAL;
   }
 
   if (err) {
@@ -1512,20 +1496,59 @@ void CInode::verify_diri_backtrace(bufferlist &bl, int err)
 // parent dir
 
 
+void InodeStoreBase::encode_xattrs(bufferlist &bl) const {
+  using ceph::encode;
+  if (xattrs)
+    encode(*xattrs, bl);
+  else
+    encode((__u32)0, bl);
+}
+
+void InodeStoreBase::decode_xattrs(bufferlist::const_iterator &p) {
+  using ceph::decode;
+  mempool_xattr_map tmp;
+  decode_noshare(tmp, p);
+  if (tmp.empty()) {
+    reset_xattrs(xattr_map_ptr());
+  } else {
+    reset_xattrs(allocate_xattr_map(std::move(tmp)));
+  }
+}
+
+void InodeStoreBase::encode_old_inodes(bufferlist &bl, uint64_t features) const {
+  using ceph::encode;
+  if (old_inodes)
+    encode(*old_inodes, bl, features);
+  else
+    encode((__u32)0, bl);
+}
+
+void InodeStoreBase::decode_old_inodes(bufferlist::const_iterator &p) {
+  using ceph::decode;
+  mempool_old_inode_map tmp;
+  decode(tmp, p);
+  if (tmp.empty()) {
+    reset_old_inodes(old_inode_map_ptr());
+  } else {
+    reset_old_inodes(allocate_old_inode_map(std::move(tmp)));
+  }
+}
+
 void InodeStoreBase::encode_bare(bufferlist &bl, uint64_t features,
                                 const bufferlist *snap_blob) const
 {
   using ceph::encode;
-  encode(inode, bl, features);
-  if (is_symlink())
+  encode(*inode, bl, features);
+  if (inode->is_symlink())
     encode(symlink, bl);
   encode(dirfragtree, bl);
-  encode(xattrs, bl);
+  encode_xattrs(bl);
+
   if (snap_blob)
     encode(*snap_blob, bl);
   else
     encode(bufferlist(), bl);
-  encode(old_inodes, bl, features);
+  encode_old_inodes(bl, features);
   encode(oldest_snap, bl);
   encode(damage_flags, bl);
 }
@@ -1550,23 +1573,26 @@ void InodeStoreBase::decode_bare(bufferlist::const_iterator &bl,
                              bufferlist& snap_blob, __u8 struct_v)
 {
   using ceph::decode;
-  decode(inode, bl);
-  if (is_symlink()) {
+
+  auto _inode = allocate_inode();
+  decode(*_inode, bl);
+
+  if (_inode->is_symlink()) {
     std::string tmp;
     decode(tmp, bl);
     symlink = std::string_view(tmp);
   }
   decode(dirfragtree, bl);
-  decode_noshare(xattrs, bl);
+  decode_xattrs(bl);
   decode(snap_blob, bl);
 
-  decode(old_inodes, bl);
-  if (struct_v == 2 && inode.is_dir()) {
+  decode_old_inodes(bl);
+  if (struct_v == 2 && _inode->is_dir()) {
     bool default_layout_exists;
     decode(default_layout_exists, bl);
     if (default_layout_exists) {
       decode(struct_v, bl); // this was a default_file_layout
-      decode(inode.layout, bl); // but we only care about the layout portion
+      decode(_inode->layout, bl); // but we only care about the layout portion
     }
   }
 
@@ -1581,6 +1607,8 @@ void InodeStoreBase::decode_bare(bufferlist::const_iterator &bl,
       decode(damage_flags, bl);
     }
   }
+
+  reset_inode(std::move(_inode));
 }
 
 
@@ -1627,52 +1655,58 @@ void CInode::set_object_info(MDSCacheObjectInfo &info)
 void CInode::encode_lock_iauth(bufferlist& bl)
 {
   ENCODE_START(1, 1, bl);
-  encode(inode.version, bl);
-  encode(inode.ctime, bl);
-  encode(inode.mode, bl);
-  encode(inode.uid, bl);
-  encode(inode.gid, bl);  
+  encode(get_inode()->version, bl);
+  encode(get_inode()->ctime, bl);
+  encode(get_inode()->mode, bl);
+  encode(get_inode()->uid, bl);
+  encode(get_inode()->gid, bl);  
   ENCODE_FINISH(bl);
 }
 
 void CInode::decode_lock_iauth(bufferlist::const_iterator& p)
 {
+  ceph_assert(!is_auth());
+  auto _inode = allocate_inode(*get_inode());
   DECODE_START(1, p);
-  decode(inode.version, p);
+  decode(_inode->version, p);
   utime_t tm;
   decode(tm, p);
-  if (inode.ctime < tm) inode.ctime = tm;
-  decode(inode.mode, p);
-  decode(inode.uid, p);
-  decode(inode.gid, p);
+  if (_inode->ctime < tm) _inode->ctime = tm;
+  decode(_inode->mode, p);
+  decode(_inode->uid, p);
+  decode(_inode->gid, p);
   DECODE_FINISH(p);
+  reset_inode(std::move(_inode));
 }
 
 void CInode::encode_lock_ilink(bufferlist& bl)
 {
   ENCODE_START(1, 1, bl);
-  encode(inode.version, bl);
-  encode(inode.ctime, bl);
-  encode(inode.nlink, bl);
+  encode(get_inode()->version, bl);
+  encode(get_inode()->ctime, bl);
+  encode(get_inode()->nlink, bl);
   ENCODE_FINISH(bl);
 }
 
 void CInode::decode_lock_ilink(bufferlist::const_iterator& p)
 {
+  ceph_assert(!is_auth());
+  auto _inode = allocate_inode(*get_inode());
   DECODE_START(1, p);
-  decode(inode.version, p);
+  decode(_inode->version, p);
   utime_t tm;
   decode(tm, p);
-  if (inode.ctime < tm) inode.ctime = tm;
-  decode(inode.nlink, p);
+  if (_inode->ctime < tm) _inode->ctime = tm;
+  decode(_inode->nlink, p);
   DECODE_FINISH(p);
+  reset_inode(std::move(_inode));
 }
 
 void CInode::encode_lock_idft(bufferlist& bl)
 {
   ENCODE_START(1, 1, bl);
   if (is_auth()) {
-    encode(inode.version, bl);
+    encode(get_inode()->version, bl);
   } else {
     // treat flushing as dirty when rejoining cache
     bool dirty = dirfragtreelock.is_dirty_or_flushing();
@@ -1698,6 +1732,8 @@ void CInode::encode_lock_idft(bufferlist& bl)
 
 void CInode::decode_lock_idft(bufferlist::const_iterator& p)
 {
+  inode_ptr _inode;
+
   DECODE_START(1, p);
   if (is_auth()) {
     bool replica_dirty;
@@ -1707,7 +1743,8 @@ void CInode::decode_lock_idft(bufferlist::const_iterator& p)
       dirfragtreelock.mark_dirty();  // ok bc we're auth and caller will handle
     }
   } else {
-    decode(inode.version, p);
+    _inode = allocate_inode(*get_inode());
+    decode(_inode->version, p);
   }
   {
     fragtree_t temp;
@@ -1742,39 +1779,42 @@ void CInode::decode_lock_idft(bufferlist::const_iterator& p)
       verify_dirfrags();
   }
   DECODE_FINISH(p);
+
+  if (_inode)
+    reset_inode(std::move(_inode));
 }
 
 void CInode::encode_lock_ifile(bufferlist& bl)
 {
   ENCODE_START(1, 1, bl);
   if (is_auth()) {
-    encode(inode.version, bl); 
-    encode(inode.ctime, bl); 
-    encode(inode.mtime, bl); 
-    encode(inode.atime, bl); 
-    encode(inode.time_warp_seq, bl); 
+    encode(get_inode()->version, bl); 
+    encode(get_inode()->ctime, bl); 
+    encode(get_inode()->mtime, bl); 
+    encode(get_inode()->atime, bl); 
+    encode(get_inode()->time_warp_seq, bl); 
     if (!is_dir()) {
-      encode(inode.layout, bl, mdcache->mds->mdsmap->get_up_features());
-      encode(inode.size, bl); 
-      encode(inode.truncate_seq, bl); 
-      encode(inode.truncate_size, bl); 
-      encode(inode.client_ranges, bl); 
-      encode(inode.inline_data, bl); 
+      encode(get_inode()->layout, bl, mdcache->mds->mdsmap->get_up_features());
+      encode(get_inode()->size, bl); 
+      encode(get_inode()->truncate_seq, bl); 
+      encode(get_inode()->truncate_size, bl); 
+      encode(get_inode()->client_ranges, bl); 
+      encode(get_inode()->inline_data, bl); 
     }    
   } else {
     // treat flushing as dirty when rejoining cache
     bool dirty = filelock.is_dirty_or_flushing();
     encode(dirty, bl); 
   }    
-  dout(15) << __func__ << " inode.dirstat is " << inode.dirstat << dendl;
-  encode(inode.dirstat, bl);  // only meaningful if i am auth.
+  dout(15) << __func__ << " inode.dirstat is " << get_inode()->dirstat << dendl;
+  encode(get_inode()->dirstat, bl);  // only meaningful if i am auth.
   bufferlist tmp;
   __u32 n = 0;
   for (const auto &p : dirfrags) {
     frag_t fg = p.first;
     CDir *dir = p.second;
     if (is_auth() || dir->is_auth()) {
-      fnode_t *pf = dir->get_projected_fnode();
+      const auto& pf = dir->get_projected_fnode();
       dout(15) << fg << " " << *dir << dendl;
       dout(20) << fg << "           fragstat " << pf->fragstat << dendl;
       dout(20) << fg << " accounted_fragstat " << pf->accounted_fragstat << dendl;
@@ -1792,22 +1832,26 @@ void CInode::encode_lock_ifile(bufferlist& bl)
 
 void CInode::decode_lock_ifile(bufferlist::const_iterator& p)
 {
+  inode_ptr _inode;
+
   DECODE_START(1, p);
   if (!is_auth()) {
-    decode(inode.version, p);
+    _inode = allocate_inode(*get_inode());
+
+    decode(_inode->version, p);
     utime_t tm;
     decode(tm, p);
-    if (inode.ctime < tm) inode.ctime = tm;
-    decode(inode.mtime, p);
-    decode(inode.atime, p);
-    decode(inode.time_warp_seq, p);
+    if (_inode->ctime < tm) _inode->ctime = tm;
+    decode(_inode->mtime, p);
+    decode(_inode->atime, p);
+    decode(_inode->time_warp_seq, p);
     if (!is_dir()) {
-      decode(inode.layout, p);
-      decode(inode.size, p);
-      decode(inode.truncate_seq, p);
-      decode(inode.truncate_size, p);
-      decode(inode.client_ranges, p);
-      decode(inode.inline_data, p);
+      decode(_inode->layout, p);
+      decode(_inode->size, p);
+      decode(_inode->truncate_seq, p);
+      decode(_inode->truncate_size, p);
+      decode(_inode->client_ranges, p);
+      decode(_inode->inline_data, p);
     }
   } else {
     bool replica_dirty;
@@ -1822,7 +1866,7 @@ void CInode::decode_lock_ifile(bufferlist::const_iterator& p)
   decode(dirstat, p);
   if (!is_auth()) {
     dout(10) << " taking inode dirstat " << dirstat << " for " << *this << dendl;
-    inode.dirstat = dirstat;    // take inode summation if replica
+    _inode->dirstat = dirstat;    // take inode summation if replica
   }
   __u32 n;
   decode(n, p);
@@ -1846,8 +1890,10 @@ void CInode::decode_lock_ifile(bufferlist::const_iterator& p)
       dout(10) << fg << " first " << dir->first << " -> " << fgfirst
                << " on " << *dir << dendl;
       dir->first = fgfirst;
-      dir->fnode.fragstat = fragstat;
-      dir->fnode.accounted_fragstat = accounted_fragstat;
+      auto _fnode = CDir::allocate_fnode(*dir->get_fnode());
+      _fnode->fragstat = fragstat;
+      _fnode->accounted_fragstat = accounted_fragstat;
+      dir->reset_fnode(std::move(_fnode));
       if (!(fragstat == accounted_fragstat)) {
         dout(10) << fg << " setting filelock updated flag" << dendl;
         filelock.mark_dirty();  // ok bc we're auth and caller will handle
@@ -1857,34 +1903,37 @@ void CInode::decode_lock_ifile(bufferlist::const_iterator& p)
         dout(10) << fg << " first " << dir->first << " -> " << fgfirst
                  << " on " << *dir << dendl;
         dir->first = fgfirst;
-        fnode_t *pf = dir->get_projected_fnode();
+        const auto& pf = dir->get_projected_fnode();
         finish_scatter_update(&filelock, dir,
-                              inode.dirstat.version, pf->accounted_fragstat.version);
+                              _inode->dirstat.version, pf->accounted_fragstat.version);
       }
     }
   }
   DECODE_FINISH(p);
+
+  if (_inode)
+    reset_inode(std::move(_inode));
 }
 
 void CInode::encode_lock_inest(bufferlist& bl)
 {
   ENCODE_START(1, 1, bl);
   if (is_auth()) {
-    encode(inode.version, bl);
+    encode(get_inode()->version, bl);
   } else {
     // treat flushing as dirty when rejoining cache
     bool dirty = nestlock.is_dirty_or_flushing();
     encode(dirty, bl);
   }
-  dout(15) << __func__ << " inode.rstat is " << inode.rstat << dendl;
-  encode(inode.rstat, bl);  // only meaningful if i am auth.
+  dout(15) << __func__ << " inode.rstat is " << get_inode()->rstat << dendl;
+  encode(get_inode()->rstat, bl);  // only meaningful if i am auth.
   bufferlist tmp;
   __u32 n = 0;
   for (const auto &p : dirfrags) {
     frag_t fg = p.first;
     CDir *dir = p.second;
     if (is_auth() || dir->is_auth()) {
-      fnode_t *pf = dir->get_projected_fnode();
+      const auto& pf = dir->get_projected_fnode();
       dout(10) << __func__ << " " << fg << " dir " << *dir << dendl;
       dout(10) << __func__ << " " << fg << " rstat " << pf->rstat << dendl;
       dout(10) << __func__ << " " << fg << " accounted_rstat " << pf->rstat << dendl;
@@ -1904,6 +1953,8 @@ void CInode::encode_lock_inest(bufferlist& bl)
 
 void CInode::decode_lock_inest(bufferlist::const_iterator& p)
 {
+  inode_ptr _inode;
+
   DECODE_START(1, p);
   if (is_auth()) {
     bool replica_dirty;
@@ -1913,13 +1964,14 @@ void CInode::decode_lock_inest(bufferlist::const_iterator& p)
       nestlock.mark_dirty();  // ok bc we're auth and caller will handle
     }
   } else {
-    decode(inode.version, p);
+    _inode = allocate_inode(*get_inode());
+    decode(_inode->version, p);
   }
   nest_info_t rstat;
   decode(rstat, p);
   if (!is_auth()) {
     dout(10) << __func__ << " taking inode rstat " << rstat << " for " << *this << dendl;
-    inode.rstat = rstat;    // take inode summation if replica
+    _inode->rstat = rstat;    // take inode summation if replica
   }
   __u32 n;
   decode(n, p);
@@ -1944,8 +1996,10 @@ void CInode::decode_lock_inest(bufferlist::const_iterator& p)
       dout(10) << fg << " first " << dir->first << " -> " << fgfirst
                << " on " << *dir << dendl;
       dir->first = fgfirst;
-      dir->fnode.rstat = rstat;
-      dir->fnode.accounted_rstat = accounted_rstat;
+      auto _fnode = CDir::allocate_fnode(*dir->get_fnode());
+      _fnode->rstat = rstat;
+      _fnode->accounted_rstat = accounted_rstat;
+      dir->reset_fnode(std::move(_fnode));
       dir->dirty_old_rstat.swap(dirty_old_rstat);
       if (!(rstat == accounted_rstat) || !dir->dirty_old_rstat.empty()) {
         dout(10) << fg << " setting nestlock updated flag" << dendl;
@@ -1956,111 +2010,125 @@ void CInode::decode_lock_inest(bufferlist::const_iterator& p)
         dout(10) << fg << " first " << dir->first << " -> " << fgfirst
                  << " on " << *dir << dendl;
         dir->first = fgfirst;
-        fnode_t *pf = dir->get_projected_fnode();
+        const auto& pf = dir->get_projected_fnode();
         finish_scatter_update(&nestlock, dir,
-                              inode.rstat.version, pf->accounted_rstat.version);
+                              _inode->rstat.version, pf->accounted_rstat.version);
       }
     }
   }
   DECODE_FINISH(p);
+
+  if (_inode)
+    reset_inode(std::move(_inode));
 }
 
 void CInode::encode_lock_ixattr(bufferlist& bl)
 {
   ENCODE_START(1, 1, bl);
-  encode(inode.version, bl);
-  encode(inode.ctime, bl);
-  encode(xattrs, bl);
+  encode(get_inode()->version, bl);
+  encode(get_inode()->ctime, bl);
+  encode_xattrs(bl);
   ENCODE_FINISH(bl);
 }
 
 void CInode::decode_lock_ixattr(bufferlist::const_iterator& p)
 {
+  ceph_assert(!is_auth());
+  auto _inode = allocate_inode(*get_inode());
   DECODE_START(1, p);
-  decode(inode.version, p);
+  decode(_inode->version, p);
   utime_t tm;
   decode(tm, p);
-  if (inode.ctime < tm) inode.ctime = tm;
-  decode_noshare(xattrs, p);
+  if (_inode->ctime < tm)
+    _inode->ctime = tm;
+  decode_xattrs(p);
   DECODE_FINISH(p);
+  reset_inode(std::move(_inode));
 }
 
 void CInode::encode_lock_isnap(bufferlist& bl)
 {
   ENCODE_START(1, 1, bl);
-  encode(inode.version, bl);
-  encode(inode.ctime, bl);
+  encode(get_inode()->version, bl);
+  encode(get_inode()->ctime, bl);
   encode_snap(bl);
   ENCODE_FINISH(bl);
 }
 
 void CInode::decode_lock_isnap(bufferlist::const_iterator& p)
 {
+  ceph_assert(!is_auth());
+  auto _inode = allocate_inode(*get_inode());
   DECODE_START(1, p);
-  decode(inode.version, p);
+  decode(_inode->version, p);
   utime_t tm;
   decode(tm, p);
-  if (inode.ctime < tm) inode.ctime = tm;
+  if (_inode->ctime < tm) _inode->ctime = tm;
   decode_snap(p);
   DECODE_FINISH(p);
+  reset_inode(std::move(_inode));
 }
 
 void CInode::encode_lock_iflock(bufferlist& bl)
 {
   ENCODE_START(1, 1, bl);
-  encode(inode.version, bl);
+  encode(get_inode()->version, bl);
   _encode_file_locks(bl);
   ENCODE_FINISH(bl);
 }
 
 void CInode::decode_lock_iflock(bufferlist::const_iterator& p)
 {
+  ceph_assert(!is_auth());
+  auto _inode = allocate_inode(*get_inode());
   DECODE_START(1, p);
-  decode(inode.version, p);
+  decode(_inode->version, p);
   _decode_file_locks(p);
   DECODE_FINISH(p);
+  reset_inode(std::move(_inode));
 }
 
 void CInode::encode_lock_ipolicy(bufferlist& bl)
 {
   ENCODE_START(2, 1, bl);
-  if (inode.is_dir()) {
-    encode(inode.version, bl);
-    encode(inode.ctime, bl);
-    encode(inode.layout, bl, mdcache->mds->mdsmap->get_up_features());
-    encode(inode.quota, bl);
-    encode(inode.export_pin, bl);
-    encode(inode.export_ephemeral_distributed_pin, bl);
-    encode(inode.export_ephemeral_random_pin, bl);
+  if (is_dir()) {
+    encode(get_inode()->version, bl);
+    encode(get_inode()->ctime, bl);
+    encode(get_inode()->layout, bl, mdcache->mds->mdsmap->get_up_features());
+    encode(get_inode()->quota, bl);
+    encode(get_inode()->export_pin, bl);
+    encode(get_inode()->export_ephemeral_distributed_pin, bl);
+    encode(get_inode()->export_ephemeral_random_pin, bl);
   }
   ENCODE_FINISH(bl);
 }
 
 void CInode::decode_lock_ipolicy(bufferlist::const_iterator& p)
 {
-  DECODE_START(2, p);
-  if (inode.is_dir()) {
-    decode(inode.version, p);
+  ceph_assert(!is_auth());
+  auto _inode = allocate_inode(*get_inode());
+  DECODE_START(1, p);
+  if (is_dir()) {
+    decode(_inode->version, p);
     utime_t tm;
     decode(tm, p);
-    if (inode.ctime < tm) inode.ctime = tm;
-    decode(inode.layout, p);
-    decode(inode.quota, p);
-    {
-      mds_rank_t old_pin = inode.export_pin;
-      decode(inode.export_pin, p);
-      maybe_export_pin(old_pin != inode.export_pin);
-    }
+    if (_inode->ctime < tm)
+      _inode->ctime = tm;
+    decode(_inode->layout, p);
+    decode(_inode->quota, p);
+    decode(_inode->export_pin, p);
     if (struct_v >= 2) {
-      {
-        bool old_ephemeral_pin = inode.export_ephemeral_distributed_pin;
-        decode(inode.export_ephemeral_distributed_pin, p);
-        maybe_ephemeral_dist_children(old_ephemeral_pin != inode.export_ephemeral_distributed_pin);
-      }
-      decode(inode.export_ephemeral_random_pin, p);
+      decode(_inode->export_ephemeral_distributed_pin, p);
+      decode(_inode->export_ephemeral_random_pin, p);
     }
   }
   DECODE_FINISH(p);
+
+  bool pin_updated = (get_inode()->export_pin != _inode->export_pin) ||
+                    (get_inode()->export_ephemeral_distributed_pin !=
+                     _inode->export_ephemeral_distributed_pin);
+  reset_inode(std::move(_inode));
+  maybe_export_pin(pin_updated);
 }
 
 void CInode::encode_lock_state(int type, bufferlist& bl)
@@ -2228,12 +2296,12 @@ void CInode::start_scatter(ScatterLock *lock)
 {
   dout(10) << __func__ << " " << *lock << " on " << *this << dendl;
   ceph_assert(is_auth());
-  mempool_inode *pi = get_projected_inode();
+  const auto& pi = get_projected_inode();
 
   for (const auto &p : dirfrags) {
     frag_t fg = p.first;
     CDir *dir = p.second;
-    fnode_t *pf = dir->get_projected_fnode();
+    const auto& pf = dir->get_projected_fnode();
     dout(20) << fg << " " << *dir << dendl;
 
     if (!dir->is_auth())
@@ -2288,25 +2356,24 @@ void CInode::finish_scatter_update(ScatterLock *lock, CDir *dir,
       MutationRef mut(new MutationImpl());
       mut->ls = mdlog->get_current_segment();
 
-      mempool_inode *pi = get_projected_inode();
-      fnode_t *pf = dir->project_fnode();
+      auto pf = dir->project_fnode(mut);
 
       std::string_view ename;
       switch (lock->get_type()) {
       case CEPH_LOCK_IFILE:
-       pf->fragstat.version = pi->dirstat.version;
+       pf->fragstat.version = inode_version;
        pf->accounted_fragstat = pf->fragstat;
        ename = "lock ifile accounted scatter stat update";
        break;
       case CEPH_LOCK_INEST:
-       pf->rstat.version = pi->rstat.version;
+       pf->rstat.version = inode_version;
        pf->accounted_rstat = pf->rstat;
        ename = "lock inest accounted scatter stat update";
 
        if (!is_auth() && lock->get_state() == LOCK_MIX) {
          dout(10) << __func__ << " try to assimilate dirty rstat on " 
            << *dir << dendl; 
-         dir->assimilate_dirty_rstat_inodes();
+         dir->assimilate_dirty_rstat_inodes(mut);
        }
 
        break;
@@ -2314,9 +2381,6 @@ void CInode::finish_scatter_update(ScatterLock *lock, CDir *dir,
        ceph_abort();
       }
        
-      pf->version = dir->pre_dirty();
-      mut->add_projected_fnode(dir);
-
       EUpdate *le = new EUpdate(mdlog, ename);
       mdlog->start_entry(le);
       le->metablob.add_dir_context(dir);
@@ -2329,7 +2393,7 @@ void CInode::finish_scatter_update(ScatterLock *lock, CDir *dir,
          !is_auth() && lock->get_state() == LOCK_MIX) {
         dout(10) << __func__ << " finish assimilating dirty rstat on " 
           << *dir << dendl; 
-        dir->assimilate_dirty_rstat_inodes_finish(mut, &le->metablob);
+        dir->assimilate_dirty_rstat_inodes_finish(&le->metablob);
 
         if (!(pf->rstat == pf->accounted_rstat)) {
           if (!mut->is_wrlocked(&nestlock)) {
@@ -2340,6 +2404,8 @@ void CInode::finish_scatter_update(ScatterLock *lock, CDir *dir,
           mut->ls->dirty_dirfrag_nest.push_back(&item_dirty_dirfrag_nest);
         }
       }
+
+      pf->version = dir->pre_dirty();
       
       mdlog->submit_entry(le, new C_Inode_FragUpdate(this, dir, mut));
     } else {
@@ -2376,7 +2442,7 @@ void CInode::_finish_frag_update(CDir *dir, MutationRef& mut)
  * un-stale.
  */
 /* for more info on scatterlocks, see comments by Locker::scatter_writebehind */
-void CInode::finish_scatter_gather_update(int type)
+void CInode::finish_scatter_gather_update(int type, MutationRef& mut)
 {
   LogChannelRef clog = mdcache->mds->clog;
 
@@ -2392,7 +2458,7 @@ void CInode::finish_scatter_gather_update(int type)
 
       // adjust summation
       ceph_assert(is_auth());
-      mempool_inode *pi = get_projected_inode();
+      auto pi = _get_projected_inode();
 
       bool touched_mtime = false, touched_chattr = false;
       dout(20) << "  orig dirstat " << pi->dirstat << dendl;
@@ -2410,9 +2476,13 @@ void CInode::finish_scatter_gather_update(int type)
          dirstat_valid = false;
        }
 
-       fnode_t *pf = dir->get_projected_fnode();
-       if (update)
-         pf = dir->project_fnode();
+       CDir::fnode_const_ptr pf;
+       if (update) {
+         mut->auth_pin(dir);
+         pf = dir->project_fnode(mut);
+       } else {
+         pf = dir->get_projected_fnode();
+       }
 
        if (pf->accounted_fragstat.version == pi->dirstat.version - 1) {
          dout(20) << fg << "           fragstat " << pf->fragstat << dendl;
@@ -2425,18 +2495,21 @@ void CInode::finish_scatter_gather_update(int type)
        if (pf->fragstat.nfiles < 0 ||
            pf->fragstat.nsubdirs < 0) {
          clog->error() << "bad/negative dir size on "
-             << dir->dirfrag() << " " << pf->fragstat;
+                       << dir->dirfrag() << " " << pf->fragstat;
          ceph_assert(!"bad/negative fragstat" == g_conf()->mds_verify_scatter);
-         
+
+         auto _pf = const_cast<fnode_t*>(pf.get());
          if (pf->fragstat.nfiles < 0)
-           pf->fragstat.nfiles = 0;
+           _pf->fragstat.nfiles = 0;
          if (pf->fragstat.nsubdirs < 0)
-           pf->fragstat.nsubdirs = 0;
+           _pf->fragstat.nsubdirs = 0;
        }
 
        if (update) {
-         pf->accounted_fragstat = pf->fragstat;
-         pf->fragstat.version = pf->accounted_fragstat.version = pi->dirstat.version;
+         auto _pf = const_cast<fnode_t*>(pf.get());
+         _pf->accounted_fragstat = _pf->fragstat;
+         _pf->fragstat.version = _pf->accounted_fragstat.version = pi->dirstat.version;
+         _pf->version = dir->pre_dirty();
          dout(10) << fg << " updated accounted_fragstat " << pf->fragstat << " on " << *dir << dendl;
        }
 
@@ -2477,8 +2550,7 @@ void CInode::finish_scatter_gather_update(int type)
        }
       }
 
-      if (pi->dirstat.nfiles < 0 || pi->dirstat.nsubdirs < 0)
-      {
+      if (pi->dirstat.nfiles < 0 || pi->dirstat.nsubdirs < 0) {
         std::string path;
         make_path_string(path);
        clog->error() << "Inconsistent statistics detected: fragstat on inode "
@@ -2506,7 +2578,7 @@ void CInode::finish_scatter_gather_update(int type)
       if (const sr_t *srnode = get_projected_srnode(); srnode)
        rstat.rsnaps = srnode->snaps.size();
 
-      mempool_inode *pi = get_projected_inode();
+      auto pi = _get_projected_inode();
       dout(20) << "  orig rstat " << pi->rstat << dendl;
       pi->rstat.version++;
       for (const auto &p : dirfrags) {
@@ -2522,16 +2594,20 @@ void CInode::finish_scatter_gather_update(int type)
          rstat_valid = false;
        }
 
-       fnode_t *pf = dir->get_projected_fnode();
-       if (update)
-         pf = dir->project_fnode();
+       CDir::fnode_const_ptr pf;
+       if (update) {
+         mut->auth_pin(dir);
+         pf = dir->project_fnode(mut);
+       } else {
+         pf = dir->get_projected_fnode();
+       }
 
        if (pf->accounted_rstat.version == pi->rstat.version-1) {
          // only pull this frag's dirty rstat inodes into the frag if
          // the frag is non-stale and updateable.  if it's stale,
          // that info will just get thrown out!
          if (update)
-           dir->assimilate_dirty_rstat_inodes();
+           dir->assimilate_dirty_rstat_inodes(mut);
 
          dout(20) << fg << "           rstat " << pf->rstat << dendl;
          dout(20) << fg << " accounted_rstat " << pf->accounted_rstat << dendl;
@@ -2548,9 +2624,11 @@ void CInode::finish_scatter_gather_update(int type)
          dout(20) << fg << " skipping STALE accounted_rstat " << pf->accounted_rstat << dendl;
        }
        if (update) {
-         pf->accounted_rstat = pf->rstat;
+         auto _pf = const_cast<fnode_t*>(pf.get());
+         _pf->accounted_rstat = pf->rstat;
+         _pf->rstat.version = _pf->accounted_rstat.version = pi->rstat.version;
+         _pf->version = dir->pre_dirty();
          dir->dirty_old_rstat.clear();
-         pf->rstat.version = pf->accounted_rstat.version = pi->rstat.version;
          dir->check_rstats();
          dout(10) << fg << " updated accounted_rstat " << pf->rstat << " on " << *dir << dendl;
        }
@@ -2599,7 +2677,7 @@ void CInode::finish_scatter_gather_update(int type)
   }
 }
 
-void CInode::finish_scatter_gather_update_accounted(int type, MutationRef& mut, EMetaBlob *metablob)
+void CInode::finish_scatter_gather_update_accounted(int type, EMetaBlob *metablob)
 {
   dout(10) << __func__ << " " << type << " on " << *this << dendl;
   ceph_assert(is_auth());
@@ -2612,16 +2690,12 @@ void CInode::finish_scatter_gather_update_accounted(int type, MutationRef& mut,
     if (type == CEPH_LOCK_IDFT)
       continue;  // nothing to do.
 
+    if (type == CEPH_LOCK_INEST)
+      dir->assimilate_dirty_rstat_inodes_finish(metablob);
+
     dout(10) << " journaling updated frag accounted_ on " << *dir << dendl;
     ceph_assert(dir->is_projected());
-    fnode_t *pf = dir->get_projected_fnode();
-    pf->version = dir->pre_dirty();
-    mut->add_projected_fnode(dir);
     metablob->add_dir(dir, true);
-    mut->auth_pin(dir);
-
-    if (type == CEPH_LOCK_INEST)
-      dir->assimilate_dirty_rstat_inodes_finish(mut, metablob);
   }
 }
 
@@ -2909,27 +2983,32 @@ mds_authority_t CInode::authority() const
 snapid_t CInode::get_oldest_snap()
 {
   snapid_t t = first;
-  if (!old_inodes.empty())
-    t = old_inodes.begin()->second.first;
+  if (is_any_old_inodes())
+    t = get_old_inodes()->begin()->second.first;
   return std::min(t, oldest_snap);
 }
 
-CInode::mempool_old_inode& CInode::cow_old_inode(snapid_t follows, bool cow_head)
+const CInode::mempool_old_inode& CInode::cow_old_inode(snapid_t follows, bool cow_head)
 {
   ceph_assert(follows >= first);
 
-  mempool_inode *pi = cow_head ? get_projected_inode() : get_previous_projected_inode();
-  mempool_xattr_map *px = cow_head ? get_projected_xattrs() : get_previous_projected_xattrs();
+  const auto& pi = cow_head ? get_projected_inode() : get_previous_projected_inode();
+  const auto& px = cow_head ? get_projected_xattrs() : get_previous_projected_xattrs();
+
+  auto _old_inodes = allocate_old_inode_map();
+  if (old_inodes)
+    *_old_inodes = *old_inodes;
 
-  mempool_old_inode &old = old_inodes[follows];
+  mempool_old_inode &old = (*_old_inodes)[follows];
   old.first = first;
   old.inode = *pi;
-  old.xattrs = *px;
+  if (px) {
+    dout(10) << " " << px->size() << " xattrs cowed, " << *px << dendl;
+    old.xattrs = *px;
+  }
 
   if (first < oldest_snap)
     oldest_snap = first;
-  
-  dout(10) << " " << px->size() << " xattrs cowed, " << *px << dendl;
 
   old.inode.trim_client_ranges(follows);
 
@@ -2943,22 +3022,10 @@ CInode::mempool_old_inode& CInode::cow_old_inode(snapid_t follows, bool cow_head
           << " to [" << old.first << "," << follows << "] on "
           << *this << dendl;
 
+  reset_old_inodes(std::move(_old_inodes));
   return old;
 }
 
-void CInode::split_old_inode(snapid_t snap)
-{
-  auto it = old_inodes.lower_bound(snap);
-  ceph_assert(it != old_inodes.end() && it->second.first < snap);
-
-  mempool_old_inode &old = old_inodes[snap - 1];
-  old = it->second;
-
-  it->second.first = snap;
-  dout(10) << __func__ << " " << "[" << old.first << "," << it->first
-          << "] to [" << snap << "," << it->first << "] on " << *this << dendl;
-}
-
 void CInode::pre_cow_old_inode()
 {
   snapid_t follows = mdcache->get_global_snaprealm()->get_newest_seq();
@@ -2969,11 +3036,11 @@ void CInode::pre_cow_old_inode()
 bool CInode::has_snap_data(snapid_t snapid)
 {
   bool found = snapid >= first && snapid <= last;
-  if (!found && is_multiversion()) {
-    auto p = old_inodes.lower_bound(snapid);
-    if (p != old_inodes.end()) {
+  if (!found && is_any_old_inodes()) {
+    auto p = old_inodes->lower_bound(snapid);
+    if (p != old_inodes->end()) {
       if (p->second.first > snapid) {
-       if  (p != old_inodes.begin())
+       if  (p != old_inodes->begin())
          --p;
       }
       if (p->second.first <= snapid && snapid <= p->first) {
@@ -2988,30 +3055,43 @@ void CInode::purge_stale_snap_data(const set<snapid_t>& snaps)
 {
   dout(10) << __func__ << " " << snaps << dendl;
 
-  for (auto it = old_inodes.begin(); it != old_inodes.end(); ) {
-    const snapid_t &id = it->first;
-    const auto &s = snaps.lower_bound(it->second.first);
+  if (!get_old_inodes())
+    return;
+
+  std::vector<snapid_t> to_remove;
+  for (auto p : *get_old_inodes()) {
+    const snapid_t &id = p.first;
+    const auto &s = snaps.lower_bound(p.second.first);
     if (s == snaps.end() || *s > id) {
-      dout(10) << " purging old_inode [" << it->second.first << "," << id << "]" << dendl;
-      it = old_inodes.erase(it);
-    } else {
-      ++it;
+      dout(10) << " purging old_inode [" << p.second.first << "," << id << "]" << dendl;
+      to_remove.push_back(id);
     }
   }
+
+  if (to_remove.size() == get_old_inodes()->size()) {
+    reset_old_inodes(old_inode_map_ptr());
+  } else if (!to_remove.empty()) {
+    auto _old_inodes = allocate_old_inode_map(*get_old_inodes());
+    for (auto id : to_remove)
+      _old_inodes->erase(id);
+    reset_old_inodes(std::move(_old_inodes));
+  }
 }
 
 /*
  * pick/create an old_inode
  */
-CInode::mempool_old_inode * CInode::pick_old_inode(snapid_t snap)
+snapid_t CInode::pick_old_inode(snapid_t snap) const
 {
-  auto it = old_inodes.lower_bound(snap);  // p is first key >= to snap
-  if (it != old_inodes.end() && it->second.first <= snap) {
-    dout(10) << __func__ << " snap " << snap << " -> [" << it->second.first << "," << it->first << "]" << dendl;
-    return &it->second;
+  if (is_any_old_inodes()) {
+    auto it = old_inodes->lower_bound(snap);  // p is first key >= to snap
+    if (it != old_inodes->end() && it->second.first <= snap) {
+      dout(10) << __func__ << " snap " << snap << " -> [" << it->second.first << "," << it->first << "]" << dendl;
+      return it->first;
+    }
   }
   dout(10) << __func__ << " snap " << snap << " -> nothing" << dendl;
-  return NULL;
+  return 0;
 }
 
 void CInode::open_snaprealm(bool nosplit)
@@ -3035,7 +3115,6 @@ void CInode::close_snaprealm(bool nojoin)
 {
   if (snaprealm) {
     dout(15) << __func__ << " " << *snaprealm << dendl;
-    snaprealm->close_parents();
     if (snaprealm->parent) {
       snaprealm->parent->open_children.erase(snaprealm);
       //if (!nojoin)
@@ -3074,12 +3153,8 @@ void CInode::decode_snap_blob(const bufferlist& snapbl)
     auto old_flags = snaprealm->srnode.flags;
     auto p = snapbl.cbegin();
     decode(snaprealm->srnode, p);
-    if (is_base()) {
-      bool ok = snaprealm->_open_parents(NULL);
-      ceph_assert(ok);
-    } else {
+    if (!is_base()) {
       if ((snaprealm->srnode.flags ^ old_flags) & sr_t::PARENT_GLOBAL) {
-       snaprealm->close_parents();
        snaprealm->adjust_parent();
       }
     }
@@ -3453,7 +3528,7 @@ int CInode::get_xlocker_mask(client_t client) const
 }
 
 int CInode::get_caps_allowed_for_client(Session *session, Capability *cap,
-                                       mempool_inode *file_i) const
+                                       const mempool_inode *file_i) const
 {
   client_t client = session->get_client();
   int allowed;
@@ -3611,10 +3686,10 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
   bool valid = true;
 
   // pick a version!
-  mempool_inode *oi = &inode;
-  mempool_inode *pi = get_projected_inode();
+  const mempool_inode *oi = get_inode().get();
+  const mempool_inode *pi = get_projected_inode().get();
 
-  CInode::mempool_xattr_map *pxattrs = nullptr;
+  const mempool_xattr_map *pxattrs = nullptr;
 
   if (snapid != CEPH_NOSNAP) {
 
@@ -3622,11 +3697,11 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
     if (!is_auth())
       valid = false;
 
-    if (is_multiversion()) {
-      auto it = old_inodes.lower_bound(snapid);
-      if (it != old_inodes.end()) {
+    if (is_any_old_inodes()) {
+      auto it = old_inodes->lower_bound(snapid);
+      if (it != old_inodes->end()) {
        if (it->second.first > snapid) {
-         if  (it != old_inodes.begin())
+         if  (it != old_inodes->begin())
            --it;
        }
        if (it->second.first <= snapid && snapid <= it->first) {
@@ -3634,9 +3709,8 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
                   << " to old_inode [" << it->second.first << "," << it->first << "]"
                   << " " << it->second.inode.rstat
                   << dendl;
-          auto &p = it->second;
-         pi = oi = &p.inode;
-         pxattrs = &p.xattrs;
+         pi = oi = &it->second.inode;
+         pxattrs = &it->second.xattrs;
        } else {
          // snapshoted remote dentry can result this
          dout(0) << __func__ << " old_inode for snapid " << snapid
@@ -3651,6 +3725,7 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
   }
 
   utime_t snap_btime;
+  std::map<std::string, std::string> snap_metadata;
   SnapRealm *realm = find_snaprealm();
   if (snapid != CEPH_NOSNAP && realm) {
     // add snapshot timestamp vxattr
@@ -3662,6 +3737,7 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
       ceph_assert(infomap.size() == 1);
       const SnapInfo *si = infomap.begin()->second;
       snap_btime = si->stamp;
+      snap_metadata = si->metadata;
     }
   }
 
@@ -3694,7 +3770,7 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
   bool plocal = versionlock.get_last_wrlock_client() == client;
   bool ppolicy = policylock.is_xlocked_by_client(client) || get_loner()==client;
   
-  mempool_inode *any_i = (pfile|pauth|plink|pxattr|plocal) ? pi : oi;
+  const mempool_inode *any_i = (pfile|pauth|plink|pxattr|plocal) ? pi : oi;
   
   dout(20) << " pfile " << pfile << " pauth " << pauth
           << " plink " << plink << " pxattr " << pxattr
@@ -3703,7 +3779,7 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
           << " valid=" << valid << dendl;
 
   // file
-  mempool_inode *file_i = pfile ? pi:oi;
+  const mempool_inode *file_i = pfile ? pi:oi;
   file_layout_t layout;
   if (is_dir()) {
     layout = (ppolicy ? pi : oi)->layout;
@@ -3726,7 +3802,7 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
             (getattr_caps & CEPH_CAP_FILE_RD)) { // client requests inline data
     inline_version = file_i->inline_data.version;
     if (file_i->inline_data.length() > 0)
-      inline_data = file_i->inline_data.get_data();
+      file_i->inline_data.get_data(inline_data);
   }
 
   // nest (do same as file... :/)
@@ -3736,13 +3812,13 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
   }
 
   // auth
-  mempool_inode *auth_i = pauth ? pi:oi;
+  const mempool_inode *auth_i = pauth ? pi:oi;
 
   // link
-  mempool_inode *link_i = plink ? pi:oi;
+  const mempool_inode *link_i = plink ? pi:oi;
   
   // xattr
-  mempool_inode *xattr_i = pxattr ? pi:oi;
+  const mempool_inode *xattr_i = pxattr ? pi:oi;
 
   using ceph::encode;
   // xattr
@@ -3751,7 +3827,7 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
       (cap && cap->client_xattr_version < xattr_i->xattr_version) ||
       (getattr_caps & CEPH_CAP_XATTR_SHARED)) { // client requests xattrs
     if (!pxattrs)
-      pxattrs = pxattr ? get_projected_xattrs() : &xattrs;
+      pxattrs = pxattr ? get_projected_xattrs().get() : get_xattrs().get();
     xattr_version = xattr_i->xattr_version;
   } else {
     xattr_version = 0;
@@ -3785,7 +3861,7 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
       sizeof(struct ceph_timespec) + 8; // btime + change_attr
 
     if (bytes > max_bytes)
-      return -ENOSPC;
+      return -CEPHFS_ENOSPC;
   }
 
 
@@ -3921,7 +3997,7 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
    * note: encoding matches MClientReply::InodeStat
    */
   if (session->info.has_feature(CEPHFS_FEATURE_REPLY_ENCODING)) {
-    ENCODE_START(3, 1, bl);
+    ENCODE_START(6, 1, bl);
     encode(oi->ino, bl);
     encode(snapid, bl);
     encode(oi->rdev, bl);
@@ -3957,13 +4033,16 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
     encode_xattrs();
     encode(inline_version, bl);
     encode(inline_data, bl);
-    mempool_inode *policy_i = ppolicy ? pi : oi;
+    const mempool_inode *policy_i = ppolicy ? pi : oi;
     encode(policy_i->quota, bl);
     encode(layout.pool_ns, bl);
     encode(any_i->btime, bl);
     encode(any_i->change_attr, bl);
     encode(file_i->export_pin, bl);
     encode(snap_btime, bl);
+    encode(file_i->rstat.rsnaps, bl);
+    encode(snap_metadata, bl);
+    encode(file_i->fscrypt, bl);
     ENCODE_FINISH(bl);
   }
   else {
@@ -4010,7 +4089,7 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
       encode(inline_data, bl);
     }
     if (conn->has_feature(CEPH_FEATURE_MDS_QUOTA)) {
-      mempool_inode *policy_i = ppolicy ? pi : oi;
+      const mempool_inode *policy_i = ppolicy ? pi : oi;
       encode(policy_i->quota, bl);
     }
     if (conn->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2)) {
@@ -4036,9 +4115,9 @@ void CInode::encode_cap_message(const ref_t<MClientCaps> &m, Capability *cap)
   bool plink = linklock.is_xlocked_by_client(client);
   bool pxattr = xattrlock.is_xlocked_by_client(client);
  
-  mempool_inode *oi = &inode;
-  mempool_inode *pi = get_projected_inode();
-  mempool_inode *i = (pfile|pauth|plink|pxattr) ? pi : oi;
+  const mempool_inode *oi = get_inode().get();
+  const mempool_inode *pi = get_projected_inode().get();
+  const mempool_inode *i = (pfile|pauth|plink|pxattr) ? pi : oi;
 
   dout(20) << __func__ << " pfile " << pfile
           << " pauth " << pauth << " plink " << plink << " pxattr " << pxattr
@@ -4060,7 +4139,7 @@ void CInode::encode_cap_message(const ref_t<MClientCaps> &m, Capability *cap)
   if (cap->client_inline_version < i->inline_data.version) {
     m->inline_version = cap->client_inline_version = i->inline_data.version;
     if (i->inline_data.length() > 0)
-      m->inline_data = i->inline_data.get_data();
+      i->inline_data.get_data(m->inline_data);
   } else {
     m->inline_version = 0;
   }
@@ -4080,11 +4159,14 @@ void CInode::encode_cap_message(const ref_t<MClientCaps> &m, Capability *cap)
 
   using ceph::encode;
   i = pxattr ? pi:oi;
-  auto ix = pxattr ? get_projected_xattrs() : &xattrs;
+  const auto& ix = pxattr ? get_projected_xattrs() : get_xattrs();
   if ((cap->pending() & CEPH_CAP_XATTR_SHARED) &&
       i->xattr_version > cap->client_xattr_version) {
     dout(10) << "    including xattrs v " << i->xattr_version << dendl;
-    encode(*ix, m->xattrbl);
+    if (ix)
+      encode(*ix, m->xattrbl);
+    else
+      encode((__u32)0, m->xattrbl);
     m->head.xattr_version = i->xattr_version;
     cap->client_xattr_version = i->xattr_version;
   }
@@ -4096,11 +4178,11 @@ void CInode::_encode_base(bufferlist& bl, uint64_t features)
 {
   ENCODE_START(1, 1, bl);
   encode(first, bl);
-  encode(inode, bl, features);
+  encode(*get_inode(), bl, features);
   encode(symlink, bl);
   encode(dirfragtree, bl);
-  encode(xattrs, bl);
-  encode(old_inodes, bl, features);
+  encode_xattrs(bl);
+  encode_old_inodes(bl, features);
   encode(damage_flags, bl);
   encode_snap(bl);
   ENCODE_FINISH(bl);
@@ -4109,15 +4191,19 @@ void CInode::_decode_base(bufferlist::const_iterator& p)
 {
   DECODE_START(1, p);
   decode(first, p);
-  decode(inode, p);
+  {
+    auto _inode = allocate_inode();
+    decode(*_inode, p);
+    reset_inode(std::move(_inode));
+  }
   {
     std::string tmp;
     decode(tmp, p);
     symlink = std::string_view(tmp);
   }
   decode(dirfragtree, p);
-  decode_noshare(xattrs, p);
-  decode(old_inodes, p);
+  decode_xattrs(p);
+  decode_old_inodes(p);
   decode(damage_flags, p);
   decode_snap(p);
   DECODE_FINISH(p);
@@ -4252,15 +4338,15 @@ void CInode::encode_export(bufferlist& bl)
 
   // include scatterlock info for any bounding CDirs
   bufferlist bounding;
-  if (inode.is_dir())
+  if (get_inode()->is_dir())
     for (const auto &p : dirfrags) {
       CDir *dir = p.second;
       if (dir->state_test(CDir::STATE_EXPORTBOUND)) {
        encode(p.first, bounding);
-       encode(dir->fnode.fragstat, bounding);
-       encode(dir->fnode.accounted_fragstat, bounding);
-       encode(dir->fnode.rstat, bounding);
-       encode(dir->fnode.accounted_rstat, bounding);
+       encode(dir->get_fnode()->fragstat, bounding);
+       encode(dir->get_fnode()->accounted_fragstat, bounding);
+       encode(dir->get_fnode()->rstat, bounding);
+       encode(dir->get_fnode()->accounted_rstat, bounding);
        dout(10) << " encoded fragstat/rstat info for " << *dir << dendl;
       }
     }
@@ -4301,13 +4387,8 @@ void CInode::decode_import(bufferlist::const_iterator& p,
     decode(s, p);
     s &= MASK_STATE_EXPORTED;
 
-    if (s & STATE_RANDEPHEMERALPIN) {
-      set_ephemeral_rand(true);
-    }
-    if (s & STATE_DISTEPHEMERALPIN) {
-      set_ephemeral_dist(true);
-    }
-
+    set_ephemeral_pin((s & STATE_DISTEPHEMERALPIN),
+                     (s & STATE_RANDEPHEMERALPIN));
     state_set(STATE_AUTH | s);
   }
 
@@ -4347,6 +4428,7 @@ void CInode::decode_import(bufferlist::const_iterator& p,
     // check because the dirfrag is under migration?  That implies
     // it is frozen (and in a SYNC or LOCK state).  FIXME.
 
+    auto _fnode = CDir::allocate_fnode(*dir->get_fnode());
     if (dir->is_auth() ||
         filelock.get_state() == LOCK_MIX) {
       dout(10) << " skipped fragstat info for " << *dir << dendl;
@@ -4354,8 +4436,8 @@ void CInode::decode_import(bufferlist::const_iterator& p,
       decode(f, q);
       decode(f, q);
     } else {
-      decode(dir->fnode.fragstat, q);
-      decode(dir->fnode.accounted_fragstat, q);
+      decode(_fnode->fragstat, q);
+      decode(_fnode->accounted_fragstat, q);
       dout(10) << " took fragstat info for " << *dir << dendl;
     }
     if (dir->is_auth() ||
@@ -4365,10 +4447,11 @@ void CInode::decode_import(bufferlist::const_iterator& p,
       decode(n, q);
       decode(n, q);
     } else {
-      decode(dir->fnode.rstat, q);
-      decode(dir->fnode.accounted_rstat, q);
+      decode(_fnode->rstat, q);
+      decode(_fnode->accounted_rstat, q);
       dout(10) << " took rstat info for " << *dir << dendl;
     }
+    dir->reset_fnode(std::move(_fnode));
   }
 
   _decode_locks_full(p);
@@ -4381,16 +4464,18 @@ void CInode::decode_import(bufferlist::const_iterator& p,
 
 void InodeStoreBase::dump(Formatter *f) const
 {
-  inode.dump(f);
+  inode->dump(f);
   f->dump_string("symlink", symlink);
 
   f->open_array_section("xattrs");
-  for (const auto& [key, val] : xattrs) {
-    f->open_object_section("xattr");
-    f->dump_string("key", key);
-    std::string v(val.c_str(), val.length());
-    f->dump_string("val", v);
-    f->close_section();
+  if (xattrs) {
+    for (const auto& [key, val] : *xattrs) {
+      f->open_object_section("xattr");
+      f->dump_string("key", key);
+      std::string v(val.c_str(), val.length());
+      f->dump_string("val", v);
+      f->close_section();
+    }
   }
   f->close_section();
   f->open_object_section("dirfragtree");
@@ -4398,12 +4483,14 @@ void InodeStoreBase::dump(Formatter *f) const
   f->close_section(); // dirfragtree
   
   f->open_array_section("old_inodes");
-  for (const auto &p : old_inodes) {
-    f->open_object_section("old_inode");
-    // The key is the last snapid, the first is in the mempool_old_inode
-    f->dump_int("last", p.first);
-    p.second.dump(f);
-    f->close_section();  // old_inode
+  if (old_inodes) {
+    for (const auto &p : *old_inodes) {
+      f->open_object_section("old_inode");
+      // The key is the last snapid, the first is in the mempool_old_inode
+      f->dump_int("last", p.first);
+      p.second.dump(f);
+      f->close_section();  // old_inode
+    }
   }
   f->close_section();  // old_inodes
 
@@ -4411,11 +4498,62 @@ void InodeStoreBase::dump(Formatter *f) const
   f->dump_unsigned("damage_flags", damage_flags);
 }
 
+template <>
+void decode_json_obj(mempool::mds_co::string& t, JSONObj *obj){
+
+  t = mempool::mds_co::string(std::string_view(obj->get_data()));
+}
+
+void InodeStoreBase::decode_json(JSONObj *obj)
+{
+  {
+    auto _inode = allocate_inode();
+    _inode->decode_json(obj);
+    reset_inode(std::move(_inode));
+  }
+
+  JSONDecoder::decode_json("symlink", symlink, obj, true);
+  // JSONDecoder::decode_json("dirfragtree", dirfragtree, obj, true); // cann't decode it now
+  //
+  //
+  {
+    mempool_xattr_map tmp;
+    JSONDecoder::decode_json("xattrs", tmp, xattrs_cb, obj, true);
+    if (tmp.empty())
+      reset_xattrs(xattr_map_ptr());
+    else
+      reset_xattrs(allocate_xattr_map(std::move(tmp)));
+  }
+  // JSONDecoder::decode_json("old_inodes", old_inodes, InodeStoreBase::old_indoes_cb, obj, true); // cann't decode old_inodes now
+  JSONDecoder::decode_json("oldest_snap", oldest_snap.val, obj, true);
+  JSONDecoder::decode_json("damage_flags", damage_flags, obj, true);
+  //sr_t srnode;
+  //JSONDecoder::decode_json("snap_blob", srnode, obj, true);   // cann't decode it now
+  //snap_blob = srnode;
+}
+
+void InodeStoreBase::xattrs_cb(InodeStoreBase::mempool_xattr_map& c, JSONObj *obj){
+
+  string k;
+  JSONDecoder::decode_json("key", k, obj, true);
+  string v;
+  JSONDecoder::decode_json("val", v, obj, true);
+  c[k.c_str()] = buffer::copy(v.c_str(), v.size());
+}
+
+void InodeStoreBase::old_indoes_cb(InodeStoreBase::mempool_old_inode_map& c, JSONObj *obj){
+
+  snapid_t s;
+  JSONDecoder::decode_json("last", s.val, obj, true);
+  InodeStoreBase::mempool_old_inode i;
+  // i.decode_json(obj); // cann't decode now, simon
+  c[s] = i;
+}
 
 void InodeStore::generate_test_instances(std::list<InodeStore*> &ls)
 {
   InodeStore *populated = new InodeStore;
-  populated->inode.ino = 0xdeadbeef;
+  populated->get_inode()->ino = 0xdeadbeef;
   populated->symlink = "rhubarb";
   ls.push_back(populated);
 }
@@ -4423,7 +4561,7 @@ void InodeStore::generate_test_instances(std::list<InodeStore*> &ls)
 void InodeStoreBare::generate_test_instances(std::list<InodeStoreBare*> &ls)
 {
   InodeStoreBare *populated = new InodeStoreBare;
-  populated->inode.ino = 0xdeadbeef;
+  populated->get_inode()->ino = 0xdeadbeef;
   populated->symlink = "rhubarb";
   ls.push_back(populated);
 }
@@ -4459,7 +4597,6 @@ void CInode::validate_disk_state(CInode::validated_data *results,
       set_callback(BACKTRACE, static_cast<Continuation::stagePtr>(&ValidationContinuation::_backtrace));
       set_callback(INODE, static_cast<Continuation::stagePtr>(&ValidationContinuation::_inode_disk));
       set_callback(DIRFRAGS, static_cast<Continuation::stagePtr>(&ValidationContinuation::_dirfrags));
-      set_callback(SNAPREALM, static_cast<Continuation::stagePtr>(&ValidationContinuation::_snaprealm));
     }
 
     ~ValidationContinuation() override {
@@ -4483,6 +4620,11 @@ void CInode::validate_disk_state(CInode::validated_data *results,
       fetch.getxattr("parent", bt, bt_r);
       in->mdcache->mds->objecter->read(oid, object_locator_t(pool), fetch, CEPH_NOSNAP,
                                       NULL, 0, fin);
+      if (in->mdcache->mds->logger) {
+        in->mdcache->mds->logger->inc(l_mds_openino_backtrace_fetch);
+        in->mdcache->mds->logger->inc(l_mds_scrub_backtrace_fetch);
+      }
+
       using ceph::encode;
       if (!is_internal) {
         ObjectOperation scrub_tag;
@@ -4493,24 +4635,21 @@ void CInode::validate_disk_state(CInode::validated_data *results,
         in->mdcache->mds->objecter->mutate(oid, object_locator_t(pool), scrub_tag, snapc,
                                           ceph::real_clock::now(),
                                           0, NULL);
+        if (in->mdcache->mds->logger)
+          in->mdcache->mds->logger->inc(l_mds_scrub_set_tag);
       }
     }
 
     bool _start(int rval) {
+      ceph_assert(in->can_auth_pin());
+      in->auth_pin(this);
+
       if (in->is_dirty()) {
-       MDCache *mdcache = in->mdcache;
-       mempool_inode& inode = in->inode;
+       MDCache *mdcache = in->mdcache;  // For the benefit of dout
+       auto ino = [this]() { return in->ino(); }; // For the benefit of dout
        dout(20) << "validating a dirty CInode; results will be inconclusive"
-                << dendl;
+         << dendl;
       }
-      if (in->is_symlink()) {
-       // there's nothing to do for symlinks!
-       return true;
-      }
-
-      // prefetch snaprealm's past parents
-      if (in->snaprealm && !in->snaprealm->have_past_parents_open())
-       in->snaprealm->open_parents(nullptr);
 
       C_OnFinisher *conf = new C_OnFinisher(get_io_callback(BACKTRACE),
                                            in->mdcache->mds->finisher);
@@ -4536,7 +4675,7 @@ void CInode::validate_disk_state(CInode::validated_data *results,
       int memory_newer;
 
       MDCache *mdcache = in->mdcache;  // For the benefit of dout
-      const mempool_inode& inode = in->inode;  // For the benefit of dout
+      auto ino = [this]() { return in->ino(); }; // For the benefit of dout
 
       // Ignore rval because it's the result of a FAILOK operation
       // from fetch_backtrace_and_tag: the real result is in
@@ -4546,8 +4685,10 @@ void CInode::validate_disk_state(CInode::validated_data *results,
         results->backtrace.error_str << "failed to read off disk; see retval";
         // we probably have a new unwritten file!
         // so skip the backtrace scrub for this entry and say that all's well
-        if (in->is_dirty_parent())
+        if (in->is_dirty_parent()) {
+          dout(20) << "forcing backtrace as passed since inode is dirty parent" << dendl;
           results->backtrace.passed = true;
+        }
         goto next;
       }
 
@@ -4568,8 +4709,11 @@ void CInode::validate_disk_state(CInode::validated_data *results,
                                      << bl.length() << " bytes)!";
         // we probably have a new unwritten file!
         // so skip the backtrace scrub for this entry and say that all's well
-        if (in->is_dirty_parent())
+        if (in->is_dirty_parent()) {
+          dout(20) << "decode failed; forcing backtrace as passed since "
+                      "inode is dirty parent" << dendl;
           results->backtrace.passed = true;
+        }
 
        goto next;
       }
@@ -4580,10 +4724,16 @@ void CInode::validate_disk_state(CInode::validated_data *results,
       if (divergent || memory_newer < 0) {
         // we're divergent, or on-disk version is newer
         results->backtrace.error_str << "On-disk backtrace is divergent or newer";
-        // we probably have a new unwritten file!
-        // so skip the backtrace scrub for this entry and say that all's well
-        if (divergent && in->is_dirty_parent())
+        /* if the backtraces are divergent and the link count is 0, then
+         * most likely its a stray entry that's being purged and things are
+         * well and there's no reason for alarm
+         */
+        if (divergent && (in->is_dirty_parent() || in->get_inode()->nlink == 0)) {
           results->backtrace.passed = true;
+          dout(20) << "divergent backtraces are acceptable when dn "
+                      "is being purged or has been renamed or moved to a "
+                      "different directory " << *in << dendl;
+        }
       } else {
         results->backtrace.passed = true;
       }
@@ -4598,6 +4748,8 @@ next:
                            false);
         // Flag that we repaired this BT so that it won't go into damagetable
         results->backtrace.repaired = true;
+        if (in->mdcache->mds->logger)
+          in->mdcache->mds->logger->inc(l_mds_scrub_backtrace_repaired);
       }
 
       // If the inode's number was free in the InoTable, fix that
@@ -4605,20 +4757,22 @@ next:
       {
         InoTable *inotable = mdcache->mds->inotable;
 
-        dout(10) << "scrub: inotable ino = " << inode.ino << dendl;
+        dout(10) << "scrub: inotable ino = " << in->ino() << dendl;
         dout(10) << "scrub: inotable free says "
-          << inotable->is_marked_free(inode.ino) << dendl;
+          << inotable->is_marked_free(in->ino()) << dendl;
 
-        if (inotable->is_marked_free(inode.ino)) {
+        if (inotable->is_marked_free(in->ino())) {
           LogChannelRef clog = in->mdcache->mds->clog;
-          clog->error() << "scrub: inode wrongly marked free: " << inode.ino;
+          clog->error() << "scrub: inode wrongly marked free: " << in->ino();
 
           if (in->scrub_infop->header->get_repair()) {
-            bool repaired = inotable->repair(inode.ino);
+            bool repaired = inotable->repair(in->ino());
             if (repaired) {
-              clog->error() << "inode table repaired for inode: " << inode.ino;
+              clog->error() << "inode table repaired for inode: " << in->ino();
 
               inotable->save();
+              if (in->mdcache->mds->logger)
+                in->mdcache->mds->logger->inc(l_mds_scrub_inotable_repaired);
             } else {
               clog->error() << "Cannot repair inotable while other operations"
                 " are in progress";
@@ -4629,10 +4783,14 @@ next:
 
 
       if (in->is_dir()) {
+        if (in->mdcache->mds->logger)
+          in->mdcache->mds->logger->inc(l_mds_scrub_dir_inodes);
        return validate_directory_data();
       } else {
+        if (in->mdcache->mds->logger)
+          in->mdcache->mds->logger->inc(l_mds_scrub_file_inodes);
        // TODO: validate on-disk inode for normal files
-       return check_inode_snaprealm();
+       return true;
       }
     }
 
@@ -4642,33 +4800,38 @@ next:
       if (in->is_base()) {
        if (!shadow_in) {
          shadow_in = new CInode(in->mdcache);
-         in->mdcache->create_unlinked_system_inode(shadow_in, in->inode.ino, in->inode.mode);
+         in->mdcache->create_unlinked_system_inode(shadow_in, in->ino(), in->get_inode()->mode);
          in->mdcache->num_shadow_inodes++;
        }
         shadow_in->fetch(get_internal_callback(INODE));
+        if (in->mdcache->mds->logger)
+          in->mdcache->mds->logger->inc(l_mds_scrub_dir_base_inodes);
         return false;
       } else {
        // TODO: validate on-disk inode for non-base directories
+        if (in->mdcache->mds->logger)
+          in->mdcache->mds->logger->inc(l_mds_scrub_dirfrag_rstats);
        results->inode.passed = true;
        return check_dirfrag_rstats();
       }
     }
 
     bool _inode_disk(int rval) {
+      const auto& si = shadow_in->get_inode();
+      const auto& i = in->get_inode();
+
       results->inode.checked = true;
       results->inode.ondisk_read_retval = rval;
-      results->inode.ondisk_value = shadow_in->inode;
-      results->inode.memory_value = in->inode;
+      results->inode.ondisk_value = *si;
+      results->inode.memory_value = *i;
 
-      mempool_inode& si = shadow_in->inode;
-      mempool_inode& i = in->inode;
-      if (si.version > i.version) {
+      if (si->version > i->version) {
         // uh, what?
         results->inode.error_str << "On-disk inode is newer than in-memory one; ";
        goto next;
       } else {
         bool divergent = false;
-        int r = i.compare(si, &divergent);
+        int r = i->compare(*si, &divergent);
         results->inode.passed = !divergent && r >= 0;
         if (!results->inode.passed) {
           results->inode.error_str <<
@@ -4681,38 +4844,21 @@ next:
     }
 
     bool check_dirfrag_rstats() {
-      MDSGatherBuilder gather(g_ceph_context);
-      frag_vec_t leaves;
-      in->dirfragtree.get_leaves(leaves);
-      for (const auto& leaf : leaves) {
-        CDir *dir = in->get_or_open_dirfrag(in->mdcache, leaf);
-       dir->scrub_info();
-       if (!dir->scrub_infop->header)
-         dir->scrub_infop->header = in->scrub_infop->header;
-        if (dir->is_complete()) {
-         dir->scrub_local();
-       } else {
-         dir->scrub_infop->need_scrub_local = true;
-         dir->fetch(gather.new_sub(), false);
-       }
-      }
-      if (gather.has_subs()) {
-        gather.set_finisher(get_internal_callback(DIRFRAGS));
-        gather.activate();
-        return false;
+      if (in->has_subtree_root_dirfrag()) {
+       in->mdcache->rdlock_dirfrags_stats(in, get_internal_callback(DIRFRAGS));
+       return false;
       } else {
-        return immediate(DIRFRAGS, 0);
+       return immediate(DIRFRAGS, 0);
       }
     }
 
     bool _dirfrags(int rval) {
-      int frags_errors = 0;
       // basic reporting setup
       results->raw_stats.checked = true;
       results->raw_stats.ondisk_read_retval = rval;
 
-      results->raw_stats.memory_value.dirstat = in->inode.dirstat;
-      results->raw_stats.memory_value.rstat = in->inode.rstat;
+      results->raw_stats.memory_value.dirstat = in->get_inode()->dirstat;
+      results->raw_stats.memory_value.rstat = in->get_inode()->rstat;
       frag_info_t& dir_info = results->raw_stats.ondisk_value.dirstat;
       nest_info_t& nest_info = results->raw_stats.ondisk_value.rstat;
 
@@ -4725,28 +4871,16 @@ next:
       for (const auto &p : in->dirfrags) {
        CDir *dir = p.second;
        ceph_assert(dir->get_version() > 0);
-       nest_info.add(dir->fnode.accounted_rstat);
-       dir_info.add(dir->fnode.accounted_fragstat);
-       if (dir->scrub_infop->pending_scrub_error) {
-         dir->scrub_infop->pending_scrub_error = false;
-         if (dir->scrub_infop->header->get_repair()) {
-            results->raw_stats.repaired = true;
-           results->raw_stats.error_str
-             << "dirfrag(" << p.first << ") has bad stats (will be fixed); ";
-         } else {
-           results->raw_stats.error_str
-             << "dirfrag(" << p.first << ") has bad stats; ";
-         }
-         frags_errors++;
-       }
+       nest_info.add(dir->get_fnode()->accounted_rstat);
+       dir_info.add(dir->get_fnode()->accounted_fragstat);
       }
       nest_info.rsubdirs++; // it gets one to account for self
       if (const sr_t *srnode = in->get_projected_srnode(); srnode)
        nest_info.rsnaps += srnode->snaps.size();
 
       // ...and that their sum matches our inode settings
-      if (!dir_info.same_sums(in->inode.dirstat) ||
-         !nest_info.same_sums(in->inode.rstat)) {
+      if (!dir_info.same_sums(in->get_inode()->dirstat) ||
+         !nest_info.same_sums(in->get_inode()->rstat)) {
        if (in->scrub_infop->header->get_repair()) {
          results->raw_stats.error_str
            << "freshly-calculated rstats don't match existing ones (will be fixed)";
@@ -4756,44 +4890,25 @@ next:
          results->raw_stats.error_str
            << "freshly-calculated rstats don't match existing ones";
        }
+        if (in->is_dirty()) {
+          MDCache *mdcache = in->mdcache; // for dout()
+          auto ino = [this]() { return in->ino(); }; // for dout()
+          dout(20) << "raw stats most likely wont match since inode is dirty; "
+                      "please rerun scrub when system is stable; "
+                      "assuming passed for now;" << dendl;
+          results->raw_stats.passed = true;
+        }
        goto next;
       }
-      if (frags_errors > 0)
-       goto next;
 
       results->raw_stats.passed = true;
-next:
-      // snaprealm
-      return check_inode_snaprealm();
-    }
-
-    bool check_inode_snaprealm() {
-      if (!in->snaprealm)
-       return true;
-
-      if (!in->snaprealm->have_past_parents_open()) {
-       in->snaprealm->open_parents(get_internal_callback(SNAPREALM));
-       return false;
-      } else {
-       return immediate(SNAPREALM, 0);
+      {
+        MDCache *mdcache = in->mdcache; // for dout()
+        auto ino = [this]() { return in->ino(); }; // for dout()
+        dout(20) << "raw stats check passed on " << *in << dendl;
       }
-    }
 
-    bool _snaprealm(int rval) {
-
-      if (in->snaprealm->past_parents_dirty ||
-         !in->get_projected_srnode()->past_parents.empty()) {
-       // temporarily store error in field of on-disk inode validation temporarily
-       results->inode.checked = true;
-       results->inode.passed = false;
-       if (in->scrub_infop->header->get_repair()) {
-         results->inode.error_str << "Inode has old format snaprealm (will upgrade)";
-         results->inode.repaired = true;
-         in->mdcache->upgrade_inode_snaprealm(in);
-       } else {
-         results->inode.error_str << "Inode has old format snaprealm";
-       }
-      }
+next:
       return true;
     }
 
@@ -4811,6 +4926,8 @@ next:
        in->scrub_infop->header->set_repaired();
       if (fin)
        fin->complete(get_rval());
+
+      in->auth_unpin(this);
     }
   };
 
@@ -4845,7 +4962,7 @@ void CInode::validated_data::dump(Formatter *f) const
       f->dump_int("read_ret_val", raw_stats.ondisk_read_retval);
       f->dump_stream("ondisk_value.dirstat") << raw_stats.ondisk_value.dirstat;
       f->dump_stream("ondisk_value.rstat") << raw_stats.ondisk_value.rstat;
-      f->dump_stream("memory_value.dirrstat") << raw_stats.memory_value.dirstat;
+      f->dump_stream("memory_value.dirstat") << raw_stats.memory_value.dirstat;
       f->dump_stream("memory_value.rstat") << raw_stats.memory_value.rstat;
       f->dump_string("error_str", raw_stats.error_str.str());
     }
@@ -5018,13 +5135,13 @@ void CInode::scrub_info_create() const
 
   // break out of const-land to set up implicit initial state
   CInode *me = const_cast<CInode*>(this);
-  mempool_inode *in = me->get_projected_inode();
+  const auto& pi = me->get_projected_inode();
 
-  scrub_info_t *si = new scrub_info_t();
-  si->scrub_start_stamp = si->last_scrub_stamp = in->last_scrub_stamp;
-  si->scrub_start_version = si->last_scrub_version = in->last_scrub_version;
+  std::unique_ptr<scrub_info_t> si(new scrub_info_t());
+  si->last_scrub_stamp = pi->last_scrub_stamp;
+  si->last_scrub_version = pi->last_scrub_version;
 
-  me->scrub_infop = si;
+  me->scrub_infop.swap(si);
 }
 
 void CInode::scrub_maybe_delete_info()
@@ -5032,195 +5149,83 @@ void CInode::scrub_maybe_delete_info()
   if (scrub_infop &&
       !scrub_infop->scrub_in_progress &&
       !scrub_infop->last_scrub_dirty) {
-    delete scrub_infop;
-    scrub_infop = NULL;
+    scrub_infop.reset();
   }
 }
 
-void CInode::scrub_initialize(CDentry *scrub_parent,
-                             ScrubHeaderRef& header,
-                             MDSContext *f)
+void CInode::scrub_initialize(ScrubHeaderRef& header)
 {
   dout(20) << __func__ << " with scrub_version " << get_version() << dendl;
-  if (scrub_is_in_progress()) {
-    dout(20) << __func__ << " inode moved during scrub, reinitializing "
-            << dendl;
-    ceph_assert(scrub_infop->scrub_parent);
-    CDentry *dn = scrub_infop->scrub_parent;
-    CDir *dir = dn->dir;
-    dn->put(CDentry::PIN_SCRUBPARENT);
-    ceph_assert(dir->scrub_infop && dir->scrub_infop->directory_scrubbing);
-    dir->scrub_infop->directories_scrubbing.erase(dn->key());
-    dir->scrub_infop->others_scrubbing.erase(dn->key());
-  }
-  scrub_info();
-  if (!scrub_infop)
-    scrub_infop = new scrub_info_t();
-
-  if (get_projected_inode()->is_dir()) {
-    // fill in dirfrag_stamps with initial state
-    frag_vec_t leaves;
-    dirfragtree.get_leaves(leaves);
-    for (const auto& leaf : leaves) {
-      if (header->get_force())
-       scrub_infop->dirfrag_stamps[leaf].reset();
-      else
-       scrub_infop->dirfrag_stamps[leaf];
-    }
-  }
 
-  if (scrub_parent)
-    scrub_parent->get(CDentry::PIN_SCRUBPARENT);
-  scrub_infop->scrub_parent = scrub_parent;
-  scrub_infop->on_finish = f;
+  scrub_info();
   scrub_infop->scrub_in_progress = true;
-  scrub_infop->children_scrubbed = false;
+  scrub_infop->queued_frags.clear();
   scrub_infop->header = header;
-
-  scrub_infop->scrub_start_version = get_version();
-  scrub_infop->scrub_start_stamp = ceph_clock_now();
+  header->inc_num_pending();
   // right now we don't handle remote inodes
 }
 
-int CInode::scrub_dirfrag_next(frag_t* out_dirfrag)
-{
-  dout(20) << __func__ << dendl;
-  ceph_assert(scrub_is_in_progress());
-
-  if (!is_dir()) {
-    return -ENOTDIR;
-  }
-
-  std::map<frag_t, scrub_stamp_info_t>::iterator i =
-      scrub_infop->dirfrag_stamps.begin();
-
-  while (i != scrub_infop->dirfrag_stamps.end()) {
-    if (i->second.scrub_start_version < scrub_infop->scrub_start_version) {
-      i->second.scrub_start_version = get_projected_version();
-      i->second.scrub_start_stamp = ceph_clock_now();
-      *out_dirfrag = i->first;
-      dout(20) << " return frag " << *out_dirfrag << dendl;
-      return 0;
-    }
-    ++i;
-  }
-
-  dout(20) << " no frags left, ENOENT " << dendl;
-  return ENOENT;
-}
-
-void CInode::scrub_dirfrags_scrubbing(frag_vec_t* out_dirfrags)
-{
-  ceph_assert(out_dirfrags != NULL);
-  ceph_assert(scrub_infop != NULL);
-
-  out_dirfrags->clear();
-  std::map<frag_t, scrub_stamp_info_t>::iterator i =
-      scrub_infop->dirfrag_stamps.begin();
-
-  while (i != scrub_infop->dirfrag_stamps.end()) {
-    if (i->second.scrub_start_version >= scrub_infop->scrub_start_version) {
-      if (i->second.last_scrub_version < scrub_infop->scrub_start_version)
-        out_dirfrags->push_back(i->first);
-    } else {
-      return;
-    }
-
-    ++i;
-  }
-}
-
-void CInode::scrub_dirfrag_finished(frag_t dirfrag)
-{
-  dout(20) << __func__ << " on frag " << dirfrag << dendl;
-  ceph_assert(scrub_is_in_progress());
-
-  std::map<frag_t, scrub_stamp_info_t>::iterator i =
-      scrub_infop->dirfrag_stamps.find(dirfrag);
-  ceph_assert(i != scrub_infop->dirfrag_stamps.end());
-
-  scrub_stamp_info_t &si = i->second;
-  si.last_scrub_stamp = si.scrub_start_stamp;
-  si.last_scrub_version = si.scrub_start_version;
-}
-
-void CInode::scrub_aborted(MDSContext **c) {
+void CInode::scrub_aborted() {
   dout(20) << __func__ << dendl;
   ceph_assert(scrub_is_in_progress());
 
-  *c = nullptr;
-  std::swap(*c, scrub_infop->on_finish);
-
-  if (scrub_infop->scrub_parent) {
-    CDentry *dn = scrub_infop->scrub_parent;
-    scrub_infop->scrub_parent = NULL;
-    dn->dir->scrub_dentry_finished(dn);
-    dn->put(CDentry::PIN_SCRUBPARENT);
-  }
-
-  delete scrub_infop;
-  scrub_infop = nullptr;
+  scrub_infop->scrub_in_progress = false;
+  scrub_infop->header->dec_num_pending();
+  scrub_maybe_delete_info();
 }
 
-void CInode::scrub_finished(MDSContext **c) {
+void CInode::scrub_finished() {
   dout(20) << __func__ << dendl;
   ceph_assert(scrub_is_in_progress());
-  for (std::map<frag_t, scrub_stamp_info_t>::iterator i =
-      scrub_infop->dirfrag_stamps.begin();
-      i != scrub_infop->dirfrag_stamps.end();
-      ++i) {
-    if(i->second.last_scrub_version != i->second.scrub_start_version) {
-      derr << i->second.last_scrub_version << " != "
-        << i->second.scrub_start_version << dendl;
-    }
-    ceph_assert(i->second.last_scrub_version == i->second.scrub_start_version);
-  }
 
-  scrub_infop->last_scrub_version = scrub_infop->scrub_start_version;
-  scrub_infop->last_scrub_stamp = scrub_infop->scrub_start_stamp;
+  scrub_infop->last_scrub_version = get_version();
+  scrub_infop->last_scrub_stamp = ceph_clock_now();
   scrub_infop->last_scrub_dirty = true;
   scrub_infop->scrub_in_progress = false;
-
-  if (scrub_infop->scrub_parent) {
-    CDentry *dn = scrub_infop->scrub_parent;
-    scrub_infop->scrub_parent = NULL;
-    dn->dir->scrub_dentry_finished(dn);
-    dn->put(CDentry::PIN_SCRUBPARENT);
-  }
-
-  *c = scrub_infop->on_finish;
-  scrub_infop->on_finish = NULL;
-
-  if (scrub_infop->header->get_origin() == this) {
-    // We are at the point that a tagging scrub was initiated
-    LogChannelRef clog = mdcache->mds->clog;
-    clog->info() << "scrub complete with tag '"
-                 << scrub_infop->header->get_tag() << "'";
-  }
+  scrub_infop->header->dec_num_pending();
 }
 
 int64_t CInode::get_backtrace_pool() const
 {
   if (is_dir()) {
-    return mdcache->mds->mdsmap->get_metadata_pool();
+    return mdcache->mds->get_metadata_pool();
   } else {
     // Files are required to have an explicit layout that specifies
     // a pool
-    ceph_assert(inode.layout.pool_id != -1);
-    return inode.layout.pool_id;
+    ceph_assert(get_inode()->layout.pool_id != -1);
+    return get_inode()->layout.pool_id;
   }
 }
 
-void CInode::queue_export_pin(mds_rank_t target)
+void CInode::queue_export_pin(mds_rank_t export_pin)
 {
   if (state_test(CInode::STATE_QUEUEDEXPORTPIN))
     return;
 
+  mds_rank_t target;
+  if (export_pin >= 0)
+    target = export_pin;
+  else if (export_pin == MDS_RANK_EPHEMERAL_RAND)
+    target = mdcache->hash_into_rank_bucket(ino());
+  else
+    target = MDS_RANK_NONE;
+
+  unsigned min_frag_bits = mdcache->get_ephemeral_dist_frag_bits();
   bool queue = false;
   for (auto& p : dirfrags) {
     CDir *dir = p.second;
     if (!dir->is_auth())
       continue;
+
+    if (export_pin == MDS_RANK_EPHEMERAL_DIST) {
+      if (dir->get_frag().bits() < min_frag_bits) {
+       // needs split
+       queue = true;
+       break;
+      }
+      target = mdcache->hash_into_rank_bucket(ino(), dir->get_frag());
+    }
+
     if (target != MDS_RANK_NONE) {
       if (dir->is_subtree_root()) {
        // set auxsubtree bit or export it
@@ -5235,11 +5240,13 @@ void CInode::queue_export_pin(mds_rank_t target)
       // clear aux subtrees ?
       queue = dir->state_test(CDir::STATE_AUXSUBTREE);
     }
-    if (queue) {
-      state_set(CInode::STATE_QUEUEDEXPORTPIN);
-      mdcache->export_pin_queue.insert(this);
+
+    if (queue)
       break;
-    }
+  }
+  if (queue) {
+    state_set(CInode::STATE_QUEUEDEXPORTPIN);
+    mdcache->export_pin_queue.insert(this);
   }
 }
 
@@ -5252,145 +5259,71 @@ void CInode::maybe_export_pin(bool update)
 
   dout(15) << __func__ << " update=" << update << " " << *this << dendl;
 
-  mds_rank_t export_pin = get_export_pin(false, false);
-  if (export_pin == MDS_RANK_NONE && !update) {
+  mds_rank_t export_pin = get_export_pin(false);
+  if (export_pin == MDS_RANK_NONE && !update)
     return;
-  }
 
-  /* disable ephemeral pins */
-  set_ephemeral_dist(false);
-  set_ephemeral_rand(false);
+  check_pin_policy(export_pin);
   queue_export_pin(export_pin);
 }
 
-void CInode::set_ephemeral_dist(bool yes)
-{
-  if (yes) {
-    if (!state_test(CInode::STATE_DISTEPHEMERALPIN)) {
-      state_set(CInode::STATE_DISTEPHEMERALPIN);
-      auto p = mdcache->dist_ephemeral_pins.insert(this);
-      ceph_assert(p.second);
-    }
-  } else {
-    /* avoid std::set::erase if unnecessary */
-    if (state_test(CInode::STATE_DISTEPHEMERALPIN)) {
-      dout(10) << "clearing ephemeral distributed pin on " << *this << dendl;
-      state_clear(CInode::STATE_DISTEPHEMERALPIN);
-      auto count = mdcache->dist_ephemeral_pins.erase(this);
-      ceph_assert(count == 1);
-      queue_export_pin(MDS_RANK_NONE);
-    }
-  }
-}
-
-void CInode::maybe_ephemeral_dist(bool update)
-{
-  if (!mdcache->get_export_ephemeral_distributed_config()) {
-    dout(15) << __func__ << " config false: cannot ephemeral distributed pin " << *this << dendl;
-    set_ephemeral_dist(false);
-    return;
-  } else if (!is_dir() || !is_normal()) {
-    dout(15) << __func__ << " !dir or !normal: cannot ephemeral distributed pin " << *this << dendl;
-    set_ephemeral_dist(false);
-    return;
-  } else if (get_inode().nlink == 0) {
-    dout(15) << __func__ << " unlinked directory: cannot ephemeral distributed pin " << *this << dendl;
-    set_ephemeral_dist(false);
-    return;
-  } else if (!update && state_test(CInode::STATE_DISTEPHEMERALPIN)) {
-    dout(15) << __func__ << " requeueing already pinned " << *this << dendl;
-    queue_export_pin(mdcache->hash_into_rank_bucket(ino()));
-    return;
-  }
-
-  dout(15) << __func__ << " update=" << update << " " << *this << dendl;
-
-  auto dir = get_parent_dir();
-  if (!dir) {
-    return;
-  }
-
-  bool pin = dir->get_inode()->get_inode().export_ephemeral_distributed_pin;
-  if (pin) {
-    dout(10) << __func__ << "  ephemeral distributed pinning " << *this << dendl;
-    set_ephemeral_dist(true);
-    queue_export_pin(mdcache->hash_into_rank_bucket(ino()));
-  } else if (update) {
-    set_ephemeral_dist(false);
-    queue_export_pin(MDS_RANK_NONE);
-  }
-}
-
-void CInode::maybe_ephemeral_dist_children(bool update)
+void CInode::set_ephemeral_pin(bool dist, bool rand)
 {
-  if (!mdcache->get_export_ephemeral_distributed_config()) {
-    dout(15) << __func__ << " config false: cannot ephemeral distributed pin " << *this << dendl;
-    return;
-  } else if (!is_dir() || !is_normal()) {
-    dout(15) << __func__ << " !dir or !normal: cannot ephemeral distributed pin " << *this << dendl;
+  unsigned state = 0;
+  if (dist)
+    state |= STATE_DISTEPHEMERALPIN;
+  if (rand)
+    state |= STATE_RANDEPHEMERALPIN;
+  if (!state)
     return;
-  } else if (get_inode().nlink == 0) {
-    dout(15) << __func__ << " unlinked directory: cannot ephemeral distributed pin " << *this << dendl;
-    return;
-  }
-
-  bool pin = get_inode().export_ephemeral_distributed_pin;
-  /* FIXME: expensive to iterate children when not updating */
-  if (!pin && !update) {
-    return;
-  }
 
-  dout(10) << __func__ << " maybe ephemerally pinning children of " << *this << dendl;
-  for (auto& p : dirfrags) {
-    auto& dir = p.second;
-    for (auto& q : *dir) {
-      auto& dn = q.second;
-      auto&& in = dn->get_linkage()->get_inode();
-      if (in && in->is_dir()) {
-        in->maybe_ephemeral_dist(update);
-      }
+  if (state_test(state) != state) {
+    dout(10) << "set ephemeral (" << (dist ? "dist" : "")
+            << (rand ? " rand" : "") << ") pin on " << *this << dendl;
+    if (!is_ephemerally_pinned()) {
+      auto p = mdcache->export_ephemeral_pins.insert(this);
+      ceph_assert(p.second);
     }
+    state_set(state);
   }
 }
 
-void CInode::set_ephemeral_rand(bool yes)
+void CInode::clear_ephemeral_pin(bool dist, bool rand)
 {
-  if (yes) {
-    if (!state_test(CInode::STATE_RANDEPHEMERALPIN)) {
-      state_set(CInode::STATE_RANDEPHEMERALPIN);
-      auto p = mdcache->rand_ephemeral_pins.insert(this);
-      ceph_assert(p.second);
-    }
-  } else {
-    if (state_test(CInode::STATE_RANDEPHEMERALPIN)) {
-      dout(10) << "clearing ephemeral random pin on " << *this << dendl;
-      state_clear(CInode::STATE_RANDEPHEMERALPIN);
-      auto count = mdcache->rand_ephemeral_pins.erase(this);
+  unsigned state = 0;
+  if (dist)
+    state |= STATE_DISTEPHEMERALPIN;
+  if (rand)
+    state |= STATE_RANDEPHEMERALPIN;
+
+  if (state_test(state)) {
+    dout(10) << "clear ephemeral (" << (dist ? "dist" : "")
+            << (rand ? " rand" : "") << ") pin on " << *this << dendl;
+    state_clear(state);
+    if (!is_ephemerally_pinned()) {
+      auto count = mdcache->export_ephemeral_pins.erase(this);
       ceph_assert(count == 1);
-      queue_export_pin(MDS_RANK_NONE);
     }
   }
 }
 
-void CInode::maybe_ephemeral_rand(bool fresh, double threshold)
+void CInode::maybe_ephemeral_rand(double threshold)
 {
   if (!mdcache->get_export_ephemeral_random_config()) {
     dout(15) << __func__ << " config false: cannot ephemeral random pin " << *this << dendl;
-    set_ephemeral_rand(false);
+    clear_ephemeral_pin(false, true);
     return;
   } else if (!is_dir() || !is_normal()) {
     dout(15) << __func__ << " !dir or !normal: cannot ephemeral random pin " << *this << dendl;
-    set_ephemeral_rand(false);
+    clear_ephemeral_pin(false, true);
     return;
-  } else if (get_inode().nlink == 0) {
+  } else if (get_inode()->nlink == 0) {
     dout(15) << __func__ << " unlinked directory: cannot ephemeral random pin " << *this << dendl;
-    set_ephemeral_rand(false);
+    clear_ephemeral_pin(false, true);
     return;
   } else if (state_test(CInode::STATE_RANDEPHEMERALPIN)) {
     dout(10) << __func__ << " already ephemeral random pinned: requeueing " << *this << dendl;
-    queue_export_pin(mdcache->hash_into_rank_bucket(ino()));
-    return;
-  } else if (!fresh) {
+    queue_export_pin(MDS_RANK_EPHEMERAL_RAND);
     return;
   }
 
@@ -5408,97 +5341,99 @@ void CInode::maybe_ephemeral_rand(bool fresh, double threshold)
 
   if (n <= threshold) {
     dout(10) << __func__ << " randomly export pinning " << *this << dendl;
-    set_ephemeral_rand(true);
-    queue_export_pin(mdcache->hash_into_rank_bucket(ino()));
+    set_ephemeral_pin(false, true);
+    queue_export_pin(MDS_RANK_EPHEMERAL_RAND);
   }
 }
 
 void CInode::setxattr_ephemeral_rand(double probability)
 {
   ceph_assert(is_dir());
-  ceph_assert(is_projected());
-  get_projected_inode()->export_ephemeral_random_pin = probability;
+  _get_projected_inode()->export_ephemeral_random_pin = probability;
 }
 
 void CInode::setxattr_ephemeral_dist(bool val)
 {
   ceph_assert(is_dir());
-  ceph_assert(is_projected());
-  get_projected_inode()->export_ephemeral_distributed_pin = val;
+  _get_projected_inode()->export_ephemeral_distributed_pin = val;
 }
 
 void CInode::set_export_pin(mds_rank_t rank)
 {
   ceph_assert(is_dir());
-  ceph_assert(is_projected());
-  get_projected_inode()->export_pin = rank;
+  _get_projected_inode()->export_pin = rank;
+  maybe_export_pin(true);
 }
 
-void CInode::check_pin_policy()
+mds_rank_t CInode::get_export_pin(bool inherit) const
 {
-  const CInode *in = this;
-  mds_rank_t etarget = MDS_RANK_NONE;
-  while (true) {
-    if (in->is_system())
-      break;
-    const CDentry *pdn = in->get_parent_dn();
-    if (!pdn)
-      break;
-    if (in->get_inode().nlink == 0) {
-      // ignore export pin for unlinked directory
-      return;
-    } else if (etarget != MDS_RANK_NONE && in->has_ephemeral_policy()) {
-      return;
-    } else if (in->get_inode().export_pin >= 0) {
-      /* clear any epin policy */
-      set_ephemeral_dist(false);
-      set_ephemeral_rand(false);
-      return;
-    } else if (etarget == MDS_RANK_NONE && in->is_ephemerally_pinned()) {
-      /* If a parent overrides a grandparent ephemeral pin policy with an export pin, we use that export pin instead. */
-      etarget = mdcache->hash_into_rank_bucket(in->ino());
-    }
-    in = pdn->get_dir()->inode;
-  }
-}
+  if (!g_conf()->mds_bal_export_pin)
+    return MDS_RANK_NONE;
 
-mds_rank_t CInode::get_export_pin(bool inherit, bool ephemeral) const
-{
   /* An inode that is export pinned may not necessarily be a subtree root, we
    * need to traverse the parents. A base or system inode cannot be pinned.
    * N.B. inodes not yet linked into a dir (i.e. anonymous inodes) will not
    * have a parent yet.
    */
+  mds_rank_t r_target = MDS_RANK_NONE;
   const CInode *in = this;
-  mds_rank_t etarget = MDS_RANK_NONE;
+  const CDir *dir = nullptr;
   while (true) {
     if (in->is_system())
       break;
     const CDentry *pdn = in->get_parent_dn();
     if (!pdn)
       break;
-    if (in->get_inode().nlink == 0) {
+    if (in->get_inode()->nlink == 0) {
       // ignore export pin for unlinked directory
-      return MDS_RANK_NONE;
-    } else if (etarget != MDS_RANK_NONE && in->has_ephemeral_policy()) {
-      return etarget;
-    } else if (in->get_inode().export_pin >= 0) {
-      return in->get_inode().export_pin;
-    } else if (etarget == MDS_RANK_NONE && ephemeral && in->is_ephemerally_pinned()) {
+      break;
+    }
+
+    if (in->get_inode()->export_pin >= 0) {
+      return in->get_inode()->export_pin;
+    } else if (in->get_inode()->export_ephemeral_distributed_pin &&
+              mdcache->get_export_ephemeral_distributed_config()) {
+      if (in != this)
+       return mdcache->hash_into_rank_bucket(in->ino(), dir->get_frag());
+      return MDS_RANK_EPHEMERAL_DIST;
+    } else if (r_target != MDS_RANK_NONE && in->get_inode()->export_ephemeral_random_pin > 0.0) {
+      return r_target;
+    } else if (r_target == MDS_RANK_NONE && in->is_ephemeral_rand() &&
+              mdcache->get_export_ephemeral_random_config()) {
       /* If a parent overrides a grandparent ephemeral pin policy with an export pin, we use that export pin instead. */
-      etarget = mdcache->hash_into_rank_bucket(in->ino());
-      if (!inherit) return etarget;
+      if (!inherit)
+       return MDS_RANK_EPHEMERAL_RAND;
+      if (in == this)
+       r_target = MDS_RANK_EPHEMERAL_RAND;
+      else
+       r_target = mdcache->hash_into_rank_bucket(in->ino());
     }
 
-    if (!inherit) {
+    if (!inherit)
       break;
-    }
-    in = pdn->get_dir()->inode;
+    dir = pdn->get_dir();
+    in = dir->inode;
   }
   return MDS_RANK_NONE;
 }
 
-double CInode::get_ephemeral_rand(bool inherit) const
+void CInode::check_pin_policy(mds_rank_t export_pin)
+{
+  if (export_pin == MDS_RANK_EPHEMERAL_DIST) {
+    set_ephemeral_pin(true, false);
+    clear_ephemeral_pin(false, true);
+  } else if (export_pin == MDS_RANK_EPHEMERAL_RAND) {
+    set_ephemeral_pin(false, true);
+    clear_ephemeral_pin(true, false);
+  } else if (is_ephemerally_pinned()) {
+    // export_pin >= 0 || export_pin == MDS_RANK_NONE
+    clear_ephemeral_pin(true, true);
+    if (export_pin != get_inode()->export_pin) // inherited export_pin
+      queue_export_pin(MDS_RANK_NONE);
+  }
+}
+
+double CInode::get_ephemeral_rand() const
 {
   /* N.B. inodes not yet linked into a dir (i.e. anonymous inodes) will not
    * have a parent yet.
@@ -5512,37 +5447,24 @@ double CInode::get_ephemeral_rand(bool inherit) const
     if (!pdn)
       break;
     // ignore export pin for unlinked directory
-    if (in->get_inode().nlink == 0)
+    if (in->get_inode()->nlink == 0)
       break;
 
-    if (in->get_inode().export_ephemeral_random_pin > 0.0)
-      return std::min(in->get_inode().export_ephemeral_random_pin, max);
+    if (in->get_inode()->export_ephemeral_random_pin > 0.0)
+      return std::min(in->get_inode()->export_ephemeral_random_pin, max);
 
     /* An export_pin overrides only if no closer parent (incl. this one) has a
      * random pin set.
      */
-    if (in->get_inode().export_pin >= 0)
+    if (in->get_inode()->export_pin >= 0 ||
+       in->get_inode()->export_ephemeral_distributed_pin)
       return 0.0;
 
-    if (!inherit)
-      break;
     in = pdn->get_dir()->inode;
   }
   return 0.0;
 }
 
-bool CInode::is_exportable(mds_rank_t dest) const
-{
-  mds_rank_t pin = get_export_pin();
-  if (pin == dest) {
-    return true;
-  } else if (pin >= 0) {
-    return false;
-  } else {
-    return true;
-  }
-}
-
 void CInode::get_nested_dirfrags(std::vector<CDir*>& v) const
 {
   for (const auto &p : dirfrags) {