]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mds/CDir.cc
update sources to 12.2.7
[ceph.git] / ceph / src / mds / CDir.cc
index c190cca175b479f89f74d8dc55c118adff491351..f2396b31ba8b652db30d5444ce4295dc1bf833b0 100644 (file)
@@ -12,6 +12,7 @@
  * 
  */
 
+#include <boost/utility/string_view.hpp>
 
 #include "include/types.h"
 
@@ -183,7 +184,9 @@ CDir::CDir(CInode *in, frag_t fg, MDCache *mdcache, bool auth) :
   cache(mdcache), inode(in), frag(fg),
   first(2),
   dirty_rstat_inodes(member_offset(CInode, dirty_rstat_item)),
-  projected_version(0),  item_dirty(this), item_new(this),
+  projected_version(0),
+  dirty_dentries(member_offset(CDentry, item_dir_dirty)),
+  item_dirty(this), item_new(this),
   num_head_items(0), num_head_null(0),
   num_snap_items(0), num_snap_null(0),
   num_dirty(0), committing_version(0), committed_version(0),
@@ -193,18 +196,16 @@ CDir::CDir(CInode *in, frag_t fg, MDCache *mdcache, bool auth) :
   pop_nested(ceph_clock_now()),
   pop_auth_subtree(ceph_clock_now()),
   pop_auth_subtree_nested(ceph_clock_now()),
+  pop_lru_subdirs(member_offset(CInode, item_pop_lru)),
   num_dentries_nested(0), num_dentries_auth_subtree(0),
   num_dentries_auth_subtree_nested(0),
   dir_auth(CDIR_AUTH_DEFAULT)
 {
-  state = STATE_INITIAL;
-
   memset(&fnode, 0, sizeof(fnode));
 
   // auth
   assert(in->is_dir());
-  if (auth) 
-    state |= STATE_AUTH;
+  if (auth) state_set(STATE_AUTH);
 }
 
 /**
@@ -226,7 +227,7 @@ bool CDir::check_rstats(bool scrub)
 
   frag_info_t frag_info;
   nest_info_t nest_info;
-  for (map_t::iterator i = items.begin(); i != items.end(); ++i) {
+  for (auto i = items.begin(); i != items.end(); ++i) {
     if (i->second->last != CEPH_NOSNAP)
       continue;
     CDentry::linkage_t *dnl = i->second->get_linkage();
@@ -268,7 +269,7 @@ bool CDir::check_rstats(bool scrub)
 
   if (!good) {
     if (!scrub) {
-      for (map_t::iterator i = items.begin(); i != items.end(); ++i) {
+      for (auto i = items.begin(); i != items.end(); ++i) {
        CDentry *dn = i->second;
        if (dn->get_linkage()->is_primary()) {
          CInode *in = dn->get_linkage()->inode;
@@ -289,14 +290,13 @@ bool CDir::check_rstats(bool scrub)
   return good;
 }
 
-CDentry *CDir::lookup(const string& name, snapid_t snap)
+CDentry *CDir::lookup(boost::string_view name, snapid_t snap)
 { 
   dout(20) << "lookup (" << snap << ", '" << name << "')" << dendl;
-  map_t::iterator iter = items.lower_bound(dentry_key_t(snap, name.c_str(),
-                                                       inode->hash_dentry_name(name)));
+  auto iter = items.lower_bound(dentry_key_t(snap, name, inode->hash_dentry_name(name)));
   if (iter == items.end())
     return 0;
-  if (iter->second->name == name &&
+  if (iter->second->get_name() == name &&
       iter->second->first <= snap &&
       iter->second->last >= snap) {
     dout(20) << "  hit -> " << iter->first << dendl;
@@ -306,9 +306,8 @@ CDentry *CDir::lookup(const string& name, snapid_t snap)
   return 0;
 }
 
-CDentry *CDir::lookup_exact_snap(const string& name, snapid_t last) {
-  map_t::iterator p = items.find(dentry_key_t(last, name.c_str(),
-                                             inode->hash_dentry_name(name)));
+CDentry *CDir::lookup_exact_snap(boost::string_view name, snapid_t last) {
+  auto p = items.find(dentry_key_t(last, name, inode->hash_dentry_name(name)));
   if (p == items.end())
     return NULL;
   return p->second;
@@ -318,7 +317,7 @@ CDentry *CDir::lookup_exact_snap(const string& name, snapid_t last) {
  * linking fun
  */
 
-CDentry* CDir::add_null_dentry(const string& dname,
+CDentry* CDir::add_null_dentry(boost::string_view dname,
                               snapid_t first, snapid_t last)
 {
   // foreign
@@ -337,7 +336,7 @@ CDentry* CDir::add_null_dentry(const string& dname,
   
   // add to dir
   assert(items.count(dn->key()) == 0);
-  //assert(null_items.count(dn->name) == 0);
+  //assert(null_items.count(dn->get_name()) == 0);
 
   items[dn->key()] = dn;
   if (last == CEPH_NOSNAP)
@@ -361,7 +360,7 @@ CDentry* CDir::add_null_dentry(const string& dname,
 }
 
 
-CDentry* CDir::add_primary_dentry(const string& dname, CInode *in,
+CDentry* CDir::add_primary_dentry(boost::string_view dname, CInode *in,
                                  snapid_t first, snapid_t last) 
 {
   // primary
@@ -383,12 +382,11 @@ CDentry* CDir::add_primary_dentry(const string& dname, CInode *in,
   
   // add to dir
   assert(items.count(dn->key()) == 0);
-  //assert(null_items.count(dn->name) == 0);
+  //assert(null_items.count(dn->get_name()) == 0);
 
   items[dn->key()] = dn;
 
   dn->get_linkage()->inode = in;
-  in->set_primary_parent(dn);
 
   link_inode_work(dn, in);
 
@@ -411,7 +409,7 @@ CDentry* CDir::add_primary_dentry(const string& dname, CInode *in,
   return dn;
 }
 
-CDentry* CDir::add_remote_dentry(const string& dname, inodeno_t ino, unsigned char d_type,
+CDentry* CDir::add_remote_dentry(boost::string_view dname, inodeno_t ino, unsigned char d_type,
                                 snapid_t first, snapid_t last) 
 {
   // foreign
@@ -428,7 +426,7 @@ CDentry* CDir::add_remote_dentry(const string& dname, inodeno_t ino, unsigned ch
   
   // add to dir
   assert(items.count(dn->key()) == 0);
-  //assert(null_items.count(dn->name) == 0);
+  //assert(null_items.count(dn->get_name()) == 0);
 
   items[dn->key()] = dn;
   if (last == CEPH_NOSNAP)
@@ -535,7 +533,6 @@ void CDir::link_primary_inode(CDentry *dn, CInode *in)
   assert(dn->get_linkage()->is_null());
 
   dn->get_linkage()->inode = in;
-  in->set_primary_parent(dn);
 
   link_inode_work(dn, in);
 
@@ -560,7 +557,7 @@ void CDir::link_primary_inode(CDentry *dn, CInode *in)
 void CDir::link_inode_work( CDentry *dn, CInode *in)
 {
   assert(dn->get_linkage()->get_inode() == in);
-  assert(in->get_parent_dn() == dn);
+  in->set_primary_parent(dn);
 
   // set inode version
   //in->inode.version = dn->get_version();
@@ -648,9 +645,11 @@ void CDir::unlink_inode_work( CDentry *dn )
     // unlink auth_pin count
     if (in->auth_pins + in->nested_auth_pins)
       dn->adjust_nested_auth_pins(0 - (in->auth_pins + in->nested_auth_pins), 0 - in->auth_pins, NULL);
-    
+
     // detach inode
     in->remove_primary_parent(dn);
+    if (in->is_dir())
+      in->item_pop_lru.remove_myself();
     dn->get_linkage()->inode = 0;
   } else {
     assert(!dn->get_linkage()->is_null());
@@ -676,20 +675,20 @@ void CDir::add_to_bloom(CDentry *dn)
     bloom.reset(new bloom_filter(size, 1.0 / size, 0));
   }
   /* This size and false positive probability is completely random.*/
-  bloom->insert(dn->name.c_str(), dn->name.size());
+  bloom->insert(dn->get_name().data(), dn->get_name().size());
 }
 
-bool CDir::is_in_bloom(const string& name)
+bool CDir::is_in_bloom(boost::string_view name)
 {
   if (!bloom)
     return false;
-  return bloom->contains(name.c_str(), name.size());
+  return bloom->contains(name.data(), name.size());
 }
 
 void CDir::remove_null_dentries() {
   dout(12) << "remove_null_dentries " << *this << dendl;
 
-  CDir::map_t::iterator p = items.begin();
+  auto p = items.begin();
   while (p != items.end()) {
     CDentry *dn = p->second;
     ++p;
@@ -717,7 +716,7 @@ void CDir::try_remove_dentries_for_stray()
   // clear dirty only when the directory was not snapshotted
   bool clear_dirty = !inode->snaprealm;
 
-  CDir::map_t::iterator p = items.begin();
+  auto p = items.begin();
   while (p != items.end()) {
     CDentry *dn = p->second;
     ++p;
@@ -784,7 +783,7 @@ void CDir::purge_stale_snap_data(const set<snapid_t>& snaps)
 {
   dout(10) << "purge_stale_snap_data " << snaps << dendl;
 
-  CDir::map_t::iterator p = items.begin();
+  auto p = items.begin();
   while (p != items.end()) {
     CDentry *dn = p->second;
     ++p;
@@ -824,11 +823,14 @@ void CDir::steal_dentry(CDentry *dn)
 
     if (dn->get_linkage()->is_primary()) {
       CInode *in = dn->get_linkage()->get_inode();
-      inode_t *pi = in->get_projected_inode();
-      if (dn->get_linkage()->get_inode()->is_dir())
+      auto pi = in->get_projected_inode();
+      if (in->is_dir()) {
        fnode.fragstat.nsubdirs++;
-      else
+       if (in->item_pop_lru.is_on_list())
+         pop_lru_subdirs.push_back(&in->item_pop_lru);
+      } else {
        fnode.fragstat.nfiles++;
+      }
       fnode.rstat.rbytes += pi->accounted_rstat.rbytes;
       fnode.rstat.rfiles += pi->accounted_rstat.rfiles;
       fnode.rstat.rsubdirs += pi->accounted_rstat.rsubdirs;
@@ -863,8 +865,10 @@ void CDir::steal_dentry(CDentry *dn)
     dn->dir->adjust_nested_auth_pins(-ap, -dap, NULL);
   }
 
-  if (dn->is_dirty()) 
+  if (dn->is_dirty()) {
+    dirty_dentries.push_back(&dn->item_dir_dirty);
     num_dirty++;
+  }
 
   dn->dir = this;
 }
@@ -877,8 +881,12 @@ void CDir::prepare_old_fragment(map<string_snap_t, std::list<MDSInternalContextB
     auth_pin(this);
 
   if (!waiting_on_dentry.empty()) {
-    for (auto p = waiting_on_dentry.begin(); p != waiting_on_dentry.end(); ++p)
-      dentry_waiters[p->first].swap(p->second);
+    for (const auto &p : waiting_on_dentry) {
+      auto &e = dentry_waiters[p.first];
+      for (const auto &waiter : p.second) {
+        e.push_back(waiter);
+      }
+    }
     waiting_on_dentry.clear();
     put(PIN_DNWAITER);
   }
@@ -932,7 +940,7 @@ void CDir::finish_old_fragment(list<MDSInternalContextBase*>& waiters, bool repl
 
 void CDir::init_fragment_pins()
 {
-  if (!replica_map.empty())
+  if (is_replicated())
     get(PIN_REPLICATED);
   if (state_test(STATE_DIRTY))
     get(PIN_DIRTY);
@@ -976,7 +984,7 @@ void CDir::split(int bits, list<CDir*>& subs, list<MDSInternalContextBase*>& wai
   for (list<frag_t>::iterator p = frags.begin(); p != frags.end(); ++p) {
     CDir *f = new CDir(inode, *p, cache, is_auth());
     f->state_set(state & (MASK_STATE_FRAGMENT_KEPT | STATE_COMPLETE));
-    f->replica_map = replica_map;
+    f->get_replicas() = get_replicas();
     f->dir_auth = dir_auth;
     f->init_fragment_pins();
     f->set_version(get_version());
@@ -1002,24 +1010,27 @@ void CDir::split(int bits, list<CDir*>& subs, list<MDSInternalContextBase*>& wai
   
   // repartition dentries
   while (!items.empty()) {
-    CDir::map_t::iterator p = items.begin();
+    auto p = items.begin();
     
     CDentry *dn = p->second;
-    frag_t subfrag = inode->pick_dirfrag(dn->name);
+    frag_t subfrag = inode->pick_dirfrag(dn->get_name());
     int n = (subfrag.value() & (subfrag.mask() ^ frag.mask())) >> subfrag.mask_shift();
     dout(15) << " subfrag " << subfrag << " n=" << n << " for " << p->first << dendl;
     CDir *f = subfrags[n];
     f->steal_dentry(dn);
   }
 
-  for (auto& p : dentry_waiters) {
+  for (const auto &p : dentry_waiters) {
     frag_t subfrag = inode->pick_dirfrag(p.first.name);
     int n = (subfrag.value() & (subfrag.mask() ^ frag.mask())) >> subfrag.mask_shift();
     CDir *f = subfrags[n];
 
     if (f->waiting_on_dentry.empty())
       f->get(PIN_DNWAITER);
-    f->waiting_on_dentry[p.first].swap(p.second);
+    auto &e = f->waiting_on_dentry[p.first];
+    for (const auto &waiter : p.second) {
+      e.push_back(waiter);
+    }
   }
 
   // FIXME: handle dirty old rstat
@@ -1085,12 +1096,10 @@ void CDir::merge(list<CDir*>& subs, list<MDSInternalContextBase*>& waiters, bool
       steal_dentry(dir->items.begin()->second);
     
     // merge replica map
-    for (compact_map<mds_rank_t,unsigned>::iterator p = dir->replicas_begin();
-        p != dir->replicas_end();
-        ++p) {
-      unsigned cur = replica_map[p->first];
-      if (p->second > cur)
-       replica_map[p->first] = p->second;
+    for (const auto &p : dir->get_replicas()) {
+      unsigned cur = get_replicas()[p.first];
+      if (p.second > cur)
+       get_replicas()[p.first] = p.second;
     }
 
     // merge version
@@ -1106,8 +1115,11 @@ void CDir::merge(list<CDir*>& subs, list<MDSInternalContextBase*>& waiters, bool
 
   if (!dentry_waiters.empty()) {
     get(PIN_DNWAITER);
-    for (auto& p : dentry_waiters) {
-      waiting_on_dentry[p.first].swap(p.second);
+    for (const auto &p : dentry_waiters) {
+      auto &e = waiting_on_dentry[p.first];
+      for (const auto &waiter : p.second) {
+        e.push_back(waiter);
+      }
     }
   }
 
@@ -1132,7 +1144,7 @@ void CDir::merge(list<CDir*>& subs, list<MDSInternalContextBase*>& waiters, bool
 void CDir::resync_accounted_fragstat()
 {
   fnode_t *pf = get_projected_fnode();
-  inode_t *pi = inode->get_projected_inode();
+  auto pi = inode->get_projected_inode();
 
   if (pf->accounted_fragstat.version != pi->dirstat.version) {
     pf->fragstat.version = pi->dirstat.version;
@@ -1147,7 +1159,7 @@ void CDir::resync_accounted_fragstat()
 void CDir::resync_accounted_rstat()
 {
   fnode_t *pf = get_projected_fnode();
-  inode_t *pi = inode->get_projected_inode();
+  auto pi = inode->get_projected_inode();
   
   if (pf->accounted_rstat.version != pi->rstat.version) {
     pf->rstat.version = pi->rstat.version;
@@ -1167,8 +1179,8 @@ void CDir::assimilate_dirty_rstat_inodes()
     if (in->is_frozen())
       continue;
 
-    inode_t *pi = in->project_inode();
-    pi->version = in->pre_dirty();
+    auto &pi = in->project_inode();
+    pi.inode.version = in->pre_dirty();
 
     inode->mdcache->project_rstat_inode_to_frag(in, this, 0, 0, NULL);
   }
@@ -1210,7 +1222,7 @@ void CDir::assimilate_dirty_rstat_inodes_finish(MutationRef& mut, EMetaBlob *blo
  * WAITING
  */
 
-void CDir::add_dentry_waiter(const string& dname, snapid_t snapid, MDSInternalContextBase *c) 
+void CDir::add_dentry_waiter(boost::string_view dname, snapid_t snapid, MDSInternalContextBase *c) 
 {
   if (waiting_on_dentry.empty())
     get(PIN_DNWAITER);
@@ -1220,7 +1232,7 @@ void CDir::add_dentry_waiter(const string& dname, snapid_t snapid, MDSInternalCo
           << " " << c << " on " << *this << dendl;
 }
 
-void CDir::take_dentry_waiting(const string& dname, snapid_t first, snapid_t last,
+void CDir::take_dentry_waiting(boost::string_view dname, snapid_t first, snapid_t last,
                               list<MDSInternalContextBase*>& ls)
 {
   if (waiting_on_dentry.empty())
@@ -1228,15 +1240,17 @@ void CDir::take_dentry_waiting(const string& dname, snapid_t first, snapid_t las
   
   string_snap_t lb(dname, first);
   string_snap_t ub(dname, last);
-  compact_map<string_snap_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dentry.lower_bound(lb);
-  while (p != waiting_on_dentry.end() &&
-        !(ub < p->first)) {
+  auto it = waiting_on_dentry.lower_bound(lb);
+  while (it != waiting_on_dentry.end() &&
+        !(ub < it->first)) {
     dout(10) << "take_dentry_waiting dentry " << dname
             << " [" << first << "," << last << "] found waiter on snap "
-            << p->first.snapid
+            << it->first.snapid
             << " on " << *this << dendl;
-    ls.splice(ls.end(), p->second);
-    waiting_on_dentry.erase(p++);
+    for (const auto &waiter : it->second) {
+      ls.push_back(waiter);
+    }
+    waiting_on_dentry.erase(it++);
   }
 
   if (waiting_on_dentry.empty())
@@ -1247,10 +1261,11 @@ void CDir::take_sub_waiting(list<MDSInternalContextBase*>& ls)
 {
   dout(10) << "take_sub_waiting" << dendl;
   if (!waiting_on_dentry.empty()) {
-    for (compact_map<string_snap_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dentry.begin();
-        p != waiting_on_dentry.end();
-        ++p) 
-      ls.splice(ls.end(), p->second);
+    for (const auto &p : waiting_on_dentry) {
+      for (const auto &waiter : p.second) {
+        ls.push_back(waiter);
+      }
+    }
     waiting_on_dentry.clear();
     put(PIN_DNWAITER);
   }
@@ -1295,13 +1310,14 @@ void CDir::take_waiting(uint64_t mask, list<MDSInternalContextBase*>& ls)
 {
   if ((mask & WAIT_DENTRY) && !waiting_on_dentry.empty()) {
     // take all dentry waiters
-    while (!waiting_on_dentry.empty()) {
-      compact_map<string_snap_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dentry.begin();
-      dout(10) << "take_waiting dentry " << p->first.name
-              << " snap " << p->first.snapid << " on " << *this << dendl;
-      ls.splice(ls.end(), p->second);
-      waiting_on_dentry.erase(p);
+    for (const auto &p : waiting_on_dentry) {
+      dout(10) << "take_waiting dentry " << p.first.name
+              << " snap " << p.first.snapid << " on " << *this << dendl;
+      for (const auto &waiter : p.second) {
+        ls.push_back(waiter);
+      }
     }
+    waiting_on_dentry.clear();
     put(PIN_DNWAITER);
   }
   
@@ -1329,31 +1345,29 @@ void CDir::finish_waiting(uint64_t mask, int result)
 fnode_t *CDir::project_fnode()
 {
   assert(get_version() != 0);
-  fnode_t *p = new fnode_t;
-  *p = *get_projected_fnode();
-  projected_fnode.push_back(p);
+  projected_fnode.emplace_back(*get_projected_fnode());
+  auto &p = projected_fnode.back();
 
   if (scrub_infop && scrub_infop->last_scrub_dirty) {
-    p->localized_scrub_stamp = scrub_infop->last_local.time;
-    p->localized_scrub_version = scrub_infop->last_local.version;
-    p->recursive_scrub_stamp = scrub_infop->last_recursive.time;
-    p->recursive_scrub_version = scrub_infop->last_recursive.version;
+    p.localized_scrub_stamp = scrub_infop->last_local.time;
+    p.localized_scrub_version = scrub_infop->last_local.version;
+    p.recursive_scrub_stamp = scrub_infop->last_recursive.time;
+    p.recursive_scrub_version = scrub_infop->last_recursive.version;
     scrub_infop->last_scrub_dirty = false;
     scrub_maybe_delete_info();
   }
 
-  dout(10) << "project_fnode " << p << dendl;
-  return p;
+  dout(10) << __func__ <<  " " << &p << dendl;
+  return &p;
 }
 
 void CDir::pop_and_dirty_projected_fnode(LogSegment *ls)
 {
   assert(!projected_fnode.empty());
-  dout(15) << "pop_and_dirty_projected_fnode " << projected_fnode.front()
-          << " v" << projected_fnode.front()->version << dendl;
-  fnode = *projected_fnode.front();
+  auto &front = projected_fnode.front();
+  dout(15) << __func__ << " " << &front << " v" << front.version << dendl;
+  fnode = front;
   _mark_dirty(ls);
-  delete projected_fnode.front();
   projected_fnode.pop_front();
 }
 
@@ -1418,7 +1432,7 @@ void CDir::mark_clean()
 // caller should hold auth pin of this
 void CDir::log_mark_dirty()
 {
-  if (is_dirty() || is_projected())
+  if (is_dirty() || projected_version > get_version())
     return; // noop if it is already dirty or will be dirty
 
   version_t pv = pre_dirty();
@@ -1454,7 +1468,7 @@ void CDir::fetch(MDSInternalContextBase *c, bool ignore_authpinnability)
   return fetch(c, want, ignore_authpinnability);
 }
 
-void CDir::fetch(MDSInternalContextBase *c, const string& want_dn, bool ignore_authpinnability)
+void CDir::fetch(MDSInternalContextBase *c, boost::string_view want_dn, bool ignore_authpinnability)
 {
   dout(10) << "fetch on " << *this << dendl;
   
@@ -1492,7 +1506,7 @@ void CDir::fetch(MDSInternalContextBase *c, const string& want_dn, bool ignore_a
   }
 
   if (c) add_waiter(WAIT_COMPLETE, c);
-  if (!want_dn.empty()) wanted_items.insert(want_dn);
+  if (!want_dn.empty()) wanted_items.insert(mempool::mds_co::string(want_dn));
   
   // already fetching?
   if (state_test(CDir::STATE_FETCHING)) {
@@ -1601,9 +1615,9 @@ void CDir::_omap_fetch(MDSInternalContextBase *c, const std::set<dentry_key_t>&
   } else {
     assert(c);
     std::set<std::string> str_keys;
-    for (auto p = keys.begin(); p != keys.end(); ++p) {
+    for (auto p : keys) {
       string str;
-      p->encode(str);
+      p.encode(str);
       str_keys.insert(str);
     }
     rd.omap_get_vals_by_keys(str_keys, &fin->omap, &fin->ret2);
@@ -1643,14 +1657,13 @@ void CDir::_omap_fetch_more(
 }
 
 CDentry *CDir::_load_dentry(
-    const std::string &key,
-    const std::string &dname,
+    boost::string_view key,
+    boost::string_view dname,
     const snapid_t last,
     bufferlist &bl,
     const int pos,
     const std::set<snapid_t> *snaps,
-    bool *force_dirty,
-    list<CInode*> *undef_inodes)
+    bool *force_dirty)
 {
   bufferlist::iterator q = bl.begin();
 
@@ -1695,17 +1708,23 @@ CDentry *CDir::_load_dentry(
 
     if (stale) {
       if (!dn) {
-        stale_items.insert(key);
+        stale_items.insert(mempool::mds_co::string(key));
         *force_dirty = true;
       }
       return dn;
     }
 
     if (dn) {
-      if (dn->get_linkage()->get_inode() == 0) {
-        dout(12) << "_fetched  had NEG dentry " << *dn << dendl;
-      } else {
-        dout(12) << "_fetched  had dentry " << *dn << dendl;
+      CDentry::linkage_t *dnl = dn->get_linkage();
+      dout(12) << "_fetched had " << (dnl->is_null() ? "NEG" : "") << " dentry " << *dn << dendl;
+      if (committed_version == 0 &&
+         dnl->is_remote() &&
+         dn->is_dirty() &&
+         ino == dnl->get_remote_ino() &&
+         d_type == dnl->get_remote_d_type()) {
+       // see comment below
+       dout(10) << "_fetched  had underwater dentry " << *dn << ", marking clean" << dendl;
+       dn->mark_clean();
       }
     } else {
       // (remote) link
@@ -1730,7 +1749,7 @@ CDentry *CDir::_load_dentry(
     
     if (stale) {
       if (!dn) {
-        stale_items.insert(key);
+        stale_items.insert(mempool::mds_co::string(key));
         *force_dirty = true;
       }
       return dn;
@@ -1738,15 +1757,35 @@ CDentry *CDir::_load_dentry(
 
     bool undef_inode = false;
     if (dn) {
-      CInode *in = dn->get_linkage()->get_inode();
-      if (in) {
-        dout(12) << "_fetched  had dentry " << *dn << dendl;
-        if (in->state_test(CInode::STATE_REJOINUNDEF)) {
-          undef_inodes->push_back(in);
-          undef_inode = true;
-        }
-      } else
-        dout(12) << "_fetched  had NEG dentry " << *dn << dendl;
+      CDentry::linkage_t *dnl = dn->get_linkage();
+      dout(12) << "_fetched had " << (dnl->is_null() ? "NEG" : "") << " dentry " << *dn << dendl;
+
+      if (dnl->is_primary()) {
+       CInode *in = dnl->get_inode();
+       if (in->state_test(CInode::STATE_REJOINUNDEF)) {
+         undef_inode = true;
+       } else if (committed_version == 0 &&
+                  dn->is_dirty() &&
+                  inode_data.inode.ino == in->ino() &&
+                  inode_data.inode.version == in->get_version()) {
+         /* clean underwater item?
+          * Underwater item is something that is dirty in our cache from
+          * journal replay, but was previously flushed to disk before the
+          * mds failed.
+          *
+          * We only do this is committed_version == 0. that implies either
+          * - this is a fetch after from a clean/empty CDir is created
+          *   (and has no effect, since the dn won't exist); or
+          * - this is a fetch after _recovery_, which is what we're worried
+          *   about.  Items that are marked dirty from the journal should be
+          *   marked clean if they appear on disk.
+          */
+         dout(10) << "_fetched  had underwater dentry " << *dn << ", marking clean" << dendl;
+         dn->mark_clean();
+         dout(10) << "_fetched  had underwater inode " << *dnl->get_inode() << ", marking clean" << dendl;
+         in->mark_clean();
+       }
+      }
     }
 
     if (!dn || undef_inode) {
@@ -1789,6 +1828,9 @@ CDentry *CDir::_load_dentry(
         //in->hack_accessed = false;
         //in->hack_load_stamp = ceph_clock_now();
         //num_new_inodes_loaded++;
+      } else if (g_conf->get_val<bool>("mds_hack_allow_loading_invalid_metadata")) {
+       dout(20) << "hack: adding duplicate dentry for " << *in << dendl;
+       dn = add_primary_dentry(dname, in, first, last);
       } else {
         dout(0) << "_fetched  badness: got (but i already had) " << *in
                 << " mode " << in->inode.mode
@@ -1905,7 +1947,7 @@ void CDir::_omap_fetched(bufferlist& hdrbl, map<string, bufferlist>& omap,
     try {
       dn = _load_dentry(
             p->first, dname, last, p->second, pos, snaps,
-            &force_dirty, &undef_inodes);
+            &force_dirty);
     } catch (const buffer::error &err) {
       cache->mds->clog->warn() << "Corrupt dentry '" << dname << "' in "
                                   "dir frag " << dirfrag() << ": "
@@ -1924,35 +1966,16 @@ void CDir::_omap_fetched(bufferlist& hdrbl, map<string, bufferlist>& omap,
       continue;
     }
 
-    if (dn && (wanted_items.count(dname) > 0 || !complete)) {
-      dout(10) << " touching wanted dn " << *dn << dendl;
-      inode->mdcache->touch_dentry(dn);
-    }
+    if (!dn)
+      continue;
 
-    /** clean underwater item?
-     * Underwater item is something that is dirty in our cache from
-     * journal replay, but was previously flushed to disk before the
-     * mds failed.
-     *
-     * We only do this is committed_version == 0. that implies either
-     * - this is a fetch after from a clean/empty CDir is created
-     *   (and has no effect, since the dn won't exist); or
-     * - this is a fetch after _recovery_, which is what we're worried 
-     *   about.  Items that are marked dirty from the journal should be
-     *   marked clean if they appear on disk.
-     */
-    if (committed_version == 0 &&     
-       dn &&
-       dn->get_version() <= got_fnode.version &&
-       dn->is_dirty()) {
-      dout(10) << "_fetched  had underwater dentry " << *dn << ", marking clean" << dendl;
-      dn->mark_clean();
+    CDentry::linkage_t *dnl = dn->get_linkage();
+    if (dnl->is_primary() && dnl->get_inode()->state_test(CInode::STATE_REJOINUNDEF))
+      undef_inodes.push_back(dnl->get_inode());
 
-      if (dn->get_linkage()->is_primary()) {
-       assert(dn->get_linkage()->get_inode()->get_version() <= got_fnode.version);
-       dout(10) << "_fetched  had underwater inode " << *dn->get_linkage()->get_inode() << ", marking clean" << dendl;
-       dn->get_linkage()->get_inode()->mark_clean();
-      }
+    if (!complete || wanted_items.count(mempool::mds_co::string(boost::string_view(dname))) > 0) {
+      dout(10) << " touching wanted dn " << *dn << dendl;
+      inode->mdcache->touch_dentry(dn);
     }
   }
 
@@ -2004,11 +2027,14 @@ void CDir::_go_bad()
   finish_waiting(WAIT_COMPLETE, -EIO);
 }
 
-void CDir::go_bad_dentry(snapid_t last, const std::string &dname)
+void CDir::go_bad_dentry(snapid_t last, boost::string_view dname)
 {
-  dout(10) << "go_bad_dentry " << dname << dendl;
+  dout(10) << __func__ << " " << dname << dendl;
+  std::string path(get_path());
+  path += "/";
+  path += std::string(dname);
   const bool fatal = cache->mds->damage_table.notify_dentry(
-      inode->ino(), frag, last, dname, get_path() + "/" + dname);
+      inode->ino(), frag, last, dname, path);
   if (fatal) {
     cache->mds->damaged();
     ceph_abort();  // unreachable, damaged() respawns us
@@ -2114,20 +2140,14 @@ void CDir::_omap_commit(int op_prio)
   object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
 
   if (!stale_items.empty()) {
-    for (compact_set<string>::iterator p = stale_items.begin();
-        p != stale_items.end();
-        ++p) {
-      to_remove.insert(*p);
-      write_size += (*p).length();
+    for (const auto &p : stale_items) {
+      to_remove.insert(std::string(boost::string_view(p)));
+      write_size += p.length();
     }
     stale_items.clear();
   }
 
-  for (map_t::iterator p = items.begin();
-      p != items.end(); ) {
-    CDentry *dn = p->second;
-    ++p;
-
+  auto write_one = [&](CDentry *dn) {
     string key;
     dn->key().encode(key);
 
@@ -2136,19 +2156,15 @@ void CDir::_omap_commit(int op_prio)
       dout(10) << " rm " << key << dendl;
       write_size += key.length();
       to_remove.insert(key);
-      continue;
+      return;
     }
 
-    if (!dn->is_dirty() &&
-       (!dn->state_test(CDentry::STATE_FRAGMENTING) || dn->get_linkage()->is_null()))
-      continue;  // skip clean dentries
-
     if (dn->get_linkage()->is_null()) {
-      dout(10) << " rm " << dn->name << " " << *dn << dendl;
+      dout(10) << " rm " << dn->get_name() << " " << *dn << dendl;
       write_size += key.length();
       to_remove.insert(key);
     } else {
-      dout(10) << " set " << dn->name << " " << *dn << dendl;
+      dout(10) << " set " << dn->get_name() << " " << *dn << dendl;
       bufferlist dnbl;
       _encode_dentry(dn, dnbl, snaps);
       write_size += key.length() + dnbl.length();
@@ -2176,6 +2192,22 @@ void CDir::_omap_commit(int op_prio)
       to_set.clear();
       to_remove.clear();
     }
+  };
+
+  if (state_test(CDir::STATE_FRAGMENTING)) {
+    for (auto p = items.begin(); p != items.end(); ) {
+      CDentry *dn = p->second;
+      ++p;
+      if (!dn->is_dirty() && dn->get_linkage()->is_null())
+       continue;
+      write_one(dn);
+    }
+  } else {
+    for (auto p = dirty_dentries.begin(); !p.end(); ) {
+      CDentry *dn = *p;
+      ++p;
+      write_one(dn);
+    }
   }
 
   ObjectOperation op;
@@ -2221,7 +2253,7 @@ void CDir::_encode_dentry(CDentry *dn, bufferlist& bl,
   if (dn->linkage.is_remote()) {
     inodeno_t ino = dn->linkage.get_remote_ino();
     unsigned char d_type = dn->linkage.get_remote_d_type();
-    dout(14) << " pos " << bl.length() << " dn '" << dn->name << "' remote ino " << ino << dendl;
+    dout(14) << " pos " << bl.length() << " dn '" << dn->get_name() << "' remote ino " << ino << dendl;
     
     // marker, name, ino
     bl.append('L');         // remote link
@@ -2232,7 +2264,7 @@ void CDir::_encode_dentry(CDentry *dn, bufferlist& bl,
     CInode *in = dn->linkage.get_inode();
     assert(in);
     
-    dout(14) << " pos " << bl.length() << " dn '" << dn->name << "' inode " << *in << dendl;
+    dout(14) << " pos " << bl.length() << " dn '" << dn->get_name() << "' inode " << *in << dendl;
     
     // marker, name, inode, [symlink string]
     bl.append('I');         // inode
@@ -2344,10 +2376,9 @@ void CDir::_committed(int r, version_t v)
     mark_clean();
 
   // dentries clean?
-  for (map_t::iterator it = items.begin();
-       it != items.end(); ) {
-    CDentry *dn = it->second;
-    ++it;
+  for (auto p = dirty_dentries.begin(); !p.end(); ) {
+    CDentry *dn = *p;
+    ++p;
     
     // inode?
     if (dn->linkage.is_primary()) {
@@ -2368,37 +2399,39 @@ void CDir::_committed(int r, version_t v)
 
     // dentry
     if (committed_version >= dn->get_version()) {
-      if (dn->is_dirty()) {
-       dout(15) << " dir " << committed_version << " >= dn " << dn->get_version() << " now clean " << *dn << dendl;
-       dn->mark_clean();
+      dout(15) << " dir " << committed_version << " >= dn " << dn->get_version() << " now clean " << *dn << dendl;
+      dn->mark_clean();
 
-       // drop clean null stray dentries immediately
-       if (stray && 
-           dn->get_num_ref() == 0 &&
-           !dn->is_projected() &&
-           dn->get_linkage()->is_null())
-         remove_dentry(dn);
-      } 
+      // drop clean null stray dentries immediately
+      if (stray &&
+         dn->get_num_ref() == 0 &&
+         !dn->is_projected() &&
+         dn->get_linkage()->is_null())
+       remove_dentry(dn);
     } else {
       dout(15) << " dir " << committed_version << " < dn " << dn->get_version() << " still dirty " << *dn << dendl;
+      assert(dn->is_dirty());
     }
   }
 
   // finishers?
   bool were_waiters = !waiting_for_commit.empty();
   
-  compact_map<version_t, list<MDSInternalContextBase*> >::iterator p = waiting_for_commit.begin();
-  while (p != waiting_for_commit.end()) {
-    compact_map<version_t, list<MDSInternalContextBase*> >::iterator n = p;
-    ++n;
-    if (p->first > committed_version) {
-      dout(10) << " there are waiters for " << p->first << ", committing again" << dendl;
-      _commit(p->first, -1);
+  auto it = waiting_for_commit.begin();
+  while (it != waiting_for_commit.end()) {
+    auto _it = it;
+    ++_it;
+    if (it->first > committed_version) {
+      dout(10) << " there are waiters for " << it->first << ", committing again" << dendl;
+      _commit(it->first, -1);
       break;
     }
-    cache->mds->queue_waiters(p->second);
-    waiting_for_commit.erase(p);
-    p = n;
+    std::list<MDSInternalContextBase*> t;
+    for (const auto &waiter : it->second)
+      t.push_back(waiter);
+    cache->mds->queue_waiters(t);
+    waiting_for_commit.erase(it);
+    it = _it;
   } 
 
   // try drop dentries in this dirfrag if it's about to be purged
@@ -2432,7 +2465,7 @@ void CDir::encode_export(bufferlist& bl)
   ::encode(pop_auth_subtree, bl);
 
   ::encode(dir_rep_by, bl);  
-  ::encode(replica_map, bl);
+  ::encode(get_replicas(), bl);
 
   get(PIN_TEMPEXPORTING);
 }
@@ -2473,8 +2506,8 @@ void CDir::decode_import(bufferlist::iterator& blp, utime_t now, LogSegment *ls)
   pop_auth_subtree_nested.add(now, cache->decayrate, pop_auth_subtree);
 
   ::decode(dir_rep_by, blp);
-  ::decode(replica_map, blp);
-  if (!replica_map.empty()) get(PIN_REPLICATED);
+  ::decode(get_replicas(), blp);
+  if (is_replicated()) get(PIN_REPLICATED);
 
   replica_nonce = 0;  // no longer defined
 
@@ -2710,7 +2743,7 @@ void CDir::verify_fragstat()
   frag_info_t c;
   memset(&c, 0, sizeof(c));
 
-  for (map_t::iterator it = items.begin();
+  for (auto it = items.begin();
        it != items.end();
        ++it) {
     CDentry *dn = it->second;
@@ -3034,6 +3067,28 @@ void CDir::dump(Formatter *f) const
   MDSCacheObject::dump(f);
 }
 
+void CDir::dump_load(Formatter *f, utime_t now, const DecayRate& rate)
+{
+  f->dump_stream("path") << get_path();
+  f->dump_stream("dirfrag") << dirfrag();
+
+  f->open_object_section("pop_me");
+  pop_me.dump(f, now, rate);
+  f->close_section();
+
+  f->open_object_section("pop_nested");
+  pop_nested.dump(f, now, rate);
+  f->close_section();
+
+  f->open_object_section("pop_auth_subtree");
+  pop_auth_subtree.dump(f, now, rate);
+  f->close_section();
+
+  f->open_object_section("pop_auth_subtree_nested");
+  pop_auth_subtree_nested.dump(f, now, rate);
+  f->close_section();
+}
+
 /****** Scrub Stuff *******/
 
 void CDir::scrub_info_create() const
@@ -3078,7 +3133,7 @@ void CDir::scrub_initialize(const ScrubHeaderRefConst& header)
   scrub_infop->others_scrubbing.clear();
   scrub_infop->others_scrubbed.clear();
 
-  for (map_t::iterator i = items.begin();
+  for (auto i = items.begin();
       i != items.end();
       ++i) {
     // TODO: handle snapshot scrubbing
@@ -3116,7 +3171,7 @@ void CDir::scrub_finished()
   scrub_infop->last_scrub_dirty = true;
 }
 
-int CDir::_next_dentry_on_set(set<dentry_key_t>& dns, bool missing_okay,
+int CDir::_next_dentry_on_set(dentry_key_set &dns, bool missing_okay,
                               MDSInternalContext *cb, CDentry **dnout)
 {
   dentry_key_t dnkey;
@@ -3155,6 +3210,12 @@ int CDir::_next_dentry_on_set(set<dentry_key_t>& dns, bool missing_okay,
       continue;
     }
 
+    if (!dn->get_linkage()->is_primary()) {
+      dout(15) << " skip dentry " << dnkey.name
+              << ", no longer primary" << dendl;
+      continue;
+    }
+
     *dnout = dn;
     return 0;
   }
@@ -3295,3 +3356,4 @@ bool CDir::should_split_fast() const
   return effective_size > fast_limit;
 }
 
+MEMPOOL_DEFINE_OBJECT_FACTORY(CDir, co_dir, mds_co);