update source to Ceph Pacific 16.2.2
ceph/src/osd/PG.cc
index e50a22a47b89385a674f661cf1e967e29c388b89..df8c4f3451d4adce2a00aba69017727520e84b14 100644
  */
 
 #include "PG.h"
-// #include "msg/Messenger.h"
 #include "messages/MOSDRepScrub.h"
-// #include "common/cmdparse.h"
-// #include "common/ceph_context.h"
 
 #include "common/errno.h"
 #include "common/ceph_releases.h"
@@ -24,6 +21,7 @@
 #include "OSD.h"
 #include "OpRequest.h"
 #include "ScrubStore.h"
+#include "pg_scrubber.h"
 #include "Session.h"
 #include "osd/scheduler/OpSchedulerItem.h"
 
@@ -32,7 +30,6 @@
 
 #include "messages/MOSDOp.h"
 #include "messages/MOSDPGNotify.h"
-// #include "messages/MOSDPGLog.h"
 #include "messages/MOSDPGInfo.h"
 #include "messages/MOSDPGScan.h"
 #include "messages/MOSDPGBackfill.h"
 #undef dout_prefix
 #define dout_prefix _prefix(_dout, this)
 
+using std::list;
+using std::map;
+using std::ostringstream;
+using std::pair;
+using std::set;
+using std::string;
+using std::stringstream;
+using std::unique_ptr;
+using std::vector;
+
+using ceph::bufferlist;
+using ceph::bufferptr;
+using ceph::decode;
+using ceph::encode;
+using ceph::Formatter;
+
 using namespace ceph::osd::scheduler;
 
 template <class T>
@@ -197,7 +210,6 @@ PG::PG(OSDService *o, OSDMapRef curmap,
   pg_stats_publish_valid(false),
   finish_sync_event(NULL),
   scrub_after_recovery(false),
-  save_req_scrub(false),
   active_pushes(0),
   recovery_state(
     o->cct,
@@ -293,7 +305,7 @@ void PG::log_state_exit(
   osd->pg_recovery_stats.log_exit(
     state_name, ceph_clock_now() - enter_time, events, event_dur);
 }
-  
+
 /********* PG **********/
 
 void PG::remove_snap_mapped_object(
@@ -341,6 +353,8 @@ void PG::update_object_snap_mapping(
 /******* PG ***********/
 void PG::clear_primary_state()
 {
+  dout(20) << __func__ << dendl;
+
   projected_log = PGLog::IndexedLog();
 
   snap_trimq.clear();
@@ -348,29 +362,14 @@ void PG::clear_primary_state()
   finish_sync_event = 0;  // so that _finish_recovery doesn't go off in another thread
   release_pg_backoffs();
 
-  scrubber.reserved_peers.clear();
+  if (m_scrubber) {
+    m_scrubber->discard_replica_reservations();
+  }
   scrub_after_recovery = false;
-  save_req_scrub = false;
 
   agent_clear();
 }
 
-PG::Scrubber::Scrubber()
- : local_reserved(false), remote_reserved(false), reserve_failed(false),
-   epoch_start(0),
-   active(false),
-   shallow_errors(0), deep_errors(0), fixed(0),
-   must_scrub(false), must_deep_scrub(false), must_repair(false),
-   need_auto(false), req_scrub(false), time_for_deep(false),
-   auto_repair(false),
-   check_repair(false),
-   deep_scrub_on_error(false),
-   num_digest_updates_pending(0),
-   state(INACTIVE),
-   deep(false)
-{}
-
-PG::Scrubber::~Scrubber() {}
 
 bool PG::op_has_sufficient_caps(OpRequestRef& op)
 {
@@ -414,20 +413,6 @@ bool PG::op_has_sufficient_caps(OpRequestRef& op)
   return cap;
 }
 
-bool PG::requeue_scrub(bool high_priority)
-{
-  ceph_assert(ceph_mutex_is_locked(_lock));
-  if (scrub_queued) {
-    dout(10) << __func__ << ": already queued" << dendl;
-    return false;
-  } else {
-    dout(10) << __func__ << ": queueing" << dendl;
-    scrub_queued = true;
-    osd->queue_for_scrub(this, high_priority);
-    return true;
-  }
-}
-
 void PG::queue_recovery()
 {
   if (!is_primary() || !is_peered()) {
@@ -442,41 +427,36 @@ void PG::queue_recovery()
   }
 }
 
-bool PG::queue_scrub()
+void PG::queue_scrub_after_repair()
 {
+  dout(10) << __func__ << dendl;
   ceph_assert(ceph_mutex_is_locked(_lock));
+
+  m_planned_scrub.must_deep_scrub = true;
+  m_planned_scrub.check_repair = true;
+  m_planned_scrub.must_scrub = true;
+
   if (is_scrubbing()) {
-    return false;
+    dout(10) << __func__ << ": scrubbing already" << dendl;
+    return;
   }
-  // An interrupted recovery repair could leave this set.
-  state_clear(PG_STATE_REPAIR);
-  if (scrubber.need_auto) {
-    scrubber.must_scrub = true;
-    scrubber.must_deep_scrub = true;
-    scrubber.auto_repair = true;
-    scrubber.need_auto = false;
-  }
-  scrubber.priority = scrubber.must_scrub ?
-         cct->_conf->osd_requested_scrub_priority : get_scrub_priority();
-  scrubber.must_scrub = false;
-  state_set(PG_STATE_SCRUBBING);
-  if (scrubber.must_deep_scrub) {
-    state_set(PG_STATE_DEEP_SCRUB);
-    scrubber.must_deep_scrub = false;
-  }
-  if (scrubber.must_repair || scrubber.auto_repair) {
-    state_set(PG_STATE_REPAIR);
-    scrubber.must_repair = false;
-  }
-  requeue_scrub();
-  return true;
+  if (scrub_queued) {
+    dout(10) << __func__ << ": already queued" << dendl;
+    return;
+  }
+
+  m_scrubber->set_op_parameters(m_planned_scrub);
+  dout(15) << __func__ << ": queueing" << dendl;
+
+  scrub_queued = true;
+  osd->queue_scrub_after_repair(this, Scrub::scrub_prio_t::high_priority);
 }
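
The three flags raised above live in the planned-scrub request that sched_scrub() later freezes into the scrubber. A minimal standalone sketch of such a flag set; the struct body below is an assumption of this sketch, only the field names come from the diff:

```cpp
#include <iostream>

// Hypothetical flag set in the style of requested_scrub_t; only the
// field names are taken from the surrounding diff.
struct requested_scrub_t {
  bool must_scrub = false;       // externally requested, must run
  bool must_deep_scrub = false;  // force a deep (checksum) scrub
  bool check_repair = false;     // re-verify objects after a repair
};

int main() {
  requested_scrub_t planned;
  planned.must_deep_scrub = true;  // same flags queue_scrub_after_repair() sets
  planned.check_repair = true;
  planned.must_scrub = true;
  std::cout << std::boolalpha << "deep=" << planned.must_deep_scrub << "\n";
}
```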
 
 unsigned PG::get_scrub_priority()
 {
   // a higher value -> a higher priority
-  int64_t pool_scrub_priority = 0;
-  pool.info.opts.get(pool_opts_t::SCRUB_PRIORITY, &pool_scrub_priority);
+  int64_t pool_scrub_priority =
+    pool.info.opts.value_or(pool_opts_t::SCRUB_PRIORITY, (int64_t)0);
   return pool_scrub_priority > 0 ? pool_scrub_priority : cct->_conf->osd_scrub_priority;
 }
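
The two-step opts.get() pattern of the old code is replaced with value_or(), which yields the stored option when present and the supplied default otherwise. A self-contained sketch of those semantics (opts_t below is a hypothetical stand-in, not the real pool_opts_t):

```cpp
#include <map>

// Hypothetical stand-in for pool_opts_t: a typed key/value store whose
// value_or() returns the stored value when the key is present, otherwise
// the caller-supplied default, which is what the new code relies on.
template <typename K, typename V>
class opts_t {
  std::map<K, V> values;
public:
  void set(const K& k, const V& v) { values[k] = v; }
  V value_or(const K& k, V def) const {
    auto it = values.find(k);
    return it != values.end() ? it->second : def;
  }
};

int main() {
  opts_t<int, long> opts;
  long prio = opts.value_or(1, 0L);  // option unset: falls back to 0
  opts.set(1, 7);
  prio = opts.value_or(1, 0L);       // option set: returns 7
  return prio == 7 ? 0 : 1;
}
```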
 
@@ -494,8 +474,11 @@ Context *PG::finish_recovery()
   return finish_sync_event;
 }
 
-void PG::_finish_recovery(Context *c)
+void PG::_finish_recovery(Context* c)
 {
+  dout(15) << __func__ << " finish_sync_event? " << finish_sync_event << " clean? "
+                << is_clean() << dendl;
+
   std::scoped_lock locker{*this};
   if (recovery_state.is_deleting() || !is_clean()) {
     dout(10) << __func__ << " raced with delete or repair" << dendl;
@@ -504,7 +487,7 @@ void PG::_finish_recovery(Context *c)
   // When recovery is initiated by a repair, that flag is left on
   state_clear(PG_STATE_REPAIR);
   if (c == finish_sync_event) {
-    dout(10) << "_finish_recovery" << dendl;
+    dout(15) << __func__ << " scrub_after_recovery? " << scrub_after_recovery << dendl;
     finish_sync_event = 0;
     recovery_state.purge_strays();
 
@@ -513,11 +496,7 @@ void PG::_finish_recovery(Context *c)
     if (scrub_after_recovery) {
       dout(10) << "_finish_recovery requeueing for scrub" << dendl;
       scrub_after_recovery = false;
-      scrubber.must_deep_scrub = true;
-      scrubber.check_repair = true;
-      // We remember whether req_scrub was set when scrub_after_recovery set to true
-      scrubber.req_scrub = save_req_scrub;
-      queue_scrub();
+      queue_scrub_after_repair();
     }
   } else {
     dout(10) << "_finish_recovery -- stale" << dendl;
@@ -543,7 +522,7 @@ void PG::finish_recovery_op(const hobject_t& soid, bool dequeue)
 {
   dout(10) << "finish_recovery_op " << soid
 #ifdef DEBUG_RECOVERY_OIDS
-          << " (" << recovering_oids << ")" 
+          << " (" << recovering_oids << ")"
 #endif
           << dendl;
   ceph_assert(recovery_ops_active > 0);
@@ -745,7 +724,7 @@ void PG::rm_backoff(const ceph::ref_t<Backoff>& b)
   }
 }
 
-void PG::clear_recovery_state() 
+void PG::clear_recovery_state()
 {
   dout(10) << "clear_recovery_state" << dendl;
 
@@ -783,13 +762,12 @@ void PG::set_probe_targets(const set<pg_shard_t> &probe_set)
 }
 
 void PG::send_cluster_message(
-  int target, Message *m,
+  int target, MessageRef m,
   epoch_t epoch, bool share_map_update=false)
 {
   ConnectionRef con = osd->get_con_osd_cluster(
     target, get_osdmap_epoch());
   if (!con) {
-    m->put();
     return;
   }
 
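
Taking a MessageRef instead of a raw Message* is what lets the early-return path above drop the manual m->put(). A minimal sketch of why, with std::shared_ptr standing in for Ceph's intrusive MessageRef:

```cpp
#include <memory>

struct Message { /* payload */ };
// std::shared_ptr is only a stand-in here; the point is identical:
// ownership is released automatically when the handle goes out of scope.
using MessageRef = std::shared_ptr<Message>;

bool have_connection() { return false; }  // assume: peer unreachable

void send_cluster_message(MessageRef m) {
  if (!have_connection()) {
    // the old raw-pointer version had to call m->put() here to avoid
    // leaking the message; the ref-counted handle makes that implicit.
    return;
  }
  // ... hand the message to the connection ...
}

int main() {
  send_cluster_message(std::make_shared<Message>());
}
```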
@@ -1185,16 +1163,14 @@ void PG::update_snap_map(
   const vector<pg_log_entry_t> &log_entries,
   ObjectStore::Transaction &t)
 {
-  for (vector<pg_log_entry_t>::const_iterator i = log_entries.begin();
-       i != log_entries.end();
-       ++i) {
+  for (auto i = log_entries.cbegin(); i != log_entries.cend(); ++i) {
     OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
     if (i->soid.snap < CEPH_MAXSNAP) {
       if (i->is_delete()) {
        int r = snap_mapper.remove_oid(
          i->soid,
          &_t);
-       if (r != 0)
+       if (r)
          derr << __func__ << " remove_oid " << i->soid << " failed with " << r << dendl;
         // On removal tolerate missing key corruption
         ceph_assert(r == 0 || r == -ENOENT);
@@ -1266,9 +1242,7 @@ void PG::filter_snapc(vector<snapid_t> &snaps)
 
 void PG::requeue_object_waiters(map<hobject_t, list<OpRequestRef>>& m)
 {
-  for (map<hobject_t, list<OpRequestRef>>::iterator it = m.begin();
-       it != m.end();
-       ++it)
+  for (auto it = m.begin(); it != m.end(); ++it)
     requeue_ops(it->second);
   m.clear();
 }
@@ -1330,233 +1304,249 @@ void PG::requeue_map_waiters()
   }
 }
 
+bool PG::get_must_scrub() const
+{
+  dout(20) << __func__ << " must_scrub? " << (m_planned_scrub.must_scrub ? "true" : "false") << dendl;
+  return m_planned_scrub.must_scrub;
+}
+
+unsigned int PG::scrub_requeue_priority(Scrub::scrub_prio_t with_priority) const
+{
+  return m_scrubber->scrub_requeue_priority(with_priority);
+}
+
+unsigned int PG::scrub_requeue_priority(Scrub::scrub_prio_t with_priority, unsigned int suggested_priority) const
+{
+  return m_scrubber->scrub_requeue_priority(with_priority, suggested_priority);
+}
 
 // ==========================================================================================
 // SCRUB
 
 /*
- * when holding pg and sched_scrub_lock, then the states are:
- *   scheduling:
- *     scrubber.local_reserved = true
- *     scrubber.active = false
- *     scrubber.reserved_peers includes whoami
- *     osd->scrubs_local++
- *   scheduling, replica declined:
- *     scrubber.local_reserved = true
- *     scrubber.reserved_peers includes -1
- *     osd->scrub_local++
- *   pending:
- *     scrubber.local_reserved = true
- *     scrubber.active = false
- *     scrubber.reserved_peers.size() == acting.size();
- *     pg on scrub_wq
- *     osd->scrub_local++
- *   scrubbing:
- *     scrubber.local_reserved = true;
- *     scrubber.active = true
- *     scrubber.reserved_peers empty
+ *  implementation note:
+ *  PG::sched_scrub() is called only once per scrub session.
+ *  That call commits us to whatever choices are made (deep/shallow, etc.).
+ *  Unless we fail to start scrubbing, the 'planned scrub' flag-set is 'frozen' into
+ *  PgScrubber's m_flags, then cleared.
  */
-
-// returns true if a scrub has been newly kicked off
 bool PG::sched_scrub()
 {
+  dout(15) << __func__ << " pg(" << info.pgid
+         << (is_active() ? ") <active>" : ") <not-active>")
+         << (is_clean() ? " <clean>" : " <not-clean>") << dendl;
   ceph_assert(ceph_mutex_is_locked(_lock));
   ceph_assert(!is_scrubbing());
-  if (!(is_primary() && is_active() && is_clean())) {
+
+  if (!is_primary() || !is_active() || !is_clean()) {
     return false;
   }
 
-  // All processing the first time through commits us to whatever
-  // choices are made.
-  if (!scrubber.local_reserved) {
-    dout(20) << __func__ << ": Start processing pg " << info.pgid << dendl;
+  if (scrub_queued) {
+    // only applicable to the very first time a scrub event is queued
+    // (until handled and posted to the scrub FSM)
+    dout(10) << __func__ << ": already queued" << dendl;
+    return false;
+  }
 
-    bool allow_deep_scrub = !(get_osdmap()->test_flag(CEPH_OSDMAP_NODEEP_SCRUB) ||
-                      pool.info.has_flag(pg_pool_t::FLAG_NODEEP_SCRUB));
-    bool allow_scrub = !(get_osdmap()->test_flag(CEPH_OSDMAP_NOSCRUB) ||
-                 pool.info.has_flag(pg_pool_t::FLAG_NOSCRUB));
-    bool has_deep_errors = (info.stats.stats.sum.num_deep_scrub_errors > 0);
-    bool try_to_auto_repair = (cct->_conf->osd_scrub_auto_repair
-                               && get_pgbackend()->auto_repair_supported());
+  // analyse the combination of the requested scrub flags, the osd/pool configuration
+  // and the PG status to determine whether we should scrub now, and what type
+  // of scrub it should be.
+  auto updated_flags = verify_scrub_mode();
+  if (!updated_flags) {
+    // the stars do not align for starting a scrub for this PG at this time
+    // (due to configuration or priority issues)
+    // The reason was already reported by the callee.
+    dout(10) << __func__ << ": failed to initiate a scrub" << dendl;
+    return false;
+  }
 
-    scrubber.time_for_deep = false;
-    // Clear these in case user issues the scrub/repair command during
-    // the scheduling of the scrub/repair (e.g. request reservation)
-    scrubber.deep_scrub_on_error = false;
-    scrubber.auto_repair = false;
+  // try to reserve the local OSD resources. If that fails, no harm done: we
+  // will be retried by the OSD later on.
+  if (!m_scrubber->reserve_local()) {
+    dout(10) << __func__ << ": failed to reserve locally" << dendl;
+    return false;
+  }
 
-    // All periodic scrub handling goes here because must_scrub is
-    // always set for must_deep_scrub and must_repair.
-    if (!scrubber.must_scrub) {
-      ceph_assert(!scrubber.must_deep_scrub && !scrubber.must_repair);
-      // Handle deep scrub determination only if allowed
-      if (allow_deep_scrub) {
-        // Initial entry and scheduled scrubs without nodeep_scrub set get here
-        if (scrubber.need_auto) {
-         dout(20) << __func__ << ": need repair after scrub errors" << dendl;
-          scrubber.time_for_deep = true;
-        } else {
-          double deep_scrub_interval = 0;
-          pool.info.opts.get(pool_opts_t::DEEP_SCRUB_INTERVAL, &deep_scrub_interval);
-          if (deep_scrub_interval <= 0) {
-           deep_scrub_interval = cct->_conf->osd_deep_scrub_interval;
-          }
-          scrubber.time_for_deep = ceph_clock_now() >=
-                 info.history.last_deep_scrub_stamp + deep_scrub_interval;
-
-          bool deep_coin_flip = false;
-         // If we randomize when !allow_scrub && allow_deep_scrub, then it guarantees
-         // we will deep scrub because this function is called often.
-         if (!scrubber.time_for_deep && allow_scrub)
-           deep_coin_flip = (rand() % 100) < cct->_conf->osd_deep_scrub_randomize_ratio * 100;
-          dout(20) << __func__ << ": time_for_deep=" << scrubber.time_for_deep << " deep_coin_flip=" << deep_coin_flip << dendl;
-
-          scrubber.time_for_deep = (scrubber.time_for_deep || deep_coin_flip);
-        }
+  // can commit to the updated flags now, as nothing will stop the scrub
+  m_planned_scrub = *updated_flags;
 
-        if (!scrubber.time_for_deep && has_deep_errors) {
-         osd->clog->info() << "osd." << osd->whoami
-                           << " pg " << info.pgid
-                           << " Deep scrub errors, upgrading scrub to deep-scrub";
-         scrubber.time_for_deep = true;
-        }
+  // An interrupted recovery repair could leave this set.
+  state_clear(PG_STATE_REPAIR);
 
-        if (try_to_auto_repair) {
-          if (scrubber.time_for_deep) {
-            dout(20) << __func__ << ": auto repair with deep scrubbing" << dendl;
-            scrubber.auto_repair = true;
-          } else if (allow_scrub) {
-            dout(20) << __func__ << ": auto repair with scrubbing, rescrub if errors found" << dendl;
-            scrubber.deep_scrub_on_error = true;
-          }
-        }
-      } else { // !allow_deep_scrub
-        dout(20) << __func__ << ": nodeep_scrub set" << dendl;
-        if (has_deep_errors) {
-          osd->clog->error() << "osd." << osd->whoami
-                            << " pg " << info.pgid
-                            << " Regular scrub skipped due to deep-scrub errors and nodeep-scrub set";
-          return false;
-        }
-      }
+  // Pass control to the scrubber. It is the scrubber that handles the replicas'
+  // resource reservations.
+  m_scrubber->set_op_parameters(m_planned_scrub);
 
-      //NOSCRUB so skip regular scrubs
-      if (!allow_scrub && !scrubber.time_for_deep) {
-        return false;
-      }
-    // scrubber.must_scrub
-    } else if (!scrubber.must_deep_scrub && has_deep_errors) {
-       osd->clog->error() << "osd." << osd->whoami
-                          << " pg " << info.pgid
-                          << " Regular scrub request, deep-scrub details will be lost";
-    }
-    // Unless precluded this was handle above
-    scrubber.need_auto = false;
-
-    ceph_assert(scrubber.reserved_peers.empty());
-    bool allow_scrubing = cct->_conf->osd_scrub_during_recovery ||
-                          (cct->_conf->osd_repair_during_recovery && scrubber.must_repair) ||
-                          !osd->is_recovery_active();
-    if (allow_scrubing &&
-         osd->inc_scrubs_local()) {
-      dout(20) << __func__ << ": reserved locally, reserving replicas" << dendl;
-      scrubber.local_reserved = true;
-      scrubber.reserved_peers.insert(pg_whoami);
-      scrub_reserve_replicas();
-    } else {
-      dout(20) << __func__ << ": failed to reserve locally" << dendl;
-      return false;
-    }
+  dout(10) << __func__ << ": queueing" << dendl;
+
+  scrub_queued = true;
+  osd->queue_for_scrub(this, Scrub::scrub_prio_t::low_priority);
+  return true;
+}
+
+double PG::next_deepscrub_interval() const
+{
+  double deep_scrub_interval =
+    pool.info.opts.value_or(pool_opts_t::DEEP_SCRUB_INTERVAL, 0.0);
+  if (deep_scrub_interval <= 0.0)
+    deep_scrub_interval = cct->_conf->osd_deep_scrub_interval;
+  return info.history.last_deep_scrub_stamp + deep_scrub_interval;
+}
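
The function returns the next due time as "last deep-scrub stamp plus interval", preferring the pool option over the OSD-wide default. A standalone sketch of the same arithmetic, using plain doubles in place of utime_t and an assumed one-week default for osd_deep_scrub_interval:

```cpp
#include <ctime>
#include <iostream>

// Sketch of the deadline arithmetic in next_deepscrub_interval(); the
// one-week default below is an assumption of this sketch.
double next_deepscrub_due(double last_deep_stamp, double pool_interval,
                          double osd_default_interval) {
  double interval = pool_interval > 0.0 ? pool_interval : osd_default_interval;
  return last_deep_stamp + interval;
}

int main() {
  double now = static_cast<double>(std::time(nullptr));
  // pool option unset (0.0): fall back to the OSD-wide default
  double due = next_deepscrub_due(now - 8 * 86400.0, 0.0, 7 * 86400.0);
  std::cout << (now >= due ? "time for deep scrub\n" : "not yet\n");
}
```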
+
+bool PG::is_time_for_deep(bool allow_deep_scrub,
+                         bool allow_scrub,
+                         bool has_deep_errors,
+                         const requested_scrub_t& planned) const
+{
+  dout(10) << __func__ << ": need_auto?" << planned.need_auto << " allow_deep_scrub? " << allow_deep_scrub << dendl;
+
+  if (!allow_deep_scrub)
+    return false;
+
+  if (planned.need_auto) {
+    dout(10) << __func__ << ": need repair after scrub errors" << dendl;
+    return true;
+  }
+
+  if (ceph_clock_now() >= next_deepscrub_interval())
+    return true;
+
+  if (has_deep_errors) {
+    osd->clog->info() << "osd." << osd->whoami << " pg " << info.pgid
+                     << " Deep scrub errors, upgrading scrub to deep-scrub";
+    return true;
   }
 
-  if (scrubber.local_reserved) {
-    if (scrubber.reserve_failed) {
-      dout(20) << __func__ << ": failed, a peer declined" << dendl;
-      clear_scrub_reserved();
-      scrub_unreserve_replicas();
+  // we only flip coins if 'allow_scrub' is asserted. Otherwise - as this function
+  // is called often - we would end up deep-scrubbing most of the time.
+  if (allow_scrub) {
+    bool deep_coin_flip =
+      (rand() % 100) < cct->_conf->osd_deep_scrub_randomize_ratio * 100;
+
+    dout(15) << __func__ << ": time_for_deep=" << planned.time_for_deep
+            << " deep_coin_flip=" << deep_coin_flip << dendl;
+
+    if (deep_coin_flip)
+      return true;
+  }
+
+  return false;
+}
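
The coin flip turns osd_deep_scrub_randomize_ratio into a per-evaluation probability of promoting a regular scrub to a deep one. A standalone sketch of the computation; the ratio value used here is an assumption:

```cpp
#include <cstdlib>
#include <iostream>

int main() {
  const double randomize_ratio = 0.15;  // assumed osd_deep_scrub_randomize_ratio
  int promoted = 0;
  const int trials = 100000;
  for (int i = 0; i < trials; ++i) {
    // the same test the PG applies on each periodic-scrub evaluation
    bool deep_coin_flip = (std::rand() % 100) < randomize_ratio * 100;
    if (deep_coin_flip)
      ++promoted;
  }
  // expect roughly 15% of eligible regular scrubs to be upgraded
  std::cout << "promoted " << promoted << " of " << trials << "\n";
}
```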
+
+bool PG::verify_periodic_scrub_mode(bool allow_deep_scrub,
+                             bool try_to_auto_repair,
+                             bool allow_regular_scrub,
+                             bool has_deep_errors,
+                             requested_scrub_t& planned) const
+
+{
+  ceph_assert(!planned.must_deep_scrub && !planned.must_repair);
+
+  if (!allow_deep_scrub && has_deep_errors) {
+      osd->clog->error()
+       << "osd." << osd->whoami << " pg " << info.pgid
+       << " Regular scrub skipped due to deep-scrub errors and nodeep-scrub set";
       return false;
-    } else if (scrubber.reserved_peers.size() == get_actingset().size()) {
-      dout(20) << __func__ << ": success, reserved self and replicas" << dendl;
-      if (scrubber.time_for_deep) {
-       dout(10) << __func__ << ": scrub will be deep" << dendl;
-       state_set(PG_STATE_DEEP_SCRUB);
-       scrubber.time_for_deep = false;
+  }
+
+  if (allow_deep_scrub) {
+    // Initial entry and scheduled scrubs without nodeep_scrub set get here
+
+    planned.time_for_deep =
+      is_time_for_deep(allow_deep_scrub, allow_regular_scrub, has_deep_errors, planned);
+
+    if (try_to_auto_repair) {
+      if (planned.time_for_deep) {
+       dout(20) << __func__ << ": auto repair with deep scrubbing" << dendl;
+       planned.auto_repair = true;
+      } else if (allow_regular_scrub) {
+       dout(20) << __func__ << ": auto repair with scrubbing, rescrub if errors found"
+                << dendl;
+       planned.deep_scrub_on_error = true;
       }
-      queue_scrub();
-    } else {
-      // none declined, since scrubber.reserved is set
-      dout(20) << __func__ << ": reserved " << scrubber.reserved_peers
-              << ", waiting for replicas" << dendl;
     }
   }
+
+  dout(20) << __func__ << " updated flags: " << planned
+          << " allow_regular_scrub: " << allow_regular_scrub << dendl;
+
+  // NOSCRUB so skip regular scrubs
+  if (!allow_regular_scrub && !planned.time_for_deep) {
+    return false;
+  }
+
   return true;
 }
 
-bool PG::is_scrub_registered()
+std::optional<requested_scrub_t> PG::verify_scrub_mode() const
 {
-  return !scrubber.scrub_reg_stamp.is_zero();
-}
+  dout(10) << __func__ << " processing pg " << info.pgid << dendl;
 
-void PG::reg_next_scrub()
-{
-  if (!is_primary())
-    return;
+  bool allow_deep_scrub = !(get_osdmap()->test_flag(CEPH_OSDMAP_NODEEP_SCRUB) ||
+                           pool.info.has_flag(pg_pool_t::FLAG_NODEEP_SCRUB));
+  bool allow_regular_scrub = !(get_osdmap()->test_flag(CEPH_OSDMAP_NOSCRUB) ||
+                              pool.info.has_flag(pg_pool_t::FLAG_NOSCRUB));
+  bool has_deep_errors = (info.stats.stats.sum.num_deep_scrub_errors > 0);
+  bool try_to_auto_repair =
+    (cct->_conf->osd_scrub_auto_repair && get_pgbackend()->auto_repair_supported());
 
-  utime_t reg_stamp;
-  bool must = false;
-  if (scrubber.must_scrub || scrubber.need_auto) {
-    // Set the smallest time that isn't utime_t()
-    reg_stamp = Scrubber::scrub_must_stamp();
-    must = true;
-  } else if (info.stats.stats_invalid && cct->_conf->osd_scrub_invalid_stats) {
-    reg_stamp = ceph_clock_now();
-    must = true;
-  } else {
-    reg_stamp = info.history.last_scrub_stamp;
+  auto upd_flags = m_planned_scrub;
+
+  upd_flags.time_for_deep = false;
+  // Clear these in case user issues the scrub/repair command during
+  // the scheduling of the scrub/repair (e.g. request reservation)
+  upd_flags.deep_scrub_on_error = false;
+  upd_flags.auto_repair = false;
+
+  if (upd_flags.must_scrub && !upd_flags.must_deep_scrub && has_deep_errors) {
+    osd->clog->error() << "osd." << osd->whoami << " pg " << info.pgid
+                      << " Regular scrub request, deep-scrub details will be lost";
+  }
+
+  if (!upd_flags.must_scrub) {
+    // All periodic scrub handling goes here because must_scrub is
+    // always set for must_deep_scrub and must_repair.
+
+    bool can_start_periodic =
+      verify_periodic_scrub_mode(allow_deep_scrub, try_to_auto_repair,
+                                allow_regular_scrub, has_deep_errors, upd_flags);
+    if (!can_start_periodic) {
+      return std::nullopt;
+    }
   }
-  // note down the sched_time, so we can locate this scrub, and remove it
-  // later on.
-  double scrub_min_interval = 0, scrub_max_interval = 0;
-  pool.info.opts.get(pool_opts_t::SCRUB_MIN_INTERVAL, &scrub_min_interval);
-  pool.info.opts.get(pool_opts_t::SCRUB_MAX_INTERVAL, &scrub_max_interval);
-  ceph_assert(!is_scrub_registered());
-  scrubber.scrub_reg_stamp = osd->reg_pg_scrub(info.pgid,
-                                              reg_stamp,
-                                              scrub_min_interval,
-                                              scrub_max_interval,
-                                              must);
-  dout(10) << __func__ << " pg " << pg_id << " register next scrub, scrub time "
-      << scrubber.scrub_reg_stamp << ", must = " << (int)must << dendl;
+
+  //  scrubbing while recovering?
+
+  bool prevented_by_recovery =
+    osd->is_recovery_active() && !cct->_conf->osd_scrub_during_recovery &&
+    (!cct->_conf->osd_repair_during_recovery || !upd_flags.must_repair);
+
+  if (prevented_by_recovery) {
+    dout(20) << __func__ << ": scrubbing prevented during recovery" << dendl;
+    return std::nullopt;
+  }
+
+  upd_flags.need_auto = false;
+  return upd_flags;
 }
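
verify_scrub_mode() follows a validate-and-commit idiom: it mutates a copy of the planned flags and returns it via std::optional, or nullopt to veto the scrub. A minimal sketch of the same control flow, with hypothetical flag names:

```cpp
#include <iostream>
#include <optional>

struct flags_t { bool must_scrub = false; bool time_for_deep = false; };

// Hypothetical validator in the style of PG::verify_scrub_mode(): work on
// a copy of the flags, then either return it (commit) or nullopt (veto).
std::optional<flags_t> verify(const flags_t& current, bool allow_scrub) {
  flags_t upd = current;
  upd.time_for_deep = false;             // reset per-attempt state first
  if (!upd.must_scrub && !allow_scrub)
    return std::nullopt;                 // periodic scrub not permitted now
  return upd;
}

int main() {
  flags_t planned;
  if (auto upd = verify(planned, /*allow_scrub=*/true)) {
    planned = *upd;                      // caller commits only on success
    std::cout << "scrub can proceed\n";
  } else {
    std::cout << "scrub vetoed\n";
  }
}
```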
 
-void PG::unreg_next_scrub()
+void PG::reg_next_scrub()
 {
-  if (is_scrub_registered()) {
-    osd->unreg_pg_scrub(info.pgid, scrubber.scrub_reg_stamp);
-    scrubber.scrub_reg_stamp = utime_t();
-  }
+  m_scrubber->reg_next_scrub(m_planned_scrub);
 }
 
 void PG::on_info_history_change()
 {
-  unreg_next_scrub();
-  reg_next_scrub();
+  if (m_scrubber) {
+    m_scrubber->unreg_next_scrub();
+    m_scrubber->reg_next_scrub(m_planned_scrub);
+  }
 }
 
-void PG::scrub_requested(bool deep, bool repair, bool need_auto)
+void PG::scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type)
 {
-  unreg_next_scrub();
-  if (need_auto) {
-    scrubber.need_auto = true;
-  } else {
-    scrubber.must_scrub = true;
-    scrubber.must_deep_scrub = deep || repair;
-    scrubber.must_repair = repair;
-    // User might intervene, so clear this
-    scrubber.need_auto = false;
-    scrubber.req_scrub = true;
-  }
-  reg_next_scrub();
+  m_scrubber->scrub_requested(scrub_level, scrub_type, m_planned_scrub);
 }
 
 void PG::clear_ready_to_merge() {
@@ -1577,6 +1567,7 @@ void PG::on_role_change() {
 }
 
 void PG::on_new_interval() {
+  dout(20) << __func__ << " scrub_queued was " << scrub_queued << " flags: " << m_planned_scrub << dendl;
   scrub_queued = false;
   projected_last_update = eversion_t();
   cancel_recovery();
@@ -1611,15 +1602,15 @@ void PG::schedule_event_after(
 
 void PG::request_local_background_io_reservation(
   unsigned priority,
-  PGPeeringEventRef on_grant,
-  PGPeeringEventRef on_preempt) {
+  PGPeeringEventURef on_grant,
+  PGPeeringEventURef on_preempt) {
   osd->local_reserver.request_reservation(
     pg_id,
     on_grant ? new QueuePeeringEvt(
-      this, on_grant) : nullptr,
+      this, std::move(on_grant)) : nullptr,
     priority,
     on_preempt ? new QueuePeeringEvt(
-      this, on_preempt) : nullptr);
+      this, std::move(on_preempt)) : nullptr);
 }
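
With the switch from PGPeeringEventRef to the unique_ptr-based PGPeeringEventURef, ownership must be transferred explicitly, and the pointer has to be tested before it is moved from. A minimal sketch of the pattern, with hypothetical types:

```cpp
#include <memory>
#include <utility>

struct PeeringEvent { int code; };
using PeeringEventURef = std::unique_ptr<PeeringEvent>;  // hypothetical alias

// Hypothetical callback wrapper in the style of QueuePeeringEvt: it takes
// sole ownership of the event it will eventually deliver.
struct QueuedEvt {
  PeeringEventURef evt;
  explicit QueuedEvt(PeeringEventURef e) : evt(std::move(e)) {}
};

void request_reservation(PeeringEventURef on_grant) {
  // test the pointer *before* moving from it; after std::move the local
  // unique_ptr is empty, which is why the diff keeps the ternary shape.
  QueuedEvt* cb = on_grant ? new QueuedEvt(std::move(on_grant)) : nullptr;
  delete cb;  // placeholder for handing the callback to the reserver
}

int main() {
  request_reservation(std::make_unique<PeeringEvent>(PeeringEvent{1}));
  request_reservation(nullptr);  // no grant callback requested
}
```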
 
 void PG::update_local_background_io_priority(
@@ -1636,15 +1627,15 @@ void PG::cancel_local_background_io_reservation() {
 
 void PG::request_remote_recovery_reservation(
   unsigned priority,
-  PGPeeringEventRef on_grant,
-  PGPeeringEventRef on_preempt) {
+  PGPeeringEventURef on_grant,
+  PGPeeringEventURef on_preempt) {
   osd->remote_reserver.request_reservation(
     pg_id,
     on_grant ? new QueuePeeringEvt(
-      this, on_grant) : nullptr,
+      this, std::move(on_grant)) : nullptr,
     priority,
     on_preempt ? new QueuePeeringEvt(
-      this, on_preempt) : nullptr);
+      this, std::move(on_preempt)) : nullptr);
 }
 
 void PG::cancel_remote_recovery_reservation() {
@@ -1659,6 +1650,15 @@ void PG::schedule_event_on_commit(
   t.register_on_commit(new QueuePeeringEvt(this, on_commit));
 }
 
+void PG::on_activate(interval_set<snapid_t> snaps)
+{
+  ceph_assert(!m_scrubber->are_callbacks_pending());
+  ceph_assert(callbacks_for_degraded_object.empty());
+  snap_trimq = snaps;
+  release_pg_backoffs();
+  projected_last_update = info.last_update;
+}
+
 void PG::on_active_exit()
 {
   backfill_reserving = false;
@@ -1864,133 +1864,6 @@ void PG::on_activate_committed()
   }
 }
 
-void PG::do_replica_scrub_map(OpRequestRef op)
-{
-  auto m = op->get_req<MOSDRepScrubMap>();
-  dout(7) << __func__ << " " << *m << dendl;
-  if (m->map_epoch < info.history.same_interval_since) {
-    dout(10) << __func__ << " discarding old from "
-            << m->map_epoch << " < " << info.history.same_interval_since
-            << dendl;
-    return;
-  }
-  if (!scrubber.is_chunky_scrub_active()) {
-    dout(10) << __func__ << " scrub isn't active" << dendl;
-    return;
-  }
-
-  op->mark_started();
-
-  auto p = const_cast<bufferlist&>(m->get_data()).cbegin();
-  scrubber.received_maps[m->from].decode(p, info.pgid.pool());
-  dout(10) << "map version is "
-          << scrubber.received_maps[m->from].valid_through
-          << dendl;
-
-  dout(10) << __func__ << " waiting_on_whom was " << scrubber.waiting_on_whom
-          << dendl;
-  ceph_assert(scrubber.waiting_on_whom.count(m->from));
-  scrubber.waiting_on_whom.erase(m->from);
-  if (m->preempted) {
-    dout(10) << __func__ << " replica was preempted, setting flag" << dendl;
-    scrub_preempted = true;
-  }
-  if (scrubber.waiting_on_whom.empty()) {
-    requeue_scrub(ops_blocked_by_scrub());
-  }
-}
-
-// send scrub v3 messages (chunky scrub)
-void PG::_request_scrub_map(
-  pg_shard_t replica, eversion_t version,
-  hobject_t start, hobject_t end,
-  bool deep,
-  bool allow_preemption)
-{
-  ceph_assert(replica != pg_whoami);
-  dout(10) << "scrub  requesting scrubmap from osd." << replica
-          << " deep " << (int)deep << dendl;
-  MOSDRepScrub *repscrubop = new MOSDRepScrub(
-    spg_t(info.pgid.pgid, replica.shard), version,
-    get_osdmap_epoch(),
-    get_last_peering_reset(),
-    start, end, deep,
-    allow_preemption,
-    scrubber.priority,
-    ops_blocked_by_scrub());
-  // default priority, we want the rep scrub processed prior to any recovery
-  // or client io messages (we are holding a lock!)
-  osd->send_message_osd_cluster(
-    replica.osd, repscrubop, get_osdmap_epoch());
-}
-
-void PG::handle_scrub_reserve_request(OpRequestRef op)
-{
-  dout(7) << __func__ << " " << *op->get_req() << dendl;
-  op->mark_started();
-  if (scrubber.remote_reserved) {
-    dout(10) << __func__ << " ignoring reserve request: Already reserved"
-            << dendl;
-    return;
-  }
-  if ((cct->_conf->osd_scrub_during_recovery || !osd->is_recovery_active()) &&
-      osd->inc_scrubs_remote()) {
-    scrubber.remote_reserved = true;
-  } else {
-    dout(20) << __func__ << ": failed to reserve remotely" << dendl;
-    scrubber.remote_reserved = false;
-  }
-  auto m = op->get_req<MOSDScrubReserve>();
-  Message *reply = new MOSDScrubReserve(
-    spg_t(info.pgid.pgid, get_primary().shard),
-    m->map_epoch,
-    scrubber.remote_reserved ? MOSDScrubReserve::GRANT : MOSDScrubReserve::REJECT,
-    pg_whoami);
-  osd->send_message_osd_cluster(reply, op->get_req()->get_connection());
-}
-
-void PG::handle_scrub_reserve_grant(OpRequestRef op, pg_shard_t from)
-{
-  dout(7) << __func__ << " " << *op->get_req() << dendl;
-  op->mark_started();
-  if (!scrubber.local_reserved) {
-    dout(10) << "ignoring obsolete scrub reserve reply" << dendl;
-    return;
-  }
-  if (scrubber.reserved_peers.find(from) != scrubber.reserved_peers.end()) {
-    dout(10) << " already had osd." << from << " reserved" << dendl;
-  } else {
-    dout(10) << " osd." << from << " scrub reserve = success" << dendl;
-    scrubber.reserved_peers.insert(from);
-    sched_scrub();
-  }
-}
-
-void PG::handle_scrub_reserve_reject(OpRequestRef op, pg_shard_t from)
-{
-  dout(7) << __func__ << " " << *op->get_req() << dendl;
-  op->mark_started();
-  if (!scrubber.local_reserved) {
-    dout(10) << "ignoring obsolete scrub reserve reply" << dendl;
-    return;
-  }
-  if (scrubber.reserved_peers.find(from) != scrubber.reserved_peers.end()) {
-    dout(10) << " already had osd." << from << " reserved" << dendl;
-  } else {
-    /* One decline stops this pg from being scheduled for scrubbing. */
-    dout(10) << " osd." << from << " scrub reserve = fail" << dendl;
-    scrubber.reserve_failed = true;
-    sched_scrub();
-  }
-}
-
-void PG::handle_scrub_reserve_release(OpRequestRef op)
-{
-  dout(7) << __func__ << " " << *op->get_req() << dendl;
-  op->mark_started();
-  clear_scrub_reserved();
-}
-
 // Compute pending backfill data
 static int64_t pending_backfill(CephContext *cct, int64_t bf_bytes, int64_t local_bytes)
 {
@@ -2078,62 +1951,6 @@ bool PG::try_reserve_recovery_space(
 void PG::unreserve_recovery_space() {
   primary_num_bytes.store(0);
   local_num_bytes.store(0);
-  return;
-}
-
-void PG::clear_scrub_reserved()
-{
-  scrubber.reserved_peers.clear();
-  scrubber.reserve_failed = false;
-
-  if (scrubber.local_reserved) {
-    scrubber.local_reserved = false;
-    osd->dec_scrubs_local();
-  }
-  if (scrubber.remote_reserved) {
-    scrubber.remote_reserved = false;
-    osd->dec_scrubs_remote();
-  }
-}
-
-void PG::scrub_reserve_replicas()
-{
-  ceph_assert(recovery_state.get_backfill_targets().empty());
-  std::vector<std::pair<int, Message*>> messages;
-  messages.reserve(get_actingset().size());
-  epoch_t  e = get_osdmap_epoch();
-  for (set<pg_shard_t>::iterator i = get_actingset().begin();
-      i != get_actingset().end();
-      ++i) {
-    if (*i == pg_whoami) continue;
-    dout(10) << "scrub requesting reserve from osd." << *i << dendl;
-    Message* m =  new MOSDScrubReserve(spg_t(info.pgid.pgid, i->shard), e,
-                                       MOSDScrubReserve::REQUEST, pg_whoami);
-    messages.push_back(std::make_pair(i->osd, m));
-  }
-  if (!messages.empty()) {
-    osd->send_message_osd_cluster(messages, e);
-  }
-}
-
-void PG::scrub_unreserve_replicas()
-{
-  ceph_assert(recovery_state.get_backfill_targets().empty());
-  std::vector<std::pair<int, Message*>> messages;
-  messages.reserve(get_actingset().size());
-  epoch_t e = get_osdmap_epoch();
-  for (set<pg_shard_t>::iterator i = get_actingset().begin();
-       i != get_actingset().end();
-       ++i) {
-    if (*i == pg_whoami) continue;
-    dout(10) << "scrub requesting unreserve from osd." << *i << dendl;
-    Message* m =  new MOSDScrubReserve(spg_t(info.pgid.pgid, i->shard), e,
-                                       MOSDScrubReserve::RELEASE, pg_whoami);
-    messages.push_back(std::make_pair(i->osd, m));
-  }
-  if (!messages.empty()) {
-    osd->send_message_osd_cluster(messages, e);
-  }
 }
 
 void PG::_scan_rollback_obs(const vector<ghobject_t> &rollback_obs)
@@ -2160,111 +1977,6 @@ void PG::_scan_rollback_obs(const vector<ghobject_t> &rollback_obs)
   }
 }
 
-void PG::_scan_snaps(ScrubMap &smap) 
-{
-  hobject_t head;
-  SnapSet snapset;
-
-  // Test qa/standalone/scrub/osd-scrub-snaps.sh uses this message to verify 
-  // caller using clean_meta_map(), and it works properly.
-  dout(20) << __func__ << " start" << dendl;
-
-  for (map<hobject_t, ScrubMap::object>::reverse_iterator i = smap.objects.rbegin();
-       i != smap.objects.rend();
-       ++i) {
-    const hobject_t &hoid = i->first;
-    ScrubMap::object &o = i->second;
-
-    dout(20) << __func__ << " " << hoid << dendl;
-
-    ceph_assert(!hoid.is_snapdir());
-    if (hoid.is_head()) {
-      // parse the SnapSet
-      bufferlist bl;
-      if (o.attrs.find(SS_ATTR) == o.attrs.end()) {
-       continue;
-      }
-      bl.push_back(o.attrs[SS_ATTR]);
-      auto p = bl.cbegin();
-      try {
-       decode(snapset, p);
-      } catch(...) {
-       continue;
-      }
-      head = hoid.get_head();
-      continue;
-    }
-    if (hoid.snap < CEPH_MAXSNAP) {
-      // check and if necessary fix snap_mapper
-      if (hoid.get_head() != head) {
-       derr << __func__ << " no head for " << hoid << " (have " << head << ")"
-            << dendl;
-       continue;
-      }
-      set<snapid_t> obj_snaps;
-      auto p = snapset.clone_snaps.find(hoid.snap);
-      if (p == snapset.clone_snaps.end()) {
-       derr << __func__ << " no clone_snaps for " << hoid << " in " << snapset
-            << dendl;
-       continue;
-      }
-      obj_snaps.insert(p->second.begin(), p->second.end());
-      set<snapid_t> cur_snaps;
-      int r = snap_mapper.get_snaps(hoid, &cur_snaps);
-      if (r != 0 && r != -ENOENT) {
-       derr << __func__ << ": get_snaps returned " << cpp_strerror(r) << dendl;
-       ceph_abort();
-      }
-      if (r == -ENOENT || cur_snaps != obj_snaps) {
-       ObjectStore::Transaction t;
-       OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
-       if (r == 0) {
-         r = snap_mapper.remove_oid(hoid, &_t);
-         if (r != 0) {
-           derr << __func__ << ": remove_oid returned " << cpp_strerror(r)
-                << dendl;
-           ceph_abort();
-         }
-         osd->clog->error() << "osd." << osd->whoami
-                           << " found snap mapper error on pg "
-                           << info.pgid
-                           << " oid " << hoid << " snaps in mapper: "
-                           << cur_snaps << ", oi: "
-                           << obj_snaps
-                           << "...repaired";
-       } else {
-         osd->clog->error() << "osd." << osd->whoami
-                           << " found snap mapper error on pg "
-                           << info.pgid
-                           << " oid " << hoid << " snaps missing in mapper"
-                           << ", should be: "
-                           << obj_snaps
-                            << " was " << cur_snaps << " r " << r
-                           << "...repaired";
-       }
-       snap_mapper.add_oid(hoid, obj_snaps, &_t);
-
-       // wait for repair to apply to avoid confusing other bits of the system.
-       {
-         ceph::condition_variable my_cond;
-         ceph::mutex my_lock = ceph::make_mutex("PG::_scan_snaps my_lock");
-         int r = 0;
-         bool done;
-         t.register_on_applied_sync(
-           new C_SafeCond(my_lock, my_cond, &done, &r));
-         r = osd->store->queue_transaction(ch, std::move(t));
-         if (r != 0) {
-           derr << __func__ << ": queue_transaction got " << cpp_strerror(r)
-                << dendl;
-         } else {
-           std::unique_lock l{my_lock};
-           my_cond.wait(l, [&done] { return done;});
-         }
-       }
-      }
-    }
-  }
-}
 
 void PG::_repair_oinfo_oid(ScrubMap &smap)
 {
@@ -2311,82 +2023,6 @@ void PG::_repair_oinfo_oid(ScrubMap &smap)
     }
   }
 }
-int PG::build_scrub_map_chunk(
-  ScrubMap &map,
-  ScrubMapBuilder &pos,
-  hobject_t start,
-  hobject_t end,
-  bool deep,
-  ThreadPool::TPHandle &handle)
-{
-  dout(10) << __func__ << " [" << start << "," << end << ") "
-          << " pos " << pos
-          << dendl;
-
-  // start
-  while (pos.empty()) {
-    pos.deep = deep;
-    map.valid_through = info.last_update;
-
-    // objects
-    vector<ghobject_t> rollback_obs;
-    pos.ret = get_pgbackend()->objects_list_range(
-      start,
-      end,
-      &pos.ls,
-      &rollback_obs);
-    if (pos.ret < 0) {
-      dout(5) << "objects_list_range error: " << pos.ret << dendl;
-      return pos.ret;
-    }
-    if (pos.ls.empty()) {
-      break;
-    }
-    _scan_rollback_obs(rollback_obs);
-    pos.pos = 0;
-    return -EINPROGRESS;
-  }
-
-  // scan objects
-  while (!pos.done()) {
-    int r = get_pgbackend()->be_scan_list(map, pos);
-    if (r == -EINPROGRESS) {
-      return r;
-    }
-  }
-
-  // finish
-  dout(20) << __func__ << " finishing" << dendl;
-  ceph_assert(pos.done());
-  _repair_oinfo_oid(map);
-  if (!is_primary()) {
-    ScrubMap for_meta_scrub;
-    // In case we restarted smaller chunk, clear old data
-    scrubber.cleaned_meta_map.clear_from(scrubber.start);
-    scrubber.cleaned_meta_map.insert(map);
-    scrubber.clean_meta_map(for_meta_scrub);
-    _scan_snaps(for_meta_scrub);
-  }
-
-  dout(20) << __func__ << " done, got " << map.objects.size() << " items"
-          << dendl;
-  return 0;
-}
-
-void PG::Scrubber::cleanup_store(ObjectStore::Transaction *t) {
-  if (!store)
-    return;
-  struct OnComplete : Context {
-    std::unique_ptr<Scrub::Store> store;
-    explicit OnComplete(
-      std::unique_ptr<Scrub::Store> &&store)
-      : store(std::move(store)) {}
-    void finish(int) override {}
-  };
-  store->cleanup(t);
-  t->register_on_complete(new OnComplete(std::move(store)));
-  ceph_assert(!store);
-}
 
 void PG::repair_object(
   const hobject_t &soid,
@@ -2427,950 +2063,141 @@ void PG::repair_object(
   recovery_state.force_object_missing(bad_peers, soid, oi.version);
 }
 
-/* replica_scrub
- *
- * Wait for last_update_applied to match msg->scrub_to as above. Wait
- * for pushes to complete in case of recent recovery. Build a single
- * scrubmap of objects that are in the range [msg->start, msg->end).
- */
-void PG::replica_scrub(
-  OpRequestRef op,
-  ThreadPool::TPHandle &handle)
+void PG::forward_scrub_event(ScrubAPI fn, epoch_t epoch_queued)
 {
-  auto msg = op->get_req<MOSDRepScrub>();
-  ceph_assert(!scrubber.active_rep_scrub);
-  dout(7) << "replica_scrub" << dendl;
-
-  if (msg->map_epoch < info.history.same_interval_since) {
-    dout(10) << "replica_scrub discarding old replica_scrub from "
-            << msg->map_epoch << " < " << info.history.same_interval_since 
-            << dendl;
-    return;
-  }
-
-  ceph_assert(msg->chunky);
-  if (active_pushes > 0) {
-    dout(10) << "waiting for active pushes to finish" << dendl;
-    scrubber.active_rep_scrub = op;
-    return;
-  }
-
-  scrubber.state = Scrubber::BUILD_MAP_REPLICA;
-  scrubber.replica_scrub_start = msg->min_epoch;
-  scrubber.start = msg->start;
-  scrubber.end = msg->end;
-  scrubber.max_end = msg->end;
-  scrubber.deep = msg->deep;
-  scrubber.epoch_start = info.history.same_interval_since;
-  if (msg->priority) {
-    scrubber.priority = msg->priority;
+  dout(20) << __func__ << " queued at: " << epoch_queued << dendl;
+  if (is_active() && m_scrubber) {
+    ((*m_scrubber).*fn)(epoch_queued);
   } else {
-    scrubber.priority = get_scrub_priority();
+    // pg might be in the process of being deleted
+    dout(5) << __func__ << " refusing to forward. " << (is_clean() ? "(clean) " : "(not clean) ") <<
+            (is_active() ? "(active) " : "(not active) ") <<  dendl;
   }
+}
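
ScrubAPI is a pointer to a member function of the scrubber interface, and the call site above dispatches through it with the `.*` operator. A self-contained sketch of that dispatch, with hypothetical names:

```cpp
#include <iostream>

using epoch_t = unsigned;

// Hypothetical scrubber interface in the style of ScrubPgIF
struct ScrubIF {
  void initiate_regular_scrub(epoch_t e) { std::cout << "regular @" << e << "\n"; }
  void initiate_scrub_after_repair(epoch_t e) { std::cout << "after-repair @" << e << "\n"; }
};

// ScrubAPI names a pointer-to-member-function of the interface
using ScrubAPI = void (ScrubIF::*)(epoch_t);

void forward(ScrubIF* scrubber, ScrubAPI fn, epoch_t epoch_queued) {
  if (scrubber)
    (scrubber->*fn)(epoch_queued);  // dispatch through the member pointer
}

int main() {
  ScrubIF s;
  forward(&s, &ScrubIF::initiate_regular_scrub, 42);
  forward(&s, &ScrubIF::initiate_scrub_after_repair, 43);
}
```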
 
-  scrub_can_preempt = msg->allow_preemption;
-  scrub_preempted = false;
-  scrubber.replica_scrubmap_pos.reset();
-
-  requeue_scrub(msg->high_priority);
+void PG::replica_scrub(OpRequestRef op, ThreadPool::TPHandle& handle)
+{
+  dout(10) << __func__ << " (op)" << dendl;
+  if (m_scrubber)
+    m_scrubber->replica_scrub_op(op);
 }
 
-/* Scrub:
- * PG_STATE_SCRUBBING is set when the scrub is queued
- * 
- * scrub will be chunky if all OSDs in PG support chunky scrub
- * scrub will fail if OSDs are too old.
- */
-void PG::scrub(epoch_t queued, ThreadPool::TPHandle &handle)
-{
-  OSDService *osds = osd;
-  double scrub_sleep = osds->osd->scrub_sleep_time(scrubber.must_scrub);
-  if (scrub_sleep > 0 &&
-      (scrubber.state == PG::Scrubber::NEW_CHUNK ||
-       scrubber.state == PG::Scrubber::INACTIVE) &&
-       scrubber.needs_sleep) {
-    ceph_assert(!scrubber.sleeping);
-    dout(20) << __func__ << " state is INACTIVE|NEW_CHUNK, sleeping" << dendl;
-
-    // Do an async sleep so we don't block the op queue
-    spg_t pgid = get_pgid();
-    int state = scrubber.state;
-    auto scrub_requeue_callback =
-        new LambdaContext([osds, pgid, state](int r) {
-          PGRef pg = osds->osd->lookup_lock_pg(pgid);
-          if (pg == nullptr) {
-            lgeneric_dout(osds->osd->cct, 20)
-                << "scrub_requeue_callback: Could not find "
-                << "PG " << pgid << " can't complete scrub requeue after sleep"
-                << dendl;
-            return;
-          }
-          pg->scrubber.sleeping = false;
-          pg->scrubber.needs_sleep = false;
-          lgeneric_dout(pg->cct, 20)
-              << "scrub_requeue_callback: slept for "
-              << ceph_clock_now() - pg->scrubber.sleep_start
-              << ", re-queuing scrub with state " << state << dendl;
-          pg->scrub_queued = false;
-          pg->requeue_scrub();
-          pg->scrubber.sleep_start = utime_t();
-          pg->unlock();
-        });
-    std::lock_guard l(osd->sleep_lock);
-    osd->sleep_timer.add_event_after(scrub_sleep,
-                                           scrub_requeue_callback);
-    scrubber.sleeping = true;
-    scrubber.sleep_start = ceph_clock_now();
-    return;
-  }
-  if (pg_has_reset_since(queued)) {
-    return;
-  }
-  ceph_assert(scrub_queued);
+void PG::scrub(epoch_t epoch_queued, ThreadPool::TPHandle& handle)
+{
+  dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
+  // a new scrub
   scrub_queued = false;
-  scrubber.needs_sleep = true;
-
-  // for the replica
-  if (!is_primary() &&
-      scrubber.state == PG::Scrubber::BUILD_MAP_REPLICA) {
-    chunky_scrub(handle);
-    return;
-  }
-
-  if (!is_primary() || !is_active() || !is_clean() || !is_scrubbing()) {
-    dout(10) << "scrub -- not primary or active or not clean" << dendl;
-    state_clear(PG_STATE_SCRUBBING);
-    state_clear(PG_STATE_REPAIR);
-    state_clear(PG_STATE_DEEP_SCRUB);
-    publish_stats_to_osd();
-    return;
-  }
-
-  if (!scrubber.active) {
-    ceph_assert(recovery_state.get_backfill_targets().empty());
-
-    scrubber.deep = state_test(PG_STATE_DEEP_SCRUB);
-
-    dout(10) << "starting a new chunky scrub" << dendl;
-  }
-
-  chunky_scrub(handle);
+  forward_scrub_event(&ScrubPgIF::initiate_regular_scrub, epoch_queued);
 }
 
-void PG::abort_scrub()
+// note: no need to secure OSD resources for a recovery scrub
+void PG::recovery_scrub(epoch_t epoch_queued,
+                        [[maybe_unused]] ThreadPool::TPHandle& handle)
 {
-  scrub_clear_state();
-  scrub_unreserve_replicas();
+  dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
+  // a new scrub
+  scrub_queued = false;
+  forward_scrub_event(&ScrubPgIF::initiate_scrub_after_repair, epoch_queued);
 }
 
-/*
- * Chunky scrub scrubs objects one chunk at a time with writes blocked for that
- * chunk.
- *
- * The object store is partitioned into chunks which end on hash boundaries. For
- * each chunk, the following logic is performed:
- *
- *  (1) Block writes on the chunk
- *  (2) Request maps from replicas
- *  (3) Wait for pushes to be applied (after recovery)
- *  (4) Wait for writes to flush on the chunk
- *  (5) Wait for maps from replicas
- *  (6) Compare / repair all scrub maps
- *  (7) Wait for digest updates to apply
- *
- * This logic is encoded in the mostly linear state machine:
- *
- *           +------------------+
- *  _________v__________        |
- * |                    |       |
- * |      INACTIVE      |       |
- * |____________________|       |
- *           |                  |
- *           |   +----------+   |
- *  _________v___v______    |   |
- * |                    |   |   |
- * |      NEW_CHUNK     |   |   |
- * |____________________|   |   |
- *           |              |   |
- *  _________v__________    |   |
- * |                    |   |   |
- * |     WAIT_PUSHES    |   |   |
- * |____________________|   |   |
- *           |              |   |
- *  _________v__________    |   |
- * |                    |   |   |
- * |  WAIT_LAST_UPDATE  |   |   |
- * |____________________|   |   |
- *           |              |   |
- *  _________v__________    |   |
- * |                    |   |   |
- * |      BUILD_MAP     |   |   |
- * |____________________|   |   |
- *           |              |   |
- *  _________v__________    |   |
- * |                    |   |   |
- * |    WAIT_REPLICAS   |   |   |
- * |____________________|   |   |
- *           |              |   |
- *  _________v__________    |   |
- * |                    |   |   |
- * |    COMPARE_MAPS    |   |   |
- * |____________________|   |   |
- *           |              |   |
- *           |              |   |
- *  _________v__________    |   |
- * |                    |   |   |
- * |WAIT_DIGEST_UPDATES |   |   |
- * |____________________|   |   |
- *           |   |          |   |
- *           |   +----------+   |
- *  _________v__________        |
- * |                    |       |
- * |       FINISH       |       |
- * |____________________|       |
- *           |                  |
- *           +------------------+
- *
- * The primary determines the last update from the subset by walking the log. If
- * it sees a log entry pertaining to a file in the chunk, it tells the replicas
- * to wait until that update is applied before building a scrub map. Both the
- * primary and replicas will wait for any active pushes to be applied.
- *
- * In contrast to classic_scrub, chunky_scrub is entirely handled by scrub_wq.
- *
- * scrubber.state encodes the current state of the scrub (refer to state diagram
- * for details).
- */
-void PG::chunky_scrub(ThreadPool::TPHandle &handle)
-{
-  // check for map changes
-  if (scrubber.is_chunky_scrub_active()) {
-    if (scrubber.epoch_start != info.history.same_interval_since) {
-      dout(10) << "scrub pg changed, aborting" << dendl;
-      abort_scrub();
-      return;
-    }
-  }
-
-  bool done = false;
-  int ret;
-
-  while (!done) {
-    dout(20) << "scrub state " << Scrubber::state_string(scrubber.state)
-            << " [" << scrubber.start << "," << scrubber.end << ")"
-            << " max_end " << scrubber.max_end << dendl;
-
-    switch (scrubber.state) {
-      case PG::Scrubber::INACTIVE:
-        dout(10) << "scrub start" << dendl;
-       ceph_assert(is_primary());
-
-        publish_stats_to_osd();
-        scrubber.epoch_start = info.history.same_interval_since;
-        scrubber.active = true;
-
-       {
-         ObjectStore::Transaction t;
-         scrubber.cleanup_store(&t);
-         scrubber.store.reset(Scrub::Store::create(osd->store, &t,
-                                                   info.pgid, coll));
-         osd->store->queue_transaction(ch, std::move(t), nullptr);
-       }
-
-        // Don't include temporary objects when scrubbing
-        scrubber.start = info.pgid.pgid.get_hobj_start();
-        scrubber.state = PG::Scrubber::NEW_CHUNK;
-
-       {
-         bool repair = state_test(PG_STATE_REPAIR);
-         bool deep_scrub = state_test(PG_STATE_DEEP_SCRUB);
-         const char *mode = (repair ? "repair": (deep_scrub ? "deep-scrub" : "scrub"));
-         stringstream oss;
-         oss << info.pgid.pgid << " " << mode << " starts" << std::endl;
-         osd->clog->debug(oss);
-       }
-
-       scrubber.preempt_left = cct->_conf.get_val<uint64_t>(
-         "osd_scrub_max_preemptions");
-       scrubber.preempt_divisor = 1;
-        break;
-
-      case PG::Scrubber::NEW_CHUNK:
-        scrubber.primary_scrubmap = ScrubMap();
-        scrubber.received_maps.clear();
-
-       // begin (possible) preemption window
-       if (scrub_preempted) {
-         scrubber.preempt_left--;
-         scrubber.preempt_divisor *= 2;
-         dout(10) << __func__ << " preempted, " << scrubber.preempt_left
-                  << " left" << dendl;
-         scrub_preempted = false;
-       }
-       scrub_can_preempt = scrubber.preempt_left > 0;
-
-        {
-          /* get the start and end of our scrub chunk
-          *
-          * Our scrub chunk has an important restriction we're going to need to
-          * respect. We can't let head be start or end.
-          * Using a half-open interval means that if end == head,
-          * we'd scrub/lock head and the clone right next to head in different
-          * chunks which would allow us to miss clones created between
-          * scrubbing that chunk and scrubbing the chunk including head.
-          * This isn't true for any of the other clones since clones can
-          * only be created "just to the left of" head.  There is one exception
-          * to this: promotion of clones which always happens to the left of the
-          * left-most clone, but promote_object checks the scrubber in that
-          * case, so it should be ok.  Also, it's ok to "miss" clones at the
-          * left end of the range if we are a tier because they may legitimately
-          * not exist (see _scrub).
-          */
-          ceph_assert(scrubber.preempt_divisor > 0);
-         int min = std::max<int64_t>(3, cct->_conf->osd_scrub_chunk_min /
-                                     scrubber.preempt_divisor);
-         int max = std::max<int64_t>(min, cct->_conf->osd_scrub_chunk_max /
-                                      scrubber.preempt_divisor);
-          hobject_t start = scrubber.start;
-         hobject_t candidate_end;
-         vector<hobject_t> objects;
-         ret = get_pgbackend()->objects_list_partial(
-           start,
-           min,
-           max,
-           &objects,
-           &candidate_end);
-         ceph_assert(ret >= 0);
-
-         if (!objects.empty()) {
-           hobject_t back = objects.back();
-           while (candidate_end.is_head() &&
-                  candidate_end == back.get_head()) {
-             candidate_end = back;
-             objects.pop_back();
-             if (objects.empty()) {
-               ceph_assert(0 ==
-                      "Somehow we got more than 2 objects which"
-                      "have the same head but are not clones");
-             }
-             back = objects.back();
-           }
-           if (candidate_end.is_head()) {
-             ceph_assert(candidate_end != back.get_head());
-             candidate_end = candidate_end.get_object_boundary();
-           }
-         } else {
-           ceph_assert(candidate_end.is_max());
-         }
-
-         if (!_range_available_for_scrub(scrubber.start, candidate_end)) {
-           // we'll be requeued by whatever made us unavailable for scrub
-           dout(10) << __func__ << ": scrub blocked somewhere in range "
-                    << "[" << scrubber.start << ", " << candidate_end << ")"
-                    << dendl;
-           done = true;
-           break;
-         }
-         scrubber.end = candidate_end;
-         if (scrubber.end > scrubber.max_end)
-           scrubber.max_end = scrubber.end;
-        }
-
-        // walk the log to find the latest update that affects our chunk
-        scrubber.subset_last_update = eversion_t();
-       for (auto p = projected_log.log.rbegin();
-            p != projected_log.log.rend();
-            ++p) {
-          if (p->soid >= scrubber.start &&
-             p->soid < scrubber.end) {
-            scrubber.subset_last_update = p->version;
-            break;
-         }
-       }
-       if (scrubber.subset_last_update == eversion_t()) {
-         for (list<pg_log_entry_t>::const_reverse_iterator p =
-                recovery_state.get_pg_log().get_log().log.rbegin();
-              p != recovery_state.get_pg_log().get_log().log.rend();
-              ++p) {
-           if (p->soid >= scrubber.start &&
-               p->soid < scrubber.end) {
-             scrubber.subset_last_update = p->version;
-             break;
-           }
-         }
-       }
-
-        scrubber.state = PG::Scrubber::WAIT_PUSHES;
-        break;
-
-      case PG::Scrubber::WAIT_PUSHES:
-        if (active_pushes == 0) {
-          scrubber.state = PG::Scrubber::WAIT_LAST_UPDATE;
-        } else {
-          dout(15) << "wait for pushes to apply" << dendl;
-          done = true;
-        }
-        break;
-
-      case PG::Scrubber::WAIT_LAST_UPDATE:
-        if (recovery_state.get_last_update_applied() <
-         scrubber.subset_last_update) {
-          // will be requeued by op_applied
-          dout(15) << "wait for EC read/modify/writes to queue" << dendl;
-          done = true;
-         break;
-       }
-
-        // ask replicas to scan
-        scrubber.waiting_on_whom.insert(pg_whoami);
-
-        // request maps from replicas
-       for (set<pg_shard_t>::iterator i = get_acting_recovery_backfill().begin();
-            i != get_acting_recovery_backfill().end();
-            ++i) {
-         if (*i == pg_whoami) continue;
-          _request_scrub_map(*i, scrubber.subset_last_update,
-                             scrubber.start, scrubber.end, scrubber.deep,
-                            scrubber.preempt_left > 0);
-          scrubber.waiting_on_whom.insert(*i);
-        }
-       dout(10) << __func__ << " waiting_on_whom " << scrubber.waiting_on_whom
-                << dendl;
-
-       scrubber.state = PG::Scrubber::BUILD_MAP;
-       scrubber.primary_scrubmap_pos.reset();
-        break;
-
-      case PG::Scrubber::BUILD_MAP:
-        ceph_assert(recovery_state.get_last_update_applied() >=
-         scrubber.subset_last_update);
-
-        // build my own scrub map
-       if (scrub_preempted) {
-         dout(10) << __func__ << " preempted" << dendl;
-         scrubber.state = PG::Scrubber::BUILD_MAP_DONE;
-         break;
-       }
-       ret = build_scrub_map_chunk(
-         scrubber.primary_scrubmap,
-         scrubber.primary_scrubmap_pos,
-         scrubber.start, scrubber.end,
-         scrubber.deep,
-         handle);
-       if (ret == -EINPROGRESS) {
-         requeue_scrub();
-         done = true;
-         break;
-       }
-       scrubber.state = PG::Scrubber::BUILD_MAP_DONE;
-       break;
-
-      case PG::Scrubber::BUILD_MAP_DONE:
-       if (scrubber.primary_scrubmap_pos.ret < 0) {
-         dout(5) << "error: " << scrubber.primary_scrubmap_pos.ret
-                 << ", aborting" << dendl;
-          scrub_clear_state();
-          scrub_unreserve_replicas();
-          return;
-        }
-       dout(10) << __func__ << " waiting_on_whom was "
-                << scrubber.waiting_on_whom << dendl;
-       ceph_assert(scrubber.waiting_on_whom.count(pg_whoami));
-        scrubber.waiting_on_whom.erase(pg_whoami);
-
-        scrubber.state = PG::Scrubber::WAIT_REPLICAS;
-        break;
-
-      case PG::Scrubber::WAIT_REPLICAS:
-        if (!scrubber.waiting_on_whom.empty()) {
-          // will be requeued by do_replica_scrub_map
-          dout(10) << "wait for replicas to build scrub map" << dendl;
-          done = true;
-         break;
-       }
-        // Since repair is only by request and we need to scrub afterward,
-        // treat it the same as req_scrub.
-        if (!scrubber.req_scrub) {
-          if (state_test(PG_STATE_DEEP_SCRUB)) {
-            if (get_osdmap()->test_flag(CEPH_OSDMAP_NODEEP_SCRUB) ||
-               pool.info.has_flag(pg_pool_t::FLAG_NODEEP_SCRUB)) {
-              dout(10) << "nodeep_scrub set, aborting" << dendl;
-             abort_scrub();
-              return;
-            }
-          } else if (state_test(PG_STATE_SCRUBBING)) {
-            if (get_osdmap()->test_flag(CEPH_OSDMAP_NOSCRUB) || pool.info.has_flag(pg_pool_t::FLAG_NOSCRUB)) {
-              dout(10) << "noscrub set, aborting" << dendl;
-             abort_scrub();
-              return;
-            }
-          }
-        }
-       // end (possible) preemption window
-       scrub_can_preempt = false;
-       if (scrub_preempted) {
-         dout(10) << __func__ << " preempted, restarting chunk" << dendl;
-         scrubber.state = PG::Scrubber::NEW_CHUNK;
-       } else {
-          scrubber.state = PG::Scrubber::COMPARE_MAPS;
-        }
-        break;
-
-      case PG::Scrubber::COMPARE_MAPS:
-        ceph_assert(recovery_state.get_last_update_applied() >=
-         scrubber.subset_last_update);
-        ceph_assert(scrubber.waiting_on_whom.empty());
-
-        scrub_compare_maps();
-       scrubber.start = scrubber.end;
-       scrubber.run_callbacks();
-
-        // requeue the writes from the chunk that just finished
-        requeue_ops(waiting_for_scrub);
-
-       scrubber.state = PG::Scrubber::WAIT_DIGEST_UPDATES;
-
-       // fall-thru
-
-      case PG::Scrubber::WAIT_DIGEST_UPDATES:
-       if (scrubber.num_digest_updates_pending) {
-         dout(10) << __func__ << " waiting on "
-                  << scrubber.num_digest_updates_pending
-                  << " digest updates" << dendl;
-         done = true;
-         break;
-       }
-
-       scrubber.preempt_left = cct->_conf.get_val<uint64_t>(
-         "osd_scrub_max_preemptions");
-       scrubber.preempt_divisor = 1;
-
-       if (!(scrubber.end.is_max())) {
-         scrubber.state = PG::Scrubber::NEW_CHUNK;
-         requeue_scrub();
-          done = true;
-        } else {
-          scrubber.state = PG::Scrubber::FINISH;
-        }
-
-       break;
-
-      case PG::Scrubber::FINISH:
-        scrub_finish();
-        scrubber.state = PG::Scrubber::INACTIVE;
-        done = true;
-
-       if (!snap_trimq.empty()) {
-         dout(10) << "scrub finished, requeuing snap_trimmer" << dendl;
-         snap_trimmer_scrub_complete();
-       }
-
-        break;
-
-      case PG::Scrubber::BUILD_MAP_REPLICA:
-        // build my own scrub map
-       if (scrub_preempted) {
-         dout(10) << __func__ << " preempted" << dendl;
-         ret = 0;
-       } else {
-         ret = build_scrub_map_chunk(
-           scrubber.replica_scrubmap,
-           scrubber.replica_scrubmap_pos,
-           scrubber.start, scrubber.end,
-           scrubber.deep,
-           handle);
-       }
-       if (ret == -EINPROGRESS) {
-         requeue_scrub();
-         done = true;
-         break;
-       }
-       // reply
-       {
-         MOSDRepScrubMap *reply = new MOSDRepScrubMap(
-           spg_t(info.pgid.pgid, get_primary().shard),
-           scrubber.replica_scrub_start,
-           pg_whoami);
-         reply->preempted = scrub_preempted;
-         ::encode(scrubber.replica_scrubmap, reply->get_data());
-         osd->send_message_osd_cluster(
-           get_primary().osd, reply,
-           scrubber.replica_scrub_start);
-       }
-       scrub_preempted = false;
-       scrub_can_preempt = false;
-       scrubber.state = PG::Scrubber::INACTIVE;
-       scrubber.replica_scrubmap = ScrubMap();
-       scrubber.replica_scrubmap_pos = ScrubMapBuilder();
-       scrubber.start = hobject_t();
-       scrubber.end = hobject_t();
-       scrubber.max_end = hobject_t();
-       done = true;
-       break;
-
-      default:
-        ceph_abort();
-    }
-  }
-  dout(20) << "scrub final state " << Scrubber::state_string(scrubber.state)
-          << " [" << scrubber.start << "," << scrubber.end << ")"
-          << " max_end " << scrubber.max_end << dendl;
+void PG::replica_scrub(epoch_t epoch_queued,
+                      [[maybe_unused]] ThreadPool::TPHandle& handle)
+{
+  dout(10) << __func__ << " queued at: " << epoch_queued
+          << (is_primary() ? " (primary)" : " (replica)") << dendl;
+  scrub_queued = false;
+  forward_scrub_event(&ScrubPgIF::send_start_replica, epoch_queued);
 }
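
All of the queue handlers in this block funnel into forward_scrub_event(), whose body lies outside this hunk. A minimal sketch of that dispatcher, assuming ScrubAPI is a pointer-to-member of the ScrubPgIF interface (the guard condition and log text here are assumptions, not taken from this diff):

// assumed alias: using ScrubAPI = void (ScrubPgIF::*)(epoch_t epoch_queued);
void PG::forward_scrub_event(ScrubAPI fn, epoch_t epoch_queued)
{
  dout(20) << __func__ << " queued at: " << epoch_queued << dendl;
  if (is_active() && m_scrubber) {
    // dispatch, e.g., ScrubPgIF::send_start_replica to the FSM-based scrubber
    ((*m_scrubber).*fn)(epoch_queued);
  } else {
    // the PG may be in the middle of being deleted; drop the event
    dout(5) << __func__ << " event not forwarded, PG not active" << dendl;
  }
}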
 
-bool PG::write_blocked_by_scrub(const hobject_t& soid)
+void PG::scrub_send_scrub_resched(epoch_t epoch_queued,
+                                 [[maybe_unused]] ThreadPool::TPHandle& handle)
 {
-  if (soid < scrubber.start || soid >= scrubber.end) {
-    return false;
-  }
-  if (scrub_can_preempt) {
-    if (!scrub_preempted) {
-      dout(10) << __func__ << " " << soid << " preempted" << dendl;
-      scrub_preempted = true;
-    } else {
-      dout(10) << __func__ << " " << soid << " already preempted" << dendl;
-    }
-    return false;
-  }
-  return true;
+  dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
+  scrub_queued = false;
+  forward_scrub_event(&ScrubPgIF::send_scrub_resched, epoch_queued);
 }
 
-bool PG::range_intersects_scrub(const hobject_t &start, const hobject_t& end)
+void PG::scrub_send_resources_granted(epoch_t epoch_queued,
+                                     [[maybe_unused]] ThreadPool::TPHandle& handle)
 {
-  // does [start, end] intersect [scrubber.start, scrubber.max_end)
-  return (start < scrubber.max_end &&
-         end >= scrubber.start);
+  dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
+  forward_scrub_event(&ScrubPgIF::send_remotes_reserved, epoch_queued);
 }
 
-void PG::scrub_clear_state(bool has_error)
+void PG::scrub_send_resources_denied(epoch_t epoch_queued,
+                                    [[maybe_unused]] ThreadPool::TPHandle& handle)
 {
-  ceph_assert(is_locked());
-  state_clear(PG_STATE_SCRUBBING);
-  if (!has_error)
-    state_clear(PG_STATE_REPAIR);
-  state_clear(PG_STATE_DEEP_SCRUB);
-  publish_stats_to_osd();
-
-  scrubber.req_scrub = false;
-  // local -> nothing.
-  if (scrubber.local_reserved) {
-    osd->dec_scrubs_local();
-    scrubber.local_reserved = false;
-    scrubber.reserved_peers.clear();
-  }
-
-  requeue_ops(waiting_for_scrub);
-
-  scrubber.reset();
-
-  // type-specific state clear
-  _scrub_clear_state();
+  dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
+  forward_scrub_event(&ScrubPgIF::send_reservation_failure, epoch_queued);
 }
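
scrub_send_resources_granted() and scrub_send_resources_denied() are the queue-side ends of the replica-reservation handshake. A hypothetical sketch of how a replica's reservation reply could be routed into them; route_reserve_reply_sketch and the queue_for_scrub_granted()/queue_for_scrub_denied() helpers are illustrative names, not confirmed by this diff:

// hypothetical glue between the reservation-reply handler and the two
// PG entry points above
void route_reserve_reply_sketch(OSDService* osd, PG* pg, bool granted)
{
  if (granted) {
    osd->queue_for_scrub_granted(pg, Scrub::scrub_prio_t::low_priority);
  } else {
    osd->queue_for_scrub_denied(pg, Scrub::scrub_prio_t::low_priority);
  }
}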
 
-void PG::scrub_compare_maps() 
+void PG::replica_scrub_resched(epoch_t epoch_queued,
+                              [[maybe_unused]] ThreadPool::TPHandle& handle)
 {
-  dout(10) << __func__ << " has maps, analyzing" << dendl;
-
-  // construct authoritative scrub map for type specific scrubbing
-  scrubber.cleaned_meta_map.insert(scrubber.primary_scrubmap);
-  map<hobject_t,
-      pair<std::optional<uint32_t>,
-           std::optional<uint32_t>>> missing_digest;
-
-  map<pg_shard_t, ScrubMap *> maps;
-  maps[pg_whoami] = &scrubber.primary_scrubmap;
-
-  for (const auto& i : get_acting_recovery_backfill()) {
-    if (i == pg_whoami) continue;
-    dout(2) << __func__ << " replica " << i << " has "
-            << scrubber.received_maps[i].objects.size()
-            << " items" << dendl;
-    maps[i] = &scrubber.received_maps[i];
-  }
-
-  set<hobject_t> master_set;
-
-  // Construct master set
-  for (const auto map : maps) {
-    for (const auto i : map.second->objects) {
-      master_set.insert(i.first);
-    }
-  }
-
-  stringstream ss;
-  get_pgbackend()->be_omap_checks(maps, master_set,
-                                  scrubber.omap_stats, ss);
-
-  if (!ss.str().empty()) {
-    osd->clog->warn(ss);
-  }
-
-  if (recovery_state.get_acting().size() > 1) {
-    dout(10) << __func__ << "  comparing replica scrub maps" << dendl;
-
-    // Map from object with errors to good peer
-    map<hobject_t, list<pg_shard_t>> authoritative;
-
-    dout(2) << __func__ << " " << get_primary() << " has "
-           << scrubber.primary_scrubmap.objects.size() << " items" << dendl;
-
-    ss.str("");
-    ss.clear();
-
-    get_pgbackend()->be_compare_scrubmaps(
-      maps,
-      master_set,
-      state_test(PG_STATE_REPAIR),
-      scrubber.missing,
-      scrubber.inconsistent,
-      authoritative,
-      missing_digest,
-      scrubber.shallow_errors,
-      scrubber.deep_errors,
-      scrubber.store.get(),
-      info.pgid, recovery_state.get_acting(),
-      ss);
-    dout(2) << ss.str() << dendl;
-
-    if (!ss.str().empty()) {
-      osd->clog->error(ss);
-    }
-
-    for (map<hobject_t, list<pg_shard_t>>::iterator i = authoritative.begin();
-        i != authoritative.end();
-        ++i) {
-      list<pair<ScrubMap::object, pg_shard_t> > good_peers;
-      for (list<pg_shard_t>::const_iterator j = i->second.begin();
-          j != i->second.end();
-          ++j) {
-       good_peers.emplace_back(maps[*j]->objects[i->first], *j);
-      }
-      scrubber.authoritative.emplace(i->first, good_peers);
-    }
-
-    for (map<hobject_t, list<pg_shard_t>>::iterator i = authoritative.begin();
-        i != authoritative.end();
-        ++i) {
-      scrubber.cleaned_meta_map.objects.erase(i->first);
-      scrubber.cleaned_meta_map.objects.insert(
-       *(maps[i->second.back()]->objects.find(i->first))
-       );
-    }
-  }
-
-  ScrubMap for_meta_scrub;
-  scrubber.clean_meta_map(for_meta_scrub);
-
-  // ok, do the pg-type specific scrubbing
-  scrub_snapshot_metadata(for_meta_scrub, missing_digest);
-  // _scan_snaps is called here on the primary; a caller that is not the
-  // primary can use an authoritative map instead
-  _scan_snaps(for_meta_scrub);
-  if (!scrubber.store->empty()) {
-    if (state_test(PG_STATE_REPAIR)) {
-      dout(10) << __func__ << ": discarding scrub results" << dendl;
-      scrubber.store->flush(nullptr);
-    } else {
-      dout(10) << __func__ << ": updating scrub object" << dendl;
-      ObjectStore::Transaction t;
-      scrubber.store->flush(&t);
-      osd->store->queue_transaction(ch, std::move(t), nullptr);
-    }
-  }
+  dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
+  scrub_queued = false;
+  forward_scrub_event(&ScrubPgIF::send_sched_replica, epoch_queued);
 }
 
-bool PG::scrub_process_inconsistent()
-{
-  dout(10) << __func__ << ": checking authoritative" << dendl;
-  bool repair = state_test(PG_STATE_REPAIR);
-  bool deep_scrub = state_test(PG_STATE_DEEP_SCRUB);
-  const char *mode = (repair ? "repair": (deep_scrub ? "deep-scrub" : "scrub"));
-  
-  // the authoritative map only stores objects that are missing or inconsistent.
-  if (!scrubber.authoritative.empty()) {
-    stringstream ss;
-    ss << info.pgid << " " << mode << " "
-       << scrubber.missing.size() << " missing, "
-       << scrubber.inconsistent.size() << " inconsistent objects";
-    dout(2) << ss.str() << dendl;
-    osd->clog->error(ss);
-    if (repair) {
-      state_clear(PG_STATE_CLEAN);
-      for (map<hobject_t, list<pair<ScrubMap::object, pg_shard_t> >>::iterator i =
-            scrubber.authoritative.begin();
-          i != scrubber.authoritative.end();
-          ++i) {
-       auto missing_entry = scrubber.missing.find(i->first);
-       if (missing_entry != scrubber.missing.end()) {
-          repair_object(
-            i->first,
-            i->second,
-           missing_entry->second);
-         scrubber.fixed += missing_entry->second.size();
-       }
-       if (scrubber.inconsistent.count(i->first)) {
-          repair_object(
-            i->first,
-            i->second,
-           scrubber.inconsistent[i->first]);
-         scrubber.fixed += scrubber.inconsistent[i->first].size();
-       }
-      }
-    }
-  }
-  return (!scrubber.authoritative.empty() && repair);
+void PG::scrub_send_pushes_update(epoch_t epoch_queued,
+                                 [[maybe_unused]] ThreadPool::TPHandle& handle)
+{
+  dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
+  forward_scrub_event(&ScrubPgIF::active_pushes_notification, epoch_queued);
 }
 
-bool PG::ops_blocked_by_scrub() const {
-  return (waiting_for_scrub.size() != 0);
+void PG::scrub_send_replica_pushes(epoch_t epoch_queued,
+                                  [[maybe_unused]] ThreadPool::TPHandle& handle)
+{
+  dout(15) << __func__ << " queued at: " << epoch_queued << dendl;
+  forward_scrub_event(&ScrubPgIF::send_replica_pushes_upd, epoch_queued);
 }
 
-// the part that actually finalizes a scrub
-void PG::scrub_finish() 
+void PG::scrub_send_applied_update(epoch_t epoch_queued,
+                                  [[maybe_unused]] ThreadPool::TPHandle& handle)
 {
-  dout(20) << __func__ << dendl;
-  bool repair = state_test(PG_STATE_REPAIR);
-  bool do_auto_scrub = false;
-  // if the repair request comes from auto-repair and the number of errors
-  // exceeds the configured limit, cancel the auto-repair
-  if (repair && scrubber.auto_repair
-      && scrubber.authoritative.size() > cct->_conf->osd_scrub_auto_repair_num_errors) {
-    state_clear(PG_STATE_REPAIR);
-    repair = false;
-  }
-  bool deep_scrub = state_test(PG_STATE_DEEP_SCRUB);
-  const char *mode = (repair ? "repair": (deep_scrub ? "deep-scrub" : "scrub"));
-
-  // if a regular scrub had errors within the limit, do a deep scrub to auto repair.
-  if (scrubber.deep_scrub_on_error
-      && scrubber.authoritative.size()
-      && scrubber.authoritative.size() <= cct->_conf->osd_scrub_auto_repair_num_errors) {
-    ceph_assert(!deep_scrub);
-    do_auto_scrub = true;
-    dout(20) << __func__ << " Try to auto repair after scrub errors" << dendl;
-  }
-  scrubber.deep_scrub_on_error = false;
-
-  // type-specific finish (can tally more errors)
-  _scrub_finish();
-
-  bool has_error = scrub_process_inconsistent();
-
-  {
-    stringstream oss;
-    oss << info.pgid.pgid << " " << mode << " ";
-    int total_errors = scrubber.shallow_errors + scrubber.deep_errors;
-    if (total_errors)
-      oss << total_errors << " errors";
-    else
-      oss << "ok";
-    if (!deep_scrub && info.stats.stats.sum.num_deep_scrub_errors)
-      oss << " ( " << info.stats.stats.sum.num_deep_scrub_errors
-          << " remaining deep scrub error details lost)";
-    if (repair)
-      oss << ", " << scrubber.fixed << " fixed";
-    if (total_errors)
-      osd->clog->error(oss);
-    else
-      osd->clog->debug(oss);
-  }
-
-  // Since we don't know which errors were fixed, we can only clear them
-  // when every one has been fixed.
-  if (repair) {
-    if (scrubber.fixed == scrubber.shallow_errors + scrubber.deep_errors) {
-      ceph_assert(deep_scrub);
-      scrubber.shallow_errors = scrubber.deep_errors = 0;
-      dout(20) << __func__ << " All may be fixed" << dendl;
-    } else if (has_error) {
-      // Deep scrub in order to get corrected error counts
-      scrub_after_recovery = true;
-      save_req_scrub = scrubber.req_scrub;
-      dout(20) << __func__ << " Set scrub_after_recovery, req_scrub=" << save_req_scrub << dendl;
-    } else if (scrubber.shallow_errors || scrubber.deep_errors) {
-      // We have errors but nothing can be fixed, so there is no repair
-      // possible.
-      state_set(PG_STATE_FAILED_REPAIR);
-      dout(10) << __func__ << " " << (scrubber.shallow_errors + scrubber.deep_errors)
-              << " error(s) present with no repair possible" << dendl;
-    }
-  }
-
-  {
-    // finish up
-    ObjectStore::Transaction t;
-    recovery_state.update_stats(
-      [this, deep_scrub](auto &history, auto &stats) {
-       utime_t now = ceph_clock_now();
-       history.last_scrub = recovery_state.get_info().last_update;
-       history.last_scrub_stamp = now;
-       if (scrubber.deep) {
-         history.last_deep_scrub = recovery_state.get_info().last_update;
-         history.last_deep_scrub_stamp = now;
-       }
+  dout(15) << __func__ << " queued at: " << epoch_queued << dendl;
+  forward_scrub_event(&ScrubPgIF::update_applied_notification, epoch_queued);
+}
 
-       if (deep_scrub) {
-         if ((scrubber.shallow_errors == 0) && (scrubber.deep_errors == 0))
-           history.last_clean_scrub_stamp = now;
-         stats.stats.sum.num_shallow_scrub_errors = scrubber.shallow_errors;
-         stats.stats.sum.num_deep_scrub_errors = scrubber.deep_errors;
-         stats.stats.sum.num_large_omap_objects = scrubber.omap_stats.large_omap_objects;
-         stats.stats.sum.num_omap_bytes = scrubber.omap_stats.omap_bytes;
-         stats.stats.sum.num_omap_keys = scrubber.omap_stats.omap_keys;
-         dout(25) << "scrub_finish shard " << pg_whoami << " num_omap_bytes = "
-                  << stats.stats.sum.num_omap_bytes << " num_omap_keys = "
-                  << stats.stats.sum.num_omap_keys << dendl;
-       } else {
-         stats.stats.sum.num_shallow_scrub_errors = scrubber.shallow_errors;
-         // XXX: last_clean_scrub_stamp doesn't mean the pg is not inconsistent
-         // because of deep-scrub errors
-         if (scrubber.shallow_errors == 0)
-           history.last_clean_scrub_stamp = now;
-       }
-       stats.stats.sum.num_scrub_errors =
-         stats.stats.sum.num_shallow_scrub_errors +
-         stats.stats.sum.num_deep_scrub_errors;
-       if (scrubber.check_repair) {
-         scrubber.check_repair = false;
-         if (info.stats.stats.sum.num_scrub_errors) {
-           state_set(PG_STATE_FAILED_REPAIR);
-           dout(10) << "scrub_finish " << info.stats.stats.sum.num_scrub_errors
-                    << " error(s) still present after re-scrub" << dendl;
-         }
-       }
-       return true;
-      },
-      &t);
-    int tr = osd->store->queue_transaction(ch, std::move(t), NULL);
-    ceph_assert(tr == 0);
-  }
+void PG::scrub_send_unblocking(epoch_t epoch_queued,
+                              [[maybe_unused]] ThreadPool::TPHandle& handle)
+{
+  dout(15) << __func__ << " queued at: " << epoch_queued << dendl;
+  forward_scrub_event(&ScrubPgIF::send_scrub_unblock, epoch_queued);
+}
 
-  if (has_error) {
-    queue_peering_event(
-      PGPeeringEventRef(
-       std::make_shared<PGPeeringEvent>(
-         get_osdmap_epoch(),
-         get_osdmap_epoch(),
-         PeeringState::DoRecovery())));
-  }
+void PG::scrub_send_digest_update(epoch_t epoch_queued,
+                                 [[maybe_unused]] ThreadPool::TPHandle& handle)
+{
+  dout(15) << __func__ << " queued at: " << epoch_queued << dendl;
+  forward_scrub_event(&ScrubPgIF::digest_update_notification, epoch_queued);
+}
 
-  scrub_clear_state(has_error);
-  scrub_unreserve_replicas();
+void PG::scrub_send_replmaps_ready(epoch_t epoch_queued,
+                                  [[maybe_unused]] ThreadPool::TPHandle& handle)
+{
+  dout(15) << __func__ << " queued at: " << epoch_queued << dendl;
+  forward_scrub_event(&ScrubPgIF::send_replica_maps_ready, epoch_queued);
+}
 
-  if (do_auto_scrub) {
-    scrub_requested(false, false, true);
-  }
+bool PG::ops_blocked_by_scrub() const
+{
+  return !waiting_for_scrub.empty();
+}
 
-  if (is_active() && is_primary()) {
-    recovery_state.share_pg_info();
-  }
+Scrub::scrub_prio_t PG::is_scrub_blocking_ops() const
+{
+  return waiting_for_scrub.empty() ? Scrub::scrub_prio_t::low_priority
+                                  : Scrub::scrub_prio_t::high_priority;
 }
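
is_scrub_blocking_ops() translates "client ops are waiting behind the scrub" into a queueing priority. A short sketch of the intended use; queue_scrub_event_sketch() is an assumed enqueue helper, not part of this diff:

// requeue the next scrub event at high priority whenever client writes are
// already blocked on the scrubbed range, so they drain sooner
void queue_next_scrub_event_sketch(OSDService* osd, PG* pg, epoch_t epoch)
{
  const Scrub::scrub_prio_t prio = pg->is_scrub_blocking_ops();
  osd->queue_scrub_event_sketch(pg, prio, epoch);  // assumed helper
}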
 
 bool PG::old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch)
 {
-  if (get_last_peering_reset() > reply_epoch ||
-      get_last_peering_reset() > query_epoch) {
-    dout(10) << "old_peering_msg reply_epoch " << reply_epoch << " query_epoch " << query_epoch
-            << " last_peering_reset " << get_last_peering_reset()
-            << dendl;
+  if (auto last_reset = get_last_peering_reset();
+      last_reset > reply_epoch || last_reset > query_epoch) {
+    dout(10) << "old_peering_msg reply_epoch " << reply_epoch << " query_epoch "
+            << query_epoch << " last_peering_reset " << last_reset << dendl;
     return true;
   }
   return false;
@@ -3400,7 +2227,6 @@ void PG::start_flush_on_transaction(ObjectStore::Transaction &t)
 
 bool PG::try_flush_or_schedule_async()
 {
-  
   Context *c = new QueuePeeringEvt(
     this, get_osdmap_epoch(), PeeringState::IntervalFlush());
   if (!ch->flush_commit(c)) {
@@ -3414,24 +2240,12 @@ bool PG::try_flush_or_schedule_async()
 ostream& operator<<(ostream& out, const PG& pg)
 {
   out << pg.recovery_state;
-  if (pg.scrubber.must_repair)
-    out << " MUST_REPAIR";
-  if (pg.scrubber.auto_repair)
-    out << " AUTO_REPAIR";
-  if (pg.scrubber.check_repair)
-    out << " CHECK_REPAIR";
-  if (pg.scrubber.deep_scrub_on_error)
-    out << " DEEP_SCRUB_ON_ERROR";
-  if (pg.scrubber.must_deep_scrub)
-    out << " MUST_DEEP_SCRUB";
-  if (pg.scrubber.must_scrub)
-    out << " MUST_SCRUB";
-  if (pg.scrubber.time_for_deep)
-    out << " TIME_FOR_DEEP";
-  if (pg.scrubber.need_auto)
-    out << " NEED_AUTO";
-  if (pg.scrubber.req_scrub)
-    out << " REQ_SCRUB";
+
+  // listing all scrub-related flags - both current and "planned next scrub"
+  if (pg.is_scrubbing()) {
+    out << *pg.m_scrubber;
+  }
+  out << pg.m_planned_scrub;
 
   if (pg.recovery_ops_active)
     out << " rops=" << pg.recovery_ops_active;
@@ -3473,8 +2287,6 @@ ostream& operator<<(ostream& out, const PG& pg)
   }
 
   out << "]";
-
-
   return out;
 }
 
@@ -3557,15 +2369,19 @@ bool PG::can_discard_replica_op(OpRequestRef& op)
   // resets the messenger session when the replica reconnects. To avoid
   // out-of-order replies, the messages from that replica should be discarded.
   OSDMapRef next_map = osd->get_next_osdmap();
-  if (next_map->is_down(from))
+  if (next_map->is_down(from)) {
+    dout(20) << " " << __func__ << " dead for nextmap is down " << from << dendl;
     return true;
+  }
   /* Mostly, this overlaps with the old_peering_msg
    * condition.  An important exception is pushes
    * sent by replicas not in the acting set, since
    * if such a replica goes down it does not cause
    * a new interval. */
-  if (next_map->get_down_at(from) >= m->map_epoch)
+  if (next_map->get_down_at(from) >= m->map_epoch) {
+    dout(20) << " " << __func__ << " dead for 'get_down_at' " << from << dendl;
     return true;
+  }
 
   // same pg?
   //  if pg changes _at all_, we reset and repeer!
@@ -3759,32 +2575,17 @@ void PG::handle_initialize(PeeringCtx &rctx)
   recovery_state.handle_event(evt, &rctx);
 }
 
+
 void PG::handle_query_state(Formatter *f)
 {
   dout(10) << "handle_query_state" << dendl;
   PeeringState::QueryState q(f);
   recovery_state.handle_event(q, 0);
 
-  if (is_primary() && is_active()) {
-    f->open_object_section("scrub");
-    f->dump_stream("scrubber.epoch_start") << scrubber.epoch_start;
-    f->dump_bool("scrubber.active", scrubber.active);
-    f->dump_string("scrubber.state", PG::Scrubber::state_string(scrubber.state));
-    f->dump_stream("scrubber.start") << scrubber.start;
-    f->dump_stream("scrubber.end") << scrubber.end;
-    f->dump_stream("scrubber.max_end") << scrubber.max_end;
-    f->dump_stream("scrubber.subset_last_update") << scrubber.subset_last_update;
-    f->dump_bool("scrubber.deep", scrubber.deep);
-    {
-      f->open_array_section("scrubber.waiting_on_whom");
-      for (set<pg_shard_t>::iterator p = scrubber.waiting_on_whom.begin();
-          p != scrubber.waiting_on_whom.end();
-          ++p) {
-       f->dump_stream("shard") << *p;
-      }
-      f->close_section();
-    }
-    f->close_section();
+  // This code has moved to run after the recovery_state section is closed:
+  // scrub is not a recovery state, so it should not be dumped inside it.
+  if (is_primary() && is_active() && m_scrubber && m_scrubber->is_scrub_active()) {
+    m_scrubber->handle_query_state(f);
   }
 }
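
The removed block above lists exactly which fields used to be dumped, so the scrubber's own handle_query_state() is presumably emitting an equivalent section. A sketch under that assumption; the class and m_* member names stand in for the old scrubber.* fields and are illustrative only:

// sketch of the moved dump (names assumed, layout taken from the removed code)
void ScrubberSketch::handle_query_state(ceph::Formatter* f)
{
  f->open_object_section("scrub");
  f->dump_stream("scrubber.epoch_start") << m_epoch_start;
  f->dump_bool("scrubber.active", m_active);
  f->dump_stream("scrubber.start") << m_start;
  f->dump_stream("scrubber.end") << m_end;
  f->dump_bool("scrubber.deep", m_is_deep);
  f->close_section();
}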
 
@@ -3812,8 +2613,9 @@ void PG::C_DeleteMore::complete(int r) {
   delete this;
 }
 
-ghobject_t PG::do_delete_work(ObjectStore::Transaction &t,
-                        ghobject_t _next)
+std::pair<ghobject_t, bool> PG::do_delete_work(
+  ObjectStore::Transaction &t,
+  ghobject_t _next)
 {
   dout(10) << __func__ << dendl;
 
@@ -3839,7 +2641,7 @@ ghobject_t PG::do_delete_work(ObjectStore::Transaction &t,
       osd->sleep_timer.add_event_at(delete_schedule_time,
                                    delete_requeue_callback);
       dout(20) << __func__ << " Delete scheduled at " << delete_schedule_time << dendl;
-      return _next;
+      return std::make_pair(_next, true);
     }
   }
 
@@ -3895,6 +2697,7 @@ ghobject_t PG::do_delete_work(ObjectStore::Transaction &t,
     t.remove(coll, oid);
     ++num;
   }
+  bool running = true;
   if (num) {
     dout(20) << __func__ << " deleting " << num << " objects" << dendl;
     Context *fin = new C_DeleteMore(this, get_osdmap_epoch());
@@ -3931,10 +2734,10 @@ ghobject_t PG::do_delete_work(ObjectStore::Transaction &t,
       // exit() methods don't run when that happens.
       osd->local_reserver.cancel_reservation(info.pgid);
 
-      osd->logger->dec(l_osd_pg_removing);
+      running = false;
     }
   }
-  return next;
+  return {next, running};
 }
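
do_delete_work() now returns whether deletion is still in progress alongside the resume cursor, so the caller both requeues the work and finishes the accounting that used to happen here (note the dec(l_osd_pg_removing) removed above). A minimal caller sketch; the function name and comments are illustrative:

// consume the (cursor, still-running) pair via structured bindings
void continue_delete_sketch(PG* pg, ObjectStore::Transaction& t, ghobject_t cursor)
{
  auto [next, running] = pg->do_delete_work(t, cursor);
  if (running) {
    // more objects remain (or the work was rescheduled): requeue from 'next'
  } else {
    // the local reservation was cancelled and removal is winding down;
    // stop requeueing and decrement the pg-removing perf counter
  }
}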
 
 int PG::pg_stat_adjust(osd_stat_t *ns)
@@ -3965,16 +2768,6 @@ int PG::pg_stat_adjust(osd_stat_t *ns)
   return 0;
 }
 
-ostream& operator<<(ostream& out, const PG::BackfillInterval& bi)
-{
-  out << "BackfillInfo(" << bi.begin << "-" << bi.end
-      << " " << bi.objects.size() << " objects";
-  if (!bi.objects.empty())
-    out << " " << bi.objects;
-  out << ")";
-  return out;
-}
-
 void PG::dump_pgstate_history(Formatter *f)
 {
   std::scoped_lock l{*this};