]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mds/Server.cc
update sources to 12.2.7
[ceph.git] / ceph / src / mds / Server.cc
index 500fd3ec629c2a91d996e9d3bd142c58f1deae38..6e9c61191d3f45d6bcc56243dcf25c1d4f38d3a7 100644 (file)
@@ -61,6 +61,7 @@
 
 #include <list>
 #include <iostream>
+#include <boost/utility/string_view.hpp>
 using namespace std;
 
 #include "common/config.h"
@@ -70,7 +71,6 @@ using namespace std;
 #undef dout_prefix
 #define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".server "
 
-
 class ServerContext : public MDSInternalContextBase {
   protected:
   Server *server;
@@ -111,11 +111,11 @@ void Server::create_logger()
 {
   PerfCountersBuilder plb(g_ceph_context, "mds_server", l_mdss_first, l_mdss_last);
   plb.add_u64_counter(l_mdss_handle_client_request,"handle_client_request",
-      "Client requests", "hcr");
+      "Client requests", "hcr", PerfCountersBuilder::PRIO_INTERESTING);
   plb.add_u64_counter(l_mdss_handle_slave_request, "handle_slave_request",
-      "Slave requests", "hsr");
+      "Slave requests", "hsr", PerfCountersBuilder::PRIO_INTERESTING);
   plb.add_u64_counter(l_mdss_handle_client_session, "handle_client_session",
-      "Client session messages", "hcs");
+      "Client session messages", "hcs", PerfCountersBuilder::PRIO_INTERESTING);
   plb.add_u64_counter(l_mdss_dispatch_client_request, "dispatch_client_request", "Client requests dispatched");
   plb.add_u64_counter(l_mdss_dispatch_slave_request, "dispatch_server_request", "Server requests dispatched");
   plb.add_u64_counter(l_mdss_req_lookuphash, "req_lookuphash",
@@ -185,6 +185,7 @@ Server::Server(MDSRank *m) :
   is_full(false),
   reconnect_done(NULL),
   failed_reconnects(0),
+  reconnect_evicting(false),
   terminating_sessions(false)
 {
 }
@@ -200,12 +201,11 @@ void Server::dispatch(Message *m)
   }
 
   // active?
-  if (!mds->is_active() &&
-      !(mds->is_stopping() && m->get_source().is_mds())) {
-    if (m->get_type() == CEPH_MSG_CLIENT_REQUEST &&
-       (mds->is_reconnect() || mds->get_want_state() == CEPH_MDS_STATE_RECONNECT)) {
-      MClientRequest *req = static_cast<MClientRequest*>(m);
-      Session *session = get_session(req);
+  // handle_slave_request()/handle_client_session() will wait if necessary
+  if (m->get_type() == CEPH_MSG_CLIENT_REQUEST && !mds->is_active()) {
+    MClientRequest *req = static_cast<MClientRequest*>(m);
+    if (mds->is_reconnect() || mds->get_want_state() == CEPH_MDS_STATE_RECONNECT) {
+      Session *session = mds->get_session(req);
       if (!session || session->is_closed()) {
        dout(5) << "session is closed, dropping " << req->get_reqid() << dendl;
        req->put();
@@ -235,20 +235,11 @@ void Server::dispatch(Message *m)
     }
 
     bool wait_for_active = true;
-    if (m->get_type() == MSG_MDS_SLAVE_REQUEST) {
-      // handle_slave_request() will wait if necessary
+    if (mds->is_stopping()) {
       wait_for_active = false;
     } else if (mds->is_clientreplay()) {
-      // session open requests need to be handled during replay,
-      // close requests need to be delayed
-      if ((m->get_type() == CEPH_MSG_CLIENT_SESSION &&
-         (static_cast<MClientSession*>(m))->get_op() != CEPH_SESSION_REQUEST_CLOSE)) {
+      if (req->is_queued_for_replay()) {
        wait_for_active = false;
-      } else if (m->get_type() == CEPH_MSG_CLIENT_REQUEST) {
-       MClientRequest *req = static_cast<MClientRequest*>(m);
-       if (req->is_queued_for_replay()) {
-         wait_for_active = false;
-       }
       }
     }
     if (wait_for_active) {
@@ -301,24 +292,12 @@ public:
   }
 };
 
-Session *Server::get_session(Message *m)
-{
-  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
-  if (session) {
-    dout(20) << "get_session have " << session << " " << session->info.inst
-            << " state " << session->get_state_name() << dendl;
-    session->put();  // not carry ref
-  } else {
-    dout(20) << "get_session dne for " << m->get_source_inst() << dendl;
-  }
-  return session;
-}
-
 /* This function DOES put the passed message before returning*/
 void Server::handle_client_session(MClientSession *m)
 {
   version_t pv;
-  Session *session = get_session(m);
+  bool blacklisted = false;
+  Session *session = mds->get_session(m);
 
   dout(3) << "handle_client_session " << *m << " from " << m->get_source() << dendl;
   assert(m->get_source().is_client()); // should _not_ come from an mds!
@@ -329,6 +308,21 @@ void Server::handle_client_session(MClientSession *m)
     return;
   }
 
+  if (m->get_op() == CEPH_SESSION_REQUEST_RENEWCAPS) {
+    // always handle renewcaps (state >= MDSMap::STATE_RECONNECT)
+  } else if (m->get_op() == CEPH_SESSION_REQUEST_CLOSE) {
+    // close requests need to be handled when mds is active
+    if (mds->get_state() < MDSMap::STATE_ACTIVE) {
+      mds->wait_for_active(new C_MDS_RetryMessage(mds, m));
+      return;
+    }
+  } else {
+    if (mds->get_state() < MDSMap::STATE_CLIENTREPLAY) {
+      mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
+      return;
+    }
+  }
+
   if (logger)
     logger->inc(l_mdss_handle_client_session);
 
@@ -338,14 +332,36 @@ void Server::handle_client_session(MClientSession *m)
     if (session->is_opening() ||
        session->is_open() ||
        session->is_stale() ||
-       session->is_killing()) {
+       session->is_killing() ||
+       terminating_sessions) {
       dout(10) << "currently open|opening|stale|killing, dropping this req" << dendl;
+      // set client metadata for session opened by prepare_force_open_sessions
+      if (!m->client_meta.empty())
+       session->set_client_metadata(m->client_meta);
       m->put();
       return;
     }
     assert(session->is_closed() ||
           session->is_closing());
 
+    if (mds->is_stopping()) {
+      dout(10) << "mds is stopping, dropping open req" << dendl;
+      m->put();
+      return;
+    }
+
+    blacklisted = mds->objecter->with_osdmap(
+        [session](const OSDMap &osd_map) -> bool {
+          return osd_map.is_blacklisted(session->info.inst.addr);
+        });
+
+    if (blacklisted) {
+      dout(10) << "rejecting blacklisted client " << session->info.inst.addr << dendl;
+      mds->send_message_client(new MClientSession(CEPH_SESSION_REJECT), session);
+      m->put();
+      return;
+    }
+
     session->set_client_metadata(m->client_meta);
     dout(20) << __func__ << " CEPH_SESSION_REQUEST_OPEN "
       << session->info.client_metadata.size() << " metadata entries:" << dendl;
@@ -442,6 +458,11 @@ void Server::handle_client_session(MClientSession *m)
     finish_flush_session(session, m->get_seq());
     break;
 
+  case CEPH_SESSION_REQUEST_FLUSH_MDLOG:
+    if (mds->is_active())
+      mdlog->flush();
+    break;
+
   default:
     ceph_abort();
   }
@@ -544,7 +565,8 @@ void Server::_session_logged(Session *session, uint64_t state_seq, bool open, ve
     } else if (session->is_killing()) {
       // destroy session, close connection
       if (session->connection != NULL) {
-        session->connection->mark_down();
+       session->connection->mark_down();
+       session->connection->set_priv(NULL);
       }
       mds->sessionmap.remove_session(session);
     } else {
@@ -565,32 +587,48 @@ void Server::_session_logged(Session *session, uint64_t state_seq, bool open, ve
  *  - sessions learned from other MDSs during a cross-MDS rename
  */
 version_t Server::prepare_force_open_sessions(map<client_t,entity_inst_t>& cm,
-                                             map<client_t,uint64_t>& sseqmap)
+                                             map<client_t, pair<Session*,uint64_t> >& smap)
 {
   version_t pv = mds->sessionmap.get_projected();
 
   dout(10) << "prepare_force_open_sessions " << pv 
           << " on " << cm.size() << " clients"
           << dendl;
-  for (map<client_t,entity_inst_t>::iterator p = cm.begin(); p != cm.end(); ++p) {
 
+  mds->objecter->with_osdmap(
+      [this, &cm](const OSDMap &osd_map) {
+       for (auto p = cm.begin(); p != cm.end(); ) {
+         if (osd_map.is_blacklisted(p->second.addr)) {
+           dout(10) << " ignoring blacklisted client." << p->first
+                    << " (" <<  p->second.addr << ")" << dendl;
+           cm.erase(p++);
+         } else {
+           ++p;
+         }
+       }
+      });
+
+  for (map<client_t,entity_inst_t>::iterator p = cm.begin(); p != cm.end(); ++p) {
     Session *session = mds->sessionmap.get_or_add_session(p->second);
     pv = mds->sessionmap.mark_projected(session);
+    uint64_t sseq;
     if (session->is_closed() || 
        session->is_closing() ||
-       session->is_killing())
-      sseqmap[p->first] = mds->sessionmap.set_state(session, Session::STATE_OPENING);
-    else
+       session->is_killing()) {
+      sseq = mds->sessionmap.set_state(session, Session::STATE_OPENING);
+    } else {
       assert(session->is_open() ||
             session->is_opening() ||
             session->is_stale());
+      sseq = 0;
+    }
+    smap[p->first] = make_pair(session, sseq);
     session->inc_importing();
   }
   return pv;
 }
 
-void Server::finish_force_open_sessions(map<client_t,entity_inst_t>& cm,
-                                       map<client_t,uint64_t>& sseqmap,
+void Server::finish_force_open_sessions(const map<client_t,pair<Session*,uint64_t> >& smap,
                                        bool dec_import)
 {
   /*
@@ -598,19 +636,13 @@ void Server::finish_force_open_sessions(map<client_t,entity_inst_t>& cm,
    * client trying to close a session and an MDS doing an import
    * trying to force open a session...  
    */
-  dout(10) << "finish_force_open_sessions on " << cm.size() << " clients,"
+  dout(10) << "finish_force_open_sessions on " << smap.size() << " clients,"
           << " initial v " << mds->sessionmap.get_version() << dendl;
-  
 
-  int sessions_inserted = 0;
-  for (map<client_t,entity_inst_t>::iterator p = cm.begin(); p != cm.end(); ++p) {
-    sessions_inserted++;
-
-    Session *session = mds->sessionmap.get_session(p->second.name);
-    assert(session);
-    
-    if (sseqmap.count(p->first)) {
-      uint64_t sseq = sseqmap[p->first];
+  for (auto &it : smap) {
+    Session *session = it.second.first;
+    uint64_t sseq = it.second.second;
+    if (sseq > 0) {
       if (session->get_state_seq() != sseq) {
        dout(10) << "force_open_sessions skipping changed " << session->info.inst << dendl;
       } else {
@@ -713,10 +745,16 @@ void Server::find_idle_sessions()
     return;
   }
 
-  while (1) {
-    Session *session = mds->sessionmap.get_oldest_session(Session::STATE_STALE);
-    if (!session)
-      break;
+  // Collect a list of sessions exceeding the autoclose threshold
+  std::vector<Session *> to_evict;
+  const auto sessions_p = mds->sessionmap.by_state.find(Session::STATE_STALE);
+  if (sessions_p == mds->sessionmap.by_state.end() || sessions_p->second->empty()) {
+    return;
+  }
+  const auto &stale_sessions = sessions_p->second;
+  assert(stale_sessions != nullptr);
+
+  for (const auto &session: *stale_sessions) {
     if (session->is_importing()) {
       dout(10) << "stopping at importing session " << session->info.inst << dendl;
       break;
@@ -727,13 +765,25 @@ void Server::find_idle_sessions()
               << session->last_cap_renew << ")" << dendl;
       break;
     }
-    
+
+    to_evict.push_back(session);
+  }
+
+  for (const auto &session: to_evict) {
     utime_t age = now;
     age -= session->last_cap_renew;
-    mds->clog->info() << "closing stale session " << session->info.inst
-       << " after " << age;
-    dout(10) << "autoclosing stale session " << session->info.inst << " last " << session->last_cap_renew << dendl;
-    kill_session(session, NULL);
+    mds->clog->warn() << "evicting unresponsive client " << *session
+                      << ", after " << age << " seconds";
+    dout(10) << "autoclosing stale session " << session->info.inst << " last "
+             << session->last_cap_renew << dendl;
+
+    if (g_conf->mds_session_blacklist_on_timeout) {
+      std::stringstream ss;
+      mds->evict_client(session->info.inst.name.num(), false, true,
+                        ss, nullptr);
+    } else {
+      kill_session(session, NULL);
+    }
   }
 }
 
@@ -743,6 +793,8 @@ void Server::find_idle_sessions()
  */
 void Server::kill_session(Session *session, Context *on_safe)
 {
+  assert(mds->mds_lock.is_locked_by_me());
+
   if ((session->is_opening() ||
        session->is_open() ||
        session->is_stale()) &&
@@ -761,6 +813,32 @@ void Server::kill_session(Session *session, Context *on_safe)
   }
 }
 
+size_t Server::apply_blacklist(const std::set<entity_addr_t> &blacklist)
+{
+  std::list<Session*> victims;
+  const auto sessions = mds->sessionmap.get_sessions();
+  for (const auto p : sessions)  {
+    if (!p.first.is_client()) {
+      // Do not apply OSDMap blacklist to MDS daemons, we find out
+      // about their death via MDSMap.
+      continue;
+    }
+
+    Session *s = p.second;
+    if (blacklist.count(s->info.inst.addr)) {
+      victims.push_back(s);
+    }
+  }
+
+  for (const auto s : victims) {
+    kill_session(s, nullptr);
+  }
+
+  dout(10) << "apply_blacklist: killed " << victims.size() << dendl;
+
+  return victims.size();
+}
+
 void Server::journal_close_session(Session *session, int state, Context *on_safe)
 {
   uint64_t sseq = mds->sessionmap.set_state(session, state);
@@ -798,7 +876,13 @@ void Server::journal_close_session(Session *session, int state, Context *on_safe
 void Server::reconnect_clients(MDSInternalContext *reconnect_done_)
 {
   reconnect_done = reconnect_done_;
-  mds->sessionmap.get_client_set(client_reconnect_gather);
+
+  set<Session*> sessions;
+  mds->sessionmap.get_client_session_set(sessions);
+  for (auto session : sessions) {
+    if (session->is_open())
+       client_reconnect_gather.insert(session->get_client());
+  }
 
   if (client_reconnect_gather.empty()) {
     dout(7) << "reconnect_clients -- no sessions, doing nothing." << dendl;
@@ -818,7 +902,7 @@ void Server::handle_client_reconnect(MClientReconnect *m)
 {
   dout(7) << "handle_client_reconnect " << m->get_source() << dendl;
   client_t from = m->get_source().num();
-  Session *session = get_session(m);
+  Session *session = mds->get_session(m);
   assert(session);
 
   if (!mds->is_reconnect() && mds->get_want_state() == CEPH_MDS_STATE_RECONNECT) {
@@ -832,7 +916,7 @@ void Server::handle_client_reconnect(MClientReconnect *m)
   dout(10) << " reconnect_start " << reconnect_start << " delay " << delay << dendl;
 
   bool deny = false;
-  if (!mds->is_reconnect()) {
+  if (!mds->is_reconnect() || mds->get_want_state() != CEPH_MDS_STATE_RECONNECT || reconnect_evicting) {
     // XXX maybe in the future we can do better than this?
     dout(1) << " no longer in reconnect state, ignoring reconnect, sending close" << dendl;
     mds->clog->info() << "denied reconnect attempt (mds is "
@@ -840,7 +924,7 @@ void Server::handle_client_reconnect(MClientReconnect *m)
        << ") from " << m->get_source_inst()
        << " after " << delay << " (allowed interval " << g_conf->mds_reconnect_timeout << ")";
     deny = true;
-  } else if (session->is_closed()) {
+  } else if (!session->is_open()) {
     dout(1) << " session is closed, ignoring reconnect, sending close" << dendl;
     mds->clog->info() << "denied reconnect attempt (mds is "
        << ceph_mds_state_name(mds->get_state())
@@ -921,6 +1005,7 @@ void Server::handle_client_reconnect(MClientReconnect *m)
       mdcache->rejoin_recovered_caps(p->first, from, p->second, MDS_RANK_NONE);
     }
   }
+  mdcache->rejoin_recovered_client(session->get_client(), session->info.inst);
 
   // remove from gather set
   client_reconnect_gather.erase(from);
@@ -942,22 +1027,53 @@ void Server::reconnect_gather_finish()
 
 void Server::reconnect_tick()
 {
+  if (reconnect_evicting) {
+    dout(4) << "reconnect_tick: waiting for evictions" << dendl;
+    return;
+  }
+
   utime_t reconnect_end = reconnect_start;
   reconnect_end += g_conf->mds_reconnect_timeout;
   if (ceph_clock_now() >= reconnect_end &&
       !client_reconnect_gather.empty()) {
     dout(10) << "reconnect timed out" << dendl;
+
+    // If we're doing blacklist evictions, use this to wait for them before
+    // proceeding to reconnect_gather_finish
+    MDSGatherBuilder gather(g_ceph_context);
+
     for (set<client_t>::iterator p = client_reconnect_gather.begin();
         p != client_reconnect_gather.end();
         ++p) {
       Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(p->v));
       assert(session);
       dout(1) << "reconnect gave up on " << session->info.inst << dendl;
-      kill_session(session, NULL);
+
+      mds->clog->warn() << "evicting unresponsive client " << *session
+                        << ", after waiting " << g_conf->mds_reconnect_timeout
+                        << " seconds during MDS startup";
+
+      if (g_conf->mds_session_blacklist_on_timeout) {
+        std::stringstream ss;
+        mds->evict_client(session->info.inst.name.num(), false, true, ss,
+                          gather.new_sub());
+      } else {
+        kill_session(session, NULL);
+      }
+
       failed_reconnects++;
     }
     client_reconnect_gather.clear();
-    reconnect_gather_finish();
+
+    if (gather.has_subs()) {
+      dout(1) << "reconnect will complete once clients are evicted" << dendl;
+      gather.set_finisher(new MDSInternalContextWrapper(mds, new FunctionContext(
+            [this](int r){reconnect_gather_finish();})));
+      gather.activate();
+      reconnect_evicting = true;
+    } else {
+      reconnect_gather_finish();
+    }
   }
 }
 
@@ -989,10 +1105,22 @@ void Server::recover_filelocks(CInode *in, bufferlist locks, int64_t client)
  * to trim some caps, and consequently unpin some inodes in the MDCache so
  * that it can trim too.
  */
-void Server::recall_client_state(float ratio)
+void Server::recall_client_state(void)
 {
-  int max_caps_per_client = (int)(g_conf->mds_cache_size * .8);
-  int min_caps_per_client = 100;
+  /* try to recall at least 80% of all caps */
+  uint64_t max_caps_per_client = Capability::count() * g_conf->get_val<double>("mds_max_ratio_caps_per_client");
+  uint64_t min_caps_per_client = g_conf->get_val<uint64_t>("mds_min_caps_per_client");
+  if (max_caps_per_client < min_caps_per_client) {
+    dout(0) << "max_caps_per_client " << max_caps_per_client
+            << " < min_caps_per_client " << min_caps_per_client << dendl;
+    max_caps_per_client = min_caps_per_client + 1;
+  }
+
+  /* unless this ratio is smaller: */
+  /* ratio: determine the amount of caps to recall from each client. Use
+   * percentage full over the cache reservation. Cap the ratio at 80% of client
+   * caps. */
+  double ratio = 1.0-fmin(0.80, mdcache->cache_toofull_ratio());
 
   dout(10) << "recall_client_state " << ratio
           << ", caps per client " << min_caps_per_client << "-" << max_caps_per_client
@@ -1000,10 +1128,7 @@ void Server::recall_client_state(float ratio)
 
   set<Session*> sessions;
   mds->sessionmap.get_client_session_set(sessions);
-  for (set<Session*>::const_iterator p = sessions.begin();
-       p != sessions.end();
-       ++p) {
-    Session *session = *p;
+  for (auto &session : sessions) {
     if (!session->is_open() ||
        !session->info.inst.name.is_client())
       continue;
@@ -1013,14 +1138,12 @@ void Server::recall_client_state(float ratio)
             << ", leases " << session->leases.size()
             << dendl;
 
-    if (session->caps.size() > min_caps_per_client) {  
-      int newlim = MIN((int)(session->caps.size() * ratio), max_caps_per_client);
-      if (session->caps.size() > newlim) {
-          MClientSession *m = new MClientSession(CEPH_SESSION_RECALL_STATE);
-          m->head.max_caps = newlim;
-          mds->send_message_client(m, session);
-          session->notify_recall_sent(newlim);
-      }
+    uint64_t newlim = MAX(MIN((session->caps.size() * ratio), max_caps_per_client), min_caps_per_client);
+    if (session->caps.size() > newlim) {
+      MClientSession *m = new MClientSession(CEPH_SESSION_RECALL_STATE);
+      m->head.max_caps = newlim;
+      mds->send_message_client(m, session);
+      session->notify_recall_sent(newlim);
     }
   }
 }
@@ -1071,7 +1194,7 @@ void Server::journal_and_reply(MDRequestRef& mdr, CInode *in, CDentry *dn, LogEv
       mdlog->flush();
     }
   } else if (mdr->did_early_reply)
-    mds->locker->drop_rdlocks(mdr.get());
+    mds->locker->drop_rdlocks_for_early_reply(mdr.get());
   else
     mdlog->flush();
 }
@@ -1195,6 +1318,11 @@ void Server::early_reply(MDRequestRef& mdr, CInode *tracei, CDentry *tracedn)
   if (!g_conf->mds_early_reply)
     return;
 
+  if (mdr->no_early_reply) {
+    dout(10) << "early_reply - flag no_early_reply is set, not allowed." << dendl;
+    return;
+  }
+
   if (mdr->has_more() && mdr->more()->has_journaled_slaves) {
     dout(10) << "early_reply - there are journaled slaves, not allowed." << dendl;
     return; 
@@ -1493,7 +1621,7 @@ void Server::handle_client_request(MClientRequest *req)
   // active session?
   Session *session = 0;
   if (req->get_source().is_client()) {
-    session = get_session(req);
+    session = mds->get_session(req);
     if (!session) {
       dout(5) << "no session for " << req->get_source() << ", dropping" << dendl;
     } else if (session->is_closed() ||
@@ -1615,7 +1743,8 @@ void Server::handle_osd_map()
    * using osdmap_full_flag(), because we want to know "is the flag set"
    * rather than "does the flag apply to us?" */
   mds->objecter->with_osdmap([this](const OSDMap& o) {
-      is_full = o.test_flag(CEPH_OSDMAP_FULL);
+      auto pi = o.get_pg_pool(mds->mdsmap->get_metadata_pool());
+      is_full = pi && pi->has_flag(pg_pool_t::FLAG_FULL);
       dout(7) << __func__ << ": full = " << is_full << " epoch = "
              << o.get_epoch() << dendl;
     });
@@ -1629,6 +1758,10 @@ void Server::dispatch_client_request(MDRequestRef& mdr)
   if (mdr->killed) {
     dout(10) << "request " << *mdr << " was killed" << dendl;
     return;
+  } else if (mdr->aborted) {
+    mdr->aborted = false;
+    mdcache->request_kill(mdr);
+    return;
   }
 
   MClientRequest *req = mdr->client_request;
@@ -1846,6 +1979,7 @@ void Server::handle_slave_request(MMDSSlaveRequest *m)
       } else {
        mdcache->request_finish(mdr);
       }
+      m->put();
       return;
     }
   }
@@ -2162,24 +2296,7 @@ void Server::handle_slave_auth_pin(MDRequestRef& mdr)
        (*p)->add_waiter(CDir::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));
        mdr->drop_local_auth_pins();
 
-       CDir *dir = NULL;
-       if (CInode *in = dynamic_cast<CInode*>(*p)) {
-         if (!in->is_root())
-           dir = in->get_parent_dir();
-       } else if (CDentry *dn = dynamic_cast<CDentry*>(*p)) {
-         dir = dn->get_dir();
-       } else {
-         ceph_abort();
-       }
-       if (dir) {
-         if (dir->is_freezing_dir())
-           mdcache->fragment_freeze_inc_num_waiters(dir);
-         if (dir->is_freezing_tree()) {
-           while (!dir->is_freezing_tree_root())
-             dir = dir->get_parent_dir();
-           mdcache->migrator->export_freeze_inc_num_waiters(dir);
-         }
-       }
+       mds->locker->notify_freeze_waiter(*p);
        return;
       }
     }
@@ -2362,7 +2479,7 @@ bool Server::check_fragment_space(MDRequestRef &mdr, CDir *in)
  * verify that the dir exists and would own the dname.
  * do not check if the dentry exists.
  */
-CDir *Server::validate_dentry_dir(MDRequestRef& mdr, CInode *diri, const string& dname)
+CDir *Server::validate_dentry_dir(MDRequestRef& mdr, CInode *diri, boost::string_view dname)
 {
   // make sure parent is a dir?
   if (!diri->is_dir()) {
@@ -2392,7 +2509,7 @@ CDir *Server::validate_dentry_dir(MDRequestRef& mdr, CInode *diri, const string&
  * prepare a null (or existing) dentry in given dir. 
  * wait for any dn lock.
  */
-CDentry* Server::prepare_null_dentry(MDRequestRef& mdr, CDir *dir, const string& dname, bool okexist)
+CDentry* Server::prepare_null_dentry(MDRequestRef& mdr, CDir *dir, boost::string_view dname, bool okexist)
 {
   dout(10) << "prepare_null_dentry " << dname << " in " << *dir << dendl;
   assert(dir->is_auth());
@@ -2556,11 +2673,13 @@ CInode* Server::prepare_new_inode(MDRequestRef& mdr, CDir *dir, inodeno_t useino
     bufferlist::iterator p = req->get_data().begin();
 
     // xattrs on new inode?
-    map<string,bufferptr> xattrs;
+    CInode::mempool_xattr_map xattrs;
     ::decode(xattrs, p);
-    for (map<string,bufferptr>::iterator p = xattrs.begin(); p != xattrs.end(); ++p) {
-      dout(10) << "prepare_new_inode setting xattr " << p->first << dendl;
-      in->xattrs[p->first] = p->second;
+    for (const auto &p : xattrs) {
+      dout(10) << "prepare_new_inode setting xattr " << p.first << dendl;
+      auto em = in->xattrs.emplace(std::piecewise_construct, std::forward_as_tuple(p.first), std::forward_as_tuple(p.second));
+      if (!em.second)
+        em.first->second = p.second;
     }
   }
 
@@ -2731,6 +2850,8 @@ CInode* Server::rdlock_path_pin_ref(MDRequestRef& mdr, int n,
        */
       mds->locker->drop_locks(mdr.get(), NULL);
       mdr->drop_local_auth_pins();
+      if (!mdr->remote_auth_pins.empty())
+       mds->locker->notify_freeze_waiter(ref);
       return 0;
     }
 
@@ -2772,14 +2893,6 @@ CDentry* Server::rdlock_path_xlock_dentry(MDRequestRef& mdr, int n,
 
   CDir *dir = traverse_to_auth_dir(mdr, mdr->dn[n], refpath);
   if (!dir) return 0;
-  dout(10) << "rdlock_path_xlock_dentry dir " << *dir << dendl;
-
-  // make sure we can auth_pin (or have already authpinned) dir
-  if (dir->is_frozen()) {
-    dout(7) << "waiting for !frozen/authpinnable on " << *dir << dendl;
-    dir->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));
-    return 0;
-  }
 
   CInode *diri = dir->get_inode();
   if (!mdr->reqid.name.is_mds()) {
@@ -2794,7 +2907,7 @@ CDentry* Server::rdlock_path_xlock_dentry(MDRequestRef& mdr, int n,
   }
 
   // make a null dentry?
-  const string &dname = refpath.last_dentry();
+  boost::string_view dname = refpath.last_dentry();
   CDentry *dn;
   if (mustexist) {
     dn = dir->lookup(dname);
@@ -2876,7 +2989,7 @@ CDir* Server::try_open_auth_dirfrag(CInode *diri, frag_t fg, MDRequestRef& mdr)
   if (!dir && diri->is_frozen()) {
     dout(10) << "try_open_auth_dirfrag: dir inode is frozen, waiting " << *diri << dendl;
     assert(diri->get_parent_dir());
-    diri->get_parent_dir()->add_waiter(CDir::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));
+    diri->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));
     return 0;
   }
 
@@ -2912,7 +3025,13 @@ void Server::handle_client_getattr(MDRequestRef& mdr, bool is_lookup)
     return;
   }
 
-  CInode *ref = rdlock_path_pin_ref(mdr, 0, rdlocks, false, false, NULL, !is_lookup);
+  bool want_auth = false;
+  int mask = req->head.args.getattr.mask;
+  if (mask & CEPH_STAT_RSTAT)
+    want_auth = true; // set want_auth for CEPH_STAT_RSTAT mask
+
+  CInode *ref = rdlock_path_pin_ref(mdr, 0, rdlocks, want_auth, false, NULL, 
+                                   !is_lookup);
   if (!ref) return;
 
   /*
@@ -2930,11 +3049,27 @@ void Server::handle_client_getattr(MDRequestRef& mdr, bool is_lookup)
              mdr->snapid <= cap->client_follows))
     issued = cap->issued();
 
-  int mask = req->head.args.getattr.mask;
-  if ((mask & CEPH_CAP_LINK_SHARED) && (issued & CEPH_CAP_LINK_EXCL) == 0) rdlocks.insert(&ref->linklock);
-  if ((mask & CEPH_CAP_AUTH_SHARED) && (issued & CEPH_CAP_AUTH_EXCL) == 0) rdlocks.insert(&ref->authlock);
-  if ((mask & CEPH_CAP_FILE_SHARED) && (issued & CEPH_CAP_FILE_EXCL) == 0) rdlocks.insert(&ref->filelock);
-  if ((mask & CEPH_CAP_XATTR_SHARED) && (issued & CEPH_CAP_XATTR_EXCL) == 0) rdlocks.insert(&ref->xattrlock);
+  if ((mask & CEPH_CAP_LINK_SHARED) && !(issued & CEPH_CAP_LINK_EXCL))
+    rdlocks.insert(&ref->linklock);
+  if ((mask & CEPH_CAP_AUTH_SHARED) && !(issued & CEPH_CAP_AUTH_EXCL))
+    rdlocks.insert(&ref->authlock);
+  if ((mask & CEPH_CAP_XATTR_SHARED) && !(issued & CEPH_CAP_XATTR_EXCL))
+    rdlocks.insert(&ref->xattrlock);
+  if ((mask & CEPH_CAP_FILE_SHARED) && !(issued & CEPH_CAP_FILE_EXCL)) {
+    // Don't wait on unstable filelock if client is allowed to read file size.
+    // This can reduce the response time of getattr in the case that multiple
+    // clients do stat(2) and there are writers.
+    // The downside of this optimization is that mds may not issue Fs caps along
+    // with getattr reply. Client may need to send more getattr requests.
+    if (mdr->rdlocks.count(&ref->filelock)) {
+      rdlocks.insert(&ref->filelock);
+    } else if (ref->filelock.is_stable() ||
+              ref->filelock.get_num_wrlocks() > 0 ||
+              !ref->filelock.can_read(mdr->get_client())) {
+      rdlocks.insert(&ref->filelock);
+      mdr->done_locking = false;
+    }
+  }
 
   if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
     return;
@@ -2942,11 +3077,14 @@ void Server::handle_client_getattr(MDRequestRef& mdr, bool is_lookup)
   if (!check_access(mdr, ref, MAY_READ))
     return;
 
+  utime_t now = ceph_clock_now();
+  mdr->set_mds_stamp(now);
+
   // note which caps are requested, so we return at least a snapshot
   // value for them.  (currently this matters for xattrs and inline data)
   mdr->getattr_caps = mask;
 
-  mds->balancer->hit_inode(ceph_clock_now(), ref, META_POP_IRD,
+  mds->balancer->hit_inode(now, ref, META_POP_IRD,
                           req->get_source().num());
 
   // reply
@@ -3024,9 +3162,11 @@ void Server::handle_client_lookup_ino(MDRequestRef& mdr,
     if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
       return;
 
-    // need read access to directory inode
-    if (!check_access(mdr, diri, MAY_READ))
-      return;
+    if (diri != NULL) {
+      // need read access to directory inode
+      if (!check_access(mdr, diri, MAY_READ))
+        return;
+    }
   }
 
   if (want_parent) {
@@ -3094,7 +3234,8 @@ void Server::handle_client_open(MDRequestRef& mdr)
     return;
   }
   
-  bool need_auth = !file_mode_is_readonly(cmode) || (flags & CEPH_O_TRUNC);
+  bool need_auth = !file_mode_is_readonly(cmode) ||
+                  (flags & (CEPH_O_TRUNC | CEPH_O_DIRECTORY));
 
   if ((cmode & CEPH_FILE_MODE_WR) && mdcache->is_readonly()) {
     dout(7) << "read-only FS" << dendl;
@@ -3189,7 +3330,7 @@ void Server::handle_client_open(MDRequestRef& mdr)
       return;
 
     // wait for pending truncate?
-    const inode_t *pi = cur->get_projected_inode();
+    const auto pi = cur->get_projected_inode();
     if (pi->is_truncating()) {
       dout(10) << " waiting for pending truncate from " << pi->truncate_from
               << " to " << pi->truncate_size << " to complete on " << *cur << dendl;
@@ -3219,6 +3360,9 @@ void Server::handle_client_open(MDRequestRef& mdr)
   if (!check_access(mdr, cur, mask))
     return;
 
+  utime_t now = ceph_clock_now();
+  mdr->set_mds_stamp(now);
+
   if (cur->is_file() || cur->is_dir()) {
     if (mdr->snapid == CEPH_NOSNAP) {
       // register new cap
@@ -3254,9 +3398,9 @@ void Server::handle_client_open(MDRequestRef& mdr)
   
   // hit pop
   if (cmode & CEPH_FILE_MODE_WR)
-    mds->balancer->hit_inode(mdr->get_mds_stamp(), cur, META_POP_IWR);
+    mds->balancer->hit_inode(now, cur, META_POP_IWR);
   else
-    mds->balancer->hit_inode(mdr->get_mds_stamp(), cur, META_POP_IRD,
+    mds->balancer->hit_inode(now, cur, META_POP_IRD,
                             mdr->client_request->get_source().num());
 
   CDentry *dn = 0;
@@ -3285,7 +3429,7 @@ public:
     // dirty inode, dn, dir
     newi->inode.version--;   // a bit hacky, see C_MDS_mknod_finish
     newi->mark_dirty(newi->inode.version+1, mdr->ls);
-    newi->_mark_dirty_parent(mdr->ls, true);
+    newi->mark_dirty_parent(mdr->ls, true);
 
     mdr->apply();
 
@@ -3294,7 +3438,8 @@ public:
     MDRequestRef null_ref;
     get_mds()->mdcache->send_dentry_link(dn, null_ref);
 
-    get_mds()->balancer->hit_inode(mdr->get_mds_stamp(), newi, META_POP_IWR);
+    utime_t now = ceph_clock_now();
+    get_mds()->balancer->hit_inode(now, newi, META_POP_IWR);
 
     server->respond_to_request(mdr, 0);
 
@@ -3316,7 +3461,9 @@ void Server::handle_client_openc(MDRequestRef& mdr)
     return;
   }
 
-  if (!(req->head.args.open.flags & CEPH_O_EXCL)) {
+  bool excl = req->head.args.open.flags & CEPH_O_EXCL;
+
+  if (!excl) {
     int r = mdcache->path_traverse(mdr, NULL, NULL, req->get_filepath(),
                                   &mdr->dn[0], NULL, MDS_TRAVERSE_FORWARD);
     if (r > 0) return;
@@ -3336,10 +3483,8 @@ void Server::handle_client_openc(MDRequestRef& mdr)
       }
       return;
     }
-    // r == -ENOENT
   }
 
-  bool excl = (req->head.args.open.flags & CEPH_O_EXCL);
   set<SimpleLock*> rdlocks, wrlocks, xlocks;
   file_layout_t *dir_layout = NULL;
   CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, rdlocks, wrlocks, xlocks,
@@ -3397,6 +3542,7 @@ void Server::handle_client_openc(MDRequestRef& mdr)
     return;
   }
 
+  // created null dn.
   CDir *dir = dn->get_dir();
   CInode *diri = dir->get_inode();
   rdlocks.insert(&diri->authlock);
@@ -3421,8 +3567,6 @@ void Server::handle_client_openc(MDRequestRef& mdr)
     return;
   }
 
-  // created null dn.
-    
   // create inode.
   SnapRealm *realm = diri->find_snaprealm();   // use directory's realm; inode isn't attached yet.
   snapid_t follows = realm->get_newest_seq();
@@ -3597,12 +3741,11 @@ void Server::handle_client_readdir(MDRequestRef& mdr)
   bufferlist dnbl;
   __u32 numfiles = 0;
   bool start = !offset_hash && offset_str.empty();
-  bool end = (dir->begin() == dir->end());
   // skip all dns < dentry_key_t(snapid, offset_str, offset_hash)
   dentry_key_t skip_key(snapid, offset_str.c_str(), offset_hash);
-  for (CDir::map_t::iterator it = start ? dir->begin() : dir->lower_bound(skip_key);
-       !end && numfiles < max;
-       end = (it == dir->end())) {
+  auto it = start ? dir->begin() : dir->lower_bound(skip_key);
+  bool end = (it == dir->end());
+  for (; !end && numfiles < max; end = (it == dir->end())) {
     CDentry *dn = it->second;
     ++it;
 
@@ -3642,9 +3785,10 @@ void Server::handle_client_readdir(MDRequestRef& mdr)
        continue;
       } else {
        // touch everything i _do_ have
-       for (CDir::map_t::iterator p = dir->begin(); p != dir->end(); ++p)
-         if (!p->second->get_linkage()->is_null())
-           mdcache->lru.lru_touch(p->second);
+       for (auto &p : *dir) {
+         if (!p.second->get_linkage()->is_null())
+           mdcache->lru.lru_touch(p.second);
+        }
 
        // already issued caps and leases, reply immediately.
        if (dnbl.length() > 0) {
@@ -3662,7 +3806,7 @@ void Server::handle_client_readdir(MDRequestRef& mdr)
     }
     assert(in);
 
-    if ((int)(dnbl.length() + dn->name.length() + sizeof(__u32) + sizeof(LeaseStat)) > bytes_left) {
+    if ((int)(dnbl.length() + dn->get_name().length() + sizeof(__u32) + sizeof(LeaseStat)) > bytes_left) {
       dout(10) << " ran out of room, stopping at " << dnbl.length() << " < " << bytes_left << dendl;
       break;
     }
@@ -3671,7 +3815,7 @@ void Server::handle_client_readdir(MDRequestRef& mdr)
 
     // dentry
     dout(12) << "including    dn " << *dn << dendl;
-    ::encode(dn->name, dnbl);
+    ::encode(dn->get_name(), dnbl);
     mds->locker->issue_client_lease(dn, client, dnbl, now, mdr->session);
 
     // inode
@@ -3753,7 +3897,8 @@ public:
       get_mds()->mdcache->truncate_inode(in, mdr->ls);
     }
 
-    get_mds()->balancer->hit_inode(mdr->get_mds_stamp(), in, META_POP_IWR);
+    utime_t now = ceph_clock_now();
+    get_mds()->balancer->hit_inode(now, in, META_POP_IWR);
 
     server->respond_to_request(mdr, 0);
 
@@ -3956,9 +4101,9 @@ void Server::handle_client_setattr(MDRequestRef& mdr)
     return;
 
   // trunc from bigger -> smaller?
-  inode_t *pi = cur->get_projected_inode();
+  auto pip = cur->get_projected_inode();
 
-  uint64_t old_size = MAX(pi->size, req->head.args.setattr.old_size);
+  uint64_t old_size = std::max<uint64_t>(pip->size, req->head.args.setattr.old_size);
 
   // ENOSPC on growing file while full, but allow shrinks
   if (is_full && req->head.args.setattr.size > old_size) {
@@ -3970,9 +4115,9 @@ void Server::handle_client_setattr(MDRequestRef& mdr)
   bool truncating_smaller = false;
   if (mask & CEPH_SETATTR_SIZE) {
     truncating_smaller = req->head.args.setattr.size < old_size;
-    if (truncating_smaller && pi->is_truncating()) {
-      dout(10) << " waiting for pending truncate from " << pi->truncate_from
-              << " to " << pi->truncate_size << " to complete on " << *cur << dendl;
+    if (truncating_smaller && pip->is_truncating()) {
+      dout(10) << " waiting for pending truncate from " << pip->truncate_from
+              << " to " << pip->truncate_size << " to complete on " << *cur << dendl;
       mds->locker->drop_locks(mdr.get());
       mdr->drop_local_auth_pins();
       cur->add_waiter(CInode::WAIT_TRUNC, new C_MDS_RetryRequest(mdcache, mdr));
@@ -3987,54 +4132,53 @@ void Server::handle_client_setattr(MDRequestRef& mdr)
   EUpdate *le = new EUpdate(mdlog, "setattr");
   mdlog->start_entry(le);
 
-  pi = cur->project_inode();
+  auto &pi = cur->project_inode();
 
   if (mask & CEPH_SETATTR_UID)
-    pi->uid = req->head.args.setattr.uid;
+    pi.inode.uid = req->head.args.setattr.uid;
   if (mask & CEPH_SETATTR_GID)
-    pi->gid = req->head.args.setattr.gid;
+    pi.inode.gid = req->head.args.setattr.gid;
 
   if (mask & CEPH_SETATTR_MODE)
-    pi->mode = (pi->mode & ~07777) | (req->head.args.setattr.mode & 07777);
+    pi.inode.mode = (pi.inode.mode & ~07777) | (req->head.args.setattr.mode & 07777);
   else if ((mask & (CEPH_SETATTR_UID|CEPH_SETATTR_GID|CEPH_SETATTR_KILL_SGUID)) &&
-           S_ISREG(pi->mode)) {
-    pi->mode &= ~S_ISUID;
-    if ((pi->mode & (S_ISGID|S_IXGRP)) == (S_ISGID|S_IXGRP))
-      pi->mode &= ~S_ISGID;
+           S_ISREG(pi.inode.mode) &&
+            (pi.inode.mode & (S_IXUSR|S_IXGRP|S_IXOTH))) {
+    pi.inode.mode &= ~(S_ISUID|S_ISGID);
   }
 
   if (mask & CEPH_SETATTR_MTIME)
-    pi->mtime = req->head.args.setattr.mtime;
+    pi.inode.mtime = req->head.args.setattr.mtime;
   if (mask & CEPH_SETATTR_ATIME)
-    pi->atime = req->head.args.setattr.atime;
+    pi.inode.atime = req->head.args.setattr.atime;
   if (mask & CEPH_SETATTR_BTIME)
-    pi->btime = req->head.args.setattr.btime;
+    pi.inode.btime = req->head.args.setattr.btime;
   if (mask & (CEPH_SETATTR_ATIME | CEPH_SETATTR_MTIME | CEPH_SETATTR_BTIME))
-    pi->time_warp_seq++;   // maybe not a timewarp, but still a serialization point.
+    pi.inode.time_warp_seq++;   // maybe not a timewarp, but still a serialization point.
   if (mask & CEPH_SETATTR_SIZE) {
     if (truncating_smaller) {
-      pi->truncate(old_size, req->head.args.setattr.size);
+      pi.inode.truncate(old_size, req->head.args.setattr.size);
       le->metablob.add_truncate_start(cur->ino());
     } else {
-      pi->size = req->head.args.setattr.size;
-      pi->rstat.rbytes = pi->size;
+      pi.inode.size = req->head.args.setattr.size;
+      pi.inode.rstat.rbytes = pi.inode.size;
     }
-    pi->mtime = mdr->get_op_stamp();
+    pi.inode.mtime = mdr->get_op_stamp();
 
     // adjust client's max_size?
-    map<client_t,client_writeable_range_t> new_ranges;
+    CInode::mempool_inode::client_range_map new_ranges;
     bool max_increased = false;
-    mds->locker->calc_new_client_ranges(cur, pi->size, &new_ranges, &max_increased);
-    if (pi->client_ranges != new_ranges) {
-      dout(10) << " client_ranges " << pi->client_ranges << " -> " << new_ranges << dendl;
-      pi->client_ranges = new_ranges;
+    mds->locker->calc_new_client_ranges(cur, pi.inode.size, &new_ranges, &max_increased);
+    if (pi.inode.client_ranges != new_ranges) {
+      dout(10) << " client_ranges " << pi.inode.client_ranges << " -> " << new_ranges << dendl;
+      pi.inode.client_ranges = new_ranges;
       changed_ranges = true;
     }
   }
 
-  pi->version = cur->pre_dirty();
-  pi->ctime = mdr->get_op_stamp();
-  pi->change_attr++;
+  pi.inode.version = cur->pre_dirty();
+  pi.inode.ctime = pi.inode.rstat.rctime = mdr->get_op_stamp();
+  pi.inode.change_attr++;
 
   // log + wait
   le->metablob.add_client_req(req->get_reqid(), req->get_oldest_client_tid());
@@ -4067,22 +4211,22 @@ void Server::do_open_truncate(MDRequestRef& mdr, int cmode)
   mdlog->start_entry(le);
 
   // prepare
-  inode_t *pi = in->project_inode();
-  pi->version = in->pre_dirty();
-  pi->mtime = pi->ctime = mdr->get_op_stamp();
-  pi->change_attr++;
+  auto &pi = in->project_inode();
+  pi.inode.version = in->pre_dirty();
+  pi.inode.mtime = pi.inode.ctime = pi.inode.rstat.rctime = mdr->get_op_stamp();
+  pi.inode.change_attr++;
 
-  uint64_t old_size = MAX(pi->size, mdr->client_request->head.args.open.old_size);
+  uint64_t old_size = std::max<uint64_t>(pi.inode.size, mdr->client_request->head.args.open.old_size);
   if (old_size > 0) {
-    pi->truncate(old_size, 0);
+    pi.inode.truncate(old_size, 0);
     le->metablob.add_truncate_start(in->ino());
   }
 
   bool changed_ranges = false;
   if (cmode & CEPH_FILE_MODE_WR) {
-    pi->client_ranges[client].range.first = 0;
-    pi->client_ranges[client].range.last = pi->get_layout_size_increment();
-    pi->client_ranges[client].follows = in->find_snaprealm()->get_newest_seq();
+    pi.inode.client_ranges[client].range.first = 0;
+    pi.inode.client_ranges[client].range.last = pi.inode.get_layout_size_increment();
+    pi.inode.client_ranges[client].follows = in->find_snaprealm()->get_newest_seq();
     changed_ranges = true;
   }
   
@@ -4182,13 +4326,13 @@ void Server::handle_client_setlayout(MDRequestRef& mdr)
     return;
 
   // project update
-  inode_t *pi = cur->project_inode();
-  pi->layout = layout;
+  auto &pi = cur->project_inode();
+  pi.inode.layout = layout;
   // add the old pool to the inode
-  pi->add_old_pool(old_layout.pool_id);
-  pi->version = cur->pre_dirty();
-  pi->ctime = mdr->get_op_stamp();
-  pi->change_attr++;
+  pi.inode.add_old_pool(old_layout.pool_id);
+  pi.inode.version = cur->pre_dirty();
+  pi.inode.ctime = pi.inode.rstat.rctime = mdr->get_op_stamp();
+  pi.inode.change_attr++;
   
   // log + wait
   mdr->ls = mdlog->get_current_segment();
@@ -4224,7 +4368,7 @@ void Server::handle_client_setdirlayout(MDRequestRef& mdr)
     return;
 
   // validate layout
-  const inode_t *old_pi = cur->get_projected_inode();
+  const auto old_pi = cur->get_projected_inode();
   file_layout_t layout;
   if (old_pi->has_layout())
     layout = old_pi->layout;
@@ -4271,9 +4415,9 @@ void Server::handle_client_setdirlayout(MDRequestRef& mdr)
   if (!check_access(mdr, cur, access))
     return;
 
-  inode_t *pi = cur->project_inode();
-  pi->layout = layout;
-  pi->version = cur->pre_dirty();
+  auto &pi = cur->project_inode();
+  pi.inode.layout = layout;
+  pi.inode.version = cur->pre_dirty();
 
   // log + wait
   mdr->ls = mdlog->get_current_segment();
@@ -4283,6 +4427,7 @@ void Server::handle_client_setdirlayout(MDRequestRef& mdr)
   mdcache->predirty_journal_parents(mdr, &le->metablob, cur, 0, PREDIRTY_PRIMARY);
   mdcache->journal_dirty_inode(mdr.get(), &le->metablob, cur);
 
+  mdr->no_early_reply = true;
   journal_and_reply(mdr, cur, 0, le, new C_MDS_inode_update_finish(this, mdr, cur));
 }
 
@@ -4483,7 +4628,7 @@ void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur,
            << " bytes on " << *cur
            << dendl;
 
-  inode_t *pi = NULL;
+  CInode::mempool_inode *pip = nullptr;
   string rest;
 
   if (!check_access(mdr, cur, MAY_SET_VXATTR)) {
@@ -4512,8 +4657,10 @@ void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur,
     if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
       return;
 
-    pi = cur->project_inode();
-    pi->layout = layout;
+    auto &pi = cur->project_inode();
+    pi.inode.layout = layout;
+    mdr->no_early_reply = true;
+    pip = &pi.inode;
   } else if (name.compare(0, 16, "ceph.file.layout") == 0) {
     if (!cur->is_file()) {
       respond_to_request(mdr, -EINVAL);
@@ -4533,11 +4680,11 @@ void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur,
     if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
       return;
 
-    pi = cur->project_inode();
-    int64_t old_pool = pi->layout.pool_id;
-    pi->add_old_pool(old_pool);
-    pi->layout = layout;
-    pi->ctime = mdr->get_op_stamp();
+    auto &pi = cur->project_inode();
+    int64_t old_pool = pi.inode.layout.pool_id;
+    pi.inode.add_old_pool(old_pool);
+    pi.inode.layout = layout;
+    pip = &pi.inode;
   } else if (name.compare(0, 10, "ceph.quota") == 0) { 
     if (!cur->is_dir() || cur->is_root()) {
       respond_to_request(mdr, -EINVAL);
@@ -4557,8 +4704,14 @@ void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur,
     if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
       return;
 
-    pi = cur->project_inode();
-    pi->quota = quota;
+    auto &pi = cur->project_inode();
+    pi.inode.quota = quota;
+
+    mdr->no_early_reply = true;
+    pip = &pi.inode;
+
+    client_t exclude_ct = mdr->get_client();
+    mdcache->broadcast_quota_to_client(cur, exclude_ct);
   } else if (name.find("ceph.dir.pin") == 0) {
     if (!cur->is_dir() || cur->is_root()) {
       respond_to_request(mdr, -EINVAL);
@@ -4579,19 +4732,20 @@ void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur,
     if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
       return;
 
-    pi = cur->project_inode();
+    auto &pi = cur->project_inode();
     cur->set_export_pin(rank);
+    pip = &pi.inode;
   } else {
     dout(10) << " unknown vxattr " << name << dendl;
     respond_to_request(mdr, -EINVAL);
     return;
   }
 
-  pi->change_attr++;
-  pi->ctime = mdr->get_op_stamp();
-  pi->version = cur->pre_dirty();
+  pip->change_attr++;
+  pip->ctime = pip->rstat.rctime = mdr->get_op_stamp();
+  pip->version = cur->pre_dirty();
   if (cur->is_file())
-    pi->update_backtrace();
+    pip->update_backtrace();
 
   // log + wait
   mdr->ls = mdlog->get_current_segment();
@@ -4636,9 +4790,9 @@ void Server::handle_remove_vxattr(MDRequestRef& mdr, CInode *cur,
     if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
       return;
 
-    inode_t *pi = cur->project_inode();
-    pi->clear_layout();
-    pi->version = cur->pre_dirty();
+    auto &pi = cur->project_inode();
+    pi.inode.clear_layout();
+    pi.inode.version = cur->pre_dirty();
 
     // log + wait
     mdr->ls = mdlog->get_current_segment();
@@ -4648,6 +4802,7 @@ void Server::handle_remove_vxattr(MDRequestRef& mdr, CInode *cur,
     mdcache->predirty_journal_parents(mdr, &le->metablob, cur, 0, PREDIRTY_PRIMARY);
     mdcache->journal_dirty_inode(mdr.get(), &le->metablob, cur);
 
+    mdr->no_early_reply = true;
     journal_and_reply(mdr, cur, 0, le, new C_MDS_inode_update_finish(this, mdr, cur));
     return;
   } else if (name == "ceph.dir.layout.pool_namespace"
@@ -4677,7 +4832,8 @@ public:
     
     mdr->apply();
 
-    get_mds()->balancer->hit_inode(mdr->get_mds_stamp(), in, META_POP_IWR);
+    utime_t now = ceph_clock_now();
+    get_mds()->balancer->hit_inode(now, in, META_POP_IWR);
 
     server->respond_to_request(mdr, 0);
   }
@@ -4718,14 +4874,14 @@ void Server::handle_client_setxattr(MDRequestRef& mdr)
   if (!check_access(mdr, cur, MAY_WRITE))
     return;
 
-  map<string, bufferptr> *pxattrs = cur->get_projected_xattrs();
+  auto pxattrs = cur->get_projected_xattrs();
   size_t len = req->get_data().length();
   size_t inc = len + name.length();
 
   // check xattrs kv pairs size
   size_t cur_xattrs_size = 0;
   for (const auto& p : *pxattrs) {
-    if ((flags & CEPH_XATTR_REPLACE) && (name.compare(p.first) == 0)) {
+    if ((flags & CEPH_XATTR_REPLACE) && (name.compare(std::string(boost::string_view(p.first))) == 0)) {
       continue;
     }
     cur_xattrs_size += p.first.length() + p.second.length();
@@ -4738,12 +4894,12 @@ void Server::handle_client_setxattr(MDRequestRef& mdr)
     return;
   }
 
-  if ((flags & CEPH_XATTR_CREATE) && pxattrs->count(name)) {
+  if ((flags & CEPH_XATTR_CREATE) && pxattrs->count(mempool::mds_co::string(boost::string_view(name)))) {
     dout(10) << "setxattr '" << name << "' XATTR_CREATE and EEXIST on " << *cur << dendl;
     respond_to_request(mdr, -EEXIST);
     return;
   }
-  if ((flags & CEPH_XATTR_REPLACE) && !pxattrs->count(name)) {
+  if ((flags & CEPH_XATTR_REPLACE) && !pxattrs->count(mempool::mds_co::string(boost::string_view(name)))) {
     dout(10) << "setxattr '" << name << "' XATTR_REPLACE and ENODATA on " << *cur << dendl;
     respond_to_request(mdr, -ENODATA);
     return;
@@ -4752,17 +4908,21 @@ void Server::handle_client_setxattr(MDRequestRef& mdr)
   dout(10) << "setxattr '" << name << "' len " << len << " on " << *cur << dendl;
 
   // project update
-  map<string,bufferptr> *px = new map<string,bufferptr>;
-  inode_t *pi = cur->project_inode(px);
-  pi->version = cur->pre_dirty();
-  pi->ctime = mdr->get_op_stamp();
-  pi->change_attr++;
-  pi->xattr_version++;
-  px->erase(name);
-  if (!(flags & CEPH_XATTR_REMOVE)) {
-    (*px)[name] = buffer::create(len);
+  auto &pi = cur->project_inode(true);
+  pi.inode.version = cur->pre_dirty();
+  pi.inode.ctime = pi.inode.rstat.rctime = mdr->get_op_stamp();
+  pi.inode.change_attr++;
+  pi.inode.xattr_version++;
+  auto &px = *pi.xattrs;
+  if ((flags & CEPH_XATTR_REMOVE)) {
+    px.erase(mempool::mds_co::string(boost::string_view(name)));
+  } else {
+    bufferptr b = buffer::create(len);
     if (len)
-      req->get_data().copy(0, len, (*px)[name].c_str());
+      req->get_data().copy(0, len, b.c_str());
+    auto em = px.emplace(std::piecewise_construct, std::forward_as_tuple(mempool::mds_co::string(boost::string_view(name))), std::forward_as_tuple(b));
+    if (!em.second)
+      em.first->second = b;
   }
 
   // log + wait
@@ -4779,8 +4939,8 @@ void Server::handle_client_setxattr(MDRequestRef& mdr)
 void Server::handle_client_removexattr(MDRequestRef& mdr)
 {
   MClientRequest *req = mdr->client_request;
-  string name(req->get_path2());
-  set<SimpleLock*> rdlocks, wrlocks, xlocks;
+  std::string name(req->get_path2());
+  std::set<SimpleLock*> rdlocks, wrlocks, xlocks;
   file_layout_t *dir_layout = NULL;
   CInode *cur;
   if (name == "ceph.dir.layout")
@@ -4804,8 +4964,8 @@ void Server::handle_client_removexattr(MDRequestRef& mdr)
   if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
     return;
 
-  map<string, bufferptr> *pxattrs = cur->get_projected_xattrs();
-  if (pxattrs->count(name) == 0) {
+  auto pxattrs = cur->get_projected_xattrs();
+  if (pxattrs->count(mempool::mds_co::string(boost::string_view(name))) == 0) {
     dout(10) << "removexattr '" << name << "' and ENODATA on " << *cur << dendl;
     respond_to_request(mdr, -ENODATA);
     return;
@@ -4814,13 +4974,13 @@ void Server::handle_client_removexattr(MDRequestRef& mdr)
   dout(10) << "removexattr '" << name << "' on " << *cur << dendl;
 
   // project update
-  map<string,bufferptr> *px = new map<string,bufferptr>;
-  inode_t *pi = cur->project_inode(px);
-  pi->version = cur->pre_dirty();
-  pi->ctime = mdr->get_op_stamp();
-  pi->change_attr++;
-  pi->xattr_version++;
-  px->erase(name);
+  auto &pi = cur->project_inode(true);
+  auto &px = *pi.xattrs;
+  pi.inode.version = cur->pre_dirty();
+  pi.inode.ctime = pi.inode.rstat.rctime = mdr->get_op_stamp();
+  pi.inode.change_attr++;
+  pi.inode.xattr_version++;
+  px.erase(mempool::mds_co::string(boost::string_view(name)));
 
   // log + wait
   mdr->ls = mdlog->get_current_segment();
@@ -4859,7 +5019,7 @@ public:
     // a new version of hte inode since it's just been created)
     newi->inode.version--; 
     newi->mark_dirty(newi->inode.version + 1, mdr->ls);
-    newi->_mark_dirty_parent(mdr->ls, true);
+    newi->mark_dirty_parent(mdr->ls, true);
 
     // mkdir?
     if (newi->inode.is_dir()) { 
@@ -4879,7 +5039,8 @@ public:
       get_mds()->locker->share_inode_max_size(newi);
 
     // hit pop
-    get_mds()->balancer->hit_inode(mdr->get_mds_stamp(), newi, META_POP_IWR);
+    utime_t now = ceph_clock_now();
+    get_mds()->balancer->hit_inode(now, newi, META_POP_IWR);
 
     // reply
     server->respond_to_request(mdr, 0);
@@ -5093,7 +5254,7 @@ void Server::handle_client_symlink(MDRequestRef& mdr)
   // it's a symlink
   dn->push_projected_linkage(newi);
 
-  newi->symlink = req->get_path2();
+  newi->symlink = mempool::mds_co::string(boost::string_view(req->get_path2()));
   newi->inode.size = newi->symlink.length();
   newi->inode.rstat.rbytes = newi->inode.size;
   newi->inode.rstat.rfiles = 1;
@@ -5143,9 +5304,15 @@ void Server::handle_client_link(MDRequestRef& mdr)
   dout(7) << "handle_client_link link " << dn->get_name() << " in " << *dir << dendl;
   dout(7) << "target is " << *targeti << dendl;
   if (targeti->is_dir()) {
-    dout(7) << "target is a dir, failing..." << dendl;
-    respond_to_request(mdr, -EINVAL);
-    return;
+    // if srcdn is replica, need to make sure its linkage is correct
+    vector<CDentry*>& trace = mdr->dn[1];
+    if (trace.empty() ||
+       trace.back()->is_auth() ||
+       trace.back()->lock.can_read(mdr->get_client())) {
+      dout(7) << "target is a dir, failing..." << dendl;
+      respond_to_request(mdr, -EINVAL);
+      return;
+    }
   }
 
   xlocks.insert(&targeti->linklock);
@@ -5203,11 +5370,11 @@ void Server::_link_local(MDRequestRef& mdr, CDentry *dn, CInode *targeti)
   version_t tipv = targeti->pre_dirty();
   
   // project inode update
-  inode_t *pi = targeti->project_inode();
-  pi->nlink++;
-  pi->ctime = mdr->get_op_stamp();
-  pi->change_attr++;
-  pi->version = tipv;
+  auto &pi = targeti->project_inode();
+  pi.inode.nlink++;
+  pi.inode.ctime = pi.inode.rstat.rctime = mdr->get_op_stamp();
+  pi.inode.change_attr++;
+  pi.inode.version = tipv;
 
   // log + wait
   EUpdate *le = new EUpdate(mdlog, "link_local");
@@ -5230,7 +5397,9 @@ void Server::_link_local_finish(MDRequestRef& mdr, CDentry *dn, CInode *targeti,
   dout(10) << "_link_local_finish " << *dn << " to " << *targeti << dendl;
 
   // link and unlock the NEW dentry
-  dn->pop_projected_linkage();
+  CDentry::linkage_t *dnl = dn->pop_projected_linkage();
+  if (!dnl->get_inode())
+    dn->link_remote(dnl, targeti);
   dn->mark_dirty(dnpv, mdr->ls);
 
   // target inode
@@ -5242,8 +5411,9 @@ void Server::_link_local_finish(MDRequestRef& mdr, CDentry *dn, CInode *targeti,
   mdcache->send_dentry_link(dn, null_ref);
 
   // bump target popularity
-  mds->balancer->hit_inode(mdr->get_mds_stamp(), targeti, META_POP_IWR);
-  mds->balancer->hit_dir(mdr->get_mds_stamp(), dn->get_dir(), META_POP_IWR);
+  utime_t now = ceph_clock_now();
+  mds->balancer->hit_inode(now, targeti, META_POP_IWR);
+  mds->balancer->hit_dir(now, dn->get_dir(), META_POP_IWR);
 
   // reply
   respond_to_request(mdr, 0);
@@ -5327,6 +5497,7 @@ void Server::_link_remote(MDRequestRef& mdr, bool inc, CDentry *dn, CInode *targ
     mdcache->predirty_journal_parents(mdr, &le->metablob, targeti, dn->get_dir(), PREDIRTY_DIR, -1);
     mdcache->journal_cow_dentry(mdr.get(), &le->metablob, dn);
     le->metablob.add_null_dentry(dn, true);
+    dn->push_projected_linkage();
   }
 
   journal_and_reply(mdr, targeti, dn, le, new C_MDS_link_remote_finish(this, mdr, inc, dn, targeti));
@@ -5347,11 +5518,14 @@ void Server::_link_remote_finish(MDRequestRef& mdr, bool inc,
 
   if (inc) {
     // link the new dentry
-    dn->pop_projected_linkage();
+    CDentry::linkage_t *dnl = dn->pop_projected_linkage();
+    if (!dnl->get_inode())
+      dn->link_remote(dnl, targeti);
     dn->mark_dirty(dpv, mdr->ls);
   } else {
     // unlink main dentry
     dn->get_dir()->unlink_inode(dn);
+    dn->pop_projected_linkage();
     dn->mark_dirty(dn->get_projected_version(), mdr->ls);  // dirty old dentry
   }
 
@@ -5364,8 +5538,9 @@ void Server::_link_remote_finish(MDRequestRef& mdr, bool inc,
     mdcache->send_dentry_unlink(dn, NULL, null_ref);
   
   // bump target popularity
-  mds->balancer->hit_inode(mdr->get_mds_stamp(), targeti, META_POP_IWR);
-  mds->balancer->hit_dir(mdr->get_mds_stamp(), dn->get_dir(), META_POP_IWR);
+  utime_t now = ceph_clock_now();
+  mds->balancer->hit_inode(now, targeti, META_POP_IWR);
+  mds->balancer->hit_dir(now, dn->get_dir(), META_POP_IWR);
 
   // reply
   respond_to_request(mdr, 0);
@@ -5429,16 +5604,16 @@ void Server::handle_slave_link_prep(MDRequestRef& mdr)
                                      ESlaveUpdate::OP_PREPARE, ESlaveUpdate::LINK);
   mdlog->start_entry(le);
 
-  inode_t *pi = dnl->get_inode()->project_inode();
+  auto &pi = dnl->get_inode()->project_inode();
 
   // update journaled target inode
   bool inc;
   if (mdr->slave_request->get_op() == MMDSSlaveRequest::OP_LINKPREP) {
     inc = true;
-    pi->nlink++;
+    pi.inode.nlink++;
   } else {
     inc = false;
-    pi->nlink--;
+    pi.inode.nlink--;
   }
 
   link_rollback rollback;
@@ -5452,10 +5627,10 @@ void Server::handle_slave_link_prep(MDRequestRef& mdr)
   ::encode(rollback, le->rollback);
   mdr->more()->rollback_bl = le->rollback;
 
-  pi->ctime = mdr->get_op_stamp();
-  pi->version = targeti->pre_dirty();
+  pi.inode.ctime = mdr->get_op_stamp();
+  pi.inode.version = targeti->pre_dirty();
 
-  dout(10) << " projected inode " << pi << " v " << pi->version << dendl;
+  dout(10) << " projected inode " << pi.inode.ino << " v " << pi.inode.version << dendl;
 
   // commit case
   mdcache->predirty_journal_parents(mdr, &le->commit, dnl->get_inode(), 0, PREDIRTY_SHALLOW|PREDIRTY_PRIMARY);
@@ -5482,7 +5657,8 @@ void Server::_logged_slave_link(MDRequestRef& mdr, CInode *targeti)
   mdr->apply();
 
   // hit pop
-  mds->balancer->hit_inode(mdr->get_mds_stamp(), targeti, META_POP_IWR);
+  utime_t now = ceph_clock_now();
+  mds->balancer->hit_inode(now, targeti, META_POP_IWR);
 
   // done.
   mdr->slave_request->put();
@@ -5574,8 +5750,8 @@ void Server::do_link_rollback(bufferlist &rbl, mds_rank_t master, MDRequestRef&
   dout(10) << " target is " << *in << dendl;
   assert(!in->is_projected());  // live slave request hold versionlock xlock.
   
-  inode_t *pi = in->project_inode();
-  pi->version = in->pre_dirty();
+  auto &pi = in->project_inode();
+  pi.inode.version = in->pre_dirty();
   mut->add_projected_inode(in);
 
   // parent dir rctime
@@ -5583,20 +5759,20 @@ void Server::do_link_rollback(bufferlist &rbl, mds_rank_t master, MDRequestRef&
   fnode_t *pf = parent->project_fnode();
   mut->add_projected_fnode(parent);
   pf->version = parent->pre_dirty();
-  if (pf->fragstat.mtime == pi->ctime) {
+  if (pf->fragstat.mtime == pi.inode.ctime) {
     pf->fragstat.mtime = rollback.old_dir_mtime;
-    if (pf->rstat.rctime == pi->ctime)
+    if (pf->rstat.rctime == pi.inode.ctime)
       pf->rstat.rctime = rollback.old_dir_rctime;
     mut->add_updated_lock(&parent->get_inode()->filelock);
     mut->add_updated_lock(&parent->get_inode()->nestlock);
   }
 
   // inode
-  pi->ctime = rollback.old_ctime;
+  pi.inode.ctime = pi.inode.rstat.rctime = rollback.old_ctime;
   if (rollback.was_inc)
-    pi->nlink--;
+    pi.inode.nlink--;
   else
-    pi->nlink++;
+    pi.inode.nlink++;
 
   // journal it
   ESlaveUpdate *le = new ESlaveUpdate(mdlog, "slave_link_rollback", rollback.reqid, master,
@@ -5855,14 +6031,18 @@ void Server::_unlink_local(MDRequestRef& mdr, CDentry *dn, CDentry *straydn)
   // the unlinked dentry
   dn->pre_dirty();
 
-  inode_t *pi = in->project_inode();
-  dn->make_path_string(pi->stray_prior_path);
+  auto &pi = in->project_inode();
+  {
+    std::string t;
+    dn->make_path_string(t, true);
+    pi.inode.stray_prior_path = mempool::mds_co::string(boost::string_view(t));
+  }
   mdr->add_projected_inode(in); // do this _after_ my dn->pre_dirty().. we apply that one manually.
-  pi->version = in->pre_dirty();
-  pi->ctime = mdr->get_op_stamp();
-  pi->change_attr++;
-  pi->nlink--;
-  if (pi->nlink == 0)
+  pi.inode.version = in->pre_dirty();
+  pi.inode.ctime = pi.inode.rstat.rctime = mdr->get_op_stamp();
+  pi.inode.change_attr++;
+  pi.inode.nlink--;
+  if (pi.inode.nlink == 0)
     in->state_set(CInode::STATE_ORPHAN);
 
   if (dnl->is_primary()) {
@@ -5875,7 +6055,7 @@ void Server::_unlink_local(MDRequestRef& mdr, CDentry *dn, CDentry *straydn)
     if (in->snaprealm || follows + 1 > in->get_oldest_snap())
       in->project_past_snaprealm_parent(straydn->get_dir()->inode->find_snaprealm());
 
-    pi->update_backtrace();
+    pi.inode.update_backtrace();
     le->metablob.add_primary_dentry(straydn, in, true, true);
   } else {
     // remote link.  update remote inode.
@@ -5897,6 +6077,8 @@ void Server::_unlink_local(MDRequestRef& mdr, CDentry *dn, CDentry *straydn)
   if (in->is_dir()) {
     assert(straydn);
     mdcache->project_subtree_rename(in, dn->get_dir(), straydn->get_dir());
+
+    in->maybe_export_pin(true);
   }
 
   journal_and_reply(mdr, 0, dn, le, new C_MDS_unlink_local_finish(this, mdr, dn, straydn));
@@ -5940,7 +6122,8 @@ void Server::_unlink_local_finish(MDRequestRef& mdr,
     mdcache->adjust_subtree_after_rename(strayin, dn->get_dir(), true);
 
   // bump pop
-  mds->balancer->hit_dir(mdr->get_mds_stamp(), dn->get_dir(), META_POP_IWR);
+  utime_t now = ceph_clock_now();
+  mds->balancer->hit_dir(now, dn->get_dir(), META_POP_IWR);
 
   // reply
   respond_to_request(mdr, 0);
@@ -5972,7 +6155,7 @@ bool Server::_rmdir_prepare_witness(MDRequestRef& mdr, mds_rank_t who, vector<CD
                                               MMDSSlaveRequest::OP_RMDIRPREP);
   req->srcdnpath = filepath(trace.front()->get_dir()->ino());
   for (auto dn : trace)
-    req->srcdnpath.push_dentry(dn->name);
+    req->srcdnpath.push_dentry(dn->get_name());
   mdcache->replicate_stray(straydn, who, req->stray);
 
   req->op_stamp = mdr->get_op_stamp();
@@ -5994,10 +6177,11 @@ struct C_MDS_SlaveRmdirPrep : public ServerLogContext {
 
 struct C_MDS_SlaveRmdirCommit : public ServerContext {
   MDRequestRef mdr;
-  C_MDS_SlaveRmdirCommit(Server *s, MDRequestRef& r)
-    : ServerContext(s), mdr(r) { }
+  CDentry *straydn;
+  C_MDS_SlaveRmdirCommit(Server *s, MDRequestRef& r, CDentry *sd)
+    : ServerContext(s), mdr(r), straydn(sd) { }
   void finish(int r) override {
-    server->_commit_slave_rmdir(mdr, r);
+    server->_commit_slave_rmdir(mdr, r, straydn);
   }
 };
 
@@ -6033,14 +6217,14 @@ void Server::handle_slave_rmdir_prep(MDRequestRef& mdr)
   rmdir_rollback rollback;
   rollback.reqid = mdr->reqid;
   rollback.src_dir = dn->get_dir()->dirfrag();
-  rollback.src_dname = dn->name;
+  rollback.src_dname = std::string(dn->get_name());
   rollback.dest_dir = straydn->get_dir()->dirfrag();
-  rollback.dest_dname = straydn->name;
+  rollback.dest_dname = std::string(straydn->get_name());
   ::encode(rollback, mdr->more()->rollback_bl);
   dout(20) << " rollback is " << mdr->more()->rollback_bl.length() << " bytes" << dendl;
 
   // set up commit waiter
-  mdr->more()->slave_commit = new C_MDS_SlaveRmdirCommit(this, mdr);
+  mdr->more()->slave_commit = new C_MDS_SlaveRmdirCommit(this, mdr, straydn);
 
   if (!in->has_subtree_root_dirfrag(mds->get_nodeid())) {
     dout(10) << " no auth subtree in " << *in << ", skipping journal" << dendl;
@@ -6141,11 +6325,17 @@ void Server::handle_slave_rmdir_prep_ack(MDRequestRef& mdr, MMDSSlaveRequest *ac
     dout(10) << "still waiting on slaves " << mdr->more()->waiting_on_slave << dendl;
 }
 
-void Server::_commit_slave_rmdir(MDRequestRef& mdr, int r)
+void Server::_commit_slave_rmdir(MDRequestRef& mdr, int r, CDentry *straydn)
 {
   dout(10) << "_commit_slave_rmdir " << *mdr << " r=" << r << dendl;
   
   if (r == 0) {
+    if (mdr->more()->slave_update_journaled) {
+      CInode *strayin = straydn->get_projected_linkage()->get_inode();
+      if (strayin && !strayin->snaprealm)
+       mdcache->clear_dirty_bits_for_stray(strayin);
+    }
+
     mdr->cleanup();
 
     if (mdr->more()->slave_update_journaled) {
@@ -6369,7 +6559,7 @@ void Server::handle_client_rename(MDRequestRef& mdr)
     respond_to_request(mdr, -EINVAL);
     return;
   }
-  const string &destname = destpath.last_dentry();
+  boost::string_view destname = destpath.last_dentry();
 
   vector<CDentry*>& srctrace = mdr->dn[1];
   vector<CDentry*>& desttrace = mdr->dn[0];
@@ -6418,25 +6608,30 @@ void Server::handle_client_rename(MDRequestRef& mdr)
     oldin = mdcache->get_dentry_inode(destdn, mdr, true);
     if (!oldin) return;
     dout(10) << " oldin " << *oldin << dendl;
-    
-    // mv /some/thing /to/some/existing_other_thing
-    if (oldin->is_dir() && !srci->is_dir()) {
-      respond_to_request(mdr, -EISDIR);
-      return;
-    }
-    if (!oldin->is_dir() && srci->is_dir()) {
-      respond_to_request(mdr, -ENOTDIR);
-      return;
-    }
 
     // non-empty dir? do trivial fast unlocked check, do another check later with read locks
     if (oldin->is_dir() && _dir_is_nonempty_unlocked(mdr, oldin)) {
       respond_to_request(mdr, -ENOTEMPTY);
       return;
     }
-    if (srci == oldin && !srcdn->get_dir()->inode->is_stray()) {
-      respond_to_request(mdr, 0);  // no-op.  POSIX makes no sense.
-      return;
+
+    // if srcdn is replica, need to make sure its linkage is correct
+    if (srcdn->is_auth() ||
+       srcdn->lock.can_read(mdr->get_client()) ||
+       (srcdn->lock.is_xlocked() && srcdn->lock.get_xlock_by() == mdr)) {
+      // mv /some/thing /to/some/existing_other_thing
+      if (oldin->is_dir() && !srci->is_dir()) {
+       respond_to_request(mdr, -EISDIR);
+       return;
+      }
+      if (!oldin->is_dir() && srci->is_dir()) {
+       respond_to_request(mdr, -ENOTDIR);
+       return;
+      }
+      if (srci == oldin && !srcdn->get_dir()->inode->is_stray()) {
+       respond_to_request(mdr, 0);  // no-op.  POSIX makes no sense.
+       return;
+      }
     }
   }
 
@@ -6469,7 +6664,7 @@ void Server::handle_client_rename(MDRequestRef& mdr)
   }
 
   // src == dest?
-  if (srcdn->get_dir() == destdir && srcdn->name == destname) {
+  if (srcdn->get_dir() == destdir && srcdn->get_name() == destname) {
     dout(7) << "rename src=dest, noop" << dendl;
     respond_to_request(mdr, 0);
     return;
@@ -6773,9 +6968,10 @@ void Server::_rename_finish(MDRequestRef& mdr, CDentry *srcdn, CDentry *destdn,
     assert(g_conf->mds_kill_rename_at != 6);
   
   // bump popularity
-  mds->balancer->hit_dir(mdr->get_mds_stamp(), srcdn->get_dir(), META_POP_IWR);
+  utime_t now = ceph_clock_now();
+  mds->balancer->hit_dir(now, srcdn->get_dir(), META_POP_IWR);
   if (destdnl->is_remote() && in->is_auth())
-    mds->balancer->hit_inode(mdr->get_mds_stamp(), in, META_POP_IWR);
+    mds->balancer->hit_inode(now, in, META_POP_IWR);
 
   // did we import srci?  if so, explicitly ack that import that, before we unlock and reply.
 
@@ -6815,12 +7011,14 @@ bool Server::_rename_prepare_witness(MDRequestRef& mdr, mds_rank_t who, set<mds_
 
   req->srcdnpath = filepath(srctrace.front()->get_dir()->ino());
   for (auto dn : srctrace)
-    req->srcdnpath.push_dentry(dn->name);
+    req->srcdnpath.push_dentry(dn->get_name());
   req->destdnpath = filepath(dsttrace.front()->get_dir()->ino());
   for (auto dn : dsttrace)
-    req->destdnpath.push_dentry(dn->name);
+    req->destdnpath.push_dentry(dn->get_name());
   if (straydn)
     mdcache->replicate_stray(straydn, who, req->stray);
+
+  req->srcdn_auth = mdr->more()->srcdn_auth_mds;
   
   // srcdn auth will verify our current witness list is sufficient
   req->witnesses = witnesse;
@@ -6843,10 +7041,10 @@ version_t Server::_rename_prepare_import(MDRequestRef& mdr, CDentry *srcdn, buff
   bufferlist::iterator blp = mdr->more()->inode_import.begin();
          
   // imported caps
-  ::decode(mdr->more()->imported_client_map, blp);
-  ::encode(mdr->more()->imported_client_map, *client_map_bl,
-           mds->mdsmap->get_up_features());
-  prepare_force_open_sessions(mdr->more()->imported_client_map, mdr->more()->sseq_map);
+  map<client_t,entity_inst_t> client_map;
+  decode(client_map, blp);
+  prepare_force_open_sessions(client_map, mdr->more()->imported_session_map);
+  encode(client_map, *client_map_bl, mds->mdsmap->get_up_features());
 
   list<ScatterLock*> updated_scatterlocks;
   mdcache->migrator->decode_import_inode(srcdn, blp, srcdn->authority().first, mdr->ls,
@@ -6951,8 +7149,8 @@ void Server::_rename_prepare(MDRequestRef& mdr,
   }
 
   // prepare
-  inode_t *pi = 0;    // renamed inode
-  inode_t *tpi = 0;  // target/overwritten inode
+  CInode::mempool_inode *spi = 0;    // renamed inode
+  CInode::mempool_inode *tpi = 0;  // target/overwritten inode
   
   // target inode
   if (!linkmerge) {
@@ -6960,16 +7158,18 @@ void Server::_rename_prepare(MDRequestRef& mdr,
       assert(straydn);  // moving to straydn.
       // link--, and move.
       if (destdn->is_auth()) {
-       tpi = oldin->project_inode(); //project_snaprealm
-       tpi->version = straydn->pre_dirty(tpi->version);
-       tpi->update_backtrace();
+       auto &pi= oldin->project_inode(); //project_snaprealm
+       pi.inode.version = straydn->pre_dirty(pi.inode.version);
+       pi.inode.update_backtrace();
+        tpi = &pi.inode;
       }
       straydn->push_projected_linkage(oldin);
     } else if (destdnl->is_remote()) {
       // nlink-- targeti
       if (oldin->is_auth()) {
-       tpi = oldin->project_inode();
-       tpi->version = oldin->pre_dirty();
+       auto &pi = oldin->project_inode();
+       pi.inode.version = oldin->pre_dirty();
+        tpi = &pi.inode;
       }
     }
   }
@@ -6983,14 +7183,16 @@ void Server::_rename_prepare(MDRequestRef& mdr,
       destdn->push_projected_linkage(srcdnl->get_remote_ino(), srcdnl->get_remote_d_type());
       // srci
       if (srci->is_auth()) {
-       pi = srci->project_inode();
-       pi->version = srci->pre_dirty();
+       auto &pi = srci->project_inode();
+       pi.inode.version = srci->pre_dirty();
+        spi = &pi.inode;
       }
     } else {
       dout(10) << " will merge remote onto primary link" << dendl;
       if (destdn->is_auth()) {
-       pi = oldin->project_inode();
-       pi->version = mdr->more()->pvmap[destdn] = destdn->pre_dirty(oldin->inode.version);
+       auto &pi = oldin->project_inode();
+       pi.inode.version = mdr->more()->pvmap[destdn] = destdn->pre_dirty(oldin->inode.version);
+        spi = &pi.inode;
       }
     }
   } else { // primary
@@ -7014,10 +7216,11 @@ void Server::_rename_prepare(MDRequestRef& mdr,
          dout(10) << " noting renamed dir open frags " << metablob->renamed_dir_frags << dendl;
        }
       }
-      pi = srci->project_inode(); // project snaprealm if srcdnl->is_primary
+      auto &pi = srci->project_inode(); // project snaprealm if srcdnl->is_primary
                                                  // & srcdnl->snaprealm
-      pi->version = mdr->more()->pvmap[destdn] = destdn->pre_dirty(oldpv);
-      pi->update_backtrace();
+      pi.inode.version = mdr->more()->pvmap[destdn] = destdn->pre_dirty(oldpv);
+      pi.inode.update_backtrace();
+      spi = &pi.inode;
     }
     destdn->push_projected_linkage(srci);
   }
@@ -7028,16 +7231,20 @@ void Server::_rename_prepare(MDRequestRef& mdr,
   srcdn->push_projected_linkage();  // push null linkage
 
   if (!silent) {
-    if (pi) {
-      pi->ctime = mdr->get_op_stamp();
-      pi->change_attr++;
+    if (spi) {
+      spi->ctime = spi->rstat.rctime = mdr->get_op_stamp();
+      spi->change_attr++;
       if (linkmerge)
-       pi->nlink--;
+       spi->nlink--;
     }
     if (tpi) {
-      tpi->ctime = mdr->get_op_stamp();
+      tpi->ctime = tpi->rstat.rctime = mdr->get_op_stamp();
       tpi->change_attr++;
-      destdn->make_path_string(tpi->stray_prior_path);
+      {
+        std::string t;
+        destdn->make_path_string(t, true);
+        tpi->stray_prior_path = mempool::mds_co::string(boost::string_view(t));
+      }
       tpi->nlink--;
       if (tpi->nlink == 0)
        oldin->state_set(CInode::STATE_ORPHAN);
@@ -7061,9 +7268,11 @@ void Server::_rename_prepare(MDRequestRef& mdr,
   if (destdn->is_auth() && !destdnl->is_null()) {
     mdcache->predirty_journal_parents(mdr, metablob, oldin, destdn->get_dir(),
                                      (destdnl->is_primary() ? PREDIRTY_PRIMARY:0)|predirty_dir, -1);
-    if (destdnl->is_primary())
+    if (destdnl->is_primary()) {
+      assert(straydn);
       mdcache->predirty_journal_parents(mdr, metablob, oldin, straydn->get_dir(),
                                        PREDIRTY_PRIMARY|PREDIRTY_DIR, 1);
+    }
   }
   
   // move srcdn
@@ -7082,6 +7291,7 @@ void Server::_rename_prepare(MDRequestRef& mdr,
   // target inode
   if (!linkmerge) {
     if (destdnl->is_primary()) {
+      assert(straydn);
       if (destdn->is_auth()) {
        // project snaprealm, too
        if (oldin->snaprealm || dest_realm->get_newest_seq() + 1 > oldin->get_oldest_snap())
@@ -7175,8 +7385,10 @@ void Server::_rename_prepare(MDRequestRef& mdr,
   if (srcdnl->is_primary() && destdn->is_auth())
     srci->first = destdn->first;  
 
-  if (oldin && oldin->is_dir())
+  if (oldin && oldin->is_dir()) {
+    assert(straydn);
     mdcache->project_subtree_rename(oldin, destdn->get_dir(), straydn->get_dir());
+  }
   if (srci->is_dir())
     mdcache->project_subtree_rename(srci, srcdn->get_dir(), destdn->get_dir());
 
@@ -7192,8 +7404,6 @@ void Server::_rename_apply(MDRequestRef& mdr, CDentry *srcdn, CDentry *destdn, C
   CDentry::linkage_t *destdnl = destdn->get_linkage();
 
   CInode *oldin = destdnl->get_inode();
-  
-  bool imported_inode = false;
 
   // primary+remote link merge?
   bool linkmerge = (srcdnl->get_inode() == destdnl->get_inode() &&
@@ -7204,7 +7414,7 @@ void Server::_rename_apply(MDRequestRef& mdr, CDentry *srcdn, CDentry *destdn, C
     if (destdnl->is_primary()) {
       assert(straydn);
       dout(10) << "straydn is " << *straydn << dendl;
-      destdn->get_dir()->unlink_inode(destdn);
+      destdn->get_dir()->unlink_inode(destdn, false);
 
       straydn->pop_projected_linkage();
       if (mdr->is_slave() && !mdr->more()->slave_update_journaled)
@@ -7223,7 +7433,7 @@ void Server::_rename_apply(MDRequestRef& mdr, CDentry *srcdn, CDentry *destdn, C
        //oldin->open_snaprealm();  might be sufficient..       
       }
     } else if (destdnl->is_remote()) {
-      destdn->get_dir()->unlink_inode(destdn);
+      destdn->get_dir()->unlink_inode(destdn, false);
       if (oldin->is_auth())
        oldin->pop_and_dirty_projected_inode(mdr->ls);
     }
@@ -7257,7 +7467,7 @@ void Server::_rename_apply(MDRequestRef& mdr, CDentry *srcdn, CDentry *destdn, C
   } else { // primary
     if (linkmerge) {
       dout(10) << "merging primary onto remote link" << dendl;
-      destdn->get_dir()->unlink_inode(destdn);
+      destdn->get_dir()->unlink_inode(destdn, false);
     }
     destdnl = destdn->pop_projected_linkage();
     if (mdr->is_slave() && !mdr->more()->slave_update_journaled)
@@ -7270,12 +7480,13 @@ void Server::_rename_apply(MDRequestRef& mdr, CDentry *srcdn, CDentry *destdn, C
       map<client_t,Capability::Import> imported_caps;
       
       // finish cap imports
-      finish_force_open_sessions(mdr->more()->imported_client_map, mdr->more()->sseq_map);
+      finish_force_open_sessions(mdr->more()->imported_session_map);
       if (mdr->more()->cap_imports.count(destdnl->get_inode())) {
        mdcache->migrator->finish_import_inode_caps(destdnl->get_inode(),
-                                                        mdr->more()->srcdn_auth_mds, true,
-                                                        mdr->more()->cap_imports[destdnl->get_inode()],
-                                                        imported_caps);
+                                                   mdr->more()->srcdn_auth_mds, true,
+                                                   mdr->more()->imported_session_map,
+                                                   mdr->more()->cap_imports[destdnl->get_inode()],
+                                                   imported_caps);
       }
 
       mdr->more()->inode_import.clear();
@@ -7293,7 +7504,6 @@ void Server::_rename_apply(MDRequestRef& mdr, CDentry *srcdn, CDentry *destdn, C
       
       // hack: fix auth bit
       in->state_set(CInode::STATE_AUTH);
-      imported_inode = true;
 
       mdr->clear_ambiguous_auth();
     }
@@ -7318,7 +7528,7 @@ void Server::_rename_apply(MDRequestRef& mdr, CDentry *srcdn, CDentry *destdn, C
 
   // update subtree map?
   if (destdnl->is_primary() && in->is_dir()) 
-    mdcache->adjust_subtree_after_rename(in, srcdn->get_dir(), true, imported_inode);
+    mdcache->adjust_subtree_after_rename(in, srcdn->get_dir(), true);
 
   if (straydn && oldin->is_dir())
     mdcache->adjust_subtree_after_rename(oldin, destdn->get_dir(), true);
@@ -7371,7 +7581,17 @@ void Server::handle_slave_rename_prep(MDRequestRef& mdr)
           << " " << mdr->slave_request->srcdnpath 
           << " to " << mdr->slave_request->destdnpath
           << dendl;
-  
+
+  if (mdr->slave_request->is_interrupted()) {
+    dout(10) << " slave request interrupted, sending noop reply" << dendl;
+    MMDSSlaveRequest *reply= new MMDSSlaveRequest(mdr->reqid, mdr->attempt, MMDSSlaveRequest::OP_RENAMEPREPACK);
+    reply->mark_interrupted();
+    mds->send_message_mds(reply, mdr->slave_to_mds);
+    mdr->slave_request->put();
+    mdr->slave_request = 0;
+    return;
+  }
+
   // discover destdn
   filepath destpath(mdr->slave_request->destdnpath);
   dout(10) << " dest " << destpath << dendl;
@@ -7525,7 +7745,7 @@ void Server::handle_slave_rename_prep(MDRequestRef& mdr)
   rollback.orig_src.dirfrag = srcdn->get_dir()->dirfrag();
   rollback.orig_src.dirfrag_old_mtime = srcdn->get_dir()->get_projected_fnode()->fragstat.mtime;
   rollback.orig_src.dirfrag_old_rctime = srcdn->get_dir()->get_projected_fnode()->rstat.rctime;
-  rollback.orig_src.dname = srcdn->name;
+  rollback.orig_src.dname = std::string(srcdn->get_name());
   if (srcdnl->is_primary())
     rollback.orig_src.ino = srcdnl->get_inode()->ino();
   else {
@@ -7537,7 +7757,7 @@ void Server::handle_slave_rename_prep(MDRequestRef& mdr)
   rollback.orig_dest.dirfrag = destdn->get_dir()->dirfrag();
   rollback.orig_dest.dirfrag_old_mtime = destdn->get_dir()->get_projected_fnode()->fragstat.mtime;
   rollback.orig_dest.dirfrag_old_rctime = destdn->get_dir()->get_projected_fnode()->rstat.rctime;
-  rollback.orig_dest.dname = destdn->name;
+  rollback.orig_dest.dname = std::string(destdn->get_name());
   if (destdnl->is_primary())
     rollback.orig_dest.ino = destdnl->get_inode()->ino();
   else if (destdnl->is_remote()) {
@@ -7549,7 +7769,7 @@ void Server::handle_slave_rename_prep(MDRequestRef& mdr)
     rollback.stray.dirfrag = straydn->get_dir()->dirfrag();
     rollback.stray.dirfrag_old_mtime = straydn->get_dir()->get_projected_fnode()->fragstat.mtime;
     rollback.stray.dirfrag_old_rctime = straydn->get_dir()->get_projected_fnode()->rstat.rctime;
-    rollback.stray.dname = straydn->name;
+    rollback.stray.dname = std::string(straydn->get_name());
   }
   ::encode(rollback, mdr->more()->rollback_bl);
   dout(20) << " rollback is " << mdr->more()->rollback_bl.length() << " bytes" << dendl;
@@ -7634,10 +7854,10 @@ void Server::_logged_slave_rename(MDRequestRef& mdr,
   destdnl = destdn->get_linkage();
 
   // bump popularity
-  mds->balancer->hit_dir(mdr->get_mds_stamp(), srcdn->get_dir(), META_POP_IWR);
+  utime_t now = ceph_clock_now();
+  mds->balancer->hit_dir(now, srcdn->get_dir(), META_POP_IWR);
   if (destdnl->get_inode() && destdnl->get_inode()->is_auth())
-    mds->balancer->hit_inode(mdr->get_mds_stamp(), destdnl->get_inode(),
-                            META_POP_IWR);
+    mds->balancer->hit_inode(now, destdnl->get_inode(), META_POP_IWR);
 
   // done.
   mdr->slave_request->put();
@@ -7685,8 +7905,7 @@ void Server::_commit_slave_rename(MDRequestRef& mdr, int r,
       ::decode(peer_imported, bp);
 
       dout(10) << " finishing inode export on " << *destdnl->get_inode() << dendl;
-      mdcache->migrator->finish_export_inode(destdnl->get_inode(),
-                                            mdr->get_mds_stamp(),
+      mdcache->migrator->finish_export_inode(destdnl->get_inode(), ceph_clock_now(),
                                             mdr->slave_to_mds, peer_imported, finished);
       mds->queue_waiters(finished);   // this includes SINGLEAUTH waiters.
 
@@ -7701,6 +7920,11 @@ void Server::_commit_slave_rename(MDRequestRef& mdr, int r,
       mdr->more()->is_ambiguous_auth = false;
     }
 
+    if (straydn && mdr->more()->slave_update_journaled) {
+      CInode *strayin = straydn->get_projected_linkage()->get_inode();
+      if (strayin && !strayin->snaprealm)
+       mdcache->clear_dirty_bits_for_stray(strayin);
+    }
 
     mds->queue_waiters(finished);
     mdr->cleanup();
@@ -7896,22 +8120,23 @@ void Server::do_rename_rollback(bufferlist &rbl, mds_rank_t master, MDRequestRef
                                    rollback.orig_src.remote_d_type);
   }
 
-  inode_t *pi = 0;
+  CInode::mempool_inode *pip = 0;
   if (in) {
     if (in->authority().first == whoami) {
-      pi = in->project_inode();
+      auto &pi = in->project_inode();
       mut->add_projected_inode(in);
-      pi->version = in->pre_dirty();
+      pi.inode.version = in->pre_dirty();
+      pip = &pi.inode;
     } else
-      pi = in->get_projected_inode();
-    if (pi->ctime == rollback.ctime)
-      pi->ctime = rollback.orig_src.old_ctime;
+      pip = in->get_projected_inode();
+    if (pip->ctime == rollback.ctime)
+      pip->ctime = pip->rstat.rctime = rollback.orig_src.old_ctime;
   }
 
   if (srcdn && srcdn->authority().first == whoami) {
     nest_info_t blah;
     _rollback_repair_dir(mut, srcdir, rollback.orig_src, rollback.ctime,
-                        in ? in->is_dir() : false, 1, pi ? pi->accounted_rstat : blah);
+                        in ? in->is_dir() : false, 1, pip ? pip->accounted_rstat : blah);
   }
 
   // repair dest
@@ -7933,15 +8158,16 @@ void Server::do_rename_rollback(bufferlist &rbl, mds_rank_t master, MDRequestRef
     straydn->push_projected_linkage();
 
   if (target) {
-    inode_t *ti = NULL;
+    CInode::mempool_inode *ti = NULL;
     if (target->authority().first == whoami) {
-      ti = target->project_inode();
+      auto &pi = target->project_inode();
       mut->add_projected_inode(target);
-      ti->version = target->pre_dirty();
+      pi.inode.version = target->pre_dirty();
+      ti = &pi.inode;
     } else 
       ti = target->get_projected_inode();
     if (ti->ctime == rollback.ctime)
-      ti->ctime = rollback.orig_dest.old_ctime;
+      ti->ctime = ti->rstat.rctime = rollback.orig_dest.old_ctime;
     if (MDS_INO_IS_STRAY(rollback.orig_src.dirfrag.ino)) {
       if (MDS_INO_IS_STRAY(rollback.orig_dest.dirfrag.ino))
        assert(!rollback.orig_dest.ino && !rollback.orig_dest.remote_ino);
@@ -8129,7 +8355,9 @@ void Server::handle_slave_rename_prep_ack(MDRequestRef& mdr, MMDSSlaveRequest *a
 
   // witnessed?  or add extra witnesses?
   assert(mdr->more()->witnessed.count(from) == 0);
-  if (ack->witnesses.empty()) {
+  if (ack->is_interrupted()) {
+    dout(10) << " slave request interrupted, noop" << dendl;
+  } else if (ack->witnesses.empty()) {
     mdr->more()->witnessed.insert(from);
     if (!ack->is_not_journaled())
       mdr->more()->has_journaled_slaves = true;
@@ -8253,9 +8481,9 @@ void Server::handle_client_lssnap(MDRequestRef& mdr)
     // actual
     string snap_name;
     if (p->second->ino == diri->ino())
-      snap_name = p->second->name;
+      snap_name = std::string(p->second->name);
     else
-      snap_name = p->second->get_long_name();
+      snap_name = std::string(p->second->get_long_name());
 
     unsigned start_len = dnbl.length();
     if (int(start_len + snap_name.length() + sizeof(__u32) + sizeof(LeaseStat)) > max_bytes)
@@ -8334,7 +8562,7 @@ void Server::handle_client_mksnap(MDRequestRef& mdr)
     return;
   }
   
-  const string &snapname = req->get_filepath().last_dentry();
+  boost::string_view snapname = req->get_filepath().last_dentry();
 
   if (mdr->client_request->get_caller_uid() < g_conf->mds_snap_min_uid || mdr->client_request->get_caller_uid() > g_conf->mds_snap_max_uid) {
     dout(20) << "mksnap " << snapname << " on " << *diri << " denied to uid " << mdr->client_request->get_caller_uid() << dendl;
@@ -8389,18 +8617,21 @@ void Server::handle_client_mksnap(MDRequestRef& mdr)
   SnapInfo info;
   info.ino = diri->ino();
   info.snapid = snapid;
-  info.name = snapname;
+  info.name = std::string(snapname);
   info.stamp = mdr->get_op_stamp();
 
-  inode_t *pi = diri->project_inode();
-  pi->ctime = info.stamp;
-  pi->version = diri->pre_dirty();
+  auto &pi = diri->project_inode(false, true);
+  pi.inode.ctime = pi.inode.rstat.rctime = info.stamp;
+  pi.inode.version = diri->pre_dirty();
 
   // project the snaprealm
-  sr_t *newsnap = diri->project_snaprealm(snapid);
-  newsnap->snaps[snapid] = info;
-  newsnap->seq = snapid;
-  newsnap->last_created = snapid;
+  auto &newsnap = *pi.snapnode;
+  newsnap.created = snapid;
+  auto em = newsnap.snaps.emplace(std::piecewise_construct, std::forward_as_tuple(snapid), std::forward_as_tuple(info));
+  if (!em.second)
+    em.first->second = info;
+  newsnap.seq = snapid;
+  newsnap.last_created = snapid;
 
   // journal the inode changes
   mdr->ls = mdlog->get_current_segment();
@@ -8473,7 +8704,7 @@ void Server::handle_client_rmsnap(MDRequestRef& mdr)
     return;
   }
 
-  const string &snapname = req->get_filepath().last_dentry();
+  boost::string_view snapname = req->get_filepath().last_dentry();
 
   if (mdr->client_request->get_caller_uid() < g_conf->mds_snap_min_uid || mdr->client_request->get_caller_uid() > g_conf->mds_snap_max_uid) {
     dout(20) << "rmsnap " << snapname << " on " << *diri << " denied to uid " << mdr->client_request->get_caller_uid() << dendl;
@@ -8520,19 +8751,19 @@ void Server::handle_client_rmsnap(MDRequestRef& mdr)
   dout(10) << " stid is " << stid << ", seq is " << seq << dendl;
 
   // journal
-  inode_t *pi = diri->project_inode();
-  pi->version = diri->pre_dirty();
-  pi->ctime = mdr->get_op_stamp();
+  auto &pi = diri->project_inode(false, true);
+  pi.inode.version = diri->pre_dirty();
+  pi.inode.ctime = pi.inode.rstat.rctime = mdr->get_op_stamp();
   
   mdr->ls = mdlog->get_current_segment();
   EUpdate *le = new EUpdate(mdlog, "rmsnap");
   mdlog->start_entry(le);
   
   // project the snaprealm
-  sr_t *newnode = diri->project_snaprealm();
-  newnode->snaps.erase(snapid);
-  newnode->seq = seq;
-  newnode->last_destroyed = seq;
+  auto &newnode = *pi.snapnode;
+  newnode.snaps.erase(snapid);
+  newnode.seq = seq;
+  newnode.last_destroyed = seq;
 
   le->metablob.add_client_req(req->get_reqid(), req->get_oldest_client_tid());
   le->metablob.add_table_transaction(TABLE_SNAP, stid);
@@ -8611,8 +8842,8 @@ void Server::handle_client_renamesnap(MDRequestRef& mdr)
     return;
   }
 
-  const string &dstname = req->get_filepath().last_dentry();
-  const string &srcname = req->get_filepath2().last_dentry();
+  boost::string_view dstname = req->get_filepath().last_dentry();
+  boost::string_view srcname = req->get_filepath2().last_dentry();
   dout(10) << "renamesnap " << srcname << "->" << dstname << " on " << *diri << dendl;
 
   if (srcname.length() == 0 || srcname[0] == '_') {
@@ -8663,14 +8894,15 @@ void Server::handle_client_renamesnap(MDRequestRef& mdr)
   dout(10) << " stid is " << stid << ", seq is " << seq << dendl;
 
   // journal
-  inode_t *pi = diri->project_inode();
-  pi->ctime = mdr->get_op_stamp();
-  pi->version = diri->pre_dirty();
+  auto &pi = diri->project_inode(false, true);
+  pi.inode.ctime = pi.inode.rstat.rctime = mdr->get_op_stamp();
+  pi.inode.version = diri->pre_dirty();
 
   // project the snaprealm
-  sr_t *newsnap = diri->project_snaprealm();
-  assert(newsnap->snaps.count(snapid));
-  newsnap->snaps[snapid].name = dstname;
+  auto &newsnap = *pi.snapnode;
+  auto it = newsnap.snaps.find(snapid);
+  assert(it != newsnap.snaps.end());
+  it->second.name = std::string(dstname);
 
   // journal the inode changes
   mdr->ls = mdlog->get_current_segment();