]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/client/Client.cc
import ceph 15.2.14
[ceph.git] / ceph / src / client / Client.cc
old mode 100644 (file)
new mode 100755 (executable)
index 08bfb2f..96d7f5f
@@ -70,7 +70,6 @@
 #include "osdc/Filer.h"
 
 #include "common/Cond.h"
-#include "common/Mutex.h"
 #include "common/perf_counters.h"
 #include "common/admin_socket.h"
 #include "common/errno.h"
@@ -97,7 +96,7 @@
 #include "include/ceph_assert.h"
 #include "include/stat.h"
 
-#include "include/cephfs/ceph_statx.h"
+#include "include/cephfs/ceph_ll_client.h"
 
 #if HAVE_GETGROUPLIST
 #include <grp.h>
 
 #define DEBUG_GETATTR_CAPS (CEPH_CAP_XATTR_SHARED)
 
+using namespace TOPNSPC::common;
+
 void client_flush_set_callback(void *p, ObjectCacher::ObjectSet *oset)
 {
   Client *client = static_cast<Client*>(p);
   client->flush_set_callback(oset);
 }
 
+bool Client::is_reserved_vino(vinodeno_t &vino) {
+  if (MDS_IS_PRIVATE_INO(vino.ino)) {
+    ldout(cct, -1) << __func__ << " attempt to access reserved inode number " << vino << dendl;
+    return true;
+  }
+  return false;
+}
+
 
 // -------------
 
@@ -139,29 +148,33 @@ Client::CommandHook::CommandHook(Client *client) :
 {
 }
 
-bool Client::CommandHook::call(std::string_view command,
-                              const cmdmap_t& cmdmap,
-                              std::string_view format, bufferlist& out)
+int Client::CommandHook::call(
+  std::string_view command,
+  const cmdmap_t& cmdmap,
+  Formatter *f,
+  std::ostream& errss,
+  bufferlist& out)
 {
-  std::unique_ptr<Formatter> f(Formatter::create(format));
   f->open_object_section("result");
-  m_client->client_lock.Lock();
-  if (command == "mds_requests")
-    m_client->dump_mds_requests(f.get());
-  else if (command == "mds_sessions")
-    m_client->dump_mds_sessions(f.get());
-  else if (command == "dump_cache")
-    m_client->dump_cache(f.get());
-  else if (command == "kick_stale_sessions")
-    m_client->_kick_stale_sessions();
-  else if (command == "status")
-    m_client->dump_status(f.get());
-  else
-    ceph_abort_msg("bad command registered");
-  m_client->client_lock.Unlock();
+  {
+    std::lock_guard l{m_client->client_lock};
+    if (command == "mds_requests")
+      m_client->dump_mds_requests(f);
+    else if (command == "mds_sessions") {
+      bool cap_dump = false;
+      cmd_getval(cmdmap, "cap_dump", cap_dump);
+      m_client->dump_mds_sessions(f, cap_dump);
+    } else if (command == "dump_cache")
+      m_client->dump_cache(f);
+    else if (command == "kick_stale_sessions")
+      m_client->_kick_stale_sessions();
+    else if (command == "status")
+      m_client->dump_status(f);
+    else
+      ceph_abort_msg("bad command registered");
+  }
   f->close_section();
-  f->flush(out);
-  return true;
+  return 0;
 }
 
 
@@ -261,7 +274,6 @@ vinodeno_t Client::map_faked_ino(ino_t ino)
 Client::Client(Messenger *m, MonClient *mc, Objecter *objecter_)
   : Dispatcher(m->cct),
     timer(m->cct, client_lock),
-    client_lock("Client::client_lock"),
     messenger(m),
     monclient(mc),
     objecter(objecter_),
@@ -270,6 +282,7 @@ Client::Client(Messenger *m, MonClient *mc, Objecter *objecter_)
     async_dentry_invalidator(m->cct),
     interrupt_finisher(m->cct),
     remount_finisher(m->cct),
+    async_ino_releasor(m->cct),
     objecter_finisher(m->cct),
     m_command_hook(this),
     fscid(0)
@@ -278,6 +291,8 @@ Client::Client(Messenger *m, MonClient *mc, Objecter *objecter_)
 
   user_id = cct->_conf->client_mount_uid;
   group_id = cct->_conf->client_mount_gid;
+  fuse_default_permissions = cct->_conf.get_val<bool>(
+    "fuse_default_permissions");
 
   if (cct->_conf->client_acl_type == "posix_acl")
     acl_type = POSIX_ACL;
@@ -301,22 +316,18 @@ Client::Client(Messenger *m, MonClient *mc, Objecter *objecter_)
                                  cct->_conf->client_oc_target_dirty,
                                  cct->_conf->client_oc_max_dirty_age,
                                  true));
-  objecter_finisher.start();
-  filer.reset(new Filer(objecter, &objecter_finisher));
-  objecter->enable_blacklist_events();
 }
 
 
 Client::~Client()
 {
-  ceph_assert(!client_lock.is_locked());
+  ceph_assert(ceph_mutex_is_not_locked(client_lock));
 
   // It is necessary to hold client_lock, because any inode destruction
   // may call into ObjectCacher, which asserts that it's lock (which is
   // client_lock) is held.
-  client_lock.Lock();
+  std::lock_guard l{client_lock};
   tear_down_cache();
-  client_lock.Unlock();
 }
 
 void Client::tear_down_cache()
@@ -443,7 +454,7 @@ void Client::dump_cache(Formatter *f)
 
 void Client::dump_status(Formatter *f)
 {
-  ceph_assert(client_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
 
   ldout(cct, 1) << __func__ << dendl;
 
@@ -469,52 +480,60 @@ void Client::dump_status(Formatter *f)
     f->dump_int("osd_epoch", osd_epoch);
     f->dump_int("osd_epoch_barrier", cap_epoch_barrier);
     f->dump_bool("blacklisted", blacklisted);
+    f->dump_string("fs_name", mdsmap->get_fs_name());
   }
 }
 
-int Client::init()
+void Client::_pre_init()
 {
   timer.init();
-  objectcacher->start();
 
-  client_lock.Lock();
-  ceph_assert(!initialized);
+  objecter_finisher.start();
+  filer.reset(new Filer(objecter, &objecter_finisher));
+  objecter->enable_blacklist_events();
 
-  messenger->add_dispatcher_tail(this);
-  client_lock.Unlock();
+  objectcacher->start();
+}
 
+int Client::init()
+{
+  _pre_init();
+  {
+    std::lock_guard l{client_lock};
+    ceph_assert(!initialized);
+    messenger->add_dispatcher_tail(this);
+  }
   _finish_init();
   return 0;
 }
 
 void Client::_finish_init()
 {
-  client_lock.Lock();
-  // logger
-  PerfCountersBuilder plb(cct, "client", l_c_first, l_c_last);
-  plb.add_time_avg(l_c_reply, "reply", "Latency of receiving a reply on metadata request");
-  plb.add_time_avg(l_c_lat, "lat", "Latency of processing a metadata request");
-  plb.add_time_avg(l_c_wrlat, "wrlat", "Latency of a file data write operation");
-  plb.add_time_avg(l_c_read, "rdlat", "Latency of a file data read operation");
-  plb.add_time_avg(l_c_fsync, "fsync", "Latency of a file sync operation");
-  logger.reset(plb.create_perf_counters());
-  cct->get_perfcounters_collection()->add(logger.get());
-
-  client_lock.Unlock();
+  {
+    std::lock_guard l{client_lock};
+    // logger
+    PerfCountersBuilder plb(cct, "client", l_c_first, l_c_last);
+    plb.add_time_avg(l_c_reply, "reply", "Latency of receiving a reply on metadata request");
+    plb.add_time_avg(l_c_lat, "lat", "Latency of processing a metadata request");
+    plb.add_time_avg(l_c_wrlat, "wrlat", "Latency of a file data write operation");
+    plb.add_time_avg(l_c_read, "rdlat", "Latency of a file data read operation");
+    plb.add_time_avg(l_c_fsync, "fsync", "Latency of a file sync operation");
+    logger.reset(plb.create_perf_counters());
+    cct->get_perfcounters_collection()->add(logger.get());
+  }
 
   cct->_conf.add_observer(this);
 
   AdminSocket* admin_socket = cct->get_admin_socket();
   int ret = admin_socket->register_command("mds_requests",
-                                          "mds_requests",
                                           &m_command_hook,
                                           "show in-progress mds requests");
   if (ret < 0) {
     lderr(cct) << "error registering admin socket command: "
               << cpp_strerror(-ret) << dendl;
   }
-  ret = admin_socket->register_command("mds_sessions",
-                                      "mds_sessions",
+  ret = admin_socket->register_command("mds_sessions "
+                                      "name=cap_dump,type=CephBool,req=false",
                                       &m_command_hook,
                                       "show mds session state");
   if (ret < 0) {
@@ -522,7 +541,6 @@ void Client::_finish_init()
               << cpp_strerror(-ret) << dendl;
   }
   ret = admin_socket->register_command("dump_cache",
-                                      "dump_cache",
                                       &m_command_hook,
                                       "show in-memory metadata cache contents");
   if (ret < 0) {
@@ -530,7 +548,6 @@ void Client::_finish_init()
               << cpp_strerror(-ret) << dendl;
   }
   ret = admin_socket->register_command("kick_stale_sessions",
-                                      "kick_stale_sessions",
                                       &m_command_hook,
                                       "kick sessions that were remote reset");
   if (ret < 0) {
@@ -538,7 +555,6 @@ void Client::_finish_init()
               << cpp_strerror(-ret) << dendl;
   }
   ret = admin_socket->register_command("status",
-                                      "status",
                                       &m_command_hook,
                                       "show overall client status");
   if (ret < 0) {
@@ -546,9 +562,8 @@ void Client::_finish_init()
               << cpp_strerror(-ret) << dendl;
   }
 
-  client_lock.Lock();
+  std::lock_guard l{client_lock};
   initialized = true;
-  client_lock.Unlock();
 }
 
 void Client::shutdown() 
@@ -557,10 +572,10 @@ void Client::shutdown()
 
   // If we were not mounted, but were being used for sending
   // MDS commands, we may have sessions that need closing.
-  client_lock.Lock();
-  _close_sessions();
-  client_lock.Unlock();
-
+  {
+    std::lock_guard l{client_lock};
+    _close_sessions();
+  }
   cct->_conf.remove_observer(this);
 
   cct->get_admin_socket()->unregister_commands(&m_command_hook);
@@ -589,14 +604,19 @@ void Client::shutdown()
     remount_finisher.stop();
   }
 
-  objectcacher->stop();  // outside of client_lock! this does a join.
-
-  client_lock.Lock();
-  ceph_assert(initialized);
-  initialized = false;
-  timer.shutdown();
-  client_lock.Unlock();
+  if (ino_release_cb) {
+    ldout(cct, 10) << "shutdown stopping inode release finisher" << dendl;
+    async_ino_releasor.wait_for_empty();
+    async_ino_releasor.stop();
+  }
 
+  objectcacher->stop();  // outside of client_lock! this does a join.
+  {
+    std::lock_guard l{client_lock};
+    ceph_assert(initialized);
+    initialized = false;
+    timer.shutdown();
+  }
   objecter_finisher.wait_for_empty();
   objecter_finisher.stop();
 
@@ -682,7 +702,6 @@ void Client::trim_dentry(Dentry *dn)
                 << dendl;
   if (dn->inode) {
     Inode *diri = dn->dir->parent_inode;
-    diri->dir_release_count++;
     clear_dir_complete_and_ordered(diri, true);
   }
   unlink(dn, false, false);  // drop dir, drop dentry
@@ -992,13 +1011,11 @@ Dentry *Client::insert_dentry_inode(Dir *dir, const string& dname, LeaseStat *dl
     if (old_dentry) {
       if (old_dentry->dir != dir) {
        Inode *old_diri = old_dentry->dir->parent_inode;
-       old_diri->dir_ordered_count++;
        clear_dir_complete_and_ordered(old_diri, false);
       }
       unlink(old_dentry, dir == old_dentry->dir, false);  // drop dentry, keep dir open if its the same dir
     }
     Inode *diri = dir->parent_inode;
-    diri->dir_ordered_count++;
     clear_dir_complete_and_ordered(diri, false);
     dn = link(dir, dname, in, dn);
   }
@@ -1014,7 +1031,7 @@ void Client::update_dentry_lease(Dentry *dn, LeaseStat *dlease, utime_t from, Me
   
   ceph_assert(dn);
 
-  if (dlease->mask & CEPH_LOCK_DN) {
+  if (dlease->mask & CEPH_LEASE_VALID) {
     if (dttl > dn->lease_ttl) {
       ldout(cct, 10) << "got dentry lease on " << dn->name
               << " dur " << dlease->duration_ms << "ms ttl " << dttl << dendl;
@@ -1025,6 +1042,8 @@ void Client::update_dentry_lease(Dentry *dn, LeaseStat *dlease, utime_t from, Me
     }
   }
   dn->cap_shared_gen = dn->dir->parent_inode->shared_gen;
+  if (dlease->mask & CEPH_LEASE_PRIMARY_LINK)
+    dn->mark_primary();
 }
 
 
@@ -1047,24 +1066,14 @@ void Client::update_dir_dist(Inode *in, DirStat *dst)
 
   // replicated
   in->dir_replicated = !dst->dist.empty();  // FIXME that's just one frag!
-  
-  // dist
-  /*
-  if (!st->dirfrag_dist.empty()) {   // FIXME
-    set<int> dist = st->dirfrag_dist.begin()->second;
-    if (dist.empty() && !in->dir_contacts.empty())
-      ldout(cct, 9) << "lost dist spec for " << in->ino 
-              << " " << dist << dendl;
-    if (!dist.empty() && in->dir_contacts.empty()) 
-      ldout(cct, 9) << "got dist spec for " << in->ino 
-              << " " << dist << dendl;
-    in->dir_contacts = dist;
-  }
-  */
 }
 
 void Client::clear_dir_complete_and_ordered(Inode *diri, bool complete)
 {
+  if (complete)
+    diri->dir_release_count++;
+  else
+    diri->dir_ordered_count++;
   if (diri->flags & I_COMPLETE) {
     if (complete) {
       ldout(cct, 10) << " clearing (I_COMPLETE|I_DIR_ORDERED) on " << *diri << dendl;
@@ -1269,7 +1278,6 @@ Inode* Client::insert_trace(MetaRequest *request, MetaSession *session)
     Dentry *d = request->dentry();
     if (d) {
       Inode *diri = d->dir->parent_inode;
-      diri->dir_release_count++;
       clear_dir_complete_and_ordered(diri, true);
     }
 
@@ -1357,7 +1365,6 @@ Inode* Client::insert_trace(MetaRequest *request, MetaSession *session)
       if (diri->dir && diri->dir->dentries.count(dname)) {
        dn = diri->dir->dentries[dname];
        if (dn->inode) {
-         diri->dir_ordered_count++;
          clear_dir_complete_and_ordered(diri, false);
          unlink(dn, true, true);  // keep dir, dentry
        }
@@ -1541,7 +1548,7 @@ void Client::connect_mds_targets(mds_rank_t mds)
   }
 }
 
-void Client::dump_mds_sessions(Formatter *f)
+void Client::dump_mds_sessions(Formatter *f, bool cap_dump)
 {
   f->dump_int("id", get_nodeid().v);
   entity_inst_t inst(messenger->get_myname(), messenger->get_myaddr_legacy());
@@ -1551,7 +1558,7 @@ void Client::dump_mds_sessions(Formatter *f)
   f->open_array_section("sessions");
   for (const auto &p : mds_sessions) {
     f->open_object_section("session");
-    p.second.dump(f);
+    p.second.dump(f, cap_dump);
     f->close_section();
   }
   f->close_section();
@@ -1568,7 +1575,7 @@ void Client::dump_mds_requests(Formatter *f)
   }
 }
 
-int Client::verify_reply_trace(int r,
+int Client::verify_reply_trace(int r, MetaSession *session,
                               MetaRequest *request, const MConstRef<MClientReply>& reply,
                               InodeRef *ptarget, bool *pcreated,
                               const UserPerm& perms)
@@ -1581,12 +1588,24 @@ int Client::verify_reply_trace(int r,
 
   extra_bl = reply->get_extra_bl();
   if (extra_bl.length() >= 8) {
-    // if the extra bufferlist has a buffer, we assume its the created inode
-    // and that this request to create succeeded in actually creating
-    // the inode (won the race with other create requests)
-    decode(created_ino, extra_bl);
-    got_created_ino = true;
+    if (session->mds_features.test(CEPHFS_FEATURE_DELEG_INO)) {
+     struct openc_response_t   ocres;
+
+     decode(ocres, extra_bl);
+     created_ino = ocres.created_ino;
+     /*
+      * The userland cephfs client doesn't have a way to do an async create
+      * (yet), so just discard delegated_inos for now. Eventually we should
+      * store them and use them in create calls, even if they are synchronous,
+      * if only for testing purposes.
+      */
+     ldout(cct, 10) << "delegated_inos: " << ocres.delegated_inos << dendl;
+    } else {
+     // u64 containing number of created ino
+     decode(created_ino, extra_bl);
+    }
     ldout(cct, 10) << "make_request created ino " << created_ino << dendl;
+    got_created_ino = true;
   }
 
   if (pcreated)
@@ -1694,6 +1713,7 @@ int Client::make_request(MetaRequest *request,
   if (use_mds >= 0)
     request->resend_mds = use_mds;
 
+  MetaSession *session = NULL;
   while (1) {
     if (request->aborted())
       break;
@@ -1704,7 +1724,7 @@ int Client::make_request(MetaRequest *request,
     }
 
     // set up wait cond
-    Cond caller_cond;
+    ceph::condition_variable caller_cond;
     request->caller_cond = &caller_cond;
 
     // choose mds
@@ -1728,19 +1748,16 @@ int Client::make_request(MetaRequest *request,
     }
 
     // open a session?
-    MetaSession *session = NULL;
     if (!have_open_session(mds)) {
       session = _get_or_open_mds_session(mds);
-
+      if (session->state == MetaSession::STATE_REJECTED) {
+       request->abort(-EPERM);
+       break;
+      }
       // wait
       if (session->state == MetaSession::STATE_OPENING) {
        ldout(cct, 10) << "waiting for session to mds." << mds << " to open" << dendl;
        wait_on_context_list(session->waiting_for_open);
-        // Abort requests on REJECT from MDS
-        if (rejected_by_mds.count(mds)) {
-          request->abort(-EPERM);
-          break;
-        }
        continue;
       }
 
@@ -1756,11 +1773,14 @@ int Client::make_request(MetaRequest *request,
     // wait for signal
     ldout(cct, 20) << "awaiting reply|forward|kick on " << &caller_cond << dendl;
     request->kick = false;
-    while (!request->reply &&         // reply
-          request->resend_mds < 0 && // forward
-          !request->kick)
-      caller_cond.Wait(client_lock);
-    request->caller_cond = NULL;
+    std::unique_lock l{client_lock, std::adopt_lock};
+    caller_cond.wait(l, [request] {
+      return (request->reply ||                  // reply
+             request->resend_mds >= 0 || // forward
+             request->kick);
+    });
+    l.release();
+    request->caller_cond = nullptr;
 
     // did we get a reply?
     if (request->reply) 
@@ -1785,12 +1805,12 @@ int Client::make_request(MetaRequest *request,
 
   // kick dispatcher (we've got it!)
   ceph_assert(request->dispatch_cond);
-  request->dispatch_cond->Signal();
+  request->dispatch_cond->notify_all();
   ldout(cct, 20) << "sendrecv kickback on tid " << tid << " " << request->dispatch_cond << dendl;
   request->dispatch_cond = 0;
   
   if (r >= 0 && ptarget)
-    r = verify_reply_trace(r, request, reply, ptarget, pcreated, perms);
+    r = verify_reply_trace(r, session, request, reply, ptarget, pcreated, perms);
 
   if (pdirbl)
     *pdirbl = reply->get_extra_bl();
@@ -1851,7 +1871,7 @@ int Client::encode_inode_release(Inode *in, MetaRequest *req,
 {
   ldout(cct, 20) << __func__ << " enter(in:" << *in << ", req:" << req
           << " mds:" << mds << ", drop:" << drop << ", unless:" << unless
-          << ", have:" << ", force:" << force << ")" << dendl;
+          << ", force:" << force << ")" << dendl;
   int released = 0;
   auto it = in->caps.find(mds);
   if (it != in->caps.end()) {
@@ -1859,15 +1879,20 @@ int Client::encode_inode_release(Inode *in, MetaRequest *req,
     drop &= ~(in->dirty_caps | get_caps_used(in));
     if ((drop & cap.issued) &&
        !(unless & cap.issued)) {
-      ldout(cct, 25) << "Dropping caps. Initial " << ccap_string(cap.issued) << dendl;
+      ldout(cct, 25) << "dropping caps " << ccap_string(drop) << dendl;
       cap.issued &= ~drop;
       cap.implemented &= ~drop;
       released = 1;
-      ldout(cct, 25) << "Now have: " << ccap_string(cap.issued) << dendl;
     } else {
       released = force;
     }
     if (released) {
+      cap.wanted = in->caps_wanted();
+      if (&cap == in->auth_cap &&
+         !(cap.wanted & CEPH_CAP_ANY_FILE_WR)) {
+       in->requested_max_size = 0;
+       ldout(cct, 25) << "reset requested_max_size due to not wanting any file write cap" << dendl;
+      }
       ceph_mds_request_release rel;
       rel.ino = in->ino;
       rel.cap_id = cap.cap_id;
@@ -1901,6 +1926,7 @@ void Client::encode_dentry_release(Dentry *dn, MetaRequest *req,
     rel.item.dname_len = dn->name.length();
     rel.item.dname_seq = dn->lease_seq;
     rel.dname = dn->name;
+    dn->lease_mds = -1;
   }
   ldout(cct, 25) << __func__ << " exit(dn:"
           << dn << ")" << dendl;
@@ -2039,21 +2065,7 @@ MetaSession *Client::_open_mds_session(mds_rank_t mds)
   ceph_assert(em.second); /* not already present */
   MetaSession *session = &em.first->second;
 
-  // Maybe skip sending a request to open if this MDS daemon
-  // has previously sent us a REJECT.
-  if (rejected_by_mds.count(mds)) {
-    if (rejected_by_mds[mds] == session->addrs) {
-      ldout(cct, 4) << __func__ << " mds." << mds << " skipping "
-                       "because we were rejected" << dendl;
-      return session;
-    } else {
-      ldout(cct, 4) << __func__ << " mds." << mds << " old inst "
-                       "rejected us, trying with new inst" << dendl;
-      rejected_by_mds.erase(mds);
-    }
-  }
-
-  auto m = MClientSession::create(CEPH_SESSION_REQUEST_OPEN);
+  auto m = make_message<MClientSession>(CEPH_SESSION_REQUEST_OPEN);
   m->metadata = metadata;
   m->supported_features = feature_bitset_t(CEPHFS_FEATURES_CLIENT_SUPPORTED);
   session->con->send_message2(std::move(m));
@@ -2064,19 +2076,24 @@ void Client::_close_mds_session(MetaSession *s)
 {
   ldout(cct, 2) << __func__ << " mds." << s->mds_num << " seq " << s->seq << dendl;
   s->state = MetaSession::STATE_CLOSING;
-  s->con->send_message2(MClientSession::create(CEPH_SESSION_REQUEST_CLOSE, s->seq));
+  s->con->send_message2(make_message<MClientSession>(CEPH_SESSION_REQUEST_CLOSE, s->seq));
 }
 
-void Client::_closed_mds_session(MetaSession *s)
+void Client::_closed_mds_session(MetaSession *s, int err, bool rejected)
 {
   ldout(cct, 5) << __func__ << " mds." << s->mds_num << " seq " << s->seq << dendl;
-  s->state = MetaSession::STATE_CLOSED;
+  if (rejected && s->state != MetaSession::STATE_CLOSING)
+    s->state = MetaSession::STATE_REJECTED;
+  else
+    s->state = MetaSession::STATE_CLOSED;
   s->con->mark_down();
   signal_context_list(s->waiting_for_open);
-  mount_cond.Signal();
-  remove_session_caps(s);
+  mount_cond.notify_all();
+  remove_session_caps(s, err);
   kick_requests_closed(s);
-  mds_sessions.erase(s->mds_num);
+  mds_ranks_closing.erase(s->mds_num);
+  if (s->state == MetaSession::STATE_CLOSED)
+    mds_sessions.erase(s->mds_num);
 }
 
 void Client::handle_client_session(const MConstRef<MClientSession>& m)
@@ -2098,9 +2115,8 @@ void Client::handle_client_session(const MConstRef<MClientSession>& m)
       if (!missing_features.empty()) {
        lderr(cct) << "mds." << from << " lacks required features '"
                   << missing_features << "', closing session " << dendl;
-       rejected_by_mds[session->mds_num] = session->addrs;
        _close_mds_session(session);
-       _closed_mds_session(session);
+       _closed_mds_session(session, -EPERM, true);
        break;
       }
       session->mds_features = std::move(m->supported_features);
@@ -2108,7 +2124,7 @@ void Client::handle_client_session(const MConstRef<MClientSession>& m)
       renew_caps(session);
       session->state = MetaSession::STATE_OPEN;
       if (unmounting)
-       mount_cond.Signal();
+       mount_cond.notify_all();
       else
        connect_mds_targets(from);
       signal_context_list(session->waiting_for_open);
@@ -2146,7 +2162,7 @@ void Client::handle_client_session(const MConstRef<MClientSession>& m)
     if (auto& m = session->release; m) {
       session->con->send_message2(std::move(m));
     }
-    session->con->send_message2(MClientSession::create(CEPH_SESSION_FLUSHMSG_ACK, m->get_seq()));
+    session->con->send_message2(make_message<MClientSession>(CEPH_SESSION_FLUSHMSG_ACK, m->get_seq()));
     break;
 
   case CEPH_SESSION_FORCE_RO:
@@ -2163,8 +2179,7 @@ void Client::handle_client_session(const MConstRef<MClientSession>& m)
        error_str = "unknown error";
       lderr(cct) << "mds." << from << " rejected us (" << error_str << ")" << dendl;
 
-      rejected_by_mds[session->mds_num] = session->addrs;
-      _closed_mds_session(session);
+      _closed_mds_session(session, -EPERM, true);
     }
     break;
 
@@ -2175,7 +2190,7 @@ void Client::handle_client_session(const MConstRef<MClientSession>& m)
 
 bool Client::_any_stale_sessions() const
 {
-  ceph_assert(client_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
 
   for (const auto &p : mds_sessions) {
     if (p.second.state == MetaSession::STATE_STALE) {
@@ -2192,6 +2207,10 @@ void Client::_kick_stale_sessions()
 
   for (auto it = mds_sessions.begin(); it != mds_sessions.end(); ) {
     MetaSession &s = it->second;
+    if (s.state == MetaSession::STATE_REJECTED) {
+      mds_sessions.erase(it++);
+      continue;
+    }
     ++it;
     if (s.state == MetaSession::STATE_STALE)
       _closed_mds_session(&s);
@@ -2247,9 +2266,9 @@ void Client::send_request(MetaRequest *request, MetaSession *session,
   session->con->send_message2(std::move(r));
 }
 
-MClientRequest::ref Client::build_client_request(MetaRequest *request)
+ref_t<MClientRequest> Client::build_client_request(MetaRequest *request)
 {
-  auto req = MClientRequest::create(request->get_op());
+  auto req = make_message<MClientRequest>(request->get_op());
   req->set_tid(request->tid);
   req->set_stamp(request->op_stamp);
   memcpy(&req->head, &request->head, sizeof(ceph_mds_request_head));
@@ -2319,7 +2338,7 @@ void Client::handle_client_request_forward(const MConstRef<MClientRequestForward
   request->item.remove_myself();
   request->num_fwd = fwd->get_num_fwd();
   request->resend_mds = fwd->get_dest_mds();
-  request->caller_cond->Signal();
+  request->caller_cond->notify_all();
 }
 
 bool Client::is_dir_operation(MetaRequest *req)
@@ -2375,7 +2394,7 @@ void Client::handle_client_reply(const MConstRef<MClientReply>& reply)
          request->sent_on_mseq == it->second.mseq)) {
       ldout(cct, 20) << "have to return ESTALE" << dendl;
     } else {
-      request->caller_cond->Signal();
+      request->caller_cond->notify_all();
       return;
     }
   }
@@ -2402,18 +2421,23 @@ void Client::handle_client_reply(const MConstRef<MClientReply>& reply)
   // Only signal the caller once (on the first reply):
   // Either its an unsafe reply, or its a safe reply and no unsafe reply was sent.
   if (!is_safe || !request->got_unsafe) {
-    Cond cond;
+    ceph::condition_variable cond;
     request->dispatch_cond = &cond;
 
     // wake up waiter
     ldout(cct, 20) << __func__ << " signalling caller " << (void*)request->caller_cond << dendl;
-    request->caller_cond->Signal();
+    request->caller_cond->notify_all();
 
     // wake for kick back
-    while (request->dispatch_cond) {
-      ldout(cct, 20) << __func__ << " awaiting kickback on tid " << tid << " " << &cond << dendl;
-      cond.Wait(client_lock);
-    }
+    std::unique_lock l{client_lock, std::adopt_lock};
+    cond.wait(l, [tid, request, &cond, this] {
+      if (request->dispatch_cond) {
+        ldout(cct, 20) << "handle_client_reply awaiting kickback on tid "
+                      << tid << " " << &cond << dendl;
+      }
+      return !request->dispatch_cond;
+    });
+    l.release();
   }
 
   if (is_safe) {
@@ -2429,7 +2453,7 @@ void Client::handle_client_reply(const MConstRef<MClientReply>& reply)
     unregister_request(request);
   }
   if (unmounting)
-    mount_cond.Signal();
+    mount_cond.notify_all();
 }
 
 void Client::_handle_full_flag(int64_t pool)
@@ -2480,7 +2504,7 @@ void Client::handle_osd_map(const MConstRef<MOSDMap>& m)
   bool new_blacklist = false;
   bool prenautilus = objecter->with_osdmap(
     [&](const OSDMap& o) {
-      return o.require_osd_release < CEPH_RELEASE_NAUTILUS;
+      return o.require_osd_release < ceph_release_t::nautilus;
     });
   if (!blacklisted) {
     for (auto a : myaddrs.v) {
@@ -2571,53 +2595,53 @@ bool Client::ms_dispatch2(const MessageRef &m)
   switch (m->get_type()) {
     // mounting and mds sessions
   case CEPH_MSG_MDS_MAP:
-    handle_mds_map(MMDSMap::msgref_cast(m));
+    handle_mds_map(ref_cast<MMDSMap>(m));
     break;
   case CEPH_MSG_FS_MAP:
-    handle_fs_map(MFSMap::msgref_cast(m));
+    handle_fs_map(ref_cast<MFSMap>(m));
     break;
   case CEPH_MSG_FS_MAP_USER:
-    handle_fs_map_user(MFSMapUser::msgref_cast(m));
+    handle_fs_map_user(ref_cast<MFSMapUser>(m));
     break;
   case CEPH_MSG_CLIENT_SESSION:
-    handle_client_session(MClientSession::msgref_cast(m));
+    handle_client_session(ref_cast<MClientSession>(m));
     break;
 
   case CEPH_MSG_OSD_MAP:
-    handle_osd_map(MOSDMap::msgref_cast(m));
+    handle_osd_map(ref_cast<MOSDMap>(m));
     break;
 
     // requests
   case CEPH_MSG_CLIENT_REQUEST_FORWARD:
-    handle_client_request_forward(MClientRequestForward::msgref_cast(m));
+    handle_client_request_forward(ref_cast<MClientRequestForward>(m));
     break;
   case CEPH_MSG_CLIENT_REPLY:
-    handle_client_reply(MClientReply::msgref_cast(m));
+    handle_client_reply(ref_cast<MClientReply>(m));
     break;
 
   // reclaim reply
   case CEPH_MSG_CLIENT_RECLAIM_REPLY:
-    handle_client_reclaim_reply(MClientReclaimReply::msgref_cast(m));
+    handle_client_reclaim_reply(ref_cast<MClientReclaimReply>(m));
     break;
 
   case CEPH_MSG_CLIENT_SNAP:
-    handle_snap(MClientSnap::msgref_cast(m));
+    handle_snap(ref_cast<MClientSnap>(m));
     break;
   case CEPH_MSG_CLIENT_CAPS:
-    handle_caps(MClientCaps::msgref_cast(m));
+    handle_caps(ref_cast<MClientCaps>(m));
     break;
   case CEPH_MSG_CLIENT_LEASE:
-    handle_lease(MClientLease::msgref_cast(m));
+    handle_lease(ref_cast<MClientLease>(m));
     break;
   case MSG_COMMAND_REPLY:
     if (m->get_source().type() == CEPH_ENTITY_TYPE_MDS) {
-      handle_command_reply(MCommandReply::msgref_cast(m));
+      handle_command_reply(ref_cast<MCommandReply>(m));
     } else {
       return false;
     }
     break;
   case CEPH_MSG_CLIENT_QUOTA:
-    handle_quota(MClientQuota::msgref_cast(m));
+    handle_quota(ref_cast<MClientQuota>(m));
     break;
 
   default:
@@ -2632,7 +2656,7 @@ bool Client::ms_dispatch2(const MessageRef &m)
     trim_cache();
     if (size < lru.lru_get_size() + inode_map.size()) {
       ldout(cct, 10) << "unmounting: trim pass, cache shrank, poking unmount()" << dendl;
-      mount_cond.Signal();
+      mount_cond.notify_all();
     } else {
       ldout(cct, 10) << "unmounting: trim pass, size still " << lru.lru_get_size() 
                << "+" << inode_map.size() << dendl;
@@ -2783,7 +2807,7 @@ void Client::send_reconnect(MetaSession *session)
 
   early_kick_flushing_caps(session);
 
-  auto m = MClientReconnect::create();
+  auto m = make_message<MClientReconnect>();
   bool allow_multi = session->mds_features.test(CEPHFS_FEATURE_MULTI_RECONNECT);
 
   // i have an open session.
@@ -2795,11 +2819,12 @@ void Client::send_reconnect(MetaSession *session)
     auto it = in->caps.find(mds);
     if (it != in->caps.end()) {
       if (allow_multi &&
-         m->get_approx_size() >= (std::numeric_limits<int>::max() >> 1)) {
+         m->get_approx_size() >=
+         static_cast<size_t>((std::numeric_limits<int>::max() >> 1))) {
        m->mark_more();
        session->con->send_message2(std::move(m));
 
-       m = MClientReconnect::create();
+       m = make_message<MClientReconnect>();
       }
 
       Cap &cap = it->second;
@@ -2808,7 +2833,7 @@ void Client::send_reconnect(MetaSession *session)
               << " wants " << ccap_string(in->caps_wanted())
               << dendl;
       filepath path;
-      in->make_long_path(path);
+      in->make_short_path(path);
       ldout(cct, 10) << "    path " << path << dendl;
 
       bufferlist flockbl;
@@ -2849,7 +2874,7 @@ void Client::send_reconnect(MetaSession *session)
     m->set_encoding_version(0); // use connection features to choose encoding
   session->con->send_message2(std::move(m));
 
-  mount_cond.Signal();
+  mount_cond.notify_all();
 
   if (session->reclaim_state == MetaSession::RECLAIMING)
     signal_cond_list(waiting_for_reclaim);
@@ -2868,7 +2893,7 @@ void Client::kick_requests(MetaSession *session)
     if (req->aborted()) {
       if (req->caller_cond) {
        req->kick = true;
-       req->caller_cond->Signal();
+       req->caller_cond->notify_all();
       }
       continue;
     }
@@ -2936,7 +2961,7 @@ void Client::kick_requests_closed(MetaSession *session)
     if (req->mds == session->mds_num) {
       if (req->caller_cond) {
        req->kick = true;
-       req->caller_cond->Signal();
+       req->caller_cond->notify_all();
       }
       req->item.remove_myself();
       if (req->got_unsafe) {
@@ -2978,7 +3003,7 @@ void Client::got_mds_push(MetaSession *s)
   s->seq++;
   ldout(cct, 10) << " mds." << s->mds_num << " seq now " << s->seq << dendl;
   if (s->state == MetaSession::STATE_CLOSING) {
-    s->con->send_message2(MClientSession::create(CEPH_SESSION_REQUEST_CLOSE, s->seq));
+    s->con->send_message2(make_message<MClientSession>(CEPH_SESSION_REQUEST_CLOSE, s->seq));
   }
 }
 
@@ -3006,7 +3031,7 @@ void Client::handle_lease(const MConstRef<MClientLease>& m)
   }
   in = inode_map[vino];
 
-  if (m->get_mask() & CEPH_LOCK_DN) {
+  if (m->get_mask() & CEPH_LEASE_VALID) {
     if (!in->dir || in->dir->dentries.count(m->dname) == 0) {
       ldout(cct, 10) << " don't have dir|dentry " << m->get_ino() << "/" << m->dname <<dendl;
       goto revoke;
@@ -3018,7 +3043,9 @@ void Client::handle_lease(const MConstRef<MClientLease>& m)
 
  revoke:
   {
-    auto reply = MClientLease::create(CEPH_MDS_LEASE_RELEASE, seq, m->get_mask(), m->get_ino(), m->get_first(), m->get_last(), m->dname);
+    auto reply = make_message<MClientLease>(CEPH_MDS_LEASE_RELEASE, seq,
+                                           m->get_mask(), m->get_ino(),
+                                           m->get_first(), m->get_last(), m->dname);
     m->get_connection()->send_message2(std::move(reply));
   }
 }
@@ -3093,7 +3120,6 @@ Dentry* Client::link(Dir *dir, const string& name, Inode *in, Dentry *dn)
       Dentry *olddn = in->get_first_parent();
       ceph_assert(olddn->dir != dir || olddn->name != name);
       Inode *old_diri = olddn->dir->parent_inode;
-      old_diri->dir_release_count++;
       clear_dir_complete_and_ordered(old_diri, true);
       unlink(olddn, true, true);  // keep dir, dentry
     }
@@ -3146,7 +3172,7 @@ private:
 public:
   C_Client_FlushComplete(Client *c, Inode *in) : client(c), inode(in) { }
   void finish(int r) override {
-    ceph_assert(client->client_lock.is_locked_by_me());
+    ceph_assert(ceph_mutex_is_locked_by_me(client->client_lock));
     if (r != 0) {
       client_t const whoami = client->whoami;  // For the benefit of ldout prefix
       ldout(client->cct, 1) << "I/O error from flush on inode " << inode
@@ -3184,7 +3210,7 @@ void Client::put_cap_ref(Inode *in, int cap)
     int put_nref = 0;
     int drop = last & ~in->caps_issued();
     if (in->snapid == CEPH_NOSNAP) {
-      if ((last & CEPH_CAP_FILE_WR) &&
+      if ((last & (CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER)) &&
          !in->cap_snaps.empty() &&
          in->cap_snaps.rbegin()->second.writing) {
        ldout(cct, 10) << __func__ << " finishing pending cap_snap on " << *in << dendl;
@@ -3211,8 +3237,10 @@ void Client::put_cap_ref(Inode *in, int cap)
   }
 }
 
-int Client::get_caps(Inode *in, int need, int want, int *phave, loff_t endoff)
+int Client::get_caps(Fh *fh, int need, int want, int *phave, loff_t endoff)
 {
+  Inode *in = fh->inode.get();
+
   int r = check_pool_perm(in, need);
   if (r < 0)
     return r;
@@ -3226,6 +3254,12 @@ int Client::get_caps(Inode *in, int need, int want, int *phave, loff_t endoff)
       return -EBADF;
     }
 
+    if ((fh->mode & CEPH_FILE_MODE_WR) && fh->gen != fd_gen)
+      return -EBADF;
+
+    if ((in->flags & I_ERROR_FILELOCK) && fh->has_any_filelocks())
+      return -EIO;
+
     int implemented;
     int have = in->caps_issued(&implemented);
 
@@ -3233,13 +3267,16 @@ int Client::get_caps(Inode *in, int need, int want, int *phave, loff_t endoff)
     bool waitfor_commit = false;
 
     if (have & need & CEPH_CAP_FILE_WR) {
-      if (endoff > 0 &&
-         (endoff >= (loff_t)in->max_size ||
-          endoff > (loff_t)(in->size << 1)) &&
-         endoff > (loff_t)in->wanted_max_size) {
-       ldout(cct, 10) << "wanted_max_size " << in->wanted_max_size << " -> " << endoff << dendl;
-       in->wanted_max_size = endoff;
-       check_caps(in, 0);
+      if (endoff > 0) {
+        if ((endoff >= (loff_t)in->max_size ||
+             endoff > (loff_t)(in->size << 1)) &&
+            endoff > (loff_t)in->wanted_max_size) {
+          ldout(cct, 10) << "wanted_max_size " << in->wanted_max_size << " -> " << endoff << dendl;
+          in->wanted_max_size = endoff;
+        }
+        if (in->wanted_max_size > in->max_size &&
+            in->wanted_max_size > in->requested_max_size)
+          check_caps(in, 0);
       }
 
       if (endoff >= 0 && endoff > (loff_t)in->max_size) {
@@ -3371,7 +3408,7 @@ void Client::send_cap(Inode *in, MetaSession *session, Cap *cap,
   if (flush)
     follows = in->snaprealm->get_snap_context().seq;
   
-  auto m = MClientCaps::create(op,
+  auto m = make_message<MClientCaps>(op,
                                   in->ino,
                                   0,
                                   cap->cap_id, cap->seq,
@@ -3423,9 +3460,14 @@ void Client::send_cap(Inode *in, MetaSession *session, Cap *cap,
   m->set_snap_follows(follows);
   cap->wanted = want;
   if (cap == in->auth_cap) {
-    m->set_max_size(in->wanted_max_size);
-    in->requested_max_size = in->wanted_max_size;
-    ldout(cct, 15) << "auth cap, setting max_size = " << in->requested_max_size << dendl;
+    if (want & CEPH_CAP_ANY_FILE_WR) {
+      m->set_max_size(in->wanted_max_size);
+      in->requested_max_size = in->wanted_max_size;
+      ldout(cct, 15) << "auth cap, requesting max_size " << in->requested_max_size << dendl;
+    } else {
+      in->requested_max_size = 0;
+      ldout(cct, 15) << "auth cap, reset requested_max_size due to not wanting any file write cap" << dendl;
+    }
   }
 
   if (!session->flushing_caps_tids.empty())
@@ -3609,15 +3651,17 @@ void Client::check_caps(Inode *in, unsigned flags)
     }
 
     int flushing;
+    int msg_flags = 0;
     ceph_tid_t flush_tid;
     if (in->auth_cap == &cap && in->dirty_caps) {
       flushing = mark_caps_flushing(in, &flush_tid);
+      if (flags & CHECK_CAPS_SYNCHRONOUS)
+       msg_flags |= MClientCaps::FLAG_SYNC;
     } else {
       flushing = 0;
       flush_tid = 0;
     }
 
-    int msg_flags = (flags & CHECK_CAPS_SYNCHRONOUS) ? MClientCaps::FLAG_SYNC : 0;
     send_cap(in, session, &cap, msg_flags, cap_used, wanted, retain,
             flushing, flush_tid);
   }
@@ -3643,9 +3687,9 @@ void Client::queue_cap_snap(Inode *in, SnapContext& old_snapc)
     capsnap.context = old_snapc;
     capsnap.issued = in->caps_issued();
     capsnap.dirty = in->caps_dirty();
-    
+
     capsnap.dirty_data = (used & CEPH_CAP_FILE_BUFFER);
-    
+
     capsnap.uid = in->uid;
     capsnap.gid = in->gid;
     capsnap.mode = in->mode;
@@ -3654,7 +3698,7 @@ void Client::queue_cap_snap(Inode *in, SnapContext& old_snapc)
     capsnap.xattr_version = in->xattr_version;
     capsnap.cap_dirtier_uid = in->cap_dirtier_uid;
     capsnap.cap_dirtier_gid = in->cap_dirtier_gid;
+
     if (used & CEPH_CAP_FILE_WR) {
       ldout(cct, 10) << __func__ << " WR used on " << *in << dendl;
       capsnap.writing = 1;
@@ -3689,6 +3733,7 @@ void Client::finish_cap_snap(Inode *in, CapSnap &capsnap, int used)
   }
 
   if (used & CEPH_CAP_FILE_BUFFER) {
+    capsnap.writing = 1;
     ldout(cct, 10) << __func__ << " " << *in << " cap_snap " << &capsnap << " used " << used
             << " WRBUFFER, delaying" << dendl;
   } else {
@@ -3697,19 +3742,12 @@ void Client::finish_cap_snap(Inode *in, CapSnap &capsnap, int used)
   }
 }
 
-void Client::_flushed_cap_snap(Inode *in, snapid_t seq)
-{
-  ldout(cct, 10) << __func__ << " seq " << seq << " on " << *in << dendl;
-  in->cap_snaps.at(seq).dirty_data = 0;
-  flush_snaps(in);
-}
-
 void Client::send_flush_snap(Inode *in, MetaSession *session,
                             snapid_t follows, CapSnap& capsnap)
 {
-  auto m = MClientCaps::create(CEPH_CAP_OP_FLUSHSNAP,
-                               in->ino, in->snaprealm->ino, 0,
-                               in->auth_cap->mseq, cap_epoch_barrier);
+  auto m = make_message<MClientCaps>(CEPH_CAP_OP_FLUSHSNAP,
+                                    in->ino, in->snaprealm->ino, 0,
+                                    in->auth_cap->mseq, cap_epoch_barrier);
   m->caller_uid = capsnap.cap_dirtier_uid;
   m->caller_gid = capsnap.cap_dirtier_gid;
 
@@ -3771,7 +3809,7 @@ void Client::flush_snaps(Inode *in)
             << " on " << *in << dendl;
     if (capsnap.dirty_data || capsnap.writing)
       break;
-    
+
     capsnap.flush_tid = ++last_flush_tid;
     session->flushing_caps_tids.insert(capsnap.flush_tid);
     in->flushing_cap_tids[capsnap.flush_tid] = 0;
@@ -3782,28 +3820,32 @@ void Client::flush_snaps(Inode *in)
   }
 }
 
-void Client::wait_on_list(list<Cond*>& ls)
+void Client::wait_on_list(list<ceph::condition_variable*>& ls)
 {
-  Cond cond;
+  ceph::condition_variable cond;
   ls.push_back(&cond);
-  cond.Wait(client_lock);
+  std::unique_lock l{client_lock, std::adopt_lock};
+  cond.wait(l);
+  l.release();
   ls.remove(&cond);
 }
 
-void Client::signal_cond_list(list<Cond*>& ls)
+void Client::signal_cond_list(list<ceph::condition_variable*>& ls)
 {
-  for (list<Cond*>::iterator it = ls.begin(); it != ls.end(); ++it)
-    (*it)->Signal();
+  for (auto cond : ls) {
+    cond->notify_all();
+  }
 }
 
 void Client::wait_on_context_list(list<Context*>& ls)
 {
-  Cond cond;
+  ceph::condition_variable cond;
   bool done = false;
   int r;
-  ls.push_back(new C_Cond(&cond, &done, &r));
-  while (!done)
-    cond.Wait(client_lock);
+  ls.push_back(new C_Cond(cond, &done, &r));
+  std::unique_lock l{client_lock, std::adopt_lock};
+  cond.wait(l, [&done] { return done;});
+  l.release();
 }
 
 void Client::signal_context_list(list<Context*>& ls)
@@ -3852,7 +3894,7 @@ public:
   }
   void finish(int r) override {
     // _async_invalidate takes the lock when it needs to, call this back from outside of lock.
-    ceph_assert(!client->client_lock.is_locked_by_me());
+    ceph_assert(ceph_mutex_is_not_locked_by_me(client->client_lock));
     client->_async_invalidate(ino, offset, length);
   }
 };
@@ -3934,7 +3976,7 @@ bool Client::_flush(Inode *in, Context *onfinish)
 
 void Client::_flush_range(Inode *in, int64_t offset, uint64_t size)
 {
-  ceph_assert(client_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(client_lock));
   if (!in->oset.dirty_or_tx) {
     ldout(cct, 10) << " nothing to flush" << dendl;
     return;
@@ -3945,16 +3987,16 @@ void Client::_flush_range(Inode *in, int64_t offset, uint64_t size)
                                      offset, size, &onflush);
   if (!ret) {
     // wait for flush
-    client_lock.Unlock();
+    client_lock.unlock();
     onflush.wait();
-    client_lock.Lock();
+    client_lock.lock();
   }
 }
 
 void Client::flush_set_callback(ObjectCacher::ObjectSet *oset)
 {
   //  std::lock_guard l(client_lock);
-  ceph_assert(client_lock.is_locked());   // will be called via dispatch() -> objecter -> ...
+  ceph_assert(ceph_mutex_is_locked(client_lock));   // will be called via dispatch() -> objecter -> ...
   Inode *in = static_cast<Inode *>(oset->parent);
   ceph_assert(in);
   _flushed(in);
@@ -3978,10 +4020,10 @@ void Client::check_cap_issue(Inode *in, unsigned issued)
       !(had & CEPH_CAP_FILE_CACHE))
     in->cache_gen++;
 
-  if ((issued & CEPH_CAP_FILE_SHARED) &&
-      !(had & CEPH_CAP_FILE_SHARED)) {
-    in->shared_gen++;
-
+  if ((issued & CEPH_CAP_FILE_SHARED) !=
+      (had & CEPH_CAP_FILE_SHARED)) {
+    if (issued & CEPH_CAP_FILE_SHARED)
+      in->shared_gen++;
     if (in->is_dir())
       clear_dir_complete_and_ordered(in, true);
   }
@@ -4025,7 +4067,9 @@ void Client::add_update_cap(Inode *in, MetaSession *mds_session, uint64_t cap_id
      * don't remove caps.
      */
     if (ceph_seq_cmp(seq, cap.seq) <= 0) {
-      ceph_assert(&cap == in->auth_cap);
+      if (&cap != in->auth_cap)
+         ldout(cct, 0) << "WARNING: " <<  "inode " << *in << " caps on mds." << mds << " != auth_cap." << dendl;
+
       ceph_assert(cap.cap_id == cap_id);
       seq = cap.seq;
       mseq = cap.mseq;
@@ -4124,7 +4168,7 @@ void Client::remove_all_caps(Inode *in)
     remove_cap(&in->caps.begin()->second, true);
 }
 
-void Client::remove_session_caps(MetaSession *s)
+void Client::remove_session_caps(MetaSession *s, int err)
 {
   ldout(cct, 10) << __func__ << " mds." << s->mds_num << dendl;
 
@@ -4136,7 +4180,10 @@ void Client::remove_session_caps(MetaSession *s)
       dirty_caps = in->dirty_caps | in->flushing_caps;
       in->wanted_max_size = 0;
       in->requested_max_size = 0;
+      if (in->has_any_filelocks())
+       in->flags |= I_ERROR_FILELOCK;
     }
+    auto caps = cap->implemented;
     if (cap->wanted | cap->issued)
       in->flags |= I_CAP_DROPPED;
     remove_cap(cap, false);
@@ -4151,15 +4198,29 @@ void Client::remove_session_caps(MetaSession *s)
       in->mark_caps_clean();
       put_inode(in.get());
     }
+    caps &= CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_BUFFER;
+    if (caps && !in->caps_issued_mask(caps, true)) {
+      if (err == -EBLACKLISTED) {
+       if (in->oset.dirty_or_tx) {
+         lderr(cct) << __func__ << " still has dirty data on " << *in << dendl;
+         in->set_async_err(err);
+       }
+       objectcacher->purge_set(&in->oset);
+      } else {
+       objectcacher->release_set(&in->oset);
+      }
+      _schedule_invalidate_callback(in.get(), 0, 0);
+    }
+
     signal_cond_list(in->waitfor_caps);
   }
   s->flushing_caps_tids.clear();
-  sync_cond.Signal();
+  sync_cond.notify_all();
 }
 
 int Client::_do_remount(bool retry_on_error)
 {
-  uint64_t max_retries = g_conf().get_val<uint64_t>("mds_max_retries_on_remount_failure");
+  uint64_t max_retries = cct->_conf.get_val<uint64_t>("mds_max_retries_on_remount_failure");
 
   errno = 0;
   int r = remount_cb(callback_handle);
@@ -4245,6 +4306,39 @@ void Client::_trim_negative_child_dentries(InodeRef& in)
   }
 }
 
+class C_Client_CacheRelease : public Context  {
+private:
+  Client *client;
+  vinodeno_t ino;
+public:
+  C_Client_CacheRelease(Client *c, Inode *in) :
+    client(c) {
+    if (client->use_faked_inos())
+      ino = vinodeno_t(in->faked_ino, CEPH_NOSNAP);
+    else
+      ino = in->vino();
+  }
+  void finish(int r) override {
+    ceph_assert(ceph_mutex_is_not_locked_by_me(client->client_lock));
+    client->_async_inode_release(ino);
+  }
+};
+
+void Client::_async_inode_release(vinodeno_t ino)
+{
+  if (unmounting)
+    return;
+  ldout(cct, 10) << __func__ << " " << ino << dendl;
+  ino_release_cb(callback_handle, ino);
+}
+
+void Client::_schedule_ino_release_callback(Inode *in) {
+
+  if (ino_release_cb)
+    // we queue the invalidate, which calls the callback and decrements the ref
+    async_ino_releasor.queue(new C_Client_CacheRelease(this, in));
+}
+
 void Client::trim_caps(MetaSession *s, uint64_t max)
 {
   mds_rank_t mds = s->mds_num;
@@ -4283,7 +4377,7 @@ void Client::trim_caps(MetaSession *s, uint64_t max)
         ++q;
        if (dn->lru_is_expireable()) {
          if (can_invalidate_dentries &&
-             dn->dir->parent_inode->ino == MDS_INO_ROOT) {
+             dn->dir->parent_inode->ino == CEPH_INO_ROOT) {
            // Only issue one of these per DN for inodes in root: handle
            // others more efficiently by calling for root-child DNs at
            // the end of this function.
@@ -4296,7 +4390,10 @@ void Client::trim_caps(MetaSession *s, uint64_t max)
          all = false;
         }
       }
-      if (all && in->ino != MDS_INO_ROOT) {
+      if (in->ll_ref == 1 && in->ino != CEPH_INO_ROOT) {
+         _schedule_ino_release_callback(in.get());
+      }
+      if (all && in->ino != CEPH_INO_ROOT) {
         ldout(cct, 20) << __func__ << " counting as trimmed: " << *in << dendl;
        trimmed++;
       }
@@ -4429,7 +4526,9 @@ void Client::wait_sync_caps(ceph_tid_t want)
     if (oldest_tid <= want) {
       ldout(cct, 10) << " waiting on mds." << p.first << " tid " << oldest_tid
                     << " (want " << want << ")" << dendl;
-      sync_cond.Wait(client_lock);
+      std::unique_lock l{client_lock, std::adopt_lock};
+      sync_cond.wait(l);
+      l.release();
       goto retry;
     }
   }
@@ -4682,25 +4781,19 @@ void Client::update_snap_trace(const bufferlist& bl, SnapRealm **realm_ret, bool
       ldout(cct, 10) << __func__ << " " << *realm << " seq " << info.seq()
               << " <= " << realm->seq << " and same parent, SKIPPING" << dendl;
     }
-        
+
     if (!first_realm)
       first_realm = realm;
     else
       put_snap_realm(realm);
   }
 
-  for (map<SnapRealm*, SnapContext>::iterator q = dirty_realms.begin();
-       q != dirty_realms.end();
-       ++q) {
-    SnapRealm *realm = q->first;
+  for (auto &[realm, snapc] : dirty_realms) {
     // if there are new snaps ?
-    if (has_new_snaps(q->second, realm->get_snap_context())) { 
+    if (has_new_snaps(snapc, realm->get_snap_context())) {
       ldout(cct, 10) << " flushing caps on " << *realm << dendl;
-      xlist<Inode*>::iterator r = realm->inodes_with_caps.begin();
-      while (!r.end()) {
-       Inode *in = *r;
-       ++r;
-       queue_cap_snap(in, q->second);
+      for (auto&& in : realm->inodes_with_caps) {
+       queue_cap_snap(in, snapc);
       }
     } else {
       ldout(cct, 10) << " no new snap on " << *realm << dendl;
@@ -4892,8 +4985,10 @@ void Client::handle_cap_import(MetaSession *session, Inode *in, const MConstRef<
   SnapRealm *realm = NULL;
   update_snap_trace(m->snapbl, &realm);
 
+  int issued = m->get_caps();
+  int wanted = m->get_wanted();
   add_update_cap(in, session, m->get_cap_id(),
-                m->get_caps(), m->get_wanted(), m->get_seq(), m->get_mseq(),
+                issued, wanted, m->get_seq(), m->get_mseq(),
                 m->get_realm(), CEPH_CAP_FLAG_AUTH, cap_perms);
   
   if (cap && cap->cap_id == m->peer.cap_id) {
@@ -4904,6 +4999,11 @@ void Client::handle_cap_import(MetaSession *session, Inode *in, const MConstRef<
     put_snap_realm(realm);
   
   if (in->auth_cap && in->auth_cap->session == session) {
+    if (!(wanted & CEPH_CAP_ANY_FILE_WR) ||
+       in->requested_max_size > m->get_max_size()) {
+      in->requested_max_size = 0;
+      ldout(cct, 15) << "reset requested_max_size after cap import" << dendl;
+    }
     // reflush any/all caps (if we are now the auth_cap)
     kick_flushing_caps(in, session);
   }
@@ -5011,7 +5111,7 @@ void Client::handle_cap_flush_ack(MetaSession *session, Inode *in, Cap *cap, con
     signal_cond_list(in->waitfor_caps);
     if (session->flushing_caps_tids.empty() ||
        *session->flushing_caps_tids.begin() > flush_ack_tid)
-      sync_cond.Signal();
+      sync_cond.notify_all();
   }
 
   if (!dirty) {
@@ -5063,7 +5163,7 @@ void Client::handle_cap_flushsnap_ack(MetaSession *session, Inode *in, const MCo
       signal_cond_list(in->waitfor_caps);
       if (session->flushing_caps_tids.empty() ||
          *session->flushing_caps_tids.begin() > flush_ack_tid)
-       sync_cond.Signal();
+       sync_cond.notify_all();
     }
   } else {
     ldout(cct, 5) << __func__ << " DUP(?) mds." << mds << " flushed snap follows " << follows
@@ -5095,7 +5195,7 @@ public:
   }
   void finish(int r) override {
     // _async_dentry_invalidate is responsible for its own locking
-    ceph_assert(!client->client_lock.is_locked_by_me());
+    ceph_assert(ceph_mutex_is_not_locked_by_me(client->client_lock));
     client->_async_dentry_invalidate(dirino, ino, name);
   }
 };
@@ -5106,7 +5206,7 @@ void Client::_async_dentry_invalidate(vinodeno_t dirino, vinodeno_t ino, string&
     return;
   ldout(cct, 10) << __func__ << " '" << name << "' ino " << ino
                 << " in dir " << dirino << dendl;
-  dentry_invalidate_cb(callback_handle, dirino, ino, name);
+  dentry_invalidate_cb(callback_handle, dirino, ino, name.c_str(), name.length());
 }
 
 void Client::_schedule_invalidate_dentry_callback(Dentry *dn, bool del)
@@ -5174,7 +5274,7 @@ void Client::handle_cap_grant(MetaSession *session, Inode *in, Cap *cap, const M
                << " mds." << mds << " seq " << m->get_seq()
                << " caps now " << ccap_string(new_caps)
                << " was " << ccap_string(cap->issued)
-               << (was_stale ? "" : " (stale)") << dendl;
+               << (was_stale ? " (stale)" : "") << dendl;
 
   if (was_stale)
       cap->issued = cap->implemented = CEPH_CAP_PIN;
@@ -5627,15 +5727,15 @@ int Client::resolve_mds(
  */
 int Client::authenticate()
 {
-  ceph_assert(client_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
 
   if (monclient->is_authenticated()) {
     return 0;
   }
 
-  client_lock.Unlock();
+  client_lock.unlock();
   int r = monclient->authenticate(cct->_conf->client_mount_timeout);
-  client_lock.Lock();
+  client_lock.lock();
   if (r < 0) {
     return r;
   }
@@ -5656,9 +5756,9 @@ int Client::fetch_fsmap(bool user)
   do {
     C_SaferCond cond;
     monclient->get_version("fsmap", &fsmap_latest, NULL, &cond);
-    client_lock.Unlock();
+    client_lock.unlock();
     r = cond.wait();
-    client_lock.Lock();
+    client_lock.lock();
   } while (r == -EAGAIN);
 
   if (r < 0) {
@@ -5816,7 +5916,10 @@ int Client::subscribe_mdsmap(const std::string &fs_name)
 
   std::string resolved_fs_name;
   if (fs_name.empty()) {
-    resolved_fs_name = cct->_conf.get_val<std::string>("client_mds_namespace");
+    resolved_fs_name = cct->_conf.get_val<std::string>("client_fs");
+    if (resolved_fs_name.empty())
+           // Try the backwards compatibility fs name option
+           resolved_fs_name = cct->_conf.get_val<std::string>("client_mds_namespace");
   } else {
     resolved_fs_name = fs_name;
   }
@@ -5941,17 +6044,40 @@ int Client::mount(const std::string &mount_root, const UserPerm& perms,
 
 void Client::_close_sessions()
 {
+  for (auto it = mds_sessions.begin(); it != mds_sessions.end(); ) {
+    if (it->second.state == MetaSession::STATE_REJECTED)
+      mds_sessions.erase(it++);
+    else
+      ++it;
+  }
+
   while (!mds_sessions.empty()) {
     // send session closes!
     for (auto &p : mds_sessions) {
       if (p.second.state != MetaSession::STATE_CLOSING) {
        _close_mds_session(&p.second);
+       mds_ranks_closing.insert(p.first);
       }
     }
 
     // wait for sessions to close
-    ldout(cct, 2) << "waiting for " << mds_sessions.size() << " mds sessions to close" << dendl;
-    mount_cond.Wait(client_lock);
+    double timo = cct->_conf.get_val<std::chrono::seconds>("client_shutdown_timeout").count();
+    ldout(cct, 2) << "waiting for " << mds_ranks_closing.size() << " mds session(s) to close (timeout: "
+                  << timo << "s)" << dendl;
+    std::unique_lock l{client_lock, std::adopt_lock};
+    if (!timo) {
+      mount_cond.wait(l);
+    } else if (!mount_cond.wait_for(l, ceph::make_timespan(timo), [this] { return mds_ranks_closing.empty(); })) {
+      ldout(cct, 1) << mds_ranks_closing.size() << " mds(s) did not respond to session close -- timing out." << dendl;
+      while (!mds_ranks_closing.empty()) {
+        auto session = mds_sessions.at(*mds_ranks_closing.begin());
+        // this prunes entry from mds_sessions and mds_ranks_closing
+        _closed_mds_session(&session, -ETIMEDOUT);
+      }
+    }
+
+    mds_ranks_closing.clear();
+    l.release();
   }
 }
 
@@ -5970,7 +6096,7 @@ void Client::flush_mdlog(MetaSession *session)
   // will crash if they see an unknown CEPH_SESSION_* value in this msg.
   const uint64_t features = session->con->get_features();
   if (HAVE_FEATURE(features, SERVER_LUMINOUS)) {
-    auto m = MClientSession::create(CEPH_SESSION_REQUEST_FLUSH_MDLOG);
+    auto m = make_message<MClientSession>(CEPH_SESSION_REQUEST_FLUSH_MDLOG);
     session->con->send_message2(std::move(m));
   }
 }
@@ -5988,7 +6114,7 @@ void Client::_abort_mds_sessions(int err)
     req->abort(err);
     if (req->caller_cond) {
       req->kick = true;
-      req->caller_cond->Signal();
+      req->caller_cond->notify_all();
     }
   }
 
@@ -6000,12 +6126,13 @@ void Client::_abort_mds_sessions(int err)
   // Force-close all sessions
   while(!mds_sessions.empty()) {
     auto& session = mds_sessions.begin()->second;
-    _closed_mds_session(&session);
+    _closed_mds_session(&session, err);
   }
 }
 
 void Client::_unmount(bool abort)
 {
+  std::unique_lock lock{client_lock, std::adopt_lock};
   if (unmounting)
     return;
 
@@ -6028,11 +6155,13 @@ void Client::_unmount(bool abort)
     flush_mdlog_sync();
   }
 
-  while (!mds_requests.empty()) {
-    ldout(cct, 10) << "waiting on " << mds_requests.size() << " requests" << dendl;
-    mount_cond.Wait(client_lock);
-  }
-
+  mount_cond.wait(lock, [this] {
+    if (!mds_requests.empty()) {
+      ldout(cct, 10) << "waiting on " << mds_requests.size() << " requests"
+                    << dendl;
+    }
+    return mds_requests.empty();
+  });
   if (tick_event)
     timer.cancel_event(tick_event);
   tick_event = 0;
@@ -6063,10 +6192,13 @@ void Client::_unmount(bool abort)
 
   _ll_drop_pins();
 
-  while (unsafe_sync_write > 0) {
-    ldout(cct, 0) << unsafe_sync_write << " unsafe_sync_writes, waiting"  << dendl;
-    mount_cond.Wait(client_lock);
-  }
+  mount_cond.wait(lock, [this] {
+    if (unsafe_sync_write > 0) {
+      ldout(cct, 0) << unsafe_sync_write << " unsafe_sync_writes, waiting"
+                   << dendl;
+    }
+    return unsafe_sync_write <= 0;
+  });
 
   if (cct->_conf->client_oc) {
     // flush/release all buffered data
@@ -6114,9 +6246,8 @@ void Client::_unmount(bool abort)
             << "+" << inode_map.size() << " items"
            << ", waiting (for caps to release?)"
             << dendl;
-    utime_t until = ceph_clock_now() + utime_t(5, 0);
-    int r = mount_cond.WaitUntil(client_lock, until);
-    if (r == ETIMEDOUT) {
+    if (auto r = mount_cond.wait_for(lock, ceph::make_timespan(5));
+       r == std::cv_status::timeout) {
       dump_cache(NULL);
     }
   }
@@ -6133,6 +6264,7 @@ void Client::_unmount(bool abort)
 
   mounted = false;
 
+  lock.release();
   ldout(cct, 2) << "unmounted." << dendl;
 }
 
@@ -6176,9 +6308,9 @@ void Client::tick()
   ldout(cct, 21) << "tick" << dendl;
   tick_event = timer.add_event_after(
     cct->_conf->client_tick_interval,
-    new FunctionContext([this](int) {
+    new LambdaContext([this](int) {
        // Called back via Timer, which takes client_lock for us
-       ceph_assert(client_lock.is_locked_by_me());
+       ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
        tick();
       }));
   utime_t now = ceph_clock_now();
@@ -6189,7 +6321,7 @@ void Client::tick()
       req->abort(-ETIMEDOUT);
       if (req->caller_cond) {
        req->kick = true;
-       req->caller_cond->Signal();
+       req->caller_cond->notify_all();
       }
       signal_cond_list(waiting_for_mdsmap);
       for (auto &p : mds_sessions) {
@@ -6219,6 +6351,16 @@ void Client::tick()
   }
 
   trim_cache(true);
+
+  if (blacklisted && mounted &&
+      last_auto_reconnect + 30 * 60 < now &&
+      cct->_conf.get_val<bool>("client_reconnect_stale")) {
+    messenger->client_reset();
+    fd_gen++; // invalidate open files
+    blacklisted = false;
+    _kick_stale_sessions();
+    last_auto_reconnect = now;
+  }
 }
 
 void Client::renew_caps()
@@ -6238,7 +6380,7 @@ void Client::renew_caps(MetaSession *session)
   ldout(cct, 10) << "renew_caps mds." << session->mds_num << dendl;
   session->last_cap_renew_request = ceph_clock_now();
   uint64_t seq = ++session->cap_renew_seq;
-  session->con->send_message2(MClientSession::create(CEPH_SESSION_REQUEST_RENEWCAPS, seq));
+  session->con->send_message2(make_message<MClientSession>(CEPH_SESSION_REQUEST_RENEWCAPS, seq));
 }
 
 
@@ -6271,6 +6413,8 @@ int Client::_lookup(Inode *dir, const string& dname, int mask, InodeRef *target,
 {
   int r = 0;
   Dentry *dn = NULL;
+  // can only request shared caps
+  mask &= CEPH_CAP_ANY_SHARED | CEPH_STAT_RSTAT;
 
   if (dname == "..") {
     if (dir->dentries.empty()) {
@@ -6282,9 +6426,7 @@ int Client::_lookup(Inode *dir, const string& dname, int mask, InodeRef *target,
       int r = make_request(req, perms, &tmptarget, NULL, rand() % mdsmap->get_num_in_mds());
 
       if (r == 0) {
-       Inode *tempino = tmptarget.get();
-       _ll_get(tempino);
-       *target = tempino;
+       *target = std::move(tmptarget);
        ldout(cct, 8) << __func__ << " found target " << (*target)->ino << dendl;
       } else {
        *target = dir;
@@ -6341,7 +6483,7 @@ int Client::_lookup(Inode *dir, const string& dname, int mask, InodeRef *target,
        ldout(cct, 20) << " bad lease, cap_ttl " << s.cap_ttl << ", cap_gen " << s.cap_gen
                       << " vs lease_gen " << dn->lease_gen << dendl;
       }
-      // dir lease?
+      // dir shared caps?
       if (dir->caps_issued_mask(CEPH_CAP_FILE_SHARED, true)) {
        if (dn->cap_shared_gen == dir->shared_gen &&
            (!dn->inode || dn->inode->caps_issued_mask(mask, true)))
@@ -7206,7 +7348,7 @@ unsigned Client::statx_to_mask(unsigned int flags, unsigned int want)
     mask |= CEPH_CAP_AUTH_SHARED;
   if (want & (CEPH_STATX_NLINK|CEPH_STATX_CTIME|CEPH_STATX_VERSION))
     mask |= CEPH_CAP_LINK_SHARED;
-  if (want & (CEPH_STATX_ATIME|CEPH_STATX_MTIME|CEPH_STATX_CTIME|CEPH_STATX_SIZE|CEPH_STATX_BLOCKS|CEPH_STATX_VERSION))
+  if (want & (CEPH_STATX_NLINK|CEPH_STATX_ATIME|CEPH_STATX_MTIME|CEPH_STATX_CTIME|CEPH_STATX_SIZE|CEPH_STATX_BLOCKS|CEPH_STATX_VERSION))
     mask |= CEPH_CAP_FILE_SHARED;
   if (want & (CEPH_STATX_VERSION|CEPH_STATX_CTIME))
     mask |= CEPH_CAP_XATTR_SHARED;
@@ -7973,7 +8115,7 @@ struct dentry_off_lt {
 int Client::_readdir_cache_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p,
                              int caps, bool getref)
 {
-  ceph_assert(client_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(client_lock));
   ldout(cct, 10) << __func__ << " " << dirp << " on " << dirp->inode->ino
           << " last_name " << dirp->last_name << " offset " << hex << dirp->offset << dec
           << dendl;
@@ -7991,6 +8133,7 @@ int Client::_readdir_cache_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p,
 
   string dn_name;
   while (true) {
+    int mask = caps;
     if (!dirp->inode->is_complete_and_ordered())
       return -EAGAIN;
     if (pd == dir->readdir_cache.end())
@@ -8007,9 +8150,18 @@ int Client::_readdir_cache_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p,
       continue;
     }
 
-    int r = _getattr(dn->inode, caps, dirp->perms);
+    int idx = pd - dir->readdir_cache.begin();
+    if (dn->inode->is_dir()) {
+      mask |= CEPH_STAT_RSTAT;
+    }
+    int r = _getattr(dn->inode, mask, dirp->perms);
     if (r < 0)
       return r;
+    
+    // the content of readdir_cache may change after _getattr(), so pd may be invalid iterator    
+    pd = dir->readdir_cache.begin() + idx;
+    if (pd >= dir->readdir_cache.end() || *pd != dn)
+      return -EAGAIN;
 
     struct ceph_statx stx;
     struct dirent de;
@@ -8029,9 +8181,9 @@ int Client::_readdir_cache_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p,
 
     dn_name = dn->name; // fill in name while we have lock
 
-    client_lock.Unlock();
+    client_lock.unlock();
     r = cb(p, &de, &stx, next_off, in);  // _next_ offset
-    client_lock.Lock();
+    client_lock.lock();
     ldout(cct, 15) << " de " << de.d_name << " off " << hex << dn->offset << dec
                   << " = " << r << dendl;
     if (r < 0) {
@@ -8086,7 +8238,7 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p,
     uint64_t next_off = 1;
 
     int r;
-    r = _getattr(diri, caps, dirp->perms);
+    r = _getattr(diri, caps | CEPH_STAT_RSTAT, dirp->perms);
     if (r < 0)
       return r;
 
@@ -8099,9 +8251,9 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p,
       _ll_get(inode);
     }
 
-    client_lock.Unlock();
+    client_lock.unlock();
     r = cb(p, &de, &stx, next_off, inode);
-    client_lock.Lock();
+    client_lock.lock();
     if (r < 0)
       return r;
 
@@ -8119,7 +8271,7 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p,
       in = diri->get_first_parent()->dir->parent_inode;
 
     int r;
-    r = _getattr(in, caps, dirp->perms);
+    r = _getattr(in, caps | CEPH_STAT_RSTAT, dirp->perms);
     if (r < 0)
       return r;
 
@@ -8132,9 +8284,9 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p,
       _ll_get(inode);
     }
 
-    client_lock.Unlock();
+    client_lock.unlock();
     r = cb(p, &de, &stx, next_off, inode);
-    client_lock.Lock();
+    client_lock.lock();
     if (r < 0)
       return r;
 
@@ -8185,7 +8337,11 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p,
 
       int r;
       if (check_caps) {
-       r = _getattr(entry.inode, caps, dirp->perms);
+       int mask = caps;
+       if(entry.inode->is_dir()){
+          mask |= CEPH_STAT_RSTAT;
+       }
+       r = _getattr(entry.inode, mask, dirp->perms);
        if (r < 0)
          return r;
       }
@@ -8199,9 +8355,9 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p,
        _ll_get(inode);
       }
 
-      client_lock.Unlock();
+      client_lock.unlock();
       r = cb(p, &de, &stx, next_off, inode);  // _next_ offset
-      client_lock.Lock();
+      client_lock.lock();
 
       ldout(cct, 15) << " de " << de.d_name << " off " << hex << next_off - 1 << dec
                     << " = " << r << dendl;
@@ -8289,7 +8445,7 @@ static int _readdir_single_dirent_cb(void *p, struct dirent *de,
 struct dirent *Client::readdir(dir_result_t *d)
 {
   int ret;
-  static struct dirent de;
+  auto& de = d->de;
   single_readdir sr;
   sr.de = &de;
   sr.stx = NULL;
@@ -8435,11 +8591,13 @@ int Client::open(const char *relpath, int flags, const UserPerm& perms,
                 mode_t mode, int stripe_unit, int stripe_count,
                 int object_size, const char *data_pool)
 {
-  ldout(cct, 3) << "open enter(" << relpath << ", " << ceph_flags_sys2wire(flags) << "," << mode << ")" << dendl;
+  int cflags = ceph_flags_sys2wire(flags);
+
+  ldout(cct, 3) << "open enter(" << relpath << ", " << cflags << "," << mode << ")" << dendl;
   std::lock_guard lock(client_lock);
   tout(cct) << "open" << std::endl;
   tout(cct) << relpath << std::endl;
-  tout(cct) << ceph_flags_sys2wire(flags) << std::endl;
+  tout(cct) << cflags << std::endl;
 
   if (unmounting)
     return -ENOTCONN;
@@ -8459,7 +8617,9 @@ int Client::open(const char *relpath, int flags, const UserPerm& perms,
   bool created = false;
   /* O_CREATE with O_EXCL enforces O_NOFOLLOW. */
   bool followsym = !((flags & O_NOFOLLOW) || ((flags & O_CREAT) && (flags & O_EXCL)));
-  int r = path_walk(path, &in, perms, followsym, ceph_caps_for_mode(mode));
+  int mask = ceph_caps_for_mode(ceph_flags_to_mode(cflags));
+
+  int r = path_walk(path, &in, perms, followsym, mask);
 
   if (r == 0 && (flags & O_CREAT) && (flags & O_EXCL))
     return -EEXIST;
@@ -8512,7 +8672,7 @@ int Client::open(const char *relpath, int flags, const UserPerm& perms,
   
  out:
   tout(cct) << r << std::endl;
-  ldout(cct, 3) << "open exit(" << path << ", " << ceph_flags_sys2wire(flags) << ") = " << r << dendl;
+  ldout(cct, 3) << "open exit(" << path << ", " << cflags << ") = " << r << dendl;
   return r;
 }
 
@@ -8556,33 +8716,44 @@ int Client::lookup_hash(inodeno_t ino, inodeno_t dirino, const char *name,
  * the resulting Inode object in one operation, so that caller
  * can safely assume inode will still be there after return.
  */
-int Client::_lookup_ino(inodeno_t ino, const UserPerm& perms, Inode **inode)
+int Client::_lookup_vino(vinodeno_t vino, const UserPerm& perms, Inode **inode)
 {
-  ldout(cct, 8) << __func__ << " enter(" << ino << ")" << dendl;
+  ldout(cct, 8) << __func__ << " enter(" << vino << ")" << dendl;
 
   if (unmounting)
     return -ENOTCONN;
 
+  if (is_reserved_vino(vino))
+    return -ESTALE;
+
   MetaRequest *req = new MetaRequest(CEPH_MDS_OP_LOOKUPINO);
-  filepath path(ino);
+  filepath path(vino.ino);
   req->set_filepath(path);
 
+  /*
+   * The MDS expects either a "real" snapid here or 0. The special value
+   * carveouts for the snapid are all at the end of the range so we can
+   * just look for any snapid below this value.
+   */
+  if (vino.snapid < CEPH_NOSNAP)
+    req->head.args.lookupino.snapid = vino.snapid;
+
   int r = make_request(req, perms, NULL, NULL, rand() % mdsmap->get_num_in_mds());
   if (r == 0 && inode != NULL) {
-    vinodeno_t vino(ino, CEPH_NOSNAP);
     unordered_map<vinodeno_t,Inode*>::iterator p = inode_map.find(vino);
     ceph_assert(p != inode_map.end());
     *inode = p->second;
     _ll_get(*inode);
   }
-  ldout(cct, 8) << __func__ << " exit(" << ino << ") = " << r << dendl;
+  ldout(cct, 8) << __func__ << " exit(" << vino << ") = " << r << dendl;
   return r;
 }
 
 int Client::lookup_ino(inodeno_t ino, const UserPerm& perms, Inode **inode)
 {
+  vinodeno_t vino(ino, CEPH_NOSNAP);
   std::lock_guard lock(client_lock);
-  return _lookup_ino(ino, perms, inode);
+  return _lookup_vino(vino, perms, inode);
 }
 
 /**
@@ -8645,7 +8816,7 @@ int Client::lookup_name(Inode *ino, Inode *parent, const UserPerm& perms)
 Fh *Client::_create_fh(Inode *in, int flags, int cmode, const UserPerm& perms)
 {
   ceph_assert(in);
-  Fh *f = new Fh(in, flags, cmode, perms);
+  Fh *f = new Fh(in, flags, cmode, fd_gen, perms);
 
   ldout(cct, 10) << __func__ << " " << in->ino << " mode " << cmode << dendl;
 
@@ -8773,12 +8944,12 @@ int Client::_open(Inode *in, int flags, mode_t mode, Fh **fhp,
       if (cmode & CEPH_FILE_MODE_RD)
         need |= CEPH_CAP_FILE_RD;
 
-      result = get_caps(in, need, want, &have, -1);
+      Fh fh(in, flags, cmode, fd_gen, perms);
+      result = get_caps(&fh, need, want, &have, -1);
       if (result < 0) {
        ldout(cct, 8) << "Unable to get caps after open of inode " << *in <<
                          " . Denying open: " <<
                          cpp_strerror(result) << dendl;
-       in->put_open_ref(cmode);
       } else {
        put_cap_ref(in, need);
       }
@@ -8884,27 +9055,65 @@ loff_t Client::lseek(int fd, loff_t offset, int whence)
 loff_t Client::_lseek(Fh *f, loff_t offset, int whence)
 {
   Inode *in = f->inode.get();
-  int r;
+  bool whence_check = false;
   loff_t pos = -1;
 
+  switch (whence) {
+  case SEEK_END:
+    whence_check = true;
+  break;
+
+#ifdef SEEK_DATA
+  case SEEK_DATA:
+    whence_check = true;
+  break;
+#endif
+
+#ifdef SEEK_HOLE
+  case SEEK_HOLE:
+    whence_check = true;
+  break;
+#endif
+  }
+
+  if (whence_check) {
+    int r = _getattr(in, CEPH_STAT_CAP_SIZE, f->actor_perms);
+    if (r < 0)
+      return r;
+  }
+
   switch (whence) {
   case SEEK_SET:
     pos = offset;
     break;
 
   case SEEK_CUR:
-    pos += offset;
+    pos = f->pos + offset;
     break;
 
   case SEEK_END:
-    r = _getattr(in, CEPH_STAT_CAP_SIZE, f->actor_perms);
-    if (r < 0)
-      return r;
     pos = in->size + offset;
     break;
 
+#ifdef SEEK_DATA
+  case SEEK_DATA:
+    if (offset < 0 || static_cast<uint64_t>(offset) >= in->size)
+      return -ENXIO;
+    pos = offset;
+    break;
+#endif
+
+#ifdef SEEK_HOLE
+  case SEEK_HOLE:
+    if (offset < 0 || static_cast<uint64_t>(offset) >= in->size)
+      return -ENXIO;
+    pos = in->size;
+    break;
+#endif
+
   default:
-    ceph_abort();
+    ldout(cct, 1) << __func__ << ": invalid whence value " << whence << dendl;
+    return -EINVAL;
   }
 
   if (pos < 0) {
@@ -8923,11 +9132,14 @@ void Client::lock_fh_pos(Fh *f)
   ldout(cct, 10) << __func__ << " " << f << dendl;
 
   if (f->pos_locked || !f->pos_waiters.empty()) {
-    Cond cond;
+    ceph::condition_variable cond;
     f->pos_waiters.push_back(&cond);
     ldout(cct, 10) << __func__ << " BLOCKING on " << f << dendl;
-    while (f->pos_locked || f->pos_waiters.front() != &cond)
-      cond.Wait(client_lock);
+    std::unique_lock l{client_lock, std::adopt_lock};
+    cond.wait(l, [f, me=&cond] {
+      return !f->pos_locked && f->pos_waiters.front() == me;
+    });
+    l.release();
     ldout(cct, 10) << __func__ << " UNBLOCKING on " << f << dendl;
     ceph_assert(f->pos_waiters.front() == &cond);
     f->pos_waiters.pop_front();
@@ -8938,8 +9150,15 @@ void Client::lock_fh_pos(Fh *f)
 
 void Client::unlock_fh_pos(Fh *f)
 {
+  ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
+
   ldout(cct, 10) << __func__ << " " << f << dendl;
   f->pos_locked = false;
+  if (!f->pos_waiters.empty()) {
+    // only wake up the oldest waiter
+    auto cond = f->pos_waiters.front();
+    cond->notify_one();
+  }
 }
 
 int Client::uninline_data(Inode *in, Context *onfinish)
@@ -8993,7 +9212,7 @@ int Client::uninline_data(Inode *in, Context *onfinish)
 
 int Client::read(int fd, char *buf, loff_t size, loff_t offset)
 {
-  std::lock_guard lock(client_lock);
+  std::unique_lock lock(client_lock);
   tout(cct) << "read" << std::endl;
   tout(cct) << fd << std::endl;
   tout(cct) << size << std::endl;
@@ -9015,7 +9234,8 @@ int Client::read(int fd, char *buf, loff_t size, loff_t offset)
   int r = _read(f, offset, size, &bl);
   ldout(cct, 3) << "read(" << fd << ", " << (void*)buf << ", " << size << ", " << offset << ") = " << r << dendl;
   if (r >= 0) {
-    bl.copy(0, bl.length(), buf);
+    lock.unlock();
+    bl.begin().copy(bl.length(), buf);
     r = bl.length();
   }
   return r;
@@ -9033,7 +9253,7 @@ int64_t Client::_read(Fh *f, int64_t offset, uint64_t size, bufferlist *bl)
   int want, have = 0;
   bool movepos = false;
   std::unique_ptr<C_SaferCond> onuninline;
-  int64_t r = 0;
+  int64_t rc = 0;
   const auto& conf = cct->_conf;
   Inode *in = f->inode.get();
   utime_t lat;
@@ -9051,8 +9271,9 @@ int64_t Client::_read(Fh *f, int64_t offset, uint64_t size, bufferlist *bl)
   loff_t start_pos = offset;
 
   if (in->inline_version == 0) {
-    r = _getattr(in, CEPH_STAT_CAP_INLINE_DATA, f->actor_perms, true);
+    auto r = _getattr(in, CEPH_STAT_CAP_INLINE_DATA, f->actor_perms, true);
     if (r < 0) {
+      rc = r;
       goto done;
     }
     ceph_assert(in->inline_version > 0);
@@ -9063,9 +9284,12 @@ retry:
     want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
   else
     want = CEPH_CAP_FILE_CACHE;
-  r = get_caps(in, CEPH_CAP_FILE_RD, want, &have, -1);
-  if (r < 0) {
-    goto done;
+  {
+    auto r = get_caps(f, CEPH_CAP_FILE_RD, want, &have, -1);
+    if (r < 0) {
+      rc = r;
+      goto done;
+    }
   }
   if (f->flags & O_DIRECT)
     have &= ~(CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO);
@@ -9087,12 +9311,12 @@ retry:
           bl->substr_of(in->inline_data, offset, len - offset);
           bl->append_zero(endoff - len);
         }
-        r = endoff - offset;
+        rc = endoff - offset;
       } else if ((uint64_t)offset < endoff) {
         bl->append_zero(endoff - offset);
-        r = endoff - offset;
+        rc = endoff - offset;
       } else {
-        r = 0;
+        rc = 0;
       }
       goto success;
     }
@@ -9105,27 +9329,31 @@ retry:
     if (f->flags & O_RSYNC) {
       _flush_range(in, offset, size);
     }
-    r = _read_async(f, offset, size, bl);
-    if (r < 0)
+    rc = _read_async(f, offset, size, bl);
+    if (rc < 0)
       goto done;
   } else {
     if (f->flags & O_DIRECT)
       _flush_range(in, offset, size);
 
     bool checkeof = false;
-    r = _read_sync(f, offset, size, bl, &checkeof);
-    if (r < 0)
+    rc = _read_sync(f, offset, size, bl, &checkeof);
+    if (rc < 0)
       goto done;
     if (checkeof) {
-      offset += r;
-      size -= r;
+      offset += rc;
+      size -= rc;
 
       put_cap_ref(in, CEPH_CAP_FILE_RD);
       have = 0;
       // reverify size
-      r = _getattr(in, CEPH_STAT_CAP_SIZE, f->actor_perms);
-      if (r < 0)
-       goto done;
+      {
+        auto r = _getattr(in, CEPH_STAT_CAP_SIZE, f->actor_perms);
+        if (r < 0) {
+          rc = r;
+          goto done;
+        }
+      }
 
       // eof?  short read.
       if ((uint64_t)offset < in->size)
@@ -9134,10 +9362,10 @@ retry:
   }
 
 success:
-  ceph_assert(r >= 0);
+  ceph_assert(rc >= 0);
   if (movepos) {
     // adjust fd pos
-    f->pos = start_pos + r;
+    f->pos = start_pos + rc;
   }
   
   lat = ceph_clock_now();
@@ -9148,16 +9376,16 @@ done:
   // done!
   
   if (onuninline) {
-    client_lock.Unlock();
+    client_lock.unlock();
     int ret = onuninline->wait();
-    client_lock.Lock();
+    client_lock.lock();
     if (ret >= 0 || ret == -ECANCELED) {
       in->inline_data.clear();
       in->inline_version = CEPH_INLINE_NONE;
       in->mark_caps_dirty(CEPH_CAP_FILE_WR);
       check_caps(in, 0);
     } else
-      r = ret;
+      rc = ret;
   }
   if (have) {
     put_cap_ref(in, CEPH_CAP_FILE_RD);
@@ -9165,7 +9393,7 @@ done:
   if (movepos) {
     unlock_fh_pos(f);
   }
-  return r;
+  return rc;
 }
 
 Client::C_Readahead::C_Readahead(Client *c, Fh *f) :
@@ -9211,9 +9439,9 @@ int Client::_read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl)
                              off, len, bl, 0, &onfinish);
   if (r == 0) {
     get_cap_ref(in, CEPH_CAP_FILE_CACHE);
-    client_lock.Unlock();
+    client_lock.unlock();
     r = onfinish.wait();
-    client_lock.Lock();
+    client_lock.lock();
     put_cap_ref(in, CEPH_CAP_FILE_CACHE);
   }
 
@@ -9249,8 +9477,6 @@ int Client::_read_sync(Fh *f, uint64_t off, uint64_t len, bufferlist *bl,
 
   ldout(cct, 10) << __func__ << " " << *in << " " << off << "~" << len << dendl;
 
-  Mutex flock("Client::_read_sync flock");
-  Cond cond;
   while (left > 0) {
     C_SaferCond onfinish("Client::_read_sync flock");
     bufferlist tbl;
@@ -9260,9 +9486,9 @@ int Client::_read_sync(Fh *f, uint64_t off, uint64_t len, bufferlist *bl,
                      pos, left, &tbl, 0,
                      in->truncate_size, in->truncate_seq,
                      &onfinish);
-    client_lock.Unlock();
+    client_lock.unlock();
     int r = onfinish.wait();
-    client_lock.Lock();
+    client_lock.lock();
 
     // if we get ENOENT from OSD, assume 0 bytes returned
     if (r == -ENOENT)
@@ -9316,7 +9542,7 @@ void Client::_sync_write_commit(Inode *in)
   ldout(cct, 15) << __func__ << " unsafe_sync_write = " << unsafe_sync_write << dendl;
   if (unsafe_sync_write == 0 && unmounting) {
     ldout(cct, 10) << __func__ << " -- no more unsafe writes, unmount can proceed" << dendl;
-    mount_cond.Signal();
+    mount_cond.notify_all();
   }
 }
 
@@ -9384,20 +9610,16 @@ int64_t Client::_preadv_pwritev_locked(Fh *fh, const struct iovec *iov,
         if (r <= 0)
           return r;
 
-        int bufoff = 0;
+        auto iter = bl.cbegin();
         for (unsigned j = 0, resid = r; j < iovcnt && resid > 0; j++) {
                /*
                 * This piece of code aims to handle the case that bufferlist does not have enough data 
                 * to fill in the iov 
                 */
-               if (resid < iov[j].iov_len) {
-                    bl.copy(bufoff, resid, (char *)iov[j].iov_base);
-                    break;
-               } else {
-                    bl.copy(bufoff, iov[j].iov_len, (char *)iov[j].iov_base);
-               }
-               resid -= iov[j].iov_len;
-               bufoff += iov[j].iov_len;
+               const auto round_size = std::min<unsigned>(resid, iov[j].iov_len);
+               iter.copy(round_size, reinterpret_cast<char*>(iov[j].iov_base));
+               resid -= round_size;
+               /* iter is self-updating */
         }
         return r;  
     }
@@ -9447,7 +9669,7 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf,
      * change out from under us.
      */
     if (f->flags & O_APPEND) {
-      int r = _lseek(f, 0, SEEK_END);
+      auto r = _lseek(f, 0, SEEK_END);
       if (r < 0) {
         unlock_fh_pos(f);
         return r;
@@ -9499,7 +9721,7 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf,
     want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
   else
     want = CEPH_CAP_FILE_BUFFER;
-  int r = get_caps(in, CEPH_CAP_FILE_WR|CEPH_CAP_AUTH_SHARED, want, &have, endoff);
+  int r = get_caps(f, CEPH_CAP_FILE_WR|CEPH_CAP_AUTH_SHARED, want, &have, endoff);
   if (r < 0)
     return r;
 
@@ -9534,7 +9756,7 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf,
       uint32_t len = in->inline_data.length();
 
       if (endoff < len)
-        in->inline_data.copy(endoff, len - endoff, bl);
+        in->inline_data.begin(endoff).copy(len - endoff, bl); // XXX
 
       if (offset < len)
         in->inline_data.splice(offset, len - offset);
@@ -9587,10 +9809,12 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf,
                       offset, size, bl, ceph::real_clock::now(), 0,
                       in->truncate_size, in->truncate_seq,
                       &onfinish);
-    client_lock.Unlock();
-    onfinish.wait();
-    client_lock.Lock();
+    client_lock.unlock();
+    r = onfinish.wait();
+    client_lock.lock();
     _sync_write_commit(in);
+    if (r < 0)
+      goto done;
   }
 
   // if we get here, write was successful, update client metadata
@@ -9632,9 +9856,9 @@ success:
 done:
 
   if (nullptr != onuninline) {
-    client_lock.Unlock();
+    client_lock.unlock();
     int uninline_ret = onuninline->wait();
-    client_lock.Lock();
+    client_lock.lock();
 
     if (uninline_ret >= 0 || uninline_ret == -ECANCELED) {
       in->inline_data.clear();
@@ -9687,6 +9911,8 @@ int Client::ftruncate(int fd, loff_t length, const UserPerm& perms)
   if (f->flags & O_PATH)
     return -EBADF;
 #endif
+  if ((f->mode & CEPH_FILE_MODE_WR) == 0)
+    return -EBADF;
   struct stat attr;
   attr.st_size = length;
   return _setattr(f->inode, &attr, CEPH_SETATTR_SIZE, perms);
@@ -9764,10 +9990,10 @@ int Client::_fsync(Inode *in, bool syncdataonly)
   }
 
   if (nullptr != object_cacher_completion) { // wait on a real reply instead of guessing
-    client_lock.Unlock();
+    client_lock.unlock();
     ldout(cct, 15) << "waiting on data to flush" << dendl;
     r = object_cacher_completion->wait();
-    client_lock.Lock();
+    client_lock.lock();
     ldout(cct, 15) << "got " << r << " from flush writeback" << dendl;
   } else {
     // FIXME: this can starve
@@ -9868,6 +10094,10 @@ int Client::chdir(const char *relpath, std::string &new_cwd,
   int r = path_walk(path, &in, perms);
   if (r < 0)
     return r;
+
+  if (!(in.get()->is_dir()))
+    return -ENOTDIR;
+
   if (cwd != in)
     cwd.swap(in);
   ldout(cct, 3) << "chdir(" << relpath << ")  cwd now " << cwd->ino << dendl;
@@ -9943,11 +10173,11 @@ int Client::statfs(const char *path, struct statvfs *stbuf,
     objecter->get_fs_stats(stats, boost::optional<int64_t>(), &cond);
   }
 
-  client_lock.Unlock();
+  client_lock.unlock();
   int rval = cond.wait();
   assert(root);
   total_files_on_fs = root->rstat.rfiles + root->rstat.rsubdirs;
-  client_lock.Lock();
+  client_lock.lock();
 
   if (rval < 0) {
     ldout(cct, 1) << "underlying call to statfs returned error: "
@@ -10033,6 +10263,9 @@ int Client::_do_filelock(Inode *in, Fh *fh, int lock_type, int op, int sleep,
                 << " type " << fl->l_type << " owner " << owner
                 << " " << fl->l_start << "~" << fl->l_len << dendl;
 
+  if (in->flags & I_ERROR_FILELOCK)
+    return -EIO;
+
   int lock_cmd;
   if (F_RDLCK == fl->l_type)
     lock_cmd = CEPH_LOCK_SHARED;
@@ -10206,30 +10439,39 @@ void Client::_release_filelocks(Fh *fh)
   Inode *in = fh->inode.get();
   ldout(cct, 10) << __func__ << " " << fh << " ino " << in->ino << dendl;
 
+  list<ceph_filelock> activated_locks;
+
   list<pair<int, ceph_filelock> > to_release;
 
   if (fh->fcntl_locks) {
     auto &lock_state = fh->fcntl_locks;
-    for(multimap<uint64_t, ceph_filelock>::iterator p = lock_state->held_locks.begin();
-       p != lock_state->held_locks.end();
-       ++p)
-      to_release.push_back(pair<int, ceph_filelock>(CEPH_LOCK_FCNTL, p->second));
+    for(auto p = lock_state->held_locks.begin(); p != lock_state->held_locks.end(); ) {
+      auto q = p++;
+      if (in->flags & I_ERROR_FILELOCK) {
+       lock_state->remove_lock(q->second, activated_locks);
+      } else {
+       to_release.push_back(pair<int, ceph_filelock>(CEPH_LOCK_FCNTL, q->second));
+      }
+    }
     lock_state.reset();
   }
   if (fh->flock_locks) {
     auto &lock_state = fh->flock_locks;
-    for(multimap<uint64_t, ceph_filelock>::iterator p = lock_state->held_locks.begin();
-       p != lock_state->held_locks.end();
-       ++p)
-      to_release.push_back(pair<int, ceph_filelock>(CEPH_LOCK_FLOCK, p->second));
+    for(auto p = lock_state->held_locks.begin(); p != lock_state->held_locks.end(); ) {
+      auto q = p++;
+      if (in->flags & I_ERROR_FILELOCK) {
+       lock_state->remove_lock(q->second, activated_locks);
+      } else {
+       to_release.push_back(pair<int, ceph_filelock>(CEPH_LOCK_FLOCK, q->second));
+      }
+    }
     lock_state.reset();
   }
 
-  if (to_release.empty())
-    return;
+  if ((in->flags & I_ERROR_FILELOCK) && !in->has_any_filelocks())
+    in->flags &= ~I_ERROR_FILELOCK;
 
-  // mds has already released filelocks if session was closed.
-  if (in->caps.empty())
+  if (to_release.empty())
     return;
 
   struct flock fl;
@@ -10335,7 +10577,7 @@ int Client::ll_statfs(Inode *in, struct statvfs *stbuf, const UserPerm& perms)
   return statfs(0, stbuf, perms);
 }
 
-void Client::ll_register_callbacks(struct client_callback_args *args)
+void Client::ll_register_callbacks(struct ceph_client_callback_args *args)
 {
   if (!args)
     return;
@@ -10363,7 +10605,12 @@ void Client::ll_register_callbacks(struct client_callback_args *args)
     remount_cb = args->remount_cb;
     remount_finisher.start();
   }
-  umask_cb = args->umask_cb;
+  if (args->ino_release_cb) {
+    ino_release_cb = args->ino_release_cb;
+    async_ino_releasor.start();
+  }
+  if (args->umask_cb)
+    umask_cb = args->umask_cb;
 }
 
 int Client::test_dentry_handling(bool can_invalidate)
@@ -10406,11 +10653,11 @@ int Client::_sync_fs()
   wait_sync_caps(flush_tid);
 
   if (nullptr != cond) {
-    client_lock.Unlock();
+    client_lock.unlock();
     ldout(cct, 15) << __func__ << " waiting on data to flush" << dendl;
     cond->wait();
     ldout(cct, 15) << __func__ << " flush finished" << dendl;
-    client_lock.Lock();
+    client_lock.lock();
   }
 
   return 0;
@@ -10475,10 +10722,10 @@ int Client::ll_lazyio(Fh *fh, int enable)
   return _lazyio(fh, enable);
 }
 
-int Client::lazyio_propogate(int fd, loff_t offset, size_t count)
+int Client::lazyio_propagate(int fd, loff_t offset, size_t count)
 {
   std::lock_guard l(client_lock);
-  ldout(cct, 3) << "op: client->lazyio_propogate(" << fd
+  ldout(cct, 3) << "op: client->lazyio_propagate(" << fd
           << ", " << offset << ", " << count << ")" << dendl;
   
   Fh *f = get_filehandle(fd);
@@ -10503,8 +10750,11 @@ int Client::lazyio_synchronize(int fd, loff_t offset, size_t count)
   Inode *in = f->inode.get();
   
   _fsync(f, true);
-  if (_release(in))
-    check_caps(in, 0);
+  if (_release(in)) {
+    int r =_getattr(in, CEPH_STAT_CAP_SIZE, f->actor_perms);
+    if (r < 0) 
+      return r;
+  }
   return 0;
 }
 
@@ -10605,6 +10855,7 @@ Inode *Client::open_snapdir(Inode *diri)
     in->mtime = diri->mtime;
     in->ctime = diri->ctime;
     in->btime = diri->btime;
+    in->atime = diri->atime;
     in->size = diri->size;
     in->change_attr = diri->change_attr;
 
@@ -10635,8 +10886,6 @@ int Client::ll_lookup(Inode *parent, const char *name, struct stat *attr,
     return -ENOTCONN;
 
   int r = 0;
-  auto fuse_default_permissions = cct->_conf.get_val<bool>(
-    "fuse_default_permissions");
   if (!fuse_default_permissions) {
     if (strcmp(name, ".") && strcmp(name, "..")) {
       r = may_lookup(parent, perms);
@@ -10666,58 +10915,61 @@ int Client::ll_lookup(Inode *parent, const char *name, struct stat *attr,
   return r;
 }
 
-int Client::ll_lookup_inode(
-    struct inodeno_t ino,
+int Client::ll_lookup_vino(
+    vinodeno_t vino,
     const UserPerm& perms,
     Inode **inode)
 {
   ceph_assert(inode != NULL);
-  std::lock_guard lock(client_lock);
-  ldout(cct, 3) << "ll_lookup_inode " << ino  << dendl;
-   
+
   if (unmounting)
     return -ENOTCONN;
 
-  // Num1: get inode and *inode
-  int r = _lookup_ino(ino, perms, inode);
-  if (r)
-    return r;
-
-  ceph_assert(*inode != NULL);
+  if (is_reserved_vino(vino))
+    return -ESTALE;
 
-  if (!(*inode)->dentries.empty()) {
-    ldout(cct, 8) << __func__ << " dentry already present" << dendl;
+  std::lock_guard lock(client_lock);
+  ldout(cct, 3) << __func__ << vino << dendl;
+   
+  // Check the cache first
+  unordered_map<vinodeno_t,Inode*>::iterator p = inode_map.find(vino);
+  if (p != inode_map.end()) {
+    *inode = p->second;
+    _ll_get(*inode);
     return 0;
   }
 
-  if ((*inode)->is_root()) {
-    ldout(cct, 8) << "ino is root, no parent" << dendl;
-    return 0;
-  }
+  uint64_t snapid = vino.snapid;
 
-  // Num2: Request the parent inode, so that we can look up the name
-  Inode *parent;
-  r = _lookup_parent(*inode, perms, &parent);
-  if (r) {
-    _ll_forget(*inode, 1);  
+  // for snapdir, find the non-snapped dir inode
+  if (snapid == CEPH_SNAPDIR)
+    vino.snapid = CEPH_NOSNAP;
+
+  int r = _lookup_vino(vino, perms, inode);
+  if (r)
     return r;
-  }
+  ceph_assert(*inode != NULL);
 
-  ceph_assert(parent != NULL);
+  if (snapid == CEPH_SNAPDIR) {
+    Inode *tmp = *inode;
 
-  // Num3: Finally, get the name (dentry) of the requested inode
-  r = _lookup_name(*inode, parent, perms);
-  if (r) {
-    // Unexpected error
-    _ll_forget(parent, 1);
-    _ll_forget(*inode, 1);
-    return r;
+    // open the snapdir and put the inode ref
+    *inode = open_snapdir(tmp);
+    _ll_forget(tmp, 1);
+    _ll_get(*inode);
   }
-
-  _ll_forget(parent, 1);
   return 0;
 }
 
+int Client::ll_lookup_inode(
+    struct inodeno_t ino,
+    const UserPerm& perms,
+    Inode **inode)
+{
+  vinodeno_t vino(ino, CEPH_NOSNAP);
+  return ll_lookup_vino(vino, perms, inode);
+}
+
 int Client::ll_lookupx(Inode *parent, const char *name, Inode **out,
                       struct ceph_statx *stx, unsigned want, unsigned flags,
                       const UserPerm& perms)
@@ -10732,8 +10984,6 @@ int Client::ll_lookupx(Inode *parent, const char *name, Inode **out,
     return -ENOTCONN;
 
   int r = 0;
-  auto fuse_default_permissions = cct->_conf.get_val<bool>(
-    "fuse_default_permissions");
   if (!fuse_default_permissions) {
     r = may_lookup(parent, perms);
     if (r < 0)
@@ -10929,6 +11179,9 @@ Inode *Client::ll_get_inode(vinodeno_t vino)
   if (unmounting)
     return NULL;
 
+  if (is_reserved_vino(vino))
+    return NULL;
+
   unordered_map<vinodeno_t,Inode*>::iterator p = inode_map.find(vino);
   if (p == inode_map.end())
     return NULL;
@@ -11004,8 +11257,6 @@ int Client::_ll_setattrx(Inode *in, struct ceph_statx *stx, int mask,
   tout(cct) << stx->stx_btime << std::endl;
   tout(cct) << mask << std::endl;
 
-  auto fuse_default_permissions = cct->_conf.get_val<bool>(
-    "fuse_default_permissions");
   if (!fuse_default_permissions) {
     int res = may_setattr(in, stx, mask, perms);
     if (res < 0)
@@ -11257,6 +11508,9 @@ int Client::_getxattr(Inode *in, const char *name, void *value, size_t size,
     if (vxattr->flags & VXATTR_RSTAT) {
       flags |= CEPH_STAT_RSTAT;
     }
+    if (vxattr->flags & VXATTR_DIRSTAT) {
+      flags |= CEPH_CAP_FILE_SHARED;
+    }
     r = _getattr(in, flags, perms, true);
     if (r != 0) {
       // Error from getattr!
@@ -11331,8 +11585,6 @@ int Client::ll_getxattr(Inode *in, const char *name, void *value,
   tout(cct) << vino.ino.val << std::endl;
   tout(cct) << name << std::endl;
 
-  auto fuse_default_permissions = cct->_conf.get_val<bool>(
-    "fuse_default_permissions");
   if (!fuse_default_permissions) {
     int r = xattr_permission(in, name, MAY_READ, perms);
     if (r < 0)
@@ -11367,29 +11619,6 @@ int Client::_listxattr(Inode *in, char *name, size_t size,
     name += this_len;
     size -= this_len;
   }
-
-  const VXattr *vxattr;
-  for (vxattr = _get_vxattrs(in); vxattr && !vxattr->name.empty(); vxattr++) {
-    if (vxattr->hidden)
-      continue;
-    // call pointer-to-member function
-    if (vxattr->exists_cb && !(this->*(vxattr->exists_cb))(in))
-      continue;
-
-    size_t this_len = vxattr->name.length() + 1;
-    r += this_len;
-    if (len_only)
-      continue;
-
-    if (this_len > size) {
-      r = -ERANGE;
-      goto out;
-    }
-
-    memcpy(name, vxattr->name.c_str(), this_len);
-    name += this_len;
-    size -= this_len;
-  }
 out:
   ldout(cct, 8) << __func__ << "(" << in->ino << ", " << size << ") = " << r << dendl;
   return r;
@@ -11453,6 +11682,12 @@ int Client::_setxattr(Inode *in, const char *name, const void *value,
     return -EROFS;
   }
 
+  if (size == 0) {
+    value = "";
+  } else if (value == NULL) {
+      return -EINVAL;
+  }
+
   bool posix_acl_xattr = false;
   if (acl_type == POSIX_ACL)
     posix_acl_xattr = !strncmp(name, "system.", 7);
@@ -11610,8 +11845,6 @@ int Client::ll_setxattr(Inode *in, const char *name, const void *value,
   tout(cct) << vino.ino.val << std::endl;
   tout(cct) << name << std::endl;
 
-  auto fuse_default_permissions = cct->_conf.get_val<bool>(
-    "fuse_default_permissions");
   if (!fuse_default_permissions) {
     int r = xattr_permission(in, name, MAY_WRITE, perms);
     if (r < 0)
@@ -11676,8 +11909,6 @@ int Client::ll_removexattr(Inode *in, const char *name, const UserPerm& perms)
   tout(cct) << vino.ino.val << std::endl;
   tout(cct) << name << std::endl;
 
-  auto fuse_default_permissions = cct->_conf.get_val<bool>(
-    "fuse_default_permissions");
   if (!fuse_default_permissions) {
     int r = xattr_permission(in, name, MAY_WRITE, perms);
     if (r < 0)
@@ -11690,7 +11921,8 @@ int Client::ll_removexattr(Inode *in, const char *name, const UserPerm& perms)
 bool Client::_vxattrcb_quota_exists(Inode *in)
 {
   return in->quota.is_enable() &&
-        in->snaprealm && in->snaprealm->ino == in->ino;
+   (in->snapid != CEPH_NOSNAP ||
+    (in->snaprealm && in->snaprealm->ino == in->ino));
 }
 size_t Client::_vxattrcb_quota(Inode *in, char *val, size_t size)
 {
@@ -11814,24 +12046,25 @@ size_t Client::_vxattrcb_snap_btime(Inode *in, char *val, size_t size)
       (long unsigned)in->snap_btime.nsec());
 }
 
+size_t Client::_vxattrcb_cluster_fsid(Inode *in, char *val, size_t size)
+{
+  return snprintf(val, size, "%s", monclient->get_fsid().to_string().c_str());
+}
+
+size_t Client::_vxattrcb_client_id(Inode *in, char *val, size_t size)
+{
+  auto name = messenger->get_myname();
+  return snprintf(val, size, "%s%ld", name.type_str(), name.num());
+}
+
 #define CEPH_XATTR_NAME(_type, _name) "ceph." #_type "." #_name
 #define CEPH_XATTR_NAME2(_type, _name, _name2) "ceph." #_type "." #_name "." #_name2
 
-#define XATTR_NAME_CEPH(_type, _name)                          \
-{                                                              \
-  name: CEPH_XATTR_NAME(_type, _name),                         \
-  getxattr_cb: &Client::_vxattrcb_ ## _type ## _ ## _name,     \
-  readonly: true,                                              \
-  hidden: false,                                               \
-  exists_cb: NULL,                                             \
-  flags: 0,                                                     \
-}
-#define XATTR_NAME_CEPH2(_type, _name, _flags)                 \
+#define XATTR_NAME_CEPH(_type, _name, _flags)                 \
 {                                                              \
   name: CEPH_XATTR_NAME(_type, _name),                         \
   getxattr_cb: &Client::_vxattrcb_ ## _type ## _ ## _name,     \
   readonly: true,                                              \
-  hidden: false,                                               \
   exists_cb: NULL,                                             \
   flags: _flags,                                               \
 }
@@ -11840,7 +12073,6 @@ size_t Client::_vxattrcb_snap_btime(Inode *in, char *val, size_t size)
   name: CEPH_XATTR_NAME2(_type, _name, _field),                        \
   getxattr_cb: &Client::_vxattrcb_ ## _name ## _ ## _field,    \
   readonly: false,                                             \
-  hidden: true,                                                        \
   exists_cb: &Client::_vxattrcb_layout_exists,                 \
   flags: 0,                                                     \
 }
@@ -11849,7 +12081,6 @@ size_t Client::_vxattrcb_snap_btime(Inode *in, char *val, size_t size)
   name: CEPH_XATTR_NAME(_type, _name),                         \
   getxattr_cb: &Client::_vxattrcb_ ## _type ## _ ## _name,     \
   readonly: false,                                             \
-  hidden: true,                                                        \
   exists_cb: &Client::_vxattrcb_quota_exists,                  \
   flags: 0,                                                     \
 }
@@ -11859,7 +12090,6 @@ const Client::VXattr Client::_dir_vxattrs[] = {
     name: "ceph.dir.layout",
     getxattr_cb: &Client::_vxattrcb_layout,
     readonly: false,
-    hidden: true,
     exists_cb: &Client::_vxattrcb_layout_exists,
     flags: 0,
   },
@@ -11868,19 +12098,18 @@ const Client::VXattr Client::_dir_vxattrs[] = {
   XATTR_LAYOUT_FIELD(dir, layout, object_size),
   XATTR_LAYOUT_FIELD(dir, layout, pool),
   XATTR_LAYOUT_FIELD(dir, layout, pool_namespace),
-  XATTR_NAME_CEPH(dir, entries),
-  XATTR_NAME_CEPH(dir, files),
-  XATTR_NAME_CEPH(dir, subdirs),
-  XATTR_NAME_CEPH2(dir, rentries, VXATTR_RSTAT),
-  XATTR_NAME_CEPH2(dir, rfiles, VXATTR_RSTAT),
-  XATTR_NAME_CEPH2(dir, rsubdirs, VXATTR_RSTAT),
-  XATTR_NAME_CEPH2(dir, rbytes, VXATTR_RSTAT),
-  XATTR_NAME_CEPH2(dir, rctime, VXATTR_RSTAT),
+  XATTR_NAME_CEPH(dir, entries, VXATTR_DIRSTAT),
+  XATTR_NAME_CEPH(dir, files, VXATTR_DIRSTAT),
+  XATTR_NAME_CEPH(dir, subdirs, VXATTR_DIRSTAT),
+  XATTR_NAME_CEPH(dir, rentries, VXATTR_RSTAT),
+  XATTR_NAME_CEPH(dir, rfiles, VXATTR_RSTAT),
+  XATTR_NAME_CEPH(dir, rsubdirs, VXATTR_RSTAT),
+  XATTR_NAME_CEPH(dir, rbytes, VXATTR_RSTAT),
+  XATTR_NAME_CEPH(dir, rctime, VXATTR_RSTAT),
   {
     name: "ceph.quota",
     getxattr_cb: &Client::_vxattrcb_quota,
     readonly: false,
-    hidden: true,
     exists_cb: &Client::_vxattrcb_quota_exists,
     flags: 0,
   },
@@ -11890,7 +12119,6 @@ const Client::VXattr Client::_dir_vxattrs[] = {
     name: "ceph.dir.pin",
     getxattr_cb: &Client::_vxattrcb_dir_pin,
     readonly: false,
-    hidden: true,
     exists_cb: &Client::_vxattrcb_dir_pin_exists,
     flags: 0,
   },
@@ -11898,7 +12126,6 @@ const Client::VXattr Client::_dir_vxattrs[] = {
     name: "ceph.snap.btime",
     getxattr_cb: &Client::_vxattrcb_snap_btime,
     readonly: true,
-    hidden: false,
     exists_cb: &Client::_vxattrcb_snap_btime_exists,
     flags: 0,
   },
@@ -11910,7 +12137,6 @@ const Client::VXattr Client::_file_vxattrs[] = {
     name: "ceph.file.layout",
     getxattr_cb: &Client::_vxattrcb_layout,
     readonly: false,
-    hidden: true,
     exists_cb: &Client::_vxattrcb_layout_exists,
     flags: 0,
   },
@@ -11923,13 +12149,30 @@ const Client::VXattr Client::_file_vxattrs[] = {
     name: "ceph.snap.btime",
     getxattr_cb: &Client::_vxattrcb_snap_btime,
     readonly: true,
-    hidden: false,
     exists_cb: &Client::_vxattrcb_snap_btime_exists,
     flags: 0,
   },
   { name: "" }     /* Required table terminator */
 };
 
+const Client::VXattr Client::_common_vxattrs[] = {
+  {
+    name: "ceph.cluster_fsid",
+    getxattr_cb: &Client::_vxattrcb_cluster_fsid,
+    readonly: true,
+    exists_cb: nullptr,
+    flags: 0,
+  },
+  {
+    name: "ceph.client_id",
+    getxattr_cb: &Client::_vxattrcb_client_id,
+    readonly: true,
+    exists_cb: nullptr,
+    flags: 0,
+  },
+  { name: "" }     /* Required table terminator */
+};
+
 const Client::VXattr *Client::_get_vxattrs(Inode *in)
 {
   if (in->is_dir())
@@ -11950,7 +12193,16 @@ const Client::VXattr *Client::_match_vxattr(Inode *in, const char *name)
        vxattr++;
       }
     }
+
+    // for common vxattrs
+    vxattr = _common_vxattrs;
+    while (!vxattr->name.empty()) {
+      if (vxattr->name == name)
+        return vxattr;
+      vxattr++;
+    }
   }
+
   return NULL;
 }
 
@@ -12048,8 +12300,6 @@ int Client::ll_mknod(Inode *parent, const char *name, mode_t mode,
   tout(cct) << mode << std::endl;
   tout(cct) << rdev << std::endl;
 
-  auto fuse_default_permissions = cct->_conf.get_val<bool>(
-    "fuse_default_permissions");
   if (!fuse_default_permissions) {
     int r = may_create(parent, perms);
     if (r < 0)
@@ -12089,8 +12339,6 @@ int Client::ll_mknodx(Inode *parent, const char *name, mode_t mode,
   tout(cct) << mode << std::endl;
   tout(cct) << rdev << std::endl;
 
-  auto fuse_default_permissions = cct->_conf.get_val<bool>(
-    "fuse_default_permissions");
   if (!fuse_default_permissions) {
     int r = may_create(parent, perms);
     if (r < 0)
@@ -12278,8 +12526,6 @@ int Client::ll_mkdir(Inode *parent, const char *name, mode_t mode,
   tout(cct) << name << std::endl;
   tout(cct) << mode << std::endl;
 
-  auto fuse_default_permissions = cct->_conf.get_val<bool>(
-    "fuse_default_permissions");
   if (!fuse_default_permissions) {
     int r = may_create(parent, perm);
     if (r < 0)
@@ -12316,8 +12562,6 @@ int Client::ll_mkdirx(Inode *parent, const char *name, mode_t mode, Inode **out,
   tout(cct) << name << std::endl;
   tout(cct) << mode << std::endl;
 
-  auto fuse_default_permissions = cct->_conf.get_val<bool>(
-    "fuse_default_permissions");
   if (!fuse_default_permissions) {
     int r = may_create(parent, perms);
     if (r < 0)
@@ -12403,8 +12647,6 @@ int Client::ll_symlink(Inode *parent, const char *name, const char *value,
   tout(cct) << name << std::endl;
   tout(cct) << value << std::endl;
 
-  auto fuse_default_permissions = cct->_conf.get_val<bool>(
-    "fuse_default_permissions");
   if (!fuse_default_permissions) {
     int r = may_create(parent, perms);
     if (r < 0)
@@ -12442,8 +12684,6 @@ int Client::ll_symlinkx(Inode *parent, const char *name, const char *value,
   tout(cct) << name << std::endl;
   tout(cct) << value << std::endl;
 
-  auto fuse_default_permissions = cct->_conf.get_val<bool>(
-    "fuse_default_permissions");
   if (!fuse_default_permissions) {
     int r = may_create(parent, perms);
     if (r < 0)
@@ -12527,8 +12767,6 @@ int Client::ll_unlink(Inode *in, const char *name, const UserPerm& perm)
   tout(cct) << vino.ino.val << std::endl;
   tout(cct) << name << std::endl;
 
-  auto fuse_default_permissions = cct->_conf.get_val<bool>(
-    "fuse_default_permissions");
   if (!fuse_default_permissions) {
     int r = may_delete(in, name, perm);
     if (r < 0)
@@ -12604,8 +12842,6 @@ int Client::ll_rmdir(Inode *in, const char *name, const UserPerm& perms)
   tout(cct) << vino.ino.val << std::endl;
   tout(cct) << name << std::endl;
 
-  auto fuse_default_permissions = cct->_conf.get_val<bool>(
-    "fuse_default_permissions");
   if (!fuse_default_permissions) {
     int r = may_delete(in, name, perms);
     if (r < 0)
@@ -12632,15 +12868,6 @@ int Client::_rename(Inode *fromdir, const char *fromname, Inode *todir, const ch
     else
       return -EROFS;
   }
-  if (fromdir != todir) {
-    Inode *fromdir_root =
-      fromdir->quota.is_enable() ? fromdir : get_quota_root(fromdir, perm);
-    Inode *todir_root =
-      todir->quota.is_enable() ? todir : get_quota_root(todir, perm);
-    if (fromdir_root != todir_root) {
-      return -EXDEV;
-    }
-  }
 
   InodeRef target;
   MetaRequest *req = new MetaRequest(op);
@@ -12673,7 +12900,32 @@ int Client::_rename(Inode *fromdir, const char *fromname, Inode *todir, const ch
     req->dentry_unless = CEPH_CAP_FILE_EXCL;
 
     InodeRef oldin, otherin;
-    res = _lookup(fromdir, fromname, 0, &oldin, perm);
+    Inode *fromdir_root = nullptr;
+    Inode *todir_root = nullptr;
+    int mask = 0;
+    bool quota_check = false;
+    if (fromdir != todir) {
+      fromdir_root =
+        fromdir->quota.is_enable() ? fromdir : get_quota_root(fromdir, perm);
+      todir_root =
+        todir->quota.is_enable() ? todir : get_quota_root(todir, perm);
+
+      if (todir_root->quota.is_enable() && fromdir_root != todir_root) {
+        // use CEPH_STAT_RSTAT mask to force send getattr or lookup request 
+        // to auth MDS to get latest rstat for todir_root and source dir 
+        // even if their dentry caches and inode caps are satisfied.
+        res = _getattr(todir_root, CEPH_STAT_RSTAT, perm, true);
+        if (res < 0)
+          goto fail;
+
+        quota_check = true;
+        if (oldde->inode && oldde->inode->is_dir()) {
+          mask |= CEPH_STAT_RSTAT;
+        }
+      }
+    }
+
+    res = _lookup(fromdir, fromname, mask, &oldin, perm);
     if (res < 0)
       goto fail;
 
@@ -12682,6 +12934,39 @@ int Client::_rename(Inode *fromdir, const char *fromname, Inode *todir, const ch
     req->set_old_inode(oldinode);
     req->old_inode_drop = CEPH_CAP_LINK_SHARED;
 
+    if (quota_check) {
+      int64_t old_bytes, old_files;
+      if (oldinode->is_dir()) {
+        old_bytes = oldinode->rstat.rbytes;
+        old_files = oldinode->rstat.rsize();
+      } else {
+        old_bytes = oldinode->size;
+        old_files = 1;
+      }
+
+      bool quota_exceed = false;
+      if (todir_root && todir_root->quota.max_bytes &&
+          (old_bytes + todir_root->rstat.rbytes) >= todir_root->quota.max_bytes) {
+        ldout(cct, 10) << "_rename (" << oldinode->ino << " bytes="
+                       << old_bytes << ") to (" << todir->ino 
+                      << ") will exceed quota on " << *todir_root << dendl;
+        quota_exceed = true;
+      }
+
+      if (todir_root && todir_root->quota.max_files &&
+          (old_files + todir_root->rstat.rsize()) >= todir_root->quota.max_files) {
+        ldout(cct, 10) << "_rename (" << oldinode->ino << " files="
+                       << old_files << ") to (" << todir->ino 
+                       << ") will exceed quota on " << *todir_root << dendl;
+        quota_exceed = true;
+      }
+
+      if (quota_exceed) {
+        res = (oldinode->is_dir()) ? -EXDEV : -EDQUOT;
+        goto fail;
+      }
+    }
+
     res = _lookup(todir, toname, 0, &otherin, perm);
     switch (res) {
     case 0:
@@ -12741,8 +13026,6 @@ int Client::ll_rename(Inode *parent, const char *name, Inode *newparent,
   tout(cct) << vnewparent.ino.val << std::endl;
   tout(cct) << newname << std::endl;
 
-  auto fuse_default_permissions = cct->_conf.get_val<bool>(
-    "fuse_default_permissions");
   if (!fuse_default_permissions) {
     int r = may_delete(parent, name, perm);
     if (r < 0)
@@ -12820,8 +13103,6 @@ int Client::ll_link(Inode *in, Inode *newparent, const char *newname,
 
   InodeRef target;
 
-  auto fuse_default_permissions = cct->_conf.get_val<bool>(
-    "fuse_default_permissions");
   if (!fuse_default_permissions) {
     if (S_ISDIR(in->mode))
       return -EPERM;
@@ -12951,8 +13232,6 @@ int Client::ll_opendir(Inode *in, int flags, dir_result_t** dirpp,
   tout(cct) << "ll_opendir" << std::endl;
   tout(cct) << vino.ino.val << std::endl;
 
-  auto fuse_default_permissions = cct->_conf.get_val<bool>(
-    "fuse_default_permissions");
   if (!fuse_default_permissions) {
     int r = may_open(in, flags, perms);
     if (r < 0)
@@ -13011,8 +13290,6 @@ int Client::ll_open(Inode *in, int flags, Fh **fhp, const UserPerm& perms)
   tout(cct) << ceph_flags_sys2wire(flags) << std::endl;
 
   int r;
-  auto fuse_default_permissions = cct->_conf.get_val<bool>(
-    "fuse_default_permissions");
   if (!fuse_default_permissions) {
     r = may_open(in, flags, perms);
     if (r < 0)
@@ -13056,8 +13333,6 @@ int Client::_ll_create(Inode *parent, const char *name, mode_t mode,
     return -EEXIST;
 
   if (r == -ENOENT && (flags & O_CREAT)) {
-    auto fuse_default_permissions = cct->_conf.get_val<bool>(
-      "fuse_default_permissions");
     if (!fuse_default_permissions) {
       r = may_create(parent, perms);
       if (r < 0)
@@ -13076,8 +13351,6 @@ int Client::_ll_create(Inode *parent, const char *name, mode_t mode,
 
   ldout(cct, 20) << "_ll_create created = " << created << dendl;
   if (!created) {
-    auto fuse_default_permissions = cct->_conf.get_val<bool>(
-      "fuse_default_permissions");
     if (!fuse_default_permissions) {
       r = may_open(in->get(), flags, perms);
       if (r < 0) {
@@ -13203,7 +13476,10 @@ int Client::ll_read(Fh *fh, loff_t off, loff_t len, bufferlist *bl)
 
   /* We can't return bytes written larger than INT_MAX, clamp len to that */
   len = std::min(len, (loff_t)INT_MAX);
-  return _read(fh, off, len, bl);
+  int r = _read(fh, off, len, bl);
+  ldout(cct, 3) << "ll_read " << fh << " " << off << "~" << len << " = " << r
+               << dendl;
+  return r;
 }
 
 int Client::ll_read_block(Inode *in, uint64_t blockid,
@@ -13231,12 +13507,12 @@ int Client::ll_read_block(Inode *in, uint64_t blockid,
                 CEPH_OSD_FLAG_READ,
                  &onfinish);
 
-  client_lock.Unlock();
+  client_lock.unlock();
   int r = onfinish.wait();
-  client_lock.Lock();
+  client_lock.lock();
 
   if (r >= 0) {
-      bl.copy(0, bl.length(), buf);
+      bl.begin().copy(bl.length(), buf);
       r = bl.length();
   }
 
@@ -13276,9 +13552,9 @@ int Client::ll_write_block(Inode *in, uint64_t blockid,
   fakesnap.seq = snapseq;
 
   /* lock just in time */
-  client_lock.Lock();
+  client_lock.lock();
   if (unmounting) {
-    client_lock.Unlock();
+    client_lock.unlock();
     return -ENOTCONN;
   }
 
@@ -13292,7 +13568,7 @@ int Client::ll_write_block(Inode *in, uint64_t blockid,
                  0,
                  onsafe.get());
 
-  client_lock.Unlock();
+  client_lock.unlock();
   if (nullptr != onsafe) {
     r = onsafe->wait();
   }
@@ -13445,7 +13721,7 @@ int Client::_fallocate(Fh *fh, int mode, int64_t offset, int64_t length)
   }
 
   int have;
-  int r = get_caps(in, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER, &have, -1);
+  int r = get_caps(fh, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER, &have, -1);
   if (r < 0)
     return r;
 
@@ -13454,17 +13730,20 @@ int Client::_fallocate(Fh *fh, int mode, int64_t offset, int64_t length)
     if (in->inline_version < CEPH_INLINE_NONE &&
         (have & CEPH_CAP_FILE_BUFFER)) {
       bufferlist bl;
+      auto inline_iter = in->inline_data.cbegin();
       int len = in->inline_data.length();
       if (offset < len) {
         if (offset > 0)
-          in->inline_data.copy(0, offset, bl);
+          inline_iter.copy(offset, bl);
         int size = length;
         if (offset + size > len)
           size = len - offset;
         if (size > 0)
           bl.append_zero(size);
-        if (offset + size < len)
-          in->inline_data.copy(offset + size, len - offset - size, bl);
+        if (offset + size < len) {
+          inline_iter += size;
+          inline_iter.copy(len - offset - size, bl);
+        }
         in->inline_data = bl;
         in->inline_version++;
       }
@@ -13492,9 +13771,9 @@ int Client::_fallocate(Fh *fh, int mode, int64_t offset, int64_t length)
       in->change_attr++;
       in->mark_caps_dirty(CEPH_CAP_FILE_WR);
 
-      client_lock.Unlock();
+      client_lock.unlock();
       onfinish.wait();
-      client_lock.Lock();
+      client_lock.lock();
       _sync_write_commit(in);
     }
   } else if (!(mode & FALLOC_FL_KEEP_SIZE)) {
@@ -13514,9 +13793,9 @@ int Client::_fallocate(Fh *fh, int mode, int64_t offset, int64_t length)
   }
 
   if (nullptr != onuninline) {
-    client_lock.Unlock();
+    client_lock.unlock();
     int ret = onuninline->wait();
-    client_lock.Lock();
+    client_lock.lock();
 
     if (ret >= 0 || ret == -ECANCELED) {
       in->inline_data.clear();
@@ -13988,8 +14267,7 @@ void Client::ms_handle_remote_reset(Connection *con)
        case MetaSession::STATE_OPEN:
          {
            objecter->maybe_request_map(); /* to check if we are blacklisted */
-           const auto& conf = cct->_conf;
-           if (conf->client_reconnect_stale) {
+           if (cct->_conf.get_val<bool>("client_reconnect_stale")) {
              ldout(cct, 1) << "reset from mds we were open; close mds session for reconnect" << dendl;
              _closed_mds_session(s);
            } else {
@@ -14016,14 +14294,6 @@ bool Client::ms_handle_refused(Connection *con)
   return false;
 }
 
-bool Client::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer)
-{
-  if (dest_type == CEPH_ENTITY_TYPE_MON)
-    return true;
-  *authorizer = monclient->build_authorizer(dest_type);
-  return true;
-}
-
 Inode *Client::get_quota_root(Inode *in, const UserPerm& perms)
 {
   Inode *quota_in = root_ancestor;
@@ -14091,16 +14361,16 @@ bool Client::is_quota_bytes_exceeded(Inode *in, int64_t new_bytes,
 
 bool Client::is_quota_bytes_approaching(Inode *in, const UserPerm& perms)
 {
+  ceph_assert(in->size >= in->reported_size);
+  const uint64_t size = in->size - in->reported_size;
   return check_quota_condition(in, perms,
-      [](const Inode &in) {
+      [&size](const Inode &in) {
         if (in.quota.max_bytes) {
           if (in.rstat.rbytes >= in.quota.max_bytes) {
             return true;
           }
 
-          ceph_assert(in.size >= in.reported_size);
           const uint64_t space = in.quota.max_bytes - in.rstat.rbytes;
-          const uint64_t size = in.size - in.reported_size;
           return (space >> 4) < size;
         } else {
           return false;
@@ -14120,6 +14390,10 @@ int Client::check_pool_perm(Inode *in, int need)
   if (!cct->_conf->client_check_pool_perm)
     return 0;
 
+  /* Only need to do this for regular files */
+  if (!in->is_file())
+    return 0;
+
   int64_t pool_id = in->layout.pool_id;
   std::string pool_ns = in->layout.pool_ns;
   std::pair<int64_t, std::string> perm_key(pool_id, pool_ns);
@@ -14168,10 +14442,10 @@ int Client::check_pool_perm(Inode *in, int need)
     objecter->mutate(oid, OSDMap::file_to_object_locator(in->layout), wr_op,
                     nullsnapc, ceph::real_clock::now(), 0, &wr_cond);
 
-    client_lock.Unlock();
+    client_lock.unlock();
     int rd_ret = rd_cond.wait();
     int wr_ret = wr_cond.wait();
-    client_lock.Lock();
+    client_lock.lock();
 
     bool errored = false;
 
@@ -14379,14 +14653,14 @@ int Client::start_reclaim(const std::string& uuid, unsigned flags,
     MetaSession *session;
     if (!have_open_session(mds)) {
       session = _get_or_open_mds_session(mds);
+      if (session->state == MetaSession::STATE_REJECTED)
+       return -EPERM;
       if (session->state != MetaSession::STATE_OPENING) {
        // umounting?
        return -EINVAL;
       }
       ldout(cct, 10) << "waiting for session to mds." << mds << " to open" << dendl;
       wait_on_context_list(session->waiting_for_open);
-      if (rejected_by_mds.count(mds))
-       return -EPERM;
       continue;
     }
 
@@ -14397,7 +14671,7 @@ int Client::start_reclaim(const std::string& uuid, unsigned flags,
     if (session->reclaim_state == MetaSession::RECLAIM_NULL ||
        session->reclaim_state == MetaSession::RECLAIMING) {
       session->reclaim_state = MetaSession::RECLAIMING;
-      auto m = MClientReclaim::create(uuid, flags);
+      auto m = make_message<MClientReclaim>(uuid, flags);
       session->con->send_message2(std::move(m));
       wait_on_list(waiting_for_reclaim);
     } else if (session->reclaim_state == MetaSession::RECLAIM_FAIL) {
@@ -14422,9 +14696,9 @@ int Client::start_reclaim(const std::string& uuid, unsigned flags,
   C_SaferCond cond;
   if (!objecter->wait_for_map(reclaim_osd_epoch, &cond)) {
     ldout(cct, 10) << __func__ << ": waiting for OSD epoch " << reclaim_osd_epoch << dendl;
-    client_lock.Unlock();
+    client_lock.unlock();
     cond.wait();
-    client_lock.Lock();
+    client_lock.lock();
   }
 
   bool blacklisted = objecter->with_osdmap(
@@ -14449,7 +14723,7 @@ void Client::finish_reclaim()
 
   for (auto &p : mds_sessions) {
     p.second.reclaim_state = MetaSession::RECLAIM_NULL;
-    auto m = MClientReclaim::create("", MClientReclaim::FLAG_FINISH);
+    auto m = make_message<MClientReclaim>("", MClientReclaim::FLAG_FINISH);
     p.second.con->send_message2(std::move(m));
   }
 
@@ -14535,7 +14809,7 @@ void intrusive_ptr_release(Inode *in)
 
 mds_rank_t Client::_get_random_up_mds() const
 {
-  ceph_assert(client_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
 
   std::set<mds_rank_t> up;
   mdsmap->get_up_mds_set(up);
@@ -14550,7 +14824,7 @@ mds_rank_t Client::_get_random_up_mds() const
 
 
 StandaloneClient::StandaloneClient(Messenger *m, MonClient *mc)
-    : Client(m, mc, new Objecter(m->cct, m, mc, NULL, 0, 0))
+    : Client(m, mc, new Objecter(m->cct, m, mc, nullptr))
 {
   monclient->set_messenger(m);
   objecter->set_client_incarnation(0);
@@ -14564,11 +14838,10 @@ StandaloneClient::~StandaloneClient()
 
 int StandaloneClient::init()
 {
-  timer.init();
-  objectcacher->start();
+  _pre_init();
   objecter->init();
 
-  client_lock.Lock();
+  client_lock.lock();
   ceph_assert(!is_initialized());
 
   messenger->add_dispatcher_tail(objecter);
@@ -14579,7 +14852,7 @@ int StandaloneClient::init()
   if (r < 0) {
     // need to do cleanup because we're in an intermediate init state
     timer.shutdown();
-    client_lock.Unlock();
+    client_lock.unlock();
     objecter->shutdown();
     objectcacher->stop();
     monclient->shutdown();
@@ -14587,7 +14860,7 @@ int StandaloneClient::init()
   }
   objecter->start();
 
-  client_lock.Unlock();
+  client_lock.unlock();
   _finish_init();
 
   return 0;