]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mds/Server.cc
update ceph source to reef 18.2.0
[ceph.git] / ceph / src / mds / Server.cc
index e6f7e864fb41075870512325cda987ce7201e0c6..bf12cb7e2c8fa9f24be9f6eae1b330aa829351cb 100644 (file)
@@ -31,6 +31,7 @@
 #include "Mutation.h"
 #include "MetricsHandler.h"
 #include "cephfs_features.h"
+#include "MDSContext.h"
 
 #include "msg/Messenger.h"
 
@@ -50,6 +51,7 @@
 #include "common/perf_counters.h"
 #include "include/compat.h"
 #include "osd/OSDMap.h"
+#include "fscrypt.h"
 
 #include <errno.h>
 
@@ -60,6 +62,8 @@
 
 #include "common/config.h"
 
+#include "msg/Message.h"
+
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_mds
 #undef dout_prefix
@@ -252,6 +256,7 @@ void Server::create_logger()
 Server::Server(MDSRank *m, MetricsHandler *metrics_handler) :
   mds(m), 
   mdcache(mds->mdcache), mdlog(mds->mdlog),
+  inject_rename_corrupt_dentry_first(g_conf().get_val<double>("mds_inject_rename_corrupt_dentry_first")),
   recall_throttle(g_conf().get_val<double>("mds_recall_max_decay_rate")),
   metrics_handler(metrics_handler)
 {
@@ -267,6 +272,7 @@ Server::Server(MDSRank *m, MetricsHandler *metrics_handler) :
   dir_max_entries = g_conf().get_val<uint64_t>("mds_dir_max_entries");
   bal_fragment_size_max = g_conf().get_val<int64_t>("mds_bal_fragment_size_max");
   supported_features = feature_bitset_t(CEPHFS_FEATURES_MDS_SUPPORTED);
+  supported_metric_spec = feature_bitset_t(CEPHFS_METRIC_FEATURES_ALL);
 }
 
 void Server::dispatch(const cref_t<Message> &m)
@@ -353,6 +359,9 @@ void Server::dispatch(const cref_t<Message> &m)
   case CEPH_MSG_CLIENT_REQUEST:
     handle_client_request(ref_cast<MClientRequest>(m));
     return;
+  case CEPH_MSG_CLIENT_REPLY:
+    handle_client_reply(ref_cast<MClientReply>(m));
+    return;
   case CEPH_MSG_CLIENT_RECLAIM:
     handle_client_reclaim(ref_cast<MClientReclaim>(m));
     return;
@@ -360,8 +369,8 @@ void Server::dispatch(const cref_t<Message> &m)
     handle_peer_request(ref_cast<MMDSPeerRequest>(m));
     return;
   default:
-    derr << "server unknown message " << m->get_type() << dendl;
-    ceph_abort_msg("server unknown message");  
+    derr << "Server unknown message " << m->get_type() << " from peer type " << m->get_connection()->get_peer_type() << dendl;
+    ceph_abort_msg("server unknown message  " + to_string(m->get_type()) + " from peer type " + to_string(m->get_connection()->get_peer_type()));  
   }
 }
 
@@ -437,7 +446,7 @@ void Server::reclaim_session(Session *session, const cref_t<MClientReclaim> &m)
   unsigned flags = m->get_flags();
   if (flags != CEPH_RECLAIM_RESET) { // currently only support reset
     dout(10) << __func__ << " unsupported flags" << dendl;
-    reply->set_result(-CEPHFS_EOPNOTSUPP);
+    reply->set_result(-CEPHFS_EINVAL);
     mds->send_message_client(reply, session);
     return;
   }
@@ -459,10 +468,7 @@ void Server::reclaim_session(Session *session, const cref_t<MClientReclaim> &m)
 
   if (flags & CEPH_RECLAIM_RESET) {
     finish_reclaim_session(session, reply);
-    return;
-  }
-
-  ceph_abort();
+  } else ceph_assert(0); /* no other flags are handled at this time */
 }
 
 void Server::finish_reclaim_session(Session *session, const ref_t<MClientReclaimReply> &reply)
@@ -506,8 +512,9 @@ void Server::finish_reclaim_session(Session *session, const ref_t<MClientReclaim
 void Server::handle_client_reclaim(const cref_t<MClientReclaim> &m)
 {
   Session *session = mds->get_session(m);
+  uint32_t flags = m->get_flags();
   dout(3) << __func__ <<  " " << *m << " from " << m->get_source() << dendl;
-  ceph_assert(m->get_source().is_client()); // should _not_ come from an mds!
+  ceph_assert(m->is_a_client()); // should _not_ come from an mds!
 
   if (!session) {
     dout(0) << " ignoring sessionless msg " << *m << dendl;
@@ -525,7 +532,15 @@ void Server::handle_client_reclaim(const cref_t<MClientReclaim> &m)
     return;
   }
 
-  if (m->get_flags() & MClientReclaim::FLAG_FINISH) {
+  if (flags & MClientReclaim::FLAG_FINISH) {
+    if (flags ^ MClientReclaim::FLAG_FINISH) {
+      dout(0) << __func__ << " client specified FLAG_FINISH with other flags."
+                             " Other flags:" << flags << dendl;
+      auto reply = make_message<MClientReclaimReply>(0);
+      reply->set_result(-CEPHFS_EINVAL);
+      mds->send_message_client(reply, session);
+      return;
+    }
     finish_reclaim_session(session);
   } else {
     reclaim_session(session, m);
@@ -538,7 +553,7 @@ void Server::handle_client_session(const cref_t<MClientSession> &m)
   Session *session = mds->get_session(m);
 
   dout(3) << "handle_client_session " << *m << " from " << m->get_source() << dendl;
-  ceph_assert(m->get_source().is_client()); // should _not_ come from an mds!
+  ceph_assert(m->is_a_client()); // should _not_ come from an mds!
 
   if (!session) {
     dout(0) << " ignoring sessionless msg " << *m << dendl;
@@ -579,12 +594,38 @@ void Server::handle_client_session(const cref_t<MClientSession> &m)
   uint64_t sseq = 0;
   switch (m->get_op()) {
   case CEPH_SESSION_REQUEST_OPEN:
+    if(mds->mdsmap->test_flag(CEPH_MDSMAP_REFUSE_CLIENT_SESSION)) {
+      dout(0) << "new sessions are not permitted, enable again via"
+                 "`ceph fs set <fs_name> refuse_client_session false`" << dendl;
+      auto reply = make_message<MClientSession>(CEPH_SESSION_REJECT);
+      reply->metadata["error_string"] = "new sessions are not permitted,"
+                                        " enable again via `ceph fs set"
+                                        " <fs_name> refuse_client_session false`";
+      mds->send_message(reply, m->get_connection());
+      return;
+    }
     if (session->is_opening() ||
        session->is_open() ||
        session->is_stale() ||
        session->is_killing() ||
        terminating_sessions) {
-      dout(10) << "currently open|opening|stale|killing, dropping this req" << dendl;
+      if (m->supported_features.test(CEPHFS_FEATURE_NOTIFY_SESSION_STATE)) {
+       if (session->is_open() && !mds->is_stopping()) {
+          dout(10) << "currently already opened" << dendl;
+
+          auto reply = make_message<MClientSession>(CEPH_SESSION_OPEN,
+                                                    session->get_push_seq());
+          if (session->info.has_feature(CEPHFS_FEATURE_MIMIC))
+            reply->supported_features = supported_features;
+          mds->send_message_client(reply, session);
+          if (mdcache->is_readonly()) {
+            auto m = make_message<MClientSession>(CEPH_SESSION_FORCE_RO);
+            mds->send_message_client(m, session);
+          }
+       }
+      }
+      dout(10) << "currently " << session->get_state_name()
+               << ", dropping this req" << dendl;
       return;
     }
     ceph_assert(session->is_closed() || session->is_closing());
@@ -781,7 +822,11 @@ void Server::handle_client_session(const cref_t<MClientSession> &m)
     break;
 
   default:
-    ceph_abort();
+    auto m = make_message<MClientSession>(CEPH_SESSION_REJECT);
+    mds->send_message_client(m, session);
+    derr << "Server received unknown message " << m->get_type() << ", closing session and blocklisting the client " << session->get_client() << dendl;
+    CachedStackStringStream css;
+    mds->evict_client(session->get_client().v, false, true, *css, nullptr);
   }
 }
 
@@ -858,8 +903,10 @@ void Server::_session_logged(Session *session, uint64_t state_seq, bool open, ve
     metrics_handler->add_session(session);
     ceph_assert(session->get_connection());
     auto reply = make_message<MClientSession>(CEPH_SESSION_OPEN);
-    if (session->info.has_feature(CEPHFS_FEATURE_MIMIC))
+    if (session->info.has_feature(CEPHFS_FEATURE_MIMIC)) {
       reply->supported_features = supported_features;
+      reply->metric_spec = supported_metric_spec;
+    }
     mds->send_message_client(reply, session);
     if (mdcache->is_readonly()) {
       auto m = make_message<MClientSession>(CEPH_SESSION_FORCE_RO);
@@ -1012,8 +1059,10 @@ void Server::finish_force_open_sessions(const map<client_t,pair<Session*,uint64_
         metrics_handler->add_session(session);
 
        auto reply = make_message<MClientSession>(CEPH_SESSION_OPEN);
-       if (session->info.has_feature(CEPHFS_FEATURE_MIMIC))
+       if (session->info.has_feature(CEPHFS_FEATURE_MIMIC)) {
          reply->supported_features = supported_features;
+          reply->metric_spec = supported_metric_spec;
+       }
        mds->send_message_client(reply, session);
 
        if (mdcache->is_readonly())
@@ -1155,7 +1204,7 @@ void Server::find_idle_sessions()
       if (mds->locker->revoke_stale_caps(session)) {
        mds->locker->remove_stale_leases(session);
        finish_flush_session(session, session->get_push_seq());
-       auto m = make_message<MClientSession>(CEPH_SESSION_STALE, session->get_push_seq());
+       auto m = make_message<MClientSession>(CEPH_SESSION_STALE);
        mds->send_message_client(m, session);
       } else {
        to_evict.push_back(session);
@@ -1261,6 +1310,9 @@ void Server::handle_conf_change(const std::set<std::string>& changed) {
   if (changed.count("mds_alternate_name_max")) {
     alternate_name_max  = g_conf().get_val<Option::size_t>("mds_alternate_name_max");
   }
+  if (changed.count("mds_fscrypt_last_block_max_size")) {
+    fscrypt_last_block_max_size = g_conf().get_val<Option::size_t>("mds_fscrypt_last_block_max_size");
+  }
   if (changed.count("mds_dir_max_entries")) {
     dir_max_entries = g_conf().get_val<uint64_t>("mds_dir_max_entries");
     dout(20) << __func__ << " max entries per directory changed to "
@@ -1271,6 +1323,9 @@ void Server::handle_conf_change(const std::set<std::string>& changed) {
     dout(20) << __func__ << " max fragment size changed to "
             << bal_fragment_size_max << dendl;
   }
+  if (changed.count("mds_inject_rename_corrupt_dentry_first")) {
+    inject_rename_corrupt_dentry_first = g_conf().get_val<double>("mds_inject_rename_corrupt_dentry_first");
+  }
 }
 
 /*
@@ -1302,38 +1357,23 @@ void Server::kill_session(Session *session, Context *on_safe)
   }
 }
 
-size_t Server::apply_blocklist(const std::set<entity_addr_t> &blocklist)
+size_t Server::apply_blocklist()
 {
-  bool prenautilus = mds->objecter->with_osdmap(
-      [&](const OSDMap& o) {
-       return o.require_osd_release < ceph_release_t::nautilus;
-      });
-
   std::vector<Session*> victims;
   const auto& sessions = mds->sessionmap.get_sessions();
-  for (const auto& p : sessions) {
-    if (!p.first.is_client()) {
-      // Do not apply OSDMap blocklist to MDS daemons, we find out
-      // about their death via MDSMap.
-      continue;
-    }
-
-    Session *s = p.second;
-    auto inst_addr = s->info.inst.addr;
-    // blocklist entries are always TYPE_ANY for nautilus+
-    inst_addr.set_type(entity_addr_t::TYPE_ANY);
-    if (blocklist.count(inst_addr)) {
-      victims.push_back(s);
-      continue;
-    }
-    if (prenautilus) {
-      // ...except pre-nautilus, they were TYPE_LEGACY
-      inst_addr.set_type(entity_addr_t::TYPE_LEGACY);
-      if (blocklist.count(inst_addr)) {
-       victims.push_back(s);
+  mds->objecter->with_osdmap(
+    [&](const OSDMap& o) {
+      for (const auto& p : sessions) {
+       if (!p.first.is_client()) {
+         // Do not apply OSDMap blocklist to MDS daemons, we find out
+         // about their death via MDSMap.
+         continue;
+       }
+       if (o.is_blocklisted(p.second->info.inst.addr)) {
+         victims.push_back(p.second);
+       }
       }
-    }
-  }
+    });
 
   for (const auto& s : victims) {
     kill_session(s, nullptr);
@@ -1424,6 +1464,18 @@ void Server::handle_client_reconnect(const cref_t<MClientReconnect> &m)
     return;
   }
 
+  if(mds->mdsmap->test_flag(CEPH_MDSMAP_REFUSE_CLIENT_SESSION)) {
+    mds->clog->warn() << "client could not reconnect as" 
+                         " file system flag refuse_client_session is set";
+    dout(0) << "client cannot reconnect when file system flag"
+               " refuse_client_session is set" << dendl;
+    auto reply = make_message<MClientSession>(CEPH_SESSION_CLOSE);
+    reply->metadata["error_string"] = "client cannot reconnect when file system flag" 
+                                        " refuse_client_session is set";
+    mds->send_message(reply, m->get_connection());
+    return;
+  }
+
   if (!session->is_open()) {
     dout(0) << " ignoring msg from not-open session" << *m << dendl;
     auto reply = make_message<MClientSession>(CEPH_SESSION_CLOSE);
@@ -1495,8 +1547,10 @@ void Server::handle_client_reconnect(const cref_t<MClientReconnect> &m)
     metrics_handler->add_session(session);
     // notify client of success with an OPEN
     auto reply = make_message<MClientSession>(CEPH_SESSION_OPEN);
-    if (session->info.has_feature(CEPHFS_FEATURE_MIMIC))
+    if (session->info.has_feature(CEPHFS_FEATURE_MIMIC)) {
       reply->supported_features = supported_features;
+      reply->metric_spec = supported_metric_spec;
+    }
     mds->send_message_client(reply, session);
     mds->clog->debug() << "reconnect by " << session->info.inst << " after " << delay;
   }
@@ -1928,20 +1982,23 @@ void Server::journal_and_reply(MDRequestRef& mdr, CInode *in, CDentry *dn, LogEv
     mdr->pin(dn);
 
   early_reply(mdr, in, dn);
-  
+
   mdr->committing = true;
   submit_mdlog_entry(le, fin, mdr, __func__);
-  
+
   if (mdr->client_request && mdr->client_request->is_queued_for_replay()) {
     if (mds->queue_one_replay()) {
       dout(10) << " queued next replay op" << dendl;
     } else {
       dout(10) << " journaled last replay op" << dendl;
     }
-  } else if (mdr->did_early_reply)
+  } else if (mdr->did_early_reply) {
     mds->locker->drop_rdlocks_for_early_reply(mdr.get());
-  else
+    if (dn && dn->is_waiter_for(CDentry::WAIT_UNLINK_FINISH))
+      mdlog->flush();
+  } else {
     mdlog->flush();
+  }
 }
 
 void Server::submit_mdlog_entry(LogEvent *le, MDSLogContextBase *fin, MDRequestRef& mdr,
@@ -2137,6 +2194,9 @@ void Server::early_reply(MDRequestRef& mdr, CInode *tracei, CDentry *tracedn)
   mds->logger->inc(l_mds_reply);
   utime_t lat = ceph_clock_now() - req->get_recv_stamp();
   mds->logger->tinc(l_mds_reply_latency, lat);
+  if (lat >= g_conf()->mds_op_complaint_time) {
+    mds->logger->inc(l_mds_slow_reply);
+  }
   if (client_inst.name.is_client()) {
     mds->sessionmap.hit_session(mdr->session);
   }
@@ -2195,6 +2255,9 @@ void Server::reply_client_request(MDRequestRef& mdr, const ref_t<MClientReply> &
     mds->logger->inc(l_mds_reply);
     utime_t lat = ceph_clock_now() - mdr->client_request->get_recv_stamp();
     mds->logger->tinc(l_mds_reply_latency, lat);
+    if (lat >= g_conf()->mds_op_complaint_time) {
+      mds->logger->inc(l_mds_slow_reply);
+    }
     if (session && client_inst.name.is_client()) {
       mds->sessionmap.hit_session(session);
     }
@@ -2232,6 +2295,10 @@ void Server::reply_client_request(MDRequestRef& mdr, const ref_t<MClientReply> &
     mds->send_message_client(reply, session);
   }
 
+  if (client_inst.name.is_mds() && reply->get_op() == CEPH_MDS_OP_RENAME) {
+    mds->send_message(reply, mdr->client_request->get_connection());
+  }
+
   if (req->is_queued_for_replay() &&
       (mdr->has_completed || reply->get_result() < 0)) {
     if (reply->get_result() < 0) {
@@ -2287,7 +2354,7 @@ void Server::set_trace_dist(const ref_t<MClientReply> &reply,
       realm = in->find_snaprealm();
     else
       realm = dn->get_dir()->get_inode()->find_snaprealm();
-    reply->snapbl = realm->get_snap_trace();
+    reply->snapbl = get_snap_trace(session, realm);
     dout(10) << "set_trace_dist snaprealm " << *realm << " len=" << reply->snapbl.length() << dendl;
   }
 
@@ -2314,20 +2381,7 @@ void Server::set_trace_dist(const ref_t<MClientReply> &reply,
     dout(20) << "set_trace_dist added dir  " << *dir << dendl;
 
     encode(dn->get_name(), bl);
-
-    int lease_mask = 0;
-    CDentry::linkage_t *dnl = dn->get_linkage(mdr->get_client(), mdr);
-    if (dnl->is_primary()) {
-      ceph_assert(dnl->get_inode() == in);
-      lease_mask = CEPH_LEASE_PRIMARY_LINK;
-    } else {
-      if (dnl->is_remote())
-       ceph_assert(dnl->get_remote_ino() == in->ino());
-      else
-       ceph_assert(!in);
-    }
-    mds->locker->issue_client_lease(dn, mdr, lease_mask, now, bl);
-    dout(20) << "set_trace_dist added dn   " << snapid << " " << *dn << dendl;
+    mds->locker->issue_client_lease(dn, in, mdr, now, bl);
   } else
     reply->head.is_dentry = 0;
 
@@ -2360,7 +2414,7 @@ void Server::handle_client_request(const cref_t<MClientRequest> &req)
   bool sessionclosed_isok = replay_unsafe_with_closed_session;
   // active session?
   Session *session = 0;
-  if (req->get_source().is_client()) {
+  if (req->is_a_client()) {
     session = mds->get_session(req);
     if (!session) {
       dout(5) << "no session for " << req->get_source() << ", dropping" << dendl;
@@ -2464,7 +2518,7 @@ void Server::handle_client_request(const cref_t<MClientRequest> &req)
 
   // process embedded cap releases?
   //  (only if NOT replay!)
-  if (!req->releases.empty() && req->get_source().is_client() && !req->is_replay()) {
+  if (!req->releases.empty() && req->is_a_client() && !req->is_replay()) {
     client_t client = req->get_source().num();
     for (const auto &r : req->releases) {
       mds->locker->process_request_cap_release(mdr, client, r.item, r.dname);
@@ -2476,6 +2530,38 @@ void Server::handle_client_request(const cref_t<MClientRequest> &req)
   return;
 }
 
+void Server::handle_client_reply(const cref_t<MClientReply> &reply)
+{
+  dout(4) << "handle_client_reply " << *reply << dendl;
+
+  ceph_assert(reply->is_safe());
+  ceph_tid_t tid = reply->get_tid();
+
+  if (mds->internal_client_requests.count(tid) == 0) {
+    dout(1) << " no pending request on tid " << tid << dendl;
+    return;
+  }
+
+  auto &req = mds->internal_client_requests.at(tid);
+  CDentry *dn = req.get_dentry();
+
+  switch (reply->get_op()) {
+  case CEPH_MDS_OP_RENAME:
+    if (dn) {
+      dn->state_clear(CDentry::STATE_REINTEGRATING);
+
+      MDSContext::vec finished;
+      dn->take_waiting(CDentry::WAIT_REINTEGRATE_FINISH, finished);
+      mds->queue_waiters(finished);
+    }
+    break;
+  default:
+    dout(5) << " unknown client op " << reply->get_op() << dendl;
+  }
+
+  mds->internal_client_requests.erase(tid);
+}
+
 void Server::handle_osd_map()
 {
   /* Note that we check the OSDMAP_FULL flag directly rather than
@@ -2698,7 +2784,7 @@ void Server::handle_peer_request(const cref_t<MMDSPeerRequest> &m)
 
   CDentry *straydn = NULL;
   if (m->straybl.length() > 0) {
-    mdcache->decode_replica_stray(straydn, m->straybl, from);
+    mdcache->decode_replica_stray(straydn, nullptr, m->straybl, from);
     ceph_assert(straydn);
     m->straybl.clear();
   }
@@ -2875,7 +2961,7 @@ void Server::handle_peer_request_reply(const cref_t<MMDSPeerRequest> &m)
     break;
 
   default:
-    ceph_abort();
+    ceph_abort_msg("unknown op " + to_string(m->get_op()) + " requested");
   }
 }
 
@@ -2902,7 +2988,7 @@ void Server::dispatch_peer_request(MDRequestRef& mdr)
 
       if (!lock) {
        dout(10) << "don't have object, dropping" << dendl;
-       ceph_abort(); // can this happen, if we auth pinned properly.
+       ceph_abort_msg("don't have object"); // can this happen, if we auth pinned properly.
       }
       if (op == MMDSPeerRequest::OP_XLOCK && !lock->get_parent()->is_auth()) {
        dout(10) << "not auth for remote xlock attempt, dropping on " 
@@ -2989,7 +3075,7 @@ void Server::dispatch_peer_request(MDRequestRef& mdr)
     break;
 
   default: 
-    ceph_abort();
+    ceph_abort_msg("unknown op "+ to_string(op)+ " received");
   }
 }
 
@@ -3304,17 +3390,36 @@ CInode* Server::prepare_new_inode(MDRequestRef& mdr, CDir *dir, inodeno_t useino
   // while session is opening.
   bool allow_prealloc_inos = mdr->session->is_open();
 
+  inodeno_t _useino = useino;
+
   // assign ino
-  if (allow_prealloc_inos && (mdr->used_prealloc_ino = _inode->ino = mdr->session->take_ino(useino))) {
-    mds->sessionmap.mark_projected(mdr->session);
-    dout(10) << "prepare_new_inode used_prealloc " << mdr->used_prealloc_ino
-            << " (" << mdr->session->info.prealloc_inos.size() << " left)"
-            << dendl;
-  } else {
-    mdr->alloc_ino = 
-      _inode->ino = mds->inotable->project_alloc_id(useino);
-    dout(10) << "prepare_new_inode alloc " << mdr->alloc_ino << dendl;
-  }
+  do {
+    if (allow_prealloc_inos && (mdr->used_prealloc_ino = _inode->ino = mdr->session->take_ino(_useino))) {
+      if (mdcache->test_and_clear_taken_inos(_inode->ino)) {
+        _inode->ino = 0;
+        dout(10) << "prepare_new_inode used_prealloc " << mdr->used_prealloc_ino
+                 << " (" << mdr->session->info.prealloc_inos.size() << " left)"
+                << " but has been taken, will try again!" << dendl;
+      } else {
+        mds->sessionmap.mark_projected(mdr->session);
+        dout(10) << "prepare_new_inode used_prealloc " << mdr->used_prealloc_ino
+                 << " (" << mdr->session->info.prealloc_inos.size() << " left)"
+                 << dendl;
+      }
+    } else {
+      mdr->alloc_ino =
+       _inode->ino = mds->inotable->project_alloc_id(_useino);
+      if (mdcache->test_and_clear_taken_inos(_inode->ino)) {
+        mds->inotable->apply_alloc_id(_inode->ino);
+        _inode->ino = 0;
+        dout(10) << "prepare_new_inode alloc " << mdr->alloc_ino
+                << " but has been taken, will try again!" << dendl;
+      } else {
+        dout(10) << "prepare_new_inode alloc " << mdr->alloc_ino << dendl;
+      }
+    }
+    _useino = 0;
+  } while (!_inode->ino);
 
   if (useino && useino != _inode->ino) {
     dout(0) << "WARNING: client specified " << useino << " and i allocated " << _inode->ino << dendl;
@@ -3323,7 +3428,7 @@ CInode* Server::prepare_new_inode(MDRequestRef& mdr, CDir *dir, inodeno_t useino
        << " but mds." << mds->get_nodeid() << " allocated " << _inode->ino;
     //ceph_abort(); // just for now.
   }
-    
+
   if (allow_prealloc_inos &&
       mdr->session->get_num_projected_prealloc_inos() < g_conf()->mds_client_prealloc_inos / 2) {
     int need = g_conf()->mds_client_prealloc_inos - mdr->session->get_num_projected_prealloc_inos();
@@ -3354,18 +3459,20 @@ CInode* Server::prepare_new_inode(MDRequestRef& mdr, CDir *dir, inodeno_t useino
   _inode->truncate_seq = 1; /* starting with 1, 0 is kept for no-truncation logic */
 
   CInode *diri = dir->get_inode();
+  auto pip = diri->get_projected_inode();
 
-  dout(10) << oct << " dir mode 0" << diri->get_inode()->mode << " new mode 0" << mode << dec << dendl;
+  dout(10) << oct << " dir mode 0" << pip->mode << " new mode 0" << mode << dec << dendl;
 
-  if (diri->get_inode()->mode & S_ISGID) {
+  if (pip->mode & S_ISGID) {
     dout(10) << " dir is sticky" << dendl;
-    _inode->gid = diri->get_inode()->gid;
+    _inode->gid = pip->gid;
     if (S_ISDIR(mode)) {
-      dout(10) << " new dir also sticky" << dendl;      
+      dout(10) << " new dir also sticky" << dendl;
       _inode->mode |= S_ISGID;
     }
-  } else 
+  } else {
     _inode->gid = mdr->client_request->get_caller_gid();
+  }
 
   _inode->uid = mdr->client_request->get_caller_uid();
 
@@ -3375,6 +3482,11 @@ CInode* Server::prepare_new_inode(MDRequestRef& mdr, CDir *dir, inodeno_t useino
   _inode->change_attr = 0;
 
   const cref_t<MClientRequest> &req = mdr->client_request;
+
+  dout(10) << "copying fscrypt_auth len " << req->fscrypt_auth.size() << dendl;
+  _inode->fscrypt_auth = req->fscrypt_auth;
+  _inode->fscrypt_file = req->fscrypt_file;
+
   if (req->get_data().length()) {
     auto p = req->get_data().cbegin();
 
@@ -3382,9 +3494,6 @@ CInode* Server::prepare_new_inode(MDRequestRef& mdr, CDir *dir, inodeno_t useino
     auto _xattrs = CInode::allocate_xattr_map();
     decode_noshare(*_xattrs, p);
     dout(10) << "prepare_new_inode setting xattrs " << *_xattrs << dendl;
-    if (_xattrs->count("encryption.ctx")) {
-      _inode->fscrypt = true;
-    }
     in->reset_xattrs(std::move(_xattrs));
   }
 
@@ -3434,15 +3543,68 @@ void Server::apply_allocated_inos(MDRequestRef& mdr, Session *session)
   }
 }
 
+struct C_MDS_TryOpenInode : public ServerContext {
+  MDRequestRef mdr;
+  inodeno_t ino;
+  C_MDS_TryOpenInode(Server *s, MDRequestRef& r, inodeno_t i) :
+    ServerContext(s), mdr(r), ino(i) {}
+  void finish(int r) override {
+    server->_try_open_ino(mdr, r, ino);
+  }
+};
+
+void Server::_try_open_ino(MDRequestRef& mdr, int r, inodeno_t ino)
+{
+  dout(10) << "_try_open_ino " << mdr.get() << " ino " << ino << " r=" << r << dendl;
+
+  // `r` is a rank if >=0, else an error code
+  if (r >= 0) {
+    mds_rank_t dest_rank(r);
+    if (dest_rank == mds->get_nodeid())
+      dispatch_client_request(mdr);
+    else
+      mdcache->request_forward(mdr, dest_rank);
+    return;
+  }
+
+  // give up
+  if (r == -CEPHFS_ENOENT || r == -CEPHFS_ENODATA)
+    r = -CEPHFS_ESTALE;
+  respond_to_request(mdr, r);
+}
+
 class C_MDS_TryFindInode : public ServerContext {
   MDRequestRef mdr;
+  MDCache *mdcache;
+  inodeno_t ino;
 public:
-  C_MDS_TryFindInode(Server *s, MDRequestRef& r) : ServerContext(s), mdr(r) {}
+  C_MDS_TryFindInode(Server *s, MDRequestRef& r, MDCache *m, inodeno_t i) :
+    ServerContext(s), mdr(r), mdcache(m), ino(i) {}
   void finish(int r) override {
-    if (r == -CEPHFS_ESTALE) // :( find_ino_peers failed
-      server->respond_to_request(mdr, r);
-    else
+    if (r == -CEPHFS_ESTALE) { // :( find_ino_peers failed
+      /*
+       * There has one case that when the MDS crashes and the
+       * openfiletable journal couldn't be flushed and then
+       * the replacing MDS is possibly won't load some already
+       * opened CInodes into the MDCache. And if the clients
+       * will retry some requests after reconnected, the MDS
+       * will return -ESTALE after failing to find the ino in
+       * all active peers.
+       *
+       * As a workaround users can run `ls -R ${mountpoint}`
+       * to list all the sub-files or sub-direcotries from the
+       * mountpoint.
+       *
+       * We need try to open the ino and try it again.
+       */
+      CInode *in = mdcache->get_inode(ino);
+      if (in && in->state_test(CInode::STATE_PURGING))
+        server->respond_to_request(mdr, r);
+      else
+        mdcache->open_ino(ino, (int64_t)-1, new C_MDS_TryOpenInode(server, mdr, ino));
+    } else {
       server->dispatch_client_request(mdr);
+    }
   }
 };
 
@@ -3482,8 +3644,8 @@ CInode* Server::rdlock_path_pin_ref(MDRequestRef& mdr,
       respond_to_request(mdr, r);
     } else if (r == -CEPHFS_ESTALE) {
       dout(10) << "FAIL on CEPHFS_ESTALE but attempting recovery" << dendl;
-      MDSContext *c = new C_MDS_TryFindInode(this, mdr);
-      mdcache->find_ino_peers(refpath.get_ino(), c);
+      inodeno_t ino = refpath.get_ino();
+      mdcache->find_ino_peers(ino, new C_MDS_TryFindInode(this, mdr, mdcache, ino));
     } else {
       dout(10) << "FAIL on error " << r << dendl;
       respond_to_request(mdr, r);
@@ -3517,12 +3679,18 @@ CInode* Server::rdlock_path_pin_ref(MDRequestRef& mdr,
 
 /** rdlock_path_xlock_dentry
  * traverse path to the directory that could/would contain dentry.
- * make sure i am auth for that dentry, forward as necessary.
- * create null dentry in place (or use existing if okexist).
+ * make sure i am auth for that dentry (or target inode if it exists and authexist),
+ * forward as necessary. create null dentry in place (or use existing if okexist).
  * get rdlocks on traversed dentries, xlock on new dentry.
+ *
+ * set authexist true if caller requires the target inode to be auth when it exists.
+ * the tail dentry is not always auth any more if authexist because it is impossible
+ * to ensure tail dentry and target inode are both auth in one mds. the tail dentry
+ * will not be xlocked too if authexist and the target inode exists.
  */
 CDentry* Server::rdlock_path_xlock_dentry(MDRequestRef& mdr,
-                                         bool create, bool okexist, bool want_layout)
+                                         bool create, bool okexist, bool authexist,
+                                         bool want_layout)
 {
   const filepath& refpath = mdr->get_filepath();
   dout(10) << "rdlock_path_xlock_dentry " << *mdr << " " << refpath << dendl;
@@ -3560,6 +3728,8 @@ CDentry* Server::rdlock_path_xlock_dentry(MDRequestRef& mdr,
     flags |= MDS_TRAVERSE_CHECK_LOCKCACHE;
   if (create)
     flags |= MDS_TRAVERSE_RDLOCK_AUTHLOCK;
+  if (authexist)
+    flags |= MDS_TRAVERSE_WANT_INODE;
   if (want_layout)
     flags |= MDS_TRAVERSE_WANT_DIRLAYOUT;
   int r = mdcache->path_traverse(mdr, cf, refpath, flags, &mdr->dn[0]);
@@ -3568,7 +3738,8 @@ CDentry* Server::rdlock_path_xlock_dentry(MDRequestRef& mdr,
   if (r < 0) {
     if (r == -CEPHFS_ESTALE) {
       dout(10) << "FAIL on CEPHFS_ESTALE but attempting recovery" << dendl;
-      mdcache->find_ino_peers(refpath.get_ino(), new C_MDS_TryFindInode(this, mdr));
+      inodeno_t ino = refpath.get_ino();
+      mdcache->find_ino_peers(ino, new C_MDS_TryFindInode(this, mdr, mdcache, ino));
       return nullptr;
     }
     respond_to_request(mdr, r);
@@ -3580,7 +3751,9 @@ CDentry* Server::rdlock_path_xlock_dentry(MDRequestRef& mdr,
   CInode *diri = dir->get_inode();
 
   if (!mdr->reqid.name.is_mds()) {
-    if (diri->is_system() && !diri->is_root()) {
+    if (diri->is_system() && !diri->is_root() &&
+       (!diri->is_lost_and_found() ||
+        mdr->client_request->get_op() != CEPH_MDS_OP_UNLINK)) {
       respond_to_request(mdr, -CEPHFS_EROFS);
       return nullptr;
     }
@@ -3653,7 +3826,8 @@ Server::rdlock_two_paths_xlock_destdn(MDRequestRef& mdr, bool xlock_srcdn)
   if (r != 0) {
     if (r == -CEPHFS_ESTALE) {
       dout(10) << "CEPHFS_ESTALE on path, attempting recovery" << dendl;
-      mdcache->find_ino_peers(refpath.get_ino(), new C_MDS_TryFindInode(this, mdr));
+      inodeno_t ino = refpath.get_ino();
+      mdcache->find_ino_peers(ino, new C_MDS_TryFindInode(this, mdr, mdcache, ino));
     } else if (r < 0) {
       respond_to_request(mdr, r);
     }
@@ -3665,7 +3839,8 @@ Server::rdlock_two_paths_xlock_destdn(MDRequestRef& mdr, bool xlock_srcdn)
   if (r != 0) {
     if (r == -CEPHFS_ESTALE) {
       dout(10) << "CEPHFS_ESTALE on path2, attempting recovery" << dendl;
-      mdcache->find_ino_peers(refpath2.get_ino(), new C_MDS_TryFindInode(this, mdr));
+      inodeno_t ino = refpath2.get_ino();
+      mdcache->find_ino_peers(ino, new C_MDS_TryFindInode(this, mdr, mdcache, ino));
     } else if (r < 0) {
       respond_to_request(mdr, r);
     }
@@ -3873,8 +4048,6 @@ void Server::handle_client_getattr(MDRequestRef& mdr, bool is_lookup)
   if (!ref)
     return;
 
-  mdr->getattr_caps = mask;
-
   /*
    * if client currently holds the EXCL cap on a field, do not rdlock
    * it; client's stat() will result in valid info if _either_ EXCL
@@ -3927,7 +4100,7 @@ void Server::handle_client_getattr(MDRequestRef& mdr, bool is_lookup)
   // value for them.  (currently this matters for xattrs and inline data)
   mdr->getattr_caps = mask;
 
-  mds->balancer->hit_inode(ref, META_POP_IRD, req->get_source().num());
+  mds->balancer->hit_inode(ref, META_POP_IRD);
 
   // reply
   dout(10) << "reply to stat on " << *req << dendl;
@@ -4326,8 +4499,7 @@ void Server::handle_client_open(MDRequestRef& mdr)
   if (cmode & CEPH_FILE_MODE_WR)
     mds->balancer->hit_inode(cur, META_POP_IWR);
   else
-    mds->balancer->hit_inode(cur, META_POP_IRD,
-                            mdr->client_request->get_source().num());
+    mds->balancer->hit_inode(cur, META_POP_IRD);
 
   CDentry *dn = 0;
   if (req->get_dentry_wanted()) {
@@ -4349,6 +4521,9 @@ public:
   void finish(int r) override {
     ceph_assert(r == 0);
 
+    // crash current MDS and the replacing MDS will test the journal
+    ceph_assert(!g_conf()->mds_kill_skip_replaying_inotable);
+
     dn->pop_projected_linkage();
 
     // dirty inode, dn, dir
@@ -4385,14 +4560,19 @@ void Server::handle_client_openc(MDRequestRef& mdr)
   }
 
   bool excl = req->head.args.open.flags & CEPH_O_EXCL;
-  CDentry *dn = rdlock_path_xlock_dentry(mdr, true, !excl, true);
+  CDentry *dn = rdlock_path_xlock_dentry(mdr, true, !excl, true, true);
   if (!dn)
     return;
 
+  if (is_unlink_pending(dn)) {
+    wait_for_pending_unlink(dn, mdr);
+    return;
+  }
+
   CDentry::linkage_t *dnl = dn->get_projected_linkage();
   if (!excl && !dnl->is_null()) {
     // it existed.
-    mds->locker->xlock_downgrade(&dn->lock, mdr.get());
+    ceph_assert(mdr.get()->is_rdlocked(&dn->lock));
 
     MutationImpl::LockOpVec lov;
     lov.add_rdlock(&dnl->get_inode()->snaplock);
@@ -4679,7 +4859,7 @@ void Server::handle_client_readdir(MDRequestRef& mdr)
   //  this isn't perfect, but we should capture the main variable/unbounded size items!
   int front_bytes = dirbl.length() + sizeof(__u32) + sizeof(__u8)*2;
   int bytes_left = max_bytes - front_bytes;
-  bytes_left -= realm->get_snap_trace().length();
+  bytes_left -= get_snap_trace(session, realm).length();
 
   // build dir contents
   bufferlist dnbl;
@@ -4699,8 +4879,11 @@ void Server::handle_client_readdir(MDRequestRef& mdr)
     bool dnp = dn->use_projected(client, mdr);
     CDentry::linkage_t *dnl = dnp ? dn->get_projected_linkage() : dn->get_linkage();
 
-    if (dnl->is_null())
+    if (dnl->is_null()) {
+      if (dn->get_num_ref() == 0 && !dn->is_projected())
+       dir->remove_dentry(dn);
       continue;
+    }
 
     if (dn->last < snapid || dn->first > snapid) {
       dout(20) << "skipping non-overlapping snap " << *dn << dendl;
@@ -4760,8 +4943,7 @@ void Server::handle_client_readdir(MDRequestRef& mdr)
     // dentry
     dout(12) << "including    dn " << *dn << dendl;
     encode(dn->get_name(), dnbl);
-    int lease_mask = dnl->is_primary() ? CEPH_LEASE_PRIMARY_LINK : 0;
-    mds->locker->issue_client_lease(dn, mdr, lease_mask, now, dnbl);
+    mds->locker->issue_client_lease(dn, in, mdr, now, dnbl);
 
     // inode
     dout(12) << "including inode " << *in << dendl;
@@ -4808,7 +4990,7 @@ void Server::handle_client_readdir(MDRequestRef& mdr)
   mdr->reply_extra_bl = dirbl;
 
   // bump popularity.  NOTE: this doesn't quite capture it.
-  mds->balancer->hit_dir(dir, META_POP_READDIR, -1, numfiles);
+  mds->balancer->hit_dir(dir, META_POP_READDIR, numfiles);
   
   // reply
   mdr->tracei = diri;
@@ -5037,10 +5219,24 @@ void Server::handle_client_setattr(MDRequestRef& mdr)
   __u32 mask = req->head.args.setattr.mask;
   __u32 access_mask = MAY_WRITE;
 
+  if (req->get_header().version < 6) {
+    // No changes to fscrypted inodes by downrevved clients
+    if (!cur->get_inode()->fscrypt_auth.empty()) {
+      respond_to_request(mdr, -CEPHFS_EPERM);
+      return;
+    }
+
+    // Only allow fscrypt field changes by capable clients
+    if (mask & (CEPH_SETATTR_FSCRYPT_FILE|CEPH_SETATTR_FSCRYPT_AUTH)) {
+      respond_to_request(mdr, -CEPHFS_EINVAL);
+      return;
+    }
+  }
+
   // xlock inode
-  if (mask & (CEPH_SETATTR_MODE|CEPH_SETATTR_UID|CEPH_SETATTR_GID|CEPH_SETATTR_BTIME|CEPH_SETATTR_KILL_SGUID))
+  if (mask & (CEPH_SETATTR_MODE|CEPH_SETATTR_UID|CEPH_SETATTR_GID|CEPH_SETATTR_BTIME|CEPH_SETATTR_KILL_SGUID|CEPH_SETATTR_FSCRYPT_AUTH|CEPH_SETATTR_KILL_SUID|CEPH_SETATTR_KILL_SGID))
     lov.add_xlock(&cur->authlock);
-  if (mask & (CEPH_SETATTR_MTIME|CEPH_SETATTR_ATIME|CEPH_SETATTR_SIZE))
+  if (mask & (CEPH_SETATTR_MTIME|CEPH_SETATTR_ATIME|CEPH_SETATTR_SIZE|CEPH_SETATTR_FSCRYPT_FILE))
     lov.add_xlock(&cur->filelock);
   if (mask & CEPH_SETATTR_CTIME)
     lov.add_wrlock(&cur->versionlock);
@@ -5071,7 +5267,15 @@ void Server::handle_client_setattr(MDRequestRef& mdr)
 
   bool truncating_smaller = false;
   if (mask & CEPH_SETATTR_SIZE) {
-    truncating_smaller = req->head.args.setattr.size < old_size;
+    if (req->get_data().length() >
+        sizeof(struct ceph_fscrypt_last_block_header) + fscrypt_last_block_max_size) {
+      dout(10) << __func__ << ": the last block size is too large" << dendl;
+      respond_to_request(mdr, -CEPHFS_EINVAL);
+      return;
+    }
+
+    truncating_smaller = req->head.args.setattr.size < old_size ||
+       (req->head.args.setattr.size == old_size && req->get_data().length());
     if (truncating_smaller && pip->is_truncating()) {
       dout(10) << " waiting for pending truncate from " << pip->truncate_from
               << " to " << pip->truncate_size << " to complete on " << *cur << dendl;
@@ -5080,6 +5284,32 @@ void Server::handle_client_setattr(MDRequestRef& mdr)
       cur->add_waiter(CInode::WAIT_TRUNC, new C_MDS_RetryRequest(mdcache, mdr));
       return;
     }
+
+    if (truncating_smaller && req->get_data().length()) {
+      struct ceph_fscrypt_last_block_header header;
+      memset(&header, 0, sizeof(header));
+      auto bl = req->get_data().cbegin();
+      DECODE_START(1, bl);
+      decode(header.change_attr, bl);
+      DECODE_FINISH(bl);
+
+      dout(20) << __func__ << " mdr->retry:" << mdr->retry
+               << " header.change_attr: " << header.change_attr
+               << " header.file_offset: " << header.file_offset
+               << " header.block_size: " << header.block_size
+               << dendl;
+
+      if (header.change_attr != pip->change_attr) {
+        dout(5) << __func__ << ": header.change_attr:" << header.change_attr
+                << " != current change_attr:" << pip->change_attr
+                << ", let client retry it!" << dendl;
+        // flush the journal to make sure the clients will get the lasted
+        // change_attr as possible for the next retry
+        mds->mdlog->flush();
+        respond_to_request(mdr, -CEPHFS_EAGAIN);
+        return;
+      }
+    }
   }
 
   bool changed_ranges = false;
@@ -5098,10 +5328,20 @@ void Server::handle_client_setattr(MDRequestRef& mdr)
 
   if (mask & CEPH_SETATTR_MODE)
     pi.inode->mode = (pi.inode->mode & ~07777) | (req->head.args.setattr.mode & 07777);
-  else if ((mask & (CEPH_SETATTR_UID|CEPH_SETATTR_GID|CEPH_SETATTR_KILL_SGUID)) &&
-           S_ISREG(pi.inode->mode) &&
-            (pi.inode->mode & (S_IXUSR|S_IXGRP|S_IXOTH))) {
-    pi.inode->mode &= ~(S_ISUID|S_ISGID);
+  else if ((mask & (CEPH_SETATTR_UID|CEPH_SETATTR_GID|CEPH_SETATTR_KILL_SGUID|
+                   CEPH_SETATTR_KILL_SUID|CEPH_SETATTR_KILL_SGID)) &&
+           S_ISREG(pi.inode->mode)) {
+    if (mask & (CEPH_SETATTR_UID|CEPH_SETATTR_GID|CEPH_SETATTR_KILL_SGUID) &&
+       (pi.inode->mode & (S_IXUSR|S_IXGRP|S_IXOTH))) {
+      pi.inode->mode &= ~(S_ISUID|S_ISGID);
+    } else {
+      if (mask & CEPH_SETATTR_KILL_SUID) {
+        pi.inode->mode &= ~S_ISUID;
+      }
+      if (mask & CEPH_SETATTR_KILL_SGID) {
+        pi.inode->mode &= ~S_ISGID;
+      }
+    }
   }
 
   if (mask & CEPH_SETATTR_MTIME)
@@ -5114,7 +5354,7 @@ void Server::handle_client_setattr(MDRequestRef& mdr)
     pi.inode->time_warp_seq++;   // maybe not a timewarp, but still a serialization point.
   if (mask & CEPH_SETATTR_SIZE) {
     if (truncating_smaller) {
-      pi.inode->truncate(old_size, req->head.args.setattr.size);
+      pi.inode->truncate(old_size, req->head.args.setattr.size, req->get_data());
       le->metablob.add_truncate_start(cur->ino());
     } else {
       pi.inode->size = req->head.args.setattr.size;
@@ -5130,6 +5370,11 @@ void Server::handle_client_setattr(MDRequestRef& mdr)
     }
   }
 
+  if (mask & CEPH_SETATTR_FSCRYPT_AUTH)
+    pi.inode->fscrypt_auth = req->fscrypt_auth;
+  if (mask & CEPH_SETATTR_FSCRYPT_FILE)
+    pi.inode->fscrypt_file = req->fscrypt_file;
+
   pi.inode->version = cur->pre_dirty();
   pi.inode->ctime = mdr->get_op_stamp();
   if (mdr->get_op_stamp() > pi.inode->rstat.rctime)
@@ -5719,6 +5964,7 @@ int Server::check_layout_vxattr(MDRequestRef& mdr,
 void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur)
 {
   const cref_t<MClientRequest> &req = mdr->client_request;
+  MutationImpl::LockOpVec lov;
   string name(req->get_path2());
   bufferlist bl = req->get_data();
   string value (bl.c_str(), bl.length());
@@ -5744,6 +5990,18 @@ void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur)
     if (!xlock_policylock(mdr, cur, true))
       return;
 
+    /* We need 'As' caps for the fscrypt context */
+    lov.add_xlock(&cur->authlock);
+    if (!mds->locker->acquire_locks(mdr, lov)) {
+      return;
+    }
+
+    /* encrypted directories can't have their layout changed */
+    if (!cur->get_inode()->fscrypt_auth.empty()) {
+      respond_to_request(mdr, -CEPHFS_EINVAL);
+      return;
+    }
+
     file_layout_t layout;
     if (cur->get_projected_inode()->has_layout())
       layout = cur->get_projected_inode()->layout;
@@ -5775,11 +6033,16 @@ void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur)
     if (check_layout_vxattr(mdr, rest, value, &layout) < 0)
       return;
 
-    MutationImpl::LockOpVec lov;
     lov.add_xlock(&cur->filelock);
     if (!mds->locker->acquire_locks(mdr, lov))
       return;
 
+    /* encrypted files can't have their layout changed */
+    if (!cur->get_inode()->fscrypt_auth.empty()) {
+      respond_to_request(mdr, -CEPHFS_EINVAL);
+      return;
+    }
+
     auto pi = cur->project_inode(mdr);
     int64_t old_pool = pi.inode->layout.pool_id;
     pi.inode->add_old_pool(old_pool);
@@ -5800,7 +6063,7 @@ void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur)
       return;
     }
 
-    if (quota.is_enable() && !cur->get_projected_srnode())
+    if (quota.is_enabled() && !cur->get_projected_srnode())
       adjust_realm = true;
 
     if (!xlock_policylock(mdr, cur, false, adjust_realm))
@@ -5842,7 +6105,6 @@ void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur)
      */
     if (!mdr->more()->rdonly_checks) {
       if (!(mdr->locking_state & MutationImpl::ALL_LOCKED)) {
-        MutationImpl::LockOpVec lov;
         lov.add_rdlock(&cur->snaplock);
         if (!mds->locker->acquire_locks(mdr, lov))
           return;
@@ -6308,8 +6570,6 @@ void Server::handle_client_setxattr(MDRequestRef& mdr)
   pi.inode->ctime = mdr->get_op_stamp();
   if (mdr->get_op_stamp() > pi.inode->rstat.rctime)
     pi.inode->rstat.rctime = mdr->get_op_stamp();
-  if (name == "encryption.ctx"sv)
-    pi.inode->fscrypt = true;
   pi.inode->change_attr++;
   pi.inode->xattr_version++;
 
@@ -6560,6 +6820,84 @@ void Server::handle_client_getvxattr(MDRequestRef& mdr)
 
 // ------------------------------------------------
 
+struct C_WaitUnlinkToFinish : public MDSContext {
+protected:
+  MDCache *mdcache;
+  CDentry *dn;
+  MDSContext *fin;
+
+  MDSRank *get_mds() override
+  {
+    ceph_assert(mdcache != NULL);
+    return mdcache->mds;
+  }
+
+public:
+  C_WaitUnlinkToFinish(MDCache *m, CDentry *d, MDSContext *f) :
+    mdcache(m), dn(d), fin(f) {}
+  void finish(int r) override {
+    fin->complete(r);
+    dn->put(CDentry::PIN_PURGING);
+  }
+};
+
+bool Server::is_unlink_pending(CDentry *dn)
+{
+  CDentry::linkage_t *dnl = dn->get_projected_linkage();
+  if (!dnl->is_null() && dn->state_test(CDentry::STATE_UNLINKING)) {
+      return true;
+  }
+  return false;
+}
+
+void Server::wait_for_pending_unlink(CDentry *dn, MDRequestRef& mdr)
+{
+  dout(20) << __func__ << " dn " << *dn << dendl;
+  mds->locker->drop_locks(mdr.get());
+  auto fin = new C_MDS_RetryRequest(mdcache, mdr);
+  dn->get(CDentry::PIN_PURGING);
+  dn->add_waiter(CDentry::WAIT_UNLINK_FINISH, new C_WaitUnlinkToFinish(mdcache, dn, fin));
+}
+
+struct C_WaitReintegrateToFinish : public MDSContext {
+protected:
+  MDCache *mdcache;
+  CDentry *dn;
+  MDSContext *fin;
+
+  MDSRank *get_mds() override
+  {
+    ceph_assert(mdcache != NULL);
+    return mdcache->mds;
+  }
+
+public:
+  C_WaitReintegrateToFinish(MDCache *m, CDentry *d, MDSContext *f) :
+    mdcache(m), dn(d), fin(f) {}
+  void finish(int r) override {
+    fin->complete(r);
+    dn->put(CDentry::PIN_PURGING);
+  }
+};
+
+bool Server::is_reintegrate_pending(CDentry *dn)
+{
+  CDentry::linkage_t *dnl = dn->get_projected_linkage();
+  if (!dnl->is_null() && dn->state_test(CDentry::STATE_REINTEGRATING)) {
+      return true;
+  }
+  return false;
+}
+
+void Server::wait_for_pending_reintegrate(CDentry *dn, MDRequestRef& mdr)
+{
+  dout(20) << __func__ << " dn " << *dn << dendl;
+  mds->locker->drop_locks(mdr.get());
+  auto fin = new C_MDS_RetryRequest(mdcache, mdr);
+  dn->get(CDentry::PIN_PURGING);
+  dn->add_waiter(CDentry::WAIT_REINTEGRATE_FINISH, new C_WaitReintegrateToFinish(mdcache, dn, fin));
+}
+
 // MKNOD
 
 class C_MDS_mknod_finish : public ServerLogContext {
@@ -6571,6 +6909,9 @@ public:
   void finish(int r) override {
     ceph_assert(r == 0);
 
+    // crash current MDS and the replacing MDS will test the journal
+    ceph_assert(!g_conf()->mds_kill_skip_replaying_inotable);
+
     // link the inode
     dn->pop_projected_linkage();
     
@@ -6619,10 +6960,15 @@ void Server::handle_client_mknod(MDRequestRef& mdr)
     mode |= S_IFREG;
 
   mdr->disable_lock_cache();
-  CDentry *dn = rdlock_path_xlock_dentry(mdr, true, false, S_ISREG(mode));
+  CDentry *dn = rdlock_path_xlock_dentry(mdr, true, false, false, S_ISREG(mode));
   if (!dn)
     return;
 
+  if (is_unlink_pending(dn)) {
+    wait_for_pending_unlink(dn, mdr);
+    return;
+  }
+
   CDir *dir = dn->get_dir();
   CInode *diri = dir->get_inode();
   if (!check_access(mdr, diri, MAY_WRITE))
@@ -6721,6 +7067,11 @@ void Server::handle_client_mkdir(MDRequestRef& mdr)
   if (!dn)
     return;
 
+  if (is_unlink_pending(dn)) {
+    wait_for_pending_unlink(dn, mdr);
+    return;
+  }
+
   CDir *dir = dn->get_dir();
   CInode *diri = dir->get_inode();
 
@@ -6816,6 +7167,11 @@ void Server::handle_client_symlink(MDRequestRef& mdr)
   if (!dn)
     return;
 
+  if (is_unlink_pending(dn)) {
+    wait_for_pending_unlink(dn, mdr);
+    return;
+  }
+
   CDir *dir = dn->get_dir();
   CInode *diri = dir->get_inode();
 
@@ -6862,6 +7218,11 @@ void Server::handle_client_symlink(MDRequestRef& mdr)
 
   journal_and_reply(mdr, newi, dn, le, new C_MDS_mknod_finish(this, mdr, dn, newi));
   mds->balancer->maybe_fragment(dir, false);
+
+  // flush the journal as soon as possible
+  if (g_conf()->mds_kill_skip_replaying_inotable) {
+    mdlog->flush();
+  }
 }
 
 
@@ -6887,7 +7248,8 @@ void Server::handle_client_link(MDRequestRef& mdr)
     targeti = mdcache->get_inode(req->get_filepath2().get_ino());
     if (!targeti) {
       dout(10) << "CEPHFS_ESTALE on path2, attempting recovery" << dendl;
-      mdcache->find_ino_peers(req->get_filepath2().get_ino(), new C_MDS_TryFindInode(this, mdr));
+      inodeno_t ino = req->get_filepath2().get_ino();
+      mdcache->find_ino_peers(ino, new C_MDS_TryFindInode(this, mdr, mdcache, ino));
       return;
     }
     mdr->pin(targeti);
@@ -6921,6 +7283,11 @@ void Server::handle_client_link(MDRequestRef& mdr)
     targeti = ret.second->get_projected_linkage()->get_inode();
   }
 
+  if (is_unlink_pending(destdn)) {
+    wait_for_pending_unlink(destdn, mdr);
+    return;
+  }
+
   ceph_assert(destdn->get_projected_linkage()->is_null());
   if (req->get_alternate_name().size() > alternate_name_max) {
     dout(10) << " alternate_name longer than " << alternate_name_max << dendl;
@@ -6975,6 +7342,13 @@ void Server::handle_client_link(MDRequestRef& mdr)
   if (target_pin != dir->inode &&
       target_realm->get_subvolume_ino() !=
       dir->inode->find_snaprealm()->get_subvolume_ino()) {
+    if (target_pin->is_stray()) {
+      mds->locker->drop_locks(mdr.get());
+      targeti->add_waiter(CInode::WAIT_UNLINK,
+                          new C_MDS_RetryRequest(mdcache, mdr));
+      mdlog->flush();
+      return;
+    }
     dout(7) << "target is in different subvolume, failing..." << dendl;
     respond_to_request(mdr, -CEPHFS_EXDEV);
     return;
@@ -7205,11 +7579,17 @@ void Server::_link_remote_finish(MDRequestRef& mdr, bool inc,
   mdr->apply();
 
   MDRequestRef null_ref;
-  if (inc)
+  if (inc) {
     mdcache->send_dentry_link(dn, null_ref);
-  else
+  } else {
+    dn->state_clear(CDentry::STATE_UNLINKING);
     mdcache->send_dentry_unlink(dn, NULL, null_ref);
-  
+
+    MDSContext::vec finished;
+    dn->take_waiting(CDentry::WAIT_UNLINK_FINISH, finished);
+    mdcache->mds->queue_waiters(finished);
+  }
+
   // bump target popularity
   mds->balancer->hit_inode(targeti, META_POP_IWR);
   mds->balancer->hit_dir(dn->get_dir(), META_POP_IWR);
@@ -7583,10 +7963,25 @@ void Server::handle_client_unlink(MDRequestRef& mdr)
 
   if (rmdir)
     mdr->disable_lock_cache();
+
   CDentry *dn = rdlock_path_xlock_dentry(mdr, false, true);
   if (!dn)
     return;
 
+  if (is_reintegrate_pending(dn)) {
+    wait_for_pending_reintegrate(dn, mdr);
+    return;
+  }
+
+  // notify replica MDSes the dentry is under unlink
+  if (!dn->state_test(CDentry::STATE_UNLINKING)) {
+    dn->state_set(CDentry::STATE_UNLINKING);
+    mdcache->send_dentry_unlink(dn, nullptr, mdr, true);
+    if (dn->replica_unlinking_ref) {
+      return;
+    }
+  }
+
   CDentry::linkage_t *dnl = dn->get_linkage(client, mdr);
   ceph_assert(!dnl->is_null());
   CInode *in = dnl->get_inode();
@@ -7603,11 +7998,13 @@ void Server::handle_client_unlink(MDRequestRef& mdr)
     if (rmdir) {
       // do empty directory checks
       if (_dir_is_nonempty_unlocked(mdr, in)) {
-       respond_to_request(mdr, -CEPHFS_ENOTEMPTY);
+        dn->state_clear(CDentry::STATE_UNLINKING);
+        respond_to_request(mdr, -CEPHFS_ENOTEMPTY);
        return;
       }
     } else {
       dout(7) << "handle_client_unlink on dir " << *in << ", returning error" << dendl;
+      dn->state_clear(CDentry::STATE_UNLINKING);
       respond_to_request(mdr, -CEPHFS_EISDIR);
       return;
     }
@@ -7615,6 +8012,7 @@ void Server::handle_client_unlink(MDRequestRef& mdr)
     if (rmdir) {
       // unlink
       dout(7) << "handle_client_rmdir on non-dir " << *in << ", returning error" << dendl;
+      dn->state_clear(CDentry::STATE_UNLINKING);
       respond_to_request(mdr, -CEPHFS_ENOTDIR);
       return;
     }
@@ -7622,8 +8020,10 @@ void Server::handle_client_unlink(MDRequestRef& mdr)
 
   CInode *diri = dn->get_dir()->get_inode();
   if ((!mdr->has_more() || mdr->more()->witnessed.empty())) {
-    if (!check_access(mdr, diri, MAY_WRITE))
+    if (!check_access(mdr, diri, MAY_WRITE)) {
+      dn->state_clear(CDentry::STATE_UNLINKING);
       return;
+    }
   }
 
   // -- create stray dentry? --
@@ -7662,6 +8062,7 @@ void Server::handle_client_unlink(MDRequestRef& mdr)
   if (in->is_dir() &&
       _dir_is_nonempty(mdr, in)) {
     respond_to_request(mdr, -CEPHFS_ENOTEMPTY);
+    dn->state_clear(CDentry::STATE_UNLINKING);
     return;
   }
 
@@ -7861,9 +8262,14 @@ void Server::_unlink_local_finish(MDRequestRef& mdr,
   }
 
   mdr->apply();
-  
+
+  dn->state_clear(CDentry::STATE_UNLINKING);
   mdcache->send_dentry_unlink(dn, straydn, mdr);
-  
+
+  MDSContext::vec finished;
+  dn->take_waiting(CDentry::WAIT_UNLINK_FINISH, finished);
+  mdcache->mds->queue_waiters(finished);
+
   if (straydn) {
     // update subtree map?
     if (strayin->is_dir())
@@ -7878,7 +8284,7 @@ void Server::_unlink_local_finish(MDRequestRef& mdr,
 
   // reply
   respond_to_request(mdr, 0);
-  
+
   // removing a new dn?
   dn->get_dir()->try_remove_unlinked_dn(dn);
 
@@ -8337,6 +8743,16 @@ void Server::handle_client_rename(MDRequestRef& mdr)
   if (!destdn)
     return;
 
+  if (is_unlink_pending(destdn)) {
+    wait_for_pending_unlink(destdn, mdr);
+    return;
+  }
+
+  if (is_unlink_pending(srcdn)) {
+    wait_for_pending_unlink(srcdn, mdr);
+    return;
+  }
+
   dout(10) << " destdn " << *destdn << dendl;
   CDir *destdir = destdn->get_dir();
   ceph_assert(destdir->is_auth());
@@ -8753,6 +9169,12 @@ void Server::handle_client_rename(MDRequestRef& mdr)
   C_MDS_rename_finish *fin = new C_MDS_rename_finish(this, mdr, srcdn, destdn, straydn);
 
   journal_and_reply(mdr, srci, destdn, le, fin);
+
+  // trigger to flush mdlog in case reintegrating or migrating the stray dn,
+  // because the link requests maybe waiting.
+  if (srcdn->get_dir()->inode->is_stray()) {
+    mdlog->flush();
+  }
   mds->balancer->maybe_fragment(destdn->get_dir(), false);
 }
 
@@ -9211,6 +9633,16 @@ void Server::_rename_prepare(MDRequestRef& mdr,
       mdcache->journal_cow_dentry(mdr.get(), metablob, destdn, CEPH_NOSNAP, 0, destdnl);
 
     destdn->first = mdcache->get_global_snaprealm()->get_newest_seq() + 1;
+    {
+      auto do_corruption = inject_rename_corrupt_dentry_first;
+      if (unlikely(do_corruption > 0.0)) {
+        auto r = ceph::util::generate_random_number(0.0, 1.0);
+        if (r < do_corruption) {
+          dout(0) << "corrupting dn: " << *destdn << dendl;
+          destdn->first = -10;
+        }
+      }
+    }
 
     if (destdn->is_auth())
       metablob->add_primary_dentry(destdn, srci, true, true);
@@ -9353,6 +9785,14 @@ void Server::_rename_apply(MDRequestRef& mdr, CDentry *srcdn, CDentry *destdn, C
 
   srcdn->get_dir()->unlink_inode(srcdn);
 
+  // After the stray dn being unlinked from the corresponding inode in case of
+  // reintegrate_stray/migrate_stray, just wake up the waitiers.
+  MDSContext::vec finished;
+  in->take_waiting(CInode::WAIT_UNLINK, finished);
+  if (!finished.empty()) {
+    mds->queue_waiters(finished);
+  }
+
   // dest
   if (srcdn_was_remote) {
     if (!linkmerge) {
@@ -10640,6 +11080,7 @@ void Server::handle_client_mksnap(MDRequestRef& mdr)
     return;
   }
   if (snapname.length() == 0 ||
+      snapname.length() > snapshot_name_max ||
       snapname[0] == '_') {
     respond_to_request(mdr, -CEPHFS_EINVAL);
     return;
@@ -10698,6 +11139,8 @@ void Server::handle_client_mksnap(MDRequestRef& mdr)
     em.first->second = info;
   newsnap.seq = snapid;
   newsnap.last_created = snapid;
+  newsnap.last_modified = info.stamp;
+  newsnap.change_attr++;
 
   // journal the inode changes
   mdr->ls = mdlog->get_current_segment();
@@ -10788,7 +11231,6 @@ void Server::handle_client_rmsnap(MDRequestRef& mdr)
   }
   snapid_t snapid = diri->snaprealm->resolve_snapname(snapname, diri->ino());
   dout(10) << " snapname " << snapname << " is " << snapid << dendl;
-
   if (!(mdr->locking_state & MutationImpl::ALL_LOCKED)) {
     MutationImpl::LockOpVec lov;
     lov.add_xlock(&diri->snaplock);
@@ -10836,6 +11278,8 @@ void Server::handle_client_rmsnap(MDRequestRef& mdr)
   newnode.snaps.erase(snapid);
   newnode.seq = seq;
   newnode.last_destroyed = seq;
+  newnode.last_modified = mdr->get_op_stamp();
+  newnode.change_attr++;
 
   le->metablob.add_client_req(req->get_reqid(), req->get_oldest_client_tid());
   le->metablob.add_table_transaction(TABLE_SNAP, stid);
@@ -10851,9 +11295,6 @@ void Server::_rmsnap_finish(MDRequestRef& mdr, CInode *diri, snapid_t snapid)
 {
   dout(10) << "_rmsnap_finish " << *mdr << " " << snapid << dendl;
   snapid_t stid = mdr->more()->stid;
-  auto p = mdr->more()->snapidbl.cbegin();
-  snapid_t seq;
-  decode(seq, p);  
 
   mdr->apply();
 
@@ -10868,6 +11309,8 @@ void Server::_rmsnap_finish(MDRequestRef& mdr, CInode *diri, snapid_t snapid)
 
   // yay
   mdr->in[0] = diri;
+  mdr->tracei = diri;
+  mdr->snapid = snapid;
   respond_to_request(mdr, 0);
 
   // purge snapshot data
@@ -10973,6 +11416,8 @@ void Server::handle_client_renamesnap(MDRequestRef& mdr)
   auto it = newsnap.snaps.find(snapid);
   ceph_assert(it != newsnap.snaps.end());
   it->second.name = dstname;
+  newsnap.last_modified = mdr->get_op_stamp();
+  newsnap.change_attr++;
 
   // journal the inode changes
   mdr->ls = mdlog->get_current_segment();
@@ -11027,3 +11472,18 @@ void Server::dump_reconnect_status(Formatter *f) const
   f->dump_stream("client_reconnect_gather") << client_reconnect_gather;
   f->close_section();
 }
+
+const bufferlist& Server::get_snap_trace(Session *session, SnapRealm *realm) const {
+  ceph_assert(session);
+  ceph_assert(realm);
+  if (session->info.has_feature(CEPHFS_FEATURE_NEW_SNAPREALM_INFO)) {
+    return realm->get_snap_trace_new();
+  } else {
+    return realm->get_snap_trace();
+  }
+}
+
+const bufferlist& Server::get_snap_trace(client_t client, SnapRealm *realm) const {
+  Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(client.v));
+  return get_snap_trace(session, realm);
+}