]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mds/ScrubStack.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / mds / ScrubStack.cc
index c36f932fda4a51f6f917408af962fda5ca1e2299..f709ecbe271565b47b179e8ca655c309c0bdd099 100644 (file)
@@ -12,8 +12,6 @@
  *
  */
 
-#include <iostream>
-
 #include "ScrubStack.h"
 #include "common/Finisher.h"
 #include "mds/MDSRank.h"
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_mds
 #undef dout_prefix
-#define dout_prefix _prefix(_dout, scrubstack->mdcache->mds)
-static ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
+#define dout_prefix _prefix(_dout, mdcache->mds)
+
+using namespace std;
+
+static std::ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
   return *_dout << "mds." << mds->get_nodeid() << ".scrubstack ";
 }
 
@@ -49,75 +50,120 @@ std::ostream &operator<<(std::ostream &os, const ScrubStack::State &state) {
   return os;
 }
 
-void ScrubStack::push_inode(CInode *in)
+void ScrubStack::dequeue(MDSCacheObject *obj)
 {
-  dout(20) << "pushing " << *in << " on top of ScrubStack" << dendl;
-  if (!in->item_scrub.is_on_list()) {
-    in->get(CInode::PIN_SCRUBQUEUE);
-    stack_size++;
-  }
-  inode_stack.push_front(&in->item_scrub);
+  dout(20) << "dequeue " << *obj << " from ScrubStack" << dendl;
+  ceph_assert(obj->item_scrub.is_on_list());
+  obj->put(MDSCacheObject::PIN_SCRUBQUEUE);
+  obj->item_scrub.remove_myself();
+  stack_size--;
 }
 
-void ScrubStack::push_inode_bottom(CInode *in)
+int ScrubStack::_enqueue(MDSCacheObject *obj, ScrubHeaderRef& header, bool top)
 {
-  dout(20) << "pushing " << *in << " on bottom of ScrubStack" << dendl;
-  if (!in->item_scrub.is_on_list()) {
-    in->get(CInode::PIN_SCRUBQUEUE);
+  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
+  if (CInode *in = dynamic_cast<CInode*>(obj)) {
+    if (in->scrub_is_in_progress()) {
+      dout(10) << __func__ << " with {" << *in << "}" << ", already in scrubbing" << dendl;
+      return -CEPHFS_EBUSY;
+    }
+
+    dout(10) << __func__ << " with {" << *in << "}" << ", top=" << top << dendl;
+    in->scrub_initialize(header);
+  } else if (CDir *dir = dynamic_cast<CDir*>(obj)) {
+    if (dir->scrub_is_in_progress()) {
+      dout(10) << __func__ << " with {" << *dir << "}" << ", already in scrubbing" << dendl;
+      return -CEPHFS_EBUSY;
+    }
+
+    dout(10) << __func__ << " with {" << *dir << "}" << ", top=" << top << dendl;
+    // The edge directory must be in memory
+    dir->auth_pin(this);
+    dir->scrub_initialize(header);
+  } else {
+    ceph_assert(0 == "queue dentry to scrub stack");
+  }
+
+  dout(20) << "enqueue " << *obj << " to " << (top ? "top" : "bottom") << " of ScrubStack" << dendl;
+  if (!obj->item_scrub.is_on_list()) {
+    obj->get(MDSCacheObject::PIN_SCRUBQUEUE);
     stack_size++;
   }
-  inode_stack.push_back(&in->item_scrub);
+  if (top)
+    scrub_stack.push_front(&obj->item_scrub);
+  else
+    scrub_stack.push_back(&obj->item_scrub);
+  return 0;
 }
 
-void ScrubStack::pop_inode(CInode *in)
+int ScrubStack::enqueue(CInode *in, ScrubHeaderRef& header, bool top)
 {
-  dout(20) << "popping " << *in
-          << " off of ScrubStack" << dendl;
-  ceph_assert(in->item_scrub.is_on_list());
-  in->put(CInode::PIN_SCRUBQUEUE);
-  in->item_scrub.remove_myself();
-  stack_size--;
+  // abort in progress
+  if (clear_stack)
+    return -CEPHFS_EAGAIN;
+
+  header->set_origin(in->ino());
+  auto ret = scrubbing_map.emplace(header->get_tag(), header);
+  if (!ret.second) {
+    dout(10) << __func__ << " with {" << *in << "}"
+            << ", conflicting tag " << header->get_tag() << dendl;
+    return -CEPHFS_EEXIST;
+  }
+
+  int r = _enqueue(in, header, top);
+  if (r < 0)
+    return r;
+
+  clog_scrub_summary(in);
+
+  kick_off_scrubs();
+  return 0;
 }
 
-void ScrubStack::_enqueue_inode(CInode *in, CDentry *parent,
-                               ScrubHeaderRef& header,
-                               MDSContext *on_finish, bool top)
+void ScrubStack::add_to_waiting(MDSCacheObject *obj)
 {
-  dout(10) << __func__ << " with {" << *in << "}"
-           << ", on_finish=" << on_finish << ", top=" << top << dendl;
-  ceph_assert(mdcache->mds->mds_lock.is_locked_by_me());
-  in->scrub_initialize(parent, header, on_finish);
-  if (top)
-    push_inode(in);
-  else
-    push_inode_bottom(in);
+  scrubs_in_progress++;
+  obj->item_scrub.remove_myself();
+  scrub_waiting.push_back(&obj->item_scrub);
 }
 
-void ScrubStack::enqueue_inode(CInode *in, ScrubHeaderRef& header,
-                               MDSContext *on_finish, bool top)
+void ScrubStack::remove_from_waiting(MDSCacheObject *obj, bool kick)
 {
-  // abort in progress
-  if (clear_inode_stack) {
-    on_finish->complete(-EAGAIN);
-    return;
+  scrubs_in_progress--;
+  if (obj->item_scrub.is_on_list()) {
+    obj->item_scrub.remove_myself();
+    scrub_stack.push_front(&obj->item_scrub);
+    if (kick)
+      kick_off_scrubs();
   }
-
-  _enqueue_inode(in, NULL, header, on_finish, top);
-  kick_off_scrubs();
 }
 
+class C_RetryScrub : public MDSInternalContext {
+public:
+  C_RetryScrub(ScrubStack *s, MDSCacheObject *o) :
+    MDSInternalContext(s->mdcache->mds), stack(s), obj(o) {
+    stack->add_to_waiting(obj);
+  }
+  void finish(int r) override {
+    stack->remove_from_waiting(obj);
+  }
+private:
+  ScrubStack *stack;
+  MDSCacheObject *obj;
+};
+
 void ScrubStack::kick_off_scrubs()
 {
-  ceph_assert(mdcache->mds->mds_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(mdcache->mds->mds_lock));
   dout(20) << __func__ << ": state=" << state << dendl;
 
-  if (clear_inode_stack || state == STATE_PAUSING || state == STATE_PAUSED) {
+  if (clear_stack || state == STATE_PAUSING || state == STATE_PAUSED) {
     if (scrubs_in_progress == 0) {
       dout(10) << __func__ << ": in progress scrub operations finished, "
                << stack_size << " in the stack" << dendl;
 
       State final_state = state;
-      if (clear_inode_stack) {
+      if (clear_stack) {
         abort_pending_scrubs();
         final_state = STATE_IDLE;
       }
@@ -134,11 +180,9 @@ void ScrubStack::kick_off_scrubs()
 
   dout(20) << __func__ << " entering with " << scrubs_in_progress << " in "
               "progress and " << stack_size << " in the stack" << dendl;
-  bool can_continue = true;
-  elist<CInode*>::iterator i = inode_stack.begin();
-  while (g_conf()->mds_max_scrub_ops_in_progress > scrubs_in_progress &&
-         can_continue) {
-    if (i.end()) {
+  elist<MDSCacheObject*>::iterator it = scrub_stack.begin();
+  while (g_conf()->mds_max_scrub_ops_in_progress > scrubs_in_progress) {
+    if (it.end()) {
       if (scrubs_in_progress == 0) {
         set_state(STATE_IDLE);
       }
@@ -149,272 +193,247 @@ void ScrubStack::kick_off_scrubs()
     assert(state == STATE_RUNNING || state == STATE_IDLE);
     set_state(STATE_RUNNING);
 
-    CInode *curi = *i;
-    ++i; // we have our reference, push iterator forward
+    if (CInode *in = dynamic_cast<CInode*>(*it)) {
+      dout(20) << __func__ << " examining " << *in << dendl;
+      ++it;
 
-    dout(20) << __func__ << " examining " << *curi << dendl;
+      if (!validate_inode_auth(in))
+       continue;
 
-    if (!curi->is_dir()) {
-      // it's a regular file, symlink, or hard link
-      pop_inode(curi); // we only touch it this once, so remove from stack
+      if (!in->is_dir()) {
+       // it's a regular file, symlink, or hard link
+       dequeue(in); // we only touch it this once, so remove from stack
 
-      if (!curi->scrub_info()->on_finish) {
-       scrubs_in_progress++;
-       curi->scrub_set_finisher(&scrub_kick);
+       scrub_file_inode(in);
+      } else {
+       bool added_children = false;
+       bool done = false; // it's done, so pop it off the stack
+       scrub_dir_inode(in, &added_children, &done);
+       if (done) {
+         dout(20) << __func__ << " dir inode, done" << dendl;
+         dequeue(in);
+       }
+       if (added_children) {
+         // dirfrags were queued at top of stack
+         it = scrub_stack.begin();
+       }
       }
-      scrub_file_inode(curi);
-      can_continue = true;
-    } else {
-      bool completed; // it's done, so pop it off the stack
-      bool terminal; // not done, but we can start ops on other directories
-      bool progress; // it added new dentries to the top of the stack
-      scrub_dir_inode(curi, &progress, &terminal, &completed);
-      if (completed) {
-        dout(20) << __func__ << " dir completed" << dendl;
-        pop_inode(curi);
-      } else if (progress) {
-        dout(20) << __func__ << " dir progressed" << dendl;
-        // we added new stuff to top of stack, so reset ourselves there
-        i = inode_stack.begin();
+    } else if (CDir *dir = dynamic_cast<CDir*>(*it)) {
+      auto next = it;
+      ++next;
+      bool done = false; // it's done, so pop it off the stack
+      scrub_dirfrag(dir, &done);
+      if (done) {
+       dout(20) << __func__ << " dirfrag, done" << dendl;
+       ++it; // child inodes were queued at bottom of stack
+       dequeue(dir);
       } else {
-        dout(20) << __func__ << " dir no-op" << dendl;
+       it = next;
       }
+    } else {
+      ceph_assert(0 == "dentry in scrub stack");
+    }
+  }
+}
 
-      can_continue = progress || terminal || completed;
+bool ScrubStack::validate_inode_auth(CInode *in)
+{
+  if (in->is_auth()) {
+    if (!in->can_auth_pin()) {
+      dout(10) << __func__ << " can't auth pin" << dendl;
+      in->add_waiter(CInode::WAIT_UNFREEZE, new C_RetryScrub(this, in));
+      return false;
+    }
+    return true;
+  } else {
+    MDSRank *mds = mdcache->mds;
+    if (in->is_ambiguous_auth()) {
+      dout(10) << __func__ << " ambiguous auth" << dendl;
+      in->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_RetryScrub(this, in));
+    } else if (mds->is_cluster_degraded()) {
+      dout(20) << __func__ << " cluster degraded" << dendl;
+      mds->wait_for_cluster_recovered(new C_RetryScrub(this, in));
+    } else {
+      ScrubHeaderRef header = in->get_scrub_header();
+      ceph_assert(header);
+
+      auto ret = remote_scrubs.emplace(std::piecewise_construct,
+                                      std::forward_as_tuple(in),
+                                      std::forward_as_tuple());
+      ceph_assert(ret.second); // FIXME: parallel scrubs?
+      auto &scrub_r = ret.first->second;
+      scrub_r.tag = header->get_tag();
+
+      mds_rank_t auth = in->authority().first;
+      dout(10) << __func__ << " forward to mds." << auth << dendl;
+      auto r = make_message<MMDSScrub>(MMDSScrub::OP_QUEUEINO, in->ino(),
+                                      std::move(in->scrub_queued_frags()),
+                                      header->get_tag(), header->get_origin(),
+                                      header->is_internal_tag(), header->get_force(),
+                                      header->get_recursive(), header->get_repair());
+      mdcache->mds->send_message_mds(r, auth);
+
+      scrub_r.gather_set.insert(auth);
+      // wait for ACK
+      add_to_waiting(in);
     }
+    return false;
   }
 }
 
-void ScrubStack::scrub_dir_inode(CInode *in,
-                                 bool *added_children,
-                                 bool *terminal,
-                                 bool *done)
+void ScrubStack::scrub_dir_inode(CInode *in, bool *added_children, bool *done)
 {
   dout(10) << __func__ << " " << *in << dendl;
-
-  *added_children = false;
-  bool all_frags_terminal = true;
-  bool all_frags_done = true;
+  ceph_assert(in->is_auth());
+  MDSRank *mds = mdcache->mds;
 
   ScrubHeaderRef header = in->get_scrub_header();
-  ceph_assert(header != nullptr);
-
-  if (header->get_recursive()) {
-    frag_vec_t scrubbing_frags;
-    list<CDir*> scrubbing_cdirs;
-    in->scrub_dirfrags_scrubbing(&scrubbing_frags);
-    dout(20) << __func__ << " iterating over " << scrubbing_frags.size()
-      << " scrubbing frags" << dendl;
-    for (const auto& fg : scrubbing_frags) {
-      // turn frags into CDir *
-      CDir *dir = in->get_dirfrag(fg);
-      if (dir) {
-       scrubbing_cdirs.push_back(dir);
-       dout(25) << __func__ << " got CDir " << *dir << " presently scrubbing" << dendl;
+  ceph_assert(header);
+
+  MDSGatherBuilder gather(g_ceph_context);
+
+  auto &queued = in->scrub_queued_frags();
+  std::map<mds_rank_t, fragset_t> scrub_remote;
+
+  frag_vec_t frags;
+  in->dirfragtree.get_leaves(frags);
+  dout(20) << __func__ << "recursive mode, frags " << frags << dendl;
+  for (auto &fg : frags) {
+    if (queued.contains(fg))
+      continue;
+    CDir *dir = in->get_or_open_dirfrag(mdcache, fg);
+    if (!dir->is_auth()) {
+      if (dir->is_ambiguous_auth()) {
+       dout(20) << __func__ << " ambiguous auth " << *dir  << dendl;
+       dir->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, gather.new_sub());
+      } else if (mds->is_cluster_degraded()) {
+       dout(20) << __func__ << " cluster degraded" << dendl;
+       mds->wait_for_cluster_recovered(gather.new_sub());
       } else {
-       in->scrub_dirfrag_finished(fg);
-       dout(25) << __func__ << " missing dirfrag " << fg << " skip scrubbing" << dendl;
+       mds_rank_t auth = dir->authority().first;
+       scrub_remote[auth].insert_raw(fg);
       }
+    } else if (!dir->can_auth_pin()) {
+      dout(20) << __func__ << " freezing/frozen " << *dir  << dendl;
+      dir->add_waiter(CDir::WAIT_UNFREEZE, gather.new_sub());
+    } else if (dir->get_version() == 0) {
+      dout(20) << __func__ << " barebones " << *dir  << dendl;
+      dir->fetch(gather.new_sub());
+    } else {
+      _enqueue(dir, header, true);
+      queued.insert_raw(dir->get_frag());
+      *added_children = true;
     }
-
-    dout(20) << __func__ << " consuming from " << scrubbing_cdirs.size()
-            << " scrubbing cdirs" << dendl;
-
-    list<CDir*>::iterator i = scrubbing_cdirs.begin();
-    while (g_conf()->mds_max_scrub_ops_in_progress > scrubs_in_progress) {
-      // select next CDir
-      CDir *cur_dir = NULL;
-      if (i != scrubbing_cdirs.end()) {
-       cur_dir = *i;
-       ++i;
-       dout(20) << __func__ << " got cur_dir = " << *cur_dir << dendl;
-      } else {
-       bool ready = get_next_cdir(in, &cur_dir);
-       dout(20) << __func__ << " get_next_cdir ready=" << ready << dendl;
-
-       if (ready && cur_dir) {
-         scrubbing_cdirs.push_back(cur_dir);
-       } else if (!ready) {
-         // We are waiting for load of a frag
-         all_frags_done = false;
-         all_frags_terminal = false;
-         break;
-       } else {
-         // Finished with all frags
-         break;
-       }
-      }
-      // scrub that CDir
-      bool frag_added_children = false;
-      bool frag_terminal = true;
-      bool frag_done = false;
-      scrub_dirfrag(cur_dir, header,
-                   &frag_added_children, &frag_terminal, &frag_done);
-      if (frag_done) {
-       cur_dir->inode->scrub_dirfrag_finished(cur_dir->frag);
-      }
-      *added_children |= frag_added_children;
-      all_frags_terminal = all_frags_terminal && frag_terminal;
-      all_frags_done = all_frags_done && frag_done;
-    }
-
-    dout(20) << "finished looping; all_frags_terminal=" << all_frags_terminal
-            << ", all_frags_done=" << all_frags_done << dendl;
-  } else {
-    dout(20) << "!scrub_recursive" << dendl;
   }
 
-  if (all_frags_done) {
-    assert (!*added_children); // can't do this if children are still pending
+  queued.simplify();
 
-    // OK, so now I can... fire off a validate on the dir inode, and
-    // when it completes, come through here again, noticing that we've
-    // set a flag to indicate the validate happened, and 
-    scrub_dir_inode_final(in);
+  if (gather.has_subs()) {
+    gather.set_finisher(new C_RetryScrub(this, in));
+    gather.activate();
+    return;
   }
 
-  *terminal = all_frags_terminal;
-  *done = all_frags_done;
-  dout(10) << __func__ << " is exiting " << *terminal << " " << *done << dendl;
-  return;
-}
-
-bool ScrubStack::get_next_cdir(CInode *in, CDir **new_dir)
-{
-  dout(20) << __func__ << " on " << *in << dendl;
-  frag_t next_frag;
-  int r = in->scrub_dirfrag_next(&next_frag);
-  assert (r >= 0);
-
-  if (r == 0) {
-    // we got a frag to scrub, otherwise it would be ENOENT
-    dout(25) << "looking up new frag " << next_frag << dendl;
-    CDir *next_dir = in->get_or_open_dirfrag(mdcache, next_frag);
-    if (!next_dir->is_complete()) {
-      scrubs_in_progress++;
-      next_dir->fetch(&scrub_kick);
-      dout(25) << "fetching frag from RADOS" << dendl;
-      return false;
+  if (!scrub_remote.empty()) {
+    auto ret = remote_scrubs.emplace(std::piecewise_construct,
+                                    std::forward_as_tuple(in),
+                                    std::forward_as_tuple());
+    ceph_assert(ret.second); // FIXME: parallel scrubs?
+    auto &scrub_r = ret.first->second;
+    scrub_r.tag = header->get_tag();
+
+    for (auto& p : scrub_remote) {
+      p.second.simplify();
+      dout(20) << __func__ << " forward " << p.second  << " to mds." << p.first << dendl;
+      auto r = make_message<MMDSScrub>(MMDSScrub::OP_QUEUEDIR, in->ino(),
+                                      std::move(p.second), header->get_tag(),
+                                      header->get_origin(), header->is_internal_tag(),
+                                      header->get_force(), header->get_recursive(),
+                                      header->get_repair());
+      mds->send_message_mds(r, p.first);
+      scrub_r.gather_set.insert(p.first);
     }
-    *new_dir = next_dir;
-    dout(25) << "returning dir " << *new_dir << dendl;
-    return true;
+    // wait for ACKs
+    add_to_waiting(in);
+    return;
   }
-  ceph_assert(r == ENOENT);
-  // there are no dirfrags left
-  *new_dir = NULL;
-  return true;
+
+  scrub_dir_inode_final(in);
+
+  *done = true;
+  dout(10) << __func__ << " done" << dendl;
 }
 
 class C_InodeValidated : public MDSInternalContext
 {
-  public:
-    ScrubStack *stack;
-    CInode::validated_data result;
-    CInode *target;
-
-    C_InodeValidated(MDSRank *mds, ScrubStack *stack_, CInode *target_)
-      : MDSInternalContext(mds), stack(stack_), target(target_)
-    {}
+public:
+  ScrubStack *stack;
+  CInode::validated_data result;
+  CInode *target;
 
-    void finish(int r) override
-    {
-      stack->_validate_inode_done(target, r, result);
-    }
+  C_InodeValidated(MDSRank *mds, ScrubStack *stack_, CInode *target_)
+    : MDSInternalContext(mds), stack(stack_), target(target_)
+  {
+    stack->scrubs_in_progress++;
+  }
+  void finish(int r) override {
+    stack->_validate_inode_done(target, r, result);
+    stack->scrubs_in_progress--;
+    stack->kick_off_scrubs();
+  }
 };
 
-
 void ScrubStack::scrub_dir_inode_final(CInode *in)
 {
   dout(20) << __func__ << " " << *in << dendl;
 
-  // Two passes through this function.  First one triggers inode validation,
-  // second one sets finally_done
-  // FIXME: kind of overloading scrub_in_progress here, using it while
-  // dentry is still on stack to indicate that we have finished
-  // doing our validate_disk_state on the inode
-  // FIXME: the magic-constructing scrub_info() is going to leave
-  // an unneeded scrub_infop lying around here
-  if (!in->scrub_info()->children_scrubbed) {
-    if (!in->scrub_info()->on_finish) {
-      scrubs_in_progress++;
-      in->scrub_set_finisher(&scrub_kick);
-    }
-
-    in->scrub_children_finished();
-    C_InodeValidated *fin = new C_InodeValidated(mdcache->mds, this, in);
-    in->validate_disk_state(&fin->result, fin);
-  }
-
+  C_InodeValidated *fin = new C_InodeValidated(mdcache->mds, this, in);
+  in->validate_disk_state(&fin->result, fin);
   return;
 }
 
-void ScrubStack::scrub_dirfrag(CDir *dir,
-                              ScrubHeaderRef& header,
-                              bool *added_children, bool *is_terminal,
-                              bool *done)
+void ScrubStack::scrub_dirfrag(CDir *dir, bool *done)
 {
   ceph_assert(dir != NULL);
 
-  dout(20) << __func__ << " on " << *dir << dendl;
-  *added_children = false;
-  *is_terminal = false;
-  *done = false;
+  dout(10) << __func__ << " " << *dir << dendl;
 
-
-  if (!dir->scrub_info()->directory_scrubbing) {
-    // Get the frag complete before calling
-    // scrub initialize, so that it can populate its lists
-    // of dentries.
-    if (!dir->is_complete()) {
-      scrubs_in_progress++;
-      dir->fetch(&scrub_kick);
-      return;
-    }
-
-    dir->scrub_initialize(header);
+  if (!dir->is_complete()) {
+    dir->fetch(new C_RetryScrub(this, dir), true); // already auth pinned
+    dout(10) << __func__ << " incomplete, fetching" << dendl;
+    return;
   }
 
-  int r = 0;
-  while(r == 0) {
-    CDentry *dn = NULL;
-    scrubs_in_progress++;
-    r = dir->scrub_dentry_next(&scrub_kick, &dn);
-    if (r != EAGAIN) {
-      scrubs_in_progress--;
-    }
-
-    if (r == EAGAIN) {
-      // Drop out, CDir fetcher will call back our kicker context
-      dout(20) << __func__ << " waiting for fetch on " << *dir << dendl;
-      return;
-    }
-
-    if (r == ENOENT) {
-      // Nothing left to scrub, are we done?
-      std::list<CDentry*> scrubbing;
-      dir->scrub_dentries_scrubbing(&scrubbing);
-      if (scrubbing.empty()) {
-        dout(20) << __func__ << " dirfrag done: " << *dir << dendl;
-        // FIXME: greg: What's the diff meant to be between done and terminal
-       dir->scrub_finished();
-        *done = true;
-        *is_terminal = true;
-      } else {
-        dout(20) << __func__ << " " << scrubbing.size() << " dentries still "
-                   "scrubbing in " << *dir << dendl;
+  ScrubHeaderRef header = dir->get_scrub_header();
+  version_t last_scrub = dir->scrub_info()->last_recursive.version;
+  if (header->get_recursive()) {
+    for (auto it = dir->begin(); it != dir->end(); ++it) {
+      if (it->first.snapid != CEPH_NOSNAP)
+       continue;
+      CDentry *dn = it->second;
+      CDentry::linkage_t *dnl = dn->get_linkage();
+      if (dn->get_version() <= last_scrub &&
+         dnl->get_remote_d_type() != DT_DIR &&
+         !header->get_force()) {
+       dout(15) << __func__ << " skip dentry " << it->first
+                << ", no change since last scrub" << dendl;
+       continue;
+      }
+      if (dnl->is_primary()) {
+       _enqueue(dnl->get_inode(), header, false);
+      } else if (dnl->is_remote()) {
+       // TODO: check remote linkage
       }
-      return;
     }
+  }
 
-    // scrub_dentry_next defined to only give EAGAIN, ENOENT, 0 -- we should
-    // never get random IO errors here.
-    ceph_assert(r == 0);
+  dir->scrub_local();
 
-    _enqueue_inode(dn->get_projected_inode(), dn, header, NULL, true);
+  dir->scrub_finished();
+  dir->auth_unpin(this);
 
-    *added_children = true;
-  }
+  *done = true;
+  dout(10) << __func__ << " done" << dendl;
 }
 
 void ScrubStack::scrub_file_inode(CInode *in)
@@ -442,7 +461,7 @@ void ScrubStack::_validate_inode_done(CInode *in, int r,
   {
     // Record backtrace fails as remote linkage damage, as
     // we may not be able to resolve hard links to this inode
-    mdcache->mds->damage_table.notify_remote_damaged(in->inode.ino, path);
+    mdcache->mds->damage_table.notify_remote_damaged(in->ino(), path);
   } else if (result.inode.checked && !result.inode.passed &&
              !result.inode.repaired) {
     // Record damaged inode structures as damaged dentries as
@@ -469,39 +488,19 @@ void ScrubStack::_validate_inode_done(CInode *in, int r,
     // Put the verbose JSON output into the MDS log for later inspection
     JSONFormatter f;
     result.dump(&f);
-    std::ostringstream out;
-    f.flush(out);
-    derr << __func__ << " scrub error on inode " << *in << ": " << out.str()
+    CachedStackStringStream css;
+    f.flush(*css);
+    derr << __func__ << " scrub error on inode " << *in << ": " << css->strv()
          << dendl;
   } else {
     dout(10) << __func__ << " scrub passed on inode " << *in << dendl;
   }
 
-  MDSContext *c = NULL;
-  in->scrub_finished(&c);
-
-  if (in == header->get_origin()) {
-    scrub_origins.erase(in);
-    if (!header->get_recursive()) {
-      if (r >= 0) { // we got into the scrubbing dump it
-        result.dump(&(header->get_formatter()));
-      } else { // we failed the lookup or something; dump ourselves
-        header->get_formatter().open_object_section("results");
-        header->get_formatter().dump_int("return_code", r);
-        header->get_formatter().close_section(); // results
-      }
-    }
-  }
-  if (c) {
-    finisher->queue(new MDSIOContextWrapper(mdcache->mds, c), 0);
-  }
+  in->scrub_finished();
 }
 
-ScrubStack::C_KickOffScrubs::C_KickOffScrubs(MDCache *mdcache, ScrubStack *s)
-  : MDSInternalContext(mdcache->mds), stack(s) { }
-
 void ScrubStack::complete_control_contexts(int r) {
-  ceph_assert(mdcache->mds->mds_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
 
   for (auto &ctx : control_ctxs) {
     ctx->complete(r);
@@ -514,11 +513,12 @@ void ScrubStack::set_state(State next_state) {
       dout(20) << __func__ << ", from state=" << state << ", to state="
                << next_state << dendl;
       state = next_state;
+      clog_scrub_summary();
     }
 }
 
 bool ScrubStack::scrub_in_transition_state() {
-  ceph_assert(mdcache->mds->mds_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
   dout(20) << __func__ << ": state=" << state << dendl;
 
   // STATE_RUNNING is considered as a transition state so as to
@@ -530,71 +530,132 @@ bool ScrubStack::scrub_in_transition_state() {
   return false;
 }
 
+std::string_view ScrubStack::scrub_summary() {
+  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
+
+  bool have_more = false;
+  CachedStackStringStream cs;
+
+  if (state == STATE_IDLE) {
+    if (scrubbing_map.empty())
+      return "idle";
+    *cs << "idle+waiting";
+  }
+
+  if (state == STATE_RUNNING) {
+    if (clear_stack) {
+      *cs << "aborting";
+    } else {
+      *cs << "active";
+    }
+  } else {
+    if (state == STATE_PAUSING) {
+      have_more = true;
+      *cs << "pausing";
+    } else if (state == STATE_PAUSED) {
+      have_more = true;
+      *cs << "paused";
+    }
+
+    if (clear_stack) {
+      if (have_more) {
+        *cs << "+";
+      }
+      *cs << "aborting";
+    }
+  }
+
+  if (!scrubbing_map.empty()) {
+    *cs << " paths [";
+    bool first = true;
+    for (auto &p : scrubbing_map) {
+      if (!first)
+       *cs << ",";
+      auto& header = p.second;
+      if (CInode *in = mdcache->get_inode(header->get_origin()))
+       *cs << scrub_inode_path(in);
+      else
+       *cs << "#" << header->get_origin();
+      first = false;
+    }
+    *cs << "]";
+  }
+
+  return cs->strv();
+}
+
 void ScrubStack::scrub_status(Formatter *f) {
-  ceph_assert(mdcache->mds->mds_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
 
   f->open_object_section("result");
 
-  std::stringstream ss;
+  CachedStackStringStream css;
   bool have_more = false;
 
   if (state == STATE_IDLE) {
-    ss << "no active scrubs running";
+    if (scrubbing_map.empty())
+      *css << "no active scrubs running";
+    else
+      *css << state << " (waiting for more scrubs)";
   } else if (state == STATE_RUNNING) {
-    if (clear_inode_stack) {
-      ss << "ABORTING";
+    if (clear_stack) {
+      *css << "ABORTING";
     } else {
-      ss << "scrub active";
+      *css << "scrub active";
     }
-    ss << " (" << stack_size << " inodes in the stack)";
+    *css << " (" << stack_size << " inodes in the stack)";
   } else {
     if (state == STATE_PAUSING || state == STATE_PAUSED) {
       have_more = true;
-      ss << state;
+      *css << state;
     }
-    if (clear_inode_stack) {
+    if (clear_stack) {
       if (have_more) {
-        ss << "+";
+        *css << "+";
       }
-      ss << "ABORTING";
+      *css << "ABORTING";
     }
 
-    ss << " (" << stack_size << " inodes in the stack)";
+    *css << " (" << stack_size << " inodes in the stack)";
   }
-  f->dump_string("status", ss.str());
+  f->dump_string("status", css->strv());
 
   f->open_object_section("scrubs");
-  for (auto &inode : scrub_origins) {
+
+  for (auto& p : scrubbing_map) {
     have_more = false;
-    ScrubHeaderRefConst header = inode->get_scrub_header();
+    auto& header = p.second;
 
     std::string tag(header->get_tag());
     f->open_object_section(tag.c_str()); // scrub id
 
-    std::string path;
-    inode->make_path_string(path, true);
-    f->dump_string("path", path.empty() ? "/" : path.c_str());
+    if (CInode *in = mdcache->get_inode(header->get_origin()))
+      f->dump_string("path", scrub_inode_path(in));
+    else
+      f->dump_stream("path") << "#" << header->get_origin();
 
-    std::stringstream optss;
+    f->dump_string("tag", header->get_tag());
+
+    CachedStackStringStream optcss;
     if (header->get_recursive()) {
-      optss << "recursive";
+      *optcss << "recursive";
       have_more = true;
     }
     if (header->get_repair()) {
       if (have_more) {
-        optss << ",";
+        *optcss << ",";
       }
-      optss << "repair";
+      *optcss << "repair";
       have_more = true;
     }
     if (header->get_force()) {
       if (have_more) {
-        optss << ",";
+        *optcss << ",";
       }
-      optss << "force";
+      *optcss << "force";
     }
 
-    f->dump_string("options", optss.str());
+    f->dump_string("options", optcss->strv());
     f->close_section(); // scrub id
   }
   f->close_section(); // scrubs
@@ -602,84 +663,119 @@ void ScrubStack::scrub_status(Formatter *f) {
 }
 
 void ScrubStack::abort_pending_scrubs() {
-  ceph_assert(mdcache->mds->mds_lock.is_locked_by_me());
-  ceph_assert(clear_inode_stack);
-
-  for (auto inode = inode_stack.begin(); !inode.end(); ++inode) {
-    CInode *in = *inode;
-    if (in == in->scrub_info()->header->get_origin()) {
-      scrub_origins.erase(in);
-    }
-
-    MDSContext *ctx = nullptr;
-    in->scrub_aborted(&ctx);
-    if (ctx != nullptr) {
-      ctx->complete(-ECANCELED);
+  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
+  ceph_assert(clear_stack);
+
+  auto abort_one = [this](MDSCacheObject *obj) {
+    if (CInode *in = dynamic_cast<CInode*>(obj))  {
+      in->scrub_aborted();
+    } else if (CDir *dir = dynamic_cast<CDir*>(obj)) {
+      dir->scrub_aborted();
+      dir->auth_unpin(this);
+    } else {
+      ceph_abort(0 == "dentry in scrub stack");
     }
-  }
+  };
+  for (auto it = scrub_stack.begin(); !it.end(); ++it)
+    abort_one(*it);
+  for (auto it = scrub_waiting.begin(); !it.end(); ++it)
+    abort_one(*it);
 
   stack_size = 0;
-  inode_stack.clear();
-  clear_inode_stack = false;
+  scrub_stack.clear();
+  scrub_waiting.clear();
+
+  for (auto& p : remote_scrubs)
+    remove_from_waiting(p.first, false);
+  remote_scrubs.clear();
+
+  clear_stack = false;
+}
+
+void ScrubStack::send_state_message(int op) {
+  MDSRank *mds = mdcache->mds;
+  set<mds_rank_t> up_mds;
+  mds->get_mds_map()->get_up_mds_set(up_mds);
+  for (auto& r : up_mds) {
+    if (r == 0)
+      continue;
+    auto m = make_message<MMDSScrub>(op);
+    mds->send_message_mds(m, r);
+  }
 }
 
 void ScrubStack::scrub_abort(Context *on_finish) {
-  ceph_assert(mdcache->mds->mds_lock.is_locked_by_me());
-  ceph_assert(on_finish != nullptr);
+  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
 
   dout(10) << __func__ << ": aborting with " << scrubs_in_progress
            << " scrubs in progress and " << stack_size << " in the"
            << " stack" << dendl;
 
-  clear_inode_stack = true;
+  if (mdcache->mds->get_nodeid() == 0) {
+    scrub_epoch_last_abort = scrub_epoch;
+    scrub_any_peer_aborting = true;
+    send_state_message(MMDSScrub::OP_ABORT);
+  }
+
+  clear_stack = true;
   if (scrub_in_transition_state()) {
-    control_ctxs.push_back(on_finish);
+    if (on_finish)
+      control_ctxs.push_back(on_finish);
     return;
   }
 
   abort_pending_scrubs();
-  if (state != STATE_PAUSED) {
+  if (state != STATE_PAUSED)
     set_state(STATE_IDLE);
-  }
-  on_finish->complete(0);
+
+  if (on_finish)
+    on_finish->complete(0);
 }
 
 void ScrubStack::scrub_pause(Context *on_finish) {
-  ceph_assert(mdcache->mds->mds_lock.is_locked_by_me());
-  ceph_assert(on_finish != nullptr);
+  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
 
   dout(10) << __func__ << ": pausing with " << scrubs_in_progress
            << " scrubs in progress and " << stack_size << " in the"
            << " stack" << dendl;
 
+  if (mdcache->mds->get_nodeid() == 0)
+    send_state_message(MMDSScrub::OP_PAUSE);
+
   // abort is in progress
-  if (clear_inode_stack) {
-    on_finish->complete(-EINVAL);
+  if (clear_stack) {
+    if (on_finish)
+      on_finish->complete(-CEPHFS_EINVAL);
     return;
   }
 
   bool done = scrub_in_transition_state();
   if (done) {
     set_state(STATE_PAUSING);
-    control_ctxs.push_back(on_finish);
+    if (on_finish)
+      control_ctxs.push_back(on_finish);
     return;
   }
 
   set_state(STATE_PAUSED);
-  on_finish->complete(0);
+  if (on_finish)
+    on_finish->complete(0);
 }
 
 bool ScrubStack::scrub_resume() {
-  ceph_assert(mdcache->mds->mds_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
   dout(20) << __func__ << ": state=" << state << dendl;
 
+  if (mdcache->mds->get_nodeid() == 0)
+    send_state_message(MMDSScrub::OP_RESUME);
+
   int r = 0;
 
-  if (clear_inode_stack) {
-    r = -EINVAL;
+  if (clear_stack) {
+    r = -CEPHFS_EINVAL;
   } else if (state == STATE_PAUSING) {
     set_state(STATE_RUNNING);
-    complete_control_contexts(-ECANCELED);
+    complete_control_contexts(-CEPHFS_ECANCELED);
   } else if (state == STATE_PAUSED) {
     set_state(STATE_RUNNING);
     kick_off_scrubs();
@@ -687,3 +783,349 @@ bool ScrubStack::scrub_resume() {
 
   return r;
 }
+
+// send current scrub summary to cluster log
+void ScrubStack::clog_scrub_summary(CInode *in) {
+  if (in) {
+    std::string what;
+    if (clear_stack) {
+      what = "aborted";
+    } else if (in->scrub_is_in_progress()) {
+      what = "queued";
+    } else {
+      what = "completed";
+    }
+    clog->info() << "scrub " << what << " for path: " << scrub_inode_path(in);
+  }
+
+  clog->info() << "scrub summary: " << scrub_summary();
+}
+
+void ScrubStack::dispatch(const cref_t<Message> &m)
+{
+  switch (m->get_type()) {
+  case MSG_MDS_SCRUB:
+    handle_scrub(ref_cast<MMDSScrub>(m));
+    break;
+
+  case MSG_MDS_SCRUB_STATS:
+    handle_scrub_stats(ref_cast<MMDSScrubStats>(m));
+    break;
+
+  default:
+    derr << " scrub stack unknown message " << m->get_type() << dendl_impl;
+    ceph_abort_msg("scrub stack unknown message");
+  }
+}
+
+void ScrubStack::handle_scrub(const cref_t<MMDSScrub> &m)
+{
+
+  mds_rank_t from = mds_rank_t(m->get_source().num());
+  dout(10) << __func__ << " " << *m << " from mds." << from << dendl;
+
+  switch (m->get_op()) {
+  case MMDSScrub::OP_QUEUEDIR:
+    {
+      CInode *diri = mdcache->get_inode(m->get_ino());
+      ceph_assert(diri);
+
+      std::vector<CDir*> dfs;
+      MDSGatherBuilder gather(g_ceph_context);
+      for (const auto& fg : m->get_frags()) {
+       CDir *dir = diri->get_dirfrag(fg);
+       if (!dir) {
+         dout(10) << __func__ << " no frag " << fg << dendl;
+         continue;
+       }
+       if (!dir->is_auth()) {
+         dout(10) << __func__ << " not auth " << *dir << dendl;
+         continue;
+       }
+       if (!dir->can_auth_pin()) {
+         dout(10) << __func__ << " can't auth pin " << *dir <<  dendl;
+         dir->add_waiter(CDir::WAIT_UNFREEZE, gather.new_sub());
+         continue;
+       }
+       dfs.push_back(dir);
+      }
+
+      if (gather.has_subs()) {
+       gather.set_finisher(new C_MDS_RetryMessage(mdcache->mds, m));
+       gather.activate();
+       return;
+      }
+
+      fragset_t queued;
+      if (!dfs.empty()) {
+       ScrubHeaderRef header;
+       if (auto it = scrubbing_map.find(m->get_tag()); it != scrubbing_map.end()) {
+         header = it->second;
+       } else {
+         header = std::make_shared<ScrubHeader>(m->get_tag(), m->is_internal_tag(),
+                                                m->is_force(), m->is_recursive(),
+                                                m->is_repair());
+         header->set_origin(m->get_origin());
+         scrubbing_map.emplace(header->get_tag(), header);
+       }
+       for (auto dir : dfs) {
+         queued.insert_raw(dir->get_frag());
+         _enqueue(dir, header, true);
+       }
+       queued.simplify();
+       kick_off_scrubs();
+      }
+
+      auto r = make_message<MMDSScrub>(MMDSScrub::OP_QUEUEDIR_ACK, m->get_ino(),
+                                      std::move(queued), m->get_tag());
+      mdcache->mds->send_message_mds(r, from);
+    }
+    break;
+  case MMDSScrub::OP_QUEUEDIR_ACK:
+    {
+      CInode *diri = mdcache->get_inode(m->get_ino());
+      ceph_assert(diri);
+      auto it = remote_scrubs.find(diri);
+      if (it != remote_scrubs.end() &&
+         m->get_tag() == it->second.tag) {
+       if (it->second.gather_set.erase(from)) {
+         auto &queued = diri->scrub_queued_frags();
+         for (auto &fg : m->get_frags())
+           queued.insert_raw(fg);
+         queued.simplify();
+
+         if (it->second.gather_set.empty()) {
+           remote_scrubs.erase(it);
+
+           const auto& header = diri->get_scrub_header();
+           header->set_epoch_last_forwarded(scrub_epoch);
+           remove_from_waiting(diri);
+         }
+       }
+      }
+    }
+    break;
+  case MMDSScrub::OP_QUEUEINO:
+    {
+      CInode *in = mdcache->get_inode(m->get_ino());
+      ceph_assert(in);
+
+      ScrubHeaderRef header;
+      if (auto it = scrubbing_map.find(m->get_tag()); it != scrubbing_map.end()) {
+       header = it->second;
+      } else {
+       header = std::make_shared<ScrubHeader>(m->get_tag(), m->is_internal_tag(),
+                                              m->is_force(), m->is_recursive(),
+                                              m->is_repair());
+       header->set_origin(m->get_origin());
+       scrubbing_map.emplace(header->get_tag(), header);
+      }
+
+      _enqueue(in, header, true);
+      in->scrub_queued_frags() = m->get_frags();
+      kick_off_scrubs();
+
+      fragset_t queued;
+      auto r = make_message<MMDSScrub>(MMDSScrub::OP_QUEUEINO_ACK, m->get_ino(),
+                                      std::move(queued), m->get_tag());
+      mdcache->mds->send_message_mds(r, from);
+    }
+    break;
+  case MMDSScrub::OP_QUEUEINO_ACK:
+    {
+      CInode *in = mdcache->get_inode(m->get_ino());
+      ceph_assert(in);
+      auto it = remote_scrubs.find(in);
+      if (it != remote_scrubs.end() &&
+         m->get_tag() == it->second.tag &&
+         it->second.gather_set.erase(from)) {
+       ceph_assert(it->second.gather_set.empty());
+       remote_scrubs.erase(it);
+
+       remove_from_waiting(in, false);
+       dequeue(in);
+
+       const auto& header = in->get_scrub_header();
+       header->set_epoch_last_forwarded(scrub_epoch);
+       in->scrub_finished();
+
+       kick_off_scrubs();
+      }
+    }
+    break;
+  case MMDSScrub::OP_ABORT:
+    scrub_abort(nullptr);
+    break;
+  case MMDSScrub::OP_PAUSE:
+    scrub_pause(nullptr);
+    break;
+  case MMDSScrub::OP_RESUME:
+    scrub_resume();
+    break;
+  default:
+    derr << " scrub stack unknown scrub operation " << m->get_op() << dendl_impl;
+    ceph_abort_msg("scrub stack unknown scrub operation");
+  }
+}
+
+void ScrubStack::handle_scrub_stats(const cref_t<MMDSScrubStats> &m)
+{
+  mds_rank_t from = mds_rank_t(m->get_source().num());
+  dout(7) << __func__ << " " << *m << " from mds." << from << dendl;
+
+  if (from == 0) {
+    if (scrub_epoch != m->get_epoch() - 1) {
+      scrub_epoch = m->get_epoch() - 1;
+      for (auto& p : scrubbing_map) {
+       if (p.second->get_epoch_last_forwarded())
+         p.second->set_epoch_last_forwarded(scrub_epoch);
+      }
+    }
+    bool any_finished = false;
+    bool any_repaired = false;
+    std::set<std::string> scrubbing_tags;
+    for (auto it = scrubbing_map.begin(); it != scrubbing_map.end(); ) {
+      auto& header = it->second;
+      if (header->get_num_pending() ||
+         header->get_epoch_last_forwarded() >= scrub_epoch) {
+       scrubbing_tags.insert(it->first);
+       ++it;
+      } else if (m->is_finished(it->first)) {
+       any_finished = true;
+       if (header->get_repaired())
+         any_repaired = true;
+       scrubbing_map.erase(it++);
+      } else {
+       ++it;
+      }
+    }
+
+    scrub_epoch = m->get_epoch();
+
+    auto ack = make_message<MMDSScrubStats>(scrub_epoch,
+                                           std::move(scrubbing_tags), clear_stack);
+    mdcache->mds->send_message_mds(ack, 0);
+
+    if (any_finished)
+      clog_scrub_summary();
+    if (any_repaired)
+      mdcache->mds->mdlog->trim_all();
+  } else {
+    if (scrub_epoch == m->get_epoch() &&
+       (size_t)from < mds_scrub_stats.size()) {
+      auto& stat = mds_scrub_stats[from];
+      stat.epoch_acked = m->get_epoch();
+      stat.scrubbing_tags = m->get_scrubbing_tags();
+      stat.aborting = m->is_aborting();
+    }
+  }
+}
+
+void ScrubStack::advance_scrub_status()
+{
+  if (!scrub_any_peer_aborting && scrubbing_map.empty())
+    return;
+
+  MDSRank *mds = mdcache->mds;
+
+  set<mds_rank_t> up_mds;
+  mds->get_mds_map()->get_up_mds_set(up_mds);
+  auto up_max = *up_mds.rbegin();
+
+  bool update_scrubbing = false;
+  std::set<std::string> scrubbing_tags;
+
+  if (up_max == 0) {
+    update_scrubbing = true;
+    scrub_any_peer_aborting = false;
+  } else if (mds_scrub_stats.size() > (size_t)(up_max)) {
+    bool any_aborting = false;
+    bool fully_acked = true;
+    for (const auto& stat : mds_scrub_stats) {
+      if (stat.aborting || stat.epoch_acked <= scrub_epoch_last_abort)
+       any_aborting = true;
+      if (stat.epoch_acked != scrub_epoch) {
+       fully_acked = false;
+       continue;
+      }
+      scrubbing_tags.insert(stat.scrubbing_tags.begin(),
+                           stat.scrubbing_tags.end());
+    }
+    if (!any_aborting)
+      scrub_any_peer_aborting = false;
+    if (fully_acked) {
+      // handle_scrub_stats() reports scrub is still in-progress if it has
+      // forwarded any object to other mds since previous epoch. Let's assume,
+      // at time 'A', we got scrub stats from all mds for previous epoch. If
+      // a scrub is not reported by any mds, we know there is no forward of
+      // the scrub since time 'A'. So we can consider the scrub is finished.
+      if (scrub_epoch_fully_acked + 1 == scrub_epoch)
+       update_scrubbing = true;
+      scrub_epoch_fully_acked = scrub_epoch;
+    }
+  }
+
+  if (mds_scrub_stats.size() != (size_t)up_max + 1)
+    mds_scrub_stats.resize((size_t)up_max + 1);
+  mds_scrub_stats.at(0).epoch_acked = scrub_epoch + 1;
+
+  bool any_finished = false;
+  bool any_repaired = false;
+
+  for (auto it = scrubbing_map.begin(); it != scrubbing_map.end(); ) {
+    auto& header = it->second;
+    if (header->get_num_pending() ||
+       header->get_epoch_last_forwarded() >= scrub_epoch) {
+      if (update_scrubbing && up_max != 0)
+       scrubbing_tags.insert(it->first);
+      ++it;
+    } else if (update_scrubbing && !scrubbing_tags.count(it->first)) {
+      // no longer being scrubbed globally
+      any_finished = true;
+      if (header->get_repaired())
+       any_repaired = true;
+      scrubbing_map.erase(it++);
+    } else {
+      ++it;
+    }
+  }
+
+  ++scrub_epoch;
+
+  for (auto& r : up_mds) {
+    if (r == 0)
+      continue;
+    auto m = update_scrubbing ?
+       make_message<MMDSScrubStats>(scrub_epoch, scrubbing_tags) :
+       make_message<MMDSScrubStats>(scrub_epoch);
+    mds->send_message_mds(m, r);
+  }
+
+  if (any_finished)
+    clog_scrub_summary();
+  if (any_repaired)
+    mdcache->mds->mdlog->trim_all();
+}
+
+void ScrubStack::handle_mds_failure(mds_rank_t mds)
+{
+  if (mds == 0) {
+    scrub_abort(nullptr);
+    return;
+  }
+
+  bool kick = false;
+  for (auto it = remote_scrubs.begin(); it != remote_scrubs.end(); ) {
+    if (it->second.gather_set.erase(mds) &&
+       it->second.gather_set.empty()) {
+      CInode *in = it->first;
+      remote_scrubs.erase(it++);
+      remove_from_waiting(in, false);
+      kick = true;
+    } else {
+      ++it;
+    }
+  }
+  if (kick)
+    kick_off_scrubs();
+}