]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mds/Server.cc
update ceph source to reef 18.2.0
[ceph.git] / ceph / src / mds / Server.cc
index 2123aace1ec1ef68c0e4be8471f1fc8547a290e1..bf12cb7e2c8fa9f24be9f6eae1b330aa829351cb 100644 (file)
@@ -31,6 +31,7 @@
 #include "Mutation.h"
 #include "MetricsHandler.h"
 #include "cephfs_features.h"
+#include "MDSContext.h"
 
 #include "msg/Messenger.h"
 
@@ -50,6 +51,7 @@
 #include "common/perf_counters.h"
 #include "include/compat.h"
 #include "osd/OSDMap.h"
+#include "fscrypt.h"
 
 #include <errno.h>
 
 
 #include "common/config.h"
 
+#include "msg/Message.h"
+
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_mds
 #undef dout_prefix
 #define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".server "
 
+using namespace std;
+
 class ServerContext : public MDSContext {
   protected:
   Server *server;
@@ -198,6 +204,8 @@ void Server::create_logger()
                    "Request type set file layout latency");
   plb.add_time_avg(l_mdss_req_setdirlayout_latency, "req_setdirlayout_latency",
                    "Request type set directory layout latency");
+  plb.add_time_avg(l_mdss_req_getvxattr_latency, "req_getvxattr_latency",
+                   "Request type get virtual extended attribute latency");
   plb.add_time_avg(l_mdss_req_setxattr_latency, "req_setxattr_latency",
                    "Request type set extended attribute latency");
   plb.add_time_avg(l_mdss_req_rmxattr_latency, "req_rmxattr_latency",
@@ -248,6 +256,7 @@ void Server::create_logger()
 Server::Server(MDSRank *m, MetricsHandler *metrics_handler) :
   mds(m), 
   mdcache(mds->mdcache), mdlog(mds->mdlog),
+  inject_rename_corrupt_dentry_first(g_conf().get_val<double>("mds_inject_rename_corrupt_dentry_first")),
   recall_throttle(g_conf().get_val<double>("mds_recall_max_decay_rate")),
   metrics_handler(metrics_handler)
 {
@@ -260,7 +269,10 @@ Server::Server(MDSRank *m, MetricsHandler *metrics_handler) :
   cap_acquisition_throttle = g_conf().get_val<uint64_t>("mds_session_cap_acquisition_throttle");
   max_caps_throttle_ratio = g_conf().get_val<double>("mds_session_max_caps_throttle_ratio");
   caps_throttle_retry_request_timeout = g_conf().get_val<double>("mds_cap_acquisition_throttle_retry_request_timeout");
+  dir_max_entries = g_conf().get_val<uint64_t>("mds_dir_max_entries");
+  bal_fragment_size_max = g_conf().get_val<int64_t>("mds_bal_fragment_size_max");
   supported_features = feature_bitset_t(CEPHFS_FEATURES_MDS_SUPPORTED);
+  supported_metric_spec = feature_bitset_t(CEPHFS_METRIC_FEATURES_ALL);
 }
 
 void Server::dispatch(const cref_t<Message> &m)
@@ -347,6 +359,9 @@ void Server::dispatch(const cref_t<Message> &m)
   case CEPH_MSG_CLIENT_REQUEST:
     handle_client_request(ref_cast<MClientRequest>(m));
     return;
+  case CEPH_MSG_CLIENT_REPLY:
+    handle_client_reply(ref_cast<MClientReply>(m));
+    return;
   case CEPH_MSG_CLIENT_RECLAIM:
     handle_client_reclaim(ref_cast<MClientReclaim>(m));
     return;
@@ -354,8 +369,8 @@ void Server::dispatch(const cref_t<Message> &m)
     handle_peer_request(ref_cast<MMDSPeerRequest>(m));
     return;
   default:
-    derr << "server unknown message " << m->get_type() << dendl;
-    ceph_abort_msg("server unknown message");  
+    derr << "Server unknown message " << m->get_type() << " from peer type " << m->get_connection()->get_peer_type() << dendl;
+    ceph_abort_msg("server unknown message  " + to_string(m->get_type()) + " from peer type " + to_string(m->get_connection()->get_peer_type()));  
   }
 }
 
@@ -404,10 +419,10 @@ Session* Server::find_session_by_uuid(std::string_view uuid)
     if (!session) {
       session = it.second;
     } else if (!session->reclaiming_from) {
-      assert(it.second->reclaiming_from == session);
+      ceph_assert(it.second->reclaiming_from == session);
       session = it.second;
     } else {
-      assert(session->reclaiming_from == it.second);
+      ceph_assert(session->reclaiming_from == it.second);
     }
   }
   return session;
@@ -431,7 +446,7 @@ void Server::reclaim_session(Session *session, const cref_t<MClientReclaim> &m)
   unsigned flags = m->get_flags();
   if (flags != CEPH_RECLAIM_RESET) { // currently only support reset
     dout(10) << __func__ << " unsupported flags" << dendl;
-    reply->set_result(-CEPHFS_EOPNOTSUPP);
+    reply->set_result(-CEPHFS_EINVAL);
     mds->send_message_client(reply, session);
     return;
   }
@@ -445,18 +460,15 @@ void Server::reclaim_session(Session *session, const cref_t<MClientReclaim> &m)
       mds->send_message_client(reply, session);
     }
 
-    assert(!target->reclaiming_from);
-    assert(!session->reclaiming_from);
+    ceph_assert(!target->reclaiming_from);
+    ceph_assert(!session->reclaiming_from);
     session->reclaiming_from = target;
     reply->set_addrs(entity_addrvec_t(target->info.inst.addr));
   }
 
   if (flags & CEPH_RECLAIM_RESET) {
     finish_reclaim_session(session, reply);
-    return;
-  }
-
-  ceph_abort();
+  } else ceph_assert(0); /* no other flags are handled at this time */
 }
 
 void Server::finish_reclaim_session(Session *session, const ref_t<MClientReclaimReply> &reply)
@@ -469,7 +481,7 @@ void Server::finish_reclaim_session(Session *session, const ref_t<MClientReclaim
     if (reply) {
       int64_t session_id = session->get_client().v;
       send_reply = new LambdaContext([this, session_id, reply](int r) {
-           assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
+           ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
            Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(session_id));
            if (!session) {
              return;
@@ -500,15 +512,16 @@ void Server::finish_reclaim_session(Session *session, const ref_t<MClientReclaim
 void Server::handle_client_reclaim(const cref_t<MClientReclaim> &m)
 {
   Session *session = mds->get_session(m);
+  uint32_t flags = m->get_flags();
   dout(3) << __func__ <<  " " << *m << " from " << m->get_source() << dendl;
-  assert(m->get_source().is_client()); // should _not_ come from an mds!
+  ceph_assert(m->is_a_client()); // should _not_ come from an mds!
 
   if (!session) {
     dout(0) << " ignoring sessionless msg " << *m << dendl;
     return;
   }
 
-  std::string_view fs_name = mds->get_fs_name();
+  std::string_view fs_name = mds->mdsmap->get_fs_name();
   if (!fs_name.empty() && !session->fs_name_capable(fs_name, MAY_READ)) {
     dout(0) << " dropping message not allowed for this fs_name: " << *m << dendl;
     return;
@@ -519,7 +532,15 @@ void Server::handle_client_reclaim(const cref_t<MClientReclaim> &m)
     return;
   }
 
-  if (m->get_flags() & MClientReclaim::FLAG_FINISH) {
+  if (flags & MClientReclaim::FLAG_FINISH) {
+    if (flags ^ MClientReclaim::FLAG_FINISH) {
+      dout(0) << __func__ << " client specified FLAG_FINISH with other flags."
+                             " Other flags:" << flags << dendl;
+      auto reply = make_message<MClientReclaimReply>(0);
+      reply->set_result(-CEPHFS_EINVAL);
+      mds->send_message_client(reply, session);
+      return;
+    }
     finish_reclaim_session(session);
   } else {
     reclaim_session(session, m);
@@ -532,7 +553,7 @@ void Server::handle_client_session(const cref_t<MClientSession> &m)
   Session *session = mds->get_session(m);
 
   dout(3) << "handle_client_session " << *m << " from " << m->get_source() << dendl;
-  ceph_assert(m->get_source().is_client()); // should _not_ come from an mds!
+  ceph_assert(m->is_a_client()); // should _not_ come from an mds!
 
   if (!session) {
     dout(0) << " ignoring sessionless msg " << *m << dendl;
@@ -542,7 +563,7 @@ void Server::handle_client_session(const cref_t<MClientSession> &m)
     return;
   }
 
-  std::string_view fs_name = mds->get_fs_name();
+  std::string_view fs_name = mds->mdsmap->get_fs_name();
   if (!fs_name.empty() && !session->fs_name_capable(fs_name, MAY_READ)) {
     dout(0) << " dropping message not allowed for this fs_name: " << *m << dendl;
     auto reply = make_message<MClientSession>(CEPH_SESSION_REJECT);
@@ -573,12 +594,38 @@ void Server::handle_client_session(const cref_t<MClientSession> &m)
   uint64_t sseq = 0;
   switch (m->get_op()) {
   case CEPH_SESSION_REQUEST_OPEN:
+    if(mds->mdsmap->test_flag(CEPH_MDSMAP_REFUSE_CLIENT_SESSION)) {
+      dout(0) << "new sessions are not permitted, enable again via"
+                 "`ceph fs set <fs_name> refuse_client_session false`" << dendl;
+      auto reply = make_message<MClientSession>(CEPH_SESSION_REJECT);
+      reply->metadata["error_string"] = "new sessions are not permitted,"
+                                        " enable again via `ceph fs set"
+                                        " <fs_name> refuse_client_session false`";
+      mds->send_message(reply, m->get_connection());
+      return;
+    }
     if (session->is_opening() ||
        session->is_open() ||
        session->is_stale() ||
        session->is_killing() ||
        terminating_sessions) {
-      dout(10) << "currently open|opening|stale|killing, dropping this req" << dendl;
+      if (m->supported_features.test(CEPHFS_FEATURE_NOTIFY_SESSION_STATE)) {
+       if (session->is_open() && !mds->is_stopping()) {
+          dout(10) << "currently already opened" << dendl;
+
+          auto reply = make_message<MClientSession>(CEPH_SESSION_OPEN,
+                                                    session->get_push_seq());
+          if (session->info.has_feature(CEPHFS_FEATURE_MIMIC))
+            reply->supported_features = supported_features;
+          mds->send_message_client(reply, session);
+          if (mdcache->is_readonly()) {
+            auto m = make_message<MClientSession>(CEPH_SESSION_FORCE_RO);
+            mds->send_message_client(m, session);
+          }
+       }
+      }
+      dout(10) << "currently " << session->get_state_name()
+               << ", dropping this req" << dendl;
       return;
     }
     ceph_assert(session->is_closed() || session->is_closing());
@@ -613,8 +660,8 @@ void Server::handle_client_session(const cref_t<MClientSession> &m)
         dout(2) << css->strv() << dendl;
       };
 
-      auto send_reject_message = [this, &session, &log_session_status](std::string_view err_str) {
-       auto m = make_message<MClientSession>(CEPH_SESSION_REJECT);
+      auto send_reject_message = [this, &session, &log_session_status](std::string_view err_str, unsigned flags=0) {
+       auto m = make_message<MClientSession>(CEPH_SESSION_REJECT, 0, flags);
        if (session->info.has_feature(CEPHFS_FEATURE_MIMIC))
          m->metadata["error_string"] = err_str;
        mds->send_message_client(m, session);
@@ -633,7 +680,9 @@ void Server::handle_client_session(const cref_t<MClientSession> &m)
        // has been blocklisted.  If mounted with recover_session=clean
        // (since 5.4), it tries to automatically recover itself from
        // blocklisting.
-       send_reject_message("blocklisted (blacklisted)");
+        unsigned flags = 0;
+       flags |= MClientSession::SESSION_BLOCKLISTED;
+       send_reject_message("blocklisted (blacklisted)", flags);
        session->clear();
        break;
       }
@@ -773,7 +822,11 @@ void Server::handle_client_session(const cref_t<MClientSession> &m)
     break;
 
   default:
-    ceph_abort();
+    auto m = make_message<MClientSession>(CEPH_SESSION_REJECT);
+    mds->send_message_client(m, session);
+    derr << "Server received unknown message " << m->get_type() << ", closing session and blocklisting the client " << session->get_client() << dendl;
+    CachedStackStringStream css;
+    mds->evict_client(session->get_client().v, false, true, *css, nullptr);
   }
 }
 
@@ -850,8 +903,10 @@ void Server::_session_logged(Session *session, uint64_t state_seq, bool open, ve
     metrics_handler->add_session(session);
     ceph_assert(session->get_connection());
     auto reply = make_message<MClientSession>(CEPH_SESSION_OPEN);
-    if (session->info.has_feature(CEPHFS_FEATURE_MIMIC))
+    if (session->info.has_feature(CEPHFS_FEATURE_MIMIC)) {
       reply->supported_features = supported_features;
+      reply->metric_spec = supported_metric_spec;
+    }
     mds->send_message_client(reply, session);
     if (mdcache->is_readonly()) {
       auto m = make_message<MClientSession>(CEPH_SESSION_FORCE_RO);
@@ -1004,8 +1059,10 @@ void Server::finish_force_open_sessions(const map<client_t,pair<Session*,uint64_
         metrics_handler->add_session(session);
 
        auto reply = make_message<MClientSession>(CEPH_SESSION_OPEN);
-       if (session->info.has_feature(CEPHFS_FEATURE_MIMIC))
+       if (session->info.has_feature(CEPHFS_FEATURE_MIMIC)) {
          reply->supported_features = supported_features;
+          reply->metric_spec = supported_metric_spec;
+       }
        mds->send_message_client(reply, session);
 
        if (mdcache->is_readonly())
@@ -1147,7 +1204,7 @@ void Server::find_idle_sessions()
       if (mds->locker->revoke_stale_caps(session)) {
        mds->locker->remove_stale_leases(session);
        finish_flush_session(session, session->get_push_seq());
-       auto m = make_message<MClientSession>(CEPH_SESSION_STALE, session->get_push_seq());
+       auto m = make_message<MClientSession>(CEPH_SESSION_STALE);
        mds->send_message_client(m, session);
       } else {
        to_evict.push_back(session);
@@ -1162,7 +1219,7 @@ void Server::find_idle_sessions()
   const auto sessions_p2 = mds->sessionmap.by_state.find(Session::STATE_STALE);
   if (sessions_p2 != mds->sessionmap.by_state.end() && !sessions_p2->second->empty()) {
     for (auto session : *(sessions_p2->second)) {
-      assert(session->is_stale());
+      ceph_assert(session->is_stale());
       auto last_cap_renew_span = std::chrono::duration<double>(now - session->last_cap_renew).count();
       if (last_cap_renew_span < cutoff) {
        dout(20) << "oldest stale session is " << session->info.inst
@@ -1253,6 +1310,22 @@ void Server::handle_conf_change(const std::set<std::string>& changed) {
   if (changed.count("mds_alternate_name_max")) {
     alternate_name_max  = g_conf().get_val<Option::size_t>("mds_alternate_name_max");
   }
+  if (changed.count("mds_fscrypt_last_block_max_size")) {
+    fscrypt_last_block_max_size = g_conf().get_val<Option::size_t>("mds_fscrypt_last_block_max_size");
+  }
+  if (changed.count("mds_dir_max_entries")) {
+    dir_max_entries = g_conf().get_val<uint64_t>("mds_dir_max_entries");
+    dout(20) << __func__ << " max entries per directory changed to "
+            << dir_max_entries << dendl;
+  }
+  if (changed.count("mds_bal_fragment_size_max")) {
+    bal_fragment_size_max = g_conf().get_val<int64_t>("mds_bal_fragment_size_max");
+    dout(20) << __func__ << " max fragment size changed to "
+            << bal_fragment_size_max << dendl;
+  }
+  if (changed.count("mds_inject_rename_corrupt_dentry_first")) {
+    inject_rename_corrupt_dentry_first = g_conf().get_val<double>("mds_inject_rename_corrupt_dentry_first");
+  }
 }
 
 /*
@@ -1284,38 +1357,23 @@ void Server::kill_session(Session *session, Context *on_safe)
   }
 }
 
-size_t Server::apply_blocklist(const std::set<entity_addr_t> &blocklist)
+size_t Server::apply_blocklist()
 {
-  bool prenautilus = mds->objecter->with_osdmap(
-      [&](const OSDMap& o) {
-       return o.require_osd_release < ceph_release_t::nautilus;
-      });
-
   std::vector<Session*> victims;
   const auto& sessions = mds->sessionmap.get_sessions();
-  for (const auto& p : sessions) {
-    if (!p.first.is_client()) {
-      // Do not apply OSDMap blocklist to MDS daemons, we find out
-      // about their death via MDSMap.
-      continue;
-    }
-
-    Session *s = p.second;
-    auto inst_addr = s->info.inst.addr;
-    // blocklist entries are always TYPE_ANY for nautilus+
-    inst_addr.set_type(entity_addr_t::TYPE_ANY);
-    if (blocklist.count(inst_addr)) {
-      victims.push_back(s);
-      continue;
-    }
-    if (prenautilus) {
-      // ...except pre-nautilus, they were TYPE_LEGACY
-      inst_addr.set_type(entity_addr_t::TYPE_LEGACY);
-      if (blocklist.count(inst_addr)) {
-       victims.push_back(s);
+  mds->objecter->with_osdmap(
+    [&](const OSDMap& o) {
+      for (const auto& p : sessions) {
+       if (!p.first.is_client()) {
+         // Do not apply OSDMap blocklist to MDS daemons, we find out
+         // about their death via MDSMap.
+         continue;
+       }
+       if (o.is_blocklisted(p.second->info.inst.addr)) {
+         victims.push_back(p.second);
+       }
       }
-    }
-  }
+    });
 
   for (const auto& s : victims) {
     kill_session(s, nullptr);
@@ -1406,6 +1464,18 @@ void Server::handle_client_reconnect(const cref_t<MClientReconnect> &m)
     return;
   }
 
+  if(mds->mdsmap->test_flag(CEPH_MDSMAP_REFUSE_CLIENT_SESSION)) {
+    mds->clog->warn() << "client could not reconnect as" 
+                         " file system flag refuse_client_session is set";
+    dout(0) << "client cannot reconnect when file system flag"
+               " refuse_client_session is set" << dendl;
+    auto reply = make_message<MClientSession>(CEPH_SESSION_CLOSE);
+    reply->metadata["error_string"] = "client cannot reconnect when file system flag" 
+                                        " refuse_client_session is set";
+    mds->send_message(reply, m->get_connection());
+    return;
+  }
+
   if (!session->is_open()) {
     dout(0) << " ignoring msg from not-open session" << *m << dendl;
     auto reply = make_message<MClientSession>(CEPH_SESSION_CLOSE);
@@ -1477,8 +1547,10 @@ void Server::handle_client_reconnect(const cref_t<MClientReconnect> &m)
     metrics_handler->add_session(session);
     // notify client of success with an OPEN
     auto reply = make_message<MClientSession>(CEPH_SESSION_OPEN);
-    if (session->info.has_feature(CEPHFS_FEATURE_MIMIC))
+    if (session->info.has_feature(CEPHFS_FEATURE_MIMIC)) {
       reply->supported_features = supported_features;
+      reply->metric_spec = supported_metric_spec;
+    }
     mds->send_message_client(reply, session);
     mds->clog->debug() << "reconnect by " << session->info.inst << " after " << delay;
   }
@@ -1910,20 +1982,23 @@ void Server::journal_and_reply(MDRequestRef& mdr, CInode *in, CDentry *dn, LogEv
     mdr->pin(dn);
 
   early_reply(mdr, in, dn);
-  
+
   mdr->committing = true;
   submit_mdlog_entry(le, fin, mdr, __func__);
-  
+
   if (mdr->client_request && mdr->client_request->is_queued_for_replay()) {
     if (mds->queue_one_replay()) {
       dout(10) << " queued next replay op" << dendl;
     } else {
       dout(10) << " journaled last replay op" << dendl;
     }
-  } else if (mdr->did_early_reply)
+  } else if (mdr->did_early_reply) {
     mds->locker->drop_rdlocks_for_early_reply(mdr.get());
-  else
+    if (dn && dn->is_waiter_for(CDentry::WAIT_UNLINK_FINISH))
+      mdlog->flush();
+  } else {
     mdlog->flush();
+  }
 }
 
 void Server::submit_mdlog_entry(LogEvent *le, MDSLogContextBase *fin, MDRequestRef& mdr,
@@ -1993,6 +2068,9 @@ void Server::perf_gather_op_latency(const cref_t<MClientRequest> &req, utime_t l
   case CEPH_MDS_OP_SETDIRLAYOUT:
     code = l_mdss_req_setdirlayout_latency;
     break;
+  case CEPH_MDS_OP_GETVXATTR:
+    code = l_mdss_req_getvxattr_latency;
+    break;
   case CEPH_MDS_OP_SETXATTR:
     code = l_mdss_req_setxattr_latency;
     break;
@@ -2047,7 +2125,9 @@ void Server::perf_gather_op_latency(const cref_t<MClientRequest> &req, utime_t l
   case CEPH_MDS_OP_RENAMESNAP:
     code = l_mdss_req_renamesnap_latency;
     break;
-  default: ceph_abort();
+  default:
+    dout(1) << ": unknown client op" << dendl;
+    return;
   }
   logger->tinc(code, lat);   
 }
@@ -2114,6 +2194,9 @@ void Server::early_reply(MDRequestRef& mdr, CInode *tracei, CDentry *tracedn)
   mds->logger->inc(l_mds_reply);
   utime_t lat = ceph_clock_now() - req->get_recv_stamp();
   mds->logger->tinc(l_mds_reply_latency, lat);
+  if (lat >= g_conf()->mds_op_complaint_time) {
+    mds->logger->inc(l_mds_slow_reply);
+  }
   if (client_inst.name.is_client()) {
     mds->sessionmap.hit_session(mdr->session);
   }
@@ -2172,6 +2255,9 @@ void Server::reply_client_request(MDRequestRef& mdr, const ref_t<MClientReply> &
     mds->logger->inc(l_mds_reply);
     utime_t lat = ceph_clock_now() - mdr->client_request->get_recv_stamp();
     mds->logger->tinc(l_mds_reply_latency, lat);
+    if (lat >= g_conf()->mds_op_complaint_time) {
+      mds->logger->inc(l_mds_slow_reply);
+    }
     if (session && client_inst.name.is_client()) {
       mds->sessionmap.hit_session(session);
     }
@@ -2209,6 +2295,10 @@ void Server::reply_client_request(MDRequestRef& mdr, const ref_t<MClientReply> &
     mds->send_message_client(reply, session);
   }
 
+  if (client_inst.name.is_mds() && reply->get_op() == CEPH_MDS_OP_RENAME) {
+    mds->send_message(reply, mdr->client_request->get_connection());
+  }
+
   if (req->is_queued_for_replay() &&
       (mdr->has_completed || reply->get_result() < 0)) {
     if (reply->get_result() < 0) {
@@ -2264,7 +2354,7 @@ void Server::set_trace_dist(const ref_t<MClientReply> &reply,
       realm = in->find_snaprealm();
     else
       realm = dn->get_dir()->get_inode()->find_snaprealm();
-    reply->snapbl = realm->get_snap_trace();
+    reply->snapbl = get_snap_trace(session, realm);
     dout(10) << "set_trace_dist snaprealm " << *realm << " len=" << reply->snapbl.length() << dendl;
   }
 
@@ -2291,20 +2381,7 @@ void Server::set_trace_dist(const ref_t<MClientReply> &reply,
     dout(20) << "set_trace_dist added dir  " << *dir << dendl;
 
     encode(dn->get_name(), bl);
-
-    int lease_mask = 0;
-    CDentry::linkage_t *dnl = dn->get_linkage(mdr->get_client(), mdr);
-    if (dnl->is_primary()) {
-      ceph_assert(dnl->get_inode() == in);
-      lease_mask = CEPH_LEASE_PRIMARY_LINK;
-    } else {
-      if (dnl->is_remote())
-       ceph_assert(dnl->get_remote_ino() == in->ino());
-      else
-       ceph_assert(!in);
-    }
-    mds->locker->issue_client_lease(dn, mdr, lease_mask, now, bl);
-    dout(20) << "set_trace_dist added dn   " << snapid << " " << *dn << dendl;
+    mds->locker->issue_client_lease(dn, in, mdr, now, bl);
   } else
     reply->head.is_dentry = 0;
 
@@ -2337,7 +2414,7 @@ void Server::handle_client_request(const cref_t<MClientRequest> &req)
   bool sessionclosed_isok = replay_unsafe_with_closed_session;
   // active session?
   Session *session = 0;
-  if (req->get_source().is_client()) {
+  if (req->is_a_client()) {
     session = mds->get_session(req);
     if (!session) {
       dout(5) << "no session for " << req->get_source() << ", dropping" << dendl;
@@ -2441,7 +2518,7 @@ void Server::handle_client_request(const cref_t<MClientRequest> &req)
 
   // process embedded cap releases?
   //  (only if NOT replay!)
-  if (!req->releases.empty() && req->get_source().is_client() && !req->is_replay()) {
+  if (!req->releases.empty() && req->is_a_client() && !req->is_replay()) {
     client_t client = req->get_source().num();
     for (const auto &r : req->releases) {
       mds->locker->process_request_cap_release(mdr, client, r.item, r.dname);
@@ -2453,6 +2530,38 @@ void Server::handle_client_request(const cref_t<MClientRequest> &req)
   return;
 }
 
+void Server::handle_client_reply(const cref_t<MClientReply> &reply)
+{
+  dout(4) << "handle_client_reply " << *reply << dendl;
+
+  ceph_assert(reply->is_safe());
+  ceph_tid_t tid = reply->get_tid();
+
+  if (mds->internal_client_requests.count(tid) == 0) {
+    dout(1) << " no pending request on tid " << tid << dendl;
+    return;
+  }
+
+  auto &req = mds->internal_client_requests.at(tid);
+  CDentry *dn = req.get_dentry();
+
+  switch (reply->get_op()) {
+  case CEPH_MDS_OP_RENAME:
+    if (dn) {
+      dn->state_clear(CDentry::STATE_REINTEGRATING);
+
+      MDSContext::vec finished;
+      dn->take_waiting(CDentry::WAIT_REINTEGRATE_FINISH, finished);
+      mds->queue_waiters(finished);
+    }
+    break;
+  default:
+    dout(5) << " unknown client op " << reply->get_op() << dendl;
+  }
+
+  mds->internal_client_requests.erase(tid);
+}
+
 void Server::handle_osd_map()
 {
   /* Note that we check the OSDMAP_FULL flag directly rather than
@@ -2513,7 +2622,7 @@ void Server::dispatch_client_request(MDRequestRef& mdr)
   if (is_full) {
     CInode *cur = try_get_auth_inode(mdr, req->get_filepath().get_ino());
     if (!cur) {
-      respond_to_request(mdr, -EINVAL);
+      // the request is already responded to
       return;
     }
     if (req->get_op() == CEPH_MDS_OP_SETLAYOUT ||
@@ -2563,6 +2672,9 @@ void Server::dispatch_client_request(MDRequestRef& mdr)
   case CEPH_MDS_OP_GETATTR:
     handle_client_getattr(mdr, false);
     break;
+  case CEPH_MDS_OP_GETVXATTR:
+    handle_client_getvxattr(mdr);
+    break;
 
   case CEPH_MDS_OP_SETATTR:
     handle_client_setattr(mdr);
@@ -2672,7 +2784,7 @@ void Server::handle_peer_request(const cref_t<MMDSPeerRequest> &m)
 
   CDentry *straydn = NULL;
   if (m->straybl.length() > 0) {
-    mdcache->decode_replica_stray(straydn, m->straybl, from);
+    mdcache->decode_replica_stray(straydn, nullptr, m->straybl, from);
     ceph_assert(straydn);
     m->straybl.clear();
   }
@@ -2849,7 +2961,7 @@ void Server::handle_peer_request_reply(const cref_t<MMDSPeerRequest> &m)
     break;
 
   default:
-    ceph_abort();
+    ceph_abort_msg("unknown op " + to_string(m->get_op()) + " requested");
   }
 }
 
@@ -2876,7 +2988,7 @@ void Server::dispatch_peer_request(MDRequestRef& mdr)
 
       if (!lock) {
        dout(10) << "don't have object, dropping" << dendl;
-       ceph_abort(); // can this happen, if we auth pinned properly.
+       ceph_abort_msg("don't have object"); // can this happen, if we auth pinned properly.
       }
       if (op == MMDSPeerRequest::OP_XLOCK && !lock->get_parent()->is_auth()) {
        dout(10) << "not auth for remote xlock attempt, dropping on " 
@@ -2963,7 +3075,7 @@ void Server::dispatch_peer_request(MDRequestRef& mdr)
     break;
 
   default: 
-    ceph_abort();
+    ceph_abort_msg("unknown op "+ to_string(op)+ " received");
   }
 }
 
@@ -3192,18 +3304,38 @@ bool Server::check_access(MDRequestRef& mdr, CInode *in, unsigned mask)
  * check whether fragment has reached maximum size
  *
  */
-bool Server::check_fragment_space(MDRequestRef &mdr, CDir *in)
+bool Server::check_fragment_space(MDRequestRef &mdr, CDir *dir)
 {
-  const auto size = in->get_frag_size();
-  if (size >= g_conf()->mds_bal_fragment_size_max) {
-    dout(10) << "fragment " << *in << " size exceeds " << g_conf()->mds_bal_fragment_size_max << " (CEPHFS_ENOSPC)" << dendl;
+  const auto size = dir->get_frag_size();
+  const auto max = bal_fragment_size_max;
+  if (size >= max) {
+    dout(10) << "fragment " << *dir << " size exceeds " << max << " (CEPHFS_ENOSPC)" << dendl;
     respond_to_request(mdr, -CEPHFS_ENOSPC);
     return false;
+  } else {
+    dout(20) << "fragment " << *dir << " size " << size << " < "  << max << dendl;
   }
 
   return true;
 }
 
+/**
+ * check whether entries in a dir reached maximum size
+ *
+ */
+bool Server::check_dir_max_entries(MDRequestRef &mdr, CDir *in)
+{
+  const uint64_t size = in->inode->get_projected_inode()->dirstat.nfiles +
+                   in->inode->get_projected_inode()->dirstat.nsubdirs;
+  if (dir_max_entries && size >= dir_max_entries) {
+    dout(10) << "entries per dir " << *in << " size exceeds " << dir_max_entries << " (ENOSPC)" << dendl;
+    respond_to_request(mdr, -ENOSPC);
+    return false;
+  }
+  return true;
+}
+
+
 CDentry* Server::prepare_stray_dentry(MDRequestRef& mdr, CInode *in)
 {
   string straydname;
@@ -3258,17 +3390,36 @@ CInode* Server::prepare_new_inode(MDRequestRef& mdr, CDir *dir, inodeno_t useino
   // while session is opening.
   bool allow_prealloc_inos = mdr->session->is_open();
 
+  inodeno_t _useino = useino;
+
   // assign ino
-  if (allow_prealloc_inos && (mdr->used_prealloc_ino = _inode->ino = mdr->session->take_ino(useino))) {
-    mds->sessionmap.mark_projected(mdr->session);
-    dout(10) << "prepare_new_inode used_prealloc " << mdr->used_prealloc_ino
-            << " (" << mdr->session->info.prealloc_inos.size() << " left)"
-            << dendl;
-  } else {
-    mdr->alloc_ino = 
-      _inode->ino = mds->inotable->project_alloc_id(useino);
-    dout(10) << "prepare_new_inode alloc " << mdr->alloc_ino << dendl;
-  }
+  do {
+    if (allow_prealloc_inos && (mdr->used_prealloc_ino = _inode->ino = mdr->session->take_ino(_useino))) {
+      if (mdcache->test_and_clear_taken_inos(_inode->ino)) {
+        _inode->ino = 0;
+        dout(10) << "prepare_new_inode used_prealloc " << mdr->used_prealloc_ino
+                 << " (" << mdr->session->info.prealloc_inos.size() << " left)"
+                << " but has been taken, will try again!" << dendl;
+      } else {
+        mds->sessionmap.mark_projected(mdr->session);
+        dout(10) << "prepare_new_inode used_prealloc " << mdr->used_prealloc_ino
+                 << " (" << mdr->session->info.prealloc_inos.size() << " left)"
+                 << dendl;
+      }
+    } else {
+      mdr->alloc_ino =
+       _inode->ino = mds->inotable->project_alloc_id(_useino);
+      if (mdcache->test_and_clear_taken_inos(_inode->ino)) {
+        mds->inotable->apply_alloc_id(_inode->ino);
+        _inode->ino = 0;
+        dout(10) << "prepare_new_inode alloc " << mdr->alloc_ino
+                << " but has been taken, will try again!" << dendl;
+      } else {
+        dout(10) << "prepare_new_inode alloc " << mdr->alloc_ino << dendl;
+      }
+    }
+    _useino = 0;
+  } while (!_inode->ino);
 
   if (useino && useino != _inode->ino) {
     dout(0) << "WARNING: client specified " << useino << " and i allocated " << _inode->ino << dendl;
@@ -3277,7 +3428,7 @@ CInode* Server::prepare_new_inode(MDRequestRef& mdr, CDir *dir, inodeno_t useino
        << " but mds." << mds->get_nodeid() << " allocated " << _inode->ino;
     //ceph_abort(); // just for now.
   }
-    
+
   if (allow_prealloc_inos &&
       mdr->session->get_num_projected_prealloc_inos() < g_conf()->mds_client_prealloc_inos / 2) {
     int need = g_conf()->mds_client_prealloc_inos - mdr->session->get_num_projected_prealloc_inos();
@@ -3308,18 +3459,20 @@ CInode* Server::prepare_new_inode(MDRequestRef& mdr, CDir *dir, inodeno_t useino
   _inode->truncate_seq = 1; /* starting with 1, 0 is kept for no-truncation logic */
 
   CInode *diri = dir->get_inode();
+  auto pip = diri->get_projected_inode();
 
-  dout(10) << oct << " dir mode 0" << diri->get_inode()->mode << " new mode 0" << mode << dec << dendl;
+  dout(10) << oct << " dir mode 0" << pip->mode << " new mode 0" << mode << dec << dendl;
 
-  if (diri->get_inode()->mode & S_ISGID) {
+  if (pip->mode & S_ISGID) {
     dout(10) << " dir is sticky" << dendl;
-    _inode->gid = diri->get_inode()->gid;
+    _inode->gid = pip->gid;
     if (S_ISDIR(mode)) {
-      dout(10) << " new dir also sticky" << dendl;      
+      dout(10) << " new dir also sticky" << dendl;
       _inode->mode |= S_ISGID;
     }
-  } else 
+  } else {
     _inode->gid = mdr->client_request->get_caller_gid();
+  }
 
   _inode->uid = mdr->client_request->get_caller_uid();
 
@@ -3329,6 +3482,11 @@ CInode* Server::prepare_new_inode(MDRequestRef& mdr, CDir *dir, inodeno_t useino
   _inode->change_attr = 0;
 
   const cref_t<MClientRequest> &req = mdr->client_request;
+
+  dout(10) << "copying fscrypt_auth len " << req->fscrypt_auth.size() << dendl;
+  _inode->fscrypt_auth = req->fscrypt_auth;
+  _inode->fscrypt_file = req->fscrypt_file;
+
   if (req->get_data().length()) {
     auto p = req->get_data().cbegin();
 
@@ -3385,15 +3543,68 @@ void Server::apply_allocated_inos(MDRequestRef& mdr, Session *session)
   }
 }
 
+struct C_MDS_TryOpenInode : public ServerContext {
+  MDRequestRef mdr;
+  inodeno_t ino;
+  C_MDS_TryOpenInode(Server *s, MDRequestRef& r, inodeno_t i) :
+    ServerContext(s), mdr(r), ino(i) {}
+  void finish(int r) override {
+    server->_try_open_ino(mdr, r, ino);
+  }
+};
+
+void Server::_try_open_ino(MDRequestRef& mdr, int r, inodeno_t ino)
+{
+  dout(10) << "_try_open_ino " << mdr.get() << " ino " << ino << " r=" << r << dendl;
+
+  // `r` is a rank if >=0, else an error code
+  if (r >= 0) {
+    mds_rank_t dest_rank(r);
+    if (dest_rank == mds->get_nodeid())
+      dispatch_client_request(mdr);
+    else
+      mdcache->request_forward(mdr, dest_rank);
+    return;
+  }
+
+  // give up
+  if (r == -CEPHFS_ENOENT || r == -CEPHFS_ENODATA)
+    r = -CEPHFS_ESTALE;
+  respond_to_request(mdr, r);
+}
+
 class C_MDS_TryFindInode : public ServerContext {
   MDRequestRef mdr;
+  MDCache *mdcache;
+  inodeno_t ino;
 public:
-  C_MDS_TryFindInode(Server *s, MDRequestRef& r) : ServerContext(s), mdr(r) {}
+  C_MDS_TryFindInode(Server *s, MDRequestRef& r, MDCache *m, inodeno_t i) :
+    ServerContext(s), mdr(r), mdcache(m), ino(i) {}
   void finish(int r) override {
-    if (r == -CEPHFS_ESTALE) // :( find_ino_peers failed
-      server->respond_to_request(mdr, r);
-    else
+    if (r == -CEPHFS_ESTALE) { // :( find_ino_peers failed
+      /*
+       * There has one case that when the MDS crashes and the
+       * openfiletable journal couldn't be flushed and then
+       * the replacing MDS is possibly won't load some already
+       * opened CInodes into the MDCache. And if the clients
+       * will retry some requests after reconnected, the MDS
+       * will return -ESTALE after failing to find the ino in
+       * all active peers.
+       *
+       * As a workaround users can run `ls -R ${mountpoint}`
+       * to list all the sub-files or sub-direcotries from the
+       * mountpoint.
+       *
+       * We need try to open the ino and try it again.
+       */
+      CInode *in = mdcache->get_inode(ino);
+      if (in && in->state_test(CInode::STATE_PURGING))
+        server->respond_to_request(mdr, r);
+      else
+        mdcache->open_ino(ino, (int64_t)-1, new C_MDS_TryOpenInode(server, mdr, ino));
+    } else {
       server->dispatch_client_request(mdr);
+    }
   }
 };
 
@@ -3433,8 +3644,8 @@ CInode* Server::rdlock_path_pin_ref(MDRequestRef& mdr,
       respond_to_request(mdr, r);
     } else if (r == -CEPHFS_ESTALE) {
       dout(10) << "FAIL on CEPHFS_ESTALE but attempting recovery" << dendl;
-      MDSContext *c = new C_MDS_TryFindInode(this, mdr);
-      mdcache->find_ino_peers(refpath.get_ino(), c);
+      inodeno_t ino = refpath.get_ino();
+      mdcache->find_ino_peers(ino, new C_MDS_TryFindInode(this, mdr, mdcache, ino));
     } else {
       dout(10) << "FAIL on error " << r << dendl;
       respond_to_request(mdr, r);
@@ -3468,12 +3679,18 @@ CInode* Server::rdlock_path_pin_ref(MDRequestRef& mdr,
 
 /** rdlock_path_xlock_dentry
  * traverse path to the directory that could/would contain dentry.
- * make sure i am auth for that dentry, forward as necessary.
- * create null dentry in place (or use existing if okexist).
+ * make sure i am auth for that dentry (or target inode if it exists and authexist),
+ * forward as necessary. create null dentry in place (or use existing if okexist).
  * get rdlocks on traversed dentries, xlock on new dentry.
+ *
+ * set authexist true if caller requires the target inode to be auth when it exists.
+ * the tail dentry is not always auth any more if authexist because it is impossible
+ * to ensure tail dentry and target inode are both auth in one mds. the tail dentry
+ * will not be xlocked too if authexist and the target inode exists.
  */
 CDentry* Server::rdlock_path_xlock_dentry(MDRequestRef& mdr,
-                                         bool create, bool okexist, bool want_layout)
+                                         bool create, bool okexist, bool authexist,
+                                         bool want_layout)
 {
   const filepath& refpath = mdr->get_filepath();
   dout(10) << "rdlock_path_xlock_dentry " << *mdr << " " << refpath << dendl;
@@ -3511,6 +3728,8 @@ CDentry* Server::rdlock_path_xlock_dentry(MDRequestRef& mdr,
     flags |= MDS_TRAVERSE_CHECK_LOCKCACHE;
   if (create)
     flags |= MDS_TRAVERSE_RDLOCK_AUTHLOCK;
+  if (authexist)
+    flags |= MDS_TRAVERSE_WANT_INODE;
   if (want_layout)
     flags |= MDS_TRAVERSE_WANT_DIRLAYOUT;
   int r = mdcache->path_traverse(mdr, cf, refpath, flags, &mdr->dn[0]);
@@ -3519,7 +3738,8 @@ CDentry* Server::rdlock_path_xlock_dentry(MDRequestRef& mdr,
   if (r < 0) {
     if (r == -CEPHFS_ESTALE) {
       dout(10) << "FAIL on CEPHFS_ESTALE but attempting recovery" << dendl;
-      mdcache->find_ino_peers(refpath.get_ino(), new C_MDS_TryFindInode(this, mdr));
+      inodeno_t ino = refpath.get_ino();
+      mdcache->find_ino_peers(ino, new C_MDS_TryFindInode(this, mdr, mdcache, ino));
       return nullptr;
     }
     respond_to_request(mdr, r);
@@ -3531,7 +3751,9 @@ CDentry* Server::rdlock_path_xlock_dentry(MDRequestRef& mdr,
   CInode *diri = dir->get_inode();
 
   if (!mdr->reqid.name.is_mds()) {
-    if (diri->is_system() && !diri->is_root()) {
+    if (diri->is_system() && !diri->is_root() &&
+       (!diri->is_lost_and_found() ||
+        mdr->client_request->get_op() != CEPH_MDS_OP_UNLINK)) {
       respond_to_request(mdr, -CEPHFS_EROFS);
       return nullptr;
     }
@@ -3604,7 +3826,8 @@ Server::rdlock_two_paths_xlock_destdn(MDRequestRef& mdr, bool xlock_srcdn)
   if (r != 0) {
     if (r == -CEPHFS_ESTALE) {
       dout(10) << "CEPHFS_ESTALE on path, attempting recovery" << dendl;
-      mdcache->find_ino_peers(refpath.get_ino(), new C_MDS_TryFindInode(this, mdr));
+      inodeno_t ino = refpath.get_ino();
+      mdcache->find_ino_peers(ino, new C_MDS_TryFindInode(this, mdr, mdcache, ino));
     } else if (r < 0) {
       respond_to_request(mdr, r);
     }
@@ -3616,7 +3839,8 @@ Server::rdlock_two_paths_xlock_destdn(MDRequestRef& mdr, bool xlock_srcdn)
   if (r != 0) {
     if (r == -CEPHFS_ESTALE) {
       dout(10) << "CEPHFS_ESTALE on path2, attempting recovery" << dendl;
-      mdcache->find_ino_peers(refpath2.get_ino(), new C_MDS_TryFindInode(this, mdr));
+      inodeno_t ino = refpath2.get_ino();
+      mdcache->find_ino_peers(ino, new C_MDS_TryFindInode(this, mdr, mdcache, ino));
     } else if (r < 0) {
       respond_to_request(mdr, r);
     }
@@ -3824,8 +4048,6 @@ void Server::handle_client_getattr(MDRequestRef& mdr, bool is_lookup)
   if (!ref)
     return;
 
-  mdr->getattr_caps = mask;
-
   /*
    * if client currently holds the EXCL cap on a field, do not rdlock
    * it; client's stat() will result in valid info if _either_ EXCL
@@ -3878,7 +4100,7 @@ void Server::handle_client_getattr(MDRequestRef& mdr, bool is_lookup)
   // value for them.  (currently this matters for xattrs and inline data)
   mdr->getattr_caps = mask;
 
-  mds->balancer->hit_inode(ref, META_POP_IRD, req->get_source().num());
+  mds->balancer->hit_inode(ref, META_POP_IRD);
 
   // reply
   dout(10) << "reply to stat on " << *req << dendl;
@@ -4277,8 +4499,7 @@ void Server::handle_client_open(MDRequestRef& mdr)
   if (cmode & CEPH_FILE_MODE_WR)
     mds->balancer->hit_inode(cur, META_POP_IWR);
   else
-    mds->balancer->hit_inode(cur, META_POP_IRD,
-                            mdr->client_request->get_source().num());
+    mds->balancer->hit_inode(cur, META_POP_IRD);
 
   CDentry *dn = 0;
   if (req->get_dentry_wanted()) {
@@ -4300,6 +4521,9 @@ public:
   void finish(int r) override {
     ceph_assert(r == 0);
 
+    // crash current MDS and the replacing MDS will test the journal
+    ceph_assert(!g_conf()->mds_kill_skip_replaying_inotable);
+
     dn->pop_projected_linkage();
 
     // dirty inode, dn, dir
@@ -4336,14 +4560,19 @@ void Server::handle_client_openc(MDRequestRef& mdr)
   }
 
   bool excl = req->head.args.open.flags & CEPH_O_EXCL;
-  CDentry *dn = rdlock_path_xlock_dentry(mdr, true, !excl, true);
+  CDentry *dn = rdlock_path_xlock_dentry(mdr, true, !excl, true, true);
   if (!dn)
     return;
 
+  if (is_unlink_pending(dn)) {
+    wait_for_pending_unlink(dn, mdr);
+    return;
+  }
+
   CDentry::linkage_t *dnl = dn->get_projected_linkage();
   if (!excl && !dnl->is_null()) {
     // it existed.
-    mds->locker->xlock_downgrade(&dn->lock, mdr.get());
+    ceph_assert(mdr.get()->is_rdlocked(&dn->lock));
 
     MutationImpl::LockOpVec lov;
     lov.add_rdlock(&dnl->get_inode()->snaplock);
@@ -4418,6 +4647,8 @@ void Server::handle_client_openc(MDRequestRef& mdr)
     return;
   if (!check_fragment_space(mdr, dir))
     return;
+  if (!check_dir_max_entries(mdr, dir))
+    return;
 
   if (mdr->dn[0].size() == 1)
     mds->locker->create_lock_cache(mdr, diri, &mdr->dir_layout);
@@ -4628,7 +4859,7 @@ void Server::handle_client_readdir(MDRequestRef& mdr)
   //  this isn't perfect, but we should capture the main variable/unbounded size items!
   int front_bytes = dirbl.length() + sizeof(__u32) + sizeof(__u8)*2;
   int bytes_left = max_bytes - front_bytes;
-  bytes_left -= realm->get_snap_trace().length();
+  bytes_left -= get_snap_trace(session, realm).length();
 
   // build dir contents
   bufferlist dnbl;
@@ -4648,8 +4879,11 @@ void Server::handle_client_readdir(MDRequestRef& mdr)
     bool dnp = dn->use_projected(client, mdr);
     CDentry::linkage_t *dnl = dnp ? dn->get_projected_linkage() : dn->get_linkage();
 
-    if (dnl->is_null())
+    if (dnl->is_null()) {
+      if (dn->get_num_ref() == 0 && !dn->is_projected())
+       dir->remove_dentry(dn);
       continue;
+    }
 
     if (dn->last < snapid || dn->first > snapid) {
       dout(20) << "skipping non-overlapping snap " << *dn << dendl;
@@ -4709,8 +4943,7 @@ void Server::handle_client_readdir(MDRequestRef& mdr)
     // dentry
     dout(12) << "including    dn " << *dn << dendl;
     encode(dn->get_name(), dnbl);
-    int lease_mask = dnl->is_primary() ? CEPH_LEASE_PRIMARY_LINK : 0;
-    mds->locker->issue_client_lease(dn, mdr, lease_mask, now, dnbl);
+    mds->locker->issue_client_lease(dn, in, mdr, now, dnbl);
 
     // inode
     dout(12) << "including inode " << *in << dendl;
@@ -4757,7 +4990,7 @@ void Server::handle_client_readdir(MDRequestRef& mdr)
   mdr->reply_extra_bl = dirbl;
 
   // bump popularity.  NOTE: this doesn't quite capture it.
-  mds->balancer->hit_dir(dir, META_POP_READDIR, -1, numfiles);
+  mds->balancer->hit_dir(dir, META_POP_READDIR, numfiles);
   
   // reply
   mdr->tracei = diri;
@@ -4986,10 +5219,24 @@ void Server::handle_client_setattr(MDRequestRef& mdr)
   __u32 mask = req->head.args.setattr.mask;
   __u32 access_mask = MAY_WRITE;
 
+  if (req->get_header().version < 6) {
+    // No changes to fscrypted inodes by downrevved clients
+    if (!cur->get_inode()->fscrypt_auth.empty()) {
+      respond_to_request(mdr, -CEPHFS_EPERM);
+      return;
+    }
+
+    // Only allow fscrypt field changes by capable clients
+    if (mask & (CEPH_SETATTR_FSCRYPT_FILE|CEPH_SETATTR_FSCRYPT_AUTH)) {
+      respond_to_request(mdr, -CEPHFS_EINVAL);
+      return;
+    }
+  }
+
   // xlock inode
-  if (mask & (CEPH_SETATTR_MODE|CEPH_SETATTR_UID|CEPH_SETATTR_GID|CEPH_SETATTR_BTIME|CEPH_SETATTR_KILL_SGUID))
+  if (mask & (CEPH_SETATTR_MODE|CEPH_SETATTR_UID|CEPH_SETATTR_GID|CEPH_SETATTR_BTIME|CEPH_SETATTR_KILL_SGUID|CEPH_SETATTR_FSCRYPT_AUTH|CEPH_SETATTR_KILL_SUID|CEPH_SETATTR_KILL_SGID))
     lov.add_xlock(&cur->authlock);
-  if (mask & (CEPH_SETATTR_MTIME|CEPH_SETATTR_ATIME|CEPH_SETATTR_SIZE))
+  if (mask & (CEPH_SETATTR_MTIME|CEPH_SETATTR_ATIME|CEPH_SETATTR_SIZE|CEPH_SETATTR_FSCRYPT_FILE))
     lov.add_xlock(&cur->filelock);
   if (mask & CEPH_SETATTR_CTIME)
     lov.add_wrlock(&cur->versionlock);
@@ -5020,7 +5267,15 @@ void Server::handle_client_setattr(MDRequestRef& mdr)
 
   bool truncating_smaller = false;
   if (mask & CEPH_SETATTR_SIZE) {
-    truncating_smaller = req->head.args.setattr.size < old_size;
+    if (req->get_data().length() >
+        sizeof(struct ceph_fscrypt_last_block_header) + fscrypt_last_block_max_size) {
+      dout(10) << __func__ << ": the last block size is too large" << dendl;
+      respond_to_request(mdr, -CEPHFS_EINVAL);
+      return;
+    }
+
+    truncating_smaller = req->head.args.setattr.size < old_size ||
+       (req->head.args.setattr.size == old_size && req->get_data().length());
     if (truncating_smaller && pip->is_truncating()) {
       dout(10) << " waiting for pending truncate from " << pip->truncate_from
               << " to " << pip->truncate_size << " to complete on " << *cur << dendl;
@@ -5029,6 +5284,32 @@ void Server::handle_client_setattr(MDRequestRef& mdr)
       cur->add_waiter(CInode::WAIT_TRUNC, new C_MDS_RetryRequest(mdcache, mdr));
       return;
     }
+
+    if (truncating_smaller && req->get_data().length()) {
+      struct ceph_fscrypt_last_block_header header;
+      memset(&header, 0, sizeof(header));
+      auto bl = req->get_data().cbegin();
+      DECODE_START(1, bl);
+      decode(header.change_attr, bl);
+      DECODE_FINISH(bl);
+
+      dout(20) << __func__ << " mdr->retry:" << mdr->retry
+               << " header.change_attr: " << header.change_attr
+               << " header.file_offset: " << header.file_offset
+               << " header.block_size: " << header.block_size
+               << dendl;
+
+      if (header.change_attr != pip->change_attr) {
+        dout(5) << __func__ << ": header.change_attr:" << header.change_attr
+                << " != current change_attr:" << pip->change_attr
+                << ", let client retry it!" << dendl;
+        // flush the journal to make sure the clients will get the lasted
+        // change_attr as possible for the next retry
+        mds->mdlog->flush();
+        respond_to_request(mdr, -CEPHFS_EAGAIN);
+        return;
+      }
+    }
   }
 
   bool changed_ranges = false;
@@ -5047,10 +5328,20 @@ void Server::handle_client_setattr(MDRequestRef& mdr)
 
   if (mask & CEPH_SETATTR_MODE)
     pi.inode->mode = (pi.inode->mode & ~07777) | (req->head.args.setattr.mode & 07777);
-  else if ((mask & (CEPH_SETATTR_UID|CEPH_SETATTR_GID|CEPH_SETATTR_KILL_SGUID)) &&
-           S_ISREG(pi.inode->mode) &&
-            (pi.inode->mode & (S_IXUSR|S_IXGRP|S_IXOTH))) {
-    pi.inode->mode &= ~(S_ISUID|S_ISGID);
+  else if ((mask & (CEPH_SETATTR_UID|CEPH_SETATTR_GID|CEPH_SETATTR_KILL_SGUID|
+                   CEPH_SETATTR_KILL_SUID|CEPH_SETATTR_KILL_SGID)) &&
+           S_ISREG(pi.inode->mode)) {
+    if (mask & (CEPH_SETATTR_UID|CEPH_SETATTR_GID|CEPH_SETATTR_KILL_SGUID) &&
+       (pi.inode->mode & (S_IXUSR|S_IXGRP|S_IXOTH))) {
+      pi.inode->mode &= ~(S_ISUID|S_ISGID);
+    } else {
+      if (mask & CEPH_SETATTR_KILL_SUID) {
+        pi.inode->mode &= ~S_ISUID;
+      }
+      if (mask & CEPH_SETATTR_KILL_SGID) {
+        pi.inode->mode &= ~S_ISGID;
+      }
+    }
   }
 
   if (mask & CEPH_SETATTR_MTIME)
@@ -5063,7 +5354,7 @@ void Server::handle_client_setattr(MDRequestRef& mdr)
     pi.inode->time_warp_seq++;   // maybe not a timewarp, but still a serialization point.
   if (mask & CEPH_SETATTR_SIZE) {
     if (truncating_smaller) {
-      pi.inode->truncate(old_size, req->head.args.setattr.size);
+      pi.inode->truncate(old_size, req->head.args.setattr.size, req->get_data());
       le->metablob.add_truncate_start(cur->ino());
     } else {
       pi.inode->size = req->head.args.setattr.size;
@@ -5079,6 +5370,11 @@ void Server::handle_client_setattr(MDRequestRef& mdr)
     }
   }
 
+  if (mask & CEPH_SETATTR_FSCRYPT_AUTH)
+    pi.inode->fscrypt_auth = req->fscrypt_auth;
+  if (mask & CEPH_SETATTR_FSCRYPT_FILE)
+    pi.inode->fscrypt_file = req->fscrypt_file;
+
   pi.inode->version = cur->pre_dirty();
   pi.inode->ctime = mdr->get_op_stamp();
   if (mdr->get_op_stamp() > pi.inode->rstat.rctime)
@@ -5378,11 +5674,85 @@ void Server::handle_client_setdirlayout(MDRequestRef& mdr)
 }
 
 // XATTRS
+int Server::parse_layout_vxattr_json(
+  string name, string value, const OSDMap& osdmap, file_layout_t *layout)
+{
+  auto parse_pool = [&](std::string pool_name, int64_t pool_id) -> int64_t {
+    if (pool_name != "") {
+      int64_t _pool_id = osdmap.lookup_pg_pool_name(pool_name);
+      if (_pool_id < 0) {
+       dout(10) << __func__ << ": unknown pool name:" << pool_name << dendl;
+       return -CEPHFS_EINVAL;
+      }
+      return _pool_id;
+    } else if (pool_id >= 0) {
+      const auto pools = osdmap.get_pools();
+      if (pools.find(pool_id) == pools.end()) {
+       dout(10) << __func__ << ": unknown pool id:" << pool_id << dendl;
+       return -CEPHFS_EINVAL;
+      }
+      return pool_id;
+    } else {
+      return -CEPHFS_EINVAL;
+    }
+  };
 
-int Server::parse_layout_vxattr(string name, string value, const OSDMap& osdmap,
-                               file_layout_t *layout, bool validate)
+  try {
+    if (name == "layout.json") {
+      JSONParser json_parser;
+      if (json_parser.parse(value.c_str(), value.length()) and json_parser.is_object()) {
+       std::string field;
+       try {
+         field = "object_size";
+         JSONDecoder::decode_json("object_size", layout->object_size, &json_parser, true);
+
+         field = "stripe_unit";
+         JSONDecoder::decode_json("stripe_unit", layout->stripe_unit, &json_parser, true);
+
+         field = "stripe_count";
+         JSONDecoder::decode_json("stripe_count", layout->stripe_count, &json_parser, true);
+
+         field = "pool_namespace";
+         JSONDecoder::decode_json("pool_namespace", layout->pool_ns, &json_parser, false);
+
+         field = "pool_id";
+         int64_t pool_id = 0;
+         JSONDecoder::decode_json("pool_id", pool_id, &json_parser, false);
+
+         field = "pool_name";
+         std::string pool_name;
+         JSONDecoder::decode_json("pool_name", pool_name, &json_parser, false);
+
+         pool_id = parse_pool(pool_name, pool_id);
+         if (pool_id < 0) {
+           return (int)pool_id;
+         }
+         layout->pool_id = pool_id;
+       } catch (JSONDecoder::err&) {
+         dout(10) << __func__ << ": json is missing a mandatory field named "
+                  << field << dendl;
+         return -CEPHFS_EINVAL;
+       }
+      } else {
+       dout(10) << __func__ << ": bad json" << dendl;
+       return -CEPHFS_EINVAL;
+      }
+    } else {
+      dout(10) << __func__ << ": unknown layout vxattr " << name << dendl;
+      return -CEPHFS_ENODATA; // no such attribute
+    }
+  } catch (boost::bad_lexical_cast const&) {
+    dout(10) << __func__ << ": bad vxattr value:" << value
+            << ", unable to parse for xattr:" << name << dendl;
+    return -CEPHFS_EINVAL;
+  }
+  return 0;
+}
+
+// parse old style layout string
+int Server::parse_layout_vxattr_string(
+  string name, string value, const OSDMap& osdmap, file_layout_t *layout)
 {
-  dout(20) << "parse_layout_vxattr name " << name << " value '" << value << "'" << dendl;
   try {
     if (name == "layout") {
       string::iterator begin = value.begin();
@@ -5393,14 +5763,14 @@ int Server::parse_layout_vxattr(string name, string value, const OSDMap& osdmap,
        return -CEPHFS_EINVAL;
       }
       string left(begin, end);
-      dout(10) << " parsed " << m << " left '" << left << "'" << dendl;
+      dout(10) << __func__ << ": parsed " << m << " left '" << left << "'" << dendl;
       if (begin != end)
        return -CEPHFS_EINVAL;
       for (map<string,string>::iterator q = m.begin(); q != m.end(); ++q) {
         // Skip validation on each attr, we do it once at the end (avoid
         // rejecting intermediate states if the overall result is ok)
-       int r = parse_layout_vxattr(string("layout.") + q->first, q->second,
-                                    osdmap, layout, false);
+       int r = parse_layout_vxattr_string(string("layout.") + q->first, q->second,
+                                          osdmap, layout);
        if (r < 0)
          return r;
       }
@@ -5416,28 +5786,54 @@ int Server::parse_layout_vxattr(string name, string value, const OSDMap& osdmap,
       } catch (boost::bad_lexical_cast const&) {
        int64_t pool = osdmap.lookup_pg_pool_name(value);
        if (pool < 0) {
-         dout(10) << " unknown pool " << value << dendl;
+         dout(10) << __func__ << ": unknown pool " << value << dendl;
          return -CEPHFS_ENOENT;
        }
        layout->pool_id = pool;
       }
+    } else if (name == "layout.pool_id") {
+      layout->pool_id = boost::lexical_cast<int64_t>(value);
+    } else if (name == "layout.pool_name") {
+      layout->pool_id = osdmap.lookup_pg_pool_name(value);
+      if (layout->pool_id < 0) {
+       dout(10) << __func__ << ": unknown pool " << value << dendl;
+       return -CEPHFS_EINVAL;
+      }
     } else if (name == "layout.pool_namespace") {
       layout->pool_ns = value;
     } else {
-      dout(10) << " unknown layout vxattr " << name << dendl;
-      return -CEPHFS_EINVAL;
+      dout(10) << __func__ << ": unknown layout vxattr " << name << dendl;
+      return -CEPHFS_ENODATA; // no such attribute
     }
   } catch (boost::bad_lexical_cast const&) {
-    dout(10) << "bad vxattr value, unable to parse int for " << name << dendl;
+    dout(10) << __func__ << ": bad vxattr value, unable to parse int for "
+            << name << dendl;
     return -CEPHFS_EINVAL;
   }
+  return 0;
+}
+
+int Server::parse_layout_vxattr(string name, string value, const OSDMap& osdmap,
+                               file_layout_t *layout, bool validate)
+{
+  dout(20) << __func__ << ": name:" << name << " value:'" << value << "'" << dendl;
+
+  int r;
+  if (name == "layout.json") {
+    r = parse_layout_vxattr_json(name, value, osdmap, layout);
+  } else {
+    r = parse_layout_vxattr_string(name, value, osdmap, layout);
+  }
+  if (r < 0) {
+    return r;
+  }
 
   if (validate && !layout->is_valid()) {
-    dout(10) << "bad layout" << dendl;
+    dout(10) << __func__ << ": bad layout" << dendl;
     return -CEPHFS_EINVAL;
   }
   if (!mds->mdsmap->is_data_pool(layout->pool_id)) {
-    dout(10) << " invalid data pool " << layout->pool_id << dendl;
+    dout(10) << __func__ << ": invalid data pool " << layout->pool_id << dendl;
     return -CEPHFS_EINVAL;
   }
   return 0;
@@ -5568,6 +5964,7 @@ int Server::check_layout_vxattr(MDRequestRef& mdr,
 void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur)
 {
   const cref_t<MClientRequest> &req = mdr->client_request;
+  MutationImpl::LockOpVec lov;
   string name(req->get_path2());
   bufferlist bl = req->get_data();
   string value (bl.c_str(), bl.length());
@@ -5593,6 +5990,18 @@ void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur)
     if (!xlock_policylock(mdr, cur, true))
       return;
 
+    /* We need 'As' caps for the fscrypt context */
+    lov.add_xlock(&cur->authlock);
+    if (!mds->locker->acquire_locks(mdr, lov)) {
+      return;
+    }
+
+    /* encrypted directories can't have their layout changed */
+    if (!cur->get_inode()->fscrypt_auth.empty()) {
+      respond_to_request(mdr, -CEPHFS_EINVAL);
+      return;
+    }
+
     file_layout_t layout;
     if (cur->get_projected_inode()->has_layout())
       layout = cur->get_projected_inode()->layout;
@@ -5624,11 +6033,16 @@ void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur)
     if (check_layout_vxattr(mdr, rest, value, &layout) < 0)
       return;
 
-    MutationImpl::LockOpVec lov;
     lov.add_xlock(&cur->filelock);
     if (!mds->locker->acquire_locks(mdr, lov))
       return;
 
+    /* encrypted files can't have their layout changed */
+    if (!cur->get_inode()->fscrypt_auth.empty()) {
+      respond_to_request(mdr, -CEPHFS_EINVAL);
+      return;
+    }
+
     auto pi = cur->project_inode(mdr);
     int64_t old_pool = pi.inode->layout.pool_id;
     pi.inode->add_old_pool(old_pool);
@@ -5649,7 +6063,7 @@ void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur)
       return;
     }
 
-    if (quota.is_enable() && !cur->get_projected_srnode())
+    if (quota.is_enabled() && !cur->get_projected_srnode())
       adjust_realm = true;
 
     if (!xlock_policylock(mdr, cur, false, adjust_realm))
@@ -5691,13 +6105,11 @@ void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur)
      */
     if (!mdr->more()->rdonly_checks) {
       if (!(mdr->locking_state & MutationImpl::ALL_LOCKED)) {
-        MutationImpl::LockOpVec lov;
         lov.add_rdlock(&cur->snaplock);
         if (!mds->locker->acquire_locks(mdr, lov))
           return;
         mdr->locking_state |= MutationImpl::ALL_LOCKED;
       }
-      SnapRealm *realm = cur->find_snaprealm();
       const auto srnode = cur->get_projected_srnode();
       if (val == (srnode && srnode->is_subvolume())) {
         dout(20) << "already marked subvolume" << dendl;
@@ -5753,6 +6165,10 @@ void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur)
     try {
       rank = boost::lexical_cast<mds_rank_t>(value);
       if (rank < 0) rank = MDS_RANK_NONE;
+      else if (rank >= MAX_MDS) {
+        respond_to_request(mdr, -CEPHFS_EDOM);
+        return;
+      }
     } catch (boost::bad_lexical_cast const&) {
       dout(10) << "bad vxattr value, unable to parse int for " << name << dendl;
       respond_to_request(mdr, -CEPHFS_EINVAL);
@@ -6154,8 +6570,6 @@ void Server::handle_client_setxattr(MDRequestRef& mdr)
   pi.inode->ctime = mdr->get_op_stamp();
   if (mdr->get_op_stamp() > pi.inode->rstat.rctime)
     pi.inode->rstat.rctime = mdr->get_op_stamp();
-  if (name == "encryption.ctx"sv)
-    pi.inode->fscrypt = true;
   pi.inode->change_attr++;
   pi.inode->xattr_version++;
 
@@ -6246,6 +6660,159 @@ void Server::handle_client_removexattr(MDRequestRef& mdr)
   journal_and_reply(mdr, cur, 0, le, new C_MDS_inode_update_finish(this, mdr, cur));
 }
 
+void Server::handle_client_getvxattr(MDRequestRef& mdr)
+{
+  const auto& req = mdr->client_request;
+  string xattr_name{req->get_path2()};
+
+  // is a ceph virtual xattr?
+  if (!is_ceph_vxattr(xattr_name)) {
+    respond_to_request(mdr, -CEPHFS_ENODATA);
+    return;
+  }
+
+  CInode *cur = rdlock_path_pin_ref(mdr, true, false);
+  if (!cur) {
+    return;
+  }
+
+  if (is_ceph_dir_vxattr(xattr_name)) {
+    if (!cur->is_dir()) {
+      respond_to_request(mdr, -CEPHFS_ENODATA);
+      return;
+    }
+  } else if (is_ceph_file_vxattr(xattr_name)) {
+    if (cur->is_dir()) {
+      respond_to_request(mdr, -CEPHFS_ENODATA);
+      return;
+    }
+  }
+
+  CachedStackStringStream css;
+  int r = 0;
+  ceph::bufferlist bl;
+  // handle these vxattrs
+  if ((xattr_name.substr(0, 15) == "ceph.dir.layout"sv) ||
+      (xattr_name.substr(0, 16) == "ceph.file.layout"sv)) {
+    std::string layout_field;
+
+    struct layout_xattr_info_t {
+      enum class InheritanceStatus : uint32_t {
+       DEFAULT = 0,
+       SET = 1,
+       INHERITED = 2
+      };
+
+      const file_layout_t     layout;
+      const InheritanceStatus status;
+
+      layout_xattr_info_t(const file_layout_t& l, InheritanceStatus inh)
+        : layout(l), status(inh) { }
+
+      static std::string status_to_string(InheritanceStatus status) {
+       switch (status) {
+         case InheritanceStatus::DEFAULT: return "default"s;
+         case InheritanceStatus::SET: return "set"s;
+         case InheritanceStatus::INHERITED: return "inherited"s;
+         default: return "unknown"s;
+       }
+      }
+    };
+
+    auto is_default_layout = [&](const file_layout_t& layout) -> bool {
+      return (layout == mdcache->default_file_layout);
+    };
+    auto get_inherited_layout = [&](CInode *cur) -> layout_xattr_info_t {
+      auto orig_in = cur;
+
+      while (cur) {
+        if (cur->get_projected_inode()->has_layout()) {
+         auto& curr_layout = cur->get_projected_inode()->layout;
+         if (is_default_layout(curr_layout)) {
+           return {curr_layout, layout_xattr_info_t::InheritanceStatus::DEFAULT};
+         }
+          if (cur == orig_in) {
+             // we've found a new layout at this inode
+             return {curr_layout, layout_xattr_info_t::InheritanceStatus::SET};
+          } else {
+             return {curr_layout, layout_xattr_info_t::InheritanceStatus::INHERITED};
+          }
+        }
+
+        if (cur->is_root()) {
+          break;
+       }
+
+        cur = cur->get_projected_parent_dir()->get_inode();
+      }
+      mds->clog->error() << "no layout found at root dir!";
+      ceph_abort("no layout found at root dir! something is really messed up with layouts!");
+    };
+
+    if (xattr_name == "ceph.dir.layout.json"sv ||
+       xattr_name == "ceph.file.layout.json"sv) {
+      // fetch layout only for valid xattr_name
+      const auto lxi = get_inherited_layout(cur);
+
+      *css << "{\"stripe_unit\": " << lxi.layout.stripe_unit
+          << ", \"stripe_count\": " << lxi.layout.stripe_count
+          << ", \"object_size\": " << lxi.layout.object_size
+          << ", \"pool_name\": ";
+      mds->objecter->with_osdmap([lxi, &css](const OSDMap& o) {
+         *css << "\"";
+          if (o.have_pg_pool(lxi.layout.pool_id)) {
+           *css << o.get_pool_name(lxi.layout.pool_id);
+         }
+         *css << "\"";
+       });
+      *css << ", \"pool_id\": " << (uint64_t)lxi.layout.pool_id;
+      *css << ", \"pool_namespace\": \"" << lxi.layout.pool_ns << "\"";
+      *css << ", \"inheritance\": \"@"
+          << layout_xattr_info_t::status_to_string(lxi.status) << "\"}";
+    } else if ((xattr_name == "ceph.dir.layout.pool_name"sv) ||
+              (xattr_name == "ceph.file.layout.pool_name"sv)) {
+      // fetch layout only for valid xattr_name
+      const auto lxi = get_inherited_layout(cur);
+      mds->objecter->with_osdmap([lxi, &css](const OSDMap& o) {
+         if (o.have_pg_pool(lxi.layout.pool_id)) {
+         *css << o.get_pool_name(lxi.layout.pool_id);
+         }
+         });
+    } else if ((xattr_name == "ceph.dir.layout.pool_id"sv) ||
+               (xattr_name == "ceph.file.layout.pool_id"sv)) {
+      // fetch layout only for valid xattr_name
+      const auto lxi = get_inherited_layout(cur);
+      *css << (uint64_t)lxi.layout.pool_id;
+    } else {
+      r = -CEPHFS_ENODATA; // no such attribute
+    }
+  } else if (xattr_name.substr(0, 12) == "ceph.dir.pin"sv) {
+    if (xattr_name == "ceph.dir.pin"sv) {
+      *css << cur->get_projected_inode()->export_pin;
+    } else if (xattr_name == "ceph.dir.pin.random"sv) {
+      *css << cur->get_projected_inode()->export_ephemeral_random_pin;
+    } else if (xattr_name == "ceph.dir.pin.distributed"sv) {
+      *css << cur->get_projected_inode()->export_ephemeral_distributed_pin;
+    } else {
+      // otherwise respond as invalid request
+      // since we only handle ceph vxattrs here
+      r = -CEPHFS_ENODATA; // no such attribute
+    }
+  } else {
+    // otherwise respond as invalid request
+    // since we only handle ceph vxattrs here
+    r = -CEPHFS_ENODATA; // no such attribute
+  }
+
+  if (r == 0) {
+    ENCODE_START(1, 1, bl);
+    encode(css->strv(), bl);
+    ENCODE_FINISH(bl);
+    mdr->reply_extra_bl = bl;
+  }
+
+  respond_to_request(mdr, r);
+}
 
 // =================================================================
 // DIRECTORY and NAMESPACE OPS
@@ -6253,6 +6820,84 @@ void Server::handle_client_removexattr(MDRequestRef& mdr)
 
 // ------------------------------------------------
 
+struct C_WaitUnlinkToFinish : public MDSContext {
+protected:
+  MDCache *mdcache;
+  CDentry *dn;
+  MDSContext *fin;
+
+  MDSRank *get_mds() override
+  {
+    ceph_assert(mdcache != NULL);
+    return mdcache->mds;
+  }
+
+public:
+  C_WaitUnlinkToFinish(MDCache *m, CDentry *d, MDSContext *f) :
+    mdcache(m), dn(d), fin(f) {}
+  void finish(int r) override {
+    fin->complete(r);
+    dn->put(CDentry::PIN_PURGING);
+  }
+};
+
+bool Server::is_unlink_pending(CDentry *dn)
+{
+  CDentry::linkage_t *dnl = dn->get_projected_linkage();
+  if (!dnl->is_null() && dn->state_test(CDentry::STATE_UNLINKING)) {
+      return true;
+  }
+  return false;
+}
+
+void Server::wait_for_pending_unlink(CDentry *dn, MDRequestRef& mdr)
+{
+  dout(20) << __func__ << " dn " << *dn << dendl;
+  mds->locker->drop_locks(mdr.get());
+  auto fin = new C_MDS_RetryRequest(mdcache, mdr);
+  dn->get(CDentry::PIN_PURGING);
+  dn->add_waiter(CDentry::WAIT_UNLINK_FINISH, new C_WaitUnlinkToFinish(mdcache, dn, fin));
+}
+
+struct C_WaitReintegrateToFinish : public MDSContext {
+protected:
+  MDCache *mdcache;
+  CDentry *dn;
+  MDSContext *fin;
+
+  MDSRank *get_mds() override
+  {
+    ceph_assert(mdcache != NULL);
+    return mdcache->mds;
+  }
+
+public:
+  C_WaitReintegrateToFinish(MDCache *m, CDentry *d, MDSContext *f) :
+    mdcache(m), dn(d), fin(f) {}
+  void finish(int r) override {
+    fin->complete(r);
+    dn->put(CDentry::PIN_PURGING);
+  }
+};
+
+bool Server::is_reintegrate_pending(CDentry *dn)
+{
+  CDentry::linkage_t *dnl = dn->get_projected_linkage();
+  if (!dnl->is_null() && dn->state_test(CDentry::STATE_REINTEGRATING)) {
+      return true;
+  }
+  return false;
+}
+
+void Server::wait_for_pending_reintegrate(CDentry *dn, MDRequestRef& mdr)
+{
+  dout(20) << __func__ << " dn " << *dn << dendl;
+  mds->locker->drop_locks(mdr.get());
+  auto fin = new C_MDS_RetryRequest(mdcache, mdr);
+  dn->get(CDentry::PIN_PURGING);
+  dn->add_waiter(CDentry::WAIT_REINTEGRATE_FINISH, new C_WaitReintegrateToFinish(mdcache, dn, fin));
+}
+
 // MKNOD
 
 class C_MDS_mknod_finish : public ServerLogContext {
@@ -6264,6 +6909,9 @@ public:
   void finish(int r) override {
     ceph_assert(r == 0);
 
+    // crash current MDS and the replacing MDS will test the journal
+    ceph_assert(!g_conf()->mds_kill_skip_replaying_inotable);
+
     // link the inode
     dn->pop_projected_linkage();
     
@@ -6312,15 +6960,22 @@ void Server::handle_client_mknod(MDRequestRef& mdr)
     mode |= S_IFREG;
 
   mdr->disable_lock_cache();
-  CDentry *dn = rdlock_path_xlock_dentry(mdr, true, false, S_ISREG(mode));
+  CDentry *dn = rdlock_path_xlock_dentry(mdr, true, false, false, S_ISREG(mode));
   if (!dn)
     return;
 
+  if (is_unlink_pending(dn)) {
+    wait_for_pending_unlink(dn, mdr);
+    return;
+  }
+
   CDir *dir = dn->get_dir();
   CInode *diri = dir->get_inode();
   if (!check_access(mdr, diri, MAY_WRITE))
     return;
-  if (!check_fragment_space(mdr, dn->get_dir()))
+  if (!check_fragment_space(mdr, dir))
+    return;
+  if (!check_dir_max_entries(mdr, dir))
     return;
 
   ceph_assert(dn->get_projected_linkage()->is_null());
@@ -6412,6 +7067,11 @@ void Server::handle_client_mkdir(MDRequestRef& mdr)
   if (!dn)
     return;
 
+  if (is_unlink_pending(dn)) {
+    wait_for_pending_unlink(dn, mdr);
+    return;
+  }
+
   CDir *dir = dn->get_dir();
   CInode *diri = dir->get_inode();
 
@@ -6421,6 +7081,8 @@ void Server::handle_client_mkdir(MDRequestRef& mdr)
 
   if (!check_fragment_space(mdr, dir))
     return;
+  if (!check_dir_max_entries(mdr, dir))
+    return;
 
   ceph_assert(dn->get_projected_linkage()->is_null());
   if (req->get_alternate_name().size() > alternate_name_max) {
@@ -6505,6 +7167,11 @@ void Server::handle_client_symlink(MDRequestRef& mdr)
   if (!dn)
     return;
 
+  if (is_unlink_pending(dn)) {
+    wait_for_pending_unlink(dn, mdr);
+    return;
+  }
+
   CDir *dir = dn->get_dir();
   CInode *diri = dir->get_inode();
 
@@ -6512,6 +7179,8 @@ void Server::handle_client_symlink(MDRequestRef& mdr)
     return;
   if (!check_fragment_space(mdr, dir))
     return;
+  if (!check_dir_max_entries(mdr, dir))
+    return;
 
   ceph_assert(dn->get_projected_linkage()->is_null());
   if (req->get_alternate_name().size() > alternate_name_max) {
@@ -6549,6 +7218,11 @@ void Server::handle_client_symlink(MDRequestRef& mdr)
 
   journal_and_reply(mdr, newi, dn, le, new C_MDS_mknod_finish(this, mdr, dn, newi));
   mds->balancer->maybe_fragment(dir, false);
+
+  // flush the journal as soon as possible
+  if (g_conf()->mds_kill_skip_replaying_inotable) {
+    mdlog->flush();
+  }
 }
 
 
@@ -6574,7 +7248,8 @@ void Server::handle_client_link(MDRequestRef& mdr)
     targeti = mdcache->get_inode(req->get_filepath2().get_ino());
     if (!targeti) {
       dout(10) << "CEPHFS_ESTALE on path2, attempting recovery" << dendl;
-      mdcache->find_ino_peers(req->get_filepath2().get_ino(), new C_MDS_TryFindInode(this, mdr));
+      inodeno_t ino = req->get_filepath2().get_ino();
+      mdcache->find_ino_peers(ino, new C_MDS_TryFindInode(this, mdr, mdcache, ino));
       return;
     }
     mdr->pin(targeti);
@@ -6608,6 +7283,11 @@ void Server::handle_client_link(MDRequestRef& mdr)
     targeti = ret.second->get_projected_linkage()->get_inode();
   }
 
+  if (is_unlink_pending(destdn)) {
+    wait_for_pending_unlink(destdn, mdr);
+    return;
+  }
+
   ceph_assert(destdn->get_projected_linkage()->is_null());
   if (req->get_alternate_name().size() > alternate_name_max) {
     dout(10) << " alternate_name longer than " << alternate_name_max << dendl;
@@ -6640,6 +7320,7 @@ void Server::handle_client_link(MDRequestRef& mdr)
   if (targeti->get_projected_inode()->nlink == 0) {
     dout(7) << "target has no link, failing..." << dendl;
     respond_to_request(mdr, -CEPHFS_ENOENT);
+    return;
   }
 
   if ((!mdr->has_more() || mdr->more()->witnessed.empty())) {
@@ -6651,6 +7332,9 @@ void Server::handle_client_link(MDRequestRef& mdr)
 
     if (!check_fragment_space(mdr, dir))
       return;
+
+    if (!check_dir_max_entries(mdr, dir))
+      return;
   }
 
   CInode* target_pin = targeti->get_projected_parent_dir()->inode;
@@ -6658,6 +7342,13 @@ void Server::handle_client_link(MDRequestRef& mdr)
   if (target_pin != dir->inode &&
       target_realm->get_subvolume_ino() !=
       dir->inode->find_snaprealm()->get_subvolume_ino()) {
+    if (target_pin->is_stray()) {
+      mds->locker->drop_locks(mdr.get());
+      targeti->add_waiter(CInode::WAIT_UNLINK,
+                          new C_MDS_RetryRequest(mdcache, mdr));
+      mdlog->flush();
+      return;
+    }
     dout(7) << "target is in different subvolume, failing..." << dendl;
     respond_to_request(mdr, -CEPHFS_EXDEV);
     return;
@@ -6888,11 +7579,17 @@ void Server::_link_remote_finish(MDRequestRef& mdr, bool inc,
   mdr->apply();
 
   MDRequestRef null_ref;
-  if (inc)
+  if (inc) {
     mdcache->send_dentry_link(dn, null_ref);
-  else
+  } else {
+    dn->state_clear(CDentry::STATE_UNLINKING);
     mdcache->send_dentry_unlink(dn, NULL, null_ref);
-  
+
+    MDSContext::vec finished;
+    dn->take_waiting(CDentry::WAIT_UNLINK_FINISH, finished);
+    mdcache->mds->queue_waiters(finished);
+  }
+
   // bump target popularity
   mds->balancer->hit_inode(targeti, META_POP_IWR);
   mds->balancer->hit_dir(dn->get_dir(), META_POP_IWR);
@@ -7266,10 +7963,25 @@ void Server::handle_client_unlink(MDRequestRef& mdr)
 
   if (rmdir)
     mdr->disable_lock_cache();
+
   CDentry *dn = rdlock_path_xlock_dentry(mdr, false, true);
   if (!dn)
     return;
 
+  if (is_reintegrate_pending(dn)) {
+    wait_for_pending_reintegrate(dn, mdr);
+    return;
+  }
+
+  // notify replica MDSes the dentry is under unlink
+  if (!dn->state_test(CDentry::STATE_UNLINKING)) {
+    dn->state_set(CDentry::STATE_UNLINKING);
+    mdcache->send_dentry_unlink(dn, nullptr, mdr, true);
+    if (dn->replica_unlinking_ref) {
+      return;
+    }
+  }
+
   CDentry::linkage_t *dnl = dn->get_linkage(client, mdr);
   ceph_assert(!dnl->is_null());
   CInode *in = dnl->get_inode();
@@ -7286,11 +7998,13 @@ void Server::handle_client_unlink(MDRequestRef& mdr)
     if (rmdir) {
       // do empty directory checks
       if (_dir_is_nonempty_unlocked(mdr, in)) {
-       respond_to_request(mdr, -CEPHFS_ENOTEMPTY);
+        dn->state_clear(CDentry::STATE_UNLINKING);
+        respond_to_request(mdr, -CEPHFS_ENOTEMPTY);
        return;
       }
     } else {
       dout(7) << "handle_client_unlink on dir " << *in << ", returning error" << dendl;
+      dn->state_clear(CDentry::STATE_UNLINKING);
       respond_to_request(mdr, -CEPHFS_EISDIR);
       return;
     }
@@ -7298,6 +8012,7 @@ void Server::handle_client_unlink(MDRequestRef& mdr)
     if (rmdir) {
       // unlink
       dout(7) << "handle_client_rmdir on non-dir " << *in << ", returning error" << dendl;
+      dn->state_clear(CDentry::STATE_UNLINKING);
       respond_to_request(mdr, -CEPHFS_ENOTDIR);
       return;
     }
@@ -7305,8 +8020,10 @@ void Server::handle_client_unlink(MDRequestRef& mdr)
 
   CInode *diri = dn->get_dir()->get_inode();
   if ((!mdr->has_more() || mdr->more()->witnessed.empty())) {
-    if (!check_access(mdr, diri, MAY_WRITE))
+    if (!check_access(mdr, diri, MAY_WRITE)) {
+      dn->state_clear(CDentry::STATE_UNLINKING);
       return;
+    }
   }
 
   // -- create stray dentry? --
@@ -7345,6 +8062,7 @@ void Server::handle_client_unlink(MDRequestRef& mdr)
   if (in->is_dir() &&
       _dir_is_nonempty(mdr, in)) {
     respond_to_request(mdr, -CEPHFS_ENOTEMPTY);
+    dn->state_clear(CDentry::STATE_UNLINKING);
     return;
   }
 
@@ -7544,9 +8262,14 @@ void Server::_unlink_local_finish(MDRequestRef& mdr,
   }
 
   mdr->apply();
-  
+
+  dn->state_clear(CDentry::STATE_UNLINKING);
   mdcache->send_dentry_unlink(dn, straydn, mdr);
-  
+
+  MDSContext::vec finished;
+  dn->take_waiting(CDentry::WAIT_UNLINK_FINISH, finished);
+  mdcache->mds->queue_waiters(finished);
+
   if (straydn) {
     // update subtree map?
     if (strayin->is_dir())
@@ -7561,7 +8284,7 @@ void Server::_unlink_local_finish(MDRequestRef& mdr,
 
   // reply
   respond_to_request(mdr, 0);
-  
+
   // removing a new dn?
   dn->get_dir()->try_remove_unlinked_dn(dn);
 
@@ -8020,6 +8743,16 @@ void Server::handle_client_rename(MDRequestRef& mdr)
   if (!destdn)
     return;
 
+  if (is_unlink_pending(destdn)) {
+    wait_for_pending_unlink(destdn, mdr);
+    return;
+  }
+
+  if (is_unlink_pending(srcdn)) {
+    wait_for_pending_unlink(srcdn, mdr);
+    return;
+  }
+
   dout(10) << " destdn " << *destdn << dendl;
   CDir *destdir = destdn->get_dir();
   ceph_assert(destdir->is_auth());
@@ -8187,7 +8920,10 @@ void Server::handle_client_rename(MDRequestRef& mdr)
     if (!check_access(mdr, destdn->get_dir()->get_inode(), MAY_WRITE))
       return;
 
-    if (!check_fragment_space(mdr, destdn->get_dir()))
+    if (!linkmerge && !check_fragment_space(mdr, destdn->get_dir()))
+      return;
+
+    if (!linkmerge && !check_dir_max_entries(mdr, destdn->get_dir()))
       return;
 
     if (!check_access(mdr, srci, MAY_WRITE))
@@ -8433,6 +9169,12 @@ void Server::handle_client_rename(MDRequestRef& mdr)
   C_MDS_rename_finish *fin = new C_MDS_rename_finish(this, mdr, srcdn, destdn, straydn);
 
   journal_and_reply(mdr, srci, destdn, le, fin);
+
+  // trigger to flush mdlog in case reintegrating or migrating the stray dn,
+  // because the link requests maybe waiting.
+  if (srcdn->get_dir()->inode->is_stray()) {
+    mdlog->flush();
+  }
   mds->balancer->maybe_fragment(destdn->get_dir(), false);
 }
 
@@ -8891,6 +9633,16 @@ void Server::_rename_prepare(MDRequestRef& mdr,
       mdcache->journal_cow_dentry(mdr.get(), metablob, destdn, CEPH_NOSNAP, 0, destdnl);
 
     destdn->first = mdcache->get_global_snaprealm()->get_newest_seq() + 1;
+    {
+      auto do_corruption = inject_rename_corrupt_dentry_first;
+      if (unlikely(do_corruption > 0.0)) {
+        auto r = ceph::util::generate_random_number(0.0, 1.0);
+        if (r < do_corruption) {
+          dout(0) << "corrupting dn: " << *destdn << dendl;
+          destdn->first = -10;
+        }
+      }
+    }
 
     if (destdn->is_auth())
       metablob->add_primary_dentry(destdn, srci, true, true);
@@ -9033,6 +9785,14 @@ void Server::_rename_apply(MDRequestRef& mdr, CDentry *srcdn, CDentry *destdn, C
 
   srcdn->get_dir()->unlink_inode(srcdn);
 
+  // After the stray dn being unlinked from the corresponding inode in case of
+  // reintegrate_stray/migrate_stray, just wake up the waitiers.
+  MDSContext::vec finished;
+  in->take_waiting(CInode::WAIT_UNLINK, finished);
+  if (!finished.empty()) {
+    mds->queue_waiters(finished);
+  }
+
   // dest
   if (srcdn_was_remote) {
     if (!linkmerge) {
@@ -10320,6 +11080,7 @@ void Server::handle_client_mksnap(MDRequestRef& mdr)
     return;
   }
   if (snapname.length() == 0 ||
+      snapname.length() > snapshot_name_max ||
       snapname[0] == '_') {
     respond_to_request(mdr, -CEPHFS_EINVAL);
     return;
@@ -10378,6 +11139,8 @@ void Server::handle_client_mksnap(MDRequestRef& mdr)
     em.first->second = info;
   newsnap.seq = snapid;
   newsnap.last_created = snapid;
+  newsnap.last_modified = info.stamp;
+  newsnap.change_attr++;
 
   // journal the inode changes
   mdr->ls = mdlog->get_current_segment();
@@ -10468,7 +11231,6 @@ void Server::handle_client_rmsnap(MDRequestRef& mdr)
   }
   snapid_t snapid = diri->snaprealm->resolve_snapname(snapname, diri->ino());
   dout(10) << " snapname " << snapname << " is " << snapid << dendl;
-
   if (!(mdr->locking_state & MutationImpl::ALL_LOCKED)) {
     MutationImpl::LockOpVec lov;
     lov.add_xlock(&diri->snaplock);
@@ -10516,6 +11278,8 @@ void Server::handle_client_rmsnap(MDRequestRef& mdr)
   newnode.snaps.erase(snapid);
   newnode.seq = seq;
   newnode.last_destroyed = seq;
+  newnode.last_modified = mdr->get_op_stamp();
+  newnode.change_attr++;
 
   le->metablob.add_client_req(req->get_reqid(), req->get_oldest_client_tid());
   le->metablob.add_table_transaction(TABLE_SNAP, stid);
@@ -10531,9 +11295,6 @@ void Server::_rmsnap_finish(MDRequestRef& mdr, CInode *diri, snapid_t snapid)
 {
   dout(10) << "_rmsnap_finish " << *mdr << " " << snapid << dendl;
   snapid_t stid = mdr->more()->stid;
-  auto p = mdr->more()->snapidbl.cbegin();
-  snapid_t seq;
-  decode(seq, p);  
 
   mdr->apply();
 
@@ -10548,6 +11309,8 @@ void Server::_rmsnap_finish(MDRequestRef& mdr, CInode *diri, snapid_t snapid)
 
   // yay
   mdr->in[0] = diri;
+  mdr->tracei = diri;
+  mdr->snapid = snapid;
   respond_to_request(mdr, 0);
 
   // purge snapshot data
@@ -10653,6 +11416,8 @@ void Server::handle_client_renamesnap(MDRequestRef& mdr)
   auto it = newsnap.snaps.find(snapid);
   ceph_assert(it != newsnap.snaps.end());
   it->second.name = dstname;
+  newsnap.last_modified = mdr->get_op_stamp();
+  newsnap.change_attr++;
 
   // journal the inode changes
   mdr->ls = mdlog->get_current_segment();
@@ -10707,3 +11472,18 @@ void Server::dump_reconnect_status(Formatter *f) const
   f->dump_stream("client_reconnect_gather") << client_reconnect_gather;
   f->close_section();
 }
+
+const bufferlist& Server::get_snap_trace(Session *session, SnapRealm *realm) const {
+  ceph_assert(session);
+  ceph_assert(realm);
+  if (session->info.has_feature(CEPHFS_FEATURE_NEW_SNAPREALM_INFO)) {
+    return realm->get_snap_trace_new();
+  } else {
+    return realm->get_snap_trace();
+  }
+}
+
+const bufferlist& Server::get_snap_trace(client_t client, SnapRealm *realm) const {
+  Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(client.v));
+  return get_snap_trace(session, realm);
+}