]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mds/Server.cc
update ceph source to reef 18.2.0
[ceph.git] / ceph / src / mds / Server.cc
index 7097eb675ead272096b2690f21fde571e6945027..bf12cb7e2c8fa9f24be9f6eae1b330aa829351cb 100644 (file)
@@ -31,6 +31,7 @@
 #include "Mutation.h"
 #include "MetricsHandler.h"
 #include "cephfs_features.h"
+#include "MDSContext.h"
 
 #include "msg/Messenger.h"
 
@@ -50,6 +51,7 @@
 #include "common/perf_counters.h"
 #include "include/compat.h"
 #include "osd/OSDMap.h"
+#include "fscrypt.h"
 
 #include <errno.h>
 
@@ -254,6 +256,7 @@ void Server::create_logger()
 Server::Server(MDSRank *m, MetricsHandler *metrics_handler) :
   mds(m), 
   mdcache(mds->mdcache), mdlog(mds->mdlog),
+  inject_rename_corrupt_dentry_first(g_conf().get_val<double>("mds_inject_rename_corrupt_dentry_first")),
   recall_throttle(g_conf().get_val<double>("mds_recall_max_decay_rate")),
   metrics_handler(metrics_handler)
 {
@@ -356,6 +359,9 @@ void Server::dispatch(const cref_t<Message> &m)
   case CEPH_MSG_CLIENT_REQUEST:
     handle_client_request(ref_cast<MClientRequest>(m));
     return;
+  case CEPH_MSG_CLIENT_REPLY:
+    handle_client_reply(ref_cast<MClientReply>(m));
+    return;
   case CEPH_MSG_CLIENT_RECLAIM:
     handle_client_reclaim(ref_cast<MClientReclaim>(m));
     return;
@@ -588,6 +594,16 @@ void Server::handle_client_session(const cref_t<MClientSession> &m)
   uint64_t sseq = 0;
   switch (m->get_op()) {
   case CEPH_SESSION_REQUEST_OPEN:
+    if(mds->mdsmap->test_flag(CEPH_MDSMAP_REFUSE_CLIENT_SESSION)) {
+      dout(0) << "new sessions are not permitted, enable again via"
+                 "`ceph fs set <fs_name> refuse_client_session false`" << dendl;
+      auto reply = make_message<MClientSession>(CEPH_SESSION_REJECT);
+      reply->metadata["error_string"] = "new sessions are not permitted,"
+                                        " enable again via `ceph fs set"
+                                        " <fs_name> refuse_client_session false`";
+      mds->send_message(reply, m->get_connection());
+      return;
+    }
     if (session->is_opening() ||
        session->is_open() ||
        session->is_stale() ||
@@ -1188,7 +1204,7 @@ void Server::find_idle_sessions()
       if (mds->locker->revoke_stale_caps(session)) {
        mds->locker->remove_stale_leases(session);
        finish_flush_session(session, session->get_push_seq());
-       auto m = make_message<MClientSession>(CEPH_SESSION_STALE, session->get_push_seq());
+       auto m = make_message<MClientSession>(CEPH_SESSION_STALE);
        mds->send_message_client(m, session);
       } else {
        to_evict.push_back(session);
@@ -1294,6 +1310,9 @@ void Server::handle_conf_change(const std::set<std::string>& changed) {
   if (changed.count("mds_alternate_name_max")) {
     alternate_name_max  = g_conf().get_val<Option::size_t>("mds_alternate_name_max");
   }
+  if (changed.count("mds_fscrypt_last_block_max_size")) {
+    fscrypt_last_block_max_size = g_conf().get_val<Option::size_t>("mds_fscrypt_last_block_max_size");
+  }
   if (changed.count("mds_dir_max_entries")) {
     dir_max_entries = g_conf().get_val<uint64_t>("mds_dir_max_entries");
     dout(20) << __func__ << " max entries per directory changed to "
@@ -1304,6 +1323,9 @@ void Server::handle_conf_change(const std::set<std::string>& changed) {
     dout(20) << __func__ << " max fragment size changed to "
             << bal_fragment_size_max << dendl;
   }
+  if (changed.count("mds_inject_rename_corrupt_dentry_first")) {
+    inject_rename_corrupt_dentry_first = g_conf().get_val<double>("mds_inject_rename_corrupt_dentry_first");
+  }
 }
 
 /*
@@ -1442,6 +1464,18 @@ void Server::handle_client_reconnect(const cref_t<MClientReconnect> &m)
     return;
   }
 
+  if(mds->mdsmap->test_flag(CEPH_MDSMAP_REFUSE_CLIENT_SESSION)) {
+    mds->clog->warn() << "client could not reconnect as" 
+                         " file system flag refuse_client_session is set";
+    dout(0) << "client cannot reconnect when file system flag"
+               " refuse_client_session is set" << dendl;
+    auto reply = make_message<MClientSession>(CEPH_SESSION_CLOSE);
+    reply->metadata["error_string"] = "client cannot reconnect when file system flag" 
+                                        " refuse_client_session is set";
+    mds->send_message(reply, m->get_connection());
+    return;
+  }
+
   if (!session->is_open()) {
     dout(0) << " ignoring msg from not-open session" << *m << dendl;
     auto reply = make_message<MClientSession>(CEPH_SESSION_CLOSE);
@@ -2261,6 +2295,10 @@ void Server::reply_client_request(MDRequestRef& mdr, const ref_t<MClientReply> &
     mds->send_message_client(reply, session);
   }
 
+  if (client_inst.name.is_mds() && reply->get_op() == CEPH_MDS_OP_RENAME) {
+    mds->send_message(reply, mdr->client_request->get_connection());
+  }
+
   if (req->is_queued_for_replay() &&
       (mdr->has_completed || reply->get_result() < 0)) {
     if (reply->get_result() < 0) {
@@ -2316,7 +2354,7 @@ void Server::set_trace_dist(const ref_t<MClientReply> &reply,
       realm = in->find_snaprealm();
     else
       realm = dn->get_dir()->get_inode()->find_snaprealm();
-    reply->snapbl = realm->get_snap_trace();
+    reply->snapbl = get_snap_trace(session, realm);
     dout(10) << "set_trace_dist snaprealm " << *realm << " len=" << reply->snapbl.length() << dendl;
   }
 
@@ -2492,6 +2530,38 @@ void Server::handle_client_request(const cref_t<MClientRequest> &req)
   return;
 }
 
+void Server::handle_client_reply(const cref_t<MClientReply> &reply)
+{
+  dout(4) << "handle_client_reply " << *reply << dendl;
+
+  ceph_assert(reply->is_safe());
+  ceph_tid_t tid = reply->get_tid();
+
+  if (mds->internal_client_requests.count(tid) == 0) {
+    dout(1) << " no pending request on tid " << tid << dendl;
+    return;
+  }
+
+  auto &req = mds->internal_client_requests.at(tid);
+  CDentry *dn = req.get_dentry();
+
+  switch (reply->get_op()) {
+  case CEPH_MDS_OP_RENAME:
+    if (dn) {
+      dn->state_clear(CDentry::STATE_REINTEGRATING);
+
+      MDSContext::vec finished;
+      dn->take_waiting(CDentry::WAIT_REINTEGRATE_FINISH, finished);
+      mds->queue_waiters(finished);
+    }
+    break;
+  default:
+    dout(5) << " unknown client op " << reply->get_op() << dendl;
+  }
+
+  mds->internal_client_requests.erase(tid);
+}
+
 void Server::handle_osd_map()
 {
   /* Note that we check the OSDMAP_FULL flag directly rather than
@@ -3320,17 +3390,36 @@ CInode* Server::prepare_new_inode(MDRequestRef& mdr, CDir *dir, inodeno_t useino
   // while session is opening.
   bool allow_prealloc_inos = mdr->session->is_open();
 
+  inodeno_t _useino = useino;
+
   // assign ino
-  if (allow_prealloc_inos && (mdr->used_prealloc_ino = _inode->ino = mdr->session->take_ino(useino))) {
-    mds->sessionmap.mark_projected(mdr->session);
-    dout(10) << "prepare_new_inode used_prealloc " << mdr->used_prealloc_ino
-            << " (" << mdr->session->info.prealloc_inos.size() << " left)"
-            << dendl;
-  } else {
-    mdr->alloc_ino = 
-      _inode->ino = mds->inotable->project_alloc_id(useino);
-    dout(10) << "prepare_new_inode alloc " << mdr->alloc_ino << dendl;
-  }
+  do {
+    if (allow_prealloc_inos && (mdr->used_prealloc_ino = _inode->ino = mdr->session->take_ino(_useino))) {
+      if (mdcache->test_and_clear_taken_inos(_inode->ino)) {
+        _inode->ino = 0;
+        dout(10) << "prepare_new_inode used_prealloc " << mdr->used_prealloc_ino
+                 << " (" << mdr->session->info.prealloc_inos.size() << " left)"
+                << " but has been taken, will try again!" << dendl;
+      } else {
+        mds->sessionmap.mark_projected(mdr->session);
+        dout(10) << "prepare_new_inode used_prealloc " << mdr->used_prealloc_ino
+                 << " (" << mdr->session->info.prealloc_inos.size() << " left)"
+                 << dendl;
+      }
+    } else {
+      mdr->alloc_ino =
+       _inode->ino = mds->inotable->project_alloc_id(_useino);
+      if (mdcache->test_and_clear_taken_inos(_inode->ino)) {
+        mds->inotable->apply_alloc_id(_inode->ino);
+        _inode->ino = 0;
+        dout(10) << "prepare_new_inode alloc " << mdr->alloc_ino
+                << " but has been taken, will try again!" << dendl;
+      } else {
+        dout(10) << "prepare_new_inode alloc " << mdr->alloc_ino << dendl;
+      }
+    }
+    _useino = 0;
+  } while (!_inode->ino);
 
   if (useino && useino != _inode->ino) {
     dout(0) << "WARNING: client specified " << useino << " and i allocated " << _inode->ino << dendl;
@@ -3339,7 +3428,7 @@ CInode* Server::prepare_new_inode(MDRequestRef& mdr, CDir *dir, inodeno_t useino
        << " but mds." << mds->get_nodeid() << " allocated " << _inode->ino;
     //ceph_abort(); // just for now.
   }
-    
+
   if (allow_prealloc_inos &&
       mdr->session->get_num_projected_prealloc_inos() < g_conf()->mds_client_prealloc_inos / 2) {
     int need = g_conf()->mds_client_prealloc_inos - mdr->session->get_num_projected_prealloc_inos();
@@ -3393,6 +3482,11 @@ CInode* Server::prepare_new_inode(MDRequestRef& mdr, CDir *dir, inodeno_t useino
   _inode->change_attr = 0;
 
   const cref_t<MClientRequest> &req = mdr->client_request;
+
+  dout(10) << "copying fscrypt_auth len " << req->fscrypt_auth.size() << dendl;
+  _inode->fscrypt_auth = req->fscrypt_auth;
+  _inode->fscrypt_file = req->fscrypt_file;
+
   if (req->get_data().length()) {
     auto p = req->get_data().cbegin();
 
@@ -3400,9 +3494,6 @@ CInode* Server::prepare_new_inode(MDRequestRef& mdr, CDir *dir, inodeno_t useino
     auto _xattrs = CInode::allocate_xattr_map();
     decode_noshare(*_xattrs, p);
     dout(10) << "prepare_new_inode setting xattrs " << *_xattrs << dendl;
-    if (_xattrs->count("encryption.ctx")) {
-      _inode->fscrypt = true;
-    }
     in->reset_xattrs(std::move(_xattrs));
   }
 
@@ -3588,12 +3679,18 @@ CInode* Server::rdlock_path_pin_ref(MDRequestRef& mdr,
 
 /** rdlock_path_xlock_dentry
  * traverse path to the directory that could/would contain dentry.
- * make sure i am auth for that dentry, forward as necessary.
- * create null dentry in place (or use existing if okexist).
+ * make sure i am auth for that dentry (or target inode if it exists and authexist),
+ * forward as necessary. create null dentry in place (or use existing if okexist).
  * get rdlocks on traversed dentries, xlock on new dentry.
+ *
+ * set authexist true if caller requires the target inode to be auth when it exists.
+ * the tail dentry is not always auth any more if authexist because it is impossible
+ * to ensure tail dentry and target inode are both auth in one mds. the tail dentry
+ * will not be xlocked too if authexist and the target inode exists.
  */
 CDentry* Server::rdlock_path_xlock_dentry(MDRequestRef& mdr,
-                                         bool create, bool okexist, bool want_layout)
+                                         bool create, bool okexist, bool authexist,
+                                         bool want_layout)
 {
   const filepath& refpath = mdr->get_filepath();
   dout(10) << "rdlock_path_xlock_dentry " << *mdr << " " << refpath << dendl;
@@ -3631,6 +3728,8 @@ CDentry* Server::rdlock_path_xlock_dentry(MDRequestRef& mdr,
     flags |= MDS_TRAVERSE_CHECK_LOCKCACHE;
   if (create)
     flags |= MDS_TRAVERSE_RDLOCK_AUTHLOCK;
+  if (authexist)
+    flags |= MDS_TRAVERSE_WANT_INODE;
   if (want_layout)
     flags |= MDS_TRAVERSE_WANT_DIRLAYOUT;
   int r = mdcache->path_traverse(mdr, cf, refpath, flags, &mdr->dn[0]);
@@ -3652,7 +3751,9 @@ CDentry* Server::rdlock_path_xlock_dentry(MDRequestRef& mdr,
   CInode *diri = dir->get_inode();
 
   if (!mdr->reqid.name.is_mds()) {
-    if (diri->is_system() && !diri->is_root()) {
+    if (diri->is_system() && !diri->is_root() &&
+       (!diri->is_lost_and_found() ||
+        mdr->client_request->get_op() != CEPH_MDS_OP_UNLINK)) {
       respond_to_request(mdr, -CEPHFS_EROFS);
       return nullptr;
     }
@@ -3947,8 +4048,6 @@ void Server::handle_client_getattr(MDRequestRef& mdr, bool is_lookup)
   if (!ref)
     return;
 
-  mdr->getattr_caps = mask;
-
   /*
    * if client currently holds the EXCL cap on a field, do not rdlock
    * it; client's stat() will result in valid info if _either_ EXCL
@@ -4001,7 +4100,7 @@ void Server::handle_client_getattr(MDRequestRef& mdr, bool is_lookup)
   // value for them.  (currently this matters for xattrs and inline data)
   mdr->getattr_caps = mask;
 
-  mds->balancer->hit_inode(ref, META_POP_IRD, req->get_source().num());
+  mds->balancer->hit_inode(ref, META_POP_IRD);
 
   // reply
   dout(10) << "reply to stat on " << *req << dendl;
@@ -4400,8 +4499,7 @@ void Server::handle_client_open(MDRequestRef& mdr)
   if (cmode & CEPH_FILE_MODE_WR)
     mds->balancer->hit_inode(cur, META_POP_IWR);
   else
-    mds->balancer->hit_inode(cur, META_POP_IRD,
-                            mdr->client_request->get_source().num());
+    mds->balancer->hit_inode(cur, META_POP_IRD);
 
   CDentry *dn = 0;
   if (req->get_dentry_wanted()) {
@@ -4423,6 +4521,9 @@ public:
   void finish(int r) override {
     ceph_assert(r == 0);
 
+    // crash current MDS and the replacing MDS will test the journal
+    ceph_assert(!g_conf()->mds_kill_skip_replaying_inotable);
+
     dn->pop_projected_linkage();
 
     // dirty inode, dn, dir
@@ -4459,7 +4560,7 @@ void Server::handle_client_openc(MDRequestRef& mdr)
   }
 
   bool excl = req->head.args.open.flags & CEPH_O_EXCL;
-  CDentry *dn = rdlock_path_xlock_dentry(mdr, true, !excl, true);
+  CDentry *dn = rdlock_path_xlock_dentry(mdr, true, !excl, true, true);
   if (!dn)
     return;
 
@@ -4471,7 +4572,7 @@ void Server::handle_client_openc(MDRequestRef& mdr)
   CDentry::linkage_t *dnl = dn->get_projected_linkage();
   if (!excl && !dnl->is_null()) {
     // it existed.
-    mds->locker->xlock_downgrade(&dn->lock, mdr.get());
+    ceph_assert(mdr.get()->is_rdlocked(&dn->lock));
 
     MutationImpl::LockOpVec lov;
     lov.add_rdlock(&dnl->get_inode()->snaplock);
@@ -4758,7 +4859,7 @@ void Server::handle_client_readdir(MDRequestRef& mdr)
   //  this isn't perfect, but we should capture the main variable/unbounded size items!
   int front_bytes = dirbl.length() + sizeof(__u32) + sizeof(__u8)*2;
   int bytes_left = max_bytes - front_bytes;
-  bytes_left -= realm->get_snap_trace().length();
+  bytes_left -= get_snap_trace(session, realm).length();
 
   // build dir contents
   bufferlist dnbl;
@@ -4778,8 +4879,11 @@ void Server::handle_client_readdir(MDRequestRef& mdr)
     bool dnp = dn->use_projected(client, mdr);
     CDentry::linkage_t *dnl = dnp ? dn->get_projected_linkage() : dn->get_linkage();
 
-    if (dnl->is_null())
+    if (dnl->is_null()) {
+      if (dn->get_num_ref() == 0 && !dn->is_projected())
+       dir->remove_dentry(dn);
       continue;
+    }
 
     if (dn->last < snapid || dn->first > snapid) {
       dout(20) << "skipping non-overlapping snap " << *dn << dendl;
@@ -4886,7 +4990,7 @@ void Server::handle_client_readdir(MDRequestRef& mdr)
   mdr->reply_extra_bl = dirbl;
 
   // bump popularity.  NOTE: this doesn't quite capture it.
-  mds->balancer->hit_dir(dir, META_POP_READDIR, -1, numfiles);
+  mds->balancer->hit_dir(dir, META_POP_READDIR, numfiles);
   
   // reply
   mdr->tracei = diri;
@@ -5115,10 +5219,24 @@ void Server::handle_client_setattr(MDRequestRef& mdr)
   __u32 mask = req->head.args.setattr.mask;
   __u32 access_mask = MAY_WRITE;
 
+  if (req->get_header().version < 6) {
+    // No changes to fscrypted inodes by downrevved clients
+    if (!cur->get_inode()->fscrypt_auth.empty()) {
+      respond_to_request(mdr, -CEPHFS_EPERM);
+      return;
+    }
+
+    // Only allow fscrypt field changes by capable clients
+    if (mask & (CEPH_SETATTR_FSCRYPT_FILE|CEPH_SETATTR_FSCRYPT_AUTH)) {
+      respond_to_request(mdr, -CEPHFS_EINVAL);
+      return;
+    }
+  }
+
   // xlock inode
-  if (mask & (CEPH_SETATTR_MODE|CEPH_SETATTR_UID|CEPH_SETATTR_GID|CEPH_SETATTR_BTIME|CEPH_SETATTR_KILL_SGUID))
+  if (mask & (CEPH_SETATTR_MODE|CEPH_SETATTR_UID|CEPH_SETATTR_GID|CEPH_SETATTR_BTIME|CEPH_SETATTR_KILL_SGUID|CEPH_SETATTR_FSCRYPT_AUTH|CEPH_SETATTR_KILL_SUID|CEPH_SETATTR_KILL_SGID))
     lov.add_xlock(&cur->authlock);
-  if (mask & (CEPH_SETATTR_MTIME|CEPH_SETATTR_ATIME|CEPH_SETATTR_SIZE))
+  if (mask & (CEPH_SETATTR_MTIME|CEPH_SETATTR_ATIME|CEPH_SETATTR_SIZE|CEPH_SETATTR_FSCRYPT_FILE))
     lov.add_xlock(&cur->filelock);
   if (mask & CEPH_SETATTR_CTIME)
     lov.add_wrlock(&cur->versionlock);
@@ -5149,7 +5267,15 @@ void Server::handle_client_setattr(MDRequestRef& mdr)
 
   bool truncating_smaller = false;
   if (mask & CEPH_SETATTR_SIZE) {
-    truncating_smaller = req->head.args.setattr.size < old_size;
+    if (req->get_data().length() >
+        sizeof(struct ceph_fscrypt_last_block_header) + fscrypt_last_block_max_size) {
+      dout(10) << __func__ << ": the last block size is too large" << dendl;
+      respond_to_request(mdr, -CEPHFS_EINVAL);
+      return;
+    }
+
+    truncating_smaller = req->head.args.setattr.size < old_size ||
+       (req->head.args.setattr.size == old_size && req->get_data().length());
     if (truncating_smaller && pip->is_truncating()) {
       dout(10) << " waiting for pending truncate from " << pip->truncate_from
               << " to " << pip->truncate_size << " to complete on " << *cur << dendl;
@@ -5158,6 +5284,32 @@ void Server::handle_client_setattr(MDRequestRef& mdr)
       cur->add_waiter(CInode::WAIT_TRUNC, new C_MDS_RetryRequest(mdcache, mdr));
       return;
     }
+
+    if (truncating_smaller && req->get_data().length()) {
+      struct ceph_fscrypt_last_block_header header;
+      memset(&header, 0, sizeof(header));
+      auto bl = req->get_data().cbegin();
+      DECODE_START(1, bl);
+      decode(header.change_attr, bl);
+      DECODE_FINISH(bl);
+
+      dout(20) << __func__ << " mdr->retry:" << mdr->retry
+               << " header.change_attr: " << header.change_attr
+               << " header.file_offset: " << header.file_offset
+               << " header.block_size: " << header.block_size
+               << dendl;
+
+      if (header.change_attr != pip->change_attr) {
+        dout(5) << __func__ << ": header.change_attr:" << header.change_attr
+                << " != current change_attr:" << pip->change_attr
+                << ", let client retry it!" << dendl;
+        // flush the journal to make sure the clients will get the lasted
+        // change_attr as possible for the next retry
+        mds->mdlog->flush();
+        respond_to_request(mdr, -CEPHFS_EAGAIN);
+        return;
+      }
+    }
   }
 
   bool changed_ranges = false;
@@ -5176,10 +5328,20 @@ void Server::handle_client_setattr(MDRequestRef& mdr)
 
   if (mask & CEPH_SETATTR_MODE)
     pi.inode->mode = (pi.inode->mode & ~07777) | (req->head.args.setattr.mode & 07777);
-  else if ((mask & (CEPH_SETATTR_UID|CEPH_SETATTR_GID|CEPH_SETATTR_KILL_SGUID)) &&
-           S_ISREG(pi.inode->mode) &&
-            (pi.inode->mode & (S_IXUSR|S_IXGRP|S_IXOTH))) {
-    pi.inode->mode &= ~(S_ISUID|S_ISGID);
+  else if ((mask & (CEPH_SETATTR_UID|CEPH_SETATTR_GID|CEPH_SETATTR_KILL_SGUID|
+                   CEPH_SETATTR_KILL_SUID|CEPH_SETATTR_KILL_SGID)) &&
+           S_ISREG(pi.inode->mode)) {
+    if (mask & (CEPH_SETATTR_UID|CEPH_SETATTR_GID|CEPH_SETATTR_KILL_SGUID) &&
+       (pi.inode->mode & (S_IXUSR|S_IXGRP|S_IXOTH))) {
+      pi.inode->mode &= ~(S_ISUID|S_ISGID);
+    } else {
+      if (mask & CEPH_SETATTR_KILL_SUID) {
+        pi.inode->mode &= ~S_ISUID;
+      }
+      if (mask & CEPH_SETATTR_KILL_SGID) {
+        pi.inode->mode &= ~S_ISGID;
+      }
+    }
   }
 
   if (mask & CEPH_SETATTR_MTIME)
@@ -5192,7 +5354,7 @@ void Server::handle_client_setattr(MDRequestRef& mdr)
     pi.inode->time_warp_seq++;   // maybe not a timewarp, but still a serialization point.
   if (mask & CEPH_SETATTR_SIZE) {
     if (truncating_smaller) {
-      pi.inode->truncate(old_size, req->head.args.setattr.size);
+      pi.inode->truncate(old_size, req->head.args.setattr.size, req->get_data());
       le->metablob.add_truncate_start(cur->ino());
     } else {
       pi.inode->size = req->head.args.setattr.size;
@@ -5208,6 +5370,11 @@ void Server::handle_client_setattr(MDRequestRef& mdr)
     }
   }
 
+  if (mask & CEPH_SETATTR_FSCRYPT_AUTH)
+    pi.inode->fscrypt_auth = req->fscrypt_auth;
+  if (mask & CEPH_SETATTR_FSCRYPT_FILE)
+    pi.inode->fscrypt_file = req->fscrypt_file;
+
   pi.inode->version = cur->pre_dirty();
   pi.inode->ctime = mdr->get_op_stamp();
   if (mdr->get_op_stamp() > pi.inode->rstat.rctime)
@@ -5797,6 +5964,7 @@ int Server::check_layout_vxattr(MDRequestRef& mdr,
 void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur)
 {
   const cref_t<MClientRequest> &req = mdr->client_request;
+  MutationImpl::LockOpVec lov;
   string name(req->get_path2());
   bufferlist bl = req->get_data();
   string value (bl.c_str(), bl.length());
@@ -5822,6 +5990,18 @@ void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur)
     if (!xlock_policylock(mdr, cur, true))
       return;
 
+    /* We need 'As' caps for the fscrypt context */
+    lov.add_xlock(&cur->authlock);
+    if (!mds->locker->acquire_locks(mdr, lov)) {
+      return;
+    }
+
+    /* encrypted directories can't have their layout changed */
+    if (!cur->get_inode()->fscrypt_auth.empty()) {
+      respond_to_request(mdr, -CEPHFS_EINVAL);
+      return;
+    }
+
     file_layout_t layout;
     if (cur->get_projected_inode()->has_layout())
       layout = cur->get_projected_inode()->layout;
@@ -5853,11 +6033,16 @@ void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur)
     if (check_layout_vxattr(mdr, rest, value, &layout) < 0)
       return;
 
-    MutationImpl::LockOpVec lov;
     lov.add_xlock(&cur->filelock);
     if (!mds->locker->acquire_locks(mdr, lov))
       return;
 
+    /* encrypted files can't have their layout changed */
+    if (!cur->get_inode()->fscrypt_auth.empty()) {
+      respond_to_request(mdr, -CEPHFS_EINVAL);
+      return;
+    }
+
     auto pi = cur->project_inode(mdr);
     int64_t old_pool = pi.inode->layout.pool_id;
     pi.inode->add_old_pool(old_pool);
@@ -5878,7 +6063,7 @@ void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur)
       return;
     }
 
-    if (quota.is_enable() && !cur->get_projected_srnode())
+    if (quota.is_enabled() && !cur->get_projected_srnode())
       adjust_realm = true;
 
     if (!xlock_policylock(mdr, cur, false, adjust_realm))
@@ -5920,7 +6105,6 @@ void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur)
      */
     if (!mdr->more()->rdonly_checks) {
       if (!(mdr->locking_state & MutationImpl::ALL_LOCKED)) {
-        MutationImpl::LockOpVec lov;
         lov.add_rdlock(&cur->snaplock);
         if (!mds->locker->acquire_locks(mdr, lov))
           return;
@@ -6386,8 +6570,6 @@ void Server::handle_client_setxattr(MDRequestRef& mdr)
   pi.inode->ctime = mdr->get_op_stamp();
   if (mdr->get_op_stamp() > pi.inode->rstat.rctime)
     pi.inode->rstat.rctime = mdr->get_op_stamp();
-  if (name == "encryption.ctx"sv)
-    pi.inode->fscrypt = true;
   pi.inode->change_attr++;
   pi.inode->xattr_version++;
 
@@ -6677,6 +6859,45 @@ void Server::wait_for_pending_unlink(CDentry *dn, MDRequestRef& mdr)
   dn->add_waiter(CDentry::WAIT_UNLINK_FINISH, new C_WaitUnlinkToFinish(mdcache, dn, fin));
 }
 
+struct C_WaitReintegrateToFinish : public MDSContext {
+protected:
+  MDCache *mdcache;
+  CDentry *dn;
+  MDSContext *fin;
+
+  MDSRank *get_mds() override
+  {
+    ceph_assert(mdcache != NULL);
+    return mdcache->mds;
+  }
+
+public:
+  C_WaitReintegrateToFinish(MDCache *m, CDentry *d, MDSContext *f) :
+    mdcache(m), dn(d), fin(f) {}
+  void finish(int r) override {
+    fin->complete(r);
+    dn->put(CDentry::PIN_PURGING);
+  }
+};
+
+bool Server::is_reintegrate_pending(CDentry *dn)
+{
+  CDentry::linkage_t *dnl = dn->get_projected_linkage();
+  if (!dnl->is_null() && dn->state_test(CDentry::STATE_REINTEGRATING)) {
+      return true;
+  }
+  return false;
+}
+
+void Server::wait_for_pending_reintegrate(CDentry *dn, MDRequestRef& mdr)
+{
+  dout(20) << __func__ << " dn " << *dn << dendl;
+  mds->locker->drop_locks(mdr.get());
+  auto fin = new C_MDS_RetryRequest(mdcache, mdr);
+  dn->get(CDentry::PIN_PURGING);
+  dn->add_waiter(CDentry::WAIT_REINTEGRATE_FINISH, new C_WaitReintegrateToFinish(mdcache, dn, fin));
+}
+
 // MKNOD
 
 class C_MDS_mknod_finish : public ServerLogContext {
@@ -6688,6 +6909,9 @@ public:
   void finish(int r) override {
     ceph_assert(r == 0);
 
+    // crash current MDS and the replacing MDS will test the journal
+    ceph_assert(!g_conf()->mds_kill_skip_replaying_inotable);
+
     // link the inode
     dn->pop_projected_linkage();
     
@@ -6736,7 +6960,7 @@ void Server::handle_client_mknod(MDRequestRef& mdr)
     mode |= S_IFREG;
 
   mdr->disable_lock_cache();
-  CDentry *dn = rdlock_path_xlock_dentry(mdr, true, false, S_ISREG(mode));
+  CDentry *dn = rdlock_path_xlock_dentry(mdr, true, false, false, S_ISREG(mode));
   if (!dn)
     return;
 
@@ -6994,6 +7218,11 @@ void Server::handle_client_symlink(MDRequestRef& mdr)
 
   journal_and_reply(mdr, newi, dn, le, new C_MDS_mknod_finish(this, mdr, dn, newi));
   mds->balancer->maybe_fragment(dir, false);
+
+  // flush the journal as soon as possible
+  if (g_conf()->mds_kill_skip_replaying_inotable) {
+    mdlog->flush();
+  }
 }
 
 
@@ -7113,6 +7342,13 @@ void Server::handle_client_link(MDRequestRef& mdr)
   if (target_pin != dir->inode &&
       target_realm->get_subvolume_ino() !=
       dir->inode->find_snaprealm()->get_subvolume_ino()) {
+    if (target_pin->is_stray()) {
+      mds->locker->drop_locks(mdr.get());
+      targeti->add_waiter(CInode::WAIT_UNLINK,
+                          new C_MDS_RetryRequest(mdcache, mdr));
+      mdlog->flush();
+      return;
+    }
     dout(7) << "target is in different subvolume, failing..." << dendl;
     respond_to_request(mdr, -CEPHFS_EXDEV);
     return;
@@ -7732,6 +7968,11 @@ void Server::handle_client_unlink(MDRequestRef& mdr)
   if (!dn)
     return;
 
+  if (is_reintegrate_pending(dn)) {
+    wait_for_pending_reintegrate(dn, mdr);
+    return;
+  }
+
   // notify replica MDSes the dentry is under unlink
   if (!dn->state_test(CDentry::STATE_UNLINKING)) {
     dn->state_set(CDentry::STATE_UNLINKING);
@@ -8928,6 +9169,12 @@ void Server::handle_client_rename(MDRequestRef& mdr)
   C_MDS_rename_finish *fin = new C_MDS_rename_finish(this, mdr, srcdn, destdn, straydn);
 
   journal_and_reply(mdr, srci, destdn, le, fin);
+
+  // trigger to flush mdlog in case reintegrating or migrating the stray dn,
+  // because the link requests maybe waiting.
+  if (srcdn->get_dir()->inode->is_stray()) {
+    mdlog->flush();
+  }
   mds->balancer->maybe_fragment(destdn->get_dir(), false);
 }
 
@@ -9386,6 +9633,16 @@ void Server::_rename_prepare(MDRequestRef& mdr,
       mdcache->journal_cow_dentry(mdr.get(), metablob, destdn, CEPH_NOSNAP, 0, destdnl);
 
     destdn->first = mdcache->get_global_snaprealm()->get_newest_seq() + 1;
+    {
+      auto do_corruption = inject_rename_corrupt_dentry_first;
+      if (unlikely(do_corruption > 0.0)) {
+        auto r = ceph::util::generate_random_number(0.0, 1.0);
+        if (r < do_corruption) {
+          dout(0) << "corrupting dn: " << *destdn << dendl;
+          destdn->first = -10;
+        }
+      }
+    }
 
     if (destdn->is_auth())
       metablob->add_primary_dentry(destdn, srci, true, true);
@@ -9528,6 +9785,14 @@ void Server::_rename_apply(MDRequestRef& mdr, CDentry *srcdn, CDentry *destdn, C
 
   srcdn->get_dir()->unlink_inode(srcdn);
 
+  // After the stray dn being unlinked from the corresponding inode in case of
+  // reintegrate_stray/migrate_stray, just wake up the waitiers.
+  MDSContext::vec finished;
+  in->take_waiting(CInode::WAIT_UNLINK, finished);
+  if (!finished.empty()) {
+    mds->queue_waiters(finished);
+  }
+
   // dest
   if (srcdn_was_remote) {
     if (!linkmerge) {
@@ -10815,6 +11080,7 @@ void Server::handle_client_mksnap(MDRequestRef& mdr)
     return;
   }
   if (snapname.length() == 0 ||
+      snapname.length() > snapshot_name_max ||
       snapname[0] == '_') {
     respond_to_request(mdr, -CEPHFS_EINVAL);
     return;
@@ -10873,6 +11139,8 @@ void Server::handle_client_mksnap(MDRequestRef& mdr)
     em.first->second = info;
   newsnap.seq = snapid;
   newsnap.last_created = snapid;
+  newsnap.last_modified = info.stamp;
+  newsnap.change_attr++;
 
   // journal the inode changes
   mdr->ls = mdlog->get_current_segment();
@@ -10963,7 +11231,6 @@ void Server::handle_client_rmsnap(MDRequestRef& mdr)
   }
   snapid_t snapid = diri->snaprealm->resolve_snapname(snapname, diri->ino());
   dout(10) << " snapname " << snapname << " is " << snapid << dendl;
-
   if (!(mdr->locking_state & MutationImpl::ALL_LOCKED)) {
     MutationImpl::LockOpVec lov;
     lov.add_xlock(&diri->snaplock);
@@ -11011,6 +11278,8 @@ void Server::handle_client_rmsnap(MDRequestRef& mdr)
   newnode.snaps.erase(snapid);
   newnode.seq = seq;
   newnode.last_destroyed = seq;
+  newnode.last_modified = mdr->get_op_stamp();
+  newnode.change_attr++;
 
   le->metablob.add_client_req(req->get_reqid(), req->get_oldest_client_tid());
   le->metablob.add_table_transaction(TABLE_SNAP, stid);
@@ -11026,9 +11295,6 @@ void Server::_rmsnap_finish(MDRequestRef& mdr, CInode *diri, snapid_t snapid)
 {
   dout(10) << "_rmsnap_finish " << *mdr << " " << snapid << dendl;
   snapid_t stid = mdr->more()->stid;
-  auto p = mdr->more()->snapidbl.cbegin();
-  snapid_t seq;
-  decode(seq, p);  
 
   mdr->apply();
 
@@ -11043,6 +11309,8 @@ void Server::_rmsnap_finish(MDRequestRef& mdr, CInode *diri, snapid_t snapid)
 
   // yay
   mdr->in[0] = diri;
+  mdr->tracei = diri;
+  mdr->snapid = snapid;
   respond_to_request(mdr, 0);
 
   // purge snapshot data
@@ -11148,6 +11416,8 @@ void Server::handle_client_renamesnap(MDRequestRef& mdr)
   auto it = newsnap.snaps.find(snapid);
   ceph_assert(it != newsnap.snaps.end());
   it->second.name = dstname;
+  newsnap.last_modified = mdr->get_op_stamp();
+  newsnap.change_attr++;
 
   // journal the inode changes
   mdr->ls = mdlog->get_current_segment();
@@ -11202,3 +11472,18 @@ void Server::dump_reconnect_status(Formatter *f) const
   f->dump_stream("client_reconnect_gather") << client_reconnect_gather;
   f->close_section();
 }
+
+const bufferlist& Server::get_snap_trace(Session *session, SnapRealm *realm) const {
+  ceph_assert(session);
+  ceph_assert(realm);
+  if (session->info.has_feature(CEPHFS_FEATURE_NEW_SNAPREALM_INFO)) {
+    return realm->get_snap_trace_new();
+  } else {
+    return realm->get_snap_trace();
+  }
+}
+
+const bufferlist& Server::get_snap_trace(client_t client, SnapRealm *realm) const {
+  Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(client.v));
+  return get_snap_trace(session, realm);
+}