]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mds/Locker.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / mds / Locker.cc
index 2a587659afdef348ebc7706e899ceafc2136b4c6..a2fbe76625ab2435d2f2ae232f30bbcfda4dcda4 100644 (file)
@@ -26,7 +26,7 @@
 #include "MDSRank.h"
 #include "MDSMap.h"
 #include "messages/MInodeFileCaps.h"
-#include "messages/MMDSSlaveRequest.h"
+#include "messages/MMDSPeerRequest.h"
 #include "Migrator.h"
 #include "msg/Messenger.h"
 #include "osdc/Objecter.h"
@@ -242,7 +242,7 @@ bool Locker::acquire_locks(MDRequestRef& mdr,
       if ((lock->get_type() == CEPH_LOCK_ISNAP ||
           lock->get_type() == CEPH_LOCK_IPOLICY) &&
          mds->is_cluster_degraded() &&
-         mdr->is_master() &&
+         mdr->is_leader() &&
          !mdr->is_queued_for_replay()) {
        // waiting for recovering mds, to guarantee replayed requests and mksnap/setlayout
        // get processed in proper order.
@@ -259,7 +259,7 @@ bool Locker::acquire_locks(MDRequestRef& mdr,
            }
          }
        } else {
-         // if the lock is the latest locked one, it's possible that slave mds got the lock
+         // if the lock is the latest locked one, it's possible that peer mds got the lock
          // while there are recovering mds.
          if (!mdr->is_xlocked(lock) || mdr->is_last_locked(lock))
            wait = true;
@@ -283,11 +283,11 @@ bool Locker::acquire_locks(MDRequestRef& mdr,
        CDentry *dn = static_cast<CDentry*>(object);
        if (!dn->is_auth())
          continue;
-       if (mdr->is_master()) {
-         // master.  wrlock versionlock so we can pipeline dentry updates to journal.
+       if (mdr->is_leader()) {
+         // leader.  wrlock versionlock so we can pipeline dentry updates to journal.
          lov.add_wrlock(&dn->versionlock, i + 1);
        } else {
-         // slave.  exclusively lock the dentry version (i.e. block other journal updates).
+         // peer.  exclusively lock the dentry version (i.e. block other journal updates).
          // this makes rollback safe.
          lov.add_xlock(&dn->versionlock, i + 1);
        }
@@ -297,11 +297,11 @@ bool Locker::acquire_locks(MDRequestRef& mdr,
        CInode *in = static_cast<CInode*>(object);
        if (!in->is_auth())
          continue;
-       if (mdr->is_master()) {
-         // master.  wrlock versionlock so we can pipeline inode updates to journal.
+       if (mdr->is_leader()) {
+         // leader.  wrlock versionlock so we can pipeline inode updates to journal.
          lov.add_wrlock(&in->versionlock, i + 1);
        } else {
-         // slave.  exclusively lock the inode version (i.e. block other journal updates).
+         // peer.  exclusively lock the inode version (i.e. block other journal updates).
          // this makes rollback safe.
          lov.add_xlock(&in->versionlock, i + 1);
        }
@@ -313,7 +313,7 @@ bool Locker::acquire_locks(MDRequestRef& mdr,
        mustpin.insert(object);
       } else if (!object->is_auth() &&
                 !lock->can_wrlock(_client) &&  // we might have to request a scatter
-                !mdr->is_slave()) {           // if we are slave (remote_wrlock), the master already authpinned
+                !mdr->is_peer()) {           // if we are peer (remote_wrlock), the leader already authpinned
        dout(15) << " will also auth_pin " << *object
                 << " in case we need to request a scatter" << dendl;
        mustpin.insert(object);
@@ -461,13 +461,13 @@ bool Locker::acquire_locks(MDRequestRef& mdr,
       if (mds->is_cluster_degraded() &&
          !mds->mdsmap->is_clientreplay_or_active_or_stopping(p.first)) {
        dout(10) << " mds." << p.first << " is not active" << dendl;
-       if (mdr->more()->waiting_on_slave.empty())
+       if (mdr->more()->waiting_on_peer.empty())
          mds->wait_for_active_peer(p.first, new C_MDS_RetryRequest(mdcache, mdr));
        return false;
       }
       
-      auto req = make_message<MMDSSlaveRequest>(mdr->reqid, mdr->attempt,
-                                               MMDSSlaveRequest::OP_AUTHPIN);
+      auto req = make_message<MMDSPeerRequest>(mdr->reqid, mdr->attempt,
+                                               MMDSPeerRequest::OP_AUTHPIN);
       for (auto& o : p.second) {
        dout(10) << " req remote auth_pin of " << *o << dendl;
        MDSCacheObjectInfo info;
@@ -485,7 +485,7 @@ bool Locker::acquire_locks(MDRequestRef& mdr,
       mds->send_message_mds(req, p.first);
 
       // put in waiting list
-      auto ret = mdr->more()->waiting_on_slave.insert(p.first);
+      auto ret = mdr->more()->waiting_on_peer.insert(p.first);
       ceph_assert(ret.second);
     }
     return false;
@@ -556,7 +556,7 @@ bool Locker::acquire_locks(MDRequestRef& mdr,
        continue;
       }
 
-      ceph_assert(mdr->is_master());
+      ceph_assert(mdr->is_leader());
       if (lock->needs_recover()) {
        if (mds->is_cluster_degraded()) {
          if (!mdr->is_queued_for_replay()) {
@@ -632,7 +632,7 @@ void Locker::set_xlocks_done(MutationImpl *mut, bool skip_dentry)
 void Locker::_drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue,
                         bool drop_rdlocks)
 {
-  set<mds_rank_t> slaves;
+  set<mds_rank_t> peers;
 
   for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
     SimpleLock *lock = it->lock;
@@ -646,13 +646,13 @@ void Locker::_drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue,
          pneed_issue->insert(static_cast<CInode*>(obj));
       } else {
        ceph_assert(lock->get_sm()->can_remote_xlock);
-       slaves.insert(obj->authority().first);
+       peers.insert(obj->authority().first);
        lock->put_xlock();
        mut->locks.erase(it++);
       }
     } else if (it->is_wrlock() || it->is_remote_wrlock()) {
       if (it->is_remote_wrlock()) {
-       slaves.insert(it->wrlock_target);
+       peers.insert(it->wrlock_target);
        it->clear_remote_wrlock();
       }
       if (it->is_wrlock()) {
@@ -680,13 +680,13 @@ void Locker::_drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue,
     }
   }
 
-  for (set<mds_rank_t>::iterator p = slaves.begin(); p != slaves.end(); ++p) {
+  for (set<mds_rank_t>::iterator p = peers.begin(); p != peers.end(); ++p) {
     if (!mds->is_cluster_degraded() ||
        mds->mdsmap->get_state(*p) >= MDSMap::STATE_REJOIN) {
       dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p << dendl;
-      auto slavereq = make_message<MMDSSlaveRequest>(mut->reqid, mut->attempt,
-                                                    MMDSSlaveRequest::OP_DROPLOCKS);
-      mds->send_message_mds(slavereq, *p);
+      auto peerreq = make_message<MMDSPeerRequest>(mut->reqid, mut->attempt,
+                                                    MMDSPeerRequest::OP_DROPLOCKS);
+      mds->send_message_mds(peerreq, *p);
     }
   }
 }
@@ -903,8 +903,8 @@ void Locker::create_lock_cache(MDRequestRef& mdr, CInode *diri, file_layout_t *d
     return;
   }
 
-  if (mdr->has_more() && !mdr->more()->slaves.empty()) {
-    dout(10) << " there are slaves requests for " << *mdr << ", noop" << dendl;
+  if (mdr->has_more() && !mdr->more()->peers.empty()) {
+    dout(10) << " there are peers requests for " << *mdr << ", noop" << dendl;
     return;
   }
 
@@ -1414,7 +1414,7 @@ void Locker::try_eval(SimpleLock *lock, bool *pneed_issue)
    *
    * We can defer while freezing without causing a deadlock.  Honor
    * scatter_wanted flag here.  This will never get deferred by the
-   * checks above due to the auth_pin held by the master.
+   * checks above due to the auth_pin held by the leader.
    */
   if (lock->is_scatterlock()) {
     ScatterLock *slock = static_cast<ScatterLock *>(lock);
@@ -1703,7 +1703,7 @@ void Locker::wrlock_force(SimpleLock *lock, MutationRef& mut)
 {
   if (lock->get_type() == CEPH_LOCK_IVERSION ||
       lock->get_type() == CEPH_LOCK_DVERSION)
-    return local_wrlock_grab(static_cast<LocalLock*>(lock), mut);
+    return local_wrlock_grab(static_cast<LocalLockC*>(lock), mut);
 
   dout(7) << "wrlock_force  on " << *lock
          << " on " << *lock->get_parent() << dendl;  
@@ -1750,7 +1750,7 @@ bool Locker::wrlock_start(const MutationImpl::LockOp &op, MDRequestRef& mut)
   SimpleLock *lock = op.lock;
   if (lock->get_type() == CEPH_LOCK_IVERSION ||
       lock->get_type() == CEPH_LOCK_DVERSION)
-    return local_wrlock_start(static_cast<LocalLock*>(lock), mut);
+    return local_wrlock_start(static_cast<LocalLockC*>(lock), mut);
 
   dout(10) << "wrlock_start " << *lock << " on " << *lock->get_parent() << dendl;
 
@@ -1846,21 +1846,21 @@ void Locker::remote_wrlock_start(SimpleLock *lock, mds_rank_t target, MDRequestR
   if (mds->is_cluster_degraded() &&
       !mds->mdsmap->is_clientreplay_or_active_or_stopping(target)) {
     dout(7) << " mds." << target << " is not active" << dendl;
-    if (mut->more()->waiting_on_slave.empty())
+    if (mut->more()->waiting_on_peer.empty())
       mds->wait_for_active_peer(target, new C_MDS_RetryRequest(mdcache, mut));
     return;
   }
 
   // send lock request
   mut->start_locking(lock, target);
-  mut->more()->slaves.insert(target);
-  auto r = make_message<MMDSSlaveRequest>(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_WRLOCK);
+  mut->more()->peers.insert(target);
+  auto r = make_message<MMDSPeerRequest>(mut->reqid, mut->attempt, MMDSPeerRequest::OP_WRLOCK);
   r->set_lock_type(lock->get_type());
   lock->get_parent()->set_object_info(r->get_object_info());
   mds->send_message_mds(r, target);
 
-  ceph_assert(mut->more()->waiting_on_slave.count(target) == 0);
-  mut->more()->waiting_on_slave.insert(target);
+  ceph_assert(mut->more()->waiting_on_peer.count(target) == 0);
+  mut->more()->waiting_on_peer.insert(target);
 }
 
 void Locker::remote_wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
@@ -1878,10 +1878,10 @@ void Locker::remote_wrlock_finish(const MutationImpl::lock_iterator& it, Mutatio
          << " " << *lock->get_parent()  << dendl;
   if (!mds->is_cluster_degraded() ||
       mds->mdsmap->get_state(target) >= MDSMap::STATE_REJOIN) {
-    auto slavereq = make_message<MMDSSlaveRequest>(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_UNWRLOCK);
-    slavereq->set_lock_type(lock->get_type());
-    lock->get_parent()->set_object_info(slavereq->get_object_info());
-    mds->send_message_mds(slavereq, target);
+    auto peerreq = make_message<MMDSPeerRequest>(mut->reqid, mut->attempt, MMDSPeerRequest::OP_UNWRLOCK);
+    peerreq->set_lock_type(lock->get_type());
+    lock->get_parent()->set_object_info(peerreq->get_object_info());
+    mds->send_message_mds(peerreq, target);
   }
 }
 
@@ -1893,7 +1893,7 @@ bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
 {
   if (lock->get_type() == CEPH_LOCK_IVERSION ||
       lock->get_type() == CEPH_LOCK_DVERSION)
-    return local_xlock_start(static_cast<LocalLock*>(lock), mut);
+    return local_xlock_start(static_cast<LocalLockC*>(lock), mut);
 
   dout(7) << "xlock_start on " << *lock << " on " << *lock->get_parent() << dendl;
   client_t client = mut->get_client();
@@ -1941,7 +1941,7 @@ bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
   } else {
     // replica
     ceph_assert(lock->get_sm()->can_remote_xlock);
-    ceph_assert(!mut->slave_request);
+    ceph_assert(!mut->peer_request);
     
     // wait for single auth
     if (lock->get_parent()->is_ambiguous_auth()) {
@@ -1955,21 +1955,21 @@ bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
     if (mds->is_cluster_degraded() &&
        !mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
       dout(7) << " mds." << auth << " is not active" << dendl;
-      if (mut->more()->waiting_on_slave.empty())
+      if (mut->more()->waiting_on_peer.empty())
        mds->wait_for_active_peer(auth, new C_MDS_RetryRequest(mdcache, mut));
       return false;
     }
 
     // send lock request
-    mut->more()->slaves.insert(auth);
+    mut->more()->peers.insert(auth);
     mut->start_locking(lock, auth);
-    auto r = make_message<MMDSSlaveRequest>(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_XLOCK);
+    auto r = make_message<MMDSPeerRequest>(mut->reqid, mut->attempt, MMDSPeerRequest::OP_XLOCK);
     r->set_lock_type(lock->get_type());
     lock->get_parent()->set_object_info(r->get_object_info());
     mds->send_message_mds(r, auth);
 
-    ceph_assert(mut->more()->waiting_on_slave.count(auth) == 0);
-    mut->more()->waiting_on_slave.insert(auth);
+    ceph_assert(mut->more()->waiting_on_peer.count(auth) == 0);
+    mut->more()->waiting_on_peer.insert(auth);
 
     return false;
   }
@@ -2032,10 +2032,10 @@ void Locker::xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *m
     mds_rank_t auth = lock->get_parent()->authority().first;
     if (!mds->is_cluster_degraded() ||
        mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
-      auto slavereq = make_message<MMDSSlaveRequest>(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_UNXLOCK);
-      slavereq->set_lock_type(lock->get_type());
-      lock->get_parent()->set_object_info(slavereq->get_object_info());
-      mds->send_message_mds(slavereq, auth);
+      auto peerreq = make_message<MMDSPeerRequest>(mut->reqid, mut->attempt, MMDSPeerRequest::OP_UNXLOCK);
+      peerreq->set_lock_type(lock->get_type());
+      lock->get_parent()->set_object_info(peerreq->get_object_info());
+      mds->send_message_mds(peerreq, auth);
     }
     // others waiting?
     lock->finish_waiters(SimpleLock::WAIT_STABLE |
@@ -2106,7 +2106,7 @@ void Locker::xlock_downgrade(SimpleLock *lock, MutationImpl *mut)
 version_t Locker::issue_file_data_version(CInode *in)
 {
   dout(7) << "issue_file_data_version on " << *in << dendl;
-  return in->inode.file_data_version;
+  return in->get_inode()->file_data_version;
 }
 
 class C_Locker_FileUpdate_finish : public LockerLogContext {
@@ -2137,7 +2137,6 @@ void Locker::file_update_finish(CInode *in, MutationRef& mut, unsigned flags,
                                client_t client, const ref_t<MClientCaps> &ack)
 {
   dout(10) << "file_update_finish on " << *in << dendl;
-  in->pop_and_dirty_projected_inode(mut->ls);
 
   mut->apply();
 
@@ -2316,9 +2315,9 @@ int Locker::issue_caps(CInode *in, Capability *only_cap)
        allowed |= cap->get_lock_cache_allowed();
     }
 
-    if ((in->inode.inline_data.version != CEPH_INLINE_NONE &&
+    if ((in->get_inode()->inline_data.version != CEPH_INLINE_NONE &&
         cap->is_noinline()) ||
-       (!in->inode.layout.pool_ns.empty() &&
+       (!in->get_inode()->layout.pool_ns.empty() &&
         cap->is_nopoolns()))
       allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);
 
@@ -2355,6 +2354,8 @@ int Locker::issue_caps(CInode *in, Capability *only_cap)
        dout(7) << "   sending MClientCaps to client." << it->first
                << " seq " << seq << " re-issue " << ccap_string(pending) << dendl;
 
+        if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_grant);
+
        auto m = make_message<MClientCaps>(CEPH_CAP_OP_GRANT, in->ino(),
                                           in->find_snaprealm()->inode->ino(),
                                           cap->get_cap_id(), cap->get_last_seq(),
@@ -2367,7 +2368,7 @@ int Locker::issue_caps(CInode *in, Capability *only_cap)
     }
 
     // notify clients about deleted inode, to make sure they release caps ASAP.
-    if (in->inode.nlink == 0)
+    if (in->get_inode()->nlink == 0)
       wanted |= CEPH_CAP_LINK_SHARED;
 
     // are there caps that the client _wants_ and can have, but aren't pending?
@@ -2394,10 +2395,13 @@ int Locker::issue_caps(CInode *in, Capability *only_cap)
 
       int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT;
       if (op == CEPH_CAP_OP_REVOKE) {
+        if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_revoke);
        revoking_caps.push_back(&cap->item_revoking_caps);
        revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps);
        cap->set_last_revoke_stamp(ceph_clock_now());
        cap->reset_num_revoke_warnings();
+      } else {
+        if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_grant);
       }
 
       auto m = make_message<MClientCaps>(op, in->ino(),
@@ -2422,6 +2426,7 @@ void Locker::issue_truncate(CInode *in)
   dout(7) << "issue_truncate on " << *in << dendl;
   
   for (auto &p : in->client_caps) {
+    if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_trunc);
     Capability *cap = &p.second;
     auto m = make_message<MClientCaps>(CEPH_CAP_OP_TRUNC,
                                        in->ino(),
@@ -2448,14 +2453,14 @@ void Locker::revoke_stale_cap(CInode *in, client_t client)
     return;
 
   if (cap->revoking() & CEPH_CAP_ANY_WR) {
-    std::stringstream ss;
-    mds->evict_client(client.v, false, g_conf()->mds_session_blacklist_on_timeout, ss, nullptr);
+    CachedStackStringStream css;
+    mds->evict_client(client.v, false, g_conf()->mds_session_blocklist_on_timeout, *css, nullptr);
     return;
   }
 
   cap->revoke();
 
-  if (in->is_auth() && in->inode.client_ranges.count(cap->get_client()))
+  if (in->is_auth() && in->get_inode()->client_ranges.count(cap->get_client()))
     in->state_set(CInode::STATE_NEEDSRECOVER);
 
   if (in->state_test(CInode::STATE_EXPORTINGCAPS))
@@ -2513,7 +2518,7 @@ bool Locker::revoke_stale_caps(Session *session)
       eval_lock_caches(cap);
 
     if (in->is_auth() &&
-       in->inode.client_ranges.count(cap->get_client()))
+       in->get_inode()->client_ranges.count(cap->get_client()))
       in->state_set(CInode::STATE_NEEDSRECOVER);
 
     // eval lock/inode may finish contexts, which may modify other cap's position
@@ -2649,6 +2654,8 @@ void Locker::handle_inode_file_caps(const cref_t<MInodeFileCaps> &m)
 
   dout(7) << "handle_inode_file_caps replica mds." << from << " wants caps " << ccap_string(m->get_caps()) << " on " << *in << dendl;
 
+  if (mds->logger) mds->logger->inc(l_mdss_handle_inode_file_caps);
+
   in->set_mds_caps_wanted(from, m->get_caps());
 
   try_eval(in, CEPH_CAP_LOCKS);
@@ -2676,7 +2683,7 @@ public:
   }
 };
 
-uint64_t Locker::calc_new_max_size(CInode::mempool_inode *pi, uint64_t size)
+uint64_t Locker::calc_new_max_size(const CInode::inode_const_ptr &pi, uint64_t size)
 {
   uint64_t new_max = (size + 1) << 1;
   uint64_t max_inc = g_conf()->mds_client_writeable_range_max_inc_objs;
@@ -2689,7 +2696,7 @@ uint64_t Locker::calc_new_max_size(CInode::mempool_inode *pi, uint64_t size)
 
 bool Locker::check_client_ranges(CInode *in, uint64_t size)
 {
-  auto latest = in->get_projected_inode();
+  const auto& latest = in->get_projected_inode();
   uint64_t ms;
   if (latest->has_layout()) {
     ms = calc_new_max_size(latest, size);
@@ -2715,15 +2722,16 @@ bool Locker::check_client_ranges(CInode *in, uint64_t size)
 
 bool Locker::calc_new_client_ranges(CInode *in, uint64_t size, bool *max_increased)
 {
-  auto pi = in->get_projected_inode();
+  const auto& latest = in->get_projected_inode();
   uint64_t ms;
-  if (pi->has_layout()) {
-    ms = calc_new_max_size(pi, size);
+  if (latest->has_layout()) {
+    ms = calc_new_max_size(latest, size);
   } else {
     // Layout-less directories like ~mds0/, have zero size
     ms = 0;
   }
 
+  auto pi = in->_get_projected_inode();
   bool updated = false;
 
   // increase ranges as appropriate.
@@ -2779,7 +2787,7 @@ bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
   ceph_assert(in->is_auth());
   ceph_assert(in->is_file());
 
-  CInode::mempool_inode *latest = in->get_projected_inode();
+  const auto& latest = in->get_projected_inode();
   uint64_t size = latest->size;
   bool update_size = new_size > 0;
 
@@ -2824,27 +2832,27 @@ bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
   MutationRef mut(new MutationImpl());
   mut->ls = mds->mdlog->get_current_segment();
     
-  auto &pi = in->project_inode();
-  pi.inode.version = in->pre_dirty();
+  auto pi = in->project_inode(mut);
+  pi.inode->version = in->pre_dirty();
 
   bool max_increased = false;
   if (new_ranges &&
       calc_new_client_ranges(in, std::max(new_max_size, size), &max_increased)) {
     dout(10) << "check_inode_max_size client_ranges "
             << in->get_previous_projected_inode()->client_ranges
-            <<  " -> " << pi.inode.client_ranges << dendl;
+            <<  " -> " << pi.inode->client_ranges << dendl;
   }
 
   if (update_size) {
-    dout(10) << "check_inode_max_size size " << pi.inode.size << " -> " << new_size << dendl;
-    pi.inode.size = new_size;
-    pi.inode.rstat.rbytes = new_size;
-    dout(10) << "check_inode_max_size mtime " << pi.inode.mtime << " -> " << new_mtime << dendl;
-    pi.inode.mtime = new_mtime;
-    if (new_mtime > pi.inode.ctime) {
-      pi.inode.ctime = new_mtime;
-      if (new_mtime > pi.inode.rstat.rctime)
-       pi.inode.rstat.rctime = new_mtime;
+    dout(10) << "check_inode_max_size size " << pi.inode->size << " -> " << new_size << dendl;
+    pi.inode->size = new_size;
+    pi.inode->rstat.rbytes = new_size;
+    dout(10) << "check_inode_max_size mtime " << pi.inode->mtime << " -> " << new_mtime << dendl;
+    pi.inode->mtime = new_mtime;
+    if (new_mtime > pi.inode->ctime) {
+      pi.inode->ctime = new_mtime;
+      if (new_mtime > pi.inode->rstat.rctime)
+       pi.inode->rstat.rctime = new_mtime;
     }
   }
 
@@ -2864,15 +2872,13 @@ bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
     le = eu;
   }
   mds->mdlog->start_entry(le);
-  if (update_size) {  // FIXME if/when we do max_size nested accounting
-    mdcache->predirty_journal_parents(mut, metablob, in, 0, PREDIRTY_PRIMARY);
-    // no cow, here!
-    CDentry *parent = in->get_projected_parent_dn();
-    metablob->add_primary_dentry(parent, in, true);
-  } else {
-    metablob->add_dir_context(in->get_projected_parent_dn()->get_dir());
-    mdcache->journal_dirty_inode(mut.get(), metablob, in);
-  }
+
+  mdcache->predirty_journal_parents(mut, metablob, in, 0, PREDIRTY_PRIMARY);
+  // no cow, here!
+  CDentry *parent = in->get_projected_parent_dn();
+  metablob->add_primary_dentry(parent, in, true);
+  mdcache->journal_dirty_inode(mut.get(), metablob, in);
+
   mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
       UPDATE_SHAREMAX, ref_t<MClientCaps>()));
   wrlock_force(&in->filelock, mut);  // wrlock for duration of journal
@@ -2906,6 +2912,7 @@ void Locker::share_inode_max_size(CInode *in, Capability *only_cap)
       continue;
     if (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
       dout(10) << "share_inode_max_size with client." << client << dendl;
+      if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_grant);
       cap->inc_last_seq();
       auto m = make_message<MClientCaps>(CEPH_CAP_OP_GRANT,
                                          in->ino(),
@@ -3114,14 +3121,21 @@ void Locker::handle_client_caps(const cref_t<MClientCaps> &m)
     return;
   }
 
+  if (mds->logger) mds->logger->inc(l_mdss_handle_client_caps);
+  if (dirty) {
+      if (mds->logger) mds->logger->inc(l_mdss_handle_client_caps_dirty);
+  }
+
   if (m->get_client_tid() > 0 && session &&
       session->have_completed_flush(m->get_client_tid())) {
     dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid()
            << " for client." << client << dendl;
     ref_t<MClientCaps> ack;
     if (op == CEPH_CAP_OP_FLUSHSNAP) {
+      if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_flushsnap_ack);
       ack = make_message<MClientCaps>(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0, dirty, 0, mds->get_osd_epoch_barrier());
     } else {
+      if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_flush_ack);
       ack = make_message<MClientCaps>(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(), m->get_seq(), m->get_caps(), 0, dirty, 0, mds->get_osd_epoch_barrier());
     }
     ack->set_snap_follows(follows);
@@ -3148,13 +3162,13 @@ void Locker::handle_client_caps(const cref_t<MClientCaps> &m)
       if (session->get_num_completed_flushes() >=
          (g_conf()->mds_max_completed_flushes << session->get_num_trim_flushes_warnings())) {
        session->inc_num_trim_flushes_warnings();
-       stringstream ss;
-       ss << "client." << session->get_client() << " does not advance its oldest_flush_tid ("
-          << m->get_oldest_flush_tid() << "), "
-          << session->get_num_completed_flushes()
-          << " completed flushes recorded in session";
-       mds->clog->warn() << ss.str();
-       dout(20) << __func__ << " " << ss.str() << dendl;
+       CachedStackStringStream css;
+       *css << "client." << session->get_client() << " does not advance its oldest_flush_tid ("
+            << m->get_oldest_flush_tid() << "), "
+            << session->get_num_completed_flushes()
+            << " completed flushes recorded in session";
+       mds->clog->warn() << css->strv();
+       dout(20) << __func__ << " " << css->strv() << dendl;
       }
     }
   }
@@ -3266,8 +3280,10 @@ void Locker::handle_client_caps(const cref_t<MClientCaps> &m)
        head_in->remove_need_snapflush(in, snap, client);
     } else {
       dout(7) << " not expecting flushsnap " << snap << " from client." << client << " on " << *in << dendl;
-      if (ack)
+      if (ack) {
+        if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_flushsnap_ack);
        mds->send_message_client_counted(ack, m->get_connection());
+      }
     }
     goto out;
   }
@@ -3357,8 +3373,10 @@ void Locker::handle_client_caps(const cref_t<MClientCaps> &m)
        need_flush = _need_flush_mdlog(in, cap->wanted() & ~cap->pending());
     } else {
       // no update, ack now.
-      if (ack)
+      if (ack) {
+        if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_flush_ack);
        mds->send_message_client_counted(ack, m->get_connection());
+      }
       
       bool did_issue = eval(in, CEPH_CAP_LOCKS);
       if (!did_issue && (cap->wanted() & ~cap->pending()))
@@ -3453,6 +3471,8 @@ void Locker::process_request_cap_release(MDRequestRef& mdr, client_t client, con
     return;
   }
     
+  if (mds->logger) mds->logger->inc(l_mdss_process_request_cap_release);
+
   if (caps & ~cap->issued()) {
     dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
     caps &= cap->issued();
@@ -3536,8 +3556,12 @@ void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t foll
     // hmm, i guess snap was already deleted?  just ack!
     dout(10) << " wow, the snap following " << follows
             << " was already deleted.  nothing to record, just ack." << dendl;
-    if (ack)
+    if (ack) {
+      if (ack->get_op() == CEPH_CAP_OP_FLUSHSNAP_ACK) {
+          if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_flushsnap_ack);
+      }
       mds->send_message_client_counted(ack, m->get_connection());
+    }
     return;
   }
 
@@ -3554,25 +3578,32 @@ void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t foll
                 m->xattrbl.length() &&
                 m->head.xattr_version > in->get_projected_inode()->xattr_version;
 
-  CInode::mempool_old_inode *oi = 0;
-  if (in->is_multiversion()) {
-    oi = in->pick_old_inode(snap);
+  CInode::mempool_old_inode *oi = nullptr;
+  CInode::old_inode_map_ptr _old_inodes;
+  if (in->is_any_old_inodes()) {
+    auto last = in->pick_old_inode(snap);
+    if (last) {
+      _old_inodes = CInode::allocate_old_inode_map(*in->get_old_inodes());
+      oi = &_old_inodes->at(last);
+      if (snap > oi->first) {
+       (*_old_inodes)[snap - 1] = *oi;;
+       oi->first = snap;
+      }
+    }
   }
 
   CInode::mempool_inode *i;
   if (oi) {
     dout(10) << " writing into old inode" << dendl;
-    auto &pi = in->project_inode();
-    pi.inode.version = in->pre_dirty();
-    if (snap > oi->first)
-      in->split_old_inode(snap);
+    auto pi = in->project_inode(mut);
+    pi.inode->version = in->pre_dirty();
     i = &oi->inode;
     if (xattrs)
       px = &oi->xattrs;
   } else {
-    auto &pi = in->project_inode(xattrs);
-    pi.inode.version = in->pre_dirty();
-    i = &pi.inode;
+    auto pi = in->project_inode(mut, xattrs);
+    pi.inode->version = in->pre_dirty();
+    i = pi.inode.get();
     if (xattrs)
       px = pi.xattrs.get();
   }
@@ -3601,6 +3632,9 @@ void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t foll
     }
   }
 
+  if (_old_inodes)
+    in->reset_old_inodes(std::move(_old_inodes));
+
   mut->auth_pin(in);
   mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
   mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);
@@ -3653,19 +3687,19 @@ void Locker::_update_cap_fields(CInode *in, int dirty, const cref_t<MClientCaps>
       if (mtime > pi->rstat.rctime)
        pi->rstat.rctime = mtime;
     }
-    if (in->inode.is_file() &&   // ONLY if regular file
+    if (in->is_file() &&   // ONLY if regular file
        size > pi->size) {
       dout(7) << "  size " << pi->size << " -> " << size
              << " for " << *in << dendl;
       pi->size = size;
       pi->rstat.rbytes = size;
     }
-    if (in->inode.is_file() &&
+    if (in->is_file() &&
         (dirty & CEPH_CAP_FILE_WR) &&
         inline_version > pi->inline_data.version) {
       pi->inline_data.version = inline_version;
       if (inline_version != CEPH_INLINE_NONE && m->inline_data.length() > 0)
-       pi->inline_data.get_data() = m->inline_data;
+       pi->inline_data.set_data(m->inline_data);
       else
        pi->inline_data.free_data();
     }
@@ -3726,7 +3760,7 @@ bool Locker::_do_cap_update(CInode *in, Capability *cap,
           << " on " << *in << dendl;
   ceph_assert(in->is_auth());
   client_t client = m->get_source().num();
-  CInode::mempool_inode *latest = in->get_projected_inode();
+  const auto& latest = in->get_projected_inode();
 
   // increase or zero max_size?
   uint64_t size = m->get_size();
@@ -3832,19 +3866,19 @@ bool Locker::_do_cap_update(CInode *in, Capability *cap,
                m->xattrbl.length() &&
                m->head.xattr_version > in->get_projected_inode()->xattr_version;
 
-  auto &pi = in->project_inode(xattr);
-  pi.inode.version = in->pre_dirty();
-
   MutationRef mut(new MutationImpl());
   mut->ls = mds->mdlog->get_current_segment();
 
-  _update_cap_fields(in, dirty, m, &pi.inode);
+  auto pi = in->project_inode(mut, xattr);
+  pi.inode->version = in->pre_dirty();
+
+  _update_cap_fields(in, dirty, m, pi.inode.get());
 
   if (change_max) {
     dout(7) << "  max_size " << old_max << " -> " << new_max
            << " for " << *in << dendl;
     if (new_max) {
-      auto &cr = pi.inode.client_ranges[client];
+      auto &cr = pi.inode->client_ranges[client];
       cr.range.first = 0;
       cr.range.last = new_max;
       cr.follows = in->first - 1;
@@ -3852,8 +3886,8 @@ bool Locker::_do_cap_update(CInode *in, Capability *cap,
       if (cap)
        cap->mark_clientwriteable();
     } else {
-      pi.inode.client_ranges.erase(client);
-      if (pi.inode.client_ranges.empty())
+      pi.inode->client_ranges.erase(client);
+      if (pi.inode->client_ranges.empty())
        in->clear_clientwriteable();
       if (cap)
        cap->clear_clientwriteable();
@@ -3869,8 +3903,8 @@ bool Locker::_do_cap_update(CInode *in, Capability *cap,
 
   // xattrs update?
   if (xattr) {
-    dout(7) << " xattrs v" << pi.inode.xattr_version << " -> " << m->head.xattr_version << dendl;
-    pi.inode.xattr_version = m->head.xattr_version;
+    dout(7) << " xattrs v" << pi.inode->xattr_version << " -> " << m->head.xattr_version << dendl;
+    pi.inode->xattr_version = m->head.xattr_version;
     auto p = m->xattrbl.cbegin();
     decode_noshare(*pi.xattrs, p);
     wrlock_force(&in->xattrlock, mut);
@@ -3910,6 +3944,8 @@ void Locker::handle_client_cap_release(const cref_t<MClientCapRelease> &m)
     return;
   }
 
+  if (mds->logger) mds->logger->inc(l_mdss_handle_client_cap_release);
+
   if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
     // Pause RADOS operations until we see the required epoch
     mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
@@ -4006,7 +4042,7 @@ void Locker::remove_client_cap(CInode *in, Capability *cap, bool kill)
   if (in->is_auth()) {
     // make sure we clear out the client byte range
     if (in->get_projected_inode()->client_ranges.count(client) &&
-       !(in->inode.nlink == 0 && !in->is_any_caps())) {  // unless it's unlink + stray
+       !(in->get_inode()->nlink == 0 && !in->is_any_caps())) {  // unless it's unlink + stray
       if (kill)
        in->state_set(CInode::STATE_NEEDSRECOVER);
       else
@@ -4109,12 +4145,12 @@ void Locker::caps_tick()
     // exponential backoff of warning intervals
     if (age > mds->mdsmap->get_session_timeout() * (1 << cap->get_num_revoke_warnings())) {
       cap->inc_num_revoke_warnings();
-      stringstream ss;
-      ss << "client." << cap->get_client() << " isn't responding to mclientcaps(revoke), ino "
-        << cap->get_inode()->ino() << " pending " << ccap_string(cap->pending())
-        << " issued " << ccap_string(cap->issued()) << ", sent " << age << " seconds ago";
-      mds->clog->warn() << ss.str();
-      dout(20) << __func__ << " " << ss.str() << dendl;
+      CachedStackStringStream css;
+      *css << "client." << cap->get_client() << " isn't responding to mclientcaps(revoke), ino "
+          << cap->get_inode()->ino() << " pending " << ccap_string(cap->pending())
+          << " issued " << ccap_string(cap->issued()) << ", sent " << age << " seconds ago";
+      mds->clog->warn() << css->strv();
+      dout(20) << __func__ << " " << css->strv() << dendl;
     } else {
       dout(20) << __func__ << " silencing log message (backoff) for " << "client." << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
     }
@@ -4216,6 +4252,7 @@ void Locker::issue_client_lease(CDentry *dn, MDRequestRef &mdr, int mask,
     lstat.mask = CEPH_LEASE_VALID | mask;
     lstat.duration_ms = (uint32_t)(1000 * mdcache->client_lease_durations[pool]);
     lstat.seq = ++l->seq;
+    lstat.alternate_name = std::string(dn->alternate_name);
     encode_lease(bl, session->info, lstat);
     dout(20) << "issue_client_lease seq " << lstat.seq << " dur " << lstat.duration_ms << "ms "
             << " on " << *dn << dendl;
@@ -4223,6 +4260,7 @@ void Locker::issue_client_lease(CDentry *dn, MDRequestRef &mdr, int mask,
     // null lease
     LeaseStat lstat;
     lstat.mask = mask;
+    lstat.alternate_name = std::string(dn->alternate_name);
     encode_lease(bl, session->info, lstat);
     dout(20) << "issue_client_lease no/null lease on " << *dn << dendl;
   }
@@ -4255,10 +4293,11 @@ void Locker::encode_lease(bufferlist& bl, const session_info_t& info,
                          const LeaseStat& ls)
 {
   if (info.has_feature(CEPHFS_FEATURE_REPLY_ENCODING)) {
-    ENCODE_START(1, 1, bl);
+    ENCODE_START(2, 1, bl);
     encode(ls.mask, bl);
     encode(ls.duration_ms, bl);
     encode(ls.seq, bl);
+    encode(ls.alternate_name, bl);
     ENCODE_FINISH(bl);
   }
   else {
@@ -4890,7 +4929,7 @@ public:
 void Locker::scatter_writebehind(ScatterLock *lock)
 {
   CInode *in = static_cast<CInode*>(lock->get_parent());
-  dout(10) << "scatter_writebehind " << in->inode.mtime << " on " << *lock << " on " << *in << dendl;
+  dout(10) << "scatter_writebehind " << in->get_inode()->mtime << " on " << *lock << " on " << *in << dendl;
 
   // journal
   MutationRef mut(new MutationImpl());
@@ -4902,10 +4941,10 @@ void Locker::scatter_writebehind(ScatterLock *lock)
 
   in->pre_cow_old_inode();  // avoid cow mayhem
 
-  auto &pi = in->project_inode();
-  pi.inode.version = in->pre_dirty();
+  auto pi = in->project_inode(mut);
+  pi.inode->version = in->pre_dirty();
 
-  in->finish_scatter_gather_update(lock->get_type());
+  in->finish_scatter_gather_update(lock->get_type(), mut);
   lock->start_flush();
 
   EUpdate *le = new EUpdate(mds->mdlog, "scatter_writebehind");
@@ -4914,7 +4953,7 @@ void Locker::scatter_writebehind(ScatterLock *lock)
   mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY);
   mdcache->journal_dirty_inode(mut.get(), &le->metablob, in);
   
-  in->finish_scatter_gather_update_accounted(lock->get_type(), mut, &le->metablob);
+  in->finish_scatter_gather_update_accounted(lock->get_type(), &le->metablob);
 
   mds->mdlog->submit_entry(le, new C_Locker_ScatterWB(this, lock, mut));
 }
@@ -4923,7 +4962,8 @@ void Locker::scatter_writebehind_finish(ScatterLock *lock, MutationRef& mut)
 {
   CInode *in = static_cast<CInode*>(lock->get_parent());
   dout(10) << "scatter_writebehind_finish on " << *lock << " on " << *in << dendl;
-  in->pop_and_dirty_projected_inode(mut->ls);
+
+  mut->apply();
 
   lock->finish_flush();
 
@@ -4939,7 +4979,6 @@ void Locker::scatter_writebehind_finish(ScatterLock *lock, MutationRef& mut)
     }
   }
 
-  mut->apply();
   drop_locks(mut.get());
   mut->cleanup();
 
@@ -5232,7 +5271,7 @@ void Locker::scatter_tempsync(ScatterLock *lock, bool *need_issue)
 // ==========================================================================
 // local lock
 
-void Locker::local_wrlock_grab(LocalLock *lock, MutationRef& mut)
+void Locker::local_wrlock_grab(LocalLockC *lock, MutationRef& mut)
 {
   dout(7) << "local_wrlock_grab  on " << *lock
          << " on " << *lock->get_parent() << dendl;  
@@ -5245,7 +5284,7 @@ void Locker::local_wrlock_grab(LocalLock *lock, MutationRef& mut)
   ceph_assert(it->is_wrlock());
 }
 
-bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut)
+bool Locker::local_wrlock_start(LocalLockC *lock, MDRequestRef& mut)
 {
   dout(7) << "local_wrlock_start  on " << *lock
          << " on " << *lock->get_parent() << dendl;  
@@ -5265,7 +5304,7 @@ bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut)
 void Locker::local_wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
 {
   ceph_assert(it->is_wrlock());
-  LocalLock *lock = static_cast<LocalLock*>(it->lock);
+  LocalLockC *lock = static_cast<LocalLockC*>(it->lock);
   dout(7) << "local_wrlock_finish  on " << *lock
          << " on " << *lock->get_parent() << dendl;  
   lock->put_wrlock();
@@ -5277,7 +5316,7 @@ void Locker::local_wrlock_finish(const MutationImpl::lock_iterator& it, Mutation
   }
 }
 
-bool Locker::local_xlock_start(LocalLock *lock, MDRequestRef& mut)
+bool Locker::local_xlock_start(LocalLockC *lock, MDRequestRef& mut)
 {
   dout(7) << "local_xlock_start  on " << *lock
          << " on " << *lock->get_parent() << dendl;  
@@ -5296,7 +5335,7 @@ bool Locker::local_xlock_start(LocalLock *lock, MDRequestRef& mut)
 void Locker::local_xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
 {
   ceph_assert(it->is_xlock());
-  LocalLock *lock = static_cast<LocalLock*>(it->lock);
+  LocalLockC *lock = static_cast<LocalLockC*>(it->lock);
   dout(7) << "local_xlock_finish  on " << *lock
          << " on " << *lock->get_parent() << dendl;  
   lock->put_xlock();