]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mds/Server.cc
update sources to v12.2.5
[ceph.git] / ceph / src / mds / Server.cc
index cf32c95f6864f4a9e3b59fba72a5a0a905043c49..986f6e6b876c474755c832f6e2fb0aaacc8a9c0d 100644 (file)
@@ -61,6 +61,7 @@
 
 #include <list>
 #include <iostream>
+#include <boost/utility/string_view.hpp>
 using namespace std;
 
 #include "common/config.h"
@@ -70,7 +71,6 @@ using namespace std;
 #undef dout_prefix
 #define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".server "
 
-
 class ServerContext : public MDSInternalContextBase {
   protected:
   Server *server;
@@ -201,12 +201,11 @@ void Server::dispatch(Message *m)
   }
 
   // active?
-  if (!mds->is_active() &&
-      !(mds->is_stopping() && m->get_source().is_mds())) {
-    if (m->get_type() == CEPH_MSG_CLIENT_REQUEST &&
-       (mds->is_reconnect() || mds->get_want_state() == CEPH_MDS_STATE_RECONNECT)) {
-      MClientRequest *req = static_cast<MClientRequest*>(m);
-      Session *session = get_session(req);
+  // handle_slave_request()/handle_client_session() will wait if necessary
+  if (m->get_type() == CEPH_MSG_CLIENT_REQUEST && !mds->is_active()) {
+    MClientRequest *req = static_cast<MClientRequest*>(m);
+    if (mds->is_reconnect() || mds->get_want_state() == CEPH_MDS_STATE_RECONNECT) {
+      Session *session = mds->get_session(req);
       if (!session || session->is_closed()) {
        dout(5) << "session is closed, dropping " << req->get_reqid() << dendl;
        req->put();
@@ -236,20 +235,12 @@ void Server::dispatch(Message *m)
     }
 
     bool wait_for_active = true;
-    if (m->get_type() == MSG_MDS_SLAVE_REQUEST) {
-      // handle_slave_request() will wait if necessary
-      wait_for_active = false;
+    if (mds->is_stopping()) {
+      if (m->get_source().is_mds())
+       wait_for_active = false;
     } else if (mds->is_clientreplay()) {
-      // session open requests need to be handled during replay,
-      // close requests need to be delayed
-      if ((m->get_type() == CEPH_MSG_CLIENT_SESSION &&
-         (static_cast<MClientSession*>(m))->get_op() != CEPH_SESSION_REQUEST_CLOSE)) {
+      if (req->is_queued_for_replay()) {
        wait_for_active = false;
-      } else if (m->get_type() == CEPH_MSG_CLIENT_REQUEST) {
-       MClientRequest *req = static_cast<MClientRequest*>(m);
-       if (req->is_queued_for_replay()) {
-         wait_for_active = false;
-       }
       }
     }
     if (wait_for_active) {
@@ -302,25 +293,12 @@ public:
   }
 };
 
-Session *Server::get_session(Message *m)
-{
-  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
-  if (session) {
-    dout(20) << "get_session have " << session << " " << session->info.inst
-            << " state " << session->get_state_name() << dendl;
-    session->put();  // not carry ref
-  } else {
-    dout(20) << "get_session dne for " << m->get_source_inst() << dendl;
-  }
-  return session;
-}
-
 /* This function DOES put the passed message before returning*/
 void Server::handle_client_session(MClientSession *m)
 {
   version_t pv;
   bool blacklisted = false;
-  Session *session = get_session(m);
+  Session *session = mds->get_session(m);
 
   dout(3) << "handle_client_session " << *m << " from " << m->get_source() << dendl;
   assert(m->get_source().is_client()); // should _not_ come from an mds!
@@ -331,6 +309,21 @@ void Server::handle_client_session(MClientSession *m)
     return;
   }
 
+  if (m->get_op() == CEPH_SESSION_REQUEST_RENEWCAPS) {
+    // always handle renewcaps (state >= MDSMap::STATE_RECONNECT)
+  } else if (m->get_op() == CEPH_SESSION_REQUEST_CLOSE) {
+    // close requests need to be handled when mds is active
+    if (mds->get_state() < MDSMap::STATE_ACTIVE) {
+      mds->wait_for_active(new C_MDS_RetryMessage(mds, m));
+      return;
+    }
+  } else {
+    if (mds->get_state() < MDSMap::STATE_CLIENTREPLAY) {
+      mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
+      return;
+    }
+  }
+
   if (logger)
     logger->inc(l_mdss_handle_client_session);
 
@@ -342,12 +335,21 @@ void Server::handle_client_session(MClientSession *m)
        session->is_stale() ||
        session->is_killing()) {
       dout(10) << "currently open|opening|stale|killing, dropping this req" << dendl;
+      // set client metadata for session opened by prepare_force_open_sessions
+      if (!m->client_meta.empty())
+       session->set_client_metadata(m->client_meta);
       m->put();
       return;
     }
     assert(session->is_closed() ||
           session->is_closing());
 
+    if (mds->is_stopping()) {
+      dout(10) << "mds is stopping, dropping open req" << dendl;
+      m->put();
+      return;
+    }
+
     blacklisted = mds->objecter->with_osdmap(
         [session](const OSDMap &osd_map) -> bool {
           return osd_map.is_blacklisted(session->info.inst.addr);
@@ -456,7 +458,8 @@ void Server::handle_client_session(MClientSession *m)
     break;
 
   case CEPH_SESSION_REQUEST_FLUSH_MDLOG:
-    mdlog->flush();
+    if (mds->is_active())
+      mdlog->flush();
     break;
 
   default:
@@ -561,7 +564,8 @@ void Server::_session_logged(Session *session, uint64_t state_seq, bool open, ve
     } else if (session->is_killing()) {
       // destroy session, close connection
       if (session->connection != NULL) {
-        session->connection->mark_down();
+       session->connection->mark_down();
+       session->connection->set_priv(NULL);
       }
       mds->sessionmap.remove_session(session);
     } else {
@@ -619,9 +623,7 @@ void Server::finish_force_open_sessions(map<client_t,entity_inst_t>& cm,
           << " initial v " << mds->sessionmap.get_version() << dendl;
   
 
-  int sessions_inserted = 0;
   for (map<client_t,entity_inst_t>::iterator p = cm.begin(); p != cm.end(); ++p) {
-    sessions_inserted++;
 
     Session *session = mds->sessionmap.get_session(p->second.name);
     assert(session);
@@ -881,7 +883,7 @@ void Server::handle_client_reconnect(MClientReconnect *m)
 {
   dout(7) << "handle_client_reconnect " << m->get_source() << dendl;
   client_t from = m->get_source().num();
-  Session *session = get_session(m);
+  Session *session = mds->get_session(m);
   assert(session);
 
   if (!mds->is_reconnect() && mds->get_want_state() == CEPH_MDS_STATE_RECONNECT) {
@@ -895,7 +897,7 @@ void Server::handle_client_reconnect(MClientReconnect *m)
   dout(10) << " reconnect_start " << reconnect_start << " delay " << delay << dendl;
 
   bool deny = false;
-  if (!mds->is_reconnect()) {
+  if (!mds->is_reconnect() || mds->get_want_state() != CEPH_MDS_STATE_RECONNECT || reconnect_evicting) {
     // XXX maybe in the future we can do better than this?
     dout(1) << " no longer in reconnect state, ignoring reconnect, sending close" << dendl;
     mds->clog->info() << "denied reconnect attempt (mds is "
@@ -1083,10 +1085,22 @@ void Server::recover_filelocks(CInode *in, bufferlist locks, int64_t client)
  * to trim some caps, and consequently unpin some inodes in the MDCache so
  * that it can trim too.
  */
-void Server::recall_client_state(float ratio)
+void Server::recall_client_state(void)
 {
-  int max_caps_per_client = (int)(g_conf->mds_cache_size * .8);
-  int min_caps_per_client = 100;
+  /* try to recall at least 80% of all caps */
+  uint64_t max_caps_per_client = Capability::count() * g_conf->get_val<double>("mds_max_ratio_caps_per_client");
+  uint64_t min_caps_per_client = g_conf->get_val<uint64_t>("mds_min_caps_per_client");
+  if (max_caps_per_client < min_caps_per_client) {
+    dout(0) << "max_caps_per_client " << max_caps_per_client
+            << " < min_caps_per_client " << min_caps_per_client << dendl;
+    max_caps_per_client = min_caps_per_client + 1;
+  }
+
+  /* unless this ratio is smaller: */
+  /* ratio: determine the amount of caps to recall from each client. Use
+   * percentage full over the cache reservation. Cap the ratio at 80% of client
+   * caps. */
+  double ratio = 1.0-fmin(0.80, mdcache->cache_toofull_ratio());
 
   dout(10) << "recall_client_state " << ratio
           << ", caps per client " << min_caps_per_client << "-" << max_caps_per_client
@@ -1094,10 +1108,7 @@ void Server::recall_client_state(float ratio)
 
   set<Session*> sessions;
   mds->sessionmap.get_client_session_set(sessions);
-  for (set<Session*>::const_iterator p = sessions.begin();
-       p != sessions.end();
-       ++p) {
-    Session *session = *p;
+  for (auto &session : sessions) {
     if (!session->is_open() ||
        !session->info.inst.name.is_client())
       continue;
@@ -1107,14 +1118,12 @@ void Server::recall_client_state(float ratio)
             << ", leases " << session->leases.size()
             << dendl;
 
-    if (session->caps.size() > min_caps_per_client) {  
-      int newlim = MIN((int)(session->caps.size() * ratio), max_caps_per_client);
-      if (session->caps.size() > newlim) {
-          MClientSession *m = new MClientSession(CEPH_SESSION_RECALL_STATE);
-          m->head.max_caps = newlim;
-          mds->send_message_client(m, session);
-          session->notify_recall_sent(newlim);
-      }
+    uint64_t newlim = MAX(MIN((session->caps.size() * ratio), max_caps_per_client), min_caps_per_client);
+    if (session->caps.size() > newlim) {
+      MClientSession *m = new MClientSession(CEPH_SESSION_RECALL_STATE);
+      m->head.max_caps = newlim;
+      mds->send_message_client(m, session);
+      session->notify_recall_sent(newlim);
     }
   }
 }
@@ -1165,7 +1174,7 @@ void Server::journal_and_reply(MDRequestRef& mdr, CInode *in, CDentry *dn, LogEv
       mdlog->flush();
     }
   } else if (mdr->did_early_reply)
-    mds->locker->drop_rdlocks(mdr.get());
+    mds->locker->drop_rdlocks_for_early_reply(mdr.get());
   else
     mdlog->flush();
 }
@@ -1289,6 +1298,11 @@ void Server::early_reply(MDRequestRef& mdr, CInode *tracei, CDentry *tracedn)
   if (!g_conf->mds_early_reply)
     return;
 
+  if (mdr->no_early_reply) {
+    dout(10) << "early_reply - flag no_early_reply is set, not allowed." << dendl;
+    return;
+  }
+
   if (mdr->has_more() && mdr->more()->has_journaled_slaves) {
     dout(10) << "early_reply - there are journaled slaves, not allowed." << dendl;
     return; 
@@ -1587,7 +1601,7 @@ void Server::handle_client_request(MClientRequest *req)
   // active session?
   Session *session = 0;
   if (req->get_source().is_client()) {
-    session = get_session(req);
+    session = mds->get_session(req);
     if (!session) {
       dout(5) << "no session for " << req->get_source() << ", dropping" << dendl;
     } else if (session->is_closed() ||
@@ -1709,7 +1723,8 @@ void Server::handle_osd_map()
    * using osdmap_full_flag(), because we want to know "is the flag set"
    * rather than "does the flag apply to us?" */
   mds->objecter->with_osdmap([this](const OSDMap& o) {
-      is_full = o.test_flag(CEPH_OSDMAP_FULL);
+      auto pi = o.get_pg_pool(mds->mdsmap->get_metadata_pool());
+      is_full = pi && pi->has_flag(pg_pool_t::FLAG_FULL);
       dout(7) << __func__ << ": full = " << is_full << " epoch = "
              << o.get_epoch() << dendl;
     });
@@ -1723,6 +1738,10 @@ void Server::dispatch_client_request(MDRequestRef& mdr)
   if (mdr->killed) {
     dout(10) << "request " << *mdr << " was killed" << dendl;
     return;
+  } else if (mdr->aborted) {
+    mdr->aborted = false;
+    mdcache->request_kill(mdr);
+    return;
   }
 
   MClientRequest *req = mdr->client_request;
@@ -2439,7 +2458,7 @@ bool Server::check_fragment_space(MDRequestRef &mdr, CDir *in)
  * verify that the dir exists and would own the dname.
  * do not check if the dentry exists.
  */
-CDir *Server::validate_dentry_dir(MDRequestRef& mdr, CInode *diri, const string& dname)
+CDir *Server::validate_dentry_dir(MDRequestRef& mdr, CInode *diri, boost::string_view dname)
 {
   // make sure parent is a dir?
   if (!diri->is_dir()) {
@@ -2469,7 +2488,7 @@ CDir *Server::validate_dentry_dir(MDRequestRef& mdr, CInode *diri, const string&
  * prepare a null (or existing) dentry in given dir. 
  * wait for any dn lock.
  */
-CDentry* Server::prepare_null_dentry(MDRequestRef& mdr, CDir *dir, const string& dname, bool okexist)
+CDentry* Server::prepare_null_dentry(MDRequestRef& mdr, CDir *dir, boost::string_view dname, bool okexist)
 {
   dout(10) << "prepare_null_dentry " << dname << " in " << *dir << dendl;
   assert(dir->is_auth());
@@ -2633,11 +2652,13 @@ CInode* Server::prepare_new_inode(MDRequestRef& mdr, CDir *dir, inodeno_t useino
     bufferlist::iterator p = req->get_data().begin();
 
     // xattrs on new inode?
-    map<string,bufferptr> xattrs;
+    CInode::mempool_xattr_map xattrs;
     ::decode(xattrs, p);
-    for (map<string,bufferptr>::iterator p = xattrs.begin(); p != xattrs.end(); ++p) {
-      dout(10) << "prepare_new_inode setting xattr " << p->first << dendl;
-      in->xattrs[p->first] = p->second;
+    for (const auto &p : xattrs) {
+      dout(10) << "prepare_new_inode setting xattr " << p.first << dendl;
+      auto em = in->xattrs.emplace(std::piecewise_construct, std::forward_as_tuple(p.first), std::forward_as_tuple(p.second));
+      if (!em.second)
+        em.first->second = p.second;
     }
   }
 
@@ -2865,7 +2886,7 @@ CDentry* Server::rdlock_path_xlock_dentry(MDRequestRef& mdr, int n,
   }
 
   // make a null dentry?
-  const string &dname = refpath.last_dentry();
+  boost::string_view dname = refpath.last_dentry();
   CDentry *dn;
   if (mustexist) {
     dn = dir->lookup(dname);
@@ -3002,10 +3023,27 @@ void Server::handle_client_getattr(MDRequestRef& mdr, bool is_lookup)
     issued = cap->issued();
 
   int mask = req->head.args.getattr.mask;
-  if ((mask & CEPH_CAP_LINK_SHARED) && (issued & CEPH_CAP_LINK_EXCL) == 0) rdlocks.insert(&ref->linklock);
-  if ((mask & CEPH_CAP_AUTH_SHARED) && (issued & CEPH_CAP_AUTH_EXCL) == 0) rdlocks.insert(&ref->authlock);
-  if ((mask & CEPH_CAP_FILE_SHARED) && (issued & CEPH_CAP_FILE_EXCL) == 0) rdlocks.insert(&ref->filelock);
-  if ((mask & CEPH_CAP_XATTR_SHARED) && (issued & CEPH_CAP_XATTR_EXCL) == 0) rdlocks.insert(&ref->xattrlock);
+  if ((mask & CEPH_CAP_LINK_SHARED) && !(issued & CEPH_CAP_LINK_EXCL))
+    rdlocks.insert(&ref->linklock);
+  if ((mask & CEPH_CAP_AUTH_SHARED) && !(issued & CEPH_CAP_AUTH_EXCL))
+    rdlocks.insert(&ref->authlock);
+  if ((mask & CEPH_CAP_XATTR_SHARED) && !(issued & CEPH_CAP_XATTR_EXCL))
+    rdlocks.insert(&ref->xattrlock);
+  if ((mask & CEPH_CAP_FILE_SHARED) && !(issued & CEPH_CAP_FILE_EXCL)) {
+    // Don't wait on unstable filelock if client is allowed to read file size.
+    // This can reduce the response time of getattr in the case that multiple
+    // clients do stat(2) and there are writers.
+    // The downside of this optimization is that mds may not issue Fs caps along
+    // with getattr reply. Client may need to send more getattr requests.
+    if (mdr->rdlocks.count(&ref->filelock)) {
+      rdlocks.insert(&ref->filelock);
+    } else if (ref->filelock.is_stable() ||
+              ref->filelock.get_num_wrlocks() > 0 ||
+              !ref->filelock.can_read(mdr->get_client())) {
+      rdlocks.insert(&ref->filelock);
+      mdr->done_locking = false;
+    }
+  }
 
   if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
     return;
@@ -3095,9 +3133,11 @@ void Server::handle_client_lookup_ino(MDRequestRef& mdr,
     if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
       return;
 
-    // need read access to directory inode
-    if (!check_access(mdr, diri, MAY_READ))
-      return;
+    if (diri != NULL) {
+      // need read access to directory inode
+      if (!check_access(mdr, diri, MAY_READ))
+        return;
+    }
   }
 
   if (want_parent) {
@@ -3165,7 +3205,8 @@ void Server::handle_client_open(MDRequestRef& mdr)
     return;
   }
   
-  bool need_auth = !file_mode_is_readonly(cmode) || (flags & CEPH_O_TRUNC);
+  bool need_auth = !file_mode_is_readonly(cmode) ||
+                  (flags & (CEPH_O_TRUNC | CEPH_O_DIRECTORY));
 
   if ((cmode & CEPH_FILE_MODE_WR) && mdcache->is_readonly()) {
     dout(7) << "read-only FS" << dendl;
@@ -3260,7 +3301,7 @@ void Server::handle_client_open(MDRequestRef& mdr)
       return;
 
     // wait for pending truncate?
-    const inode_t *pi = cur->get_projected_inode();
+    const auto pi = cur->get_projected_inode();
     if (pi->is_truncating()) {
       dout(10) << " waiting for pending truncate from " << pi->truncate_from
               << " to " << pi->truncate_size << " to complete on " << *cur << dendl;
@@ -3667,12 +3708,11 @@ void Server::handle_client_readdir(MDRequestRef& mdr)
   bufferlist dnbl;
   __u32 numfiles = 0;
   bool start = !offset_hash && offset_str.empty();
-  bool end = (dir->begin() == dir->end());
   // skip all dns < dentry_key_t(snapid, offset_str, offset_hash)
   dentry_key_t skip_key(snapid, offset_str.c_str(), offset_hash);
-  for (CDir::map_t::iterator it = start ? dir->begin() : dir->lower_bound(skip_key);
-       !end && numfiles < max;
-       end = (it == dir->end())) {
+  auto it = start ? dir->begin() : dir->lower_bound(skip_key);
+  bool end = (it == dir->end());
+  for (; !end && numfiles < max; end = (it == dir->end())) {
     CDentry *dn = it->second;
     ++it;
 
@@ -3712,9 +3752,10 @@ void Server::handle_client_readdir(MDRequestRef& mdr)
        continue;
       } else {
        // touch everything i _do_ have
-       for (CDir::map_t::iterator p = dir->begin(); p != dir->end(); ++p)
-         if (!p->second->get_linkage()->is_null())
-           mdcache->lru.lru_touch(p->second);
+       for (auto &p : *dir) {
+         if (!p.second->get_linkage()->is_null())
+           mdcache->lru.lru_touch(p.second);
+        }
 
        // already issued caps and leases, reply immediately.
        if (dnbl.length() > 0) {
@@ -3732,7 +3773,7 @@ void Server::handle_client_readdir(MDRequestRef& mdr)
     }
     assert(in);
 
-    if ((int)(dnbl.length() + dn->name.length() + sizeof(__u32) + sizeof(LeaseStat)) > bytes_left) {
+    if ((int)(dnbl.length() + dn->get_name().length() + sizeof(__u32) + sizeof(LeaseStat)) > bytes_left) {
       dout(10) << " ran out of room, stopping at " << dnbl.length() << " < " << bytes_left << dendl;
       break;
     }
@@ -3741,7 +3782,7 @@ void Server::handle_client_readdir(MDRequestRef& mdr)
 
     // dentry
     dout(12) << "including    dn " << *dn << dendl;
-    ::encode(dn->name, dnbl);
+    ::encode(dn->get_name(), dnbl);
     mds->locker->issue_client_lease(dn, client, dnbl, now, mdr->session);
 
     // inode
@@ -4026,9 +4067,9 @@ void Server::handle_client_setattr(MDRequestRef& mdr)
     return;
 
   // trunc from bigger -> smaller?
-  inode_t *pi = cur->get_projected_inode();
+  auto pip = cur->get_projected_inode();
 
-  uint64_t old_size = MAX(pi->size, req->head.args.setattr.old_size);
+  uint64_t old_size = std::max<uint64_t>(pip->size, req->head.args.setattr.old_size);
 
   // ENOSPC on growing file while full, but allow shrinks
   if (is_full && req->head.args.setattr.size > old_size) {
@@ -4040,9 +4081,9 @@ void Server::handle_client_setattr(MDRequestRef& mdr)
   bool truncating_smaller = false;
   if (mask & CEPH_SETATTR_SIZE) {
     truncating_smaller = req->head.args.setattr.size < old_size;
-    if (truncating_smaller && pi->is_truncating()) {
-      dout(10) << " waiting for pending truncate from " << pi->truncate_from
-              << " to " << pi->truncate_size << " to complete on " << *cur << dendl;
+    if (truncating_smaller && pip->is_truncating()) {
+      dout(10) << " waiting for pending truncate from " << pip->truncate_from
+              << " to " << pip->truncate_size << " to complete on " << *cur << dendl;
       mds->locker->drop_locks(mdr.get());
       mdr->drop_local_auth_pins();
       cur->add_waiter(CInode::WAIT_TRUNC, new C_MDS_RetryRequest(mdcache, mdr));
@@ -4057,54 +4098,53 @@ void Server::handle_client_setattr(MDRequestRef& mdr)
   EUpdate *le = new EUpdate(mdlog, "setattr");
   mdlog->start_entry(le);
 
-  pi = cur->project_inode();
+  auto &pi = cur->project_inode();
 
   if (mask & CEPH_SETATTR_UID)
-    pi->uid = req->head.args.setattr.uid;
+    pi.inode.uid = req->head.args.setattr.uid;
   if (mask & CEPH_SETATTR_GID)
-    pi->gid = req->head.args.setattr.gid;
+    pi.inode.gid = req->head.args.setattr.gid;
 
   if (mask & CEPH_SETATTR_MODE)
-    pi->mode = (pi->mode & ~07777) | (req->head.args.setattr.mode & 07777);
+    pi.inode.mode = (pi.inode.mode & ~07777) | (req->head.args.setattr.mode & 07777);
   else if ((mask & (CEPH_SETATTR_UID|CEPH_SETATTR_GID|CEPH_SETATTR_KILL_SGUID)) &&
-           S_ISREG(pi->mode)) {
-    pi->mode &= ~S_ISUID;
-    if ((pi->mode & (S_ISGID|S_IXGRP)) == (S_ISGID|S_IXGRP))
-      pi->mode &= ~S_ISGID;
+           S_ISREG(pi.inode.mode) &&
+            (pi.inode.mode & (S_IXUSR|S_IXGRP|S_IXOTH))) {
+    pi.inode.mode &= ~(S_ISUID|S_ISGID);
   }
 
   if (mask & CEPH_SETATTR_MTIME)
-    pi->mtime = req->head.args.setattr.mtime;
+    pi.inode.mtime = req->head.args.setattr.mtime;
   if (mask & CEPH_SETATTR_ATIME)
-    pi->atime = req->head.args.setattr.atime;
+    pi.inode.atime = req->head.args.setattr.atime;
   if (mask & CEPH_SETATTR_BTIME)
-    pi->btime = req->head.args.setattr.btime;
+    pi.inode.btime = req->head.args.setattr.btime;
   if (mask & (CEPH_SETATTR_ATIME | CEPH_SETATTR_MTIME | CEPH_SETATTR_BTIME))
-    pi->time_warp_seq++;   // maybe not a timewarp, but still a serialization point.
+    pi.inode.time_warp_seq++;   // maybe not a timewarp, but still a serialization point.
   if (mask & CEPH_SETATTR_SIZE) {
     if (truncating_smaller) {
-      pi->truncate(old_size, req->head.args.setattr.size);
+      pi.inode.truncate(old_size, req->head.args.setattr.size);
       le->metablob.add_truncate_start(cur->ino());
     } else {
-      pi->size = req->head.args.setattr.size;
-      pi->rstat.rbytes = pi->size;
+      pi.inode.size = req->head.args.setattr.size;
+      pi.inode.rstat.rbytes = pi.inode.size;
     }
-    pi->mtime = mdr->get_op_stamp();
+    pi.inode.mtime = mdr->get_op_stamp();
 
     // adjust client's max_size?
-    map<client_t,client_writeable_range_t> new_ranges;
+    CInode::mempool_inode::client_range_map new_ranges;
     bool max_increased = false;
-    mds->locker->calc_new_client_ranges(cur, pi->size, &new_ranges, &max_increased);
-    if (pi->client_ranges != new_ranges) {
-      dout(10) << " client_ranges " << pi->client_ranges << " -> " << new_ranges << dendl;
-      pi->client_ranges = new_ranges;
+    mds->locker->calc_new_client_ranges(cur, pi.inode.size, &new_ranges, &max_increased);
+    if (pi.inode.client_ranges != new_ranges) {
+      dout(10) << " client_ranges " << pi.inode.client_ranges << " -> " << new_ranges << dendl;
+      pi.inode.client_ranges = new_ranges;
       changed_ranges = true;
     }
   }
 
-  pi->version = cur->pre_dirty();
-  pi->ctime = mdr->get_op_stamp();
-  pi->change_attr++;
+  pi.inode.version = cur->pre_dirty();
+  pi.inode.ctime = mdr->get_op_stamp();
+  pi.inode.change_attr++;
 
   // log + wait
   le->metablob.add_client_req(req->get_reqid(), req->get_oldest_client_tid());
@@ -4137,22 +4177,22 @@ void Server::do_open_truncate(MDRequestRef& mdr, int cmode)
   mdlog->start_entry(le);
 
   // prepare
-  inode_t *pi = in->project_inode();
-  pi->version = in->pre_dirty();
-  pi->mtime = pi->ctime = mdr->get_op_stamp();
-  pi->change_attr++;
+  auto &pi = in->project_inode();
+  pi.inode.version = in->pre_dirty();
+  pi.inode.mtime = pi.inode.ctime = mdr->get_op_stamp();
+  pi.inode.change_attr++;
 
-  uint64_t old_size = MAX(pi->size, mdr->client_request->head.args.open.old_size);
+  uint64_t old_size = std::max<uint64_t>(pi.inode.size, mdr->client_request->head.args.open.old_size);
   if (old_size > 0) {
-    pi->truncate(old_size, 0);
+    pi.inode.truncate(old_size, 0);
     le->metablob.add_truncate_start(in->ino());
   }
 
   bool changed_ranges = false;
   if (cmode & CEPH_FILE_MODE_WR) {
-    pi->client_ranges[client].range.first = 0;
-    pi->client_ranges[client].range.last = pi->get_layout_size_increment();
-    pi->client_ranges[client].follows = in->find_snaprealm()->get_newest_seq();
+    pi.inode.client_ranges[client].range.first = 0;
+    pi.inode.client_ranges[client].range.last = pi.inode.get_layout_size_increment();
+    pi.inode.client_ranges[client].follows = in->find_snaprealm()->get_newest_seq();
     changed_ranges = true;
   }
   
@@ -4252,13 +4292,13 @@ void Server::handle_client_setlayout(MDRequestRef& mdr)
     return;
 
   // project update
-  inode_t *pi = cur->project_inode();
-  pi->layout = layout;
+  auto &pi = cur->project_inode();
+  pi.inode.layout = layout;
   // add the old pool to the inode
-  pi->add_old_pool(old_layout.pool_id);
-  pi->version = cur->pre_dirty();
-  pi->ctime = mdr->get_op_stamp();
-  pi->change_attr++;
+  pi.inode.add_old_pool(old_layout.pool_id);
+  pi.inode.version = cur->pre_dirty();
+  pi.inode.ctime = mdr->get_op_stamp();
+  pi.inode.change_attr++;
   
   // log + wait
   mdr->ls = mdlog->get_current_segment();
@@ -4294,7 +4334,7 @@ void Server::handle_client_setdirlayout(MDRequestRef& mdr)
     return;
 
   // validate layout
-  const inode_t *old_pi = cur->get_projected_inode();
+  const auto old_pi = cur->get_projected_inode();
   file_layout_t layout;
   if (old_pi->has_layout())
     layout = old_pi->layout;
@@ -4341,9 +4381,9 @@ void Server::handle_client_setdirlayout(MDRequestRef& mdr)
   if (!check_access(mdr, cur, access))
     return;
 
-  inode_t *pi = cur->project_inode();
-  pi->layout = layout;
-  pi->version = cur->pre_dirty();
+  auto &pi = cur->project_inode();
+  pi.inode.layout = layout;
+  pi.inode.version = cur->pre_dirty();
 
   // log + wait
   mdr->ls = mdlog->get_current_segment();
@@ -4353,6 +4393,7 @@ void Server::handle_client_setdirlayout(MDRequestRef& mdr)
   mdcache->predirty_journal_parents(mdr, &le->metablob, cur, 0, PREDIRTY_PRIMARY);
   mdcache->journal_dirty_inode(mdr.get(), &le->metablob, cur);
 
+  mdr->no_early_reply = true;
   journal_and_reply(mdr, cur, 0, le, new C_MDS_inode_update_finish(this, mdr, cur));
 }
 
@@ -4553,7 +4594,7 @@ void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur,
            << " bytes on " << *cur
            << dendl;
 
-  inode_t *pi = NULL;
+  CInode::mempool_inode *pip = nullptr;
   string rest;
 
   if (!check_access(mdr, cur, MAY_SET_VXATTR)) {
@@ -4582,8 +4623,10 @@ void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur,
     if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
       return;
 
-    pi = cur->project_inode();
-    pi->layout = layout;
+    auto &pi = cur->project_inode();
+    pi.inode.layout = layout;
+    mdr->no_early_reply = true;
+    pip = &pi.inode;
   } else if (name.compare(0, 16, "ceph.file.layout") == 0) {
     if (!cur->is_file()) {
       respond_to_request(mdr, -EINVAL);
@@ -4603,11 +4646,12 @@ void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur,
     if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
       return;
 
-    pi = cur->project_inode();
-    int64_t old_pool = pi->layout.pool_id;
-    pi->add_old_pool(old_pool);
-    pi->layout = layout;
-    pi->ctime = mdr->get_op_stamp();
+    auto &pi = cur->project_inode();
+    int64_t old_pool = pi.inode.layout.pool_id;
+    pi.inode.add_old_pool(old_pool);
+    pi.inode.layout = layout;
+    pi.inode.ctime = mdr->get_op_stamp();
+    pip = &pi.inode;
   } else if (name.compare(0, 10, "ceph.quota") == 0) { 
     if (!cur->is_dir() || cur->is_root()) {
       respond_to_request(mdr, -EINVAL);
@@ -4627,8 +4671,11 @@ void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur,
     if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
       return;
 
-    pi = cur->project_inode();
-    pi->quota = quota;
+    auto &pi = cur->project_inode();
+    pi.inode.quota = quota;
+
+    mdr->no_early_reply = true;
+    pip = &pi.inode;
   } else if (name.find("ceph.dir.pin") == 0) {
     if (!cur->is_dir() || cur->is_root()) {
       respond_to_request(mdr, -EINVAL);
@@ -4649,19 +4696,20 @@ void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur,
     if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
       return;
 
-    pi = cur->project_inode();
+    auto &pi = cur->project_inode();
     cur->set_export_pin(rank);
+    pip = &pi.inode;
   } else {
     dout(10) << " unknown vxattr " << name << dendl;
     respond_to_request(mdr, -EINVAL);
     return;
   }
 
-  pi->change_attr++;
-  pi->ctime = mdr->get_op_stamp();
-  pi->version = cur->pre_dirty();
+  pip->change_attr++;
+  pip->ctime = mdr->get_op_stamp();
+  pip->version = cur->pre_dirty();
   if (cur->is_file())
-    pi->update_backtrace();
+    pip->update_backtrace();
 
   // log + wait
   mdr->ls = mdlog->get_current_segment();
@@ -4706,9 +4754,9 @@ void Server::handle_remove_vxattr(MDRequestRef& mdr, CInode *cur,
     if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
       return;
 
-    inode_t *pi = cur->project_inode();
-    pi->clear_layout();
-    pi->version = cur->pre_dirty();
+    auto &pi = cur->project_inode();
+    pi.inode.clear_layout();
+    pi.inode.version = cur->pre_dirty();
 
     // log + wait
     mdr->ls = mdlog->get_current_segment();
@@ -4718,6 +4766,7 @@ void Server::handle_remove_vxattr(MDRequestRef& mdr, CInode *cur,
     mdcache->predirty_journal_parents(mdr, &le->metablob, cur, 0, PREDIRTY_PRIMARY);
     mdcache->journal_dirty_inode(mdr.get(), &le->metablob, cur);
 
+    mdr->no_early_reply = true;
     journal_and_reply(mdr, cur, 0, le, new C_MDS_inode_update_finish(this, mdr, cur));
     return;
   } else if (name == "ceph.dir.layout.pool_namespace"
@@ -4788,14 +4837,14 @@ void Server::handle_client_setxattr(MDRequestRef& mdr)
   if (!check_access(mdr, cur, MAY_WRITE))
     return;
 
-  map<string, bufferptr> *pxattrs = cur->get_projected_xattrs();
+  auto pxattrs = cur->get_projected_xattrs();
   size_t len = req->get_data().length();
   size_t inc = len + name.length();
 
   // check xattrs kv pairs size
   size_t cur_xattrs_size = 0;
   for (const auto& p : *pxattrs) {
-    if ((flags & CEPH_XATTR_REPLACE) && (name.compare(p.first) == 0)) {
+    if ((flags & CEPH_XATTR_REPLACE) && (name.compare(std::string(boost::string_view(p.first))) == 0)) {
       continue;
     }
     cur_xattrs_size += p.first.length() + p.second.length();
@@ -4808,12 +4857,12 @@ void Server::handle_client_setxattr(MDRequestRef& mdr)
     return;
   }
 
-  if ((flags & CEPH_XATTR_CREATE) && pxattrs->count(name)) {
+  if ((flags & CEPH_XATTR_CREATE) && pxattrs->count(mempool::mds_co::string(boost::string_view(name)))) {
     dout(10) << "setxattr '" << name << "' XATTR_CREATE and EEXIST on " << *cur << dendl;
     respond_to_request(mdr, -EEXIST);
     return;
   }
-  if ((flags & CEPH_XATTR_REPLACE) && !pxattrs->count(name)) {
+  if ((flags & CEPH_XATTR_REPLACE) && !pxattrs->count(mempool::mds_co::string(boost::string_view(name)))) {
     dout(10) << "setxattr '" << name << "' XATTR_REPLACE and ENODATA on " << *cur << dendl;
     respond_to_request(mdr, -ENODATA);
     return;
@@ -4822,17 +4871,21 @@ void Server::handle_client_setxattr(MDRequestRef& mdr)
   dout(10) << "setxattr '" << name << "' len " << len << " on " << *cur << dendl;
 
   // project update
-  map<string,bufferptr> *px = new map<string,bufferptr>;
-  inode_t *pi = cur->project_inode(px);
-  pi->version = cur->pre_dirty();
-  pi->ctime = mdr->get_op_stamp();
-  pi->change_attr++;
-  pi->xattr_version++;
-  px->erase(name);
-  if (!(flags & CEPH_XATTR_REMOVE)) {
-    (*px)[name] = buffer::create(len);
+  auto &pi = cur->project_inode(true);
+  pi.inode.version = cur->pre_dirty();
+  pi.inode.ctime = mdr->get_op_stamp();
+  pi.inode.change_attr++;
+  pi.inode.xattr_version++;
+  auto &px = *pi.xattrs;
+  if ((flags & CEPH_XATTR_REMOVE)) {
+    px.erase(mempool::mds_co::string(boost::string_view(name)));
+  } else {
+    bufferptr b = buffer::create(len);
     if (len)
-      req->get_data().copy(0, len, (*px)[name].c_str());
+      req->get_data().copy(0, len, b.c_str());
+    auto em = px.emplace(std::piecewise_construct, std::forward_as_tuple(mempool::mds_co::string(boost::string_view(name))), std::forward_as_tuple(b));
+    if (!em.second)
+      em.first->second = b;
   }
 
   // log + wait
@@ -4849,8 +4902,8 @@ void Server::handle_client_setxattr(MDRequestRef& mdr)
 void Server::handle_client_removexattr(MDRequestRef& mdr)
 {
   MClientRequest *req = mdr->client_request;
-  string name(req->get_path2());
-  set<SimpleLock*> rdlocks, wrlocks, xlocks;
+  std::string name(req->get_path2());
+  std::set<SimpleLock*> rdlocks, wrlocks, xlocks;
   file_layout_t *dir_layout = NULL;
   CInode *cur;
   if (name == "ceph.dir.layout")
@@ -4874,8 +4927,8 @@ void Server::handle_client_removexattr(MDRequestRef& mdr)
   if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
     return;
 
-  map<string, bufferptr> *pxattrs = cur->get_projected_xattrs();
-  if (pxattrs->count(name) == 0) {
+  auto pxattrs = cur->get_projected_xattrs();
+  if (pxattrs->count(mempool::mds_co::string(boost::string_view(name))) == 0) {
     dout(10) << "removexattr '" << name << "' and ENODATA on " << *cur << dendl;
     respond_to_request(mdr, -ENODATA);
     return;
@@ -4884,13 +4937,13 @@ void Server::handle_client_removexattr(MDRequestRef& mdr)
   dout(10) << "removexattr '" << name << "' on " << *cur << dendl;
 
   // project update
-  map<string,bufferptr> *px = new map<string,bufferptr>;
-  inode_t *pi = cur->project_inode(px);
-  pi->version = cur->pre_dirty();
-  pi->ctime = mdr->get_op_stamp();
-  pi->change_attr++;
-  pi->xattr_version++;
-  px->erase(name);
+  auto &pi = cur->project_inode(true);
+  auto &px = *pi.xattrs;
+  pi.inode.version = cur->pre_dirty();
+  pi.inode.ctime = mdr->get_op_stamp();
+  pi.inode.change_attr++;
+  pi.inode.xattr_version++;
+  px.erase(mempool::mds_co::string(boost::string_view(name)));
 
   // log + wait
   mdr->ls = mdlog->get_current_segment();
@@ -5163,7 +5216,7 @@ void Server::handle_client_symlink(MDRequestRef& mdr)
   // it's a symlink
   dn->push_projected_linkage(newi);
 
-  newi->symlink = req->get_path2();
+  newi->symlink = mempool::mds_co::string(boost::string_view(req->get_path2()));
   newi->inode.size = newi->symlink.length();
   newi->inode.rstat.rbytes = newi->inode.size;
   newi->inode.rstat.rfiles = 1;
@@ -5213,9 +5266,15 @@ void Server::handle_client_link(MDRequestRef& mdr)
   dout(7) << "handle_client_link link " << dn->get_name() << " in " << *dir << dendl;
   dout(7) << "target is " << *targeti << dendl;
   if (targeti->is_dir()) {
-    dout(7) << "target is a dir, failing..." << dendl;
-    respond_to_request(mdr, -EINVAL);
-    return;
+    // if srcdn is replica, need to make sure its linkage is correct
+    vector<CDentry*>& trace = mdr->dn[1];
+    if (trace.empty() ||
+       trace.back()->is_auth() ||
+       trace.back()->lock.can_read(mdr->get_client())) {
+      dout(7) << "target is a dir, failing..." << dendl;
+      respond_to_request(mdr, -EINVAL);
+      return;
+    }
   }
 
   xlocks.insert(&targeti->linklock);
@@ -5273,11 +5332,11 @@ void Server::_link_local(MDRequestRef& mdr, CDentry *dn, CInode *targeti)
   version_t tipv = targeti->pre_dirty();
   
   // project inode update
-  inode_t *pi = targeti->project_inode();
-  pi->nlink++;
-  pi->ctime = mdr->get_op_stamp();
-  pi->change_attr++;
-  pi->version = tipv;
+  auto &pi = targeti->project_inode();
+  pi.inode.nlink++;
+  pi.inode.ctime = mdr->get_op_stamp();
+  pi.inode.change_attr++;
+  pi.inode.version = tipv;
 
   // log + wait
   EUpdate *le = new EUpdate(mdlog, "link_local");
@@ -5505,16 +5564,16 @@ void Server::handle_slave_link_prep(MDRequestRef& mdr)
                                      ESlaveUpdate::OP_PREPARE, ESlaveUpdate::LINK);
   mdlog->start_entry(le);
 
-  inode_t *pi = dnl->get_inode()->project_inode();
+  auto &pi = dnl->get_inode()->project_inode();
 
   // update journaled target inode
   bool inc;
   if (mdr->slave_request->get_op() == MMDSSlaveRequest::OP_LINKPREP) {
     inc = true;
-    pi->nlink++;
+    pi.inode.nlink++;
   } else {
     inc = false;
-    pi->nlink--;
+    pi.inode.nlink--;
   }
 
   link_rollback rollback;
@@ -5528,10 +5587,10 @@ void Server::handle_slave_link_prep(MDRequestRef& mdr)
   ::encode(rollback, le->rollback);
   mdr->more()->rollback_bl = le->rollback;
 
-  pi->ctime = mdr->get_op_stamp();
-  pi->version = targeti->pre_dirty();
+  pi.inode.ctime = mdr->get_op_stamp();
+  pi.inode.version = targeti->pre_dirty();
 
-  dout(10) << " projected inode " << pi << " v " << pi->version << dendl;
+  dout(10) << " projected inode " << pi.inode.ino << " v " << pi.inode.version << dendl;
 
   // commit case
   mdcache->predirty_journal_parents(mdr, &le->commit, dnl->get_inode(), 0, PREDIRTY_SHALLOW|PREDIRTY_PRIMARY);
@@ -5650,8 +5709,8 @@ void Server::do_link_rollback(bufferlist &rbl, mds_rank_t master, MDRequestRef&
   dout(10) << " target is " << *in << dendl;
   assert(!in->is_projected());  // live slave request hold versionlock xlock.
   
-  inode_t *pi = in->project_inode();
-  pi->version = in->pre_dirty();
+  auto &pi = in->project_inode();
+  pi.inode.version = in->pre_dirty();
   mut->add_projected_inode(in);
 
   // parent dir rctime
@@ -5659,20 +5718,20 @@ void Server::do_link_rollback(bufferlist &rbl, mds_rank_t master, MDRequestRef&
   fnode_t *pf = parent->project_fnode();
   mut->add_projected_fnode(parent);
   pf->version = parent->pre_dirty();
-  if (pf->fragstat.mtime == pi->ctime) {
+  if (pf->fragstat.mtime == pi.inode.ctime) {
     pf->fragstat.mtime = rollback.old_dir_mtime;
-    if (pf->rstat.rctime == pi->ctime)
+    if (pf->rstat.rctime == pi.inode.ctime)
       pf->rstat.rctime = rollback.old_dir_rctime;
     mut->add_updated_lock(&parent->get_inode()->filelock);
     mut->add_updated_lock(&parent->get_inode()->nestlock);
   }
 
   // inode
-  pi->ctime = rollback.old_ctime;
+  pi.inode.ctime = rollback.old_ctime;
   if (rollback.was_inc)
-    pi->nlink--;
+    pi.inode.nlink--;
   else
-    pi->nlink++;
+    pi.inode.nlink++;
 
   // journal it
   ESlaveUpdate *le = new ESlaveUpdate(mdlog, "slave_link_rollback", rollback.reqid, master,
@@ -5931,14 +5990,18 @@ void Server::_unlink_local(MDRequestRef& mdr, CDentry *dn, CDentry *straydn)
   // the unlinked dentry
   dn->pre_dirty();
 
-  inode_t *pi = in->project_inode();
-  dn->make_path_string(pi->stray_prior_path, true);
+  auto &pi = in->project_inode();
+  {
+    std::string t;
+    dn->make_path_string(t, true);
+    pi.inode.stray_prior_path = mempool::mds_co::string(boost::string_view(t));
+  }
   mdr->add_projected_inode(in); // do this _after_ my dn->pre_dirty().. we apply that one manually.
-  pi->version = in->pre_dirty();
-  pi->ctime = mdr->get_op_stamp();
-  pi->change_attr++;
-  pi->nlink--;
-  if (pi->nlink == 0)
+  pi.inode.version = in->pre_dirty();
+  pi.inode.ctime = mdr->get_op_stamp();
+  pi.inode.change_attr++;
+  pi.inode.nlink--;
+  if (pi.inode.nlink == 0)
     in->state_set(CInode::STATE_ORPHAN);
 
   if (dnl->is_primary()) {
@@ -5951,7 +6014,7 @@ void Server::_unlink_local(MDRequestRef& mdr, CDentry *dn, CDentry *straydn)
     if (in->snaprealm || follows + 1 > in->get_oldest_snap())
       in->project_past_snaprealm_parent(straydn->get_dir()->inode->find_snaprealm());
 
-    pi->update_backtrace();
+    pi.inode.update_backtrace();
     le->metablob.add_primary_dentry(straydn, in, true, true);
   } else {
     // remote link.  update remote inode.
@@ -5973,6 +6036,8 @@ void Server::_unlink_local(MDRequestRef& mdr, CDentry *dn, CDentry *straydn)
   if (in->is_dir()) {
     assert(straydn);
     mdcache->project_subtree_rename(in, dn->get_dir(), straydn->get_dir());
+
+    in->maybe_export_pin(true);
   }
 
   journal_and_reply(mdr, 0, dn, le, new C_MDS_unlink_local_finish(this, mdr, dn, straydn));
@@ -6048,7 +6113,7 @@ bool Server::_rmdir_prepare_witness(MDRequestRef& mdr, mds_rank_t who, vector<CD
                                               MMDSSlaveRequest::OP_RMDIRPREP);
   req->srcdnpath = filepath(trace.front()->get_dir()->ino());
   for (auto dn : trace)
-    req->srcdnpath.push_dentry(dn->name);
+    req->srcdnpath.push_dentry(dn->get_name());
   mdcache->replicate_stray(straydn, who, req->stray);
 
   req->op_stamp = mdr->get_op_stamp();
@@ -6110,9 +6175,9 @@ void Server::handle_slave_rmdir_prep(MDRequestRef& mdr)
   rmdir_rollback rollback;
   rollback.reqid = mdr->reqid;
   rollback.src_dir = dn->get_dir()->dirfrag();
-  rollback.src_dname = dn->name;
+  rollback.src_dname = std::string(dn->get_name());
   rollback.dest_dir = straydn->get_dir()->dirfrag();
-  rollback.dest_dname = straydn->name;
+  rollback.dest_dname = std::string(straydn->get_name());
   ::encode(rollback, mdr->more()->rollback_bl);
   dout(20) << " rollback is " << mdr->more()->rollback_bl.length() << " bytes" << dendl;
 
@@ -6452,7 +6517,7 @@ void Server::handle_client_rename(MDRequestRef& mdr)
     respond_to_request(mdr, -EINVAL);
     return;
   }
-  const string &destname = destpath.last_dentry();
+  boost::string_view destname = destpath.last_dentry();
 
   vector<CDentry*>& srctrace = mdr->dn[1];
   vector<CDentry*>& desttrace = mdr->dn[0];
@@ -6501,25 +6566,30 @@ void Server::handle_client_rename(MDRequestRef& mdr)
     oldin = mdcache->get_dentry_inode(destdn, mdr, true);
     if (!oldin) return;
     dout(10) << " oldin " << *oldin << dendl;
-    
-    // mv /some/thing /to/some/existing_other_thing
-    if (oldin->is_dir() && !srci->is_dir()) {
-      respond_to_request(mdr, -EISDIR);
-      return;
-    }
-    if (!oldin->is_dir() && srci->is_dir()) {
-      respond_to_request(mdr, -ENOTDIR);
-      return;
-    }
 
     // non-empty dir? do trivial fast unlocked check, do another check later with read locks
     if (oldin->is_dir() && _dir_is_nonempty_unlocked(mdr, oldin)) {
       respond_to_request(mdr, -ENOTEMPTY);
       return;
     }
-    if (srci == oldin && !srcdn->get_dir()->inode->is_stray()) {
-      respond_to_request(mdr, 0);  // no-op.  POSIX makes no sense.
-      return;
+
+    // if srcdn is replica, need to make sure its linkage is correct
+    if (srcdn->is_auth() ||
+       srcdn->lock.can_read(mdr->get_client()) ||
+       (srcdn->lock.is_xlocked() && srcdn->lock.get_xlock_by() == mdr)) {
+      // mv /some/thing /to/some/existing_other_thing
+      if (oldin->is_dir() && !srci->is_dir()) {
+       respond_to_request(mdr, -EISDIR);
+       return;
+      }
+      if (!oldin->is_dir() && srci->is_dir()) {
+       respond_to_request(mdr, -ENOTDIR);
+       return;
+      }
+      if (srci == oldin && !srcdn->get_dir()->inode->is_stray()) {
+       respond_to_request(mdr, 0);  // no-op.  POSIX makes no sense.
+       return;
+      }
     }
   }
 
@@ -6552,7 +6622,7 @@ void Server::handle_client_rename(MDRequestRef& mdr)
   }
 
   // src == dest?
-  if (srcdn->get_dir() == destdir && srcdn->name == destname) {
+  if (srcdn->get_dir() == destdir && srcdn->get_name() == destname) {
     dout(7) << "rename src=dest, noop" << dendl;
     respond_to_request(mdr, 0);
     return;
@@ -6898,10 +6968,10 @@ bool Server::_rename_prepare_witness(MDRequestRef& mdr, mds_rank_t who, set<mds_
 
   req->srcdnpath = filepath(srctrace.front()->get_dir()->ino());
   for (auto dn : srctrace)
-    req->srcdnpath.push_dentry(dn->name);
+    req->srcdnpath.push_dentry(dn->get_name());
   req->destdnpath = filepath(dsttrace.front()->get_dir()->ino());
   for (auto dn : dsttrace)
-    req->destdnpath.push_dentry(dn->name);
+    req->destdnpath.push_dentry(dn->get_name());
   if (straydn)
     mdcache->replicate_stray(straydn, who, req->stray);
 
@@ -7036,8 +7106,8 @@ void Server::_rename_prepare(MDRequestRef& mdr,
   }
 
   // prepare
-  inode_t *pi = 0;    // renamed inode
-  inode_t *tpi = 0;  // target/overwritten inode
+  CInode::mempool_inode *spi = 0;    // renamed inode
+  CInode::mempool_inode *tpi = 0;  // target/overwritten inode
   
   // target inode
   if (!linkmerge) {
@@ -7045,16 +7115,18 @@ void Server::_rename_prepare(MDRequestRef& mdr,
       assert(straydn);  // moving to straydn.
       // link--, and move.
       if (destdn->is_auth()) {
-       tpi = oldin->project_inode(); //project_snaprealm
-       tpi->version = straydn->pre_dirty(tpi->version);
-       tpi->update_backtrace();
+       auto &pi= oldin->project_inode(); //project_snaprealm
+       pi.inode.version = straydn->pre_dirty(pi.inode.version);
+       pi.inode.update_backtrace();
+        tpi = &pi.inode;
       }
       straydn->push_projected_linkage(oldin);
     } else if (destdnl->is_remote()) {
       // nlink-- targeti
       if (oldin->is_auth()) {
-       tpi = oldin->project_inode();
-       tpi->version = oldin->pre_dirty();
+       auto &pi = oldin->project_inode();
+       pi.inode.version = oldin->pre_dirty();
+        tpi = &pi.inode;
       }
     }
   }
@@ -7068,14 +7140,16 @@ void Server::_rename_prepare(MDRequestRef& mdr,
       destdn->push_projected_linkage(srcdnl->get_remote_ino(), srcdnl->get_remote_d_type());
       // srci
       if (srci->is_auth()) {
-       pi = srci->project_inode();
-       pi->version = srci->pre_dirty();
+       auto &pi = srci->project_inode();
+       pi.inode.version = srci->pre_dirty();
+        spi = &pi.inode;
       }
     } else {
       dout(10) << " will merge remote onto primary link" << dendl;
       if (destdn->is_auth()) {
-       pi = oldin->project_inode();
-       pi->version = mdr->more()->pvmap[destdn] = destdn->pre_dirty(oldin->inode.version);
+       auto &pi = oldin->project_inode();
+       pi.inode.version = mdr->more()->pvmap[destdn] = destdn->pre_dirty(oldin->inode.version);
+        spi = &pi.inode;
       }
     }
   } else { // primary
@@ -7099,10 +7173,11 @@ void Server::_rename_prepare(MDRequestRef& mdr,
          dout(10) << " noting renamed dir open frags " << metablob->renamed_dir_frags << dendl;
        }
       }
-      pi = srci->project_inode(); // project snaprealm if srcdnl->is_primary
+      auto &pi = srci->project_inode(); // project snaprealm if srcdnl->is_primary
                                                  // & srcdnl->snaprealm
-      pi->version = mdr->more()->pvmap[destdn] = destdn->pre_dirty(oldpv);
-      pi->update_backtrace();
+      pi.inode.version = mdr->more()->pvmap[destdn] = destdn->pre_dirty(oldpv);
+      pi.inode.update_backtrace();
+      spi = &pi.inode;
     }
     destdn->push_projected_linkage(srci);
   }
@@ -7113,16 +7188,20 @@ void Server::_rename_prepare(MDRequestRef& mdr,
   srcdn->push_projected_linkage();  // push null linkage
 
   if (!silent) {
-    if (pi) {
-      pi->ctime = mdr->get_op_stamp();
-      pi->change_attr++;
+    if (spi) {
+      spi->ctime = mdr->get_op_stamp();
+      spi->change_attr++;
       if (linkmerge)
-       pi->nlink--;
+       spi->nlink--;
     }
     if (tpi) {
       tpi->ctime = mdr->get_op_stamp();
       tpi->change_attr++;
-      destdn->make_path_string(tpi->stray_prior_path, true);
+      {
+        std::string t;
+        destdn->make_path_string(t, true);
+        tpi->stray_prior_path = mempool::mds_co::string(boost::string_view(t));
+      }
       tpi->nlink--;
       if (tpi->nlink == 0)
        oldin->state_set(CInode::STATE_ORPHAN);
@@ -7622,7 +7701,7 @@ void Server::handle_slave_rename_prep(MDRequestRef& mdr)
   rollback.orig_src.dirfrag = srcdn->get_dir()->dirfrag();
   rollback.orig_src.dirfrag_old_mtime = srcdn->get_dir()->get_projected_fnode()->fragstat.mtime;
   rollback.orig_src.dirfrag_old_rctime = srcdn->get_dir()->get_projected_fnode()->rstat.rctime;
-  rollback.orig_src.dname = srcdn->name;
+  rollback.orig_src.dname = std::string(srcdn->get_name());
   if (srcdnl->is_primary())
     rollback.orig_src.ino = srcdnl->get_inode()->ino();
   else {
@@ -7634,7 +7713,7 @@ void Server::handle_slave_rename_prep(MDRequestRef& mdr)
   rollback.orig_dest.dirfrag = destdn->get_dir()->dirfrag();
   rollback.orig_dest.dirfrag_old_mtime = destdn->get_dir()->get_projected_fnode()->fragstat.mtime;
   rollback.orig_dest.dirfrag_old_rctime = destdn->get_dir()->get_projected_fnode()->rstat.rctime;
-  rollback.orig_dest.dname = destdn->name;
+  rollback.orig_dest.dname = std::string(destdn->get_name());
   if (destdnl->is_primary())
     rollback.orig_dest.ino = destdnl->get_inode()->ino();
   else if (destdnl->is_remote()) {
@@ -7646,7 +7725,7 @@ void Server::handle_slave_rename_prep(MDRequestRef& mdr)
     rollback.stray.dirfrag = straydn->get_dir()->dirfrag();
     rollback.stray.dirfrag_old_mtime = straydn->get_dir()->get_projected_fnode()->fragstat.mtime;
     rollback.stray.dirfrag_old_rctime = straydn->get_dir()->get_projected_fnode()->rstat.rctime;
-    rollback.stray.dname = straydn->name;
+    rollback.stray.dname = std::string(straydn->get_name());
   }
   ::encode(rollback, mdr->more()->rollback_bl);
   dout(20) << " rollback is " << mdr->more()->rollback_bl.length() << " bytes" << dendl;
@@ -7998,22 +8077,23 @@ void Server::do_rename_rollback(bufferlist &rbl, mds_rank_t master, MDRequestRef
                                    rollback.orig_src.remote_d_type);
   }
 
-  inode_t *pi = 0;
+  CInode::mempool_inode *pip = 0;
   if (in) {
     if (in->authority().first == whoami) {
-      pi = in->project_inode();
+      auto &pi = in->project_inode();
       mut->add_projected_inode(in);
-      pi->version = in->pre_dirty();
+      pi.inode.version = in->pre_dirty();
+      pip = &pi.inode;
     } else
-      pi = in->get_projected_inode();
-    if (pi->ctime == rollback.ctime)
-      pi->ctime = rollback.orig_src.old_ctime;
+      pip = in->get_projected_inode();
+    if (pip->ctime == rollback.ctime)
+      pip->ctime = rollback.orig_src.old_ctime;
   }
 
   if (srcdn && srcdn->authority().first == whoami) {
     nest_info_t blah;
     _rollback_repair_dir(mut, srcdir, rollback.orig_src, rollback.ctime,
-                        in ? in->is_dir() : false, 1, pi ? pi->accounted_rstat : blah);
+                        in ? in->is_dir() : false, 1, pip ? pip->accounted_rstat : blah);
   }
 
   // repair dest
@@ -8035,11 +8115,12 @@ void Server::do_rename_rollback(bufferlist &rbl, mds_rank_t master, MDRequestRef
     straydn->push_projected_linkage();
 
   if (target) {
-    inode_t *ti = NULL;
+    CInode::mempool_inode *ti = NULL;
     if (target->authority().first == whoami) {
-      ti = target->project_inode();
+      auto &pi = target->project_inode();
       mut->add_projected_inode(target);
-      ti->version = target->pre_dirty();
+      pi.inode.version = target->pre_dirty();
+      ti = &pi.inode;
     } else 
       ti = target->get_projected_inode();
     if (ti->ctime == rollback.ctime)
@@ -8357,9 +8438,9 @@ void Server::handle_client_lssnap(MDRequestRef& mdr)
     // actual
     string snap_name;
     if (p->second->ino == diri->ino())
-      snap_name = p->second->name;
+      snap_name = std::string(p->second->name);
     else
-      snap_name = p->second->get_long_name();
+      snap_name = std::string(p->second->get_long_name());
 
     unsigned start_len = dnbl.length();
     if (int(start_len + snap_name.length() + sizeof(__u32) + sizeof(LeaseStat)) > max_bytes)
@@ -8438,7 +8519,7 @@ void Server::handle_client_mksnap(MDRequestRef& mdr)
     return;
   }
   
-  const string &snapname = req->get_filepath().last_dentry();
+  boost::string_view snapname = req->get_filepath().last_dentry();
 
   if (mdr->client_request->get_caller_uid() < g_conf->mds_snap_min_uid || mdr->client_request->get_caller_uid() > g_conf->mds_snap_max_uid) {
     dout(20) << "mksnap " << snapname << " on " << *diri << " denied to uid " << mdr->client_request->get_caller_uid() << dendl;
@@ -8493,18 +8574,21 @@ void Server::handle_client_mksnap(MDRequestRef& mdr)
   SnapInfo info;
   info.ino = diri->ino();
   info.snapid = snapid;
-  info.name = snapname;
+  info.name = std::string(snapname);
   info.stamp = mdr->get_op_stamp();
 
-  inode_t *pi = diri->project_inode();
-  pi->ctime = info.stamp;
-  pi->version = diri->pre_dirty();
+  auto &pi = diri->project_inode(false, true);
+  pi.inode.ctime = info.stamp;
+  pi.inode.version = diri->pre_dirty();
 
   // project the snaprealm
-  sr_t *newsnap = diri->project_snaprealm(snapid);
-  newsnap->snaps[snapid] = info;
-  newsnap->seq = snapid;
-  newsnap->last_created = snapid;
+  auto &newsnap = *pi.snapnode;
+  newsnap.created = snapid;
+  auto em = newsnap.snaps.emplace(std::piecewise_construct, std::forward_as_tuple(snapid), std::forward_as_tuple(info));
+  if (!em.second)
+    em.first->second = info;
+  newsnap.seq = snapid;
+  newsnap.last_created = snapid;
 
   // journal the inode changes
   mdr->ls = mdlog->get_current_segment();
@@ -8577,7 +8661,7 @@ void Server::handle_client_rmsnap(MDRequestRef& mdr)
     return;
   }
 
-  const string &snapname = req->get_filepath().last_dentry();
+  boost::string_view snapname = req->get_filepath().last_dentry();
 
   if (mdr->client_request->get_caller_uid() < g_conf->mds_snap_min_uid || mdr->client_request->get_caller_uid() > g_conf->mds_snap_max_uid) {
     dout(20) << "rmsnap " << snapname << " on " << *diri << " denied to uid " << mdr->client_request->get_caller_uid() << dendl;
@@ -8624,19 +8708,19 @@ void Server::handle_client_rmsnap(MDRequestRef& mdr)
   dout(10) << " stid is " << stid << ", seq is " << seq << dendl;
 
   // journal
-  inode_t *pi = diri->project_inode();
-  pi->version = diri->pre_dirty();
-  pi->ctime = mdr->get_op_stamp();
+  auto &pi = diri->project_inode(false, true);
+  pi.inode.version = diri->pre_dirty();
+  pi.inode.ctime = mdr->get_op_stamp();
   
   mdr->ls = mdlog->get_current_segment();
   EUpdate *le = new EUpdate(mdlog, "rmsnap");
   mdlog->start_entry(le);
   
   // project the snaprealm
-  sr_t *newnode = diri->project_snaprealm();
-  newnode->snaps.erase(snapid);
-  newnode->seq = seq;
-  newnode->last_destroyed = seq;
+  auto &newnode = *pi.snapnode;
+  newnode.snaps.erase(snapid);
+  newnode.seq = seq;
+  newnode.last_destroyed = seq;
 
   le->metablob.add_client_req(req->get_reqid(), req->get_oldest_client_tid());
   le->metablob.add_table_transaction(TABLE_SNAP, stid);
@@ -8715,8 +8799,8 @@ void Server::handle_client_renamesnap(MDRequestRef& mdr)
     return;
   }
 
-  const string &dstname = req->get_filepath().last_dentry();
-  const string &srcname = req->get_filepath2().last_dentry();
+  boost::string_view dstname = req->get_filepath().last_dentry();
+  boost::string_view srcname = req->get_filepath2().last_dentry();
   dout(10) << "renamesnap " << srcname << "->" << dstname << " on " << *diri << dendl;
 
   if (srcname.length() == 0 || srcname[0] == '_') {
@@ -8767,14 +8851,15 @@ void Server::handle_client_renamesnap(MDRequestRef& mdr)
   dout(10) << " stid is " << stid << ", seq is " << seq << dendl;
 
   // journal
-  inode_t *pi = diri->project_inode();
-  pi->ctime = mdr->get_op_stamp();
-  pi->version = diri->pre_dirty();
+  auto &pi = diri->project_inode(false, true);
+  pi.inode.ctime = mdr->get_op_stamp();
+  pi.inode.version = diri->pre_dirty();
 
   // project the snaprealm
-  sr_t *newsnap = diri->project_snaprealm();
-  assert(newsnap->snaps.count(snapid));
-  newsnap->snaps[snapid].name = dstname;
+  auto &newsnap = *pi.snapnode;
+  auto it = newsnap.snaps.find(snapid);
+  assert(it != newsnap.snaps.end());
+  it->second.name = std::string(dstname);
 
   // journal the inode changes
   mdr->ls = mdlog->get_current_segment();