]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/client/Client.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / client / Client.cc
index f6b1fad4ce5101200e8e3aac92c8395a21bf495b..84fcf91d0d926ca5fa9a84c508daf260024cb361 100644 (file)
@@ -70,7 +70,6 @@
 #include "osdc/Filer.h"
 
 #include "common/Cond.h"
-#include "common/Mutex.h"
 #include "common/perf_counters.h"
 #include "common/admin_socket.h"
 #include "common/errno.h"
@@ -139,29 +138,31 @@ Client::CommandHook::CommandHook(Client *client) :
 {
 }
 
-bool Client::CommandHook::call(std::string_view command,
-                              const cmdmap_t& cmdmap,
-                              std::string_view format, bufferlist& out)
+int Client::CommandHook::call(
+  std::string_view command,
+  const cmdmap_t& cmdmap,
+  Formatter *f,
+  std::ostream& errss,
+  bufferlist& out)
 {
-  std::unique_ptr<Formatter> f(Formatter::create(format));
   f->open_object_section("result");
-  m_client->client_lock.Lock();
-  if (command == "mds_requests")
-    m_client->dump_mds_requests(f.get());
-  else if (command == "mds_sessions")
-    m_client->dump_mds_sessions(f.get());
-  else if (command == "dump_cache")
-    m_client->dump_cache(f.get());
-  else if (command == "kick_stale_sessions")
-    m_client->_kick_stale_sessions();
-  else if (command == "status")
-    m_client->dump_status(f.get());
-  else
-    ceph_abort_msg("bad command registered");
-  m_client->client_lock.Unlock();
+  {
+    std::lock_guard l{m_client->client_lock};
+    if (command == "mds_requests")
+      m_client->dump_mds_requests(f);
+    else if (command == "mds_sessions")
+      m_client->dump_mds_sessions(f);
+    else if (command == "dump_cache")
+      m_client->dump_cache(f);
+    else if (command == "kick_stale_sessions")
+      m_client->_kick_stale_sessions();
+    else if (command == "status")
+      m_client->dump_status(f);
+    else
+      ceph_abort_msg("bad command registered");
+  }
   f->close_section();
-  f->flush(out);
-  return true;
+  return 0;
 }
 
 
@@ -261,7 +262,6 @@ vinodeno_t Client::map_faked_ino(ino_t ino)
 Client::Client(Messenger *m, MonClient *mc, Objecter *objecter_)
   : Dispatcher(m->cct),
     timer(m->cct, client_lock),
-    client_lock("Client::client_lock"),
     messenger(m),
     monclient(mc),
     objecter(objecter_),
@@ -311,14 +311,13 @@ Client::Client(Messenger *m, MonClient *mc, Objecter *objecter_)
 
 Client::~Client()
 {
-  ceph_assert(!client_lock.is_locked());
+  ceph_assert(ceph_mutex_is_not_locked(client_lock));
 
   // It is necessary to hold client_lock, because any inode destruction
   // may call into ObjectCacher, which asserts that it's lock (which is
   // client_lock) is held.
-  client_lock.Lock();
+  std::lock_guard l{client_lock};
   tear_down_cache();
-  client_lock.Unlock();
 }
 
 void Client::tear_down_cache()
@@ -445,7 +444,7 @@ void Client::dump_cache(Formatter *f)
 
 void Client::dump_status(Formatter *f)
 {
-  ceph_assert(client_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
 
   ldout(cct, 1) << __func__ << dendl;
 
@@ -478,37 +477,34 @@ int Client::init()
 {
   timer.init();
   objectcacher->start();
-
-  client_lock.Lock();
-  ceph_assert(!initialized);
-
-  messenger->add_dispatcher_tail(this);
-  client_lock.Unlock();
-
+  {
+    std::lock_guard l{client_lock};
+    ceph_assert(!initialized);
+    messenger->add_dispatcher_tail(this);
+  }
   _finish_init();
   return 0;
 }
 
 void Client::_finish_init()
 {
-  client_lock.Lock();
-  // logger
-  PerfCountersBuilder plb(cct, "client", l_c_first, l_c_last);
-  plb.add_time_avg(l_c_reply, "reply", "Latency of receiving a reply on metadata request");
-  plb.add_time_avg(l_c_lat, "lat", "Latency of processing a metadata request");
-  plb.add_time_avg(l_c_wrlat, "wrlat", "Latency of a file data write operation");
-  plb.add_time_avg(l_c_read, "rdlat", "Latency of a file data read operation");
-  plb.add_time_avg(l_c_fsync, "fsync", "Latency of a file sync operation");
-  logger.reset(plb.create_perf_counters());
-  cct->get_perfcounters_collection()->add(logger.get());
-
-  client_lock.Unlock();
+  {
+    std::lock_guard l{client_lock};
+    // logger
+    PerfCountersBuilder plb(cct, "client", l_c_first, l_c_last);
+    plb.add_time_avg(l_c_reply, "reply", "Latency of receiving a reply on metadata request");
+    plb.add_time_avg(l_c_lat, "lat", "Latency of processing a metadata request");
+    plb.add_time_avg(l_c_wrlat, "wrlat", "Latency of a file data write operation");
+    plb.add_time_avg(l_c_read, "rdlat", "Latency of a file data read operation");
+    plb.add_time_avg(l_c_fsync, "fsync", "Latency of a file sync operation");
+    logger.reset(plb.create_perf_counters());
+    cct->get_perfcounters_collection()->add(logger.get());
+  }
 
   cct->_conf.add_observer(this);
 
   AdminSocket* admin_socket = cct->get_admin_socket();
   int ret = admin_socket->register_command("mds_requests",
-                                          "mds_requests",
                                           &m_command_hook,
                                           "show in-progress mds requests");
   if (ret < 0) {
@@ -516,7 +512,6 @@ void Client::_finish_init()
               << cpp_strerror(-ret) << dendl;
   }
   ret = admin_socket->register_command("mds_sessions",
-                                      "mds_sessions",
                                       &m_command_hook,
                                       "show mds session state");
   if (ret < 0) {
@@ -524,7 +519,6 @@ void Client::_finish_init()
               << cpp_strerror(-ret) << dendl;
   }
   ret = admin_socket->register_command("dump_cache",
-                                      "dump_cache",
                                       &m_command_hook,
                                       "show in-memory metadata cache contents");
   if (ret < 0) {
@@ -532,7 +526,6 @@ void Client::_finish_init()
               << cpp_strerror(-ret) << dendl;
   }
   ret = admin_socket->register_command("kick_stale_sessions",
-                                      "kick_stale_sessions",
                                       &m_command_hook,
                                       "kick sessions that were remote reset");
   if (ret < 0) {
@@ -540,7 +533,6 @@ void Client::_finish_init()
               << cpp_strerror(-ret) << dendl;
   }
   ret = admin_socket->register_command("status",
-                                      "status",
                                       &m_command_hook,
                                       "show overall client status");
   if (ret < 0) {
@@ -548,9 +540,8 @@ void Client::_finish_init()
               << cpp_strerror(-ret) << dendl;
   }
 
-  client_lock.Lock();
+  std::lock_guard l{client_lock};
   initialized = true;
-  client_lock.Unlock();
 }
 
 void Client::shutdown() 
@@ -559,10 +550,10 @@ void Client::shutdown()
 
   // If we were not mounted, but were being used for sending
   // MDS commands, we may have sessions that need closing.
-  client_lock.Lock();
-  _close_sessions();
-  client_lock.Unlock();
-
+  {
+    std::lock_guard l{client_lock};
+    _close_sessions();
+  }
   cct->_conf.remove_observer(this);
 
   cct->get_admin_socket()->unregister_commands(&m_command_hook);
@@ -592,13 +583,12 @@ void Client::shutdown()
   }
 
   objectcacher->stop();  // outside of client_lock! this does a join.
-
-  client_lock.Lock();
-  ceph_assert(initialized);
-  initialized = false;
-  timer.shutdown();
-  client_lock.Unlock();
-
+  {
+    std::lock_guard l{client_lock};
+    ceph_assert(initialized);
+    initialized = false;
+    timer.shutdown();
+  }
   objecter_finisher.wait_for_empty();
   objecter_finisher.stop();
 
@@ -1016,7 +1006,7 @@ void Client::update_dentry_lease(Dentry *dn, LeaseStat *dlease, utime_t from, Me
   
   ceph_assert(dn);
 
-  if (dlease->mask & CEPH_LOCK_DN) {
+  if (dlease->mask & CEPH_LEASE_VALID) {
     if (dttl > dn->lease_ttl) {
       ldout(cct, 10) << "got dentry lease on " << dn->name
               << " dur " << dlease->duration_ms << "ms ttl " << dttl << dendl;
@@ -1556,7 +1546,7 @@ void Client::dump_mds_requests(Formatter *f)
   }
 }
 
-int Client::verify_reply_trace(int r,
+int Client::verify_reply_trace(int r, MetaSession *session,
                               MetaRequest *request, const MConstRef<MClientReply>& reply,
                               InodeRef *ptarget, bool *pcreated,
                               const UserPerm& perms)
@@ -1569,12 +1559,24 @@ int Client::verify_reply_trace(int r,
 
   extra_bl = reply->get_extra_bl();
   if (extra_bl.length() >= 8) {
-    // if the extra bufferlist has a buffer, we assume its the created inode
-    // and that this request to create succeeded in actually creating
-    // the inode (won the race with other create requests)
-    decode(created_ino, extra_bl);
-    got_created_ino = true;
+    if (session->mds_features.test(CEPHFS_FEATURE_DELEG_INO)) {
+     struct openc_response_t   ocres;
+
+     decode(ocres, extra_bl);
+     created_ino = ocres.created_ino;
+     /*
+      * The userland cephfs client doesn't have a way to do an async create
+      * (yet), so just discard delegated_inos for now. Eventually we should
+      * store them and use them in create calls, even if they are synchronous,
+      * if only for testing purposes.
+      */
+     ldout(cct, 10) << "delegated_inos: " << ocres.delegated_inos << dendl;
+    } else {
+     // u64 containing number of created ino
+     decode(created_ino, extra_bl);
+    }
     ldout(cct, 10) << "make_request created ino " << created_ino << dendl;
+    got_created_ino = true;
   }
 
   if (pcreated)
@@ -1682,6 +1684,7 @@ int Client::make_request(MetaRequest *request,
   if (use_mds >= 0)
     request->resend_mds = use_mds;
 
+  MetaSession *session = NULL;
   while (1) {
     if (request->aborted())
       break;
@@ -1692,7 +1695,7 @@ int Client::make_request(MetaRequest *request,
     }
 
     // set up wait cond
-    Cond caller_cond;
+    ceph::condition_variable caller_cond;
     request->caller_cond = &caller_cond;
 
     // choose mds
@@ -1716,7 +1719,6 @@ int Client::make_request(MetaRequest *request,
     }
 
     // open a session?
-    MetaSession *session = NULL;
     if (!have_open_session(mds)) {
       session = _get_or_open_mds_session(mds);
 
@@ -1744,11 +1746,14 @@ int Client::make_request(MetaRequest *request,
     // wait for signal
     ldout(cct, 20) << "awaiting reply|forward|kick on " << &caller_cond << dendl;
     request->kick = false;
-    while (!request->reply &&         // reply
-          request->resend_mds < 0 && // forward
-          !request->kick)
-      caller_cond.Wait(client_lock);
-    request->caller_cond = NULL;
+    std::unique_lock l{client_lock, std::adopt_lock};
+    caller_cond.wait(l, [request] {
+      return (request->reply ||                  // reply
+             request->resend_mds >= 0 || // forward
+             request->kick);
+    });
+    l.release();
+    request->caller_cond = nullptr;
 
     // did we get a reply?
     if (request->reply) 
@@ -1773,12 +1778,12 @@ int Client::make_request(MetaRequest *request,
 
   // kick dispatcher (we've got it!)
   ceph_assert(request->dispatch_cond);
-  request->dispatch_cond->Signal();
+  request->dispatch_cond->notify_all();
   ldout(cct, 20) << "sendrecv kickback on tid " << tid << " " << request->dispatch_cond << dendl;
   request->dispatch_cond = 0;
   
   if (r >= 0 && ptarget)
-    r = verify_reply_trace(r, request, reply, ptarget, pcreated, perms);
+    r = verify_reply_trace(r, session, request, reply, ptarget, pcreated, perms);
 
   if (pdirbl)
     *pdirbl = reply->get_extra_bl();
@@ -2041,7 +2046,7 @@ MetaSession *Client::_open_mds_session(mds_rank_t mds)
     }
   }
 
-  auto m = MClientSession::create(CEPH_SESSION_REQUEST_OPEN);
+  auto m = make_message<MClientSession>(CEPH_SESSION_REQUEST_OPEN);
   m->metadata = metadata;
   m->supported_features = feature_bitset_t(CEPHFS_FEATURES_CLIENT_SUPPORTED);
   session->con->send_message2(std::move(m));
@@ -2052,7 +2057,7 @@ void Client::_close_mds_session(MetaSession *s)
 {
   ldout(cct, 2) << __func__ << " mds." << s->mds_num << " seq " << s->seq << dendl;
   s->state = MetaSession::STATE_CLOSING;
-  s->con->send_message2(MClientSession::create(CEPH_SESSION_REQUEST_CLOSE, s->seq));
+  s->con->send_message2(make_message<MClientSession>(CEPH_SESSION_REQUEST_CLOSE, s->seq));
 }
 
 void Client::_closed_mds_session(MetaSession *s)
@@ -2061,7 +2066,7 @@ void Client::_closed_mds_session(MetaSession *s)
   s->state = MetaSession::STATE_CLOSED;
   s->con->mark_down();
   signal_context_list(s->waiting_for_open);
-  mount_cond.Signal();
+  mount_cond.notify_all();
   remove_session_caps(s);
   kick_requests_closed(s);
   mds_sessions.erase(s->mds_num);
@@ -2096,7 +2101,7 @@ void Client::handle_client_session(const MConstRef<MClientSession>& m)
       renew_caps(session);
       session->state = MetaSession::STATE_OPEN;
       if (unmounting)
-       mount_cond.Signal();
+       mount_cond.notify_all();
       else
        connect_mds_targets(from);
       signal_context_list(session->waiting_for_open);
@@ -2134,7 +2139,7 @@ void Client::handle_client_session(const MConstRef<MClientSession>& m)
     if (auto& m = session->release; m) {
       session->con->send_message2(std::move(m));
     }
-    session->con->send_message2(MClientSession::create(CEPH_SESSION_FLUSHMSG_ACK, m->get_seq()));
+    session->con->send_message2(make_message<MClientSession>(CEPH_SESSION_FLUSHMSG_ACK, m->get_seq()));
     break;
 
   case CEPH_SESSION_FORCE_RO:
@@ -2163,7 +2168,7 @@ void Client::handle_client_session(const MConstRef<MClientSession>& m)
 
 bool Client::_any_stale_sessions() const
 {
-  ceph_assert(client_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
 
   for (const auto &p : mds_sessions) {
     if (p.second.state == MetaSession::STATE_STALE) {
@@ -2235,9 +2240,9 @@ void Client::send_request(MetaRequest *request, MetaSession *session,
   session->con->send_message2(std::move(r));
 }
 
-MClientRequest::ref Client::build_client_request(MetaRequest *request)
+ref_t<MClientRequest> Client::build_client_request(MetaRequest *request)
 {
-  auto req = MClientRequest::create(request->get_op());
+  auto req = make_message<MClientRequest>(request->get_op());
   req->set_tid(request->tid);
   req->set_stamp(request->op_stamp);
   memcpy(&req->head, &request->head, sizeof(ceph_mds_request_head));
@@ -2307,7 +2312,7 @@ void Client::handle_client_request_forward(const MConstRef<MClientRequestForward
   request->item.remove_myself();
   request->num_fwd = fwd->get_num_fwd();
   request->resend_mds = fwd->get_dest_mds();
-  request->caller_cond->Signal();
+  request->caller_cond->notify_all();
 }
 
 bool Client::is_dir_operation(MetaRequest *req)
@@ -2363,7 +2368,7 @@ void Client::handle_client_reply(const MConstRef<MClientReply>& reply)
          request->sent_on_mseq == it->second.mseq)) {
       ldout(cct, 20) << "have to return ESTALE" << dendl;
     } else {
-      request->caller_cond->Signal();
+      request->caller_cond->notify_all();
       return;
     }
   }
@@ -2390,18 +2395,23 @@ void Client::handle_client_reply(const MConstRef<MClientReply>& reply)
   // Only signal the caller once (on the first reply):
   // Either its an unsafe reply, or its a safe reply and no unsafe reply was sent.
   if (!is_safe || !request->got_unsafe) {
-    Cond cond;
+    ceph::condition_variable cond;
     request->dispatch_cond = &cond;
 
     // wake up waiter
     ldout(cct, 20) << __func__ << " signalling caller " << (void*)request->caller_cond << dendl;
-    request->caller_cond->Signal();
+    request->caller_cond->notify_all();
 
     // wake for kick back
-    while (request->dispatch_cond) {
-      ldout(cct, 20) << __func__ << " awaiting kickback on tid " << tid << " " << &cond << dendl;
-      cond.Wait(client_lock);
-    }
+    std::unique_lock l{client_lock, std::adopt_lock};
+    cond.wait(l, [tid, request, &cond, this] {
+      if (request->dispatch_cond) {
+        ldout(cct, 20) << "handle_client_reply awaiting kickback on tid "
+                      << tid << " " << &cond << dendl;
+      }
+      return !request->dispatch_cond;
+    });
+    l.release();
   }
 
   if (is_safe) {
@@ -2417,7 +2427,7 @@ void Client::handle_client_reply(const MConstRef<MClientReply>& reply)
     unregister_request(request);
   }
   if (unmounting)
-    mount_cond.Signal();
+    mount_cond.notify_all();
 }
 
 void Client::_handle_full_flag(int64_t pool)
@@ -2468,7 +2478,7 @@ void Client::handle_osd_map(const MConstRef<MOSDMap>& m)
   bool new_blacklist = false;
   bool prenautilus = objecter->with_osdmap(
     [&](const OSDMap& o) {
-      return o.require_osd_release < CEPH_RELEASE_NAUTILUS;
+      return o.require_osd_release < ceph_release_t::nautilus;
     });
   if (!blacklisted) {
     for (auto a : myaddrs.v) {
@@ -2559,53 +2569,53 @@ bool Client::ms_dispatch2(const MessageRef &m)
   switch (m->get_type()) {
     // mounting and mds sessions
   case CEPH_MSG_MDS_MAP:
-    handle_mds_map(MMDSMap::msgref_cast(m));
+    handle_mds_map(ref_cast<MMDSMap>(m));
     break;
   case CEPH_MSG_FS_MAP:
-    handle_fs_map(MFSMap::msgref_cast(m));
+    handle_fs_map(ref_cast<MFSMap>(m));
     break;
   case CEPH_MSG_FS_MAP_USER:
-    handle_fs_map_user(MFSMapUser::msgref_cast(m));
+    handle_fs_map_user(ref_cast<MFSMapUser>(m));
     break;
   case CEPH_MSG_CLIENT_SESSION:
-    handle_client_session(MClientSession::msgref_cast(m));
+    handle_client_session(ref_cast<MClientSession>(m));
     break;
 
   case CEPH_MSG_OSD_MAP:
-    handle_osd_map(MOSDMap::msgref_cast(m));
+    handle_osd_map(ref_cast<MOSDMap>(m));
     break;
 
     // requests
   case CEPH_MSG_CLIENT_REQUEST_FORWARD:
-    handle_client_request_forward(MClientRequestForward::msgref_cast(m));
+    handle_client_request_forward(ref_cast<MClientRequestForward>(m));
     break;
   case CEPH_MSG_CLIENT_REPLY:
-    handle_client_reply(MClientReply::msgref_cast(m));
+    handle_client_reply(ref_cast<MClientReply>(m));
     break;
 
   // reclaim reply
   case CEPH_MSG_CLIENT_RECLAIM_REPLY:
-    handle_client_reclaim_reply(MClientReclaimReply::msgref_cast(m));
+    handle_client_reclaim_reply(ref_cast<MClientReclaimReply>(m));
     break;
 
   case CEPH_MSG_CLIENT_SNAP:
-    handle_snap(MClientSnap::msgref_cast(m));
+    handle_snap(ref_cast<MClientSnap>(m));
     break;
   case CEPH_MSG_CLIENT_CAPS:
-    handle_caps(MClientCaps::msgref_cast(m));
+    handle_caps(ref_cast<MClientCaps>(m));
     break;
   case CEPH_MSG_CLIENT_LEASE:
-    handle_lease(MClientLease::msgref_cast(m));
+    handle_lease(ref_cast<MClientLease>(m));
     break;
   case MSG_COMMAND_REPLY:
     if (m->get_source().type() == CEPH_ENTITY_TYPE_MDS) {
-      handle_command_reply(MCommandReply::msgref_cast(m));
+      handle_command_reply(ref_cast<MCommandReply>(m));
     } else {
       return false;
     }
     break;
   case CEPH_MSG_CLIENT_QUOTA:
-    handle_quota(MClientQuota::msgref_cast(m));
+    handle_quota(ref_cast<MClientQuota>(m));
     break;
 
   default:
@@ -2620,7 +2630,7 @@ bool Client::ms_dispatch2(const MessageRef &m)
     trim_cache();
     if (size < lru.lru_get_size() + inode_map.size()) {
       ldout(cct, 10) << "unmounting: trim pass, cache shrank, poking unmount()" << dendl;
-      mount_cond.Signal();
+      mount_cond.notify_all();
     } else {
       ldout(cct, 10) << "unmounting: trim pass, size still " << lru.lru_get_size() 
                << "+" << inode_map.size() << dendl;
@@ -2771,7 +2781,7 @@ void Client::send_reconnect(MetaSession *session)
 
   early_kick_flushing_caps(session);
 
-  auto m = MClientReconnect::create();
+  auto m = make_message<MClientReconnect>();
   bool allow_multi = session->mds_features.test(CEPHFS_FEATURE_MULTI_RECONNECT);
 
   // i have an open session.
@@ -2783,11 +2793,12 @@ void Client::send_reconnect(MetaSession *session)
     auto it = in->caps.find(mds);
     if (it != in->caps.end()) {
       if (allow_multi &&
-         m->get_approx_size() >= (std::numeric_limits<int>::max() >> 1)) {
+         m->get_approx_size() >=
+         static_cast<size_t>((std::numeric_limits<int>::max() >> 1))) {
        m->mark_more();
        session->con->send_message2(std::move(m));
 
-       m = MClientReconnect::create();
+       m = make_message<MClientReconnect>();
       }
 
       Cap &cap = it->second;
@@ -2837,7 +2848,7 @@ void Client::send_reconnect(MetaSession *session)
     m->set_encoding_version(0); // use connection features to choose encoding
   session->con->send_message2(std::move(m));
 
-  mount_cond.Signal();
+  mount_cond.notify_all();
 
   if (session->reclaim_state == MetaSession::RECLAIMING)
     signal_cond_list(waiting_for_reclaim);
@@ -2856,7 +2867,7 @@ void Client::kick_requests(MetaSession *session)
     if (req->aborted()) {
       if (req->caller_cond) {
        req->kick = true;
-       req->caller_cond->Signal();
+       req->caller_cond->notify_all();
       }
       continue;
     }
@@ -2924,7 +2935,7 @@ void Client::kick_requests_closed(MetaSession *session)
     if (req->mds == session->mds_num) {
       if (req->caller_cond) {
        req->kick = true;
-       req->caller_cond->Signal();
+       req->caller_cond->notify_all();
       }
       req->item.remove_myself();
       if (req->got_unsafe) {
@@ -2966,7 +2977,7 @@ void Client::got_mds_push(MetaSession *s)
   s->seq++;
   ldout(cct, 10) << " mds." << s->mds_num << " seq now " << s->seq << dendl;
   if (s->state == MetaSession::STATE_CLOSING) {
-    s->con->send_message2(MClientSession::create(CEPH_SESSION_REQUEST_CLOSE, s->seq));
+    s->con->send_message2(make_message<MClientSession>(CEPH_SESSION_REQUEST_CLOSE, s->seq));
   }
 }
 
@@ -2994,7 +3005,7 @@ void Client::handle_lease(const MConstRef<MClientLease>& m)
   }
   in = inode_map[vino];
 
-  if (m->get_mask() & CEPH_LOCK_DN) {
+  if (m->get_mask() & CEPH_LEASE_VALID) {
     if (!in->dir || in->dir->dentries.count(m->dname) == 0) {
       ldout(cct, 10) << " don't have dir|dentry " << m->get_ino() << "/" << m->dname <<dendl;
       goto revoke;
@@ -3006,7 +3017,9 @@ void Client::handle_lease(const MConstRef<MClientLease>& m)
 
  revoke:
   {
-    auto reply = MClientLease::create(CEPH_MDS_LEASE_RELEASE, seq, m->get_mask(), m->get_ino(), m->get_first(), m->get_last(), m->dname);
+    auto reply = make_message<MClientLease>(CEPH_MDS_LEASE_RELEASE, seq,
+                                           m->get_mask(), m->get_ino(),
+                                           m->get_first(), m->get_last(), m->dname);
     m->get_connection()->send_message2(std::move(reply));
   }
 }
@@ -3134,7 +3147,7 @@ private:
 public:
   C_Client_FlushComplete(Client *c, Inode *in) : client(c), inode(in) { }
   void finish(int r) override {
-    ceph_assert(client->client_lock.is_locked_by_me());
+    ceph_assert(ceph_mutex_is_locked_by_me(client->client_lock));
     if (r != 0) {
       client_t const whoami = client->whoami;  // For the benefit of ldout prefix
       ldout(client->cct, 1) << "I/O error from flush on inode " << inode
@@ -3359,7 +3372,7 @@ void Client::send_cap(Inode *in, MetaSession *session, Cap *cap,
   if (flush)
     follows = in->snaprealm->get_snap_context().seq;
   
-  auto m = MClientCaps::create(op,
+  auto m = make_message<MClientCaps>(op,
                                   in->ino,
                                   0,
                                   cap->cap_id, cap->seq,
@@ -3695,9 +3708,9 @@ void Client::_flushed_cap_snap(Inode *in, snapid_t seq)
 void Client::send_flush_snap(Inode *in, MetaSession *session,
                             snapid_t follows, CapSnap& capsnap)
 {
-  auto m = MClientCaps::create(CEPH_CAP_OP_FLUSHSNAP,
-                               in->ino, in->snaprealm->ino, 0,
-                               in->auth_cap->mseq, cap_epoch_barrier);
+  auto m = make_message<MClientCaps>(CEPH_CAP_OP_FLUSHSNAP,
+                                    in->ino, in->snaprealm->ino, 0,
+                                    in->auth_cap->mseq, cap_epoch_barrier);
   m->caller_uid = capsnap.cap_dirtier_uid;
   m->caller_gid = capsnap.cap_dirtier_gid;
 
@@ -3770,28 +3783,32 @@ void Client::flush_snaps(Inode *in)
   }
 }
 
-void Client::wait_on_list(list<Cond*>& ls)
+void Client::wait_on_list(list<ceph::condition_variable*>& ls)
 {
-  Cond cond;
+  ceph::condition_variable cond;
   ls.push_back(&cond);
-  cond.Wait(client_lock);
+  std::unique_lock l{client_lock, std::adopt_lock};
+  cond.wait(l);
+  l.release();
   ls.remove(&cond);
 }
 
-void Client::signal_cond_list(list<Cond*>& ls)
+void Client::signal_cond_list(list<ceph::condition_variable*>& ls)
 {
-  for (list<Cond*>::iterator it = ls.begin(); it != ls.end(); ++it)
-    (*it)->Signal();
+  for (auto cond : ls) {
+    cond->notify_all();
+  }
 }
 
 void Client::wait_on_context_list(list<Context*>& ls)
 {
-  Cond cond;
+  ceph::condition_variable cond;
   bool done = false;
   int r;
-  ls.push_back(new C_Cond(&cond, &done, &r));
-  while (!done)
-    cond.Wait(client_lock);
+  ls.push_back(new C_Cond(cond, &done, &r));
+  std::unique_lock l{client_lock, std::adopt_lock};
+  cond.wait(l, [&done] { return done;});
+  l.release();
 }
 
 void Client::signal_context_list(list<Context*>& ls)
@@ -3840,7 +3857,7 @@ public:
   }
   void finish(int r) override {
     // _async_invalidate takes the lock when it needs to, call this back from outside of lock.
-    ceph_assert(!client->client_lock.is_locked_by_me());
+    ceph_assert(ceph_mutex_is_not_locked_by_me(client->client_lock));
     client->_async_invalidate(ino, offset, length);
   }
 };
@@ -3922,7 +3939,7 @@ bool Client::_flush(Inode *in, Context *onfinish)
 
 void Client::_flush_range(Inode *in, int64_t offset, uint64_t size)
 {
-  ceph_assert(client_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(client_lock));
   if (!in->oset.dirty_or_tx) {
     ldout(cct, 10) << " nothing to flush" << dendl;
     return;
@@ -3933,16 +3950,16 @@ void Client::_flush_range(Inode *in, int64_t offset, uint64_t size)
                                      offset, size, &onflush);
   if (!ret) {
     // wait for flush
-    client_lock.Unlock();
+    client_lock.unlock();
     onflush.wait();
-    client_lock.Lock();
+    client_lock.lock();
   }
 }
 
 void Client::flush_set_callback(ObjectCacher::ObjectSet *oset)
 {
   //  std::lock_guard l(client_lock);
-  ceph_assert(client_lock.is_locked());   // will be called via dispatch() -> objecter -> ...
+  ceph_assert(ceph_mutex_is_locked(client_lock));   // will be called via dispatch() -> objecter -> ...
   Inode *in = static_cast<Inode *>(oset->parent);
   ceph_assert(in);
   _flushed(in);
@@ -4144,7 +4161,7 @@ void Client::remove_session_caps(MetaSession *s)
     signal_cond_list(in->waitfor_caps);
   }
   s->flushing_caps_tids.clear();
-  sync_cond.Signal();
+  sync_cond.notify_all();
 }
 
 int Client::_do_remount(bool retry_on_error)
@@ -4419,7 +4436,9 @@ void Client::wait_sync_caps(ceph_tid_t want)
     if (oldest_tid <= want) {
       ldout(cct, 10) << " waiting on mds." << p.first << " tid " << oldest_tid
                     << " (want " << want << ")" << dendl;
-      sync_cond.Wait(client_lock);
+      std::unique_lock l{client_lock, std::adopt_lock};
+      sync_cond.wait(l);
+      l.release();
       goto retry;
     }
   }
@@ -5001,7 +5020,7 @@ void Client::handle_cap_flush_ack(MetaSession *session, Inode *in, Cap *cap, con
     signal_cond_list(in->waitfor_caps);
     if (session->flushing_caps_tids.empty() ||
        *session->flushing_caps_tids.begin() > flush_ack_tid)
-      sync_cond.Signal();
+      sync_cond.notify_all();
   }
 
   if (!dirty) {
@@ -5053,7 +5072,7 @@ void Client::handle_cap_flushsnap_ack(MetaSession *session, Inode *in, const MCo
       signal_cond_list(in->waitfor_caps);
       if (session->flushing_caps_tids.empty() ||
          *session->flushing_caps_tids.begin() > flush_ack_tid)
-       sync_cond.Signal();
+       sync_cond.notify_all();
     }
   } else {
     ldout(cct, 5) << __func__ << " DUP(?) mds." << mds << " flushed snap follows " << follows
@@ -5085,7 +5104,7 @@ public:
   }
   void finish(int r) override {
     // _async_dentry_invalidate is responsible for its own locking
-    ceph_assert(!client->client_lock.is_locked_by_me());
+    ceph_assert(ceph_mutex_is_not_locked_by_me(client->client_lock));
     client->_async_dentry_invalidate(dirino, ino, name);
   }
 };
@@ -5617,15 +5636,15 @@ int Client::resolve_mds(
  */
 int Client::authenticate()
 {
-  ceph_assert(client_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
 
   if (monclient->is_authenticated()) {
     return 0;
   }
 
-  client_lock.Unlock();
+  client_lock.unlock();
   int r = monclient->authenticate(cct->_conf->client_mount_timeout);
-  client_lock.Lock();
+  client_lock.lock();
   if (r < 0) {
     return r;
   }
@@ -5646,9 +5665,9 @@ int Client::fetch_fsmap(bool user)
   do {
     C_SaferCond cond;
     monclient->get_version("fsmap", &fsmap_latest, NULL, &cond);
-    client_lock.Unlock();
+    client_lock.unlock();
     r = cond.wait();
-    client_lock.Lock();
+    client_lock.lock();
   } while (r == -EAGAIN);
 
   if (r < 0) {
@@ -5806,7 +5825,10 @@ int Client::subscribe_mdsmap(const std::string &fs_name)
 
   std::string resolved_fs_name;
   if (fs_name.empty()) {
-    resolved_fs_name = cct->_conf.get_val<std::string>("client_mds_namespace");
+    resolved_fs_name = cct->_conf.get_val<std::string>("client_fs");
+    if (resolved_fs_name.empty())
+           // Try the backwards compatibility fs name option
+           resolved_fs_name = cct->_conf.get_val<std::string>("client_mds_namespace");
   } else {
     resolved_fs_name = fs_name;
   }
@@ -5941,7 +5963,9 @@ void Client::_close_sessions()
 
     // wait for sessions to close
     ldout(cct, 2) << "waiting for " << mds_sessions.size() << " mds sessions to close" << dendl;
-    mount_cond.Wait(client_lock);
+    std::unique_lock l{client_lock, std::adopt_lock};
+    mount_cond.wait(l);
+    l.release();
   }
 }
 
@@ -5960,7 +5984,7 @@ void Client::flush_mdlog(MetaSession *session)
   // will crash if they see an unknown CEPH_SESSION_* value in this msg.
   const uint64_t features = session->con->get_features();
   if (HAVE_FEATURE(features, SERVER_LUMINOUS)) {
-    auto m = MClientSession::create(CEPH_SESSION_REQUEST_FLUSH_MDLOG);
+    auto m = make_message<MClientSession>(CEPH_SESSION_REQUEST_FLUSH_MDLOG);
     session->con->send_message2(std::move(m));
   }
 }
@@ -5978,7 +6002,7 @@ void Client::_abort_mds_sessions(int err)
     req->abort(err);
     if (req->caller_cond) {
       req->kick = true;
-      req->caller_cond->Signal();
+      req->caller_cond->notify_all();
     }
   }
 
@@ -5996,6 +6020,7 @@ void Client::_abort_mds_sessions(int err)
 
 void Client::_unmount(bool abort)
 {
+  std::unique_lock lock{client_lock, std::adopt_lock};
   if (unmounting)
     return;
 
@@ -6018,11 +6043,13 @@ void Client::_unmount(bool abort)
     flush_mdlog_sync();
   }
 
-  while (!mds_requests.empty()) {
-    ldout(cct, 10) << "waiting on " << mds_requests.size() << " requests" << dendl;
-    mount_cond.Wait(client_lock);
-  }
-
+  mount_cond.wait(lock, [this] {
+    if (!mds_requests.empty()) {
+      ldout(cct, 10) << "waiting on " << mds_requests.size() << " requests"
+                    << dendl;
+    }
+    return mds_requests.empty();
+  });
   if (tick_event)
     timer.cancel_event(tick_event);
   tick_event = 0;
@@ -6053,10 +6080,13 @@ void Client::_unmount(bool abort)
 
   _ll_drop_pins();
 
-  while (unsafe_sync_write > 0) {
-    ldout(cct, 0) << unsafe_sync_write << " unsafe_sync_writes, waiting"  << dendl;
-    mount_cond.Wait(client_lock);
-  }
+  mount_cond.wait(lock, [this] {
+    if (unsafe_sync_write > 0) {
+      ldout(cct, 0) << unsafe_sync_write << " unsafe_sync_writes, waiting"
+                   << dendl;
+    }
+    return unsafe_sync_write <= 0;
+  });
 
   if (cct->_conf->client_oc) {
     // flush/release all buffered data
@@ -6104,9 +6134,8 @@ void Client::_unmount(bool abort)
             << "+" << inode_map.size() << " items"
            << ", waiting (for caps to release?)"
             << dendl;
-    utime_t until = ceph_clock_now() + utime_t(5, 0);
-    int r = mount_cond.WaitUntil(client_lock, until);
-    if (r == ETIMEDOUT) {
+    if (auto r = mount_cond.wait_for(lock, ceph::make_timespan(5));
+       r == std::cv_status::timeout) {
       dump_cache(NULL);
     }
   }
@@ -6123,6 +6152,7 @@ void Client::_unmount(bool abort)
 
   mounted = false;
 
+  lock.release();
   ldout(cct, 2) << "unmounted." << dendl;
 }
 
@@ -6166,9 +6196,9 @@ void Client::tick()
   ldout(cct, 21) << "tick" << dendl;
   tick_event = timer.add_event_after(
     cct->_conf->client_tick_interval,
-    new FunctionContext([this](int) {
+    new LambdaContext([this](int) {
        // Called back via Timer, which takes client_lock for us
-       ceph_assert(client_lock.is_locked_by_me());
+       ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
        tick();
       }));
   utime_t now = ceph_clock_now();
@@ -6179,7 +6209,7 @@ void Client::tick()
       req->abort(-ETIMEDOUT);
       if (req->caller_cond) {
        req->kick = true;
-       req->caller_cond->Signal();
+       req->caller_cond->notify_all();
       }
       signal_cond_list(waiting_for_mdsmap);
       for (auto &p : mds_sessions) {
@@ -6228,7 +6258,7 @@ void Client::renew_caps(MetaSession *session)
   ldout(cct, 10) << "renew_caps mds." << session->mds_num << dendl;
   session->last_cap_renew_request = ceph_clock_now();
   uint64_t seq = ++session->cap_renew_seq;
-  session->con->send_message2(MClientSession::create(CEPH_SESSION_REQUEST_RENEWCAPS, seq));
+  session->con->send_message2(make_message<MClientSession>(CEPH_SESSION_REQUEST_RENEWCAPS, seq));
 }
 
 
@@ -7963,7 +7993,7 @@ struct dentry_off_lt {
 int Client::_readdir_cache_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p,
                              int caps, bool getref)
 {
-  ceph_assert(client_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(client_lock));
   ldout(cct, 10) << __func__ << " " << dirp << " on " << dirp->inode->ino
           << " last_name " << dirp->last_name << " offset " << hex << dirp->offset << dec
           << dendl;
@@ -8025,9 +8055,9 @@ int Client::_readdir_cache_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p,
 
     dn_name = dn->name; // fill in name while we have lock
 
-    client_lock.Unlock();
+    client_lock.unlock();
     r = cb(p, &de, &stx, next_off, in);  // _next_ offset
-    client_lock.Lock();
+    client_lock.lock();
     ldout(cct, 15) << " de " << de.d_name << " off " << hex << dn->offset << dec
                   << " = " << r << dendl;
     if (r < 0) {
@@ -8095,9 +8125,9 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p,
       _ll_get(inode);
     }
 
-    client_lock.Unlock();
+    client_lock.unlock();
     r = cb(p, &de, &stx, next_off, inode);
-    client_lock.Lock();
+    client_lock.lock();
     if (r < 0)
       return r;
 
@@ -8128,9 +8158,9 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p,
       _ll_get(inode);
     }
 
-    client_lock.Unlock();
+    client_lock.unlock();
     r = cb(p, &de, &stx, next_off, inode);
-    client_lock.Lock();
+    client_lock.lock();
     if (r < 0)
       return r;
 
@@ -8195,9 +8225,9 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p,
        _ll_get(inode);
       }
 
-      client_lock.Unlock();
+      client_lock.unlock();
       r = cb(p, &de, &stx, next_off, inode);  // _next_ offset
-      client_lock.Lock();
+      client_lock.lock();
 
       ldout(cct, 15) << " de " << de.d_name << " off " << hex << next_off - 1 << dec
                     << " = " << r << dendl;
@@ -8880,14 +8910,31 @@ loff_t Client::lseek(int fd, loff_t offset, int whence)
 loff_t Client::_lseek(Fh *f, loff_t offset, int whence)
 {
   Inode *in = f->inode.get();
-  int r;
+  bool whence_check = false;
   loff_t pos = -1;
 
-  if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) {
-    r = _getattr(in, CEPH_STAT_CAP_SIZE, f->actor_perms);
-    if (r < 0) {
+  switch (whence) {
+  case SEEK_END:
+    whence_check = true;
+  break;
+
+#ifdef SEEK_DATA
+  case SEEK_DATA:
+    whence_check = true;
+  break;
+#endif
+
+#ifdef SEEK_HOLE
+  case SEEK_HOLE:
+    whence_check = true;
+  break;
+#endif
+  }
+
+  if (whence_check) {
+    int r = _getattr(in, CEPH_STAT_CAP_SIZE, f->actor_perms);
+    if (r < 0)
       return r;
-    }
   }
 
   switch (whence) {
@@ -8903,22 +8950,21 @@ loff_t Client::_lseek(Fh *f, loff_t offset, int whence)
     pos = in->size + offset;
     break;
 
+#ifdef SEEK_DATA
   case SEEK_DATA:
-    if (offset < 0 || offset >= in->size) {
-      r = -ENXIO;
-      return offset; 
-    }
+    if (offset < 0 || static_cast<uint64_t>(offset) >= in->size)
+      return -ENXIO;
     pos = offset;
     break;
+#endif
 
+#ifdef SEEK_HOLE
   case SEEK_HOLE:
-    if (offset < 0 || offset >= in->size) {
-      r = -ENXIO;
-      pos = offset; 
-    } else {
-      pos = in->size;
-    }
+    if (offset < 0 || static_cast<uint64_t>(offset) >= in->size)
+      return -ENXIO;
+    pos = in->size;
     break;
+#endif
 
   default:
     ldout(cct, 1) << __func__ << ": invalid whence value " << whence << dendl;
@@ -8941,11 +8987,14 @@ void Client::lock_fh_pos(Fh *f)
   ldout(cct, 10) << __func__ << " " << f << dendl;
 
   if (f->pos_locked || !f->pos_waiters.empty()) {
-    Cond cond;
+    ceph::condition_variable cond;
     f->pos_waiters.push_back(&cond);
     ldout(cct, 10) << __func__ << " BLOCKING on " << f << dendl;
-    while (f->pos_locked || f->pos_waiters.front() != &cond)
-      cond.Wait(client_lock);
+    std::unique_lock l{client_lock, std::adopt_lock};
+    cond.wait(l, [f, me=&cond] {
+      return !f->pos_locked && f->pos_waiters.front() == me;
+    });
+    l.release();
     ldout(cct, 10) << __func__ << " UNBLOCKING on " << f << dendl;
     ceph_assert(f->pos_waiters.front() == &cond);
     f->pos_waiters.pop_front();
@@ -9033,7 +9082,7 @@ int Client::read(int fd, char *buf, loff_t size, loff_t offset)
   int r = _read(f, offset, size, &bl);
   ldout(cct, 3) << "read(" << fd << ", " << (void*)buf << ", " << size << ", " << offset << ") = " << r << dendl;
   if (r >= 0) {
-    bl.copy(0, bl.length(), buf);
+    bl.begin().copy(bl.length(), buf);
     r = bl.length();
   }
   return r;
@@ -9166,9 +9215,9 @@ done:
   // done!
   
   if (onuninline) {
-    client_lock.Unlock();
+    client_lock.unlock();
     int ret = onuninline->wait();
-    client_lock.Lock();
+    client_lock.lock();
     if (ret >= 0 || ret == -ECANCELED) {
       in->inline_data.clear();
       in->inline_version = CEPH_INLINE_NONE;
@@ -9229,9 +9278,9 @@ int Client::_read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl)
                              off, len, bl, 0, &onfinish);
   if (r == 0) {
     get_cap_ref(in, CEPH_CAP_FILE_CACHE);
-    client_lock.Unlock();
+    client_lock.unlock();
     r = onfinish.wait();
-    client_lock.Lock();
+    client_lock.lock();
     put_cap_ref(in, CEPH_CAP_FILE_CACHE);
   }
 
@@ -9267,8 +9316,6 @@ int Client::_read_sync(Fh *f, uint64_t off, uint64_t len, bufferlist *bl,
 
   ldout(cct, 10) << __func__ << " " << *in << " " << off << "~" << len << dendl;
 
-  Mutex flock("Client::_read_sync flock");
-  Cond cond;
   while (left > 0) {
     C_SaferCond onfinish("Client::_read_sync flock");
     bufferlist tbl;
@@ -9278,9 +9325,9 @@ int Client::_read_sync(Fh *f, uint64_t off, uint64_t len, bufferlist *bl,
                      pos, left, &tbl, 0,
                      in->truncate_size, in->truncate_seq,
                      &onfinish);
-    client_lock.Unlock();
+    client_lock.unlock();
     int r = onfinish.wait();
-    client_lock.Lock();
+    client_lock.lock();
 
     // if we get ENOENT from OSD, assume 0 bytes returned
     if (r == -ENOENT)
@@ -9334,7 +9381,7 @@ void Client::_sync_write_commit(Inode *in)
   ldout(cct, 15) << __func__ << " unsafe_sync_write = " << unsafe_sync_write << dendl;
   if (unsafe_sync_write == 0 && unmounting) {
     ldout(cct, 10) << __func__ << " -- no more unsafe writes, unmount can proceed" << dendl;
-    mount_cond.Signal();
+    mount_cond.notify_all();
   }
 }
 
@@ -9402,20 +9449,16 @@ int64_t Client::_preadv_pwritev_locked(Fh *fh, const struct iovec *iov,
         if (r <= 0)
           return r;
 
-        int bufoff = 0;
+        auto iter = bl.cbegin();
         for (unsigned j = 0, resid = r; j < iovcnt && resid > 0; j++) {
                /*
                 * This piece of code aims to handle the case that bufferlist does not have enough data 
                 * to fill in the iov 
                 */
-               if (resid < iov[j].iov_len) {
-                    bl.copy(bufoff, resid, (char *)iov[j].iov_base);
-                    break;
-               } else {
-                    bl.copy(bufoff, iov[j].iov_len, (char *)iov[j].iov_base);
-               }
-               resid -= iov[j].iov_len;
-               bufoff += iov[j].iov_len;
+               const auto round_size = std::min<unsigned>(resid, iov[j].iov_len);
+               iter.copy(round_size, reinterpret_cast<char*>(iov[j].iov_base));
+               resid -= round_size;
+               /* iter is self-updating */
         }
         return r;  
     }
@@ -9465,7 +9508,7 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf,
      * change out from under us.
      */
     if (f->flags & O_APPEND) {
-      int r = _lseek(f, 0, SEEK_END);
+      auto r = _lseek(f, 0, SEEK_END);
       if (r < 0) {
         unlock_fh_pos(f);
         return r;
@@ -9552,7 +9595,7 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf,
       uint32_t len = in->inline_data.length();
 
       if (endoff < len)
-        in->inline_data.copy(endoff, len - endoff, bl);
+        in->inline_data.begin(endoff).copy(len - endoff, bl); // XXX
 
       if (offset < len)
         in->inline_data.splice(offset, len - offset);
@@ -9605,9 +9648,9 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf,
                       offset, size, bl, ceph::real_clock::now(), 0,
                       in->truncate_size, in->truncate_seq,
                       &onfinish);
-    client_lock.Unlock();
+    client_lock.unlock();
     onfinish.wait();
-    client_lock.Lock();
+    client_lock.lock();
     _sync_write_commit(in);
   }
 
@@ -9650,9 +9693,9 @@ success:
 done:
 
   if (nullptr != onuninline) {
-    client_lock.Unlock();
+    client_lock.unlock();
     int uninline_ret = onuninline->wait();
-    client_lock.Lock();
+    client_lock.lock();
 
     if (uninline_ret >= 0 || uninline_ret == -ECANCELED) {
       in->inline_data.clear();
@@ -9782,10 +9825,10 @@ int Client::_fsync(Inode *in, bool syncdataonly)
   }
 
   if (nullptr != object_cacher_completion) { // wait on a real reply instead of guessing
-    client_lock.Unlock();
+    client_lock.unlock();
     ldout(cct, 15) << "waiting on data to flush" << dendl;
     r = object_cacher_completion->wait();
-    client_lock.Lock();
+    client_lock.lock();
     ldout(cct, 15) << "got " << r << " from flush writeback" << dendl;
   } else {
     // FIXME: this can starve
@@ -9965,11 +10008,11 @@ int Client::statfs(const char *path, struct statvfs *stbuf,
     objecter->get_fs_stats(stats, boost::optional<int64_t>(), &cond);
   }
 
-  client_lock.Unlock();
+  client_lock.unlock();
   int rval = cond.wait();
   assert(root);
   total_files_on_fs = root->rstat.rfiles + root->rstat.rsubdirs;
-  client_lock.Lock();
+  client_lock.lock();
 
   if (rval < 0) {
     ldout(cct, 1) << "underlying call to statfs returned error: "
@@ -10428,11 +10471,11 @@ int Client::_sync_fs()
   wait_sync_caps(flush_tid);
 
   if (nullptr != cond) {
-    client_lock.Unlock();
+    client_lock.unlock();
     ldout(cct, 15) << __func__ << " waiting on data to flush" << dendl;
     cond->wait();
     ldout(cct, 15) << __func__ << " flush finished" << dendl;
-    client_lock.Lock();
+    client_lock.lock();
   }
 
   return 0;
@@ -11384,29 +11427,6 @@ int Client::_listxattr(Inode *in, char *name, size_t size,
     name += this_len;
     size -= this_len;
   }
-
-  const VXattr *vxattr;
-  for (vxattr = _get_vxattrs(in); vxattr && !vxattr->name.empty(); vxattr++) {
-    if (vxattr->hidden)
-      continue;
-    // call pointer-to-member function
-    if (vxattr->exists_cb && !(this->*(vxattr->exists_cb))(in))
-      continue;
-
-    size_t this_len = vxattr->name.length() + 1;
-    r += this_len;
-    if (len_only)
-      continue;
-
-    if (this_len > size) {
-      r = -ERANGE;
-      goto out;
-    }
-
-    memcpy(name, vxattr->name.c_str(), this_len);
-    name += this_len;
-    size -= this_len;
-  }
 out:
   ldout(cct, 8) << __func__ << "(" << in->ino << ", " << size << ") = " << r << dendl;
   return r;
@@ -11835,7 +11855,6 @@ size_t Client::_vxattrcb_snap_btime(Inode *in, char *val, size_t size)
   name: CEPH_XATTR_NAME(_type, _name),                         \
   getxattr_cb: &Client::_vxattrcb_ ## _type ## _ ## _name,     \
   readonly: true,                                              \
-  hidden: false,                                               \
   exists_cb: NULL,                                             \
   flags: 0,                                                     \
 }
@@ -11844,7 +11863,6 @@ size_t Client::_vxattrcb_snap_btime(Inode *in, char *val, size_t size)
   name: CEPH_XATTR_NAME(_type, _name),                         \
   getxattr_cb: &Client::_vxattrcb_ ## _type ## _ ## _name,     \
   readonly: true,                                              \
-  hidden: false,                                               \
   exists_cb: NULL,                                             \
   flags: _flags,                                               \
 }
@@ -11853,7 +11871,6 @@ size_t Client::_vxattrcb_snap_btime(Inode *in, char *val, size_t size)
   name: CEPH_XATTR_NAME2(_type, _name, _field),                        \
   getxattr_cb: &Client::_vxattrcb_ ## _name ## _ ## _field,    \
   readonly: false,                                             \
-  hidden: true,                                                        \
   exists_cb: &Client::_vxattrcb_layout_exists,                 \
   flags: 0,                                                     \
 }
@@ -11862,7 +11879,6 @@ size_t Client::_vxattrcb_snap_btime(Inode *in, char *val, size_t size)
   name: CEPH_XATTR_NAME(_type, _name),                         \
   getxattr_cb: &Client::_vxattrcb_ ## _type ## _ ## _name,     \
   readonly: false,                                             \
-  hidden: true,                                                        \
   exists_cb: &Client::_vxattrcb_quota_exists,                  \
   flags: 0,                                                     \
 }
@@ -11872,7 +11888,6 @@ const Client::VXattr Client::_dir_vxattrs[] = {
     name: "ceph.dir.layout",
     getxattr_cb: &Client::_vxattrcb_layout,
     readonly: false,
-    hidden: true,
     exists_cb: &Client::_vxattrcb_layout_exists,
     flags: 0,
   },
@@ -11893,7 +11908,6 @@ const Client::VXattr Client::_dir_vxattrs[] = {
     name: "ceph.quota",
     getxattr_cb: &Client::_vxattrcb_quota,
     readonly: false,
-    hidden: true,
     exists_cb: &Client::_vxattrcb_quota_exists,
     flags: 0,
   },
@@ -11903,7 +11917,6 @@ const Client::VXattr Client::_dir_vxattrs[] = {
     name: "ceph.dir.pin",
     getxattr_cb: &Client::_vxattrcb_dir_pin,
     readonly: false,
-    hidden: true,
     exists_cb: &Client::_vxattrcb_dir_pin_exists,
     flags: 0,
   },
@@ -11911,7 +11924,6 @@ const Client::VXattr Client::_dir_vxattrs[] = {
     name: "ceph.snap.btime",
     getxattr_cb: &Client::_vxattrcb_snap_btime,
     readonly: true,
-    hidden: false,
     exists_cb: &Client::_vxattrcb_snap_btime_exists,
     flags: 0,
   },
@@ -11923,7 +11935,6 @@ const Client::VXattr Client::_file_vxattrs[] = {
     name: "ceph.file.layout",
     getxattr_cb: &Client::_vxattrcb_layout,
     readonly: false,
-    hidden: true,
     exists_cb: &Client::_vxattrcb_layout_exists,
     flags: 0,
   },
@@ -11936,7 +11947,6 @@ const Client::VXattr Client::_file_vxattrs[] = {
     name: "ceph.snap.btime",
     getxattr_cb: &Client::_vxattrcb_snap_btime,
     readonly: true,
-    hidden: false,
     exists_cb: &Client::_vxattrcb_snap_btime_exists,
     flags: 0,
   },
@@ -12629,15 +12639,6 @@ int Client::_rename(Inode *fromdir, const char *fromname, Inode *todir, const ch
     else
       return -EROFS;
   }
-  if (fromdir != todir) {
-    Inode *fromdir_root =
-      fromdir->quota.is_enable() ? fromdir : get_quota_root(fromdir, perm);
-    Inode *todir_root =
-      todir->quota.is_enable() ? todir : get_quota_root(todir, perm);
-    if (fromdir_root != todir_root) {
-      return -EXDEV;
-    }
-  }
 
   InodeRef target;
   MetaRequest *req = new MetaRequest(op);
@@ -12670,7 +12671,32 @@ int Client::_rename(Inode *fromdir, const char *fromname, Inode *todir, const ch
     req->dentry_unless = CEPH_CAP_FILE_EXCL;
 
     InodeRef oldin, otherin;
-    res = _lookup(fromdir, fromname, 0, &oldin, perm);
+    Inode *fromdir_root = nullptr;
+    Inode *todir_root = nullptr;
+    int mask = 0;
+    bool quota_check = false;
+    if (fromdir != todir) {
+      fromdir_root =
+        fromdir->quota.is_enable() ? fromdir : get_quota_root(fromdir, perm);
+      todir_root =
+        todir->quota.is_enable() ? todir : get_quota_root(todir, perm);
+
+      if (todir_root->quota.is_enable() && fromdir_root != todir_root) {
+        // use CEPH_STAT_RSTAT mask to force send getattr or lookup request 
+        // to auth MDS to get latest rstat for todir_root and source dir 
+        // even if their dentry caches and inode caps are satisfied.
+        res = _getattr(todir_root, CEPH_STAT_RSTAT, perm, true);
+        if (res < 0)
+          goto fail;
+
+        quota_check = true;
+        if (oldde->inode && oldde->inode->is_dir()) {
+          mask |= CEPH_STAT_RSTAT;
+        }
+      }
+    }
+
+    res = _lookup(fromdir, fromname, mask, &oldin, perm);
     if (res < 0)
       goto fail;
 
@@ -12679,6 +12705,39 @@ int Client::_rename(Inode *fromdir, const char *fromname, Inode *todir, const ch
     req->set_old_inode(oldinode);
     req->old_inode_drop = CEPH_CAP_LINK_SHARED;
 
+    if (quota_check) {
+      int64_t old_bytes, old_files;
+      if (oldinode->is_dir()) {
+        old_bytes = oldinode->rstat.rbytes;
+        old_files = oldinode->rstat.rsize();
+      } else {
+        old_bytes = oldinode->size;
+        old_files = 1;
+      }
+
+      bool quota_exceed = false;
+      if (todir_root && todir_root->quota.max_bytes &&
+          (old_bytes + todir_root->rstat.rbytes) >= todir_root->quota.max_bytes) {
+        ldout(cct, 10) << "_rename (" << oldinode->ino << " bytes="
+                       << old_bytes << ") to (" << todir->ino 
+                      << ") will exceed quota on " << *todir_root << dendl;
+        quota_exceed = true;
+      }
+
+      if (todir_root && todir_root->quota.max_files &&
+          (old_files + todir_root->rstat.rsize()) >= todir_root->quota.max_files) {
+        ldout(cct, 10) << "_rename (" << oldinode->ino << " files="
+                       << old_files << ") to (" << todir->ino 
+                       << ") will exceed quota on " << *todir_root << dendl;
+        quota_exceed = true;
+      }
+
+      if (quota_exceed) {
+        res = (oldinode->is_dir()) ? -EXDEV : -EDQUOT;
+        goto fail;
+      }
+    }
+
     res = _lookup(todir, toname, 0, &otherin, perm);
     switch (res) {
     case 0:
@@ -13216,12 +13275,12 @@ int Client::ll_read_block(Inode *in, uint64_t blockid,
                 CEPH_OSD_FLAG_READ,
                  &onfinish);
 
-  client_lock.Unlock();
+  client_lock.unlock();
   int r = onfinish.wait();
-  client_lock.Lock();
+  client_lock.lock();
 
   if (r >= 0) {
-      bl.copy(0, bl.length(), buf);
+      bl.begin().copy(bl.length(), buf);
       r = bl.length();
   }
 
@@ -13261,9 +13320,9 @@ int Client::ll_write_block(Inode *in, uint64_t blockid,
   fakesnap.seq = snapseq;
 
   /* lock just in time */
-  client_lock.Lock();
+  client_lock.lock();
   if (unmounting) {
-    client_lock.Unlock();
+    client_lock.unlock();
     return -ENOTCONN;
   }
 
@@ -13277,7 +13336,7 @@ int Client::ll_write_block(Inode *in, uint64_t blockid,
                  0,
                  onsafe.get());
 
-  client_lock.Unlock();
+  client_lock.unlock();
   if (nullptr != onsafe) {
     r = onsafe->wait();
   }
@@ -13439,17 +13498,20 @@ int Client::_fallocate(Fh *fh, int mode, int64_t offset, int64_t length)
     if (in->inline_version < CEPH_INLINE_NONE &&
         (have & CEPH_CAP_FILE_BUFFER)) {
       bufferlist bl;
+      auto inline_iter = in->inline_data.cbegin();
       int len = in->inline_data.length();
       if (offset < len) {
         if (offset > 0)
-          in->inline_data.copy(0, offset, bl);
+          inline_iter.copy(offset, bl);
         int size = length;
         if (offset + size > len)
           size = len - offset;
         if (size > 0)
           bl.append_zero(size);
-        if (offset + size < len)
-          in->inline_data.copy(offset + size, len - offset - size, bl);
+        if (offset + size < len) {
+          inline_iter += size;
+          inline_iter.copy(len - offset - size, bl);
+        }
         in->inline_data = bl;
         in->inline_version++;
       }
@@ -13477,9 +13539,9 @@ int Client::_fallocate(Fh *fh, int mode, int64_t offset, int64_t length)
       in->change_attr++;
       in->mark_caps_dirty(CEPH_CAP_FILE_WR);
 
-      client_lock.Unlock();
+      client_lock.unlock();
       onfinish.wait();
-      client_lock.Lock();
+      client_lock.lock();
       _sync_write_commit(in);
     }
   } else if (!(mode & FALLOC_FL_KEEP_SIZE)) {
@@ -13499,9 +13561,9 @@ int Client::_fallocate(Fh *fh, int mode, int64_t offset, int64_t length)
   }
 
   if (nullptr != onuninline) {
-    client_lock.Unlock();
+    client_lock.unlock();
     int ret = onuninline->wait();
-    client_lock.Lock();
+    client_lock.lock();
 
     if (ret >= 0 || ret == -ECANCELED) {
       in->inline_data.clear();
@@ -14001,14 +14063,6 @@ bool Client::ms_handle_refused(Connection *con)
   return false;
 }
 
-bool Client::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer)
-{
-  if (dest_type == CEPH_ENTITY_TYPE_MON)
-    return true;
-  *authorizer = monclient->build_authorizer(dest_type);
-  return true;
-}
-
 Inode *Client::get_quota_root(Inode *in, const UserPerm& perms)
 {
   Inode *quota_in = root_ancestor;
@@ -14076,16 +14130,16 @@ bool Client::is_quota_bytes_exceeded(Inode *in, int64_t new_bytes,
 
 bool Client::is_quota_bytes_approaching(Inode *in, const UserPerm& perms)
 {
+  ceph_assert(in->size >= in->reported_size);
+  const uint64_t size = in->size - in->reported_size;
   return check_quota_condition(in, perms,
-      [](const Inode &in) {
+      [&size](const Inode &in) {
         if (in.quota.max_bytes) {
           if (in.rstat.rbytes >= in.quota.max_bytes) {
             return true;
           }
 
-          ceph_assert(in.size >= in.reported_size);
           const uint64_t space = in.quota.max_bytes - in.rstat.rbytes;
-          const uint64_t size = in.size - in.reported_size;
           return (space >> 4) < size;
         } else {
           return false;
@@ -14153,10 +14207,10 @@ int Client::check_pool_perm(Inode *in, int need)
     objecter->mutate(oid, OSDMap::file_to_object_locator(in->layout), wr_op,
                     nullsnapc, ceph::real_clock::now(), 0, &wr_cond);
 
-    client_lock.Unlock();
+    client_lock.unlock();
     int rd_ret = rd_cond.wait();
     int wr_ret = wr_cond.wait();
-    client_lock.Lock();
+    client_lock.lock();
 
     bool errored = false;
 
@@ -14382,7 +14436,7 @@ int Client::start_reclaim(const std::string& uuid, unsigned flags,
     if (session->reclaim_state == MetaSession::RECLAIM_NULL ||
        session->reclaim_state == MetaSession::RECLAIMING) {
       session->reclaim_state = MetaSession::RECLAIMING;
-      auto m = MClientReclaim::create(uuid, flags);
+      auto m = make_message<MClientReclaim>(uuid, flags);
       session->con->send_message2(std::move(m));
       wait_on_list(waiting_for_reclaim);
     } else if (session->reclaim_state == MetaSession::RECLAIM_FAIL) {
@@ -14407,9 +14461,9 @@ int Client::start_reclaim(const std::string& uuid, unsigned flags,
   C_SaferCond cond;
   if (!objecter->wait_for_map(reclaim_osd_epoch, &cond)) {
     ldout(cct, 10) << __func__ << ": waiting for OSD epoch " << reclaim_osd_epoch << dendl;
-    client_lock.Unlock();
+    client_lock.unlock();
     cond.wait();
-    client_lock.Lock();
+    client_lock.lock();
   }
 
   bool blacklisted = objecter->with_osdmap(
@@ -14434,7 +14488,7 @@ void Client::finish_reclaim()
 
   for (auto &p : mds_sessions) {
     p.second.reclaim_state = MetaSession::RECLAIM_NULL;
-    auto m = MClientReclaim::create("", MClientReclaim::FLAG_FINISH);
+    auto m = make_message<MClientReclaim>("", MClientReclaim::FLAG_FINISH);
     p.second.con->send_message2(std::move(m));
   }
 
@@ -14520,7 +14574,7 @@ void intrusive_ptr_release(Inode *in)
 
 mds_rank_t Client::_get_random_up_mds() const
 {
-  ceph_assert(client_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
 
   std::set<mds_rank_t> up;
   mdsmap->get_up_mds_set(up);
@@ -14553,7 +14607,7 @@ int StandaloneClient::init()
   objectcacher->start();
   objecter->init();
 
-  client_lock.Lock();
+  client_lock.lock();
   ceph_assert(!is_initialized());
 
   messenger->add_dispatcher_tail(objecter);
@@ -14564,7 +14618,7 @@ int StandaloneClient::init()
   if (r < 0) {
     // need to do cleanup because we're in an intermediate init state
     timer.shutdown();
-    client_lock.Unlock();
+    client_lock.unlock();
     objecter->shutdown();
     objectcacher->stop();
     monclient->shutdown();
@@ -14572,7 +14626,7 @@ int StandaloneClient::init()
   }
   objecter->start();
 
-  client_lock.Unlock();
+  client_lock.unlock();
   _finish_init();
 
   return 0;