]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osd/PG.cc
import ceph quincy 17.2.1
[ceph.git] / ceph / src / osd / PG.cc
index e5a921499438693557fc41fde73d848a7028fd63..d77cef0d7b5c552b1156a2124b56dcea82c65a7d 100644 (file)
  */
 
 #include "PG.h"
-// #include "msg/Messenger.h"
 #include "messages/MOSDRepScrub.h"
-// #include "common/cmdparse.h"
-// #include "common/ceph_context.h"
 
 #include "common/errno.h"
 #include "common/ceph_releases.h"
 #include "common/config.h"
 #include "OSD.h"
 #include "OpRequest.h"
-#include "ScrubStore.h"
-#include "Session.h"
+#include "osd/scrubber/ScrubStore.h"
+#include "osd/scrubber/pg_scrubber.h"
 #include "osd/scheduler/OpSchedulerItem.h"
+#include "Session.h"
 
 #include "common/Timer.h"
 #include "common/perf_counters.h"
 
 #include "messages/MOSDOp.h"
-#include "messages/MOSDPGNotify.h"
-// #include "messages/MOSDPGLog.h"
-#include "messages/MOSDPGInfo.h"
 #include "messages/MOSDPGScan.h"
 #include "messages/MOSDPGBackfill.h"
 #include "messages/MOSDPGBackfillRemove.h"
 #undef dout_prefix
 #define dout_prefix _prefix(_dout, this)
 
+using std::list;
+using std::map;
+using std::ostringstream;
+using std::pair;
+using std::set;
+using std::string;
+using std::stringstream;
+using std::unique_ptr;
+using std::vector;
+
+using ceph::bufferlist;
+using ceph::bufferptr;
+using ceph::decode;
+using ceph::encode;
+using ceph::Formatter;
+
 using namespace ceph::osd::scheduler;
 
 template <class T>
@@ -125,7 +136,7 @@ uint64_t PG::get_with_id()
   ref++;
   std::lock_guard l(_ref_id_lock);
   uint64_t id = ++_ref_id;
-  BackTrace bt(0);
+  ClibBackTrace bt(0);
   stringstream ss;
   bt.print(ss);
   lgeneric_subdout(cct, refs, 5) << "PG::get " << this << " " << info.pgid
@@ -190,14 +201,11 @@ PG::PG(OSDService *o, OSDMapRef curmap,
   info_struct_v(0),
   pgmeta_oid(p.make_pgmeta_oid()),
   stat_queue_item(this),
-  scrub_queued(false),
   recovery_queued(false),
   recovery_ops_active(0),
   backfill_reserving(false),
-  pg_stats_publish_valid(false),
   finish_sync_event(NULL),
   scrub_after_recovery(false),
-  save_req_scrub(false),
   active_pushes(0),
   recovery_state(
     o->cct,
@@ -293,7 +301,7 @@ void PG::log_state_exit(
   osd->pg_recovery_stats.log_exit(
     state_name, ceph_clock_now() - enter_time, events, event_dur);
 }
-  
+
 /********* PG **********/
 
 void PG::remove_snap_mapped_object(
@@ -341,6 +349,8 @@ void PG::update_object_snap_mapping(
 /******* PG ***********/
 void PG::clear_primary_state()
 {
+  dout(20) << __func__ << dendl;
+
   projected_log = PGLog::IndexedLog();
 
   snap_trimq.clear();
@@ -348,29 +358,14 @@ void PG::clear_primary_state()
   finish_sync_event = 0;  // so that _finish_recovery doesn't go off in another thread
   release_pg_backoffs();
 
-  scrubber.reserved_peers.clear();
+  if (m_scrubber) {
+    m_scrubber->discard_replica_reservations();
+  }
   scrub_after_recovery = false;
-  save_req_scrub = false;
 
   agent_clear();
 }
 
-PG::Scrubber::Scrubber()
- : local_reserved(false), remote_reserved(false), reserve_failed(false),
-   epoch_start(0),
-   active(false),
-   shallow_errors(0), deep_errors(0), fixed(0),
-   must_scrub(false), must_deep_scrub(false), must_repair(false),
-   need_auto(false), req_scrub(false), time_for_deep(false),
-   auto_repair(false),
-   check_repair(false),
-   deep_scrub_on_error(false),
-   num_digest_updates_pending(0),
-   state(INACTIVE),
-   deep(false)
-{}
-
-PG::Scrubber::~Scrubber() {}
 
 bool PG::op_has_sufficient_caps(OpRequestRef& op)
 {
@@ -414,20 +409,6 @@ bool PG::op_has_sufficient_caps(OpRequestRef& op)
   return cap;
 }
 
-bool PG::requeue_scrub(bool high_priority)
-{
-  ceph_assert(ceph_mutex_is_locked(_lock));
-  if (scrub_queued) {
-    dout(10) << __func__ << ": already queued" << dendl;
-    return false;
-  } else {
-    dout(10) << __func__ << ": queueing" << dendl;
-    scrub_queued = true;
-    osd->queue_for_scrub(this, high_priority);
-    return true;
-  }
-}
-
 void PG::queue_recovery()
 {
   if (!is_primary() || !is_peered()) {
@@ -442,41 +423,32 @@ void PG::queue_recovery()
   }
 }
 
-bool PG::queue_scrub()
+void PG::queue_scrub_after_repair()
 {
+  dout(10) << __func__ << dendl;
   ceph_assert(ceph_mutex_is_locked(_lock));
-  if (is_scrubbing()) {
-    return false;
+
+  m_planned_scrub.must_deep_scrub = true;
+  m_planned_scrub.check_repair = true;
+  m_planned_scrub.must_scrub = true;
+
+  if (is_scrub_queued_or_active()) {
+    dout(10) << __func__ << ": scrubbing already ("
+             << (is_scrubbing() ? "active)" : "queued)") << dendl;
+    return;
   }
-  // An interrupted recovery repair could leave this set.
-  state_clear(PG_STATE_REPAIR);
-  if (scrubber.need_auto) {
-    scrubber.must_scrub = true;
-    scrubber.must_deep_scrub = true;
-    scrubber.auto_repair = true;
-    scrubber.need_auto = false;
-  }
-  scrubber.priority = scrubber.must_scrub ?
-         cct->_conf->osd_requested_scrub_priority : get_scrub_priority();
-  scrubber.must_scrub = false;
-  state_set(PG_STATE_SCRUBBING);
-  if (scrubber.must_deep_scrub) {
-    state_set(PG_STATE_DEEP_SCRUB);
-    scrubber.must_deep_scrub = false;
-  }
-  if (scrubber.must_repair || scrubber.auto_repair) {
-    state_set(PG_STATE_REPAIR);
-    scrubber.must_repair = false;
-  }
-  requeue_scrub();
-  return true;
+
+  m_scrubber->set_op_parameters(m_planned_scrub);
+  dout(15) << __func__ << ": queueing" << dendl;
+
+  osd->queue_scrub_after_repair(this, Scrub::scrub_prio_t::high_priority);
 }
 
 unsigned PG::get_scrub_priority()
 {
   // a higher value -> a higher priority
-  int64_t pool_scrub_priority = 0;
-  pool.info.opts.get(pool_opts_t::SCRUB_PRIORITY, &pool_scrub_priority);
+  int64_t pool_scrub_priority =
+    pool.info.opts.value_or(pool_opts_t::SCRUB_PRIORITY, (int64_t)0);
   return pool_scrub_priority > 0 ? pool_scrub_priority : cct->_conf->osd_scrub_priority;
 }
 
@@ -494,8 +466,11 @@ Context *PG::finish_recovery()
   return finish_sync_event;
 }
 
-void PG::_finish_recovery(Context *c)
+void PG::_finish_recovery(Contextc)
 {
+  dout(15) << __func__ << " finish_sync_event? " << finish_sync_event << " clean? "
+                << is_clean() << dendl;
+
   std::scoped_lock locker{*this};
   if (recovery_state.is_deleting() || !is_clean()) {
     dout(10) << __func__ << " raced with delete or repair" << dendl;
@@ -504,7 +479,7 @@ void PG::_finish_recovery(Context *c)
   // When recovery is initiated by a repair, that flag is left on
   state_clear(PG_STATE_REPAIR);
   if (c == finish_sync_event) {
-    dout(10) << "_finish_recovery" << dendl;
+    dout(15) << __func__ << " scrub_after_recovery? " << scrub_after_recovery << dendl;
     finish_sync_event = 0;
     recovery_state.purge_strays();
 
@@ -513,11 +488,7 @@ void PG::_finish_recovery(Context *c)
     if (scrub_after_recovery) {
       dout(10) << "_finish_recovery requeueing for scrub" << dendl;
       scrub_after_recovery = false;
-      scrubber.must_deep_scrub = true;
-      scrubber.check_repair = true;
-      // We remember whether req_scrub was set when scrub_after_recovery set to true
-      scrubber.req_scrub = save_req_scrub;
-      queue_scrub();
+      queue_scrub_after_repair();
     }
   } else {
     dout(10) << "_finish_recovery -- stale" << dendl;
@@ -543,7 +514,7 @@ void PG::finish_recovery_op(const hobject_t& soid, bool dequeue)
 {
   dout(10) << "finish_recovery_op " << soid
 #ifdef DEBUG_RECOVERY_OIDS
-          << " (" << recovering_oids << ")" 
+          << " (" << recovering_oids << ")"
 #endif
           << dendl;
   ceph_assert(recovery_ops_active > 0);
@@ -657,8 +628,8 @@ void PG::release_backoffs(const hobject_t& begin, const hobject_t& end)
       auto q = p->second.begin();
       while (q != p->second.end()) {
        dout(20) << __func__ << " checking  " << *q << dendl;
-       int r = cmp((*q)->begin, begin);
-       if (r == 0 || (r > 0 && (*q)->end < end)) {
+       int rr = cmp((*q)->begin, begin);
+       if (rr == 0 || (rr > 0 && (*q)->end < end)) {
          bv.push_back(*q);
          q = p->second.erase(q);
        } else {
@@ -745,7 +716,7 @@ void PG::rm_backoff(const ceph::ref_t<Backoff>& b)
   }
 }
 
-void PG::clear_recovery_state() 
+void PG::clear_recovery_state()
 {
   dout(10) << "clear_recovery_state" << dendl;
 
@@ -783,13 +754,12 @@ void PG::set_probe_targets(const set<pg_shard_t> &probe_set)
 }
 
 void PG::send_cluster_message(
-  int target, Message *m,
-  epoch_t epoch, bool share_map_update=false)
+  int target, MessageRef m,
+  epoch_t epoch, bool share_map_update)
 {
   ConnectionRef con = osd->get_con_osd_cluster(
     target, get_osdmap_epoch());
   if (!con) {
-    m->put();
     return;
   }
 
@@ -843,14 +813,18 @@ void PG::publish_stats_to_osd()
   if (!is_primary())
     return;
 
+  ceph_assert(m_scrubber);
+  recovery_state.update_stats_wo_resched(
+    [scrubber = m_scrubber.get()](pg_history_t& hist,
+                                  pg_stat_t& info) mutable -> void {
+      info.scrub_sched_status = scrubber->get_schedule();
+    });
+
   std::lock_guard l{pg_stats_publish_lock};
-  auto stats = recovery_state.prepare_stats_for_publish(
-    pg_stats_publish_valid,
-    pg_stats_publish,
-    unstable_stats);
+  auto stats =
+    recovery_state.prepare_stats_for_publish(pg_stats_publish, unstable_stats);
   if (stats) {
-    pg_stats_publish = stats.value();
-    pg_stats_publish_valid = true;
+    pg_stats_publish = std::move(stats);
   }
 }
 
@@ -863,7 +837,7 @@ void PG::clear_publish_stats()
 {
   dout(15) << "clear_stats" << dendl;
   std::lock_guard l{pg_stats_publish_lock};
-  pg_stats_publish_valid = false;
+  pg_stats_publish.reset();
 }
 
 /**
@@ -886,12 +860,11 @@ void PG::init(
   const vector<int>& newacting, int new_acting_primary,
   const pg_history_t& history,
   const PastIntervals& pi,
-  bool backfill,
   ObjectStore::Transaction &t)
 {
   recovery_state.init(
     role, newup, new_up_primary, newacting,
-    new_acting_primary, history, pi, backfill, t);
+    new_acting_primary, history, pi, t);
 }
 
 void PG::shutdown()
@@ -1012,7 +985,6 @@ int PG::peek_map_epoch(ObjectStore *store,
                       epoch_t *pepoch)
 {
   coll_t coll(pgid);
-  ghobject_t legacy_infos_oid(OSD::make_infos_oid());
   ghobject_t pgmeta_oid(pgid.make_pgmeta_oid());
   epoch_t cur_epoch = 0;
 
@@ -1173,7 +1145,7 @@ void PG::read_state(ObjectStore *store)
   // init pool options
   store->set_collection_opts(ch, pool.info.opts);
 
-  PeeringCtx rctx(ceph_release_t::unknown);
+  PeeringCtx rctx;
   handle_initialize(rctx);
   // note: we don't activate here because we know the OSD will advance maps
   // during boot.
@@ -1185,16 +1157,14 @@ void PG::update_snap_map(
   const vector<pg_log_entry_t> &log_entries,
   ObjectStore::Transaction &t)
 {
-  for (vector<pg_log_entry_t>::const_iterator i = log_entries.begin();
-       i != log_entries.end();
-       ++i) {
+  for (auto i = log_entries.cbegin(); i != log_entries.cend(); ++i) {
     OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
     if (i->soid.snap < CEPH_MAXSNAP) {
       if (i->is_delete()) {
        int r = snap_mapper.remove_oid(
          i->soid,
          &_t);
-       if (r != 0)
+       if (r)
          derr << __func__ << " remove_oid " << i->soid << " failed with " << r << dendl;
         // On removal tolerate missing key corruption
         ceph_assert(r == 0 || r == -ENOENT);
@@ -1266,9 +1236,7 @@ void PG::filter_snapc(vector<snapid_t> &snaps)
 
 void PG::requeue_object_waiters(map<hobject_t, list<OpRequestRef>>& m)
 {
-  for (map<hobject_t, list<OpRequestRef>>::iterator it = m.begin();
-       it != m.end();
-       ++it)
+  for (auto it = m.begin(); it != m.end(); ++it)
     requeue_ops(it->second);
   m.clear();
 }
@@ -1330,233 +1298,280 @@ void PG::requeue_map_waiters()
   }
 }
 
+bool PG::get_must_scrub() const
+{
+  dout(20) << __func__ << " must_scrub? " << (m_planned_scrub.must_scrub ? "true" : "false") << dendl;
+  return m_planned_scrub.must_scrub;
+}
+
+unsigned int PG::scrub_requeue_priority(Scrub::scrub_prio_t with_priority) const
+{
+  return m_scrubber->scrub_requeue_priority(with_priority);
+}
+
+unsigned int PG::scrub_requeue_priority(Scrub::scrub_prio_t with_priority, unsigned int suggested_priority) const
+{
+  return m_scrubber->scrub_requeue_priority(with_priority, suggested_priority);
+}
 
 // ==========================================================================================
 // SCRUB
 
 /*
- * when holding pg and sched_scrub_lock, then the states are:
- *   scheduling:
- *     scrubber.local_reserved = true
- *     scrubber.active = false
- *     scrubber.reserved_peers includes whoami
- *     osd->scrubs_local++
- *   scheduling, replica declined:
- *     scrubber.local_reserved = true
- *     scrubber.reserved_peers includes -1
- *     osd->scrub_local++
- *   pending:
- *     scrubber.local_reserved = true
- *     scrubber.active = false
- *     scrubber.reserved_peers.size() == acting.size();
- *     pg on scrub_wq
- *     osd->scrub_local++
- *   scrubbing:
- *     scrubber.local_reserved = true;
- *     scrubber.active = true
- *     scrubber.reserved_peers empty
+ *  implementation note:
+ *  PG::sched_scrub() is called only once per a specific scrub session.
+ *  That call commits us to the whatever choices are made (deep/shallow, etc').
+ *  Unless failing to start scrubbing, the 'planned scrub' flag-set is 'frozen' into
+ *  PgScrubber's m_flags, then cleared.
  */
-
-// returns true if a scrub has been newly kicked off
-bool PG::sched_scrub()
+Scrub::schedule_result_t PG::sched_scrub()
 {
+  dout(15) << __func__ << " pg(" << info.pgid
+         << (is_active() ? ") <active>" : ") <not-active>")
+         << (is_clean() ? " <clean>" : " <not-clean>") << dendl;
   ceph_assert(ceph_mutex_is_locked(_lock));
-  ceph_assert(!is_scrubbing());
-  if (!(is_primary() && is_active() && is_clean())) {
+  ceph_assert(m_scrubber);
+
+  if (is_scrub_queued_or_active()) {
+    return Scrub::schedule_result_t::already_started;
+  }
+
+  if (!is_primary() || !is_active() || !is_clean()) {
+    return Scrub::schedule_result_t::bad_pg_state;
+  }
+
+  // analyse the combination of the requested scrub flags, the osd/pool configuration
+  // and the PG status to determine whether we should scrub now, and what type of scrub
+  // should that be.
+  auto updated_flags = verify_scrub_mode();
+  if (!updated_flags) {
+    // the stars do not align for starting a scrub for this PG at this time
+    // (due to configuration or priority issues)
+    // The reason was already reported by the callee.
+    dout(10) << __func__ << ": failed to initiate a scrub" << dendl;
+    return Scrub::schedule_result_t::preconditions;
+  }
+
+  // try to reserve the local OSD resources. If failing: no harm. We will
+  // be retried by the OSD later on.
+  if (!m_scrubber->reserve_local()) {
+    dout(10) << __func__ << ": failed to reserve locally" << dendl;
+    return Scrub::schedule_result_t::no_local_resources;
+  }
+
+  // can commit to the updated flags now, as nothing will stop the scrub
+  m_planned_scrub = *updated_flags;
+
+  // An interrupted recovery repair could leave this set.
+  state_clear(PG_STATE_REPAIR);
+
+  // Pass control to the scrubber. It is the scrubber that handles the replicas'
+  // resources reservations.
+  m_scrubber->set_op_parameters(m_planned_scrub);
+
+  dout(10) << __func__ << ": queueing" << dendl;
+  osd->queue_for_scrub(this, Scrub::scrub_prio_t::low_priority);
+  return Scrub::schedule_result_t::scrub_initiated;
+}
+
+double PG::next_deepscrub_interval() const
+{
+  double deep_scrub_interval =
+    pool.info.opts.value_or(pool_opts_t::DEEP_SCRUB_INTERVAL, 0.0);
+  if (deep_scrub_interval <= 0.0)
+    deep_scrub_interval = cct->_conf->osd_deep_scrub_interval;
+  return info.history.last_deep_scrub_stamp + deep_scrub_interval;
+}
+
+bool PG::is_time_for_deep(bool allow_deep_scrub,
+                         bool allow_scrub,
+                         bool has_deep_errors,
+                         const requested_scrub_t& planned) const
+{
+  dout(10) << __func__ << ": need_auto?" << planned.need_auto << " allow_deep_scrub? "
+          << allow_deep_scrub << dendl;
+
+  if (!allow_deep_scrub)
     return false;
+
+  if (planned.need_auto) {
+    dout(10) << __func__ << ": need repair after scrub errors" << dendl;
+    return true;
   }
 
-  // All processing the first time through commits us to whatever
-  // choices are made.
-  if (!scrubber.local_reserved) {
-    dout(20) << __func__ << ": Start processing pg " << info.pgid << dendl;
+  if (ceph_clock_now() >= next_deepscrub_interval()) {
+    dout(20) << __func__ << ": now (" << ceph_clock_now() << ") >= time for deep ("
+            << next_deepscrub_interval() << ")" << dendl;
+    return true;
+  }
 
-    bool allow_deep_scrub = !(get_osdmap()->test_flag(CEPH_OSDMAP_NODEEP_SCRUB) ||
-                      pool.info.has_flag(pg_pool_t::FLAG_NODEEP_SCRUB));
-    bool allow_scrub = !(get_osdmap()->test_flag(CEPH_OSDMAP_NOSCRUB) ||
-                 pool.info.has_flag(pg_pool_t::FLAG_NOSCRUB));
-    bool has_deep_errors = (info.stats.stats.sum.num_deep_scrub_errors > 0);
-    bool try_to_auto_repair = (cct->_conf->osd_scrub_auto_repair
-                               && get_pgbackend()->auto_repair_supported());
+  if (has_deep_errors) {
+    osd->clog->info() << "osd." << osd->whoami << " pg " << info.pgid
+                     << " Deep scrub errors, upgrading scrub to deep-scrub";
+    return true;
+  }
 
-    scrubber.time_for_deep = false;
-    // Clear these in case user issues the scrub/repair command during
-    // the scheduling of the scrub/repair (e.g. request reservation)
-    scrubber.deep_scrub_on_error = false;
-    scrubber.auto_repair = false;
+  // we only flip coins if 'allow_scrub' is asserted. Otherwise - as this function is
+  // called often, we will probably be deep-scrubbing most of the time.
+  if (allow_scrub) {
+    bool deep_coin_flip =
+      (rand() % 100) < cct->_conf->osd_deep_scrub_randomize_ratio * 100;
 
-    // All periodic scrub handling goes here because must_scrub is
-    // always set for must_deep_scrub and must_repair.
-    if (!scrubber.must_scrub) {
-      ceph_assert(!scrubber.must_deep_scrub && !scrubber.must_repair);
-      // Handle deep scrub determination only if allowed
-      if (allow_deep_scrub) {
-        // Initial entry and scheduled scrubs without nodeep_scrub set get here
-        if (scrubber.need_auto) {
-         dout(20) << __func__ << ": need repair after scrub errors" << dendl;
-          scrubber.time_for_deep = true;
-        } else {
-          double deep_scrub_interval = 0;
-          pool.info.opts.get(pool_opts_t::DEEP_SCRUB_INTERVAL, &deep_scrub_interval);
-          if (deep_scrub_interval <= 0) {
-           deep_scrub_interval = cct->_conf->osd_deep_scrub_interval;
-          }
-          scrubber.time_for_deep = ceph_clock_now() >=
-                 info.history.last_deep_scrub_stamp + deep_scrub_interval;
-
-          bool deep_coin_flip = false;
-         // If we randomize when !allow_scrub && allow_deep_scrub, then it guarantees
-         // we will deep scrub because this function is called often.
-         if (!scrubber.time_for_deep && allow_scrub)
-           deep_coin_flip = (rand() % 100) < cct->_conf->osd_deep_scrub_randomize_ratio * 100;
-          dout(20) << __func__ << ": time_for_deep=" << scrubber.time_for_deep << " deep_coin_flip=" << deep_coin_flip << dendl;
-
-          scrubber.time_for_deep = (scrubber.time_for_deep || deep_coin_flip);
-        }
+    dout(15) << __func__ << ": time_for_deep=" << planned.time_for_deep
+            << " deep_coin_flip=" << deep_coin_flip << dendl;
 
-        if (!scrubber.time_for_deep && has_deep_errors) {
-         osd->clog->info() << "osd." << osd->whoami
-                           << " pg " << info.pgid
-                           << " Deep scrub errors, upgrading scrub to deep-scrub";
-         scrubber.time_for_deep = true;
-        }
+    if (deep_coin_flip)
+      return true;
+  }
 
-        if (try_to_auto_repair) {
-          if (scrubber.time_for_deep) {
-            dout(20) << __func__ << ": auto repair with deep scrubbing" << dendl;
-            scrubber.auto_repair = true;
-          } else if (allow_scrub) {
-            dout(20) << __func__ << ": auto repair with scrubbing, rescrub if errors found" << dendl;
-            scrubber.deep_scrub_on_error = true;
-          }
-        }
-      } else { // !allow_deep_scrub
-        dout(20) << __func__ << ": nodeep_scrub set" << dendl;
-        if (has_deep_errors) {
-          osd->clog->error() << "osd." << osd->whoami
-                            << " pg " << info.pgid
-                            << " Regular scrub skipped due to deep-scrub errors and nodeep-scrub set";
-          return false;
-        }
-      }
+  return false;
+}
 
-      //NOSCRUB so skip regular scrubs
-      if (!allow_scrub && !scrubber.time_for_deep) {
-        return false;
-      }
-    // scrubber.must_scrub
-    } else if (!scrubber.must_deep_scrub && has_deep_errors) {
-       osd->clog->error() << "osd." << osd->whoami
-                          << " pg " << info.pgid
-                          << " Regular scrub request, deep-scrub details will be lost";
-    }
-    // Unless precluded this was handle above
-    scrubber.need_auto = false;
-
-    ceph_assert(scrubber.reserved_peers.empty());
-    bool allow_scrubing = cct->_conf->osd_scrub_during_recovery ||
-                          (cct->_conf->osd_repair_during_recovery && scrubber.must_repair) ||
-                          !osd->is_recovery_active();
-    if (allow_scrubing &&
-         osd->inc_scrubs_local()) {
-      dout(20) << __func__ << ": reserved locally, reserving replicas" << dendl;
-      scrubber.local_reserved = true;
-      scrubber.reserved_peers.insert(pg_whoami);
-      scrub_reserve_replicas();
-    } else {
-      dout(20) << __func__ << ": failed to reserve locally" << dendl;
+bool PG::verify_periodic_scrub_mode(bool allow_deep_scrub,
+                             bool try_to_auto_repair,
+                             bool allow_regular_scrub,
+                             bool has_deep_errors,
+                             requested_scrub_t& planned) const
+
+{
+  ceph_assert(!planned.must_deep_scrub && !planned.must_repair);
+
+  if (!allow_deep_scrub && has_deep_errors) {
+      osd->clog->error()
+       << "osd." << osd->whoami << " pg " << info.pgid
+       << " Regular scrub skipped due to deep-scrub errors and nodeep-scrub set";
       return false;
-    }
   }
 
-  if (scrubber.local_reserved) {
-    if (scrubber.reserve_failed) {
-      dout(20) << __func__ << ": failed, a peer declined" << dendl;
-      clear_scrub_reserved();
-      scrub_unreserve_replicas();
-      return false;
-    } else if (scrubber.reserved_peers.size() == get_actingset().size()) {
-      dout(20) << __func__ << ": success, reserved self and replicas" << dendl;
-      if (scrubber.time_for_deep) {
-       dout(10) << __func__ << ": scrub will be deep" << dendl;
-       state_set(PG_STATE_DEEP_SCRUB);
-       scrubber.time_for_deep = false;
+  if (allow_deep_scrub) {
+    // Initial entry and scheduled scrubs without nodeep_scrub set get here
+
+    planned.time_for_deep =
+      is_time_for_deep(allow_deep_scrub, allow_regular_scrub, has_deep_errors, planned);
+
+    if (try_to_auto_repair) {
+      if (planned.time_for_deep) {
+       dout(20) << __func__ << ": auto repair with deep scrubbing" << dendl;
+       planned.auto_repair = true;
+      } else if (allow_regular_scrub) {
+       dout(20) << __func__ << ": auto repair with scrubbing, rescrub if errors found"
+                << dendl;
+       planned.deep_scrub_on_error = true;
       }
-      queue_scrub();
-    } else {
-      // none declined, since scrubber.reserved is set
-      dout(20) << __func__ << ": reserved " << scrubber.reserved_peers
-              << ", waiting for replicas" << dendl;
     }
   }
+
+  dout(20) << __func__ << " updated flags: " << planned
+          << " allow_regular_scrub: " << allow_regular_scrub << dendl;
+
+  // NOSCRUB so skip regular scrubs
+  if (!allow_regular_scrub && !planned.time_for_deep) {
+    return false;
+  }
+
   return true;
 }
 
-bool PG::is_scrub_registered()
+std::optional<requested_scrub_t> PG::verify_scrub_mode() const
 {
-  return !scrubber.scrub_reg_stamp.is_zero();
+  const bool allow_regular_scrub =
+    !(get_osdmap()->test_flag(CEPH_OSDMAP_NOSCRUB) ||
+      pool.info.has_flag(pg_pool_t::FLAG_NOSCRUB));
+  const bool allow_deep_scrub =
+    allow_regular_scrub &&
+    !(get_osdmap()->test_flag(CEPH_OSDMAP_NODEEP_SCRUB) ||
+      pool.info.has_flag(pg_pool_t::FLAG_NODEEP_SCRUB));
+  const bool has_deep_errors = (info.stats.stats.sum.num_deep_scrub_errors > 0);
+  const bool try_to_auto_repair = (cct->_conf->osd_scrub_auto_repair &&
+                                   get_pgbackend()->auto_repair_supported());
+
+  dout(10) << __func__ << " pg: " << info.pgid
+           << " allow: " << allow_regular_scrub << "/" << allow_deep_scrub
+           << " deep errs: " << has_deep_errors
+           << " auto-repair: " << try_to_auto_repair << " ("
+           << cct->_conf->osd_scrub_auto_repair << ")" << dendl;
+
+  auto upd_flags = m_planned_scrub;
+
+  upd_flags.time_for_deep = false;
+  // Clear these in case user issues the scrub/repair command during
+  // the scheduling of the scrub/repair (e.g. request reservation)
+  upd_flags.deep_scrub_on_error = false;
+  upd_flags.auto_repair = false;
+
+  if (upd_flags.must_scrub && !upd_flags.must_deep_scrub && has_deep_errors) {
+    osd->clog->error()
+      << "osd." << osd->whoami << " pg " << info.pgid
+      << " Regular scrub request, deep-scrub details will be lost";
+  }
+
+  if (!upd_flags.must_scrub) {
+    // All periodic scrub handling goes here because must_scrub is
+    // always set for must_deep_scrub and must_repair.
+
+    const bool can_start_periodic = verify_periodic_scrub_mode(
+      allow_deep_scrub, try_to_auto_repair, allow_regular_scrub,
+      has_deep_errors, upd_flags);
+    if (!can_start_periodic) {
+      // "I don't want no scrub"
+      dout(20) << __func__ << ": no periodic scrubs allowed" << dendl;
+      return std::nullopt;
+    }
+  }
+
+  //  scrubbing while recovering?
+
+  bool prevented_by_recovery =
+    osd->is_recovery_active() && !cct->_conf->osd_scrub_during_recovery &&
+    (!cct->_conf->osd_repair_during_recovery || !upd_flags.must_repair);
+
+  if (prevented_by_recovery) {
+    dout(20) << __func__ << ": scrubbing prevented during recovery" << dendl;
+    return std::nullopt;
+  }
+
+  upd_flags.need_auto = false;
+  return upd_flags;
 }
 
-void PG::reg_next_scrub()
+/*
+ * Note: on_info_history_change() is used in those two cases where we're not sure
+ * whether the role of the PG was changed, and if so - was this change relayed to the
+ * scrub-queue.
+ */
+void PG::on_info_history_change()
 {
-  if (!is_primary())
-    return;
+  dout(20) << __func__ << " for a " << (is_primary() ? "Primary" : "non-primary") <<dendl;
 
-  utime_t reg_stamp;
-  bool must = false;
-  if (scrubber.must_scrub || scrubber.need_auto) {
-    // Set the smallest time that isn't utime_t()
-    reg_stamp = Scrubber::scrub_must_stamp();
-    must = true;
-  } else if (info.stats.stats_invalid && cct->_conf->osd_scrub_invalid_stats) {
-    reg_stamp = ceph_clock_now();
-    must = true;
-  } else {
-    reg_stamp = info.history.last_scrub_stamp;
-  }
-  // note down the sched_time, so we can locate this scrub, and remove it
-  // later on.
-  double scrub_min_interval = 0, scrub_max_interval = 0;
-  pool.info.opts.get(pool_opts_t::SCRUB_MIN_INTERVAL, &scrub_min_interval);
-  pool.info.opts.get(pool_opts_t::SCRUB_MAX_INTERVAL, &scrub_max_interval);
-  ceph_assert(!is_scrub_registered());
-  scrubber.scrub_reg_stamp = osd->reg_pg_scrub(info.pgid,
-                                              reg_stamp,
-                                              scrub_min_interval,
-                                              scrub_max_interval,
-                                              must);
-  dout(10) << __func__ << " pg " << pg_id << " register next scrub, scrub time "
-      << scrubber.scrub_reg_stamp << ", must = " << (int)must << dendl;
+  ceph_assert(m_scrubber);
+  m_scrubber->on_maybe_registration_change(m_planned_scrub);
 }
 
-void PG::unreg_next_scrub()
+void PG::reschedule_scrub()
 {
-  if (is_scrub_registered()) {
-    osd->unreg_pg_scrub(info.pgid, scrubber.scrub_reg_stamp);
-    scrubber.scrub_reg_stamp = utime_t();
+  dout(20) << __func__ << " for a " << (is_primary() ? "Primary" : "non-primary") <<dendl;
+
+  // we are assuming no change in primary status
+  if (is_primary()) {
+    ceph_assert(m_scrubber);
+    m_scrubber->update_scrub_job(m_planned_scrub);
   }
 }
 
-void PG::on_info_history_change()
+void PG::on_primary_status_change(bool was_primary, bool now_primary)
 {
-  unreg_next_scrub();
-  reg_next_scrub();
+  // make sure we have a working scrubber when becoming a primary
+
+  if (was_primary != now_primary) {
+    ceph_assert(m_scrubber);
+    m_scrubber->on_primary_change(m_planned_scrub);
+  }
 }
 
-void PG::scrub_requested(bool deep, bool repair, bool need_auto)
+void PG::scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type)
 {
-  unreg_next_scrub();
-  if (need_auto) {
-    scrubber.need_auto = true;
-  } else {
-    scrubber.must_scrub = true;
-    scrubber.must_deep_scrub = deep || repair;
-    scrubber.must_repair = repair;
-    // User might intervene, so clear this
-    scrubber.need_auto = false;
-    scrubber.req_scrub = true;
-  }
-  reg_next_scrub();
+  ceph_assert(m_scrubber);
+  m_scrubber->scrub_requested(scrub_level, scrub_type, m_planned_scrub);
 }
 
 void PG::clear_ready_to_merge() {
@@ -1576,10 +1591,17 @@ void PG::on_role_change() {
   plpg_on_role_change();
 }
 
-void PG::on_new_interval() {
-  scrub_queued = false;
+void PG::on_new_interval()
+{
   projected_last_update = eversion_t();
   cancel_recovery();
+
+  assert(m_scrubber);
+  // log some scrub data before we react to the interval
+  dout(20) << __func__ << (is_scrub_queued_or_active() ? " scrubbing " : " ")
+           << "flags: " << m_planned_scrub << dendl;
+
+  m_scrubber->on_maybe_registration_change(m_planned_scrub);
 }
 
 epoch_t PG::oldest_stored_osdmap() {
@@ -1611,15 +1633,15 @@ void PG::schedule_event_after(
 
 void PG::request_local_background_io_reservation(
   unsigned priority,
-  PGPeeringEventRef on_grant,
-  PGPeeringEventRef on_preempt) {
+  PGPeeringEventURef on_grant,
+  PGPeeringEventURef on_preempt) {
   osd->local_reserver.request_reservation(
     pg_id,
     on_grant ? new QueuePeeringEvt(
-      this, on_grant) : nullptr,
+      this, std::move(on_grant)) : nullptr,
     priority,
     on_preempt ? new QueuePeeringEvt(
-      this, on_preempt) : nullptr);
+      this, std::move(on_preempt)) : nullptr);
 }
 
 void PG::update_local_background_io_priority(
@@ -1636,15 +1658,15 @@ void PG::cancel_local_background_io_reservation() {
 
 void PG::request_remote_recovery_reservation(
   unsigned priority,
-  PGPeeringEventRef on_grant,
-  PGPeeringEventRef on_preempt) {
+  PGPeeringEventURef on_grant,
+  PGPeeringEventURef on_preempt) {
   osd->remote_reserver.request_reservation(
     pg_id,
     on_grant ? new QueuePeeringEvt(
-      this, on_grant) : nullptr,
+      this, std::move(on_grant)) : nullptr,
     priority,
     on_preempt ? new QueuePeeringEvt(
-      this, on_preempt) : nullptr);
+      this, std::move(on_preempt)) : nullptr);
 }
 
 void PG::cancel_remote_recovery_reservation() {
@@ -1659,6 +1681,15 @@ void PG::schedule_event_on_commit(
   t.register_on_commit(new QueuePeeringEvt(this, on_commit));
 }
 
+void PG::on_activate(interval_set<snapid_t> snaps)
+{
+  ceph_assert(!m_scrubber->are_callbacks_pending());
+  ceph_assert(callbacks_for_degraded_object.empty());
+  snap_trimq = snaps;
+  release_pg_backoffs();
+  projected_last_update = info.last_update;
+}
+
 void PG::on_active_exit()
 {
   backfill_reserving = false;
@@ -1864,133 +1895,6 @@ void PG::on_activate_committed()
   }
 }
 
-void PG::do_replica_scrub_map(OpRequestRef op)
-{
-  auto m = op->get_req<MOSDRepScrubMap>();
-  dout(7) << __func__ << " " << *m << dendl;
-  if (m->map_epoch < info.history.same_interval_since) {
-    dout(10) << __func__ << " discarding old from "
-            << m->map_epoch << " < " << info.history.same_interval_since
-            << dendl;
-    return;
-  }
-  if (!scrubber.is_chunky_scrub_active()) {
-    dout(10) << __func__ << " scrub isn't active" << dendl;
-    return;
-  }
-
-  op->mark_started();
-
-  auto p = const_cast<bufferlist&>(m->get_data()).cbegin();
-  scrubber.received_maps[m->from].decode(p, info.pgid.pool());
-  dout(10) << "map version is "
-          << scrubber.received_maps[m->from].valid_through
-          << dendl;
-
-  dout(10) << __func__ << " waiting_on_whom was " << scrubber.waiting_on_whom
-          << dendl;
-  ceph_assert(scrubber.waiting_on_whom.count(m->from));
-  scrubber.waiting_on_whom.erase(m->from);
-  if (m->preempted) {
-    dout(10) << __func__ << " replica was preempted, setting flag" << dendl;
-    scrub_preempted = true;
-  }
-  if (scrubber.waiting_on_whom.empty()) {
-    requeue_scrub(ops_blocked_by_scrub());
-  }
-}
-
-// send scrub v3 messages (chunky scrub)
-void PG::_request_scrub_map(
-  pg_shard_t replica, eversion_t version,
-  hobject_t start, hobject_t end,
-  bool deep,
-  bool allow_preemption)
-{
-  ceph_assert(replica != pg_whoami);
-  dout(10) << "scrub  requesting scrubmap from osd." << replica
-          << " deep " << (int)deep << dendl;
-  MOSDRepScrub *repscrubop = new MOSDRepScrub(
-    spg_t(info.pgid.pgid, replica.shard), version,
-    get_osdmap_epoch(),
-    get_last_peering_reset(),
-    start, end, deep,
-    allow_preemption,
-    scrubber.priority,
-    ops_blocked_by_scrub());
-  // default priority, we want the rep scrub processed prior to any recovery
-  // or client io messages (we are holding a lock!)
-  osd->send_message_osd_cluster(
-    replica.osd, repscrubop, get_osdmap_epoch());
-}
-
-void PG::handle_scrub_reserve_request(OpRequestRef op)
-{
-  dout(7) << __func__ << " " << *op->get_req() << dendl;
-  op->mark_started();
-  if (scrubber.remote_reserved) {
-    dout(10) << __func__ << " ignoring reserve request: Already reserved"
-            << dendl;
-    return;
-  }
-  if ((cct->_conf->osd_scrub_during_recovery || !osd->is_recovery_active()) &&
-      osd->inc_scrubs_remote()) {
-    scrubber.remote_reserved = true;
-  } else {
-    dout(20) << __func__ << ": failed to reserve remotely" << dendl;
-    scrubber.remote_reserved = false;
-  }
-  auto m = op->get_req<MOSDScrubReserve>();
-  Message *reply = new MOSDScrubReserve(
-    spg_t(info.pgid.pgid, get_primary().shard),
-    m->map_epoch,
-    scrubber.remote_reserved ? MOSDScrubReserve::GRANT : MOSDScrubReserve::REJECT,
-    pg_whoami);
-  osd->send_message_osd_cluster(reply, op->get_req()->get_connection());
-}
-
-void PG::handle_scrub_reserve_grant(OpRequestRef op, pg_shard_t from)
-{
-  dout(7) << __func__ << " " << *op->get_req() << dendl;
-  op->mark_started();
-  if (!scrubber.local_reserved) {
-    dout(10) << "ignoring obsolete scrub reserve reply" << dendl;
-    return;
-  }
-  if (scrubber.reserved_peers.find(from) != scrubber.reserved_peers.end()) {
-    dout(10) << " already had osd." << from << " reserved" << dendl;
-  } else {
-    dout(10) << " osd." << from << " scrub reserve = success" << dendl;
-    scrubber.reserved_peers.insert(from);
-    sched_scrub();
-  }
-}
-
-void PG::handle_scrub_reserve_reject(OpRequestRef op, pg_shard_t from)
-{
-  dout(7) << __func__ << " " << *op->get_req() << dendl;
-  op->mark_started();
-  if (!scrubber.local_reserved) {
-    dout(10) << "ignoring obsolete scrub reserve reply" << dendl;
-    return;
-  }
-  if (scrubber.reserved_peers.find(from) != scrubber.reserved_peers.end()) {
-    dout(10) << " already had osd." << from << " reserved" << dendl;
-  } else {
-    /* One decline stops this pg from being scheduled for scrubbing. */
-    dout(10) << " osd." << from << " scrub reserve = fail" << dendl;
-    scrubber.reserve_failed = true;
-    sched_scrub();
-  }
-}
-
-void PG::handle_scrub_reserve_release(OpRequestRef op)
-{
-  dout(7) << __func__ << " " << *op->get_req() << dendl;
-  op->mark_started();
-  clear_scrub_reserved();
-}
-
 // Compute pending backfill data
 static int64_t pending_backfill(CephContext *cct, int64_t bf_bytes, int64_t local_bytes)
 {
@@ -2078,62 +1982,6 @@ bool PG::try_reserve_recovery_space(
 void PG::unreserve_recovery_space() {
   primary_num_bytes.store(0);
   local_num_bytes.store(0);
-  return;
-}
-
-void PG::clear_scrub_reserved()
-{
-  scrubber.reserved_peers.clear();
-  scrubber.reserve_failed = false;
-
-  if (scrubber.local_reserved) {
-    scrubber.local_reserved = false;
-    osd->dec_scrubs_local();
-  }
-  if (scrubber.remote_reserved) {
-    scrubber.remote_reserved = false;
-    osd->dec_scrubs_remote();
-  }
-}
-
-void PG::scrub_reserve_replicas()
-{
-  ceph_assert(recovery_state.get_backfill_targets().empty());
-  std::vector<std::pair<int, Message*>> messages;
-  messages.reserve(get_actingset().size());
-  epoch_t  e = get_osdmap_epoch();
-  for (set<pg_shard_t>::iterator i = get_actingset().begin();
-      i != get_actingset().end();
-      ++i) {
-    if (*i == pg_whoami) continue;
-    dout(10) << "scrub requesting reserve from osd." << *i << dendl;
-    Message* m =  new MOSDScrubReserve(spg_t(info.pgid.pgid, i->shard), e,
-                                       MOSDScrubReserve::REQUEST, pg_whoami);
-    messages.push_back(std::make_pair(i->osd, m));
-  }
-  if (!messages.empty()) {
-    osd->send_message_osd_cluster(messages, e);
-  }
-}
-
-void PG::scrub_unreserve_replicas()
-{
-  ceph_assert(recovery_state.get_backfill_targets().empty());
-  std::vector<std::pair<int, Message*>> messages;
-  messages.reserve(get_actingset().size());
-  epoch_t e = get_osdmap_epoch();
-  for (set<pg_shard_t>::iterator i = get_actingset().begin();
-       i != get_actingset().end();
-       ++i) {
-    if (*i == pg_whoami) continue;
-    dout(10) << "scrub requesting unreserve from osd." << *i << dendl;
-    Message* m =  new MOSDScrubReserve(spg_t(info.pgid.pgid, i->shard), e,
-                                       MOSDScrubReserve::RELEASE, pg_whoami);
-    messages.push_back(std::make_pair(i->osd, m));
-  }
-  if (!messages.empty()) {
-    osd->send_message_osd_cluster(messages, e);
-  }
 }
 
 void PG::_scan_rollback_obs(const vector<ghobject_t> &rollback_obs)
@@ -2160,111 +2008,6 @@ void PG::_scan_rollback_obs(const vector<ghobject_t> &rollback_obs)
   }
 }
 
-void PG::_scan_snaps(ScrubMap &smap) 
-{
-  hobject_t head;
-  SnapSet snapset;
-
-  // Test qa/standalone/scrub/osd-scrub-snaps.sh uses this message to verify 
-  // caller using clean_meta_map(), and it works properly.
-  dout(20) << __func__ << " start" << dendl;
-
-  for (map<hobject_t, ScrubMap::object>::reverse_iterator i = smap.objects.rbegin();
-       i != smap.objects.rend();
-       ++i) {
-    const hobject_t &hoid = i->first;
-    ScrubMap::object &o = i->second;
-
-    dout(20) << __func__ << " " << hoid << dendl;
-
-    ceph_assert(!hoid.is_snapdir());
-    if (hoid.is_head()) {
-      // parse the SnapSet
-      bufferlist bl;
-      if (o.attrs.find(SS_ATTR) == o.attrs.end()) {
-       continue;
-      }
-      bl.push_back(o.attrs[SS_ATTR]);
-      auto p = bl.cbegin();
-      try {
-       decode(snapset, p);
-      } catch(...) {
-       continue;
-      }
-      head = hoid.get_head();
-      continue;
-    }
-    if (hoid.snap < CEPH_MAXSNAP) {
-      // check and if necessary fix snap_mapper
-      if (hoid.get_head() != head) {
-       derr << __func__ << " no head for " << hoid << " (have " << head << ")"
-            << dendl;
-       continue;
-      }
-      set<snapid_t> obj_snaps;
-      auto p = snapset.clone_snaps.find(hoid.snap);
-      if (p == snapset.clone_snaps.end()) {
-       derr << __func__ << " no clone_snaps for " << hoid << " in " << snapset
-            << dendl;
-       continue;
-      }
-      obj_snaps.insert(p->second.begin(), p->second.end());
-      set<snapid_t> cur_snaps;
-      int r = snap_mapper.get_snaps(hoid, &cur_snaps);
-      if (r != 0 && r != -ENOENT) {
-       derr << __func__ << ": get_snaps returned " << cpp_strerror(r) << dendl;
-       ceph_abort();
-      }
-      if (r == -ENOENT || cur_snaps != obj_snaps) {
-       ObjectStore::Transaction t;
-       OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
-       if (r == 0) {
-         r = snap_mapper.remove_oid(hoid, &_t);
-         if (r != 0) {
-           derr << __func__ << ": remove_oid returned " << cpp_strerror(r)
-                << dendl;
-           ceph_abort();
-         }
-         osd->clog->error() << "osd." << osd->whoami
-                           << " found snap mapper error on pg "
-                           << info.pgid
-                           << " oid " << hoid << " snaps in mapper: "
-                           << cur_snaps << ", oi: "
-                           << obj_snaps
-                           << "...repaired";
-       } else {
-         osd->clog->error() << "osd." << osd->whoami
-                           << " found snap mapper error on pg "
-                           << info.pgid
-                           << " oid " << hoid << " snaps missing in mapper"
-                           << ", should be: "
-                           << obj_snaps
-                            << " was " << cur_snaps << " r " << r
-                           << "...repaired";
-       }
-       snap_mapper.add_oid(hoid, obj_snaps, &_t);
-
-       // wait for repair to apply to avoid confusing other bits of the system.
-       {
-         ceph::condition_variable my_cond;
-         ceph::mutex my_lock = ceph::make_mutex("PG::_scan_snaps my_lock");
-         int r = 0;
-         bool done;
-         t.register_on_applied_sync(
-           new C_SafeCond(my_lock, my_cond, &done, &r));
-         r = osd->store->queue_transaction(ch, std::move(t));
-         if (r != 0) {
-           derr << __func__ << ": queue_transaction got " << cpp_strerror(r)
-                << dendl;
-         } else {
-           std::unique_lock l{my_lock};
-           my_cond.wait(l, [&done] { return done;});
-         }
-       }
-      }
-    }
-  }
-}
 
 void PG::_repair_oinfo_oid(ScrubMap &smap)
 {
@@ -2311,82 +2054,6 @@ void PG::_repair_oinfo_oid(ScrubMap &smap)
     }
   }
 }
-int PG::build_scrub_map_chunk(
-  ScrubMap &map,
-  ScrubMapBuilder &pos,
-  hobject_t start,
-  hobject_t end,
-  bool deep,
-  ThreadPool::TPHandle &handle)
-{
-  dout(10) << __func__ << " [" << start << "," << end << ") "
-          << " pos " << pos
-          << dendl;
-
-  // start
-  while (pos.empty()) {
-    pos.deep = deep;
-    map.valid_through = info.last_update;
-
-    // objects
-    vector<ghobject_t> rollback_obs;
-    pos.ret = get_pgbackend()->objects_list_range(
-      start,
-      end,
-      &pos.ls,
-      &rollback_obs);
-    if (pos.ret < 0) {
-      dout(5) << "objects_list_range error: " << pos.ret << dendl;
-      return pos.ret;
-    }
-    if (pos.ls.empty()) {
-      break;
-    }
-    _scan_rollback_obs(rollback_obs);
-    pos.pos = 0;
-    return -EINPROGRESS;
-  }
-
-  // scan objects
-  while (!pos.done()) {
-    int r = get_pgbackend()->be_scan_list(map, pos);
-    if (r == -EINPROGRESS) {
-      return r;
-    }
-  }
-
-  // finish
-  dout(20) << __func__ << " finishing" << dendl;
-  ceph_assert(pos.done());
-  _repair_oinfo_oid(map);
-  if (!is_primary()) {
-    ScrubMap for_meta_scrub;
-    // In case we restarted smaller chunk, clear old data
-    scrubber.cleaned_meta_map.clear_from(scrubber.start);
-    scrubber.cleaned_meta_map.insert(map);
-    scrubber.clean_meta_map(for_meta_scrub);
-    _scan_snaps(for_meta_scrub);
-  }
-
-  dout(20) << __func__ << " done, got " << map.objects.size() << " items"
-          << dendl;
-  return 0;
-}
-
-void PG::Scrubber::cleanup_store(ObjectStore::Transaction *t) {
-  if (!store)
-    return;
-  struct OnComplete : Context {
-    std::unique_ptr<Scrub::Store> store;
-    explicit OnComplete(
-      std::unique_ptr<Scrub::Store> &&store)
-      : store(std::move(store)) {}
-    void finish(int) override {}
-  };
-  store->cleanup(t);
-  t->register_on_complete(new OnComplete(std::move(store)));
-  ceph_assert(!store);
-}
 
 void PG::repair_object(
   const hobject_t &soid,
@@ -2427,950 +2094,71 @@ void PG::repair_object(
   recovery_state.force_object_missing(bad_peers, soid, oi.version);
 }
 
-/* replica_scrub
- *
- * Wait for last_update_applied to match msg->scrub_to as above. Wait
- * for pushes to complete in case of recent recovery. Build a single
- * scrubmap of objects that are in the range [msg->start, msg->end).
- */
-void PG::replica_scrub(
-  OpRequestRef op,
-  ThreadPool::TPHandle &handle)
+void PG::forward_scrub_event(ScrubAPI fn, epoch_t epoch_queued, std::string_view desc)
 {
-  auto msg = op->get_req<MOSDRepScrub>();
-  ceph_assert(!scrubber.active_rep_scrub);
-  dout(7) << "replica_scrub" << dendl;
-
-  if (msg->map_epoch < info.history.same_interval_since) {
-    dout(10) << "replica_scrub discarding old replica_scrub from "
-            << msg->map_epoch << " < " << info.history.same_interval_since 
-            << dendl;
-    return;
-  }
-
-  ceph_assert(msg->chunky);
-  if (active_pushes > 0) {
-    dout(10) << "waiting for active pushes to finish" << dendl;
-    scrubber.active_rep_scrub = op;
-    return;
-  }
-
-  scrubber.state = Scrubber::BUILD_MAP_REPLICA;
-  scrubber.replica_scrub_start = msg->min_epoch;
-  scrubber.start = msg->start;
-  scrubber.end = msg->end;
-  scrubber.max_end = msg->end;
-  scrubber.deep = msg->deep;
-  scrubber.epoch_start = info.history.same_interval_since;
-  if (msg->priority) {
-    scrubber.priority = msg->priority;
+  dout(20) << __func__ << ": " << desc << " queued at: " << epoch_queued << dendl;
+  ceph_assert(m_scrubber);
+  if (is_active()) {
+    ((*m_scrubber).*fn)(epoch_queued);
   } else {
-    scrubber.priority = get_scrub_priority();
-  }
-
-  scrub_can_preempt = msg->allow_preemption;
-  scrub_preempted = false;
-  scrubber.replica_scrubmap_pos.reset();
-
-  requeue_scrub(msg->high_priority);
-}
-
-/* Scrub:
- * PG_STATE_SCRUBBING is set when the scrub is queued
- * 
- * scrub will be chunky if all OSDs in PG support chunky scrub
- * scrub will fail if OSDs are too old.
- */
-void PG::scrub(epoch_t queued, ThreadPool::TPHandle &handle)
-{
-  OSDService *osds = osd;
-  double scrub_sleep = osds->osd->scrub_sleep_time(scrubber.must_scrub);
-  if (scrub_sleep > 0 &&
-      (scrubber.state == PG::Scrubber::NEW_CHUNK ||
-       scrubber.state == PG::Scrubber::INACTIVE) &&
-       scrubber.needs_sleep) {
-    ceph_assert(!scrubber.sleeping);
-    dout(20) << __func__ << " state is INACTIVE|NEW_CHUNK, sleeping" << dendl;
-
-    // Do an async sleep so we don't block the op queue
-    spg_t pgid = get_pgid();
-    int state = scrubber.state;
-    auto scrub_requeue_callback =
-        new LambdaContext([osds, pgid, state](int r) {
-          PGRef pg = osds->osd->lookup_lock_pg(pgid);
-          if (pg == nullptr) {
-            lgeneric_dout(osds->osd->cct, 20)
-                << "scrub_requeue_callback: Could not find "
-                << "PG " << pgid << " can't complete scrub requeue after sleep"
-                << dendl;
-            return;
-          }
-          pg->scrubber.sleeping = false;
-          pg->scrubber.needs_sleep = false;
-          lgeneric_dout(pg->cct, 20)
-              << "scrub_requeue_callback: slept for "
-              << ceph_clock_now() - pg->scrubber.sleep_start
-              << ", re-queuing scrub with state " << state << dendl;
-          pg->scrub_queued = false;
-          pg->requeue_scrub();
-          pg->scrubber.sleep_start = utime_t();
-          pg->unlock();
-        });
-    std::lock_guard l(osd->sleep_lock);
-    osd->sleep_timer.add_event_after(scrub_sleep,
-                                           scrub_requeue_callback);
-    scrubber.sleeping = true;
-    scrubber.sleep_start = ceph_clock_now();
-    return;
-  }
-  if (pg_has_reset_since(queued)) {
-    return;
-  }
-  ceph_assert(scrub_queued);
-  scrub_queued = false;
-  scrubber.needs_sleep = true;
-
-  // for the replica
-  if (!is_primary() &&
-      scrubber.state == PG::Scrubber::BUILD_MAP_REPLICA) {
-    chunky_scrub(handle);
-    return;
-  }
-
-  if (!is_primary() || !is_active() || !is_clean() || !is_scrubbing()) {
-    dout(10) << "scrub -- not primary or active or not clean" << dendl;
-    state_clear(PG_STATE_SCRUBBING);
-    state_clear(PG_STATE_REPAIR);
-    state_clear(PG_STATE_DEEP_SCRUB);
-    publish_stats_to_osd();
-    return;
-  }
-
-  if (!scrubber.active) {
-    ceph_assert(recovery_state.get_backfill_targets().empty());
-
-    scrubber.deep = state_test(PG_STATE_DEEP_SCRUB);
-
-    dout(10) << "starting a new chunky scrub" << dendl;
-  }
-
-  chunky_scrub(handle);
-}
-
-void PG::abort_scrub()
-{
-  scrub_clear_state();
-  scrub_unreserve_replicas();
-}
-
-/*
- * Chunky scrub scrubs objects one chunk at a time with writes blocked for that
- * chunk.
- *
- * The object store is partitioned into chunks which end on hash boundaries. For
- * each chunk, the following logic is performed:
- *
- *  (1) Block writes on the chunk
- *  (2) Request maps from replicas
- *  (3) Wait for pushes to be applied (after recovery)
- *  (4) Wait for writes to flush on the chunk
- *  (5) Wait for maps from replicas
- *  (6) Compare / repair all scrub maps
- *  (7) Wait for digest updates to apply
- *
- * This logic is encoded in the mostly linear state machine:
- *
- *           +------------------+
- *  _________v__________        |
- * |                    |       |
- * |      INACTIVE      |       |
- * |____________________|       |
- *           |                  |
- *           |   +----------+   |
- *  _________v___v______    |   |
- * |                    |   |   |
- * |      NEW_CHUNK     |   |   |
- * |____________________|   |   |
- *           |              |   |
- *  _________v__________    |   |
- * |                    |   |   |
- * |     WAIT_PUSHES    |   |   |
- * |____________________|   |   |
- *           |              |   |
- *  _________v__________    |   |
- * |                    |   |   |
- * |  WAIT_LAST_UPDATE  |   |   |
- * |____________________|   |   |
- *           |              |   |
- *  _________v__________    |   |
- * |                    |   |   |
- * |      BUILD_MAP     |   |   |
- * |____________________|   |   |
- *           |              |   |
- *  _________v__________    |   |
- * |                    |   |   |
- * |    WAIT_REPLICAS   |   |   |
- * |____________________|   |   |
- *           |              |   |
- *  _________v__________    |   |
- * |                    |   |   |
- * |    COMPARE_MAPS    |   |   |
- * |____________________|   |   |
- *           |              |   |
- *           |              |   |
- *  _________v__________    |   |
- * |                    |   |   |
- * |WAIT_DIGEST_UPDATES |   |   |
- * |____________________|   |   |
- *           |   |          |   |
- *           |   +----------+   |
- *  _________v__________        |
- * |                    |       |
- * |       FINISH       |       |
- * |____________________|       |
- *           |                  |
- *           +------------------+
- *
- * The primary determines the last update from the subset by walking the log. If
- * it sees a log entry pertaining to a file in the chunk, it tells the replicas
- * to wait until that update is applied before building a scrub map. Both the
- * primary and replicas will wait for any active pushes to be applied.
- *
- * In contrast to classic_scrub, chunky_scrub is entirely handled by scrub_wq.
- *
- * scrubber.state encodes the current state of the scrub (refer to state diagram
- * for details).
- */
-void PG::chunky_scrub(ThreadPool::TPHandle &handle)
-{
-  // check for map changes
-  if (scrubber.is_chunky_scrub_active()) {
-    if (scrubber.epoch_start != info.history.same_interval_since) {
-      dout(10) << "scrub pg changed, aborting" << dendl;
-      abort_scrub();
-      return;
-    }
-  }
-
-  bool done = false;
-  int ret;
-
-  while (!done) {
-    dout(20) << "scrub state " << Scrubber::state_string(scrubber.state)
-            << " [" << scrubber.start << "," << scrubber.end << ")"
-            << " max_end " << scrubber.max_end << dendl;
-
-    switch (scrubber.state) {
-      case PG::Scrubber::INACTIVE:
-        dout(10) << "scrub start" << dendl;
-       ceph_assert(is_primary());
-
-        publish_stats_to_osd();
-        scrubber.epoch_start = info.history.same_interval_since;
-        scrubber.active = true;
-
-       {
-         ObjectStore::Transaction t;
-         scrubber.cleanup_store(&t);
-         scrubber.store.reset(Scrub::Store::create(osd->store, &t,
-                                                   info.pgid, coll));
-         osd->store->queue_transaction(ch, std::move(t), nullptr);
-       }
-
-        // Don't include temporary objects when scrubbing
-        scrubber.start = info.pgid.pgid.get_hobj_start();
-        scrubber.state = PG::Scrubber::NEW_CHUNK;
-
-       {
-         bool repair = state_test(PG_STATE_REPAIR);
-         bool deep_scrub = state_test(PG_STATE_DEEP_SCRUB);
-         const char *mode = (repair ? "repair": (deep_scrub ? "deep-scrub" : "scrub"));
-         stringstream oss;
-         oss << info.pgid.pgid << " " << mode << " starts" << std::endl;
-         osd->clog->debug(oss);
-       }
-
-       scrubber.preempt_left = cct->_conf.get_val<uint64_t>(
-         "osd_scrub_max_preemptions");
-       scrubber.preempt_divisor = 1;
-        break;
-
-      case PG::Scrubber::NEW_CHUNK:
-        scrubber.primary_scrubmap = ScrubMap();
-        scrubber.received_maps.clear();
-
-       // begin (possible) preemption window
-       if (scrub_preempted) {
-         scrubber.preempt_left--;
-         scrubber.preempt_divisor *= 2;
-         dout(10) << __func__ << " preempted, " << scrubber.preempt_left
-                  << " left" << dendl;
-         scrub_preempted = false;
-       }
-       scrub_can_preempt = scrubber.preempt_left > 0;
-
-        {
-          /* get the start and end of our scrub chunk
-          *
-          * Our scrub chunk has an important restriction we're going to need to
-          * respect. We can't let head be start or end.
-          * Using a half-open interval means that if end == head,
-          * we'd scrub/lock head and the clone right next to head in different
-          * chunks which would allow us to miss clones created between
-          * scrubbing that chunk and scrubbing the chunk including head.
-          * This isn't true for any of the other clones since clones can
-          * only be created "just to the left of" head.  There is one exception
-          * to this: promotion of clones which always happens to the left of the
-          * left-most clone, but promote_object checks the scrubber in that
-          * case, so it should be ok.  Also, it's ok to "miss" clones at the
-          * left end of the range if we are a tier because they may legitimately
-          * not exist (see _scrub).
-          */
-          ceph_assert(scrubber.preempt_divisor > 0);
-         int min = std::max<int64_t>(3, cct->_conf->osd_scrub_chunk_min /
-                                     scrubber.preempt_divisor);
-         int max = std::max<int64_t>(min, cct->_conf->osd_scrub_chunk_max /
-                                      scrubber.preempt_divisor);
-          hobject_t start = scrubber.start;
-         hobject_t candidate_end;
-         vector<hobject_t> objects;
-         ret = get_pgbackend()->objects_list_partial(
-           start,
-           min,
-           max,
-           &objects,
-           &candidate_end);
-         ceph_assert(ret >= 0);
-
-         if (!objects.empty()) {
-           hobject_t back = objects.back();
-           while (candidate_end.is_head() &&
-                  candidate_end == back.get_head()) {
-             candidate_end = back;
-             objects.pop_back();
-             if (objects.empty()) {
-               ceph_assert(0 ==
-                      "Somehow we got more than 2 objects which"
-                      "have the same head but are not clones");
-             }
-             back = objects.back();
-           }
-           if (candidate_end.is_head()) {
-             ceph_assert(candidate_end != back.get_head());
-             candidate_end = candidate_end.get_object_boundary();
-           }
-         } else {
-           ceph_assert(candidate_end.is_max());
-         }
-
-         if (!_range_available_for_scrub(scrubber.start, candidate_end)) {
-           // we'll be requeued by whatever made us unavailable for scrub
-           dout(10) << __func__ << ": scrub blocked somewhere in range "
-                    << "[" << scrubber.start << ", " << candidate_end << ")"
-                    << dendl;
-           done = true;
-           break;
-         }
-         scrubber.end = candidate_end;
-         if (scrubber.end > scrubber.max_end)
-           scrubber.max_end = scrubber.end;
-        }
-
-        // walk the log to find the latest update that affects our chunk
-        scrubber.subset_last_update = eversion_t();
-       for (auto p = projected_log.log.rbegin();
-            p != projected_log.log.rend();
-            ++p) {
-          if (p->soid >= scrubber.start &&
-             p->soid < scrubber.end) {
-            scrubber.subset_last_update = p->version;
-            break;
-         }
-       }
-       if (scrubber.subset_last_update == eversion_t()) {
-         for (list<pg_log_entry_t>::const_reverse_iterator p =
-                recovery_state.get_pg_log().get_log().log.rbegin();
-              p != recovery_state.get_pg_log().get_log().log.rend();
-              ++p) {
-           if (p->soid >= scrubber.start &&
-               p->soid < scrubber.end) {
-             scrubber.subset_last_update = p->version;
-             break;
-           }
-         }
-       }
-
-        scrubber.state = PG::Scrubber::WAIT_PUSHES;
-        break;
-
-      case PG::Scrubber::WAIT_PUSHES:
-        if (active_pushes == 0) {
-          scrubber.state = PG::Scrubber::WAIT_LAST_UPDATE;
-        } else {
-          dout(15) << "wait for pushes to apply" << dendl;
-          done = true;
-        }
-        break;
-
-      case PG::Scrubber::WAIT_LAST_UPDATE:
-        if (recovery_state.get_last_update_applied() <
-         scrubber.subset_last_update) {
-          // will be requeued by op_applied
-          dout(15) << "wait for EC read/modify/writes to queue" << dendl;
-          done = true;
-         break;
-       }
-
-        // ask replicas to scan
-        scrubber.waiting_on_whom.insert(pg_whoami);
-
-        // request maps from replicas
-       for (set<pg_shard_t>::iterator i = get_acting_recovery_backfill().begin();
-            i != get_acting_recovery_backfill().end();
-            ++i) {
-         if (*i == pg_whoami) continue;
-          _request_scrub_map(*i, scrubber.subset_last_update,
-                             scrubber.start, scrubber.end, scrubber.deep,
-                            scrubber.preempt_left > 0);
-          scrubber.waiting_on_whom.insert(*i);
-        }
-       dout(10) << __func__ << " waiting_on_whom " << scrubber.waiting_on_whom
-                << dendl;
-
-       scrubber.state = PG::Scrubber::BUILD_MAP;
-       scrubber.primary_scrubmap_pos.reset();
-        break;
-
-      case PG::Scrubber::BUILD_MAP:
-        ceph_assert(recovery_state.get_last_update_applied() >=
-         scrubber.subset_last_update);
-
-        // build my own scrub map
-       if (scrub_preempted) {
-         dout(10) << __func__ << " preempted" << dendl;
-         scrubber.state = PG::Scrubber::BUILD_MAP_DONE;
-         break;
-       }
-       ret = build_scrub_map_chunk(
-         scrubber.primary_scrubmap,
-         scrubber.primary_scrubmap_pos,
-         scrubber.start, scrubber.end,
-         scrubber.deep,
-         handle);
-       if (ret == -EINPROGRESS) {
-         requeue_scrub();
-         done = true;
-         break;
-       }
-       scrubber.state = PG::Scrubber::BUILD_MAP_DONE;
-       break;
-
-      case PG::Scrubber::BUILD_MAP_DONE:
-       if (scrubber.primary_scrubmap_pos.ret < 0) {
-         dout(5) << "error: " << scrubber.primary_scrubmap_pos.ret
-                 << ", aborting" << dendl;
-          scrub_clear_state();
-          scrub_unreserve_replicas();
-          return;
-        }
-       dout(10) << __func__ << " waiting_on_whom was "
-                << scrubber.waiting_on_whom << dendl;
-       ceph_assert(scrubber.waiting_on_whom.count(pg_whoami));
-        scrubber.waiting_on_whom.erase(pg_whoami);
-
-        scrubber.state = PG::Scrubber::WAIT_REPLICAS;
-        break;
-
-      case PG::Scrubber::WAIT_REPLICAS:
-        if (!scrubber.waiting_on_whom.empty()) {
-          // will be requeued by do_replica_scrub_map
-          dout(10) << "wait for replicas to build scrub map" << dendl;
-          done = true;
-         break;
-       }
-        // Since repair is only by request and we need to scrub afterward
-        // treat the same as req_scrub.
-        if (!scrubber.req_scrub) {
-          if (state_test(PG_STATE_DEEP_SCRUB)) {
-            if (get_osdmap()->test_flag(CEPH_OSDMAP_NODEEP_SCRUB) ||
-               pool.info.has_flag(pg_pool_t::FLAG_NODEEP_SCRUB)) {
-              dout(10) << "nodeep_scrub set, aborting" << dendl;
-             abort_scrub();
-              return;
-            }
-          } else if (state_test(PG_STATE_SCRUBBING)) {
-            if (get_osdmap()->test_flag(CEPH_OSDMAP_NOSCRUB) || pool.info.has_flag(pg_pool_t::FLAG_NOSCRUB)) {
-              dout(10) << "noscrub set, aborting" << dendl;
-             abort_scrub();
-              return;
-            }
-          }
-        }
-       // end (possible) preemption window
-       scrub_can_preempt = false;
-       if (scrub_preempted) {
-         dout(10) << __func__ << " preempted, restarting chunk" << dendl;
-         scrubber.state = PG::Scrubber::NEW_CHUNK;
-       } else {
-          scrubber.state = PG::Scrubber::COMPARE_MAPS;
-        }
-        break;
-
-      case PG::Scrubber::COMPARE_MAPS:
-        ceph_assert(recovery_state.get_last_update_applied() >=
-         scrubber.subset_last_update);
-        ceph_assert(scrubber.waiting_on_whom.empty());
-
-        scrub_compare_maps();
-       scrubber.start = scrubber.end;
-       scrubber.run_callbacks();
-
-        // requeue the writes from the chunk that just finished
-        requeue_ops(waiting_for_scrub);
-
-       scrubber.state = PG::Scrubber::WAIT_DIGEST_UPDATES;
-
-       // fall-thru
-
-      case PG::Scrubber::WAIT_DIGEST_UPDATES:
-       if (scrubber.num_digest_updates_pending) {
-         dout(10) << __func__ << " waiting on "
-                  << scrubber.num_digest_updates_pending
-                  << " digest updates" << dendl;
-         done = true;
-         break;
-       }
-
-       scrubber.preempt_left = cct->_conf.get_val<uint64_t>(
-         "osd_scrub_max_preemptions");
-       scrubber.preempt_divisor = 1;
-
-       if (!(scrubber.end.is_max())) {
-         scrubber.state = PG::Scrubber::NEW_CHUNK;
-         requeue_scrub();
-          done = true;
-        } else {
-          scrubber.state = PG::Scrubber::FINISH;
-        }
-
-       break;
-
-      case PG::Scrubber::FINISH:
-        scrub_finish();
-        scrubber.state = PG::Scrubber::INACTIVE;
-        done = true;
-
-       if (!snap_trimq.empty()) {
-         dout(10) << "scrub finished, requeuing snap_trimmer" << dendl;
-         snap_trimmer_scrub_complete();
-       }
-
-        break;
-
-      case PG::Scrubber::BUILD_MAP_REPLICA:
-        // build my own scrub map
-       if (scrub_preempted) {
-         dout(10) << __func__ << " preempted" << dendl;
-         ret = 0;
-       } else {
-         ret = build_scrub_map_chunk(
-           scrubber.replica_scrubmap,
-           scrubber.replica_scrubmap_pos,
-           scrubber.start, scrubber.end,
-           scrubber.deep,
-           handle);
-       }
-       if (ret == -EINPROGRESS) {
-         requeue_scrub();
-         done = true;
-         break;
-       }
-       // reply
-       {
-         MOSDRepScrubMap *reply = new MOSDRepScrubMap(
-           spg_t(info.pgid.pgid, get_primary().shard),
-           scrubber.replica_scrub_start,
-           pg_whoami);
-         reply->preempted = scrub_preempted;
-         ::encode(scrubber.replica_scrubmap, reply->get_data());
-         osd->send_message_osd_cluster(
-           get_primary().osd, reply,
-           scrubber.replica_scrub_start);
-       }
-       scrub_preempted = false;
-       scrub_can_preempt = false;
-       scrubber.state = PG::Scrubber::INACTIVE;
-       scrubber.replica_scrubmap = ScrubMap();
-       scrubber.replica_scrubmap_pos = ScrubMapBuilder();
-       scrubber.start = hobject_t();
-       scrubber.end = hobject_t();
-       scrubber.max_end = hobject_t();
-       done = true;
-       break;
-
-      default:
-        ceph_abort();
-    }
+    // pg might be in the process of being deleted
+    dout(5) << __func__ << " refusing to forward. " << (is_clean() ? "(clean) " : "(not clean) ") <<
+             (is_active() ? "(active) " : "(not active) ") <<  dendl;
   }
-  dout(20) << "scrub final state " << Scrubber::state_string(scrubber.state)
-          << " [" << scrubber.start << "," << scrubber.end << ")"
-          << " max_end " << scrubber.max_end << dendl;
 }
 
-bool PG::write_blocked_by_scrub(const hobject_t& soid)
+void PG::forward_scrub_event(ScrubSafeAPI fn,
+                            epoch_t epoch_queued,
+                            Scrub::act_token_t act_token,
+                            std::string_view desc)
 {
-  if (soid < scrubber.start || soid >= scrubber.end) {
-    return false;
-  }
-  if (scrub_can_preempt) {
-    if (!scrub_preempted) {
-      dout(10) << __func__ << " " << soid << " preempted" << dendl;
-      scrub_preempted = true;
-    } else {
-      dout(10) << __func__ << " " << soid << " already preempted" << dendl;
-    }
-    return false;
+  dout(20) << __func__ << ": " << desc << " queued: " << epoch_queued
+          << " token: " << act_token << dendl;
+  ceph_assert(m_scrubber);
+  if (is_active()) {
+    ((*m_scrubber).*fn)(epoch_queued, act_token);
+  } else {
+    // pg might be in the process of being deleted
+    dout(5) << __func__ << " refusing to forward. "
+           << (is_clean() ? "(clean) " : "(not clean) ")
+           << (is_active() ? "(active) " : "(not active) ") << dendl;
   }
-  return true;
 }
 
-bool PG::range_intersects_scrub(const hobject_t &start, const hobject_t& end)
+void PG::replica_scrub(OpRequestRef op, ThreadPool::TPHandle& handle)
 {
-  // does [start, end] intersect [scrubber.start, scrubber.max_end)
-  return (start < scrubber.max_end &&
-         end >= scrubber.start);
+  dout(10) << __func__ << " (op)" << dendl;
+  ceph_assert(m_scrubber);
+  m_scrubber->replica_scrub_op(op);
 }
 
-void PG::scrub_clear_state(bool has_error)
+void PG::replica_scrub(epoch_t epoch_queued,
+                      Scrub::act_token_t act_token,
+                      [[maybe_unused]] ThreadPool::TPHandle& handle)
 {
-  ceph_assert(is_locked());
-  state_clear(PG_STATE_SCRUBBING);
-  if (!has_error)
-    state_clear(PG_STATE_REPAIR);
-  state_clear(PG_STATE_DEEP_SCRUB);
-  publish_stats_to_osd();
-
-  scrubber.req_scrub = false;
-  // local -> nothing.
-  if (scrubber.local_reserved) {
-    osd->dec_scrubs_local();
-    scrubber.local_reserved = false;
-    scrubber.reserved_peers.clear();
-  }
-
-  requeue_ops(waiting_for_scrub);
-
-  scrubber.reset();
-
-  // type-specific state clear
-  _scrub_clear_state();
+  dout(10) << __func__ << " queued at: " << epoch_queued
+          << (is_primary() ? " (primary)" : " (replica)") << dendl;
+  forward_scrub_event(&ScrubPgIF::send_start_replica, epoch_queued, act_token,
+                     "StartReplica/nw");
 }
 
-void PG::scrub_compare_maps() 
+bool PG::ops_blocked_by_scrub() const
 {
-  dout(10) << __func__ << " has maps, analyzing" << dendl;
-
-  // construct authoritative scrub map for type specific scrubbing
-  scrubber.cleaned_meta_map.insert(scrubber.primary_scrubmap);
-  map<hobject_t,
-      pair<std::optional<uint32_t>,
-           std::optional<uint32_t>>> missing_digest;
-
-  map<pg_shard_t, ScrubMap *> maps;
-  maps[pg_whoami] = &scrubber.primary_scrubmap;
-
-  for (const auto& i : get_acting_recovery_backfill()) {
-    if (i == pg_whoami) continue;
-    dout(2) << __func__ << " replica " << i << " has "
-            << scrubber.received_maps[i].objects.size()
-            << " items" << dendl;
-    maps[i] = &scrubber.received_maps[i];
-  }
-
-  set<hobject_t> master_set;
-
-  // Construct master set
-  for (const auto map : maps) {
-    for (const auto i : map.second->objects) {
-      master_set.insert(i.first);
-    }
-  }
-
-  stringstream ss;
-  get_pgbackend()->be_omap_checks(maps, master_set,
-                                  scrubber.omap_stats, ss);
-
-  if (!ss.str().empty()) {
-    osd->clog->warn(ss);
-  }
-
-  if (recovery_state.get_acting().size() > 1) {
-    dout(10) << __func__ << "  comparing replica scrub maps" << dendl;
-
-    // Map from object with errors to good peer
-    map<hobject_t, list<pg_shard_t>> authoritative;
-
-    dout(2) << __func__ << get_primary() << " has "
-           << scrubber.primary_scrubmap.objects.size() << " items" << dendl;
-
-    ss.str("");
-    ss.clear();
-
-    get_pgbackend()->be_compare_scrubmaps(
-      maps,
-      master_set,
-      state_test(PG_STATE_REPAIR),
-      scrubber.missing,
-      scrubber.inconsistent,
-      authoritative,
-      missing_digest,
-      scrubber.shallow_errors,
-      scrubber.deep_errors,
-      scrubber.store.get(),
-      info.pgid, recovery_state.get_acting(),
-      ss);
-    dout(2) << ss.str() << dendl;
-
-    if (!ss.str().empty()) {
-      osd->clog->error(ss);
-    }
-
-    for (map<hobject_t, list<pg_shard_t>>::iterator i = authoritative.begin();
-        i != authoritative.end();
-        ++i) {
-      list<pair<ScrubMap::object, pg_shard_t> > good_peers;
-      for (list<pg_shard_t>::const_iterator j = i->second.begin();
-          j != i->second.end();
-          ++j) {
-       good_peers.emplace_back(maps[*j]->objects[i->first], *j);
-      }
-      scrubber.authoritative.emplace(i->first, good_peers);
-    }
-
-    for (map<hobject_t, list<pg_shard_t>>::iterator i = authoritative.begin();
-        i != authoritative.end();
-        ++i) {
-      scrubber.cleaned_meta_map.objects.erase(i->first);
-      scrubber.cleaned_meta_map.objects.insert(
-       *(maps[i->second.back()]->objects.find(i->first))
-       );
-    }
-  }
-
-  ScrubMap for_meta_scrub;
-  scrubber.clean_meta_map(for_meta_scrub);
-
-  // ok, do the pg-type specific scrubbing
-  scrub_snapshot_metadata(for_meta_scrub, missing_digest);
-  // Called here on the primary can use an authoritative map if it isn't the primary
-  _scan_snaps(for_meta_scrub);
-  if (!scrubber.store->empty()) {
-    if (state_test(PG_STATE_REPAIR)) {
-      dout(10) << __func__ << ": discarding scrub results" << dendl;
-      scrubber.store->flush(nullptr);
-    } else {
-      dout(10) << __func__ << ": updating scrub object" << dendl;
-      ObjectStore::Transaction t;
-      scrubber.store->flush(&t);
-      osd->store->queue_transaction(ch, std::move(t), nullptr);
-    }
-  }
+  return !waiting_for_scrub.empty();
 }
 
-bool PG::scrub_process_inconsistent()
-{
-  dout(10) << __func__ << ": checking authoritative" << dendl;
-  bool repair = state_test(PG_STATE_REPAIR);
-  bool deep_scrub = state_test(PG_STATE_DEEP_SCRUB);
-  const char *mode = (repair ? "repair": (deep_scrub ? "deep-scrub" : "scrub"));
-  
-  // authoriative only store objects which missing or inconsistent.
-  if (!scrubber.authoritative.empty()) {
-    stringstream ss;
-    ss << info.pgid << " " << mode << " "
-       << scrubber.missing.size() << " missing, "
-       << scrubber.inconsistent.size() << " inconsistent objects";
-    dout(2) << ss.str() << dendl;
-    osd->clog->error(ss);
-    if (repair) {
-      state_clear(PG_STATE_CLEAN);
-      for (map<hobject_t, list<pair<ScrubMap::object, pg_shard_t> >>::iterator i =
-            scrubber.authoritative.begin();
-          i != scrubber.authoritative.end();
-          ++i) {
-       auto missing_entry = scrubber.missing.find(i->first);
-       if (missing_entry != scrubber.missing.end()) {
-          repair_object(
-            i->first,
-            i->second,
-           missing_entry->second);
-         scrubber.fixed += missing_entry->second.size();
-       }
-       if (scrubber.inconsistent.count(i->first)) {
-          repair_object(
-            i->first,
-            i->second,
-           scrubber.inconsistent[i->first]);
-         scrubber.fixed += missing_entry->second.size();
-       }
-      }
-    }
-  }
-  return (!scrubber.authoritative.empty() && repair);
-}
-
-bool PG::ops_blocked_by_scrub() const {
-  return (waiting_for_scrub.size() != 0);
-}
-
-// the part that actually finalizes a scrub
-void PG::scrub_finish() 
+Scrub::scrub_prio_t PG::is_scrub_blocking_ops() const
 {
-  dout(20) << __func__ << dendl;
-  bool repair = state_test(PG_STATE_REPAIR);
-  bool do_auto_scrub = false;
-  // if the repair request comes from auto-repair and large number of errors,
-  // we would like to cancel auto-repair
-  if (repair && scrubber.auto_repair
-      && scrubber.authoritative.size() > cct->_conf->osd_scrub_auto_repair_num_errors) {
-    state_clear(PG_STATE_REPAIR);
-    repair = false;
-  }
-  bool deep_scrub = state_test(PG_STATE_DEEP_SCRUB);
-  const char *mode = (repair ? "repair": (deep_scrub ? "deep-scrub" : "scrub"));
-
-  // if a regular scrub had errors within the limit, do a deep scrub to auto repair.
-  if (scrubber.deep_scrub_on_error
-      && scrubber.authoritative.size()
-      && scrubber.authoritative.size() <= cct->_conf->osd_scrub_auto_repair_num_errors) {
-    ceph_assert(!deep_scrub);
-    do_auto_scrub = true;
-    dout(20) << __func__ << " Try to auto repair after scrub errors" << dendl;
-  }
-  scrubber.deep_scrub_on_error = false;
-
-  // type-specific finish (can tally more errors)
-  _scrub_finish();
-
-  bool has_error = scrub_process_inconsistent();
-
-  {
-    stringstream oss;
-    oss << info.pgid.pgid << " " << mode << " ";
-    int total_errors = scrubber.shallow_errors + scrubber.deep_errors;
-    if (total_errors)
-      oss << total_errors << " errors";
-    else
-      oss << "ok";
-    if (!deep_scrub && info.stats.stats.sum.num_deep_scrub_errors)
-      oss << " ( " << info.stats.stats.sum.num_deep_scrub_errors
-          << " remaining deep scrub error details lost)";
-    if (repair)
-      oss << ", " << scrubber.fixed << " fixed";
-    if (total_errors)
-      osd->clog->error(oss);
-    else
-      osd->clog->debug(oss);
-  }
-
-  // Since we don't know which errors were fixed, we can only clear them
-  // when every one has been fixed.
-  if (repair) {
-    if (scrubber.fixed == scrubber.shallow_errors + scrubber.deep_errors) {
-      ceph_assert(deep_scrub);
-      scrubber.shallow_errors = scrubber.deep_errors = 0;
-      dout(20) << __func__ << " All may be fixed" << dendl;
-    } else if (has_error) {
-      // Deep scrub in order to get corrected error counts
-      scrub_after_recovery = true;
-      save_req_scrub = scrubber.req_scrub;
-      dout(20) << __func__ << " Set scrub_after_recovery, req_scrub=" << save_req_scrub << dendl;
-    } else if (scrubber.shallow_errors || scrubber.deep_errors) {
-      // We have errors but nothing can be fixed, so there is no repair
-      // possible.
-      state_set(PG_STATE_FAILED_REPAIR);
-      dout(10) << __func__ << " " << (scrubber.shallow_errors + scrubber.deep_errors)
-              << " error(s) present with no repair possible" << dendl;
-    }
-  }
-
-  {
-    // finish up
-    ObjectStore::Transaction t;
-    recovery_state.update_stats(
-      [this, deep_scrub](auto &history, auto &stats) {
-       utime_t now = ceph_clock_now();
-       history.last_scrub = recovery_state.get_info().last_update;
-       history.last_scrub_stamp = now;
-       if (scrubber.deep) {
-         history.last_deep_scrub = recovery_state.get_info().last_update;
-         history.last_deep_scrub_stamp = now;
-       }
-
-       if (deep_scrub) {
-         if ((scrubber.shallow_errors == 0) && (scrubber.deep_errors == 0))
-           history.last_clean_scrub_stamp = now;
-         stats.stats.sum.num_shallow_scrub_errors = scrubber.shallow_errors;
-         stats.stats.sum.num_deep_scrub_errors = scrubber.deep_errors;
-         stats.stats.sum.num_large_omap_objects = scrubber.omap_stats.large_omap_objects;
-         stats.stats.sum.num_omap_bytes = scrubber.omap_stats.omap_bytes;
-         stats.stats.sum.num_omap_keys = scrubber.omap_stats.omap_keys;
-         dout(25) << "scrub_finish shard " << pg_whoami << " num_omap_bytes = "
-                  << stats.stats.sum.num_omap_bytes << " num_omap_keys = "
-                  << stats.stats.sum.num_omap_keys << dendl;
-       } else {
-         stats.stats.sum.num_shallow_scrub_errors = scrubber.shallow_errors;
-         // XXX: last_clean_scrub_stamp doesn't mean the pg is not inconsistent
-         // because of deep-scrub errors
-         if (scrubber.shallow_errors == 0)
-           history.last_clean_scrub_stamp = now;
-       }
-       stats.stats.sum.num_scrub_errors =
-         stats.stats.sum.num_shallow_scrub_errors +
-         stats.stats.sum.num_deep_scrub_errors;
-       if (scrubber.check_repair) {
-         scrubber.check_repair = false;
-         if (info.stats.stats.sum.num_scrub_errors) {
-           state_set(PG_STATE_FAILED_REPAIR);
-           dout(10) << "scrub_finish " << info.stats.stats.sum.num_scrub_errors
-                    << " error(s) still present after re-scrub" << dendl;
-         }
-       }
-       return true;
-      },
-      &t);
-    int tr = osd->store->queue_transaction(ch, std::move(t), NULL);
-    ceph_assert(tr == 0);
-  }
-
-  if (has_error) {
-    queue_peering_event(
-      PGPeeringEventRef(
-       std::make_shared<PGPeeringEvent>(
-         get_osdmap_epoch(),
-         get_osdmap_epoch(),
-         PeeringState::DoRecovery())));
-  }
-
-  scrub_clear_state(has_error);
-  scrub_unreserve_replicas();
-
-  if (do_auto_scrub) {
-    scrub_requested(false, false, true);
-  }
-
-  if (is_active() && is_primary()) {
-    recovery_state.share_pg_info();
-  }
+  return waiting_for_scrub.empty() ? Scrub::scrub_prio_t::low_priority
+                                  : Scrub::scrub_prio_t::high_priority;
 }
 
 bool PG::old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch)
 {
-  if (get_last_peering_reset() > reply_epoch ||
-      get_last_peering_reset() > query_epoch) {
-    dout(10) << "old_peering_msg reply_epoch " << reply_epoch << " query_epoch " << query_epoch
-            << " last_peering_reset " << get_last_peering_reset()
-            << dendl;
+  if (auto last_reset = get_last_peering_reset();
+      last_reset > reply_epoch || last_reset > query_epoch) {
+    dout(10) << "old_peering_msg reply_epoch " << reply_epoch << " query_epoch "
+            << query_epoch << " last_peering_reset " << last_reset << dendl;
     return true;
   }
   return false;
@@ -3400,7 +2188,6 @@ void PG::start_flush_on_transaction(ObjectStore::Transaction &t)
 
 bool PG::try_flush_or_schedule_async()
 {
-  
   Context *c = new QueuePeeringEvt(
     this, get_osdmap_epoch(), PeeringState::IntervalFlush());
   if (!ch->flush_commit(c)) {
@@ -3414,24 +2201,12 @@ bool PG::try_flush_or_schedule_async()
 ostream& operator<<(ostream& out, const PG& pg)
 {
   out << pg.recovery_state;
-  if (pg.scrubber.must_repair)
-    out << " MUST_REPAIR";
-  if (pg.scrubber.auto_repair)
-    out << " AUTO_REPAIR";
-  if (pg.scrubber.check_repair)
-    out << " CHECK_REPAIR";
-  if (pg.scrubber.deep_scrub_on_error)
-    out << " DEEP_SCRUB_ON_ERROR";
-  if (pg.scrubber.must_deep_scrub)
-    out << " MUST_DEEP_SCRUB";
-  if (pg.scrubber.must_scrub)
-    out << " MUST_SCRUB";
-  if (pg.scrubber.time_for_deep)
-    out << " TIME_FOR_DEEP";
-  if (pg.scrubber.need_auto)
-    out << " NEED_AUTO";
-  if (pg.scrubber.req_scrub)
-    out << " REQ_SCRUB";
+
+  // listing all scrub-related flags - both current and "planned next scrub"
+  if (pg.is_scrubbing()) {
+    out << *pg.m_scrubber;
+  }
+  out << pg.m_planned_scrub;
 
   if (pg.recovery_ops_active)
     out << " rops=" << pg.recovery_ops_active;
@@ -3473,8 +2248,6 @@ ostream& operator<<(ostream& out, const PG& pg)
   }
 
   out << "]";
-
-
   return out;
 }
 
@@ -3557,15 +2330,19 @@ bool PG::can_discard_replica_op(OpRequestRef& op)
   // resets the messenger sesssion when the replica reconnects. to avoid the
   // out-of-order replies, the messages from that replica should be discarded.
   OSDMapRef next_map = osd->get_next_osdmap();
-  if (next_map->is_down(from))
+  if (next_map->is_down(from)) {
+    dout(20) << " " << __func__ << " dead for nextmap is down " << from << dendl;
     return true;
+  }
   /* Mostly, this overlaps with the old_peering_msg
    * condition.  An important exception is pushes
    * sent by replicas not in the acting set, since
    * if such a replica goes down it does not cause
    * a new interval. */
-  if (next_map->get_down_at(from) >= m->map_epoch)
+  if (next_map->get_down_at(from) >= m->map_epoch) {
+    dout(20) << " " << __func__ << " dead for 'get_down_at' " << from << dendl;
     return true;
+  }
 
   // same pg?
   //  if pg changes _at all_, we reset and repeer!
@@ -3759,33 +2536,12 @@ void PG::handle_initialize(PeeringCtx &rctx)
   recovery_state.handle_event(evt, &rctx);
 }
 
+
 void PG::handle_query_state(Formatter *f)
 {
   dout(10) << "handle_query_state" << dendl;
   PeeringState::QueryState q(f);
   recovery_state.handle_event(q, 0);
-
-  if (is_primary() && is_active()) {
-    f->open_object_section("scrub");
-    f->dump_stream("scrubber.epoch_start") << scrubber.epoch_start;
-    f->dump_bool("scrubber.active", scrubber.active);
-    f->dump_string("scrubber.state", PG::Scrubber::state_string(scrubber.state));
-    f->dump_stream("scrubber.start") << scrubber.start;
-    f->dump_stream("scrubber.end") << scrubber.end;
-    f->dump_stream("scrubber.max_end") << scrubber.max_end;
-    f->dump_stream("scrubber.subset_last_update") << scrubber.subset_last_update;
-    f->dump_bool("scrubber.deep", scrubber.deep);
-    {
-      f->open_array_section("scrubber.waiting_on_whom");
-      for (set<pg_shard_t>::iterator p = scrubber.waiting_on_whom.begin();
-          p != scrubber.waiting_on_whom.end();
-          ++p) {
-       f->dump_stream("shard") << *p;
-      }
-      f->close_section();
-    }
-    f->close_section();
-  }
 }
 
 void PG::init_collection_pool_opts()
@@ -3812,7 +2568,9 @@ void PG::C_DeleteMore::complete(int r) {
   delete this;
 }
 
-void PG::do_delete_work(ObjectStore::Transaction &t)
+std::pair<ghobject_t, bool> PG::do_delete_work(
+  ObjectStore::Transaction &t,
+  ghobject_t _next)
 {
   dout(10) << __func__ << dendl;
 
@@ -3822,7 +2580,7 @@ void PG::do_delete_work(ObjectStore::Transaction &t)
       epoch_t e = get_osdmap()->get_epoch();
       PGRef pgref(this);
       auto delete_requeue_callback = new LambdaContext([this, pgref, e](int r) {
-        dout(20) << __func__ << " wake up at "
+        dout(20) << "do_delete_work() [cb] wake up at "
                  << ceph_clock_now()
                 << ", re-queuing delete" << dendl;
         std::scoped_lock locker{*this};
@@ -3838,25 +2596,49 @@ void PG::do_delete_work(ObjectStore::Transaction &t)
       osd->sleep_timer.add_event_at(delete_schedule_time,
                                    delete_requeue_callback);
       dout(20) << __func__ << " Delete scheduled at " << delete_schedule_time << dendl;
-      return;
+      return std::make_pair(_next, true);
     }
   }
 
   delete_needs_sleep = true;
 
+  ghobject_t next;
+
   vector<ghobject_t> olist;
   int max = std::min(osd->store->get_ideal_list_max(),
                     (int)cct->_conf->osd_target_transaction_size);
-  ghobject_t next;
+
   osd->store->collection_list(
     ch,
-    next,
+    _next,
     ghobject_t::get_max(),
     max,
     &olist,
     &next);
   dout(20) << __func__ << " " << olist << dendl;
 
+  // make sure we've removed everything
+  // by one more listing from the beginning
+  if (_next != ghobject_t() && olist.empty()) {
+    next = ghobject_t();
+    osd->store->collection_list(
+      ch,
+      next,
+      ghobject_t::get_max(),
+      max,
+      &olist,
+      &next);
+    for (auto& oid : olist) {
+      if (oid == pgmeta_oid) {
+        dout(20) << __func__ << " removing pgmeta object " << oid << dendl;
+      } else {
+        dout(0) << __func__ << " additional unexpected onode"
+                <<" new onode has appeared since PG removal started"
+                << oid << dendl;
+      }
+    }
+  }
+
   OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
   int64_t num = 0;
   for (auto& oid : olist) {
@@ -3874,12 +2656,12 @@ void PG::do_delete_work(ObjectStore::Transaction &t)
     t.remove(coll, oid);
     ++num;
   }
+  bool running = true;
   if (num) {
     dout(20) << __func__ << " deleting " << num << " objects" << dendl;
     Context *fin = new C_DeleteMore(this, get_osdmap_epoch());
     t.register_on_commit(fin);
   } else {
-    dout(20) << __func__ << " finished" << dendl;
     if (cct->_conf->osd_inject_failure_on_pg_removal) {
       _exit(1);
     }
@@ -3911,9 +2693,10 @@ void PG::do_delete_work(ObjectStore::Transaction &t)
       // exit() methods don't run when that happens.
       osd->local_reserver.cancel_reservation(info.pgid);
 
-      osd->logger->dec(l_osd_pg_removing);
+      running = false;
     }
   }
+  return {next, running};
 }
 
 int PG::pg_stat_adjust(osd_stat_t *ns)
@@ -3944,16 +2727,6 @@ int PG::pg_stat_adjust(osd_stat_t *ns)
   return 0;
 }
 
-ostream& operator<<(ostream& out, const PG::BackfillInterval& bi)
-{
-  out << "BackfillInfo(" << bi.begin << "-" << bi.end
-      << " " << bi.objects.size() << " objects";
-  if (!bi.objects.empty())
-    out << " " << bi.objects;
-  out << ")";
-  return out;
-}
-
 void PG::dump_pgstate_history(Formatter *f)
 {
   std::scoped_lock l{*this};
@@ -3980,15 +2753,15 @@ void PG::dump_missing(Formatter *f)
   }
 }
 
-void PG::get_pg_stats(std::function<void(const pg_stat_t&, epoch_t lec)> f)
+void PG::with_pg_stats(std::function<void(const pg_stat_t&, epoch_t lec)>&& f)
 {
   std::lock_guard l{pg_stats_publish_lock};
-  if (pg_stats_publish_valid) {
-    f(pg_stats_publish, pg_stats_publish.get_effective_last_epoch_clean());
+  if (pg_stats_publish) {
+    f(*pg_stats_publish, pg_stats_publish->get_effective_last_epoch_clean());
   }
 }
 
-void PG::with_heartbeat_peers(std::function<void(int)> f)
+void PG::with_heartbeat_peers(std::function<void(int)>&& f)
 {
   std::lock_guard l{heartbeat_peer_lock};
   for (auto p : heartbeat_peers) {