]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osd/OSD.cc
import quincy 17.2.0
[ceph.git] / ceph / src / osd / OSD.cc
index f580fb6f5645280c7357ade9585e2e64677f1340..c35ac0b526746fd85d400f6d74a43e9eae9d7afe 100644 (file)
@@ -24,7 +24,6 @@
 #include <sys/stat.h>
 #include <signal.h>
 #include <time.h>
-#include <boost/scoped_ptr.hpp>
 #include <boost/range/adaptor/reversed.hpp>
 
 #ifdef HAVE_SYS_PARAM_H
 #endif
 
 #include "osd/PG.h"
-#include "osd/scrub_machine.h"
-#include "osd/pg_scrubber.h"
+#include "osd/scrubber/scrub_machine.h"
+#include "osd/scrubber/pg_scrubber.h"
 
 #include "include/types.h"
 #include "include/compat.h"
 #include "include/random.h"
+#include "include/scope_guard.h"
 
 #include "OSD.h"
 #include "OSDMap.h"
@@ -92,7 +92,6 @@
 #include "messages/MMonGetOSDMap.h"
 #include "messages/MOSDPGNotify.h"
 #include "messages/MOSDPGNotify2.h"
-#include "messages/MOSDPGQuery.h"
 #include "messages/MOSDPGQuery2.h"
 #include "messages/MOSDPGLog.h"
 #include "messages/MOSDPGRemove.h"
 
 #include "messages/MOSDScrub.h"
 #include "messages/MOSDScrub2.h"
-#include "messages/MOSDRepScrub.h"
 
 #include "messages/MCommand.h"
 #include "messages/MCommandReply.h"
 
 #include "messages/MPGStats.h"
 
-#include "messages/MWatchNotify.h"
-#include "messages/MOSDPGPush.h"
-#include "messages/MOSDPGPushReply.h"
-#include "messages/MOSDPGPull.h"
-
 #include "messages/MMonGetPurgedSnaps.h"
 #include "messages/MMonGetPurgedSnapsReply.h"
 
 #else
 #define tracepoint(...)
 #endif
-#ifdef HAVE_JAEGER
-#include "common/tracer.h"
-#endif
+
+#include "osd_tracer.h"
+
 
 #define dout_context cct
 #define dout_subsys ceph_subsys_osd
@@ -212,11 +205,13 @@ using ceph::make_mutex;
 
 using namespace ceph::osd::scheduler;
 using TOPNSPC::common::cmd_getval;
+using TOPNSPC::common::cmd_getval_or;
 
 static ostream& _prefix(std::ostream* _dout, int whoami, epoch_t epoch) {
   return *_dout << "osd." << whoami << " " << epoch << " ";
 }
 
+
 //Initial features in new superblock.
 //Features here are also automatically upgraded
 CompatSet OSD::get_osd_initial_compat_set() {
@@ -254,7 +249,7 @@ CompatSet OSD::get_osd_compat_set() {
 OSDService::OSDService(OSD *osd, ceph::async::io_context_pool& poolctx) :
   osd(osd),
   cct(osd->cct),
-  whoami(osd->whoami), store(osd->store),
+  whoami(osd->whoami), store(osd->store.get()),
   log_client(osd->log_client), clog(osd->clog),
   pg_recovery_stats(osd->pg_recovery_stats),
   cluster_messenger(osd->cluster_messenger),
@@ -267,8 +262,7 @@ OSDService::OSDService(OSD *osd, ceph::async::io_context_pool& poolctx) :
   publish_lock{ceph::make_mutex("OSDService::publish_lock")},
   pre_publish_lock{ceph::make_mutex("OSDService::pre_publish_lock")},
   max_oldest_map(0),
-  scrubs_local(0),
-  scrubs_remote(0),
+  m_scrub_queue{cct, *this},
   agent_valid_iterator(false),
   agent_ops(0),
   flush_mode_high_count(0),
@@ -1267,79 +1261,6 @@ void OSDService::prune_pg_created()
 // --------------------------------------
 // dispatch
 
-bool OSDService::can_inc_scrubs()
-{
-  bool can_inc = false;
-  std::lock_guard l(sched_scrub_lock);
-
-  if (scrubs_local + scrubs_remote < cct->_conf->osd_max_scrubs) {
-    dout(20) << __func__ << " == true " << scrubs_local << " local + " << scrubs_remote
-            << " remote < max " << cct->_conf->osd_max_scrubs << dendl;
-    can_inc = true;
-  } else {
-    dout(20) << __func__ << " == false " << scrubs_local << " local + " << scrubs_remote
-            << " remote >= max " << cct->_conf->osd_max_scrubs << dendl;
-  }
-
-  return can_inc;
-}
-
-bool OSDService::inc_scrubs_local()
-{
-  bool result = false;
-  std::lock_guard l{sched_scrub_lock};
-  if (scrubs_local + scrubs_remote < cct->_conf->osd_max_scrubs) {
-    dout(20) << __func__ << " " << scrubs_local << " -> " << (scrubs_local+1)
-            << " (max " << cct->_conf->osd_max_scrubs << ", remote " << scrubs_remote << ")" << dendl;
-    result = true;
-    ++scrubs_local;
-  } else {
-    dout(20) << __func__ << " " << scrubs_local << " local + " << scrubs_remote << " remote >= max " << cct->_conf->osd_max_scrubs << dendl;
-  }
-  return result;
-}
-
-void OSDService::dec_scrubs_local()
-{
-  std::lock_guard l{sched_scrub_lock};
-  dout(20) << __func__ << " " << scrubs_local << " -> " << (scrubs_local-1)
-          << " (max " << cct->_conf->osd_max_scrubs << ", remote " << scrubs_remote << ")" << dendl;
-  --scrubs_local;
-  ceph_assert(scrubs_local >= 0);
-}
-
-bool OSDService::inc_scrubs_remote()
-{
-  bool result = false;
-  std::lock_guard l{sched_scrub_lock};
-  if (scrubs_local + scrubs_remote < cct->_conf->osd_max_scrubs) {
-    dout(20) << __func__ << " " << scrubs_remote << " -> " << (scrubs_remote+1)
-            << " (max " << cct->_conf->osd_max_scrubs << ", local " << scrubs_local << ")" << dendl;
-    result = true;
-    ++scrubs_remote;
-  } else {
-    dout(20) << __func__ << " " << scrubs_local << " local + " << scrubs_remote << " remote >= max " << cct->_conf->osd_max_scrubs << dendl;
-  }
-  return result;
-}
-
-void OSDService::dec_scrubs_remote()
-{
-  std::lock_guard l{sched_scrub_lock};
-  dout(20) << __func__ << " " << scrubs_remote << " -> " << (scrubs_remote-1)
-          << " (max " << cct->_conf->osd_max_scrubs << ", local " << scrubs_local << ")" << dendl;
-  --scrubs_remote;
-  ceph_assert(scrubs_remote >= 0);
-}
-
-void OSDService::dump_scrub_reservations(Formatter *f)
-{
-  std::lock_guard l{sched_scrub_lock};
-  f->dump_int("scrubs_local", scrubs_local);
-  f->dump_int("scrubs_remote", scrubs_remote);
-  f->dump_int("osd_max_scrubs", cct->_conf->osd_max_scrubs);
-}
-
 void OSDService::retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch,
                                  epoch_t *_bind_epoch) const
 {
@@ -1378,7 +1299,7 @@ bool OSDService::prepare_to_stop()
 
   OSDMapRef osdmap = get_osdmap();
   if (osdmap && osdmap->is_up(whoami)) {
-    dout(0) << __func__ << " telling mon we are shutting down" << dendl;
+    dout(0) << __func__ << " telling mon we are shutting down and dead " << dendl;
     set_state(PREPARING_TO_STOP);
     monc->send_mon_message(
       new MOSDMarkMeDown(
@@ -1386,12 +1307,14 @@ bool OSDService::prepare_to_stop()
        whoami,
        osdmap->get_addrs(whoami),
        osdmap->get_epoch(),
-       true  // request ack
+       true,  // request ack
+       true   // mark as down and dead
        ));
     const auto timeout = ceph::make_timespan(cct->_conf->osd_mon_shutdown_timeout);
     is_stopping_cond.wait_for(l, timeout,
       [this] { return get_state() == STOPPING; });
   }
+
   dout(0) << __func__ << " starting shutdown" << dendl;
   set_state(STOPPING);
   return true;
@@ -1747,11 +1670,13 @@ void OSDService::queue_for_snap_trim(PG *pg)
 template <class MSG_TYPE>
 void OSDService::queue_scrub_event_msg(PG* pg,
                                       Scrub::scrub_prio_t with_priority,
-                                      unsigned int qu_priority)
+                                      unsigned int qu_priority,
+                                      Scrub::act_token_t act_token)
 {
   const auto epoch = pg->get_osdmap_epoch();
-  auto msg = new MSG_TYPE(pg->get_pgid(), epoch);
-  dout(15) << "queue a scrub event (" << *msg << ") for " << *pg << ". Epoch: " << epoch << dendl;
+  auto msg = new MSG_TYPE(pg->get_pgid(), epoch, act_token);
+  dout(15) << "queue a scrub event (" << *msg << ") for " << *pg
+           << ". Epoch: " << epoch << " token: " << act_token << dendl;
 
   enqueue_back(OpSchedulerItem(
     unique_ptr<OpSchedulerItem::OpQueueable>(msg), cct->_conf->osd_scrub_cost,
@@ -1759,7 +1684,8 @@ void OSDService::queue_scrub_event_msg(PG* pg,
 }
 
 template <class MSG_TYPE>
-void OSDService::queue_scrub_event_msg(PG* pg, Scrub::scrub_prio_t with_priority)
+void OSDService::queue_scrub_event_msg(PG* pg,
+                                       Scrub::scrub_prio_t with_priority)
 {
   const auto epoch = pg->get_osdmap_epoch();
   auto msg = new MSG_TYPE(pg->get_pgid(), epoch);
@@ -1782,17 +1708,20 @@ void OSDService::queue_scrub_after_repair(PG* pg, Scrub::scrub_prio_t with_prior
 
 void OSDService::queue_for_rep_scrub(PG* pg,
                                     Scrub::scrub_prio_t with_priority,
-                                    unsigned int qu_priority)
+                                    unsigned int qu_priority,
+                                    Scrub::act_token_t act_token)
 {
-  queue_scrub_event_msg<PGRepScrub>(pg, with_priority, qu_priority);
+  queue_scrub_event_msg<PGRepScrub>(pg, with_priority, qu_priority, act_token);
 }
 
 void OSDService::queue_for_rep_scrub_resched(PG* pg,
                                             Scrub::scrub_prio_t with_priority,
-                                            unsigned int qu_priority)
+                                            unsigned int qu_priority,
+                                            Scrub::act_token_t act_token)
 {
   // Resulting scrub event: 'SchedReplica'
-  queue_scrub_event_msg<PGRepScrubResched>(pg, with_priority, qu_priority);
+  queue_scrub_event_msg<PGRepScrubResched>(pg, with_priority, qu_priority,
+                                          act_token);
 }
 
 void OSDService::queue_for_scrub_granted(PG* pg, Scrub::scrub_prio_t with_priority)
@@ -1819,6 +1748,18 @@ void OSDService::queue_scrub_pushes_update(PG* pg, Scrub::scrub_prio_t with_prio
   queue_scrub_event_msg<PGScrubPushesUpdate>(pg, with_priority);
 }
 
+void OSDService::queue_scrub_chunk_free(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+  // Resulting scrub event: 'SelectedChunkFree'
+  queue_scrub_event_msg<PGScrubChunkIsFree>(pg, with_priority);
+}
+
+void OSDService::queue_scrub_chunk_busy(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+  // Resulting scrub event: 'ChunkIsBusy'
+  queue_scrub_event_msg<PGScrubChunkIsBusy>(pg, with_priority);
+}
+
 void OSDService::queue_scrub_applied_update(PG* pg, Scrub::scrub_prio_t with_priority)
 {
   queue_scrub_event_msg<PGScrubAppliedUpdate>(pg, with_priority);
@@ -1836,18 +1777,42 @@ void OSDService::queue_scrub_digest_update(PG* pg, Scrub::scrub_prio_t with_prio
   queue_scrub_event_msg<PGScrubDigestUpdate>(pg, with_priority);
 }
 
+void OSDService::queue_scrub_got_local_map(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+  // Resulting scrub event: 'IntLocalMapDone'
+  queue_scrub_event_msg<PGScrubGotLocalMap>(pg, with_priority);
+}
+
 void OSDService::queue_scrub_got_repl_maps(PG* pg, Scrub::scrub_prio_t with_priority)
 {
   // Resulting scrub event: 'GotReplicas'
   queue_scrub_event_msg<PGScrubGotReplMaps>(pg, with_priority);
 }
 
+void OSDService::queue_scrub_maps_compared(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+  // Resulting scrub event: 'MapsCompared'
+  queue_scrub_event_msg<PGScrubMapsCompared>(pg, with_priority);
+}
+
 void OSDService::queue_scrub_replica_pushes(PG *pg, Scrub::scrub_prio_t with_priority)
 {
   // Resulting scrub event: 'ReplicaPushesUpd'
   queue_scrub_event_msg<PGScrubReplicaPushes>(pg, with_priority);
 }
 
+void OSDService::queue_scrub_is_finished(PG *pg)
+{
+  // Resulting scrub event: 'ScrubFinished'
+  queue_scrub_event_msg<PGScrubScrubFinished>(pg, Scrub::scrub_prio_t::high_priority);
+}
+
+void OSDService::queue_scrub_next_chunk(PG *pg, Scrub::scrub_prio_t with_priority)
+{
+  // Resulting scrub event: 'NextChunk'
+  queue_scrub_event_msg<PGScrubGetNextChunk>(pg, with_priority);
+}
+
 void OSDService::queue_for_pg_delete(spg_t pgid, epoch_t e)
 {
   dout(10) << __func__ << " on " << pgid << " e " << e  << dendl;
@@ -2030,14 +1995,16 @@ int heap(CephContext& cct, const cmdmap_t& cmdmap, Formatter& f, std::ostream& o
 
 } // namespace ceph::osd_cmds
 
-int OSD::mkfs(CephContext *cct, ObjectStore *store, uuid_d fsid, int whoami, string osdspec_affinity)
+int OSD::mkfs(CephContext *cct,
+             std::unique_ptr<ObjectStore> store,
+             uuid_d fsid,
+             int whoami,
+             string osdspec_affinity)
 {
   int ret;
 
   OSDSuperblock sb;
   bufferlist sbbl;
-  ObjectStore::CollectionHandle ch;
-
   // if we are fed a uuid for this osd, use it.
   store->set_fsid(cct->_conf->osd_uuid);
 
@@ -2045,7 +2012,7 @@ int OSD::mkfs(CephContext *cct, ObjectStore *store, uuid_d fsid, int whoami, str
   if (ret) {
     derr << "OSD::mkfs: ObjectStore::mkfs failed with error "
          << cpp_strerror(ret) << dendl;
-    goto free_store;
+    return ret;
   }
 
   store->set_cache_shards(1);  // doesn't matter for mkfs!
@@ -2054,15 +2021,20 @@ int OSD::mkfs(CephContext *cct, ObjectStore *store, uuid_d fsid, int whoami, str
   if (ret) {
     derr << "OSD::mkfs: couldn't mount ObjectStore: error "
          << cpp_strerror(ret) << dendl;
-    goto free_store;
+    return ret;
   }
 
-  ch = store->open_collection(coll_t::meta());
+  auto umount_store = make_scope_guard([&] {
+    store->umount();
+  });
+
+  ObjectStore::CollectionHandle ch =
+    store->open_collection(coll_t::meta());
   if (ch) {
     ret = store->read(ch, OSD_SUPERBLOCK_GOBJECT, 0, 0, sbbl);
     if (ret < 0) {
       derr << "OSD::mkfs: have meta collection but no superblock" << dendl;
-      goto free_store;
+      return ret;
     }
     /* if we already have superblock, check content of superblock */
     dout(0) << " have superblock" << dendl;
@@ -2071,14 +2043,12 @@ int OSD::mkfs(CephContext *cct, ObjectStore *store, uuid_d fsid, int whoami, str
     if (whoami != sb.whoami) {
       derr << "provided osd id " << whoami << " != superblock's " << sb.whoami
           << dendl;
-      ret = -EINVAL;
-      goto umount_store;
+      return -EINVAL;
     }
     if (fsid != sb.cluster_fsid) {
       derr << "provided cluster fsid " << fsid
           << " != superblock's " << sb.cluster_fsid << dendl;
-      ret = -EINVAL;
-      goto umount_store;
+      return -EINVAL;
     }
   } else {
     // create superblock
@@ -2099,24 +2069,16 @@ int OSD::mkfs(CephContext *cct, ObjectStore *store, uuid_d fsid, int whoami, str
     if (ret) {
       derr << "OSD::mkfs: error while writing OSD_SUPERBLOCK_GOBJECT: "
           << "queue_transaction returned " << cpp_strerror(ret) << dendl;
-      goto umount_store;
+      return ret;
     }
+    ch->flush();
   }
 
-  ret = write_meta(cct, store, sb.cluster_fsid, sb.osd_fsid, whoami, osdspec_affinity);
+  ret = write_meta(cct, store.get(), sb.cluster_fsid, sb.osd_fsid, whoami, osdspec_affinity);
   if (ret) {
     derr << "OSD::mkfs: failed to write fsid file: error "
          << cpp_strerror(ret) << dendl;
-    goto umount_store;
-  }
-
-umount_store:
-  if (ch) {
-    ch.reset();
   }
-  store->umount();
-free_store:
-  delete store;
   return ret;
 }
 
@@ -2223,7 +2185,8 @@ int OSD::peek_meta(ObjectStore *store,
 
 // cons/des
 
-OSD::OSD(CephContext *cct_, ObjectStore *store_,
+OSD::OSD(CephContext *cct_,
+        std::unique_ptr<ObjectStore> store_,
         int id,
         Messenger *internal_messenger,
         Messenger *external_messenger,
@@ -2246,7 +2209,7 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
   mgrc(cct_, client_messenger, &mc->monmap),
   logger(create_logger()),
   recoverystate_perf(create_recoverystate_perf()),
-  store(store_),
+  store(std::move(store_)),
   log_client(cct, client_messenger, &mc->monmap, LogClient::NO_FLAGS),
   clog(log_client.create_channel()),
   whoami(id),
@@ -2323,9 +2286,6 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
       this);
     shards.push_back(one_shard);
   }
-
-  // override some config options if mclock is enabled on all the shards
-  maybe_override_options_for_qos();
 }
 
 OSD::~OSD()
@@ -2338,7 +2298,6 @@ OSD::~OSD()
   cct->get_perfcounters_collection()->remove(logger);
   delete recoverystate_perf;
   delete logger;
-  delete store;
 }
 
 double OSD::get_tick_interval() const
@@ -2491,6 +2450,70 @@ std::set<int64_t> OSD::get_mapped_pools()
   return pools;
 }
 
+OSD::PGRefOrError OSD::locate_asok_target(const cmdmap_t& cmdmap,
+                                    stringstream& ss,
+                                    bool only_primary)
+{
+  string pgidstr;
+  if (!cmd_getval(cmdmap, "pgid", pgidstr)) {
+    ss << "no pgid specified";
+    return OSD::PGRefOrError{std::nullopt, -EINVAL};
+  }
+
+  pg_t pgid;
+  if (!pgid.parse(pgidstr.c_str())) {
+    ss << "couldn't parse pgid '" << pgidstr << "'";
+    return OSD::PGRefOrError{std::nullopt, -EINVAL};
+  }
+
+  spg_t pcand;
+  PGRef pg;
+  if (get_osdmap()->get_primary_shard(pgid, &pcand) && (pg = _lookup_lock_pg(pcand))) {
+    if (pg->is_primary() || !only_primary) {
+      return OSD::PGRefOrError{pg, 0};
+    }
+
+    ss << "not primary for pgid " << pgid;
+    pg->unlock();
+    return OSD::PGRefOrError{std::nullopt, -EAGAIN};
+  } else {
+    ss << "i don't have pgid " << pgid;
+    return OSD::PGRefOrError{std::nullopt, -ENOENT};
+  }
+}
+
+// note that the cmdmap is explicitly copied into asok_route_to_pg()
+int OSD::asok_route_to_pg(
+  bool only_primary,
+  std::string_view prefix,
+  cmdmap_t cmdmap,
+  Formatter* f,
+  stringstream& ss,
+  const bufferlist& inbl,
+  bufferlist& outbl,
+  std::function<void(int, const std::string&, bufferlist&)> on_finish)
+{
+  auto [target_pg, ret] = locate_asok_target(cmdmap, ss, only_primary);
+
+  if (!target_pg.has_value()) {
+    // 'ss' and 'ret' already contain the error information
+    on_finish(ret, ss.str(), outbl);
+    return ret;
+  }
+
+  //  the PG was locked by locate_asok_target()
+  try {
+    (*target_pg)->do_command(prefix, cmdmap, inbl, on_finish);
+    (*target_pg)->unlock();
+    return 0;  // the pg handler calls on_finish directly
+  } catch (const TOPNSPC::common::bad_cmd_get& e) {
+    (*target_pg)->unlock();
+    ss << e.what();
+    on_finish(ret, ss.str(), outbl);
+    return -EINVAL;
+  }
+}
+
 void OSD::asok_command(
   std::string_view prefix, const cmdmap_t& cmdmap,
   Formatter *f,
@@ -2551,6 +2574,13 @@ void OSD::asok_command(
     }
   }
 
+  // --- PG commands that will be answered even if !primary ---
+
+  else if (prefix == "scrubdebug") {
+    asok_route_to_pg(false, prefix, cmdmap, f, ss, inbl, outbl, on_finish);
+    return;
+  }
+
   // --- OSD commands follow ---
 
   else if (prefix == "status") {
@@ -2686,7 +2716,7 @@ will start to track new ops received afterwards.";
     f->close_section();
   } else if (prefix == "dump_scrub_reservations") {
     f->open_object_section("scrub_reservations");
-    service.dump_scrub_reservations(f);
+    service.get_scrub_services().dump_scrub_reservations(f);
     f->close_section();
   } else if (prefix == "get_latest_osdmap") {
     get_latest_osdmap();
@@ -2736,7 +2766,7 @@ will start to track new ops received afterwards.";
   } else if (prefix == "dump_objectstore_kv_stats") {
     store->get_db_statistics(f);
   } else if (prefix == "dump_scrubs") {
-    service.dumps_scrub(f);
+    service.get_scrub_services().dump_scrubs(f);
   } else if (prefix == "calc_objectstore_db_histogram") {
     store->generate_db_histogram(f);
   } else if (prefix == "flush_store_cache") {
@@ -2825,145 +2855,18 @@ will start to track new ops received afterwards.";
   }
 
   else if (prefix == "bench") {
-    int64_t count;
-    int64_t bsize;
-    int64_t osize, onum;
     // default count 1G, size 4MB
-    cmd_getval(cmdmap, "count", count, (int64_t)1 << 30);
-    cmd_getval(cmdmap, "size", bsize, (int64_t)4 << 20);
-    cmd_getval(cmdmap, "object_size", osize, (int64_t)0);
-    cmd_getval(cmdmap, "object_num", onum, (int64_t)0);
-
-    uint32_t duration = cct->_conf->osd_bench_duration;
-
-    if (bsize > (int64_t) cct->_conf->osd_bench_max_block_size) {
-      // let us limit the block size because the next checks rely on it
-      // having a sane value.  If we allow any block size to be set things
-      // can still go sideways.
-      ss << "block 'size' values are capped at "
-         << byte_u_t(cct->_conf->osd_bench_max_block_size) << ". If you wish to use"
-         << " a higher value, please adjust 'osd_bench_max_block_size'";
-      ret = -EINVAL;
-      goto out;
-    } else if (bsize < (int64_t) (1 << 20)) {
-      // entering the realm of small block sizes.
-      // limit the count to a sane value, assuming a configurable amount of
-      // IOPS and duration, so that the OSD doesn't get hung up on this,
-      // preventing timeouts from going off
-      int64_t max_count =
-        bsize * duration * cct->_conf->osd_bench_small_size_max_iops;
-      if (count > max_count) {
-        ss << "'count' values greater than " << max_count
-           << " for a block size of " << byte_u_t(bsize) << ", assuming "
-           << cct->_conf->osd_bench_small_size_max_iops << " IOPS,"
-           << " for " << duration << " seconds,"
-           << " can cause ill effects on osd. "
-           << " Please adjust 'osd_bench_small_size_max_iops' with a higher"
-           << " value if you wish to use a higher 'count'.";
-        ret = -EINVAL;
-        goto out;
-      }
-    } else {
-      // 1MB block sizes are big enough so that we get more stuff done.
-      // However, to avoid the osd from getting hung on this and having
-      // timers being triggered, we are going to limit the count assuming
-      // a configurable throughput and duration.
-      // NOTE: max_count is the total amount of bytes that we believe we
-      //       will be able to write during 'duration' for the given
-      //       throughput.  The block size hardly impacts this unless it's
-      //       way too big.  Given we already check how big the block size
-      //       is, it's safe to assume everything will check out.
-      int64_t max_count =
-        cct->_conf->osd_bench_large_size_max_throughput * duration;
-      if (count > max_count) {
-        ss << "'count' values greater than " << max_count
-           << " for a block size of " << byte_u_t(bsize) << ", assuming "
-           << byte_u_t(cct->_conf->osd_bench_large_size_max_throughput) << "/s,"
-           << " for " << duration << " seconds,"
-           << " can cause ill effects on osd. "
-           << " Please adjust 'osd_bench_large_size_max_throughput'"
-           << " with a higher value if you wish to use a higher 'count'.";
-        ret = -EINVAL;
-        goto out;
-      }
-    }
-
-    if (osize && bsize > osize)
-      bsize = osize;
-
-    dout(1) << " bench count " << count
-            << " bsize " << byte_u_t(bsize) << dendl;
-
-    ObjectStore::Transaction cleanupt;
-
-    if (osize && onum) {
-      bufferlist bl;
-      bufferptr bp(osize);
-      bp.zero();
-      bl.push_back(std::move(bp));
-      bl.rebuild_page_aligned();
-      for (int i=0; i<onum; ++i) {
-       char nm[30];
-       snprintf(nm, sizeof(nm), "disk_bw_test_%d", i);
-       object_t oid(nm);
-       hobject_t soid(sobject_t(oid, 0));
-       ObjectStore::Transaction t;
-       t.write(coll_t(), ghobject_t(soid), 0, osize, bl);
-       store->queue_transaction(service.meta_ch, std::move(t), NULL);
-       cleanupt.remove(coll_t(), ghobject_t(soid));
-      }
-    }
-
-    bufferlist bl;
-    bufferptr bp(bsize);
-    bp.zero();
-    bl.push_back(std::move(bp));
-    bl.rebuild_page_aligned();
-
-    {
-      C_SaferCond waiter;
-      if (!service.meta_ch->flush_commit(&waiter)) {
-       waiter.wait();
-      }
-    }
-
-    utime_t start = ceph_clock_now();
-    for (int64_t pos = 0; pos < count; pos += bsize) {
-      char nm[30];
-      unsigned offset = 0;
-      if (onum && osize) {
-       snprintf(nm, sizeof(nm), "disk_bw_test_%d", (int)(rand() % onum));
-       offset = rand() % (osize / bsize) * bsize;
-      } else {
-       snprintf(nm, sizeof(nm), "disk_bw_test_%lld", (long long)pos);
-      }
-      object_t oid(nm);
-      hobject_t soid(sobject_t(oid, 0));
-      ObjectStore::Transaction t;
-      t.write(coll_t::meta(), ghobject_t(soid), offset, bsize, bl);
-      store->queue_transaction(service.meta_ch, std::move(t), NULL);
-      if (!onum || !osize)
-       cleanupt.remove(coll_t::meta(), ghobject_t(soid));
-    }
+    int64_t count = cmd_getval_or<int64_t>(cmdmap, "count", 1LL << 30);
+    int64_t bsize = cmd_getval_or<int64_t>(cmdmap, "size", 4LL << 20);
+    int64_t osize = cmd_getval_or<int64_t>(cmdmap, "object_size", 0);
+    int64_t onum = cmd_getval_or<int64_t>(cmdmap, "object_num", 0);
+    double elapsed = 0.0;
 
-    {
-      C_SaferCond waiter;
-      if (!service.meta_ch->flush_commit(&waiter)) {
-       waiter.wait();
-      }
-    }
-    utime_t end = ceph_clock_now();
-
-    // clean up
-    store->queue_transaction(service.meta_ch, std::move(cleanupt), NULL);
-    {
-      C_SaferCond waiter;
-      if (!service.meta_ch->flush_commit(&waiter)) {
-       waiter.wait();
-      }
+    ret = run_osd_bench_test(count, bsize, osize, onum, &elapsed, ss);
+    if (ret != 0) {
+      goto out;
     }
 
-    double elapsed = end - start;
     double rate = count / elapsed;
     double iops = rate / bsize;
     f->open_object_section("osd_bench_results");
@@ -3210,6 +3113,30 @@ will start to track new ops received afterwards.";
     }
     f->close_section(); // entries
     f->close_section(); // network_ping_times
+  } else if (prefix == "dump_pool_statfs") {
+    lock_guard l(osd_lock);
+
+    int64_t p = 0;
+    if (!(cmd_getval(cmdmap, "poolid", p))) {
+      ss << "Error dumping pool statfs: no poolid provided";
+      ret = -EINVAL;
+      goto out;
+    }
+
+    store_statfs_t st;
+    bool per_pool_omap_stats = false;
+
+    ret = store->pool_statfs(p, &st, &per_pool_omap_stats);
+    if (ret < 0) {
+      ss << "Error dumping pool statfs: " << cpp_strerror(ret);
+      goto out;
+    } else {
+      ss << "dumping pool statfs...";
+      f->open_object_section("pool_statfs");
+      f->dump_int("poolid", p);
+      st.dump(f);
+      f->close_section();
+    }
   } else {
     ceph_abort_msg("broken asok registration");
   }
@@ -3218,6 +3145,150 @@ will start to track new ops received afterwards.";
   on_finish(ret, ss.str(), outbl);
 }
 
+int OSD::run_osd_bench_test(
+  int64_t count,
+  int64_t bsize,
+  int64_t osize,
+  int64_t onum,
+  double *elapsed,
+  ostream &ss)
+{
+  int ret = 0;
+  uint32_t duration = cct->_conf->osd_bench_duration;
+
+  if (bsize > (int64_t) cct->_conf->osd_bench_max_block_size) {
+    // let us limit the block size because the next checks rely on it
+    // having a sane value.  If we allow any block size to be set things
+    // can still go sideways.
+    ss << "block 'size' values are capped at "
+       << byte_u_t(cct->_conf->osd_bench_max_block_size) << ". If you wish to use"
+       << " a higher value, please adjust 'osd_bench_max_block_size'";
+    ret = -EINVAL;
+    return ret;
+  } else if (bsize < (int64_t) (1 << 20)) {
+    // entering the realm of small block sizes.
+    // limit the count to a sane value, assuming a configurable amount of
+    // IOPS and duration, so that the OSD doesn't get hung up on this,
+    // preventing timeouts from going off
+    int64_t max_count =
+      bsize * duration * cct->_conf->osd_bench_small_size_max_iops;
+    if (count > max_count) {
+      ss << "'count' values greater than " << max_count
+         << " for a block size of " << byte_u_t(bsize) << ", assuming "
+         << cct->_conf->osd_bench_small_size_max_iops << " IOPS,"
+         << " for " << duration << " seconds,"
+         << " can cause ill effects on osd. "
+         << " Please adjust 'osd_bench_small_size_max_iops' with a higher"
+         << " value if you wish to use a higher 'count'.";
+      ret = -EINVAL;
+      return ret;
+    }
+  } else {
+    // 1MB block sizes are big enough so that we get more stuff done.
+    // However, to avoid the osd from getting hung on this and having
+    // timers being triggered, we are going to limit the count assuming
+    // a configurable throughput and duration.
+    // NOTE: max_count is the total amount of bytes that we believe we
+    //       will be able to write during 'duration' for the given
+    //       throughput.  The block size hardly impacts this unless it's
+    //       way too big.  Given we already check how big the block size
+    //       is, it's safe to assume everything will check out.
+    int64_t max_count =
+      cct->_conf->osd_bench_large_size_max_throughput * duration;
+    if (count > max_count) {
+      ss << "'count' values greater than " << max_count
+         << " for a block size of " << byte_u_t(bsize) << ", assuming "
+         << byte_u_t(cct->_conf->osd_bench_large_size_max_throughput) << "/s,"
+         << " for " << duration << " seconds,"
+         << " can cause ill effects on osd. "
+         << " Please adjust 'osd_bench_large_size_max_throughput'"
+         << " with a higher value if you wish to use a higher 'count'.";
+      ret = -EINVAL;
+      return ret;
+    }
+  }
+
+  if (osize && bsize > osize) {
+    bsize = osize;
+  }
+
+  dout(1) << " bench count " << count
+          << " bsize " << byte_u_t(bsize) << dendl;
+
+  ObjectStore::Transaction cleanupt;
+
+  if (osize && onum) {
+    bufferlist bl;
+    bufferptr bp(osize);
+    memset(bp.c_str(), 'a', bp.length());
+    bl.push_back(std::move(bp));
+    bl.rebuild_page_aligned();
+    for (int i=0; i<onum; ++i) {
+      char nm[30];
+      snprintf(nm, sizeof(nm), "disk_bw_test_%d", i);
+      object_t oid(nm);
+      hobject_t soid(sobject_t(oid, 0));
+      ObjectStore::Transaction t;
+      t.write(coll_t(), ghobject_t(soid), 0, osize, bl);
+      store->queue_transaction(service.meta_ch, std::move(t), nullptr);
+      cleanupt.remove(coll_t(), ghobject_t(soid));
+    }
+  }
+
+  bufferlist bl;
+  bufferptr bp(bsize);
+  memset(bp.c_str(), 'a', bp.length());
+  bl.push_back(std::move(bp));
+  bl.rebuild_page_aligned();
+
+  {
+    C_SaferCond waiter;
+    if (!service.meta_ch->flush_commit(&waiter)) {
+      waiter.wait();
+    }
+  }
+
+  utime_t start = ceph_clock_now();
+  for (int64_t pos = 0; pos < count; pos += bsize) {
+    char nm[30];
+    unsigned offset = 0;
+    if (onum && osize) {
+      snprintf(nm, sizeof(nm), "disk_bw_test_%d", (int)(rand() % onum));
+      offset = rand() % (osize / bsize) * bsize;
+    } else {
+      snprintf(nm, sizeof(nm), "disk_bw_test_%lld", (long long)pos);
+    }
+    object_t oid(nm);
+    hobject_t soid(sobject_t(oid, 0));
+    ObjectStore::Transaction t;
+    t.write(coll_t::meta(), ghobject_t(soid), offset, bsize, bl);
+    store->queue_transaction(service.meta_ch, std::move(t), nullptr);
+    if (!onum || !osize) {
+      cleanupt.remove(coll_t::meta(), ghobject_t(soid));
+    }
+  }
+
+  {
+    C_SaferCond waiter;
+    if (!service.meta_ch->flush_commit(&waiter)) {
+      waiter.wait();
+    }
+  }
+  utime_t end = ceph_clock_now();
+  *elapsed = end - start;
+
+  // clean up
+  store->queue_transaction(service.meta_ch, std::move(cleanupt), nullptr);
+  {
+    C_SaferCond waiter;
+    if (!service.meta_ch->flush_commit(&waiter)) {
+      waiter.wait();
+    }
+  }
+
+ return ret;
+}
+
 class TestOpsSocketHook : public AdminSocketHook {
   OSDService *service;
   ObjectStore *store;
@@ -3290,7 +3361,7 @@ int OSD::enable_disable_fuse(bool stop)
           << cpp_strerror(r) << dendl;
       return r;
     }
-    fuse_store = new FuseStore(store, mntpath);
+    fuse_store = new FuseStore(store.get(), mntpath);
     r = fuse_store->start();
     if (r < 0) {
       derr << __func__ << " unable to start fuse: " << cpp_strerror(r) << dendl;
@@ -3381,7 +3452,7 @@ int OSD::init()
   std::lock_guard lock(osd_lock);
   if (is_stopping())
     return 0;
-
+  tracing::osd::tracer.init("osd");
   tick_timer.init();
   tick_timer_without_osd_lock.init();
   service.recovery_request_timer.init();
@@ -3404,6 +3475,10 @@ int OSD::init()
 
   store->set_cache_shards(get_num_cache_shards());
 
+ int rotating_auth_attempts = 0;
+ auto rotating_auth_timeout =
+   g_conf().get_val<int64_t>("rotating_keys_bootstrap_timeout");
+
   int r = store->mount();
   if (r < 0) {
     derr << "OSD:init: unable to mount object store" << dendl;
@@ -3418,7 +3493,12 @@ int OSD::init()
   dout(2) << "boot" << dendl;
 
   service.meta_ch = store->open_collection(coll_t::meta());
-
+  if (!service.meta_ch) {
+    derr << "OSD:init: unable to open meta collection"
+         << dendl;
+    r = -ENOENT;
+    goto out;
+  }
   // initialize the daily loadavg with current 15min loadavg
   double loadavgs[3];
   if (getloadavg(loadavgs, 3) == 3) {
@@ -3428,10 +3508,6 @@ int OSD::init()
     daily_loadavg = 1.0;
   }
 
-  int rotating_auth_attempts = 0;
-  auto rotating_auth_timeout =
-    g_conf().get_val<int64_t>("rotating_keys_bootstrap_timeout");
-
   // sanity check long object name handling
   {
     hobject_t l;
@@ -3535,7 +3611,7 @@ int OSD::init()
       auto ch = service.meta_ch;
       auto hoid = make_snapmapper_oid();
       unsigned max = cct->_conf->osd_target_transaction_size;
-      r = SnapMapper::convert_legacy(cct, store, ch, hoid, max);
+      r = SnapMapper::convert_legacy(cct, store.get(), ch, hoid, max);
       if (r < 0)
        goto out;
     }
@@ -3767,13 +3843,16 @@ int OSD::init()
 
   start_boot();
 
+  // Override a few options if mclock scheduler is enabled.
+  maybe_override_max_osd_capacity_for_qos();
+  maybe_override_options_for_qos();
+
   return 0;
 
 out:
   enable_disable_fuse(true);
   store->umount();
-  delete store;
-  store = NULL;
+  store.reset();
   return r;
 }
 
@@ -3820,7 +3899,7 @@ void OSD::final_init()
   ceph_assert(r == 0);
   r = admin_socket->register_command("dump_op_pq_state",
                                     asok_hook,
-                                    "dump op priority queue state");
+                                    "dump op queue state");
   ceph_assert(r == 0);
   r = admin_socket->register_command("dump_blocklist",
                                     asok_hook,
@@ -3912,7 +3991,12 @@ void OSD::final_init()
     "Dump osd heartbeat network ping times");
   ceph_assert(r == 0);
 
-  test_ops_hook = new TestOpsSocketHook(&(this->service), this->store);
+  r = admin_socket->register_command(
+    "dump_pool_statfs name=poolid,type=CephInt,req=true", asok_hook,
+    "Dump store's statistics for the given pool");
+  ceph_assert(r == 0);
+
+  test_ops_hook = new TestOpsSocketHook(&(this->service), this->store.get());
   // Note: pools are CephString instead of CephPoolname because
   // these commands traditionally support both pool names and numbers
   r = admin_socket->register_command(
@@ -4061,6 +4145,14 @@ void OSD::final_init()
     asok_hook,
     "Scrub purged_snaps vs snapmapper index");
   ceph_assert(r == 0);
+  r = admin_socket->register_command(
+    "scrubdebug "                                              \
+    "name=pgid,type=CephPgid "                                 \
+    "name=cmd,type=CephChoices,strings=block|unblock|set|unset " \
+    "name=value,type=CephString,req=false",
+    asok_hook,
+    "debug the scrubber");
+  ceph_assert(r == 0);
 
   // -- pg commands --
   // old form: ceph pg <pgid> command ...
@@ -4155,27 +4247,44 @@ PerfCounters* OSD::create_recoverystate_perf()
 
 int OSD::shutdown()
 {
+  // vstart overwrites osd_fast_shutdown value in the conf file -> force the value here!
+  //cct->_conf->osd_fast_shutdown = true;
+
+  dout(0) << "Fast Shutdown: - cct->_conf->osd_fast_shutdown = "
+         << cct->_conf->osd_fast_shutdown
+         << ", null-fm = " << store->has_null_manager() << dendl;
+
+  utime_t  start_time_func = ceph_clock_now();
+
   if (cct->_conf->osd_fast_shutdown) {
     derr << "*** Immediate shutdown (osd_fast_shutdown=true) ***" << dendl;
     if (cct->_conf->osd_fast_shutdown_notify_mon)
       service.prepare_to_stop();
-    cct->_log->flush();
-    _exit(0);
-  }
 
-  if (!service.prepare_to_stop())
+    // There is no state we need to keep wehn running in NULL-FM moode
+    if (!store->has_null_manager()) {
+      cct->_log->flush();
+      _exit(0);
+    }
+  } else if (!service.prepare_to_stop()) {
     return 0; // already shutting down
+  }
+
   osd_lock.lock();
   if (is_stopping()) {
     osd_lock.unlock();
     return 0;
   }
-  dout(0) << "shutdown" << dendl;
 
+  if (!cct->_conf->osd_fast_shutdown) {
+    dout(0) << "shutdown" << dendl;
+  }
+
+  // don't accept new task for this OSD
   set_state(STATE_STOPPING);
 
-  // Debugging
-  if (cct->_conf.get_val<bool>("osd_debug_shutdown")) {
+  // Disabled debugging during fast-shutdown
+  if (!cct->_conf->osd_fast_shutdown && cct->_conf.get_val<bool>("osd_debug_shutdown")) {
     cct->_conf.set_val("debug_osd", "100");
     cct->_conf.set_val("debug_journal", "100");
     cct->_conf.set_val("debug_filestore", "100");
@@ -4184,6 +4293,45 @@ int OSD::shutdown()
     cct->_conf.apply_changes(nullptr);
   }
 
+  if (cct->_conf->osd_fast_shutdown) {
+    // first, stop new task from being taken from op_shardedwq
+    // and clear all pending tasks
+    op_shardedwq.stop_for_fast_shutdown();
+
+    utime_t  start_time_timer = ceph_clock_now();
+    tick_timer.shutdown();
+    {
+      std::lock_guard l(tick_timer_lock);
+      tick_timer_without_osd_lock.shutdown();
+    }
+
+    osd_lock.unlock();
+    utime_t  start_time_osd_drain = ceph_clock_now();
+
+    // then, wait on osd_op_tp to drain (TBD: should probably add a timeout)
+    osd_op_tp.drain();
+    osd_op_tp.stop();
+
+    utime_t  start_time_umount = ceph_clock_now();
+    store->prepare_for_fast_shutdown();
+    std::lock_guard lock(osd_lock);
+    // TBD: assert in allocator that nothing is being add
+    store->umount();
+
+    utime_t end_time = ceph_clock_now();
+    if (cct->_conf->osd_fast_shutdown_timeout) {
+      ceph_assert(end_time - start_time_func < cct->_conf->osd_fast_shutdown_timeout);
+    }
+    dout(0) <<"Fast Shutdown duration total     :" << end_time              - start_time_func       << " seconds" << dendl;
+    dout(0) <<"Fast Shutdown duration osd_drain :" << start_time_umount     - start_time_osd_drain  << " seconds" << dendl;
+    dout(0) <<"Fast Shutdown duration umount    :" << end_time              - start_time_umount     << " seconds" << dendl;
+    dout(0) <<"Fast Shutdown duration timer     :" << start_time_osd_drain  - start_time_timer      << " seconds" << dendl;
+    cct->_log->flush();
+
+    // now it is safe to exit
+    _exit(0);
+  }
+
   // stop MgrClient earlier as it's more like an internal consumer of OSD
   mgrc.shutdown();
 
@@ -4331,8 +4479,7 @@ int OSD::shutdown()
 
   std::lock_guard lock(osd_lock);
   store->umount();
-  delete store;
-  store = nullptr;
+  store.reset();
   dout(10) << "Store synced" << dendl;
 
   op_tracker.on_shutdown();
@@ -4346,6 +4493,11 @@ int OSD::shutdown()
   hb_front_server_messenger->shutdown();
   hb_back_server_messenger->shutdown();
 
+  utime_t duration = ceph_clock_now() - start_time_func;
+  dout(0) <<"Slow Shutdown duration:" << duration << " seconds" << dendl;
+
+  tracing::osd::tracer.shutdown();
+
   return r;
 }
 
@@ -4772,10 +4924,10 @@ void OSD::load_pgs()
        ++it) {
     spg_t pgid;
     if (it->is_temp(&pgid) ||
-       (it->is_pg(&pgid) && PG::_has_removal_flag(store, pgid))) {
+       (it->is_pg(&pgid) && PG::_has_removal_flag(store.get(), pgid))) {
       dout(10) << "load_pgs " << *it
               << " removing, legacy or flagged for removal pg" << dendl;
-      recursive_remove_collection(cct, store, pgid, *it);
+      recursive_remove_collection(cct, store.get(), pgid, *it);
       continue;
     }
 
@@ -4786,7 +4938,7 @@ void OSD::load_pgs()
 
     dout(10) << "pgid " << pgid << " coll " << coll_t(pgid) << dendl;
     epoch_t map_epoch = 0;
-    int r = PG::peek_map_epoch(store, pgid, &map_epoch);
+    int r = PG::peek_map_epoch(store.get(), pgid, &map_epoch);
     if (r < 0) {
       derr << __func__ << " unable to peek at " << pgid << " metadata, skipping"
           << dendl;
@@ -4816,7 +4968,7 @@ void OSD::load_pgs()
       pg = _make_pg(get_osdmap(), pgid);
     }
     if (!pg) {
-      recursive_remove_collection(cct, store, pgid, *it);
+      recursive_remove_collection(cct, store.get(), pgid, *it);
       continue;
     }
 
@@ -4826,13 +4978,13 @@ void OSD::load_pgs()
     pg->ch = store->open_collection(pg->coll);
 
     // read pg state, log
-    pg->read_state(store);
+    pg->read_state(store.get());
 
     if (pg->dne())  {
       dout(10) << "load_pgs " << *it << " deleting dne" << dendl;
       pg->ch = nullptr;
       pg->unlock();
-      recursive_remove_collection(cct, store, pgid, *it);
+      recursive_remove_collection(cct, store.get(), pgid, *it);
       continue;
     }
     {
@@ -4841,8 +4993,6 @@ void OSD::load_pgs()
       store->set_collection_commit_queue(pg->coll, &(shards[shard_index]->context_queue));
     }
 
-    pg->reg_next_scrub();
-
     dout(10) << __func__ << " loaded " << *pg << dendl;
     pg->unlock();
 
@@ -4863,8 +5013,6 @@ PGRef OSD::handle_pg_create_info(const OSDMapRef& osdmap,
     return nullptr;
   }
 
-  PeeringCtx rctx = create_context();
-
   OSDMapRef startmap = get_map(info->epoch);
 
   if (info->by_mon) {
@@ -4898,6 +5046,7 @@ PGRef OSD::handle_pg_create_info(const OSDMapRef& osdmap,
                 << "the pool allows ec overwrites but is not stored in "
                 << "bluestore, so deep scrubbing will not detect bitrot";
   }
+  PeeringCtx rctx;
   create_pg_collection(
     rctx.transaction, pgid, pgid.get_split_bits(pp->get_pg_num()));
   init_pg_ondisk(rctx.transaction, pgid, pp);
@@ -4926,7 +5075,6 @@ PGRef OSD::handle_pg_create_info(const OSDMapRef& osdmap,
     acting_primary,
     info->history,
     info->past_intervals,
-    false,
     rctx.transaction);
 
   pg->init_collection_pool_opts();
@@ -5754,22 +5902,10 @@ void OSD::heartbeat()
   ceph_assert(ceph_mutex_is_locked_by_me(heartbeat_lock));
   dout(30) << "heartbeat" << dendl;
 
-  // get CPU load avg
-  double loadavgs[1];
-  int hb_interval = cct->_conf->osd_heartbeat_interval;
-  int n_samples = 86400;
-  if (hb_interval > 1) {
-    n_samples /= hb_interval;
-    if (n_samples < 1)
-      n_samples = 1;
-  }
-
-  if (getloadavg(loadavgs, 1) == 1) {
-    logger->set(l_osd_loadavg, 100 * loadavgs[0]);
-    daily_loadavg = (daily_loadavg * (n_samples - 1) + loadavgs[0]) / n_samples;
-    dout(30) << "heartbeat: daily_loadavg " << daily_loadavg << dendl;
+  auto load_for_logger = service.get_scrub_services().update_load_average();
+  if (load_for_logger) {
+    logger->set(l_osd_loadavg, load_for_logger.value());
   }
-
   dout(30) << "heartbeat checking stats" << dendl;
 
   // refresh peer list and osd stats
@@ -5938,7 +6074,7 @@ void OSD::tick()
     // use a seed that is stable for each scrub interval, but varies
     // by OSD to avoid any herds.
     rng.seed(whoami + superblock.last_purged_snaps_scrub.sec());
-    double r = (rng() % 1024) / 1024;
+    double r = (rng() % 1024) / 1024.0;
     next +=
       cct->_conf->osd_scrub_min_interval *
       cct->_conf->osd_scrub_interval_randomize_ratio * r;
@@ -6085,8 +6221,7 @@ void TestOpsSocketHook::test_ops(OSDService *service, ObjectStore *store,
       return;
     }
 
-    int64_t shardid;
-    cmd_getval(cmdmap, "shardid", shardid, int64_t(shard_id_t::NO_SHARD));
+    int64_t shardid = cmd_getval_or<int64_t>(cmdmap, "shardid", shard_id_t::NO_SHARD);
     hobject_t obj(object_t(objname), string(""), CEPH_NOSNAP, rawpg.ps(), pool, nspace);
     ghobject_t gobj(obj, ghobject_t::NO_GEN, shard_id_t(uint8_t(shardid)));
     spg_t pgid(curmap->raw_pg_to_pg(rawpg), shard_id_t(shardid));
@@ -6175,8 +6310,7 @@ void TestOpsSocketHook::test_ops(OSDService *service, ObjectStore *store,
     return;
   }
   if (command == "set_recovery_delay") {
-    int64_t delay;
-    cmd_getval(cmdmap, "utime", delay, (int64_t)0);
+    int64_t delay = cmd_getval_or<int64_t>(cmdmap, "utime", 0);
     ostringstream oss;
     oss << delay;
     int r = service->cct->_conf.set_val("osd_recovery_delay_start",
@@ -6193,11 +6327,10 @@ void TestOpsSocketHook::test_ops(OSDService *service, ObjectStore *store,
     return;
   }
   if (command == "injectfull") {
-    int64_t count;
-    string type;
+    int64_t count = cmd_getval_or<int64_t>(cmdmap, "count", -1);
+    string type = cmd_getval_or<string>(cmdmap, "type", "full");
     OSDService::s_names state;
-    cmd_getval(cmdmap, "type", type, string("full"));
-    cmd_getval(cmdmap, "count", count, (int64_t)-1);
+
     if (type == "none" || count == 0) {
       type = "none";
       count = 0;
@@ -6467,7 +6600,7 @@ void OSD::handle_get_purged_snaps_reply(MMonGetPurgedSnapsReply *m)
       m->last < superblock.purged_snaps_last) {
     goto out;
   }
-  SnapMapper::record_purged_snaps(cct, store, service.meta_ch,
+  SnapMapper::record_purged_snaps(cct, store.get(), service.meta_ch,
                                  make_purged_snaps_oid(), &t,
                                  m->purged_snaps);
   superblock.purged_snaps_last = m->last;
@@ -6899,7 +7032,7 @@ void OSD::scrub_purged_snaps()
 {
   dout(10) << __func__ << dendl;
   ceph_assert(ceph_mutex_is_locked(osd_lock));
-  SnapMapper::Scrubber s(cct, store, service.meta_ch,
+  SnapMapper::Scrubber s(cct, store.get(), service.meta_ch,
                         make_snapmapper_oid(),
                         make_purged_snaps_oid());
   clog->debug() << "purged_snaps scrub starts";
@@ -7118,18 +7251,12 @@ void OSD::dispatch_session_waiting(const ceph::ref_t<Session>& session, OSDMapRe
 
 void OSD::ms_fast_dispatch(Message *m)
 {
-
-#ifdef HAVE_JAEGER
-  jaeger_tracing::init_tracer("osd-services-reinit");
-  dout(10) << "jaeger tracer after " << opentracing::Tracer::Global() << dendl;
-  auto dispatch_span = jaeger_tracing::new_span(__func__);
-#endif
+  auto dispatch_span = tracing::osd::tracer.start_trace(__func__);
   FUNCTRACE(cct);
   if (service.is_stopping()) {
     m->put();
     return;
   }
-
   // peering event?
   switch (m->get_type()) {
   case CEPH_MSG_PING:
@@ -7142,18 +7269,14 @@ void OSD::ms_fast_dispatch(Message *m)
   case MSG_OSD_SCRUB2:
     handle_fast_scrub(static_cast<MOSDScrub2*>(m));
     return;
-
   case MSG_OSD_PG_CREATE2:
     return handle_fast_pg_create(static_cast<MOSDPGCreate2*>(m));
-  case MSG_OSD_PG_QUERY:
-    return handle_fast_pg_query(static_cast<MOSDPGQuery*>(m));
   case MSG_OSD_PG_NOTIFY:
     return handle_fast_pg_notify(static_cast<MOSDPGNotify*>(m));
   case MSG_OSD_PG_INFO:
     return handle_fast_pg_info(static_cast<MOSDPGInfo*>(m));
   case MSG_OSD_PG_REMOVE:
     return handle_fast_pg_remove(static_cast<MOSDPGRemove*>(m));
-
     // these are single-pg messages that handle themselves
   case MSG_OSD_PG_LOG:
   case MSG_OSD_PG_TRIM:
@@ -7184,13 +7307,8 @@ void OSD::ms_fast_dispatch(Message *m)
     tracepoint(osd, ms_fast_dispatch, reqid.name._type,
         reqid.name._num, reqid.tid, reqid.inc);
   }
-#ifdef HAVE_JAEGER
-  op->set_osd_parent_span(dispatch_span);
-  if (op->osd_parent_span) {
-    auto op_req_span = jaeger_tracing::child_span("op-request-created", op->osd_parent_span);
-    op->set_osd_parent_span(op_req_span);
-  }
-#endif
+  op->osd_parent_span = tracing::osd::tracer.add_span("op-request-created", dispatch_span);
+
   if (m->trace)
     op->osd_trace.init("osd op", &trace_endpoint, &m->trace);
 
@@ -7422,270 +7540,111 @@ bool OSD::scrub_random_backoff()
   return false;
 }
 
-OSDService::ScrubJob::ScrubJob(CephContext* cct,
-                              const spg_t& pg, const utime_t& timestamp,
-                              double pool_scrub_min_interval,
-                              double pool_scrub_max_interval, bool must)
-  : cct(cct),
-    pgid(pg),
-    sched_time(timestamp),
-    deadline(timestamp)
-{
-  // if not explicitly requested, postpone the scrub with a random delay
-  if (!must) {
-    double scrub_min_interval = pool_scrub_min_interval > 0 ?
-      pool_scrub_min_interval : cct->_conf->osd_scrub_min_interval;
-    double scrub_max_interval = pool_scrub_max_interval > 0 ?
-      pool_scrub_max_interval : cct->_conf->osd_scrub_max_interval;
-
-    sched_time += scrub_min_interval;
-    double r = rand() / (double)RAND_MAX;
-    sched_time +=
-      scrub_min_interval * cct->_conf->osd_scrub_interval_randomize_ratio * r;
-    if (scrub_max_interval == 0) {
-      deadline = utime_t();
-    } else {
-      deadline += scrub_max_interval;
-    }
-
-  }
-}
-
-bool OSDService::ScrubJob::ScrubJob::operator<(const OSDService::ScrubJob& rhs) const {
-  if (sched_time < rhs.sched_time)
-    return true;
-  if (sched_time > rhs.sched_time)
-    return false;
-  return pgid < rhs.pgid;
-}
 
-void OSDService::dumps_scrub(ceph::Formatter *f)
+void OSD::sched_scrub()
 {
-  ceph_assert(f != nullptr);
-  std::lock_guard l(sched_scrub_lock);
+  auto& scrub_scheduler = service.get_scrub_services();
 
-  f->open_array_section("scrubs");
-  for (const auto &i: sched_scrub_pg) {
-    f->open_object_section("scrub");
-    f->dump_stream("pgid") << i.pgid;
-    f->dump_stream("sched_time") << i.sched_time;
-    f->dump_stream("deadline") << i.deadline;
-    f->dump_bool("forced", i.sched_time == PgScrubber::scrub_must_stamp());
-    f->close_section();
+  // fail fast if no resources are available
+  if (!scrub_scheduler.can_inc_scrubs()) {
+    dout(20) << __func__ << ": OSD cannot inc scrubs" << dendl;
+    return;
   }
-  f->close_section();
-}
 
-double OSD::scrub_sleep_time(bool must_scrub)
-{
-  if (must_scrub) {
-    return cct->_conf->osd_scrub_sleep;
-  }
-  utime_t now = ceph_clock_now();
-  if (scrub_time_permit(now)) {
-    return cct->_conf->osd_scrub_sleep;
+  // if there is a PG that is just now trying to reserve scrub replica resources -
+  // we should wait and not initiate a new scrub
+  if (scrub_scheduler.is_reserving_now()) {
+    dout(20) << __func__ << ": scrub resources reservation in progress" << dendl;
+    return;
   }
-  double normal_sleep = cct->_conf->osd_scrub_sleep;
-  double extended_sleep = cct->_conf->osd_scrub_extended_sleep;
-  return std::max(extended_sleep, normal_sleep);
-}
 
-bool OSD::scrub_time_permit(utime_t now)
-{
-  struct tm bdt;
-  time_t tt = now.sec();
-  localtime_r(&tt, &bdt);
+  Scrub::ScrubPreconds env_conditions;
 
-  bool day_permit = false;
-  if (cct->_conf->osd_scrub_begin_week_day < cct->_conf->osd_scrub_end_week_day) {
-    if (bdt.tm_wday >= cct->_conf->osd_scrub_begin_week_day && bdt.tm_wday < cct->_conf->osd_scrub_end_week_day) {
-      day_permit = true;
-    }
-  } else {
-    if (bdt.tm_wday >= cct->_conf->osd_scrub_begin_week_day || bdt.tm_wday < cct->_conf->osd_scrub_end_week_day) {
-      day_permit = true;
+  if (service.is_recovery_active() && !cct->_conf->osd_scrub_during_recovery) {
+    if (!cct->_conf->osd_repair_during_recovery) {
+      dout(15) << __func__ << ": not scheduling scrubs due to active recovery"
+              << dendl;
+      return;
     }
+    dout(10) << __func__
+      << " will only schedule explicitly requested repair due to active recovery"
+      << dendl;
+    env_conditions.allow_requested_repair_only = true;
   }
 
-  if (!day_permit) {
-    dout(20) << __func__ << " should run between week day " << cct->_conf->osd_scrub_begin_week_day
-            << " - " << cct->_conf->osd_scrub_end_week_day
-            << " now " << bdt.tm_wday << " = no" << dendl;
-    return false;
-  }
-
-  bool time_permit = false;
-  if (cct->_conf->osd_scrub_begin_hour < cct->_conf->osd_scrub_end_hour) {
-    if (bdt.tm_hour >= cct->_conf->osd_scrub_begin_hour && bdt.tm_hour < cct->_conf->osd_scrub_end_hour) {
-      time_permit = true;
-    }
-  } else {
-    if (bdt.tm_hour >= cct->_conf->osd_scrub_begin_hour || bdt.tm_hour < cct->_conf->osd_scrub_end_hour) {
-      time_permit = true;
+  if (g_conf()->subsys.should_gather<ceph_subsys_osd, 20>()) {
+    dout(20) << __func__ << " sched_scrub starts" << dendl;
+    auto all_jobs = scrub_scheduler.list_registered_jobs();
+    for (const auto& sj : all_jobs) {
+      dout(20) << "sched_scrub scrub-queue jobs: " << *sj << dendl;
     }
   }
-  if (time_permit) {
-    dout(20) << __func__ << " should run between " << cct->_conf->osd_scrub_begin_hour
-            << " - " << cct->_conf->osd_scrub_end_hour
-            << " now " << bdt.tm_hour << " = yes" << dendl;
-  } else {
-    dout(20) << __func__ << " should run between " << cct->_conf->osd_scrub_begin_hour
-            << " - " << cct->_conf->osd_scrub_end_hour
-            << " now " << bdt.tm_hour << " = no" << dendl;
-  }
-  return time_permit;
+
+  auto was_started = scrub_scheduler.select_pg_and_scrub(env_conditions);
+  dout(20) << "sched_scrub done (" << ScrubQueue::attempt_res_text(was_started)
+          << ")" << dendl;
 }
 
-bool OSD::scrub_load_below_threshold()
+Scrub::schedule_result_t OSDService::initiate_a_scrub(spg_t pgid,
+                                                     bool allow_requested_repair_only)
 {
-  double loadavgs[3];
-  if (getloadavg(loadavgs, 3) != 3) {
-    dout(10) << __func__ << " couldn't read loadavgs\n" << dendl;
-    return false;
-  }
+  dout(20) << __func__ << " trying " << pgid << dendl;
 
-  // allow scrub if below configured threshold
-  long cpus = sysconf(_SC_NPROCESSORS_ONLN);
-  double loadavg_per_cpu = cpus > 0 ? loadavgs[0] / cpus : loadavgs[0];
-  if (loadavg_per_cpu < cct->_conf->osd_scrub_load_threshold) {
-    dout(20) << __func__ << " loadavg per cpu " << loadavg_per_cpu
-            << " < max " << cct->_conf->osd_scrub_load_threshold
-            << " = yes" << dendl;
-    return true;
-  }
+  // we have a candidate to scrub. We need some PG information to know if scrubbing is
+  // allowed
 
-  // allow scrub if below daily avg and currently decreasing
-  if (loadavgs[0] < daily_loadavg && loadavgs[0] < loadavgs[2]) {
-    dout(20) << __func__ << " loadavg " << loadavgs[0]
-            << " < daily_loadavg " << daily_loadavg
-            << " and < 15m avg " << loadavgs[2]
-            << " = yes" << dendl;
-    return true;
+  PGRef pg = osd->lookup_lock_pg(pgid);
+  if (!pg) {
+    // the PG was dequeued in the short timespan between creating the candidates list
+    // (collect_ripe_jobs()) and here
+    dout(5) << __func__ << " pg  " << pgid << " not found" << dendl;
+    return Scrub::schedule_result_t::no_such_pg;
   }
 
-  dout(20) << __func__ << " loadavg " << loadavgs[0]
-          << " >= max " << cct->_conf->osd_scrub_load_threshold
-          << " and ( >= daily_loadavg " << daily_loadavg
-          << " or >= 15m avg " << loadavgs[2]
-          << ") = no" << dendl;
-  return false;
-}
-
-void OSD::sched_scrub()
-{
-  dout(20) << __func__ << " sched_scrub starts" << dendl;
-
-  // if not permitted, fail fast
-  if (!service.can_inc_scrubs()) {
-    dout(20) << __func__ << ": OSD cannot inc scrubs" << dendl;
-    return;
+  // This has already started, so go on to the next scrub job
+  if (pg->is_scrub_queued_or_active()) {
+    pg->unlock();
+    dout(20) << __func__ << ": already in progress pgid " << pgid << dendl;
+    return Scrub::schedule_result_t::already_started;
   }
-  bool allow_requested_repair_only = false;
-  if (service.is_recovery_active() && !cct->_conf->osd_scrub_during_recovery) {
-    if (!cct->_conf->osd_repair_during_recovery) {
-      dout(15) << __func__ << ": not scheduling scrubs due to active recovery" << dendl;
-      return;
-    }
-    dout(10) << __func__
-             << " will only schedule explicitly requested repair due to active recovery"
-             << dendl;
-    allow_requested_repair_only = true;
+  // Skip other kinds of scrubbing if only explicitly requested repairing is allowed
+  if (allow_requested_repair_only && !pg->m_planned_scrub.must_repair) {
+    pg->unlock();
+    dout(10) << __func__ << " skip " << pgid
+            << " because repairing is not explicitly requested on it" << dendl;
+    return Scrub::schedule_result_t::preconditions;
   }
 
-  utime_t now = ceph_clock_now();
-  bool time_permit = scrub_time_permit(now);
-  bool load_is_low = scrub_load_below_threshold();
-  dout(20) << "sched_scrub load_is_low=" << (int)load_is_low << dendl;
-
-  OSDService::ScrubJob scrub_job;
-  if (service.first_scrub_stamp(&scrub_job)) {
-    do {
-      dout(30) << "sched_scrub examine " << scrub_job.pgid << " at " << scrub_job.sched_time << dendl;
-
-      if (scrub_job.sched_time > now) {
-       // save ourselves some effort
-       dout(20) << "sched_scrub " << scrub_job.pgid << " scheduled at " << scrub_job.sched_time
-                << " > " << now << dendl;
-       break;
-      }
-
-      if ((scrub_job.deadline.is_zero() || scrub_job.deadline >= now) && !(time_permit && load_is_low)) {
-        dout(15) << __func__ << " not scheduling scrub for " << scrub_job.pgid << " due to "
-                 << (!time_permit ? "time not permit" : "high load") << dendl;
-        continue;
-      }
-
-      PGRef pg = _lookup_lock_pg(scrub_job.pgid);
-      if (!pg) {
-       dout(20) << __func__ << " pg  " << scrub_job.pgid << " not found" << dendl;
-       continue;
-      }
-
-      // This has already started, so go on to the next scrub job
-      if (pg->is_scrub_active()) {
-       pg->unlock();
-       dout(20) << __func__ << ": already in progress pgid " << scrub_job.pgid << dendl;
-       continue;
-      }
-      // Skip other kinds of scrubbing if only explicitly requested repairing is allowed
-      if (allow_requested_repair_only && !pg->m_planned_scrub.must_repair) {
-        pg->unlock();
-        dout(10) << __func__ << " skip " << scrub_job.pgid
-                 << " because repairing is not explicitly requested on it"
-                 << dendl;
-        continue;
-      }
-
-      // If it is reserving, let it resolve before going to the next scrub job
-      if (pg->m_scrubber->is_reserving()) {
-       pg->unlock();
-       dout(10) << __func__ << ": reserve in progress pgid " << scrub_job.pgid << dendl;
-       break;
-      }
-      dout(15) << "sched_scrub scrubbing " << scrub_job.pgid << " at " << scrub_job.sched_time
-              << (pg->get_must_scrub() ? ", explicitly requested" :
-                  (load_is_low ? ", load_is_low" : " deadline < now"))
-              << dendl;
-      if (pg->sched_scrub()) {
-       pg->unlock();
-        dout(10) << __func__ << " scheduled a scrub!" << " (~" << scrub_job.pgid << "~)" << dendl;
-       break;
-      }
-      pg->unlock();
-    } while (service.next_scrub_stamp(scrub_job, &scrub_job));
-  }
-  dout(20) << "sched_scrub done" << dendl;
+  auto scrub_attempt = pg->sched_scrub();
+  pg->unlock();
+  return scrub_attempt;
 }
 
 void OSD::resched_all_scrubs()
 {
   dout(10) << __func__ << ": start" << dendl;
-  const vector<spg_t> pgs = [this] {
-    vector<spg_t> pgs;
-    OSDService::ScrubJob job;
-    if (service.first_scrub_stamp(&job)) {
-      do {
-        pgs.push_back(job.pgid);
-      } while (service.next_scrub_stamp(job, &job));
-    }
-    return pgs;
-  }();
-  for (auto& pgid : pgs) {
-      dout(20) << __func__ << ": examine " << pgid << dendl;
-      PGRef pg = _lookup_lock_pg(pgid);
-      if (!pg)
-       continue;
-      if (!pg->m_planned_scrub.must_scrub && !pg->m_planned_scrub.need_auto) {
-        dout(15) << __func__ << ": reschedule " << pgid << dendl;
-        pg->on_info_history_change();
-      }
-      pg->unlock();
+  auto all_jobs = service.get_scrub_services().list_registered_jobs();
+  for (auto& e : all_jobs) {
+
+    auto& job = *e;
+    dout(20) << __func__ << ": examine " << job.pgid << dendl;
+
+    PGRef pg = _lookup_lock_pg(job.pgid);
+    if (!pg)
+      continue;
+
+    if (!pg->m_planned_scrub.must_scrub && !pg->m_planned_scrub.need_auto) {
+      dout(15) << __func__ << ": reschedule " << job.pgid << dendl;
+      pg->reschedule_scrub();
+    }
+    pg->unlock();
   }
   dout(10) << __func__ << ": done" << dendl;
 }
 
 MPGStats* OSD::collect_pg_stats()
 {
+  dout(15) << __func__ << dendl;
   // This implementation unconditionally sends every is_primary PG's
   // stats every time we're called.  This has equivalent cost to the
   // previous implementation's worst case where all PGs are busy and
@@ -7711,7 +7670,7 @@ MPGStats* OSD::collect_pg_stats()
     if (!pg->is_primary()) {
       continue;
     }
-    pg->get_pg_stats([&](const pg_stat_t& s, epoch_t lec) {
+    pg->with_pg_stats([&](const pg_stat_t& s, epoch_t lec) {
        m->pg_stat[pg->pg_id.pgid] = s;
        min_last_epoch_clean = std::min(min_last_epoch_clean, lec);
        min_last_epoch_clean_pgs.push_back(pg->pg_id.pgid);
@@ -7749,6 +7708,15 @@ vector<DaemonHealthMetric> OSD::get_health_metrics()
     too_old -= cct->_conf.get_val<double>("osd_op_complaint_time");
     int slow = 0;
     TrackedOpRef oldest_op;
+    OSDMapRef osdmap = get_osdmap();
+    // map of slow op counts by slow op event type for an aggregated logging to
+    // the cluster log.
+    map<uint8_t, int> slow_op_types;
+    // map of slow op counts by pool for reporting a pool name with highest
+    // slow ops.
+    map<uint64_t, int> slow_op_pools;
+    bool log_aggregated_slow_op =
+           cct->_conf.get_val<bool>("osd_aggregated_slow_ops_logging");
     auto count_slow_ops = [&](TrackedOp& op) {
       if (op.get_initiated() < too_old) {
         stringstream ss;
@@ -7758,7 +7726,19 @@ vector<DaemonHealthMetric> OSD::get_health_metrics()
            << " currently "
            << op.state_string();
         lgeneric_subdout(cct,osd,20) << ss.str() << dendl;
-        clog->warn() << ss.str();
+        if (log_aggregated_slow_op) {
+          if (const OpRequest *req = dynamic_cast<const OpRequest *>(&op)) {
+            uint8_t op_type = req->state_flag();
+            auto m = req->get_req<MOSDFastDispatchOp>();
+            uint64_t poolid = m->get_spg().pgid.m_pool;
+            slow_op_types[op_type]++;
+            if (poolid > 0 && poolid <= (uint64_t) osdmap->get_pool_max()) {
+              slow_op_pools[poolid]++;
+            }
+          }
+        } else {
+          clog->warn() << ss.str();
+        }
        slow++;
        if (!oldest_op || op.get_initiated() < oldest_op->get_initiated()) {
          oldest_op = &op;
@@ -7772,6 +7752,32 @@ vector<DaemonHealthMetric> OSD::get_health_metrics()
       if (slow) {
        derr << __func__ << " reporting " << slow << " slow ops, oldest is "
             << oldest_op->get_desc() << dendl;
+        if (log_aggregated_slow_op &&
+             slow_op_types.size() > 0) {
+          stringstream ss;
+          ss << slow << " slow requests (by type [ ";
+          for (const auto& [op_type, count] : slow_op_types) {
+            ss << "'" << OpRequest::get_state_string(op_type)
+               << "' : " << count
+               << " ";
+          }
+          auto slow_pool_it = std::max_element(slow_op_pools.begin(), slow_op_pools.end(),
+                                 [](std::pair<uint64_t, int> p1, std::pair<uint64_t, int> p2) {
+                                   return p1.second < p2.second;
+                                 });
+          if (osdmap->get_pools().find(slow_pool_it->first) != osdmap->get_pools().end()) {
+            string pool_name = osdmap->get_pool_name(slow_pool_it->first);
+            ss << "] most affected pool [ '"
+               << pool_name
+               << "' : "
+               << slow_pool_it->second
+               << " ])";
+          } else {
+            ss << "])";
+          }
+          lgeneric_subdout(cct,osd,20) << ss.str() << dendl;
+          clog->warn() << ss.str();
+        }
       }
       metrics.emplace_back(daemon_metric::SLOW_OPS, slow, oldest_secs);
     } else {
@@ -8200,7 +8206,7 @@ void OSD::handle_osd_map(MOSDMap *m)
 
   // record new purged_snaps
   if (superblock.purged_snaps_last == start - 1) {
-    SnapMapper::record_purged_snaps(cct, store, service.meta_ch,
+    SnapMapper::record_purged_snaps(cct, store.get(), service.meta_ch,
                                    make_purged_snaps_oid(), &t,
                                    purged_snaps);
     superblock.purged_snaps_last = last;
@@ -8435,6 +8441,9 @@ void OSD::_committed_osd_maps(epoch_t first, epoch_t last, MOSDMap *m)
        reset_heartbeat_peers(true);
       }
     }
+  } else if (osdmap->get_epoch() > 0 && osdmap->is_stop(whoami)) {
+    derr << "map says i am stopped by admin. shutting down." << dendl;
+    do_shutdown = true;
   }
 
   map_lock.unlock();
@@ -8569,7 +8578,7 @@ void OSD::_finish_splits(set<PGRef>& pgs)
        ++i) {
     PG *pg = i->get();
 
-    PeeringCtx rctx = create_context();
+    PeeringCtx rctx;
     pg->lock();
     dout(10) << __func__ << " " << *pg << dendl;
     epoch_t e = pg->get_osdmap_epoch();
@@ -9224,11 +9233,6 @@ void OSD::handle_pg_create(OpRequestRef op)
 // ----------------------------------------
 // peering and recovery
 
-PeeringCtx OSD::create_context()
-{
-  return PeeringCtx(get_osdmap()->require_osd_release);
-}
-
 void OSD::dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap,
                            ThreadPool::TPHandle *handle)
 {
@@ -9337,31 +9341,6 @@ void OSD::handle_fast_pg_create(MOSDPGCreate2 *m)
   m->put();
 }
 
-void OSD::handle_fast_pg_query(MOSDPGQuery *m)
-{
-  dout(7) << __func__ << " " << *m << " from " << m->get_source() << dendl;
-  if (!require_osd_peer(m)) {
-    m->put();
-    return;
-  }
-  int from = m->get_source().num();
-  for (auto& p : m->pg_list) {
-    enqueue_peering_evt(
-      p.first,
-      PGPeeringEventRef(
-       std::make_shared<PGPeeringEvent>(
-         p.second.epoch_sent, p.second.epoch_sent,
-         MQuery(
-           p.first,
-           pg_shard_t(from, p.second.from),
-           p.second,
-           p.second.epoch_sent),
-         false))
-      );
-  }
-  m->put();
-}
-
 void OSD::handle_fast_pg_notify(MOSDPGNotify* m)
 {
   dout(7) << __func__ << " " << *m << " from " << m->get_source() << dendl;
@@ -9406,12 +9385,12 @@ void OSD::handle_fast_pg_info(MOSDPGInfo* m)
     enqueue_peering_evt(
       spg_t(p.info.pgid.pgid, p.to),
       PGPeeringEventRef(
-       std::make_shared<PGPeeringEvent>(
-         p.epoch_sent, p.query_epoch,
-         MInfoRec(
-           pg_shard_t(from, p.from),
-           p.info,
-           p.epoch_sent)))
+       std::make_shared<PGPeeringEvent>(
+         p.epoch_sent, p.query_epoch,
+         MInfoRec(
+           pg_shard_t(from, p.from),
+           p.info,
+           p.epoch_sent)))
       );
   }
   m->put();
@@ -9502,15 +9481,13 @@ void OSD::handle_pg_query_nopg(const MQuery& q)
        osdmap->get_epoch(), empty,
        q.query.epoch_sent);
     } else {
-      vector<pg_notify_t> ls;
-      ls.push_back(
-       pg_notify_t(
-         q.query.from, q.query.to,
-         q.query.epoch_sent,
-         osdmap->get_epoch(),
-         empty,
-         PastIntervals()));
-      m = new MOSDPGNotify(osdmap->get_epoch(), std::move(ls));
+      pg_notify_t notify{q.query.from, q.query.to,
+                        q.query.epoch_sent,
+                        osdmap->get_epoch(),
+                        empty,
+                        PastIntervals()};
+      m = new MOSDPGNotify2(spg_t{pgid.pgid, q.query.from},
+                           std::move(notify));
     }
     service.maybe_share_map(con.get(), osdmap);
     con->send_message(m);
@@ -9671,7 +9648,7 @@ void OSD::do_recovery(
             << " on " << *pg << dendl;
 
     if (do_unfound) {
-      PeeringCtx rctx = create_context();
+      PeeringCtx rctx;
       rctx.handle = &handle;
       pg->find_unfound(queued, rctx);
       dispatch_context(rctx, pg, pg->get_osdmap());
@@ -9771,18 +9748,16 @@ void OSD::enqueue_op(spg_t pg, OpRequestRef&& op, epoch_t epoch)
   op->osd_trace.event("enqueue op");
   op->osd_trace.keyval("priority", priority);
   op->osd_trace.keyval("cost", cost);
-#ifdef HAVE_JAEGER
-  if (op->osd_parent_span) {
-    auto enqueue_span = jaeger_tracing::child_span(__func__, op->osd_parent_span);
-    enqueue_span->Log({
-       {"priority", priority},
-       {"cost", cost},
-       {"epoch", epoch},
-       {"owner", owner},
-       {"type", type}
-       });
-  }
-#endif
+
+  auto enqueue_span = tracing::osd::tracer.add_span(__func__, op->osd_parent_span);
+  enqueue_span->AddEvent(__func__, {
+    {"priority", priority},
+    {"cost", cost},
+    {"epoch", epoch},
+    {"owner", owner},
+    {"type", type}
+    });
+
   op->mark_queued_for_pg();
   logger->tinc(l_osd_op_before_queue_op_lat, latency);
   if (type == MSG_OSD_PG_PUSH ||
@@ -9860,7 +9835,6 @@ void OSD::dequeue_peering_evt(
   PGPeeringEventRef evt,
   ThreadPool::TPHandle& handle)
 {
-  PeeringCtx rctx = create_context();
   auto curmap = sdata->get_osdmap();
   bool need_up_thru = false;
   epoch_t same_interval_since = 0;
@@ -9871,7 +9845,8 @@ void OSD::dequeue_peering_evt(
       derr << __func__ << " unrecognized pg-less event " << evt->get_desc() << dendl;
       ceph_abort();
     }
-  } else if (advance_pg(curmap->get_epoch(), pg, handle, rctx)) {
+  } else if (PeeringCtx rctx;
+            advance_pg(curmap->get_epoch(), pg, handle, rctx)) {
     pg->do_peering_event(evt, rctx);
     if (pg->is_deleted()) {
       pg->unlock();
@@ -9937,7 +9912,7 @@ const char** OSD::get_tracked_conf_keys() const
     "osd_snap_trim_sleep",
     "osd_snap_trim_sleep_hdd",
     "osd_snap_trim_sleep_ssd",
-    "osd_snap_trim_sleep_hybrid"
+    "osd_snap_trim_sleep_hybrid",
     "osd_scrub_sleep",
     "osd_recovery_max_active",
     "osd_recovery_max_active_hdd",
@@ -10057,14 +10032,14 @@ void OSD::handle_conf_change(const ConfigProxy& conf,
   if (changed.count("osd_client_message_cap")) {
     uint64_t newval = cct->_conf->osd_client_message_cap;
     Messenger::Policy pol = client_messenger->get_policy(entity_name_t::TYPE_CLIENT);
-    if (pol.throttler_messages && newval > 0) {
+    if (pol.throttler_messages) {
       pol.throttler_messages->reset_max(newval);
     }
   }
   if (changed.count("osd_client_message_size_cap")) {
     uint64_t newval = cct->_conf->osd_client_message_size_cap;
     Messenger::Policy pol = client_messenger->get_policy(entity_name_t::TYPE_CLIENT);
-    if (pol.throttler_bytes && newval > 0) {
+    if (pol.throttler_bytes) {
       pol.throttler_bytes->reset_max(newval);
     }
   }
@@ -10084,11 +10059,99 @@ void OSD::handle_conf_change(const ConfigProxy& conf,
   }
 }
 
+void OSD::maybe_override_max_osd_capacity_for_qos()
+{
+  // If the scheduler enabled is mclock, override the default
+  // osd capacity with the value obtained from running the
+  // osd bench test. This is later used to setup mclock.
+  if ((cct->_conf.get_val<std::string>("osd_op_queue") == "mclock_scheduler") &&
+      (cct->_conf.get_val<bool>("osd_mclock_skip_benchmark") == false) &&
+      (!unsupported_objstore_for_qos())) {
+    std::string max_capacity_iops_config;
+    bool force_run_benchmark =
+      cct->_conf.get_val<bool>("osd_mclock_force_run_benchmark_on_init");
+
+    if (store_is_rotational) {
+      max_capacity_iops_config = "osd_mclock_max_capacity_iops_hdd";
+    } else {
+      max_capacity_iops_config = "osd_mclock_max_capacity_iops_ssd";
+    }
+
+    if (!force_run_benchmark) {
+      double default_iops = 0.0;
+
+      // Get the current osd iops capacity
+      double cur_iops = cct->_conf.get_val<double>(max_capacity_iops_config);
+
+      // Get the default max iops capacity
+      auto val = cct->_conf.get_val_default(max_capacity_iops_config);
+      if (!val.has_value()) {
+        derr << __func__ << " Unable to determine default value of "
+            << max_capacity_iops_config << dendl;
+        // Cannot determine default iops. Force a run of the OSD benchmark.
+        force_run_benchmark = true;
+      } else {
+        // Default iops
+        default_iops = std::stod(val.value());
+      }
+
+      // Determine if we really need to run the osd benchmark
+      if (!force_run_benchmark && (default_iops != cur_iops)) {
+        dout(1) << __func__ << std::fixed << std::setprecision(2)
+                << " default_iops: " << default_iops
+                << " cur_iops: " << cur_iops
+                << ". Skip OSD benchmark test." << dendl;
+        return;
+      }
+    }
+
+    // Run osd bench: write 100 4MiB objects with blocksize 4KiB
+    int64_t count = 12288000; // Count of bytes to write
+    int64_t bsize = 4096;     // Block size
+    int64_t osize = 4194304;  // Object size
+    int64_t onum = 100;       // Count of objects to write
+    double elapsed = 0.0;     // Time taken to complete the test
+    double iops = 0.0;
+    stringstream ss;
+    int ret = run_osd_bench_test(count, bsize, osize, onum, &elapsed, ss);
+    if (ret != 0) {
+      derr << __func__
+           << " osd bench err: " << ret
+           << " osd bench errstr: " << ss.str()
+           << dendl;
+      return;
+    }
+
+    double rate = count / elapsed;
+    iops = rate / bsize;
+    dout(1) << __func__
+            << " osd bench result -"
+            << std::fixed << std::setprecision(3)
+            << " bandwidth (MiB/sec): " << rate / (1024 * 1024)
+            << " iops: " << iops
+            << " elapsed_sec: " << elapsed
+            << dendl;
+
+    // Persist iops to the MON store
+    ret = mon_cmd_set_config(max_capacity_iops_config, std::to_string(iops));
+    if (ret < 0) {
+      // Fallback to setting the config within the in-memory "values" map.
+      cct->_conf.set_val(max_capacity_iops_config, std::to_string(iops));
+    }
+
+    // Override the max osd capacity for all shards
+    for (auto& shard : shards) {
+      shard->update_scheduler_config();
+    }
+  }
+}
+
 bool OSD::maybe_override_options_for_qos()
 {
   // If the scheduler enabled is mclock, override the recovery, backfill
   // and sleep options so that mclock can meet the QoS goals.
-  if (cct->_conf.get_val<std::string>("osd_op_queue") == "mclock_scheduler") {
+  if (cct->_conf.get_val<std::string>("osd_op_queue") == "mclock_scheduler" &&
+      !unsupported_objstore_for_qos()) {
     dout(1) << __func__
             << ": Changing recovery/backfill/sleep settings for QoS" << dendl;
 
@@ -10132,27 +10195,44 @@ bool OSD::maybe_override_options_for_qos()
   return false;
 }
 
+int OSD::mon_cmd_set_config(const std::string &key, const std::string &val)
+{
+  std::string cmd =
+    "{"
+      "\"prefix\": \"config set\", "
+      "\"who\": \"osd." + std::to_string(whoami) + "\", "
+      "\"name\": \"" + key + "\", "
+      "\"value\": \"" + val + "\""
+    "}";
+
+  vector<std::string> vcmd{cmd};
+  bufferlist inbl;
+  std::string outs;
+  C_SaferCond cond;
+  monc->start_mon_command(vcmd, inbl, nullptr, &outs, &cond);
+  int r = cond.wait();
+  if (r < 0) {
+    derr << __func__ << " Failed to set config key " << key
+         << " err: " << cpp_strerror(r)
+         << " errstr: " << outs << dendl;
+    return r;
+  }
+
+  return 0;
+}
+
+bool OSD::unsupported_objstore_for_qos()
+{
+  static const std::vector<std::string> unsupported_objstores = { "filestore" };
+  return std::find(unsupported_objstores.begin(),
+                   unsupported_objstores.end(),
+                   store->get_type()) != unsupported_objstores.end();
+}
+
 void OSD::update_log_config()
 {
-  map<string,string> log_to_monitors;
-  map<string,string> log_to_syslog;
-  map<string,string> log_channel;
-  map<string,string> log_prio;
-  map<string,string> log_to_graylog;
-  map<string,string> log_to_graylog_host;
-  map<string,string> log_to_graylog_port;
-  uuid_d fsid;
-  string host;
-
-  if (parse_log_client_options(cct, log_to_monitors, log_to_syslog,
-                              log_channel, log_prio, log_to_graylog,
-                              log_to_graylog_host, log_to_graylog_port,
-                              fsid, host) == 0)
-    clog->update_config(log_to_monitors, log_to_syslog,
-                       log_channel, log_prio, log_to_graylog,
-                       log_to_graylog_host, log_to_graylog_port,
-                       fsid, host);
-  derr << "log_to_monitors " << log_to_monitors << dendl;
+  auto parsed_options = clog->parse_client_options(cct);
+  derr << "log_to_monitors " << parsed_options.log_to_monitors << dendl;
 }
 
 void OSD::check_config()
@@ -10341,7 +10421,7 @@ void OSDShard::consume_map(
   dout(10) << new_osdmap->get_epoch()
            << " (was " << (old_osdmap ? old_osdmap->get_epoch() : 0) << ")"
           << dendl;
-  bool queued = false;
+  int queued = 0;
 
   // check slots
   auto p = pg_slots.begin();
@@ -10368,8 +10448,7 @@ void OSDShard::consume_map(
        dout(20) << __func__ << "  " << pgid
                 << " pending_peering first epoch " << first
                 << " <= " << new_osdmap->get_epoch() << ", requeueing" << dendl;
-       _wake_pg_slot(pgid, slot);
-       queued = true;
+       queued += _wake_pg_slot(pgid, slot);
       }
       ++p;
       continue;
@@ -10409,14 +10488,18 @@ void OSDShard::consume_map(
   }
   if (queued) {
     std::lock_guard l{sdata_wait_lock};
-    sdata_cond.notify_one();
+    if (queued == 1)
+      sdata_cond.notify_one();
+    else
+      sdata_cond.notify_all();
   }
 }
 
-void OSDShard::_wake_pg_slot(
+int OSDShard::_wake_pg_slot(
   spg_t pgid,
   OSDShardPGSlot *slot)
 {
+  int count = 0;
   dout(20) << __func__ << " " << pgid
           << " to_process " << slot->to_process
           << " waiting " << slot->waiting
@@ -10425,12 +10508,14 @@ void OSDShard::_wake_pg_slot(
        i != slot->to_process.rend();
        ++i) {
     scheduler->enqueue_front(std::move(*i));
+    count++;
   }
   slot->to_process.clear();
   for (auto i = slot->waiting.rbegin();
        i != slot->waiting.rend();
        ++i) {
     scheduler->enqueue_front(std::move(*i));
+    count++;
   }
   slot->waiting.clear();
   for (auto i = slot->waiting_peering.rbegin();
@@ -10441,10 +10526,12 @@ void OSDShard::_wake_pg_slot(
     // someday, if we decide this inefficiency matters
     for (auto j = i->second.rbegin(); j != i->second.rend(); ++j) {
       scheduler->enqueue_front(std::move(*j));
+      count++;
     }
   }
   slot->waiting_peering.clear();
   ++slot->requeue_seq;
+  return count;
 }
 
 void OSDShard::identify_splits_and_merges(
@@ -10577,15 +10664,16 @@ void OSDShard::prime_merges(const OSDMapRef& as_of_osdmap,
 
 void OSDShard::register_and_wake_split_child(PG *pg)
 {
+  dout(15) <<  __func__ << ": " << pg << " #:" << pg_slots.size() << dendl;
   epoch_t epoch;
   {
     std::lock_guard l(shard_lock);
-    dout(10) << pg->pg_id << " " << pg << dendl;
+    dout(10) << __func__ << ": " << pg->pg_id << " " << pg << dendl;
     auto p = pg_slots.find(pg->pg_id);
     ceph_assert(p != pg_slots.end());
     auto *slot = p->second.get();
-    dout(20) << pg->pg_id << " waiting_for_split " << slot->waiting_for_split
-            << dendl;
+    dout(20) << __func__ << ": " << pg->pg_id << " waiting_for_split "
+            << slot->waiting_for_split << dendl;
     ceph_assert(!slot->pg);
     ceph_assert(!slot->waiting_for_split.empty());
     _attach_pg(slot, pg);
@@ -10632,6 +10720,19 @@ void OSDShard::unprime_split_children(spg_t parent, unsigned old_pg_num)
   }
 }
 
+void OSDShard::update_scheduler_config()
+{
+  std::lock_guard l(shard_lock);
+  scheduler->update_configuration();
+}
+
+std::string OSDShard::get_scheduler_type()
+{
+  std::ostringstream scheduler_type;
+  scheduler_type << *scheduler;
+  return scheduler_type.str();
+}
+
 OSDShard::OSDShard(
   int id,
   CephContext *cct,
@@ -10646,7 +10747,8 @@ OSDShard::OSDShard(
     shard_lock_name(shard_name + "::shard_lock"),
     shard_lock{make_mutex(shard_lock_name)},
     scheduler(ceph::osd::scheduler::make_scheduler(
-      cct, osd->num_shards, osd->store->is_rotational())),
+      cct, osd->num_shards, osd->store->is_rotational(),
+      osd->store->get_type())),
     context_queue(sdata_wait_lock, sdata_cond)
 {
   dout(0) << "using op scheduler " << *scheduler << dendl;
@@ -10768,12 +10870,17 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
       std::unique_lock wait_lock{sdata->sdata_wait_lock};
       auto future_time = ceph::real_clock::from_double(*when_ready);
       dout(10) << __func__ << " dequeue future request at " << future_time << dendl;
+      // Disable heartbeat timeout until we find a non-future work item to process.
+      osd->cct->get_heartbeat_map()->clear_timeout(hb);
       sdata->shard_lock.unlock();
       ++sdata->waiting_threads;
       sdata->sdata_cond.wait_until(wait_lock, future_time);
       --sdata->waiting_threads;
       wait_lock.unlock();
       sdata->shard_lock.lock();
+      // Reapply default wq timeouts
+      osd->cct->get_heartbeat_map()->reset_timeout(hb,
+        timeout_interval, suicide_interval);
     }
   } // while
 
@@ -11012,13 +11119,21 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
 }
 
 void OSD::ShardedOpWQ::_enqueue(OpSchedulerItem&& item) {
+  if (unlikely(m_fast_shutdown) ) {
+    // stop enqueing when we are in the middle of a fast shutdown
+    return;
+  }
+
   uint32_t shard_index =
     item.get_ordering_token().hash_to_shard(osd->shards.size());
 
-  dout(20) << __func__ << " " << item << dendl;
-
   OSDShard* sdata = osd->shards[shard_index];
   assert (NULL != sdata);
+  if (sdata->get_scheduler_type() == "mClockScheduler") {
+    item.maybe_set_is_qos_item();
+  }
+
+  dout(20) << __func__ << " " << item << dendl;
 
   bool empty = true;
   {
@@ -11039,6 +11154,11 @@ void OSD::ShardedOpWQ::_enqueue(OpSchedulerItem&& item) {
 
 void OSD::ShardedOpWQ::_enqueue_front(OpSchedulerItem&& item)
 {
+  if (unlikely(m_fast_shutdown) ) {
+    // stop enqueing when we are in the middle of a fast shutdown
+    return;
+  }
+
   auto shard_index = item.get_ordering_token().hash_to_shard(osd->shards.size());
   auto& sdata = osd->shards[shard_index];
   ceph_assert(sdata);
@@ -11065,6 +11185,24 @@ void OSD::ShardedOpWQ::_enqueue_front(OpSchedulerItem&& item)
   sdata->sdata_cond.notify_one();
 }
 
+void OSD::ShardedOpWQ::stop_for_fast_shutdown()
+{
+  uint32_t shard_index = 0;
+  m_fast_shutdown = true;
+
+  for (; shard_index < osd->num_shards; shard_index++) {
+    auto& sdata = osd->shards[shard_index];
+    ceph_assert(sdata);
+    sdata->shard_lock.lock();
+    int work_count = 0;
+    while(! sdata->scheduler->empty() ) {
+      auto work_item = sdata->scheduler->dequeue();
+      work_count++;
+    }
+    sdata->shard_lock.unlock();
+  }
+}
+
 namespace ceph::osd_cmds {
 
 int heap(CephContext& cct, const cmdmap_t& cmdmap, Formatter& f,