]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mds/MDCache.cc
update sources to v12.2.3
[ceph.git] / ceph / src / mds / MDCache.cc
index 9fc11eef93b7c0e858727ae8db74c43fbb0ecf2c..6caee2f4ac1cdb31756235d9822ce24eef3c318e 100644 (file)
 
 #include "include/ceph_fs.h"
 #include "include/filepath.h"
+#include "include/util.h"
 
 #include "msg/Message.h"
 #include "msg/Messenger.h"
 
+#include "common/MemoryModel.h"
 #include "common/errno.h"
-#include "common/safe_io.h"
 #include "common/perf_counters.h"
-#include "common/MemoryModel.h"
+#include "common/safe_io.h"
+
 #include "osdc/Journaler.h"
 #include "osdc/Filer.h"
 
@@ -181,6 +183,7 @@ MDCache::MDCache(MDSRank *m, PurgeQueue &purge_queue_) :
     strays[i] = NULL;
   }
 
+  num_shadow_inodes = 0;
   num_inodes_with_caps = 0;
 
   max_dir_commit_size = g_conf->mds_dir_max_commit_size ?
@@ -202,10 +205,8 @@ MDCache::MDCache(MDSRank *m, PurgeQueue &purge_queue_) :
   cap_imports_num_opening = 0;
 
   opening_root = open = false;
-  lru.lru_set_max(g_conf->mds_cache_size);
-  lru.lru_set_midpoint(g_conf->mds_cache_mid);
+  lru.lru_set_midpoint(cache_mid());
 
-  bottom_lru.lru_set_max(0);
   bottom_lru.lru_set_midpoint(0);
 
   decayrate.set_halflife(g_conf->mds_decay_halflife);
@@ -224,7 +225,7 @@ MDCache::~MDCache()
 
 void MDCache::log_stat()
 {
-  mds->logger->set(l_mds_inode_max, g_conf->mds_cache_size);
+  mds->logger->set(l_mds_inode_max, cache_limit_inodes() == 0 ? INT_MAX : cache_limit_inodes());
   mds->logger->set(l_mds_inodes, lru.lru_get_size());
   mds->logger->set(l_mds_inodes_pinned, lru.lru_get_num_pinned());
   mds->logger->set(l_mds_inodes_top, lru.lru_get_top());
@@ -255,8 +256,15 @@ bool MDCache::shutdown()
 void MDCache::add_inode(CInode *in) 
 {
   // add to lru, inode map
-  assert(inode_map.count(in->vino()) == 0);  // should be no dup inos!
-  inode_map[ in->vino() ] = in;
+  if (in->last == CEPH_NOSNAP) {
+    auto &p = inode_map[in->ino()];
+    assert(!p); // should be no dup inos!
+    p = in;
+  } else {
+    auto &p = snap_inode_map[in->vino()];
+    assert(!p); // should be no dup inos!
+    p = in;
+  }
 
   if (in->ino() < MDS_INO_SYSTEM_BASE) {
     if (in->ino() == MDS_INO_ROOT)
@@ -272,8 +280,7 @@ void MDCache::add_inode(CInode *in)
       base_inodes.insert(in);
   }
 
-  if (CInode::count() >
-        g_conf->mds_cache_size * g_conf->mds_health_cache_threshold) {
+  if (cache_toofull()) {
     exceeded_size_limit = true;
   }
 }
@@ -302,7 +309,10 @@ void MDCache::remove_inode(CInode *o)
     export_pin_queue.erase(o);
 
   // remove from inode map
-  inode_map.erase(o->vino());    
+  if (o->last == CEPH_NOSNAP)
+    inode_map.erase(o->ino());
+  else
+    snap_inode_map.erase(o->vino());
 
   if (o->ino() < MDS_INO_SYSTEM_BASE) {
     if (o == root) root = 0;
@@ -871,8 +881,8 @@ void MDCache::adjust_subtree_auth(CDir *dir, mds_authority_t auth)
 void MDCache::try_subtree_merge(CDir *dir)
 {
   dout(7) << "try_subtree_merge " << *dir << dendl;
-  assert(subtrees.count(dir));
-  set<CDir*> oldbounds = subtrees[dir];
+  // record my old bounds
+  auto oldbounds = subtrees.at(dir);
 
   set<CInode*> to_eval;
   // try merge at my root
@@ -901,31 +911,32 @@ public:
 void MDCache::try_subtree_merge_at(CDir *dir, set<CInode*> *to_eval)
 {
   dout(10) << "try_subtree_merge_at " << *dir << dendl;
-  assert(subtrees.count(dir));
+
+  if (dir->dir_auth.second != CDIR_AUTH_UNKNOWN ||
+      dir->state_test(CDir::STATE_EXPORTBOUND) ||
+      dir->state_test(CDir::STATE_AUXSUBTREE))
+    return;
+
+  auto it = subtrees.find(dir);
+  assert(it != subtrees.end());
 
   // merge with parent?
   CDir *parent = dir;  
   if (!dir->inode->is_base())
     parent = get_subtree_root(dir->get_parent_dir());
   
-  if (parent != dir &&                              // we have a parent,
-      parent->dir_auth == dir->dir_auth &&          // auth matches,
-      dir->dir_auth.second == CDIR_AUTH_UNKNOWN &&  // auth is unambiguous,
-      !dir->state_test(CDir::STATE_EXPORTBOUND) && // not an exportbound,
-      !dir->state_test(CDir::STATE_AUXSUBTREE)) {  // not aux subtree
+  if (parent != dir &&                         // we have a parent,
+      parent->dir_auth == dir->dir_auth) {     // auth matches,
     // merge with parent.
     dout(10) << "  subtree merge at " << *dir << dendl;
     dir->set_dir_auth(CDIR_AUTH_DEFAULT);
     
     // move our bounds under the parent
-    for (set<CDir*>::iterator p = subtrees[dir].begin();
-        p != subtrees[dir].end();
-        ++p) 
-      subtrees[parent].insert(*p);
+    subtrees[parent].insert(it->second.begin(), it->second.end());
     
     // we are no longer a subtree or bound
     dir->put(CDir::PIN_SUBTREE);
-    subtrees.erase(dir);
+    subtrees.erase(it);
     subtrees[parent].erase(dir);
 
     // adjust popularity?
@@ -941,9 +952,9 @@ void MDCache::try_subtree_merge_at(CDir *dir, set<CInode*> *to_eval)
 
     if (to_eval && dir->get_inode()->is_auth())
       to_eval->insert(dir->get_inode());
-  } 
 
-  show_subtrees(15);
+    show_subtrees(15);
+  }
 }
 
 void MDCache::subtree_merge_writebehind_finish(CInode *in, MutationRef& mut)
@@ -1459,24 +1470,12 @@ CInode *MDCache::pick_inode_snap(CInode *in, snapid_t follows)
   dout(10) << "pick_inode_snap follows " << follows << " on " << *in << dendl;
   assert(in->last == CEPH_NOSNAP);
 
-  SnapRealm *realm = in->find_snaprealm();
-  const set<snapid_t>& snaps = realm->get_snaps();
-  dout(10) << " realm " << *realm << " " << *realm->inode << dendl;
-  dout(10) << " snaps " << snaps << dendl;
-
-  if (snaps.empty())
-    return in;
-
-  for (set<snapid_t>::const_iterator p = snaps.upper_bound(follows);  // first item > follows
-       p != snaps.end();
-       ++p) {
-    CInode *t = get_inode(in->ino(), *p);
-    if (t) {
-      in = t;
-      dout(10) << "pick_inode_snap snap " << *p << " found " << *in << dendl;
-      break;
-    }
+  auto p = snap_inode_map.upper_bound(vinodeno_t(in->ino(), follows));
+  if (p != snap_inode_map.end() && p->second->ino() == in->ino()) {
+    dout(10) << "pick_inode_snap found " << *p->second << dendl;
+    in = p->second;
   }
+
   return in;
 }
 
@@ -1493,22 +1492,7 @@ CInode *MDCache::cow_inode(CInode *in, snapid_t last)
 {
   assert(last >= in->first);
 
-  SnapRealm *realm = in->find_snaprealm();
-  const set<snapid_t>& snaps = realm->get_snaps();
-
-  // make sure snap inode's last match existing snapshots.
-  // MDCache::pick_inode_snap() requires this.
-  snapid_t last_snap = last;
-  if (snaps.count(last) == 0) {
-    set<snapid_t>::const_iterator p = snaps.upper_bound(last);
-    if (p != snaps.begin()) {
-      --p;
-      if (*p >= in->first)
-       last_snap = *p;
-    }
-  }
-
-  CInode *oldin = new CInode(this, true, in->first, last_snap);
+  CInode *oldin = new CInode(this, true, in->first, last);
   oldin->inode = *in->get_previous_projected_inode();
   oldin->symlink = in->symlink;
   oldin->xattrs = *in->get_previous_projected_xattrs();
@@ -1542,43 +1526,43 @@ CInode *MDCache::cow_inode(CInode *in, snapid_t last)
     return oldin;
   }
 
-  // clone caps?
-  for (map<client_t,Capability*>::iterator p = in->client_caps.begin();
-      p != in->client_caps.end();
-      ++p) {
-    client_t client = p->first;
-    Capability *cap = p->second;
-    int issued = cap->issued();
-    if ((issued & CEPH_CAP_ANY_WR) &&
-       cap->client_follows < last) {
-      // note in oldin
-      for (int i = 0; i < num_cinode_locks; i++) {
-       if (issued & cinode_lock_info[i].wr_caps) {
-         int lockid = cinode_lock_info[i].lock;
-         SimpleLock *lock = oldin->get_lock(lockid);
-         assert(lock);
-         oldin->client_snap_caps[lockid].insert(client);
-         oldin->auth_pin(lock);
-         lock->set_state(LOCK_SNAP_SYNC);  // gathering
-         lock->get_wrlock(true);
-         dout(10) << " client." << client << " cap " << ccap_string(issued & cinode_lock_info[i].wr_caps)
-                  << " wrlock lock " << *lock << " on " << *oldin << dendl;
+  if (!in->client_caps.empty()) {
+    const set<snapid_t>& snaps = in->find_snaprealm()->get_snaps();
+    // clone caps?
+    for (auto p : in->client_caps) {
+      client_t client = p.first;
+      Capability *cap = p.second;
+      int issued = cap->issued();
+      if ((issued & CEPH_CAP_ANY_WR) &&
+         cap->client_follows < last) {
+       // note in oldin
+       for (int i = 0; i < num_cinode_locks; i++) {
+         if (issued & cinode_lock_info[i].wr_caps) {
+           int lockid = cinode_lock_info[i].lock;
+           SimpleLock *lock = oldin->get_lock(lockid);
+           assert(lock);
+           oldin->client_snap_caps[lockid].insert(client);
+           oldin->auth_pin(lock);
+           lock->set_state(LOCK_SNAP_SYNC);  // gathering
+           lock->get_wrlock(true);
+           dout(10) << " client." << client << " cap " << ccap_string(issued & cinode_lock_info[i].wr_caps)
+                    << " wrlock lock " << *lock << " on " << *oldin << dendl;
+         }
        }
+       cap->client_follows = last;
+
+       // we need snapflushes for any intervening snaps
+       dout(10) << "  snaps " << snaps << dendl;
+       for (auto q = snaps.lower_bound(oldin->first);
+            q != snaps.end() && *q <= last;
+            ++q) {
+         in->add_need_snapflush(oldin, *q, client);
+       }
+      } else {
+       dout(10) << " ignoring client." << client << " cap follows " << cap->client_follows << dendl;
       }
-      cap->client_follows = last;
-      
-      // we need snapflushes for any intervening snaps
-      dout(10) << "  snaps " << snaps << dendl;
-      for (set<snapid_t>::const_iterator q = snaps.lower_bound(oldin->first);
-          q != snaps.end() && *q <= last;
-          ++q) {
-       in->add_need_snapflush(oldin, *q, client);
-      }
-    } else {
-      dout(10) << " ignoring client." << client << " cap follows " << cap->client_follows << dendl;
     }
   }
-
   return oldin;
 }
 
@@ -1680,7 +1664,7 @@ void MDCache::journal_cow_dentry(MutationImpl *mut, EMetaBlob *metablob,
       mut->add_cow_inode(oldin);
       if (pcow_inode)
        *pcow_inode = oldin;
-      CDentry *olddn = dn->dir->add_primary_dentry(dn->name, oldin, oldfirst, follows);
+      CDentry *olddn = dn->dir->add_primary_dentry(dn->name, oldin, oldfirst, oldin->last);
       oldin->inode.version = olddn->pre_dirty();
       dout(10) << " olddn " << *olddn << dendl;
       bool need_snapflush = !oldin->client_snap_caps.empty();
@@ -2034,12 +2018,10 @@ update:
     msg->quota = i->quota;
     mds->send_message_client_counted(msg, session->connection);
   }
-  for (compact_map<mds_rank_t, unsigned>::iterator it = in->replicas_begin();
-       it != in->replicas_end();
-       ++it) {
+  for (const auto &it : in->get_replicas()) {
     MGatherCaps *msg = new MGatherCaps;
     msg->ino = in->ino();
-    mds->send_message_mds(msg, it->first);
+    mds->send_message_mds(msg, it.first);
   }
 }
 
@@ -3739,10 +3721,8 @@ void MDCache::trim_unlinked_inodes()
 {
   dout(7) << "trim_unlinked_inodes" << dendl;
   list<CInode*> q;
-  for (ceph::unordered_map<vinodeno_t,CInode*>::iterator p = inode_map.begin();
-       p != inode_map.end();
-       ++p) {
-    CInode *in = p->second;
+  for (auto p : inode_map) {
+    CInode *in = p.second;
     if (in->get_parent_dn() == NULL && !in->is_base()) {
       dout(7) << " will trim from " << *in << dendl;
       q.push_back(in);
@@ -4343,7 +4323,7 @@ void MDCache::handle_cache_rejoin_weak(MMDSCacheRejoin *weak)
 
     for (auto p = weak->cap_exports.begin(); p != weak->cap_exports.end(); ++p) {
       CInode *in = get_inode(p->first);
-      assert(in && in->is_auth());
+      assert(!in || in->is_auth());
       // note
       for (auto q = p->second.begin(); q != p->second.end(); ++q) {
        dout(10) << " claiming cap import " << p->first << " client." << q->first << dendl;
@@ -4542,20 +4522,17 @@ void MDCache::rejoin_scour_survivor_replicas(mds_rank_t from, MMDSCacheRejoin *a
 {
   dout(10) << "rejoin_scour_survivor_replicas from mds." << from << dendl;
 
-  for (ceph::unordered_map<vinodeno_t,CInode*>::iterator p = inode_map.begin();
-       p != inode_map.end();
-       ++p) {
-    CInode *in = p->second;
-    
+  auto scour_func = [this, from, ack, &acked_inodes, &gather_locks] (CInode *in) {
     // inode?
     if (in->is_auth() &&
        in->is_replica(from) &&
-       (ack == NULL || acked_inodes.count(p->second->vino()) == 0)) {
+       (ack == NULL || acked_inodes.count(in->vino()) == 0)) {
       inode_remove_replica(in, from, false, gather_locks);
       dout(10) << " rem " << *in << dendl;
     }
 
-    if (!in->is_dir()) continue;
+    if (!in->is_dir())
+      return;
     
     list<CDir*> dfs;
     in->get_dirfrags(dfs);
@@ -4563,9 +4540,10 @@ void MDCache::rejoin_scour_survivor_replicas(mds_rank_t from, MMDSCacheRejoin *a
         p != dfs.end();
         ++p) {
       CDir *dir = *p;
+      if (!dir->is_auth())
+       continue;
       
-      if (dir->is_auth() &&
-         dir->is_replica(from) &&
+      if (dir->is_replica(from) &&
          (ack == NULL || ack->strong_dirfrags.count(dir->dirfrag()) == 0)) {
        dir->remove_replica(from);
        dout(10) << " rem " << *dir << dendl;
@@ -4586,7 +4564,12 @@ void MDCache::rejoin_scour_survivor_replicas(mds_rank_t from, MMDSCacheRejoin *a
        }
       }
     }
-  }
+  };
+
+  for (auto p : inode_map)
+    scour_func(p.second);
+  for (auto p : snap_inode_map)
+    scour_func(p.second);
 }
 
 
@@ -4900,6 +4883,9 @@ void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoin *ack)
   dout(7) << "handle_cache_rejoin_ack from " << ack->get_source() << dendl;
   mds_rank_t from = mds_rank_t(ack->get_source().num());
 
+  assert(mds->get_state() >= MDSMap::STATE_REJOIN);
+  bool survivor = !mds->is_rejoin();
+
   // for sending cache expire message
   set<CInode*> isolated_inodes;
   set<CInode*> refragged_inodes;
@@ -5009,7 +4995,7 @@ void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoin *ack)
       }
 
       dn->set_replica_nonce(q->second.nonce);
-      dn->lock.set_state_rejoin(q->second.lock, rejoin_waiters);
+      dn->lock.set_state_rejoin(q->second.lock, rejoin_waiters, survivor);
       dn->state_clear(CDentry::STATE_REJOINING);
       dout(10) << " got " << *dn << dendl;
     }
@@ -5072,7 +5058,7 @@ void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoin *ack)
     assert(in);
     in->set_replica_nonce(nonce);
     bufferlist::iterator q = lockbl.begin();
-    in->_decode_locks_rejoin(q, rejoin_waiters, rejoin_eval_locks);
+    in->_decode_locks_rejoin(q, rejoin_waiters, rejoin_eval_locks, survivor);
     in->state_clear(CInode::STATE_REJOINING);
     dout(10) << " got inode locks " << *in << dendl;
   }
@@ -5116,7 +5102,7 @@ void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoin *ack)
   // done?
   assert(rejoin_ack_gather.count(from));
   rejoin_ack_gather.erase(from);
-  if (mds->is_rejoin()) {
+  if (!survivor) {
 
     if (rejoin_gather.empty()) {
       // eval unstable scatter locks after all wrlocks are rejoined.
@@ -5471,10 +5457,8 @@ void MDCache::choose_lock_states_and_reconnect_caps()
 
   map<client_t,MClientSnap*> splits;
 
-  for (ceph::unordered_map<vinodeno_t,CInode*>::iterator i = inode_map.begin();
-       i != inode_map.end();
-       ++i) {
-    CInode *in = i->second;
+  for (auto i : inode_map) {
+    CInode *in = i.second;
 
     if (in->last != CEPH_NOSNAP)
       continue;
@@ -5482,9 +5466,8 @@ void MDCache::choose_lock_states_and_reconnect_caps()
     if (in->is_auth() && !in->is_base() && in->inode.is_dirty_rstat())
       in->mark_dirty_rstat();
 
-    auto p = reconnected_caps.find(in->ino());
-
     int dirty_caps = 0;
+    auto p = reconnected_caps.find(in->ino());
     if (p != reconnected_caps.end()) {
       for (const auto &it : p->second)
        dirty_caps |= it.second.dirty_caps;
@@ -5973,13 +5956,11 @@ void MDCache::rejoin_send_acks()
       dq.pop_front();
       
       // dir
-      for (compact_map<mds_rank_t,unsigned>::iterator r = dir->replicas_begin();
-          r != dir->replicas_end();
-          ++r) {
-       auto it = acks.find(r->first);
+      for (auto &r : dir->get_replicas()) {
+       auto it = acks.find(r.first);
        if (it == acks.end())
          continue;
-       it->second->add_strong_dirfrag(dir->dirfrag(), ++r->second, dir->dir_rep);
+       it->second->add_strong_dirfrag(dir->dirfrag(), ++r.second, dir->dir_rep);
        it->second->add_dirfrag_base(dir);
       }
           
@@ -5995,36 +5976,32 @@ void MDCache::rejoin_send_acks()
          in = dnl->get_inode();
 
        // dentry
-       for (compact_map<mds_rank_t,unsigned>::iterator r = dn->replicas_begin();
-            r != dn->replicas_end();
-            ++r) {
-         auto it = acks.find(r->first);
+       for (auto &r : dn->get_replicas()) {
+         auto it = acks.find(r.first);
          if (it == acks.end())
            continue;
          it->second->add_strong_dentry(dir->dirfrag(), dn->name, dn->first, dn->last,
                                           dnl->is_primary() ? dnl->get_inode()->ino():inodeno_t(0),
                                           dnl->is_remote() ? dnl->get_remote_ino():inodeno_t(0),
                                           dnl->is_remote() ? dnl->get_remote_d_type():0,
-                                          ++r->second,
+                                          ++r.second,
                                           dn->lock.get_replica_state());
          // peer missed MDentrylink message ?
-         if (in && !in->is_replica(r->first))
-           in->add_replica(r->first);
+         if (in && !in->is_replica(r.first))
+           in->add_replica(r.first);
        }
        
        if (!in)
          continue;
 
-       for (compact_map<mds_rank_t,unsigned>::iterator r = in->replicas_begin();
-            r != in->replicas_end();
-            ++r) {
-         auto it = acks.find(r->first);
+       for (auto &r : in->get_replicas()) {
+         auto it = acks.find(r.first);
          if (it == acks.end())
            continue;
          it->second->add_inode_base(in, mds->mdsmap->get_up_features());
          bufferlist bl;
-         in->_encode_locks_state_for_rejoin(bl, r->first);
-         it->second->add_inode_locks(in, ++r->second, bl);
+         in->_encode_locks_state_for_rejoin(bl, r.first);
+         it->second->add_inode_locks(in, ++r.second, bl);
        }
        
        // subdirs in this subtree?
@@ -6035,28 +6012,24 @@ void MDCache::rejoin_send_acks()
 
   // base inodes too
   if (root && root->is_auth()) 
-    for (compact_map<mds_rank_t,unsigned>::iterator r = root->replicas_begin();
-        r != root->replicas_end();
-        ++r) {
-      auto it = acks.find(r->first);
+    for (auto &r : root->get_replicas()) {
+      auto it = acks.find(r.first);
       if (it == acks.end())
        continue;
       it->second->add_inode_base(root, mds->mdsmap->get_up_features());
       bufferlist bl;
-      root->_encode_locks_state_for_rejoin(bl, r->first);
-      it->second->add_inode_locks(root, ++r->second, bl);
+      root->_encode_locks_state_for_rejoin(bl, r.first);
+      it->second->add_inode_locks(root, ++r.second, bl);
     }
   if (myin)
-    for (compact_map<mds_rank_t,unsigned>::iterator r = myin->replicas_begin();
-        r != myin->replicas_end();
-        ++r) {
-      auto it = acks.find(r->first);
+    for (auto &r : myin->get_replicas()) {
+      auto it = acks.find(r.first);
       if (it == acks.end())
        continue;
       it->second->add_inode_base(myin, mds->mdsmap->get_up_features());
       bufferlist bl;
-      myin->_encode_locks_state_for_rejoin(bl, r->first);
-      it->second->add_inode_locks(myin, ++r->second, bl);
+      myin->_encode_locks_state_for_rejoin(bl, r.first);
+      it->second->add_inode_locks(myin, ++r.second, bl);
     }
 
   // include inode base for any inodes whose scatterlocks may have updated
@@ -6064,10 +6037,8 @@ void MDCache::rejoin_send_acks()
        p != rejoin_potential_updated_scatterlocks.end();
        ++p) {
     CInode *in = *p;
-    for (compact_map<mds_rank_t,unsigned>::iterator r = in->replicas_begin();
-        r != in->replicas_end();
-        ++r) {
-      auto it = acks.find(r->first);
+    for (const auto &r : in->get_replicas()) {
+      auto it = acks.find(r.first);
       if (it == acks.end())
        continue;
       it->second->add_inode_base(in, mds->mdsmap->get_up_features());
@@ -6102,10 +6073,8 @@ void MDCache::reissue_all_caps()
 {
   dout(10) << "reissue_all_caps" << dendl;
 
-  for (ceph::unordered_map<vinodeno_t,CInode*>::iterator p = inode_map.begin();
-       p != inode_map.end();
-       ++p) {
-    CInode *in = p->second;
+  for (auto p : inode_map) {
+    CInode *in = p.second;
     if (in->is_head() && in->is_any_caps()) {
       // called by MDSRank::active_start(). There shouldn't be any frozen subtree.
       if (in->is_frozen_inode()) {
@@ -6192,10 +6161,8 @@ void MDCache::_queued_file_recover_cow(CInode *in, MutationRef& mut)
 void MDCache::identify_files_to_recover()
 {
   dout(10) << "identify_files_to_recover" << dendl;
-  for (ceph::unordered_map<vinodeno_t,CInode*>::iterator p = inode_map.begin();
-       p != inode_map.end();
-       ++p) {
-    CInode *in = p->second;
+  for (auto p : inode_map) {
+    CInode *in = p.second;
     if (!in->is_auth())
       continue;
 
@@ -6452,34 +6419,19 @@ void MDCache::start_recovered_truncates()
 // ================================================================================
 // cache trimming
 
-
-/*
- * note: only called while MDS is active or stopping... NOT during recovery.
- * however, we may expire a replica whose authority is recovering.
- * 
- */
-bool MDCache::trim(int max, int count)
-{
-  // trim LRU
-  if (count > 0) {
-    max = lru.lru_get_size() - count;
-    if (max <= 0)
-      max = 1;
-  } else if (max < 0) {
-    max = g_conf->mds_cache_size;
-    if (max <= 0)
-      return false;
-  }
-  dout(7) << "trim max=" << max << "  cur=" << lru.lru_get_size()
-         << "/" << bottom_lru.lru_get_size() << dendl;
-
-  // process delayed eval_stray()
-  stray_manager.advance_delayed();
-
-  map<mds_rank_t, MCacheExpire*> expiremap;
+void MDCache::trim_lru(uint64_t count, map<mds_rank_t, MCacheExpire*> &expiremap)
+{
   bool is_standby_replay = mds->is_standby_replay();
-  int unexpirable = 0;
-  list<CDentry*> unexpirables;
+  std::vector<CDentry *> unexpirables;
+  uint64_t trimmed = 0;
+
+  dout(7) << "trim_lru trimming " << count
+          << " items from LRU"
+          << " size=" << lru.lru_get_size()
+          << " mid=" << lru.lru_get_top()
+          << " pintail=" << lru.lru_get_pintail()
+          << " pinned=" << lru.lru_get_num_pinned()
+          << dendl;
 
   for (;;) {
     CDentry *dn = static_cast<CDentry*>(bottom_lru.lru_expire());
@@ -6487,34 +6439,65 @@ bool MDCache::trim(int max, int count)
       break;
     if (trim_dentry(dn, expiremap)) {
       unexpirables.push_back(dn);
-      ++unexpirable;
+    } else {
+      trimmed++;
     }
   }
 
-  for(auto dn : unexpirables)
+  for (auto &dn : unexpirables) {
     bottom_lru.lru_insert_mid(dn);
+  }
   unexpirables.clear();
 
-  // trim dentries from the LRU: only enough to satisfy `max`,
-  while (lru.lru_get_size() + unexpirable > (unsigned)max) {
+  // trim dentries from the LRU until count is reached
+  while (cache_toofull() || count > 0) {
     CDentry *dn = static_cast<CDentry*>(lru.lru_expire());
     if (!dn) {
       break;
     }
     if ((is_standby_replay && dn->get_linkage()->inode &&
-        dn->get_linkage()->inode->item_open_file.is_on_list()) ||
-       trim_dentry(dn, expiremap)) {
+        dn->get_linkage()->inode->item_open_file.is_on_list())) {
+      unexpirables.push_back(dn);
+    } else if (trim_dentry(dn, expiremap)) {
       unexpirables.push_back(dn);
-      ++unexpirable;
+    } else {
+      trimmed++;
+      if (count > 0) count--;
     }
   }
-  for(auto dn : unexpirables)
+
+  for (auto &dn : unexpirables) {
     lru.lru_insert_mid(dn);
+  }
   unexpirables.clear();
 
+  dout(7) << "trim_lru trimmed " << trimmed << " items" << dendl;
+}
+
+/*
+ * note: only called while MDS is active or stopping... NOT during recovery.
+ * however, we may expire a replica whose authority is recovering.
+ *
+ * @param count is number of dentries to try to expire
+ */
+bool MDCache::trim(uint64_t count)
+{
+  uint64_t used = cache_size();
+  uint64_t limit = cache_limit_memory();
+  map<mds_rank_t, MCacheExpire*> expiremap;
+
+  dout(7) << "trim bytes_used=" << bytes2str(used)
+          << " limit=" << bytes2str(limit)
+          << " reservation=" << cache_reservation()
+          << "% count=" << count << dendl;
+
+  // process delayed eval_stray()
+  stray_manager.advance_delayed();
+
+  trim_lru(count, expiremap);
+
   // trim non-auth, non-bound subtrees
-  for (map<CDir*, set<CDir*> >::iterator p = subtrees.begin();
-       p != subtrees.end();) {
+  for (auto p = subtrees.begin(); p != subtrees.end();) {
     CDir *dir = p->first;
     ++p;
     CInode *diri = dir->get_inode();
@@ -6522,6 +6505,7 @@ bool MDCache::trim(int max, int count)
       if (!diri->is_auth() && !diri->is_base() &&
          dir->get_num_head_items() == 0) {
        if (dir->state_test(CDir::STATE_EXPORTING) ||
+           !(mds->is_active() || mds->is_stopping()) ||
            dir->is_freezing() || dir->is_frozen())
          continue;
 
@@ -6547,7 +6531,7 @@ bool MDCache::trim(int max, int count)
   }
 
   // trim root?
-  if (max == 0 && root) {
+  if (mds->is_stopping() && root) {
     list<CDir*> ls;
     root->get_dirfrags(ls);
     for (list<CDir*>::iterator p = ls.begin(); p != ls.end(); ++p) {
@@ -6590,7 +6574,7 @@ bool MDCache::trim(int max, int count)
   }
 
   // Other rank's base inodes (when I'm stopping)
-  if (max == 0) {
+  if (mds->is_stopping()) {
     for (set<CInode*>::iterator p = base_inodes.begin();
          p != base_inodes.end(); ++p) {
       if (MDS_INO_MDSDIR_OWNER((*p)->ino()) != mds->get_nodeid()) {
@@ -6939,11 +6923,10 @@ void MDCache::trim_non_auth()
   if (lru.lru_get_size() == 0 &&
       bottom_lru.lru_get_size() == 0) {
     // root, stray, etc.?
-    ceph::unordered_map<vinodeno_t,CInode*>::iterator p = inode_map.begin();
+    auto p = inode_map.begin();
     while (p != inode_map.end()) {
-      ceph::unordered_map<vinodeno_t,CInode*>::iterator next = p;
-      ++next;
       CInode *in = p->second;
+      ++p;
       if (!in->is_auth()) {
        list<CDir*> ls;
        in->get_dirfrags(ls);
@@ -6960,7 +6943,6 @@ void MDCache::trim_non_auth()
        assert(in->get_num_ref() == 0);
        remove_inode(in);
       }
-      p = next;
     }
   }
 
@@ -7277,7 +7259,7 @@ void MDCache::handle_cache_expire(MCacheExpire *m)
       if (nonce == dir->get_replica_nonce(from)) {
        // remove from our cached_by
        dout(7) << " dir expire on " << *dir << " from mds." << from
-               << " replicas was " << dir->replica_map << dendl;
+               << " replicas was " << dir->get_replicas() << dendl;
        dir->remove_replica(from);
       } 
       else {
@@ -7441,10 +7423,10 @@ void MDCache::check_memory_usage()
   static MemoryModel::snap baseline = last;
 
   // check client caps
-  assert(CInode::count() == inode_map.size());
-  float caps_per_inode = 0.0;
+  assert(CInode::count() == inode_map.size() + snap_inode_map.size() + num_shadow_inodes);
+  double caps_per_inode = 0.0;
   if (CInode::count())
-    caps_per_inode = (float)Capability::count() / (float)CInode::count();
+    caps_per_inode = (double)Capability::count() / (double)CInode::count();
 
   dout(2) << "check_memory_usage"
           << " total " << last.get_total()
@@ -7460,20 +7442,15 @@ void MDCache::check_memory_usage()
   mds->mlogger->set(l_mdm_rss, last.get_rss());
   mds->mlogger->set(l_mdm_heap, last.get_heap());
 
-  if (num_inodes_with_caps > g_conf->mds_cache_size) {
-    float ratio = (float)g_conf->mds_cache_size * .9 / (float)num_inodes_with_caps;
-    if (ratio < 1.0) {
-      last_recall_state = ceph_clock_now();
-      mds->server->recall_client_state(ratio);
-    }
+  if (cache_toofull()) {
+    last_recall_state = ceph_clock_now();
+    mds->server->recall_client_state();
   }
 
   // If the cache size had exceeded its limit, but we're back in bounds
   // now, free any unused pool memory so that our memory usage isn't
   // permanently bloated.
-  if (exceeded_size_limit
-      && CInode::count() <=
-        g_conf->mds_cache_size * g_conf->mds_health_cache_threshold) {
+  if (exceeded_size_limit && !cache_toofull()) {
     // Only do this once we are back in bounds: otherwise the releases would
     // slow down whatever process caused us to exceed bounds to begin with
     if (ceph_using_tcmalloc()) {
@@ -7565,7 +7542,7 @@ bool MDCache::shutdown_pass()
   }
 
   // trim cache
-  trim(0);
+  trim(UINT64_MAX);
   dout(5) << "lru size now " << lru.lru_get_size() << "/" << bottom_lru.lru_get_size() << dendl;
 
   // SUBTREES
@@ -7631,12 +7608,6 @@ bool MDCache::shutdown_pass()
   assert(!migrator->is_exporting());
   assert(!migrator->is_importing());
 
-  if ((myin && myin->is_auth_pinned()) ||
-      (mydir && mydir->is_auth_pinned())) {
-    dout(7) << "still have auth pinned objects" << dendl;
-    return false;
-  }
-
   // flush what we can from the log
   mds->mdlog->trim(0);
   if (mds->mdlog->get_num_segments() > 1) {
@@ -7644,6 +7615,12 @@ bool MDCache::shutdown_pass()
     return false;
   }
 
+  if ((myin && myin->is_auth_pinned()) ||
+      (mydir && mydir->is_auth_pinned())) {
+    dout(7) << "still have auth pinned objects" << dendl;
+    return false;
+  }
+
   // (only do this once!)
   if (!mds->mdlog->is_capped()) {
     dout(7) << "capping the log" << dendl;
@@ -8605,7 +8582,6 @@ void MDCache::do_open_ino(inodeno_t ino, open_ino_info_t& info, int err)
 {
   if (err < 0 && err != -EAGAIN) {
     info.checked.clear();
-    info.checked.insert(mds->get_nodeid());
     info.checking = MDS_RANK_NONE;
     info.check_peers = true;
     info.fetch_backtrace = true;
@@ -8617,7 +8593,13 @@ void MDCache::do_open_ino(inodeno_t ino, open_ino_info_t& info, int err)
       info.last_err = err;
   }
 
-  if (info.check_peers) {
+  if (info.check_peers || info.discover) {
+    if (info.discover) {
+      // got backtrace from peer, but failed to find inode. re-check peers
+      info.discover = false;
+      info.ancestors.clear();
+      info.checked.clear();
+    }
     info.check_peers = false;
     info.checking = MDS_RANK_NONE;
     do_open_ino_peer(ino, info);
@@ -8626,7 +8608,6 @@ void MDCache::do_open_ino(inodeno_t ino, open_ino_info_t& info, int err)
     info.fetch_backtrace = false;
     info.checking = mds->get_nodeid();
     info.checked.clear();
-    info.checked.insert(mds->get_nodeid());
     C_IO_MDC_OpenInoBacktraceFetched *fin =
       new C_IO_MDC_OpenInoBacktraceFetched(this, ino);
     fetch_backtrace(ino, info.pool, fin->bl,
@@ -8664,7 +8645,8 @@ void MDCache::do_open_ino_peer(inodeno_t ino, open_ino_info_t& info)
       }
   }
   if (peer < 0) {
-    if (all.size() > active.size() && all != info.checked) {
+    all.erase(mds->get_nodeid());
+    if (all != info.checked) {
       dout(10) << " waiting for more peers to be active" << dendl;
     } else {
       dout(10) << " all MDS peers have been checked " << dendl;
@@ -8813,7 +8795,6 @@ void MDCache::open_ino(inodeno_t ino, int64_t pool, MDSInternalContextBase* fin,
     info.waiters.push_back(fin);
   } else {
     open_ino_info_t& info = opening_inodes[ino];
-    info.checked.insert(mds->get_nodeid());
     info.want_replica = want_replica;
     info.want_xlocked = want_xlocked;
     info.tid = ++open_ino_last_tid;
@@ -8837,7 +8818,12 @@ void MDCache::open_ino(inodeno_t ino, int64_t pool, MDSInternalContextBase* fin,
 void MDCache::find_ino_peers(inodeno_t ino, MDSInternalContextBase *c, mds_rank_t hint)
 {
   dout(5) << "find_ino_peers " << ino << " hint " << hint << dendl;
-  assert(!have_inode(ino));
+  CInode *in = get_inode(ino);
+  if (in && in->state_test(CInode::STATE_PURGING)) {
+    c->complete(-ESTALE);
+    return;
+  }
+  assert(!in);
   
   ceph_tid_t tid = ++find_ino_peer_last_tid;
   find_ino_peer_info_t& fip = find_ino_peer[tid];
@@ -8845,7 +8831,6 @@ void MDCache::find_ino_peers(inodeno_t ino, MDSInternalContextBase *c, mds_rank_
   fip.tid = tid;
   fip.fin = c;
   fip.hint = hint;
-  fip.checked.insert(mds->get_nodeid());
   _do_find_ino_peer(fip);
 }
 
@@ -8873,7 +8858,8 @@ void MDCache::_do_find_ino_peer(find_ino_peer_info_t& fip)
       }
   }
   if (m == MDS_RANK_NONE) {
-    if (all.size() > active.size()) {
+    all.erase(mds->get_nodeid());
+    if (all != fip.checked) {
       dout(10) << "_do_find_ino_peer waiting for more peers to be active" << dendl;
     } else {
       dout(10) << "_do_find_ino_peer failed on " << fip.ino << dendl;
@@ -9081,6 +9067,27 @@ void MDCache::request_finish(MDRequestRef& mdr)
     return; 
   }
 
+  switch(mdr->internal_op) {
+    case CEPH_MDS_OP_FRAGMENTDIR:
+      logger->inc(l_mdss_ireq_fragmentdir);
+      break;
+    case CEPH_MDS_OP_EXPORTDIR:
+      logger->inc(l_mdss_ireq_exportdir);
+      break;
+    case CEPH_MDS_OP_ENQUEUE_SCRUB:
+      logger->inc(l_mdss_ireq_enqueue_scrub);
+      break;
+    case CEPH_MDS_OP_FLUSH:
+      logger->inc(l_mdss_ireq_flush);
+      break;
+    case CEPH_MDS_OP_REPAIR_FRAGSTATS:
+      logger->inc(l_mdss_ireq_fragstats);
+      break;
+    case CEPH_MDS_OP_REPAIR_INODESTATS:
+      logger->inc(l_mdss_ireq_inodestats);
+      break;
+  }
+
   request_cleanup(mdr);
 }
 
@@ -9715,7 +9722,7 @@ void MDCache::handle_discover(MDiscover *dis)
 
   if (mds->get_state() <= MDSMap::STATE_REJOIN) {
     if (mds->get_state() < MDSMap::STATE_REJOIN &&
-       mds->get_want_state() != CEPH_MDS_STATE_REJOIN) {
+       mds->get_want_state() < CEPH_MDS_STATE_REJOIN) {
       dis->put();
       return;
     }
@@ -10159,6 +10166,29 @@ void MDCache::handle_discover_reply(MDiscoverReply *m)
 // ----------------------------
 // REPLICAS
 
+
+void MDCache::replicate_dir(CDir *dir, mds_rank_t to, bufferlist& bl)
+{
+  dirfrag_t df = dir->dirfrag();
+  ::encode(df, bl);
+  dir->encode_replica(to, bl);
+}
+
+void MDCache::replicate_dentry(CDentry *dn, mds_rank_t to, bufferlist& bl)
+{
+  ::encode(dn->name, bl);
+  ::encode(dn->last, bl);
+  dn->encode_replica(to, bl, mds->get_state() < MDSMap::STATE_ACTIVE);
+}
+
+void MDCache::replicate_inode(CInode *in, mds_rank_t to, bufferlist& bl,
+                             uint64_t features)
+{
+  ::encode(in->inode.ino, bl);  // bleh, minor assymetry here
+  ::encode(in->last, bl);
+  in->encode_replica(to, bl, features, mds->get_state() < MDSMap::STATE_ACTIVE);
+}
+
 CDir *MDCache::add_replica_dir(bufferlist::iterator& p, CInode *diri, mds_rank_t from,
                               list<MDSInternalContextBase*>& finished)
 {
@@ -10296,10 +10326,9 @@ int MDCache::send_dir_updates(CDir *dir, bool bcast)
   if (bcast) {
     mds->get_mds_map()->get_active_mds_set(who);
   } else {
-    for (compact_map<mds_rank_t,unsigned>::iterator p = dir->replicas_begin();
-        p != dir->replicas_end();
-        ++p)
-      who.insert(p->first);
+    for (const auto &p : dir->get_replicas()) {
+      who.insert(p.first);
+    }
   }
   
   dout(7) << "sending dir_update on " << *dir << " bcast " << bcast << " to " << who << dendl;
@@ -10382,22 +10411,20 @@ void MDCache::send_dentry_link(CDentry *dn, MDRequestRef& mdr)
   dout(7) << "send_dentry_link " << *dn << dendl;
 
   CDir *subtree = get_subtree_root(dn->get_dir());
-  for (compact_map<mds_rank_t,unsigned>::iterator p = dn->replicas_begin();
-       p != dn->replicas_end();
-       ++p) {
+  for (const auto &p : dn->get_replicas()) {
     // don't tell (rename) witnesses; they already know
-    if (mdr.get() && mdr->more()->witnessed.count(p->first))
+    if (mdr.get() && mdr->more()->witnessed.count(p.first))
       continue;
-    if (mds->mdsmap->get_state(p->first) < MDSMap::STATE_REJOIN ||
-       (mds->mdsmap->get_state(p->first) == MDSMap::STATE_REJOIN &&
-        rejoin_gather.count(p->first)))
+    if (mds->mdsmap->get_state(p.first) < MDSMap::STATE_REJOIN ||
+       (mds->mdsmap->get_state(p.first) == MDSMap::STATE_REJOIN &&
+        rejoin_gather.count(p.first)))
       continue;
     CDentry::linkage_t *dnl = dn->get_linkage();
     MDentryLink *m = new MDentryLink(subtree->dirfrag(), dn->get_dir()->dirfrag(),
                                     dn->name, dnl->is_primary());
     if (dnl->is_primary()) {
       dout(10) << "  primary " << *dnl->get_inode() << dendl;
-      replicate_inode(dnl->get_inode(), p->first, m->bl,
+      replicate_inode(dnl->get_inode(), p.first, m->bl,
                      mds->mdsmap->get_up_features());
     } else if (dnl->is_remote()) {
       inodeno_t ino = dnl->get_remote_ino();
@@ -10407,7 +10434,7 @@ void MDCache::send_dentry_link(CDentry *dn, MDRequestRef& mdr)
       ::encode(d_type, m->bl);
     } else
       ceph_abort();   // aie, bad caller!
-    mds->send_message_mds(m, p->first);
+    mds->send_message_mds(m, p.first);
   }
 }
 
@@ -11323,12 +11350,10 @@ void MDCache::_fragment_stored(MDRequestRef& mdr)
 
   // tell peers
   CDir *first = *info.resultfrags.begin();
-  for (compact_map<mds_rank_t,unsigned>::iterator p = first->replicas_begin();
-       p != first->replicas_end();
-       ++p) {
-    if (mds->mdsmap->get_state(p->first) < MDSMap::STATE_REJOIN ||
-       (mds->mdsmap->get_state(p->first) == MDSMap::STATE_REJOIN &&
-        rejoin_gather.count(p->first)))
+  for (const auto &p : first->get_replicas()) {
+    if (mds->mdsmap->get_state(p.first) < MDSMap::STATE_REJOIN ||
+       (mds->mdsmap->get_state(p.first) == MDSMap::STATE_REJOIN &&
+        rejoin_gather.count(p.first)))
       continue;
 
     MMDSFragmentNotify *notify = new MMDSFragmentNotify(basedirfrag, info.bits);
@@ -11337,9 +11362,9 @@ void MDCache::_fragment_stored(MDRequestRef& mdr)
     for (list<CDir*>::iterator q = info.resultfrags.begin();
         q != info.resultfrags.end();
         ++q)
-      replicate_dir(*q, p->first, notify->basebl);
+      replicate_dir(*q, p.first, notify->basebl);
 
-    mds->send_message_mds(notify, p->first);
+    mds->send_message_mds(notify, p.first);
   }
 
   // journal commit
@@ -11657,10 +11682,8 @@ void MDCache::force_readonly()
   mds->server->force_clients_readonly();
 
   // revoke write caps
-  for (ceph::unordered_map<vinodeno_t,CInode*>::iterator p = inode_map.begin();
-       p != inode_map.end();
-       ++p) {
-    CInode *in = p->second;
+  for (auto p : inode_map) {
+    CInode *in = p.second;
     if (in->is_head())
       mds->locker->eval(in, CEPH_CAP_LOCKS);
   }
@@ -11808,25 +11831,22 @@ void MDCache::show_subtrees(int dbl)
   assert(lost == 0);
 }
 
-
 void MDCache::show_cache()
 {
   dout(7) << "show_cache" << dendl;
-  
-  for (ceph::unordered_map<vinodeno_t,CInode*>::iterator it = inode_map.begin();
-       it != inode_map.end();
-       ++it) {
+
+  auto show_func = [this](CInode *in) {
     // unlinked?
-    if (!it->second->parent)
-      dout(7) << " unlinked " << *it->second << dendl;
-    
+    if (!in->parent)
+      dout(7) << " unlinked " << *in << dendl;
+
     // dirfrags?
     list<CDir*> dfs;
-    it->second->get_dirfrags(dfs);
+    in->get_dirfrags(dfs);
     for (list<CDir*>::iterator p = dfs.begin(); p != dfs.end(); ++p) {
       CDir *dir = *p;
       dout(7) << "  dirfrag " << *dir << dendl;
-           
+
       for (CDir::map_t::iterator p = dir->items.begin();
           p != dir->items.end();
           ++p) {
@@ -11837,7 +11857,24 @@ void MDCache::show_cache()
          dout(7) << "    inode " << *dnl->get_inode() << dendl;
       }
     }
-  }
+  };
+
+  for (auto p : inode_map)
+    show_func(p.second);
+  for (auto p : snap_inode_map)
+    show_func(p.second);
+}
+
+int MDCache::cache_status(Formatter *f)
+{
+  f->open_object_section("cache");
+
+  f->open_object_section("pool");
+  mempool::get_pool(mempool::mds_co::id).dump(f);
+  f->close_section();
+
+  f->close_section();
+  return 0;
 }
 
 int MDCache::dump_cache(std::string const &file_name)
@@ -11883,11 +11920,8 @@ int MDCache::dump_cache(const char *fn, Formatter *f,
     }
   }
 
-  for (ceph::unordered_map<vinodeno_t,CInode*>::iterator it = inode_map.begin();
-       it != inode_map.end();
-       ++it) {
-    CInode *in = it->second;
-
+  auto dump_func = [this, fd, f, depth, &dump_root](CInode *in) {
+    int r;
     if (!dump_root.empty()) {
       string ipath;
       if (in->is_root())
@@ -11897,11 +11931,11 @@ int MDCache::dump_cache(const char *fn, Formatter *f,
 
       if (dump_root.length() > ipath.length() ||
          !equal(dump_root.begin(), dump_root.end(), ipath.begin()))
-       continue;
+       return 0;
 
       if (depth >= 0 &&
          count(ipath.begin() + dump_root.length(), ipath.end(), '/') > depth)
-       continue;
+       return 0;
     }
 
     if (f) {
@@ -11912,9 +11946,8 @@ int MDCache::dump_cache(const char *fn, Formatter *f,
       ss << *in << std::endl;
       std::string s = ss.str();
       r = safe_write(fd, s.c_str(), s.length());
-      if (r < 0) {
-       goto out;
-      }
+      if (r < 0)
+       return r;
     }
 
     list<CDir*> dfs;
@@ -11932,9 +11965,8 @@ int MDCache::dump_cache(const char *fn, Formatter *f,
         tt << " " << *dir << std::endl;
         string t = tt.str();
         r = safe_write(fd, t.c_str(), t.length());
-        if (r < 0) {
-          goto out;
-        }
+        if (r < 0)
+         return r;
       }
       
       if (f) {
@@ -11953,9 +11985,8 @@ int MDCache::dump_cache(const char *fn, Formatter *f,
           uu << "  " << *dn << std::endl;
           string u = uu.str();
           r = safe_write(fd, u.c_str(), u.length());
-          if (r < 0) {
-            goto out;
-          }
+          if (r < 0)
+           return r;
         }
       }
       if (f) {
@@ -11973,7 +12004,20 @@ int MDCache::dump_cache(const char *fn, Formatter *f,
     if (f) {
       f->close_section();  // inode
     }
+    return 1;
+  };
+
+  for (auto p : inode_map) {
+    r = dump_func(p.second);
+    if (r < 0)
+      goto out;
   }
+  for (auto p : snap_inode_map) {
+    r = dump_func(p.second);
+    if (r < 0)
+      goto out;
+  }
+  r = 0;
 
  out:
   if (f) {
@@ -12069,14 +12113,45 @@ void MDCache::enqueue_scrub_work(MDRequestRef& mdr)
 
   header->set_origin(in);
 
-  // only set completion context for non-recursive scrub, because we don't 
-  // want to block asok caller on long running scrub
+  Context *fin = nullptr;
+  if (!header->get_recursive()) {
+    fin = cs->take_finisher();
+  }
+
+  // If the scrub did some repair, then flush the journal at the end of
+  // the scrub.  Otherwise in the case of e.g. rewriting a backtrace
+  // the on disk state will still look damaged.
+  auto expiry_fin = new FunctionContext([this, header, fin](int r){
+      if (header->get_repaired()) {
+        dout(4) << "Flushing journal because scrub did some repairs" << dendl;
+        mds->mdlog->start_new_segment();
+        mds->mdlog->trim_all();
+        if (fin) {
+          MDSGatherBuilder expiry_gather(g_ceph_context);
+          const std::set<LogSegment*> &expiring_segments = mds->mdlog->get_expiring_segments();
+          for (std::set<LogSegment*>::const_iterator i = expiring_segments.begin();
+               i != expiring_segments.end(); ++i) {
+            (*i)->wait_for_expiry(expiry_gather.new_sub());
+          }
+          expiry_gather.set_finisher(new MDSInternalContextWrapper(mds, fin));
+          expiry_gather.activate();
+        }
+      } else {
+        if (fin) {
+          fin->complete(r);
+        }
+      }
+  });
+
   if (!header->get_recursive()) {
-    Context *fin = cs->take_finisher();
     mds->scrubstack->enqueue_inode_top(in, header,
-                                      new MDSInternalContextWrapper(mds, fin));
-  } else
-    mds->scrubstack->enqueue_inode_bottom(in, header, NULL);
+                                      new MDSInternalContextWrapper(mds,
+                                         expiry_fin));
+  } else {
+    mds->scrubstack->enqueue_inode_bottom(in, header, 
+                                      new MDSInternalContextWrapper(mds,
+                                         expiry_fin));
+  }
 
   mds->server->respond_to_request(mdr, 0);
   return;
@@ -12362,6 +12437,19 @@ void MDCache::register_perfcounters()
     pcb.add_u64_counter(l_mdc_recovery_completed, "recovery_completed",
         "File recoveries completed", "recd", PerfCountersBuilder::PRIO_INTERESTING);
 
+    pcb.add_u64_counter(l_mdss_ireq_enqueue_scrub, "ireq_enqueue_scrub",
+        "Internal Request type enqueue scrub");
+    pcb.add_u64_counter(l_mdss_ireq_exportdir, "ireq_exportdir",
+        "Internal Request type export dir");
+    pcb.add_u64_counter(l_mdss_ireq_flush, "ireq_flush",
+        "Internal Request type flush");
+    pcb.add_u64_counter(l_mdss_ireq_fragmentdir, "ireq_fragmentdir",
+        "Internal Request type fragmentdir");
+    pcb.add_u64_counter(l_mdss_ireq_fragstats, "ireq_fragstats",
+        "Internal Request type frag stats");
+    pcb.add_u64_counter(l_mdss_ireq_inodestats, "ireq_inodestats",
+        "Internal Request type inode stats");
+
     logger.reset(pcb.create_perf_counters());
     g_ceph_context->get_perfcounters_collection()->add(logger.get());
     recovery_queue.set_logger(logger.get());