]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mds/Server.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / mds / Server.cc
index bf12cb7e2c8fa9f24be9f6eae1b330aa829351cb..ced4ecffae1fbf8e7cf7bb4e24e373270bc55557 100644 (file)
@@ -31,7 +31,6 @@
 #include "Mutation.h"
 #include "MetricsHandler.h"
 #include "cephfs_features.h"
-#include "MDSContext.h"
 
 #include "msg/Messenger.h"
 
@@ -118,7 +117,7 @@ public:
   }
   void _forward(mds_rank_t t) override {
     MDCache* mdcache = server->mdcache;
-    mdcache->mds->forward_message_mds(mdr->release_client_request(), t);
+    mdcache->mds->forward_message_mds(mdr, t);
     mdr->set_mds_stamp(ceph_clock_now());
     for (auto& m : batch_reqs) {
       if (!m->killed)
@@ -138,7 +137,7 @@ public:
     batch_reqs.clear();
     server->reply_client_request(mdr, make_message<MClientReply>(*mdr->client_request, r));
   }
-  void print(std::ostream& o) {
+  void print(std::ostream& o) const override {
     o << "[batch front=" << *mdr << "]";
   }
 };
@@ -242,6 +241,8 @@ void Server::create_logger()
                    "Request type remove snapshot latency");
   plb.add_time_avg(l_mdss_req_renamesnap_latency, "req_renamesnap_latency",
                    "Request type rename snapshot latency");
+  plb.add_time_avg(l_mdss_req_snapdiff_latency, "req_snapdiff_latency",
+                  "Request type snapshot difference latency");
 
   plb.set_prio_default(PerfCountersBuilder::PRIO_DEBUGONLY);
   plb.add_u64_counter(l_mdss_dispatch_client_request, "dispatch_client_request",
@@ -359,9 +360,6 @@ void Server::dispatch(const cref_t<Message> &m)
   case CEPH_MSG_CLIENT_REQUEST:
     handle_client_request(ref_cast<MClientRequest>(m));
     return;
-  case CEPH_MSG_CLIENT_REPLY:
-    handle_client_reply(ref_cast<MClientReply>(m));
-    return;
   case CEPH_MSG_CLIENT_RECLAIM:
     handle_client_reclaim(ref_cast<MClientReclaim>(m));
     return;
@@ -1982,23 +1980,20 @@ void Server::journal_and_reply(MDRequestRef& mdr, CInode *in, CDentry *dn, LogEv
     mdr->pin(dn);
 
   early_reply(mdr, in, dn);
-
+  
   mdr->committing = true;
   submit_mdlog_entry(le, fin, mdr, __func__);
-
+  
   if (mdr->client_request && mdr->client_request->is_queued_for_replay()) {
     if (mds->queue_one_replay()) {
       dout(10) << " queued next replay op" << dendl;
     } else {
       dout(10) << " journaled last replay op" << dendl;
     }
-  } else if (mdr->did_early_reply) {
+  } else if (mdr->did_early_reply)
     mds->locker->drop_rdlocks_for_early_reply(mdr.get());
-    if (dn && dn->is_waiter_for(CDentry::WAIT_UNLINK_FINISH))
-      mdlog->flush();
-  } else {
+  else
     mdlog->flush();
-  }
 }
 
 void Server::submit_mdlog_entry(LogEvent *le, MDSLogContextBase *fin, MDRequestRef& mdr,
@@ -2125,6 +2120,9 @@ void Server::perf_gather_op_latency(const cref_t<MClientRequest> &req, utime_t l
   case CEPH_MDS_OP_RENAMESNAP:
     code = l_mdss_req_renamesnap_latency;
     break;
+  case CEPH_MDS_OP_READDIR_SNAPDIFF:
+    code = l_mdss_req_snapdiff_latency;
+    break;
   default:
     dout(1) << ": unknown client op" << dendl;
     return;
@@ -2295,10 +2293,6 @@ void Server::reply_client_request(MDRequestRef& mdr, const ref_t<MClientReply> &
     mds->send_message_client(reply, session);
   }
 
-  if (client_inst.name.is_mds() && reply->get_op() == CEPH_MDS_OP_RENAME) {
-    mds->send_message(reply, mdr->client_request->get_connection());
-  }
-
   if (req->is_queued_for_replay() &&
       (mdr->has_completed || reply->get_result() < 0)) {
     if (reply->get_result() < 0) {
@@ -2388,7 +2382,8 @@ void Server::set_trace_dist(const ref_t<MClientReply> &reply,
   // inode
   if (in) {
     in->encode_inodestat(bl, session, NULL, snapid, 0, mdr->getattr_caps);
-    dout(20) << "set_trace_dist added in   " << *in << dendl;
+    dout(20) << "set_trace_dist added snap " << snapid << " in " << *in
+             << dendl;
     reply->head.is_target = 1;
   } else
     reply->head.is_target = 0;
@@ -2530,38 +2525,6 @@ void Server::handle_client_request(const cref_t<MClientRequest> &req)
   return;
 }
 
-void Server::handle_client_reply(const cref_t<MClientReply> &reply)
-{
-  dout(4) << "handle_client_reply " << *reply << dendl;
-
-  ceph_assert(reply->is_safe());
-  ceph_tid_t tid = reply->get_tid();
-
-  if (mds->internal_client_requests.count(tid) == 0) {
-    dout(1) << " no pending request on tid " << tid << dendl;
-    return;
-  }
-
-  auto &req = mds->internal_client_requests.at(tid);
-  CDentry *dn = req.get_dentry();
-
-  switch (reply->get_op()) {
-  case CEPH_MDS_OP_RENAME:
-    if (dn) {
-      dn->state_clear(CDentry::STATE_REINTEGRATING);
-
-      MDSContext::vec finished;
-      dn->take_waiting(CDentry::WAIT_REINTEGRATE_FINISH, finished);
-      mds->queue_waiters(finished);
-    }
-    break;
-  default:
-    dout(5) << " unknown client op " << reply->get_op() << dendl;
-  }
-
-  mds->internal_client_requests.erase(tid);
-}
-
 void Server::handle_osd_map()
 {
   /* Note that we check the OSDMAP_FULL flag directly rather than
@@ -2752,6 +2715,9 @@ void Server::dispatch_client_request(MDRequestRef& mdr)
   case CEPH_MDS_OP_RENAMESNAP:
     handle_client_renamesnap(mdr);
     break;
+  case CEPH_MDS_OP_READDIR_SNAPDIFF:
+    handle_client_readdir_snapdiff(mdr);
+    break;
 
   default:
     dout(1) << " unknown client op " << req->get_op() << dendl;
@@ -3471,10 +3437,12 @@ CInode* Server::prepare_new_inode(MDRequestRef& mdr, CDir *dir, inodeno_t useino
       _inode->mode |= S_ISGID;
     }
   } else {
-    _inode->gid = mdr->client_request->get_caller_gid();
+    _inode->gid = mdr->client_request->get_owner_gid();
+    ceph_assert(_inode->gid != (unsigned)-1);
   }
 
-  _inode->uid = mdr->client_request->get_caller_uid();
+  _inode->uid = mdr->client_request->get_owner_uid();
+  ceph_assert(_inode->uid != (unsigned)-1);
 
   _inode->btime = _inode->ctime = _inode->mtime = _inode->atime =
     mdr->get_op_stamp();
@@ -4028,6 +3996,7 @@ void Server::handle_client_getattr(MDRequestRef& mdr, bool is_lookup)
       } else {
        dout(20) << __func__ << ": LOOKUP op, wait for previous same getattr ops to respond. " << *mdr << dendl;
        em.first->second->add_request(mdr);
+        mdr->mark_event("joining batch lookup");
        return;
       }
     } else {
@@ -4039,6 +4008,7 @@ void Server::handle_client_getattr(MDRequestRef& mdr, bool is_lookup)
       } else {
        dout(20) << __func__ << ": GETATTR op, wait for previous same getattr ops to respond. " << *mdr << dendl;
        em.first->second->add_request(mdr);
+        mdr->mark_event("joining batch getattr");
        return;
       }
     }
@@ -4082,6 +4052,24 @@ void Server::handle_client_getattr(MDRequestRef& mdr, bool is_lookup)
     } else if (ref->filelock.is_stable() ||
               ref->filelock.get_num_wrlocks() > 0 ||
               !ref->filelock.can_read(mdr->get_client())) {
+      /* Since we're taking advantage of an optimization here:
+       *
+       * We cannot suddenly, due to a changing condition, add this filelock as
+       * it can cause lock-order deadlocks. In this case, that condition is the
+       * lock state changes between request retries. If that happens, we need
+       * to check if we've acquired the other locks in this vector. If we have,
+       * then we need to drop those locks and retry.
+       */
+      if (mdr->is_rdlocked(&ref->linklock) ||
+          mdr->is_rdlocked(&ref->authlock) ||
+          mdr->is_rdlocked(&ref->xattrlock)) {
+        /* start over */
+        dout(20) << " dropping locks and restarting request because filelock state change" << dendl;
+       mds->locker->drop_locks(mdr.get());
+       mdr->drop_local_auth_pins();
+       mds->queue_waiter(new C_MDS_RetryRequest(mdcache, mdr));
+        return;
+      }
       lov.add_rdlock(&ref->filelock);
       mdr->locking_state &= ~MutationImpl::ALL_LOCKED;
     }
@@ -4403,6 +4391,7 @@ void Server::handle_client_open(MDRequestRef& mdr)
   }
 
   MutationImpl::LockOpVec lov;
+  lov.add_rdlock(&cur->snaplock);
 
   unsigned mask = req->head.args.open.mask;
   if (mask) {
@@ -4564,21 +4553,11 @@ void Server::handle_client_openc(MDRequestRef& mdr)
   if (!dn)
     return;
 
-  if (is_unlink_pending(dn)) {
-    wait_for_pending_unlink(dn, mdr);
-    return;
-  }
-
   CDentry::linkage_t *dnl = dn->get_projected_linkage();
   if (!excl && !dnl->is_null()) {
     // it existed.
     ceph_assert(mdr.get()->is_rdlocked(&dn->lock));
 
-    MutationImpl::LockOpVec lov;
-    lov.add_rdlock(&dnl->get_inode()->snaplock);
-    if (!mds->locker->acquire_locks(mdr, lov))
-      return;
-
     handle_client_open(mdr);
     return;
   }
@@ -4732,6 +4711,47 @@ void Server::handle_client_openc(MDRequestRef& mdr)
 }
 
 
+void Server::_finalize_readdir(MDRequestRef& mdr,
+                               CInode *diri,
+                               CDir* dir,
+                               bool start,
+                               bool end,
+                               __u16 flags,
+                               __u32 numfiles,
+                               bufferlist& dirbl,
+                               bufferlist& dnbl)
+{
+  const cref_t<MClientRequest> &req = mdr->client_request;
+  Session *session = mds->get_session(req);
+
+  session->touch_readdir_cap(numfiles);
+
+  if (end) {
+    flags |= CEPH_READDIR_FRAG_END;
+    if (start)
+      flags |= CEPH_READDIR_FRAG_COMPLETE; // FIXME: what purpose does this serve
+  }
+
+  // finish final blob
+  encode(numfiles, dirbl);
+  encode(flags, dirbl);
+  dirbl.claim_append(dnbl);
+
+  // yay, reply
+  dout(10) << "reply to " << *req << " readdir num=" << numfiles
+    << " bytes=" << dirbl.length()
+    << " start=" << (int)start
+    << " end=" << (int)end
+    << dendl;
+  mdr->reply_extra_bl = dirbl;
+
+  // bump popularity.  NOTE: this doesn't quite capture it.
+  mds->balancer->hit_dir(dir, META_POP_READDIR, numfiles);
+
+  // reply
+  mdr->tracei = diri;
+  respond_to_request(mdr, 0);
+}
 
 void Server::handle_client_readdir(MDRequestRef& mdr)
 {
@@ -4759,6 +4779,7 @@ void Server::handle_client_readdir(MDRequestRef& mdr)
       if (logger)
           logger->inc(l_mdss_cap_acquisition_throttle);
 
+      mdr->mark_event("cap_acquisition_throttle");
       mds->timer.add_event_after(caps_throttle_retry_request_timeout, new C_MDS_RetryRequest(mdcache, mdr));
       return;
   }
@@ -4937,7 +4958,7 @@ void Server::handle_client_readdir(MDRequestRef& mdr)
       dout(10) << " ran out of room, stopping at " << dnbl.length() << " < " << bytes_left << dendl;
       break;
     }
-    
+
     unsigned start_len = dnbl.length();
 
     // dentry
@@ -4946,7 +4967,7 @@ void Server::handle_client_readdir(MDRequestRef& mdr)
     mds->locker->issue_client_lease(dn, in, mdr, now, dnbl);
 
     // inode
-    dout(12) << "including inode " << *in << dendl;
+    dout(12) << "including inode in " << *in << " snap " << snapid << dendl;
     int r = in->encode_inodestat(dnbl, mdr->session, realm, snapid, bytes_left - (int)dnbl.length());
     if (r < 0) {
       // chop off dn->name, lease
@@ -4962,39 +4983,12 @@ void Server::handle_client_readdir(MDRequestRef& mdr)
     // touch dn
     mdcache->lru.lru_touch(dn);
   }
-  
-  session->touch_readdir_cap(numfiles);
-
   __u16 flags = 0;
-  if (end) {
-    flags = CEPH_READDIR_FRAG_END;
-    if (start)
-      flags |= CEPH_READDIR_FRAG_COMPLETE; // FIXME: what purpose does this serve
-  }
   // client only understand END and COMPLETE flags ?
   if (req_flags & CEPH_READDIR_REPLY_BITFLAGS) {
     flags |= CEPH_READDIR_HASH_ORDER | CEPH_READDIR_OFFSET_HASH;
   }
-  
-  // finish final blob
-  encode(numfiles, dirbl);
-  encode(flags, dirbl);
-  dirbl.claim_append(dnbl);
-  
-  // yay, reply
-  dout(10) << "reply to " << *req << " readdir num=" << numfiles
-          << " bytes=" << dirbl.length()
-          << " start=" << (int)start
-          << " end=" << (int)end
-          << dendl;
-  mdr->reply_extra_bl = dirbl;
-
-  // bump popularity.  NOTE: this doesn't quite capture it.
-  mds->balancer->hit_dir(dir, META_POP_READDIR, numfiles);
-  
-  // reply
-  mdr->tracei = diri;
-  respond_to_request(mdr, 0);
+  _finalize_readdir(mdr, diri, dir, start, end, flags, numfiles, dirbl, dnbl);
 }
 
 
@@ -6820,84 +6814,6 @@ void Server::handle_client_getvxattr(MDRequestRef& mdr)
 
 // ------------------------------------------------
 
-struct C_WaitUnlinkToFinish : public MDSContext {
-protected:
-  MDCache *mdcache;
-  CDentry *dn;
-  MDSContext *fin;
-
-  MDSRank *get_mds() override
-  {
-    ceph_assert(mdcache != NULL);
-    return mdcache->mds;
-  }
-
-public:
-  C_WaitUnlinkToFinish(MDCache *m, CDentry *d, MDSContext *f) :
-    mdcache(m), dn(d), fin(f) {}
-  void finish(int r) override {
-    fin->complete(r);
-    dn->put(CDentry::PIN_PURGING);
-  }
-};
-
-bool Server::is_unlink_pending(CDentry *dn)
-{
-  CDentry::linkage_t *dnl = dn->get_projected_linkage();
-  if (!dnl->is_null() && dn->state_test(CDentry::STATE_UNLINKING)) {
-      return true;
-  }
-  return false;
-}
-
-void Server::wait_for_pending_unlink(CDentry *dn, MDRequestRef& mdr)
-{
-  dout(20) << __func__ << " dn " << *dn << dendl;
-  mds->locker->drop_locks(mdr.get());
-  auto fin = new C_MDS_RetryRequest(mdcache, mdr);
-  dn->get(CDentry::PIN_PURGING);
-  dn->add_waiter(CDentry::WAIT_UNLINK_FINISH, new C_WaitUnlinkToFinish(mdcache, dn, fin));
-}
-
-struct C_WaitReintegrateToFinish : public MDSContext {
-protected:
-  MDCache *mdcache;
-  CDentry *dn;
-  MDSContext *fin;
-
-  MDSRank *get_mds() override
-  {
-    ceph_assert(mdcache != NULL);
-    return mdcache->mds;
-  }
-
-public:
-  C_WaitReintegrateToFinish(MDCache *m, CDentry *d, MDSContext *f) :
-    mdcache(m), dn(d), fin(f) {}
-  void finish(int r) override {
-    fin->complete(r);
-    dn->put(CDentry::PIN_PURGING);
-  }
-};
-
-bool Server::is_reintegrate_pending(CDentry *dn)
-{
-  CDentry::linkage_t *dnl = dn->get_projected_linkage();
-  if (!dnl->is_null() && dn->state_test(CDentry::STATE_REINTEGRATING)) {
-      return true;
-  }
-  return false;
-}
-
-void Server::wait_for_pending_reintegrate(CDentry *dn, MDRequestRef& mdr)
-{
-  dout(20) << __func__ << " dn " << *dn << dendl;
-  mds->locker->drop_locks(mdr.get());
-  auto fin = new C_MDS_RetryRequest(mdcache, mdr);
-  dn->get(CDentry::PIN_PURGING);
-  dn->add_waiter(CDentry::WAIT_REINTEGRATE_FINISH, new C_WaitReintegrateToFinish(mdcache, dn, fin));
-}
-
 // MKNOD
 
 class C_MDS_mknod_finish : public ServerLogContext {
@@ -6964,11 +6880,6 @@ void Server::handle_client_mknod(MDRequestRef& mdr)
   if (!dn)
     return;
 
-  if (is_unlink_pending(dn)) {
-    wait_for_pending_unlink(dn, mdr);
-    return;
-  }
-
   CDir *dir = dn->get_dir();
   CInode *diri = dir->get_inode();
   if (!check_access(mdr, diri, MAY_WRITE))
@@ -7067,11 +6978,6 @@ void Server::handle_client_mkdir(MDRequestRef& mdr)
   if (!dn)
     return;
 
-  if (is_unlink_pending(dn)) {
-    wait_for_pending_unlink(dn, mdr);
-    return;
-  }
-
   CDir *dir = dn->get_dir();
   CInode *diri = dir->get_inode();
 
@@ -7167,11 +7073,6 @@ void Server::handle_client_symlink(MDRequestRef& mdr)
   if (!dn)
     return;
 
-  if (is_unlink_pending(dn)) {
-    wait_for_pending_unlink(dn, mdr);
-    return;
-  }
-
   CDir *dir = dn->get_dir();
   CInode *diri = dir->get_inode();
 
@@ -7283,11 +7184,6 @@ void Server::handle_client_link(MDRequestRef& mdr)
     targeti = ret.second->get_projected_linkage()->get_inode();
   }
 
-  if (is_unlink_pending(destdn)) {
-    wait_for_pending_unlink(destdn, mdr);
-    return;
-  }
-
   ceph_assert(destdn->get_projected_linkage()->is_null());
   if (req->get_alternate_name().size() > alternate_name_max) {
     dout(10) << " alternate_name longer than " << alternate_name_max << dendl;
@@ -7341,14 +7237,9 @@ void Server::handle_client_link(MDRequestRef& mdr)
   SnapRealm *target_realm = target_pin->find_snaprealm();
   if (target_pin != dir->inode &&
       target_realm->get_subvolume_ino() !=
-      dir->inode->find_snaprealm()->get_subvolume_ino()) {
-    if (target_pin->is_stray()) {
-      mds->locker->drop_locks(mdr.get());
-      targeti->add_waiter(CInode::WAIT_UNLINK,
-                          new C_MDS_RetryRequest(mdcache, mdr));
-      mdlog->flush();
-      return;
-    }
+      dir->inode->find_snaprealm()->get_subvolume_ino() &&
+      /* The inode is temporarily located in the stray dir pending reintegration */
+      !target_pin->is_stray()) {
     dout(7) << "target is in different subvolume, failing..." << dendl;
     respond_to_request(mdr, -CEPHFS_EXDEV);
     return;
@@ -7579,17 +7470,11 @@ void Server::_link_remote_finish(MDRequestRef& mdr, bool inc,
   mdr->apply();
 
   MDRequestRef null_ref;
-  if (inc) {
+  if (inc)
     mdcache->send_dentry_link(dn, null_ref);
-  } else {
-    dn->state_clear(CDentry::STATE_UNLINKING);
+  else
     mdcache->send_dentry_unlink(dn, NULL, null_ref);
-
-    MDSContext::vec finished;
-    dn->take_waiting(CDentry::WAIT_UNLINK_FINISH, finished);
-    mdcache->mds->queue_waiters(finished);
-  }
-
+  
   // bump target popularity
   mds->balancer->hit_inode(targeti, META_POP_IWR);
   mds->balancer->hit_dir(dn->get_dir(), META_POP_IWR);
@@ -7963,25 +7848,10 @@ void Server::handle_client_unlink(MDRequestRef& mdr)
 
   if (rmdir)
     mdr->disable_lock_cache();
-
   CDentry *dn = rdlock_path_xlock_dentry(mdr, false, true);
   if (!dn)
     return;
 
-  if (is_reintegrate_pending(dn)) {
-    wait_for_pending_reintegrate(dn, mdr);
-    return;
-  }
-
-  // notify replica MDSes the dentry is under unlink
-  if (!dn->state_test(CDentry::STATE_UNLINKING)) {
-    dn->state_set(CDentry::STATE_UNLINKING);
-    mdcache->send_dentry_unlink(dn, nullptr, mdr, true);
-    if (dn->replica_unlinking_ref) {
-      return;
-    }
-  }
-
   CDentry::linkage_t *dnl = dn->get_linkage(client, mdr);
   ceph_assert(!dnl->is_null());
   CInode *in = dnl->get_inode();
@@ -7998,13 +7868,11 @@ void Server::handle_client_unlink(MDRequestRef& mdr)
     if (rmdir) {
       // do empty directory checks
       if (_dir_is_nonempty_unlocked(mdr, in)) {
-        dn->state_clear(CDentry::STATE_UNLINKING);
-        respond_to_request(mdr, -CEPHFS_ENOTEMPTY);
+       respond_to_request(mdr, -CEPHFS_ENOTEMPTY);
        return;
       }
     } else {
       dout(7) << "handle_client_unlink on dir " << *in << ", returning error" << dendl;
-      dn->state_clear(CDentry::STATE_UNLINKING);
       respond_to_request(mdr, -CEPHFS_EISDIR);
       return;
     }
@@ -8012,7 +7880,6 @@ void Server::handle_client_unlink(MDRequestRef& mdr)
     if (rmdir) {
       // unlink
       dout(7) << "handle_client_rmdir on non-dir " << *in << ", returning error" << dendl;
-      dn->state_clear(CDentry::STATE_UNLINKING);
       respond_to_request(mdr, -CEPHFS_ENOTDIR);
       return;
     }
@@ -8020,10 +7887,8 @@ void Server::handle_client_unlink(MDRequestRef& mdr)
 
   CInode *diri = dn->get_dir()->get_inode();
   if ((!mdr->has_more() || mdr->more()->witnessed.empty())) {
-    if (!check_access(mdr, diri, MAY_WRITE)) {
-      dn->state_clear(CDentry::STATE_UNLINKING);
+    if (!check_access(mdr, diri, MAY_WRITE))
       return;
-    }
   }
 
   // -- create stray dentry? --
@@ -8062,7 +7927,6 @@ void Server::handle_client_unlink(MDRequestRef& mdr)
   if (in->is_dir() &&
       _dir_is_nonempty(mdr, in)) {
     respond_to_request(mdr, -CEPHFS_ENOTEMPTY);
-    dn->state_clear(CDentry::STATE_UNLINKING);
     return;
   }
 
@@ -8262,14 +8126,9 @@ void Server::_unlink_local_finish(MDRequestRef& mdr,
   }
 
   mdr->apply();
-
-  dn->state_clear(CDentry::STATE_UNLINKING);
+  
   mdcache->send_dentry_unlink(dn, straydn, mdr);
-
-  MDSContext::vec finished;
-  dn->take_waiting(CDentry::WAIT_UNLINK_FINISH, finished);
-  mdcache->mds->queue_waiters(finished);
-
+  
   if (straydn) {
     // update subtree map?
     if (strayin->is_dir())
@@ -8284,7 +8143,7 @@ void Server::_unlink_local_finish(MDRequestRef& mdr,
 
   // reply
   respond_to_request(mdr, 0);
-
+  
   // removing a new dn?
   dn->get_dir()->try_remove_unlinked_dn(dn);
 
@@ -8743,16 +8602,6 @@ void Server::handle_client_rename(MDRequestRef& mdr)
   if (!destdn)
     return;
 
-  if (is_unlink_pending(destdn)) {
-    wait_for_pending_unlink(destdn, mdr);
-    return;
-  }
-
-  if (is_unlink_pending(srcdn)) {
-    wait_for_pending_unlink(srcdn, mdr);
-    return;
-  }
-
   dout(10) << " destdn " << *destdn << dendl;
   CDir *destdir = destdn->get_dir();
   ceph_assert(destdir->is_auth());
@@ -9169,12 +9018,6 @@ void Server::handle_client_rename(MDRequestRef& mdr)
   C_MDS_rename_finish *fin = new C_MDS_rename_finish(this, mdr, srcdn, destdn, straydn);
 
   journal_and_reply(mdr, srci, destdn, le, fin);
-
-  // trigger to flush mdlog in case reintegrating or migrating the stray dn,
-  // because the link requests maybe waiting.
-  if (srcdn->get_dir()->inode->is_stray()) {
-    mdlog->flush();
-  }
   mds->balancer->maybe_fragment(destdn->get_dir(), false);
 }
 
@@ -9708,7 +9551,7 @@ void Server::_rename_apply(MDRequestRef& mdr, CDentry *srcdn, CDentry *destdn, C
   // primary+remote link merge?
   bool linkmerge = (srcdnl->get_inode() == oldin);
   if (linkmerge)
-    ceph_assert(srcdnl->is_primary() || destdnl->is_remote());
+    ceph_assert(srcdnl->is_primary() && destdnl->is_remote());
 
   bool new_in_snaprealm = false;
   bool new_oldin_snaprealm = false;
@@ -9785,14 +9628,6 @@ void Server::_rename_apply(MDRequestRef& mdr, CDentry *srcdn, CDentry *destdn, C
 
   srcdn->get_dir()->unlink_inode(srcdn);
 
-  // After the stray dn being unlinked from the corresponding inode in case of
-  // reintegrate_stray/migrate_stray, just wake up the waitiers.
-  MDSContext::vec finished;
-  in->take_waiting(CInode::WAIT_UNLINK, finished);
-  if (!finished.empty()) {
-    mds->queue_waiters(finished);
-  }
-
   // dest
   if (srcdn_was_remote) {
     if (!linkmerge) {
@@ -11372,7 +11207,8 @@ void Server::handle_client_renamesnap(MDRequestRef& mdr)
     return;
   }
 
-  snapid_t snapid = diri->snaprealm->resolve_snapname(srcname, diri->ino());
+  snapid_t  snapid = diri->snaprealm->resolve_snapname(srcname, diri->ino());
+
   dout(10) << " snapname " << srcname << " is " << snapid << dendl;
 
   // lock snap
@@ -11457,6 +11293,159 @@ void Server::_renamesnap_finish(MDRequestRef& mdr, CInode *diri, snapid_t snapid
   respond_to_request(mdr, 0);
 }
 
+void Server::handle_client_readdir_snapdiff(MDRequestRef& mdr)
+{
+  const cref_t<MClientRequest>& req = mdr->client_request;
+  Session* session = mds->get_session(req);
+  MutationImpl::LockOpVec lov;
+  CInode* diri = rdlock_path_pin_ref(mdr, false, true);
+  if (!diri) return;
+
+  // it's a directory, right?
+  if (!diri->is_dir()) {
+    // not a dir
+    dout(10) << "reply to " << *req << " snapdiff -CEPHFS_ENOTDIR" << dendl;
+    respond_to_request(mdr, -CEPHFS_ENOTDIR);
+    return;
+  }
+
+  auto num_caps = session->get_num_caps();
+  auto session_cap_acquisition = session->get_cap_acquisition();
+
+  if (num_caps > static_cast<uint64_t>(max_caps_per_client * max_caps_throttle_ratio) && session_cap_acquisition >= cap_acquisition_throttle) {
+    dout(20) << "snapdiff throttled. max_caps_per_client: " << max_caps_per_client << " num_caps: " << num_caps
+      << " session_cap_acquistion: " << session_cap_acquisition << " cap_acquisition_throttle: " << cap_acquisition_throttle << dendl;
+    if (logger)
+      logger->inc(l_mdss_cap_acquisition_throttle);
+
+    mds->timer.add_event_after(caps_throttle_retry_request_timeout, new C_MDS_RetryRequest(mdcache, mdr));
+    return;
+  }
+
+  lov.add_rdlock(&diri->filelock);
+  lov.add_rdlock(&diri->dirfragtreelock);
+
+  if (!mds->locker->acquire_locks(mdr, lov))
+    return;
+
+  if (!check_access(mdr, diri, MAY_READ))
+    return;
+
+  // which frag?
+  frag_t fg = (__u32)req->head.args.snapdiff.frag;
+  unsigned req_flags = (__u32)req->head.args.snapdiff.flags;
+  string offset_str = req->get_path2();
+
+  __u32 offset_hash = 0;
+  if (!offset_str.empty()) {
+    offset_hash = ceph_frag_value(diri->hash_dentry_name(offset_str));
+  } else {
+    offset_hash = (__u32)req->head.args.snapdiff.offset_hash;
+  }
+
+  dout(10) << " frag " << fg << " offset '" << offset_str << "'"
+    << " offset_hash " << offset_hash << " flags " << req_flags << dendl;
+
+  // does the frag exist?
+  if (diri->dirfragtree[fg.value()] != fg) {
+    frag_t newfg;
+    if (req_flags & CEPH_READDIR_REPLY_BITFLAGS) {
+      if (fg.contains((unsigned)offset_hash)) {
+       newfg = diri->dirfragtree[offset_hash];
+      } else {
+       // client actually wants next frag
+       newfg = diri->dirfragtree[fg.value()];
+      }
+    } else {
+      offset_str.clear();
+      newfg = diri->dirfragtree[fg.value()];
+    }
+    dout(10) << " adjust frag " << fg << " -> " << newfg << " " << diri->dirfragtree << dendl;
+    fg = newfg;
+  }
+
+  CDir* dir = try_open_auth_dirfrag(diri, fg, mdr);
+  if (!dir) return;
+
+  // ok!
+  dout(10) << __func__<< " on " << *dir << dendl;
+  ceph_assert(dir->is_auth());
+
+  if (!dir->is_complete()) {
+    if (dir->is_frozen()) {
+      dout(7) << "dir is frozen " << *dir << dendl;
+      mds->locker->drop_locks(mdr.get());
+      mdr->drop_local_auth_pins();
+      dir->add_waiter(CDir::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));
+      return;
+    }
+    // fetch
+    dout(10) << " incomplete dir contents for snapdiff on " << *dir << ", fetching" << dendl;
+    dir->fetch(new C_MDS_RetryRequest(mdcache, mdr), true);
+    return;
+  }
+
+#ifdef MDS_VERIFY_FRAGSTAT
+  dir->verify_fragstat();
+#endif
+
+  utime_t now = ceph_clock_now();
+  mdr->set_mds_stamp(now);
+
+  mdr->snapid_diff_other = (uint64_t)req->head.args.snapdiff.snap_other;
+  if (mdr->snapid_diff_other == mdr->snapid ||
+      mdr->snapid == CEPH_NOSNAP ||
+      mdr->snapid_diff_other == CEPH_NOSNAP) {
+    dout(10) << "reply to " << *req << " snapdiff -CEPHFS_EINVAL" << dendl;
+    respond_to_request(mdr, -CEPHFS_EINVAL);
+  }
+
+  dout(10) << __func__
+    << " snap " << mdr->snapid
+    << " vs. snap " << mdr->snapid_diff_other
+    << dendl;
+
+  SnapRealm* realm = diri->find_snaprealm();
+
+  unsigned max = req->head.args.snapdiff.max_entries;
+  if (!max)
+    max = dir->get_num_any();  // whatever, something big.
+  unsigned max_bytes = req->head.args.snapdiff.max_bytes;
+  if (!max_bytes)
+    // make sure at least one item can be encoded
+    max_bytes = (512 << 10) + g_conf()->mds_max_xattr_pairs_size;
+
+  // start final blob
+  bufferlist dirbl;
+  DirStat ds;
+  ds.frag = dir->get_frag();
+  ds.auth = dir->get_dir_auth().first;
+  if (dir->is_auth() && !forward_all_requests_to_auth)
+    dir->get_dist_spec(ds.dist, mds->get_nodeid());
+
+  dir->encode_dirstat(dirbl, mdr->session->info, ds);
+
+  // count bytes available.
+  //  this isn't perfect, but we should capture the main variable/unbounded size items!
+  int front_bytes = dirbl.length() + sizeof(__u32) + sizeof(__u8) * 2;
+  int bytes_left = max_bytes - front_bytes;
+  bytes_left -= get_snap_trace(session, realm).length();
+
+  _readdir_diff(
+    now,
+    mdr,
+    diri,
+    dir,
+    realm,
+    max,
+    bytes_left,
+    offset_str,
+    offset_hash,
+    req_flags,
+    dirbl);
+}
+
+
 /**
  * Return true if server is in state RECONNECT and this
  * client has not yet reconnected.
@@ -11487,3 +11476,265 @@ const bufferlist& Server::get_snap_trace(client_t client, SnapRealm *realm) cons
   Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(client.v));
   return get_snap_trace(session, realm);
 }
+
+void Server::_readdir_diff(
+  utime_t now,
+  MDRequestRef& mdr,
+  CInode* diri,
+  CDir* dir,
+  SnapRealm* realm,
+  unsigned max_entries,
+  int bytes_left,
+  const string& offset_str,
+  uint32_t offset_hash,
+  unsigned req_flags,
+  bufferlist& dirbl)
+{
+  // build dir contents
+  bufferlist dnbl;
+  __u32 numfiles = 0;
+
+  snapid_t snapid = mdr->snapid;
+  snapid_t snapid_prev = mdr->snapid_diff_other;
+  if (snapid < snapid_prev) {
+    std::swap(snapid, snapid_prev);
+  }
+  bool from_the_beginning = !offset_hash && offset_str.empty();
+  // skip all dns < dentry_key_t(snapid, offset_str, offset_hash)
+  dentry_key_t skip_key(snapid_prev, offset_str.c_str(), offset_hash);
+
+  bool end = build_snap_diff(
+    mdr,
+    dir,
+    bytes_left,
+    from_the_beginning ? nullptr : & skip_key,
+    snapid_prev,
+    snapid,
+    dnbl,
+    [&](CDentry* dn, CInode* in, bool exists) {
+      string name;
+      snapid_t effective_snapid;
+      const auto& dn_name = dn->get_name();
+      // provide the first snapid for removed entries and
+      // the last one for existent ones
+      effective_snapid = exists ? snapid : snapid_prev;
+      name.append(dn_name);
+      if ((int)(dnbl.length() + name.length() + sizeof(__u32) + sizeof(LeaseStat)) > bytes_left) {
+       dout(10) << " ran out of room, stopping at " << dnbl.length() << " < " << bytes_left << dendl;
+       return false;
+      }
+
+      auto diri = dir->get_inode();
+      auto hash = ceph_frag_value(diri->hash_dentry_name(dn_name));
+      unsigned start_len = dnbl.length();
+      dout(10) << "inc dn " << *dn << " as " << name
+               << std::hex << " hash 0x" << hash << std::dec
+               << dendl;
+      encode(name, dnbl);
+      mds->locker->issue_client_lease(dn, in, mdr, now, dnbl);
+
+      // inode
+      dout(10) << "inc inode " << *in << " snap "      << effective_snapid << dendl;
+      int r = in->encode_inodestat(dnbl, mdr->session, realm, effective_snapid, bytes_left - (int)dnbl.length());
+      if (r < 0) {
+       // chop off dn->name, lease
+       dout(10) << " ran out of room, stopping at "
+                << start_len << " < " << bytes_left << dendl;
+       bufferlist keep;
+       keep.substr_of(dnbl, 0, start_len);
+       dnbl.swap(keep);
+       return false;
+      }
+
+      // touch dn
+      mdcache->lru.lru_touch(dn);
+      ++numfiles;
+      return true;
+    });
+
+  __u16 flags = 0;
+  if (req_flags & CEPH_READDIR_REPLY_BITFLAGS) {
+    flags |= CEPH_READDIR_HASH_ORDER | CEPH_READDIR_OFFSET_HASH;
+  }
+
+  std::swap(mdr->snapid, mdr->snapid_diff_other); // we want opponent snapid to be used for tracei
+
+  _finalize_readdir(mdr, diri, dir, from_the_beginning, end, flags, numfiles,
+    dirbl, dnbl);
+}
+
+bool Server::build_snap_diff(
+  MDRequestRef& mdr,
+  CDir* dir,
+  int bytes_left,
+  dentry_key_t* skip_key,
+  snapid_t snapid_prev,
+  snapid_t snapid,
+  const bufferlist& dnbl,
+  std::function<bool (CDentry*, CInode*, bool)> add_result_cb)
+{
+  client_t client = mdr->client_request->get_source().num();
+
+  struct EntryInfo {
+    CDentry* dn = nullptr;
+    CInode* in = nullptr;
+    utime_t mtime;
+
+    void reset() {
+      *this = EntryInfo();
+    }
+  } before;
+
+  auto insert_deleted = [&](EntryInfo& ei) {
+    dout(20) << "build_snap_diff deleted file " << ei.dn->get_name() << " "
+      << ei.dn->first << "/" << ei.dn->last << dendl;
+    int r = add_result_cb(ei.dn, ei.in, false);
+    ei.reset();
+    return r;
+  };
+
+  auto it = !skip_key ? dir->begin() : dir->lower_bound(*skip_key);
+
+  while(it != dir->end()) {
+    CDentry* dn = it->second;
+    dout(20) << __func__ << " " << it->first << "->" << *dn << dendl;
+    ++it;
+    if (dn->state_test(CDentry::STATE_PURGING))
+      continue;
+
+    bool dnp = dn->use_projected(client, mdr);
+    CDentry::linkage_t* dnl = dnp ? dn->get_projected_linkage() : dn->get_linkage();
+
+    if (dnl->is_null()) {
+      dout(20) << __func__ << " linkage is null, skipping" << dendl;
+      continue;
+    }
+
+    if (dn->last < snapid_prev || dn->first > snapid) {
+      dout(20) << __func__ << " not in range, skipping" << dendl;
+      continue;
+    }
+    if (skip_key) {
+      skip_key->snapid = dn->last;
+      if (!(*skip_key < dn->key()))
+       continue;
+    }
+
+    CInode* in = dnl->get_inode();
+    if (in && in->ino() == CEPH_INO_CEPH)
+      continue;
+
+    // remote link?
+    // better for the MDS to do the work, if we think the client will stat any of these files.
+    if (dnl->is_remote() && !in) {
+      in = mdcache->get_inode(dnl->get_remote_ino());
+      dout(20) << __func__ << " remote in: " << *in << " ino " << std::hex << dnl->get_remote_ino() << std::dec << dendl;
+      if (in) {
+       dn->link_remote(dnl, in);
+      } else if (dn->state_test(CDentry::STATE_BADREMOTEINO)) {
+       dout(10) << "skipping bad remote ino on " << *dn << dendl;
+       continue;
+      } else {
+       // touch everything i _do_ have
+       for (auto& p : *dir) {
+         if (!p.second->get_linkage()->is_null())
+           mdcache->lru.lru_touch(p.second);
+       }
+
+       // already issued caps and leases, reply immediately.
+       if (dnbl.length() > 0) {
+         mdcache->open_remote_dentry(dn, dnp, new C_MDSInternalNoop);
+         dout(10) << " open remote dentry after caps were issued, stopping at "
+           << dnbl.length() << " < " << bytes_left << dendl;
+       } else {
+         mds->locker->drop_locks(mdr.get());
+         mdr->drop_local_auth_pins();
+         mdcache->open_remote_dentry(dn, dnp, new C_MDS_RetryRequest(mdcache, mdr));
+       }
+       return false;
+      }
+    }
+    ceph_assert(in);
+
+    utime_t mtime = in->get_inode()->mtime;
+
+    if (in->is_dir()) {
+
+      // we need to maintain the order of entries (determined by their name hashes)
+      // hence need to insert the previous entry if any immediately.
+      if (before.dn) {
+       if (!insert_deleted(before)) {
+         break;
+       }
+      }
+
+      bool exists = true;
+      if (snapid_prev < dn->first && dn->last < snapid) {
+       dout(20) << __func__ << " skipping inner " << dn->get_name() << " "
+         << dn->first << "/" << dn->last << dendl;
+       continue;
+      } else if (dn->first <= snapid_prev && dn->last < snapid) {
+       // dir deleted
+       dout(20) << __func__ << " deleted dir " << dn->get_name() << " "
+         << dn->first << "/" << dn->last << dendl;
+       exists = false;
+      }
+      bool r = add_result_cb(dn, in, exists);
+      if (!r) {
+       break;
+      }
+    } else {
+      if (snapid_prev >= dn->first && snapid <= dn->last) {
+       dout(20) << __func__ << " skipping unchanged " << dn->get_name() << " "
+         << dn->first << "/" << dn->last << dendl;
+       continue;
+      } else if (snapid_prev < dn->first && snapid > dn->last) {
+       dout(20) << __func__ << " skipping inner modification " << dn->get_name() << " "
+         << dn->first << "/" << dn->last << dendl;
+       continue;
+      }
+      string_view name_before =
+        before.dn ? string_view(before.dn->get_name()) : string_view();
+      if (before.dn && dn->get_name() != name_before) {
+        if (!insert_deleted(before)) {
+          break;
+        }
+        before.reset();
+      }
+      if (snapid_prev >= dn->first && snapid_prev <= dn->last) {
+       dout(30) << __func__ << " dn_before " << dn->get_name() << " "
+         << dn->first << "/" << dn->last << dendl;
+       before = EntryInfo {dn, in, mtime};
+       continue;
+      } else {
+       if (before.dn && dn->get_name() == name_before) {
+         if (mtime == before.mtime) {
+           dout(30) << __func__ << " timestamp not changed " << dn->get_name() << " "
+             << dn->first << "/" << dn->last
+             << " " << mtime
+             << dendl;
+           before.reset();
+           continue;
+         } else {
+           dout(30) << __func__ << " timestamp changed " << dn->get_name() << " "
+             << dn->first << "/" << dn->last
+             << " " << before.mtime << " vs. " << mtime
+             << dendl;
+           before.reset();
+         }
+       }
+       dout(20) << __func__ << " new file " << dn->get_name() << " "
+         << dn->first << "/" << dn->last
+         << dendl;
+       ceph_assert(snapid >= dn->first && snapid <= dn->last);
+      }
+      if (!add_result_cb(dn, in, true)) {
+       break;
+      }
+    }
+  }
+  if (before.dn) {
+    insert_deleted(before);
+  }
+  return it == dir->end();
+}