]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osd/OSD.cc
import quincy 17.2.0
[ceph.git] / ceph / src / osd / OSD.cc
index 7b6e1aca215d448af9bb7df24ac1f6bc60bd1f8c..c35ac0b526746fd85d400f6d74a43e9eae9d7afe 100644 (file)
@@ -24,7 +24,6 @@
 #include <sys/stat.h>
 #include <signal.h>
 #include <time.h>
-#include <boost/scoped_ptr.hpp>
 #include <boost/range/adaptor/reversed.hpp>
 
 #ifdef HAVE_SYS_PARAM_H
 #endif
 
 #include "osd/PG.h"
+#include "osd/scrubber/scrub_machine.h"
+#include "osd/scrubber/pg_scrubber.h"
 
 #include "include/types.h"
 #include "include/compat.h"
 #include "include/random.h"
+#include "include/scope_guard.h"
 
 #include "OSD.h"
 #include "OSDMap.h"
@@ -51,6 +53,7 @@
 #include "common/ceph_releases.h"
 #include "common/ceph_time.h"
 #include "common/version.h"
+#include "common/async/blocked_completion.h"
 #include "common/pick_address.h"
 #include "common/blkdev.h"
 #include "common/numa.h"
@@ -89,7 +92,6 @@
 #include "messages/MMonGetOSDMap.h"
 #include "messages/MOSDPGNotify.h"
 #include "messages/MOSDPGNotify2.h"
-#include "messages/MOSDPGQuery.h"
 #include "messages/MOSDPGQuery2.h"
 #include "messages/MOSDPGLog.h"
 #include "messages/MOSDPGRemove.h"
@@ -97,7 +99,6 @@
 #include "messages/MOSDPGInfo2.h"
 #include "messages/MOSDPGCreate.h"
 #include "messages/MOSDPGCreate2.h"
-#include "messages/MOSDPGScan.h"
 #include "messages/MBackfillReserve.h"
 #include "messages/MRecoveryReserve.h"
 #include "messages/MOSDForceRecovery.h"
 
 #include "messages/MOSDScrub.h"
 #include "messages/MOSDScrub2.h"
-#include "messages/MOSDRepScrub.h"
 
 #include "messages/MCommand.h"
 #include "messages/MCommandReply.h"
 
 #include "messages/MPGStats.h"
-#include "messages/MPGStatsAck.h"
-
-#include "messages/MWatchNotify.h"
-#include "messages/MOSDPGPush.h"
-#include "messages/MOSDPGPushReply.h"
-#include "messages/MOSDPGPull.h"
 
 #include "messages/MMonGetPurgedSnaps.h"
 #include "messages/MMonGetPurgedSnapsReply.h"
 #include "perfglue/cpu_profiler.h"
 #include "perfglue/heap_profiler.h"
 
+#include "osd/ClassHandler.h"
 #include "osd/OpRequest.h"
 
 #include "auth/AuthAuthorizeHandler.h"
 #define tracepoint(...)
 #endif
 
+#include "osd_tracer.h"
+
+
 #define dout_context cct
 #define dout_subsys ceph_subsys_osd
 #undef dout_prefix
 #define dout_prefix _prefix(_dout, whoami, get_osdmap_epoch())
 
+using std::deque;
+using std::list;
+using std::lock_guard;
+using std::make_pair;
+using std::make_tuple;
+using std::make_unique;
+using std::map;
+using std::ostream;
+using std::ostringstream;
+using std::pair;
+using std::set;
+using std::string;
+using std::stringstream;
+using std::to_string;
+using std::unique_ptr;
+using std::vector;
+
+using ceph::bufferlist;
+using ceph::bufferptr;
+using ceph::decode;
+using ceph::encode;
+using ceph::fixed_u_to_string;
+using ceph::Formatter;
+using ceph::heartbeat_handle_d;
+using ceph::make_mutex;
+
 using namespace ceph::osd::scheduler;
 using TOPNSPC::common::cmd_getval;
+using TOPNSPC::common::cmd_getval_or;
 
 static ostream& _prefix(std::ostream* _dout, int whoami, epoch_t epoch) {
   return *_dout << "osd." << whoami << " " << epoch << " ";
 }
 
+
 //Initial features in new superblock.
 //Features here are also automatically upgraded
 CompatSet OSD::get_osd_initial_compat_set() {
@@ -220,10 +246,10 @@ CompatSet OSD::get_osd_compat_set() {
   return compat;
 }
 
-OSDService::OSDService(OSD *osd) :
+OSDService::OSDService(OSD *osd, ceph::async::io_context_pool& poolctx) :
   osd(osd),
   cct(osd->cct),
-  whoami(osd->whoami), store(osd->store),
+  whoami(osd->whoami), store(osd->store.get()),
   log_client(osd->log_client), clog(osd->clog),
   pg_recovery_stats(osd->pg_recovery_stats),
   cluster_messenger(osd->cluster_messenger),
@@ -236,8 +262,7 @@ OSDService::OSDService(OSD *osd) :
   publish_lock{ceph::make_mutex("OSDService::publish_lock")},
   pre_publish_lock{ceph::make_mutex("OSDService::pre_publish_lock")},
   max_oldest_map(0),
-  scrubs_local(0),
-  scrubs_remote(0),
+  m_scrub_queue{cct, *this},
   agent_valid_iterator(false),
   agent_ops(0),
   flush_mode_high_count(0),
@@ -248,9 +273,10 @@ OSDService::OSDService(OSD *osd) :
   last_recalibrate(ceph_clock_now()),
   promote_max_objects(0),
   promote_max_bytes(0),
+  poolctx(poolctx),
   objecter(make_unique<Objecter>(osd->client_messenger->cct,
                                 osd->objecter_messenger,
-                                osd->monc, nullptr, 0, 0)),
+                                osd->monc, poolctx)),
   m_objecter_finishers(cct->_conf->osd_objecter_finishers),
   watch_timer(osd->client_messenger->cct, watch_lock),
   next_notif_id(0),
@@ -284,7 +310,7 @@ OSDService::OSDService(OSD *osd) :
 }
 
 #ifdef PG_DEBUG_REFS
-void OSDService::add_pgid(spg_t pgid, PG *pg){
+void OSDService::add_pgid(spg_t pgid, PG *pg) {
   std::lock_guard l(pgid_lock);
   if (!pgid_tracker.count(pgid)) {
     live_pgs[pgid] = pg;
@@ -594,7 +620,7 @@ void OSDService::agent_entry()
        << " no agent_work, delay for " << cct->_conf->osd_agent_delay_time
        << " seconds" << dendl;
 
-      osd->logger->inc(l_osd_tier_delay);
+      logger->inc(l_osd_tier_delay);
       // Queue a timer to call agent_choose_mode for this pg in 5 seconds
       std::lock_guard timer_locker{agent_timer_lock};
       Context *cb = new AgentTimeoutCB(pg);
@@ -943,9 +969,9 @@ void OSDService::set_statfs(const struct store_statfs_t &stbuf,
     used = bytes - avail;
   }
 
-  osd->logger->set(l_osd_stat_bytes, bytes);
-  osd->logger->set(l_osd_stat_bytes_used, used);
-  osd->logger->set(l_osd_stat_bytes_avail, avail);
+  logger->set(l_osd_stat_bytes, bytes);
+  logger->set(l_osd_stat_bytes_used, used);
+  logger->set(l_osd_stat_bytes_avail, avail);
 
   std::lock_guard l(stat_lock);
   osd_stat.statfs = stbuf;
@@ -994,7 +1020,7 @@ float OSDService::compute_adjusted_ratio(osd_stat_t new_stat, float *pratio,
                                         uint64_t adjust_used)
 {
   *pratio =
-   ((float)new_stat.statfs.get_used()) / ((float)new_stat.statfs.total);
+   ((float)new_stat.statfs.get_used_raw()) / ((float)new_stat.statfs.total);
 
   if (adjust_used) {
     dout(20) << __func__ << " Before kb_used() " << new_stat.statfs.kb_used()  << dendl;
@@ -1015,7 +1041,7 @@ float OSDService::compute_adjusted_ratio(osd_stat_t new_stat, float *pratio,
   if (backfill_adjusted) {
     dout(20) << __func__ << " backfill adjusted " << new_stat << dendl;
   }
-  return ((float)new_stat.statfs.get_used()) / ((float)new_stat.statfs.total);
+  return ((float)new_stat.statfs.get_used_raw()) / ((float)new_stat.statfs.total);
 }
 
 void OSDService::send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch)
@@ -1235,79 +1261,6 @@ void OSDService::prune_pg_created()
 // --------------------------------------
 // dispatch
 
-bool OSDService::can_inc_scrubs()
-{
-  bool can_inc = false;
-  std::lock_guard l(sched_scrub_lock);
-
-  if (scrubs_local + scrubs_remote < cct->_conf->osd_max_scrubs) {
-    dout(20) << __func__ << " == true " << scrubs_local << " local + " << scrubs_remote
-            << " remote < max " << cct->_conf->osd_max_scrubs << dendl;
-    can_inc = true;
-  } else {
-    dout(20) << __func__ << " == false " << scrubs_local << " local + " << scrubs_remote
-            << " remote >= max " << cct->_conf->osd_max_scrubs << dendl;
-  }
-
-  return can_inc;
-}
-
-bool OSDService::inc_scrubs_local()
-{
-  bool result = false;
-  std::lock_guard l{sched_scrub_lock};
-  if (scrubs_local + scrubs_remote < cct->_conf->osd_max_scrubs) {
-    dout(20) << __func__ << " " << scrubs_local << " -> " << (scrubs_local+1)
-            << " (max " << cct->_conf->osd_max_scrubs << ", remote " << scrubs_remote << ")" << dendl;
-    result = true;
-    ++scrubs_local;
-  } else {
-    dout(20) << __func__ << " " << scrubs_local << " local + " << scrubs_remote << " remote >= max " << cct->_conf->osd_max_scrubs << dendl;
-  }
-  return result;
-}
-
-void OSDService::dec_scrubs_local()
-{
-  std::lock_guard l{sched_scrub_lock};
-  dout(20) << __func__ << " " << scrubs_local << " -> " << (scrubs_local-1)
-          << " (max " << cct->_conf->osd_max_scrubs << ", remote " << scrubs_remote << ")" << dendl;
-  --scrubs_local;
-  ceph_assert(scrubs_local >= 0);
-}
-
-bool OSDService::inc_scrubs_remote()
-{
-  bool result = false;
-  std::lock_guard l{sched_scrub_lock};
-  if (scrubs_local + scrubs_remote < cct->_conf->osd_max_scrubs) {
-    dout(20) << __func__ << " " << scrubs_remote << " -> " << (scrubs_remote+1)
-            << " (max " << cct->_conf->osd_max_scrubs << ", local " << scrubs_local << ")" << dendl;
-    result = true;
-    ++scrubs_remote;
-  } else {
-    dout(20) << __func__ << " " << scrubs_local << " local + " << scrubs_remote << " remote >= max " << cct->_conf->osd_max_scrubs << dendl;
-  }
-  return result;
-}
-
-void OSDService::dec_scrubs_remote()
-{
-  std::lock_guard l{sched_scrub_lock};
-  dout(20) << __func__ << " " << scrubs_remote << " -> " << (scrubs_remote-1)
-          << " (max " << cct->_conf->osd_max_scrubs << ", local " << scrubs_local << ")" << dendl;
-  --scrubs_remote;
-  ceph_assert(scrubs_remote >= 0);
-}
-
-void OSDService::dump_scrub_reservations(Formatter *f)
-{
-  std::lock_guard l{sched_scrub_lock};
-  f->dump_int("scrubs_local", scrubs_local);
-  f->dump_int("scrubs_remote", scrubs_remote);
-  f->dump_int("osd_max_scrubs", cct->_conf->osd_max_scrubs);
-}
-
 void OSDService::retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch,
                                  epoch_t *_bind_epoch) const
 {
@@ -1346,7 +1299,7 @@ bool OSDService::prepare_to_stop()
 
   OSDMapRef osdmap = get_osdmap();
   if (osdmap && osdmap->is_up(whoami)) {
-    dout(0) << __func__ << " telling mon we are shutting down" << dendl;
+    dout(0) << __func__ << " telling mon we are shutting down and dead " << dendl;
     set_state(PREPARING_TO_STOP);
     monc->send_mon_message(
       new MOSDMarkMeDown(
@@ -1354,12 +1307,14 @@ bool OSDService::prepare_to_stop()
        whoami,
        osdmap->get_addrs(whoami),
        osdmap->get_epoch(),
-       true  // request ack
+       true,  // request ack
+       true   // mark as down and dead
        ));
     const auto timeout = ceph::make_timespan(cct->_conf->osd_mon_shutdown_timeout);
     is_stopping_cond.wait_for(l, timeout,
       [this] { return get_state() == STOPPING; });
   }
+
   dout(0) << __func__ << " starting shutdown" << dendl;
   set_state(STOPPING);
   return true;
@@ -1401,19 +1356,19 @@ MOSDMap *OSDService::build_incremental_map_msg(epoch_t since, epoch_t to,
     }
     max--;
     max_bytes -= bl.length();
-    m->maps[since].claim(bl);
+    m->maps[since] = std::move(bl);
   }
   for (epoch_t e = since + 1; e <= to; ++e) {
     bufferlist bl;
     if (get_inc_map_bl(e, bl)) {
-      m->incremental_maps[e].claim(bl);
+      m->incremental_maps[e] = std::move(bl);
     } else {
       dout(10) << __func__ << " missing incremental map " << e << dendl;
       if (!get_map_bl(e, bl)) {
        derr << __func__ << " also missing full map " << e << dendl;
        goto panic;
       }
-      m->maps[e].claim(bl);
+      m->maps[e] = std::move(bl);
     }
     max--;
     max_bytes -= bl.length();
@@ -1432,7 +1387,7 @@ MOSDMap *OSDService::build_incremental_map_msg(epoch_t since, epoch_t to,
   // send something
   bufferlist bl;
   if (get_inc_map_bl(m->newest_map, bl)) {
-    m->incremental_maps[m->newest_map].claim(bl);
+    m->incremental_maps[m->newest_map] = std::move(bl);
   } else {
     derr << __func__ << " unable to load latest map " << m->newest_map << dendl;
     if (!get_map_bl(m->newest_map, bl)) {
@@ -1440,7 +1395,7 @@ MOSDMap *OSDService::build_incremental_map_msg(epoch_t since, epoch_t to,
           << dendl;
       ceph_abort();
     }
-    m->maps[m->newest_map].claim(bl);
+    m->maps[m->newest_map] = std::move(bl);
   }
   return m;
 }
@@ -1486,12 +1441,10 @@ bool OSDService::_get_map_bl(epoch_t e, bufferlist& bl)
 {
   bool found = map_bl_cache.lookup(e, &bl);
   if (found) {
-    if (logger)
-      logger->inc(l_osd_map_bl_cache_hit);
+    logger->inc(l_osd_map_bl_cache_hit);
     return true;
   }
-  if (logger)
-    logger->inc(l_osd_map_bl_cache_miss);
+  logger->inc(l_osd_map_bl_cache_miss);
   found = store->read(meta_ch,
                      OSD::get_osdmap_pobject_name(e), 0, 0, bl,
                      CEPH_OSD_OP_FLAG_FADVISE_WILLNEED) >= 0;
@@ -1506,12 +1459,10 @@ bool OSDService::get_inc_map_bl(epoch_t e, bufferlist& bl)
   std::lock_guard l(map_cache_lock);
   bool found = map_bl_inc_cache.lookup(e, &bl);
   if (found) {
-    if (logger)
-      logger->inc(l_osd_map_bl_cache_hit);
+    logger->inc(l_osd_map_bl_cache_hit);
     return true;
   }
-  if (logger)
-    logger->inc(l_osd_map_bl_cache_miss);
+  logger->inc(l_osd_map_bl_cache_miss);
   found = store->read(meta_ch,
                      OSD::get_inc_osdmap_pobject_name(e), 0, 0, bl,
                      CEPH_OSD_OP_FLAG_FADVISE_WILLNEED) >= 0;
@@ -1568,12 +1519,10 @@ OSDMapRef OSDService::try_get_map(epoch_t epoch)
   OSDMapRef retval = map_cache.lookup(epoch);
   if (retval) {
     dout(30) << "get_map " << epoch << " -cached" << dendl;
-    if (logger) {
-      logger->inc(l_osd_map_cache_hit);
-    }
+    logger->inc(l_osd_map_cache_hit);
     return retval;
   }
-  if (logger) {
+  {
     logger->inc(l_osd_map_cache_miss);
     epoch_t lb = map_cache.cached_key_lower_bound();
     if (epoch < lb) {
@@ -1718,21 +1667,150 @@ void OSDService::queue_for_snap_trim(PG *pg)
       pg->get_osdmap_epoch()));
 }
 
-void OSDService::queue_for_scrub(PG *pg, bool with_high_priority)
+template <class MSG_TYPE>
+void OSDService::queue_scrub_event_msg(PG* pg,
+                                      Scrub::scrub_prio_t with_priority,
+                                      unsigned int qu_priority,
+                                      Scrub::act_token_t act_token)
 {
-  unsigned scrub_queue_priority = pg->scrubber.priority;
-  if (with_high_priority && scrub_queue_priority < cct->_conf->osd_client_op_priority) {
-    scrub_queue_priority = cct->_conf->osd_client_op_priority;
-  }
   const auto epoch = pg->get_osdmap_epoch();
-  enqueue_back(
-    OpSchedulerItem(
-      unique_ptr<OpSchedulerItem::OpQueueable>(new PGScrub(pg->get_pgid(), epoch)),
-      cct->_conf->osd_scrub_cost,
-      scrub_queue_priority,
-      ceph_clock_now(),
-      0,
-      epoch));
+  auto msg = new MSG_TYPE(pg->get_pgid(), epoch, act_token);
+  dout(15) << "queue a scrub event (" << *msg << ") for " << *pg
+           << ". Epoch: " << epoch << " token: " << act_token << dendl;
+
+  enqueue_back(OpSchedulerItem(
+    unique_ptr<OpSchedulerItem::OpQueueable>(msg), cct->_conf->osd_scrub_cost,
+    pg->scrub_requeue_priority(with_priority, qu_priority), ceph_clock_now(), 0, epoch));
+}
+
+template <class MSG_TYPE>
+void OSDService::queue_scrub_event_msg(PG* pg,
+                                       Scrub::scrub_prio_t with_priority)
+{
+  const auto epoch = pg->get_osdmap_epoch();
+  auto msg = new MSG_TYPE(pg->get_pgid(), epoch);
+  dout(15) << "queue a scrub event (" << *msg << ") for " << *pg << ". Epoch: " << epoch << dendl;
+
+  enqueue_back(OpSchedulerItem(
+    unique_ptr<OpSchedulerItem::OpQueueable>(msg), cct->_conf->osd_scrub_cost,
+    pg->scrub_requeue_priority(with_priority), ceph_clock_now(), 0, epoch));
+}
+
+void OSDService::queue_for_scrub(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+  queue_scrub_event_msg<PGScrub>(pg, with_priority);
+}
+
+void OSDService::queue_scrub_after_repair(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+  queue_scrub_event_msg<PGScrubAfterRepair>(pg, with_priority);
+}
+
+void OSDService::queue_for_rep_scrub(PG* pg,
+                                    Scrub::scrub_prio_t with_priority,
+                                    unsigned int qu_priority,
+                                    Scrub::act_token_t act_token)
+{
+  queue_scrub_event_msg<PGRepScrub>(pg, with_priority, qu_priority, act_token);
+}
+
+void OSDService::queue_for_rep_scrub_resched(PG* pg,
+                                            Scrub::scrub_prio_t with_priority,
+                                            unsigned int qu_priority,
+                                            Scrub::act_token_t act_token)
+{
+  // Resulting scrub event: 'SchedReplica'
+  queue_scrub_event_msg<PGRepScrubResched>(pg, with_priority, qu_priority,
+                                          act_token);
+}
+
+void OSDService::queue_for_scrub_granted(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+  // Resulting scrub event: 'RemotesReserved'
+  queue_scrub_event_msg<PGScrubResourcesOK>(pg, with_priority);
+}
+
+void OSDService::queue_for_scrub_denied(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+  // Resulting scrub event: 'ReservationFailure'
+  queue_scrub_event_msg<PGScrubDenied>(pg, with_priority);
+}
+
+void OSDService::queue_for_scrub_resched(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+  // Resulting scrub event: 'InternalSchedScrub'
+  queue_scrub_event_msg<PGScrubResched>(pg, with_priority);
+}
+
+void OSDService::queue_scrub_pushes_update(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+  // Resulting scrub event: 'ActivePushesUpd'
+  queue_scrub_event_msg<PGScrubPushesUpdate>(pg, with_priority);
+}
+
+void OSDService::queue_scrub_chunk_free(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+  // Resulting scrub event: 'SelectedChunkFree'
+  queue_scrub_event_msg<PGScrubChunkIsFree>(pg, with_priority);
+}
+
+void OSDService::queue_scrub_chunk_busy(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+  // Resulting scrub event: 'ChunkIsBusy'
+  queue_scrub_event_msg<PGScrubChunkIsBusy>(pg, with_priority);
+}
+
+void OSDService::queue_scrub_applied_update(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+  queue_scrub_event_msg<PGScrubAppliedUpdate>(pg, with_priority);
+}
+
+void OSDService::queue_scrub_unblocking(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+  // Resulting scrub event: 'Unblocked'
+  queue_scrub_event_msg<PGScrubUnblocked>(pg, with_priority);
+}
+
+void OSDService::queue_scrub_digest_update(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+  // Resulting scrub event: 'DigestUpdate'
+  queue_scrub_event_msg<PGScrubDigestUpdate>(pg, with_priority);
+}
+
+void OSDService::queue_scrub_got_local_map(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+  // Resulting scrub event: 'IntLocalMapDone'
+  queue_scrub_event_msg<PGScrubGotLocalMap>(pg, with_priority);
+}
+
+void OSDService::queue_scrub_got_repl_maps(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+  // Resulting scrub event: 'GotReplicas'
+  queue_scrub_event_msg<PGScrubGotReplMaps>(pg, with_priority);
+}
+
+void OSDService::queue_scrub_maps_compared(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+  // Resulting scrub event: 'MapsCompared'
+  queue_scrub_event_msg<PGScrubMapsCompared>(pg, with_priority);
+}
+
+void OSDService::queue_scrub_replica_pushes(PG *pg, Scrub::scrub_prio_t with_priority)
+{
+  // Resulting scrub event: 'ReplicaPushesUpd'
+  queue_scrub_event_msg<PGScrubReplicaPushes>(pg, with_priority);
+}
+
+void OSDService::queue_scrub_is_finished(PG *pg)
+{
+  // Resulting scrub event: 'ScrubFinished'
+  queue_scrub_event_msg<PGScrubScrubFinished>(pg, Scrub::scrub_prio_t::high_priority);
+}
+
+void OSDService::queue_scrub_next_chunk(PG *pg, Scrub::scrub_prio_t with_priority)
+{
+  // Resulting scrub event: 'NextChunk'
+  queue_scrub_event_msg<PGScrubGetNextChunk>(pg, with_priority);
 }
 
 void OSDService::queue_for_pg_delete(spg_t pgid, epoch_t e)
@@ -1911,21 +1989,22 @@ void OSDService::_queue_for_recovery(
 #define dout_prefix *_dout
 
 // Commands shared between OSD's console and admin console:
-namespace ceph { 
-namespace osd_cmds { 
+namespace ceph::osd_cmds {
 
 int heap(CephContext& cct, const cmdmap_t& cmdmap, Formatter& f, std::ostream& os);
-}} // namespace ceph::osd_cmds
 
-int OSD::mkfs(CephContext *cct, ObjectStore *store, uuid_d fsid, int whoami, string osdspec_affinity)
+} // namespace ceph::osd_cmds
+
+int OSD::mkfs(CephContext *cct,
+             std::unique_ptr<ObjectStore> store,
+             uuid_d fsid,
+             int whoami,
+             string osdspec_affinity)
 {
   int ret;
 
   OSDSuperblock sb;
   bufferlist sbbl;
-  ObjectStore::CollectionHandle ch;
-
   // if we are fed a uuid for this osd, use it.
   store->set_fsid(cct->_conf->osd_uuid);
 
@@ -1933,7 +2012,7 @@ int OSD::mkfs(CephContext *cct, ObjectStore *store, uuid_d fsid, int whoami, str
   if (ret) {
     derr << "OSD::mkfs: ObjectStore::mkfs failed with error "
          << cpp_strerror(ret) << dendl;
-    goto free_store;
+    return ret;
   }
 
   store->set_cache_shards(1);  // doesn't matter for mkfs!
@@ -1942,15 +2021,20 @@ int OSD::mkfs(CephContext *cct, ObjectStore *store, uuid_d fsid, int whoami, str
   if (ret) {
     derr << "OSD::mkfs: couldn't mount ObjectStore: error "
          << cpp_strerror(ret) << dendl;
-    goto free_store;
+    return ret;
   }
 
-  ch = store->open_collection(coll_t::meta());
+  auto umount_store = make_scope_guard([&] {
+    store->umount();
+  });
+
+  ObjectStore::CollectionHandle ch =
+    store->open_collection(coll_t::meta());
   if (ch) {
     ret = store->read(ch, OSD_SUPERBLOCK_GOBJECT, 0, 0, sbbl);
     if (ret < 0) {
       derr << "OSD::mkfs: have meta collection but no superblock" << dendl;
-      goto free_store;
+      return ret;
     }
     /* if we already have superblock, check content of superblock */
     dout(0) << " have superblock" << dendl;
@@ -1959,14 +2043,12 @@ int OSD::mkfs(CephContext *cct, ObjectStore *store, uuid_d fsid, int whoami, str
     if (whoami != sb.whoami) {
       derr << "provided osd id " << whoami << " != superblock's " << sb.whoami
           << dendl;
-      ret = -EINVAL;
-      goto umount_store;
+      return -EINVAL;
     }
     if (fsid != sb.cluster_fsid) {
       derr << "provided cluster fsid " << fsid
           << " != superblock's " << sb.cluster_fsid << dendl;
-      ret = -EINVAL;
-      goto umount_store;
+      return -EINVAL;
     }
   } else {
     // create superblock
@@ -1987,24 +2069,16 @@ int OSD::mkfs(CephContext *cct, ObjectStore *store, uuid_d fsid, int whoami, str
     if (ret) {
       derr << "OSD::mkfs: error while writing OSD_SUPERBLOCK_GOBJECT: "
           << "queue_transaction returned " << cpp_strerror(ret) << dendl;
-      goto umount_store;
+      return ret;
     }
+    ch->flush();
   }
 
-  ret = write_meta(cct, store, sb.cluster_fsid, sb.osd_fsid, whoami, osdspec_affinity);
+  ret = write_meta(cct, store.get(), sb.cluster_fsid, sb.osd_fsid, whoami, osdspec_affinity);
   if (ret) {
     derr << "OSD::mkfs: failed to write fsid file: error "
          << cpp_strerror(ret) << dendl;
-    goto umount_store;
-  }
-
-umount_store:
-  if (ch) {
-    ch.reset();
   }
-  store->umount();
-free_store:
-  delete store;
   return ret;
 }
 
@@ -2111,7 +2185,8 @@ int OSD::peek_meta(ObjectStore *store,
 
 // cons/des
 
-OSD::OSD(CephContext *cct_, ObjectStore *store_,
+OSD::OSD(CephContext *cct_,
+        std::unique_ptr<ObjectStore> store_,
         int id,
         Messenger *internal_messenger,
         Messenger *external_messenger,
@@ -2121,7 +2196,8 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
         Messenger *hb_back_serverm,
         Messenger *osdc_messenger,
         MonClient *mc,
-        const std::string &dev, const std::string &jdev) :
+        const std::string &dev, const std::string &jdev,
+        ceph::async::io_context_pool& poolctx) :
   Dispatcher(cct_),
   tick_timer(cct, osd_lock),
   tick_timer_without_osd_lock(cct, tick_timer_lock),
@@ -2131,9 +2207,9 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
   objecter_messenger(osdc_messenger),
   monc(mc),
   mgrc(cct_, client_messenger, &mc->monmap),
-  logger(NULL),
-  recoverystate_perf(NULL),
-  store(store_),
+  logger(create_logger()),
+  recoverystate_perf(create_recoverystate_perf()),
+  store(std::move(store_)),
   log_client(cct, client_messenger, &mc->monmap, LogClient::NO_FLAGS),
   clog(log_client.create_channel()),
   whoami(id),
@@ -2160,20 +2236,20 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
   test_ops_hook(NULL),
   op_shardedwq(
     this,
-    cct->_conf->osd_op_thread_timeout,
-    cct->_conf->osd_op_thread_suicide_timeout,
+    ceph::make_timespan(cct->_conf->osd_op_thread_timeout),
+    ceph::make_timespan(cct->_conf->osd_op_thread_suicide_timeout),
     &osd_op_tp),
   last_pg_create_epoch(0),
   boot_finisher(cct),
   up_thru_wanted(0),
   requested_full_first(0),
   requested_full_last(0),
-  service(this)
+  service(this, poolctx)
 {
 
   if (!gss_ktfile_client.empty()) {
-    // Assert we can export environment variable 
-    /* 
+    // Assert we can export environment variable
+    /*
         The default client keytab is used, if it is present and readable,
         to automatically obtain initial credentials for GSSAPI client
         applications. The principal name of the first entry in the client
@@ -2182,7 +2258,7 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
         2. The default_client_keytab_name profile variable in [libdefaults].
         3. The hardcoded default, DEFCKTNAME.
     */
-    const int32_t set_result(setenv("KRB5_CLIENT_KTNAME", 
+    const int32_t set_result(setenv("KRB5_CLIENT_KTNAME",
                                     gss_ktfile_client.c_str(), 1));
     ceph_assert(set_result == 0);
   }
@@ -2222,7 +2298,6 @@ OSD::~OSD()
   cct->get_perfcounters_collection()->remove(logger);
   delete recoverystate_perf;
   delete logger;
-  delete store;
 }
 
 double OSD::get_tick_interval() const
@@ -2375,6 +2450,70 @@ std::set<int64_t> OSD::get_mapped_pools()
   return pools;
 }
 
+OSD::PGRefOrError OSD::locate_asok_target(const cmdmap_t& cmdmap,
+                                    stringstream& ss,
+                                    bool only_primary)
+{
+  string pgidstr;
+  if (!cmd_getval(cmdmap, "pgid", pgidstr)) {
+    ss << "no pgid specified";
+    return OSD::PGRefOrError{std::nullopt, -EINVAL};
+  }
+
+  pg_t pgid;
+  if (!pgid.parse(pgidstr.c_str())) {
+    ss << "couldn't parse pgid '" << pgidstr << "'";
+    return OSD::PGRefOrError{std::nullopt, -EINVAL};
+  }
+
+  spg_t pcand;
+  PGRef pg;
+  if (get_osdmap()->get_primary_shard(pgid, &pcand) && (pg = _lookup_lock_pg(pcand))) {
+    if (pg->is_primary() || !only_primary) {
+      return OSD::PGRefOrError{pg, 0};
+    }
+
+    ss << "not primary for pgid " << pgid;
+    pg->unlock();
+    return OSD::PGRefOrError{std::nullopt, -EAGAIN};
+  } else {
+    ss << "i don't have pgid " << pgid;
+    return OSD::PGRefOrError{std::nullopt, -ENOENT};
+  }
+}
+
+// note that the cmdmap is explicitly copied into asok_route_to_pg()
+int OSD::asok_route_to_pg(
+  bool only_primary,
+  std::string_view prefix,
+  cmdmap_t cmdmap,
+  Formatter* f,
+  stringstream& ss,
+  const bufferlist& inbl,
+  bufferlist& outbl,
+  std::function<void(int, const std::string&, bufferlist&)> on_finish)
+{
+  auto [target_pg, ret] = locate_asok_target(cmdmap, ss, only_primary);
+
+  if (!target_pg.has_value()) {
+    // 'ss' and 'ret' already contain the error information
+    on_finish(ret, ss.str(), outbl);
+    return ret;
+  }
+
+  //  the PG was locked by locate_asok_target()
+  try {
+    (*target_pg)->do_command(prefix, cmdmap, inbl, on_finish);
+    (*target_pg)->unlock();
+    return 0;  // the pg handler calls on_finish directly
+  } catch (const TOPNSPC::common::bad_cmd_get& e) {
+    (*target_pg)->unlock();
+    ss << e.what();
+    on_finish(ret, ss.str(), outbl);
+    return -EINVAL;
+  }
+}
+
 void OSD::asok_command(
   std::string_view prefix, const cmdmap_t& cmdmap,
   Formatter *f,
@@ -2435,6 +2574,13 @@ void OSD::asok_command(
     }
   }
 
+  // --- PG commands that will be answered even if !primary ---
+
+  else if (prefix == "scrubdebug") {
+    asok_route_to_pg(false, prefix, cmdmap, f, ss, inbl, outbl, on_finish);
+    return;
+  }
+
   // --- OSD commands follow ---
 
   else if (prefix == "status") {
@@ -2508,12 +2654,12 @@ will start to track new ops received afterwards.";
     f->open_object_section("pq");
     op_shardedwq.dump(f);
     f->close_section();
-  } else if (prefix == "dump_blacklist") {
+  } else if (prefix == "dump_blocklist") {
     list<pair<entity_addr_t,utime_t> > bl;
     OSDMapRef curmap = service.get_osdmap();
 
-    f->open_array_section("blacklist");
-    curmap->get_blacklist(&bl);
+    f->open_array_section("blocklist");
+    curmap->get_blocklist(&bl);
     for (list<pair<entity_addr_t,utime_t> >::iterator it = bl.begin();
        it != bl.end(); ++it) {
       f->open_object_section("entry");
@@ -2523,7 +2669,7 @@ will start to track new ops received afterwards.";
       it->second.localtime(f->dump_stream("expire_time"));
       f->close_section(); //entry
     }
-    f->close_section(); //blacklist
+    f->close_section(); //blocklist
   } else if (prefix == "dump_watchers") {
     list<obj_watch_item_t> watchers;
     // scan pg's
@@ -2570,7 +2716,7 @@ will start to track new ops received afterwards.";
     f->close_section();
   } else if (prefix == "dump_scrub_reservations") {
     f->open_object_section("scrub_reservations");
-    service.dump_scrub_reservations(f);
+    service.get_scrub_services().dump_scrub_reservations(f);
     f->close_section();
   } else if (prefix == "get_latest_osdmap") {
     get_latest_osdmap();
@@ -2620,7 +2766,7 @@ will start to track new ops received afterwards.";
   } else if (prefix == "dump_objectstore_kv_stats") {
     store->get_db_statistics(f);
   } else if (prefix == "dump_scrubs") {
-    service.dumps_scrub(f);
+    service.get_scrub_services().dump_scrubs(f);
   } else if (prefix == "calc_objectstore_db_histogram") {
     store->generate_db_histogram(f);
   } else if (prefix == "flush_store_cache") {
@@ -2645,7 +2791,7 @@ will start to track new ops received afterwards.";
     store->compact();
     auto end = ceph::coarse_mono_clock::now();
     double duration = std::chrono::duration<double>(end-start).count();
-    dout(1) << "finished manual compaction in " 
+    dout(1) << "finished manual compaction in "
             << duration
             << " seconds" << dendl;
     f->open_object_section("compact_result");
@@ -2709,145 +2855,18 @@ will start to track new ops received afterwards.";
   }
 
   else if (prefix == "bench") {
-    int64_t count;
-    int64_t bsize;
-    int64_t osize, onum;
     // default count 1G, size 4MB
-    cmd_getval(cmdmap, "count", count, (int64_t)1 << 30);
-    cmd_getval(cmdmap, "size", bsize, (int64_t)4 << 20);
-    cmd_getval(cmdmap, "object_size", osize, (int64_t)0);
-    cmd_getval(cmdmap, "object_num", onum, (int64_t)0);
-
-    uint32_t duration = cct->_conf->osd_bench_duration;
-
-    if (bsize > (int64_t) cct->_conf->osd_bench_max_block_size) {
-      // let us limit the block size because the next checks rely on it
-      // having a sane value.  If we allow any block size to be set things
-      // can still go sideways.
-      ss << "block 'size' values are capped at "
-         << byte_u_t(cct->_conf->osd_bench_max_block_size) << ". If you wish to use"
-         << " a higher value, please adjust 'osd_bench_max_block_size'";
-      ret = -EINVAL;
-      goto out;
-    } else if (bsize < (int64_t) (1 << 20)) {
-      // entering the realm of small block sizes.
-      // limit the count to a sane value, assuming a configurable amount of
-      // IOPS and duration, so that the OSD doesn't get hung up on this,
-      // preventing timeouts from going off
-      int64_t max_count =
-        bsize * duration * cct->_conf->osd_bench_small_size_max_iops;
-      if (count > max_count) {
-        ss << "'count' values greater than " << max_count
-           << " for a block size of " << byte_u_t(bsize) << ", assuming "
-           << cct->_conf->osd_bench_small_size_max_iops << " IOPS,"
-           << " for " << duration << " seconds,"
-           << " can cause ill effects on osd. "
-           << " Please adjust 'osd_bench_small_size_max_iops' with a higher"
-           << " value if you wish to use a higher 'count'.";
-        ret = -EINVAL;
-        goto out;
-      }
-    } else {
-      // 1MB block sizes are big enough so that we get more stuff done.
-      // However, to avoid the osd from getting hung on this and having
-      // timers being triggered, we are going to limit the count assuming
-      // a configurable throughput and duration.
-      // NOTE: max_count is the total amount of bytes that we believe we
-      //       will be able to write during 'duration' for the given
-      //       throughput.  The block size hardly impacts this unless it's
-      //       way too big.  Given we already check how big the block size
-      //       is, it's safe to assume everything will check out.
-      int64_t max_count =
-        cct->_conf->osd_bench_large_size_max_throughput * duration;
-      if (count > max_count) {
-        ss << "'count' values greater than " << max_count
-           << " for a block size of " << byte_u_t(bsize) << ", assuming "
-           << byte_u_t(cct->_conf->osd_bench_large_size_max_throughput) << "/s,"
-           << " for " << duration << " seconds,"
-           << " can cause ill effects on osd. "
-           << " Please adjust 'osd_bench_large_size_max_throughput'"
-           << " with a higher value if you wish to use a higher 'count'.";
-        ret = -EINVAL;
-        goto out;
-      }
-    }
-
-    if (osize && bsize > osize)
-      bsize = osize;
-
-    dout(1) << " bench count " << count
-            << " bsize " << byte_u_t(bsize) << dendl;
-
-    ObjectStore::Transaction cleanupt;
-
-    if (osize && onum) {
-      bufferlist bl;
-      bufferptr bp(osize);
-      bp.zero();
-      bl.push_back(std::move(bp));
-      bl.rebuild_page_aligned();
-      for (int i=0; i<onum; ++i) {
-       char nm[30];
-       snprintf(nm, sizeof(nm), "disk_bw_test_%d", i);
-       object_t oid(nm);
-       hobject_t soid(sobject_t(oid, 0));
-       ObjectStore::Transaction t;
-       t.write(coll_t(), ghobject_t(soid), 0, osize, bl);
-       store->queue_transaction(service.meta_ch, std::move(t), NULL);
-       cleanupt.remove(coll_t(), ghobject_t(soid));
-      }
-    }
-
-    bufferlist bl;
-    bufferptr bp(bsize);
-    bp.zero();
-    bl.push_back(std::move(bp));
-    bl.rebuild_page_aligned();
+    int64_t count = cmd_getval_or<int64_t>(cmdmap, "count", 1LL << 30);
+    int64_t bsize = cmd_getval_or<int64_t>(cmdmap, "size", 4LL << 20);
+    int64_t osize = cmd_getval_or<int64_t>(cmdmap, "object_size", 0);
+    int64_t onum = cmd_getval_or<int64_t>(cmdmap, "object_num", 0);
+    double elapsed = 0.0;
 
-    {
-      C_SaferCond waiter;
-      if (!service.meta_ch->flush_commit(&waiter)) {
-       waiter.wait();
-      }
-    }
-
-    utime_t start = ceph_clock_now();
-    for (int64_t pos = 0; pos < count; pos += bsize) {
-      char nm[30];
-      unsigned offset = 0;
-      if (onum && osize) {
-       snprintf(nm, sizeof(nm), "disk_bw_test_%d", (int)(rand() % onum));
-       offset = rand() % (osize / bsize) * bsize;
-      } else {
-       snprintf(nm, sizeof(nm), "disk_bw_test_%lld", (long long)pos);
-      }
-      object_t oid(nm);
-      hobject_t soid(sobject_t(oid, 0));
-      ObjectStore::Transaction t;
-      t.write(coll_t::meta(), ghobject_t(soid), offset, bsize, bl);
-      store->queue_transaction(service.meta_ch, std::move(t), NULL);
-      if (!onum || !osize)
-       cleanupt.remove(coll_t::meta(), ghobject_t(soid));
-    }
-
-    {
-      C_SaferCond waiter;
-      if (!service.meta_ch->flush_commit(&waiter)) {
-       waiter.wait();
-      }
-    }
-    utime_t end = ceph_clock_now();
-
-    // clean up
-    store->queue_transaction(service.meta_ch, std::move(cleanupt), NULL);
-    {
-      C_SaferCond waiter;
-      if (!service.meta_ch->flush_commit(&waiter)) {
-       waiter.wait();
-      }
+    ret = run_osd_bench_test(count, bsize, osize, onum, &elapsed, ss);
+    if (ret != 0) {
+      goto out;
     }
 
-    double elapsed = end - start;
     double rate = count / elapsed;
     double iops = rate / bsize;
     f->open_object_section("osd_bench_results");
@@ -3094,6 +3113,30 @@ will start to track new ops received afterwards.";
     }
     f->close_section(); // entries
     f->close_section(); // network_ping_times
+  } else if (prefix == "dump_pool_statfs") {
+    lock_guard l(osd_lock);
+
+    int64_t p = 0;
+    if (!(cmd_getval(cmdmap, "poolid", p))) {
+      ss << "Error dumping pool statfs: no poolid provided";
+      ret = -EINVAL;
+      goto out;
+    }
+
+    store_statfs_t st;
+    bool per_pool_omap_stats = false;
+
+    ret = store->pool_statfs(p, &st, &per_pool_omap_stats);
+    if (ret < 0) {
+      ss << "Error dumping pool statfs: " << cpp_strerror(ret);
+      goto out;
+    } else {
+      ss << "dumping pool statfs...";
+      f->open_object_section("pool_statfs");
+      f->dump_int("poolid", p);
+      st.dump(f);
+      f->close_section();
+    }
   } else {
     ceph_abort_msg("broken asok registration");
   }
@@ -3102,6 +3145,150 @@ will start to track new ops received afterwards.";
   on_finish(ret, ss.str(), outbl);
 }
 
+int OSD::run_osd_bench_test(
+  int64_t count,
+  int64_t bsize,
+  int64_t osize,
+  int64_t onum,
+  double *elapsed,
+  ostream &ss)
+{
+  int ret = 0;
+  uint32_t duration = cct->_conf->osd_bench_duration;
+
+  if (bsize > (int64_t) cct->_conf->osd_bench_max_block_size) {
+    // let us limit the block size because the next checks rely on it
+    // having a sane value.  If we allow any block size to be set things
+    // can still go sideways.
+    ss << "block 'size' values are capped at "
+       << byte_u_t(cct->_conf->osd_bench_max_block_size) << ". If you wish to use"
+       << " a higher value, please adjust 'osd_bench_max_block_size'";
+    ret = -EINVAL;
+    return ret;
+  } else if (bsize < (int64_t) (1 << 20)) {
+    // entering the realm of small block sizes.
+    // limit the count to a sane value, assuming a configurable amount of
+    // IOPS and duration, so that the OSD doesn't get hung up on this,
+    // preventing timeouts from going off
+    int64_t max_count =
+      bsize * duration * cct->_conf->osd_bench_small_size_max_iops;
+    if (count > max_count) {
+      ss << "'count' values greater than " << max_count
+         << " for a block size of " << byte_u_t(bsize) << ", assuming "
+         << cct->_conf->osd_bench_small_size_max_iops << " IOPS,"
+         << " for " << duration << " seconds,"
+         << " can cause ill effects on osd. "
+         << " Please adjust 'osd_bench_small_size_max_iops' with a higher"
+         << " value if you wish to use a higher 'count'.";
+      ret = -EINVAL;
+      return ret;
+    }
+  } else {
+    // 1MB block sizes are big enough so that we get more stuff done.
+    // However, to avoid the osd from getting hung on this and having
+    // timers being triggered, we are going to limit the count assuming
+    // a configurable throughput and duration.
+    // NOTE: max_count is the total amount of bytes that we believe we
+    //       will be able to write during 'duration' for the given
+    //       throughput.  The block size hardly impacts this unless it's
+    //       way too big.  Given we already check how big the block size
+    //       is, it's safe to assume everything will check out.
+    int64_t max_count =
+      cct->_conf->osd_bench_large_size_max_throughput * duration;
+    if (count > max_count) {
+      ss << "'count' values greater than " << max_count
+         << " for a block size of " << byte_u_t(bsize) << ", assuming "
+         << byte_u_t(cct->_conf->osd_bench_large_size_max_throughput) << "/s,"
+         << " for " << duration << " seconds,"
+         << " can cause ill effects on osd. "
+         << " Please adjust 'osd_bench_large_size_max_throughput'"
+         << " with a higher value if you wish to use a higher 'count'.";
+      ret = -EINVAL;
+      return ret;
+    }
+  }
+
+  if (osize && bsize > osize) {
+    bsize = osize;
+  }
+
+  dout(1) << " bench count " << count
+          << " bsize " << byte_u_t(bsize) << dendl;
+
+  ObjectStore::Transaction cleanupt;
+
+  if (osize && onum) {
+    bufferlist bl;
+    bufferptr bp(osize);
+    memset(bp.c_str(), 'a', bp.length());
+    bl.push_back(std::move(bp));
+    bl.rebuild_page_aligned();
+    for (int i=0; i<onum; ++i) {
+      char nm[30];
+      snprintf(nm, sizeof(nm), "disk_bw_test_%d", i);
+      object_t oid(nm);
+      hobject_t soid(sobject_t(oid, 0));
+      ObjectStore::Transaction t;
+      t.write(coll_t(), ghobject_t(soid), 0, osize, bl);
+      store->queue_transaction(service.meta_ch, std::move(t), nullptr);
+      cleanupt.remove(coll_t(), ghobject_t(soid));
+    }
+  }
+
+  bufferlist bl;
+  bufferptr bp(bsize);
+  memset(bp.c_str(), 'a', bp.length());
+  bl.push_back(std::move(bp));
+  bl.rebuild_page_aligned();
+
+  {
+    C_SaferCond waiter;
+    if (!service.meta_ch->flush_commit(&waiter)) {
+      waiter.wait();
+    }
+  }
+
+  utime_t start = ceph_clock_now();
+  for (int64_t pos = 0; pos < count; pos += bsize) {
+    char nm[30];
+    unsigned offset = 0;
+    if (onum && osize) {
+      snprintf(nm, sizeof(nm), "disk_bw_test_%d", (int)(rand() % onum));
+      offset = rand() % (osize / bsize) * bsize;
+    } else {
+      snprintf(nm, sizeof(nm), "disk_bw_test_%lld", (long long)pos);
+    }
+    object_t oid(nm);
+    hobject_t soid(sobject_t(oid, 0));
+    ObjectStore::Transaction t;
+    t.write(coll_t::meta(), ghobject_t(soid), offset, bsize, bl);
+    store->queue_transaction(service.meta_ch, std::move(t), nullptr);
+    if (!onum || !osize) {
+      cleanupt.remove(coll_t::meta(), ghobject_t(soid));
+    }
+  }
+
+  {
+    C_SaferCond waiter;
+    if (!service.meta_ch->flush_commit(&waiter)) {
+      waiter.wait();
+    }
+  }
+  utime_t end = ceph_clock_now();
+  *elapsed = end - start;
+
+  // clean up
+  store->queue_transaction(service.meta_ch, std::move(cleanupt), nullptr);
+  {
+    C_SaferCond waiter;
+    if (!service.meta_ch->flush_commit(&waiter)) {
+      waiter.wait();
+    }
+  }
+
+ return ret;
+}
+
 class TestOpsSocketHook : public AdminSocketHook {
   OSDService *service;
   ObjectStore *store;
@@ -3174,7 +3361,7 @@ int OSD::enable_disable_fuse(bool stop)
           << cpp_strerror(r) << dendl;
       return r;
     }
-    fuse_store = new FuseStore(store, mntpath);
+    fuse_store = new FuseStore(store.get(), mntpath);
     r = fuse_store->start();
     if (r < 0) {
       derr << __func__ << " unable to start fuse: " << cpp_strerror(r) << dendl;
@@ -3265,7 +3452,7 @@ int OSD::init()
   std::lock_guard lock(osd_lock);
   if (is_stopping())
     return 0;
-
+  tracing::osd::tracer.init("osd");
   tick_timer.init();
   tick_timer_without_osd_lock.init();
   service.recovery_request_timer.init();
@@ -3288,6 +3475,10 @@ int OSD::init()
 
   store->set_cache_shards(get_num_cache_shards());
 
+ int rotating_auth_attempts = 0;
+ auto rotating_auth_timeout =
+   g_conf().get_val<int64_t>("rotating_keys_bootstrap_timeout");
+
   int r = store->mount();
   if (r < 0) {
     derr << "OSD:init: unable to mount object store" << dendl;
@@ -3302,7 +3493,12 @@ int OSD::init()
   dout(2) << "boot" << dendl;
 
   service.meta_ch = store->open_collection(coll_t::meta());
-
+  if (!service.meta_ch) {
+    derr << "OSD:init: unable to open meta collection"
+         << dendl;
+    r = -ENOENT;
+    goto out;
+  }
   // initialize the daily loadavg with current 15min loadavg
   double loadavgs[3];
   if (getloadavg(loadavgs, 3) == 3) {
@@ -3312,10 +3508,6 @@ int OSD::init()
     daily_loadavg = 1.0;
   }
 
-  int rotating_auth_attempts = 0;
-  auto rotating_auth_timeout =
-    g_conf().get_val<int64_t>("rotating_keys_bootstrap_timeout");
-
   // sanity check long object name handling
   {
     hobject_t l;
@@ -3419,7 +3611,7 @@ int OSD::init()
       auto ch = service.meta_ch;
       auto hoid = make_snapmapper_oid();
       unsigned max = cct->_conf->osd_target_transaction_size;
-      r = SnapMapper::convert_legacy(cct, store, ch, hoid, max);
+      r = SnapMapper::convert_legacy(cct, store.get(), ch, hoid, max);
       if (r < 0)
        goto out;
     }
@@ -3459,8 +3651,6 @@ int OSD::init()
 
   check_osdmap_features();
 
-  create_recoverystate_perf();
-
   {
     epoch_t bind_epoch = osdmap->get_epoch();
     service.set_epochs(NULL, NULL, &bind_epoch);
@@ -3479,7 +3669,10 @@ int OSD::init()
 
   dout(2) << "superblock: I am osd." << superblock.whoami << dendl;
 
-  create_logger();
+  if (cct->_conf.get_val<bool>("osd_compact_on_start")) {
+    dout(2) << "compacting object store's omap" << dendl;
+    store->compact();
+  }
 
   // prime osd stats
   {
@@ -3490,7 +3683,7 @@ int OSD::init()
     service.set_statfs(stbuf, alerts);
   }
 
-  // client_messenger auth_client is already set up by monc.
+  // client_messenger's auth_client will be set up by monc->init() later.
   for (auto m : { cluster_messenger,
        objecter_messenger,
        hb_front_client_messenger,
@@ -3513,7 +3706,7 @@ int OSD::init()
   if (r < 0)
     goto out;
 
-  mgrc.set_pgstats_cb([this](){ return collect_pg_stats(); });
+  mgrc.set_pgstats_cb([this]() { return collect_pg_stats(); });
   mgrc.set_perf_metric_query_cb(
     [this](const ConfigPayload &config_payload) {
         set_perf_queries(config_payload);
@@ -3650,13 +3843,16 @@ int OSD::init()
 
   start_boot();
 
+  // Override a few options if mclock scheduler is enabled.
+  maybe_override_max_osd_capacity_for_qos();
+  maybe_override_options_for_qos();
+
   return 0;
 
 out:
   enable_disable_fuse(true);
   store->umount();
-  delete store;
-  store = NULL;
+  store.reset();
   return r;
 }
 
@@ -3703,11 +3899,11 @@ void OSD::final_init()
   ceph_assert(r == 0);
   r = admin_socket->register_command("dump_op_pq_state",
                                     asok_hook,
-                                    "dump op priority queue state");
+                                    "dump op queue state");
   ceph_assert(r == 0);
-  r = admin_socket->register_command("dump_blacklist",
+  r = admin_socket->register_command("dump_blocklist",
                                     asok_hook,
-                                    "dump blacklisted clients and times");
+                                    "dump blocklisted clients and times");
   ceph_assert(r == 0);
   r = admin_socket->register_command("dump_watchers",
                                     asok_hook,
@@ -3795,7 +3991,12 @@ void OSD::final_init()
     "Dump osd heartbeat network ping times");
   ceph_assert(r == 0);
 
-  test_ops_hook = new TestOpsSocketHook(&(this->service), this->store);
+  r = admin_socket->register_command(
+    "dump_pool_statfs name=poolid,type=CephInt,req=true", asok_hook,
+    "Dump store's statistics for the given pool");
+  ceph_assert(r == 0);
+
+  test_ops_hook = new TestOpsSocketHook(&(this->service), this->store.get());
   // Note: pools are CephString instead of CephPoolname because
   // these commands traditionally support both pool names and numbers
   r = admin_socket->register_command(
@@ -3944,6 +4145,14 @@ void OSD::final_init()
     asok_hook,
     "Scrub purged_snaps vs snapmapper index");
   ceph_assert(r == 0);
+  r = admin_socket->register_command(
+    "scrubdebug "                                              \
+    "name=pgid,type=CephPgid "                                 \
+    "name=cmd,type=CephChoices,strings=block|unblock|set|unset " \
+    "name=value,type=CephString,req=false",
+    asok_hook,
+    "debug the scrubber");
+  ceph_assert(r == 0);
 
   // -- pg commands --
   // old form: ceph pg <pgid> command ...
@@ -4022,43 +4231,60 @@ void OSD::final_init()
   ceph_assert(r == 0);
 }
 
-void OSD::create_logger()
+PerfCounters* OSD::create_logger()
 {
-  dout(10) << "create_logger" << dendl;
-
-  logger = build_osd_logger(cct);
+  PerfCounters* logger = build_osd_logger(cct);
   cct->get_perfcounters_collection()->add(logger);
+  return logger;
 }
 
-void OSD::create_recoverystate_perf()
+PerfCounters* OSD::create_recoverystate_perf()
 {
-  dout(10) << "create_recoverystate_perf" << dendl;
-
-  recoverystate_perf = build_recoverystate_perf(cct);
+  PerfCounters* recoverystate_perf = build_recoverystate_perf(cct);
   cct->get_perfcounters_collection()->add(recoverystate_perf);
+  return recoverystate_perf;
 }
 
 int OSD::shutdown()
 {
+  // vstart overwrites osd_fast_shutdown value in the conf file -> force the value here!
+  //cct->_conf->osd_fast_shutdown = true;
+
+  dout(0) << "Fast Shutdown: - cct->_conf->osd_fast_shutdown = "
+         << cct->_conf->osd_fast_shutdown
+         << ", null-fm = " << store->has_null_manager() << dendl;
+
+  utime_t  start_time_func = ceph_clock_now();
+
   if (cct->_conf->osd_fast_shutdown) {
     derr << "*** Immediate shutdown (osd_fast_shutdown=true) ***" << dendl;
-    cct->_log->flush();
-    _exit(0);
+    if (cct->_conf->osd_fast_shutdown_notify_mon)
+      service.prepare_to_stop();
+
+    // There is no state we need to keep wehn running in NULL-FM moode
+    if (!store->has_null_manager()) {
+      cct->_log->flush();
+      _exit(0);
+    }
+  } else if (!service.prepare_to_stop()) {
+    return 0; // already shutting down
   }
 
-  if (!service.prepare_to_stop())
-    return 0; // already shutting down
   osd_lock.lock();
   if (is_stopping()) {
     osd_lock.unlock();
     return 0;
   }
-  dout(0) << "shutdown" << dendl;
 
+  if (!cct->_conf->osd_fast_shutdown) {
+    dout(0) << "shutdown" << dendl;
+  }
+
+  // don't accept new task for this OSD
   set_state(STATE_STOPPING);
 
-  // Debugging
-  if (cct->_conf.get_val<bool>("osd_debug_shutdown")) {
+  // Disabled debugging during fast-shutdown
+  if (!cct->_conf->osd_fast_shutdown && cct->_conf.get_val<bool>("osd_debug_shutdown")) {
     cct->_conf.set_val("debug_osd", "100");
     cct->_conf.set_val("debug_journal", "100");
     cct->_conf.set_val("debug_filestore", "100");
@@ -4067,6 +4293,45 @@ int OSD::shutdown()
     cct->_conf.apply_changes(nullptr);
   }
 
+  if (cct->_conf->osd_fast_shutdown) {
+    // first, stop new task from being taken from op_shardedwq
+    // and clear all pending tasks
+    op_shardedwq.stop_for_fast_shutdown();
+
+    utime_t  start_time_timer = ceph_clock_now();
+    tick_timer.shutdown();
+    {
+      std::lock_guard l(tick_timer_lock);
+      tick_timer_without_osd_lock.shutdown();
+    }
+
+    osd_lock.unlock();
+    utime_t  start_time_osd_drain = ceph_clock_now();
+
+    // then, wait on osd_op_tp to drain (TBD: should probably add a timeout)
+    osd_op_tp.drain();
+    osd_op_tp.stop();
+
+    utime_t  start_time_umount = ceph_clock_now();
+    store->prepare_for_fast_shutdown();
+    std::lock_guard lock(osd_lock);
+    // TBD: assert in allocator that nothing is being add
+    store->umount();
+
+    utime_t end_time = ceph_clock_now();
+    if (cct->_conf->osd_fast_shutdown_timeout) {
+      ceph_assert(end_time - start_time_func < cct->_conf->osd_fast_shutdown_timeout);
+    }
+    dout(0) <<"Fast Shutdown duration total     :" << end_time              - start_time_func       << " seconds" << dendl;
+    dout(0) <<"Fast Shutdown duration osd_drain :" << start_time_umount     - start_time_osd_drain  << " seconds" << dendl;
+    dout(0) <<"Fast Shutdown duration umount    :" << end_time              - start_time_umount     << " seconds" << dendl;
+    dout(0) <<"Fast Shutdown duration timer     :" << start_time_osd_drain  - start_time_timer      << " seconds" << dendl;
+    cct->_log->flush();
+
+    // now it is safe to exit
+    _exit(0);
+  }
+
   // stop MgrClient earlier as it's more like an internal consumer of OSD
   mgrc.shutdown();
 
@@ -4214,8 +4479,7 @@ int OSD::shutdown()
 
   std::lock_guard lock(osd_lock);
   store->umount();
-  delete store;
-  store = nullptr;
+  store.reset();
   dout(10) << "Store synced" << dendl;
 
   op_tracker.on_shutdown();
@@ -4229,6 +4493,11 @@ int OSD::shutdown()
   hb_front_server_messenger->shutdown();
   hb_back_server_messenger->shutdown();
 
+  utime_t duration = ceph_clock_now() - start_time_func;
+  dout(0) <<"Slow Shutdown duration:" << duration << " seconds" << dendl;
+
+  tracing::osd::tracer.shutdown();
+
   return r;
 }
 
@@ -4505,7 +4774,7 @@ PG* OSD::_make_pg(
     }
     decode(ec_profile, p);
   }
-  PGPool pool(cct, createmap, pgid.pool(), pi, name);
+  PGPool pool(createmap, pgid.pool(), pi, name);
   PG *pg;
   if (pi.type == pg_pool_t::TYPE_REPLICATED ||
       pi.type == pg_pool_t::TYPE_ERASURE)
@@ -4655,10 +4924,10 @@ void OSD::load_pgs()
        ++it) {
     spg_t pgid;
     if (it->is_temp(&pgid) ||
-       (it->is_pg(&pgid) && PG::_has_removal_flag(store, pgid))) {
+       (it->is_pg(&pgid) && PG::_has_removal_flag(store.get(), pgid))) {
       dout(10) << "load_pgs " << *it
               << " removing, legacy or flagged for removal pg" << dendl;
-      recursive_remove_collection(cct, store, pgid, *it);
+      recursive_remove_collection(cct, store.get(), pgid, *it);
       continue;
     }
 
@@ -4669,7 +4938,7 @@ void OSD::load_pgs()
 
     dout(10) << "pgid " << pgid << " coll " << coll_t(pgid) << dendl;
     epoch_t map_epoch = 0;
-    int r = PG::peek_map_epoch(store, pgid, &map_epoch);
+    int r = PG::peek_map_epoch(store.get(), pgid, &map_epoch);
     if (r < 0) {
       derr << __func__ << " unable to peek at " << pgid << " metadata, skipping"
           << dendl;
@@ -4699,7 +4968,7 @@ void OSD::load_pgs()
       pg = _make_pg(get_osdmap(), pgid);
     }
     if (!pg) {
-      recursive_remove_collection(cct, store, pgid, *it);
+      recursive_remove_collection(cct, store.get(), pgid, *it);
       continue;
     }
 
@@ -4709,13 +4978,13 @@ void OSD::load_pgs()
     pg->ch = store->open_collection(pg->coll);
 
     // read pg state, log
-    pg->read_state(store);
+    pg->read_state(store.get());
 
     if (pg->dne())  {
       dout(10) << "load_pgs " << *it << " deleting dne" << dendl;
       pg->ch = nullptr;
       pg->unlock();
-      recursive_remove_collection(cct, store, pgid, *it);
+      recursive_remove_collection(cct, store.get(), pgid, *it);
       continue;
     }
     {
@@ -4724,8 +4993,6 @@ void OSD::load_pgs()
       store->set_collection_commit_queue(pg->coll, &(shards[shard_index]->context_queue));
     }
 
-    pg->reg_next_scrub();
-
     dout(10) << __func__ << " loaded " << *pg << dendl;
     pg->unlock();
 
@@ -4746,8 +5013,6 @@ PGRef OSD::handle_pg_create_info(const OSDMapRef& osdmap,
     return nullptr;
   }
 
-  PeeringCtx rctx = create_context();
-
   OSDMapRef startmap = get_map(info->epoch);
 
   if (info->by_mon) {
@@ -4781,6 +5046,7 @@ PGRef OSD::handle_pg_create_info(const OSDMapRef& osdmap,
                 << "the pool allows ec overwrites but is not stored in "
                 << "bluestore, so deep scrubbing will not detect bitrot";
   }
+  PeeringCtx rctx;
   create_pg_collection(
     rctx.transaction, pgid, pgid.get_split_bits(pp->get_pg_num()));
   init_pg_ondisk(rctx.transaction, pgid, pp);
@@ -4809,7 +5075,6 @@ PGRef OSD::handle_pg_create_info(const OSDMapRef& osdmap,
     acting_primary,
     info->history,
     info->past_intervals,
-    false,
     rctx.transaction);
 
   pg->init_collection_pool_opts();
@@ -5199,14 +5464,15 @@ void OSD::reset_heartbeat_peers(bool all)
   stale -= cct->_conf.get_val<int64_t>("osd_heartbeat_stale");
   std::lock_guard l(heartbeat_lock);
   for (auto it = heartbeat_peers.begin(); it != heartbeat_peers.end();) {
-    HeartbeatInfo& hi = it->second;
+    auto& [peer, hi] = *it;
     if (all || hi.is_stale(stale)) {
       hi.clear_mark_down();
       // stop sending failure_report to mon too
-      failure_queue.erase(it->first);
-      heartbeat_peers.erase(it++);
+      failure_queue.erase(peer);
+      failure_pending.erase(peer);
+      it = heartbeat_peers.erase(it);
     } else {
-      it++;
+      ++it;
     }
   }
 }
@@ -5636,22 +5902,10 @@ void OSD::heartbeat()
   ceph_assert(ceph_mutex_is_locked_by_me(heartbeat_lock));
   dout(30) << "heartbeat" << dendl;
 
-  // get CPU load avg
-  double loadavgs[1];
-  int hb_interval = cct->_conf->osd_heartbeat_interval;
-  int n_samples = 86400;
-  if (hb_interval > 1) {
-    n_samples /= hb_interval;
-    if (n_samples < 1)
-      n_samples = 1;
+  auto load_for_logger = service.get_scrub_services().update_load_average();
+  if (load_for_logger) {
+    logger->set(l_osd_loadavg, load_for_logger.value());
   }
-
-  if (getloadavg(loadavgs, 1) == 1) {
-    logger->set(l_osd_loadavg, 100 * loadavgs[0]);
-    daily_loadavg = (daily_loadavg * (n_samples - 1) + loadavgs[0]) / n_samples;
-    dout(30) << "heartbeat: daily_loadavg " << daily_loadavg << dendl;
-  }
-
   dout(30) << "heartbeat checking stats" << dendl;
 
   // refresh peer list and osd stats
@@ -5680,6 +5934,11 @@ void OSD::heartbeat()
        i != heartbeat_peers.end();
        ++i) {
     int peer = i->first;
+    Session *s = static_cast<Session*>(i->second.con_back->get_priv().get());
+    if (!s) {
+      dout(30) << "heartbeat osd." << peer << " has no open con" << dendl;
+      continue;
+    }
     dout(30) << "heartbeat sending ping to osd." << peer << dendl;
 
     i->second.last_tx = now;
@@ -5690,7 +5949,6 @@ void OSD::heartbeat()
     if (i->second.hb_interval_start == utime_t())
       i->second.hb_interval_start = now;
 
-    Session *s = static_cast<Session*>(i->second.con_back->get_priv().get());
     std::optional<ceph::signedspan> delta_ub;
     s->stamps->sent_ping(&delta_ub);
 
@@ -5816,7 +6074,7 @@ void OSD::tick()
     // use a seed that is stable for each scrub interval, but varies
     // by OSD to avoid any herds.
     rng.seed(whoami + superblock.last_purged_snaps_scrub.sec());
-    double r = (rng() % 1024) / 1024;
+    double r = (rng() % 1024) / 1024.0;
     next +=
       cct->_conf->osd_scrub_min_interval *
       cct->_conf->osd_scrub_interval_randomize_ratio * r;
@@ -5838,9 +6096,9 @@ void OSD::tick_without_osd_lock()
   ceph_assert(ceph_mutex_is_locked(tick_timer_lock));
   dout(10) << "tick_without_osd_lock" << dendl;
 
-  logger->set(l_osd_cached_crc, buffer::get_cached_crc());
-  logger->set(l_osd_cached_crc_adjusted, buffer::get_cached_crc_adjusted());
-  logger->set(l_osd_missed_crc, buffer::get_missed_crc());
+  logger->set(l_osd_cached_crc, ceph::buffer::get_cached_crc());
+  logger->set(l_osd_cached_crc_adjusted, ceph::buffer::get_cached_crc_adjusted());
+  logger->set(l_osd_missed_crc, ceph::buffer::get_missed_crc());
 
   // refresh osd stats
   struct store_statfs_t stbuf;
@@ -5893,7 +6151,7 @@ void OSD::tick_without_osd_lock()
       // borrow lec lock to pretect last_sent_beacon from changing
       std::lock_guard l{min_last_epoch_clean_lock};
       const auto elapsed = now - last_sent_beacon;
-      if (chrono::duration_cast<chrono::seconds>(elapsed).count() >
+      if (std::chrono::duration_cast<std::chrono::seconds>(elapsed).count() >
         cct->_conf->osd_beacon_report_interval) {
         need_send_beacon = true;
       }
@@ -5963,8 +6221,7 @@ void TestOpsSocketHook::test_ops(OSDService *service, ObjectStore *store,
       return;
     }
 
-    int64_t shardid;
-    cmd_getval(cmdmap, "shardid", shardid, int64_t(shard_id_t::NO_SHARD));
+    int64_t shardid = cmd_getval_or<int64_t>(cmdmap, "shardid", shard_id_t::NO_SHARD);
     hobject_t obj(object_t(objname), string(""), CEPH_NOSNAP, rawpg.ps(), pool, nspace);
     ghobject_t gobj(obj, ghobject_t::NO_GEN, shard_id_t(uint8_t(shardid)));
     spg_t pgid(curmap->raw_pg_to_pg(rawpg), shard_id_t(shardid));
@@ -6053,8 +6310,7 @@ void TestOpsSocketHook::test_ops(OSDService *service, ObjectStore *store,
     return;
   }
   if (command == "set_recovery_delay") {
-    int64_t delay;
-    cmd_getval(cmdmap, "utime", delay, (int64_t)0);
+    int64_t delay = cmd_getval_or<int64_t>(cmdmap, "utime", 0);
     ostringstream oss;
     oss << delay;
     int r = service->cct->_conf.set_val("osd_recovery_delay_start",
@@ -6071,11 +6327,10 @@ void TestOpsSocketHook::test_ops(OSDService *service, ObjectStore *store,
     return;
   }
   if (command == "injectfull") {
-    int64_t count;
-    string type;
+    int64_t count = cmd_getval_or<int64_t>(cmdmap, "count", -1);
+    string type = cmd_getval_or<string>(cmdmap, "type", "full");
     OSDService::s_names state;
-    cmd_getval(cmdmap, "type", type, string("full"));
-    cmd_getval(cmdmap, "count", count, (int64_t)-1);
+
     if (type == "none" || count == 0) {
       type = "none";
       count = 0;
@@ -6218,12 +6473,12 @@ bool OSD::ms_handle_refused(Connection *con)
   return true;
 }
 
-struct C_OSD_GetVersion : public Context {
+struct CB_OSD_GetVersion {
   OSD *osd;
-  uint64_t oldest, newest;
-  explicit C_OSD_GetVersion(OSD *o) : osd(o), oldest(0), newest(0) {}
-  void finish(int r) override {
-    if (r >= 0)
+  explicit CB_OSD_GetVersion(OSD *o) : osd(o) {}
+  void operator ()(boost::system::error_code ec, version_t newest,
+                  version_t oldest) {
+    if (!ec)
       osd->_got_mon_epochs(oldest, newest);
   }
 };
@@ -6243,8 +6498,7 @@ void OSD::start_boot()
   set_state(STATE_PREBOOT);
   dout(10) << "start_boot - have maps " << superblock.oldest_map
           << ".." << superblock.newest_map << dendl;
-  C_OSD_GetVersion *c = new C_OSD_GetVersion(this);
-  monc->get_version("osdmap", &c->newest, &c->oldest, c);
+  monc->get_version("osdmap", CB_OSD_GetVersion(this));
 }
 
 void OSD::_got_mon_epochs(epoch_t oldest, epoch_t newest)
@@ -6284,9 +6538,6 @@ void OSD::_preboot(epoch_t oldest, epoch_t newest)
   } else if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
     derr << "osdmap SORTBITWISE OSDMap flag is NOT set; please set it"
         << dendl;
-  } else if (osdmap->require_osd_release < ceph_release_t::luminous) {
-    derr << "osdmap require_osd_release < luminous; please upgrade to luminous"
-        << dendl;
   } else if (service.need_fullness_update()) {
     derr << "osdmap fullness state needs update" << dendl;
     send_full_update();
@@ -6349,7 +6600,7 @@ void OSD::handle_get_purged_snaps_reply(MMonGetPurgedSnapsReply *m)
       m->last < superblock.purged_snaps_last) {
     goto out;
   }
-  SnapMapper::record_purged_snaps(cct, store, service.meta_ch,
+  SnapMapper::record_purged_snaps(cct, store.get(), service.meta_ch,
                                  make_purged_snaps_oid(), &t,
                                  m->purged_snaps);
   superblock.purged_snaps_last = m->last;
@@ -6653,7 +6904,7 @@ void OSD::got_full_map(epoch_t e)
     requested_full_first = requested_full_last = 0;
     return;
   }
-  
+
   requested_full_first = e + 1;
 
   dout(10) << __func__ << " " << e << ", requested " << requested_full_first
@@ -6732,7 +6983,8 @@ void OSD::send_beacon(const ceph::coarse_mono_clock::time_point& now)
       std::lock_guard l{min_last_epoch_clean_lock};
       beacon = new MOSDBeacon(get_osdmap_epoch(),
                              min_last_epoch_clean,
-                             superblock.last_purged_snaps_scrub);
+                             superblock.last_purged_snaps_scrub,
+                             cct->_conf->osd_beacon_report_interval);
       beacon->pgs = min_last_epoch_clean_pgs;
       last_sent_beacon = now;
     }
@@ -6780,7 +7032,7 @@ void OSD::scrub_purged_snaps()
 {
   dout(10) << __func__ << dendl;
   ceph_assert(ceph_mutex_is_locked(osd_lock));
-  SnapMapper::Scrubber s(cct, store, service.meta_ch,
+  SnapMapper::Scrubber s(cct, store.get(), service.meta_ch,
                         make_snapmapper_oid(),
                         make_purged_snaps_oid());
   clog->debug() << "purged_snaps scrub starts";
@@ -6999,12 +7251,12 @@ void OSD::dispatch_session_waiting(const ceph::ref_t<Session>& session, OSDMapRe
 
 void OSD::ms_fast_dispatch(Message *m)
 {
+  auto dispatch_span = tracing::osd::tracer.start_trace(__func__);
   FUNCTRACE(cct);
   if (service.is_stopping()) {
     m->put();
     return;
   }
-
   // peering event?
   switch (m->get_type()) {
   case CEPH_MSG_PING:
@@ -7017,18 +7269,14 @@ void OSD::ms_fast_dispatch(Message *m)
   case MSG_OSD_SCRUB2:
     handle_fast_scrub(static_cast<MOSDScrub2*>(m));
     return;
-
   case MSG_OSD_PG_CREATE2:
     return handle_fast_pg_create(static_cast<MOSDPGCreate2*>(m));
-  case MSG_OSD_PG_QUERY:
-    return handle_fast_pg_query(static_cast<MOSDPGQuery*>(m));
   case MSG_OSD_PG_NOTIFY:
     return handle_fast_pg_notify(static_cast<MOSDPGNotify*>(m));
   case MSG_OSD_PG_INFO:
     return handle_fast_pg_info(static_cast<MOSDPGInfo*>(m));
   case MSG_OSD_PG_REMOVE:
     return handle_fast_pg_remove(static_cast<MOSDPGRemove*>(m));
-
     // these are single-pg messages that handle themselves
   case MSG_OSD_PG_LOG:
   case MSG_OSD_PG_TRIM:
@@ -7059,6 +7307,7 @@ void OSD::ms_fast_dispatch(Message *m)
     tracepoint(osd, ms_fast_dispatch, reqid.name._type,
         reqid.name._num, reqid.tid, reqid.inc);
   }
+  op->osd_parent_span = tracing::osd::tracer.add_span("op-request-created", dispatch_span);
 
   if (m->trace)
     op->osd_trace.init("osd op", &trace_endpoint, &m->trace);
@@ -7091,7 +7340,7 @@ void OSD::ms_fast_dispatch(Message *m)
       service.release_map(nextmap);
     }
   }
-  OID_EVENT_TRACE_WITH_MSG(m, "MS_FAST_DISPATCH_END", false); 
+  OID_EVENT_TRACE_WITH_MSG(m, "MS_FAST_DISPATCH_END", false);
 }
 
 int OSD::ms_handle_authentication(Connection *con)
@@ -7120,7 +7369,7 @@ int OSD::ms_handle_authentication(Connection *con)
     try {
       decode(str, p);
     }
-    catch (buffer::error& e) {
+    catch (ceph::buffer::error& e) {
       dout(10) << __func__ << " session " << s << " " << s->entity_name
               << " failed to decode caps string" << dendl;
       ret = -EACCES;
@@ -7291,239 +7540,111 @@ bool OSD::scrub_random_backoff()
   return false;
 }
 
-OSDService::ScrubJob::ScrubJob(CephContext* cct,
-                              const spg_t& pg, const utime_t& timestamp,
-                              double pool_scrub_min_interval,
-                              double pool_scrub_max_interval, bool must)
-  : cct(cct),
-    pgid(pg),
-    sched_time(timestamp),
-    deadline(timestamp)
-{
-  // if not explicitly requested, postpone the scrub with a random delay
-  if (!must) {
-    double scrub_min_interval = pool_scrub_min_interval > 0 ?
-      pool_scrub_min_interval : cct->_conf->osd_scrub_min_interval;
-    double scrub_max_interval = pool_scrub_max_interval > 0 ?
-      pool_scrub_max_interval : cct->_conf->osd_scrub_max_interval;
-
-    sched_time += scrub_min_interval;
-    double r = rand() / (double)RAND_MAX;
-    sched_time +=
-      scrub_min_interval * cct->_conf->osd_scrub_interval_randomize_ratio * r;
-    if (scrub_max_interval == 0) {
-      deadline = utime_t();
-    } else {
-      deadline += scrub_max_interval;
-    }
-
-  }
-}
-
-bool OSDService::ScrubJob::ScrubJob::operator<(const OSDService::ScrubJob& rhs) const {
-  if (sched_time < rhs.sched_time)
-    return true;
-  if (sched_time > rhs.sched_time)
-    return false;
-  return pgid < rhs.pgid;
-}
 
-double OSD::scrub_sleep_time(bool must_scrub)
+void OSD::sched_scrub()
 {
-  if (must_scrub) {
-    return cct->_conf->osd_scrub_sleep;
+  auto& scrub_scheduler = service.get_scrub_services();
+
+  // fail fast if no resources are available
+  if (!scrub_scheduler.can_inc_scrubs()) {
+    dout(20) << __func__ << ": OSD cannot inc scrubs" << dendl;
+    return;
   }
-  utime_t now = ceph_clock_now();
-  if (scrub_time_permit(now)) {
-    return cct->_conf->osd_scrub_sleep;
+
+  // if there is a PG that is just now trying to reserve scrub replica resources -
+  // we should wait and not initiate a new scrub
+  if (scrub_scheduler.is_reserving_now()) {
+    dout(20) << __func__ << ": scrub resources reservation in progress" << dendl;
+    return;
   }
-  double normal_sleep = cct->_conf->osd_scrub_sleep;
-  double extended_sleep = cct->_conf->osd_scrub_extended_sleep;
-  return std::max(extended_sleep, normal_sleep);
-}
 
-bool OSD::scrub_time_permit(utime_t now)
-{
-  struct tm bdt;
-  time_t tt = now.sec();
-  localtime_r(&tt, &bdt);
+  Scrub::ScrubPreconds env_conditions;
 
-  bool day_permit = false;
-  if (cct->_conf->osd_scrub_begin_week_day < cct->_conf->osd_scrub_end_week_day) {
-    if (bdt.tm_wday >= cct->_conf->osd_scrub_begin_week_day && bdt.tm_wday < cct->_conf->osd_scrub_end_week_day) {
-      day_permit = true;
-    }
-  } else {
-    if (bdt.tm_wday >= cct->_conf->osd_scrub_begin_week_day || bdt.tm_wday < cct->_conf->osd_scrub_end_week_day) {
-      day_permit = true;
+  if (service.is_recovery_active() && !cct->_conf->osd_scrub_during_recovery) {
+    if (!cct->_conf->osd_repair_during_recovery) {
+      dout(15) << __func__ << ": not scheduling scrubs due to active recovery"
+              << dendl;
+      return;
     }
+    dout(10) << __func__
+      << " will only schedule explicitly requested repair due to active recovery"
+      << dendl;
+    env_conditions.allow_requested_repair_only = true;
   }
 
-  if (!day_permit) {
-    dout(20) << __func__ << " should run between week day " << cct->_conf->osd_scrub_begin_week_day
-            << " - " << cct->_conf->osd_scrub_end_week_day
-            << " now " << bdt.tm_wday << " = no" << dendl;
-    return false;
-  }
-
-  bool time_permit = false;
-  if (cct->_conf->osd_scrub_begin_hour < cct->_conf->osd_scrub_end_hour) {
-    if (bdt.tm_hour >= cct->_conf->osd_scrub_begin_hour && bdt.tm_hour < cct->_conf->osd_scrub_end_hour) {
-      time_permit = true;
-    }
-  } else {
-    if (bdt.tm_hour >= cct->_conf->osd_scrub_begin_hour || bdt.tm_hour < cct->_conf->osd_scrub_end_hour) {
-      time_permit = true;
+  if (g_conf()->subsys.should_gather<ceph_subsys_osd, 20>()) {
+    dout(20) << __func__ << " sched_scrub starts" << dendl;
+    auto all_jobs = scrub_scheduler.list_registered_jobs();
+    for (const auto& sj : all_jobs) {
+      dout(20) << "sched_scrub scrub-queue jobs: " << *sj << dendl;
     }
   }
-  if (!time_permit) {
-    dout(20) << __func__ << " should run between " << cct->_conf->osd_scrub_begin_hour
-            << " - " << cct->_conf->osd_scrub_end_hour
-            << " now " << bdt.tm_hour << " = no" << dendl;
-  } else {
-    dout(20) << __func__ << " should run between " << cct->_conf->osd_scrub_begin_hour
-            << " - " << cct->_conf->osd_scrub_end_hour
-            << " now " << bdt.tm_hour << " = yes" << dendl;
-  }
-  return time_permit;
+
+  auto was_started = scrub_scheduler.select_pg_and_scrub(env_conditions);
+  dout(20) << "sched_scrub done (" << ScrubQueue::attempt_res_text(was_started)
+          << ")" << dendl;
 }
 
-bool OSD::scrub_load_below_threshold()
+Scrub::schedule_result_t OSDService::initiate_a_scrub(spg_t pgid,
+                                                     bool allow_requested_repair_only)
 {
-  double loadavgs[3];
-  if (getloadavg(loadavgs, 3) != 3) {
-    dout(10) << __func__ << " couldn't read loadavgs\n" << dendl;
-    return false;
-  }
+  dout(20) << __func__ << " trying " << pgid << dendl;
 
-  // allow scrub if below configured threshold
-  long cpus = sysconf(_SC_NPROCESSORS_ONLN);
-  double loadavg_per_cpu = cpus > 0 ? loadavgs[0] / cpus : loadavgs[0];
-  if (loadavg_per_cpu < cct->_conf->osd_scrub_load_threshold) {
-    dout(20) << __func__ << " loadavg per cpu " << loadavg_per_cpu
-            << " < max " << cct->_conf->osd_scrub_load_threshold
-            << " = yes" << dendl;
-    return true;
-  }
+  // we have a candidate to scrub. We need some PG information to know if scrubbing is
+  // allowed
 
-  // allow scrub if below daily avg and currently decreasing
-  if (loadavgs[0] < daily_loadavg && loadavgs[0] < loadavgs[2]) {
-    dout(20) << __func__ << " loadavg " << loadavgs[0]
-            << " < daily_loadavg " << daily_loadavg
-            << " and < 15m avg " << loadavgs[2]
-            << " = yes" << dendl;
-    return true;
+  PGRef pg = osd->lookup_lock_pg(pgid);
+  if (!pg) {
+    // the PG was dequeued in the short timespan between creating the candidates list
+    // (collect_ripe_jobs()) and here
+    dout(5) << __func__ << " pg  " << pgid << " not found" << dendl;
+    return Scrub::schedule_result_t::no_such_pg;
   }
 
-  dout(20) << __func__ << " loadavg " << loadavgs[0]
-          << " >= max " << cct->_conf->osd_scrub_load_threshold
-          << " and ( >= daily_loadavg " << daily_loadavg
-          << " or >= 15m avg " << loadavgs[2]
-          << ") = no" << dendl;
-  return false;
-}
-
-void OSD::sched_scrub()
-{
-  // if not permitted, fail fast
-  if (!service.can_inc_scrubs()) {
-    return;
+  // This has already started, so go on to the next scrub job
+  if (pg->is_scrub_queued_or_active()) {
+    pg->unlock();
+    dout(20) << __func__ << ": already in progress pgid " << pgid << dendl;
+    return Scrub::schedule_result_t::already_started;
   }
-  bool allow_requested_repair_only = false;
-  if (service.is_recovery_active() && !cct->_conf->osd_scrub_during_recovery) {
-    if (!cct->_conf->osd_repair_during_recovery) {
-      dout(20) << __func__ << " not scheduling scrubs due to active recovery" << dendl;
-      return;
-    }
-    dout(10) << __func__
-             << " will only schedule explicitly requested repair due to active recovery"
-             << dendl;
-    allow_requested_repair_only = true;
+  // Skip other kinds of scrubbing if only explicitly requested repairing is allowed
+  if (allow_requested_repair_only && !pg->m_planned_scrub.must_repair) {
+    pg->unlock();
+    dout(10) << __func__ << " skip " << pgid
+            << " because repairing is not explicitly requested on it" << dendl;
+    return Scrub::schedule_result_t::preconditions;
   }
 
-  utime_t now = ceph_clock_now();
-  bool time_permit = scrub_time_permit(now);
-  bool load_is_low = scrub_load_below_threshold();
-  dout(20) << "sched_scrub load_is_low=" << (int)load_is_low << dendl;
-
-  OSDService::ScrubJob scrub;
-  if (service.first_scrub_stamp(&scrub)) {
-    do {
-      dout(30) << "sched_scrub examine " << scrub.pgid << " at " << scrub.sched_time << dendl;
-
-      if (scrub.sched_time > now) {
-       // save ourselves some effort
-       dout(10) << "sched_scrub " << scrub.pgid << " scheduled at " << scrub.sched_time
-                << " > " << now << dendl;
-       break;
-      }
-
-      if ((scrub.deadline.is_zero() || scrub.deadline >= now) && !(time_permit && load_is_low)) {
-        dout(10) << __func__ << " not scheduling scrub for " << scrub.pgid << " due to "
-                 << (!time_permit ? "time not permit" : "high load") << dendl;
-        continue;
-      }
-
-      PGRef pg = _lookup_lock_pg(scrub.pgid);
-      if (!pg)
-       continue;
-      // This has already started, so go on to the next scrub job
-      if (pg->scrubber.active) {
-       pg->unlock();
-       dout(30) << __func__ << ": already in progress pgid " << scrub.pgid << dendl;
-       continue;
-      }
-      // Skip other kinds of scrubing if only explicitly requested repairing is allowed
-      if (allow_requested_repair_only && !pg->scrubber.must_repair) {
-        pg->unlock();
-        dout(10) << __func__ << " skip " << scrub.pgid
-                 << " because repairing is not explicitly requested on it"
-                 << dendl;
-        continue;
-      }
-      // If it is reserving, let it resolve before going to the next scrub job
-      if (pg->scrubber.local_reserved && !pg->scrubber.active) {
-       pg->unlock();
-       dout(30) << __func__ << ": reserve in progress pgid " << scrub.pgid << dendl;
-       break;
-      }
-      dout(10) << "sched_scrub scrubbing " << scrub.pgid << " at " << scrub.sched_time
-              << (pg->get_must_scrub() ? ", explicitly requested" :
-                  (load_is_low ? ", load_is_low" : " deadline < now"))
-              << dendl;
-      if (pg->sched_scrub()) {
-       pg->unlock();
-       break;
-      }
-      pg->unlock();
-    } while (service.next_scrub_stamp(scrub, &scrub));
-  }
-  dout(20) << "sched_scrub done" << dendl;
+  auto scrub_attempt = pg->sched_scrub();
+  pg->unlock();
+  return scrub_attempt;
 }
 
 void OSD::resched_all_scrubs()
 {
   dout(10) << __func__ << ": start" << dendl;
-  OSDService::ScrubJob scrub;
-  if (service.first_scrub_stamp(&scrub)) {
-    do {
-      dout(20) << __func__ << ": examine " << scrub.pgid << dendl;
+  auto all_jobs = service.get_scrub_services().list_registered_jobs();
+  for (auto& e : all_jobs) {
 
-      PGRef pg = _lookup_lock_pg(scrub.pgid);
-      if (!pg)
-       continue;
-      if (!pg->scrubber.must_scrub && !pg->scrubber.need_auto) {
-        dout(20) << __func__ << ": reschedule " << scrub.pgid << dendl;
-        pg->on_info_history_change();
-      }
-      pg->unlock();
-    } while (service.next_scrub_stamp(scrub, &scrub));
+    auto& job = *e;
+    dout(20) << __func__ << ": examine " << job.pgid << dendl;
+
+    PGRef pg = _lookup_lock_pg(job.pgid);
+    if (!pg)
+      continue;
+
+    if (!pg->m_planned_scrub.must_scrub && !pg->m_planned_scrub.need_auto) {
+      dout(15) << __func__ << ": reschedule " << job.pgid << dendl;
+      pg->reschedule_scrub();
+    }
+    pg->unlock();
   }
   dout(10) << __func__ << ": done" << dendl;
 }
 
 MPGStats* OSD::collect_pg_stats()
 {
+  dout(15) << __func__ << dendl;
   // This implementation unconditionally sends every is_primary PG's
   // stats every time we're called.  This has equivalent cost to the
   // previous implementation's worst case where all PGs are busy and
@@ -7549,9 +7670,9 @@ MPGStats* OSD::collect_pg_stats()
     if (!pg->is_primary()) {
       continue;
     }
-    pg->get_pg_stats([&](const pg_stat_t& s, epoch_t lec) {
+    pg->with_pg_stats([&](const pg_stat_t& s, epoch_t lec) {
        m->pg_stat[pg->pg_id.pgid] = s;
-       min_last_epoch_clean = min(min_last_epoch_clean, lec);
+       min_last_epoch_clean = std::min(min_last_epoch_clean, lec);
        min_last_epoch_clean_pgs.push_back(pg->pg_id.pgid);
       });
   }
@@ -7587,6 +7708,15 @@ vector<DaemonHealthMetric> OSD::get_health_metrics()
     too_old -= cct->_conf.get_val<double>("osd_op_complaint_time");
     int slow = 0;
     TrackedOpRef oldest_op;
+    OSDMapRef osdmap = get_osdmap();
+    // map of slow op counts by slow op event type for an aggregated logging to
+    // the cluster log.
+    map<uint8_t, int> slow_op_types;
+    // map of slow op counts by pool for reporting a pool name with highest
+    // slow ops.
+    map<uint64_t, int> slow_op_pools;
+    bool log_aggregated_slow_op =
+           cct->_conf.get_val<bool>("osd_aggregated_slow_ops_logging");
     auto count_slow_ops = [&](TrackedOp& op) {
       if (op.get_initiated() < too_old) {
         stringstream ss;
@@ -7596,7 +7726,19 @@ vector<DaemonHealthMetric> OSD::get_health_metrics()
            << " currently "
            << op.state_string();
         lgeneric_subdout(cct,osd,20) << ss.str() << dendl;
-        clog->warn() << ss.str();
+        if (log_aggregated_slow_op) {
+          if (const OpRequest *req = dynamic_cast<const OpRequest *>(&op)) {
+            uint8_t op_type = req->state_flag();
+            auto m = req->get_req<MOSDFastDispatchOp>();
+            uint64_t poolid = m->get_spg().pgid.m_pool;
+            slow_op_types[op_type]++;
+            if (poolid > 0 && poolid <= (uint64_t) osdmap->get_pool_max()) {
+              slow_op_pools[poolid]++;
+            }
+          }
+        } else {
+          clog->warn() << ss.str();
+        }
        slow++;
        if (!oldest_op || op.get_initiated() < oldest_op->get_initiated()) {
          oldest_op = &op;
@@ -7610,6 +7752,32 @@ vector<DaemonHealthMetric> OSD::get_health_metrics()
       if (slow) {
        derr << __func__ << " reporting " << slow << " slow ops, oldest is "
             << oldest_op->get_desc() << dendl;
+        if (log_aggregated_slow_op &&
+             slow_op_types.size() > 0) {
+          stringstream ss;
+          ss << slow << " slow requests (by type [ ";
+          for (const auto& [op_type, count] : slow_op_types) {
+            ss << "'" << OpRequest::get_state_string(op_type)
+               << "' : " << count
+               << " ";
+          }
+          auto slow_pool_it = std::max_element(slow_op_pools.begin(), slow_op_pools.end(),
+                                 [](std::pair<uint64_t, int> p1, std::pair<uint64_t, int> p2) {
+                                   return p1.second < p2.second;
+                                 });
+          if (osdmap->get_pools().find(slow_pool_it->first) != osdmap->get_pools().end()) {
+            string pool_name = osdmap->get_pool_name(slow_pool_it->first);
+            ss << "] most affected pool [ '"
+               << pool_name
+               << "' : "
+               << slow_pool_it->second
+               << " ])";
+          } else {
+            ss << "])";
+          }
+          lgeneric_subdout(cct,osd,20) << ss.str() << dendl;
+          clog->warn() << ss.str();
+        }
       }
       metrics.emplace_back(daemon_metric::SLOW_OPS, slow, oldest_secs);
     } else {
@@ -8038,7 +8206,7 @@ void OSD::handle_osd_map(MOSDMap *m)
 
   // record new purged_snaps
   if (superblock.purged_snaps_last == start - 1) {
-    SnapMapper::record_purged_snaps(cct, store, service.meta_ch,
+    SnapMapper::record_purged_snaps(cct, store.get(), service.meta_ch,
                                    make_purged_snaps_oid(), &t,
                                    purged_snaps);
     superblock.purged_snaps_last = last;
@@ -8088,7 +8256,7 @@ void OSD::_committed_osd_maps(epoch_t first, epoch_t last, MOSDMap *m)
     OSDMapRef newmap = get_map(cur);
     ceph_assert(newmap);  // we just cached it above!
 
-    // start blacklisting messages sent to peers that go down.
+    // start blocklisting messages sent to peers that go down.
     service.pre_publish_map(newmap);
 
     // kill connections to newly down osds
@@ -8251,7 +8419,7 @@ void OSD::_committed_osd_maps(epoch_t first, epoch_t last, MOSDMap *m)
        set<int> avoid_ports;
 #if defined(__FreeBSD__)
         // prevent FreeBSD from grabbing the client_messenger port during
-        // rebinding. In which case a cluster_meesneger will connect also 
+        // rebinding. In which case a cluster_meesneger will connect also
        // to the same port
        client_messenger->get_myaddrs().get_ports(&avoid_ports);
 #endif
@@ -8273,6 +8441,9 @@ void OSD::_committed_osd_maps(epoch_t first, epoch_t last, MOSDMap *m)
        reset_heartbeat_peers(true);
       }
     }
+  } else if (osdmap->get_epoch() > 0 && osdmap->is_stop(whoami)) {
+    derr << "map says i am stopped by admin. shutting down." << dendl;
+    do_shutdown = true;
   }
 
   map_lock.unlock();
@@ -8407,7 +8578,7 @@ void OSD::_finish_splits(set<PGRef>& pgs)
        ++i) {
     PG *pg = i->get();
 
-    PeeringCtx rctx = create_context();
+    PeeringCtx rctx;
     pg->lock();
     dout(10) << __func__ << " " << *pg << dendl;
     epoch_t e = pg->get_osdmap_epoch();
@@ -8444,7 +8615,6 @@ bool OSD::advance_pg(
   }
   ceph_assert(pg->is_locked());
   OSDMapRef lastmap = pg->get_osdmap();
-  ceph_assert(lastmap->get_epoch() < osd_epoch);
   set<PGRef> new_pgs;  // any split children
   bool ret = true;
 
@@ -9063,11 +9233,6 @@ void OSD::handle_pg_create(OpRequestRef op)
 // ----------------------------------------
 // peering and recovery
 
-PeeringCtx OSD::create_context()
-{
-  return PeeringCtx(get_osdmap()->require_osd_release);
-}
-
 void OSD::dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap,
                            ThreadPool::TPHandle *handle)
 {
@@ -9176,31 +9341,6 @@ void OSD::handle_fast_pg_create(MOSDPGCreate2 *m)
   m->put();
 }
 
-void OSD::handle_fast_pg_query(MOSDPGQuery *m)
-{
-  dout(7) << __func__ << " " << *m << " from " << m->get_source() << dendl;
-  if (!require_osd_peer(m)) {
-    m->put();
-    return;
-  }
-  int from = m->get_source().num();
-  for (auto& p : m->pg_list) {
-    enqueue_peering_evt(
-      p.first,
-      PGPeeringEventRef(
-       std::make_shared<PGPeeringEvent>(
-         p.second.epoch_sent, p.second.epoch_sent,
-         MQuery(
-           p.first,
-           pg_shard_t(from, p.second.from),
-           p.second,
-           p.second.epoch_sent),
-         false))
-      );
-  }
-  m->put();
-}
-
 void OSD::handle_fast_pg_notify(MOSDPGNotify* m)
 {
   dout(7) << __func__ << " " << *m << " from " << m->get_source() << dendl;
@@ -9245,12 +9385,12 @@ void OSD::handle_fast_pg_info(MOSDPGInfo* m)
     enqueue_peering_evt(
       spg_t(p.info.pgid.pgid, p.to),
       PGPeeringEventRef(
-       std::make_shared<PGPeeringEvent>(
-         p.epoch_sent, p.query_epoch,
-         MInfoRec(
-           pg_shard_t(from, p.from),
-           p.info,
-           p.epoch_sent)))
+       std::make_shared<PGPeeringEvent>(
+         p.epoch_sent, p.query_epoch,
+         MInfoRec(
+           pg_shard_t(from, p.from),
+           p.info,
+           p.epoch_sent)))
       );
   }
   m->put();
@@ -9341,15 +9481,13 @@ void OSD::handle_pg_query_nopg(const MQuery& q)
        osdmap->get_epoch(), empty,
        q.query.epoch_sent);
     } else {
-      vector<pg_notify_t> ls;
-      ls.push_back(
-       pg_notify_t(
-         q.query.from, q.query.to,
-         q.query.epoch_sent,
-         osdmap->get_epoch(),
-         empty,
-         PastIntervals()));
-      m = new MOSDPGNotify(osdmap->get_epoch(), std::move(ls));
+      pg_notify_t notify{q.query.from, q.query.to,
+                        q.query.epoch_sent,
+                        osdmap->get_epoch(),
+                        empty,
+                        PastIntervals()};
+      m = new MOSDPGNotify2(spg_t{pgid.pgid, q.query.from},
+                           std::move(notify));
     }
     service.maybe_share_map(con.get(), osdmap);
     con->send_message(m);
@@ -9476,7 +9614,7 @@ void OSD::do_recovery(
       // This is true for the first recovery op and when the previous recovery op
       // has been scheduled in the past. The next recovery op is scheduled after
       // completing the sleep from now.
-      
+
       if (auto now = ceph::real_clock::now();
          service.recovery_schedule_time < now) {
         service.recovery_schedule_time = now;
@@ -9506,11 +9644,11 @@ void OSD::do_recovery(
 #endif
 
     bool do_unfound = pg->start_recovery_ops(reserved_pushes, handle, &started);
-    dout(10) << "do_recovery started " << started << "/" << reserved_pushes 
+    dout(10) << "do_recovery started " << started << "/" << reserved_pushes
             << " on " << *pg << dendl;
 
     if (do_unfound) {
-      PeeringCtx rctx = create_context();
+      PeeringCtx rctx;
       rctx.handle = &handle;
       pg->find_unfound(queued, rctx);
       dispatch_context(rctx, pg, pg->get_osdmap());
@@ -9599,8 +9737,10 @@ void OSD::enqueue_op(spg_t pg, OpRequestRef&& op, epoch_t epoch)
   const unsigned priority = op->get_req()->get_priority();
   const int cost = op->get_req()->get_cost();
   const uint64_t owner = op->get_req()->get_source().num();
+  const int type = op->get_req()->get_type();
 
   dout(15) << "enqueue_op " << op << " prio " << priority
+           << " type " << type
           << " cost " << cost
           << " latency " << latency
           << " epoch " << epoch
@@ -9608,12 +9748,30 @@ void OSD::enqueue_op(spg_t pg, OpRequestRef&& op, epoch_t epoch)
   op->osd_trace.event("enqueue op");
   op->osd_trace.keyval("priority", priority);
   op->osd_trace.keyval("cost", cost);
+
+  auto enqueue_span = tracing::osd::tracer.add_span(__func__, op->osd_parent_span);
+  enqueue_span->AddEvent(__func__, {
+    {"priority", priority},
+    {"cost", cost},
+    {"epoch", epoch},
+    {"owner", owner},
+    {"type", type}
+    });
+
   op->mark_queued_for_pg();
   logger->tinc(l_osd_op_before_queue_op_lat, latency);
-  op_shardedwq.queue(
-    OpSchedulerItem(
-      unique_ptr<OpSchedulerItem::OpQueueable>(new PGOpItem(pg, std::move(op))),
-      cost, priority, stamp, owner, epoch));
+  if (type == MSG_OSD_PG_PUSH ||
+      type == MSG_OSD_PG_PUSH_REPLY) {
+    op_shardedwq.queue(
+      OpSchedulerItem(
+        unique_ptr<OpSchedulerItem::OpQueueable>(new PGRecoveryMsg(pg, std::move(op))),
+        cost, priority, stamp, owner, epoch));
+  } else {
+    op_shardedwq.queue(
+      OpSchedulerItem(
+        unique_ptr<OpSchedulerItem::OpQueueable>(new PGOpItem(pg, std::move(op))),
+        cost, priority, stamp, owner, epoch));
+  }
 }
 
 void OSD::enqueue_peering_evt(spg_t pgid, PGPeeringEventRef evt)
@@ -9677,7 +9835,6 @@ void OSD::dequeue_peering_evt(
   PGPeeringEventRef evt,
   ThreadPool::TPHandle& handle)
 {
-  PeeringCtx rctx = create_context();
   auto curmap = sdata->get_osdmap();
   bool need_up_thru = false;
   epoch_t same_interval_since = 0;
@@ -9688,7 +9845,8 @@ void OSD::dequeue_peering_evt(
       derr << __func__ << " unrecognized pg-less event " << evt->get_desc() << dendl;
       ceph_abort();
     }
-  } else if (advance_pg(curmap->get_epoch(), pg, handle, rctx)) {
+  } else if (PeeringCtx rctx;
+            advance_pg(curmap->get_epoch(), pg, handle, rctx)) {
     pg->do_peering_event(evt, rctx);
     if (pg->is_deleted()) {
       pg->unlock();
@@ -9743,6 +9901,22 @@ const char** OSD::get_tracked_conf_keys() const
     "osd_map_cache_size",
     "osd_pg_epoch_max_lag_factor",
     "osd_pg_epoch_persisted_max_stale",
+    "osd_recovery_sleep",
+    "osd_recovery_sleep_hdd",
+    "osd_recovery_sleep_ssd",
+    "osd_recovery_sleep_hybrid",
+    "osd_delete_sleep",
+    "osd_delete_sleep_hdd",
+    "osd_delete_sleep_ssd",
+    "osd_delete_sleep_hybrid",
+    "osd_snap_trim_sleep",
+    "osd_snap_trim_sleep_hdd",
+    "osd_snap_trim_sleep_ssd",
+    "osd_snap_trim_sleep_hybrid",
+    "osd_scrub_sleep",
+    "osd_recovery_max_active",
+    "osd_recovery_max_active_hdd",
+    "osd_recovery_max_active_ssd",
     // clog & admin clog
     "clog_to_monitors",
     "clog_to_syslog",
@@ -9771,9 +9945,30 @@ void OSD::handle_conf_change(const ConfigProxy& conf,
                             const std::set <std::string> &changed)
 {
   std::lock_guard l{osd_lock};
-  if (changed.count("osd_max_backfills")) {
-    service.local_reserver.set_max(cct->_conf->osd_max_backfills);
-    service.remote_reserver.set_max(cct->_conf->osd_max_backfills);
+
+  if (changed.count("osd_max_backfills") ||
+      changed.count("osd_delete_sleep") ||
+      changed.count("osd_delete_sleep_hdd") ||
+      changed.count("osd_delete_sleep_ssd") ||
+      changed.count("osd_delete_sleep_hybrid") ||
+      changed.count("osd_snap_trim_sleep") ||
+      changed.count("osd_snap_trim_sleep_hdd") ||
+      changed.count("osd_snap_trim_sleep_ssd") ||
+      changed.count("osd_snap_trim_sleep_hybrid") ||
+      changed.count("osd_scrub_sleep") ||
+      changed.count("osd_recovery_sleep") ||
+      changed.count("osd_recovery_sleep_hdd") ||
+      changed.count("osd_recovery_sleep_ssd") ||
+      changed.count("osd_recovery_sleep_hybrid") ||
+      changed.count("osd_recovery_max_active") ||
+      changed.count("osd_recovery_max_active_hdd") ||
+      changed.count("osd_recovery_max_active_ssd")) {
+    if (!maybe_override_options_for_qos() &&
+        changed.count("osd_max_backfills")) {
+      // Scheduler is not "mclock". Fallback to earlier behavior
+      service.local_reserver.set_max(cct->_conf->osd_max_backfills);
+      service.remote_reserver.set_max(cct->_conf->osd_max_backfills);
+    }
   }
   if (changed.count("osd_min_recovery_priority")) {
     service.local_reserver.set_min_priority(cct->_conf->osd_min_recovery_priority);
@@ -9837,14 +10032,14 @@ void OSD::handle_conf_change(const ConfigProxy& conf,
   if (changed.count("osd_client_message_cap")) {
     uint64_t newval = cct->_conf->osd_client_message_cap;
     Messenger::Policy pol = client_messenger->get_policy(entity_name_t::TYPE_CLIENT);
-    if (pol.throttler_messages && newval > 0) {
+    if (pol.throttler_messages) {
       pol.throttler_messages->reset_max(newval);
     }
   }
   if (changed.count("osd_client_message_size_cap")) {
     uint64_t newval = cct->_conf->osd_client_message_size_cap;
     Messenger::Policy pol = client_messenger->get_policy(entity_name_t::TYPE_CLIENT);
-    if (pol.throttler_bytes && newval > 0) {
+    if (pol.throttler_bytes) {
       pol.throttler_bytes->reset_max(newval);
     }
   }
@@ -9858,29 +10053,186 @@ void OSD::handle_conf_change(const ConfigProxy& conf,
     dout(0) << __func__ << ": scrub interval change" << dendl;
   }
   check_config();
+  if (changed.count("osd_asio_thread_count")) {
+    service.poolctx.stop();
+    service.poolctx.start(conf.get_val<std::uint64_t>("osd_asio_thread_count"));
+  }
+}
+
+void OSD::maybe_override_max_osd_capacity_for_qos()
+{
+  // If the scheduler enabled is mclock, override the default
+  // osd capacity with the value obtained from running the
+  // osd bench test. This is later used to setup mclock.
+  if ((cct->_conf.get_val<std::string>("osd_op_queue") == "mclock_scheduler") &&
+      (cct->_conf.get_val<bool>("osd_mclock_skip_benchmark") == false) &&
+      (!unsupported_objstore_for_qos())) {
+    std::string max_capacity_iops_config;
+    bool force_run_benchmark =
+      cct->_conf.get_val<bool>("osd_mclock_force_run_benchmark_on_init");
+
+    if (store_is_rotational) {
+      max_capacity_iops_config = "osd_mclock_max_capacity_iops_hdd";
+    } else {
+      max_capacity_iops_config = "osd_mclock_max_capacity_iops_ssd";
+    }
+
+    if (!force_run_benchmark) {
+      double default_iops = 0.0;
+
+      // Get the current osd iops capacity
+      double cur_iops = cct->_conf.get_val<double>(max_capacity_iops_config);
+
+      // Get the default max iops capacity
+      auto val = cct->_conf.get_val_default(max_capacity_iops_config);
+      if (!val.has_value()) {
+        derr << __func__ << " Unable to determine default value of "
+            << max_capacity_iops_config << dendl;
+        // Cannot determine default iops. Force a run of the OSD benchmark.
+        force_run_benchmark = true;
+      } else {
+        // Default iops
+        default_iops = std::stod(val.value());
+      }
+
+      // Determine if we really need to run the osd benchmark
+      if (!force_run_benchmark && (default_iops != cur_iops)) {
+        dout(1) << __func__ << std::fixed << std::setprecision(2)
+                << " default_iops: " << default_iops
+                << " cur_iops: " << cur_iops
+                << ". Skip OSD benchmark test." << dendl;
+        return;
+      }
+    }
+
+    // Run osd bench: write 100 4MiB objects with blocksize 4KiB
+    int64_t count = 12288000; // Count of bytes to write
+    int64_t bsize = 4096;     // Block size
+    int64_t osize = 4194304;  // Object size
+    int64_t onum = 100;       // Count of objects to write
+    double elapsed = 0.0;     // Time taken to complete the test
+    double iops = 0.0;
+    stringstream ss;
+    int ret = run_osd_bench_test(count, bsize, osize, onum, &elapsed, ss);
+    if (ret != 0) {
+      derr << __func__
+           << " osd bench err: " << ret
+           << " osd bench errstr: " << ss.str()
+           << dendl;
+      return;
+    }
+
+    double rate = count / elapsed;
+    iops = rate / bsize;
+    dout(1) << __func__
+            << " osd bench result -"
+            << std::fixed << std::setprecision(3)
+            << " bandwidth (MiB/sec): " << rate / (1024 * 1024)
+            << " iops: " << iops
+            << " elapsed_sec: " << elapsed
+            << dendl;
+
+    // Persist iops to the MON store
+    ret = mon_cmd_set_config(max_capacity_iops_config, std::to_string(iops));
+    if (ret < 0) {
+      // Fallback to setting the config within the in-memory "values" map.
+      cct->_conf.set_val(max_capacity_iops_config, std::to_string(iops));
+    }
+
+    // Override the max osd capacity for all shards
+    for (auto& shard : shards) {
+      shard->update_scheduler_config();
+    }
+  }
+}
+
+bool OSD::maybe_override_options_for_qos()
+{
+  // If the scheduler enabled is mclock, override the recovery, backfill
+  // and sleep options so that mclock can meet the QoS goals.
+  if (cct->_conf.get_val<std::string>("osd_op_queue") == "mclock_scheduler" &&
+      !unsupported_objstore_for_qos()) {
+    dout(1) << __func__
+            << ": Changing recovery/backfill/sleep settings for QoS" << dendl;
+
+    // Set high value for recovery max active
+    uint32_t rec_max_active = 1000;
+    cct->_conf.set_val(
+      "osd_recovery_max_active", std::to_string(rec_max_active));
+    cct->_conf.set_val(
+      "osd_recovery_max_active_hdd", std::to_string(rec_max_active));
+    cct->_conf.set_val(
+      "osd_recovery_max_active_ssd", std::to_string(rec_max_active));
+
+    // Set high value for osd_max_backfill
+    uint32_t max_backfills = 1000;
+    cct->_conf.set_val("osd_max_backfills", std::to_string(max_backfills));
+    service.local_reserver.set_max(max_backfills);
+    service.remote_reserver.set_max(max_backfills);
+
+    // Disable recovery sleep
+    cct->_conf.set_val("osd_recovery_sleep", std::to_string(0));
+    cct->_conf.set_val("osd_recovery_sleep_hdd", std::to_string(0));
+    cct->_conf.set_val("osd_recovery_sleep_ssd", std::to_string(0));
+    cct->_conf.set_val("osd_recovery_sleep_hybrid", std::to_string(0));
+
+    // Disable delete sleep
+    cct->_conf.set_val("osd_delete_sleep", std::to_string(0));
+    cct->_conf.set_val("osd_delete_sleep_hdd", std::to_string(0));
+    cct->_conf.set_val("osd_delete_sleep_ssd", std::to_string(0));
+    cct->_conf.set_val("osd_delete_sleep_hybrid", std::to_string(0));
+
+    // Disable snap trim sleep
+    cct->_conf.set_val("osd_snap_trim_sleep", std::to_string(0));
+    cct->_conf.set_val("osd_snap_trim_sleep_hdd", std::to_string(0));
+    cct->_conf.set_val("osd_snap_trim_sleep_ssd", std::to_string(0));
+    cct->_conf.set_val("osd_snap_trim_sleep_hybrid", std::to_string(0));
+
+    // Disable scrub sleep
+    cct->_conf.set_val("osd_scrub_sleep", std::to_string(0));
+    return true;
+  }
+  return false;
+}
+
+int OSD::mon_cmd_set_config(const std::string &key, const std::string &val)
+{
+  std::string cmd =
+    "{"
+      "\"prefix\": \"config set\", "
+      "\"who\": \"osd." + std::to_string(whoami) + "\", "
+      "\"name\": \"" + key + "\", "
+      "\"value\": \"" + val + "\""
+    "}";
+
+  vector<std::string> vcmd{cmd};
+  bufferlist inbl;
+  std::string outs;
+  C_SaferCond cond;
+  monc->start_mon_command(vcmd, inbl, nullptr, &outs, &cond);
+  int r = cond.wait();
+  if (r < 0) {
+    derr << __func__ << " Failed to set config key " << key
+         << " err: " << cpp_strerror(r)
+         << " errstr: " << outs << dendl;
+    return r;
+  }
+
+  return 0;
+}
+
+bool OSD::unsupported_objstore_for_qos()
+{
+  static const std::vector<std::string> unsupported_objstores = { "filestore" };
+  return std::find(unsupported_objstores.begin(),
+                   unsupported_objstores.end(),
+                   store->get_type()) != unsupported_objstores.end();
 }
 
 void OSD::update_log_config()
 {
-  map<string,string> log_to_monitors;
-  map<string,string> log_to_syslog;
-  map<string,string> log_channel;
-  map<string,string> log_prio;
-  map<string,string> log_to_graylog;
-  map<string,string> log_to_graylog_host;
-  map<string,string> log_to_graylog_port;
-  uuid_d fsid;
-  string host;
-
-  if (parse_log_client_options(cct, log_to_monitors, log_to_syslog,
-                              log_channel, log_prio, log_to_graylog,
-                              log_to_graylog_host, log_to_graylog_port,
-                              fsid, host) == 0)
-    clog->update_config(log_to_monitors, log_to_syslog,
-                       log_channel, log_prio, log_to_graylog,
-                       log_to_graylog_host, log_to_graylog_port,
-                       fsid, host);
-  derr << "log_to_monitors " << log_to_monitors << dendl;
+  auto parsed_options = clog->parse_client_options(cct);
+  derr << "log_to_monitors " << parsed_options.log_to_monitors << dendl;
 }
 
 void OSD::check_config()
@@ -9892,7 +10244,7 @@ void OSD::check_config()
                 << cct->_conf->osd_pg_epoch_persisted_max_stale << ")";
   }
   if (cct->_conf->osd_object_clean_region_max_num_intervals < 0) {
-    clog->warn() << "osd_object_clean_region_max_num_intervals (" 
+    clog->warn() << "osd_object_clean_region_max_num_intervals ("
                  << cct->_conf->osd_object_clean_region_max_num_intervals
                 << ") is < 0";
   }
@@ -9904,9 +10256,8 @@ void OSD::get_latest_osdmap()
 {
   dout(10) << __func__ << " -- start" << dendl;
 
-  C_SaferCond cond;
-  service.objecter->wait_for_latest_osdmap(&cond);
-  cond.wait();
+  boost::system::error_code ec;
+  service.objecter->wait_for_latest_osdmap(ceph::async::use_blocked[ec]);
 
   dout(10) << __func__ << " -- finish" << dendl;
 }
@@ -10070,7 +10421,7 @@ void OSDShard::consume_map(
   dout(10) << new_osdmap->get_epoch()
            << " (was " << (old_osdmap ? old_osdmap->get_epoch() : 0) << ")"
           << dendl;
-  bool queued = false;
+  int queued = 0;
 
   // check slots
   auto p = pg_slots.begin();
@@ -10097,8 +10448,7 @@ void OSDShard::consume_map(
        dout(20) << __func__ << "  " << pgid
                 << " pending_peering first epoch " << first
                 << " <= " << new_osdmap->get_epoch() << ", requeueing" << dendl;
-       _wake_pg_slot(pgid, slot);
-       queued = true;
+       queued += _wake_pg_slot(pgid, slot);
       }
       ++p;
       continue;
@@ -10138,14 +10488,18 @@ void OSDShard::consume_map(
   }
   if (queued) {
     std::lock_guard l{sdata_wait_lock};
-    sdata_cond.notify_one();
+    if (queued == 1)
+      sdata_cond.notify_one();
+    else
+      sdata_cond.notify_all();
   }
 }
 
-void OSDShard::_wake_pg_slot(
+int OSDShard::_wake_pg_slot(
   spg_t pgid,
   OSDShardPGSlot *slot)
 {
+  int count = 0;
   dout(20) << __func__ << " " << pgid
           << " to_process " << slot->to_process
           << " waiting " << slot->waiting
@@ -10154,12 +10508,14 @@ void OSDShard::_wake_pg_slot(
        i != slot->to_process.rend();
        ++i) {
     scheduler->enqueue_front(std::move(*i));
+    count++;
   }
   slot->to_process.clear();
   for (auto i = slot->waiting.rbegin();
        i != slot->waiting.rend();
        ++i) {
     scheduler->enqueue_front(std::move(*i));
+    count++;
   }
   slot->waiting.clear();
   for (auto i = slot->waiting_peering.rbegin();
@@ -10170,10 +10526,12 @@ void OSDShard::_wake_pg_slot(
     // someday, if we decide this inefficiency matters
     for (auto j = i->second.rbegin(); j != i->second.rend(); ++j) {
       scheduler->enqueue_front(std::move(*j));
+      count++;
     }
   }
   slot->waiting_peering.clear();
   ++slot->requeue_seq;
+  return count;
 }
 
 void OSDShard::identify_splits_and_merges(
@@ -10306,15 +10664,16 @@ void OSDShard::prime_merges(const OSDMapRef& as_of_osdmap,
 
 void OSDShard::register_and_wake_split_child(PG *pg)
 {
+  dout(15) <<  __func__ << ": " << pg << " #:" << pg_slots.size() << dendl;
   epoch_t epoch;
   {
     std::lock_guard l(shard_lock);
-    dout(10) << pg->pg_id << " " << pg << dendl;
+    dout(10) << __func__ << ": " << pg->pg_id << " " << pg << dendl;
     auto p = pg_slots.find(pg->pg_id);
     ceph_assert(p != pg_slots.end());
     auto *slot = p->second.get();
-    dout(20) << pg->pg_id << " waiting_for_split " << slot->waiting_for_split
-            << dendl;
+    dout(20) << __func__ << ": " << pg->pg_id << " waiting_for_split "
+            << slot->waiting_for_split << dendl;
     ceph_assert(!slot->pg);
     ceph_assert(!slot->waiting_for_split.empty());
     _attach_pg(slot, pg);
@@ -10361,6 +10720,19 @@ void OSDShard::unprime_split_children(spg_t parent, unsigned old_pg_num)
   }
 }
 
+void OSDShard::update_scheduler_config()
+{
+  std::lock_guard l(shard_lock);
+  scheduler->update_configuration();
+}
+
+std::string OSDShard::get_scheduler_type()
+{
+  std::ostringstream scheduler_type;
+  scheduler_type << *scheduler;
+  return scheduler_type.str();
+}
+
 OSDShard::OSDShard(
   int id,
   CephContext *cct,
@@ -10374,7 +10746,9 @@ OSDShard::OSDShard(
     osdmap_lock{make_mutex(shard_name + "::osdmap_lock")},
     shard_lock_name(shard_name + "::shard_lock"),
     shard_lock{make_mutex(shard_lock_name)},
-    scheduler(ceph::osd::scheduler::make_scheduler(cct)),
+    scheduler(ceph::osd::scheduler::make_scheduler(
+      cct, osd->num_shards, osd->store->is_rotational(),
+      osd->store->get_type())),
     context_queue(sdata_wait_lock, sdata_cond)
 {
   dout(0) << "using op scheduler " << *scheduler << dendl;
@@ -10459,21 +10833,59 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
     sdata->context_queue.move_to(oncommits);
   }
 
-  if (sdata->scheduler->empty()) {
+  WorkItem work_item;
+  while (!std::get_if<OpSchedulerItem>(&work_item)) {
+    if (sdata->scheduler->empty()) {
+      if (osd->is_stopping()) {
+        sdata->shard_lock.unlock();
+        for (auto c : oncommits) {
+          dout(10) << __func__ << " discarding in-flight oncommit " << c << dendl;
+          delete c;
+        }
+        return;    // OSD shutdown, discard.
+      }
+      sdata->shard_lock.unlock();
+      handle_oncommits(oncommits);
+      return;
+    }
+
+    work_item = sdata->scheduler->dequeue();
     if (osd->is_stopping()) {
       sdata->shard_lock.unlock();
       for (auto c : oncommits) {
-       dout(10) << __func__ << " discarding in-flight oncommit " << c << dendl;
-       delete c;
+        dout(10) << __func__ << " discarding in-flight oncommit " << c << dendl;
+        delete c;
       }
       return;    // OSD shutdown, discard.
     }
-    sdata->shard_lock.unlock();
-    handle_oncommits(oncommits);
-    return;
-  }
 
-  OpSchedulerItem item = sdata->scheduler->dequeue();
+    // If the work item is scheduled in the future, wait until
+    // the time returned in the dequeue response before retrying.
+    if (auto when_ready = std::get_if<double>(&work_item)) {
+      if (is_smallest_thread_index) {
+        sdata->shard_lock.unlock();
+        handle_oncommits(oncommits);
+        return;
+      }
+      std::unique_lock wait_lock{sdata->sdata_wait_lock};
+      auto future_time = ceph::real_clock::from_double(*when_ready);
+      dout(10) << __func__ << " dequeue future request at " << future_time << dendl;
+      // Disable heartbeat timeout until we find a non-future work item to process.
+      osd->cct->get_heartbeat_map()->clear_timeout(hb);
+      sdata->shard_lock.unlock();
+      ++sdata->waiting_threads;
+      sdata->sdata_cond.wait_until(wait_lock, future_time);
+      --sdata->waiting_threads;
+      wait_lock.unlock();
+      sdata->shard_lock.lock();
+      // Reapply default wq timeouts
+      osd->cct->get_heartbeat_map()->reset_timeout(hb,
+        timeout_interval, suicide_interval);
+    }
+  } // while
+
+  // Access the stored item
+  auto item = std::move(std::get<OpSchedulerItem>(work_item));
   if (osd->is_stopping()) {
     sdata->shard_lock.unlock();
     for (auto c : oncommits) {
@@ -10707,13 +11119,21 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
 }
 
 void OSD::ShardedOpWQ::_enqueue(OpSchedulerItem&& item) {
+  if (unlikely(m_fast_shutdown) ) {
+    // stop enqueing when we are in the middle of a fast shutdown
+    return;
+  }
+
   uint32_t shard_index =
     item.get_ordering_token().hash_to_shard(osd->shards.size());
 
-  dout(20) << __func__ << " " << item << dendl;
-
   OSDShard* sdata = osd->shards[shard_index];
   assert (NULL != sdata);
+  if (sdata->get_scheduler_type() == "mClockScheduler") {
+    item.maybe_set_is_qos_item();
+  }
+
+  dout(20) << __func__ << " " << item << dendl;
 
   bool empty = true;
   {
@@ -10722,14 +11142,23 @@ void OSD::ShardedOpWQ::_enqueue(OpSchedulerItem&& item) {
     sdata->scheduler->enqueue(std::move(item));
   }
 
-  if (empty) {
+  {
     std::lock_guard l{sdata->sdata_wait_lock};
-    sdata->sdata_cond.notify_all();
+    if (empty) {
+      sdata->sdata_cond.notify_all();
+    } else if (sdata->waiting_threads) {
+      sdata->sdata_cond.notify_one();
+    }
   }
 }
 
 void OSD::ShardedOpWQ::_enqueue_front(OpSchedulerItem&& item)
 {
+  if (unlikely(m_fast_shutdown) ) {
+    // stop enqueing when we are in the middle of a fast shutdown
+    return;
+  }
+
   auto shard_index = item.get_ordering_token().hash_to_shard(osd->shards.size());
   auto& sdata = osd->shards[shard_index];
   ceph_assert(sdata);
@@ -10756,8 +11185,25 @@ void OSD::ShardedOpWQ::_enqueue_front(OpSchedulerItem&& item)
   sdata->sdata_cond.notify_one();
 }
 
-namespace ceph { 
-namespace osd_cmds { 
+void OSD::ShardedOpWQ::stop_for_fast_shutdown()
+{
+  uint32_t shard_index = 0;
+  m_fast_shutdown = true;
+
+  for (; shard_index < osd->num_shards; shard_index++) {
+    auto& sdata = osd->shards[shard_index];
+    ceph_assert(sdata);
+    sdata->shard_lock.lock();
+    int work_count = 0;
+    while(! sdata->scheduler->empty() ) {
+      auto work_item = sdata->scheduler->dequeue();
+      work_count++;
+    }
+    sdata->shard_lock.unlock();
+  }
+}
+
+namespace ceph::osd_cmds {
 
 int heap(CephContext& cct, const cmdmap_t& cmdmap, Formatter& f,
         std::ostream& os)
@@ -10766,13 +11212,13 @@ int heap(CephContext& cct, const cmdmap_t& cmdmap, Formatter& f,
         os << "could not issue heap profiler command -- not using tcmalloc!";
         return -EOPNOTSUPP;
   }
-  
+
   string cmd;
   if (!cmd_getval(cmdmap, "heapcmd", cmd)) {
         os << "unable to get value for command \"" << cmd << "\"";
        return -EINVAL;
   }
-  
+
   std::vector<std::string> cmd_vec;
   get_str_vec(cmd, cmd_vec);
 
@@ -10780,10 +11226,10 @@ int heap(CephContext& cct, const cmdmap_t& cmdmap, Formatter& f,
   if (cmd_getval(cmdmap, "value", val)) {
     cmd_vec.push_back(val);
   }
-  
+
   ceph_heap_profiler_handle_command(cmd_vec, os);
-  
+
   return 0;
 }
-}} // namespace ceph::osd_cmds
+
+} // namespace ceph::osd_cmds