]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mds/Locker.cc
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / mds / Locker.cc
index 6cb9a73511130e643d1c2ee69cb51db1007befab..90c16715f84bf4ee62d297a0f2758b3b2a27c3c1 100644 (file)
  * 
  */
 
+#include <string_view>
 
 #include "MDSRank.h"
 #include "MDCache.h"
 #include "Locker.h"
+#include "MDBalancer.h"
+#include "Migrator.h"
 #include "CInode.h"
 #include "CDir.h"
 #include "CDentry.h"
@@ -54,7 +57,7 @@ static ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
 }
 
 
-class LockerContext : public MDSInternalContextBase {
+class LockerContext : public MDSContext {
 protected:
   Locker *locker;
   MDSRank *get_mds() override
@@ -64,7 +67,7 @@ protected:
 
 public:
   explicit LockerContext(Locker *locker_) : locker(locker_) {
-    assert(locker != NULL);
+    ceph_assert(locker != NULL);
   }
 };
 
@@ -78,40 +81,39 @@ protected:
 
 public:
   explicit LockerLogContext(Locker *locker_) : locker(locker_) {
-    assert(locker != NULL);
+    ceph_assert(locker != NULL);
   }
 };
 
-/* This function DOES put the passed message before returning */
-void Locker::dispatch(Message *m)
+Locker::Locker(MDSRank *m, MDCache *c) :
+  mds(m), mdcache(c), need_snapflush_inodes(member_offset(CInode, item_caps)) {}
+
+
+void Locker::dispatch(const Message::const_ref &m)
 {
 
   switch (m->get_type()) {
-
     // inter-mds locking
   case MSG_MDS_LOCK:
-    handle_lock(static_cast<MLock*>(m));
+    handle_lock(MLock::msgref_cast(m));
     break;
     // inter-mds caps
   case MSG_MDS_INODEFILECAPS:
-    handle_inode_file_caps(static_cast<MInodeFileCaps*>(m));
+    handle_inode_file_caps(MInodeFileCaps::msgref_cast(m));
     break;
-
     // client sync
   case CEPH_MSG_CLIENT_CAPS:
-    handle_client_caps(static_cast<MClientCaps*>(m));
-
+    handle_client_caps(MClientCaps::msgref_cast(m));
     break;
   case CEPH_MSG_CLIENT_CAPRELEASE:
-    handle_client_cap_release(static_cast<MClientCapRelease*>(m));
+    handle_client_cap_release(MClientCapRelease::msgref_cast(m));
     break;
   case CEPH_MSG_CLIENT_LEASE:
-    handle_client_lease(static_cast<MClientLease*>(m));
+    handle_client_lease(MClientLease::msgref_cast(m));
     break;
-    
   default:
     derr << "locker unknown message " << m->get_type() << dendl;
-    assert(0 == "locker unknown message");
+    ceph_abort_msg("locker unknown message");
   }
 }
 
@@ -130,57 +132,53 @@ void Locker::tick()
 
 void Locker::send_lock_message(SimpleLock *lock, int msg)
 {
-  for (compact_map<mds_rank_t,unsigned>::iterator it = lock->get_parent()->replicas_begin();
-       it != lock->get_parent()->replicas_end();
-       ++it) {
+  for (const auto &it : lock->get_parent()->get_replicas()) {
     if (mds->is_cluster_degraded() &&
-       mds->mdsmap->get_state(it->first) < MDSMap::STATE_REJOIN) 
+       mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
       continue;
-    MLock *m = new MLock(lock, msg, mds->get_nodeid());
-    mds->send_message_mds(m, it->first);
+    auto m = MLock::create(lock, msg, mds->get_nodeid());
+    mds->send_message_mds(m, it.first);
   }
 }
 
 void Locker::send_lock_message(SimpleLock *lock, int msg, const bufferlist &data)
 {
-  for (compact_map<mds_rank_t,unsigned>::iterator it = lock->get_parent()->replicas_begin();
-       it != lock->get_parent()->replicas_end();
-       ++it) {
+  for (const auto &it : lock->get_parent()->get_replicas()) {
     if (mds->is_cluster_degraded() &&
-       mds->mdsmap->get_state(it->first) < MDSMap::STATE_REJOIN) 
+       mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
       continue;
-    MLock *m = new MLock(lock, msg, mds->get_nodeid());
+    auto m = MLock::create(lock, msg, mds->get_nodeid());
     m->set_data(data);
-    mds->send_message_mds(m, it->first);
+    mds->send_message_mds(m, it.first);
   }
 }
 
 
 
 
-void Locker::include_snap_rdlocks(set<SimpleLock*>& rdlocks, CInode *in)
+void Locker::include_snap_rdlocks(CInode *in, MutationImpl::LockOpVec& lov)
 {
   // rdlock ancestor snaps
   CInode *t = in;
-  rdlocks.insert(&in->snaplock);
   while (t->get_projected_parent_dn()) {
     t = t->get_projected_parent_dn()->get_dir()->get_inode();
-    rdlocks.insert(&t->snaplock);
+    lov.add_rdlock(&t->snaplock);
   }
+  lov.add_rdlock(&in->snaplock);
 }
 
-void Locker::include_snap_rdlocks_wlayout(set<SimpleLock*>& rdlocks, CInode *in,
+void Locker::include_snap_rdlocks_wlayout(CInode *in, MutationImpl::LockOpVec& lov,
                                          file_layout_t **layout)
 {
   //rdlock ancestor snaps
   CInode *t = in;
-  rdlocks.insert(&in->snaplock);
-  rdlocks.insert(&in->policylock);
+  lov.add_rdlock(&in->snaplock);
+  lov.add_rdlock(&in->policylock);
   bool found_layout = false;
   while (t) {
-    rdlocks.insert(&t->snaplock);
+    lov.add_rdlock(&t->snaplock);
     if (!found_layout) {
-      rdlocks.insert(&t->policylock);
+      lov.add_rdlock(&t->policylock);
       if (t->get_projected_inode()->has_layout()) {
         *layout = &t->get_projected_inode()->layout;
         found_layout = true;
@@ -195,12 +193,12 @@ void Locker::include_snap_rdlocks_wlayout(set<SimpleLock*>& rdlocks, CInode *in,
 
 struct MarkEventOnDestruct {
   MDRequestRef& mdr;
-  const char* message;
+  std::string_view message;
   bool mark_event;
-  MarkEventOnDestruct(MDRequestRef& _mdr,
-                      const char *_message) : mdr(_mdr),
-                          message(_message),
-                          mark_event(true) {}
+  MarkEventOnDestruct(MDRequestRef& _mdr, std::string_view _message) :
+      mdr(_mdr),
+      message(_message),
+      mark_event(true) {}
   ~MarkEventOnDestruct() {
     if (mark_event)
       mdr->mark_event(message);
@@ -210,10 +208,7 @@ struct MarkEventOnDestruct {
 /* If this function returns false, the mdr has been placed
  * on the appropriate wait list */
 bool Locker::acquire_locks(MDRequestRef& mdr,
-                          set<SimpleLock*> &rdlocks,
-                          set<SimpleLock*> &wrlocks,
-                          set<SimpleLock*> &xlocks,
-                          map<SimpleLock*,mds_rank_t> *remote_wrlocks,
+                          MutationImpl::LockOpVec& lov,
                           CInode *auth_pin_freeze,
                           bool auth_pin_nonblock)
 {
@@ -228,105 +223,122 @@ bool Locker::acquire_locks(MDRequestRef& mdr,
 
   client_t client = mdr->get_client();
 
-  set<SimpleLock*, SimpleLock::ptr_lt> sorted;  // sort everything we will lock
-  set<MDSCacheObject*> mustpin;            // items to authpin
+  set<MDSCacheObject*> mustpin;  // items to authpin
 
   // xlocks
-  for (set<SimpleLock*>::iterator p = xlocks.begin(); p != xlocks.end(); ++p) {
-    dout(20) << " must xlock " << **p << " " << *(*p)->get_parent() << dendl;
-    sorted.insert(*p);
-    mustpin.insert((*p)->get_parent());
-
-    // augment xlock with a versionlock?
-    if ((*p)->get_type() == CEPH_LOCK_DN) {
-      CDentry *dn = (CDentry*)(*p)->get_parent();
-      if (!dn->is_auth())
-       continue;
-
-      if (xlocks.count(&dn->versionlock))
-       continue;  // we're xlocking the versionlock too; don't wrlock it!
-
-      if (mdr->is_master()) {
-       // master.  wrlock versionlock so we can pipeline dentry updates to journal.
-       wrlocks.insert(&dn->versionlock);
-      } else {
-       // slave.  exclusively lock the dentry version (i.e. block other journal updates).
-       // this makes rollback safe.
-       xlocks.insert(&dn->versionlock);
-       sorted.insert(&dn->versionlock);
-      }
-    }
-    if ((*p)->get_type() > CEPH_LOCK_IVERSION) {
-      // inode version lock?
-      CInode *in = (CInode*)(*p)->get_parent();
-      if (!in->is_auth())
-       continue;
-      if (mdr->is_master()) {
-       // master.  wrlock versionlock so we can pipeline inode updates to journal.
-       wrlocks.insert(&in->versionlock);
-      } else {
-       // slave.  exclusively lock the inode version (i.e. block other journal updates).
-       // this makes rollback safe.
-       xlocks.insert(&in->versionlock);
-       sorted.insert(&in->versionlock);
+  for (int i = 0, size = lov.size(); i < size; ++i) {
+    auto& p = lov[i];
+    SimpleLock *lock = p.lock;
+    MDSCacheObject *object = lock->get_parent();
+
+    if (p.is_xlock()) {
+      if ((lock->get_type() == CEPH_LOCK_ISNAP ||
+          lock->get_type() == CEPH_LOCK_IPOLICY) &&
+         mds->is_cluster_degraded() &&
+         mdr->is_master() &&
+         !mdr->is_queued_for_replay()) {
+       // waiting for recovering mds, to guarantee replayed requests and mksnap/setlayout
+       // get processed in proper order.
+       bool wait = false;
+       if (object->is_auth()) {
+         if (!mdr->locks.count(lock)) {
+           set<mds_rank_t> ls;
+           object->list_replicas(ls);
+           for (auto m : ls) {
+             if (mds->mdsmap->get_state(m) < MDSMap::STATE_ACTIVE) {
+               wait = true;
+               break;
+             }
+           }
+         }
+       } else {
+         // if the lock is the latest locked one, it's possible that slave mds got the lock
+         // while there are recovering mds.
+         if (!mdr->locks.count(lock) || lock == *mdr->locks.rbegin())
+           wait = true;
+       }
+       if (wait) {
+         dout(10) << " must xlock " << *lock << " " << *object
+                  << ", waiting for cluster recovered" << dendl;
+         mds->locker->drop_locks(mdr.get(), NULL);
+         mdr->drop_local_auth_pins();
+         mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
+         return false;
+       }
       }
-    }
-  }
 
-  // wrlocks
-  for (set<SimpleLock*>::iterator p = wrlocks.begin(); p != wrlocks.end(); ++p) {
-    MDSCacheObject *object = (*p)->get_parent();
-    dout(20) << " must wrlock " << **p << " " << *object << dendl;
-    sorted.insert(*p);
-    if (object->is_auth())
-      mustpin.insert(object);
-    else if (!object->is_auth() &&
-            !(*p)->can_wrlock(client) &&  // we might have to request a scatter
-            !mdr->is_slave()) {           // if we are slave (remote_wrlock), the master already authpinned
-      dout(15) << " will also auth_pin " << *object
-              << " in case we need to request a scatter" << dendl;
-      mustpin.insert(object);
-    }
-  }
+      dout(20) << " must xlock " << *lock << " " << *object << dendl;
 
-  // remote_wrlocks
-  if (remote_wrlocks) {
-    for (map<SimpleLock*,mds_rank_t>::iterator p = remote_wrlocks->begin(); p != remote_wrlocks->end(); ++p) {
-      MDSCacheObject *object = p->first->get_parent();
-      dout(20) << " must remote_wrlock on mds." << p->second << " "
-              << *p->first << " " << *object << dendl;
-      sorted.insert(p->first);
       mustpin.insert(object);
-    }
-  }
 
-  // rdlocks
-  for (set<SimpleLock*>::iterator p = rdlocks.begin();
-        p != rdlocks.end();
-       ++p) {
-    MDSCacheObject *object = (*p)->get_parent();
-    dout(20) << " must rdlock " << **p << " " << *object << dendl;
-    sorted.insert(*p);
-    if (object->is_auth())
-      mustpin.insert(object);
-    else if (!object->is_auth() &&
-            !(*p)->can_rdlock(client)) {      // we might have to request an rdlock
-      dout(15) << " will also auth_pin " << *object
-              << " in case we need to request a rdlock" << dendl;
+      // augment xlock with a versionlock?
+      if (lock->get_type() == CEPH_LOCK_DN) {
+       CDentry *dn = static_cast<CDentry*>(object);
+       if (!dn->is_auth())
+         continue;
+       if (mdr->is_master()) {
+         // master.  wrlock versionlock so we can pipeline dentry updates to journal.
+         lov.add_wrlock(&dn->versionlock);
+       } else {
+         // slave.  exclusively lock the dentry version (i.e. block other journal updates).
+         // this makes rollback safe.
+         lov.add_xlock(&dn->versionlock);
+       }
+      }
+      if (lock->get_type() > CEPH_LOCK_IVERSION) {
+       // inode version lock?
+       CInode *in = static_cast<CInode*>(object);
+       if (!in->is_auth())
+         continue;
+       if (mdr->is_master()) {
+         // master.  wrlock versionlock so we can pipeline inode updates to journal.
+         lov.add_wrlock(&in->versionlock);
+       } else {
+         // slave.  exclusively lock the inode version (i.e. block other journal updates).
+         // this makes rollback safe.
+         lov.add_xlock(&in->versionlock);
+       }
+      }
+    } else if (p.is_wrlock()) {
+      dout(20) << " must wrlock " << *lock << " " << *object << dendl;
+      if (object->is_auth()) {
+       mustpin.insert(object);
+      } else if (!object->is_auth() &&
+                !lock->can_wrlock(client) &&  // we might have to request a scatter
+                !mdr->is_slave()) {           // if we are slave (remote_wrlock), the master already authpinned
+       dout(15) << " will also auth_pin " << *object
+                << " in case we need to request a scatter" << dendl;
+       mustpin.insert(object);
+      }
+    } else if (p.is_remote_wrlock()) {
+      dout(20) << " must remote_wrlock on mds." << p.wrlock_target << " "
+              << *lock << " " << *object << dendl;
       mustpin.insert(object);
+    } else if (p.is_rdlock()) {
+
+      dout(20) << " must rdlock " << *lock << " " << *object << dendl;
+      if (object->is_auth()) {
+       mustpin.insert(object);
+      } else if (!object->is_auth() &&
+                !lock->can_rdlock(client)) {      // we might have to request an rdlock
+       dout(15) << " will also auth_pin " << *object
+                << " in case we need to request a rdlock" << dendl;
+       mustpin.insert(object);
+      }
+    } else {
+      ceph_assert(0 == "locker unknown lock operation");
     }
   }
 
+  lov.sort_and_merge();
  
   // AUTH PINS
   map<mds_rank_t, set<MDSCacheObject*> > mustpin_remote;  // mds -> (object set)
   
   // can i auth pin them all now?
   marker.message = "failed to authpin local pins";
-  for (set<MDSCacheObject*>::iterator p = mustpin.begin();
-       p != mustpin.end();
-       ++p) {
-    MDSCacheObject *object = *p;
+  for (const auto &p : mustpin) {
+    MDSCacheObject *object = p;
 
     dout(10) << " must authpin " << *object << dendl;
 
@@ -346,6 +358,7 @@ bool Locker::acquire_locks(MDRequestRef& mdr,
        drop_locks(mdr.get());
       if (object->is_ambiguous_auth()) {
        // wait
+       marker.message = "waiting for single auth, object is being migrated";
        dout(10) << " ambiguous auth, waiting to authpin " << *object << dendl;
        object->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_MDS_RetryRequest(mdcache, mdr));
        mdr->drop_local_auth_pins();
@@ -354,7 +367,8 @@ bool Locker::acquire_locks(MDRequestRef& mdr,
       mustpin_remote[object->authority().first].insert(object);
       continue;
     }
-    if (!object->can_auth_pin()) {
+    int err = 0;
+    if (!object->can_auth_pin(&err)) {
       // wait
       drop_locks(mdr.get());
       mdr->drop_local_auth_pins();
@@ -363,17 +377,26 @@ bool Locker::acquire_locks(MDRequestRef& mdr,
        mdr->aborted = true;
        return false;
       }
+      if (err == MDSCacheObject::ERR_EXPORTING_TREE) {
+       marker.message = "failed to authpin, subtree is being exported";
+      } else if (err == MDSCacheObject::ERR_FRAGMENTING_DIR) {
+       marker.message = "failed to authpin, dir is being fragmented";
+      } else if (err == MDSCacheObject::ERR_EXPORTING_INODE) {
+       marker.message = "failed to authpin, inode is being exported";
+      }
       dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object << dendl;
       object->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));
+
+      if (!mdr->remote_auth_pins.empty())
+       notify_freeze_waiter(object);
+
       return false;
     }
   }
 
   // ok, grab local auth pins
-  for (set<MDSCacheObject*>::iterator p = mustpin.begin();
-       p != mustpin.end();
-       ++p) {
-    MDSCacheObject *object = *p;
+  for (const auto& p : mustpin) {
+    MDSCacheObject *object = p;
     if (mdr->is_auth_pinned(object)) {
       dout(10) << " already auth_pinned " << *object << dendl;
     } else if (object->is_auth()) {
@@ -385,14 +408,12 @@ bool Locker::acquire_locks(MDRequestRef& mdr,
   // request remote auth_pins
   if (!mustpin_remote.empty()) {
     marker.message = "requesting remote authpins";
-    for (map<MDSCacheObject*,mds_rank_t>::iterator p = mdr->remote_auth_pins.begin();
-        p != mdr->remote_auth_pins.end();
-        ++p) {
-      if (mustpin.count(p->first)) {
-       assert(p->second == p->first->authority().first);
-       map<mds_rank_t, set<MDSCacheObject*> >::iterator q = mustpin_remote.find(p->second);
+    for (const auto& p : mdr->remote_auth_pins) {
+      if (mustpin.count(p.first)) {
+       ceph_assert(p.second == p.first->authority().first);
+       map<mds_rank_t, set<MDSCacheObject*> >::iterator q = mustpin_remote.find(p.second);
        if (q != mustpin_remote.end())
-         q->second.insert(p->first);
+         q->second.insert(p.first);
       }
     }
     for (map<mds_rank_t, set<MDSCacheObject*> >::iterator p = mustpin_remote.begin();
@@ -409,8 +430,7 @@ bool Locker::acquire_locks(MDRequestRef& mdr,
        return false;
       }
       
-      MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, mdr->attempt,
-                                                  MMDSSlaveRequest::OP_AUTHPIN);
+      auto req = MMDSSlaveRequest::create(mdr->reqid, mdr->attempt, MMDSSlaveRequest::OP_AUTHPIN);
       for (set<MDSCacheObject*>::iterator q = p->second.begin();
           q != p->second.end();
           ++q) {
@@ -427,7 +447,7 @@ bool Locker::acquire_locks(MDRequestRef& mdr,
       mds->send_message_mds(req, p->first);
 
       // put in waiting list
-      assert(mdr->more()->waiting_on_slave.count(p->first) == 0);
+      ceph_assert(mdr->more()->waiting_on_slave.count(p->first) == 0);
       mdr->more()->waiting_on_slave.insert(p->first);
     }
     return false;
@@ -439,170 +459,154 @@ bool Locker::acquire_locks(MDRequestRef& mdr,
 
   // acquire locks.
   // make sure they match currently acquired locks.
-  set<SimpleLock*, SimpleLock::ptr_lt>::iterator existing = mdr->locks.begin();
-  for (set<SimpleLock*, SimpleLock::ptr_lt>::iterator p = sorted.begin();
-       p != sorted.end();
-       ++p) {
-    bool need_wrlock = !!wrlocks.count(*p);
-    bool need_remote_wrlock = !!(remote_wrlocks && remote_wrlocks->count(*p));
+  auto existing = mdr->locks.begin();
+  for (const auto& p : lov) {
+    bool need_wrlock = p.is_wrlock();
+    bool need_remote_wrlock = p.is_remote_wrlock();
 
     // already locked?
-    if (existing != mdr->locks.end() && *existing == *p) {
+    if (existing != mdr->locks.end() && existing->lock == p.lock) {
       // right kind?
-      SimpleLock *have = *existing;
-      ++existing;
-      if (xlocks.count(have) && mdr->xlocks.count(have)) {
-       dout(10) << " already xlocked " << *have << " " << *have->get_parent() << dendl;
+      auto it = existing++;
+      auto have = *it; // don't reference
+
+      if (have.is_xlock() && p.is_xlock()) {
+       dout(10) << " already xlocked " << *have.lock << " " << *have.lock->get_parent() << dendl;
        continue;
       }
-      if (mdr->remote_wrlocks.count(have)) {
-       if (!need_remote_wrlock ||
-           mdr->remote_wrlocks[have] != (*remote_wrlocks)[have]) {
-         dout(10) << " unlocking remote_wrlock on wrong mds." << mdr->remote_wrlocks[have]
-                  << " " << *have << " " << *have->get_parent() << dendl;
-         remote_wrlock_finish(have, mdr->remote_wrlocks[have], mdr.get());
-       }
+
+      if (have.is_remote_wrlock() &&
+         (!need_remote_wrlock || have.wrlock_target != p.wrlock_target)) {
+       dout(10) << " unlocking remote_wrlock on wrong mds." << have.wrlock_target
+                << " " << *have.lock << " " << *have.lock->get_parent() << dendl;
+       remote_wrlock_finish(it, mdr.get());
+       have.clear_remote_wrlock();
       }
+
       if (need_wrlock || need_remote_wrlock) {
-       if (need_wrlock == !!mdr->wrlocks.count(have) &&
-           need_remote_wrlock == !!mdr->remote_wrlocks.count(have)) {
+       if (need_wrlock == have.is_wrlock() &&
+           need_remote_wrlock == have.is_remote_wrlock()) {
          if (need_wrlock)
-           dout(10) << " already wrlocked " << *have << " " << *have->get_parent() << dendl;
+           dout(10) << " already wrlocked " << *have.lock << " " << *have.lock->get_parent() << dendl;
          if (need_remote_wrlock)
-           dout(10) << " already remote_wrlocked " << *have << " " << *have->get_parent() << dendl;
+           dout(10) << " already remote_wrlocked " << *have.lock << " " << *have.lock->get_parent() << dendl;
          continue;
        }
-      }
-      if (rdlocks.count(have) && mdr->rdlocks.count(have)) {
-       dout(10) << " already rdlocked " << *have << " " << *have->get_parent() << dendl;
+
+       if (have.is_wrlock()) {
+         if (!need_wrlock)
+           dout(10) << " unlocking extra " << *have.lock << " " << *have.lock->get_parent() << dendl;
+         else if (need_remote_wrlock) // acquire remote_wrlock first
+           dout(10) << " unlocking out-of-order " << *have.lock << " " << *have.lock->get_parent() << dendl;
+         bool need_issue = false;
+         wrlock_finish(it, mdr.get(), &need_issue);
+         if (need_issue)
+           issue_set.insert(static_cast<CInode*>(have.lock->get_parent()));
+       }
+      } else if (have.is_rdlock() && p.is_rdlock()) {
+       dout(10) << " already rdlocked " << *have.lock << " " << *have.lock->get_parent() << dendl;
        continue;
       }
     }
     
     // hose any stray locks
-    if (existing != mdr->locks.end() && *existing == *p) {
-      assert(need_wrlock || need_remote_wrlock);
-      SimpleLock *lock = *existing;
-      if (mdr->wrlocks.count(lock)) {
-       if (!need_wrlock)
-         dout(10) << " unlocking extra " << *lock << " " << *lock->get_parent() << dendl;
-       else if (need_remote_wrlock) // acquire remote_wrlock first
-         dout(10) << " unlocking out-of-order " << *lock << " " << *lock->get_parent() << dendl;
-       bool need_issue = false;
-       wrlock_finish(lock, mdr.get(), &need_issue);
-       if (need_issue)
-         issue_set.insert(static_cast<CInode*>(lock->get_parent()));
-      }
-      ++existing;
-    }
     while (existing != mdr->locks.end()) {
-      SimpleLock *stray = *existing;
-      ++existing;
-      dout(10) << " unlocking out-of-order " << *stray << " " << *stray->get_parent() << dendl;
+      auto it = existing++;
+      auto stray = *it; // don't reference
+      dout(10) << " unlocking out-of-order " << *stray.lock << " " << *stray.lock->get_parent() << dendl;
       bool need_issue = false;
-      if (mdr->xlocks.count(stray)) {
-       xlock_finish(stray, mdr.get(), &need_issue);
-      } else if (mdr->rdlocks.count(stray)) {
-       rdlock_finish(stray, mdr.get(), &need_issue);
+      if (stray.is_xlock()) {
+       xlock_finish(it, mdr.get(), &need_issue);
+      } else if (stray.is_rdlock()) {
+       rdlock_finish(it, mdr.get(), &need_issue);
       } else {
        // may have acquired both wrlock and remore wrlock
-       if (mdr->wrlocks.count(stray))
-         wrlock_finish(stray, mdr.get(), &need_issue);
-       if (mdr->remote_wrlocks.count(stray))
-         remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
+       if (stray.is_wrlock())
+         wrlock_finish(it, mdr.get(), &need_issue);
+       if (stray.is_remote_wrlock())
+         remote_wrlock_finish(it, mdr.get());
       }
       if (need_issue)
-       issue_set.insert(static_cast<CInode*>(stray->get_parent()));
+       issue_set.insert(static_cast<CInode*>(stray.lock->get_parent()));
     }
 
     // lock
-    if (mdr->locking && *p != mdr->locking) {
+    if (mdr->locking && p.lock != mdr->locking) {
       cancel_locking(mdr.get(), &issue_set);
     }
-    if (xlocks.count(*p)) {
+    if (p.is_xlock()) {
       marker.message = "failed to xlock, waiting";
-      if (!xlock_start(*p, mdr)) 
+      if (!xlock_start(p.lock, mdr))
        goto out;
-      dout(10) << " got xlock on " << **p << " " << *(*p)->get_parent() << dendl;
+      dout(10) << " got xlock on " << *p.lock << " " << *p.lock->get_parent() << dendl;
     } else if (need_wrlock || need_remote_wrlock) {
-      if (need_remote_wrlock && !mdr->remote_wrlocks.count(*p)) {
+      if (need_remote_wrlock && !mdr->is_remote_wrlocked(p)) {
         marker.message = "waiting for remote wrlocks";
-       remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
+       remote_wrlock_start(p, p.wrlock_target, mdr);
        goto out;
       }
-      if (need_wrlock && !mdr->wrlocks.count(*p)) {
+      if (need_wrlock) {
         marker.message = "failed to wrlock, waiting";
-       if (need_remote_wrlock && !(*p)->can_wrlock(mdr->get_client())) {
+       if (need_remote_wrlock && !p.lock->can_wrlock(mdr->get_client())) {
          marker.message = "failed to wrlock, dropping remote wrlock and waiting";
          // can't take the wrlock because the scatter lock is gathering. need to
          // release the remote wrlock, so that the gathering process can finish.
-         remote_wrlock_finish(*p, mdr->remote_wrlocks[*p], mdr.get());
-         remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
+         auto it =  mdr->locks.end();
+         ++it;
+         remote_wrlock_finish(it, mdr.get());
+         remote_wrlock_start(p, p.wrlock_target, mdr);
          goto out;
        }
        // nowait if we have already gotten remote wrlock
-       if (!wrlock_start(*p, mdr, need_remote_wrlock))
+       if (!wrlock_start(p, mdr, need_remote_wrlock))
          goto out;
-       dout(10) << " got wrlock on " << **p << " " << *(*p)->get_parent() << dendl;
+       dout(10) << " got wrlock on " << *p.lock << " " << *p.lock->get_parent() << dendl;
       }
     } else {
-      assert(mdr->is_master());
-      if ((*p)->is_scatterlock()) {
-       ScatterLock *slock = static_cast<ScatterLock *>(*p);
-       if (slock->is_rejoin_mix()) {
-         // If there is a recovering mds who replcated an object when it failed
-         // and scatterlock in the object was in MIX state, It's possible that
-         // the recovering mds needs to take wrlock on the scatterlock when it
-         // replays unsafe requests. So this mds should delay taking rdlock on
-         // the scatterlock until the recovering mds finishes replaying unsafe.
-         // Otherwise unsafe requests may get replayed after current request.
-         //
-         // For example:
-         // The recovering mds is auth mds of a dirfrag, this mds is auth mds
-         // of correspinding inode. when 'rm -rf' the direcotry, this mds should
-         // delay the rmdir request until the recovering mds has replayed unlink
-         // requests.
-         if (mds->is_cluster_degraded()) {
-           if (!mdr->is_replay()) {
-             drop_locks(mdr.get());
-             mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
-             dout(10) << " rejoin mix scatterlock " << *slock << " " << *(*p)->get_parent()
-                      << ", waiting for cluster recovered" << dendl;
-             marker.message = "rejoin mix scatterlock, waiting for cluster recovered";
-             return false;
-           }
-         } else {
-           slock->clear_rejoin_mix();
+      ceph_assert(mdr->is_master());
+      if (p.lock->needs_recover()) {
+       if (mds->is_cluster_degraded()) {
+         if (!mdr->is_queued_for_replay()) {
+           // see comments in SimpleLock::set_state_rejoin() and
+           // ScatterLock::encode_state_for_rejoin()
+           drop_locks(mdr.get());
+           mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
+           dout(10) << " rejoin recovering " << *p.lock << " " << *p.lock->get_parent()
+                    << ", waiting for cluster recovered" << dendl;
+           marker.message = "rejoin recovering lock, waiting for cluster recovered";
+           return false;
          }
+       } else {
+         p.lock->clear_need_recover();
        }
       }
 
       marker.message = "failed to rdlock, waiting";
-      if (!rdlock_start(*p, mdr)) 
+      if (!rdlock_start(p, mdr))
        goto out;
-      dout(10) << " got rdlock on " << **p << " " << *(*p)->get_parent() << dendl;
+      dout(10) << " got rdlock on " << *p.lock << " " << *p.lock->get_parent() << dendl;
     }
   }
     
   // any extra unneeded locks?
   while (existing != mdr->locks.end()) {
-    SimpleLock *stray = *existing;
-    ++existing;
-    dout(10) << " unlocking extra " << *stray << " " << *stray->get_parent() << dendl;
+    auto it = existing++;
+    auto stray = *it;
+    dout(10) << " unlocking extra " << *stray.lock << " " << *stray.lock->get_parent() << dendl;
     bool need_issue = false;
-    if (mdr->xlocks.count(stray)) {
-      xlock_finish(stray, mdr.get(), &need_issue);
-    } else if (mdr->rdlocks.count(stray)) {
-      rdlock_finish(stray, mdr.get(), &need_issue);
+    if (stray.is_xlock()) {
+      xlock_finish(it, mdr.get(), &need_issue);
+    } else if (stray.is_rdlock()) {
+      rdlock_finish(it, mdr.get(), &need_issue);
     } else {
       // may have acquired both wrlock and remore wrlock
-      if (mdr->wrlocks.count(stray))
-       wrlock_finish(stray, mdr.get(), &need_issue);
-      if (mdr->remote_wrlocks.count(stray))
-       remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
+      if (stray.is_wrlock())
+       wrlock_finish(it, mdr.get(), &need_issue);
+      if (stray.is_remote_wrlock())
+       remote_wrlock_finish(it, mdr.get());
     }
     if (need_issue)
-      issue_set.insert(static_cast<CInode*>(stray->get_parent()));
+      issue_set.insert(static_cast<CInode*>(stray.lock->get_parent()));
   }
 
   mdr->done_locking = true;
@@ -615,76 +619,93 @@ bool Locker::acquire_locks(MDRequestRef& mdr,
   return result;
 }
 
-
-void Locker::set_xlocks_done(MutationImpl *mut, bool skip_dentry)
+void Locker::notify_freeze_waiter(MDSCacheObject *o)
 {
-  for (set<SimpleLock*>::iterator p = mut->xlocks.begin();
-       p != mut->xlocks.end();
-       ++p) {
-    MDSCacheObject *object = (*p)->get_parent();
-    assert(object->is_auth());
-    if (skip_dentry &&
-       ((*p)->get_type() == CEPH_LOCK_DN || (*p)->get_type() == CEPH_LOCK_DVERSION))
-      continue;
-    dout(10) << "set_xlocks_done on " << **p << " " << *object << dendl;
-    (*p)->set_xlock_done();
+  CDir *dir = NULL;
+  if (CInode *in = dynamic_cast<CInode*>(o)) {
+    if (!in->is_root())
+      dir = in->get_parent_dir();
+  } else if (CDentry *dn = dynamic_cast<CDentry*>(o)) {
+    dir = dn->get_dir();
+  } else {
+    dir = dynamic_cast<CDir*>(o);
+    ceph_assert(dir);
+  }
+  if (dir) {
+    if (dir->is_freezing_dir())
+      mdcache->fragment_freeze_inc_num_waiters(dir);
+    if (dir->is_freezing_tree()) {
+      while (!dir->is_freezing_tree_root())
+       dir = dir->get_parent_dir();
+      mdcache->migrator->export_freeze_inc_num_waiters(dir);
+    }
   }
 }
 
-void Locker::_drop_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
+void Locker::set_xlocks_done(MutationImpl *mut, bool skip_dentry)
 {
-  while (!mut->rdlocks.empty()) {
-    bool ni = false;
-    MDSCacheObject *p = (*mut->rdlocks.begin())->get_parent();
-    rdlock_finish(*mut->rdlocks.begin(), mut, &ni);
-    if (ni)
-      pneed_issue->insert(static_cast<CInode*>(p));
+  for (const auto &p : mut->locks) {
+    if (!p.is_xlock())
+      continue;
+    MDSCacheObject *obj = p.lock->get_parent();
+    ceph_assert(obj->is_auth());
+    if (skip_dentry &&
+       (p.lock->get_type() == CEPH_LOCK_DN || p.lock->get_type() == CEPH_LOCK_DVERSION))
+      continue;
+    dout(10) << "set_xlocks_done on " << *p.lock << " " << *obj << dendl;
+    p.lock->set_xlock_done();
   }
 }
 
-void Locker::_drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
+void Locker::_drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue,
+                        bool drop_rdlocks)
 {
   set<mds_rank_t> slaves;
 
-  while (!mut->xlocks.empty()) {
-    SimpleLock *lock = *mut->xlocks.begin();
-    MDSCacheObject *p = lock->get_parent();
-    if (!p->is_auth()) {
-      assert(lock->get_sm()->can_remote_xlock);
-      slaves.insert(p->authority().first);
-      lock->put_xlock();
-      mut->locks.erase(lock);
-      mut->xlocks.erase(lock);
-      continue;
-    }
-    bool ni = false;
-    xlock_finish(lock, mut, &ni);
-    if (ni)
-      pneed_issue->insert(static_cast<CInode*>(p));
-  }
-
-  while (!mut->remote_wrlocks.empty()) {
-    map<SimpleLock*,mds_rank_t>::iterator p = mut->remote_wrlocks.begin();
-    slaves.insert(p->second);
-    if (mut->wrlocks.count(p->first) == 0)
-      mut->locks.erase(p->first);
-    mut->remote_wrlocks.erase(p);
-  }
+  for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
+    SimpleLock *lock = it->lock;
+    MDSCacheObject *obj = lock->get_parent();
 
-  while (!mut->wrlocks.empty()) {
-    bool ni = false;
-    MDSCacheObject *p = (*mut->wrlocks.begin())->get_parent();
-    wrlock_finish(*mut->wrlocks.begin(), mut, &ni);
-    if (ni)
-      pneed_issue->insert(static_cast<CInode*>(p));
+    if (it->is_xlock()) {
+      if (obj->is_auth()) {
+       bool ni = false;
+       xlock_finish(it++, mut, &ni);
+       if (ni)
+         pneed_issue->insert(static_cast<CInode*>(obj));
+      } else {
+       ceph_assert(lock->get_sm()->can_remote_xlock);
+       slaves.insert(obj->authority().first);
+       lock->put_xlock();
+       mut->locks.erase(it++);
+      }
+    } else if (it->is_wrlock() || it->is_remote_wrlock()) {
+      if (it->is_remote_wrlock()) {
+       slaves.insert(it->wrlock_target);
+       it->clear_remote_wrlock();
+      }
+      if (it->is_wrlock()) {
+       bool ni = false;
+       wrlock_finish(it++, mut, &ni);
+       if (ni)
+         pneed_issue->insert(static_cast<CInode*>(obj));
+      } else {
+       mut->locks.erase(it++);
+      }
+    } else if (drop_rdlocks && it->is_rdlock()) {
+      bool ni = false;
+      rdlock_finish(it++, mut, &ni);
+      if (ni)
+       pneed_issue->insert(static_cast<CInode*>(obj));
+    } else {
+      ++it;
+    }
   }
 
   for (set<mds_rank_t>::iterator p = slaves.begin(); p != slaves.end(); ++p) {
     if (!mds->is_cluster_degraded() ||
        mds->mdsmap->get_state(*p) >= MDSMap::STATE_REJOIN) {
       dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p << dendl;
-      MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
-                                                       MMDSSlaveRequest::OP_DROPLOCKS);
+      auto slavereq = MMDSSlaveRequest::create(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_DROPLOCKS);
       mds->send_message_mds(slavereq, *p);
     }
   }
@@ -693,15 +714,14 @@ void Locker::_drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
 void Locker::cancel_locking(MutationImpl *mut, set<CInode*> *pneed_issue)
 {
   SimpleLock *lock = mut->locking;
-  assert(lock);
+  ceph_assert(lock);
   dout(10) << "cancel_locking " << *lock << " on " << *mut << dendl;
 
   if (lock->get_parent()->is_auth()) {
     bool need_issue = false;
     if (lock->get_state() == LOCK_PREXLOCK) {
       _finish_xlock(lock, -1, &need_issue);
-    } else if (lock->get_state() == LOCK_LOCK_XLOCK &&
-              lock->get_num_xlocks() == 0) {
+    } else if (lock->get_state() == LOCK_LOCK_XLOCK) {
       lock->set_state(LOCK_XLOCKDONE);
       eval_gather(lock, true, &need_issue);
     }
@@ -720,8 +740,7 @@ void Locker::drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue)
 
   if (mut->locking)
     cancel_locking(mut, pneed_issue);
-  _drop_non_rdlocks(mut, pneed_issue);
-  _drop_rdlocks(mut, pneed_issue);
+  _drop_locks(mut, pneed_issue, true);
 
   if (pneed_issue == &my_need_issue)
     issue_caps_set(*pneed_issue);
@@ -734,31 +753,61 @@ void Locker::drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
   if (!pneed_issue)
     pneed_issue = &my_need_issue;
 
-  _drop_non_rdlocks(mut, pneed_issue);
+  _drop_locks(mut, pneed_issue, false);
 
   if (pneed_issue == &my_need_issue)
     issue_caps_set(*pneed_issue);
 }
 
-void Locker::drop_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
+void Locker::drop_rdlocks_for_early_reply(MutationImpl *mut)
 {
-  set<CInode*> my_need_issue;
-  if (!pneed_issue)
-    pneed_issue = &my_need_issue;
+  set<CInode*> need_issue;
 
-  _drop_rdlocks(mut, pneed_issue);
+  for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
+    if (!it->is_rdlock()) {
+      ++it;
+      continue;
+    }
+    SimpleLock *lock = it->lock;
+    // make later mksnap/setlayout (at other mds) wait for this unsafe request
+    if (lock->get_type() == CEPH_LOCK_ISNAP ||
+       lock->get_type() == CEPH_LOCK_IPOLICY) {
+      ++it;
+      continue;
+    }
+    bool ni = false;
+    rdlock_finish(it++, mut, &ni);
+    if (ni)
+      need_issue.insert(static_cast<CInode*>(lock->get_parent()));
+  }
 
-  if (pneed_issue == &my_need_issue)
-    issue_caps_set(*pneed_issue);
+  issue_caps_set(need_issue);
 }
 
+void Locker::drop_locks_for_fragment_unfreeze(MutationImpl *mut)
+{
+  set<CInode*> need_issue;
+
+  for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
+    SimpleLock *lock = it->lock;
+    if (lock->get_type() == CEPH_LOCK_IDFT) {
+      ++it;
+      continue;
+    }
+    bool ni = false;
+    wrlock_finish(it++, mut, &ni);
+    if (ni)
+      need_issue.insert(static_cast<CInode*>(lock->get_parent()));
+  }
+  issue_caps_set(need_issue);
+}
 
 // generics
 
-void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<MDSInternalContextBase*> *pfinishers)
+void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, MDSContext::vec *pfinishers)
 {
   dout(10) << "eval_gather " << *lock << " on " << *lock->get_parent() << dendl;
-  assert(!lock->is_stable());
+  ceph_assert(!lock->is_stable());
 
   int next = lock->get_next_state();
 
@@ -770,7 +819,7 @@ void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<M
   bool need_issue = false;
 
   int loner_issued = 0, other_issued = 0, xlocker_issued = 0;
-  assert(!caps || in != NULL);
+  ceph_assert(!caps || in != NULL);
   if (caps && in->is_head()) {
     in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued,
                        lock->get_cap_shift(), lock->get_cap_mask());
@@ -807,7 +856,7 @@ void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<M
            << " on " << *lock->get_parent() << dendl;
 
     if (lock->get_sm() == &sm_filelock) {
-      assert(in);
+      ceph_assert(in);
       if (in->state_test(CInode::STATE_RECOVERING)) {
        dout(7) << "eval_gather finished gather, but still recovering" << dendl;
        return;
@@ -834,13 +883,12 @@ void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<M
          mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
        switch (lock->get_state()) {
        case LOCK_SYNC_LOCK:
-         mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid()),
-                               auth);
+         mds->send_message_mds(MLock::create(lock, LOCK_AC_LOCKACK, mds->get_nodeid()), auth);
          break;
 
        case LOCK_MIX_SYNC:
          {
-           MLock *reply = new MLock(lock, LOCK_AC_SYNCACK, mds->get_nodeid());
+           auto reply = MLock::create(lock, LOCK_AC_SYNCACK, mds->get_nodeid());
            lock->encode_locked_state(reply->get_data());
            mds->send_message_mds(reply, auth);
            next = LOCK_MIX_SYNC2;
@@ -858,7 +906,7 @@ void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<M
          
        case LOCK_SYNC_MIX:
          { 
-           MLock *reply = new MLock(lock, LOCK_AC_MIXACK, mds->get_nodeid());
+           auto reply = MLock::create(lock, LOCK_AC_MIXACK, mds->get_nodeid());
            mds->send_message_mds(reply, auth);
            next = LOCK_SYNC_MIX2;
          }
@@ -868,7 +916,7 @@ void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<M
          {
            bufferlist data;
            lock->encode_locked_state(data);
-           mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), auth);
+           mds->send_message_mds(MLock::create(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), auth);
            (static_cast<ScatterLock *>(lock))->start_flush();
            // we'll get an AC_LOCKFLUSHED to complete
          }
@@ -904,6 +952,7 @@ void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<M
       case LOCK_TSYN_MIX:
       case LOCK_SYNC_MIX:
       case LOCK_EXCL_MIX:
+      case LOCK_XSYN_MIX:
        in->start_scatter(static_cast<ScatterLock *>(lock));
        if (lock->get_parent()->is_replicated()) {
          bufferlist softdata;
@@ -978,21 +1027,21 @@ void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<M
 bool Locker::eval(CInode *in, int mask, bool caps_imported)
 {
   bool need_issue = caps_imported;
-  list<MDSInternalContextBase*> finishers;
+  MDSContext::vec finishers;
   
   dout(10) << "eval " << mask << " " << *in << dendl;
 
   // choose loner?
   if (in->is_auth() && in->is_head()) {
-    if (in->choose_ideal_loner() >= 0) {
-      if (in->try_set_loner()) {
-       dout(10) << "eval set loner to client." << in->get_loner() << dendl;
-       need_issue = true;
-       mask = -1;
-      } else
-       dout(10) << "eval want loner client." << in->get_wanted_loner() << " but failed to set it" << dendl;
-    } else
-      dout(10) << "eval doesn't want loner" << dendl;
+    client_t orig_loner = in->get_loner();
+    if (in->choose_ideal_loner()) {
+      dout(10) << "eval set loner: client." << orig_loner << " -> client." << in->get_loner() << dendl;
+      need_issue = true;
+      mask = -1;
+    } else if (in->get_wanted_loner() != in->get_loner()) {
+      dout(10) << "eval want loner: client." << in->get_wanted_loner() << " but failed to set it" << dendl;
+      mask = -1;
+    }
   }
 
  retry:
@@ -1013,19 +1062,14 @@ bool Locker::eval(CInode *in, int mask, bool caps_imported)
 
   // drop loner?
   if (in->is_auth() && in->is_head() && in->get_wanted_loner() != in->get_loner()) {
-    dout(10) << "  trying to drop loner" << dendl;
     if (in->try_drop_loner()) {
-      dout(10) << "  dropped loner" << dendl;
       need_issue = true;
-
       if (in->get_wanted_loner() >= 0) {
-       if (in->try_set_loner()) {
-         dout(10) << "eval end set loner to client." << in->get_loner() << dendl;
-         mask = -1;
-         goto retry;
-       } else {
-         dout(10) << "eval want loner client." << in->get_wanted_loner() << " but failed to set it" << dendl;
-       }
+       dout(10) << "eval end set loner to client." << in->get_loner() << dendl;
+       bool ok = in->try_set_loner();
+       ceph_assert(ok);
+       mask = -1;
+       goto retry;
       }
     }
   }
@@ -1046,7 +1090,7 @@ public:
   C_Locker_Eval(Locker *l, MDSCacheObject *pp, int m) : LockerContext(l), p(pp), mask(m) {
     // We are used as an MDSCacheObject waiter, so should
     // only be invoked by someone already holding the big lock.
-    assert(locker->mds->mds_lock.is_locked_by_me());
+    ceph_assert(locker->mds->mds_lock.is_locked_by_me());
     p->get(MDSCacheObject::PIN_PTRWAITER);    
   }
   void finish(int r) override {
@@ -1071,7 +1115,7 @@ void Locker::try_eval(MDSCacheObject *p, int mask)
   }
 
   if (mask & CEPH_LOCK_DN) {
-    assert(mask == CEPH_LOCK_DN);
+    ceph_assert(mask == CEPH_LOCK_DN);
     bool need_issue = false;  // ignore this, no caps on dentries
     CDentry *dn = static_cast<CDentry *>(p);
     eval_any(&dn->lock, &need_issue);
@@ -1132,7 +1176,9 @@ void Locker::try_eval(SimpleLock *lock, bool *pneed_issue)
     }
   }
 
-  if (lock->get_type() != CEPH_LOCK_DN && p->is_freezing()) {
+  if (lock->get_type() != CEPH_LOCK_DN &&
+      lock->get_type() != CEPH_LOCK_ISNAP &&
+      p->is_freezing()) {
     dout(7) << "try_eval " << *lock << " freezing, waiting on " << *p << dendl;
     p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
     return;
@@ -1144,7 +1190,7 @@ void Locker::try_eval(SimpleLock *lock, bool *pneed_issue)
 void Locker::eval_cap_gather(CInode *in, set<CInode*> *issue_set)
 {
   bool need_issue = false;
-  list<MDSInternalContextBase*> finishers;
+  MDSContext::vec finishers;
 
   // kick locks now
   if (!in->filelock.is_stable())
@@ -1169,7 +1215,7 @@ void Locker::eval_cap_gather(CInode *in, set<CInode*> *issue_set)
 void Locker::eval_scatter_gathers(CInode *in)
 {
   bool need_issue = false;
-  list<MDSInternalContextBase*> finishers;
+  MDSContext::vec finishers;
 
   dout(10) << "eval_scatter_gathers " << *in << dendl;
 
@@ -1233,7 +1279,7 @@ bool Locker::_rdlock_kick(SimpleLock *lock, bool as_anon)
          mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
        dout(10) << "requesting rdlock from auth on "
                 << *lock << " on " << *lock->get_parent() << dendl;
-       mds->send_message_mds(new MLock(lock, LOCK_AC_REQRDLOCK, mds->get_nodeid()), auth);
+       mds->send_message_mds(MLock::create(lock, LOCK_AC_REQRDLOCK, mds->get_nodeid()), auth);
       }
       return false;
     }
@@ -1248,7 +1294,7 @@ bool Locker::_rdlock_kick(SimpleLock *lock, bool as_anon)
   return false;
 }
 
-bool Locker::rdlock_try(SimpleLock *lock, client_t client, MDSInternalContextBase *con)
+bool Locker::rdlock_try(SimpleLock *lock, client_t client, MDSContext *con)
 {
   dout(7) << "rdlock_try on " << *lock << " on " << *lock->get_parent() << dendl;  
 
@@ -1295,8 +1341,7 @@ bool Locker::rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon)
     // can read?  grab ref.
     if (lock->can_rdlock(client)) {
       lock->get_rdlock();
-      mut->rdlocks.insert(lock);
-      mut->locks.insert(lock);
+      mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::RDLOCK);
       return true;
     }
 
@@ -1305,11 +1350,14 @@ bool Locker::rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon)
        lock->get_state() == LOCK_SNAP_SYNC) {
       // okay, we actually need to kick the head's lock to get ourselves synced up.
       CInode *head = mdcache->get_inode(in->ino());
-      assert(head);
-      SimpleLock *hlock = head->get_lock(lock->get_type());
+      ceph_assert(head);
+      SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
+      if (hlock->get_state() == LOCK_SYNC)
+       hlock = head->get_lock(lock->get_type());
+
       if (hlock->get_state() != LOCK_SYNC) {
        dout(10) << "rdlock_start trying head inode " << *head << dendl;
-       if (!rdlock_start(head->get_lock(lock->get_type()), mut, true)) // ** as_anon, no rdlock on EXCL **
+       if (!rdlock_start(hlock, mut, true)) // ** as_anon, no rdlock on EXCL **
          return false;
        // oh, check our lock again then
       }
@@ -1338,14 +1386,14 @@ void Locker::nudge_log(SimpleLock *lock)
     mds->mdlog->flush();
 }
 
-void Locker::rdlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
+void Locker::rdlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
 {
+  ceph_assert(it->is_rdlock());
+  SimpleLock *lock = it->lock;
   // drop ref
   lock->put_rdlock();
-  if (mut) {
-    mut->rdlocks.erase(lock);
-    mut->locks.erase(lock);
-  }
+  if (mut)
+    mut->locks.erase(it);
 
   dout(7) << "rdlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
   
@@ -1359,35 +1407,27 @@ void Locker::rdlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issu
 }
 
 
-bool Locker::can_rdlock_set(set<SimpleLock*>& locks)
+bool Locker::can_rdlock_set(MutationImpl::LockOpVec& lov)
 {
-  dout(10) << "can_rdlock_set " << locks << dendl;
-  for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
-    if (!(*p)->can_rdlock(-1)) {
-      dout(10) << "can_rdlock_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
+  dout(10) << "can_rdlock_set " << dendl;
+  for (const auto& p : lov) {
+    ceph_assert(p.is_rdlock());
+    if (!p.lock->can_rdlock(-1)) {
+      dout(10) << "can_rdlock_set can't rdlock " << *p << " on " << *p.lock->get_parent() << dendl;
       return false;
     }
+  }
   return true;
 }
 
-bool Locker::rdlock_try_set(set<SimpleLock*>& locks)
-{
-  dout(10) << "rdlock_try_set " << locks << dendl;
-  for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
-    if (!rdlock_try(*p, -1, NULL)) {
-      dout(10) << "rdlock_try_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
-      return false;
-    }
-  return true;
-}
 
-void Locker::rdlock_take_set(set<SimpleLock*>& locks, MutationRef& mut)
+void Locker::rdlock_take_set(MutationImpl::LockOpVec& lov, MutationRef& mut)
 {
-  dout(10) << "rdlock_take_set " << locks << dendl;
-  for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p) {
-    (*p)->get_rdlock();
-    mut->rdlocks.insert(*p);
-    mut->locks.insert(*p);
+  dout(10) << "rdlock_take_set "  << dendl;
+  for (const auto& p : lov) {
+    ceph_assert(p.is_rdlock());
+    p.lock->get_rdlock();
+    mut->locks.emplace(p.lock, MutationImpl::LockOp::RDLOCK);
   }
 }
 
@@ -1403,8 +1443,7 @@ void Locker::wrlock_force(SimpleLock *lock, MutationRef& mut)
   dout(7) << "wrlock_force  on " << *lock
          << " on " << *lock->get_parent() << dendl;  
   lock->get_wrlock(true);
-  mut->wrlocks.insert(lock);
-  mut->locks.insert(lock);
+  mut->locks.emplace(lock, MutationImpl::LockOp::WRLOCK);
 }
 
 bool Locker::wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait)
@@ -1426,8 +1465,8 @@ bool Locker::wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait)
     if (lock->can_wrlock(client) &&
        (!want_scatter || lock->get_state() == LOCK_MIX)) {
       lock->get_wrlock();
-      mut->wrlocks.insert(lock);
-      mut->locks.insert(lock);
+      auto it = mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::WRLOCK);
+      it->flags |= MutationImpl::LockOp::WRLOCK; // may already remote_wrlocked
       return true;
     }
 
@@ -1462,7 +1501,7 @@ bool Locker::wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait)
          mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
        dout(10) << "requesting scatter from auth on "
                 << *lock << " on " << *lock->get_parent() << dendl;
-       mds->send_message_mds(new MLock(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth);
+       mds->send_message_mds(MLock::create(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth);
       }
       break;
     }
@@ -1477,19 +1516,22 @@ bool Locker::wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait)
   return false;
 }
 
-void Locker::wrlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
+void Locker::wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
 {
+  ceph_assert(it->is_wrlock());
+  SimpleLock* lock = it->lock;
+
   if (lock->get_type() == CEPH_LOCK_IVERSION ||
       lock->get_type() == CEPH_LOCK_DVERSION)
-    return local_wrlock_finish(static_cast<LocalLock*>(lock), mut);
+    return local_wrlock_finish(it, mut);
 
   dout(7) << "wrlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
   lock->put_wrlock();
-  if (mut) {
-    mut->wrlocks.erase(lock);
-    if (mut->remote_wrlocks.count(lock) == 0)
-      mut->locks.erase(lock);
-  }
+
+  if (it->is_remote_wrlock())
+    it->clear_wrlock();
+  else
+    mut->locks.erase(it);
 
   if (!lock->is_wrlocked()) {
     if (!lock->is_stable())
@@ -1518,30 +1560,31 @@ void Locker::remote_wrlock_start(SimpleLock *lock, mds_rank_t target, MDRequestR
   // send lock request
   mut->start_locking(lock, target);
   mut->more()->slaves.insert(target);
-  MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
-                                            MMDSSlaveRequest::OP_WRLOCK);
+  auto r = MMDSSlaveRequest::create(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_WRLOCK);
   r->set_lock_type(lock->get_type());
   lock->get_parent()->set_object_info(r->get_object_info());
   mds->send_message_mds(r, target);
 
-  assert(mut->more()->waiting_on_slave.count(target) == 0);
+  ceph_assert(mut->more()->waiting_on_slave.count(target) == 0);
   mut->more()->waiting_on_slave.insert(target);
 }
 
-void Locker::remote_wrlock_finish(SimpleLock *lock, mds_rank_t target,
-                                  MutationImpl *mut)
+void Locker::remote_wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
 {
-  // drop ref
-  mut->remote_wrlocks.erase(lock);
-  if (mut->wrlocks.count(lock) == 0)
-    mut->locks.erase(lock);
+  ceph_assert(it->is_remote_wrlock());
+  SimpleLock *lock = it->lock;
+  mds_rank_t target = it->wrlock_target;
+
+  if (it->is_wrlock())
+    it->clear_remote_wrlock();
+  else
+    mut->locks.erase(it);
   
   dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
          << " " << *lock->get_parent()  << dendl;
   if (!mds->is_cluster_degraded() ||
       mds->mdsmap->get_state(target) >= MDSMap::STATE_REJOIN) {
-    MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
-                                                     MMDSSlaveRequest::OP_UNWRLOCK);
+    auto slavereq = MMDSSlaveRequest::create(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_UNWRLOCK);
     slavereq->set_lock_type(lock->get_type());
     lock->get_parent()->set_object_info(slavereq->get_object_info());
     mds->send_message_mds(slavereq, target);
@@ -1561,24 +1604,27 @@ bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
   dout(7) << "xlock_start on " << *lock << " on " << *lock->get_parent() << dendl;
   client_t client = mut->get_client();
 
+  CInode *in = nullptr;
+  if (lock->get_cap_shift())
+    in = static_cast<CInode *>(lock->get_parent());
+
   // auth?
   if (lock->get_parent()->is_auth()) {
     // auth
     while (1) {
-      if (lock->can_xlock(client)) {
+      if (lock->can_xlock(client) &&
+         !(lock->get_state() == LOCK_LOCK_XLOCK &&     // client is not xlocker or
+           in && in->issued_caps_need_gather(lock))) { // xlocker does not hold shared cap
        lock->set_state(LOCK_XLOCK);
        lock->get_xlock(mut, client);
-       mut->xlocks.insert(lock);
-       mut->locks.insert(lock);
+       mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::XLOCK);
        mut->finish_locking(lock);
        return true;
       }
       
-      if (lock->get_type() == CEPH_LOCK_IFILE) {
-       CInode *in = static_cast<CInode*>(lock->get_parent());
-       if (in->state_test(CInode::STATE_RECOVERING)) {
-         mds->mdcache->recovery_queue.prioritize(in);
-       }
+      if (lock->get_type() == CEPH_LOCK_IFILE &&
+         in->state_test(CInode::STATE_RECOVERING)) {
+       mds->mdcache->recovery_queue.prioritize(in);
       }
 
       if (!lock->is_stable() && (lock->get_state() != LOCK_XLOCKDONE ||
@@ -1599,8 +1645,8 @@ bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
     return false;
   } else {
     // replica
-    assert(lock->get_sm()->can_remote_xlock);
-    assert(!mut->slave_request);
+    ceph_assert(lock->get_sm()->can_remote_xlock);
+    ceph_assert(!mut->slave_request);
     
     // wait for single auth
     if (lock->get_parent()->is_ambiguous_auth()) {
@@ -1622,13 +1668,12 @@ bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
     // send lock request
     mut->more()->slaves.insert(auth);
     mut->start_locking(lock, auth);
-    MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
-                                              MMDSSlaveRequest::OP_XLOCK);
+    auto r = MMDSSlaveRequest::create(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_XLOCK);
     r->set_lock_type(lock->get_type());
     lock->get_parent()->set_object_info(r->get_object_info());
     mds->send_message_mds(r, auth);
 
-    assert(mut->more()->waiting_on_slave.count(auth) == 0);
+    ceph_assert(mut->more()->waiting_on_slave.count(auth) == 0);
     mut->more()->waiting_on_slave.insert(auth);
 
     return false;
@@ -1637,12 +1682,13 @@ bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
 
 void Locker::_finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue)
 {
-  assert(!lock->is_stable());
-  if (lock->get_num_rdlocks() == 0 &&
+  ceph_assert(!lock->is_stable());
+  if (lock->get_type() != CEPH_LOCK_DN &&
+      lock->get_type() != CEPH_LOCK_ISNAP &&
+      lock->get_num_rdlocks() == 0 &&
       lock->get_num_wrlocks() == 0 &&
-      lock->get_num_client_lease() == 0 &&
-      lock->get_state() != LOCK_XLOCKSNAP &&
-      lock->get_type() != CEPH_LOCK_DN) {
+      !lock->is_leased() &&
+      lock->get_state() != LOCK_XLOCKSNAP) {
     CInode *in = static_cast<CInode*>(lock->get_parent());
     client_t loner = in->get_target_loner();
     if (loner >= 0 && (xlocker < 0 || xlocker == loner)) {
@@ -1661,11 +1707,14 @@ void Locker::_finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue
   eval_gather(lock, lock->get_state() != LOCK_XLOCKSNAP, pneed_issue);
 }
 
-void Locker::xlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
+void Locker::xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
 {
+  ceph_assert(it->is_xlock());
+  SimpleLock *lock = it->lock;
+
   if (lock->get_type() == CEPH_LOCK_IVERSION ||
       lock->get_type() == CEPH_LOCK_DVERSION)
-    return local_xlock_finish(static_cast<LocalLock*>(lock), mut);
+    return local_xlock_finish(it, mut);
 
   dout(10) << "xlock_finish on " << *lock << " " << *lock->get_parent() << dendl;
 
@@ -1673,23 +1722,21 @@ void Locker::xlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue
 
   // drop ref
   lock->put_xlock();
-  assert(mut);
-  mut->xlocks.erase(lock);
-  mut->locks.erase(lock);
+  ceph_assert(mut);
+  mut->locks.erase(it);
   
   bool do_issue = false;
 
   // remote xlock?
   if (!lock->get_parent()->is_auth()) {
-    assert(lock->get_sm()->can_remote_xlock);
+    ceph_assert(lock->get_sm()->can_remote_xlock);
 
     // tell auth
     dout(7) << "xlock_finish releasing remote xlock on " << *lock->get_parent()  << dendl;
     mds_rank_t auth = lock->get_parent()->authority().first;
     if (!mds->is_cluster_degraded() ||
        mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
-      MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
-                                                       MMDSSlaveRequest::OP_UNXLOCK);
+      auto slavereq = MMDSSlaveRequest::create(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_UNXLOCK);
       slavereq->set_lock_type(lock->get_type());
       lock->get_parent()->set_object_info(slavereq->get_object_info());
       mds->send_message_mds(slavereq, auth);
@@ -1699,9 +1746,8 @@ void Locker::xlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue
                         SimpleLock::WAIT_WR | 
                         SimpleLock::WAIT_RD, 0); 
   } else {
-    if (lock->get_num_xlocks() == 0) {
-      if (lock->get_state() == LOCK_LOCK_XLOCK)
-       lock->set_state(LOCK_XLOCKDONE);
+    if (lock->get_num_xlocks() == 0 &&
+        lock->get_state() != LOCK_LOCK_XLOCK) { // no one is taking xlock
       _finish_xlock(lock, xlocker, &do_issue);
     }
   }
@@ -1717,16 +1763,17 @@ void Locker::xlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue
   }
 }
 
-void Locker::xlock_export(SimpleLock *lock, MutationImpl *mut)
+void Locker::xlock_export(const MutationImpl::lock_iterator& it, MutationImpl *mut)
 {
+  ceph_assert(it->is_xlock());
+  SimpleLock *lock = it->lock;
   dout(10) << "xlock_export on " << *lock << " " << *lock->get_parent() << dendl;
 
   lock->put_xlock();
-  mut->xlocks.erase(lock);
-  mut->locks.erase(lock);
+  mut->locks.erase(it);
 
   MDSCacheObject *p = lock->get_parent();
-  assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH));  // we are exporting this (inode)
+  ceph_assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH));  // we are exporting this (inode)
 
   if (!lock->is_stable())
     lock->get_parent()->auth_unpin(lock);
@@ -1753,85 +1800,80 @@ version_t Locker::issue_file_data_version(CInode *in)
 class C_Locker_FileUpdate_finish : public LockerLogContext {
   CInode *in;
   MutationRef mut;
-  bool share_max;
-  bool need_issue;
+  unsigned flags;
   client_t client;
-  MClientCaps *ack;
+  MClientCaps::ref ack;
 public:
-  C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m,
-                               bool sm=false, bool ni=false, client_t c=-1,
-                               MClientCaps *ac = 0)
-    : LockerLogContext(l), in(i), mut(m), share_max(sm), need_issue(ni),
-      client(c), ack(ac) {
+  C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m, unsigned f,
+                             const MClientCaps::ref &ack, client_t c=-1)
+    : LockerLogContext(l), in(i), mut(m), flags(f), client(c), ack(ack) {
     in->get(CInode::PIN_PTRWAITER);
   }
   void finish(int r) override {
-    locker->file_update_finish(in, mut, share_max, need_issue, client, ack);
+    locker->file_update_finish(in, mut, flags, client, ack);
     in->put(CInode::PIN_PTRWAITER);
   }
 };
 
-void Locker::file_update_finish(CInode *in, MutationRef& mut, bool share_max, bool issue_client_cap,
-                               client_t client, MClientCaps *ack)
+enum {
+  UPDATE_SHAREMAX = 1,
+  UPDATE_NEEDSISSUE = 2,
+  UPDATE_SNAPFLUSH = 4,
+};
+
+void Locker::file_update_finish(CInode *in, MutationRef& mut, unsigned flags,
+                               client_t client, const MClientCaps::ref &ack)
 {
   dout(10) << "file_update_finish on " << *in << dendl;
   in->pop_and_dirty_projected_inode(mut->ls);
 
   mut->apply();
-  
+
   if (ack) {
     Session *session = mds->get_session(client);
-    if (session) {
+    if (session && !session->is_closed()) {
       // "oldest flush tid" > 0 means client uses unique TID for each flush
       if (ack->get_oldest_flush_tid() > 0)
-       session->add_completed_flush(ack->get_client_tid());
+        session->add_completed_flush(ack->get_client_tid());
       mds->send_message_client_counted(ack, session);
     } else {
       dout(10) << " no session for client." << client << " " << *ack << dendl;
-      ack->put();
     }
   }
 
   set<CInode*> need_issue;
   drop_locks(mut.get(), &need_issue);
 
-  if (!in->is_head() && !in->client_snap_caps.empty()) {
-    dout(10) << " client_snap_caps " << in->client_snap_caps << dendl;
-    // check for snap writeback completion
-    bool gather = false;
-    compact_map<int,set<client_t> >::iterator p = in->client_snap_caps.begin();
-    while (p != in->client_snap_caps.end()) {
-      SimpleLock *lock = in->get_lock(p->first);
-      assert(lock);
-      dout(10) << " completing client_snap_caps for " << ccap_string(p->first)
-              << " lock " << *lock << " on " << *in << dendl;
-      lock->put_wrlock();
-
-      p->second.erase(client);
-      if (p->second.empty()) {
-       gather = true;
-       in->client_snap_caps.erase(p++);
-      } else
-       ++p;
-    }
-    if (gather) {
-      if (in->client_snap_caps.empty())
-       in->item_open_file.remove_myself();
-      eval_cap_gather(in, &need_issue);
-    }
-  } else {
-    if (issue_client_cap && need_issue.count(in) == 0) {
+  if (in->is_head()) {
+    if ((flags & UPDATE_NEEDSISSUE) && need_issue.count(in) == 0) {
       Capability *cap = in->get_client_cap(client);
       if (cap && (cap->wanted() & ~cap->pending()))
        issue_caps(in, cap);
     }
-  
-    if (share_max && in->is_auth() &&
+
+    if ((flags & UPDATE_SHAREMAX) && in->is_auth() &&
        (in->filelock.gcaps_allowed(CAP_LONER) & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)))
       share_inode_max_size(in);
+
+  } else if ((flags & UPDATE_SNAPFLUSH) && !in->client_snap_caps.empty()) {
+    dout(10) << " client_snap_caps " << in->client_snap_caps << dendl;
+    // check for snap writeback completion
+    in->client_snap_caps.erase(client);
+    if (in->client_snap_caps.empty()) {
+      for (int i = 0; i < num_cinode_locks; i++) {
+       SimpleLock *lock = in->get_lock(cinode_lock_info[i].lock);
+       ceph_assert(lock);
+       lock->put_wrlock();
+      }
+      in->item_open_file.remove_myself();
+      in->item_caps.remove_myself();
+      eval_cap_gather(in, &need_issue);
+    }
   }
   issue_caps_set(need_issue);
 
+  mds->balancer->hit_inode(in, META_POP_IWR);
+
   // auth unpin after issuing caps
   mut->cleanup();
 }
@@ -1846,14 +1888,13 @@ Capability* Locker::issue_new_caps(CInode *in,
   bool is_new;
 
   // if replay, try to reconnect cap, and otherwise do nothing.
-  if (is_replay) {
-    mds->mdcache->try_reconnect_cap(in, session);
-    return 0;
-  }
+  if (is_replay)
+    return mds->mdcache->try_reconnect_cap(in, session);
+
 
   // my needs
-  assert(session->info.inst.name.is_client());
-  client_t my_client = session->info.inst.name.num();
+  ceph_assert(session->info.inst.name.is_client());
+  client_t my_client = session->get_client();
   int my_want = ceph_caps_for_mode(mode);
 
   // register a capability
@@ -1905,7 +1946,21 @@ void Locker::issue_caps_set(set<CInode*>& inset)
     issue_caps(*p);
 }
 
-bool Locker::issue_caps(CInode *in, Capability *only_cap)
+class C_Locker_RevokeStaleCap : public LockerContext {
+  CInode *in;
+  client_t client;
+public:
+  C_Locker_RevokeStaleCap(Locker *l, CInode *i, client_t c) :
+    LockerContext(l), in(i), client(c) {
+    in->get(CInode::PIN_PTRWAITER);
+  }
+  void finish(int r) override {
+    locker->revoke_stale_cap(in, client);
+    in->put(CInode::PIN_PTRWAITER);
+  }
+};
+
+int Locker::issue_caps(CInode *in, Capability *only_cap)
 {
   // allowed caps are determined by the lock mode.
   int all_allowed = in->get_caps_allowed_by_type(CAP_ANY);
@@ -1925,21 +1980,19 @@ bool Locker::issue_caps(CInode *in, Capability *only_cap)
            << " on " << *in << dendl;
   }
 
-  assert(in->is_head());
+  ceph_assert(in->is_head());
 
   // count conflicts with
   int nissued = 0;        
 
   // client caps
-  map<client_t, Capability*>::iterator it;
+  map<client_t, Capability>::iterator it;
   if (only_cap)
     it = in->client_caps.find(only_cap->get_client());
   else
     it = in->client_caps.begin();
   for (; it != in->client_caps.end(); ++it) {
-    Capability *cap = it->second;
-    if (cap->is_stale())
-      continue;
+    Capability *cap = &it->second;
 
     // do not issue _new_ bits when size|mtime is projected
     int allowed;
@@ -1951,10 +2004,10 @@ bool Locker::issue_caps(CInode *in, Capability *only_cap)
     // add in any xlocker-only caps (for locks this client is the xlocker for)
     allowed |= xlocker_allowed & in->get_xlocker_mask(it->first);
 
-    Session *session = mds->get_session(it->first);
-    if (in->inode.inline_data.version != CEPH_INLINE_NONE &&
-       !(session && session->connection &&
-         session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)))
+    if ((in->inode.inline_data.version != CEPH_INLINE_NONE &&
+        cap->is_noinline()) ||
+       (!in->inode.layout.pool_ns.empty() &&
+        cap->is_nopoolns()))
       allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);
 
     int pending = cap->pending();
@@ -1968,10 +2021,37 @@ bool Locker::issue_caps(CInode *in, Capability *only_cap)
 
     if (!(pending & ~allowed)) {
       // skip if suppress or new, and not revocation
-      if (cap->is_new() || cap->is_suppress()) {
-       dout(20) << "  !revoke and new|suppressed, skipping client." << it->first << dendl;
+      if (cap->is_new() || cap->is_suppress() || cap->is_stale()) {
+       dout(20) << "  !revoke and new|suppressed|stale, skipping client." << it->first << dendl;
+       continue;
+      }
+    } else {
+      ceph_assert(!cap->is_new());
+      if (cap->is_stale()) {
+       dout(20) << "  revoke stale cap from client." << it->first << dendl;
+       ceph_assert(!cap->is_valid());
+       cap->issue(allowed & pending, false);
+       mds->queue_waiter_front(new C_Locker_RevokeStaleCap(this, in, it->first));
        continue;
       }
+
+      if (!cap->is_valid() && (pending & ~CEPH_CAP_PIN)) {
+       // After stale->resume circle, client thinks it only has CEPH_CAP_PIN.
+       // mds needs to re-issue caps, then do revocation.
+       long seq = cap->issue(pending, true);
+
+       dout(7) << "   sending MClientCaps to client." << it->first
+               << " seq " << seq << " re-issue " << ccap_string(pending) << dendl;
+
+       auto m = MClientCaps::create(CEPH_CAP_OP_GRANT, in->ino(),
+                                    in->find_snaprealm()->inode->ino(),
+                                    cap->get_cap_id(), cap->get_last_seq(),
+                                    pending, wanted, 0, cap->get_mseq(),
+                                    mds->get_osd_epoch_barrier());
+       in->encode_cap_message(m, cap);
+
+       mds->send_message_client_counted(m, cap->get_session());
+      }
     }
 
     // notify clients about deleted inode, to make sure they release caps ASAP.
@@ -1980,8 +2060,9 @@ bool Locker::issue_caps(CInode *in, Capability *only_cap)
 
     // are there caps that the client _wants_ and can have, but aren't pending?
     // or do we need to revoke?
-    if (((wanted & allowed) & ~pending) ||  // missing wanted+allowed caps
-       (pending & ~allowed)) {             // need to revoke ~allowed caps.
+    if ((pending & ~allowed) ||                        // need to revoke ~allowed caps.
+       ((wanted & allowed) & ~pending) ||      // missing wanted+allowed caps
+       !cap->is_valid()) {                     // after stale->resume circle
       // issue
       nissued++;
 
@@ -1990,65 +2071,55 @@ bool Locker::issue_caps(CInode *in, Capability *only_cap)
       int before = pending;
       long seq;
       if (pending & ~allowed)
-       seq = cap->issue((wanted|likes) & allowed & pending);  // if revoking, don't issue anything new.
+       seq = cap->issue((wanted|likes) & allowed & pending, true);  // if revoking, don't issue anything new.
       else
-       seq = cap->issue((wanted|likes) & allowed);
+       seq = cap->issue((wanted|likes) & allowed, true);
       int after = cap->pending();
 
-      if (cap->is_new()) {
-       // haven't send caps to client yet
-       if (before & ~after)
-         cap->confirm_receipt(seq, after);
-      } else {
-        dout(7) << "   sending MClientCaps to client." << it->first
-               << " seq " << cap->get_last_seq()
-               << " new pending " << ccap_string(after) << " was " << ccap_string(before) 
-               << dendl;
-
-       int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT;
-       if (op == CEPH_CAP_OP_REVOKE) {
-               revoking_caps.push_back(&cap->item_revoking_caps);
-               revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps);
-               cap->set_last_revoke_stamp(ceph_clock_now());
-               cap->reset_num_revoke_warnings();
-       }
-
-       MClientCaps *m = new MClientCaps(op, in->ino(),
-                                        in->find_snaprealm()->inode->ino(),
-                                        cap->get_cap_id(), cap->get_last_seq(),
-                                        after, wanted, 0,
-                                        cap->get_mseq(),
-                                         mds->get_osd_epoch_barrier());
-       in->encode_cap_message(m, cap);
+      dout(7) << "   sending MClientCaps to client." << it->first
+             << " seq " << seq << " new pending " << ccap_string(after)
+             << " was " << ccap_string(before) << dendl;
 
-       mds->send_message_client_counted(m, it->first);
+      int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT;
+      if (op == CEPH_CAP_OP_REVOKE) {
+       revoking_caps.push_back(&cap->item_revoking_caps);
+       revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps);
+       cap->set_last_revoke_stamp(ceph_clock_now());
+       cap->reset_num_revoke_warnings();
       }
+
+      auto m = MClientCaps::create(op, in->ino(),
+                                  in->find_snaprealm()->inode->ino(),
+                                  cap->get_cap_id(), cap->get_last_seq(),
+                                  after, wanted, 0, cap->get_mseq(),
+                                  mds->get_osd_epoch_barrier());
+      in->encode_cap_message(m, cap);
+
+      mds->send_message_client_counted(m, cap->get_session());
     }
 
     if (only_cap)
       break;
   }
 
-  return (nissued == 0);  // true if no re-issued, no callbacks
+  return nissued;
 }
 
 void Locker::issue_truncate(CInode *in)
 {
   dout(7) << "issue_truncate on " << *in << dendl;
   
-  for (map<client_t, Capability*>::iterator it = in->client_caps.begin();
-       it != in->client_caps.end();
-       ++it) {
-    Capability *cap = it->second;
-    MClientCaps *m = new MClientCaps(CEPH_CAP_OP_TRUNC,
-                                    in->ino(),
-                                    in->find_snaprealm()->inode->ino(),
-                                    cap->get_cap_id(), cap->get_last_seq(),
-                                    cap->pending(), cap->wanted(), 0,
-                                    cap->get_mseq(),
-                                     mds->get_osd_epoch_barrier());
+  for (auto &p : in->client_caps) {
+    Capability *cap = &p.second;
+    auto m = MClientCaps::create(CEPH_CAP_OP_TRUNC,
+                                       in->ino(),
+                                       in->find_snaprealm()->inode->ino(),
+                                       cap->get_cap_id(), cap->get_last_seq(),
+                                       cap->pending(), cap->wanted(), 0,
+                                       cap->get_mseq(),
+                                       mds->get_osd_epoch_barrier());
     in->encode_cap_message(m, cap);                         
-    mds->send_message_client_counted(m, it->first);
+    mds->send_message_client_counted(m, p.first);
   }
 
   // should we increase max_size?
@@ -2057,18 +2128,73 @@ void Locker::issue_truncate(CInode *in)
 }
 
 
-void Locker::revoke_stale_caps(Capability *cap)
+void Locker::revoke_stale_cap(CInode *in, client_t client)
 {
-  CInode *in = cap->get_inode();
-  if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
-    // if export succeeds, the cap will be removed. if export fails, we need to
-    // revoke the cap if it's still stale.
-    in->state_set(CInode::STATE_EVALSTALECAPS);
+  dout(7) << __func__ << " client." << client << " on " << *in << dendl;
+  Capability *cap = in->get_client_cap(client);
+  if (!cap)
+    return;
+
+  if (cap->revoking() & CEPH_CAP_ANY_WR) {
+    std::stringstream ss;
+    mds->evict_client(client.v, false, g_conf()->mds_session_blacklist_on_timeout, ss, nullptr);
     return;
   }
 
-  int issued = cap->issued();
-  if (issued & ~CEPH_CAP_PIN) {
+  cap->revoke();
+
+  if (in->is_auth() && in->inode.client_ranges.count(cap->get_client()))
+    in->state_set(CInode::STATE_NEEDSRECOVER);
+
+  if (in->state_test(CInode::STATE_EXPORTINGCAPS))
+    return;
+
+  if (!in->filelock.is_stable())
+    eval_gather(&in->filelock);
+  if (!in->linklock.is_stable())
+    eval_gather(&in->linklock);
+  if (!in->authlock.is_stable())
+    eval_gather(&in->authlock);
+  if (!in->xattrlock.is_stable())
+    eval_gather(&in->xattrlock);
+
+  if (in->is_auth())
+    try_eval(in, CEPH_CAP_LOCKS);
+  else
+    request_inode_file_caps(in);
+}
+
+bool Locker::revoke_stale_caps(Session *session)
+{
+  dout(10) << "revoke_stale_caps for " << session->info.inst.name << dendl;
+
+  // invalidate all caps
+  session->inc_cap_gen();
+
+  bool ret = true;
+  std::vector<CInode*> to_eval;
+
+  for (auto p = session->caps.begin(); !p.end(); ) {
+    Capability *cap = *p;
+    ++p;
+    if (!cap->is_notable()) {
+      // the rest ones are not being revoked and don't have writeable range
+      // and don't want exclusive caps or want file read/write. They don't
+      // need recover, they don't affect eval_gather()/try_eval()
+      break;
+    }
+
+    int revoking = cap->revoking();
+    if (!revoking)
+      continue;
+
+    if (revoking & CEPH_CAP_ANY_WR) {
+      ret = false;
+      break;
+    }
+
+    int issued = cap->issued();
+    CInode *in = cap->get_inode();
     dout(10) << " revoking " << ccap_string(issued) << " on " << *in << dendl;
     cap->revoke();
 
@@ -2076,52 +2202,57 @@ void Locker::revoke_stale_caps(Capability *cap)
        in->inode.client_ranges.count(cap->get_client()))
       in->state_set(CInode::STATE_NEEDSRECOVER);
 
-    if (!in->filelock.is_stable()) eval_gather(&in->filelock);
-    if (!in->linklock.is_stable()) eval_gather(&in->linklock);
-    if (!in->authlock.is_stable()) eval_gather(&in->authlock);
-    if (!in->xattrlock.is_stable()) eval_gather(&in->xattrlock);
+    // eval lock/inode may finish contexts, which may modify other cap's position
+    // in the session->caps.
+    to_eval.push_back(in);
+  }
+
+  for (auto in : to_eval) {
+    if (in->state_test(CInode::STATE_EXPORTINGCAPS))
+      continue;
 
-    if (in->is_auth()) {
+    if (!in->filelock.is_stable())
+      eval_gather(&in->filelock);
+    if (!in->linklock.is_stable())
+      eval_gather(&in->linklock);
+    if (!in->authlock.is_stable())
+      eval_gather(&in->authlock);
+    if (!in->xattrlock.is_stable())
+      eval_gather(&in->xattrlock);
+
+    if (in->is_auth())
       try_eval(in, CEPH_CAP_LOCKS);
-    } else {
+    else
       request_inode_file_caps(in);
-    }
   }
-}
-
-void Locker::revoke_stale_caps(Session *session)
-{
-  dout(10) << "revoke_stale_caps for " << session->info.inst.name << dendl;
 
-  for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
-    Capability *cap = *p;
-    cap->mark_stale();
-    revoke_stale_caps(cap);
-  }
+  return ret;
 }
 
 void Locker::resume_stale_caps(Session *session)
 {
   dout(10) << "resume_stale_caps for " << session->info.inst.name << dendl;
 
-  for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
+  bool lazy = session->info.has_feature(CEPHFS_FEATURE_LAZY_CAP_WANTED);
+  for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ) {
     Capability *cap = *p;
+    ++p;
+    if (lazy && !cap->is_notable())
+      break; // see revoke_stale_caps()
+
     CInode *in = cap->get_inode();
-    assert(in->is_head());
-    if (cap->is_stale()) {
-      dout(10) << " clearing stale flag on " << *in << dendl;
-      cap->clear_stale();
-
-      if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
-       // if export succeeds, the cap will be removed. if export fails,
-       // we need to re-issue the cap if it's not stale.
-       in->state_set(CInode::STATE_EVALSTALECAPS);
-       continue;
-      }
+    ceph_assert(in->is_head());
+    dout(10) << " clearing stale flag on " << *in << dendl;
 
-      if (!in->is_auth() || !eval(in, CEPH_CAP_LOCKS))
-       issue_caps(in, cap);
+    if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
+      // if export succeeds, the cap will be removed. if export fails,
+      // we need to re-issue the cap if it's not stale.
+      in->state_set(CInode::STATE_EVALSTALECAPS);
+      continue;
     }
+
+    if (!in->is_auth() || !eval(in, CEPH_CAP_LOCKS))
+      issue_caps(in, cap);
   }
 }
 
@@ -2154,9 +2285,9 @@ public:
 
 void Locker::request_inode_file_caps(CInode *in)
 {
-  assert(!in->is_auth());
+  ceph_assert(!in->is_auth());
 
-  int wanted = in->get_caps_wanted() & ~CEPH_CAP_PIN;
+  int wanted = in->get_caps_wanted() & in->get_caps_allowed_ever() & ~CEPH_CAP_PIN;
   if (wanted != in->replica_caps_wanted) {
     // wait for single auth
     if (in->is_ambiguous_auth()) {
@@ -2180,33 +2311,33 @@ void Locker::request_inode_file_caps(CInode *in)
 
     if (!mds->is_cluster_degraded() ||
        mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
-      mds->send_message_mds(new MInodeFileCaps(in->ino(), in->replica_caps_wanted),
-                           auth);
+      mds->send_message_mds(MInodeFileCaps::create(in->ino(), in->replica_caps_wanted), auth);
   }
 }
 
-/* This function DOES put the passed message before returning */
-void Locker::handle_inode_file_caps(MInodeFileCaps *m)
+void Locker::handle_inode_file_caps(const MInodeFileCaps::const_ref &m)
 {
   // nobody should be talking to us during recovery.
-  assert(mds->is_clientreplay() || mds->is_active() || mds->is_stopping());
+  if (mds->get_state() < MDSMap::STATE_CLIENTREPLAY) {
+    if (mds->get_want_state() >= MDSMap::STATE_CLIENTREPLAY) {
+      mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
+      return;
+    }
+    ceph_abort_msg("got unexpected message during recovery");
+  }
 
   // ok
   CInode *in = mdcache->get_inode(m->get_ino());
   mds_rank_t from = mds_rank_t(m->get_source().num());
 
-  assert(in);
-  assert(in->is_auth());
+  ceph_assert(in);
+  ceph_assert(in->is_auth());
 
   dout(7) << "handle_inode_file_caps replica mds." << from << " wants caps " << ccap_string(m->get_caps()) << " on " << *in << dendl;
 
-  if (m->get_caps())
-    in->mds_caps_wanted[from] = m->get_caps();
-  else
-    in->mds_caps_wanted.erase(from);
+  in->set_mds_caps_wanted(from, m->get_caps());
 
   try_eval(in, CEPH_CAP_LOCKS);
-  m->put();
 }
 
 
@@ -2231,24 +2362,24 @@ public:
   }
 };
 
-uint64_t Locker::calc_new_max_size(inode_t *pi, uint64_t size)
+uint64_t Locker::calc_new_max_size(CInode::mempool_inode *pi, uint64_t size)
 {
   uint64_t new_max = (size + 1) << 1;
-  uint64_t max_inc = g_conf->mds_client_writeable_range_max_inc_objs;
+  uint64_t max_inc = g_conf()->mds_client_writeable_range_max_inc_objs;
   if (max_inc > 0) {
-    max_inc *= pi->get_layout_size_increment();
-    new_max = MIN(new_max, size + max_inc);
+    max_inc *= pi->layout.object_size;
+    new_max = std::min(new_max, size + max_inc);
   }
-  return ROUND_UP_TO(new_max, pi->get_layout_size_increment());
+  return round_up_to(new_max, pi->get_layout_size_increment());
 }
 
-void Locker::calc_new_client_ranges(CInode *in, uint64_t size,
-                                   map<client_t,client_writeable_range_t> *new_ranges,
+void Locker::calc_new_client_ranges(CInode *in, uint64_t size, bool update,
+                                   CInode::mempool_inode::client_range_map *new_ranges,
                                    bool *max_increased)
 {
-  inode_t *latest = in->get_projected_inode();
+  auto latest = in->get_projected_inode();
   uint64_t ms;
-  if(latest->has_layout()) {
+  if (latest->has_layout()) {
     ms = calc_new_max_size(latest, size);
   } else {
     // Layout-less directories like ~mds0/, have zero size
@@ -2257,23 +2388,26 @@ void Locker::calc_new_client_ranges(CInode *in, uint64_t size,
 
   // increase ranges as appropriate.
   // shrink to 0 if no WR|BUFFER caps issued.
-  for (map<client_t,Capability*>::iterator p = in->client_caps.begin();
-       p != in->client_caps.end();
-       ++p) {
-    if ((p->second->issued() | p->second->wanted()) & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
-      client_writeable_range_t& nr = (*new_ranges)[p->first];
+  for (auto &p : in->client_caps) {
+    if ((p.second.issued() | p.second.wanted()) & CEPH_CAP_ANY_FILE_WR) {
+      client_writeable_range_t& nr = (*new_ranges)[p.first];
       nr.range.first = 0;
-      if (latest->client_ranges.count(p->first)) {
-       client_writeable_range_t& oldr = latest->client_ranges[p->first];
+      if (latest->client_ranges.count(p.first)) {
+       client_writeable_range_t& oldr = latest->client_ranges[p.first];
        if (ms > oldr.range.last)
          *max_increased = true;
-       nr.range.last = MAX(ms, oldr.range.last);
+       nr.range.last = std::max(ms, oldr.range.last);
        nr.follows = oldr.follows;
       } else {
        *max_increased = true;
        nr.range.last = ms;
        nr.follows = in->first - 1;
       }
+      if (update)
+       p.second.mark_clientwriteable();
+    } else {
+      if (update)
+       p.second.clear_clientwriteable();
     }
   }
 }
@@ -2282,24 +2416,40 @@ bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
                                  uint64_t new_max_size, uint64_t new_size,
                                  utime_t new_mtime)
 {
-  assert(in->is_auth());
-  assert(in->is_file());
+  ceph_assert(in->is_auth());
+  ceph_assert(in->is_file());
 
-  inode_t *latest = in->get_projected_inode();
-  map<client_t, client_writeable_range_t> new_ranges;
+  CInode::mempool_inode *latest = in->get_projected_inode();
+  CInode::mempool_inode::client_range_map new_ranges;
   uint64_t size = latest->size;
   bool update_size = new_size > 0;
   bool update_max = false;
   bool max_increased = false;
 
   if (update_size) {
-    new_size = size = MAX(size, new_size);
-    new_mtime = MAX(new_mtime, latest->mtime);
+    new_size = size = std::max(size, new_size);
+    new_mtime = std::max(new_mtime, latest->mtime);
     if (latest->size == new_size && latest->mtime == new_mtime)
       update_size = false;
   }
 
-  calc_new_client_ranges(in, max(new_max_size, size), &new_ranges, &max_increased);
+  int can_update = 1;
+  if (in->is_frozen()) {
+    can_update = -1;
+  } else if (!force_wrlock && !in->filelock.can_wrlock(in->get_loner())) {
+    // lock?
+    if (in->filelock.is_stable()) {
+      if (in->get_target_loner() >= 0)
+       file_excl(&in->filelock);
+      else
+       simple_lock(&in->filelock);
+    }
+    if (!in->filelock.can_wrlock(in->get_loner()))
+      can_update = -2;
+  }
+
+  calc_new_client_ranges(in, std::max(new_max_size, size), can_update > 0,
+                        &new_ranges, &max_increased);
 
   if (max_increased || latest->client_ranges != new_ranges)
     update_max = true;
@@ -2313,53 +2463,40 @@ bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
           << " update_size " << update_size
           << " on " << *in << dendl;
 
-  if (in->is_frozen()) {
-    dout(10) << "check_inode_max_size frozen, waiting on " << *in << dendl;
-    C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
-                                                     new_max_size,
-                                                     new_size,
-                                                     new_mtime);
-    in->add_waiter(CInode::WAIT_UNFREEZE, cms);
-    return false;
-  }
-  if (!force_wrlock && !in->filelock.can_wrlock(in->get_loner())) {
-    // lock?
-    if (in->filelock.is_stable()) {
-      if (in->get_target_loner() >= 0)
-       file_excl(&in->filelock);
-      else
-       simple_lock(&in->filelock);
-    }
-    if (!in->filelock.can_wrlock(in->get_loner())) {
-      // try again later
-      C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
-                                                       new_max_size,
-                                                       new_size,
-                                                       new_mtime);
-
+  if (can_update < 0) {
+    auto cms = new C_MDL_CheckMaxSize(this, in, new_max_size, new_size, new_mtime);
+    if (can_update == -1) {
+      dout(10) << "check_inode_max_size frozen, waiting on " << *in << dendl;
+      in->add_waiter(CInode::WAIT_UNFREEZE, cms);
+    } else {
       in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
       dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in << dendl;
-      return false;    
     }
+    return false;
   }
 
   MutationRef mut(new MutationImpl());
   mut->ls = mds->mdlog->get_current_segment();
     
-  inode_t *pi = in->project_inode();
-  pi->version = in->pre_dirty();
+  auto &pi = in->project_inode();
+  pi.inode.version = in->pre_dirty();
 
   if (update_max) {
-    dout(10) << "check_inode_max_size client_ranges " << pi->client_ranges << " -> " << new_ranges << dendl;
-    pi->client_ranges = new_ranges;
+    dout(10) << "check_inode_max_size client_ranges " << pi.inode.client_ranges << " -> " << new_ranges << dendl;
+    pi.inode.client_ranges = new_ranges;
   }
 
   if (update_size) {
-    dout(10) << "check_inode_max_size size " << pi->size << " -> " << new_size << dendl;
-    pi->size = new_size;
-    pi->rstat.rbytes = new_size;
-    dout(10) << "check_inode_max_size mtime " << pi->mtime << " -> " << new_mtime << dendl;
-    pi->mtime = new_mtime;
+    dout(10) << "check_inode_max_size size " << pi.inode.size << " -> " << new_size << dendl;
+    pi.inode.size = new_size;
+    pi.inode.rstat.rbytes = new_size;
+    dout(10) << "check_inode_max_size mtime " << pi.inode.mtime << " -> " << new_mtime << dendl;
+    pi.inode.mtime = new_mtime;
+    if (new_mtime > pi.inode.ctime) {
+      pi.inode.ctime = new_mtime;
+      if (new_mtime > pi.inode.rstat.rctime)
+       pi.inode.rstat.rctime = new_mtime;
+    }
   }
 
   // use EOpen if the file is still open; otherwise, use EUpdate.
@@ -2372,7 +2509,6 @@ bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
     eo->add_ino(in->ino());
     metablob = &eo->metablob;
     le = eo;
-    mut->ls->open_files.push_back(&in->item_open_file);
   } else {
     EUpdate *eu = new EUpdate(mds->mdlog, "check_inode_max_size");
     metablob = &eu->metablob;
@@ -2388,8 +2524,8 @@ bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
     metablob->add_dir_context(in->get_projected_parent_dn()->get_dir());
     mdcache->journal_dirty_inode(mut.get(), metablob, in);
   }
-  mds->mdlog->submit_entry(le,
-          new C_Locker_FileUpdate_finish(this, in, mut, true));
+  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
+      UPDATE_SHAREMAX, MClientCaps::ref()));
   wrlock_force(&in->filelock, mut);  // wrlock for duration of journal
   mut->auth_pin(in);
 
@@ -2409,26 +2545,28 @@ void Locker::share_inode_max_size(CInode *in, Capability *only_cap)
    * the cap later.
    */
   dout(10) << "share_inode_max_size on " << *in << dendl;
-  map<client_t, Capability*>::iterator it;
+  map<client_t, Capability>::iterator it;
   if (only_cap)
     it = in->client_caps.find(only_cap->get_client());
   else
     it = in->client_caps.begin();
   for (; it != in->client_caps.end(); ++it) {
     const client_t client = it->first;
-    Capability *cap = it->second;
+    Capability *cap = &it->second;
     if (cap->is_suppress())
       continue;
     if (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
       dout(10) << "share_inode_max_size with client." << client << dendl;
       cap->inc_last_seq();
-      MClientCaps *m = new MClientCaps(CEPH_CAP_OP_GRANT,
-                                      in->ino(),
-                                      in->find_snaprealm()->inode->ino(),
-                                      cap->get_cap_id(), cap->get_last_seq(),
-                                      cap->pending(), cap->wanted(), 0,
-                                       cap->get_mseq(),
-                                       mds->get_osd_epoch_barrier());
+      auto m = MClientCaps::create(CEPH_CAP_OP_GRANT,
+                                         in->ino(),
+                                         in->find_snaprealm()->inode->ino(),
+                                         cap->get_cap_id(),
+                                         cap->get_last_seq(),
+                                         cap->pending(),
+                                         cap->wanted(), 0,
+                                         cap->get_mseq(),
+                                         mds->get_osd_epoch_barrier());
       in->encode_cap_message(m, cap);
       mds->send_message_client_counted(m, client);
     }
@@ -2478,66 +2616,92 @@ void Locker::adjust_cap_wanted(Capability *cap, int wanted, int issue_seq)
     return;
   }
 
-  if (cap->wanted() == 0) {
-    if (cur->item_open_file.is_on_list() &&
-       !cur->is_any_caps_wanted()) {
-      dout(10) << " removing unwanted file from open file list " << *cur << dendl;
-      cur->item_open_file.remove_myself();
-    }
-  } else {
+  if (cap->wanted()) {
     if (cur->state_test(CInode::STATE_RECOVERING) &&
        (cap->wanted() & (CEPH_CAP_FILE_RD |
                          CEPH_CAP_FILE_WR))) {
       mds->mdcache->recovery_queue.prioritize(cur);
     }
 
-    if (!cur->item_open_file.is_on_list()) {
-      dout(10) << " adding to open file list " << *cur << dendl;
-      assert(cur->last == CEPH_NOSNAP);
-      LogSegment *ls = mds->mdlog->get_current_segment();
+    if (mdcache->open_file_table.should_log_open(cur)) {
+      ceph_assert(cur->last == CEPH_NOSNAP);
       EOpen *le = new EOpen(mds->mdlog);
       mds->mdlog->start_entry(le);
       le->add_clean_inode(cur);
-      ls->open_files.push_back(&cur->item_open_file);
       mds->mdlog->submit_entry(le);
     }
   }
 }
 
+void Locker::snapflush_nudge(CInode *in)
+{
+  ceph_assert(in->last != CEPH_NOSNAP);
+  if (in->client_snap_caps.empty())
+    return;
+
+  CInode *head = mdcache->get_inode(in->ino());
+  // head inode gets unpinned when snapflush starts. It might get trimmed
+  // before snapflush finishes.
+  if (!head)
+    return;
 
+  ceph_assert(head->is_auth());
+  if (head->client_need_snapflush.empty())
+    return;
+
+  SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
+  if (hlock->get_state() == LOCK_SYNC || !hlock->is_stable()) {
+    hlock = NULL;
+    for (int i = 0; i < num_cinode_locks; i++) {
+      SimpleLock *lock = head->get_lock(cinode_lock_info[i].lock);
+      if (lock->get_state() != LOCK_SYNC && lock->is_stable()) {
+       hlock = lock;
+       break;
+      }
+    }
+  }
+  if (hlock) {
+    _rdlock_kick(hlock, true);
+  } else {
+    // also, requeue, in case of unstable lock
+    need_snapflush_inodes.push_back(&in->item_caps);
+  }
+}
+
+void Locker::mark_need_snapflush_inode(CInode *in)
+{
+  ceph_assert(in->last != CEPH_NOSNAP);
+  if (!in->item_caps.is_on_list()) {
+    need_snapflush_inodes.push_back(&in->item_caps);
+    utime_t now = ceph_clock_now();
+    in->last_dirstat_prop = now;
+    dout(10) << "mark_need_snapflush_inode " << *in << " - added at " << now << dendl;
+  }
+}
+
+bool Locker::is_revoking_any_caps_from(client_t client)
+{
+  auto it = revoking_caps_by_client.find(client);
+  if (it == revoking_caps_by_client.end())
+    return false;
+  return !it->second.empty();
+}
 
-void Locker::_do_null_snapflush(CInode *head_in, client_t client)
+void Locker::_do_null_snapflush(CInode *head_in, client_t client, snapid_t last)
 {
   dout(10) << "_do_null_snapflush client." << client << " on " << *head_in << dendl;
-  compact_map<snapid_t, set<client_t> >::iterator p = head_in->client_need_snapflush.begin();
-  while (p != head_in->client_need_snapflush.end()) {
+  for (auto p = head_in->client_need_snapflush.begin();
+       p != head_in->client_need_snapflush.end() && p->first < last; ) {
     snapid_t snapid = p->first;
-    set<client_t>& clients = p->second;
+    auto &clients = p->second;
     ++p;  // be careful, q loop below depends on this
 
     if (clients.count(client)) {
       dout(10) << " doing async NULL snapflush on " << snapid << " from client." << client << dendl;
-      CInode *sin = mdcache->get_inode(head_in->ino(), snapid);
-      if (!sin) {
-       // hrm, look forward until we find the inode. 
-       //  (we can only look it up by the last snapid it is valid for)
-       dout(10) << " didn't have " << head_in->ino() << " snapid " << snapid << dendl;
-       for (compact_map<snapid_t, set<client_t> >::iterator q = p;  // p is already at next entry
-            q != head_in->client_need_snapflush.end();
-            ++q) {
-         dout(10) << " trying snapid " << q->first << dendl;
-         sin = mdcache->get_inode(head_in->ino(), q->first);
-         if (sin) {
-           assert(sin->first <= snapid);
-           break;
-         }
-         dout(10) << " didn't have " << head_in->ino() << " snapid " << q->first << dendl;
-       }
-       if (!sin && head_in->is_multiversion())
-         sin = head_in;
-       assert(sin);
-      }
-      _do_snap_update(sin, snapid, 0, sin->first - 1, client, NULL, NULL);
+      CInode *sin = mdcache->pick_inode_snap(head_in, snapid - 1);
+      ceph_assert(sin);
+      ceph_assert(sin->first <= snapid);
+      _do_snap_update(sin, snapid, 0, sin->first - 1, client, MClientCaps::ref(), MClientCaps::ref());
       head_in->remove_need_snapflush(sin, snapid, client);
     }
   }
@@ -2562,38 +2726,35 @@ bool Locker::should_defer_client_cap_frozen(CInode *in)
   return (in->is_freezing() && in->get_num_auth_pins() == 0) || in->is_frozen();
 }
 
-/*
- * This function DOES put the passed message before returning
- */
-void Locker::handle_client_caps(MClientCaps *m)
+void Locker::handle_client_caps(const MClientCaps::const_ref &m)
 {
-  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
   client_t client = m->get_source().num();
-
   snapid_t follows = m->get_snap_follows();
+  auto op = m->get_op();
+  auto dirty = m->get_dirty();
   dout(7) << "handle_client_caps "
-         << ((m->flags & CLIENT_CAPS_SYNC) ? "sync" : "async")
          << " on " << m->get_ino()
          << " tid " << m->get_client_tid() << " follows " << follows
-         << " op " << ceph_cap_op_name(m->get_op()) << dendl;
+         << " op " << ceph_cap_op_name(op)
+         << " flags 0x" << std::hex << m->flags << std::dec << dendl;
 
+  Session *session = mds->get_session(m);
   if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
     if (!session) {
       dout(5) << " no session, dropping " << *m << dendl;
-      m->put();
       return;
     }
     if (session->is_closed() ||
        session->is_closing() ||
        session->is_killing()) {
       dout(7) << " session closed|closing|killing, dropping " << *m << dendl;
-      m->put();
       return;
     }
-    if (mds->is_reconnect() &&
-       m->get_dirty() && m->get_client_tid() > 0 &&
+    if ((mds->is_reconnect() || mds->get_want_state() == MDSMap::STATE_RECONNECT) &&
+       dirty && m->get_client_tid() > 0 &&
        !session->have_completed_flush(m->get_client_tid())) {
-      mdcache->set_reconnected_dirty_caps(client, m->get_ino(), m->get_dirty());
+      mdcache->set_reconnected_dirty_caps(client, m->get_ino(), dirty,
+                                         op == CEPH_CAP_OP_FLUSHSNAP);
     }
     mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
     return;
@@ -2603,25 +2764,21 @@ void Locker::handle_client_caps(MClientCaps *m)
       session->have_completed_flush(m->get_client_tid())) {
     dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid()
            << " for client." << client << dendl;
-    MClientCaps *ack;
-    if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
-      ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0,
-                           m->get_dirty(), 0, mds->get_osd_epoch_barrier());
+    MClientCaps::ref ack;
+    if (op == CEPH_CAP_OP_FLUSHSNAP) {
+      ack = MClientCaps::create(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0, dirty, 0, mds->get_osd_epoch_barrier());
     } else {
-      ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(),
-                           m->get_seq(), m->get_caps(), 0, m->get_dirty(), 0,
-                           mds->get_osd_epoch_barrier());
+      ack = MClientCaps::create(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(), m->get_seq(), m->get_caps(), 0, dirty, 0, mds->get_osd_epoch_barrier());
     }
     ack->set_snap_follows(follows);
     ack->set_client_tid(m->get_client_tid());
     mds->send_message_client_counted(ack, m->get_connection());
-    if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
-      m->put();
+    if (op == CEPH_CAP_OP_FLUSHSNAP) {
       return;
     } else {
       // fall-thru because the message may release some caps
-      m->clear_dirty();
-      m->set_op(CEPH_CAP_OP_UPDATE);
+      dirty = false;
+      op = CEPH_CAP_OP_UPDATE;
     }
   }
 
@@ -2631,11 +2788,11 @@ void Locker::handle_client_caps(MClientCaps *m)
       mds->mdlog->get_current_segment()->touched_sessions.insert(session->info.inst.name);
 
       if (session->get_num_trim_flushes_warnings() > 0 &&
-         session->get_num_completed_flushes() * 2 < g_conf->mds_max_completed_flushes)
+         session->get_num_completed_flushes() * 2 < g_conf()->mds_max_completed_flushes)
        session->reset_num_trim_flushes_warnings();
     } else {
       if (session->get_num_completed_flushes() >=
-         (g_conf->mds_max_completed_flushes << session->get_num_trim_flushes_warnings())) {
+         (g_conf()->mds_max_completed_flushes << session->get_num_trim_flushes_warnings())) {
        session->inc_num_trim_flushes_warnings();
        stringstream ss;
        ss << "client." << session->get_client() << " does not advance its oldest_flush_tid ("
@@ -2656,8 +2813,16 @@ void Locker::handle_client_caps(MClientCaps *m)
       mdcache->wait_replay_cap_reconnect(m->get_ino(), new C_MDS_RetryMessage(mds, m));
       return;
     }
-    dout(1) << "handle_client_caps on unknown ino " << m->get_ino() << ", dropping" << dendl;
-    m->put();
+
+    /*
+     * "handle_client_caps on unknown ino xxx” is normal after migrating a subtree
+     * Sequence of events that cause this are:
+     *   - client sends caps message to mds.a
+     *   - mds finishes subtree migration, send cap export to client
+     *   - mds trim its cache
+     *   - mds receives cap messages from client
+     */
+    dout(7) << "handle_client_caps on unknown ino " << m->get_ino() << ", dropping" << dendl;
     return;
   }
 
@@ -2671,57 +2836,61 @@ void Locker::handle_client_caps(MClientCaps *m)
     mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
   }
 
-  CInode *in = head_in;
-  if (follows > 0) {
-    in = mdcache->pick_inode_snap(head_in, follows);
-    if (in != head_in)
-      dout(10) << " head inode " << *head_in << dendl;
-  }
-  dout(10) << "  cap inode " << *in << dendl;
+  dout(10) << " head inode " << *head_in << dendl;
 
   Capability *cap = 0;
-  cap = in->get_client_cap(client);
-  if (!cap && in != head_in)
-    cap = head_in->get_client_cap(client);
+  cap = head_in->get_client_cap(client);
   if (!cap) {
-    dout(7) << "handle_client_caps no cap for client." << client << " on " << *in << dendl;
-    m->put();
+    dout(7) << "handle_client_caps no cap for client." << client << " on " << *head_in << dendl;
     return;
   }  
-  assert(cap);
+  ceph_assert(cap);
 
   // freezing|frozen?
-  if (should_defer_client_cap_frozen(in)) {
-    dout(7) << "handle_client_caps freezing|frozen on " << *in << dendl;
-    in->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, m));
+  if (should_defer_client_cap_frozen(head_in)) {
+    dout(7) << "handle_client_caps freezing|frozen on " << *head_in << dendl;
+    head_in->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, m));
     return;
   }
   if (ceph_seq_cmp(m->get_mseq(), cap->get_mseq()) < 0) {
     dout(7) << "handle_client_caps mseq " << m->get_mseq() << " < " << cap->get_mseq()
            << ", dropping" << dendl;
-    m->put();
     return;
   }
 
-  int op = m->get_op();
+  bool need_unpin = false;
 
   // flushsnap?
   if (op == CEPH_CAP_OP_FLUSHSNAP) {
-    if (!in->is_auth()) {
-      dout(7) << " not auth, ignoring flushsnap on " << *in << dendl;
+    if (!head_in->is_auth()) {
+      dout(7) << " not auth, ignoring flushsnap on " << *head_in << dendl;
       goto out;
     }
 
-    SnapRealm *realm = in->find_snaprealm();
+    SnapRealm *realm = head_in->find_snaprealm();
     snapid_t snap = realm->get_snap_following(follows);
     dout(10) << "  flushsnap follows " << follows << " -> snap " << snap << dendl;
 
+    auto p = head_in->client_need_snapflush.begin();
+    if (p != head_in->client_need_snapflush.end() && p->first < snap) {
+      head_in->auth_pin(this); // prevent subtree frozen
+      need_unpin = true;
+      _do_null_snapflush(head_in, client, snap);
+    }
+
+    CInode *in = head_in;
+    if (snap != CEPH_NOSNAP) {
+      in = mdcache->pick_inode_snap(head_in, snap - 1);
+      if (in != head_in)
+       dout(10) << " snapped inode " << *in << dendl;
+    }
+
     // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
     // other cap ops.  (except possibly duplicate FLUSHSNAP requests, but worst
     // case we get a dup response, so whatever.)
-    MClientCaps *ack = 0;
-    if (m->get_dirty()) {
-      ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
+    MClientCaps::ref ack;
+    if (dirty) {
+      ack = MClientCaps::create(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0, dirty, 0, mds->get_osd_epoch_barrier());
       ack->set_snap_follows(follows);
       ack->set_client_tid(m->get_client_tid());
       ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
@@ -2737,11 +2906,10 @@ void Locker::handle_client_caps(MClientCaps *m)
       if (in == head_in)
        cap->client_follows = snap < CEPH_NOSNAP ? snap : realm->get_newest_seq();
    
-      _do_snap_update(in, snap, m->get_dirty(), follows, client, m, ack);
+      _do_snap_update(in, snap, dirty, follows, client, m, ack);
 
       if (in != head_in)
        head_in->remove_need_snapflush(in, snap, client);
-      
     } else {
       dout(7) << " not expecting flushsnap " << snap << " from client." << client << " on " << *in << dendl;
       if (ack)
@@ -2753,18 +2921,22 @@ void Locker::handle_client_caps(MClientCaps *m)
   if (cap->get_cap_id() != m->get_cap_id()) {
     dout(7) << " ignoring client capid " << m->get_cap_id() << " != my " << cap->get_cap_id() << dendl;
   } else {
-    // intermediate snap inodes
-    while (in != head_in) {
-      assert(in->last != CEPH_NOSNAP);
-      if (in->is_auth() && m->get_dirty()) {
-       dout(10) << " updating intermediate snapped inode " << *in << dendl;
-       _do_cap_update(in, NULL, m->get_dirty(), follows, m);
+    CInode *in = head_in;
+    if (follows > 0) {
+      in = mdcache->pick_inode_snap(head_in, follows);
+      // intermediate snap inodes
+      while (in != head_in) {
+       ceph_assert(in->last != CEPH_NOSNAP);
+       if (in->is_auth() && dirty) {
+         dout(10) << " updating intermediate snapped inode " << *in << dendl;
+         _do_cap_update(in, NULL, dirty, follows, m, MClientCaps::ref());
+       }
+       in = mdcache->pick_inode_snap(head_in, in->last);
       }
-      in = mdcache->pick_inode_snap(head_in, in->last);
     }
  
     // head inode, and cap
-    MClientCaps *ack = 0;
+    MClientCaps::ref ack;
 
     int caps = m->get_caps();
     if (caps & ~cap->issued()) {
@@ -2775,7 +2947,7 @@ void Locker::handle_client_caps(MClientCaps *m)
     cap->confirm_receipt(m->get_seq(), caps);
     dout(10) << " follows " << follows
             << " retains " << ccap_string(m->get_caps())
-            << " dirty " << ccap_string(m->get_dirty())
+            << " dirty " << ccap_string(dirty)
             << " on " << *in << dendl;
 
 
@@ -2787,36 +2959,41 @@ void Locker::handle_client_caps(MClientCaps *m)
     //  released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
     //  update/release).
     if (!head_in->client_need_snapflush.empty()) {
-      if ((cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
+      if (!(cap->issued() & CEPH_CAP_ANY_FILE_WR) &&
+         !(m->flags & MClientCaps::FLAG_PENDING_CAPSNAP)) {
+       head_in->auth_pin(this); // prevent subtree frozen
+       need_unpin = true;
        _do_null_snapflush(head_in, client);
       } else {
        dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl;
       }
     }
-    
-    if (m->get_dirty() && in->is_auth()) {
-      dout(7) << " flush client." << client << " dirty " << ccap_string(m->get_dirty()) 
+    if (cap->need_snapflush() && !(m->flags & MClientCaps::FLAG_PENDING_CAPSNAP))
+      cap->clear_needsnapflush();
+
+    if (dirty && in->is_auth()) {
+      dout(7) << " flush client." << client << " dirty " << ccap_string(dirty)
              << " seq " << m->get_seq() << " on " << *in << dendl;
-      ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(),
-                           m->get_caps(), 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
+      ack = MClientCaps::create(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(),
+          m->get_caps(), 0, dirty, 0, mds->get_osd_epoch_barrier());
       ack->set_client_tid(m->get_client_tid());
       ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
     }
 
     // filter wanted based on what we could ever give out (given auth/replica status)
-    bool need_flush = m->flags & CLIENT_CAPS_SYNC;
-    int new_wanted = m->get_wanted() & head_in->get_caps_allowed_ever();
+    bool need_flush = m->flags & MClientCaps::FLAG_SYNC;
+    int new_wanted = m->get_wanted();
     if (new_wanted != cap->wanted()) {
-      if (!need_flush && (new_wanted & ~cap->pending())) {
+      if (!need_flush && in->is_auth() && (new_wanted & ~cap->pending())) {
        // exapnding caps.  make sure we aren't waiting for a log flush
        need_flush = _need_flush_mdlog(head_in, new_wanted & ~cap->pending());
       }
 
       adjust_cap_wanted(cap, new_wanted, m->get_issue_seq());
     }
-      
+
     if (in->is_auth() &&
-       _do_cap_update(in, cap, m->get_dirty(), follows, m, ack, &need_flush)) {
+       _do_cap_update(in, cap, dirty, follows, m, ack, &need_flush)) {
       // updated
       eval(in, CEPH_CAP_LOCKS);
 
@@ -2833,7 +3010,6 @@ void Locker::handle_client_caps(MClientCaps *m)
 
       if (cap->get_last_seq() == 0 &&
          (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER))) {
-       cap->issue_norevoke(cap->issued());
        share_inode_max_size(in, cap);
       }
     }
@@ -2843,7 +3019,8 @@ void Locker::handle_client_caps(MClientCaps *m)
   }
 
  out:
-  m->put();
+  if (need_unpin)
+    head_in->auth_unpin(this);
 }
 
 
@@ -2861,7 +3038,7 @@ public:
 };
 
 void Locker::process_request_cap_release(MDRequestRef& mdr, client_t client, const ceph_mds_request_release& item,
-                                        const string &dname)
+                                        std::string_view dname)
 {
   inodeno_t ino = (uint64_t)item.ino;
   uint64_t cap_id = item.cap_id;
@@ -2883,14 +3060,14 @@ void Locker::process_request_cap_release(MDRequestRef& mdr, client_t client, con
       if (dn) {
        ClientLease *l = dn->get_client_lease(client);
        if (l) {
-         dout(10) << "process_cap_release removing lease on " << *dn << dendl;
+         dout(10) << __func__ << " removing lease on " << *dn << dendl;
          dn->remove_client_lease(l, this);
        } else {
-         dout(7) << "process_cap_release client." << client
+         dout(7) << __func__ << " client." << client
                  << " doesn't have lease on " << *dn << dendl;
        }
       } else {
-       dout(7) << "process_cap_release client." << client << " released lease on dn "
+       dout(7) << __func__ << " client." << client << " released lease on dn "
                << dir->dirfrag() << "/" << dname << " which dne" << dendl;
       }
     }
@@ -2900,7 +3077,7 @@ void Locker::process_request_cap_release(MDRequestRef& mdr, client_t client, con
   if (!cap)
     return;
 
-  dout(10) << "process_cap_release client." << client << " " << ccap_string(caps) << " on " << *in
+  dout(10) << __func__ << " client." << client << " " << ccap_string(caps) << " on " << *in
           << (mdr ? "" : " (DEFERRED, no mdr)")
           << dendl;
     
@@ -2962,7 +3139,7 @@ public:
 void Locker::kick_issue_caps(CInode *in, client_t client, ceph_seq_t seq)
 {
   Capability *cap = in->get_client_cap(client);
-  if (!cap || cap->get_last_sent() != seq)
+  if (!cap || cap->get_last_seq() != seq)
     return;
   if (in->is_frozen()) {
     dout(10) << "kick_issue_caps waiting for unfreeze on " << *in << dendl;
@@ -2991,7 +3168,7 @@ void Locker::kick_cap_releases(MDRequestRef& mdr)
 /**
  * m and ack might be NULL, so don't dereference them unless dirty != 0
  */
-void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, MClientCaps *m, MClientCaps *ack)
+void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, const MClientCaps::const_ref &m, const MClientCaps::ref &ack)
 {
   dout(10) << "_do_snap_update dirty " << ccap_string(dirty)
           << " follows " << follows << " snap " << snap
@@ -3014,53 +3191,55 @@ void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t foll
   // normal metadata updates that we can apply to the head as well.
 
   // update xattrs?
-  bool xattrs = false;
-  map<string,bufferptr> *px = 0;
-  if ((dirty & CEPH_CAP_XATTR_EXCL) && 
-      m->xattrbl.length() &&
-      m->head.xattr_version > in->get_projected_inode()->xattr_version)
-    xattrs = true;
-
-  old_inode_t *oi = 0;
+  CInode::mempool_xattr_map *px = nullptr;
+  bool xattrs = (dirty & CEPH_CAP_XATTR_EXCL) &&
+                m->xattrbl.length() &&
+                m->head.xattr_version > in->get_projected_inode()->xattr_version;
+
+  CInode::mempool_old_inode *oi = 0;
   if (in->is_multiversion()) {
     oi = in->pick_old_inode(snap);
   }
 
-  inode_t *pi;
+  CInode::mempool_inode *i;
   if (oi) {
     dout(10) << " writing into old inode" << dendl;
-    pi = in->project_inode();
-    pi->version = in->pre_dirty();
+    auto &pi = in->project_inode();
+    pi.inode.version = in->pre_dirty();
     if (snap > oi->first)
       in->split_old_inode(snap);
-    pi = &oi->inode;
+    i = &oi->inode;
     if (xattrs)
       px = &oi->xattrs;
   } else {
+    auto &pi = in->project_inode(xattrs);
+    pi.inode.version = in->pre_dirty();
+    i = &pi.inode;
     if (xattrs)
-      px = new map<string,bufferptr>;
-    pi = in->project_inode(px);
-    pi->version = in->pre_dirty();
+      px = pi.xattrs.get();
   }
 
-  _update_cap_fields(in, dirty, m, pi);
+  _update_cap_fields(in, dirty, m, i);
 
   // xattr
-  if (px) {
-    dout(7) << " xattrs v" << pi->xattr_version << " -> " << m->head.xattr_version
+  if (xattrs) {
+    dout(7) << " xattrs v" << i->xattr_version << " -> " << m->head.xattr_version
            << " len " << m->xattrbl.length() << dendl;
-    pi->xattr_version = m->head.xattr_version;
-    bufferlist::iterator p = m->xattrbl.begin();
-    ::decode(*px, p);
+    i->xattr_version = m->head.xattr_version;
+    auto p = m->xattrbl.cbegin();
+    decode(*px, p);
   }
 
-  if (pi->client_ranges.count(client)) {
-    if (in->last == snap) {
-      dout(10) << "  removing client_range entirely" << dendl;
-      pi->client_ranges.erase(client);
-    } else {
-      dout(10) << "  client_range now follows " << snap << dendl;
-      pi->client_ranges[client].follows = snap;
+  {
+    auto it = i->client_ranges.find(client);
+    if (it != i->client_ranges.end()) {
+      if (in->last == snap) {
+        dout(10) << "  removing client_range entirely" << dendl;
+        i->client_ranges.erase(it);
+      } else {
+        dout(10) << "  client_range now follows " << snap << dendl;
+        it->second.follows = snap;
+      }
     }
   }
 
@@ -3073,23 +3252,25 @@ void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t foll
     le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
                                  ack->get_oldest_flush_tid());
 
-  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, false, false,
-                                                             client, ack));
+  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, UPDATE_SNAPFLUSH,
+                                                             ack, client));
 }
 
-void Locker::_update_cap_fields(CInode *in, int dirty, MClientCaps *m, inode_t *pi)
+void Locker::_update_cap_fields(CInode *in, int dirty, const MClientCaps::const_ref &m, CInode::mempool_inode *pi)
 {
   if (dirty == 0)
     return;
 
   /* m must be valid if there are dirty caps */
-  assert(m);
+  ceph_assert(m);
   uint64_t features = m->get_connection()->get_features();
 
   if (m->get_ctime() > pi->ctime) {
     dout(7) << "  ctime " << pi->ctime << " -> " << m->get_ctime()
            << " for " << *in << dendl;
     pi->ctime = m->get_ctime();
+    if (m->get_ctime() > pi->rstat.rctime)
+      pi->rstat.rctime = m->get_ctime();
   }
 
   if ((features & CEPH_FEATURE_FS_CHANGE_ATTR) &&
@@ -3111,6 +3292,8 @@ void Locker::_update_cap_fields(CInode *in, int dirty, MClientCaps *m, inode_t *
       dout(7) << "  mtime " << pi->mtime << " -> " << mtime
              << " for " << *in << dendl;
       pi->mtime = mtime;
+      if (mtime > pi->rstat.rctime)
+       pi->rstat.rctime = mtime;
     }
     if (in->inode.is_file() &&   // ONLY if regular file
        size > pi->size) {
@@ -3176,16 +3359,16 @@ void Locker::_update_cap_fields(CInode *in, int dirty, MClientCaps *m, inode_t *
  */
 bool Locker::_do_cap_update(CInode *in, Capability *cap,
                            int dirty, snapid_t follows,
-                           MClientCaps *m, MClientCaps *ack,
+                           const MClientCaps::const_ref &m, const MClientCaps::ref &ack,
                            bool *need_flush)
 {
   dout(10) << "_do_cap_update dirty " << ccap_string(dirty)
           << " issued " << ccap_string(cap ? cap->issued() : 0)
           << " wanted " << ccap_string(cap ? cap->wanted() : 0)
           << " on " << *in << dendl;
-  assert(in->is_auth());
+  ceph_assert(in->is_auth());
   client_t client = m->get_source().num();
-  inode_t *latest = in->get_projected_inode();
+  CInode::mempool_inode *latest = in->get_projected_inode();
 
   // increase or zero max_size?
   uint64_t size = m->get_size();
@@ -3229,7 +3412,7 @@ bool Locker::_do_cap_update(CInode *in, Capability *cap,
        bool need_issue = false;
        if (cap)
          cap->inc_suppress();
-       if (in->mds_caps_wanted.empty() &&
+       if (in->get_mds_caps_wanted().empty() &&
            (in->get_loner() >= 0 || (in->get_wanted_loner() >= 0 && in->try_set_loner()))) {
          if (in->filelock.get_state() != LOCK_EXCL)
            file_excl(&in->filelock, &need_issue);
@@ -3254,19 +3437,19 @@ bool Locker::_do_cap_update(CInode *in, Capability *cap,
 
   if (m->flockbl.length()) {
     int32_t num_locks;
-    bufferlist::iterator bli = m->flockbl.begin();
-    ::decode(num_locks, bli);
+    auto bli = m->flockbl.cbegin();
+    decode(num_locks, bli);
     for ( int i=0; i < num_locks; ++i) {
       ceph_filelock decoded_lock;
-      ::decode(decoded_lock, bli);
+      decode(decoded_lock, bli);
       in->get_fcntl_lock_state()->held_locks.
        insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
       ++in->get_fcntl_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
     }
-    ::decode(num_locks, bli);
+    decode(num_locks, bli);
     for ( int i=0; i < num_locks; ++i) {
       ceph_filelock decoded_lock;
-      ::decode(decoded_lock, bli);
+      decode(decoded_lock, bli);
       in->get_flock_lock_state()->held_locks.
        insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
       ++in->get_flock_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
@@ -3276,43 +3459,44 @@ bool Locker::_do_cap_update(CInode *in, Capability *cap,
   if (!dirty && !change_max)
     return false;
 
-  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
+  Session *session = mds->get_session(m);
   if (session->check_access(in, MAY_WRITE,
                            m->caller_uid, m->caller_gid, NULL, 0, 0) < 0) {
-    session->put();
     dout(10) << "check_access failed, dropping cap update on " << *in << dendl;
     return false;
   }
-  session->put();
 
   // do the update.
   EUpdate *le = new EUpdate(mds->mdlog, "cap update");
   mds->mdlog->start_entry(le);
 
-  // xattrs update?
-  map<string,bufferptr> *px = 0;
-  if ((dirty & CEPH_CAP_XATTR_EXCL) && 
-      m->xattrbl.length() &&
-      m->head.xattr_version > in->get_projected_inode()->xattr_version)
-    px = new map<string,bufferptr>;
+  bool xattr = (dirty & CEPH_CAP_XATTR_EXCL) &&
+               m->xattrbl.length() &&
+               m->head.xattr_version > in->get_projected_inode()->xattr_version;
 
-  inode_t *pi = in->project_inode(px);
-  pi->version = in->pre_dirty();
+  auto &pi = in->project_inode(xattr);
+  pi.inode.version = in->pre_dirty();
 
   MutationRef mut(new MutationImpl());
   mut->ls = mds->mdlog->get_current_segment();
 
-  _update_cap_fields(in, dirty, m, pi);
+  _update_cap_fields(in, dirty, m, &pi.inode);
 
   if (change_max) {
     dout(7) << "  max_size " << old_max << " -> " << new_max
            << " for " << *in << dendl;
     if (new_max) {
-      pi->client_ranges[client].range.first = 0;
-      pi->client_ranges[client].range.last = new_max;
-      pi->client_ranges[client].follows = in->first - 1;
-    } else 
-      pi->client_ranges.erase(client);
+      auto &cr = pi.inode.client_ranges[client];
+      cr.range.first = 0;
+      cr.range.last = new_max;
+      cr.follows = in->first - 1;
+      if (cap)
+       cap->mark_clientwriteable();
+    } else {
+      pi.inode.client_ranges.erase(client);
+      if (cap)
+       cap->clear_clientwriteable();
+    }
   }
     
   if (change_max || (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR))) 
@@ -3322,13 +3506,12 @@ bool Locker::_do_cap_update(CInode *in, Capability *cap,
   if (dirty & CEPH_CAP_AUTH_EXCL)
     wrlock_force(&in->authlock, mut);
 
-  // xattr
-  if (px) {
-    dout(7) << " xattrs v" << pi->xattr_version << " -> " << m->head.xattr_version << dendl;
-    pi->xattr_version = m->head.xattr_version;
-    bufferlist::iterator p = m->xattrbl.begin();
-    ::decode(*px, p);
-
+  // xattrs update?
+  if (xattr) {
+    dout(7) << " xattrs v" << pi.inode.xattr_version << " -> " << m->head.xattr_version << dendl;
+    pi.inode.xattr_version = m->head.xattr_version;
+    auto p = m->xattrbl.cbegin();
+    decode(*pi.xattrs, p);
     wrlock_force(&in->xattrlock, mut);
   }
   
@@ -3341,9 +3524,13 @@ bool Locker::_do_cap_update(CInode *in, Capability *cap,
     le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
                                  ack->get_oldest_flush_tid());
 
-  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
-                                                             change_max, !!cap,
-                                                             client, ack));
+  unsigned update_flags = 0;
+  if (change_max)
+    update_flags |= UPDATE_SHAREMAX;
+  if (cap)
+    update_flags |= UPDATE_NEEDSISSUE;
+  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, update_flags,
+                                                             ack, client));
   if (need_flush && !*need_flush &&
       ((change_max && new_max) || // max INCREASE
        _need_flush_mdlog(in, dirty)))
@@ -3352,8 +3539,7 @@ bool Locker::_do_cap_update(CInode *in, Capability *cap,
   return true;
 }
 
-/* This function DOES put the passed message before returning */
-void Locker::handle_client_cap_release(MClientCapRelease *m)
+void Locker::handle_client_cap_release(const MClientCapRelease::const_ref &m)
 {
   client_t client = m->get_source().num();
   dout(10) << "handle_client_cap_release " << *m << dendl;
@@ -3373,17 +3559,15 @@ void Locker::handle_client_cap_release(MClientCapRelease *m)
     mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
   }
 
-  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
+  Session *session = mds->get_session(m);
 
-  for (vector<ceph_mds_cap_item>::iterator p = m->caps.begin(); p != m->caps.end(); ++p) {
-    _do_cap_release(client, inodeno_t((uint64_t)p->ino) , p->cap_id, p->migrate_seq, p->seq);
+  for (const auto &cap : m->caps) {
+    _do_cap_release(client, inodeno_t((uint64_t)cap.ino) , cap.cap_id, cap.migrate_seq, cap.seq);
   }
 
   if (session) {
     session->notify_cap_release(m->caps.size());
   }
-
-  m->put();
 }
 
 class C_Locker_RetryCapRelease : public LockerContext {
@@ -3437,24 +3621,30 @@ void Locker::_do_cap_release(client_t client, inodeno_t ino, uint64_t cap_id,
     eval_cap_gather(in);
     return;
   }
-  remove_client_cap(in, client);
+  remove_client_cap(in, cap);
 }
 
-/* This function DOES put the passed message before returning */
-
-void Locker::remove_client_cap(CInode *in, client_t client)
+void Locker::remove_client_cap(CInode *in, Capability *cap, bool kill)
 {
+  client_t client = cap->get_client();
   // clean out any pending snapflush state
   if (!in->client_need_snapflush.empty())
     _do_null_snapflush(in, client);
 
+  bool notable = cap->is_notable();
   in->remove_client_cap(client);
+  if (!notable)
+    return;
 
   if (in->is_auth()) {
     // make sure we clear out the client byte range
     if (in->get_projected_inode()->client_ranges.count(client) &&
-       !(in->inode.nlink == 0 && !in->is_any_caps()))    // unless it's unlink + stray
-      check_inode_max_size(in);
+       !(in->inode.nlink == 0 && !in->is_any_caps())) {  // unless it's unlink + stray
+      if (kill)
+       in->state_set(CInode::STATE_NEEDSRECOVER);
+      else
+       check_inode_max_size(in);
+    }
   } else {
     request_inode_file_caps(in);
   }
@@ -3465,9 +3655,10 @@ void Locker::remove_client_cap(CInode *in, client_t client)
 
 /**
  * Return true if any currently revoking caps exceed the
- * mds_revoke_cap_timeout threshold.
+ * session_timeout threshold.
  */
-bool Locker::any_late_revoking_caps(xlist<Capability*> const &revoking) const
+bool Locker::any_late_revoking_caps(xlist<Capability*> const &revoking,
+                                    double timeout) const
 {
     xlist<Capability*>::const_iterator p = revoking.begin();
     if (p.end()) {
@@ -3476,7 +3667,7 @@ bool Locker::any_late_revoking_caps(xlist<Capability*> const &revoking) const
     } else {
       utime_t now = ceph_clock_now();
       utime_t age = now - (*p)->get_last_revoke_stamp();
-      if (age <= g_conf->mds_revoke_cap_timeout) {
+      if (age <= timeout) {
           return false;
       } else {
           return true;
@@ -3484,22 +3675,21 @@ bool Locker::any_late_revoking_caps(xlist<Capability*> const &revoking) const
     }
 }
 
-
-void Locker::get_late_revoking_clients(std::list<client_t> *result) const
+void Locker::get_late_revoking_clients(std::list<client_t> *result,
+                                       double timeout) const
 {
-  if (!any_late_revoking_caps(revoking_caps)) {
+  if (!any_late_revoking_caps(revoking_caps, timeout)) {
     // Fast path: no misbehaving clients, execute in O(1)
     return;
   }
 
   // Slow path: execute in O(N_clients)
-  std::map<client_t, xlist<Capability*> >::const_iterator client_rc_iter;
-  for (client_rc_iter = revoking_caps_by_client.begin();
-       client_rc_iter != revoking_caps_by_client.end(); ++client_rc_iter) {
-    xlist<Capability*> const &client_rc = client_rc_iter->second;
-    bool any_late = any_late_revoking_caps(client_rc);
-    if (any_late) {
-        result->push_back(client_rc_iter->first);
+  for (auto &p : revoking_caps_by_client) {
+    if (any_late_revoking_caps(p.second, timeout)) {
+      // Search the list for duplicate and only insert if unique
+      std::list<client_t>::const_iterator it = std::find(result->begin(), result->end(), p.first);
+      if (it == result->end())
+        result->push_back(p.first);
     }
   }
 }
@@ -3513,27 +3703,46 @@ void Locker::caps_tick()
 {
   utime_t now = ceph_clock_now();
 
+  if (!need_snapflush_inodes.empty()) {
+    // snap inodes that needs flush are auth pinned, they affect
+    // subtree/difrarg freeze.
+    utime_t cutoff = now;
+    cutoff -= g_conf()->mds_freeze_tree_timeout / 3;
+
+    CInode *last = need_snapflush_inodes.back();
+    while (!need_snapflush_inodes.empty()) {
+      CInode *in = need_snapflush_inodes.front();
+      if (in->last_dirstat_prop >= cutoff)
+       break;
+      in->item_caps.remove_myself();
+      snapflush_nudge(in);
+      if (in == last)
+       break;
+    }
+  }
+
   dout(20) << __func__ << " " << revoking_caps.size() << " revoking caps" << dendl;
 
-  int i = 0;
+  now = ceph_clock_now();
+  int n = 0;
   for (xlist<Capability*>::iterator p = revoking_caps.begin(); !p.end(); ++p) {
     Capability *cap = *p;
 
     utime_t age = now - cap->get_last_revoke_stamp();
-    dout(20) << __func__ << " age = " << age << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
-    if (age <= g_conf->mds_revoke_cap_timeout) {
-      dout(20) << __func__ << " age below timeout " << g_conf->mds_revoke_cap_timeout << dendl;
+    dout(20) << __func__ << " age = " << age << " client." << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
+    if (age <= mds->mdsmap->get_session_timeout()) {
+      dout(20) << __func__ << " age below timeout " << mds->mdsmap->get_session_timeout() << dendl;
       break;
     } else {
-      ++i;
-      if (i > MAX_WARN_CAPS) {
+      ++n;
+      if (n > MAX_WARN_CAPS) {
         dout(1) << __func__ << " more than " << MAX_WARN_CAPS << " caps are late"
           << "revoking, ignoring subsequent caps" << dendl;
         break;
       }
     }
     // exponential backoff of warning intervals
-    if (age > g_conf->mds_revoke_cap_timeout * (1 << cap->get_num_revoke_warnings())) {
+    if (age > mds->mdsmap->get_session_timeout() * (1 << cap->get_num_revoke_warnings())) {
       cap->inc_num_revoke_warnings();
       stringstream ss;
       ss << "client." << cap->get_client() << " isn't responding to mclientcaps(revoke), ino "
@@ -3542,23 +3751,22 @@ void Locker::caps_tick()
       mds->clog->warn() << ss.str();
       dout(20) << __func__ << " " << ss.str() << dendl;
     } else {
-      dout(20) << __func__ << " silencing log message (backoff) for " << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
+      dout(20) << __func__ << " silencing log message (backoff) for " << "client." << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
     }
   }
 }
 
 
-void Locker::handle_client_lease(MClientLease *m)
+void Locker::handle_client_lease(const MClientLease::const_ref &m)
 {
   dout(10) << "handle_client_lease " << *m << dendl;
 
-  assert(m->get_source().is_client());
+  ceph_assert(m->get_source().is_client());
   client_t client = m->get_source().num();
 
   CInode *in = mdcache->get_inode(m->get_ino(), m->get_last());
   if (!in) {
     dout(7) << "handle_client_lease don't have ino " << m->get_ino() << "." << m->get_last() << dendl;
-    m->put();
     return;
   }
   CDentry *dn = 0;
@@ -3569,7 +3777,6 @@ void Locker::handle_client_lease(MClientLease *m)
     dn = dir->lookup(m->dname);
   if (!dn) {
     dout(7) << "handle_client_lease don't have dn " << m->get_ino() << " " << m->dname << dendl;
-    m->put();
     return;
   }
   dout(10) << " on " << *dn << dendl;
@@ -3578,7 +3785,6 @@ void Locker::handle_client_lease(MClientLease *m)
   ClientLease *l = dn->get_client_lease(client);
   if (!l) {
     dout(7) << "handle_client_lease didn't have lease for client." << client << " of " << *dn << dendl;
-    m->put();
     return;
   } 
 
@@ -3592,7 +3798,6 @@ void Locker::handle_client_lease(MClientLease *m)
              << " on " << *dn << dendl;
       dn->remove_client_lease(l, this);
     }
-    m->put();
     break;
 
   case CEPH_MDS_LEASE_RENEW:
@@ -3600,16 +3805,17 @@ void Locker::handle_client_lease(MClientLease *m)
       dout(7) << "handle_client_lease client." << client << " renew on " << *dn
              << (!dn->lock.can_lease(client)?", revoking lease":"") << dendl;
       if (dn->lock.can_lease(client)) {
+        auto reply = MClientLease::create(*m);
        int pool = 1;   // fixme.. do something smart!
-       m->h.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
-       m->h.seq = ++l->seq;
-       m->clear_payload();
+       reply->h.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
+       reply->h.seq = ++l->seq;
+       reply->clear_payload();
 
        utime_t now = ceph_clock_now();
        now += mdcache->client_lease_durations[pool];
        mdcache->touch_client_lease(l, pool, now);
 
-       mds->send_message_client_counted(m, m->get_connection());
+       mds->send_message_client_counted(reply, m->get_connection());
       }
     }
     break;
@@ -3637,20 +3843,17 @@ void Locker::issue_client_lease(CDentry *dn, client_t client,
     now += mdcache->client_lease_durations[pool];
     mdcache->touch_client_lease(l, pool, now);
 
-    LeaseStat e;
-    e.mask = 1 | CEPH_LOCK_DN;  // old and new bit values
-    e.seq = ++l->seq;
-    e.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
-    ::encode(e, bl);
-    dout(20) << "issue_client_lease seq " << e.seq << " dur " << e.duration_ms << "ms "
+    LeaseStat lstat;
+    lstat.mask = 1 | CEPH_LOCK_DN;  // old and new bit values
+    lstat.duration_ms = (uint32_t)(1000 * mdcache->client_lease_durations[pool]);
+    lstat.seq = ++l->seq;
+    encode_lease(bl, session->info, lstat);
+    dout(20) << "issue_client_lease seq " << lstat.seq << " dur " << lstat.duration_ms << "ms "
             << " on " << *dn << dendl;
   } else {
     // null lease
-    LeaseStat e;
-    e.mask = 0;
-    e.seq = 0;
-    e.duration_ms = 0;
-    ::encode(e, bl);
+    LeaseStat lstat;
+    encode_lease(bl, session->info, lstat);
     dout(20) << "issue_client_lease no/null lease on " << *dn << dendl;
   }
 }
@@ -3666,28 +3869,38 @@ void Locker::revoke_client_leases(SimpleLock *lock)
     ClientLease *l = p->second;
     
     n++;
-    assert(lock->get_type() == CEPH_LOCK_DN);
+    ceph_assert(lock->get_type() == CEPH_LOCK_DN);
 
     CDentry *dn = static_cast<CDentry*>(lock->get_parent());
     int mask = 1 | CEPH_LOCK_DN; // old and new bits
     
     // i should also revoke the dir ICONTENT lease, if they have it!
     CInode *diri = dn->get_dir()->get_inode();
-    mds->send_message_client_counted(new MClientLease(CEPH_MDS_LEASE_REVOKE, l->seq,
-                                             mask,
-                                             diri->ino(),
-                                             diri->first, CEPH_NOSNAP,
-                                             dn->get_name()),
-                            l->client);
-  }
-  assert(n == lock->get_num_client_lease());
+    auto lease = MClientLease::create(CEPH_MDS_LEASE_REVOKE, l->seq, mask, diri->ino(), diri->first, CEPH_NOSNAP, dn->get_name());
+    mds->send_message_client_counted(lease, l->client);
+  }
 }
 
-
+void Locker::encode_lease(bufferlist& bl, const session_info_t& info,
+                         const LeaseStat& ls)
+{
+  if (info.has_feature(CEPHFS_FEATURE_REPLY_ENCODING)) {
+    ENCODE_START(1, 1, bl);
+    encode(ls.mask, bl);
+    encode(ls.duration_ms, bl);
+    encode(ls.seq, bl);
+    ENCODE_FINISH(bl);
+  }
+  else {
+    encode(ls.mask, bl);
+    encode(ls.duration_ms, bl);
+    encode(ls.seq, bl);
+  }
+}
 
 // locks ----------------------------------------------------------------
 
-SimpleLock *Locker::get_lock(int lock_type, MDSCacheObjectInfo &info) 
+SimpleLock *Locker::get_lock(int lock_type, const MDSCacheObjectInfo &info) 
 {
   switch (lock_type) {
   case CEPH_LOCK_DN:
@@ -3747,16 +3960,14 @@ SimpleLock *Locker::get_lock(int lock_type, MDSCacheObjectInfo &info)
   return 0;  
 }
 
-/* This function DOES put the passed message before returning */
-void Locker::handle_lock(MLock *m)
+void Locker::handle_lock(const MLock::const_ref &m)
 {
   // nobody should be talking to us during recovery.
-  assert(mds->is_rejoin() || mds->is_clientreplay() || mds->is_active() || mds->is_stopping());
+  ceph_assert(mds->is_rejoin() || mds->is_clientreplay() || mds->is_active() || mds->is_stopping());
 
   SimpleLock *lock = get_lock(m->get_lock_type(), m->get_object_info());
   if (!lock) {
     dout(10) << "don't have object " << m->get_object_info() << ", must have trimmed, dropping" << dendl;
-    m->put();
     return;
   }
 
@@ -3796,7 +4007,7 @@ void Locker::handle_lock(MLock *m)
 
 /** This function may take a reference to m if it needs one, but does
  * not put references. */
-void Locker::handle_reqrdlock(SimpleLock *lock, MLock *m)
+void Locker::handle_reqrdlock(SimpleLock *lock, const MLock::const_ref &m)
 {
   MDSCacheObject *parent = lock->get_parent();
   if (parent->is_auth() &&
@@ -3804,13 +4015,13 @@ void Locker::handle_reqrdlock(SimpleLock *lock, MLock *m)
       !parent->is_frozen()) {
     dout(7) << "handle_reqrdlock got rdlock request on " << *lock
            << " on " << *parent << dendl;
-    assert(parent->is_auth()); // replica auth pinned if they're doing this!
+    ceph_assert(parent->is_auth()); // replica auth pinned if they're doing this!
     if (lock->is_stable()) {
       simple_sync(lock);
     } else {
       dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl;
       lock->add_waiter(SimpleLock::WAIT_STABLE | MDSCacheObject::WAIT_UNFREEZE,
-                       new C_MDS_RetryMessage(mds, m->get()));
+                       new C_MDS_RetryMessage(mds, m));
     }
   } else {
     dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
@@ -3819,8 +4030,7 @@ void Locker::handle_reqrdlock(SimpleLock *lock, MLock *m)
   }
 }
 
-/* This function DOES put the passed message before returning */
-void Locker::handle_simple_lock(SimpleLock *lock, MLock *m)
+void Locker::handle_simple_lock(SimpleLock *lock, const MLock::const_ref &m)
 {
   int from = m->get_asker();
   
@@ -3831,7 +4041,6 @@ void Locker::handle_simple_lock(SimpleLock *lock, MLock *m)
     if (lock->get_parent()->is_rejoining()) {
       dout(7) << "handle_simple_lock still rejoining " << *lock->get_parent()
              << ", dropping " << *m << dendl;
-      m->put();
       return;
     }
   }
@@ -3839,14 +4048,14 @@ void Locker::handle_simple_lock(SimpleLock *lock, MLock *m)
   switch (m->get_action()) {
     // -- replica --
   case LOCK_AC_SYNC:
-    assert(lock->get_state() == LOCK_LOCK);
+    ceph_assert(lock->get_state() == LOCK_LOCK);
     lock->decode_locked_state(m->get_data());
     lock->set_state(LOCK_SYNC);
     lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
     break;
     
   case LOCK_AC_LOCK:
-    assert(lock->get_state() == LOCK_SYNC);
+    ceph_assert(lock->get_state() == LOCK_SYNC);
     lock->set_state(LOCK_SYNC_LOCK);
     if (lock->is_leased())
       revoke_client_leases(lock);
@@ -3858,9 +4067,9 @@ void Locker::handle_simple_lock(SimpleLock *lock, MLock *m)
 
     // -- auth --
   case LOCK_AC_LOCKACK:
-    assert(lock->get_state() == LOCK_SYNC_LOCK ||
+    ceph_assert(lock->get_state() == LOCK_SYNC_LOCK ||
           lock->get_state() == LOCK_SYNC_EXCL);
-    assert(lock->is_gathering(from));
+    ceph_assert(lock->is_gathering(from));
     lock->remove_gather(from);
     
     if (lock->is_gathering()) {
@@ -3878,8 +4087,6 @@ void Locker::handle_simple_lock(SimpleLock *lock, MLock *m)
     break;
 
   }
-
-  m->put();
 }
 
 /* unused, currently.
@@ -3927,14 +4134,15 @@ void Locker::simple_eval(SimpleLock *lock, bool *need_issue)
 {
   dout(10) << "simple_eval " << *lock << " on " << *lock->get_parent() << dendl;
 
-  assert(lock->get_parent()->is_auth());
-  assert(lock->is_stable());
+  ceph_assert(lock->get_parent()->is_auth());
+  ceph_assert(lock->is_stable());
 
   if (lock->get_parent()->is_freezing_or_frozen()) {
-    // dentry lock in unreadable state can block path traverse
-    if ((lock->get_type() != CEPH_LOCK_DN ||
+    // dentry/snap lock in unreadable state can block path traverse
+    if ((lock->get_type() != CEPH_LOCK_DN &&
+        lock->get_type() != CEPH_LOCK_ISNAP) ||
         lock->get_state() == LOCK_SYNC ||
-        lock->get_parent()->is_frozen()))
+        lock->get_parent()->is_frozen())
       return;
   }
 
@@ -3948,7 +4156,7 @@ void Locker::simple_eval(SimpleLock *lock, bool *need_issue)
 
   CInode *in = 0;
   int wanted = 0;
-  if (lock->get_type() != CEPH_LOCK_DN) {
+  if (lock->get_cap_shift()) {
     in = static_cast<CInode*>(lock->get_parent());
     in->get_caps_wanted(&wanted, NULL, lock->get_cap_shift());
   }
@@ -3979,8 +4187,8 @@ void Locker::simple_eval(SimpleLock *lock, bool *need_issue)
 bool Locker::simple_sync(SimpleLock *lock, bool *need_issue)
 {
   dout(7) << "simple_sync on " << *lock << " on " << *lock->get_parent() << dendl;
-  assert(lock->get_parent()->is_auth());
-  assert(lock->is_stable());
+  ceph_assert(lock->get_parent()->is_auth());
+  ceph_assert(lock->is_stable());
 
   CInode *in = 0;
   if (lock->get_cap_shift())
@@ -4020,7 +4228,7 @@ bool Locker::simple_sync(SimpleLock *lock, bool *need_issue)
     
     bool need_recover = false;
     if (lock->get_type() == CEPH_LOCK_IFILE) {
-      assert(in);
+      ceph_assert(in);
       if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
         mds->mdcache->queue_file_recover(in);
        need_recover = true;
@@ -4062,8 +4270,8 @@ bool Locker::simple_sync(SimpleLock *lock, bool *need_issue)
 void Locker::simple_excl(SimpleLock *lock, bool *need_issue)
 {
   dout(7) << "simple_excl on " << *lock << " on " << *lock->get_parent() << dendl;
-  assert(lock->get_parent()->is_auth());
-  assert(lock->is_stable());
+  ceph_assert(lock->get_parent()->is_auth());
+  ceph_assert(lock->is_stable());
 
   CInode *in = 0;
   if (lock->get_cap_shift())
@@ -4117,9 +4325,9 @@ void Locker::simple_excl(SimpleLock *lock, bool *need_issue)
 void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
 {
   dout(7) << "simple_lock on " << *lock << " on " << *lock->get_parent() << dendl;
-  assert(lock->get_parent()->is_auth());
-  assert(lock->is_stable());
-  assert(lock->get_state() != LOCK_LOCK);
+  ceph_assert(lock->get_parent()->is_auth());
+  ceph_assert(lock->is_stable());
+  ceph_assert(lock->get_state() != LOCK_LOCK);
   
   CInode *in = 0;
   if (lock->get_cap_shift())
@@ -4129,11 +4337,7 @@ void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
 
   switch (lock->get_state()) {
   case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
-  case LOCK_XSYN:
-    file_excl(static_cast<ScatterLock*>(lock), need_issue);
-    if (lock->get_state() != LOCK_EXCL)
-      return;
-    // fall-thru
+  case LOCK_XSYN: lock->set_state(LOCK_XSYN_LOCK); break;
   case LOCK_EXCL: lock->set_state(LOCK_EXCL_LOCK); break;
   case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK);
     (static_cast<ScatterLock *>(lock))->clear_unscatter_wanted();
@@ -4161,7 +4365,7 @@ void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
 
   bool need_recover = false;
   if (lock->get_type() == CEPH_LOCK_IFILE) {
-    assert(in);
+    ceph_assert(in);
     if(in->state_test(CInode::STATE_NEEDSRECOVER)) {
       mds->mdcache->queue_file_recover(in);
       need_recover = true;
@@ -4207,9 +4411,9 @@ void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
 void Locker::simple_xlock(SimpleLock *lock)
 {
   dout(7) << "simple_xlock on " << *lock << " on " << *lock->get_parent() << dendl;
-  assert(lock->get_parent()->is_auth());
+  ceph_assert(lock->get_parent()->is_auth());
   //assert(lock->is_stable());
-  assert(lock->get_state() != LOCK_XLOCK);
+  ceph_assert(lock->get_state() != LOCK_XLOCK);
   
   CInode *in = 0;
   if (lock->get_cap_shift())
@@ -4310,13 +4514,12 @@ void Locker::scatter_writebehind(ScatterLock *lock)
 
   // forcefully take a wrlock
   lock->get_wrlock(true);
-  mut->wrlocks.insert(lock);
-  mut->locks.insert(lock);
+  mut->locks.emplace(lock, MutationImpl::LockOp::WRLOCK);
 
   in->pre_cow_old_inode();  // avoid cow mayhem
 
-  inode_t *pi = in->project_inode();
-  pi->version = in->pre_dirty();
+  auto &pi = in->project_inode();
+  pi.inode.version = in->pre_dirty();
 
   in->finish_scatter_gather_update(lock->get_type());
   lock->start_flush();
@@ -4366,8 +4569,8 @@ void Locker::scatter_eval(ScatterLock *lock, bool *need_issue)
 {
   dout(10) << "scatter_eval " << *lock << " on " << *lock->get_parent() << dendl;
 
-  assert(lock->get_parent()->is_auth());
-  assert(lock->is_stable());
+  ceph_assert(lock->get_parent()->is_auth());
+  ceph_assert(lock->is_stable());
 
   if (lock->get_parent()->is_freezing_or_frozen()) {
     dout(20) << "  freezing|frozen" << dendl;
@@ -4443,7 +4646,7 @@ void Locker::mark_updated_scatterlock(ScatterLock *lock)
  * we need to lock|scatter in order to push fnode changes into the
  * inode.dirstat.
  */
-void Locker::scatter_nudge(ScatterLock *lock, MDSInternalContextBase *c, bool forcelockchange)
+void Locker::scatter_nudge(ScatterLock *lock, MDSContext *c, bool forcelockchange)
 {
   CInode *p = static_cast<CInode *>(lock->get_parent());
 
@@ -4451,7 +4654,7 @@ void Locker::scatter_nudge(ScatterLock *lock, MDSInternalContextBase *c, bool fo
     dout(10) << "scatter_nudge waiting for unfreeze on " << *p << dendl;
     if (c) 
       p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, c);
-    else
+    else if (lock->is_dirty())
       // just requeue.  not ideal.. starvation prone..
       updated_scatterlocks.push_back(lock->get_updated_item());
     return;
@@ -4461,7 +4664,7 @@ void Locker::scatter_nudge(ScatterLock *lock, MDSInternalContextBase *c, bool fo
     dout(10) << "scatter_nudge waiting for single auth on " << *p << dendl;
     if (c) 
       p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, c);
-    else
+    else if (lock->is_dirty())
       // just requeue.  not ideal.. starvation prone..
       updated_scatterlocks.push_back(lock->get_updated_item());
     return;
@@ -4527,7 +4730,7 @@ void Locker::scatter_nudge(ScatterLock *lock, MDSInternalContextBase *c, bool fo
          // handle_file_lock due to AC_NUDGE, because the rest of the
          // time we are replicated or have dirty data and won't get
          // called.  bailing here avoids an infinite loop.
-         assert(!c); 
+         ceph_assert(!c); 
          break;
        }
       } else {
@@ -4542,16 +4745,17 @@ void Locker::scatter_nudge(ScatterLock *lock, MDSInternalContextBase *c, bool fo
             << *lock << " on " << *p << dendl;
     // request unscatter?
     mds_rank_t auth = lock->get_parent()->authority().first;
-    if (!mds->is_cluster_degraded() ||
-       mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
-      mds->send_message_mds(new MLock(lock, LOCK_AC_NUDGE, mds->get_nodeid()), auth);
+    if (!mds->is_cluster_degraded() || mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
+      mds->send_message_mds(MLock::create(lock, LOCK_AC_NUDGE, mds->get_nodeid()), auth);
+    }
 
     // wait...
     if (c)
       lock->add_waiter(SimpleLock::WAIT_STABLE, c);
 
     // also, requeue, in case we had wrong auth or something
-    updated_scatterlocks.push_back(lock->get_updated_item());
+    if (lock->is_dirty())
+      updated_scatterlocks.push_back(lock->get_updated_item());
   }
 }
 
@@ -4573,7 +4777,7 @@ void Locker::scatter_tick()
               << *lock << " " << *lock->get_parent() << dendl;
       continue;
     }
-    if (now - lock->get_update_stamp() < g_conf->mds_scatter_nudge_interval)
+    if (now - lock->get_update_stamp() < g_conf()->mds_scatter_nudge_interval)
       break;
     updated_scatterlocks.pop_front();
     scatter_nudge(lock, 0);
@@ -4586,10 +4790,10 @@ void Locker::scatter_tempsync(ScatterLock *lock, bool *need_issue)
 {
   dout(10) << "scatter_tempsync " << *lock
           << " on " << *lock->get_parent() << dendl;
-  assert(lock->get_parent()->is_auth());
-  assert(lock->is_stable());
+  ceph_assert(lock->get_parent()->is_auth());
+  ceph_assert(lock->is_stable());
 
-  assert(0 == "not fully implemented, at least not for filelock");
+  ceph_abort_msg("not fully implemented, at least not for filelock");
 
   CInode *in = static_cast<CInode *>(lock->get_parent());
 
@@ -4646,12 +4850,12 @@ void Locker::local_wrlock_grab(LocalLock *lock, MutationRef& mut)
   dout(7) << "local_wrlock_grab  on " << *lock
          << " on " << *lock->get_parent() << dendl;  
   
-  assert(lock->get_parent()->is_auth());
-  assert(lock->can_wrlock());
-  assert(!mut->wrlocks.count(lock));
+  ceph_assert(lock->get_parent()->is_auth());
+  ceph_assert(lock->can_wrlock());
   lock->get_wrlock(mut->get_client());
-  mut->wrlocks.insert(lock);
-  mut->locks.insert(lock);
+
+  auto ret = mut->locks.emplace(lock, MutationImpl::LockOp::WRLOCK);
+  ceph_assert(ret.second);
 }
 
 bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut)
@@ -4659,12 +4863,11 @@ bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut)
   dout(7) << "local_wrlock_start  on " << *lock
          << " on " << *lock->get_parent() << dendl;  
   
-  assert(lock->get_parent()->is_auth());
+  ceph_assert(lock->get_parent()->is_auth());
   if (lock->can_wrlock()) {
-    assert(!mut->wrlocks.count(lock));
     lock->get_wrlock(mut->get_client());
-    mut->wrlocks.insert(lock);
-    mut->locks.insert(lock);
+    auto it = mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::WRLOCK);
+    ceph_assert(it->is_wrlock());
     return true;
   } else {
     lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
@@ -4672,13 +4875,14 @@ bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut)
   }
 }
 
-void Locker::local_wrlock_finish(LocalLock *lock, MutationImpl *mut)
+void Locker::local_wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
 {
+  ceph_assert(it->is_wrlock());
+  LocalLock *lock = static_cast<LocalLock*>(it->lock);
   dout(7) << "local_wrlock_finish  on " << *lock
          << " on " << *lock->get_parent() << dendl;  
   lock->put_wrlock();
-  mut->wrlocks.erase(lock);
-  mut->locks.erase(lock);
+  mut->locks.erase(it);
   if (lock->get_num_wrlocks() == 0) {
     lock->finish_waiters(SimpleLock::WAIT_STABLE |
                          SimpleLock::WAIT_WR |
@@ -4691,25 +4895,25 @@ bool Locker::local_xlock_start(LocalLock *lock, MDRequestRef& mut)
   dout(7) << "local_xlock_start  on " << *lock
          << " on " << *lock->get_parent() << dendl;  
   
-  assert(lock->get_parent()->is_auth());
+  ceph_assert(lock->get_parent()->is_auth());
   if (!lock->can_xlock_local()) {
     lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
     return false;
   }
 
   lock->get_xlock(mut, mut->get_client());
-  mut->xlocks.insert(lock);
-  mut->locks.insert(lock);
+  mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::XLOCK);
   return true;
 }
 
-void Locker::local_xlock_finish(LocalLock *lock, MutationImpl *mut)
+void Locker::local_xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
 {
+  ceph_assert(it->is_xlock());
+  LocalLock *lock = static_cast<LocalLock*>(it->lock);
   dout(7) << "local_xlock_finish  on " << *lock
          << " on " << *lock->get_parent() << dendl;  
   lock->put_xlock();
-  mut->xlocks.erase(lock);
-  mut->locks.erase(lock);
+  mut->locks.erase(it);
 
   lock->finish_waiters(SimpleLock::WAIT_STABLE | 
                       SimpleLock::WAIT_WR | 
@@ -4733,8 +4937,8 @@ void Locker::file_eval(ScatterLock *lock, bool *need_issue)
          << "  filelock=" << *lock << " on " << *lock->get_parent()
          << dendl;
 
-  assert(lock->get_parent()->is_auth());
-  assert(lock->is_stable());
+  ceph_assert(lock->get_parent()->is_auth());
+  ceph_assert(lock->is_stable());
 
   if (lock->get_parent()->is_freezing_or_frozen())
     return;
@@ -4799,7 +5003,7 @@ void Locker::file_eval(ScatterLock *lock, bool *need_issue)
           !lock->is_rdlocked() &&
           //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
           (lock->get_scatter_wanted() ||
-           (in->get_wanted_loner() < 0 && (wanted & CEPH_CAP_GWR)))) {
+           (in->get_target_loner() < 0 && (wanted & CEPH_CAP_GWR)))) {
     dout(7) << "file_eval stable, bump to mixed " << *lock
            << " on " << *lock->get_parent() << dendl;
     scatter_mix(lock, need_issue);
@@ -4809,12 +5013,12 @@ void Locker::file_eval(ScatterLock *lock, bool *need_issue)
   else if (lock->get_state() != LOCK_SYNC &&
           !lock->is_wrlocked() &&   // drain wrlocks first!
           !lock->is_waiter_for(SimpleLock::WAIT_WR) &&
-          !(wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) &&
+          !(wanted & CEPH_CAP_GWR) &&
           !((lock->get_state() == LOCK_MIX) &&
             in->is_dir() && in->has_subtree_or_exporting_dirfrag())  // if we are a delegation point, stay where we are
           //((wanted & CEPH_CAP_RD) || 
           //in->is_replicated() || 
-          //lock->get_num_client_lease() || 
+          //lock->is_leased() ||
           //(!loner && lock->get_state() == LOCK_EXCL)) &&
           ) {
     dout(7) << "file_eval stable, bump to sync " << *lock 
@@ -4830,8 +5034,8 @@ void Locker::scatter_mix(ScatterLock *lock, bool *need_issue)
   dout(7) << "scatter_mix " << *lock << " on " << *lock->get_parent() << dendl;
 
   CInode *in = static_cast<CInode*>(lock->get_parent());
-  assert(in->is_auth());
-  assert(lock->is_stable());
+  ceph_assert(in->is_auth());
+  ceph_assert(lock->is_stable());
 
   if (lock->get_state() == LOCK_LOCK) {
     in->start_scatter(lock);
@@ -4857,12 +5061,8 @@ void Locker::scatter_mix(ScatterLock *lock, bool *need_issue)
     // gather?
     switch (lock->get_state()) {
     case LOCK_SYNC: lock->set_state(LOCK_SYNC_MIX); break;
-    case LOCK_XSYN:
-      file_excl(lock, need_issue);
-      if (lock->get_state() != LOCK_EXCL)
-       return;
-      // fall-thru
     case LOCK_EXCL: lock->set_state(LOCK_EXCL_MIX); break;
+    case LOCK_XSYN: lock->set_state(LOCK_XSYN_MIX); break;
     case LOCK_TSYN: lock->set_state(LOCK_TSYN_MIX); break;
     default: ceph_abort();
     }
@@ -4871,8 +5071,7 @@ void Locker::scatter_mix(ScatterLock *lock, bool *need_issue)
     if (lock->is_rdlocked())
       gather++;
     if (in->is_replicated()) {
-      if (lock->get_state() != LOCK_EXCL_MIX &&   // EXCL replica is already LOCK
-         lock->get_state() != LOCK_XSYN_EXCL) {  // XSYN replica is already LOCK;  ** FIXME here too!
+      if (lock->get_state() == LOCK_SYNC_MIX) { // for the rest states, replicas are already LOCK
        send_lock_message(lock, LOCK_AC_MIX);
        lock->init_gather();
        gather++;
@@ -4927,10 +5126,10 @@ void Locker::file_excl(ScatterLock *lock, bool *need_issue)
   CInode *in = static_cast<CInode*>(lock->get_parent());
   dout(7) << "file_excl " << *lock << " on " << *lock->get_parent() << dendl;  
 
-  assert(in->is_auth());
-  assert(lock->is_stable());
+  ceph_assert(in->is_auth());
+  ceph_assert(lock->is_stable());
 
-  assert((in->get_loner() >= 0 && in->mds_caps_wanted.empty()) ||
+  ceph_assert((in->get_loner() >= 0 && in->get_mds_caps_wanted().empty()) ||
         (lock->get_state() == LOCK_XSYN));  // must do xsyn -> excl -> <anything else>
   
   switch (lock->get_state()) {
@@ -4990,8 +5189,8 @@ void Locker::file_xsyn(SimpleLock *lock, bool *need_issue)
 {
   dout(7) << "file_xsyn on " << *lock << " on " << *lock->get_parent() << dendl;
   CInode *in = static_cast<CInode *>(lock->get_parent());
-  assert(in->is_auth());
-  assert(in->get_loner() >= 0 && in->mds_caps_wanted.empty());
+  ceph_assert(in->is_auth());
+  ceph_assert(in->get_loner() >= 0 && in->get_mds_caps_wanted().empty());
 
   switch (lock->get_state()) {
   case LOCK_EXCL: lock->set_state(LOCK_EXCL_XSYN); break;
@@ -5028,9 +5227,9 @@ void Locker::file_recover(ScatterLock *lock)
   CInode *in = static_cast<CInode *>(lock->get_parent());
   dout(7) << "file_recover " << *lock << " on " << *in << dendl;
 
-  assert(in->is_auth());
+  ceph_assert(in->is_auth());
   //assert(lock->is_stable());
-  assert(lock->get_state() == LOCK_PRE_SCAN); // only called from MDCache::start_files_to_recover()
+  ceph_assert(lock->get_state() == LOCK_PRE_SCAN); // only called from MDCache::start_files_to_recover()
 
   int gather = 0;
   
@@ -5057,8 +5256,7 @@ void Locker::file_recover(ScatterLock *lock)
 
 
 // messenger
-/* This function DOES put the passed message before returning */
-void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
+void Locker::handle_file_lock(ScatterLock *lock, const MLock::const_ref &m)
 {
   CInode *in = static_cast<CInode*>(lock->get_parent());
   int from = m->get_asker();
@@ -5067,12 +5265,11 @@ void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
     if (in->is_rejoining()) {
       dout(7) << "handle_file_lock still rejoining " << *in
              << ", dropping " << *m << dendl;
-      m->put();
       return;
     }
   }
 
-  dout(7) << "handle_file_lock a=" << get_lock_action_name(m->get_action())
+  dout(7) << "handle_file_lock a=" << lock->get_lock_action_name(m->get_action())
          << " on " << *lock
          << " from mds." << from << " " 
          << *in << dendl;
@@ -5082,7 +5279,7 @@ void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
   switch (m->get_action()) {
     // -- replica --
   case LOCK_AC_SYNC:
-    assert(lock->get_state() == LOCK_LOCK ||
+    ceph_assert(lock->get_state() == LOCK_LOCK ||
           lock->get_state() == LOCK_MIX ||
           lock->get_state() == LOCK_MIX_SYNC2);
     
@@ -5130,7 +5327,7 @@ void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
     break;
     
   case LOCK_AC_MIX:
-    assert(lock->get_state() == LOCK_SYNC ||
+    ceph_assert(lock->get_state() == LOCK_SYNC ||
            lock->get_state() == LOCK_LOCK ||
           lock->get_state() == LOCK_SYNC_MIX2);
     
@@ -5142,10 +5339,10 @@ void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
        mds->mdlog->flush();
       break;
     } 
-    
+
     // ok
-    lock->decode_locked_state(m->get_data());
     lock->set_state(LOCK_MIX);
+    lock->decode_locked_state(m->get_data());
 
     if (caps)
       issue_caps(in);
@@ -5156,14 +5353,14 @@ void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
 
     // -- auth --
   case LOCK_AC_LOCKACK:
-    assert(lock->get_state() == LOCK_SYNC_LOCK ||
+    ceph_assert(lock->get_state() == LOCK_SYNC_LOCK ||
            lock->get_state() == LOCK_MIX_LOCK ||
            lock->get_state() == LOCK_MIX_LOCK2 ||
            lock->get_state() == LOCK_MIX_EXCL ||
            lock->get_state() == LOCK_SYNC_EXCL ||
            lock->get_state() == LOCK_SYNC_MIX ||
           lock->get_state() == LOCK_MIX_TSYN);
-    assert(lock->is_gathering(from));
+    ceph_assert(lock->is_gathering(from));
     lock->remove_gather(from);
     
     if (lock->get_state() == LOCK_MIX_LOCK ||
@@ -5187,8 +5384,8 @@ void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
     break;
     
   case LOCK_AC_SYNCACK:
-    assert(lock->get_state() == LOCK_MIX_SYNC);
-    assert(lock->is_gathering(from));
+    ceph_assert(lock->get_state() == LOCK_MIX_SYNC);
+    ceph_assert(lock->is_gathering(from));
     lock->remove_gather(from);
     
     lock->decode_locked_state(m->get_data());
@@ -5204,8 +5401,8 @@ void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
     break;
 
   case LOCK_AC_MIXACK:
-    assert(lock->get_state() == LOCK_SYNC_MIX);
-    assert(lock->is_gathering(from));
+    ceph_assert(lock->get_state() == LOCK_SYNC_MIX);
+    ceph_assert(lock->is_gathering(from));
     lock->remove_gather(from);
     
     if (lock->is_gathering()) {
@@ -5278,12 +5475,4 @@ void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
   default:
     ceph_abort();
   }  
-  
-  m->put();
 }
-
-
-
-
-
-