]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mds/CInode.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / mds / CInode.cc
index b91c47881f5eb07954cae4b44401fde29c10be68..ea86ff6b960fba365d646f980e326c2b31c99e96 100644 (file)
 
 #include "common/Clock.h"
 
-#include "messages/MLock.h"
-#include "messages/MClientCaps.h"
-
 #include "common/config.h"
 #include "global/global_context.h"
-#include "include/assert.h"
+#include "include/ceph_assert.h"
 
 #include "mds/MDSContinuation.h"
 #include "mds/InoTable.h"
+#include "cephfs_features.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_mds
@@ -61,10 +59,11 @@ protected:
   MDSRank *get_mds() override {return in->mdcache->mds;}
 public:
   explicit CInodeIOContext(CInode *in_) : in(in_) {
-    assert(in != NULL);
+    ceph_assert(in != NULL);
   }
 };
 
+sr_t* const CInode::projected_inode::UNDEF_SRNODE = (sr_t*)(unsigned long)-1;
 
 LockType CInode::versionlock_type(CEPH_LOCK_IVERSION);
 LockType CInode::authlock_type(CEPH_LOCK_IAUTH);
@@ -128,10 +127,10 @@ ostream& operator<<(ostream& out, const CInode& in)
   if (in.get_projected_version() > in.get_version())
     out << " pv" << in.get_projected_version();
 
-  if (in.is_auth_pinned()) {
-    out << " ap=" << in.get_num_auth_pins() << "+" << in.get_num_nested_auth_pins();
+  if (in.get_num_auth_pins()) {
+    out << " ap=" << in.get_num_auth_pins();
 #ifdef MDS_AUTHPIN_SET
-    out << "(" << in.auth_pin_set << ")";
+    in.print_authpin_set(out);
 #endif
   }
 
@@ -153,7 +152,7 @@ ostream& operator<<(ostream& out, const CInode& in)
 
   if (in.inode.is_dir()) {
     out << " " << in.inode.dirstat;
-    if (g_conf->mds_debug_scatterstat && in.is_projected()) {
+    if (g_conf()->mds_debug_scatterstat && in.is_projected()) {
       const CInode::mempool_inode *pi = in.get_projected_inode();
       out << "->" << pi->dirstat;
     }
@@ -167,7 +166,7 @@ ostream& operator<<(ostream& out, const CInode& in)
   out << " " << in.inode.rstat;
   if (!(in.inode.rstat == in.inode.accounted_rstat))
     out << "/" << in.inode.accounted_rstat;
-  if (g_conf->mds_debug_scatterstat && in.is_projected()) {
+  if (g_conf()->mds_debug_scatterstat && in.is_projected()) {
     const CInode::mempool_inode *pi = in.get_projected_inode();
     out << "->" << pi->rstat;
     if (!(pi->rstat == pi->accounted_rstat))
@@ -209,16 +208,16 @@ ostream& operator<<(ostream& out, const CInode& in)
 
   if (!in.get_client_caps().empty()) {
     out << " caps={";
-    for (map<client_t,Capability*>::const_iterator it = in.get_client_caps().begin();
-         it != in.get_client_caps().end();
-         ++it) {
-      if (it != in.get_client_caps().begin()) out << ",";
-      out << it->first << "="
-         << ccap_string(it->second->pending());
-      if (it->second->issued() != it->second->pending())
-       out << "/" << ccap_string(it->second->issued());
-      out << "/" << ccap_string(it->second->wanted())
-         << "@" << it->second->get_last_seq();
+    bool first = true;
+    for (const auto &p : in.get_client_caps()) {
+      if (!first) out << ",";
+      out << p.first << "="
+         << ccap_string(p.second.pending());
+      if (p.second.issued() != p.second.pending())
+       out << "/" << ccap_string(p.second.issued());
+      out << "/" << ccap_string(p.second.wanted())
+         << "@" << p.second.get_last_seq();
+      first = false;
     }
     out << "}";
     if (in.get_loner() >= 0 || in.get_wanted_loner() >= 0) {
@@ -262,18 +261,40 @@ ostream& operator<<(ostream& out, const CInode::scrub_stamp_info_t& si)
   return out;
 }
 
-
+CInode::CInode(MDCache *c, bool auth, snapid_t f, snapid_t l)
+  :
+    mdcache(c),
+    first(f), last(l),
+    item_dirty(this),
+    item_caps(this),
+    item_open_file(this),
+    item_dirty_parent(this),
+    item_dirty_dirfrag_dir(this),
+    item_dirty_dirfrag_nest(this),
+    item_dirty_dirfrag_dirfragtree(this),
+    pop(c->decayrate),
+    versionlock(this, &versionlock_type),
+    authlock(this, &authlock_type),
+    linklock(this, &linklock_type),
+    dirfragtreelock(this, &dirfragtreelock_type),
+    filelock(this, &filelock_type),
+    xattrlock(this, &xattrlock_type),
+    snaplock(this, &snaplock_type),
+    nestlock(this, &nestlock_type),
+    flocklock(this, &flocklock_type),
+    policylock(this, &policylock_type)
+{
+  if (auth) state_set(STATE_AUTH);
+}
 
 void CInode::print(ostream& out)
 {
   out << *this;
 }
 
-
-
 void CInode::add_need_snapflush(CInode *snapin, snapid_t snapid, client_t client)
 {
-  dout(10) << "add_need_snapflush client." << client << " snapid " << snapid << " on " << snapin << dendl;
+  dout(10) << __func__ << " client." << client << " snapid " << snapid << " on " << snapin << dendl;
 
   if (client_need_snapflush.empty()) {
     get(CInode::PIN_NEEDSNAPFLUSH);
@@ -316,11 +337,11 @@ void CInode::remove_need_snapflush(CInode *snapin, snapid_t snapid, client_t cli
 
 bool CInode::split_need_snapflush(CInode *cowin, CInode *in)
 {
-  dout(10) << "split_need_snapflush [" << cowin->first << "," << cowin->last << "] for " << *cowin << dendl;
+  dout(10) << __func__ << " [" << cowin->first << "," << cowin->last << "] for " << *cowin << dendl;
   bool need_flush = false;
   for (auto it = client_need_snapflush.lower_bound(cowin->first);
        it != client_need_snapflush.end() && it->first < in->first; ) {
-    assert(!it->second.empty());
+    ceph_assert(!it->second.empty());
     if (cowin->last >= it->first) {
       cowin->auth_pin(this);
       need_flush = true;
@@ -336,7 +357,7 @@ bool CInode::split_need_snapflush(CInode *cowin, CInode *in)
 void CInode::mark_dirty_rstat()
 {
   if (!state_test(STATE_DIRTYRSTAT)) {
-    dout(10) << "mark_dirty_rstat" << dendl;
+    dout(10) << __func__ << dendl;
     state_set(STATE_DIRTYRSTAT);
     get(PIN_DIRTYRSTAT);
     CDentry *pdn = get_projected_parent_dn();
@@ -347,49 +368,25 @@ void CInode::mark_dirty_rstat()
     } else {
       // under cross-MDS rename.
       // DIRTYRSTAT flag will get cleared when rename finishes
-      assert(state_test(STATE_AMBIGUOUSAUTH));
+      ceph_assert(state_test(STATE_AMBIGUOUSAUTH));
     }
   }
 }
 void CInode::clear_dirty_rstat()
 {
   if (state_test(STATE_DIRTYRSTAT)) {
-    dout(10) << "clear_dirty_rstat" << dendl;
+    dout(10) << __func__ << dendl;
     state_clear(STATE_DIRTYRSTAT);
     put(PIN_DIRTYRSTAT);
     dirty_rstat_item.remove_myself();
   }
 }
 
-/* Ideally this function would be subsumed by project_inode but it is also
- * needed by CInode::project_past_snaprealm_parent so we keep it.
- */
-sr_t &CInode::project_snaprealm(projected_inode &pi)
-{
-  const sr_t *cur_srnode = get_projected_srnode();
-
-  assert(!pi.snapnode);
-  if (cur_srnode) {
-    pi.snapnode.reset(new sr_t(*cur_srnode));
-  } else {
-    pi.snapnode.reset(new sr_t());
-    pi.snapnode->created = 0;
-    pi.snapnode->current_parent_since = get_oldest_snap();
-  }
-  ++num_projected_srnodes;
-
-  dout(10) << __func__ << " " << pi.snapnode.get() << dendl;
-  return *pi.snapnode.get();
-}
-
 CInode::projected_inode &CInode::project_inode(bool xattr, bool snap)
 {
-  if (projected_nodes.empty()) {
-    projected_nodes.emplace_back(inode);
-  } else {
+  auto &pi = projected_nodes.empty() ?
+    projected_nodes.emplace_back(inode) :
     projected_nodes.emplace_back(projected_nodes.back().inode);
-  }
-  auto &pi = projected_nodes.back();
 
   if (scrub_infop && scrub_infop->last_scrub_dirty) {
     pi.inode.last_scrub_stamp = scrub_infop->last_scrub_stamp;
@@ -404,7 +401,7 @@ CInode::projected_inode &CInode::project_inode(bool xattr, bool snap)
   }
 
   if (snap) {
-    project_snaprealm(pi);
+    project_snaprealm();
   }
 
   dout(15) << __func__ << " " << pi.inode.ino << dendl;
@@ -413,7 +410,7 @@ CInode::projected_inode &CInode::project_inode(bool xattr, bool snap)
 
 void CInode::pop_and_dirty_projected_inode(LogSegment *ls) 
 {
-  assert(!projected_nodes.empty());
+  ceph_assert(!projected_nodes.empty());
   auto &front = projected_nodes.front();
   dout(15) << __func__ << " " << front.inode.ino
           << " v" << front.inode.version << dendl;
@@ -433,67 +430,184 @@ void CInode::pop_and_dirty_projected_inode(LogSegment *ls)
     xattrs = *front.xattrs;
   }
 
-  auto &snapnode = front.snapnode;
-  if (snapnode) {
-    pop_projected_snaprealm(snapnode.get());
+  if (projected_nodes.front().snapnode != projected_inode::UNDEF_SRNODE) {
+    pop_projected_snaprealm(projected_nodes.front().snapnode, false);
     --num_projected_srnodes;
   }
 
   projected_nodes.pop_front();
 }
 
+sr_t *CInode::prepare_new_srnode(snapid_t snapid)
+{
+  const sr_t *cur_srnode = get_projected_srnode();
+  sr_t *new_srnode;
+
+  if (cur_srnode) {
+    new_srnode = new sr_t(*cur_srnode);
+    if (!new_srnode->past_parents.empty()) {
+      // convert past_parents to past_parent_snaps
+      ceph_assert(snaprealm);
+      auto& snaps = snaprealm->get_snaps();
+      for (auto p : snaps) {
+       if (p >= new_srnode->current_parent_since)
+         break;
+       if (!new_srnode->snaps.count(p))
+         new_srnode->past_parent_snaps.insert(p);
+      }
+      new_srnode->seq = snaprealm->get_newest_seq();
+      new_srnode->past_parents.clear();
+    }
+    if (snaprealm)
+      snaprealm->past_parents_dirty = false;
+  } else {
+    if (snapid == 0)
+      snapid = mdcache->get_global_snaprealm()->get_newest_seq();
+    new_srnode = new sr_t();
+    new_srnode->seq = snapid;
+    new_srnode->created = snapid;
+    new_srnode->current_parent_since = get_oldest_snap();
+  }
+  return new_srnode;
+}
+
+void CInode::project_snaprealm(sr_t *new_srnode)
+{
+  dout(10) << __func__ << " " << new_srnode << dendl;
+  ceph_assert(projected_nodes.back().snapnode == projected_inode::UNDEF_SRNODE);
+  projected_nodes.back().snapnode = new_srnode;
+  ++num_projected_srnodes;
+}
+
+void CInode::mark_snaprealm_global(sr_t *new_srnode)
+{
+  ceph_assert(!is_dir());
+  // 'last_destroyed' is no longer used, use it to store origin 'current_parent_since'
+  new_srnode->last_destroyed = new_srnode->current_parent_since;
+  new_srnode->current_parent_since = mdcache->get_global_snaprealm()->get_newest_seq() + 1;
+  new_srnode->mark_parent_global();
+}
+
+void CInode::clear_snaprealm_global(sr_t *new_srnode)
+{
+  // restore 'current_parent_since'
+  new_srnode->current_parent_since = new_srnode->last_destroyed;
+  new_srnode->last_destroyed = 0;
+  new_srnode->seq = mdcache->get_global_snaprealm()->get_newest_seq();
+  new_srnode->clear_parent_global();
+}
+
+bool CInode::is_projected_snaprealm_global() const
+{
+  const sr_t *srnode = get_projected_srnode();
+  if (srnode && srnode->is_parent_global())
+    return true;
+  return false;
+}
+
+void CInode::project_snaprealm_past_parent(SnapRealm *newparent)
+{
+  sr_t *new_snap = project_snaprealm();
+  record_snaprealm_past_parent(new_snap, newparent);
+}
+
+
 /* if newparent != parent, add parent to past_parents
  if parent DNE, we need to find what the parent actually is and fill that in */
-void CInode::project_past_snaprealm_parent(SnapRealm *newparent)
+void CInode::record_snaprealm_past_parent(sr_t *new_snap, SnapRealm *newparent)
 {
-  assert(!projected_nodes.empty());
-  sr_t &new_snap = project_snaprealm(projected_nodes.back());
+  ceph_assert(!new_snap->is_parent_global());
   SnapRealm *oldparent;
   if (!snaprealm) {
     oldparent = find_snaprealm();
-    new_snap.seq = oldparent->get_newest_seq();
-  }
-  else
+  } else {
     oldparent = snaprealm->parent;
+  }
 
   if (newparent != oldparent) {
     snapid_t oldparentseq = oldparent->get_newest_seq();
-    if (oldparentseq + 1 > new_snap.current_parent_since) {
-      new_snap.past_parents[oldparentseq].ino = oldparent->inode->ino();
-      new_snap.past_parents[oldparentseq].first = new_snap.current_parent_since;
+    if (oldparentseq + 1 > new_snap->current_parent_since) {
+      // copy old parent's snaps
+      const set<snapid_t>& snaps = oldparent->get_snaps();
+      auto p = snaps.lower_bound(new_snap->current_parent_since);
+      if (p != snaps.end())
+       new_snap->past_parent_snaps.insert(p, snaps.end());
+      if (oldparentseq > new_snap->seq)
+       new_snap->seq = oldparentseq;
     }
-    new_snap.current_parent_since = std::max(oldparentseq, newparent->get_last_created()) + 1;
+    new_snap->current_parent_since = mdcache->get_global_snaprealm()->get_newest_seq() + 1;
   }
 }
 
-void CInode::pop_projected_snaprealm(sr_t *next_snaprealm)
+void CInode::record_snaprealm_parent_dentry(sr_t *new_snap, SnapRealm *newparent,
+                                           CDentry *dn, bool primary_dn)
 {
-  assert(next_snaprealm);
-  dout(10) << "pop_projected_snaprealm " << next_snaprealm
-          << " seq" << next_snaprealm->seq << dendl;
-  bool invalidate_cached_snaps = false;
-  if (!snaprealm) {
-    open_snaprealm();
-  } else if (next_snaprealm->past_parents.size() !=
-            snaprealm->srnode.past_parents.size()) {
-    invalidate_cached_snaps = true;
-    // re-open past parents
-    snaprealm->_close_parents();
+  ceph_assert(new_snap->is_parent_global());
+  SnapRealm *oldparent = dn->get_dir()->inode->find_snaprealm();
+  auto& snaps = oldparent->get_snaps();
+
+  if (!primary_dn) {
+    auto p = snaps.lower_bound(dn->first);
+    if (p != snaps.end())
+      new_snap->past_parent_snaps.insert(p, snaps.end());
+  } else if (newparent != oldparent) {
+    // 'last_destroyed' is used as 'current_parent_since'
+    auto p = snaps.lower_bound(new_snap->last_destroyed);
+    if (p != snaps.end())
+      new_snap->past_parent_snaps.insert(p, snaps.end());
+    new_snap->last_destroyed = mdcache->get_global_snaprealm()->get_newest_seq() + 1;
+  }
+}
 
-    dout(10) << " realm " << *snaprealm << " past_parents " << snaprealm->srnode.past_parents
-            << " -> " << next_snaprealm->past_parents << dendl;
+void CInode::early_pop_projected_snaprealm()
+{
+  ceph_assert(!projected_nodes.empty());
+  if (projected_nodes.front().snapnode != projected_inode::UNDEF_SRNODE) {
+    pop_projected_snaprealm(projected_nodes.front().snapnode, true);
+    projected_nodes.front().snapnode = projected_inode::UNDEF_SRNODE;
+    --num_projected_srnodes;
   }
-  snaprealm->srnode = *next_snaprealm;
+}
 
-  // we should be able to open these up (or have them already be open).
-  bool ok = snaprealm->_open_parents(NULL);
-  assert(ok);
+void CInode::pop_projected_snaprealm(sr_t *next_snaprealm, bool early)
+{
+  if (next_snaprealm) {
+    dout(10) << __func__ << (early ? " (early) " : " ")
+            << next_snaprealm << " seq " << next_snaprealm->seq << dendl;
+    bool invalidate_cached_snaps = false;
+    if (!snaprealm) {
+      open_snaprealm();
+    } else if (!snaprealm->srnode.past_parents.empty()) {
+      invalidate_cached_snaps = true;
+      // re-open past parents
+      snaprealm->close_parents();
+
+      dout(10) << " realm " << *snaprealm << " past_parents " << snaprealm->srnode.past_parents
+              << " -> " << next_snaprealm->past_parents << dendl;
+    }
+    auto old_flags = snaprealm->srnode.flags;
+    snaprealm->srnode = *next_snaprealm;
+    delete next_snaprealm;
 
-  if (invalidate_cached_snaps)
-    snaprealm->invalidate_cached_snaps();
+    if ((snaprealm->srnode.flags ^ old_flags) & sr_t::PARENT_GLOBAL) {
+      snaprealm->close_parents();
+      snaprealm->adjust_parent();
+    }
 
-  if (snaprealm->parent)
-    dout(10) << " realm " << *snaprealm << " parent " << *snaprealm->parent << dendl;
+    // we should be able to open these up (or have them already be open).
+    bool ok = snaprealm->_open_parents(NULL);
+    ceph_assert(ok);
+
+    if (invalidate_cached_snaps)
+      snaprealm->invalidate_cached_snaps();
+
+    if (snaprealm->parent)
+      dout(10) << " realm " << *snaprealm << " parent " << *snaprealm->parent << dendl;
+  } else {
+    dout(10) << __func__ << (early ? " (early) null" : " null") << dendl;
+    ceph_assert(snaprealm);
+    snaprealm->merge_to(NULL);
+  }
 }
 
 
@@ -501,16 +615,16 @@ void CInode::pop_projected_snaprealm(sr_t *next_snaprealm)
 
 // dirfrags
 
-__u32 InodeStoreBase::hash_dentry_name(boost::string_view dn)
+__u32 InodeStoreBase::hash_dentry_name(std::string_view dn)
 {
   int which = inode.dir_layout.dl_dir_hash;
   if (!which)
     which = CEPH_STR_HASH_LINUX;
-  assert(ceph_str_hash_valid(which));
+  ceph_assert(ceph_str_hash_valid(which));
   return ceph_str_hash(which, dn.data(), dn.length());
 }
 
-frag_t InodeStoreBase::pick_dirfrag(boost::string_view dn)
+frag_t InodeStoreBase::pick_dirfrag(std::string_view dn)
 {
   if (dirfragtree.empty())
     return frag_t();          // avoid the string hash if we can.
@@ -522,13 +636,17 @@ frag_t InodeStoreBase::pick_dirfrag(boost::string_view dn)
 bool CInode::get_dirfrags_under(frag_t fg, list<CDir*>& ls)
 {
   bool all = true;
-  std::list<frag_t> fglist;
-  dirfragtree.get_leaves_under(fg, fglist);
-  for (list<frag_t>::iterator p = fglist.begin(); p != fglist.end(); ++p)
-    if (dirfrags.count(*p))
-      ls.push_back(dirfrags[*p]);
-    else 
-      all = false;
+  {
+    frag_vec_t leaves;
+    dirfragtree.get_leaves_under(fg, leaves);
+    for (const auto &leaf : leaves) {
+      if (auto it = dirfrags.find(leaf); it != dirfrags.end()) {
+        ls.push_back(it->second);
+      } else {
+        all = false;
+      }
+    }
+  }
 
   if (all)
     return all;
@@ -542,11 +660,14 @@ bool CInode::get_dirfrags_under(frag_t fg, list<CDir*>& ls)
   }
 
   all = true;
-  tmpdft.get_leaves_under(fg, fglist);
-  for (const auto &p : fglist) {
-    if (!dirfrags.count(p)) {
-      all = false;
-      break;
+  {
+    frag_vec_t leaves;
+    tmpdft.get_leaves_under(fg, leaves);
+    for (const auto& leaf : leaves) {
+      if (!dirfrags.count(leaf)) {
+        all = false;
+        break;
+      }
     }
   }
 
@@ -563,7 +684,7 @@ void CInode::verify_dirfrags()
       bad = true;
     }
   }
-  assert(!bad);
+  ceph_assert(!bad);
 }
 
 void CInode::force_dirfrags()
@@ -578,10 +699,11 @@ void CInode::force_dirfrags()
   }
 
   if (bad) {
-    list<frag_t> leaves;
+    frag_vec_t leaves;
     dirfragtree.get_leaves(leaves);
-    for (list<frag_t>::iterator p = leaves.begin(); p != leaves.end(); ++p)
-      mdcache->get_force_dirfrag(dirfrag_t(ino(),*p), true);
+    for (const auto& leaf : leaves) {
+      mdcache->get_force_dirfrag(dirfrag_t(ino(), leaf), true);
+    }
   }
 
   verify_dirfrags();
@@ -609,13 +731,13 @@ CDir *CInode::get_approx_dirfrag(frag_t fg)
 
 CDir *CInode::get_or_open_dirfrag(MDCache *mdcache, frag_t fg)
 {
-  assert(is_dir());
+  ceph_assert(is_dir());
 
   // have it?
   CDir *dir = get_dirfrag(fg);
   if (!dir) {
     // create it.
-    assert(is_auth() || mdcache->mds->is_any_replay());
+    ceph_assert(is_auth() || mdcache->mds->is_any_replay());
     dir = new CDir(this, fg, mdcache, is_auth());
     add_dirfrag(dir);
   }
@@ -624,8 +746,8 @@ CDir *CInode::get_or_open_dirfrag(MDCache *mdcache, frag_t fg)
 
 CDir *CInode::add_dirfrag(CDir *dir)
 {
-  assert(dirfrags.count(dir->dirfrag().frag) == 0);
-  dirfrags[dir->dirfrag().frag] = dir;
+  auto em = dirfrags.emplace(std::piecewise_construct, std::forward_as_tuple(dir->dirfrag().frag), std::forward_as_tuple(dir));
+  ceph_assert(em.second);
 
   if (stickydir_ref > 0) {
     dir->state_set(CDir::STATE_STICKY);
@@ -639,8 +761,8 @@ CDir *CInode::add_dirfrag(CDir *dir)
 
 void CInode::close_dirfrag(frag_t fg)
 {
-  dout(14) << "close_dirfrag " << fg << dendl;
-  assert(dirfrags.count(fg));
+  dout(14) << __func__ << " " << fg << dendl;
+  ceph_assert(dirfrags.count(fg));
   
   CDir *dir = dirfrags[fg];
   dir->remove_null_dentries();
@@ -661,7 +783,7 @@ void CInode::close_dirfrag(frag_t fg)
   for (const auto &p : dir->items)
     dout(14) << __func__ << " LEFTOVER dn " << *p.second << dendl;
 
-  assert(dir->get_num_ref() == 0);
+  ceph_assert(dir->get_num_ref() == 0);
   delete dir;
   dirfrags.erase(fg);
 }
@@ -707,7 +829,7 @@ void CInode::get_stickydirs()
 
 void CInode::put_stickydirs()
 {
-  assert(stickydir_ref > 0);
+  ceph_assert(stickydir_ref > 0);
   stickydir_ref--;
   if (stickydir_ref == 0) {
     put(PIN_STICKYDIRS);
@@ -780,14 +902,32 @@ CInode *CInode::get_parent_inode()
   return NULL;
 }
 
-bool CInode::is_projected_ancestor_of(CInode *other)
+bool CInode::is_ancestor_of(const CInode *other) const
 {
   while (other) {
     if (other == this)
       return true;
-    if (!other->get_projected_parent_dn())
+    const CDentry *pdn = other->get_oldest_parent_dn();
+    if (!pdn) {
+      ceph_assert(other->is_base());
       break;
-    other = other->get_projected_parent_dn()->get_dir()->get_inode();
+    }
+    other = pdn->get_dir()->get_inode();
+  }
+  return false;
+}
+
+bool CInode::is_projected_ancestor_of(const CInode *other) const
+{
+  while (other) {
+    if (other == this)
+      return true;
+    const CDentry *pdn = other->get_projected_parent_dn();
+    if (!pdn) {
+      ceph_assert(other->is_base());
+      break;
+    }
+    other = pdn->get_dir()->get_inode();
   }
   return false;
 }
@@ -829,7 +969,7 @@ void CInode::make_path(filepath& fp, bool projected) const
 {
   const CDentry *use_parent = projected ? get_projected_parent_dn() : parent;
   if (use_parent) {
-    assert(!is_base());
+    ceph_assert(!is_base());
     use_parent->make_path(fp, projected);
   } else {
     fp = filepath(ino());
@@ -851,7 +991,7 @@ version_t CInode::pre_dirty()
     pv = _cdentry->pre_dirty(get_projected_version());
     dout(10) << "pre_dirty " << pv << " (current v " << inode.version << ")" << dendl;
   } else {
-    assert(is_base());
+    ceph_assert(is_base());
     pv = get_projected_version() + 1;
   }
   // force update backtrace for old format inode (see mempool_inode::decode)
@@ -868,7 +1008,7 @@ void CInode::_mark_dirty(LogSegment *ls)
   if (!state_test(STATE_DIRTY)) {
     state_set(STATE_DIRTY);
     get(PIN_DIRTY);
-    assert(ls);
+    ceph_assert(ls);
   }
   
   // move myself to this segment's dirty list
@@ -878,7 +1018,7 @@ void CInode::_mark_dirty(LogSegment *ls)
 
 void CInode::mark_dirty(version_t pv, LogSegment *ls) {
   
-  dout(10) << "mark_dirty " << *this << dendl;
+  dout(10) << __func__ << " " << *this << dendl;
 
   /*
     NOTE: I may already be dirty, but this fn _still_ needs to be called so that
@@ -888,10 +1028,10 @@ void CInode::mark_dirty(version_t pv, LogSegment *ls) {
   
   // only auth can get dirty.  "dirty" async data in replicas is relative to
   // filelock state, not the dirty flag.
-  assert(is_auth());
+  ceph_assert(is_auth());
   
   // touch my private version
-  assert(inode.version < pv);
+  ceph_assert(inode.version < pv);
   inode.version = pv;
   _mark_dirty(ls);
 
@@ -903,7 +1043,7 @@ void CInode::mark_dirty(version_t pv, LogSegment *ls) {
 
 void CInode::mark_clean()
 {
-  dout(10) << " mark_clean " << *this << dendl;
+  dout(10) << __func__ << " " << *this << dendl;
   if (state_test(STATE_DIRTY)) {
     state_clear(STATE_DIRTY);
     put(PIN_DIRTY);
@@ -930,17 +1070,19 @@ struct C_IO_Inode_Stored : public CInodeIOContext {
   }
 };
 
-object_t InodeStoreBase::get_object_name(inodeno_t ino, frag_t fg, const char *suffix)
+object_t InodeStoreBase::get_object_name(inodeno_t ino, frag_t fg, std::string_view suffix)
 {
   char n[60];
-  snprintf(n, sizeof(n), "%llx.%08llx%s", (long long unsigned)ino, (long long unsigned)fg, suffix ? suffix : "");
+  snprintf(n, sizeof(n), "%llx.%08llx", (long long unsigned)ino, (long long unsigned)fg);
+  ceph_assert(strlen(n) + suffix.size() < sizeof n);
+  strncat(n, suffix.data(), suffix.size());
   return object_t(n);
 }
 
-void CInode::store(MDSInternalContextBase *fin)
+void CInode::store(MDSContext *fin)
 {
-  dout(10) << "store " << get_version() << dendl;
-  assert(is_base());
+  dout(10) << __func__ << " " << get_version() << dendl;
+  ceph_assert(is_base());
 
   if (snaprealm)
     purge_stale_snap_data(snaprealm->get_snaps());
@@ -948,7 +1090,8 @@ void CInode::store(MDSInternalContextBase *fin)
   // encode
   bufferlist bl;
   string magic = CEPH_FS_ONDISK_MAGIC;
-  ::encode(magic, bl);
+  using ceph::encode;
+  encode(magic, bl);
   encode_store(bl, mdcache->mds->mdsmap->get_up_features());
 
   // write it.
@@ -978,17 +1121,17 @@ void CInode::_stored(int r, version_t v, Context *fin)
     return;
   }
 
-  dout(10) << "_stored " << v << " on " << *this << dendl;
+  dout(10) << __func__ << " " << v << " on " << *this << dendl;
   if (v == get_projected_version())
     mark_clean();
 
   fin->complete(0);
 }
 
-void CInode::flush(MDSInternalContextBase *fin)
+void CInode::flush(MDSContext *fin)
 {
-  dout(10) << "flush " << *this << dendl;
-  assert(is_auth() && can_auth_pin());
+  dout(10) << __func__ << " " << *this << dendl;
+  ceph_assert(is_auth() && can_auth_pin());
 
   MDSGatherBuilder gather(g_ceph_context);
 
@@ -1024,9 +1167,9 @@ struct C_IO_Inode_Fetched : public CInodeIOContext {
   }
 };
 
-void CInode::fetch(MDSInternalContextBase *fin)
+void CInode::fetch(MDSContext *fin)
 {
-  dout(10) << "fetch" << dendl;
+  dout(10) << __func__  << dendl;
 
   C_IO_Inode_Fetched *c = new C_IO_Inode_Fetched(this, fin);
   C_GatherBuilder gather(g_ceph_context, new C_OnFinisher(c, mdcache->mds->finisher));
@@ -1048,22 +1191,23 @@ void CInode::fetch(MDSInternalContextBase *fin)
 
 void CInode::_fetched(bufferlist& bl, bufferlist& bl2, Context *fin)
 {
-  dout(10) << "_fetched got " << bl.length() << " and " << bl2.length() << dendl;
-  bufferlist::iterator p;
+  dout(10) << __func__ << " got " << bl.length() << " and " << bl2.length() << dendl;
+  bufferlist::const_iterator p;
   if (bl2.length()) {
-    p = bl2.begin();
+    p = bl2.cbegin();
   } else if (bl.length()) {
-    p = bl.begin();
+    p = bl.cbegin();
   } else {
     derr << "No data while reading inode " << ino() << dendl;
     fin->complete(-ENOENT);
     return;
   }
 
+  using ceph::decode;
   // Attempt decode
   try {
     string magic;
-    ::decode(magic, p);
+    decode(magic, p);
     dout(10) << " magic is '" << magic << "' (expecting '"
              << CEPH_FS_ONDISK_MAGIC << "')" << dendl;
     if (magic != CEPH_FS_ONDISK_MAGIC) {
@@ -1115,10 +1259,10 @@ struct C_IO_Inode_StoredBacktrace : public CInodeIOContext {
   }
 };
 
-void CInode::store_backtrace(MDSInternalContextBase *fin, int op_prio)
+void CInode::store_backtrace(MDSContext *fin, int op_prio)
 {
-  dout(10) << "store_backtrace on " << *this << dendl;
-  assert(is_dirty_parent());
+  dout(10) << __func__ << " on " << *this << dendl;
+  ceph_assert(is_dirty_parent());
 
   if (op_prio < 0)
     op_prio = CEPH_MSG_PRIO_DEFAULT;
@@ -1129,7 +1273,8 @@ void CInode::store_backtrace(MDSInternalContextBase *fin, int op_prio)
   inode_backtrace_t bt;
   build_backtrace(pool, bt);
   bufferlist parent_bl;
-  ::encode(bt, parent_bl);
+  using ceph::encode;
+  encode(bt, parent_bl);
 
   ObjectOperation op;
   op.priority = op_prio;
@@ -1137,7 +1282,7 @@ void CInode::store_backtrace(MDSInternalContextBase *fin, int op_prio)
   op.setxattr("parent", parent_bl);
 
   bufferlist layout_bl;
-  ::encode(inode.layout, layout_bl, mdcache->mds->mdsmap->get_up_features());
+  encode(inode.layout, layout_bl, mdcache->mds->mdsmap->get_up_features());
   op.setxattr("layout", layout_bl);
 
   SnapContext snapc;
@@ -1195,7 +1340,7 @@ void CInode::_stored_backtrace(int r, version_t v, Context *fin)
     // out from under us), so the backtrace can never be written, so pretend
     // to succeed so that the user can proceed to e.g. delete the file.
     if (!exists) {
-      dout(4) << "store_backtrace got ENOENT: a data pool was deleted "
+      dout(4) << __func__ << " got ENOENT: a data pool was deleted "
                  "beneath us!" << dendl;
       r = 0;
     }
@@ -1213,7 +1358,7 @@ void CInode::_stored_backtrace(int r, version_t v, Context *fin)
     return;
   }
 
-  dout(10) << "_stored_backtrace v " << v <<  dendl;
+  dout(10) << __func__ << " v " << v <<  dendl;
 
   auth_unpin(this);
   if (v == inode.backtrace_version)
@@ -1230,10 +1375,10 @@ void CInode::fetch_backtrace(Context *fin, bufferlist *backtrace)
 void CInode::mark_dirty_parent(LogSegment *ls, bool dirty_pool)
 {
   if (!state_test(STATE_DIRTYPARENT)) {
-    dout(10) << "mark_dirty_parent" << dendl;
+    dout(10) << __func__ << dendl;
     state_set(STATE_DIRTYPARENT);
     get(PIN_DIRTYPARENT);
-    assert(ls);
+    ceph_assert(ls);
   }
   if (dirty_pool)
     state_set(STATE_DIRTYPOOL);
@@ -1244,7 +1389,7 @@ void CInode::mark_dirty_parent(LogSegment *ls, bool dirty_pool)
 void CInode::clear_dirty_parent()
 {
   if (state_test(STATE_DIRTYPARENT)) {
-    dout(10) << "clear_dirty_parent" << dendl;
+    dout(10) << __func__ << dendl;
     state_clear(STATE_DIRTYPARENT);
     state_clear(STATE_DIRTYPOOL);
     put(PIN_DIRTYPARENT);
@@ -1257,11 +1402,12 @@ void CInode::verify_diri_backtrace(bufferlist &bl, int err)
   if (is_base() || is_dirty_parent() || !is_auth())
     return;
 
-  dout(10) << "verify_diri_backtrace" << dendl;
+  dout(10) << __func__ << dendl;
 
   if (err == 0) {
     inode_backtrace_t backtrace;
-    ::decode(backtrace, bl);
+    using ceph::decode;
+    decode(backtrace, bl);
     CDentry *pdn = get_parent_dn();
     if (backtrace.ancestors.empty() ||
        backtrace.ancestors[0].dname != pdn->get_name() ||
@@ -1272,7 +1418,7 @@ void CInode::verify_diri_backtrace(bufferlist &bl, int err)
   if (err) {
     MDSRank *mds = mdcache->mds;
     mds->clog->error() << "bad backtrace on directory inode " << ino();
-    assert(!"bad backtrace" == (g_conf->mds_verify_backtrace > 1));
+    ceph_assert(!"bad backtrace" == (g_conf()->mds_verify_backtrace > 1));
 
     mark_dirty_parent(mds->mdlog->get_current_segment(), false);
     mds->mdlog->flush();
@@ -1286,18 +1432,19 @@ void CInode::verify_diri_backtrace(bufferlist &bl, int err)
 void InodeStoreBase::encode_bare(bufferlist &bl, uint64_t features,
                                 const bufferlist *snap_blob) const
 {
-  ::encode(inode, bl, features);
+  using ceph::encode;
+  encode(inode, bl, features);
   if (is_symlink())
-    ::encode(symlink, bl);
-  ::encode(dirfragtree, bl);
-  ::encode(xattrs, bl);
+    encode(symlink, bl);
+  encode(dirfragtree, bl);
+  encode(xattrs, bl);
   if (snap_blob)
-    ::encode(*snap_blob, bl);
+    encode(*snap_blob, bl);
   else
-    ::encode(bufferlist(), bl);
-  ::encode(old_inodes, bl, features);
-  ::encode(oldest_snap, bl);
-  ::encode(damage_flags, bl);
+    encode(bufferlist(), bl);
+  encode(old_inodes, bl, features);
+  encode(oldest_snap, bl);
+  encode(damage_flags, bl);
 }
 
 void InodeStoreBase::encode(bufferlist &bl, uint64_t features,
@@ -1316,26 +1463,27 @@ void CInode::encode_store(bufferlist& bl, uint64_t features)
                         &snap_blob);
 }
 
-void InodeStoreBase::decode_bare(bufferlist::iterator &bl,
+void InodeStoreBase::decode_bare(bufferlist::const_iterator &bl,
                              bufferlist& snap_blob, __u8 struct_v)
 {
-  ::decode(inode, bl);
+  using ceph::decode;
+  decode(inode, bl);
   if (is_symlink()) {
     std::string tmp;
-    ::decode(tmp, bl);
-    symlink = mempool::mds_co::string(boost::string_view(tmp));
+    decode(tmp, bl);
+    symlink = std::string_view(tmp);
   }
-  ::decode(dirfragtree, bl);
-  ::decode(xattrs, bl);
-  ::decode(snap_blob, bl);
+  decode(dirfragtree, bl);
+  decode(xattrs, bl);
+  decode(snap_blob, bl);
 
-  ::decode(old_inodes, bl);
+  decode(old_inodes, bl);
   if (struct_v == 2 && inode.is_dir()) {
     bool default_layout_exists;
-    ::decode(default_layout_exists, bl);
+    decode(default_layout_exists, bl);
     if (default_layout_exists) {
-      ::decode(struct_v, bl); // this was a default_file_layout
-      ::decode(inode.layout, bl); // but we only care about the layout portion
+      decode(struct_v, bl); // this was a default_file_layout
+      decode(inode.layout, bl); // but we only care about the layout portion
     }
   }
 
@@ -1343,24 +1491,24 @@ void InodeStoreBase::decode_bare(bufferlist::iterator &bl,
     // InodeStore is embedded in dentries without proper versioning, so
     // we consume up to the end of the buffer
     if (!bl.end()) {
-      ::decode(oldest_snap, bl);
+      decode(oldest_snap, bl);
     }
 
     if (!bl.end()) {
-      ::decode(damage_flags, bl);
+      decode(damage_flags, bl);
     }
   }
 }
 
 
-void InodeStoreBase::decode(bufferlist::iterator &bl, bufferlist& snap_blob)
+void InodeStoreBase::decode(bufferlist::const_iterator &bl, bufferlist& snap_blob)
 {
   DECODE_START_LEGACY_COMPAT_LEN(5, 4, 4, bl);
   decode_bare(bl, snap_blob, struct_v);
   DECODE_FINISH(bl);
 }
 
-void CInode::decode_store(bufferlist::iterator& bl)
+void CInode::decode_store(bufferlist::const_iterator& bl)
 {
   bufferlist snap_blob;
   InodeStoreBase::decode(bl, snap_blob);
@@ -1378,34 +1526,37 @@ void CInode::set_object_info(MDSCacheObjectInfo &info)
 
 void CInode::encode_lock_state(int type, bufferlist& bl)
 {
-  ::encode(first, bl);
+  using ceph::encode;
+  encode(first, bl);
+  if (!is_base())
+    encode(parent->first, bl);
 
   switch (type) {
   case CEPH_LOCK_IAUTH:
-    ::encode(inode.version, bl);
-    ::encode(inode.ctime, bl);
-    ::encode(inode.mode, bl);
-    ::encode(inode.uid, bl);
-    ::encode(inode.gid, bl);  
+    encode(inode.version, bl);
+    encode(inode.ctime, bl);
+    encode(inode.mode, bl);
+    encode(inode.uid, bl);
+    encode(inode.gid, bl);  
     break;
     
   case CEPH_LOCK_ILINK:
-    ::encode(inode.version, bl);
-    ::encode(inode.ctime, bl);
-    ::encode(inode.nlink, bl);
+    encode(inode.version, bl);
+    encode(inode.ctime, bl);
+    encode(inode.nlink, bl);
     break;
     
   case CEPH_LOCK_IDFT:
     if (is_auth()) {
-      ::encode(inode.version, bl);
+      encode(inode.version, bl);
     } else {
       // treat flushing as dirty when rejoining cache
       bool dirty = dirfragtreelock.is_dirty_or_flushing();
-      ::encode(dirty, bl);
+      encode(dirty, bl);
     }
     {
       // encode the raw tree
-      ::encode(dirfragtree, bl);
+      encode(dirfragtree, bl);
 
       // also specify which frags are mine
       set<frag_t> myfrags;
@@ -1416,34 +1567,34 @@ void CInode::encode_lock_state(int type, bufferlist& bl)
          frag_t fg = (*p)->get_frag();
          myfrags.insert(fg);
        }
-      ::encode(myfrags, bl);
+      encode(myfrags, bl);
     }
     break;
     
   case CEPH_LOCK_IFILE:
     if (is_auth()) {
-      ::encode(inode.version, bl);
-      ::encode(inode.ctime, bl);
-      ::encode(inode.mtime, bl);
-      ::encode(inode.atime, bl);
-      ::encode(inode.time_warp_seq, bl);
+      encode(inode.version, bl);
+      encode(inode.ctime, bl);
+      encode(inode.mtime, bl);
+      encode(inode.atime, bl);
+      encode(inode.time_warp_seq, bl);
       if (!is_dir()) {
-       ::encode(inode.layout, bl, mdcache->mds->mdsmap->get_up_features());
-       ::encode(inode.size, bl);
-       ::encode(inode.truncate_seq, bl);
-       ::encode(inode.truncate_size, bl);
-       ::encode(inode.client_ranges, bl);
-       ::encode(inode.inline_data, bl);
+       encode(inode.layout, bl, mdcache->mds->mdsmap->get_up_features());
+       encode(inode.size, bl);
+       encode(inode.truncate_seq, bl);
+       encode(inode.truncate_size, bl);
+       encode(inode.client_ranges, bl);
+       encode(inode.inline_data, bl);
       }
     } else {
       // treat flushing as dirty when rejoining cache
       bool dirty = filelock.is_dirty_or_flushing();
-      ::encode(dirty, bl);
+      encode(dirty, bl);
     }
 
     {
-      dout(15) << "encode_lock_state inode.dirstat is " << inode.dirstat << dendl;
-      ::encode(inode.dirstat, bl);  // only meaningful if i am auth.
+      dout(15) << __func__ << " inode.dirstat is " << inode.dirstat << dendl;
+      encode(inode.dirstat, bl);  // only meaningful if i am auth.
       bufferlist tmp;
       __u32 n = 0;
       for (const auto &p : dirfrags) {
@@ -1454,29 +1605,29 @@ void CInode::encode_lock_state(int type, bufferlist& bl)
          dout(15) << fg << " " << *dir << dendl;
          dout(20) << fg << "           fragstat " << pf->fragstat << dendl;
          dout(20) << fg << " accounted_fragstat " << pf->accounted_fragstat << dendl;
-         ::encode(fg, tmp);
-         ::encode(dir->first, tmp);
-         ::encode(pf->fragstat, tmp);
-         ::encode(pf->accounted_fragstat, tmp);
+         encode(fg, tmp);
+         encode(dir->first, tmp);
+         encode(pf->fragstat, tmp);
+         encode(pf->accounted_fragstat, tmp);
          n++;
        }
       }
-      ::encode(n, bl);
+      encode(n, bl);
       bl.claim_append(tmp);
     }
     break;
 
   case CEPH_LOCK_INEST:
     if (is_auth()) {
-      ::encode(inode.version, bl);
+      encode(inode.version, bl);
     } else {
       // treat flushing as dirty when rejoining cache
       bool dirty = nestlock.is_dirty_or_flushing();
-      ::encode(dirty, bl);
+      encode(dirty, bl);
     }
     {
-      dout(15) << "encode_lock_state inode.rstat is " << inode.rstat << dendl;
-      ::encode(inode.rstat, bl);  // only meaningful if i am auth.
+      dout(15) << __func__ << " inode.rstat is " << inode.rstat << dendl;
+      encode(inode.rstat, bl);  // only meaningful if i am auth.
       bufferlist tmp;
       __u32 n = 0;
       for (const auto &p : dirfrags) {
@@ -1488,43 +1639,43 @@ void CInode::encode_lock_state(int type, bufferlist& bl)
          dout(10) << fg << " " << pf->rstat << dendl;
          dout(10) << fg << " " << pf->rstat << dendl;
          dout(10) << fg << " " << dir->dirty_old_rstat << dendl;
-         ::encode(fg, tmp);
-         ::encode(dir->first, tmp);
-         ::encode(pf->rstat, tmp);
-         ::encode(pf->accounted_rstat, tmp);
-         ::encode(dir->dirty_old_rstat, tmp);
+         encode(fg, tmp);
+         encode(dir->first, tmp);
+         encode(pf->rstat, tmp);
+         encode(pf->accounted_rstat, tmp);
+         encode(dir->dirty_old_rstat, tmp);
          n++;
        }
       }
-      ::encode(n, bl);
+      encode(n, bl);
       bl.claim_append(tmp);
     }
     break;
     
   case CEPH_LOCK_IXATTR:
-    ::encode(inode.version, bl);
-    ::encode(inode.ctime, bl);
-    ::encode(xattrs, bl);
+    encode(inode.version, bl);
+    encode(inode.ctime, bl);
+    encode(xattrs, bl);
     break;
 
   case CEPH_LOCK_ISNAP:
-    ::encode(inode.version, bl);
-    ::encode(inode.ctime, bl);
+    encode(inode.version, bl);
+    encode(inode.ctime, bl);
     encode_snap(bl);
     break;
 
   case CEPH_LOCK_IFLOCK:
-    ::encode(inode.version, bl);
+    encode(inode.version, bl);
     _encode_file_locks(bl);
     break;
 
   case CEPH_LOCK_IPOLICY:
     if (inode.is_dir()) {
-      ::encode(inode.version, bl);
-      ::encode(inode.ctime, bl);
-      ::encode(inode.layout, bl, mdcache->mds->mdsmap->get_up_features());
-      ::encode(inode.quota, bl);
-      ::encode(inode.export_pin, bl);
+      encode(inode.version, bl);
+      encode(inode.ctime, bl);
+      encode(inode.layout, bl, mdcache->mds->mdsmap->get_up_features());
+      encode(inode.quota, bl);
+      encode(inode.export_pin, bl);
     }
     break;
   
@@ -1536,57 +1687,59 @@ void CInode::encode_lock_state(int type, bufferlist& bl)
 
 /* for more info on scatterlocks, see comments by Locker::scatter_writebehind */
 
-void CInode::decode_lock_state(int type, bufferlist& bl)
+void CInode::decode_lock_state(int type, const bufferlist& bl)
 {
-  bufferlist::iterator p = bl.begin();
+  auto p = bl.cbegin();
   utime_t tm;
 
   snapid_t newfirst;
-  ::decode(newfirst, p);
-
+  using ceph::decode;
+  decode(newfirst, p);
   if (!is_auth() && newfirst != first) {
-    dout(10) << "decode_lock_state first " << first << " -> " << newfirst << dendl;
-    assert(newfirst > first);
-    if (!is_multiversion() && parent) {
-      assert(parent->first == first);
+    dout(10) << __func__ << " first " << first << " -> " << newfirst << dendl;
+    first = newfirst;
+  }
+  if (!is_base()) {
+    decode(newfirst, p);
+    if (!parent->is_auth() && newfirst != parent->first) {
+      dout(10) << __func__ << " parent first " << first << " -> " << newfirst << dendl;
       parent->first = newfirst;
     }
-    first = newfirst;
   }
 
   switch (type) {
   case CEPH_LOCK_IAUTH:
-    ::decode(inode.version, p);
-    ::decode(tm, p);
+    decode(inode.version, p);
+    decode(tm, p);
     if (inode.ctime < tm) inode.ctime = tm;
-    ::decode(inode.mode, p);
-    ::decode(inode.uid, p);
-    ::decode(inode.gid, p);
+    decode(inode.mode, p);
+    decode(inode.uid, p);
+    decode(inode.gid, p);
     break;
 
   case CEPH_LOCK_ILINK:
-    ::decode(inode.version, p);
-    ::decode(tm, p);
+    decode(inode.version, p);
+    decode(tm, p);
     if (inode.ctime < tm) inode.ctime = tm;
-    ::decode(inode.nlink, p);
+    decode(inode.nlink, p);
     break;
 
   case CEPH_LOCK_IDFT:
     if (is_auth()) {
       bool replica_dirty;
-      ::decode(replica_dirty, p);
+      decode(replica_dirty, p);
       if (replica_dirty) {
-       dout(10) << "decode_lock_state setting dftlock dirty flag" << dendl;
+       dout(10) << __func__ << " setting dftlock dirty flag" << dendl;
        dirfragtreelock.mark_dirty();  // ok bc we're auth and caller will handle
       }
     } else {
-      ::decode(inode.version, p);
+      decode(inode.version, p);
     }
     {
       fragtree_t temp;
-      ::decode(temp, p);
+      decode(temp, p);
       set<frag_t> authfrags;
-      ::decode(authfrags, p);
+      decode(authfrags, p);
       if (is_auth()) {
        // auth.  believe replica's auth frags only.
        for (set<frag_t>::iterator p = authfrags.begin(); p != authfrags.end(); ++p)
@@ -1610,61 +1763,61 @@ void CInode::decode_lock_state(int type, bufferlist& bl)
            p.second->state_clear(CDir::STATE_DIRTYDFT);
        }
       }
-      if (g_conf->mds_debug_frag)
+      if (g_conf()->mds_debug_frag)
        verify_dirfrags();
     }
     break;
 
   case CEPH_LOCK_IFILE:
     if (!is_auth()) {
-      ::decode(inode.version, p);
-      ::decode(tm, p);
+      decode(inode.version, p);
+      decode(tm, p);
       if (inode.ctime < tm) inode.ctime = tm;
-      ::decode(inode.mtime, p);
-      ::decode(inode.atime, p);
-      ::decode(inode.time_warp_seq, p);
+      decode(inode.mtime, p);
+      decode(inode.atime, p);
+      decode(inode.time_warp_seq, p);
       if (!is_dir()) {
-       ::decode(inode.layout, p);
-       ::decode(inode.size, p);
-       ::decode(inode.truncate_seq, p);
-       ::decode(inode.truncate_size, p);
-       ::decode(inode.client_ranges, p);
-       ::decode(inode.inline_data, p);
+       decode(inode.layout, p);
+       decode(inode.size, p);
+       decode(inode.truncate_seq, p);
+       decode(inode.truncate_size, p);
+       decode(inode.client_ranges, p);
+       decode(inode.inline_data, p);
       }
     } else {
       bool replica_dirty;
-      ::decode(replica_dirty, p);
+      decode(replica_dirty, p);
       if (replica_dirty) {
-       dout(10) << "decode_lock_state setting filelock dirty flag" << dendl;
+       dout(10) << __func__ << " setting filelock dirty flag" << dendl;
        filelock.mark_dirty();  // ok bc we're auth and caller will handle
       }
     }
     {
       frag_info_t dirstat;
-      ::decode(dirstat, p);
+      decode(dirstat, p);
       if (!is_auth()) {
        dout(10) << " taking inode dirstat " << dirstat << " for " << *this << dendl;
        inode.dirstat = dirstat;    // take inode summation if replica
       }
       __u32 n;
-      ::decode(n, p);
+      decode(n, p);
       dout(10) << " ...got " << n << " fragstats on " << *this << dendl;
       while (n--) {
        frag_t fg;
        snapid_t fgfirst;
        frag_info_t fragstat;
        frag_info_t accounted_fragstat;
-       ::decode(fg, p);
-       ::decode(fgfirst, p);
-       ::decode(fragstat, p);
-       ::decode(accounted_fragstat, p);
+       decode(fg, p);
+       decode(fgfirst, p);
+       decode(fragstat, p);
+       decode(accounted_fragstat, p);
        dout(10) << fg << " [" << fgfirst << ",head] " << dendl;
        dout(10) << fg << "           fragstat " << fragstat << dendl;
        dout(20) << fg << " accounted_fragstat " << accounted_fragstat << dendl;
 
        CDir *dir = get_dirfrag(fg);
        if (is_auth()) {
-         assert(dir);                // i am auth; i had better have this dir open
+         ceph_assert(dir);                // i am auth; i had better have this dir open
          dout(10) << fg << " first " << dir->first << " -> " << fgfirst
                   << " on " << *dir << dendl;
          dir->first = fgfirst;
@@ -1692,34 +1845,34 @@ void CInode::decode_lock_state(int type, bufferlist& bl)
   case CEPH_LOCK_INEST:
     if (is_auth()) {
       bool replica_dirty;
-      ::decode(replica_dirty, p);
+      decode(replica_dirty, p);
       if (replica_dirty) {
-       dout(10) << "decode_lock_state setting nestlock dirty flag" << dendl;
+       dout(10) << __func__ << " setting nestlock dirty flag" << dendl;
        nestlock.mark_dirty();  // ok bc we're auth and caller will handle
       }
     } else {
-      ::decode(inode.version, p);
+      decode(inode.version, p);
     }
     {
       nest_info_t rstat;
-      ::decode(rstat, p);
+      decode(rstat, p);
       if (!is_auth()) {
        dout(10) << " taking inode rstat " << rstat << " for " << *this << dendl;
        inode.rstat = rstat;    // take inode summation if replica
       }
       __u32 n;
-      ::decode(n, p);
+      decode(n, p);
       while (n--) {
        frag_t fg;
        snapid_t fgfirst;
        nest_info_t rstat;
        nest_info_t accounted_rstat;
        decltype(CDir::dirty_old_rstat) dirty_old_rstat;
-       ::decode(fg, p);
-       ::decode(fgfirst, p);
-       ::decode(rstat, p);
-       ::decode(accounted_rstat, p);
-       ::decode(dirty_old_rstat, p);
+       decode(fg, p);
+       decode(fgfirst, p);
+       decode(rstat, p);
+       decode(accounted_rstat, p);
+       decode(dirty_old_rstat, p);
        dout(10) << fg << " [" << fgfirst << ",head]" << dendl;
        dout(10) << fg << "               rstat " << rstat << dendl;
        dout(10) << fg << "     accounted_rstat " << accounted_rstat << dendl;
@@ -1727,7 +1880,7 @@ void CInode::decode_lock_state(int type, bufferlist& bl)
 
        CDir *dir = get_dirfrag(fg);
        if (is_auth()) {
-         assert(dir);                // i am auth; i had better have this dir open
+         ceph_assert(dir);                // i am auth; i had better have this dir open
          dout(10) << fg << " first " << dir->first << " -> " << fgfirst
                   << " on " << *dir << dendl;
          dir->first = fgfirst;
@@ -1753,40 +1906,35 @@ void CInode::decode_lock_state(int type, bufferlist& bl)
     break;
 
   case CEPH_LOCK_IXATTR:
-    ::decode(inode.version, p);
-    ::decode(tm, p);
+    decode(inode.version, p);
+    decode(tm, p);
     if (inode.ctime < tm) inode.ctime = tm;
-    ::decode(xattrs, p);
+    decode(xattrs, p);
     break;
 
   case CEPH_LOCK_ISNAP:
     {
-      ::decode(inode.version, p);
-      ::decode(tm, p);
+      decode(inode.version, p);
+      decode(tm, p);
       if (inode.ctime < tm) inode.ctime = tm;
-      snapid_t seq = 0;
-      if (snaprealm)
-       seq = snaprealm->srnode.seq;
       decode_snap(p);
-      if (snaprealm && snaprealm->srnode.seq != seq)
-       mdcache->do_realm_invalidate_and_update_notify(this, seq ? CEPH_SNAP_OP_UPDATE:CEPH_SNAP_OP_SPLIT);
     }
     break;
 
   case CEPH_LOCK_IFLOCK:
-    ::decode(inode.version, p);
+    decode(inode.version, p);
     _decode_file_locks(p);
     break;
 
   case CEPH_LOCK_IPOLICY:
     if (inode.is_dir()) {
-      ::decode(inode.version, p);
-      ::decode(tm, p);
+      decode(inode.version, p);
+      decode(tm, p);
       if (inode.ctime < tm) inode.ctime = tm;
-      ::decode(inode.layout, p);
-      ::decode(inode.quota, p);
+      decode(inode.layout, p);
+      decode(inode.quota, p);
       mds_rank_t old_pin = inode.export_pin;
-      ::decode(inode.export_pin, p);
+      decode(inode.export_pin, p);
       maybe_export_pin(old_pin != inode.export_pin);
     }
     break;
@@ -1814,8 +1962,8 @@ void CInode::clear_scatter_dirty()
 
 void CInode::clear_dirty_scattered(int type)
 {
-  dout(10) << "clear_dirty_scattered " << type << " on " << *this << dendl;
-  assert(is_dir());
+  dout(10) << __func__ << " " << type << " on " << *this << dendl;
+  ceph_assert(is_dir());
   switch (type) {
   case CEPH_LOCK_IFILE:
     item_dirty_dirfrag_dir.remove_myself();
@@ -1842,8 +1990,8 @@ void CInode::clear_dirty_scattered(int type)
 /* for more info on scatterlocks, see comments by Locker::scatter_writebehind */
 void CInode::start_scatter(ScatterLock *lock)
 {
-  dout(10) << "start_scatter " << *lock << " on " << *this << dendl;
-  assert(is_auth());
+  dout(10) << __func__ << " " << *lock << " on " << *this << dendl;
+  ceph_assert(is_auth());
   mempool_inode *pi = get_projected_inode();
 
   for (const auto &p : dirfrags) {
@@ -1890,15 +2038,15 @@ void CInode::finish_scatter_update(ScatterLock *lock, CDir *dir,
                                   version_t inode_version, version_t dir_accounted_version)
 {
   frag_t fg = dir->get_frag();
-  assert(dir->is_auth());
+  ceph_assert(dir->is_auth());
 
   if (dir->is_frozen()) {
-    dout(10) << "finish_scatter_update " << fg << " frozen, marking " << *lock << " stale " << *dir << dendl;
+    dout(10) << __func__ << " " << fg << " frozen, marking " << *lock << " stale " << *dir << dendl;
   } else if (dir->get_version() == 0) {
-    dout(10) << "finish_scatter_update " << fg << " not loaded, marking " << *lock << " stale " << *dir << dendl;
+    dout(10) << __func__ << " " << fg << " not loaded, marking " << *lock << " stale " << *dir << dendl;
   } else {
     if (dir_accounted_version != inode_version) {
-      dout(10) << "finish_scatter_update " << fg << " journaling accounted scatterstat update v" << inode_version << dendl;
+      dout(10) << __func__ << " " << fg << " journaling accounted scatterstat update v" << inode_version << dendl;
 
       MDLog *mdlog = mdcache->mds->mdlog;
       MutationRef mut(new MutationImpl());
@@ -1907,7 +2055,7 @@ void CInode::finish_scatter_update(ScatterLock *lock, CDir *dir,
       mempool_inode *pi = get_projected_inode();
       fnode_t *pf = dir->project_fnode();
 
-      const char *ename = 0;
+      std::string_view ename = 0;
       switch (lock->get_type()) {
       case CEPH_LOCK_IFILE:
        pf->fragstat.version = pi->dirstat.version;
@@ -1920,7 +2068,7 @@ void CInode::finish_scatter_update(ScatterLock *lock, CDir *dir,
        ename = "lock inest accounted scatter stat update";
 
        if (!is_auth() && lock->get_state() == LOCK_MIX) {
-         dout(10) << "finish_scatter_update try to assimilate dirty rstat on " 
+         dout(10) << __func__ << " try to assimilate dirty rstat on " 
            << *dir << dendl; 
          dir->assimilate_dirty_rstat_inodes();
        }
@@ -1938,17 +2086,17 @@ void CInode::finish_scatter_update(ScatterLock *lock, CDir *dir,
       le->metablob.add_dir_context(dir);
       le->metablob.add_dir(dir, true);
       
-      assert(!dir->is_frozen());
+      ceph_assert(!dir->is_frozen());
       mut->auth_pin(dir);
 
       if (lock->get_type() == CEPH_LOCK_INEST && 
          !is_auth() && lock->get_state() == LOCK_MIX) {
-        dout(10) << "finish_scatter_update finish assimilating dirty rstat on " 
+        dout(10) << __func__ << " finish assimilating dirty rstat on " 
           << *dir << dendl; 
         dir->assimilate_dirty_rstat_inodes_finish(mut, &le->metablob);
 
         if (!(pf->rstat == pf->accounted_rstat)) {
-          if (mut->wrlocks.count(&nestlock) == 0) {
+          if (!mut->is_wrlocked(&nestlock)) {
             mdcache->mds->locker->wrlock_force(&nestlock, mut);
           }
 
@@ -1959,7 +2107,7 @@ void CInode::finish_scatter_update(ScatterLock *lock, CDir *dir,
       
       mdlog->submit_entry(le, new C_Inode_FragUpdate(this, dir, mut));
     } else {
-      dout(10) << "finish_scatter_update " << fg << " accounted " << *lock
+      dout(10) << __func__ << " " << fg << " accounted " << *lock
               << " scatter stat unchanged at v" << dir_accounted_version << dendl;
     }
   }
@@ -1967,7 +2115,7 @@ void CInode::finish_scatter_update(ScatterLock *lock, CDir *dir,
 
 void CInode::_finish_frag_update(CDir *dir, MutationRef& mut)
 {
-  dout(10) << "_finish_frag_update on " << *dir << dendl;
+  dout(10) << __func__ << " on " << *dir << dendl;
   mut->apply();
   mdcache->mds->locker->drop_locks(mut.get());
   mut->cleanup();
@@ -1996,8 +2144,8 @@ void CInode::finish_scatter_gather_update(int type)
 {
   LogChannelRef clog = mdcache->mds->clog;
 
-  dout(10) << "finish_scatter_gather_update " << type << " on " << *this << dendl;
-  assert(is_auth());
+  dout(10) << __func__ << " " << type << " on " << *this << dendl;
+  ceph_assert(is_auth());
 
   switch (type) {
   case CEPH_LOCK_IFILE:
@@ -2007,7 +2155,7 @@ void CInode::finish_scatter_gather_update(int type)
       bool dirstat_valid = true;
 
       // adjust summation
-      assert(is_auth());
+      ceph_assert(is_auth());
       mempool_inode *pi = get_projected_inode();
 
       bool touched_mtime = false, touched_chattr = false;
@@ -2042,7 +2190,7 @@ void CInode::finish_scatter_gather_update(int type)
            pf->fragstat.nsubdirs < 0) {
          clog->error() << "bad/negative dir size on "
              << dir->dirfrag() << " " << pf->fragstat;
-         assert(!"bad/negative fragstat" == g_conf->mds_verify_scatter);
+         ceph_assert(!"bad/negative fragstat" == g_conf()->mds_verify_scatter);
          
          if (pf->fragstat.nfiles < 0)
            pf->fragstat.nfiles = 0;
@@ -2066,20 +2214,21 @@ void CInode::finish_scatter_gather_update(int type)
       dout(20) << " final dirstat " << pi->dirstat << dendl;
 
       if (dirstat_valid && !dirstat.same_sums(pi->dirstat)) {
-       list<frag_t> ls;
-       tmpdft.get_leaves_under(frag_t(), ls);
-       for (list<frag_t>::iterator p = ls.begin(); p != ls.end(); ++p)
-         if (!dirfrags.count(*p)) {
+        frag_vec_t leaves;
+        tmpdft.get_leaves_under(frag_t(), leaves);
+       for (const auto& leaf : leaves) {
+         if (!dirfrags.count(leaf)) {
            dirstat_valid = false;
            break;
          }
+        }
        if (dirstat_valid) {
          if (state_test(CInode::STATE_REPAIRSTATS)) {
            dout(20) << " dirstat mismatch, fixing" << dendl;
          } else {
            clog->error() << "unmatched fragstat on " << ino() << ", inode has "
                          << pi->dirstat << ", dirfrags have " << dirstat;
-           assert(!"unmatched fragstat" == g_conf->mds_verify_scatter);
+           ceph_assert(!"unmatched fragstat" == g_conf()->mds_verify_scatter);
          }
          // trust the dirfrags for now
          version_t v = pi->dirstat.version;
@@ -2098,7 +2247,7 @@ void CInode::finish_scatter_gather_update(int type)
         make_path_string(path);
        clog->error() << "Inconsistent statistics detected: fragstat on inode "
                       << ino() << " (" << path << "), inode has " << pi->dirstat;
-       assert(!"bad/negative fragstat" == g_conf->mds_verify_scatter);
+       ceph_assert(!"bad/negative fragstat" == g_conf()->mds_verify_scatter);
 
        if (pi->dirstat.nfiles < 0)
          pi->dirstat.nfiles = 0;
@@ -2110,13 +2259,17 @@ void CInode::finish_scatter_gather_update(int type)
 
   case CEPH_LOCK_INEST:
     {
+      // adjust summation
+      ceph_assert(is_auth());
+
       fragtree_t tmpdft = dirfragtree;
       nest_info_t rstat;
-      rstat.rsubdirs = 1;
       bool rstat_valid = true;
 
-      // adjust summation
-      assert(is_auth());
+      rstat.rsubdirs = 1;
+      if (const sr_t *srnode = get_projected_srnode(); srnode)
+       rstat.rsnaps = srnode->snaps.size();
+
       mempool_inode *pi = get_projected_inode();
       dout(20) << "  orig rstat " << pi->rstat << dendl;
       pi->rstat.version++;
@@ -2172,13 +2325,14 @@ void CInode::finish_scatter_gather_update(int type)
       dout(20) << " final rstat " << pi->rstat << dendl;
 
       if (rstat_valid && !rstat.same_sums(pi->rstat)) {
-       list<frag_t> ls;
-       tmpdft.get_leaves_under(frag_t(), ls);
-       for (list<frag_t>::iterator p = ls.begin(); p != ls.end(); ++p)
-         if (!dirfrags.count(*p)) {
+        frag_vec_t leaves;
+        tmpdft.get_leaves_under(frag_t(), leaves);
+        for (const auto& leaf : leaves) {
+          if (!dirfrags.count(leaf)) {
            rstat_valid = false;
            break;
          }
+        }
        if (rstat_valid) {
          if (state_test(CInode::STATE_REPAIRSTATS)) {
            dout(20) << " rstat mismatch, fixing" << dendl;
@@ -2186,7 +2340,7 @@ void CInode::finish_scatter_gather_update(int type)
            clog->error() << "inconsistent rstat on inode " << ino()
                           << ", inode has " << pi->rstat
                           << ", directory fragments have " << rstat;
-           assert(!"unmatched rstat" == g_conf->mds_verify_scatter);
+           ceph_assert(!"unmatched rstat" == g_conf()->mds_verify_scatter);
          }
          // trust the dirfrag for now
          version_t v = pi->rstat.version;
@@ -2211,8 +2365,8 @@ void CInode::finish_scatter_gather_update(int type)
 
 void CInode::finish_scatter_gather_update_accounted(int type, MutationRef& mut, EMetaBlob *metablob)
 {
-  dout(10) << "finish_scatter_gather_update_accounted " << type << " on " << *this << dendl;
-  assert(is_auth());
+  dout(10) << __func__ << " " << type << " on " << *this << dendl;
+  ceph_assert(is_auth());
 
   for (const auto &p : dirfrags) {
     CDir *dir = p.second;
@@ -2223,7 +2377,7 @@ void CInode::finish_scatter_gather_update_accounted(int type, MutationRef& mut,
       continue;  // nothing to do.
 
     dout(10) << " journaling updated frag accounted_ on " << *dir << dendl;
-    assert(dir->is_projected());
+    ceph_assert(dir->is_projected());
     fnode_t *pf = dir->get_projected_fnode();
     pf->version = dir->pre_dirty();
     mut->add_projected_fnode(dir);
@@ -2257,15 +2411,15 @@ bool CInode::is_freezing() const
   return false;
 }
 
-void CInode::add_dir_waiter(frag_t fg, MDSInternalContextBase *c)
+void CInode::add_dir_waiter(frag_t fg, MDSContext *c)
 {
   if (waiting_on_dir.empty())
     get(PIN_DIRWAITER);
   waiting_on_dir[fg].push_back(c);
-  dout(10) << "add_dir_waiter frag " << fg << " " << c << " on " << *this << dendl;
+  dout(10) << __func__ << " frag " << fg << " " << c << " on " << *this << dendl;
 }
 
-void CInode::take_dir_waiting(frag_t fg, list<MDSInternalContextBase*>& ls)
+void CInode::take_dir_waiting(frag_t fg, MDSContext::vec& ls)
 {
   if (waiting_on_dir.empty())
     return;
@@ -2273,7 +2427,8 @@ void CInode::take_dir_waiting(frag_t fg, list<MDSInternalContextBase*>& ls)
   auto it = waiting_on_dir.find(fg);
   if (it != waiting_on_dir.end()) {
     dout(10) << __func__ << " frag " << fg << " on " << *this << dendl;
-    ls.splice(ls.end(), it->second);
+    auto& waiting = it->second;
+    ls.insert(ls.end(), waiting.begin(), waiting.end());
     waiting_on_dir.erase(it);
 
     if (waiting_on_dir.empty())
@@ -2281,9 +2436,9 @@ void CInode::take_dir_waiting(frag_t fg, list<MDSInternalContextBase*>& ls)
   }
 }
 
-void CInode::add_waiter(uint64_t tag, MDSInternalContextBase *c) 
+void CInode::add_waiter(uint64_t tag, MDSContext *c) 
 {
-  dout(10) << "add_waiter tag " << std::hex << tag << std::dec << " " << c
+  dout(10) << __func__ << " tag " << std::hex << tag << std::dec << " " << c
           << " !ambig " << !state_test(STATE_AMBIGUOUSAUTH)
           << " !frozen " << !is_frozen_inode()
           << " !freezing " << !is_freezing_inode()
@@ -2301,14 +2456,15 @@ void CInode::add_waiter(uint64_t tag, MDSInternalContextBase *c)
   MDSCacheObject::add_waiter(tag, c);
 }
 
-void CInode::take_waiting(uint64_t mask, list<MDSInternalContextBase*>& ls)
+void CInode::take_waiting(uint64_t mask, MDSContext::vec& ls)
 {
   if ((mask & WAIT_DIR) && !waiting_on_dir.empty()) {
     // take all dentry waiters
     while (!waiting_on_dir.empty()) {
       auto it = waiting_on_dir.begin();
       dout(10) << __func__ << " dirfrag " << it->first << " on " << *this << dendl;
-      ls.splice(ls.end(), it->second);
+      auto& waiting = it->second;
+      ls.insert(ls.end(), waiting.begin(), waiting.end());
       waiting_on_dir.erase(it);
     }
     put(PIN_DIRWAITER);
@@ -2320,8 +2476,8 @@ void CInode::take_waiting(uint64_t mask, list<MDSInternalContextBase*>& ls)
 
 bool CInode::freeze_inode(int auth_pin_allowance)
 {
-  assert(auth_pin_allowance > 0);  // otherwise we need to adjust parent's nested_auth_pins
-  assert(auth_pins >= auth_pin_allowance);
+  ceph_assert(auth_pin_allowance > 0);  // otherwise we need to adjust parent's nested_auth_pins
+  ceph_assert(auth_pins >= auth_pin_allowance);
   if (auth_pins > auth_pin_allowance) {
     dout(10) << "freeze_inode - waiting for auth_pins to drop to " << auth_pin_allowance << dendl;
     auth_pin_freeze_allowance = auth_pin_allowance;
@@ -2331,7 +2487,7 @@ bool CInode::freeze_inode(int auth_pin_allowance)
   }
 
   dout(10) << "freeze_inode - frozen" << dendl;
-  assert(auth_pins == auth_pin_allowance);
+  ceph_assert(auth_pins == auth_pin_allowance);
   if (!state_test(STATE_FROZEN)) {
     get(PIN_FROZEN);
     state_set(STATE_FROZEN);
@@ -2339,9 +2495,9 @@ bool CInode::freeze_inode(int auth_pin_allowance)
   return true;
 }
 
-void CInode::unfreeze_inode(list<MDSInternalContextBase*>& finished) 
+void CInode::unfreeze_inode(MDSContext::vec& finished) 
 {
-  dout(10) << "unfreeze_inode" << dendl;
+  dout(10) << __func__ << dendl;
   if (state_test(STATE_FREEZING)) {
     state_clear(STATE_FREEZING);
     put(PIN_FREEZING);
@@ -2355,38 +2511,38 @@ void CInode::unfreeze_inode(list<MDSInternalContextBase*>& finished)
 
 void CInode::unfreeze_inode()
 {
-    list<MDSInternalContextBase*> finished;
+    MDSContext::vec finished;
     unfreeze_inode(finished);
     mdcache->mds->queue_waiters(finished);
 }
 
 void CInode::freeze_auth_pin()
 {
-  assert(state_test(CInode::STATE_FROZEN));
+  ceph_assert(state_test(CInode::STATE_FROZEN));
   state_set(CInode::STATE_FROZENAUTHPIN);
 }
 
 void CInode::unfreeze_auth_pin()
 {
-  assert(state_test(CInode::STATE_FROZENAUTHPIN));
+  ceph_assert(state_test(CInode::STATE_FROZENAUTHPIN));
   state_clear(CInode::STATE_FROZENAUTHPIN);
   if (!state_test(STATE_FREEZING|STATE_FROZEN)) {
-    list<MDSInternalContextBase*> finished;
+    MDSContext::vec finished;
     take_waiting(WAIT_UNFREEZE, finished);
     mdcache->mds->queue_waiters(finished);
   }
 }
 
-void CInode::clear_ambiguous_auth(list<MDSInternalContextBase*>& finished)
+void CInode::clear_ambiguous_auth(MDSContext::vec& finished)
 {
-  assert(state_test(CInode::STATE_AMBIGUOUSAUTH));
+  ceph_assert(state_test(CInode::STATE_AMBIGUOUSAUTH));
   state_clear(CInode::STATE_AMBIGUOUSAUTH);
   take_waiting(CInode::WAIT_SINGLEAUTH, finished);
 }
 
 void CInode::clear_ambiguous_auth()
 {
-  list<MDSInternalContextBase*> finished;
+  MDSContext::vec finished;
   clear_ambiguous_auth(finished);
   mdcache->mds->queue_waiters(finished);
 }
@@ -2418,12 +2574,10 @@ void CInode::auth_pin(void *by)
   auth_pin_set.insert(by);
 #endif
 
-  dout(10) << "auth_pin by " << by << " on " << *this
-          << " now " << auth_pins << "+" << nested_auth_pins
-          << dendl;
+  dout(10) << "auth_pin by " << by << " on " << *this << " now " << auth_pins << dendl;
   
   if (parent)
-    parent->adjust_nested_auth_pins(1, 1, this);
+    parent->adjust_nested_auth_pins(1, this);
 }
 
 void CInode::auth_unpin(void *by) 
@@ -2431,21 +2585,22 @@ void CInode::auth_unpin(void *by)
   auth_pins--;
 
 #ifdef MDS_AUTHPIN_SET
-  assert(auth_pin_set.count(by));
-  auth_pin_set.erase(auth_pin_set.find(by));
+  {
+    auto it = auth_pin_set.find(by);
+    ceph_assert(it != auth_pin_set.end());
+    auth_pin_set.erase(it);
+  }
 #endif
 
   if (auth_pins == 0)
     put(PIN_AUTHPIN);
   
-  dout(10) << "auth_unpin by " << by << " on " << *this
-          << " now " << auth_pins << "+" << nested_auth_pins
-          << dendl;
+  dout(10) << "auth_unpin by " << by << " on " << *this << " now " << auth_pins << dendl;
   
-  assert(auth_pins >= 0);
+  ceph_assert(auth_pins >= 0);
 
   if (parent)
-    parent->adjust_nested_auth_pins(-1, -1, by);
+    parent->adjust_nested_auth_pins(-1, by);
 
   if (is_freezing_inode() &&
       auth_pins == auth_pin_freeze_allowance) {
@@ -2458,31 +2613,6 @@ void CInode::auth_unpin(void *by)
   }  
 }
 
-void CInode::adjust_nested_auth_pins(int a, void *by)
-{
-  assert(a);
-  nested_auth_pins += a;
-  dout(35) << "adjust_nested_auth_pins by " << by
-          << " change " << a << " yields "
-          << auth_pins << "+" << nested_auth_pins << dendl;
-  assert(nested_auth_pins >= 0);
-
-  if (g_conf->mds_debug_auth_pins) {
-    // audit
-    int s = 0;
-    for (const auto &p : dirfrags) {
-      CDir *dir = p.second;
-      if (!dir->is_subtree_root() && dir->get_cum_auth_pins())
-       s++;
-    } 
-    assert(s == nested_auth_pins);
-  } 
-
-  if (parent)
-    parent->adjust_nested_auth_pins(a, 0, by);
-}
-
-
 // authority
 
 mds_authority_t CInode::authority() const
@@ -2509,12 +2639,12 @@ snapid_t CInode::get_oldest_snap()
   snapid_t t = first;
   if (!old_inodes.empty())
     t = old_inodes.begin()->second.first;
-  return MIN(t, oldest_snap);
+  return std::min(t, oldest_snap);
 }
 
 CInode::mempool_old_inode& CInode::cow_old_inode(snapid_t follows, bool cow_head)
 {
-  assert(follows >= first);
+  ceph_assert(follows >= first);
 
   mempool_inode *pi = cow_head ? get_projected_inode() : get_previous_projected_inode();
   mempool_xattr_map *px = cow_head ? get_projected_xattrs() : get_previous_projected_xattrs();
@@ -2531,13 +2661,13 @@ CInode::mempool_old_inode& CInode::cow_old_inode(snapid_t follows, bool cow_head
 
   old.inode.trim_client_ranges(follows);
 
-  if (g_conf->mds_snap_rstat &&
+  if (g_conf()->mds_snap_rstat &&
       !(old.inode.rstat == old.inode.accounted_rstat))
     dirty_old_rstats.insert(follows);
   
   first = follows+1;
 
-  dout(10) << "cow_old_inode " << (cow_head ? "head" : "previous_head" )
+  dout(10) << __func__ << " " << (cow_head ? "head" : "previous_head" )
           << " to [" << old.first << "," << follows << "] on "
           << *this << dendl;
 
@@ -2547,7 +2677,7 @@ CInode::mempool_old_inode& CInode::cow_old_inode(snapid_t follows, bool cow_head
 void CInode::split_old_inode(snapid_t snap)
 {
   auto it = old_inodes.lower_bound(snap);
-  assert(it != old_inodes.end() && it->second.first < snap);
+  ceph_assert(it != old_inodes.end() && it->second.first < snap);
 
   mempool_old_inode &old = old_inodes[snap - 1];
   old = it->second;
@@ -2559,14 +2689,32 @@ void CInode::split_old_inode(snapid_t snap)
 
 void CInode::pre_cow_old_inode()
 {
-  snapid_t follows = find_snaprealm()->get_newest_seq();
+  snapid_t follows = mdcache->get_global_snaprealm()->get_newest_seq();
   if (first <= follows)
     cow_old_inode(follows, true);
 }
 
+bool CInode::has_snap_data(snapid_t snapid)
+{
+  bool found = snapid >= first && snapid <= last;
+  if (!found && is_multiversion()) {
+    auto p = old_inodes.lower_bound(snapid);
+    if (p != old_inodes.end()) {
+      if (p->second.first > snapid) {
+       if  (p != old_inodes.begin())
+         --p;
+      }
+      if (p->second.first <= snapid && snapid <= p->first) {
+       found = true;
+      }
+    }
+  }
+  return found;
+}
+
 void CInode::purge_stale_snap_data(const set<snapid_t>& snaps)
 {
-  dout(10) << "purge_stale_snap_data " << snaps << dendl;
+  dout(10) << __func__ << " " << snaps << dendl;
 
   for (auto it = old_inodes.begin(); it != old_inodes.end(); ) {
     const snapid_t &id = it->first;
@@ -2590,7 +2738,7 @@ CInode::mempool_old_inode * CInode::pick_old_inode(snapid_t snap)
     dout(10) << __func__ << " snap " << snap << " -> [" << it->second.first << "," << it->first << "]" << dendl;
     return &it->second;
   }
-  dout(10) << "pick_old_inode snap " << snap << " -> nothing" << dendl;
+  dout(10) << __func__ << " snap " << snap << " -> nothing" << dendl;
   return NULL;
 }
 
@@ -2600,7 +2748,7 @@ void CInode::open_snaprealm(bool nosplit)
     SnapRealm *parent = find_snaprealm();
     snaprealm = new SnapRealm(mdcache, this);
     if (parent) {
-      dout(10) << "open_snaprealm " << snaprealm
+      dout(10) << __func__ << " " << snaprealm
               << " parent is " << parent
               << dendl;
       dout(30) << " siblings are " << parent->open_children << dendl;
@@ -2614,7 +2762,7 @@ void CInode::open_snaprealm(bool nosplit)
 void CInode::close_snaprealm(bool nojoin)
 {
   if (snaprealm) {
-    dout(15) << "close_snaprealm " << *snaprealm << dendl;
+    dout(15) << __func__ << " " << *snaprealm << dendl;
     snaprealm->close_parents();
     if (snaprealm->parent) {
       snaprealm->parent->open_children.erase(snaprealm);
@@ -2630,12 +2778,10 @@ SnapRealm *CInode::find_snaprealm() const
 {
   const CInode *cur = this;
   while (!cur->snaprealm) {
-    if (cur->get_parent_dn())
-      cur = cur->get_parent_dn()->get_dir()->get_inode();
-    else if (get_projected_parent_dn())
-      cur = cur->get_projected_parent_dn()->get_dir()->get_inode();
-    else
+    const CDentry *pdn = cur->get_oldest_parent_dn();
+    if (!pdn)
       break;
+    cur = pdn->get_dir()->get_inode();
   }
   return cur->snaprealm;
 }
@@ -2643,37 +2789,50 @@ SnapRealm *CInode::find_snaprealm() const
 void CInode::encode_snap_blob(bufferlist &snapbl)
 {
   if (snaprealm) {
-    ::encode(snaprealm->srnode, snapbl);
-    dout(20) << "encode_snap_blob " << *snaprealm << dendl;
+    using ceph::encode;
+    encode(snaprealm->srnode, snapbl);
+    dout(20) << __func__ << " " << *snaprealm << dendl;
   }
 }
-void CInode::decode_snap_blob(bufferlist& snapbl)
+void CInode::decode_snap_blob(const bufferlist& snapbl)
 {
+  using ceph::decode;
   if (snapbl.length()) {
     open_snaprealm();
-    bufferlist::iterator p = snapbl.begin();
-    ::decode(snaprealm->srnode, p);
+    auto old_flags = snaprealm->srnode.flags;
+    auto p = snapbl.cbegin();
+    decode(snaprealm->srnode, p);
     if (is_base()) {
       bool ok = snaprealm->_open_parents(NULL);
-      assert(ok);
+      ceph_assert(ok);
+    } else {
+      if ((snaprealm->srnode.flags ^ old_flags) & sr_t::PARENT_GLOBAL) {
+       snaprealm->close_parents();
+       snaprealm->adjust_parent();
+      }
     }
-    dout(20) << "decode_snap_blob " << *snaprealm << dendl;
+    dout(20) << __func__ << " " << *snaprealm << dendl;
+  } else if (snaprealm) {
+    ceph_assert(mdcache->mds->is_any_replay());
+    snaprealm->merge_to(NULL);
   }
 }
 
 void CInode::encode_snap(bufferlist& bl)
 {
+  using ceph::encode;
   bufferlist snapbl;
   encode_snap_blob(snapbl);
-  ::encode(snapbl, bl);
-  ::encode(oldest_snap, bl);
-}    
+  encode(snapbl, bl);
+  encode(oldest_snap, bl);
+}
 
-void CInode::decode_snap(bufferlist::iterator& p)
+void CInode::decode_snap(bufferlist::const_iterator& p)
 {
+  using ceph::decode;
   bufferlist snapbl;
-  ::decode(snapbl, p);
-  ::decode(oldest_snap, p);
+  decode(snapbl, p);
+  decode(oldest_snap, p);
   decode_snap_blob(snapbl);
 }
 
@@ -2683,22 +2842,21 @@ client_t CInode::calc_ideal_loner()
 {
   if (mdcache->is_readonly())
     return -1;
-  if (!mds_caps_wanted.empty())
+  if (!get_mds_caps_wanted().empty())
     return -1;
   
   int n = 0;
   client_t loner = -1;
-  for (map<client_t,Capability*>::iterator it = client_caps.begin();
-       it != client_caps.end();
-       ++it) 
-    if (!it->second->is_stale() &&
-       ((it->second->wanted() & (CEPH_CAP_ANY_WR|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_RD)) ||
+  for (const auto &p : client_caps) {
+    if (!p.second.is_stale() &&
+       ((p.second.wanted() & (CEPH_CAP_ANY_WR|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_RD)) ||
         (inode.is_dir() && !has_subtree_root_dirfrag()))) {
       if (n)
        return -1;
       n++;
-      loner = it->first;
+      loner = p.first;
     }
+  }
   return loner;
 }
 
@@ -2717,14 +2875,14 @@ bool CInode::choose_ideal_loner()
       set_loner_cap(want_loner_cap);
       changed = true;
     } else
-      assert(loner_cap == want_loner_cap);
+      ceph_assert(loner_cap == want_loner_cap);
   }
   return changed;
 }
 
 bool CInode::try_set_loner()
 {
-  assert(want_loner_cap >= 0);
+  ceph_assert(want_loner_cap >= 0);
   if (loner_cap >= 0 && loner_cap != want_loner_cap)
     return false;
   set_loner_cap(want_loner_cap);
@@ -2780,7 +2938,7 @@ void CInode::choose_lock_state(SimpleLock *lock, int allissued)
   } else {
     // our states have already been chosen during rejoin.
     if (lock->is_xlocked())
-      assert(lock->get_state() == LOCK_LOCK);
+      ceph_assert(lock->get_state() == LOCK_LOCK);
   }
 }
  
@@ -2797,8 +2955,46 @@ void CInode::choose_lock_states(int dirty_caps)
   choose_lock_state(&linklock, issued);
 }
 
+void CInode::set_mds_caps_wanted(mempool::mds_co::compact_map<int32_t,int32_t>& m)
+{
+  bool old_empty = mds_caps_wanted.empty();
+  mds_caps_wanted.swap(m);
+  if (old_empty != (bool)mds_caps_wanted.empty()) {
+    if (old_empty)
+      adjust_num_caps_wanted(1);
+    else
+      adjust_num_caps_wanted(-1);
+  }
+}
+
+void CInode::set_mds_caps_wanted(mds_rank_t mds, int32_t wanted)
+{
+  bool old_empty = mds_caps_wanted.empty();
+  if (wanted) {
+    mds_caps_wanted[mds] = wanted;
+    if (old_empty)
+      adjust_num_caps_wanted(1);
+  } else if (!old_empty) {
+    mds_caps_wanted.erase(mds);
+    if (mds_caps_wanted.empty())
+      adjust_num_caps_wanted(-1);
+  }
+}
+
+void CInode::adjust_num_caps_wanted(int d)
+{
+  if (!num_caps_wanted && d > 0)
+    mdcache->open_file_table.add_inode(this);
+  else if (num_caps_wanted > 0 && num_caps_wanted == -d)
+    mdcache->open_file_table.remove_inode(this);
+
+  num_caps_wanted +=d;
+  ceph_assert(num_caps_wanted >= 0);
+}
+
 Capability *CInode::add_client_cap(client_t client, Session *session, SnapRealm *conrealm)
 {
+  ceph_assert(last == CEPH_NOSNAP);
   if (client_caps.empty()) {
     get(PIN_CAPS);
     if (conrealm)
@@ -2806,27 +3002,30 @@ Capability *CInode::add_client_cap(client_t client, Session *session, SnapRealm
     else
       containing_realm = find_snaprealm();
     containing_realm->inodes_with_caps.push_back(&item_caps);
-    dout(10) << "add_client_cap first cap, joining realm " << *containing_realm << dendl;
-  }
+    dout(10) << __func__ << " first cap, joining realm " << *containing_realm << dendl;
 
-  if (client_caps.empty())
     mdcache->num_inodes_with_caps++;
-  
-  Capability *cap = new Capability(this, session, ++mdcache->last_cap_id);
-  assert(client_caps.count(client) == 0);
-  client_caps[client] = cap;
+    if (parent)
+      parent->dir->adjust_num_inodes_with_caps(1);
+  }
+
+  uint64_t cap_id = ++mdcache->last_cap_id;
+  auto ret = client_caps.emplace(std::piecewise_construct, std::forward_as_tuple(client),
+                                 std::forward_as_tuple(this, session, cap_id));
+  ceph_assert(ret.second == true);
+  Capability *cap = &ret.first->second;
 
   cap->client_follows = first-1;
-  
   containing_realm->add_cap(client, cap);
-  
+
   return cap;
 }
 
 void CInode::remove_client_cap(client_t client)
 {
-  assert(client_caps.count(client) == 1);
-  Capability *cap = client_caps[client];
+  auto it = client_caps.find(client);
+  ceph_assert(it != client_caps.end());
+  Capability *cap = &it->second;
   
   cap->item_session_caps.remove_myself();
   cap->item_revoking_caps.remove_myself();
@@ -2836,22 +3035,25 @@ void CInode::remove_client_cap(client_t client)
   if (client == loner_cap)
     loner_cap = -1;
 
-  delete cap;
-  client_caps.erase(client);
+  if (cap->wanted())
+    adjust_num_caps_wanted(-1);
+
+  client_caps.erase(it);
   if (client_caps.empty()) {
-    dout(10) << "remove_client_cap last cap, leaving realm " << *containing_realm << dendl;
+    dout(10) << __func__ << " last cap, leaving realm " << *containing_realm << dendl;
     put(PIN_CAPS);
     item_caps.remove_myself();
     containing_realm = NULL;
-    item_open_file.remove_myself();  // unpin logsegment
     mdcache->num_inodes_with_caps--;
+    if (parent)
+      parent->dir->adjust_num_inodes_with_caps(-1);
   }
 
   //clean up advisory locks
   bool fcntl_removed = fcntl_locks ? fcntl_locks->remove_all_from(client) : false;
   bool flock_removed = flock_locks ? flock_locks->remove_all_from(client) : false; 
   if (fcntl_removed || flock_removed) {
-    list<MDSInternalContextBase*> waiters;
+    MDSContext::vec waiters;
     take_waiting(CInode::WAIT_FLOCK, waiters);
     mdcache->mds->queue_waiters(waiters);
   }
@@ -2859,13 +3061,11 @@ void CInode::remove_client_cap(client_t client)
 
 void CInode::move_to_realm(SnapRealm *realm)
 {
-  dout(10) << "move_to_realm joining realm " << *realm
+  dout(10) << __func__ << " joining realm " << *realm
           << ", leaving realm " << *containing_realm << dendl;
-  for (map<client_t,Capability*>::iterator q = client_caps.begin();
-       q != client_caps.end();
-       ++q) {
-    containing_realm->remove_cap(q->first, q->second);
-    realm->add_cap(q->first, q->second);
+  for (auto& p : client_caps) {
+    containing_realm->remove_cap(p.first, &p.second);
+    realm->add_cap(p.first, &p.second);
   }
   item_caps.remove_myself();
   realm->inodes_with_caps.push_back(&item_caps);
@@ -2895,15 +3095,16 @@ void CInode::clear_client_caps_after_export()
     remove_client_cap(client_caps.begin()->first);
   loner_cap = -1;
   want_loner_cap = -1;
-  mds_caps_wanted.clear();
+  if (!get_mds_caps_wanted().empty()) {
+    mempool::mds_co::compact_map<int32_t,int32_t> empty;
+    set_mds_caps_wanted(empty);
+  }
 }
 
 void CInode::export_client_caps(map<client_t,Capability::Export>& cl)
 {
-  for (map<client_t,Capability*>::iterator it = client_caps.begin();
-       it != client_caps.end();
-       ++it) {
-    cl[it->first] = it->second->make_export();
+  for (const auto &p : client_caps) {
+    cl[p.first] = p.second.make_export();
   }
 }
 
@@ -2959,9 +3160,10 @@ int CInode::get_xlocker_mask(client_t client) const
     (linklock.gcaps_xlocker_mask(client) << linklock.get_cap_shift());
 }
 
-int CInode::get_caps_allowed_for_client(Session *session, mempool_inode *file_i) const
+int CInode::get_caps_allowed_for_client(Session *session, Capability *cap,
+                                       mempool_inode *file_i) const
 {
-  client_t client = session->info.inst.name.num();
+  client_t client = session->get_client();
   int allowed;
   if (client == get_loner()) {
     // as the loner, we get the loner_caps AND any xlocker_caps for things we have xlocked
@@ -2973,11 +3175,23 @@ int CInode::get_caps_allowed_for_client(Session *session, mempool_inode *file_i)
   }
 
   if (!is_dir()) {
-    if ((file_i->inline_data.version != CEPH_INLINE_NONE &&
-        !session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)) ||
-       (!file_i->layout.pool_ns.empty() &&
-        !session->connection->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2)))
-      allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);
+    if (file_i->inline_data.version == CEPH_INLINE_NONE &&
+       file_i->layout.pool_ns.empty()) {
+      // noop
+    } else if (cap) {
+      if ((file_i->inline_data.version != CEPH_INLINE_NONE &&
+          cap->is_noinline()) ||
+         (!file_i->layout.pool_ns.empty() &&
+          cap->is_nopoolns()))
+       allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);
+    } else {
+      auto& conn = session->get_connection();
+      if ((file_i->inline_data.version != CEPH_INLINE_NONE &&
+          !conn->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)) ||
+         (!file_i->layout.pool_ns.empty() &&
+          !conn->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2)))
+       allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);
+    }
   }
   return allowed;
 }
@@ -2992,16 +3206,14 @@ int CInode::get_caps_issued(int *ploner, int *pother, int *pxlocker,
     loner_cap = -1;
   }
 
-  for (map<client_t,Capability*>::const_iterator it = client_caps.begin();
-       it != client_caps.end();
-       ++it) {
-    int i = it->second->issued();
+  for (const auto &p : client_caps) {
+    int i = p.second.issued();
     c |= i;
-    if (it->first == loner_cap)
+    if (p.first == loner_cap)
       loner |= i;
     else
       other |= i;
-    xlocker |= get_xlocker_mask(it->first) & i;
+    xlocker |= get_xlocker_mask(p.first) & i;
   }
   if (ploner) *ploner = (loner >> shift) & mask;
   if (pother) *pother = (other >> shift) & mask;
@@ -3011,11 +3223,10 @@ int CInode::get_caps_issued(int *ploner, int *pother, int *pxlocker,
 
 bool CInode::is_any_caps_wanted() const
 {
-  for (map<client_t,Capability*>::const_iterator it = client_caps.begin();
-       it != client_caps.end();
-       ++it)
-    if (it->second->wanted())
+  for (const auto &p : client_caps) {
+    if (p.second.wanted())
       return true;
+  }
   return false;
 }
 
@@ -3023,13 +3234,11 @@ int CInode::get_caps_wanted(int *ploner, int *pother, int shift, int mask) const
 {
   int w = 0;
   int loner = 0, other = 0;
-  for (map<client_t,Capability*>::const_iterator it = client_caps.begin();
-       it != client_caps.end();
-       ++it) {
-    if (!it->second->is_stale()) {
-      int t = it->second->wanted();
+  for (const auto &p : client_caps) {
+    if (!p.second.is_stale()) {
+      int t = p.second.wanted();
       w |= t;
-      if (it->first == loner_cap)
+      if (p.first == loner_cap)
        loner |= t;
       else
        other |= t;     
@@ -3059,24 +3268,6 @@ bool CInode::issued_caps_need_gather(SimpleLock *lock)
   return false;
 }
 
-void CInode::replicate_relax_locks()
-{
-  //dout(10) << " relaxing locks on " << *this << dendl;
-  assert(is_auth());
-  assert(!is_replicated());
-  
-  authlock.replicate_relax();
-  linklock.replicate_relax();
-  dirfragtreelock.replicate_relax();
-  filelock.replicate_relax();
-  xattrlock.replicate_relax();
-  snaplock.replicate_relax();
-  nestlock.replicate_relax();
-  flocklock.replicate_relax();
-  policylock.replicate_relax();
-}
-
-
 
 // =============================================
 
@@ -3086,9 +3277,8 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
                             unsigned max_bytes,
                             int getattr_caps)
 {
-  client_t client = session->info.inst.name.num();
-  assert(snapid);
-  assert(session->connection);
+  client_t client = session->get_client();
+  ceph_assert(snapid);
   
   bool valid = true;
 
@@ -3121,13 +3311,13 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
          pxattrs = &p.xattrs;
        } else {
          // snapshoted remote dentry can result this
-         dout(0) << "encode_inodestat old_inode for snapid " << snapid
+         dout(0) << __func__ << " old_inode for snapid " << snapid
                  << " not found" << dendl;
        }
       }
     } else if (snapid < first || snapid > last) {
       // snapshoted remote dentry can result this
-      dout(0) << "encode_inodestat [" << first << "," << last << "]"
+      dout(0) << __func__ << " [" << first << "," << last << "]"
              << " not match snapid " << snapid << dendl;
     }
   }
@@ -3140,7 +3330,7 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
                 is_frozen() ||
                 state_test(CInode::STATE_EXPORTINGCAPS);
   if (no_caps)
-    dout(20) << "encode_inodestat no caps"
+    dout(20) << __func__ << " no caps"
             << (!valid?", !valid":"")
             << (session->is_stale()?", session stale ":"")
             << ((dir_realm && realm != dir_realm)?", snaprealm differs ":"")
@@ -3181,7 +3371,7 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
 
   // max_size is min of projected, actual
   uint64_t max_size =
-    MIN(oi->client_ranges.count(client) ?
+    std::min(oi->client_ranges.count(client) ?
        oi->client_ranges[client].range.last : 0,
        pi->client_ranges.count(client) ?
        pi->client_ranges[client].range.last : 0);
@@ -3214,15 +3404,14 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
   // xattr
   mempool_inode *xattr_i = pxattr ? pi:oi;
 
+  using ceph::encode;
   // xattr
-  bufferlist xbl;
   version_t xattr_version;
   if ((!cap && !no_caps) ||
       (cap && cap->client_xattr_version < xattr_i->xattr_version) ||
       (getattr_caps & CEPH_CAP_XATTR_SHARED)) { // client requests xattrs
     if (!pxattrs)
       pxattrs = pxattr ? get_projected_xattrs() : &xattrs;
-    ::encode(*pxattrs, xbl);
     xattr_version = xattr_i->xattr_version;
   } else {
     xattr_version = 0;
@@ -3230,17 +3419,31 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
   
   // do we have room?
   if (max_bytes) {
-    unsigned bytes = 8 + 8 + 4 + 8 + 8 + sizeof(ceph_mds_reply_cap) +
-      sizeof(struct ceph_file_layout) + 4 + layout.pool_ns.size() +
-      sizeof(struct ceph_timespec) * 3 +
-      4 + 8 + 8 + 8 + 4 + 4 + 4 + 4 + 4 +
-      8 + 8 + 8 + 8 + 8 + sizeof(struct ceph_timespec) +
-      4;
-    bytes += sizeof(__u32);
-    bytes += (sizeof(__u32) + sizeof(__u32)) * dirfragtree._splits.size();
-    bytes += sizeof(__u32) + symlink.length();
-    bytes += sizeof(__u32) + xbl.length();
-    bytes += sizeof(version_t) + sizeof(__u32) + inline_data.length();
+    unsigned bytes =
+      8 + 8 + 4 + 8 + 8 + sizeof(ceph_mds_reply_cap) +
+      sizeof(struct ceph_file_layout) +
+      sizeof(struct ceph_timespec) * 3 + 4 + // ctime ~ time_warp_seq
+      8 + 8 + 8 + 4 + 4 + 4 + 4 + 4 + // size ~ nlink
+      8 + 8 + 8 + 8 + 8 + sizeof(struct ceph_timespec) + // dirstat.nfiles ~ rstat.rctime
+      sizeof(__u32) + sizeof(__u32) * 2 * dirfragtree._splits.size() + // dirfragtree
+      sizeof(__u32) + symlink.length() + // symlink
+      sizeof(struct ceph_dir_layout); // dir_layout
+
+    if (xattr_version) {
+      bytes += sizeof(__u32) + sizeof(__u32); // xattr buffer len + number entries
+      if (pxattrs) {
+       for (const auto &p : *pxattrs)
+         bytes += sizeof(__u32) * 2 + p.first.length() + p.second.length();
+      }
+    } else {
+      bytes += sizeof(__u32); // xattr buffer len
+    }
+    bytes +=
+      sizeof(version_t) + sizeof(__u32) + inline_data.length() + // inline data
+      1 + 1 + 8 + 8 + 4 + // quota
+      4 + layout.pool_ns.size() + // pool ns
+      sizeof(struct ceph_timespec) + 8; // btime + change_attr
+
     if (bytes > max_bytes)
       return -ENOSPC;
   }
@@ -3265,7 +3468,7 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
      */
     ecap.caps = valid ? get_caps_allowed_by_type(CAP_ANY) : CEPH_STAT_CAP_INODE;
     if (last == CEPH_NOSNAP || is_any_caps())
-      ecap.caps = ecap.caps & get_caps_allowed_for_client(session, file_i);
+      ecap.caps = ecap.caps & get_caps_allowed_for_client(session, nullptr, file_i);
     ecap.seq = 0;
     ecap.mseq = 0;
     ecap.realm = 0;
@@ -3280,7 +3483,7 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
     int issue = 0;
     if (!no_caps && cap) {
       int likes = get_caps_liked();
-      int allowed = get_caps_allowed_for_client(session, file_i);
+      int allowed = get_caps_allowed_for_client(session, cap, file_i);
       issue = (cap->wanted() | likes) & allowed;
       cap->issue_norevoke(issue);
       issue = cap->pending();
@@ -3288,7 +3491,7 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
               << " seq " << cap->get_last_seq() << dendl;
     } else if (cap && cap->is_new() && !dir_realm) {
       // alway issue new caps to client, otherwise the caps get lost
-      assert(cap->is_stale());
+      ceph_assert(cap->is_stale());
       issue = cap->pending() | CEPH_CAP_PIN;
       cap->issue_norevoke(issue);
       dout(10) << "encode_inodestat issuing " << ccap_string(issue)
@@ -3318,8 +3521,7 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
   ecap.flags = is_auth() ? CEPH_CAP_FLAG_AUTH : 0;
   dout(10) << "encode_inodestat caps " << ccap_string(ecap.caps)
           << " seq " << ecap.seq << " mseq " << ecap.mseq
-          << " xattrv " << xattr_version << " len " << xbl.length()
-          << dendl;
+          << " xattrv " << xattr_version << dendl;
 
   if (inline_data.length() && cap) {
     if ((cap->pending() | getattr_caps) & CEPH_CAP_FILE_SHARED) {
@@ -3333,84 +3535,158 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
   }
 
   // include those xattrs?
-  if (xbl.length() && cap) {
+  if (xattr_version && cap) {
     if ((cap->pending() | getattr_caps) & CEPH_CAP_XATTR_SHARED) {
-      dout(10) << "including xattrs version " << xattr_i->xattr_version << dendl;
-      cap->client_xattr_version = xattr_i->xattr_version;
+      dout(10) << "including xattrs version " << xattr_version << dendl;
+      cap->client_xattr_version = xattr_version;
     } else {
-      dout(10) << "dropping xattrs version " << xattr_i->xattr_version << dendl;
-      xbl.clear(); // no xattrs .. XXX what's this about?!?
+      dout(10) << "dropping xattrs version " << xattr_version << dendl;
       xattr_version = 0;
     }
   }
 
+  // The end result of encode_xattrs() is equivalent to:
+  // {
+  //   bufferlist xbl;
+  //   if (xattr_version) {
+  //     if (pxattrs)
+  //       encode(*pxattrs, bl);
+  //     else
+  //       encode((__u32)0, bl);
+  //   }
+  //   encode(xbl, bl);
+  // }
+  //
+  // But encoding xattrs into the 'xbl' requires a memory allocation.
+  // The 'bl' should have enough pre-allocated memory in most cases.
+  // Encoding xattrs directly into it can avoid the extra allocation.
+  auto encode_xattrs = [xattr_version, pxattrs, &bl]() {
+    using ceph::encode;
+    if (xattr_version) {
+      ceph_le32 xbl_len;
+      auto filler = bl.append_hole(sizeof(xbl_len));
+      const auto starting_bl_len = bl.length();
+      if (pxattrs)
+       encode(*pxattrs, bl);
+      else
+       encode((__u32)0, bl);
+      xbl_len = bl.length() - starting_bl_len;
+      filler.copy_in(sizeof(xbl_len), (char *)&xbl_len);
+    } else {
+      encode((__u32)0, bl);
+    }
+  };
+
   /*
    * note: encoding matches MClientReply::InodeStat
    */
-  ::encode(oi->ino, bl);
-  ::encode(snapid, bl);
-  ::encode(oi->rdev, bl);
-  ::encode(version, bl);
-
-  ::encode(xattr_version, bl);
-
-  ::encode(ecap, bl);
-  {
-    ceph_file_layout legacy_layout;
-    layout.to_legacy(&legacy_layout);
-    ::encode(legacy_layout, bl);
-  }
-  ::encode(any_i->ctime, bl);
-  ::encode(file_i->mtime, bl);
-  ::encode(file_i->atime, bl);
-  ::encode(file_i->time_warp_seq, bl);
-  ::encode(file_i->size, bl);
-  ::encode(max_size, bl);
-  ::encode(file_i->truncate_size, bl);
-  ::encode(file_i->truncate_seq, bl);
-
-  ::encode(auth_i->mode, bl);
-  ::encode((uint32_t)auth_i->uid, bl);
-  ::encode((uint32_t)auth_i->gid, bl);
-
-  ::encode(link_i->nlink, bl);
-
-  ::encode(file_i->dirstat.nfiles, bl);
-  ::encode(file_i->dirstat.nsubdirs, bl);
-  ::encode(file_i->rstat.rbytes, bl);
-  ::encode(file_i->rstat.rfiles, bl);
-  ::encode(file_i->rstat.rsubdirs, bl);
-  ::encode(file_i->rstat.rctime, bl);
-
-  dirfragtree.encode(bl);
-
-  ::encode(symlink, bl);
-  if (session->connection->has_feature(CEPH_FEATURE_DIRLAYOUTHASH)) {
-    ::encode(file_i->dir_layout, bl);
-  }
-  ::encode(xbl, bl);
-  if (session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)) {
-    ::encode(inline_version, bl);
-    ::encode(inline_data, bl);
-  }
-  if (session->connection->has_feature(CEPH_FEATURE_MDS_QUOTA)) {
+  if (session->info.has_feature(CEPHFS_FEATURE_REPLY_ENCODING)) {
+    ENCODE_START(2, 1, bl);
+    encode(oi->ino, bl);
+    encode(snapid, bl);
+    encode(oi->rdev, bl);
+    encode(version, bl);
+    encode(xattr_version, bl);
+    encode(ecap, bl);
+    {
+      ceph_file_layout legacy_layout;
+      layout.to_legacy(&legacy_layout);
+      encode(legacy_layout, bl);
+    }
+    encode(any_i->ctime, bl);
+    encode(file_i->mtime, bl);
+    encode(file_i->atime, bl);
+    encode(file_i->time_warp_seq, bl);
+    encode(file_i->size, bl);
+    encode(max_size, bl);
+    encode(file_i->truncate_size, bl);
+    encode(file_i->truncate_seq, bl);
+    encode(auth_i->mode, bl);
+    encode((uint32_t)auth_i->uid, bl);
+    encode((uint32_t)auth_i->gid, bl);
+    encode(link_i->nlink, bl);
+    encode(file_i->dirstat.nfiles, bl);
+    encode(file_i->dirstat.nsubdirs, bl);
+    encode(file_i->rstat.rbytes, bl);
+    encode(file_i->rstat.rfiles, bl);
+    encode(file_i->rstat.rsubdirs, bl);
+    encode(file_i->rstat.rctime, bl);
+    dirfragtree.encode(bl);
+    encode(symlink, bl);
+    encode(file_i->dir_layout, bl);
+    encode_xattrs();
+    encode(inline_version, bl);
+    encode(inline_data, bl);
     mempool_inode *policy_i = ppolicy ? pi : oi;
-    ::encode(policy_i->quota, bl);
-  }
-  if (session->connection->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2)) {
-    ::encode(layout.pool_ns, bl);
-  }
-  if (session->connection->has_feature(CEPH_FEATURE_FS_BTIME)) {
-    ::encode(any_i->btime, bl);
-    ::encode(any_i->change_attr, bl);
+    encode(policy_i->quota, bl);
+    encode(layout.pool_ns, bl);
+    encode(any_i->btime, bl);
+    encode(any_i->change_attr, bl);
+    encode(file_i->export_pin, bl);
+    ENCODE_FINISH(bl);
+  }
+  else {
+    ceph_assert(session->get_connection());
+
+    encode(oi->ino, bl);
+    encode(snapid, bl);
+    encode(oi->rdev, bl);
+    encode(version, bl);
+    encode(xattr_version, bl);
+    encode(ecap, bl);
+    {
+      ceph_file_layout legacy_layout;
+      layout.to_legacy(&legacy_layout);
+      encode(legacy_layout, bl);
+    }
+    encode(any_i->ctime, bl);
+    encode(file_i->mtime, bl);
+    encode(file_i->atime, bl);
+    encode(file_i->time_warp_seq, bl);
+    encode(file_i->size, bl);
+    encode(max_size, bl);
+    encode(file_i->truncate_size, bl);
+    encode(file_i->truncate_seq, bl);
+    encode(auth_i->mode, bl);
+    encode((uint32_t)auth_i->uid, bl);
+    encode((uint32_t)auth_i->gid, bl);
+    encode(link_i->nlink, bl);
+    encode(file_i->dirstat.nfiles, bl);
+    encode(file_i->dirstat.nsubdirs, bl);
+    encode(file_i->rstat.rbytes, bl);
+    encode(file_i->rstat.rfiles, bl);
+    encode(file_i->rstat.rsubdirs, bl);
+    encode(file_i->rstat.rctime, bl);
+    dirfragtree.encode(bl);
+    encode(symlink, bl);
+    auto& conn = session->get_connection();
+    if (conn->has_feature(CEPH_FEATURE_DIRLAYOUTHASH)) {
+      encode(file_i->dir_layout, bl);
+    }
+    encode_xattrs();
+    if (conn->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)) {
+      encode(inline_version, bl);
+      encode(inline_data, bl);
+    }
+    if (conn->has_feature(CEPH_FEATURE_MDS_QUOTA)) {
+      mempool_inode *policy_i = ppolicy ? pi : oi;
+      encode(policy_i->quota, bl);
+    }
+    if (conn->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2)) {
+      encode(layout.pool_ns, bl);
+    }
+    if (conn->has_feature(CEPH_FEATURE_FS_BTIME)) {
+      encode(any_i->btime, bl);
+      encode(any_i->change_attr, bl);
+    }
   }
 
   return valid;
 }
 
-void CInode::encode_cap_message(MClientCaps *m, Capability *cap)
+void CInode::encode_cap_message(const MClientCaps::ref &m, Capability *cap)
 {
-  assert(cap);
+  ceph_assert(cap);
 
   client_t client = cap->get_client();
 
@@ -3423,7 +3699,7 @@ void CInode::encode_cap_message(MClientCaps *m, Capability *cap)
   mempool_inode *pi = get_projected_inode();
   mempool_inode *i = (pfile|pauth|plink|pxattr) ? pi : oi;
 
-  dout(20) << "encode_cap_message pfile " << pfile
+  dout(20) << __func__ << " pfile " << pfile
           << " pauth " << pauth << " plink " << plink << " pxattr " << pxattr
           << " ctime " << i->ctime << dendl;
 
@@ -3451,7 +3727,7 @@ void CInode::encode_cap_message(MClientCaps *m, Capability *cap)
   // max_size is min of projected, actual.
   uint64_t oldms = oi->client_ranges.count(client) ? oi->client_ranges[client].range.last : 0;
   uint64_t newms = pi->client_ranges.count(client) ? pi->client_ranges[client].range.last : 0;
-  m->max_size = MIN(oldms, newms);
+  m->max_size = std::min(oldms, newms);
 
   i = pauth ? pi:oi;
   m->head.mode = i->mode;
@@ -3461,12 +3737,13 @@ void CInode::encode_cap_message(MClientCaps *m, Capability *cap)
   i = plink ? pi:oi;
   m->head.nlink = i->nlink;
 
+  using ceph::encode;
   i = pxattr ? pi:oi;
   auto ix = pxattr ? get_projected_xattrs() : &xattrs;
   if ((cap->pending() & CEPH_CAP_XATTR_SHARED) &&
       i->xattr_version > cap->client_xattr_version) {
     dout(10) << "    including xattrs v " << i->xattr_version << dendl;
-    ::encode(*ix, m->xattrbl);
+    encode(*ix, m->xattrbl);
     m->head.xattr_version = i->xattr_version;
     cap->client_xattr_version = i->xattr_version;
   }
@@ -3476,58 +3753,62 @@ void CInode::encode_cap_message(MClientCaps *m, Capability *cap)
 
 void CInode::_encode_base(bufferlist& bl, uint64_t features)
 {
-  ::encode(first, bl);
-  ::encode(inode, bl, features);
-  ::encode(symlink, bl);
-  ::encode(dirfragtree, bl);
-  ::encode(xattrs, bl);
-  ::encode(old_inodes, bl, features);
-  ::encode(damage_flags, bl);
+  using ceph::encode;
+  encode(first, bl);
+  encode(inode, bl, features);
+  encode(symlink, bl);
+  encode(dirfragtree, bl);
+  encode(xattrs, bl);
+  encode(old_inodes, bl, features);
+  encode(damage_flags, bl);
   encode_snap(bl);
 }
-void CInode::_decode_base(bufferlist::iterator& p)
+void CInode::_decode_base(bufferlist::const_iterator& p)
 {
-  ::decode(first, p);
-  ::decode(inode, p);
+  using ceph::decode;
+  decode(first, p);
+  decode(inode, p);
   {
     std::string tmp;
-    ::decode(tmp, p);
-    symlink = mempool::mds_co::string(boost::string_view(tmp));
+    decode(tmp, p);
+    symlink = std::string_view(tmp);
   }
-  ::decode(dirfragtree, p);
-  ::decode(xattrs, p);
-  ::decode(old_inodes, p);
-  ::decode(damage_flags, p);
+  decode(dirfragtree, p);
+  decode(xattrs, p);
+  decode(old_inodes, p);
+  decode(damage_flags, p);
   decode_snap(p);
 }
 
 void CInode::_encode_locks_full(bufferlist& bl)
 {
-  ::encode(authlock, bl);
-  ::encode(linklock, bl);
-  ::encode(dirfragtreelock, bl);
-  ::encode(filelock, bl);
-  ::encode(xattrlock, bl);
-  ::encode(snaplock, bl);
-  ::encode(nestlock, bl);
-  ::encode(flocklock, bl);
-  ::encode(policylock, bl);
-
-  ::encode(loner_cap, bl);
-}
-void CInode::_decode_locks_full(bufferlist::iterator& p)
-{
-  ::decode(authlock, p);
-  ::decode(linklock, p);
-  ::decode(dirfragtreelock, p);
-  ::decode(filelock, p);
-  ::decode(xattrlock, p);
-  ::decode(snaplock, p);
-  ::decode(nestlock, p);
-  ::decode(flocklock, p);
-  ::decode(policylock, p);
-
-  ::decode(loner_cap, p);
+  using ceph::encode;
+  encode(authlock, bl);
+  encode(linklock, bl);
+  encode(dirfragtreelock, bl);
+  encode(filelock, bl);
+  encode(xattrlock, bl);
+  encode(snaplock, bl);
+  encode(nestlock, bl);
+  encode(flocklock, bl);
+  encode(policylock, bl);
+
+  encode(loner_cap, bl);
+}
+void CInode::_decode_locks_full(bufferlist::const_iterator& p)
+{
+  using ceph::decode;
+  decode(authlock, p);
+  decode(linklock, p);
+  decode(dirfragtreelock, p);
+  decode(filelock, p);
+  decode(xattrlock, p);
+  decode(snaplock, p);
+  decode(nestlock, p);
+  decode(flocklock, p);
+  decode(policylock, p);
+
+  decode(loner_cap, p);
   set_loner_cap(loner_cap);
   want_loner_cap = loner_cap;  // for now, we'll eval() shortly.
 }
@@ -3543,7 +3824,8 @@ void CInode::_encode_locks_state_for_replica(bufferlist& bl, bool need_recover)
   snaplock.encode_state_for_replica(bl);
   flocklock.encode_state_for_replica(bl);
   policylock.encode_state_for_replica(bl);
-  ::encode(need_recover, bl);
+  using ceph::encode;
+  encode(need_recover, bl);
 }
 
 void CInode::_encode_locks_state_for_rejoin(bufferlist& bl, int rep)
@@ -3559,7 +3841,7 @@ void CInode::_encode_locks_state_for_rejoin(bufferlist& bl, int rep)
   policylock.encode_state_for_replica(bl);
 }
 
-void CInode::_decode_locks_state(bufferlist::iterator& p, bool is_new)
+void CInode::_decode_locks_state(bufferlist::const_iterator& p, bool is_new)
 {
   authlock.decode_state(p, is_new);
   linklock.decode_state(p, is_new);
@@ -3571,8 +3853,9 @@ void CInode::_decode_locks_state(bufferlist::iterator& p, bool is_new)
   flocklock.decode_state(p, is_new);
   policylock.decode_state(p, is_new);
 
+  using ceph::decode;
   bool need_recover;
-  ::decode(need_recover, p);
+  decode(need_recover, p);
   if (need_recover && is_new) {
     // Auth mds replicated this inode while it's recovering. Auth mds may take xlock on the lock
     // and change the object when replaying unsafe requests.
@@ -3587,7 +3870,7 @@ void CInode::_decode_locks_state(bufferlist::iterator& p, bool is_new)
     policylock.mark_need_recover();
   }
 }
-void CInode::_decode_locks_rejoin(bufferlist::iterator& p, list<MDSInternalContextBase*>& waiters,
+void CInode::_decode_locks_rejoin(bufferlist::const_iterator& p, MDSContext::vec& waiters,
                                  list<SimpleLock*>& eval_locks, bool survivor)
 {
   authlock.decode_state_rejoin(p, waiters, survivor);
@@ -3616,11 +3899,11 @@ void CInode::encode_export(bufferlist& bl)
   ENCODE_START(5, 4, bl);
   _encode_base(bl, mdcache->mds->mdsmap->get_up_features());
 
-  ::encode(state, bl);
+  encode(state, bl);
 
-  ::encode(pop, bl);
+  encode(pop, bl);
 
-  ::encode(get_replicas(), bl);
+  encode(get_replicas(), bl);
 
   // include scatterlock info for any bounding CDirs
   bufferlist bounding;
@@ -3628,15 +3911,15 @@ void CInode::encode_export(bufferlist& bl)
     for (const auto &p : dirfrags) {
       CDir *dir = p.second;
       if (dir->state_test(CDir::STATE_EXPORTBOUND)) {
-       ::encode(p.first, bounding);
-       ::encode(dir->fnode.fragstat, bounding);
-       ::encode(dir->fnode.accounted_fragstat, bounding);
-       ::encode(dir->fnode.rstat, bounding);
-       ::encode(dir->fnode.accounted_rstat, bounding);
+       encode(p.first, bounding);
+       encode(dir->fnode.fragstat, bounding);
+       encode(dir->fnode.accounted_fragstat, bounding);
+       encode(dir->fnode.rstat, bounding);
+       encode(dir->fnode.accounted_rstat, bounding);
        dout(10) << " encoded fragstat/rstat info for " << *dir << dendl;
       }
     }
-  ::encode(bounding, bl);
+  encode(bounding, bl);
 
   _encode_locks_full(bl);
 
@@ -3647,11 +3930,11 @@ void CInode::encode_export(bufferlist& bl)
   get(PIN_TEMPEXPORTING);
 }
 
-void CInode::finish_export(utime_t now)
+void CInode::finish_export()
 {
   state &= MASK_STATE_EXPORT_KEPT;
 
-  pop.zero(now);
+  pop.zero();
 
   // just in case!
   //dirlock.clear_updated();
@@ -3661,7 +3944,7 @@ void CInode::finish_export(utime_t now)
   put(PIN_TEMPEXPORTING);
 }
 
-void CInode::decode_import(bufferlist::iterator& p,
+void CInode::decode_import(bufferlist::const_iterator& p,
                           LogSegment *ls)
 {
   DECODE_START(5, p);
@@ -3669,7 +3952,7 @@ void CInode::decode_import(bufferlist::iterator& p,
   _decode_base(p);
 
   unsigned s;
-  ::decode(s, p);
+  decode(s, p);
   state_set(STATE_AUTH | (s & MASK_STATE_EXPORTED));
 
   if (is_dirty()) {
@@ -3681,22 +3964,22 @@ void CInode::decode_import(bufferlist::iterator& p,
     mark_dirty_parent(ls);
   }
 
-  ::decode(pop, ceph_clock_now(), p);
+  decode(pop, p);
 
-  ::decode(get_replicas(), p);
+  decode(get_replicas(), p);
   if (is_replicated())
     get(PIN_REPLICATED);
   replica_nonce = 0;
 
   // decode fragstat info on bounding cdirs
   bufferlist bounding;
-  ::decode(bounding, p);
-  bufferlist::iterator q = bounding.begin();
+  decode(bounding, p);
+  auto q = bounding.cbegin();
   while (!q.end()) {
     frag_t fg;
-    ::decode(fg, q);
+    decode(fg, q);
     CDir *dir = get_dirfrag(fg);
-    assert(dir);  // we should have all bounds open
+    ceph_assert(dir);  // we should have all bounds open
 
     // Only take the remote's fragstat/rstat if we are non-auth for
     // this dirfrag AND the lock is NOT in a scattered (MIX) state.
@@ -3712,22 +3995,22 @@ void CInode::decode_import(bufferlist::iterator& p,
         filelock.get_state() == LOCK_MIX) {
       dout(10) << " skipped fragstat info for " << *dir << dendl;
       frag_info_t f;
-      ::decode(f, q);
-      ::decode(f, q);
+      decode(f, q);
+      decode(f, q);
     } else {
-      ::decode(dir->fnode.fragstat, q);
-      ::decode(dir->fnode.accounted_fragstat, q);
+      decode(dir->fnode.fragstat, q);
+      decode(dir->fnode.accounted_fragstat, q);
       dout(10) << " took fragstat info for " << *dir << dendl;
     }
     if (dir->is_auth() ||
         nestlock.get_state() == LOCK_MIX) {
       dout(10) << " skipped rstat info for " << *dir << dendl;
       nest_info_t n;
-      ::decode(n, q);
-      ::decode(n, q);
+      decode(n, q);
+      decode(n, q);
     } else {
-      ::decode(dir->fnode.rstat, q);
-      ::decode(dir->fnode.accounted_rstat, q);
+      decode(dir->fnode.rstat, q);
+      decode(dir->fnode.accounted_rstat, q);
       dout(10) << " took rstat info for " << *dir << dendl;
     }
   }
@@ -3768,12 +4051,20 @@ void InodeStore::generate_test_instances(list<InodeStore*> &ls)
   ls.push_back(populated);
 }
 
+void InodeStoreBare::generate_test_instances(list<InodeStoreBare*> &ls)
+{
+  InodeStoreBare *populated = new InodeStoreBare;
+  populated->inode.ino = 0xdeadbeef;
+  populated->symlink = "rhubarb";
+  ls.push_back(populated);
+}
+
 void CInode::validate_disk_state(CInode::validated_data *results,
-                                 MDSInternalContext *fin)
+                                 MDSContext *fin)
 {
   class ValidationContinuation : public MDSContinuation {
   public:
-    MDSInternalContext *fin;
+    MDSContext *fin;
     CInode *in;
     CInode::validated_data *results;
     bufferlist bl;
@@ -3783,12 +4074,13 @@ void CInode::validate_disk_state(CInode::validated_data *results,
       START = 0,
       BACKTRACE,
       INODE,
-      DIRFRAGS
+      DIRFRAGS,
+      SNAPREALM,
     };
 
     ValidationContinuation(CInode *i,
                            CInode::validated_data *data_r,
-                           MDSInternalContext *fin_) :
+                           MDSContext *fin_) :
                              MDSContinuation(i->mdcache->mds->server),
                              fin(fin_),
                              in(i),
@@ -3798,6 +4090,7 @@ void CInode::validate_disk_state(CInode::validated_data *results,
       set_callback(BACKTRACE, static_cast<Continuation::stagePtr>(&ValidationContinuation::_backtrace));
       set_callback(INODE, static_cast<Continuation::stagePtr>(&ValidationContinuation::_inode_disk));
       set_callback(DIRFRAGS, static_cast<Continuation::stagePtr>(&ValidationContinuation::_dirfrags));
+      set_callback(SNAPREALM, static_cast<Continuation::stagePtr>(&ValidationContinuation::_snaprealm));
     }
 
     ~ValidationContinuation() override {
@@ -3810,7 +4103,8 @@ void CInode::validate_disk_state(CInode::validated_data *results,
     /**
      * Fetch backtrace and set tag if tag is non-empty
      */
-    void fetch_backtrace_and_tag(CInode *in, boost::string_view tag,
+    void fetch_backtrace_and_tag(CInode *in,
+                                 std::string_view tag, bool is_internal,
                                  Context *fin, int *bt_r, bufferlist *bt)
     {
       const int64_t pool = in->get_backtrace_pool();
@@ -3820,10 +4114,11 @@ void CInode::validate_disk_state(CInode::validated_data *results,
       fetch.getxattr("parent", bt, bt_r);
       in->mdcache->mds->objecter->read(oid, object_locator_t(pool), fetch, CEPH_NOSNAP,
                                       NULL, 0, fin);
-      if (!tag.empty()) {
-       ObjectOperation scrub_tag;
+      using ceph::encode;
+      if (!is_internal) {
+        ObjectOperation scrub_tag;
         bufferlist tag_bl;
-        ::encode(tag, tag_bl);
+        encode(tag, tag_bl);
         scrub_tag.setxattr("scrub_tag", tag_bl);
         SnapContext snapc;
         in->mdcache->mds->objecter->mutate(oid, object_locator_t(pool), scrub_tag, snapc,
@@ -3834,35 +4129,29 @@ void CInode::validate_disk_state(CInode::validated_data *results,
 
     bool _start(int rval) {
       if (in->is_dirty()) {
-        MDCache *mdcache = in->mdcache;
-        mempool_inode& inode = in->inode;
-        dout(20) << "validating a dirty CInode; results will be inconclusive"
-                 << dendl;
+       MDCache *mdcache = in->mdcache;
+       mempool_inode& inode = in->inode;
+       dout(20) << "validating a dirty CInode; results will be inconclusive"
+                << dendl;
       }
       if (in->is_symlink()) {
-        // there's nothing to do for symlinks!
-        return true;
+       // there's nothing to do for symlinks!
+       return true;
       }
 
+      // prefetch snaprealm's past parents
+      if (in->snaprealm && !in->snaprealm->have_past_parents_open())
+       in->snaprealm->open_parents(nullptr);
+
       C_OnFinisher *conf = new C_OnFinisher(get_io_callback(BACKTRACE),
-                                            in->mdcache->mds->finisher);
-
-      // Whether we have a tag to apply depends on ScrubHeader (if one is
-      // present)
-      if (in->scrub_infop) {
-        // I'm a non-orphan, so look up my ScrubHeader via my linkage
-        boost::string_view tag = in->scrub_infop->header->get_tag();
-        // Rather than using the usual CInode::fetch_backtrace,
-        // use a special variant that optionally writes a tag in the same
-        // operation.
-        fetch_backtrace_and_tag(in, tag, conf,
-                                &results->backtrace.ondisk_read_retval, &bl);
-      } else {
-        // When we're invoked outside of ScrubStack we might be called
-        // on an orphaned inode like /
-        fetch_backtrace_and_tag(in, {}, conf,
-                                &results->backtrace.ondisk_read_retval, &bl);
-      }
+                                           in->mdcache->mds->finisher);
+
+      std::string_view tag = in->scrub_infop->header->get_tag();
+      bool is_internal = in->scrub_infop->header->is_internal_tag();
+      // Rather than using the usual CInode::fetch_backtrace,
+      // use a special variant that optionally writes a tag in the same
+      // operation.
+      fetch_backtrace_and_tag(in, tag, is_internal, conf, &results->backtrace.ondisk_read_retval, &bl);
       return false;
     }
 
@@ -3891,8 +4180,9 @@ void CInode::validate_disk_state(CInode::validated_data *results,
 
       // extract the backtrace, and compare it to a newly-constructed one
       try {
-        bufferlist::iterator p = bl.begin();
-        ::decode(results->backtrace.ondisk_value, p);
+        auto p = bl.cbegin();
+       using ceph::decode;
+        decode(results->backtrace.ondisk_value, p);
         dout(10) << "decoded " << bl.length() << " bytes of backtrace successfully" << dendl;
       } catch (buffer::error&) {
         if (results->backtrace.ondisk_read_retval == 0 && rval != 0) {
@@ -3926,10 +4216,6 @@ next:
                            false);
         // Flag that we repaired this BT so that it won't go into damagetable
         results->backtrace.repaired = true;
-
-        // Flag that we did some repair work so that our repair operation
-        // can be flushed at end of scrub
-        in->scrub_infop->header->set_repaired();
       }
 
       // If the inode's number was free in the InoTable, fix that
@@ -3943,14 +4229,12 @@ next:
 
         if (inotable->is_marked_free(inode.ino)) {
           LogChannelRef clog = in->mdcache->mds->clog;
-          clog->error() << "scrub: inode wrongly marked free: 0x" << std::hex
-            << inode.ino;
+          clog->error() << "scrub: inode wrongly marked free: " << inode.ino;
 
           if (in->scrub_infop->header->get_repair()) {
             bool repaired = inotable->repair(inode.ino);
             if (repaired) {
-              clog->error() << "inode table repaired for inode: 0x" << std::hex
-                << inode.ino;
+              clog->error() << "inode table repaired for inode: " << inode.ino;
 
               inotable->save();
             } else {
@@ -3961,17 +4245,17 @@ next:
         }
       }
 
-      // quit if we're a file, or kick off directory checks otherwise
-      // TODO: validate on-disk inode for non-base directories
-      if (!in->is_dir()) {
-        return true;
-      }
 
-      return validate_directory_data();
+      if (in->is_dir()) {
+       return validate_directory_data();
+      } else {
+       // TODO: validate on-disk inode for normal files
+       return check_inode_snaprealm();
+      }
     }
 
     bool validate_directory_data() {
-      assert(in->is_dir());
+      ceph_assert(in->is_dir());
 
       if (in->is_base()) {
        if (!shadow_in) {
@@ -3982,8 +4266,9 @@ next:
         shadow_in->fetch(get_internal_callback(INODE));
         return false;
       } else {
+       // TODO: validate on-disk inode for non-base directories
        results->inode.passed = true;
-        return check_dirfrag_rstats();
+       return check_dirfrag_rstats();
       }
     }
 
@@ -3997,7 +4282,7 @@ next:
       mempool_inode& i = in->inode;
       if (si.version > i.version) {
         // uh, what?
-        results->inode.error_str << "On-disk inode is newer than in-memory one!";
+        results->inode.error_str << "On-disk inode is newer than in-memory one";
        goto next;
       } else {
         bool divergent = false;
@@ -4005,7 +4290,7 @@ next:
         results->inode.passed = !divergent && r >= 0;
         if (!results->inode.passed) {
           results->inode.error_str <<
-              "On-disk inode is divergent or newer than in-memory one!";
+              "On-disk inode is divergent or newer than in-memory one";
          goto next;
         }
       }
@@ -4015,12 +4300,10 @@ next:
 
     bool check_dirfrag_rstats() {
       MDSGatherBuilder gather(g_ceph_context);
-      std::list<frag_t> frags;
-      in->dirfragtree.get_leaves(frags);
-      for (list<frag_t>::iterator p = frags.begin();
-          p != frags.end();
-          ++p) {
-        CDir *dir = in->get_or_open_dirfrag(in->mdcache, *p);
+      frag_vec_t leaves;
+      in->dirfragtree.get_leaves(leaves);
+      for (const auto& leaf : leaves) {
+        CDir *dir = in->get_or_open_dirfrag(in->mdcache, leaf);
        dir->scrub_info();
        if (!dir->scrub_infop->header)
          dir->scrub_infop->header = in->scrub_infop->header;
@@ -4059,11 +4342,10 @@ next:
       // check each dirfrag...
       for (const auto &p : in->dirfrags) {
        CDir *dir = p.second;
-       assert(dir->get_version() > 0);
+       ceph_assert(dir->get_version() > 0);
        nest_info.add(dir->fnode.accounted_rstat);
        dir_info.add(dir->fnode.accounted_fragstat);
-       if (dir->scrub_infop &&
-           dir->scrub_infop->pending_scrub_error) {
+       if (dir->scrub_infop->pending_scrub_error) {
          dir->scrub_infop->pending_scrub_error = false;
          if (dir->scrub_infop->header->get_repair()) {
             results->raw_stats.repaired = true;
@@ -4077,11 +4359,13 @@ next:
        }
       }
       nest_info.rsubdirs++; // it gets one to account for self
+      if (const sr_t *srnode = in->get_projected_srnode(); srnode)
+       nest_info.rsnaps += srnode->snaps.size();
+
       // ...and that their sum matches our inode settings
       if (!dir_info.same_sums(in->inode.dirstat) ||
          !nest_info.same_sums(in->inode.rstat)) {
-       if (in->scrub_infop &&
-           in->scrub_infop->header->get_repair()) {
+       if (in->scrub_infop->header->get_repair()) {
          results->raw_stats.error_str
            << "freshly-calculated rstats don't match existing ones (will be fixed)";
          in->mdcache->repair_inode_stats(in);
@@ -4097,6 +4381,37 @@ next:
 
       results->raw_stats.passed = true;
 next:
+      // snaprealm
+      return check_inode_snaprealm();
+    }
+
+    bool check_inode_snaprealm() {
+      if (!in->snaprealm)
+       return true;
+
+      if (!in->snaprealm->have_past_parents_open()) {
+       in->snaprealm->open_parents(get_internal_callback(SNAPREALM));
+       return false;
+      } else {
+       return immediate(SNAPREALM, 0);
+      }
+    }
+
+    bool _snaprealm(int rval) {
+
+      if (in->snaprealm->past_parents_dirty ||
+         !in->get_projected_srnode()->past_parents.empty()) {
+       // temporarily store error in field of on-disk inode validation temporarily
+       results->inode.checked = true;
+       results->inode.passed = false;
+       if (in->scrub_infop->header->get_repair()) {
+         results->inode.error_str << "Inode has old format snaprealm (will upgrade)";
+         results->inode.repaired = true;
+         in->mdcache->upgrade_inode_snaprealm(in);
+       } else {
+         results->inode.error_str << "Inode has old format snaprealm";
+       }
+      }
       return true;
     }
 
@@ -4104,10 +4419,16 @@ next:
       if ((!results->raw_stats.checked || results->raw_stats.passed) &&
          (!results->backtrace.checked || results->backtrace.passed) &&
          (!results->inode.checked || results->inode.passed))
-        results->passed_validation = true;
-      if (fin) {
-        fin->complete(get_rval());
-      }
+       results->passed_validation = true;
+
+      // Flag that we did some repair work so that our repair operation
+      // can be flushed at end of scrub
+      if (results->backtrace.repaired ||
+         results->inode.repaired ||
+         results->raw_stats.repaired)
+       in->scrub_infop->header->set_repaired();
+      if (fin)
+       fin->complete(get_rval());
     }
   };
 
@@ -4172,117 +4493,147 @@ bool CInode::validated_data::all_damage_repaired() const
   return !unrepaired;
 }
 
-void CInode::dump(Formatter *f) const
-{
-  InodeStoreBase::dump(f);
-
-  MDSCacheObject::dump(f);
-
-  f->open_object_section("versionlock");
-  versionlock.dump(f);
-  f->close_section();
-
-  f->open_object_section("authlock");
-  authlock.dump(f);
-  f->close_section();
-
-  f->open_object_section("linklock");
-  linklock.dump(f);
-  f->close_section();
-
-  f->open_object_section("dirfragtreelock");
-  dirfragtreelock.dump(f);
-  f->close_section();
-
-  f->open_object_section("filelock");
-  filelock.dump(f);
-  f->close_section();
-
-  f->open_object_section("xattrlock");
-  xattrlock.dump(f);
-  f->close_section();
-
-  f->open_object_section("snaplock");
-  snaplock.dump(f);
-  f->close_section();
-
-  f->open_object_section("nestlock");
-  nestlock.dump(f);
-  f->close_section();
-
-  f->open_object_section("flocklock");
-  flocklock.dump(f);
-  f->close_section();
-
-  f->open_object_section("policylock");
-  policylock.dump(f);
-  f->close_section();
-
-  f->open_array_section("states");
-  MDSCacheObject::dump_states(f);
-  if (state_test(STATE_EXPORTING))
-    f->dump_string("state", "exporting");
-  if (state_test(STATE_OPENINGDIR))
-    f->dump_string("state", "openingdir");
-  if (state_test(STATE_FREEZING))
-    f->dump_string("state", "freezing");
-  if (state_test(STATE_FROZEN))
-    f->dump_string("state", "frozen");
-  if (state_test(STATE_AMBIGUOUSAUTH))
-    f->dump_string("state", "ambiguousauth");
-  if (state_test(STATE_EXPORTINGCAPS))
-    f->dump_string("state", "exportingcaps");
-  if (state_test(STATE_NEEDSRECOVER))
-    f->dump_string("state", "needsrecover");
-  if (state_test(STATE_PURGING))
-    f->dump_string("state", "purging");
-  if (state_test(STATE_DIRTYPARENT))
-    f->dump_string("state", "dirtyparent");
-  if (state_test(STATE_DIRTYRSTAT))
-    f->dump_string("state", "dirtyrstat");
-  if (state_test(STATE_STRAYPINNED))
-    f->dump_string("state", "straypinned");
-  if (state_test(STATE_FROZENAUTHPIN))
-    f->dump_string("state", "frozenauthpin");
-  if (state_test(STATE_DIRTYPOOL))
-    f->dump_string("state", "dirtypool");
-  if (state_test(STATE_ORPHAN))
-    f->dump_string("state", "orphan");
-  if (state_test(STATE_MISSINGOBJS))
-    f->dump_string("state", "missingobjs");
-  f->close_section();
-
-  f->open_array_section("client_caps");
-  for (map<client_t,Capability*>::const_iterator it = client_caps.begin();
-       it != client_caps.end(); ++it) {
-    f->open_object_section("client_cap");
-    f->dump_int("client_id", it->first.v);
-    f->dump_string("pending", ccap_string(it->second->pending()));
-    f->dump_string("issued", ccap_string(it->second->issued()));
-    f->dump_string("wanted", ccap_string(it->second->wanted()));
-    f->dump_int("last_sent", it->second->get_last_seq());
+void CInode::dump(Formatter *f, int flags) const
+{
+  if (flags & DUMP_PATH) {
+    std::string path;
+    make_path_string(path, true);
+    if (path.empty())
+      path = "/";
+    f->dump_string("path", path);
+  }
+
+  if (flags & DUMP_INODE_STORE_BASE)
+    InodeStoreBase::dump(f);
+  
+  if (flags & DUMP_MDS_CACHE_OBJECT)
+    MDSCacheObject::dump(f);
+
+  if (flags & DUMP_LOCKS) {
+    f->open_object_section("versionlock");
+    versionlock.dump(f);
+    f->close_section();
+
+    f->open_object_section("authlock");
+    authlock.dump(f);
+    f->close_section();
+
+    f->open_object_section("linklock");
+    linklock.dump(f);
+    f->close_section();
+
+    f->open_object_section("dirfragtreelock");
+    dirfragtreelock.dump(f);
+    f->close_section();
+
+    f->open_object_section("filelock");
+    filelock.dump(f);
+    f->close_section();
+
+    f->open_object_section("xattrlock");
+    xattrlock.dump(f);
+    f->close_section();
+
+    f->open_object_section("snaplock");
+    snaplock.dump(f);
+    f->close_section();
+
+    f->open_object_section("nestlock");
+    nestlock.dump(f);
+    f->close_section();
+
+    f->open_object_section("flocklock");
+    flocklock.dump(f);
+    f->close_section();
+
+    f->open_object_section("policylock");
+    policylock.dump(f);
+    f->close_section();
+  }
+
+  if (flags & DUMP_STATE) {
+    f->open_array_section("states");
+    MDSCacheObject::dump_states(f);
+    if (state_test(STATE_EXPORTING))
+      f->dump_string("state", "exporting");
+    if (state_test(STATE_OPENINGDIR))
+      f->dump_string("state", "openingdir");
+    if (state_test(STATE_FREEZING))
+      f->dump_string("state", "freezing");
+    if (state_test(STATE_FROZEN))
+      f->dump_string("state", "frozen");
+    if (state_test(STATE_AMBIGUOUSAUTH))
+      f->dump_string("state", "ambiguousauth");
+    if (state_test(STATE_EXPORTINGCAPS))
+      f->dump_string("state", "exportingcaps");
+    if (state_test(STATE_NEEDSRECOVER))
+      f->dump_string("state", "needsrecover");
+    if (state_test(STATE_PURGING))
+      f->dump_string("state", "purging");
+    if (state_test(STATE_DIRTYPARENT))
+      f->dump_string("state", "dirtyparent");
+    if (state_test(STATE_DIRTYRSTAT))
+      f->dump_string("state", "dirtyrstat");
+    if (state_test(STATE_STRAYPINNED))
+      f->dump_string("state", "straypinned");
+    if (state_test(STATE_FROZENAUTHPIN))
+      f->dump_string("state", "frozenauthpin");
+    if (state_test(STATE_DIRTYPOOL))
+      f->dump_string("state", "dirtypool");
+    if (state_test(STATE_ORPHAN))
+      f->dump_string("state", "orphan");
+    if (state_test(STATE_MISSINGOBJS))
+      f->dump_string("state", "missingobjs");
     f->close_section();
   }
-  f->close_section();
 
-  f->dump_int("loner", loner_cap.v);
-  f->dump_int("want_loner", want_loner_cap.v);
+  if (flags & DUMP_CAPS) {
+    f->open_array_section("client_caps");
+    for (const auto &p : client_caps) {
+      auto &client = p.first;
+      auto cap = &p.second;
+      f->open_object_section("client_cap");
+      f->dump_int("client_id", client.v);
+      f->dump_string("pending", ccap_string(cap->pending()));
+      f->dump_string("issued", ccap_string(cap->issued()));
+      f->dump_string("wanted", ccap_string(cap->wanted()));
+      f->dump_int("last_sent", cap->get_last_seq());
+      f->close_section();
+    }
+    f->close_section();
+
+    f->dump_int("loner", loner_cap.v);
+    f->dump_int("want_loner", want_loner_cap.v);
+
+    f->open_array_section("mds_caps_wanted");
+    for (const auto &p : mds_caps_wanted) {
+      f->open_object_section("mds_cap_wanted");
+      f->dump_int("rank", p.first);
+      f->dump_string("cap", ccap_string(p.second));
+      f->close_section();
+    }
+    f->close_section();
+  }
 
-  f->open_array_section("mds_caps_wanted");
-  for (const auto &p : mds_caps_wanted) {
-    f->open_object_section("mds_cap_wanted");
-    f->dump_int("rank", p.first);
-    f->dump_string("cap", ccap_string(p.second));
+  if (flags & DUMP_DIRFRAGS) {
+    f->open_array_section("dirfrags");
+    list<CDir*> dfs;
+    get_dirfrags(dfs);
+    for(const auto &dir: dfs) {
+      f->open_object_section("dir");
+      dir->dump(f, CDir::DUMP_DEFAULT | CDir::DUMP_ITEMS);
+      dir->check_rstats();
+      f->close_section();
+    }
     f->close_section();
   }
-  f->close_section();
 }
 
 /****** Scrub Stuff *****/
 void CInode::scrub_info_create() const
 {
   dout(25) << __func__ << dendl;
-  assert(!scrub_infop);
+  ceph_assert(!scrub_infop);
 
   // break out of const-land to set up implicit initial state
   CInode *me = const_cast<CInode*>(this);
@@ -4307,17 +4658,17 @@ void CInode::scrub_maybe_delete_info()
 
 void CInode::scrub_initialize(CDentry *scrub_parent,
                              ScrubHeaderRef& header,
-                             MDSInternalContextBase *f)
+                             MDSContext *f)
 {
   dout(20) << __func__ << " with scrub_version " << get_version() << dendl;
   if (scrub_is_in_progress()) {
     dout(20) << __func__ << " inode moved during scrub, reinitializing "
             << dendl;
-    assert(scrub_infop->scrub_parent);
+    ceph_assert(scrub_infop->scrub_parent);
     CDentry *dn = scrub_infop->scrub_parent;
     CDir *dir = dn->dir;
     dn->put(CDentry::PIN_SCRUBPARENT);
-    assert(dir->scrub_infop && dir->scrub_infop->directory_scrubbing);
+    ceph_assert(dir->scrub_infop && dir->scrub_infop->directory_scrubbing);
     dir->scrub_infop->directories_scrubbing.erase(dn->key());
     dir->scrub_infop->others_scrubbing.erase(dn->key());
   }
@@ -4327,15 +4678,13 @@ void CInode::scrub_initialize(CDentry *scrub_parent,
 
   if (get_projected_inode()->is_dir()) {
     // fill in dirfrag_stamps with initial state
-    std::list<frag_t> frags;
-    dirfragtree.get_leaves(frags);
-    for (std::list<frag_t>::iterator i = frags.begin();
-        i != frags.end();
-        ++i) {
+    frag_vec_t leaves;
+    dirfragtree.get_leaves(leaves);
+    for (const auto& leaf : leaves) {
       if (header->get_force())
-       scrub_infop->dirfrag_stamps[*i].reset();
+       scrub_infop->dirfrag_stamps[leaf].reset();
       else
-       scrub_infop->dirfrag_stamps[*i];
+       scrub_infop->dirfrag_stamps[leaf];
     }
   }
 
@@ -4355,7 +4704,7 @@ void CInode::scrub_initialize(CDentry *scrub_parent,
 int CInode::scrub_dirfrag_next(frag_t* out_dirfrag)
 {
   dout(20) << __func__ << dendl;
-  assert(scrub_is_in_progress());
+  ceph_assert(scrub_is_in_progress());
 
   if (!is_dir()) {
     return -ENOTDIR;
@@ -4379,10 +4728,10 @@ int CInode::scrub_dirfrag_next(frag_t* out_dirfrag)
   return ENOENT;
 }
 
-void CInode::scrub_dirfrags_scrubbing(list<frag_t>* out_dirfrags)
+void CInode::scrub_dirfrags_scrubbing(frag_vec_t* out_dirfrags)
 {
-  assert(out_dirfrags != NULL);
-  assert(scrub_infop != NULL);
+  ceph_assert(out_dirfrags != NULL);
+  ceph_assert(scrub_infop != NULL);
 
   out_dirfrags->clear();
   std::map<frag_t, scrub_stamp_info_t>::iterator i =
@@ -4403,20 +4752,38 @@ void CInode::scrub_dirfrags_scrubbing(list<frag_t>* out_dirfrags)
 void CInode::scrub_dirfrag_finished(frag_t dirfrag)
 {
   dout(20) << __func__ << " on frag " << dirfrag << dendl;
-  assert(scrub_is_in_progress());
+  ceph_assert(scrub_is_in_progress());
 
   std::map<frag_t, scrub_stamp_info_t>::iterator i =
       scrub_infop->dirfrag_stamps.find(dirfrag);
-  assert(i != scrub_infop->dirfrag_stamps.end());
+  ceph_assert(i != scrub_infop->dirfrag_stamps.end());
 
   scrub_stamp_info_t &si = i->second;
   si.last_scrub_stamp = si.scrub_start_stamp;
   si.last_scrub_version = si.scrub_start_version;
 }
 
-void CInode::scrub_finished(MDSInternalContextBase **c) {
+void CInode::scrub_aborted(MDSContext **c) {
+  dout(20) << __func__ << dendl;
+  ceph_assert(scrub_is_in_progress());
+
+  *c = nullptr;
+  std::swap(*c, scrub_infop->on_finish);
+
+  if (scrub_infop->scrub_parent) {
+    CDentry *dn = scrub_infop->scrub_parent;
+    scrub_infop->scrub_parent = NULL;
+    dn->dir->scrub_dentry_finished(dn);
+    dn->put(CDentry::PIN_SCRUBPARENT);
+  }
+
+  delete scrub_infop;
+  scrub_infop = nullptr;
+}
+
+void CInode::scrub_finished(MDSContext **c) {
   dout(20) << __func__ << dendl;
-  assert(scrub_is_in_progress());
+  ceph_assert(scrub_is_in_progress());
   for (std::map<frag_t, scrub_stamp_info_t>::iterator i =
       scrub_infop->dirfrag_stamps.begin();
       i != scrub_infop->dirfrag_stamps.end();
@@ -4425,7 +4792,7 @@ void CInode::scrub_finished(MDSInternalContextBase **c) {
       derr << i->second.last_scrub_version << " != "
         << i->second.scrub_start_version << dendl;
     }
-    assert(i->second.last_scrub_version == i->second.scrub_start_version);
+    ceph_assert(i->second.last_scrub_version == i->second.scrub_start_version);
   }
 
   scrub_infop->last_scrub_version = scrub_infop->scrub_start_version;
@@ -4446,12 +4813,8 @@ void CInode::scrub_finished(MDSInternalContextBase **c) {
   if (scrub_infop->header->get_origin() == this) {
     // We are at the point that a tagging scrub was initiated
     LogChannelRef clog = mdcache->mds->clog;
-    if (scrub_infop->header->get_tag().empty()) {
-      clog->info() << "scrub complete";
-    } else {
-      clog->info() << "scrub complete with tag '"
-                   << scrub_infop->header->get_tag() << "'";
-    }
+    clog->info() << "scrub complete with tag '"
+                 << scrub_infop->header->get_tag() << "'";
   }
 }
 
@@ -4462,14 +4825,14 @@ int64_t CInode::get_backtrace_pool() const
   } else {
     // Files are required to have an explicit layout that specifies
     // a pool
-    assert(inode.layout.pool_id != -1);
+    ceph_assert(inode.layout.pool_id != -1);
     return inode.layout.pool_id;
   }
 }
 
 void CInode::maybe_export_pin(bool update)
 {
-  if (!g_conf->mds_bal_export_pin)
+  if (!g_conf()->mds_bal_export_pin)
     return;
   if (!is_dir() || !is_normal())
     return;
@@ -4510,8 +4873,8 @@ void CInode::maybe_export_pin(bool update)
 
 void CInode::set_export_pin(mds_rank_t rank)
 {
-  assert(is_dir());
-  assert(is_projected());
+  ceph_assert(is_dir());
+  ceph_assert(is_projected());
   get_projected_inode()->export_pin = rank;
 }