]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mds/Locker.cc
update sources to 12.2.7
[ceph.git] / ceph / src / mds / Locker.cc
index a0ccf96016be557ca2ed2661894243659344e9da..f9858b573f5f660f306c58127b2ad5c1c02c6dba 100644 (file)
  * 
  */
 
+#include <boost/utility/string_view.hpp>
 
 #include "MDSRank.h"
 #include "MDCache.h"
 #include "Locker.h"
+#include "MDBalancer.h"
 #include "CInode.h"
 #include "CDir.h"
 #include "CDentry.h"
@@ -229,13 +231,51 @@ bool Locker::acquire_locks(MDRequestRef& mdr,
 
   // xlocks
   for (set<SimpleLock*>::iterator p = xlocks.begin(); p != xlocks.end(); ++p) {
-    dout(20) << " must xlock " << **p << " " << *(*p)->get_parent() << dendl;
-    sorted.insert(*p);
-    mustpin.insert((*p)->get_parent());
+    SimpleLock *lock = *p;
+
+    if ((lock->get_type() == CEPH_LOCK_ISNAP ||
+         lock->get_type() == CEPH_LOCK_IPOLICY) &&
+       mds->is_cluster_degraded() &&
+       mdr->is_master() &&
+       !mdr->is_queued_for_replay()) {
+      // waiting for recovering mds, to guarantee replayed requests and mksnap/setlayout
+      // get processed in proper order.
+      bool wait = false;
+      if (lock->get_parent()->is_auth()) {
+       if (!mdr->locks.count(lock)) {
+         set<mds_rank_t> ls;
+         lock->get_parent()->list_replicas(ls);
+         for (auto m : ls) {
+           if (mds->mdsmap->get_state(m) < MDSMap::STATE_ACTIVE) {
+             wait = true;
+             break;
+           }
+         }
+       }
+      } else {
+       // if the lock is the latest locked one, it's possible that slave mds got the lock
+       // while there are recovering mds.
+       if (!mdr->locks.count(lock) || lock == *mdr->locks.rbegin())
+         wait = true;
+      }
+      if (wait) {
+       dout(10) << " must xlock " << *lock << " " << *lock->get_parent()
+                << ", waiting for cluster recovered" << dendl;
+       mds->locker->drop_locks(mdr.get(), NULL);
+       mdr->drop_local_auth_pins();
+       mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
+       return false;
+      }
+    }
+
+    dout(20) << " must xlock " << *lock << " " << *lock->get_parent() << dendl;
+
+    sorted.insert(lock);
+    mustpin.insert(lock->get_parent());
 
     // augment xlock with a versionlock?
     if ((*p)->get_type() == CEPH_LOCK_DN) {
-      CDentry *dn = (CDentry*)(*p)->get_parent();
+      CDentry *dn = (CDentry*)lock->get_parent();
       if (!dn->is_auth())
        continue;
 
@@ -252,9 +292,9 @@ bool Locker::acquire_locks(MDRequestRef& mdr,
        sorted.insert(&dn->versionlock);
       }
     }
-    if ((*p)->get_type() > CEPH_LOCK_IVERSION) {
+    if (lock->get_type() > CEPH_LOCK_IVERSION) {
       // inode version lock?
-      CInode *in = (CInode*)(*p)->get_parent();
+      CInode *in = (CInode*)lock->get_parent();
       if (!in->is_auth())
        continue;
       if (mdr->is_master()) {
@@ -547,33 +587,20 @@ bool Locker::acquire_locks(MDRequestRef& mdr,
       }
     } else {
       assert(mdr->is_master());
-      if ((*p)->is_scatterlock()) {
-       ScatterLock *slock = static_cast<ScatterLock *>(*p);
-       if (slock->is_rejoin_mix()) {
-         // If there is a recovering mds who replcated an object when it failed
-         // and scatterlock in the object was in MIX state, It's possible that
-         // the recovering mds needs to take wrlock on the scatterlock when it
-         // replays unsafe requests. So this mds should delay taking rdlock on
-         // the scatterlock until the recovering mds finishes replaying unsafe.
-         // Otherwise unsafe requests may get replayed after current request.
-         //
-         // For example:
-         // The recovering mds is auth mds of a dirfrag, this mds is auth mds
-         // of correspinding inode. when 'rm -rf' the direcotry, this mds should
-         // delay the rmdir request until the recovering mds has replayed unlink
-         // requests.
-         if (mds->is_cluster_degraded()) {
-           if (!mdr->is_replay()) {
-             drop_locks(mdr.get());
-             mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
-             dout(10) << " rejoin mix scatterlock " << *slock << " " << *(*p)->get_parent()
-                      << ", waiting for cluster recovered" << dendl;
-             marker.message = "rejoin mix scatterlock, waiting for cluster recovered";
-             return false;
-           }
-         } else {
-           slock->clear_rejoin_mix();
+      if ((*p)->needs_recover()) {
+       if (mds->is_cluster_degraded()) {
+         if (!mdr->is_queued_for_replay()) {
+           // see comments in SimpleLock::set_state_rejoin() and
+           // ScatterLock::encode_state_for_rejoin()
+           drop_locks(mdr.get());
+           mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
+           dout(10) << " rejoin recovering " << **p << " " << *(*p)->get_parent()
+                    << ", waiting for cluster recovered" << dendl;
+           marker.message = "rejoin recovering lock, waiting for cluster recovered";
+           return false;
          }
+       } else {
+         (*p)->clear_need_recover();
        }
       }
 
@@ -762,19 +789,26 @@ void Locker::drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
     issue_caps_set(*pneed_issue);
 }
 
-void Locker::drop_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
+void Locker::drop_rdlocks_for_early_reply(MutationImpl *mut)
 {
-  set<CInode*> my_need_issue;
-  if (!pneed_issue)
-    pneed_issue = &my_need_issue;
+  set<CInode*> need_issue;
 
-  _drop_rdlocks(mut, pneed_issue);
+  for (auto p = mut->rdlocks.begin(); p != mut->rdlocks.end(); ) {
+    SimpleLock *lock = *p;
+    ++p;
+    // make later mksnap/setlayout (at other mds) wait for this unsafe request
+    if (lock->get_type() == CEPH_LOCK_ISNAP ||
+       lock->get_type() == CEPH_LOCK_IPOLICY)
+      continue;
+    bool ni = false;
+    rdlock_finish(lock, mut, &ni);
+    if (ni)
+      need_issue.insert(static_cast<CInode*>(lock->get_parent()));
+  }
 
-  if (pneed_issue == &my_need_issue)
-    issue_caps_set(*pneed_issue);
+  issue_caps_set(need_issue);
 }
 
-
 // generics
 
 void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<MDSInternalContextBase*> *pfinishers)
@@ -926,6 +960,7 @@ void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<M
       case LOCK_TSYN_MIX:
       case LOCK_SYNC_MIX:
       case LOCK_EXCL_MIX:
+      case LOCK_XSYN_MIX:
        in->start_scatter(static_cast<ScatterLock *>(lock));
        if (lock->get_parent()->is_replicated()) {
          bufferlist softdata;
@@ -1006,15 +1041,15 @@ bool Locker::eval(CInode *in, int mask, bool caps_imported)
 
   // choose loner?
   if (in->is_auth() && in->is_head()) {
-    if (in->choose_ideal_loner() >= 0) {
-      if (in->try_set_loner()) {
-       dout(10) << "eval set loner to client." << in->get_loner() << dendl;
-       need_issue = true;
-       mask = -1;
-      } else
-       dout(10) << "eval want loner client." << in->get_wanted_loner() << " but failed to set it" << dendl;
-    } else
-      dout(10) << "eval doesn't want loner" << dendl;
+    client_t orig_loner = in->get_loner();
+    if (in->choose_ideal_loner()) {
+      dout(10) << "eval set loner: client." << orig_loner << " -> client." << in->get_loner() << dendl;
+      need_issue = true;
+      mask = -1;
+    } else if (in->get_wanted_loner() != in->get_loner()) {
+      dout(10) << "eval want loner: client." << in->get_wanted_loner() << " but failed to set it" << dendl;
+      mask = -1;
+    }
   }
 
  retry:
@@ -1035,19 +1070,14 @@ bool Locker::eval(CInode *in, int mask, bool caps_imported)
 
   // drop loner?
   if (in->is_auth() && in->is_head() && in->get_wanted_loner() != in->get_loner()) {
-    dout(10) << "  trying to drop loner" << dendl;
     if (in->try_drop_loner()) {
-      dout(10) << "  dropped loner" << dendl;
       need_issue = true;
-
       if (in->get_wanted_loner() >= 0) {
-       if (in->try_set_loner()) {
-         dout(10) << "eval end set loner to client." << in->get_loner() << dendl;
-         mask = -1;
-         goto retry;
-       } else {
-         dout(10) << "eval want loner client." << in->get_wanted_loner() << " but failed to set it" << dendl;
-       }
+       dout(10) << "eval end set loner to client." << in->get_loner() << dendl;
+       bool ok = in->try_set_loner();
+       assert(ok);
+       mask = -1;
+       goto retry;
       }
     }
   }
@@ -1665,7 +1695,7 @@ void Locker::_finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue
   assert(!lock->is_stable());
   if (lock->get_num_rdlocks() == 0 &&
       lock->get_num_wrlocks() == 0 &&
-      lock->get_num_client_lease() == 0 &&
+      !lock->is_leased() &&
       lock->get_state() != LOCK_XLOCKSNAP &&
       lock->get_type() != CEPH_LOCK_DN) {
     CInode *in = static_cast<CInode*>(lock->get_parent());
@@ -1824,7 +1854,7 @@ void Locker::file_update_finish(CInode *in, MutationRef& mut, bool share_max, bo
     dout(10) << " client_snap_caps " << in->client_snap_caps << dendl;
     // check for snap writeback completion
     bool gather = false;
-    compact_map<int,set<client_t> >::iterator p = in->client_snap_caps.begin();
+    auto p = in->client_snap_caps.begin();
     while (p != in->client_snap_caps.end()) {
       SimpleLock *lock = in->get_lock(p->first);
       assert(lock);
@@ -1857,6 +1887,9 @@ void Locker::file_update_finish(CInode *in, MutationRef& mut, bool share_max, bo
   }
   issue_caps_set(need_issue);
 
+  utime_t now = ceph_clock_now();
+  mds->balancer->hit_inode(now, in, META_POP_IWR);
+
   // auth unpin after issuing caps
   mut->cleanup();
 }
@@ -2256,22 +2289,22 @@ public:
   }
 };
 
-uint64_t Locker::calc_new_max_size(inode_t *pi, uint64_t size)
+uint64_t Locker::calc_new_max_size(CInode::mempool_inode *pi, uint64_t size)
 {
   uint64_t new_max = (size + 1) << 1;
   uint64_t max_inc = g_conf->mds_client_writeable_range_max_inc_objs;
   if (max_inc > 0) {
-    max_inc *= pi->get_layout_size_increment();
+    max_inc *= pi->layout.object_size;
     new_max = MIN(new_max, size + max_inc);
   }
   return ROUND_UP_TO(new_max, pi->get_layout_size_increment());
 }
 
 void Locker::calc_new_client_ranges(CInode *in, uint64_t size,
-                                   map<client_t,client_writeable_range_t> *new_ranges,
+                                   CInode::mempool_inode::client_range_map *new_ranges,
                                    bool *max_increased)
 {
-  inode_t *latest = in->get_projected_inode();
+  auto latest = in->get_projected_inode();
   uint64_t ms;
   if(latest->has_layout()) {
     ms = calc_new_max_size(latest, size);
@@ -2310,8 +2343,8 @@ bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
   assert(in->is_auth());
   assert(in->is_file());
 
-  inode_t *latest = in->get_projected_inode();
-  map<client_t, client_writeable_range_t> new_ranges;
+  CInode::mempool_inode *latest = in->get_projected_inode();
+  CInode::mempool_inode::client_range_map new_ranges;
   uint64_t size = latest->size;
   bool update_size = new_size > 0;
   bool update_max = false;
@@ -2371,20 +2404,22 @@ bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
   MutationRef mut(new MutationImpl());
   mut->ls = mds->mdlog->get_current_segment();
     
-  inode_t *pi = in->project_inode();
-  pi->version = in->pre_dirty();
+  auto &pi = in->project_inode();
+  pi.inode.version = in->pre_dirty();
 
   if (update_max) {
-    dout(10) << "check_inode_max_size client_ranges " << pi->client_ranges << " -> " << new_ranges << dendl;
-    pi->client_ranges = new_ranges;
+    dout(10) << "check_inode_max_size client_ranges " << pi.inode.client_ranges << " -> " << new_ranges << dendl;
+    pi.inode.client_ranges = new_ranges;
   }
 
   if (update_size) {
-    dout(10) << "check_inode_max_size size " << pi->size << " -> " << new_size << dendl;
-    pi->size = new_size;
-    pi->rstat.rbytes = new_size;
-    dout(10) << "check_inode_max_size mtime " << pi->mtime << " -> " << new_mtime << dendl;
-    pi->mtime = new_mtime;
+    dout(10) << "check_inode_max_size size " << pi.inode.size << " -> " << new_size << dendl;
+    pi.inode.size = new_size;
+    pi.inode.rstat.rbytes = new_size;
+    dout(10) << "check_inode_max_size mtime " << pi.inode.mtime << " -> " << new_mtime << dendl;
+    pi.inode.mtime = new_mtime;
+    if (new_mtime > pi.inode.ctime)
+      pi.inode.ctime = pi.inode.rstat.rctime = new_mtime;
   }
 
   // use EOpen if the file is still open; otherwise, use EUpdate.
@@ -2537,31 +2572,14 @@ void Locker::_do_null_snapflush(CInode *head_in, client_t client, snapid_t last)
   for (auto p = head_in->client_need_snapflush.begin();
        p != head_in->client_need_snapflush.end() && p->first < last; ) {
     snapid_t snapid = p->first;
-    set<client_t>& clients = p->second;
+    auto &clients = p->second;
     ++p;  // be careful, q loop below depends on this
 
     if (clients.count(client)) {
       dout(10) << " doing async NULL snapflush on " << snapid << " from client." << client << dendl;
-      CInode *sin = mdcache->get_inode(head_in->ino(), snapid);
-      if (!sin) {
-       // hrm, look forward until we find the inode. 
-       //  (we can only look it up by the last snapid it is valid for)
-       dout(10) << " didn't have " << head_in->ino() << " snapid " << snapid << dendl;
-       for (compact_map<snapid_t, set<client_t> >::iterator q = p;  // p is already at next entry
-            q != head_in->client_need_snapflush.end();
-            ++q) {
-         dout(10) << " trying snapid " << q->first << dendl;
-         sin = mdcache->get_inode(head_in->ino(), q->first);
-         if (sin) {
-           assert(sin->first <= snapid);
-           break;
-         }
-         dout(10) << " didn't have " << head_in->ino() << " snapid " << q->first << dendl;
-       }
-       if (!sin && head_in->is_multiversion())
-         sin = head_in;
-       assert(sin);
-      }
+      CInode *sin = mdcache->pick_inode_snap(head_in, snapid - 1);
+      assert(sin);
+      assert(sin->first <= snapid);
       _do_snap_update(sin, snapid, 0, sin->first - 1, client, NULL, NULL);
       head_in->remove_need_snapflush(sin, snapid, client);
     }
@@ -2592,9 +2610,7 @@ bool Locker::should_defer_client_cap_frozen(CInode *in)
  */
 void Locker::handle_client_caps(MClientCaps *m)
 {
-  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
   client_t client = m->get_source().num();
-
   snapid_t follows = m->get_snap_follows();
   dout(7) << "handle_client_caps "
          << ((m->flags & CLIENT_CAPS_SYNC) ? "sync" : "async")
@@ -2602,6 +2618,7 @@ void Locker::handle_client_caps(MClientCaps *m)
          << " tid " << m->get_client_tid() << " follows " << follows
          << " op " << ceph_cap_op_name(m->get_op()) << dendl;
 
+  Session *session = mds->get_session(m);
   if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
     if (!session) {
       dout(5) << " no session, dropping " << *m << dendl;
@@ -2696,29 +2713,21 @@ void Locker::handle_client_caps(MClientCaps *m)
     mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
   }
 
-  CInode *in = head_in;
-  if (follows > 0) {
-    in = mdcache->pick_inode_snap(head_in, follows);
-    if (in != head_in)
-      dout(10) << " head inode " << *head_in << dendl;
-  }
-  dout(10) << "  cap inode " << *in << dendl;
+  dout(10) << " head inode " << *head_in << dendl;
 
   Capability *cap = 0;
-  cap = in->get_client_cap(client);
-  if (!cap && in != head_in)
-    cap = head_in->get_client_cap(client);
+  cap = head_in->get_client_cap(client);
   if (!cap) {
-    dout(7) << "handle_client_caps no cap for client." << client << " on " << *in << dendl;
+    dout(7) << "handle_client_caps no cap for client." << client << " on " << *head_in << dendl;
     m->put();
     return;
   }  
   assert(cap);
 
   // freezing|frozen?
-  if (should_defer_client_cap_frozen(in)) {
-    dout(7) << "handle_client_caps freezing|frozen on " << *in << dendl;
-    in->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, m));
+  if (should_defer_client_cap_frozen(head_in)) {
+    dout(7) << "handle_client_caps freezing|frozen on " << *head_in << dendl;
+    head_in->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, m));
     return;
   }
   if (ceph_seq_cmp(m->get_mseq(), cap->get_mseq()) < 0) {
@@ -2732,15 +2741,22 @@ void Locker::handle_client_caps(MClientCaps *m)
 
   // flushsnap?
   if (op == CEPH_CAP_OP_FLUSHSNAP) {
-    if (!in->is_auth()) {
-      dout(7) << " not auth, ignoring flushsnap on " << *in << dendl;
+    if (!head_in->is_auth()) {
+      dout(7) << " not auth, ignoring flushsnap on " << *head_in << dendl;
       goto out;
     }
 
-    SnapRealm *realm = in->find_snaprealm();
+    SnapRealm *realm = head_in->find_snaprealm();
     snapid_t snap = realm->get_snap_following(follows);
     dout(10) << "  flushsnap follows " << follows << " -> snap " << snap << dendl;
 
+    CInode *in = head_in;
+    if (snap != CEPH_NOSNAP) {
+      in = mdcache->pick_inode_snap(head_in, snap - 1);
+      if (in != head_in)
+       dout(10) << " snapped inode " << *in << dendl;
+    }
+
     // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
     // other cap ops.  (except possibly duplicate FLUSHSNAP requests, but worst
     // case we get a dup response, so whatever.)
@@ -2768,7 +2784,6 @@ void Locker::handle_client_caps(MClientCaps *m)
 
       if (in != head_in)
        head_in->remove_need_snapflush(in, snap, client);
-      
     } else {
       dout(7) << " not expecting flushsnap " << snap << " from client." << client << " on " << *in << dendl;
       if (ack)
@@ -2780,14 +2795,18 @@ void Locker::handle_client_caps(MClientCaps *m)
   if (cap->get_cap_id() != m->get_cap_id()) {
     dout(7) << " ignoring client capid " << m->get_cap_id() << " != my " << cap->get_cap_id() << dendl;
   } else {
-    // intermediate snap inodes
-    while (in != head_in) {
-      assert(in->last != CEPH_NOSNAP);
-      if (in->is_auth() && m->get_dirty()) {
-       dout(10) << " updating intermediate snapped inode " << *in << dendl;
-       _do_cap_update(in, NULL, m->get_dirty(), follows, m);
+    CInode *in = head_in;
+    if (follows > 0) {
+      in = mdcache->pick_inode_snap(head_in, follows);
+      // intermediate snap inodes
+      while (in != head_in) {
+       assert(in->last != CEPH_NOSNAP);
+       if (in->is_auth() && m->get_dirty()) {
+         dout(10) << " updating intermediate snapped inode " << *in << dendl;
+         _do_cap_update(in, NULL, m->get_dirty(), follows, m);
+       }
+       in = mdcache->pick_inode_snap(head_in, in->last);
       }
-      in = mdcache->pick_inode_snap(head_in, in->last);
     }
  
     // head inode, and cap
@@ -2888,7 +2907,7 @@ public:
 };
 
 void Locker::process_request_cap_release(MDRequestRef& mdr, client_t client, const ceph_mds_request_release& item,
-                                        const string &dname)
+                                        boost::string_view dname)
 {
   inodeno_t ino = (uint64_t)item.ino;
   uint64_t cap_id = item.cap_id;
@@ -3041,53 +3060,55 @@ void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t foll
   // normal metadata updates that we can apply to the head as well.
 
   // update xattrs?
-  bool xattrs = false;
-  map<string,bufferptr> *px = 0;
-  if ((dirty & CEPH_CAP_XATTR_EXCL) && 
-      m->xattrbl.length() &&
-      m->head.xattr_version > in->get_projected_inode()->xattr_version)
-    xattrs = true;
-
-  old_inode_t *oi = 0;
+  CInode::mempool_xattr_map *px = nullptr;
+  bool xattrs = (dirty & CEPH_CAP_XATTR_EXCL) &&
+                m->xattrbl.length() &&
+                m->head.xattr_version > in->get_projected_inode()->xattr_version;
+
+  CInode::mempool_old_inode *oi = 0;
   if (in->is_multiversion()) {
     oi = in->pick_old_inode(snap);
   }
 
-  inode_t *pi;
+  CInode::mempool_inode *i;
   if (oi) {
     dout(10) << " writing into old inode" << dendl;
-    pi = in->project_inode();
-    pi->version = in->pre_dirty();
+    auto &pi = in->project_inode();
+    pi.inode.version = in->pre_dirty();
     if (snap > oi->first)
       in->split_old_inode(snap);
-    pi = &oi->inode;
+    i = &oi->inode;
     if (xattrs)
       px = &oi->xattrs;
   } else {
+    auto &pi = in->project_inode(xattrs);
+    pi.inode.version = in->pre_dirty();
+    i = &pi.inode;
     if (xattrs)
-      px = new map<string,bufferptr>;
-    pi = in->project_inode(px);
-    pi->version = in->pre_dirty();
+      px = pi.xattrs.get();
   }
 
-  _update_cap_fields(in, dirty, m, pi);
+  _update_cap_fields(in, dirty, m, i);
 
   // xattr
-  if (px) {
-    dout(7) << " xattrs v" << pi->xattr_version << " -> " << m->head.xattr_version
+  if (xattrs) {
+    dout(7) << " xattrs v" << i->xattr_version << " -> " << m->head.xattr_version
            << " len " << m->xattrbl.length() << dendl;
-    pi->xattr_version = m->head.xattr_version;
+    i->xattr_version = m->head.xattr_version;
     bufferlist::iterator p = m->xattrbl.begin();
     ::decode(*px, p);
   }
 
-  if (pi->client_ranges.count(client)) {
-    if (in->last == snap) {
-      dout(10) << "  removing client_range entirely" << dendl;
-      pi->client_ranges.erase(client);
-    } else {
-      dout(10) << "  client_range now follows " << snap << dendl;
-      pi->client_ranges[client].follows = snap;
+  {
+    auto it = i->client_ranges.find(client);
+    if (it != i->client_ranges.end()) {
+      if (in->last == snap) {
+        dout(10) << "  removing client_range entirely" << dendl;
+        i->client_ranges.erase(it);
+      } else {
+        dout(10) << "  client_range now follows " << snap << dendl;
+        it->second.follows = snap;
+      }
     }
   }
 
@@ -3104,7 +3125,7 @@ void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t foll
                                                              client, ack));
 }
 
-void Locker::_update_cap_fields(CInode *in, int dirty, MClientCaps *m, inode_t *pi)
+void Locker::_update_cap_fields(CInode *in, int dirty, MClientCaps *m, CInode::mempool_inode *pi)
 {
   if (dirty == 0)
     return;
@@ -3116,7 +3137,7 @@ void Locker::_update_cap_fields(CInode *in, int dirty, MClientCaps *m, inode_t *
   if (m->get_ctime() > pi->ctime) {
     dout(7) << "  ctime " << pi->ctime << " -> " << m->get_ctime()
            << " for " << *in << dendl;
-    pi->ctime = m->get_ctime();
+    pi->ctime = pi->rstat.rctime = m->get_ctime();
   }
 
   if ((features & CEPH_FEATURE_FS_CHANGE_ATTR) &&
@@ -3212,7 +3233,7 @@ bool Locker::_do_cap_update(CInode *in, Capability *cap,
           << " on " << *in << dendl;
   assert(in->is_auth());
   client_t client = m->get_source().num();
-  inode_t *latest = in->get_projected_inode();
+  CInode::mempool_inode *latest = in->get_projected_inode();
 
   // increase or zero max_size?
   uint64_t size = m->get_size();
@@ -3303,43 +3324,39 @@ bool Locker::_do_cap_update(CInode *in, Capability *cap,
   if (!dirty && !change_max)
     return false;
 
-  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
+  Session *session = mds->get_session(m);
   if (session->check_access(in, MAY_WRITE,
                            m->caller_uid, m->caller_gid, NULL, 0, 0) < 0) {
-    session->put();
     dout(10) << "check_access failed, dropping cap update on " << *in << dendl;
     return false;
   }
-  session->put();
 
   // do the update.
   EUpdate *le = new EUpdate(mds->mdlog, "cap update");
   mds->mdlog->start_entry(le);
 
-  // xattrs update?
-  map<string,bufferptr> *px = 0;
-  if ((dirty & CEPH_CAP_XATTR_EXCL) && 
-      m->xattrbl.length() &&
-      m->head.xattr_version > in->get_projected_inode()->xattr_version)
-    px = new map<string,bufferptr>;
+  bool xattr = (dirty & CEPH_CAP_XATTR_EXCL) &&
+               m->xattrbl.length() &&
+               m->head.xattr_version > in->get_projected_inode()->xattr_version;
 
-  inode_t *pi = in->project_inode(px);
-  pi->version = in->pre_dirty();
+  auto &pi = in->project_inode(xattr);
+  pi.inode.version = in->pre_dirty();
 
   MutationRef mut(new MutationImpl());
   mut->ls = mds->mdlog->get_current_segment();
 
-  _update_cap_fields(in, dirty, m, pi);
+  _update_cap_fields(in, dirty, m, &pi.inode);
 
   if (change_max) {
     dout(7) << "  max_size " << old_max << " -> " << new_max
            << " for " << *in << dendl;
     if (new_max) {
-      pi->client_ranges[client].range.first = 0;
-      pi->client_ranges[client].range.last = new_max;
-      pi->client_ranges[client].follows = in->first - 1;
+      auto &cr = pi.inode.client_ranges[client];
+      cr.range.first = 0;
+      cr.range.last = new_max;
+      cr.follows = in->first - 1;
     } else 
-      pi->client_ranges.erase(client);
+      pi.inode.client_ranges.erase(client);
   }
     
   if (change_max || (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR))) 
@@ -3349,13 +3366,12 @@ bool Locker::_do_cap_update(CInode *in, Capability *cap,
   if (dirty & CEPH_CAP_AUTH_EXCL)
     wrlock_force(&in->authlock, mut);
 
-  // xattr
-  if (px) {
-    dout(7) << " xattrs v" << pi->xattr_version << " -> " << m->head.xattr_version << dendl;
-    pi->xattr_version = m->head.xattr_version;
+  // xattrs update?
+  if (xattr) {
+    dout(7) << " xattrs v" << pi.inode.xattr_version << " -> " << m->head.xattr_version << dendl;
+    pi.inode.xattr_version = m->head.xattr_version;
     bufferlist::iterator p = m->xattrbl.begin();
-    ::decode(*px, p);
-
+    ::decode(*pi.xattrs, p);
     wrlock_force(&in->xattrlock, mut);
   }
   
@@ -3400,7 +3416,7 @@ void Locker::handle_client_cap_release(MClientCapRelease *m)
     mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
   }
 
-  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
+  Session *session = mds->get_session(m);
 
   for (vector<ceph_mds_cap_item>::iterator p = m->caps.begin(); p != m->caps.end(); ++p) {
     _do_cap_release(client, inodeno_t((uint64_t)p->ino) , p->cap_id, p->migrate_seq, p->seq);
@@ -3492,7 +3508,7 @@ void Locker::remove_client_cap(CInode *in, client_t client)
 
 /**
  * Return true if any currently revoking caps exceed the
- * mds_revoke_cap_timeout threshold.
+ * mds_session_timeout threshold.
  */
 bool Locker::any_late_revoking_caps(xlist<Capability*> const &revoking) const
 {
@@ -3503,7 +3519,7 @@ bool Locker::any_late_revoking_caps(xlist<Capability*> const &revoking) const
     } else {
       utime_t now = ceph_clock_now();
       utime_t age = now - (*p)->get_last_revoke_stamp();
-      if (age <= g_conf->mds_revoke_cap_timeout) {
+      if (age <= g_conf->mds_session_timeout) {
           return false;
       } else {
           return true;
@@ -3548,8 +3564,8 @@ void Locker::caps_tick()
 
     utime_t age = now - cap->get_last_revoke_stamp();
     dout(20) << __func__ << " age = " << age << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
-    if (age <= g_conf->mds_revoke_cap_timeout) {
-      dout(20) << __func__ << " age below timeout " << g_conf->mds_revoke_cap_timeout << dendl;
+    if (age <= g_conf->mds_session_timeout) {
+      dout(20) << __func__ << " age below timeout " << g_conf->mds_session_timeout << dendl;
       break;
     } else {
       ++i;
@@ -3560,7 +3576,7 @@ void Locker::caps_tick()
       }
     }
     // exponential backoff of warning intervals
-    if (age > g_conf->mds_revoke_cap_timeout * (1 << cap->get_num_revoke_warnings())) {
+    if (age > g_conf->mds_session_timeout * (1 << cap->get_num_revoke_warnings())) {
       cap->inc_num_revoke_warnings();
       stringstream ss;
       ss << "client." << cap->get_client() << " isn't responding to mclientcaps(revoke), ino "
@@ -3707,7 +3723,6 @@ void Locker::revoke_client_leases(SimpleLock *lock)
                                              dn->get_name()),
                             l->client);
   }
-  assert(n == lock->get_num_client_lease());
 }
 
 
@@ -4156,11 +4171,7 @@ void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
 
   switch (lock->get_state()) {
   case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
-  case LOCK_XSYN:
-    file_excl(static_cast<ScatterLock*>(lock), need_issue);
-    if (lock->get_state() != LOCK_EXCL)
-      return;
-    // fall-thru
+  case LOCK_XSYN: lock->set_state(LOCK_XSYN_LOCK); break;
   case LOCK_EXCL: lock->set_state(LOCK_EXCL_LOCK); break;
   case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK);
     (static_cast<ScatterLock *>(lock))->clear_unscatter_wanted();
@@ -4342,8 +4353,8 @@ void Locker::scatter_writebehind(ScatterLock *lock)
 
   in->pre_cow_old_inode();  // avoid cow mayhem
 
-  inode_t *pi = in->project_inode();
-  pi->version = in->pre_dirty();
+  auto &pi = in->project_inode();
+  pi.inode.version = in->pre_dirty();
 
   in->finish_scatter_gather_update(lock->get_type());
   lock->start_flush();
@@ -4478,7 +4489,7 @@ void Locker::scatter_nudge(ScatterLock *lock, MDSInternalContextBase *c, bool fo
     dout(10) << "scatter_nudge waiting for unfreeze on " << *p << dendl;
     if (c) 
       p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, c);
-    else
+    else if (lock->is_dirty())
       // just requeue.  not ideal.. starvation prone..
       updated_scatterlocks.push_back(lock->get_updated_item());
     return;
@@ -4488,7 +4499,7 @@ void Locker::scatter_nudge(ScatterLock *lock, MDSInternalContextBase *c, bool fo
     dout(10) << "scatter_nudge waiting for single auth on " << *p << dendl;
     if (c) 
       p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, c);
-    else
+    else if (lock->is_dirty())
       // just requeue.  not ideal.. starvation prone..
       updated_scatterlocks.push_back(lock->get_updated_item());
     return;
@@ -4578,7 +4589,8 @@ void Locker::scatter_nudge(ScatterLock *lock, MDSInternalContextBase *c, bool fo
       lock->add_waiter(SimpleLock::WAIT_STABLE, c);
 
     // also, requeue, in case we had wrong auth or something
-    updated_scatterlocks.push_back(lock->get_updated_item());
+    if (lock->is_dirty())
+      updated_scatterlocks.push_back(lock->get_updated_item());
   }
 }
 
@@ -4826,7 +4838,7 @@ void Locker::file_eval(ScatterLock *lock, bool *need_issue)
           !lock->is_rdlocked() &&
           //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
           (lock->get_scatter_wanted() ||
-           (in->get_wanted_loner() < 0 && (wanted & CEPH_CAP_GWR)))) {
+           (in->get_target_loner() < 0 && (wanted & CEPH_CAP_GWR)))) {
     dout(7) << "file_eval stable, bump to mixed " << *lock
            << " on " << *lock->get_parent() << dendl;
     scatter_mix(lock, need_issue);
@@ -4841,7 +4853,7 @@ void Locker::file_eval(ScatterLock *lock, bool *need_issue)
             in->is_dir() && in->has_subtree_or_exporting_dirfrag())  // if we are a delegation point, stay where we are
           //((wanted & CEPH_CAP_RD) || 
           //in->is_replicated() || 
-          //lock->get_num_client_lease() || 
+          //lock->is_leased() ||
           //(!loner && lock->get_state() == LOCK_EXCL)) &&
           ) {
     dout(7) << "file_eval stable, bump to sync " << *lock 
@@ -4884,12 +4896,8 @@ void Locker::scatter_mix(ScatterLock *lock, bool *need_issue)
     // gather?
     switch (lock->get_state()) {
     case LOCK_SYNC: lock->set_state(LOCK_SYNC_MIX); break;
-    case LOCK_XSYN:
-      file_excl(lock, need_issue);
-      if (lock->get_state() != LOCK_EXCL)
-       return;
-      // fall-thru
     case LOCK_EXCL: lock->set_state(LOCK_EXCL_MIX); break;
+    case LOCK_XSYN: lock->set_state(LOCK_XSYN_MIX); break;
     case LOCK_TSYN: lock->set_state(LOCK_TSYN_MIX); break;
     default: ceph_abort();
     }
@@ -4898,8 +4906,7 @@ void Locker::scatter_mix(ScatterLock *lock, bool *need_issue)
     if (lock->is_rdlocked())
       gather++;
     if (in->is_replicated()) {
-      if (lock->get_state() != LOCK_EXCL_MIX &&   // EXCL replica is already LOCK
-         lock->get_state() != LOCK_XSYN_EXCL) {  // XSYN replica is already LOCK;  ** FIXME here too!
+      if (lock->get_state() == LOCK_SYNC_MIX) { // for the rest states, replicas are already LOCK
        send_lock_message(lock, LOCK_AC_MIX);
        lock->init_gather();
        gather++;