]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mds/Locker.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / mds / Locker.cc
index b2b4c21dc45cd936c539cf5c0cb099122e70e6f1..9c439a2b65f9abd47ae1001c04ec328f4b948230 100644 (file)
@@ -12,7 +12,7 @@
  * 
  */
 
-#include <boost/utility/string_view.hpp>
+#include <string_view>
 
 #include "MDSRank.h"
 #include "MDCache.h"
@@ -57,7 +57,7 @@ static ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
 }
 
 
-class LockerContext : public MDSInternalContextBase {
+class LockerContext : public MDSContext {
 protected:
   Locker *locker;
   MDSRank *get_mds() override
@@ -67,7 +67,7 @@ protected:
 
 public:
   explicit LockerContext(Locker *locker_) : locker(locker_) {
-    assert(locker != NULL);
+    ceph_assert(locker != NULL);
   }
 };
 
@@ -81,40 +81,39 @@ protected:
 
 public:
   explicit LockerLogContext(Locker *locker_) : locker(locker_) {
-    assert(locker != NULL);
+    ceph_assert(locker != NULL);
   }
 };
 
-/* This function DOES put the passed message before returning */
-void Locker::dispatch(Message *m)
+Locker::Locker(MDSRank *m, MDCache *c) :
+  mds(m), mdcache(c), need_snapflush_inodes(member_offset(CInode, item_caps)) {}
+
+
+void Locker::dispatch(const Message::const_ref &m)
 {
 
   switch (m->get_type()) {
-
     // inter-mds locking
   case MSG_MDS_LOCK:
-    handle_lock(static_cast<MLock*>(m));
+    handle_lock(MLock::msgref_cast(m));
     break;
     // inter-mds caps
   case MSG_MDS_INODEFILECAPS:
-    handle_inode_file_caps(static_cast<MInodeFileCaps*>(m));
+    handle_inode_file_caps(MInodeFileCaps::msgref_cast(m));
     break;
-
     // client sync
   case CEPH_MSG_CLIENT_CAPS:
-    handle_client_caps(static_cast<MClientCaps*>(m));
-
+    handle_client_caps(MClientCaps::msgref_cast(m));
     break;
   case CEPH_MSG_CLIENT_CAPRELEASE:
-    handle_client_cap_release(static_cast<MClientCapRelease*>(m));
+    handle_client_cap_release(MClientCapRelease::msgref_cast(m));
     break;
   case CEPH_MSG_CLIENT_LEASE:
-    handle_client_lease(static_cast<MClientLease*>(m));
+    handle_client_lease(MClientLease::msgref_cast(m));
     break;
-    
   default:
     derr << "locker unknown message " << m->get_type() << dendl;
-    assert(0 == "locker unknown message");
+    ceph_abort_msg("locker unknown message");
   }
 }
 
@@ -137,7 +136,7 @@ void Locker::send_lock_message(SimpleLock *lock, int msg)
     if (mds->is_cluster_degraded() &&
        mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
       continue;
-    MLock *m = new MLock(lock, msg, mds->get_nodeid());
+    auto m = MLock::create(lock, msg, mds->get_nodeid());
     mds->send_message_mds(m, it.first);
   }
 }
@@ -148,7 +147,7 @@ void Locker::send_lock_message(SimpleLock *lock, int msg, const bufferlist &data
     if (mds->is_cluster_degraded() &&
        mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
       continue;
-    MLock *m = new MLock(lock, msg, mds->get_nodeid());
+    auto m = MLock::create(lock, msg, mds->get_nodeid());
     m->set_data(data);
     mds->send_message_mds(m, it.first);
   }
@@ -157,29 +156,29 @@ void Locker::send_lock_message(SimpleLock *lock, int msg, const bufferlist &data
 
 
 
-void Locker::include_snap_rdlocks(set<SimpleLock*>& rdlocks, CInode *in)
+void Locker::include_snap_rdlocks(CInode *in, MutationImpl::LockOpVec& lov)
 {
   // rdlock ancestor snaps
   CInode *t = in;
-  rdlocks.insert(&in->snaplock);
   while (t->get_projected_parent_dn()) {
     t = t->get_projected_parent_dn()->get_dir()->get_inode();
-    rdlocks.insert(&t->snaplock);
+    lov.add_rdlock(&t->snaplock);
   }
+  lov.add_rdlock(&in->snaplock);
 }
 
-void Locker::include_snap_rdlocks_wlayout(set<SimpleLock*>& rdlocks, CInode *in,
+void Locker::include_snap_rdlocks_wlayout(CInode *in, MutationImpl::LockOpVec& lov,
                                          file_layout_t **layout)
 {
   //rdlock ancestor snaps
   CInode *t = in;
-  rdlocks.insert(&in->snaplock);
-  rdlocks.insert(&in->policylock);
+  lov.add_rdlock(&in->snaplock);
+  lov.add_rdlock(&in->policylock);
   bool found_layout = false;
   while (t) {
-    rdlocks.insert(&t->snaplock);
+    lov.add_rdlock(&t->snaplock);
     if (!found_layout) {
-      rdlocks.insert(&t->policylock);
+      lov.add_rdlock(&t->policylock);
       if (t->get_projected_inode()->has_layout()) {
         *layout = &t->get_projected_inode()->layout;
         found_layout = true;
@@ -194,12 +193,12 @@ void Locker::include_snap_rdlocks_wlayout(set<SimpleLock*>& rdlocks, CInode *in,
 
 struct MarkEventOnDestruct {
   MDRequestRef& mdr;
-  const char* message;
+  std::string_view message;
   bool mark_event;
-  MarkEventOnDestruct(MDRequestRef& _mdr,
-                      const char *_message) : mdr(_mdr),
-                          message(_message),
-                          mark_event(true) {}
+  MarkEventOnDestruct(MDRequestRef& _mdr, std::string_view _message) :
+      mdr(_mdr),
+      message(_message),
+      mark_event(true) {}
   ~MarkEventOnDestruct() {
     if (mark_event)
       mdr->mark_event(message);
@@ -209,10 +208,7 @@ struct MarkEventOnDestruct {
 /* If this function returns false, the mdr has been placed
  * on the appropriate wait list */
 bool Locker::acquire_locks(MDRequestRef& mdr,
-                          set<SimpleLock*> &rdlocks,
-                          set<SimpleLock*> &wrlocks,
-                          set<SimpleLock*> &xlocks,
-                          map<SimpleLock*,mds_rank_t> *remote_wrlocks,
+                          MutationImpl::LockOpVec& lov,
                           CInode *auth_pin_freeze,
                           bool auth_pin_nonblock)
 {
@@ -227,143 +223,122 @@ bool Locker::acquire_locks(MDRequestRef& mdr,
 
   client_t client = mdr->get_client();
 
-  set<SimpleLock*, SimpleLock::ptr_lt> sorted;  // sort everything we will lock
-  set<MDSCacheObject*> mustpin;            // items to authpin
+  set<MDSCacheObject*> mustpin;  // items to authpin
 
   // xlocks
-  for (set<SimpleLock*>::iterator p = xlocks.begin(); p != xlocks.end(); ++p) {
-    SimpleLock *lock = *p;
-
-    if ((lock->get_type() == CEPH_LOCK_ISNAP ||
-         lock->get_type() == CEPH_LOCK_IPOLICY) &&
-       mds->is_cluster_degraded() &&
-       mdr->is_master() &&
-       !mdr->is_queued_for_replay()) {
-      // waiting for recovering mds, to guarantee replayed requests and mksnap/setlayout
-      // get processed in proper order.
-      bool wait = false;
-      if (lock->get_parent()->is_auth()) {
-       if (!mdr->locks.count(lock)) {
-         set<mds_rank_t> ls;
-         lock->get_parent()->list_replicas(ls);
-         for (auto m : ls) {
-           if (mds->mdsmap->get_state(m) < MDSMap::STATE_ACTIVE) {
-             wait = true;
-             break;
+  for (int i = 0, size = lov.size(); i < size; ++i) {
+    auto& p = lov[i];
+    SimpleLock *lock = p.lock;
+    MDSCacheObject *object = lock->get_parent();
+
+    if (p.is_xlock()) {
+      if ((lock->get_type() == CEPH_LOCK_ISNAP ||
+          lock->get_type() == CEPH_LOCK_IPOLICY) &&
+         mds->is_cluster_degraded() &&
+         mdr->is_master() &&
+         !mdr->is_queued_for_replay()) {
+       // waiting for recovering mds, to guarantee replayed requests and mksnap/setlayout
+       // get processed in proper order.
+       bool wait = false;
+       if (object->is_auth()) {
+         if (!mdr->locks.count(lock)) {
+           set<mds_rank_t> ls;
+           object->list_replicas(ls);
+           for (auto m : ls) {
+             if (mds->mdsmap->get_state(m) < MDSMap::STATE_ACTIVE) {
+               wait = true;
+               break;
+             }
            }
          }
+       } else {
+         // if the lock is the latest locked one, it's possible that slave mds got the lock
+         // while there are recovering mds.
+         if (!mdr->locks.count(lock) || lock == *mdr->locks.rbegin())
+           wait = true;
+       }
+       if (wait) {
+         dout(10) << " must xlock " << *lock << " " << *object
+                  << ", waiting for cluster recovered" << dendl;
+         mds->locker->drop_locks(mdr.get(), NULL);
+         mdr->drop_local_auth_pins();
+         mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
+         return false;
        }
-      } else {
-       // if the lock is the latest locked one, it's possible that slave mds got the lock
-       // while there are recovering mds.
-       if (!mdr->locks.count(lock) || lock == *mdr->locks.rbegin())
-         wait = true;
-      }
-      if (wait) {
-       dout(10) << " must xlock " << *lock << " " << *lock->get_parent()
-                << ", waiting for cluster recovered" << dendl;
-       mds->locker->drop_locks(mdr.get(), NULL);
-       mdr->drop_local_auth_pins();
-       mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
-       return false;
       }
-    }
-
-    dout(20) << " must xlock " << *lock << " " << *lock->get_parent() << dendl;
-
-    sorted.insert(lock);
-    mustpin.insert(lock->get_parent());
 
-    // augment xlock with a versionlock?
-    if ((*p)->get_type() == CEPH_LOCK_DN) {
-      CDentry *dn = (CDentry*)lock->get_parent();
-      if (!dn->is_auth())
-       continue;
+      dout(20) << " must xlock " << *lock << " " << *object << dendl;
 
-      if (xlocks.count(&dn->versionlock))
-       continue;  // we're xlocking the versionlock too; don't wrlock it!
+      mustpin.insert(object);
 
-      if (mdr->is_master()) {
-       // master.  wrlock versionlock so we can pipeline dentry updates to journal.
-       wrlocks.insert(&dn->versionlock);
-      } else {
-       // slave.  exclusively lock the dentry version (i.e. block other journal updates).
-       // this makes rollback safe.
-       xlocks.insert(&dn->versionlock);
-       sorted.insert(&dn->versionlock);
+      // augment xlock with a versionlock?
+      if (lock->get_type() == CEPH_LOCK_DN) {
+       CDentry *dn = static_cast<CDentry*>(object);
+       if (!dn->is_auth())
+         continue;
+       if (mdr->is_master()) {
+         // master.  wrlock versionlock so we can pipeline dentry updates to journal.
+         lov.add_wrlock(&dn->versionlock);
+       } else {
+         // slave.  exclusively lock the dentry version (i.e. block other journal updates).
+         // this makes rollback safe.
+         lov.add_xlock(&dn->versionlock);
+       }
       }
-    }
-    if (lock->get_type() > CEPH_LOCK_IVERSION) {
-      // inode version lock?
-      CInode *in = (CInode*)lock->get_parent();
-      if (!in->is_auth())
-       continue;
-      if (mdr->is_master()) {
-       // master.  wrlock versionlock so we can pipeline inode updates to journal.
-       wrlocks.insert(&in->versionlock);
-      } else {
-       // slave.  exclusively lock the inode version (i.e. block other journal updates).
-       // this makes rollback safe.
-       xlocks.insert(&in->versionlock);
-       sorted.insert(&in->versionlock);
+      if (lock->get_type() > CEPH_LOCK_IVERSION) {
+       // inode version lock?
+       CInode *in = static_cast<CInode*>(object);
+       if (!in->is_auth())
+         continue;
+       if (mdr->is_master()) {
+         // master.  wrlock versionlock so we can pipeline inode updates to journal.
+         lov.add_wrlock(&in->versionlock);
+       } else {
+         // slave.  exclusively lock the inode version (i.e. block other journal updates).
+         // this makes rollback safe.
+         lov.add_xlock(&in->versionlock);
+       }
       }
-    }
-  }
-
-  // wrlocks
-  for (set<SimpleLock*>::iterator p = wrlocks.begin(); p != wrlocks.end(); ++p) {
-    MDSCacheObject *object = (*p)->get_parent();
-    dout(20) << " must wrlock " << **p << " " << *object << dendl;
-    sorted.insert(*p);
-    if (object->is_auth())
-      mustpin.insert(object);
-    else if (!object->is_auth() &&
-            !(*p)->can_wrlock(client) &&  // we might have to request a scatter
-            !mdr->is_slave()) {           // if we are slave (remote_wrlock), the master already authpinned
-      dout(15) << " will also auth_pin " << *object
-              << " in case we need to request a scatter" << dendl;
-      mustpin.insert(object);
-    }
-  }
-
-  // remote_wrlocks
-  if (remote_wrlocks) {
-    for (map<SimpleLock*,mds_rank_t>::iterator p = remote_wrlocks->begin(); p != remote_wrlocks->end(); ++p) {
-      MDSCacheObject *object = p->first->get_parent();
-      dout(20) << " must remote_wrlock on mds." << p->second << " "
-              << *p->first << " " << *object << dendl;
-      sorted.insert(p->first);
-      mustpin.insert(object);
-    }
-  }
-
-  // rdlocks
-  for (set<SimpleLock*>::iterator p = rdlocks.begin();
-        p != rdlocks.end();
-       ++p) {
-    MDSCacheObject *object = (*p)->get_parent();
-    dout(20) << " must rdlock " << **p << " " << *object << dendl;
-    sorted.insert(*p);
-    if (object->is_auth())
-      mustpin.insert(object);
-    else if (!object->is_auth() &&
-            !(*p)->can_rdlock(client)) {      // we might have to request an rdlock
-      dout(15) << " will also auth_pin " << *object
-              << " in case we need to request a rdlock" << dendl;
+    } else if (p.is_wrlock()) {
+      dout(20) << " must wrlock " << *lock << " " << *object << dendl;
+      if (object->is_auth()) {
+       mustpin.insert(object);
+      } else if (!object->is_auth() &&
+                !lock->can_wrlock(client) &&  // we might have to request a scatter
+                !mdr->is_slave()) {           // if we are slave (remote_wrlock), the master already authpinned
+       dout(15) << " will also auth_pin " << *object
+                << " in case we need to request a scatter" << dendl;
+       mustpin.insert(object);
+      }
+    } else if (p.is_remote_wrlock()) {
+      dout(20) << " must remote_wrlock on mds." << p.wrlock_target << " "
+              << *lock << " " << *object << dendl;
       mustpin.insert(object);
+    } else if (p.is_rdlock()) {
+
+      dout(20) << " must rdlock " << *lock << " " << *object << dendl;
+      if (object->is_auth()) {
+       mustpin.insert(object);
+      } else if (!object->is_auth() &&
+                !lock->can_rdlock(client)) {      // we might have to request an rdlock
+       dout(15) << " will also auth_pin " << *object
+                << " in case we need to request a rdlock" << dendl;
+       mustpin.insert(object);
+      }
+    } else {
+      ceph_assert(0 == "locker unknown lock operation");
     }
   }
 
+  lov.sort_and_merge();
  
   // AUTH PINS
   map<mds_rank_t, set<MDSCacheObject*> > mustpin_remote;  // mds -> (object set)
   
   // can i auth pin them all now?
   marker.message = "failed to authpin local pins";
-  for (set<MDSCacheObject*>::iterator p = mustpin.begin();
-       p != mustpin.end();
-       ++p) {
-    MDSCacheObject *object = *p;
+  for (const auto &p : mustpin) {
+    MDSCacheObject *object = p;
 
     dout(10) << " must authpin " << *object << dendl;
 
@@ -420,10 +395,8 @@ bool Locker::acquire_locks(MDRequestRef& mdr,
   }
 
   // ok, grab local auth pins
-  for (set<MDSCacheObject*>::iterator p = mustpin.begin();
-       p != mustpin.end();
-       ++p) {
-    MDSCacheObject *object = *p;
+  for (const auto& p : mustpin) {
+    MDSCacheObject *object = p;
     if (mdr->is_auth_pinned(object)) {
       dout(10) << " already auth_pinned " << *object << dendl;
     } else if (object->is_auth()) {
@@ -435,14 +408,12 @@ bool Locker::acquire_locks(MDRequestRef& mdr,
   // request remote auth_pins
   if (!mustpin_remote.empty()) {
     marker.message = "requesting remote authpins";
-    for (map<MDSCacheObject*,mds_rank_t>::iterator p = mdr->remote_auth_pins.begin();
-        p != mdr->remote_auth_pins.end();
-        ++p) {
-      if (mustpin.count(p->first)) {
-       assert(p->second == p->first->authority().first);
-       map<mds_rank_t, set<MDSCacheObject*> >::iterator q = mustpin_remote.find(p->second);
+    for (const auto& p : mdr->remote_auth_pins) {
+      if (mustpin.count(p.first)) {
+       ceph_assert(p.second == p.first->authority().first);
+       map<mds_rank_t, set<MDSCacheObject*> >::iterator q = mustpin_remote.find(p.second);
        if (q != mustpin_remote.end())
-         q->second.insert(p->first);
+         q->second.insert(p.first);
       }
     }
     for (map<mds_rank_t, set<MDSCacheObject*> >::iterator p = mustpin_remote.begin();
@@ -459,8 +430,7 @@ bool Locker::acquire_locks(MDRequestRef& mdr,
        return false;
       }
       
-      MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, mdr->attempt,
-                                                  MMDSSlaveRequest::OP_AUTHPIN);
+      auto req = MMDSSlaveRequest::create(mdr->reqid, mdr->attempt, MMDSSlaveRequest::OP_AUTHPIN);
       for (set<MDSCacheObject*>::iterator q = p->second.begin();
           q != p->second.end();
           ++q) {
@@ -477,7 +447,7 @@ bool Locker::acquire_locks(MDRequestRef& mdr,
       mds->send_message_mds(req, p->first);
 
       // put in waiting list
-      assert(mdr->more()->waiting_on_slave.count(p->first) == 0);
+      ceph_assert(mdr->more()->waiting_on_slave.count(p->first) == 0);
       mdr->more()->waiting_on_slave.insert(p->first);
     }
     return false;
@@ -489,157 +459,154 @@ bool Locker::acquire_locks(MDRequestRef& mdr,
 
   // acquire locks.
   // make sure they match currently acquired locks.
-  set<SimpleLock*, SimpleLock::ptr_lt>::iterator existing = mdr->locks.begin();
-  for (set<SimpleLock*, SimpleLock::ptr_lt>::iterator p = sorted.begin();
-       p != sorted.end();
-       ++p) {
-    bool need_wrlock = !!wrlocks.count(*p);
-    bool need_remote_wrlock = !!(remote_wrlocks && remote_wrlocks->count(*p));
+  auto existing = mdr->locks.begin();
+  for (const auto& p : lov) {
+    bool need_wrlock = p.is_wrlock();
+    bool need_remote_wrlock = p.is_remote_wrlock();
 
     // already locked?
-    if (existing != mdr->locks.end() && *existing == *p) {
+    if (existing != mdr->locks.end() && existing->lock == p.lock) {
       // right kind?
-      SimpleLock *have = *existing;
-      ++existing;
-      if (xlocks.count(have) && mdr->xlocks.count(have)) {
-       dout(10) << " already xlocked " << *have << " " << *have->get_parent() << dendl;
+      auto it = existing++;
+      auto have = *it; // don't reference
+
+      if (have.is_xlock() && p.is_xlock()) {
+       dout(10) << " already xlocked " << *have.lock << " " << *have.lock->get_parent() << dendl;
        continue;
       }
-      if (mdr->remote_wrlocks.count(have)) {
-       if (!need_remote_wrlock ||
-           mdr->remote_wrlocks[have] != (*remote_wrlocks)[have]) {
-         dout(10) << " unlocking remote_wrlock on wrong mds." << mdr->remote_wrlocks[have]
-                  << " " << *have << " " << *have->get_parent() << dendl;
-         remote_wrlock_finish(have, mdr->remote_wrlocks[have], mdr.get());
-       }
+
+      if (have.is_remote_wrlock() &&
+         (!need_remote_wrlock || have.wrlock_target != p.wrlock_target)) {
+       dout(10) << " unlocking remote_wrlock on wrong mds." << have.wrlock_target
+                << " " << *have.lock << " " << *have.lock->get_parent() << dendl;
+       remote_wrlock_finish(it, mdr.get());
+       have.clear_remote_wrlock();
       }
+
       if (need_wrlock || need_remote_wrlock) {
-       if (need_wrlock == !!mdr->wrlocks.count(have) &&
-           need_remote_wrlock == !!mdr->remote_wrlocks.count(have)) {
+       if (need_wrlock == have.is_wrlock() &&
+           need_remote_wrlock == have.is_remote_wrlock()) {
          if (need_wrlock)
-           dout(10) << " already wrlocked " << *have << " " << *have->get_parent() << dendl;
+           dout(10) << " already wrlocked " << *have.lock << " " << *have.lock->get_parent() << dendl;
          if (need_remote_wrlock)
-           dout(10) << " already remote_wrlocked " << *have << " " << *have->get_parent() << dendl;
+           dout(10) << " already remote_wrlocked " << *have.lock << " " << *have.lock->get_parent() << dendl;
          continue;
        }
-      }
-      if (rdlocks.count(have) && mdr->rdlocks.count(have)) {
-       dout(10) << " already rdlocked " << *have << " " << *have->get_parent() << dendl;
+
+       if (have.is_wrlock()) {
+         if (!need_wrlock)
+           dout(10) << " unlocking extra " << *have.lock << " " << *have.lock->get_parent() << dendl;
+         else if (need_remote_wrlock) // acquire remote_wrlock first
+           dout(10) << " unlocking out-of-order " << *have.lock << " " << *have.lock->get_parent() << dendl;
+         bool need_issue = false;
+         wrlock_finish(it, mdr.get(), &need_issue);
+         if (need_issue)
+           issue_set.insert(static_cast<CInode*>(have.lock->get_parent()));
+       }
+      } else if (have.is_rdlock() && p.is_rdlock()) {
+       dout(10) << " already rdlocked " << *have.lock << " " << *have.lock->get_parent() << dendl;
        continue;
       }
     }
     
     // hose any stray locks
-    if (existing != mdr->locks.end() && *existing == *p) {
-      assert(need_wrlock || need_remote_wrlock);
-      SimpleLock *lock = *existing;
-      if (mdr->wrlocks.count(lock)) {
-       if (!need_wrlock)
-         dout(10) << " unlocking extra " << *lock << " " << *lock->get_parent() << dendl;
-       else if (need_remote_wrlock) // acquire remote_wrlock first
-         dout(10) << " unlocking out-of-order " << *lock << " " << *lock->get_parent() << dendl;
-       bool need_issue = false;
-       wrlock_finish(lock, mdr.get(), &need_issue);
-       if (need_issue)
-         issue_set.insert(static_cast<CInode*>(lock->get_parent()));
-      }
-      ++existing;
-    }
     while (existing != mdr->locks.end()) {
-      SimpleLock *stray = *existing;
-      ++existing;
-      dout(10) << " unlocking out-of-order " << *stray << " " << *stray->get_parent() << dendl;
+      auto it = existing++;
+      auto stray = *it; // don't reference
+      dout(10) << " unlocking out-of-order " << *stray.lock << " " << *stray.lock->get_parent() << dendl;
       bool need_issue = false;
-      if (mdr->xlocks.count(stray)) {
-       xlock_finish(stray, mdr.get(), &need_issue);
-      } else if (mdr->rdlocks.count(stray)) {
-       rdlock_finish(stray, mdr.get(), &need_issue);
+      if (stray.is_xlock()) {
+       xlock_finish(it, mdr.get(), &need_issue);
+      } else if (stray.is_rdlock()) {
+       rdlock_finish(it, mdr.get(), &need_issue);
       } else {
        // may have acquired both wrlock and remore wrlock
-       if (mdr->wrlocks.count(stray))
-         wrlock_finish(stray, mdr.get(), &need_issue);
-       if (mdr->remote_wrlocks.count(stray))
-         remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
+       if (stray.is_wrlock())
+         wrlock_finish(it, mdr.get(), &need_issue);
+       if (stray.is_remote_wrlock())
+         remote_wrlock_finish(it, mdr.get());
       }
       if (need_issue)
-       issue_set.insert(static_cast<CInode*>(stray->get_parent()));
+       issue_set.insert(static_cast<CInode*>(stray.lock->get_parent()));
     }
 
     // lock
-    if (mdr->locking && *p != mdr->locking) {
+    if (mdr->locking && p.lock != mdr->locking) {
       cancel_locking(mdr.get(), &issue_set);
     }
-    if (xlocks.count(*p)) {
+    if (p.is_xlock()) {
       marker.message = "failed to xlock, waiting";
-      if (!xlock_start(*p, mdr)) 
+      if (!xlock_start(p.lock, mdr))
        goto out;
-      dout(10) << " got xlock on " << **p << " " << *(*p)->get_parent() << dendl;
+      dout(10) << " got xlock on " << *p.lock << " " << *p.lock->get_parent() << dendl;
     } else if (need_wrlock || need_remote_wrlock) {
-      if (need_remote_wrlock && !mdr->remote_wrlocks.count(*p)) {
+      if (need_remote_wrlock && !mdr->is_remote_wrlocked(p)) {
         marker.message = "waiting for remote wrlocks";
-       remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
+       remote_wrlock_start(p, p.wrlock_target, mdr);
        goto out;
       }
-      if (need_wrlock && !mdr->wrlocks.count(*p)) {
+      if (need_wrlock) {
         marker.message = "failed to wrlock, waiting";
-       if (need_remote_wrlock && !(*p)->can_wrlock(mdr->get_client())) {
+       if (need_remote_wrlock && !p.lock->can_wrlock(mdr->get_client())) {
          marker.message = "failed to wrlock, dropping remote wrlock and waiting";
          // can't take the wrlock because the scatter lock is gathering. need to
          // release the remote wrlock, so that the gathering process can finish.
-         remote_wrlock_finish(*p, mdr->remote_wrlocks[*p], mdr.get());
-         remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
+         auto it =  mdr->locks.end();
+         ++it;
+         remote_wrlock_finish(it, mdr.get());
+         remote_wrlock_start(p, p.wrlock_target, mdr);
          goto out;
        }
        // nowait if we have already gotten remote wrlock
-       if (!wrlock_start(*p, mdr, need_remote_wrlock))
+       if (!wrlock_start(p, mdr, need_remote_wrlock))
          goto out;
-       dout(10) << " got wrlock on " << **p << " " << *(*p)->get_parent() << dendl;
+       dout(10) << " got wrlock on " << *p.lock << " " << *p.lock->get_parent() << dendl;
       }
     } else {
-      assert(mdr->is_master());
-      if ((*p)->needs_recover()) {
+      ceph_assert(mdr->is_master());
+      if (p.lock->needs_recover()) {
        if (mds->is_cluster_degraded()) {
          if (!mdr->is_queued_for_replay()) {
            // see comments in SimpleLock::set_state_rejoin() and
            // ScatterLock::encode_state_for_rejoin()
            drop_locks(mdr.get());
            mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
-           dout(10) << " rejoin recovering " << **p << " " << *(*p)->get_parent()
+           dout(10) << " rejoin recovering " << *p.lock << " " << *p.lock->get_parent()
                     << ", waiting for cluster recovered" << dendl;
            marker.message = "rejoin recovering lock, waiting for cluster recovered";
            return false;
          }
        } else {
-         (*p)->clear_need_recover();
+         p.lock->clear_need_recover();
        }
       }
 
       marker.message = "failed to rdlock, waiting";
-      if (!rdlock_start(*p, mdr)) 
+      if (!rdlock_start(p, mdr))
        goto out;
-      dout(10) << " got rdlock on " << **p << " " << *(*p)->get_parent() << dendl;
+      dout(10) << " got rdlock on " << *p.lock << " " << *p.lock->get_parent() << dendl;
     }
   }
     
   // any extra unneeded locks?
   while (existing != mdr->locks.end()) {
-    SimpleLock *stray = *existing;
-    ++existing;
-    dout(10) << " unlocking extra " << *stray << " " << *stray->get_parent() << dendl;
+    auto it = existing++;
+    auto stray = *it;
+    dout(10) << " unlocking extra " << *stray.lock << " " << *stray.lock->get_parent() << dendl;
     bool need_issue = false;
-    if (mdr->xlocks.count(stray)) {
-      xlock_finish(stray, mdr.get(), &need_issue);
-    } else if (mdr->rdlocks.count(stray)) {
-      rdlock_finish(stray, mdr.get(), &need_issue);
+    if (stray.is_xlock()) {
+      xlock_finish(it, mdr.get(), &need_issue);
+    } else if (stray.is_rdlock()) {
+      rdlock_finish(it, mdr.get(), &need_issue);
     } else {
       // may have acquired both wrlock and remore wrlock
-      if (mdr->wrlocks.count(stray))
-       wrlock_finish(stray, mdr.get(), &need_issue);
-      if (mdr->remote_wrlocks.count(stray))
-       remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
+      if (stray.is_wrlock())
+       wrlock_finish(it, mdr.get(), &need_issue);
+      if (stray.is_remote_wrlock())
+       remote_wrlock_finish(it, mdr.get());
     }
     if (need_issue)
-      issue_set.insert(static_cast<CInode*>(stray->get_parent()));
+      issue_set.insert(static_cast<CInode*>(stray.lock->get_parent()));
   }
 
   mdr->done_locking = true;
@@ -662,7 +629,7 @@ void Locker::notify_freeze_waiter(MDSCacheObject *o)
     dir = dn->get_dir();
   } else {
     dir = dynamic_cast<CDir*>(o);
-    assert(dir);
+    ceph_assert(dir);
   }
   if (dir) {
     if (dir->is_freezing_dir())
@@ -677,73 +644,68 @@ void Locker::notify_freeze_waiter(MDSCacheObject *o)
 
 void Locker::set_xlocks_done(MutationImpl *mut, bool skip_dentry)
 {
-  for (set<SimpleLock*>::iterator p = mut->xlocks.begin();
-       p != mut->xlocks.end();
-       ++p) {
-    MDSCacheObject *object = (*p)->get_parent();
-    assert(object->is_auth());
+  for (const auto &p : mut->locks) {
+    if (!p.is_xlock())
+      continue;
+    MDSCacheObject *obj = p.lock->get_parent();
+    ceph_assert(obj->is_auth());
     if (skip_dentry &&
-       ((*p)->get_type() == CEPH_LOCK_DN || (*p)->get_type() == CEPH_LOCK_DVERSION))
+       (p.lock->get_type() == CEPH_LOCK_DN || p.lock->get_type() == CEPH_LOCK_DVERSION))
       continue;
-    dout(10) << "set_xlocks_done on " << **p << " " << *object << dendl;
-    (*p)->set_xlock_done();
+    dout(10) << "set_xlocks_done on " << *p.lock << " " << *obj << dendl;
+    p.lock->set_xlock_done();
   }
 }
 
-void Locker::_drop_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
-{
-  while (!mut->rdlocks.empty()) {
-    bool ni = false;
-    MDSCacheObject *p = (*mut->rdlocks.begin())->get_parent();
-    rdlock_finish(*mut->rdlocks.begin(), mut, &ni);
-    if (ni)
-      pneed_issue->insert(static_cast<CInode*>(p));
-  }
-}
-
-void Locker::_drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
+void Locker::_drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue,
+                        bool drop_rdlocks)
 {
   set<mds_rank_t> slaves;
 
-  while (!mut->xlocks.empty()) {
-    SimpleLock *lock = *mut->xlocks.begin();
-    MDSCacheObject *p = lock->get_parent();
-    if (!p->is_auth()) {
-      assert(lock->get_sm()->can_remote_xlock);
-      slaves.insert(p->authority().first);
-      lock->put_xlock();
-      mut->locks.erase(lock);
-      mut->xlocks.erase(lock);
-      continue;
+  for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
+    SimpleLock *lock = it->lock;
+    MDSCacheObject *obj = lock->get_parent();
+
+    if (it->is_xlock()) {
+      if (obj->is_auth()) {
+       bool ni = false;
+       xlock_finish(it++, mut, &ni);
+       if (ni)
+         pneed_issue->insert(static_cast<CInode*>(obj));
+      } else {
+       ceph_assert(lock->get_sm()->can_remote_xlock);
+       slaves.insert(obj->authority().first);
+       lock->put_xlock();
+       mut->locks.erase(it++);
+      }
+    } else if (it->is_wrlock() || it->is_remote_wrlock()) {
+      if (it->is_remote_wrlock()) {
+       slaves.insert(it->wrlock_target);
+       it->clear_remote_wrlock();
+      }
+      if (it->is_wrlock()) {
+       bool ni = false;
+       wrlock_finish(it++, mut, &ni);
+       if (ni)
+         pneed_issue->insert(static_cast<CInode*>(obj));
+      } else {
+       mut->locks.erase(it++);
+      }
+    } else if (drop_rdlocks && it->is_rdlock()) {
+      bool ni = false;
+      rdlock_finish(it++, mut, &ni);
+      if (ni)
+       pneed_issue->insert(static_cast<CInode*>(obj));
+    } else {
+      ++it;
     }
-    bool ni = false;
-    xlock_finish(lock, mut, &ni);
-    if (ni)
-      pneed_issue->insert(static_cast<CInode*>(p));
-  }
-
-  while (!mut->remote_wrlocks.empty()) {
-    map<SimpleLock*,mds_rank_t>::iterator p = mut->remote_wrlocks.begin();
-    slaves.insert(p->second);
-    if (mut->wrlocks.count(p->first) == 0)
-      mut->locks.erase(p->first);
-    mut->remote_wrlocks.erase(p);
-  }
-
-  while (!mut->wrlocks.empty()) {
-    bool ni = false;
-    MDSCacheObject *p = (*mut->wrlocks.begin())->get_parent();
-    wrlock_finish(*mut->wrlocks.begin(), mut, &ni);
-    if (ni)
-      pneed_issue->insert(static_cast<CInode*>(p));
   }
 
   for (set<mds_rank_t>::iterator p = slaves.begin(); p != slaves.end(); ++p) {
     if (!mds->is_cluster_degraded() ||
        mds->mdsmap->get_state(*p) >= MDSMap::STATE_REJOIN) {
       dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p << dendl;
-      MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
-                                                       MMDSSlaveRequest::OP_DROPLOCKS);
+      auto slavereq = MMDSSlaveRequest::create(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_DROPLOCKS);
       mds->send_message_mds(slavereq, *p);
     }
   }
@@ -752,7 +714,7 @@ void Locker::_drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
 void Locker::cancel_locking(MutationImpl *mut, set<CInode*> *pneed_issue)
 {
   SimpleLock *lock = mut->locking;
-  assert(lock);
+  ceph_assert(lock);
   dout(10) << "cancel_locking " << *lock << " on " << *mut << dendl;
 
   if (lock->get_parent()->is_auth()) {
@@ -778,8 +740,7 @@ void Locker::drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue)
 
   if (mut->locking)
     cancel_locking(mut, pneed_issue);
-  _drop_non_rdlocks(mut, pneed_issue);
-  _drop_rdlocks(mut, pneed_issue);
+  _drop_locks(mut, pneed_issue, true);
 
   if (pneed_issue == &my_need_issue)
     issue_caps_set(*pneed_issue);
@@ -792,7 +753,7 @@ void Locker::drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
   if (!pneed_issue)
     pneed_issue = &my_need_issue;
 
-  _drop_non_rdlocks(mut, pneed_issue);
+  _drop_locks(mut, pneed_issue, false);
 
   if (pneed_issue == &my_need_issue)
     issue_caps_set(*pneed_issue);
@@ -802,15 +763,20 @@ void Locker::drop_rdlocks_for_early_reply(MutationImpl *mut)
 {
   set<CInode*> need_issue;
 
-  for (auto p = mut->rdlocks.begin(); p != mut->rdlocks.end(); ) {
-    SimpleLock *lock = *p;
-    ++p;
+  for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
+    if (!it->is_rdlock()) {
+      ++it;
+      continue;
+    }
+    SimpleLock *lock = it->lock;
     // make later mksnap/setlayout (at other mds) wait for this unsafe request
     if (lock->get_type() == CEPH_LOCK_ISNAP ||
-       lock->get_type() == CEPH_LOCK_IPOLICY)
+       lock->get_type() == CEPH_LOCK_IPOLICY) {
+      ++it;
       continue;
+    }
     bool ni = false;
-    rdlock_finish(lock, mut, &ni);
+    rdlock_finish(it++, mut, &ni);
     if (ni)
       need_issue.insert(static_cast<CInode*>(lock->get_parent()));
   }
@@ -823,13 +789,13 @@ void Locker::drop_locks_for_fragment_unfreeze(MutationImpl *mut)
   set<CInode*> need_issue;
 
   for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
-    SimpleLock *lock = *it;
-    ++it;
+    SimpleLock *lock = it->lock;
     if (lock->get_type() == CEPH_LOCK_IDFT) {
+      ++it;
       continue;
     }
     bool ni = false;
-    wrlock_finish(lock, mut, &ni);
+    wrlock_finish(it++, mut, &ni);
     if (ni)
       need_issue.insert(static_cast<CInode*>(lock->get_parent()));
   }
@@ -838,10 +804,10 @@ void Locker::drop_locks_for_fragment_unfreeze(MutationImpl *mut)
 
 // generics
 
-void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<MDSInternalContextBase*> *pfinishers)
+void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, MDSContext::vec *pfinishers)
 {
   dout(10) << "eval_gather " << *lock << " on " << *lock->get_parent() << dendl;
-  assert(!lock->is_stable());
+  ceph_assert(!lock->is_stable());
 
   int next = lock->get_next_state();
 
@@ -853,7 +819,7 @@ void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<M
   bool need_issue = false;
 
   int loner_issued = 0, other_issued = 0, xlocker_issued = 0;
-  assert(!caps || in != NULL);
+  ceph_assert(!caps || in != NULL);
   if (caps && in->is_head()) {
     in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued,
                        lock->get_cap_shift(), lock->get_cap_mask());
@@ -890,7 +856,7 @@ void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<M
            << " on " << *lock->get_parent() << dendl;
 
     if (lock->get_sm() == &sm_filelock) {
-      assert(in);
+      ceph_assert(in);
       if (in->state_test(CInode::STATE_RECOVERING)) {
        dout(7) << "eval_gather finished gather, but still recovering" << dendl;
        return;
@@ -917,13 +883,12 @@ void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<M
          mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
        switch (lock->get_state()) {
        case LOCK_SYNC_LOCK:
-         mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid()),
-                               auth);
+         mds->send_message_mds(MLock::create(lock, LOCK_AC_LOCKACK, mds->get_nodeid()), auth);
          break;
 
        case LOCK_MIX_SYNC:
          {
-           MLock *reply = new MLock(lock, LOCK_AC_SYNCACK, mds->get_nodeid());
+           auto reply = MLock::create(lock, LOCK_AC_SYNCACK, mds->get_nodeid());
            lock->encode_locked_state(reply->get_data());
            mds->send_message_mds(reply, auth);
            next = LOCK_MIX_SYNC2;
@@ -941,7 +906,7 @@ void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<M
          
        case LOCK_SYNC_MIX:
          { 
-           MLock *reply = new MLock(lock, LOCK_AC_MIXACK, mds->get_nodeid());
+           auto reply = MLock::create(lock, LOCK_AC_MIXACK, mds->get_nodeid());
            mds->send_message_mds(reply, auth);
            next = LOCK_SYNC_MIX2;
          }
@@ -951,7 +916,7 @@ void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<M
          {
            bufferlist data;
            lock->encode_locked_state(data);
-           mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), auth);
+           mds->send_message_mds(MLock::create(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), auth);
            (static_cast<ScatterLock *>(lock))->start_flush();
            // we'll get an AC_LOCKFLUSHED to complete
          }
@@ -1062,7 +1027,7 @@ void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<M
 bool Locker::eval(CInode *in, int mask, bool caps_imported)
 {
   bool need_issue = caps_imported;
-  list<MDSInternalContextBase*> finishers;
+  MDSContext::vec finishers;
   
   dout(10) << "eval " << mask << " " << *in << dendl;
 
@@ -1102,7 +1067,7 @@ bool Locker::eval(CInode *in, int mask, bool caps_imported)
       if (in->get_wanted_loner() >= 0) {
        dout(10) << "eval end set loner to client." << in->get_loner() << dendl;
        bool ok = in->try_set_loner();
-       assert(ok);
+       ceph_assert(ok);
        mask = -1;
        goto retry;
       }
@@ -1125,7 +1090,7 @@ public:
   C_Locker_Eval(Locker *l, MDSCacheObject *pp, int m) : LockerContext(l), p(pp), mask(m) {
     // We are used as an MDSCacheObject waiter, so should
     // only be invoked by someone already holding the big lock.
-    assert(locker->mds->mds_lock.is_locked_by_me());
+    ceph_assert(locker->mds->mds_lock.is_locked_by_me());
     p->get(MDSCacheObject::PIN_PTRWAITER);    
   }
   void finish(int r) override {
@@ -1150,7 +1115,7 @@ void Locker::try_eval(MDSCacheObject *p, int mask)
   }
 
   if (mask & CEPH_LOCK_DN) {
-    assert(mask == CEPH_LOCK_DN);
+    ceph_assert(mask == CEPH_LOCK_DN);
     bool need_issue = false;  // ignore this, no caps on dentries
     CDentry *dn = static_cast<CDentry *>(p);
     eval_any(&dn->lock, &need_issue);
@@ -1211,7 +1176,9 @@ void Locker::try_eval(SimpleLock *lock, bool *pneed_issue)
     }
   }
 
-  if (lock->get_type() != CEPH_LOCK_DN && p->is_freezing()) {
+  if (lock->get_type() != CEPH_LOCK_DN &&
+      lock->get_type() != CEPH_LOCK_ISNAP &&
+      p->is_freezing()) {
     dout(7) << "try_eval " << *lock << " freezing, waiting on " << *p << dendl;
     p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
     return;
@@ -1223,7 +1190,7 @@ void Locker::try_eval(SimpleLock *lock, bool *pneed_issue)
 void Locker::eval_cap_gather(CInode *in, set<CInode*> *issue_set)
 {
   bool need_issue = false;
-  list<MDSInternalContextBase*> finishers;
+  MDSContext::vec finishers;
 
   // kick locks now
   if (!in->filelock.is_stable())
@@ -1248,7 +1215,7 @@ void Locker::eval_cap_gather(CInode *in, set<CInode*> *issue_set)
 void Locker::eval_scatter_gathers(CInode *in)
 {
   bool need_issue = false;
-  list<MDSInternalContextBase*> finishers;
+  MDSContext::vec finishers;
 
   dout(10) << "eval_scatter_gathers " << *in << dendl;
 
@@ -1312,7 +1279,7 @@ bool Locker::_rdlock_kick(SimpleLock *lock, bool as_anon)
          mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
        dout(10) << "requesting rdlock from auth on "
                 << *lock << " on " << *lock->get_parent() << dendl;
-       mds->send_message_mds(new MLock(lock, LOCK_AC_REQRDLOCK, mds->get_nodeid()), auth);
+       mds->send_message_mds(MLock::create(lock, LOCK_AC_REQRDLOCK, mds->get_nodeid()), auth);
       }
       return false;
     }
@@ -1327,7 +1294,7 @@ bool Locker::_rdlock_kick(SimpleLock *lock, bool as_anon)
   return false;
 }
 
-bool Locker::rdlock_try(SimpleLock *lock, client_t client, MDSInternalContextBase *con)
+bool Locker::rdlock_try(SimpleLock *lock, client_t client, MDSContext *con)
 {
   dout(7) << "rdlock_try on " << *lock << " on " << *lock->get_parent() << dendl;  
 
@@ -1374,8 +1341,7 @@ bool Locker::rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon)
     // can read?  grab ref.
     if (lock->can_rdlock(client)) {
       lock->get_rdlock();
-      mut->rdlocks.insert(lock);
-      mut->locks.insert(lock);
+      mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::RDLOCK);
       return true;
     }
 
@@ -1384,7 +1350,7 @@ bool Locker::rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon)
        lock->get_state() == LOCK_SNAP_SYNC) {
       // okay, we actually need to kick the head's lock to get ourselves synced up.
       CInode *head = mdcache->get_inode(in->ino());
-      assert(head);
+      ceph_assert(head);
       SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
       if (hlock->get_state() == LOCK_SYNC)
        hlock = head->get_lock(lock->get_type());
@@ -1420,14 +1386,14 @@ void Locker::nudge_log(SimpleLock *lock)
     mds->mdlog->flush();
 }
 
-void Locker::rdlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
+void Locker::rdlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
 {
+  ceph_assert(it->is_rdlock());
+  SimpleLock *lock = it->lock;
   // drop ref
   lock->put_rdlock();
-  if (mut) {
-    mut->rdlocks.erase(lock);
-    mut->locks.erase(lock);
-  }
+  if (mut)
+    mut->locks.erase(it);
 
   dout(7) << "rdlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
   
@@ -1441,35 +1407,27 @@ void Locker::rdlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issu
 }
 
 
-bool Locker::can_rdlock_set(set<SimpleLock*>& locks)
+bool Locker::can_rdlock_set(MutationImpl::LockOpVec& lov)
 {
-  dout(10) << "can_rdlock_set " << locks << dendl;
-  for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
-    if (!(*p)->can_rdlock(-1)) {
-      dout(10) << "can_rdlock_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
+  dout(10) << "can_rdlock_set " << dendl;
+  for (const auto& p : lov) {
+    ceph_assert(p.is_rdlock());
+    if (!p.lock->can_rdlock(-1)) {
+      dout(10) << "can_rdlock_set can't rdlock " << *p << " on " << *p.lock->get_parent() << dendl;
       return false;
     }
+  }
   return true;
 }
 
-bool Locker::rdlock_try_set(set<SimpleLock*>& locks)
-{
-  dout(10) << "rdlock_try_set " << locks << dendl;
-  for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
-    if (!rdlock_try(*p, -1, NULL)) {
-      dout(10) << "rdlock_try_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
-      return false;
-    }
-  return true;
-}
 
-void Locker::rdlock_take_set(set<SimpleLock*>& locks, MutationRef& mut)
+void Locker::rdlock_take_set(MutationImpl::LockOpVec& lov, MutationRef& mut)
 {
-  dout(10) << "rdlock_take_set " << locks << dendl;
-  for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p) {
-    (*p)->get_rdlock();
-    mut->rdlocks.insert(*p);
-    mut->locks.insert(*p);
+  dout(10) << "rdlock_take_set "  << dendl;
+  for (const auto& p : lov) {
+    ceph_assert(p.is_rdlock());
+    p.lock->get_rdlock();
+    mut->locks.emplace(p.lock, MutationImpl::LockOp::RDLOCK);
   }
 }
 
@@ -1485,8 +1443,7 @@ void Locker::wrlock_force(SimpleLock *lock, MutationRef& mut)
   dout(7) << "wrlock_force  on " << *lock
          << " on " << *lock->get_parent() << dendl;  
   lock->get_wrlock(true);
-  mut->wrlocks.insert(lock);
-  mut->locks.insert(lock);
+  mut->locks.emplace(lock, MutationImpl::LockOp::WRLOCK);
 }
 
 bool Locker::wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait)
@@ -1508,8 +1465,8 @@ bool Locker::wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait)
     if (lock->can_wrlock(client) &&
        (!want_scatter || lock->get_state() == LOCK_MIX)) {
       lock->get_wrlock();
-      mut->wrlocks.insert(lock);
-      mut->locks.insert(lock);
+      auto it = mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::WRLOCK);
+      it->flags |= MutationImpl::LockOp::WRLOCK; // may already remote_wrlocked
       return true;
     }
 
@@ -1544,7 +1501,7 @@ bool Locker::wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait)
          mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
        dout(10) << "requesting scatter from auth on "
                 << *lock << " on " << *lock->get_parent() << dendl;
-       mds->send_message_mds(new MLock(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth);
+       mds->send_message_mds(MLock::create(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth);
       }
       break;
     }
@@ -1559,19 +1516,22 @@ bool Locker::wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait)
   return false;
 }
 
-void Locker::wrlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
+void Locker::wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
 {
+  ceph_assert(it->is_wrlock());
+  SimpleLock* lock = it->lock;
+
   if (lock->get_type() == CEPH_LOCK_IVERSION ||
       lock->get_type() == CEPH_LOCK_DVERSION)
-    return local_wrlock_finish(static_cast<LocalLock*>(lock), mut);
+    return local_wrlock_finish(it, mut);
 
   dout(7) << "wrlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
   lock->put_wrlock();
-  if (mut) {
-    mut->wrlocks.erase(lock);
-    if (mut->remote_wrlocks.count(lock) == 0)
-      mut->locks.erase(lock);
-  }
+
+  if (it->is_remote_wrlock())
+    it->clear_wrlock();
+  else
+    mut->locks.erase(it);
 
   if (!lock->is_wrlocked()) {
     if (!lock->is_stable())
@@ -1600,30 +1560,31 @@ void Locker::remote_wrlock_start(SimpleLock *lock, mds_rank_t target, MDRequestR
   // send lock request
   mut->start_locking(lock, target);
   mut->more()->slaves.insert(target);
-  MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
-                                            MMDSSlaveRequest::OP_WRLOCK);
+  auto r = MMDSSlaveRequest::create(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_WRLOCK);
   r->set_lock_type(lock->get_type());
   lock->get_parent()->set_object_info(r->get_object_info());
   mds->send_message_mds(r, target);
 
-  assert(mut->more()->waiting_on_slave.count(target) == 0);
+  ceph_assert(mut->more()->waiting_on_slave.count(target) == 0);
   mut->more()->waiting_on_slave.insert(target);
 }
 
-void Locker::remote_wrlock_finish(SimpleLock *lock, mds_rank_t target,
-                                  MutationImpl *mut)
+void Locker::remote_wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
 {
-  // drop ref
-  mut->remote_wrlocks.erase(lock);
-  if (mut->wrlocks.count(lock) == 0)
-    mut->locks.erase(lock);
+  ceph_assert(it->is_remote_wrlock());
+  SimpleLock *lock = it->lock;
+  mds_rank_t target = it->wrlock_target;
+
+  if (it->is_wrlock())
+    it->clear_remote_wrlock();
+  else
+    mut->locks.erase(it);
   
   dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
          << " " << *lock->get_parent()  << dendl;
   if (!mds->is_cluster_degraded() ||
       mds->mdsmap->get_state(target) >= MDSMap::STATE_REJOIN) {
-    MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
-                                                     MMDSSlaveRequest::OP_UNWRLOCK);
+    auto slavereq = MMDSSlaveRequest::create(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_UNWRLOCK);
     slavereq->set_lock_type(lock->get_type());
     lock->get_parent()->set_object_info(slavereq->get_object_info());
     mds->send_message_mds(slavereq, target);
@@ -1656,8 +1617,7 @@ bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
            in && in->issued_caps_need_gather(lock))) { // xlocker does not hold shared cap
        lock->set_state(LOCK_XLOCK);
        lock->get_xlock(mut, client);
-       mut->xlocks.insert(lock);
-       mut->locks.insert(lock);
+       mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::XLOCK);
        mut->finish_locking(lock);
        return true;
       }
@@ -1685,8 +1645,8 @@ bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
     return false;
   } else {
     // replica
-    assert(lock->get_sm()->can_remote_xlock);
-    assert(!mut->slave_request);
+    ceph_assert(lock->get_sm()->can_remote_xlock);
+    ceph_assert(!mut->slave_request);
     
     // wait for single auth
     if (lock->get_parent()->is_ambiguous_auth()) {
@@ -1708,13 +1668,12 @@ bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
     // send lock request
     mut->more()->slaves.insert(auth);
     mut->start_locking(lock, auth);
-    MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
-                                              MMDSSlaveRequest::OP_XLOCK);
+    auto r = MMDSSlaveRequest::create(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_XLOCK);
     r->set_lock_type(lock->get_type());
     lock->get_parent()->set_object_info(r->get_object_info());
     mds->send_message_mds(r, auth);
 
-    assert(mut->more()->waiting_on_slave.count(auth) == 0);
+    ceph_assert(mut->more()->waiting_on_slave.count(auth) == 0);
     mut->more()->waiting_on_slave.insert(auth);
 
     return false;
@@ -1723,12 +1682,13 @@ bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
 
 void Locker::_finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue)
 {
-  assert(!lock->is_stable());
-  if (lock->get_num_rdlocks() == 0 &&
+  ceph_assert(!lock->is_stable());
+  if (lock->get_type() != CEPH_LOCK_DN &&
+      lock->get_type() != CEPH_LOCK_ISNAP &&
+      lock->get_num_rdlocks() == 0 &&
       lock->get_num_wrlocks() == 0 &&
       !lock->is_leased() &&
-      lock->get_state() != LOCK_XLOCKSNAP &&
-      lock->get_type() != CEPH_LOCK_DN) {
+      lock->get_state() != LOCK_XLOCKSNAP) {
     CInode *in = static_cast<CInode*>(lock->get_parent());
     client_t loner = in->get_target_loner();
     if (loner >= 0 && (xlocker < 0 || xlocker == loner)) {
@@ -1747,11 +1707,14 @@ void Locker::_finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue
   eval_gather(lock, lock->get_state() != LOCK_XLOCKSNAP, pneed_issue);
 }
 
-void Locker::xlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
+void Locker::xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
 {
+  ceph_assert(it->is_xlock());
+  SimpleLock *lock = it->lock;
+
   if (lock->get_type() == CEPH_LOCK_IVERSION ||
       lock->get_type() == CEPH_LOCK_DVERSION)
-    return local_xlock_finish(static_cast<LocalLock*>(lock), mut);
+    return local_xlock_finish(it, mut);
 
   dout(10) << "xlock_finish on " << *lock << " " << *lock->get_parent() << dendl;
 
@@ -1759,23 +1722,21 @@ void Locker::xlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue
 
   // drop ref
   lock->put_xlock();
-  assert(mut);
-  mut->xlocks.erase(lock);
-  mut->locks.erase(lock);
+  ceph_assert(mut);
+  mut->locks.erase(it);
   
   bool do_issue = false;
 
   // remote xlock?
   if (!lock->get_parent()->is_auth()) {
-    assert(lock->get_sm()->can_remote_xlock);
+    ceph_assert(lock->get_sm()->can_remote_xlock);
 
     // tell auth
     dout(7) << "xlock_finish releasing remote xlock on " << *lock->get_parent()  << dendl;
     mds_rank_t auth = lock->get_parent()->authority().first;
     if (!mds->is_cluster_degraded() ||
        mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
-      MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
-                                                       MMDSSlaveRequest::OP_UNXLOCK);
+      auto slavereq = MMDSSlaveRequest::create(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_UNXLOCK);
       slavereq->set_lock_type(lock->get_type());
       lock->get_parent()->set_object_info(slavereq->get_object_info());
       mds->send_message_mds(slavereq, auth);
@@ -1802,16 +1763,17 @@ void Locker::xlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue
   }
 }
 
-void Locker::xlock_export(SimpleLock *lock, MutationImpl *mut)
+void Locker::xlock_export(const MutationImpl::lock_iterator& it, MutationImpl *mut)
 {
+  ceph_assert(it->is_xlock());
+  SimpleLock *lock = it->lock;
   dout(10) << "xlock_export on " << *lock << " " << *lock->get_parent() << dendl;
 
   lock->put_xlock();
-  mut->xlocks.erase(lock);
-  mut->locks.erase(lock);
+  mut->locks.erase(it);
 
   MDSCacheObject *p = lock->get_parent();
-  assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH));  // we are exporting this (inode)
+  ceph_assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH));  // we are exporting this (inode)
 
   if (!lock->is_stable())
     lock->get_parent()->auth_unpin(lock);
@@ -1838,87 +1800,94 @@ version_t Locker::issue_file_data_version(CInode *in)
 class C_Locker_FileUpdate_finish : public LockerLogContext {
   CInode *in;
   MutationRef mut;
-  bool share_max;
-  bool need_issue;
+  unsigned flags;
   client_t client;
-  MClientCaps *ack;
+  MClientCaps::ref ack;
 public:
-  C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m,
-                               bool sm=false, bool ni=false, client_t c=-1,
-                               MClientCaps *ac = 0)
-    : LockerLogContext(l), in(i), mut(m), share_max(sm), need_issue(ni),
-      client(c), ack(ac) {
+  C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m, unsigned f,
+                             const MClientCaps::ref &ack, client_t c=-1)
+    : LockerLogContext(l), in(i), mut(m), flags(f), client(c), ack(ack) {
     in->get(CInode::PIN_PTRWAITER);
   }
   void finish(int r) override {
-    locker->file_update_finish(in, mut, share_max, need_issue, client, ack);
+    locker->file_update_finish(in, mut, flags, client, ack);
     in->put(CInode::PIN_PTRWAITER);
   }
 };
 
-void Locker::file_update_finish(CInode *in, MutationRef& mut, bool share_max, bool issue_client_cap,
-                               client_t client, MClientCaps *ack)
+enum {
+  UPDATE_SHAREMAX = 1,
+  UPDATE_NEEDSISSUE = 2,
+  UPDATE_SNAPFLUSH = 4,
+};
+
+void Locker::file_update_finish(CInode *in, MutationRef& mut, unsigned flags,
+                               client_t client, const MClientCaps::ref &ack)
 {
   dout(10) << "file_update_finish on " << *in << dendl;
   in->pop_and_dirty_projected_inode(mut->ls);
 
   mut->apply();
-  
+
   if (ack) {
     Session *session = mds->get_session(client);
     if (session) {
       // "oldest flush tid" > 0 means client uses unique TID for each flush
       if (ack->get_oldest_flush_tid() > 0)
-       session->add_completed_flush(ack->get_client_tid());
+        session->add_completed_flush(ack->get_client_tid());
       mds->send_message_client_counted(ack, session);
     } else {
       dout(10) << " no session for client." << client << " " << *ack << dendl;
-      ack->put();
     }
   }
 
   set<CInode*> need_issue;
   drop_locks(mut.get(), &need_issue);
 
-  if (!in->is_head() && !in->client_snap_caps.empty()) {
+  if (in->is_head()) {
+    if ((flags & UPDATE_NEEDSISSUE) && need_issue.count(in) == 0) {
+      Capability *cap = in->get_client_cap(client);
+      if (cap && (cap->wanted() & ~cap->pending()))
+       issue_caps(in, cap);
+    }
+
+    if ((flags & UPDATE_SHAREMAX) && in->is_auth() &&
+       (in->filelock.gcaps_allowed(CAP_LONER) & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)))
+      share_inode_max_size(in);
+
+  } else if ((flags & UPDATE_SNAPFLUSH) && !in->client_snap_caps.empty()) {
     dout(10) << " client_snap_caps " << in->client_snap_caps << dendl;
     // check for snap writeback completion
     bool gather = false;
     auto p = in->client_snap_caps.begin();
     while (p != in->client_snap_caps.end()) {
-      SimpleLock *lock = in->get_lock(p->first);
-      assert(lock);
-      dout(10) << " completing client_snap_caps for " << ccap_string(p->first)
-              << " lock " << *lock << " on " << *in << dendl;
-      lock->put_wrlock();
-
-      p->second.erase(client);
-      if (p->second.empty()) {
-       gather = true;
-       in->client_snap_caps.erase(p++);
-      } else
-       ++p;
+      auto q = p->second.find(client);
+      if (q != p->second.end()) {
+       SimpleLock *lock = in->get_lock(p->first);
+       ceph_assert(lock);
+       dout(10) << " completing client_snap_caps for " << ccap_string(p->first)
+                << " lock " << *lock << " on " << *in << dendl;
+       lock->put_wrlock();
+
+       p->second.erase(q);
+       if (p->second.empty()) {
+         gather = true;
+         in->client_snap_caps.erase(p++);
+       } else
+         ++p;
+      }
     }
     if (gather) {
-      if (in->client_snap_caps.empty())
+      if (in->client_snap_caps.empty()) {
        in->item_open_file.remove_myself();
+       in->item_caps.remove_myself();
+      }
       eval_cap_gather(in, &need_issue);
     }
-  } else {
-    if (issue_client_cap && need_issue.count(in) == 0) {
-      Capability *cap = in->get_client_cap(client);
-      if (cap && (cap->wanted() & ~cap->pending()))
-       issue_caps(in, cap);
-    }
-  
-    if (share_max && in->is_auth() &&
-       (in->filelock.gcaps_allowed(CAP_LONER) & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)))
-      share_inode_max_size(in);
   }
   issue_caps_set(need_issue);
 
-  utime_t now = ceph_clock_now();
-  mds->balancer->hit_inode(now, in, META_POP_IWR);
+  mds->balancer->hit_inode(in, META_POP_IWR);
 
   // auth unpin after issuing caps
   mut->cleanup();
@@ -1939,8 +1908,8 @@ Capability* Locker::issue_new_caps(CInode *in,
 
 
   // my needs
-  assert(session->info.inst.name.is_client());
-  client_t my_client = session->info.inst.name.num();
+  ceph_assert(session->info.inst.name.is_client());
+  client_t my_client = session->get_client();
   int my_want = ceph_caps_for_mode(mode);
 
   // register a capability
@@ -2012,19 +1981,19 @@ bool Locker::issue_caps(CInode *in, Capability *only_cap)
            << " on " << *in << dendl;
   }
 
-  assert(in->is_head());
+  ceph_assert(in->is_head());
 
   // count conflicts with
   int nissued = 0;        
 
   // client caps
-  map<client_t, Capability*>::iterator it;
+  map<client_t, Capability>::iterator it;
   if (only_cap)
     it = in->client_caps.find(only_cap->get_client());
   else
     it = in->client_caps.begin();
   for (; it != in->client_caps.end(); ++it) {
-    Capability *cap = it->second;
+    Capability *cap = &it->second;
     if (cap->is_stale())
       continue;
 
@@ -2038,10 +2007,10 @@ bool Locker::issue_caps(CInode *in, Capability *only_cap)
     // add in any xlocker-only caps (for locks this client is the xlocker for)
     allowed |= xlocker_allowed & in->get_xlocker_mask(it->first);
 
-    Session *session = mds->get_session(it->first);
-    if (in->inode.inline_data.version != CEPH_INLINE_NONE &&
-       !(session && session->connection &&
-         session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)))
+    if ((in->inode.inline_data.version != CEPH_INLINE_NONE &&
+        cap->is_noinline()) ||
+       (!in->inode.layout.pool_ns.empty() &&
+        cap->is_nopoolns()))
       allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);
 
     int pending = cap->pending();
@@ -2100,15 +2069,16 @@ bool Locker::issue_caps(CInode *in, Capability *only_cap)
                cap->reset_num_revoke_warnings();
        }
 
-       MClientCaps *m = new MClientCaps(op, in->ino(),
-                                        in->find_snaprealm()->inode->ino(),
-                                        cap->get_cap_id(), cap->get_last_seq(),
-                                        after, wanted, 0,
-                                        cap->get_mseq(),
-                                         mds->get_osd_epoch_barrier());
+       auto m = MClientCaps::create(op, in->ino(),
+                                           in->find_snaprealm()->inode->ino(),
+                                           cap->get_cap_id(),
+                                           cap->get_last_seq(),
+                                           after, wanted, 0,
+                                           cap->get_mseq(),
+                                           mds->get_osd_epoch_barrier());
        in->encode_cap_message(m, cap);
 
-       mds->send_message_client_counted(m, it->first);
+       mds->send_message_client_counted(m, cap->get_session());
       }
     }
 
@@ -2123,19 +2093,17 @@ void Locker::issue_truncate(CInode *in)
 {
   dout(7) << "issue_truncate on " << *in << dendl;
   
-  for (map<client_t, Capability*>::iterator it = in->client_caps.begin();
-       it != in->client_caps.end();
-       ++it) {
-    Capability *cap = it->second;
-    MClientCaps *m = new MClientCaps(CEPH_CAP_OP_TRUNC,
-                                    in->ino(),
-                                    in->find_snaprealm()->inode->ino(),
-                                    cap->get_cap_id(), cap->get_last_seq(),
-                                    cap->pending(), cap->wanted(), 0,
-                                    cap->get_mseq(),
-                                     mds->get_osd_epoch_barrier());
+  for (auto &p : in->client_caps) {
+    Capability *cap = &p.second;
+    auto m = MClientCaps::create(CEPH_CAP_OP_TRUNC,
+                                       in->ino(),
+                                       in->find_snaprealm()->inode->ino(),
+                                       cap->get_cap_id(), cap->get_last_seq(),
+                                       cap->pending(), cap->wanted(), 0,
+                                       cap->get_mseq(),
+                                       mds->get_osd_epoch_barrier());
     in->encode_cap_message(m, cap);                         
-    mds->send_message_client_counted(m, it->first);
+    mds->send_message_client_counted(m, p.first);
   }
 
   // should we increase max_size?
@@ -2203,10 +2171,11 @@ void Locker::resume_stale_caps(Session *session)
 {
   dout(10) << "resume_stale_caps for " << session->info.inst.name << dendl;
 
+  bool lazy = session->info.has_feature(CEPHFS_FEATURE_LAZY_CAP_WANTED);
   for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ) {
     Capability *cap = *p;
     ++p;
-    if (!cap->is_notable())
+    if (lazy && !cap->is_notable())
       break; // see revoke_stale_caps()
 
     CInode *in = cap->get_inode();
@@ -2254,7 +2223,7 @@ public:
 
 void Locker::request_inode_file_caps(CInode *in)
 {
-  assert(!in->is_auth());
+  ceph_assert(!in->is_auth());
 
   int wanted = in->get_caps_wanted() & in->get_caps_allowed_ever() & ~CEPH_CAP_PIN;
   if (wanted != in->replica_caps_wanted) {
@@ -2280,13 +2249,11 @@ void Locker::request_inode_file_caps(CInode *in)
 
     if (!mds->is_cluster_degraded() ||
        mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
-      mds->send_message_mds(new MInodeFileCaps(in->ino(), in->replica_caps_wanted),
-                           auth);
+      mds->send_message_mds(MInodeFileCaps::create(in->ino(), in->replica_caps_wanted), auth);
   }
 }
 
-/* This function DOES put the passed message before returning */
-void Locker::handle_inode_file_caps(MInodeFileCaps *m)
+void Locker::handle_inode_file_caps(const MInodeFileCaps::const_ref &m)
 {
   // nobody should be talking to us during recovery.
   if (mds->get_state() < MDSMap::STATE_CLIENTREPLAY) {
@@ -2294,25 +2261,21 @@ void Locker::handle_inode_file_caps(MInodeFileCaps *m)
       mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
       return;
     }
-    assert(!"got unexpected message during recovery");
+    ceph_abort_msg("got unexpected message during recovery");
   }
 
   // ok
   CInode *in = mdcache->get_inode(m->get_ino());
   mds_rank_t from = mds_rank_t(m->get_source().num());
 
-  assert(in);
-  assert(in->is_auth());
+  ceph_assert(in);
+  ceph_assert(in->is_auth());
 
   dout(7) << "handle_inode_file_caps replica mds." << from << " wants caps " << ccap_string(m->get_caps()) << " on " << *in << dendl;
 
-  if (m->get_caps())
-    in->mds_caps_wanted[from] = m->get_caps();
-  else
-    in->mds_caps_wanted.erase(from);
+  in->set_mds_caps_wanted(from, m->get_caps());
 
   try_eval(in, CEPH_CAP_LOCKS);
-  m->put();
 }
 
 
@@ -2340,12 +2303,12 @@ public:
 uint64_t Locker::calc_new_max_size(CInode::mempool_inode *pi, uint64_t size)
 {
   uint64_t new_max = (size + 1) << 1;
-  uint64_t max_inc = g_conf->mds_client_writeable_range_max_inc_objs;
+  uint64_t max_inc = g_conf()->mds_client_writeable_range_max_inc_objs;
   if (max_inc > 0) {
     max_inc *= pi->layout.object_size;
-    new_max = MIN(new_max, size + max_inc);
+    new_max = std::min(new_max, size + max_inc);
   }
-  return ROUND_UP_TO(new_max, pi->get_layout_size_increment());
+  return round_up_to(new_max, pi->get_layout_size_increment());
 }
 
 void Locker::calc_new_client_ranges(CInode *in, uint64_t size, bool update,
@@ -2363,17 +2326,15 @@ void Locker::calc_new_client_ranges(CInode *in, uint64_t size, bool update,
 
   // increase ranges as appropriate.
   // shrink to 0 if no WR|BUFFER caps issued.
-  for (map<client_t,Capability*>::iterator p = in->client_caps.begin();
-       p != in->client_caps.end();
-       ++p) {
-    if ((p->second->issued() | p->second->wanted()) & (CEPH_CAP_ANY_FILE_WR)) {
-      client_writeable_range_t& nr = (*new_ranges)[p->first];
+  for (auto &p : in->client_caps) {
+    if ((p.second.issued() | p.second.wanted()) & CEPH_CAP_ANY_FILE_WR) {
+      client_writeable_range_t& nr = (*new_ranges)[p.first];
       nr.range.first = 0;
-      if (latest->client_ranges.count(p->first)) {
-       client_writeable_range_t& oldr = latest->client_ranges[p->first];
+      if (latest->client_ranges.count(p.first)) {
+       client_writeable_range_t& oldr = latest->client_ranges[p.first];
        if (ms > oldr.range.last)
          *max_increased = true;
-       nr.range.last = MAX(ms, oldr.range.last);
+       nr.range.last = std::max(ms, oldr.range.last);
        nr.follows = oldr.follows;
       } else {
        *max_increased = true;
@@ -2381,10 +2342,10 @@ void Locker::calc_new_client_ranges(CInode *in, uint64_t size, bool update,
        nr.follows = in->first - 1;
       }
       if (update)
-       p->second->mark_clientwriteable();
+       p.second.mark_clientwriteable();
     } else {
       if (update)
-       p->second->clear_clientwriteable();
+       p.second.clear_clientwriteable();
     }
   }
 }
@@ -2393,8 +2354,8 @@ bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
                                  uint64_t new_max_size, uint64_t new_size,
                                  utime_t new_mtime)
 {
-  assert(in->is_auth());
-  assert(in->is_file());
+  ceph_assert(in->is_auth());
+  ceph_assert(in->is_file());
 
   CInode::mempool_inode *latest = in->get_projected_inode();
   CInode::mempool_inode::client_range_map new_ranges;
@@ -2404,8 +2365,8 @@ bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
   bool max_increased = false;
 
   if (update_size) {
-    new_size = size = MAX(size, new_size);
-    new_mtime = MAX(new_mtime, latest->mtime);
+    new_size = size = std::max(size, new_size);
+    new_mtime = std::max(new_mtime, latest->mtime);
     if (latest->size == new_size && latest->mtime == new_mtime)
       update_size = false;
   }
@@ -2486,7 +2447,6 @@ bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
     eo->add_ino(in->ino());
     metablob = &eo->metablob;
     le = eo;
-    mut->ls->open_files.push_back(&in->item_open_file);
   } else {
     EUpdate *eu = new EUpdate(mds->mdlog, "check_inode_max_size");
     metablob = &eu->metablob;
@@ -2502,8 +2462,8 @@ bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
     metablob->add_dir_context(in->get_projected_parent_dn()->get_dir());
     mdcache->journal_dirty_inode(mut.get(), metablob, in);
   }
-  mds->mdlog->submit_entry(le,
-          new C_Locker_FileUpdate_finish(this, in, mut, true));
+  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
+      UPDATE_SHAREMAX, MClientCaps::ref()));
   wrlock_force(&in->filelock, mut);  // wrlock for duration of journal
   mut->auth_pin(in);
 
@@ -2523,26 +2483,28 @@ void Locker::share_inode_max_size(CInode *in, Capability *only_cap)
    * the cap later.
    */
   dout(10) << "share_inode_max_size on " << *in << dendl;
-  map<client_t, Capability*>::iterator it;
+  map<client_t, Capability>::iterator it;
   if (only_cap)
     it = in->client_caps.find(only_cap->get_client());
   else
     it = in->client_caps.begin();
   for (; it != in->client_caps.end(); ++it) {
     const client_t client = it->first;
-    Capability *cap = it->second;
+    Capability *cap = &it->second;
     if (cap->is_suppress())
       continue;
     if (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
       dout(10) << "share_inode_max_size with client." << client << dendl;
       cap->inc_last_seq();
-      MClientCaps *m = new MClientCaps(CEPH_CAP_OP_GRANT,
-                                      in->ino(),
-                                      in->find_snaprealm()->inode->ino(),
-                                      cap->get_cap_id(), cap->get_last_seq(),
-                                      cap->pending(), cap->wanted(), 0,
-                                       cap->get_mseq(),
-                                       mds->get_osd_epoch_barrier());
+      auto m = MClientCaps::create(CEPH_CAP_OP_GRANT,
+                                         in->ino(),
+                                         in->find_snaprealm()->inode->ino(),
+                                         cap->get_cap_id(),
+                                         cap->get_last_seq(),
+                                         cap->pending(),
+                                         cap->wanted(), 0,
+                                         cap->get_mseq(),
+                                         mds->get_osd_epoch_barrier());
       in->encode_cap_message(m, cap);
       mds->send_message_client_counted(m, client);
     }
@@ -2592,33 +2554,68 @@ void Locker::adjust_cap_wanted(Capability *cap, int wanted, int issue_seq)
     return;
   }
 
-  if (cap->wanted() == 0) {
-    if (cur->item_open_file.is_on_list() &&
-       !cur->is_any_caps_wanted()) {
-      dout(10) << " removing unwanted file from open file list " << *cur << dendl;
-      cur->item_open_file.remove_myself();
-    }
-  } else {
+  if (cap->wanted()) {
     if (cur->state_test(CInode::STATE_RECOVERING) &&
        (cap->wanted() & (CEPH_CAP_FILE_RD |
                          CEPH_CAP_FILE_WR))) {
       mds->mdcache->recovery_queue.prioritize(cur);
     }
 
-    if (!cur->item_open_file.is_on_list()) {
-      dout(10) << " adding to open file list " << *cur << dendl;
-      assert(cur->last == CEPH_NOSNAP);
-      LogSegment *ls = mds->mdlog->get_current_segment();
+    if (mdcache->open_file_table.should_log_open(cur)) {
+      ceph_assert(cur->last == CEPH_NOSNAP);
       EOpen *le = new EOpen(mds->mdlog);
       mds->mdlog->start_entry(le);
       le->add_clean_inode(cur);
-      ls->open_files.push_back(&cur->item_open_file);
       mds->mdlog->submit_entry(le);
     }
   }
 }
 
+void Locker::snapflush_nudge(CInode *in)
+{
+  ceph_assert(in->last != CEPH_NOSNAP);
+  if (in->client_snap_caps.empty())
+    return;
+
+  CInode *head = mdcache->get_inode(in->ino());
+  // head inode gets unpinned when snapflush starts. It might get trimmed
+  // before snapflush finishes.
+  if (!head)
+    return;
 
+  ceph_assert(head->is_auth());
+  if (head->client_need_snapflush.empty())
+    return;
+
+  SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
+  if (hlock->get_state() == LOCK_SYNC || !hlock->is_stable()) {
+    hlock = NULL;
+    for (int i = 0; i < num_cinode_locks; i++) {
+      SimpleLock *lock = head->get_lock(cinode_lock_info[i].lock);
+      if (lock->get_state() != LOCK_SYNC && lock->is_stable()) {
+       hlock = lock;
+       break;
+      }
+    }
+  }
+  if (hlock) {
+    _rdlock_kick(hlock, true);
+  } else {
+    // also, requeue, in case of unstable lock
+    need_snapflush_inodes.push_back(&in->item_caps);
+  }
+}
+
+void Locker::mark_need_snapflush_inode(CInode *in)
+{
+  ceph_assert(in->last != CEPH_NOSNAP);
+  if (!in->item_caps.is_on_list()) {
+    need_snapflush_inodes.push_back(&in->item_caps);
+    utime_t now = ceph_clock_now();
+    in->last_dirstat_prop = now;
+    dout(10) << "mark_need_snapflush_inode " << *in << " - added at " << now << dendl;
+  }
+}
 
 void Locker::_do_null_snapflush(CInode *head_in, client_t client, snapid_t last)
 {
@@ -2632,9 +2629,9 @@ void Locker::_do_null_snapflush(CInode *head_in, client_t client, snapid_t last)
     if (clients.count(client)) {
       dout(10) << " doing async NULL snapflush on " << snapid << " from client." << client << dendl;
       CInode *sin = mdcache->pick_inode_snap(head_in, snapid - 1);
-      assert(sin);
-      assert(sin->first <= snapid);
-      _do_snap_update(sin, snapid, 0, sin->first - 1, client, NULL, NULL);
+      ceph_assert(sin);
+      ceph_assert(sin->first <= snapid);
+      _do_snap_update(sin, snapid, 0, sin->first - 1, client, MClientCaps::ref(), MClientCaps::ref());
       head_in->remove_need_snapflush(sin, snapid, client);
     }
   }
@@ -2659,37 +2656,35 @@ bool Locker::should_defer_client_cap_frozen(CInode *in)
   return (in->is_freezing() && in->get_num_auth_pins() == 0) || in->is_frozen();
 }
 
-/*
- * This function DOES put the passed message before returning
- */
-void Locker::handle_client_caps(MClientCaps *m)
+void Locker::handle_client_caps(const MClientCaps::const_ref &m)
 {
   client_t client = m->get_source().num();
   snapid_t follows = m->get_snap_follows();
+  auto op = m->get_op();
+  auto dirty = m->get_dirty();
   dout(7) << "handle_client_caps "
-         << ((m->flags & CLIENT_CAPS_SYNC) ? "sync" : "async")
          << " on " << m->get_ino()
          << " tid " << m->get_client_tid() << " follows " << follows
-         << " op " << ceph_cap_op_name(m->get_op()) << dendl;
+         << " op " << ceph_cap_op_name(op)
+         << " flags 0x" << std::hex << m->flags << std::dec << dendl;
 
   Session *session = mds->get_session(m);
   if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
     if (!session) {
       dout(5) << " no session, dropping " << *m << dendl;
-      m->put();
       return;
     }
     if (session->is_closed() ||
        session->is_closing() ||
        session->is_killing()) {
       dout(7) << " session closed|closing|killing, dropping " << *m << dendl;
-      m->put();
       return;
     }
-    if (mds->is_reconnect() &&
-       m->get_dirty() && m->get_client_tid() > 0 &&
+    if ((mds->is_reconnect() || mds->get_want_state() == MDSMap::STATE_RECONNECT) &&
+       dirty && m->get_client_tid() > 0 &&
        !session->have_completed_flush(m->get_client_tid())) {
-      mdcache->set_reconnected_dirty_caps(client, m->get_ino(), m->get_dirty());
+      mdcache->set_reconnected_dirty_caps(client, m->get_ino(), dirty,
+                                         op == CEPH_CAP_OP_FLUSHSNAP);
     }
     mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
     return;
@@ -2699,25 +2694,21 @@ void Locker::handle_client_caps(MClientCaps *m)
       session->have_completed_flush(m->get_client_tid())) {
     dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid()
            << " for client." << client << dendl;
-    MClientCaps *ack;
-    if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
-      ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0,
-                           m->get_dirty(), 0, mds->get_osd_epoch_barrier());
+    MClientCaps::ref ack;
+    if (op == CEPH_CAP_OP_FLUSHSNAP) {
+      ack = MClientCaps::create(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0, dirty, 0, mds->get_osd_epoch_barrier());
     } else {
-      ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(),
-                           m->get_seq(), m->get_caps(), 0, m->get_dirty(), 0,
-                           mds->get_osd_epoch_barrier());
+      ack = MClientCaps::create(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(), m->get_seq(), m->get_caps(), 0, dirty, 0, mds->get_osd_epoch_barrier());
     }
     ack->set_snap_follows(follows);
     ack->set_client_tid(m->get_client_tid());
     mds->send_message_client_counted(ack, m->get_connection());
-    if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
-      m->put();
+    if (op == CEPH_CAP_OP_FLUSHSNAP) {
       return;
     } else {
       // fall-thru because the message may release some caps
-      m->clear_dirty();
-      m->set_op(CEPH_CAP_OP_UPDATE);
+      dirty = false;
+      op = CEPH_CAP_OP_UPDATE;
     }
   }
 
@@ -2727,11 +2718,11 @@ void Locker::handle_client_caps(MClientCaps *m)
       mds->mdlog->get_current_segment()->touched_sessions.insert(session->info.inst.name);
 
       if (session->get_num_trim_flushes_warnings() > 0 &&
-         session->get_num_completed_flushes() * 2 < g_conf->mds_max_completed_flushes)
+         session->get_num_completed_flushes() * 2 < g_conf()->mds_max_completed_flushes)
        session->reset_num_trim_flushes_warnings();
     } else {
       if (session->get_num_completed_flushes() >=
-         (g_conf->mds_max_completed_flushes << session->get_num_trim_flushes_warnings())) {
+         (g_conf()->mds_max_completed_flushes << session->get_num_trim_flushes_warnings())) {
        session->inc_num_trim_flushes_warnings();
        stringstream ss;
        ss << "client." << session->get_client() << " does not advance its oldest_flush_tid ("
@@ -2762,7 +2753,6 @@ void Locker::handle_client_caps(MClientCaps *m)
      *   - mds receives cap messages from client
      */
     dout(7) << "handle_client_caps on unknown ino " << m->get_ino() << ", dropping" << dendl;
-    m->put();
     return;
   }
 
@@ -2782,10 +2772,9 @@ void Locker::handle_client_caps(MClientCaps *m)
   cap = head_in->get_client_cap(client);
   if (!cap) {
     dout(7) << "handle_client_caps no cap for client." << client << " on " << *head_in << dendl;
-    m->put();
     return;
   }  
-  assert(cap);
+  ceph_assert(cap);
 
   // freezing|frozen?
   if (should_defer_client_cap_frozen(head_in)) {
@@ -2796,11 +2785,10 @@ void Locker::handle_client_caps(MClientCaps *m)
   if (ceph_seq_cmp(m->get_mseq(), cap->get_mseq()) < 0) {
     dout(7) << "handle_client_caps mseq " << m->get_mseq() << " < " << cap->get_mseq()
            << ", dropping" << dendl;
-    m->put();
     return;
   }
 
-  int op = m->get_op();
+  bool need_unpin = false;
 
   // flushsnap?
   if (op == CEPH_CAP_OP_FLUSHSNAP) {
@@ -2813,6 +2801,13 @@ void Locker::handle_client_caps(MClientCaps *m)
     snapid_t snap = realm->get_snap_following(follows);
     dout(10) << "  flushsnap follows " << follows << " -> snap " << snap << dendl;
 
+    auto p = head_in->client_need_snapflush.begin();
+    if (p != head_in->client_need_snapflush.end() && p->first < snap) {
+      head_in->auth_pin(this); // prevent subtree frozen
+      need_unpin = true;
+      _do_null_snapflush(head_in, client, snap);
+    }
+
     CInode *in = head_in;
     if (snap != CEPH_NOSNAP) {
       in = mdcache->pick_inode_snap(head_in, snap - 1);
@@ -2823,9 +2818,9 @@ void Locker::handle_client_caps(MClientCaps *m)
     // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
     // other cap ops.  (except possibly duplicate FLUSHSNAP requests, but worst
     // case we get a dup response, so whatever.)
-    MClientCaps *ack = 0;
-    if (m->get_dirty()) {
-      ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
+    MClientCaps::ref ack;
+    if (dirty) {
+      ack = MClientCaps::create(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0, dirty, 0, mds->get_osd_epoch_barrier());
       ack->set_snap_follows(follows);
       ack->set_client_tid(m->get_client_tid());
       ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
@@ -2840,10 +2835,8 @@ void Locker::handle_client_caps(MClientCaps *m)
       // this cap now follows a later snap (i.e. the one initiating this flush, or later)
       if (in == head_in)
        cap->client_follows = snap < CEPH_NOSNAP ? snap : realm->get_newest_seq();
-      else if (head_in->client_need_snapflush.begin()->first < snap)
-       _do_null_snapflush(head_in, client, snap);
    
-      _do_snap_update(in, snap, m->get_dirty(), follows, client, m, ack);
+      _do_snap_update(in, snap, dirty, follows, client, m, ack);
 
       if (in != head_in)
        head_in->remove_need_snapflush(in, snap, client);
@@ -2863,17 +2856,17 @@ void Locker::handle_client_caps(MClientCaps *m)
       in = mdcache->pick_inode_snap(head_in, follows);
       // intermediate snap inodes
       while (in != head_in) {
-       assert(in->last != CEPH_NOSNAP);
-       if (in->is_auth() && m->get_dirty()) {
+       ceph_assert(in->last != CEPH_NOSNAP);
+       if (in->is_auth() && dirty) {
          dout(10) << " updating intermediate snapped inode " << *in << dendl;
-         _do_cap_update(in, NULL, m->get_dirty(), follows, m);
+         _do_cap_update(in, NULL, dirty, follows, m, MClientCaps::ref());
        }
        in = mdcache->pick_inode_snap(head_in, in->last);
       }
     }
  
     // head inode, and cap
-    MClientCaps *ack = 0;
+    MClientCaps::ref ack;
 
     int caps = m->get_caps();
     if (caps & ~cap->issued()) {
@@ -2884,7 +2877,7 @@ void Locker::handle_client_caps(MClientCaps *m)
     cap->confirm_receipt(m->get_seq(), caps);
     dout(10) << " follows " << follows
             << " retains " << ccap_string(m->get_caps())
-            << " dirty " << ccap_string(m->get_dirty())
+            << " dirty " << ccap_string(dirty)
             << " on " << *in << dendl;
 
 
@@ -2896,24 +2889,33 @@ void Locker::handle_client_caps(MClientCaps *m)
     //  released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
     //  update/release).
     if (!head_in->client_need_snapflush.empty()) {
-      if ((cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
+      if (!(cap->issued() & CEPH_CAP_ANY_FILE_WR) &&
+         !(m->flags & MClientCaps::FLAG_PENDING_CAPSNAP)) {
+       head_in->auth_pin(this); // prevent subtree frozen
+       need_unpin = true;
        _do_null_snapflush(head_in, client);
       } else {
        dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl;
       }
     }
-    
-    if (m->get_dirty() && in->is_auth()) {
-      dout(7) << " flush client." << client << " dirty " << ccap_string(m->get_dirty()) 
+
+    bool need_snapflush = cap->need_snapflush();
+    if (dirty && in->is_auth()) {
+      dout(7) << " flush client." << client << " dirty " << ccap_string(dirty)
              << " seq " << m->get_seq() << " on " << *in << dendl;
-      ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(),
-                           m->get_caps(), 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
+      ack = MClientCaps::create(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(),
+          m->get_caps(), 0, dirty, 0, mds->get_osd_epoch_barrier());
       ack->set_client_tid(m->get_client_tid());
       ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
+
+      // client flushes and releases caps at the same time. make sure MDCache::cow_inode()
+      // properly setup CInode::client_need_snapflush
+      if ((dirty & ~cap->issued()) && !need_snapflush)
+       cap->mark_needsnapflush();
     }
 
     // filter wanted based on what we could ever give out (given auth/replica status)
-    bool need_flush = m->flags & CLIENT_CAPS_SYNC;
+    bool need_flush = m->flags & MClientCaps::FLAG_SYNC;
     int new_wanted = m->get_wanted();
     if (new_wanted != cap->wanted()) {
       if (!need_flush && in->is_auth() && (new_wanted & ~cap->pending())) {
@@ -2923,10 +2925,15 @@ void Locker::handle_client_caps(MClientCaps *m)
 
       adjust_cap_wanted(cap, new_wanted, m->get_issue_seq());
     }
-      
-    if (in->is_auth() &&
-       _do_cap_update(in, cap, m->get_dirty(), follows, m, ack, &need_flush)) {
-      // updated
+
+    bool updated = in->is_auth() &&
+                  _do_cap_update(in, cap, dirty, follows, m, ack, &need_flush);
+
+    if (cap->need_snapflush() &&
+       (!need_snapflush || !(m->flags & MClientCaps::FLAG_PENDING_CAPSNAP)))
+      cap->clear_needsnapflush();
+
+    if (updated) {
       eval(in, CEPH_CAP_LOCKS);
 
       if (!need_flush && (cap->wanted() & ~cap->pending()))
@@ -2942,7 +2949,6 @@ void Locker::handle_client_caps(MClientCaps *m)
 
       if (cap->get_last_seq() == 0 &&
          (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER))) {
-       cap->issue_norevoke(cap->issued());
        share_inode_max_size(in, cap);
       }
     }
@@ -2952,7 +2958,8 @@ void Locker::handle_client_caps(MClientCaps *m)
   }
 
  out:
-  m->put();
+  if (need_unpin)
+    head_in->auth_unpin(this);
 }
 
 
@@ -2970,7 +2977,7 @@ public:
 };
 
 void Locker::process_request_cap_release(MDRequestRef& mdr, client_t client, const ceph_mds_request_release& item,
-                                        boost::string_view dname)
+                                        std::string_view dname)
 {
   inodeno_t ino = (uint64_t)item.ino;
   uint64_t cap_id = item.cap_id;
@@ -3100,7 +3107,7 @@ void Locker::kick_cap_releases(MDRequestRef& mdr)
 /**
  * m and ack might be NULL, so don't dereference them unless dirty != 0
  */
-void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, MClientCaps *m, MClientCaps *ack)
+void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, const MClientCaps::const_ref &m, const MClientCaps::ref &ack)
 {
   dout(10) << "_do_snap_update dirty " << ccap_string(dirty)
           << " follows " << follows << " snap " << snap
@@ -3158,8 +3165,8 @@ void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t foll
     dout(7) << " xattrs v" << i->xattr_version << " -> " << m->head.xattr_version
            << " len " << m->xattrbl.length() << dendl;
     i->xattr_version = m->head.xattr_version;
-    bufferlist::iterator p = m->xattrbl.begin();
-    ::decode(*px, p);
+    auto p = m->xattrbl.cbegin();
+    decode(*px, p);
   }
 
   {
@@ -3184,17 +3191,17 @@ void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t foll
     le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
                                  ack->get_oldest_flush_tid());
 
-  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, false, false,
-                                                             client, ack));
+  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, UPDATE_SNAPFLUSH,
+                                                             ack, client));
 }
 
-void Locker::_update_cap_fields(CInode *in, int dirty, MClientCaps *m, CInode::mempool_inode *pi)
+void Locker::_update_cap_fields(CInode *in, int dirty, const MClientCaps::const_ref &m, CInode::mempool_inode *pi)
 {
   if (dirty == 0)
     return;
 
   /* m must be valid if there are dirty caps */
-  assert(m);
+  ceph_assert(m);
   uint64_t features = m->get_connection()->get_features();
 
   if (m->get_ctime() > pi->ctime) {
@@ -3291,14 +3298,14 @@ void Locker::_update_cap_fields(CInode *in, int dirty, MClientCaps *m, CInode::m
  */
 bool Locker::_do_cap_update(CInode *in, Capability *cap,
                            int dirty, snapid_t follows,
-                           MClientCaps *m, MClientCaps *ack,
+                           const MClientCaps::const_ref &m, const MClientCaps::ref &ack,
                            bool *need_flush)
 {
   dout(10) << "_do_cap_update dirty " << ccap_string(dirty)
           << " issued " << ccap_string(cap ? cap->issued() : 0)
           << " wanted " << ccap_string(cap ? cap->wanted() : 0)
           << " on " << *in << dendl;
-  assert(in->is_auth());
+  ceph_assert(in->is_auth());
   client_t client = m->get_source().num();
   CInode::mempool_inode *latest = in->get_projected_inode();
 
@@ -3344,7 +3351,7 @@ bool Locker::_do_cap_update(CInode *in, Capability *cap,
        bool need_issue = false;
        if (cap)
          cap->inc_suppress();
-       if (in->mds_caps_wanted.empty() &&
+       if (in->get_mds_caps_wanted().empty() &&
            (in->get_loner() >= 0 || (in->get_wanted_loner() >= 0 && in->try_set_loner()))) {
          if (in->filelock.get_state() != LOCK_EXCL)
            file_excl(&in->filelock, &need_issue);
@@ -3369,19 +3376,19 @@ bool Locker::_do_cap_update(CInode *in, Capability *cap,
 
   if (m->flockbl.length()) {
     int32_t num_locks;
-    bufferlist::iterator bli = m->flockbl.begin();
-    ::decode(num_locks, bli);
+    auto bli = m->flockbl.cbegin();
+    decode(num_locks, bli);
     for ( int i=0; i < num_locks; ++i) {
       ceph_filelock decoded_lock;
-      ::decode(decoded_lock, bli);
+      decode(decoded_lock, bli);
       in->get_fcntl_lock_state()->held_locks.
        insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
       ++in->get_fcntl_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
     }
-    ::decode(num_locks, bli);
+    decode(num_locks, bli);
     for ( int i=0; i < num_locks; ++i) {
       ceph_filelock decoded_lock;
-      ::decode(decoded_lock, bli);
+      decode(decoded_lock, bli);
       in->get_flock_lock_state()->held_locks.
        insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
       ++in->get_flock_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
@@ -3442,8 +3449,8 @@ bool Locker::_do_cap_update(CInode *in, Capability *cap,
   if (xattr) {
     dout(7) << " xattrs v" << pi.inode.xattr_version << " -> " << m->head.xattr_version << dendl;
     pi.inode.xattr_version = m->head.xattr_version;
-    bufferlist::iterator p = m->xattrbl.begin();
-    ::decode(*pi.xattrs, p);
+    auto p = m->xattrbl.cbegin();
+    decode(*pi.xattrs, p);
     wrlock_force(&in->xattrlock, mut);
   }
   
@@ -3456,9 +3463,13 @@ bool Locker::_do_cap_update(CInode *in, Capability *cap,
     le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
                                  ack->get_oldest_flush_tid());
 
-  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
-                                                             change_max, !!cap,
-                                                             client, ack));
+  unsigned update_flags = 0;
+  if (change_max)
+    update_flags |= UPDATE_SHAREMAX;
+  if (cap)
+    update_flags |= UPDATE_NEEDSISSUE;
+  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, update_flags,
+                                                             ack, client));
   if (need_flush && !*need_flush &&
       ((change_max && new_max) || // max INCREASE
        _need_flush_mdlog(in, dirty)))
@@ -3467,8 +3478,7 @@ bool Locker::_do_cap_update(CInode *in, Capability *cap,
   return true;
 }
 
-/* This function DOES put the passed message before returning */
-void Locker::handle_client_cap_release(MClientCapRelease *m)
+void Locker::handle_client_cap_release(const MClientCapRelease::const_ref &m)
 {
   client_t client = m->get_source().num();
   dout(10) << "handle_client_cap_release " << *m << dendl;
@@ -3490,15 +3500,13 @@ void Locker::handle_client_cap_release(MClientCapRelease *m)
 
   Session *session = mds->get_session(m);
 
-  for (vector<ceph_mds_cap_item>::iterator p = m->caps.begin(); p != m->caps.end(); ++p) {
-    _do_cap_release(client, inodeno_t((uint64_t)p->ino) , p->cap_id, p->migrate_seq, p->seq);
+  for (const auto &cap : m->caps) {
+    _do_cap_release(client, inodeno_t((uint64_t)cap.ino) , cap.cap_id, cap.migrate_seq, cap.seq);
   }
 
   if (session) {
     session->notify_cap_release(m->caps.size());
   }
-
-  m->put();
 }
 
 class C_Locker_RetryCapRelease : public LockerContext {
@@ -3627,20 +3635,39 @@ void Locker::caps_tick()
 {
   utime_t now = ceph_clock_now();
 
+  if (!need_snapflush_inodes.empty()) {
+    // snap inodes that needs flush are auth pinned, they affect
+    // subtree/difrarg freeze.
+    utime_t cutoff = now;
+    cutoff -= g_conf()->mds_freeze_tree_timeout / 3;
+
+    CInode *last = need_snapflush_inodes.back();
+    while (!need_snapflush_inodes.empty()) {
+      CInode *in = need_snapflush_inodes.front();
+      if (in->last_dirstat_prop >= cutoff)
+       break;
+      in->item_caps.remove_myself();
+      snapflush_nudge(in);
+      if (in == last)
+       break;
+    }
+  }
+
   dout(20) << __func__ << " " << revoking_caps.size() << " revoking caps" << dendl;
 
-  int i = 0;
+  now = ceph_clock_now();
+  int n = 0;
   for (xlist<Capability*>::iterator p = revoking_caps.begin(); !p.end(); ++p) {
     Capability *cap = *p;
 
     utime_t age = now - cap->get_last_revoke_stamp();
-    dout(20) << __func__ << " age = " << age << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
+    dout(20) << __func__ << " age = " << age << " client." << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
     if (age <= mds->mdsmap->get_session_timeout()) {
       dout(20) << __func__ << " age below timeout " << mds->mdsmap->get_session_timeout() << dendl;
       break;
     } else {
-      ++i;
-      if (i > MAX_WARN_CAPS) {
+      ++n;
+      if (n > MAX_WARN_CAPS) {
         dout(1) << __func__ << " more than " << MAX_WARN_CAPS << " caps are late"
           << "revoking, ignoring subsequent caps" << dendl;
         break;
@@ -3656,23 +3683,22 @@ void Locker::caps_tick()
       mds->clog->warn() << ss.str();
       dout(20) << __func__ << " " << ss.str() << dendl;
     } else {
-      dout(20) << __func__ << " silencing log message (backoff) for " << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
+      dout(20) << __func__ << " silencing log message (backoff) for " << "client." << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
     }
   }
 }
 
 
-void Locker::handle_client_lease(MClientLease *m)
+void Locker::handle_client_lease(const MClientLease::const_ref &m)
 {
   dout(10) << "handle_client_lease " << *m << dendl;
 
-  assert(m->get_source().is_client());
+  ceph_assert(m->get_source().is_client());
   client_t client = m->get_source().num();
 
   CInode *in = mdcache->get_inode(m->get_ino(), m->get_last());
   if (!in) {
     dout(7) << "handle_client_lease don't have ino " << m->get_ino() << "." << m->get_last() << dendl;
-    m->put();
     return;
   }
   CDentry *dn = 0;
@@ -3683,7 +3709,6 @@ void Locker::handle_client_lease(MClientLease *m)
     dn = dir->lookup(m->dname);
   if (!dn) {
     dout(7) << "handle_client_lease don't have dn " << m->get_ino() << " " << m->dname << dendl;
-    m->put();
     return;
   }
   dout(10) << " on " << *dn << dendl;
@@ -3692,7 +3717,6 @@ void Locker::handle_client_lease(MClientLease *m)
   ClientLease *l = dn->get_client_lease(client);
   if (!l) {
     dout(7) << "handle_client_lease didn't have lease for client." << client << " of " << *dn << dendl;
-    m->put();
     return;
   } 
 
@@ -3706,7 +3730,6 @@ void Locker::handle_client_lease(MClientLease *m)
              << " on " << *dn << dendl;
       dn->remove_client_lease(l, this);
     }
-    m->put();
     break;
 
   case CEPH_MDS_LEASE_RENEW:
@@ -3714,16 +3737,17 @@ void Locker::handle_client_lease(MClientLease *m)
       dout(7) << "handle_client_lease client." << client << " renew on " << *dn
              << (!dn->lock.can_lease(client)?", revoking lease":"") << dendl;
       if (dn->lock.can_lease(client)) {
+        auto reply = MClientLease::create(*m);
        int pool = 1;   // fixme.. do something smart!
-       m->h.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
-       m->h.seq = ++l->seq;
-       m->clear_payload();
+       reply->h.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
+       reply->h.seq = ++l->seq;
+       reply->clear_payload();
 
        utime_t now = ceph_clock_now();
        now += mdcache->client_lease_durations[pool];
        mdcache->touch_client_lease(l, pool, now);
 
-       mds->send_message_client_counted(m, m->get_connection());
+       mds->send_message_client_counted(reply, m->get_connection());
       }
     }
     break;
@@ -3751,20 +3775,17 @@ void Locker::issue_client_lease(CDentry *dn, client_t client,
     now += mdcache->client_lease_durations[pool];
     mdcache->touch_client_lease(l, pool, now);
 
-    LeaseStat e;
-    e.mask = 1 | CEPH_LOCK_DN;  // old and new bit values
-    e.seq = ++l->seq;
-    e.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
-    ::encode(e, bl);
-    dout(20) << "issue_client_lease seq " << e.seq << " dur " << e.duration_ms << "ms "
+    LeaseStat lstat;
+    lstat.mask = 1 | CEPH_LOCK_DN;  // old and new bit values
+    lstat.duration_ms = (uint32_t)(1000 * mdcache->client_lease_durations[pool]);
+    lstat.seq = ++l->seq;
+    encode_lease(bl, session->info, lstat);
+    dout(20) << "issue_client_lease seq " << lstat.seq << " dur " << lstat.duration_ms << "ms "
             << " on " << *dn << dendl;
   } else {
     // null lease
-    LeaseStat e;
-    e.mask = 0;
-    e.seq = 0;
-    e.duration_ms = 0;
-    ::encode(e, bl);
+    LeaseStat lstat;
+    encode_lease(bl, session->info, lstat);
     dout(20) << "issue_client_lease no/null lease on " << *dn << dendl;
   }
 }
@@ -3780,27 +3801,38 @@ void Locker::revoke_client_leases(SimpleLock *lock)
     ClientLease *l = p->second;
     
     n++;
-    assert(lock->get_type() == CEPH_LOCK_DN);
+    ceph_assert(lock->get_type() == CEPH_LOCK_DN);
 
     CDentry *dn = static_cast<CDentry*>(lock->get_parent());
     int mask = 1 | CEPH_LOCK_DN; // old and new bits
     
     // i should also revoke the dir ICONTENT lease, if they have it!
     CInode *diri = dn->get_dir()->get_inode();
-    mds->send_message_client_counted(new MClientLease(CEPH_MDS_LEASE_REVOKE, l->seq,
-                                             mask,
-                                             diri->ino(),
-                                             diri->first, CEPH_NOSNAP,
-                                             dn->get_name()),
-                            l->client);
+    auto lease = MClientLease::create(CEPH_MDS_LEASE_REVOKE, l->seq, mask, diri->ino(), diri->first, CEPH_NOSNAP, dn->get_name());
+    mds->send_message_client_counted(lease, l->client);
   }
 }
 
-
+void Locker::encode_lease(bufferlist& bl, const session_info_t& info,
+                         const LeaseStat& ls)
+{
+  if (info.has_feature(CEPHFS_FEATURE_REPLY_ENCODING)) {
+    ENCODE_START(1, 1, bl);
+    encode(ls.mask, bl);
+    encode(ls.duration_ms, bl);
+    encode(ls.seq, bl);
+    ENCODE_FINISH(bl);
+  }
+  else {
+    encode(ls.mask, bl);
+    encode(ls.duration_ms, bl);
+    encode(ls.seq, bl);
+  }
+}
 
 // locks ----------------------------------------------------------------
 
-SimpleLock *Locker::get_lock(int lock_type, MDSCacheObjectInfo &info) 
+SimpleLock *Locker::get_lock(int lock_type, const MDSCacheObjectInfo &info) 
 {
   switch (lock_type) {
   case CEPH_LOCK_DN:
@@ -3860,16 +3892,14 @@ SimpleLock *Locker::get_lock(int lock_type, MDSCacheObjectInfo &info)
   return 0;  
 }
 
-/* This function DOES put the passed message before returning */
-void Locker::handle_lock(MLock *m)
+void Locker::handle_lock(const MLock::const_ref &m)
 {
   // nobody should be talking to us during recovery.
-  assert(mds->is_rejoin() || mds->is_clientreplay() || mds->is_active() || mds->is_stopping());
+  ceph_assert(mds->is_rejoin() || mds->is_clientreplay() || mds->is_active() || mds->is_stopping());
 
   SimpleLock *lock = get_lock(m->get_lock_type(), m->get_object_info());
   if (!lock) {
     dout(10) << "don't have object " << m->get_object_info() << ", must have trimmed, dropping" << dendl;
-    m->put();
     return;
   }
 
@@ -3909,7 +3939,7 @@ void Locker::handle_lock(MLock *m)
 
 /** This function may take a reference to m if it needs one, but does
  * not put references. */
-void Locker::handle_reqrdlock(SimpleLock *lock, MLock *m)
+void Locker::handle_reqrdlock(SimpleLock *lock, const MLock::const_ref &m)
 {
   MDSCacheObject *parent = lock->get_parent();
   if (parent->is_auth() &&
@@ -3917,13 +3947,13 @@ void Locker::handle_reqrdlock(SimpleLock *lock, MLock *m)
       !parent->is_frozen()) {
     dout(7) << "handle_reqrdlock got rdlock request on " << *lock
            << " on " << *parent << dendl;
-    assert(parent->is_auth()); // replica auth pinned if they're doing this!
+    ceph_assert(parent->is_auth()); // replica auth pinned if they're doing this!
     if (lock->is_stable()) {
       simple_sync(lock);
     } else {
       dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl;
       lock->add_waiter(SimpleLock::WAIT_STABLE | MDSCacheObject::WAIT_UNFREEZE,
-                       new C_MDS_RetryMessage(mds, m->get()));
+                       new C_MDS_RetryMessage(mds, m));
     }
   } else {
     dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
@@ -3932,8 +3962,7 @@ void Locker::handle_reqrdlock(SimpleLock *lock, MLock *m)
   }
 }
 
-/* This function DOES put the passed message before returning */
-void Locker::handle_simple_lock(SimpleLock *lock, MLock *m)
+void Locker::handle_simple_lock(SimpleLock *lock, const MLock::const_ref &m)
 {
   int from = m->get_asker();
   
@@ -3944,7 +3973,6 @@ void Locker::handle_simple_lock(SimpleLock *lock, MLock *m)
     if (lock->get_parent()->is_rejoining()) {
       dout(7) << "handle_simple_lock still rejoining " << *lock->get_parent()
              << ", dropping " << *m << dendl;
-      m->put();
       return;
     }
   }
@@ -3952,14 +3980,14 @@ void Locker::handle_simple_lock(SimpleLock *lock, MLock *m)
   switch (m->get_action()) {
     // -- replica --
   case LOCK_AC_SYNC:
-    assert(lock->get_state() == LOCK_LOCK);
+    ceph_assert(lock->get_state() == LOCK_LOCK);
     lock->decode_locked_state(m->get_data());
     lock->set_state(LOCK_SYNC);
     lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
     break;
     
   case LOCK_AC_LOCK:
-    assert(lock->get_state() == LOCK_SYNC);
+    ceph_assert(lock->get_state() == LOCK_SYNC);
     lock->set_state(LOCK_SYNC_LOCK);
     if (lock->is_leased())
       revoke_client_leases(lock);
@@ -3971,9 +3999,9 @@ void Locker::handle_simple_lock(SimpleLock *lock, MLock *m)
 
     // -- auth --
   case LOCK_AC_LOCKACK:
-    assert(lock->get_state() == LOCK_SYNC_LOCK ||
+    ceph_assert(lock->get_state() == LOCK_SYNC_LOCK ||
           lock->get_state() == LOCK_SYNC_EXCL);
-    assert(lock->is_gathering(from));
+    ceph_assert(lock->is_gathering(from));
     lock->remove_gather(from);
     
     if (lock->is_gathering()) {
@@ -3991,8 +4019,6 @@ void Locker::handle_simple_lock(SimpleLock *lock, MLock *m)
     break;
 
   }
-
-  m->put();
 }
 
 /* unused, currently.
@@ -4040,14 +4066,15 @@ void Locker::simple_eval(SimpleLock *lock, bool *need_issue)
 {
   dout(10) << "simple_eval " << *lock << " on " << *lock->get_parent() << dendl;
 
-  assert(lock->get_parent()->is_auth());
-  assert(lock->is_stable());
+  ceph_assert(lock->get_parent()->is_auth());
+  ceph_assert(lock->is_stable());
 
   if (lock->get_parent()->is_freezing_or_frozen()) {
-    // dentry lock in unreadable state can block path traverse
-    if ((lock->get_type() != CEPH_LOCK_DN ||
+    // dentry/snap lock in unreadable state can block path traverse
+    if ((lock->get_type() != CEPH_LOCK_DN &&
+        lock->get_type() != CEPH_LOCK_ISNAP) ||
         lock->get_state() == LOCK_SYNC ||
-        lock->get_parent()->is_frozen()))
+        lock->get_parent()->is_frozen())
       return;
   }
 
@@ -4061,7 +4088,7 @@ void Locker::simple_eval(SimpleLock *lock, bool *need_issue)
 
   CInode *in = 0;
   int wanted = 0;
-  if (lock->get_type() != CEPH_LOCK_DN) {
+  if (lock->get_cap_shift()) {
     in = static_cast<CInode*>(lock->get_parent());
     in->get_caps_wanted(&wanted, NULL, lock->get_cap_shift());
   }
@@ -4092,8 +4119,8 @@ void Locker::simple_eval(SimpleLock *lock, bool *need_issue)
 bool Locker::simple_sync(SimpleLock *lock, bool *need_issue)
 {
   dout(7) << "simple_sync on " << *lock << " on " << *lock->get_parent() << dendl;
-  assert(lock->get_parent()->is_auth());
-  assert(lock->is_stable());
+  ceph_assert(lock->get_parent()->is_auth());
+  ceph_assert(lock->is_stable());
 
   CInode *in = 0;
   if (lock->get_cap_shift())
@@ -4133,7 +4160,7 @@ bool Locker::simple_sync(SimpleLock *lock, bool *need_issue)
     
     bool need_recover = false;
     if (lock->get_type() == CEPH_LOCK_IFILE) {
-      assert(in);
+      ceph_assert(in);
       if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
         mds->mdcache->queue_file_recover(in);
        need_recover = true;
@@ -4175,8 +4202,8 @@ bool Locker::simple_sync(SimpleLock *lock, bool *need_issue)
 void Locker::simple_excl(SimpleLock *lock, bool *need_issue)
 {
   dout(7) << "simple_excl on " << *lock << " on " << *lock->get_parent() << dendl;
-  assert(lock->get_parent()->is_auth());
-  assert(lock->is_stable());
+  ceph_assert(lock->get_parent()->is_auth());
+  ceph_assert(lock->is_stable());
 
   CInode *in = 0;
   if (lock->get_cap_shift())
@@ -4230,9 +4257,9 @@ void Locker::simple_excl(SimpleLock *lock, bool *need_issue)
 void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
 {
   dout(7) << "simple_lock on " << *lock << " on " << *lock->get_parent() << dendl;
-  assert(lock->get_parent()->is_auth());
-  assert(lock->is_stable());
-  assert(lock->get_state() != LOCK_LOCK);
+  ceph_assert(lock->get_parent()->is_auth());
+  ceph_assert(lock->is_stable());
+  ceph_assert(lock->get_state() != LOCK_LOCK);
   
   CInode *in = 0;
   if (lock->get_cap_shift())
@@ -4270,7 +4297,7 @@ void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
 
   bool need_recover = false;
   if (lock->get_type() == CEPH_LOCK_IFILE) {
-    assert(in);
+    ceph_assert(in);
     if(in->state_test(CInode::STATE_NEEDSRECOVER)) {
       mds->mdcache->queue_file_recover(in);
       need_recover = true;
@@ -4316,9 +4343,9 @@ void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
 void Locker::simple_xlock(SimpleLock *lock)
 {
   dout(7) << "simple_xlock on " << *lock << " on " << *lock->get_parent() << dendl;
-  assert(lock->get_parent()->is_auth());
+  ceph_assert(lock->get_parent()->is_auth());
   //assert(lock->is_stable());
-  assert(lock->get_state() != LOCK_XLOCK);
+  ceph_assert(lock->get_state() != LOCK_XLOCK);
   
   CInode *in = 0;
   if (lock->get_cap_shift())
@@ -4419,8 +4446,7 @@ void Locker::scatter_writebehind(ScatterLock *lock)
 
   // forcefully take a wrlock
   lock->get_wrlock(true);
-  mut->wrlocks.insert(lock);
-  mut->locks.insert(lock);
+  mut->locks.emplace(lock, MutationImpl::LockOp::WRLOCK);
 
   in->pre_cow_old_inode();  // avoid cow mayhem
 
@@ -4475,8 +4501,8 @@ void Locker::scatter_eval(ScatterLock *lock, bool *need_issue)
 {
   dout(10) << "scatter_eval " << *lock << " on " << *lock->get_parent() << dendl;
 
-  assert(lock->get_parent()->is_auth());
-  assert(lock->is_stable());
+  ceph_assert(lock->get_parent()->is_auth());
+  ceph_assert(lock->is_stable());
 
   if (lock->get_parent()->is_freezing_or_frozen()) {
     dout(20) << "  freezing|frozen" << dendl;
@@ -4552,7 +4578,7 @@ void Locker::mark_updated_scatterlock(ScatterLock *lock)
  * we need to lock|scatter in order to push fnode changes into the
  * inode.dirstat.
  */
-void Locker::scatter_nudge(ScatterLock *lock, MDSInternalContextBase *c, bool forcelockchange)
+void Locker::scatter_nudge(ScatterLock *lock, MDSContext *c, bool forcelockchange)
 {
   CInode *p = static_cast<CInode *>(lock->get_parent());
 
@@ -4636,7 +4662,7 @@ void Locker::scatter_nudge(ScatterLock *lock, MDSInternalContextBase *c, bool fo
          // handle_file_lock due to AC_NUDGE, because the rest of the
          // time we are replicated or have dirty data and won't get
          // called.  bailing here avoids an infinite loop.
-         assert(!c); 
+         ceph_assert(!c); 
          break;
        }
       } else {
@@ -4651,9 +4677,9 @@ void Locker::scatter_nudge(ScatterLock *lock, MDSInternalContextBase *c, bool fo
             << *lock << " on " << *p << dendl;
     // request unscatter?
     mds_rank_t auth = lock->get_parent()->authority().first;
-    if (!mds->is_cluster_degraded() ||
-       mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
-      mds->send_message_mds(new MLock(lock, LOCK_AC_NUDGE, mds->get_nodeid()), auth);
+    if (!mds->is_cluster_degraded() || mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
+      mds->send_message_mds(MLock::create(lock, LOCK_AC_NUDGE, mds->get_nodeid()), auth);
+    }
 
     // wait...
     if (c)
@@ -4683,7 +4709,7 @@ void Locker::scatter_tick()
               << *lock << " " << *lock->get_parent() << dendl;
       continue;
     }
-    if (now - lock->get_update_stamp() < g_conf->mds_scatter_nudge_interval)
+    if (now - lock->get_update_stamp() < g_conf()->mds_scatter_nudge_interval)
       break;
     updated_scatterlocks.pop_front();
     scatter_nudge(lock, 0);
@@ -4696,10 +4722,10 @@ void Locker::scatter_tempsync(ScatterLock *lock, bool *need_issue)
 {
   dout(10) << "scatter_tempsync " << *lock
           << " on " << *lock->get_parent() << dendl;
-  assert(lock->get_parent()->is_auth());
-  assert(lock->is_stable());
+  ceph_assert(lock->get_parent()->is_auth());
+  ceph_assert(lock->is_stable());
 
-  assert(0 == "not fully implemented, at least not for filelock");
+  ceph_abort_msg("not fully implemented, at least not for filelock");
 
   CInode *in = static_cast<CInode *>(lock->get_parent());
 
@@ -4756,12 +4782,12 @@ void Locker::local_wrlock_grab(LocalLock *lock, MutationRef& mut)
   dout(7) << "local_wrlock_grab  on " << *lock
          << " on " << *lock->get_parent() << dendl;  
   
-  assert(lock->get_parent()->is_auth());
-  assert(lock->can_wrlock());
-  assert(!mut->wrlocks.count(lock));
+  ceph_assert(lock->get_parent()->is_auth());
+  ceph_assert(lock->can_wrlock());
   lock->get_wrlock(mut->get_client());
-  mut->wrlocks.insert(lock);
-  mut->locks.insert(lock);
+
+  auto ret = mut->locks.emplace(lock, MutationImpl::LockOp::WRLOCK);
+  ceph_assert(ret.second);
 }
 
 bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut)
@@ -4769,12 +4795,11 @@ bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut)
   dout(7) << "local_wrlock_start  on " << *lock
          << " on " << *lock->get_parent() << dendl;  
   
-  assert(lock->get_parent()->is_auth());
+  ceph_assert(lock->get_parent()->is_auth());
   if (lock->can_wrlock()) {
-    assert(!mut->wrlocks.count(lock));
     lock->get_wrlock(mut->get_client());
-    mut->wrlocks.insert(lock);
-    mut->locks.insert(lock);
+    auto it = mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::WRLOCK);
+    ceph_assert(it->is_wrlock());
     return true;
   } else {
     lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
@@ -4782,13 +4807,14 @@ bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut)
   }
 }
 
-void Locker::local_wrlock_finish(LocalLock *lock, MutationImpl *mut)
+void Locker::local_wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
 {
+  ceph_assert(it->is_wrlock());
+  LocalLock *lock = static_cast<LocalLock*>(it->lock);
   dout(7) << "local_wrlock_finish  on " << *lock
          << " on " << *lock->get_parent() << dendl;  
   lock->put_wrlock();
-  mut->wrlocks.erase(lock);
-  mut->locks.erase(lock);
+  mut->locks.erase(it);
   if (lock->get_num_wrlocks() == 0) {
     lock->finish_waiters(SimpleLock::WAIT_STABLE |
                          SimpleLock::WAIT_WR |
@@ -4801,25 +4827,25 @@ bool Locker::local_xlock_start(LocalLock *lock, MDRequestRef& mut)
   dout(7) << "local_xlock_start  on " << *lock
          << " on " << *lock->get_parent() << dendl;  
   
-  assert(lock->get_parent()->is_auth());
+  ceph_assert(lock->get_parent()->is_auth());
   if (!lock->can_xlock_local()) {
     lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
     return false;
   }
 
   lock->get_xlock(mut, mut->get_client());
-  mut->xlocks.insert(lock);
-  mut->locks.insert(lock);
+  mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::XLOCK);
   return true;
 }
 
-void Locker::local_xlock_finish(LocalLock *lock, MutationImpl *mut)
+void Locker::local_xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
 {
+  ceph_assert(it->is_xlock());
+  LocalLock *lock = static_cast<LocalLock*>(it->lock);
   dout(7) << "local_xlock_finish  on " << *lock
          << " on " << *lock->get_parent() << dendl;  
   lock->put_xlock();
-  mut->xlocks.erase(lock);
-  mut->locks.erase(lock);
+  mut->locks.erase(it);
 
   lock->finish_waiters(SimpleLock::WAIT_STABLE | 
                       SimpleLock::WAIT_WR | 
@@ -4843,8 +4869,8 @@ void Locker::file_eval(ScatterLock *lock, bool *need_issue)
          << "  filelock=" << *lock << " on " << *lock->get_parent()
          << dendl;
 
-  assert(lock->get_parent()->is_auth());
-  assert(lock->is_stable());
+  ceph_assert(lock->get_parent()->is_auth());
+  ceph_assert(lock->is_stable());
 
   if (lock->get_parent()->is_freezing_or_frozen())
     return;
@@ -4940,8 +4966,8 @@ void Locker::scatter_mix(ScatterLock *lock, bool *need_issue)
   dout(7) << "scatter_mix " << *lock << " on " << *lock->get_parent() << dendl;
 
   CInode *in = static_cast<CInode*>(lock->get_parent());
-  assert(in->is_auth());
-  assert(lock->is_stable());
+  ceph_assert(in->is_auth());
+  ceph_assert(lock->is_stable());
 
   if (lock->get_state() == LOCK_LOCK) {
     in->start_scatter(lock);
@@ -5032,10 +5058,10 @@ void Locker::file_excl(ScatterLock *lock, bool *need_issue)
   CInode *in = static_cast<CInode*>(lock->get_parent());
   dout(7) << "file_excl " << *lock << " on " << *lock->get_parent() << dendl;  
 
-  assert(in->is_auth());
-  assert(lock->is_stable());
+  ceph_assert(in->is_auth());
+  ceph_assert(lock->is_stable());
 
-  assert((in->get_loner() >= 0 && in->mds_caps_wanted.empty()) ||
+  ceph_assert((in->get_loner() >= 0 && in->get_mds_caps_wanted().empty()) ||
         (lock->get_state() == LOCK_XSYN));  // must do xsyn -> excl -> <anything else>
   
   switch (lock->get_state()) {
@@ -5095,8 +5121,8 @@ void Locker::file_xsyn(SimpleLock *lock, bool *need_issue)
 {
   dout(7) << "file_xsyn on " << *lock << " on " << *lock->get_parent() << dendl;
   CInode *in = static_cast<CInode *>(lock->get_parent());
-  assert(in->is_auth());
-  assert(in->get_loner() >= 0 && in->mds_caps_wanted.empty());
+  ceph_assert(in->is_auth());
+  ceph_assert(in->get_loner() >= 0 && in->get_mds_caps_wanted().empty());
 
   switch (lock->get_state()) {
   case LOCK_EXCL: lock->set_state(LOCK_EXCL_XSYN); break;
@@ -5133,9 +5159,9 @@ void Locker::file_recover(ScatterLock *lock)
   CInode *in = static_cast<CInode *>(lock->get_parent());
   dout(7) << "file_recover " << *lock << " on " << *in << dendl;
 
-  assert(in->is_auth());
+  ceph_assert(in->is_auth());
   //assert(lock->is_stable());
-  assert(lock->get_state() == LOCK_PRE_SCAN); // only called from MDCache::start_files_to_recover()
+  ceph_assert(lock->get_state() == LOCK_PRE_SCAN); // only called from MDCache::start_files_to_recover()
 
   int gather = 0;
   
@@ -5162,8 +5188,7 @@ void Locker::file_recover(ScatterLock *lock)
 
 
 // messenger
-/* This function DOES put the passed message before returning */
-void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
+void Locker::handle_file_lock(ScatterLock *lock, const MLock::const_ref &m)
 {
   CInode *in = static_cast<CInode*>(lock->get_parent());
   int from = m->get_asker();
@@ -5172,12 +5197,11 @@ void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
     if (in->is_rejoining()) {
       dout(7) << "handle_file_lock still rejoining " << *in
              << ", dropping " << *m << dendl;
-      m->put();
       return;
     }
   }
 
-  dout(7) << "handle_file_lock a=" << get_lock_action_name(m->get_action())
+  dout(7) << "handle_file_lock a=" << lock->get_lock_action_name(m->get_action())
          << " on " << *lock
          << " from mds." << from << " " 
          << *in << dendl;
@@ -5187,7 +5211,7 @@ void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
   switch (m->get_action()) {
     // -- replica --
   case LOCK_AC_SYNC:
-    assert(lock->get_state() == LOCK_LOCK ||
+    ceph_assert(lock->get_state() == LOCK_LOCK ||
           lock->get_state() == LOCK_MIX ||
           lock->get_state() == LOCK_MIX_SYNC2);
     
@@ -5235,7 +5259,7 @@ void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
     break;
     
   case LOCK_AC_MIX:
-    assert(lock->get_state() == LOCK_SYNC ||
+    ceph_assert(lock->get_state() == LOCK_SYNC ||
            lock->get_state() == LOCK_LOCK ||
           lock->get_state() == LOCK_SYNC_MIX2);
     
@@ -5261,14 +5285,14 @@ void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
 
     // -- auth --
   case LOCK_AC_LOCKACK:
-    assert(lock->get_state() == LOCK_SYNC_LOCK ||
+    ceph_assert(lock->get_state() == LOCK_SYNC_LOCK ||
            lock->get_state() == LOCK_MIX_LOCK ||
            lock->get_state() == LOCK_MIX_LOCK2 ||
            lock->get_state() == LOCK_MIX_EXCL ||
            lock->get_state() == LOCK_SYNC_EXCL ||
            lock->get_state() == LOCK_SYNC_MIX ||
           lock->get_state() == LOCK_MIX_TSYN);
-    assert(lock->is_gathering(from));
+    ceph_assert(lock->is_gathering(from));
     lock->remove_gather(from);
     
     if (lock->get_state() == LOCK_MIX_LOCK ||
@@ -5292,8 +5316,8 @@ void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
     break;
     
   case LOCK_AC_SYNCACK:
-    assert(lock->get_state() == LOCK_MIX_SYNC);
-    assert(lock->is_gathering(from));
+    ceph_assert(lock->get_state() == LOCK_MIX_SYNC);
+    ceph_assert(lock->is_gathering(from));
     lock->remove_gather(from);
     
     lock->decode_locked_state(m->get_data());
@@ -5309,8 +5333,8 @@ void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
     break;
 
   case LOCK_AC_MIXACK:
-    assert(lock->get_state() == LOCK_SYNC_MIX);
-    assert(lock->is_gathering(from));
+    ceph_assert(lock->get_state() == LOCK_SYNC_MIX);
+    ceph_assert(lock->is_gathering(from));
     lock->remove_gather(from);
     
     if (lock->is_gathering()) {
@@ -5383,12 +5407,4 @@ void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
   default:
     ceph_abort();
   }  
-  
-  m->put();
 }
-
-
-
-
-
-