]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mds/MDCache.cc
update sources to v12.1.0
[ceph.git] / ceph / src / mds / MDCache.cc
index 8f0ca76accb298a1b8f63a9b203b88afcf1bd794..6c9f47a7290343266f4fbf77bc52e3fdf36250be 100644 (file)
@@ -205,6 +205,9 @@ MDCache::MDCache(MDSRank *m, PurgeQueue &purge_queue_) :
   lru.lru_set_max(g_conf->mds_cache_size);
   lru.lru_set_midpoint(g_conf->mds_cache_mid);
 
+  bottom_lru.lru_set_max(0);
+  bottom_lru.lru_set_midpoint(0);
+
   decayrate.set_halflife(g_conf->mds_decay_halflife);
 
   did_shutdown_log_cap = false;
@@ -295,7 +298,8 @@ void MDCache::remove_inode(CInode *o)
 
   o->item_open_file.remove_myself();
 
-  export_pin_queue.erase(o);
+  if (o->state_test(CInode::STATE_QUEUEDEXPORTPIN))
+    export_pin_queue.erase(o);
 
   // remove from inode map
   inode_map.erase(o->vino());    
@@ -2875,6 +2879,7 @@ void MDCache::handle_mds_failure(mds_rank_t who)
 
   rejoin_gather.insert(who);
   rejoin_sent.erase(who);        // i need to send another
+  rejoin_ack_sent.erase(who);    // i need to send another
   rejoin_ack_gather.erase(who);  // i'll need/get another.
 
   dout(10) << " resolve_gather " << resolve_gather << dendl;
@@ -2934,6 +2939,13 @@ void MDCache::handle_mds_failure(mds_rank_t who)
        dout(10) << " slave request " << *mdr << " uncommitted, will resolve shortly" << dendl;
        add_ambiguous_slave_update(p->first, mdr->slave_to_mds);
       }
+    } else if (mdr->slave_request) {
+      MMDSSlaveRequest *slave_req = mdr->slave_request;
+      // FIXME: Slave rename request can arrive after we notice mds failure.
+      //       This can cause mds to crash (does not affect integrity of FS).
+      if (slave_req->get_op() == MMDSSlaveRequest::OP_RENAMEPREP &&
+         slave_req->srcdn_auth == who)
+       slave_req->mark_interrupted();
     }
     
     // failed node is slave?
@@ -3656,7 +3668,7 @@ void MDCache::remove_inode_recursive(CInode *in)
       CDentry::linkage_t *dnl = dn->get_linkage();
       if (dnl->is_primary()) {
        CInode *tin = dnl->get_inode();
-       subdir->unlink_inode(dn);
+       subdir->unlink_inode(dn, false);
        remove_inode_recursive(tin);
       }
       subdir->remove_dentry(dn);
@@ -5925,11 +5937,16 @@ void MDCache::rejoin_send_acks()
   rejoin_unlinked_inodes.clear();
   
   // send acks to everyone in the recovery set
-  map<mds_rank_t,MMDSCacheRejoin*> ack;
+  map<mds_rank_t,MMDSCacheRejoin*> acks;
   for (set<mds_rank_t>::iterator p = recovery_set.begin();
        p != recovery_set.end();
-       ++p) 
-    ack[*p] = new MMDSCacheRejoin(MMDSCacheRejoin::OP_ACK);
+       ++p) {
+    if (rejoin_ack_sent.count(*p))
+      continue;
+    acks[*p] = new MMDSCacheRejoin(MMDSCacheRejoin::OP_ACK);
+  }
+
+  rejoin_ack_sent = recovery_set;
   
   // walk subtrees
   for (map<CDir*,set<CDir*> >::iterator p = subtrees.begin(); 
@@ -5952,8 +5969,11 @@ void MDCache::rejoin_send_acks()
       for (compact_map<mds_rank_t,unsigned>::iterator r = dir->replicas_begin();
           r != dir->replicas_end();
           ++r) {
-       ack[r->first]->add_strong_dirfrag(dir->dirfrag(), ++r->second, dir->dir_rep);
-       ack[r->first]->add_dirfrag_base(dir);
+       auto it = acks.find(r->first);
+       if (it == acks.end())
+         continue;
+       it->second->add_strong_dirfrag(dir->dirfrag(), ++r->second, dir->dir_rep);
+       it->second->add_dirfrag_base(dir);
       }
           
       for (CDir::map_t::iterator q = dir->items.begin();
@@ -5971,7 +5991,10 @@ void MDCache::rejoin_send_acks()
        for (compact_map<mds_rank_t,unsigned>::iterator r = dn->replicas_begin();
             r != dn->replicas_end();
             ++r) {
-         ack[r->first]->add_strong_dentry(dir->dirfrag(), dn->name, dn->first, dn->last,
+         auto it = acks.find(r->first);
+         if (it == acks.end())
+           continue;
+         it->second->add_strong_dentry(dir->dirfrag(), dn->name, dn->first, dn->last,
                                           dnl->is_primary() ? dnl->get_inode()->ino():inodeno_t(0),
                                           dnl->is_remote() ? dnl->get_remote_ino():inodeno_t(0),
                                           dnl->is_remote() ? dnl->get_remote_d_type():0,
@@ -5988,10 +6011,13 @@ void MDCache::rejoin_send_acks()
        for (compact_map<mds_rank_t,unsigned>::iterator r = in->replicas_begin();
             r != in->replicas_end();
             ++r) {
-         ack[r->first]->add_inode_base(in, mds->mdsmap->get_up_features());
+         auto it = acks.find(r->first);
+         if (it == acks.end())
+           continue;
+         it->second->add_inode_base(in, mds->mdsmap->get_up_features());
          bufferlist bl;
          in->_encode_locks_state_for_rejoin(bl, r->first);
-         ack[r->first]->add_inode_locks(in, ++r->second, bl);
+         it->second->add_inode_locks(in, ++r->second, bl);
        }
        
        // subdirs in this subtree?
@@ -6005,19 +6031,25 @@ void MDCache::rejoin_send_acks()
     for (compact_map<mds_rank_t,unsigned>::iterator r = root->replicas_begin();
         r != root->replicas_end();
         ++r) {
-      ack[r->first]->add_inode_base(root, mds->mdsmap->get_up_features());
+      auto it = acks.find(r->first);
+      if (it == acks.end())
+       continue;
+      it->second->add_inode_base(root, mds->mdsmap->get_up_features());
       bufferlist bl;
       root->_encode_locks_state_for_rejoin(bl, r->first);
-      ack[r->first]->add_inode_locks(root, ++r->second, bl);
+      it->second->add_inode_locks(root, ++r->second, bl);
     }
   if (myin)
     for (compact_map<mds_rank_t,unsigned>::iterator r = myin->replicas_begin();
         r != myin->replicas_end();
         ++r) {
-      ack[r->first]->add_inode_base(myin, mds->mdsmap->get_up_features());
+      auto it = acks.find(r->first);
+      if (it == acks.end())
+       continue;
+      it->second->add_inode_base(myin, mds->mdsmap->get_up_features());
       bufferlist bl;
       myin->_encode_locks_state_for_rejoin(bl, r->first);
-      ack[r->first]->add_inode_locks(myin, ++r->second, bl);
+      it->second->add_inode_locks(myin, ++r->second, bl);
     }
 
   // include inode base for any inodes whose scatterlocks may have updated
@@ -6027,14 +6059,16 @@ void MDCache::rejoin_send_acks()
     CInode *in = *p;
     for (compact_map<mds_rank_t,unsigned>::iterator r = in->replicas_begin();
         r != in->replicas_end();
-        ++r)
-      ack[r->first]->add_inode_base(in, mds->mdsmap->get_up_features());
+        ++r) {
+      auto it = acks.find(r->first);
+      if (it == acks.end())
+       continue;
+      it->second->add_inode_base(in, mds->mdsmap->get_up_features());
+    }
   }
 
   // send acks
-  for (map<mds_rank_t,MMDSCacheRejoin*>::iterator p = ack.begin();
-       p != ack.end();
-       ++p) {
+  for (auto p = acks.begin(); p != acks.end(); ++p) {
     ::encode(rejoin_imported_caps[p->first], p->second->imported_caps);
     mds->send_message_mds(p->second, p->first);
   }
@@ -6410,7 +6444,8 @@ bool MDCache::trim(int max, int count)
     if (max <= 0)
       return false;
   }
-  dout(7) << "trim max=" << max << "  cur=" << lru.lru_get_size() << dendl;
+  dout(7) << "trim max=" << max << "  cur=" << lru.lru_get_size()
+         << "/" << bottom_lru.lru_get_size() << dendl;
 
   // process delayed eval_stray()
   stray_manager.advance_delayed();
@@ -6420,22 +6455,26 @@ bool MDCache::trim(int max, int count)
   int unexpirable = 0;
   list<CDentry*> unexpirables;
 
+  for (;;) {
+    CDentry *dn = static_cast<CDentry*>(bottom_lru.lru_expire());
+    if (!dn)
+      break;
+    if (trim_dentry(dn, expiremap)) {
+      unexpirables.push_back(dn);
+      ++unexpirable;
+    }
+  }
+
+  for(auto dn : unexpirables)
+    bottom_lru.lru_insert_mid(dn);
+  unexpirables.clear();
+
   // trim dentries from the LRU: only enough to satisfy `max`,
-  // unless we see null dentries at the bottom of the LRU,
-  // in which case trim all those.
-  bool trimming_nulls = true;
-  while (trimming_nulls || lru.lru_get_size() + unexpirable > (unsigned)max) {
+  while (lru.lru_get_size() + unexpirable > (unsigned)max) {
     CDentry *dn = static_cast<CDentry*>(lru.lru_expire());
     if (!dn) {
       break;
     }
-    if (!dn->get_linkage()->is_null()) {
-      trimming_nulls = false;
-      if (lru.lru_get_size() + unexpirable < (unsigned)max) {
-       unexpirables.push_back(dn);
-       break;
-      }
-    }
     if ((is_standby_replay && dn->get_linkage()->inode &&
         dn->get_linkage()->inode->item_open_file.is_on_list()) ||
        trim_dentry(dn, expiremap)) {
@@ -6443,24 +6482,41 @@ bool MDCache::trim(int max, int count)
       ++unexpirable;
     }
   }
-  for(list<CDentry*>::iterator i = unexpirables.begin();
-      i != unexpirables.end();
-      ++i)
-    lru.lru_insert_mid(*i);
+  for(auto dn : unexpirables)
+    lru.lru_insert_mid(dn);
+  unexpirables.clear();
 
   // trim non-auth, non-bound subtrees
   for (map<CDir*, set<CDir*> >::iterator p = subtrees.begin();
        p != subtrees.end();) {
     CDir *dir = p->first;
     ++p;
-    if (!dir->is_auth() && !dir->get_inode()->is_auth()) {
-      // don't trim subtree root if its auth MDS is recovering.
-      // This simplify the cache rejoin code.
-      if (dir->is_subtree_root() &&
-         rejoin_ack_gather.count(dir->get_dir_auth().first))
-       continue;
-      if (dir->get_num_ref() == 1)  // subtree pin
+    CInode *diri = dir->get_inode();
+    if (dir->is_auth()) {
+      if (!diri->is_auth() && !diri->is_base() &&
+         dir->get_num_head_items() == 0) {
+       if (dir->state_test(CDir::STATE_EXPORTING) ||
+           dir->is_freezing() || dir->is_frozen())
+         continue;
+
+       migrator->export_empty_import(dir);
+      }
+    } else {
+      if (!diri->is_auth()) {
+       if (dir->get_num_ref() > 1)  // only subtree pin
+         continue;
+       list<CDir*> ls;
+       diri->get_subtree_dirfrags(ls);
+       if (diri->get_num_ref() > (int)ls.size()) // only pinned by subtrees
+         continue;
+
+       // don't trim subtree root if its auth MDS is recovering.
+       // This simplify the cache rejoin code.
+       if (dir->is_subtree_root() &&
+           rejoin_ack_gather.count(dir->get_dir_auth().first))
+         continue;
        trim_dirfrag(dir, 0, expiremap);
+      }
     }
   }
 
@@ -6583,7 +6639,7 @@ bool MDCache::trim_dentry(CDentry *dn, map<mds_rank_t, MCacheExpire*>& expiremap
   // unlink the dentry
   if (dnl->is_remote()) {
     // just unlink.
-    dir->unlink_inode(dn);
+    dir->unlink_inode(dn, false);
   } else if (dnl->is_primary()) {
     // expire the inode, too.
     CInode *in = dnl->get_inode();
@@ -6622,10 +6678,6 @@ bool MDCache::trim_dentry(CDentry *dn, map<mds_rank_t, MCacheExpire*>& expiremap
   if (clear_complete)
     dir->state_clear(CDir::STATE_COMPLETE);
   
-  // reexport?
-  if (dir->get_num_head_items() == 0 && dir->is_subtree_root())
-    migrator->export_empty_import(dir);
-  
   if (mds->logger) mds->logger->inc(l_mds_inodes_expired);
   return false;
 }
@@ -6753,7 +6805,7 @@ bool MDCache::trim_inode(CDentry *dn, CInode *in, CDir *con, map<mds_rank_t, MCa
     
   // unlink
   if (dn)
-    dn->get_dir()->unlink_inode(dn);
+    dn->get_dir()->unlink_inode(dn, false);
   remove_inode(in);
   return false;
 }
@@ -6785,29 +6837,26 @@ void MDCache::trim_non_auth()
        ++p) 
     p->first->get(CDir::PIN_SUBTREETEMP);
 
-  // note first auth item we see.  
-  // when we see it the second time, stop.
-  CDentry *first_auth = 0;
+  list<CDentry*> auth_list;
   
   // trim non-auth items from the lru
-  while (lru.lru_get_size() > 0) {
-    CDentry *dn = static_cast<CDentry*>(lru.lru_expire());
-    if (!dn) break;
+  for (;;) {
+    CDentry *dn = NULL;
+    if (bottom_lru.lru_get_size() > 0)
+      dn = static_cast<CDentry*>(bottom_lru.lru_expire());
+    if (!dn && lru.lru_get_size() > 0)
+      dn = static_cast<CDentry*>(lru.lru_expire());
+    if (!dn)
+       break;
+
     CDentry::linkage_t *dnl = dn->get_linkage();
 
     if (dn->is_auth()) {
       // add back into lru (at the top)
-      lru.lru_insert_top(dn);
+      auth_list.push_back(dn);
 
       if (dnl->is_remote() && dnl->get_inode() && !dnl->get_inode()->is_auth())
        dn->unlink_remote(dnl);
-
-      if (!first_auth) {
-       first_auth = dn;
-      } else {
-       if (first_auth == dn) 
-         break;
-      }
     } else {
       // non-auth.  expire.
       CDir *dir = dn->get_dir();
@@ -6816,7 +6865,7 @@ void MDCache::trim_non_auth()
       // unlink the dentry
       dout(10) << " removing " << *dn << dendl;
       if (dnl->is_remote()) {
-       dir->unlink_inode(dn);
+       dir->unlink_inode(dn, false);
       } 
       else if (dnl->is_primary()) {
        CInode *in = dnl->get_inode();
@@ -6828,7 +6877,7 @@ void MDCache::trim_non_auth()
          assert(!subdir->is_subtree_root());
          in->close_dirfrag(subdir->dirfrag().frag);
        }
-       dir->unlink_inode(dn);
+       dir->unlink_inode(dn, false);
        remove_inode(in);
       } 
       else {
@@ -6845,6 +6894,13 @@ void MDCache::trim_non_auth()
     }
   }
 
+  for (auto dn : auth_list) {
+      if (dn->state_test(CDentry::STATE_BOTTOMLRU))
+       bottom_lru.lru_insert_mid(dn);
+      else
+       lru.lru_insert_top(dn);
+  }
+
   // move everything in the pintail to the top bit of the lru.
   lru.lru_touch_entire_pintail();
 
@@ -6854,7 +6910,8 @@ void MDCache::trim_non_auth()
        ++p) 
     p->first->put(CDir::PIN_SUBTREETEMP);
 
-  if (lru.lru_get_size() == 0) {
+  if (lru.lru_get_size() == 0 &&
+      bottom_lru.lru_get_size() == 0) {
     // root, stray, etc.?
     ceph::unordered_map<vinodeno_t,CInode*>::iterator p = inode_map.begin();
     while (p != inode_map.end()) {
@@ -6929,7 +6986,7 @@ bool MDCache::trim_non_auth_subtree(CDir *dir)
       }
       if (!keep_inode) { // remove it!
         dout(20) << "trim_non_auth_subtree(" << dir << ") removing inode " << in << " with dentry" << dn << dendl;
-        dir->unlink_inode(dn);
+        dir->unlink_inode(dn, false);
         remove_inode(in);
        assert(!dir->has_bloom());
         dir->remove_dentry(dn);
@@ -6943,7 +7000,7 @@ bool MDCache::trim_non_auth_subtree(CDir *dir)
     } else { // just remove it
       dout(20) << "trim_non_auth_subtree(" << dir << ") removing dentry " << dn << dendl;
       if (dnl->is_remote())
-        dir->unlink_inode(dn);
+        dir->unlink_inode(dn, false);
       dir->remove_dentry(dn);
     }
   }
@@ -7430,7 +7487,7 @@ void MDCache::shutdown_check()
   mds->timer.add_event_after(g_conf->mds_shutdown_check, new C_MDC_ShutdownCheck(this));
 
   // this
-  dout(0) << "lru size now " << lru.lru_get_size() << dendl;
+  dout(0) << "lru size now " << lru.lru_get_size() << "/" << bottom_lru.lru_get_size() << dendl;
   dout(0) << "log len " << mds->mdlog->get_num_events() << dendl;
 
 
@@ -7482,7 +7539,7 @@ bool MDCache::shutdown_pass()
 
   // trim cache
   trim(0);
-  dout(5) << "lru size now " << lru.lru_get_size() << dendl;
+  dout(5) << "lru size now " << lru.lru_get_size() << "/" << bottom_lru.lru_get_size() << dendl;
 
   // SUBTREES
   int num_auth_subtree = 0;
@@ -7547,20 +7604,6 @@ bool MDCache::shutdown_pass()
   assert(!migrator->is_exporting());
   assert(!migrator->is_importing());
 
-  // make mydir subtree go away
-  if (mydir) {
-    adjust_subtree_auth(mydir, CDIR_AUTH_UNKNOWN);
-    remove_subtree(mydir);
-  }
-  assert(subtrees.empty());
-
-  // Still replicas of mydir?
-  if ((mydir != NULL) && mydir->inode->is_replicated()) {
-    // We do this because otherwise acks to locks could come in after
-    // we cap the log.
-    dout(7) << "waiting for mydir replicas to release: " << *mydir << dendl;
-    return false;
-  }
 
   // flush what we can from the log
   mds->mdlog->trim(0);
@@ -7600,12 +7643,28 @@ bool MDCache::shutdown_pass()
   }
 
   // trim what we can from the cache
-  if (lru.lru_get_size() > 0) {
-    dout(7) << "there's still stuff in the cache: " << lru.lru_get_size() << dendl;
+  if (lru.lru_get_size() > 0 || bottom_lru.lru_get_size() > 0) {
+    dout(7) << "there's still stuff in the cache: " << lru.lru_get_size() << "/" << bottom_lru.lru_get_size()  << dendl;
     show_cache();
     //dump();
     return false;
   }
+
+  // make mydir subtree go away
+  if (mydir) {
+    if (mydir->get_num_ref() > 1) { // subtree pin
+      dout(7) << "there's still reference to mydir " << *mydir << dendl;
+      show_cache();
+      return false;
+    }
+
+    remove_subtree(mydir);
+    myin->close_dirfrag(mydir->get_frag());
+  }
+  assert(subtrees.empty());
+
+  if (myin)
+    remove_inode(myin);
   
   // done!
   dout(2) << "shutdown done." << dendl;
@@ -8174,9 +8233,12 @@ struct C_MDC_OpenRemoteDentry : public MDCacheContext {
   MDSInternalContextBase *onfinish;
   bool want_xlocked;
   C_MDC_OpenRemoteDentry(MDCache *m, CDentry *d, inodeno_t i, MDSInternalContextBase *f, bool wx) :
-    MDCacheContext(m), dn(d), ino(i), onfinish(f), want_xlocked(wx) {}
+    MDCacheContext(m), dn(d), ino(i), onfinish(f), want_xlocked(wx) {
+    dn->get(MDSCacheObject::PIN_PTRWAITER);
+  }
   void finish(int r) override {
     mdcache->_open_remote_dentry_finish(dn, ino, onfinish, want_xlocked, r);
+    dn->put(MDSCacheObject::PIN_PTRWAITER);
   }
 };
 
@@ -8194,22 +8256,26 @@ void MDCache::_open_remote_dentry_finish(CDentry *dn, inodeno_t ino, MDSInternal
                                         bool want_xlocked, int r)
 {
   if (r < 0) {
+    CDentry::linkage_t *dnl = dn->get_projected_linkage();
+    if (dnl->is_remote() && dnl->get_remote_ino() == ino) {
       dout(0) << "open_remote_dentry_finish bad remote dentry " << *dn << dendl;
       dn->state_set(CDentry::STATE_BADREMOTEINO);
 
       std::string path;
       CDir *dir = dn->get_dir();
       if (dir) {
-        dir->get_inode()->make_path_string(path);
-        path =  path + "/" + dn->get_name();
+       dir->get_inode()->make_path_string(path);
+       path =  path + "/" + dn->get_name();
       }
 
-      bool fatal = mds->damage_table.notify_remote_damaged(
-          dn->get_projected_linkage()->get_remote_ino(), path);
+      bool fatal = mds->damage_table.notify_remote_damaged(ino, path);
       if (fatal) {
-        mds->damaged();
-        ceph_abort();  // unreachable, damaged() respawns us
+       mds->damaged();
+       ceph_abort();  // unreachable, damaged() respawns us
       }
+    } else {
+      r = 0;
+    }
   }
   fin->complete(r < 0 ? r : 0);
 }
@@ -9426,59 +9492,6 @@ void MDCache::scan_stray_dir(dirfrag_t next)
   }
 }
 
-/**
- * If a remote dentry refers to an inode whose primary
- * dentry is a stray, then evaluate the inode for purging if
- * we have the auth copy, or migrate the stray to use if we
- * do not.
- */
-void MDCache::eval_remote(CDentry *remote_dn)
-{
-  assert(remote_dn);
-  dout(10) << __func__ << " " << *remote_dn << dendl;
-
-  CDentry::linkage_t *dnl = remote_dn->get_projected_linkage();
-  assert(dnl->is_remote());
-  CInode *in = dnl->get_inode();
-
-  if (!in) {
-    dout(20) << __func__ << ": no inode, cannot evaluate" << dendl;
-    return;
-  }
-
-  if (remote_dn->last != CEPH_NOSNAP) {
-    dout(20) << __func__ << ": snap dentry, cannot evaluate" << dendl;
-    return;
-  }
-
-  // refers to stray?
-  CDentry *primary_dn = in->get_projected_parent_dn();
-  assert(primary_dn != NULL);
-  if (primary_dn->get_dir()->get_inode()->is_stray()) {
-    if (in->is_auth()) {
-      dout(20) << __func__ << ": have auth for inode, evaluating" << dendl;
-
-      stray_manager.eval_remote_stray(primary_dn, remote_dn);
-    } else {
-      dout(20) << __func__ << ": do not have auth for inode, migrating " << dendl;
-      /*
-       * Inodes get filed into a stray dentry when a client unlinks
-       * the primary DN for them.  However, that doesn't mean there
-       * isn't a remote DN still in the world.  The remote DN just
-       * ends up pointing at a stray.  Strays can pretty much live
-       * forever in this scenario.
-       *
-       * Therefore, we have a special behaviour here: migrate a stray
-       * to <me> when <I> handle a client request with a trace referring
-       * to a stray inode on another MDS.
-       */
-      stray_manager.migrate_stray(primary_dn, mds->get_nodeid());
-    }
-  } else {
-    dout(20) << __func__ << ": inode's primary dn not stray" << dendl;
-  }
-}
-
 void MDCache::fetch_backtrace(inodeno_t ino, int64_t pool, bufferlist& bl, Context *fin)
 {
   object_t oid = CInode::get_object_name(ino, frag_t(), "");
@@ -9645,7 +9658,7 @@ void MDCache::discover_path(CDir *base,
       !base->is_waiting_for_dentry(want_path[0].c_str(), snap) || !onfinish) {
     discover_info_t& d = _create_discover(from);
     d.ino = base->ino();
-    d.pin_base(base);
+    d.pin_base(base->inode);
     d.frag = base->get_frag();
     d.snap = snap;
     d.want_path = want_path;
@@ -9826,15 +9839,15 @@ void MDCache::handle_discover(MDiscover *dis)
       curdir = cur->get_or_open_dirfrag(this, fg);
     } else if (curdir->is_frozen_tree() ||
               (curdir->is_frozen_dir() && fragment_are_all_frozen(curdir))) {
+      if (!reply->is_empty()) {
+       dout(7) << *curdir << " is frozen, non-empty reply, stopping" << dendl;
+       break;
+      }
       if (dis->wants_base_dir() && dis->get_base_dir_frag() != curdir->get_frag()) {
        dout(7) << *curdir << " is frozen, dirfrag mismatch, stopping" << dendl;
        reply->set_flag_error_dir();
        break;
       }
-      if (!reply->is_empty()) {
-       dout(7) << *curdir << " is frozen, non-empty reply, stopping" << dendl;
-       break;
-      }
       dout(7) << *curdir << " is frozen, empty reply, waiting" << dendl;
       curdir->add_waiter(CDir::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, dis));
       reply->put();
@@ -9861,6 +9874,7 @@ void MDCache::handle_discover(MDiscover *dis)
     CDentry *dn = 0;
     if (curdir->get_version() == 0) {
       // fetch newly opened dir
+      assert(!curdir->has_bloom());
     } else if (dis->get_want().depth() > 0) {
       // lookup dentry
       dn = curdir->lookup(dis->get_dentry(i), snapid);
@@ -9869,7 +9883,8 @@ void MDCache::handle_discover(MDiscover *dis)
           
     // incomplete dir?
     if (!dn) {
-      if (!curdir->is_complete()) {
+      if (!curdir->is_complete() &&
+         (!curdir->has_bloom() || curdir->is_in_bloom(dis->get_dentry(i)))) {
        // readdir
        dout(7) << "incomplete dir contents for " << *curdir << ", fetching" << dendl;
        if (reply->is_empty()) {
@@ -9892,6 +9907,13 @@ void MDCache::handle_discover(MDiscover *dis)
     }
     assert(dn);
 
+    // don't add replica to purging dentry/inode
+    if (dn->state_test(CDentry::STATE_PURGING)) {
+      if (reply->is_empty())
+       reply->set_flag_error_dn(dis->get_dentry(i));
+      break;
+    }
+
     CDentry::linkage_t *dnl = dn->get_linkage();
 
     // xlocked dentry?
@@ -10057,10 +10079,11 @@ void MDCache::handle_discover_reply(MDiscoverReply *m)
     if (who >= 0)
       dout(7) << " dir_auth_hint is " << m->get_dir_auth_hint() << dendl;
 
-    frag_t fg = m->get_base_dir_frag();
-    CDir *dir = cur->get_dirfrag(fg);
 
     if (m->get_wanted_base_dir()) {
+      frag_t fg = m->get_base_dir_frag();
+      CDir *dir = cur->get_dirfrag(fg);
+
       if (cur->is_waiting_for_dir(fg)) {
        if (cur->is_auth())
          cur->take_waiting(CInode::WAIT_DIR, finished);
@@ -10074,6 +10097,8 @@ void MDCache::handle_discover_reply(MDiscoverReply *m)
 
     // try again?
     if (m->get_error_dentry().length()) {
+      frag_t fg = cur->pick_dirfrag(m->get_error_dentry());
+      CDir *dir = cur->get_dirfrag(fg);
       // wanted a dentry
       if (dir && dir->is_waiting_for_dentry(m->get_error_dentry(), m->get_wanted_snapid())) {
        if (dir->is_auth() || dir->lookup(m->get_error_dentry())) {
@@ -10087,6 +10112,17 @@ void MDCache::handle_discover_reply(MDiscoverReply *m)
        dout(7) << " doing nothing, have dir but nobody is waiting on dentry "
                << m->get_error_dentry() << dendl;
     }
+  } else if (m->is_flag_error_dn()) {
+    frag_t fg = cur->pick_dirfrag(m->get_error_dentry());
+    CDir *dir = cur->get_dirfrag(fg);
+    if (dir) {
+      if (dir->is_auth()) {
+       dir->take_sub_waiting(finished);
+      } else {
+       dir->take_dentry_waiting(m->get_error_dentry(), m->get_wanted_snapid(),
+                                m->get_wanted_snapid(), error);
+      }
+    }
   }
 
   // waiters
@@ -10480,7 +10516,6 @@ void MDCache::handle_dentry_unlink(MDentryUnlink *m)
            !in->state_test(CInode::STATE_EXPORTINGCAPS))
          migrator->export_caps(in);
        
-       touch_dentry_bottom(straydn); // move stray to end of lru
        straydn = NULL;
       } else {
        assert(!straydn);
@@ -10488,9 +10523,6 @@ void MDCache::handle_dentry_unlink(MDentryUnlink *m)
        dn->dir->unlink_inode(dn);
       }
       assert(dnl->is_null());
-      
-      // move to bottom of lru
-      touch_dentry_bottom(dn);
     }
   }
 
@@ -10671,13 +10703,26 @@ void MDCache::adjust_dir_fragments(CInode *diri,
 
     // are my constituent bits subtrees?  if so, i will be too.
     // (it's all or none, actually.)
-    bool was_subtree = false;
-    set<CDir*> new_bounds;
-    for (list<CDir*>::iterator p = srcfrags.begin(); p != srcfrags.end(); ++p) {
-      CDir *dir = *p;
+    bool any_subtree = false;
+    for (CDir *dir : srcfrags) {
       if (dir->is_subtree_root()) {
+       any_subtree = true;
+       break;
+      }
+    }
+    set<CDir*> new_bounds;
+    if (any_subtree)  {
+      for (CDir *dir : srcfrags) {
+       // this simplifies the code that find subtrees underneath the dirfrag
+       if (!dir->is_subtree_root()) {
+         dir->state_set(CDir::STATE_AUXSUBTREE);
+         adjust_subtree_auth(dir, mds->get_nodeid());
+       }
+      }
+
+      for (CDir *dir : srcfrags) {
+       assert(dir->is_subtree_root());
        dout(10) << " taking srcfrag subtree bounds from " << *dir << dendl;
-       was_subtree = true;
        map<CDir*, set<CDir*> >::iterator q = subtrees.find(dir);
        set<CDir*>::iterator r = q->second.begin();
        while (r != subtrees[dir].end()) {
@@ -10685,7 +10730,7 @@ void MDCache::adjust_dir_fragments(CInode *diri,
          subtrees[dir].erase(r++);
        }
        subtrees.erase(q);
-       
+
        // remove myself as my parent's bound
        if (parent_subtree)
          subtrees[parent_subtree].erase(dir);
@@ -10695,9 +10740,8 @@ void MDCache::adjust_dir_fragments(CInode *diri,
     // merge
     CDir *f = new CDir(diri, basefrag, this, srcfrags.front()->is_auth());
     f->merge(srcfrags, waiters, replay);
-    diri->add_dirfrag(f);
 
-    if (was_subtree) {
+    if (any_subtree) {
       assert(f->is_subtree_root());
       subtrees[f].swap(new_bounds);
       if (parent_subtree)
@@ -10785,6 +10829,11 @@ void MDCache::split_dir(CDir *dir, int bits)
     return;
   }
 
+  if (dir->frag.bits() + bits > 24) {
+    dout(7) << __func__ << " frag bits > 24, dropping" << dendl;
+    return;
+  }
+
   MDRequestRef mdr = request_start_internal(CEPH_MDS_OP_FRAGMENTDIR);
   mdr->more()->fragment_base = dir->dirfrag();
 
@@ -11780,26 +11829,26 @@ void MDCache::show_cache()
   }
 }
 
-void MDCache::dump_cache(std::string const &file_name)
+int MDCache::dump_cache(std::string const &file_name)
 {
-  dump_cache(file_name.c_str(), NULL);
+  return dump_cache(file_name.c_str(), NULL);
 }
 
-void MDCache::dump_cache(Formatter *f)
+int MDCache::dump_cache(Formatter *f)
 {
-  dump_cache(NULL, f);
+  return dump_cache(NULL, f);
 }
 
-void MDCache::dump_cache(const string& dump_root, int depth, Formatter *f)
+int MDCache::dump_cache(const string& dump_root, int depth, Formatter *f)
 {
-  dump_cache(NULL, f, dump_root, depth);
+  return dump_cache(NULL, f, dump_root, depth);
 }
 
 /**
  * Dump the metadata cache, either to a Formatter, if
  * provided, else to a plain text file.
  */
-void MDCache::dump_cache(const char *fn, Formatter *f,
+int MDCache::dump_cache(const char *fn, Formatter *f,
                         const string& dump_root, int depth)
 {
   int r = 0;
@@ -11819,7 +11868,7 @@ void MDCache::dump_cache(const char *fn, Formatter *f,
     fd = ::open(fn, O_WRONLY|O_CREAT|O_EXCL, 0600);
     if (fd < 0) {
       derr << "failed to open " << fn << ": " << cpp_strerror(errno) << dendl;
-      return;
+      return errno;
     }
   }
 
@@ -11921,6 +11970,7 @@ void MDCache::dump_cache(const char *fn, Formatter *f,
   } else {
     ::close(fd);
   }
+  return r;
 }
 
 
@@ -12349,3 +12399,19 @@ void MDCache::maybe_eval_stray(CInode *in, bool delay) {
   }
 }
 
+void MDCache::clear_dirty_bits_for_stray(CInode* diri) {
+  dout(10) << __func__ << " " << *diri << dendl;
+  assert(diri->get_projected_parent_dir()->inode->is_stray());
+  list<CDir*> ls;
+  diri->get_dirfrags(ls);
+  for (auto p : ls) {
+    if (p->is_auth() && !(p->is_frozen() || p->is_freezing()))
+      p->try_remove_dentries_for_stray();
+  }
+  if (!diri->snaprealm) {
+    if (diri->is_auth())
+      diri->clear_dirty_rstat();
+    diri->clear_scatter_dirty();
+  }
+}
+