]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osd/OSD.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / osd / OSD.cc
index 34dd356ebf6e8d2605702a62a4df6243b0c96c13..121190f409549a87b89ee2b67c3ca0c405cae59c 100644 (file)
@@ -36,6 +36,8 @@
 #endif
 
 #include "osd/PG.h"
+#include "osd/scrub_machine.h"
+#include "osd/pg_scrubber.h"
 
 #include "include/types.h"
 #include "include/compat.h"
@@ -51,6 +53,7 @@
 #include "common/ceph_releases.h"
 #include "common/ceph_time.h"
 #include "common/version.h"
+#include "common/async/blocked_completion.h"
 #include "common/pick_address.h"
 #include "common/blkdev.h"
 #include "common/numa.h"
 #include "messages/MOSDPGInfo2.h"
 #include "messages/MOSDPGCreate.h"
 #include "messages/MOSDPGCreate2.h"
-#include "messages/MOSDPGScan.h"
 #include "messages/MBackfillReserve.h"
 #include "messages/MRecoveryReserve.h"
 #include "messages/MOSDForceRecovery.h"
 #include "messages/MCommandReply.h"
 
 #include "messages/MPGStats.h"
-#include "messages/MPGStatsAck.h"
 
 #include "messages/MWatchNotify.h"
 #include "messages/MOSDPGPush.h"
 #include "perfglue/cpu_profiler.h"
 #include "perfglue/heap_profiler.h"
 
+#include "osd/ClassHandler.h"
 #include "osd/OpRequest.h"
 
 #include "auth/AuthAuthorizeHandler.h"
 #else
 #define tracepoint(...)
 #endif
+#ifdef HAVE_JAEGER
+#include "common/tracer.h"
+#endif
 
 #define dout_context cct
 #define dout_subsys ceph_subsys_osd
 #undef dout_prefix
 #define dout_prefix _prefix(_dout, whoami, get_osdmap_epoch())
 
+using std::deque;
+using std::list;
+using std::lock_guard;
+using std::make_pair;
+using std::make_tuple;
+using std::make_unique;
+using std::map;
+using std::ostream;
+using std::ostringstream;
+using std::pair;
+using std::set;
+using std::string;
+using std::stringstream;
+using std::to_string;
+using std::unique_ptr;
+using std::vector;
+
+using ceph::bufferlist;
+using ceph::bufferptr;
+using ceph::decode;
+using ceph::encode;
+using ceph::fixed_u_to_string;
+using ceph::Formatter;
+using ceph::heartbeat_handle_d;
+using ceph::make_mutex;
+
 using namespace ceph::osd::scheduler;
 using TOPNSPC::common::cmd_getval;
 
@@ -220,7 +251,7 @@ CompatSet OSD::get_osd_compat_set() {
   return compat;
 }
 
-OSDService::OSDService(OSD *osd) :
+OSDService::OSDService(OSD *osd, ceph::async::io_context_pool& poolctx) :
   osd(osd),
   cct(osd->cct),
   whoami(osd->whoami), store(osd->store),
@@ -248,9 +279,10 @@ OSDService::OSDService(OSD *osd) :
   last_recalibrate(ceph_clock_now()),
   promote_max_objects(0),
   promote_max_bytes(0),
+  poolctx(poolctx),
   objecter(make_unique<Objecter>(osd->client_messenger->cct,
                                 osd->objecter_messenger,
-                                osd->monc, nullptr)),
+                                osd->monc, poolctx)),
   m_objecter_finishers(cct->_conf->osd_objecter_finishers),
   watch_timer(osd->client_messenger->cct, watch_lock),
   next_notif_id(0),
@@ -284,7 +316,7 @@ OSDService::OSDService(OSD *osd) :
 }
 
 #ifdef PG_DEBUG_REFS
-void OSDService::add_pgid(spg_t pgid, PG *pg){
+void OSDService::add_pgid(spg_t pgid, PG *pg) {
   std::lock_guard l(pgid_lock);
   if (!pgid_tracker.count(pgid)) {
     live_pgs[pgid] = pg;
@@ -594,7 +626,7 @@ void OSDService::agent_entry()
        << " no agent_work, delay for " << cct->_conf->osd_agent_delay_time
        << " seconds" << dendl;
 
-      osd->logger->inc(l_osd_tier_delay);
+      logger->inc(l_osd_tier_delay);
       // Queue a timer to call agent_choose_mode for this pg in 5 seconds
       std::lock_guard timer_locker{agent_timer_lock};
       Context *cb = new AgentTimeoutCB(pg);
@@ -943,9 +975,9 @@ void OSDService::set_statfs(const struct store_statfs_t &stbuf,
     used = bytes - avail;
   }
 
-  osd->logger->set(l_osd_stat_bytes, bytes);
-  osd->logger->set(l_osd_stat_bytes_used, used);
-  osd->logger->set(l_osd_stat_bytes_avail, avail);
+  logger->set(l_osd_stat_bytes, bytes);
+  logger->set(l_osd_stat_bytes_used, used);
+  logger->set(l_osd_stat_bytes_avail, avail);
 
   std::lock_guard l(stat_lock);
   osd_stat.statfs = stbuf;
@@ -1401,19 +1433,19 @@ MOSDMap *OSDService::build_incremental_map_msg(epoch_t since, epoch_t to,
     }
     max--;
     max_bytes -= bl.length();
-    m->maps[since].claim(bl);
+    m->maps[since] = std::move(bl);
   }
   for (epoch_t e = since + 1; e <= to; ++e) {
     bufferlist bl;
     if (get_inc_map_bl(e, bl)) {
-      m->incremental_maps[e].claim(bl);
+      m->incremental_maps[e] = std::move(bl);
     } else {
       dout(10) << __func__ << " missing incremental map " << e << dendl;
       if (!get_map_bl(e, bl)) {
        derr << __func__ << " also missing full map " << e << dendl;
        goto panic;
       }
-      m->maps[e].claim(bl);
+      m->maps[e] = std::move(bl);
     }
     max--;
     max_bytes -= bl.length();
@@ -1432,7 +1464,7 @@ MOSDMap *OSDService::build_incremental_map_msg(epoch_t since, epoch_t to,
   // send something
   bufferlist bl;
   if (get_inc_map_bl(m->newest_map, bl)) {
-    m->incremental_maps[m->newest_map].claim(bl);
+    m->incremental_maps[m->newest_map] = std::move(bl);
   } else {
     derr << __func__ << " unable to load latest map " << m->newest_map << dendl;
     if (!get_map_bl(m->newest_map, bl)) {
@@ -1440,7 +1472,7 @@ MOSDMap *OSDService::build_incremental_map_msg(epoch_t since, epoch_t to,
           << dendl;
       ceph_abort();
     }
-    m->maps[m->newest_map].claim(bl);
+    m->maps[m->newest_map] = std::move(bl);
   }
   return m;
 }
@@ -1486,12 +1518,10 @@ bool OSDService::_get_map_bl(epoch_t e, bufferlist& bl)
 {
   bool found = map_bl_cache.lookup(e, &bl);
   if (found) {
-    if (logger)
-      logger->inc(l_osd_map_bl_cache_hit);
+    logger->inc(l_osd_map_bl_cache_hit);
     return true;
   }
-  if (logger)
-    logger->inc(l_osd_map_bl_cache_miss);
+  logger->inc(l_osd_map_bl_cache_miss);
   found = store->read(meta_ch,
                      OSD::get_osdmap_pobject_name(e), 0, 0, bl,
                      CEPH_OSD_OP_FLAG_FADVISE_WILLNEED) >= 0;
@@ -1506,12 +1536,10 @@ bool OSDService::get_inc_map_bl(epoch_t e, bufferlist& bl)
   std::lock_guard l(map_cache_lock);
   bool found = map_bl_inc_cache.lookup(e, &bl);
   if (found) {
-    if (logger)
-      logger->inc(l_osd_map_bl_cache_hit);
+    logger->inc(l_osd_map_bl_cache_hit);
     return true;
   }
-  if (logger)
-    logger->inc(l_osd_map_bl_cache_miss);
+  logger->inc(l_osd_map_bl_cache_miss);
   found = store->read(meta_ch,
                      OSD::get_inc_osdmap_pobject_name(e), 0, 0, bl,
                      CEPH_OSD_OP_FLAG_FADVISE_WILLNEED) >= 0;
@@ -1568,12 +1596,10 @@ OSDMapRef OSDService::try_get_map(epoch_t epoch)
   OSDMapRef retval = map_cache.lookup(epoch);
   if (retval) {
     dout(30) << "get_map " << epoch << " -cached" << dendl;
-    if (logger) {
-      logger->inc(l_osd_map_cache_hit);
-    }
+    logger->inc(l_osd_map_cache_hit);
     return retval;
   }
-  if (logger) {
+  {
     logger->inc(l_osd_map_cache_miss);
     epoch_t lb = map_cache.cached_key_lower_bound();
     if (epoch < lb) {
@@ -1718,21 +1744,108 @@ void OSDService::queue_for_snap_trim(PG *pg)
       pg->get_osdmap_epoch()));
 }
 
-void OSDService::queue_for_scrub(PG *pg, bool with_high_priority)
+template <class MSG_TYPE>
+void OSDService::queue_scrub_event_msg(PG* pg,
+                                      Scrub::scrub_prio_t with_priority,
+                                      unsigned int qu_priority)
 {
-  unsigned scrub_queue_priority = pg->scrubber.priority;
-  if (with_high_priority && scrub_queue_priority < cct->_conf->osd_client_op_priority) {
-    scrub_queue_priority = cct->_conf->osd_client_op_priority;
-  }
   const auto epoch = pg->get_osdmap_epoch();
-  enqueue_back(
-    OpSchedulerItem(
-      unique_ptr<OpSchedulerItem::OpQueueable>(new PGScrub(pg->get_pgid(), epoch)),
-      cct->_conf->osd_scrub_cost,
-      scrub_queue_priority,
-      ceph_clock_now(),
-      0,
-      epoch));
+  auto msg = new MSG_TYPE(pg->get_pgid(), epoch);
+  dout(15) << "queue a scrub event (" << *msg << ") for " << *pg << ". Epoch: " << epoch << dendl;
+
+  enqueue_back(OpSchedulerItem(
+    unique_ptr<OpSchedulerItem::OpQueueable>(msg), cct->_conf->osd_scrub_cost,
+    pg->scrub_requeue_priority(with_priority, qu_priority), ceph_clock_now(), 0, epoch));
+}
+
+template <class MSG_TYPE>
+void OSDService::queue_scrub_event_msg(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+  const auto epoch = pg->get_osdmap_epoch();
+  auto msg = new MSG_TYPE(pg->get_pgid(), epoch);
+  dout(15) << "queue a scrub event (" << *msg << ") for " << *pg << ". Epoch: " << epoch << dendl;
+
+  enqueue_back(OpSchedulerItem(
+    unique_ptr<OpSchedulerItem::OpQueueable>(msg), cct->_conf->osd_scrub_cost,
+    pg->scrub_requeue_priority(with_priority), ceph_clock_now(), 0, epoch));
+}
+
+void OSDService::queue_for_scrub(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+  queue_scrub_event_msg<PGScrub>(pg, with_priority);
+}
+
+void OSDService::queue_scrub_after_repair(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+  queue_scrub_event_msg<PGScrubAfterRepair>(pg, with_priority);
+}
+
+void OSDService::queue_for_rep_scrub(PG* pg,
+                                    Scrub::scrub_prio_t with_priority,
+                                    unsigned int qu_priority)
+{
+  queue_scrub_event_msg<PGRepScrub>(pg, with_priority, qu_priority);
+}
+
+void OSDService::queue_for_rep_scrub_resched(PG* pg,
+                                            Scrub::scrub_prio_t with_priority,
+                                            unsigned int qu_priority)
+{
+  // Resulting scrub event: 'SchedReplica'
+  queue_scrub_event_msg<PGRepScrubResched>(pg, with_priority, qu_priority);
+}
+
+void OSDService::queue_for_scrub_granted(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+  // Resulting scrub event: 'RemotesReserved'
+  queue_scrub_event_msg<PGScrubResourcesOK>(pg, with_priority);
+}
+
+void OSDService::queue_for_scrub_denied(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+  // Resulting scrub event: 'ReservationFailure'
+  queue_scrub_event_msg<PGScrubDenied>(pg, with_priority);
+}
+
+void OSDService::queue_for_scrub_resched(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+  // Resulting scrub event: 'InternalSchedScrub'
+  queue_scrub_event_msg<PGScrubResched>(pg, with_priority);
+}
+
+void OSDService::queue_scrub_pushes_update(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+  // Resulting scrub event: 'ActivePushesUpd'
+  queue_scrub_event_msg<PGScrubPushesUpdate>(pg, with_priority);
+}
+
+void OSDService::queue_scrub_applied_update(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+  queue_scrub_event_msg<PGScrubAppliedUpdate>(pg, with_priority);
+}
+
+void OSDService::queue_scrub_unblocking(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+  // Resulting scrub event: 'Unblocked'
+  queue_scrub_event_msg<PGScrubUnblocked>(pg, with_priority);
+}
+
+void OSDService::queue_scrub_digest_update(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+  // Resulting scrub event: 'DigestUpdate'
+  queue_scrub_event_msg<PGScrubDigestUpdate>(pg, with_priority);
+}
+
+void OSDService::queue_scrub_got_repl_maps(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+  // Resulting scrub event: 'GotReplicas'
+  queue_scrub_event_msg<PGScrubGotReplMaps>(pg, with_priority);
+}
+
+void OSDService::queue_scrub_replica_pushes(PG *pg, Scrub::scrub_prio_t with_priority)
+{
+  // Resulting scrub event: 'ReplicaPushesUpd'
+  queue_scrub_event_msg<PGScrubReplicaPushes>(pg, with_priority);
 }
 
 void OSDService::queue_for_pg_delete(spg_t pgid, epoch_t e)
@@ -1911,12 +2024,11 @@ void OSDService::_queue_for_recovery(
 #define dout_prefix *_dout
 
 // Commands shared between OSD's console and admin console:
-namespace ceph { 
-namespace osd_cmds { 
+namespace ceph::osd_cmds {
 
 int heap(CephContext& cct, const cmdmap_t& cmdmap, Formatter& f, std::ostream& os);
-}} // namespace ceph::osd_cmds
+
+} // namespace ceph::osd_cmds
 
 int OSD::mkfs(CephContext *cct, ObjectStore *store, uuid_d fsid, int whoami, string osdspec_affinity)
 {
@@ -2121,7 +2233,8 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
         Messenger *hb_back_serverm,
         Messenger *osdc_messenger,
         MonClient *mc,
-        const std::string &dev, const std::string &jdev) :
+        const std::string &dev, const std::string &jdev,
+        ceph::async::io_context_pool& poolctx) :
   Dispatcher(cct_),
   tick_timer(cct, osd_lock),
   tick_timer_without_osd_lock(cct, tick_timer_lock),
@@ -2131,8 +2244,8 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
   objecter_messenger(osdc_messenger),
   monc(mc),
   mgrc(cct_, client_messenger, &mc->monmap),
-  logger(NULL),
-  recoverystate_perf(NULL),
+  logger(create_logger()),
+  recoverystate_perf(create_recoverystate_perf()),
   store(store_),
   log_client(cct, client_messenger, &mc->monmap, LogClient::NO_FLAGS),
   clog(log_client.create_channel()),
@@ -2160,20 +2273,20 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
   test_ops_hook(NULL),
   op_shardedwq(
     this,
-    cct->_conf->osd_op_thread_timeout,
-    cct->_conf->osd_op_thread_suicide_timeout,
+    ceph::make_timespan(cct->_conf->osd_op_thread_timeout),
+    ceph::make_timespan(cct->_conf->osd_op_thread_suicide_timeout),
     &osd_op_tp),
   last_pg_create_epoch(0),
   boot_finisher(cct),
   up_thru_wanted(0),
   requested_full_first(0),
   requested_full_last(0),
-  service(this)
+  service(this, poolctx)
 {
 
   if (!gss_ktfile_client.empty()) {
-    // Assert we can export environment variable 
-    /* 
+    // Assert we can export environment variable
+    /*
         The default client keytab is used, if it is present and readable,
         to automatically obtain initial credentials for GSSAPI client
         applications. The principal name of the first entry in the client
@@ -2182,7 +2295,7 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
         2. The default_client_keytab_name profile variable in [libdefaults].
         3. The hardcoded default, DEFCKTNAME.
     */
-    const int32_t set_result(setenv("KRB5_CLIENT_KTNAME", 
+    const int32_t set_result(setenv("KRB5_CLIENT_KTNAME",
                                     gss_ktfile_client.c_str(), 1));
     ceph_assert(set_result == 0);
   }
@@ -2508,12 +2621,12 @@ will start to track new ops received afterwards.";
     f->open_object_section("pq");
     op_shardedwq.dump(f);
     f->close_section();
-  } else if (prefix == "dump_blacklist") {
+  } else if (prefix == "dump_blocklist") {
     list<pair<entity_addr_t,utime_t> > bl;
     OSDMapRef curmap = service.get_osdmap();
 
-    f->open_array_section("blacklist");
-    curmap->get_blacklist(&bl);
+    f->open_array_section("blocklist");
+    curmap->get_blocklist(&bl);
     for (list<pair<entity_addr_t,utime_t> >::iterator it = bl.begin();
        it != bl.end(); ++it) {
       f->open_object_section("entry");
@@ -2523,7 +2636,7 @@ will start to track new ops received afterwards.";
       it->second.localtime(f->dump_stream("expire_time"));
       f->close_section(); //entry
     }
-    f->close_section(); //blacklist
+    f->close_section(); //blocklist
   } else if (prefix == "dump_watchers") {
     list<obj_watch_item_t> watchers;
     // scan pg's
@@ -2645,7 +2758,7 @@ will start to track new ops received afterwards.";
     store->compact();
     auto end = ceph::coarse_mono_clock::now();
     double duration = std::chrono::duration<double>(end-start).count();
-    dout(1) << "finished manual compaction in " 
+    dout(1) << "finished manual compaction in "
             << duration
             << " seconds" << dendl;
     f->open_object_section("compact_result");
@@ -3459,8 +3572,6 @@ int OSD::init()
 
   check_osdmap_features();
 
-  create_recoverystate_perf();
-
   {
     epoch_t bind_epoch = osdmap->get_epoch();
     service.set_epochs(NULL, NULL, &bind_epoch);
@@ -3479,7 +3590,10 @@ int OSD::init()
 
   dout(2) << "superblock: I am osd." << superblock.whoami << dendl;
 
-  create_logger();
+  if (cct->_conf.get_val<bool>("osd_compact_on_start")) {
+    dout(2) << "compacting object store's omap" << dendl;
+    store->compact();
+  }
 
   // prime osd stats
   {
@@ -3490,7 +3604,7 @@ int OSD::init()
     service.set_statfs(stbuf, alerts);
   }
 
-  // client_messenger auth_client is already set up by monc.
+  // client_messenger's auth_client will be set up by monc->init() later.
   for (auto m : { cluster_messenger,
        objecter_messenger,
        hb_front_client_messenger,
@@ -3513,7 +3627,7 @@ int OSD::init()
   if (r < 0)
     goto out;
 
-  mgrc.set_pgstats_cb([this](){ return collect_pg_stats(); });
+  mgrc.set_pgstats_cb([this]() { return collect_pg_stats(); });
   mgrc.set_perf_metric_query_cb(
     [this](const ConfigPayload &config_payload) {
         set_perf_queries(config_payload);
@@ -3705,9 +3819,9 @@ void OSD::final_init()
                                     asok_hook,
                                     "dump op priority queue state");
   ceph_assert(r == 0);
-  r = admin_socket->register_command("dump_blacklist",
+  r = admin_socket->register_command("dump_blocklist",
                                     asok_hook,
-                                    "dump blacklisted clients and times");
+                                    "dump blocklisted clients and times");
   ceph_assert(r == 0);
   r = admin_socket->register_command("dump_watchers",
                                     asok_hook,
@@ -4022,26 +4136,26 @@ void OSD::final_init()
   ceph_assert(r == 0);
 }
 
-void OSD::create_logger()
+PerfCounters* OSD::create_logger()
 {
-  dout(10) << "create_logger" << dendl;
-
-  logger = build_osd_logger(cct);
+  PerfCounters* logger = build_osd_logger(cct);
   cct->get_perfcounters_collection()->add(logger);
+  return logger;
 }
 
-void OSD::create_recoverystate_perf()
+PerfCounters* OSD::create_recoverystate_perf()
 {
-  dout(10) << "create_recoverystate_perf" << dendl;
-
-  recoverystate_perf = build_recoverystate_perf(cct);
+  PerfCounters* recoverystate_perf = build_recoverystate_perf(cct);
   cct->get_perfcounters_collection()->add(recoverystate_perf);
+  return recoverystate_perf;
 }
 
 int OSD::shutdown()
 {
   if (cct->_conf->osd_fast_shutdown) {
     derr << "*** Immediate shutdown (osd_fast_shutdown=true) ***" << dendl;
+    if (cct->_conf->osd_fast_shutdown_notify_mon)
+      service.prepare_to_stop();
     cct->_log->flush();
     _exit(0);
   }
@@ -4505,7 +4619,7 @@ PG* OSD::_make_pg(
     }
     decode(ec_profile, p);
   }
-  PGPool pool(cct, createmap, pgid.pool(), pi, name);
+  PGPool pool(createmap, pgid.pool(), pi, name);
   PG *pg;
   if (pi.type == pg_pool_t::TYPE_REPLICATED ||
       pi.type == pg_pool_t::TYPE_ERASURE)
@@ -5680,6 +5794,11 @@ void OSD::heartbeat()
        i != heartbeat_peers.end();
        ++i) {
     int peer = i->first;
+    Session *s = static_cast<Session*>(i->second.con_back->get_priv().get());
+    if (!s) {
+      dout(30) << "heartbeat osd." << peer << " has no open con" << dendl;
+      continue;
+    }
     dout(30) << "heartbeat sending ping to osd." << peer << dendl;
 
     i->second.last_tx = now;
@@ -5690,7 +5809,6 @@ void OSD::heartbeat()
     if (i->second.hb_interval_start == utime_t())
       i->second.hb_interval_start = now;
 
-    Session *s = static_cast<Session*>(i->second.con_back->get_priv().get());
     std::optional<ceph::signedspan> delta_ub;
     s->stamps->sent_ping(&delta_ub);
 
@@ -5838,9 +5956,9 @@ void OSD::tick_without_osd_lock()
   ceph_assert(ceph_mutex_is_locked(tick_timer_lock));
   dout(10) << "tick_without_osd_lock" << dendl;
 
-  logger->set(l_osd_cached_crc, buffer::get_cached_crc());
-  logger->set(l_osd_cached_crc_adjusted, buffer::get_cached_crc_adjusted());
-  logger->set(l_osd_missed_crc, buffer::get_missed_crc());
+  logger->set(l_osd_cached_crc, ceph::buffer::get_cached_crc());
+  logger->set(l_osd_cached_crc_adjusted, ceph::buffer::get_cached_crc_adjusted());
+  logger->set(l_osd_missed_crc, ceph::buffer::get_missed_crc());
 
   // refresh osd stats
   struct store_statfs_t stbuf;
@@ -5893,7 +6011,7 @@ void OSD::tick_without_osd_lock()
       // borrow lec lock to pretect last_sent_beacon from changing
       std::lock_guard l{min_last_epoch_clean_lock};
       const auto elapsed = now - last_sent_beacon;
-      if (chrono::duration_cast<chrono::seconds>(elapsed).count() >
+      if (std::chrono::duration_cast<std::chrono::seconds>(elapsed).count() >
         cct->_conf->osd_beacon_report_interval) {
         need_send_beacon = true;
       }
@@ -6218,12 +6336,12 @@ bool OSD::ms_handle_refused(Connection *con)
   return true;
 }
 
-struct C_OSD_GetVersion : public Context {
+struct CB_OSD_GetVersion {
   OSD *osd;
-  uint64_t oldest, newest;
-  explicit C_OSD_GetVersion(OSD *o) : osd(o), oldest(0), newest(0) {}
-  void finish(int r) override {
-    if (r >= 0)
+  explicit CB_OSD_GetVersion(OSD *o) : osd(o) {}
+  void operator ()(boost::system::error_code ec, version_t newest,
+                  version_t oldest) {
+    if (!ec)
       osd->_got_mon_epochs(oldest, newest);
   }
 };
@@ -6243,8 +6361,7 @@ void OSD::start_boot()
   set_state(STATE_PREBOOT);
   dout(10) << "start_boot - have maps " << superblock.oldest_map
           << ".." << superblock.newest_map << dendl;
-  C_OSD_GetVersion *c = new C_OSD_GetVersion(this);
-  monc->get_version("osdmap", &c->newest, &c->oldest, c);
+  monc->get_version("osdmap", CB_OSD_GetVersion(this));
 }
 
 void OSD::_got_mon_epochs(epoch_t oldest, epoch_t newest)
@@ -6284,9 +6401,6 @@ void OSD::_preboot(epoch_t oldest, epoch_t newest)
   } else if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
     derr << "osdmap SORTBITWISE OSDMap flag is NOT set; please set it"
         << dendl;
-  } else if (osdmap->require_osd_release < ceph_release_t::luminous) {
-    derr << "osdmap require_osd_release < luminous; please upgrade to luminous"
-        << dendl;
   } else if (service.need_fullness_update()) {
     derr << "osdmap fullness state needs update" << dendl;
     send_full_update();
@@ -6653,7 +6767,7 @@ void OSD::got_full_map(epoch_t e)
     requested_full_first = requested_full_last = 0;
     return;
   }
-  
+
   requested_full_first = e + 1;
 
   dout(10) << __func__ << " " << e << ", requested " << requested_full_first
@@ -6732,7 +6846,8 @@ void OSD::send_beacon(const ceph::coarse_mono_clock::time_point& now)
       std::lock_guard l{min_last_epoch_clean_lock};
       beacon = new MOSDBeacon(get_osdmap_epoch(),
                              min_last_epoch_clean,
-                             superblock.last_purged_snaps_scrub);
+                             superblock.last_purged_snaps_scrub,
+                             cct->_conf->osd_beacon_report_interval);
       beacon->pgs = min_last_epoch_clean_pgs;
       last_sent_beacon = now;
     }
@@ -6999,6 +7114,12 @@ void OSD::dispatch_session_waiting(const ceph::ref_t<Session>& session, OSDMapRe
 
 void OSD::ms_fast_dispatch(Message *m)
 {
+
+#ifdef HAVE_JAEGER
+  jaeger_tracing::init_tracer("osd-services-reinit");
+  dout(10) << "jaeger tracer after " << opentracing::Tracer::Global() << dendl;
+  auto dispatch_span = jaeger_tracing::new_span(__func__);
+#endif
   FUNCTRACE(cct);
   if (service.is_stopping()) {
     m->put();
@@ -7059,7 +7180,13 @@ void OSD::ms_fast_dispatch(Message *m)
     tracepoint(osd, ms_fast_dispatch, reqid.name._type,
         reqid.name._num, reqid.tid, reqid.inc);
   }
-
+#ifdef HAVE_JAEGER
+  op->set_osd_parent_span(dispatch_span);
+  if (op->osd_parent_span) {
+    auto op_req_span = jaeger_tracing::child_span("op-request-created", op->osd_parent_span);
+    op->set_osd_parent_span(op_req_span);
+  }
+#endif
   if (m->trace)
     op->osd_trace.init("osd op", &trace_endpoint, &m->trace);
 
@@ -7091,7 +7218,7 @@ void OSD::ms_fast_dispatch(Message *m)
       service.release_map(nextmap);
     }
   }
-  OID_EVENT_TRACE_WITH_MSG(m, "MS_FAST_DISPATCH_END", false); 
+  OID_EVENT_TRACE_WITH_MSG(m, "MS_FAST_DISPATCH_END", false);
 }
 
 int OSD::ms_handle_authentication(Connection *con)
@@ -7120,7 +7247,7 @@ int OSD::ms_handle_authentication(Connection *con)
     try {
       decode(str, p);
     }
-    catch (buffer::error& e) {
+    catch (ceph::buffer::error& e) {
       dout(10) << __func__ << " session " << s << " " << s->entity_name
               << " failed to decode caps string" << dendl;
       ret = -EACCES;
@@ -7328,6 +7455,23 @@ bool OSDService::ScrubJob::ScrubJob::operator<(const OSDService::ScrubJob& rhs)
   return pgid < rhs.pgid;
 }
 
+void OSDService::dumps_scrub(ceph::Formatter *f)
+{
+  ceph_assert(f != nullptr);
+  std::lock_guard l(sched_scrub_lock);
+
+  f->open_array_section("scrubs");
+  for (const auto &i: sched_scrub_pg) {
+    f->open_object_section("scrub");
+    f->dump_stream("pgid") << i.pgid;
+    f->dump_stream("sched_time") << i.sched_time;
+    f->dump_stream("deadline") << i.deadline;
+    f->dump_bool("forced", i.sched_time == PgScrubber::scrub_must_stamp());
+    f->close_section();
+  }
+  f->close_section();
+}
+
 double OSD::scrub_sleep_time(bool must_scrub)
 {
   if (must_scrub) {
@@ -7376,14 +7520,14 @@ bool OSD::scrub_time_permit(utime_t now)
       time_permit = true;
     }
   }
-  if (!time_permit) {
+  if (time_permit) {
     dout(20) << __func__ << " should run between " << cct->_conf->osd_scrub_begin_hour
             << " - " << cct->_conf->osd_scrub_end_hour
-            << " now " << bdt.tm_hour << " = no" << dendl;
+            << " now " << bdt.tm_hour << " = yes" << dendl;
   } else {
     dout(20) << __func__ << " should run between " << cct->_conf->osd_scrub_begin_hour
             << " - " << cct->_conf->osd_scrub_end_hour
-            << " now " << bdt.tm_hour << " = yes" << dendl;
+            << " now " << bdt.tm_hour << " = no" << dendl;
   }
   return time_permit;
 }
@@ -7425,14 +7569,17 @@ bool OSD::scrub_load_below_threshold()
 
 void OSD::sched_scrub()
 {
+  dout(20) << __func__ << " sched_scrub starts" << dendl;
+
   // if not permitted, fail fast
   if (!service.can_inc_scrubs()) {
+    dout(20) << __func__ << ": OSD cannot inc scrubs" << dendl;
     return;
   }
   bool allow_requested_repair_only = false;
   if (service.is_recovery_active() && !cct->_conf->osd_scrub_during_recovery) {
     if (!cct->_conf->osd_repair_during_recovery) {
-      dout(20) << __func__ << " not scheduling scrubs due to active recovery" << dendl;
+      dout(15) << __func__ << ": not scheduling scrubs due to active recovery" << dendl;
       return;
     }
     dout(10) << __func__
@@ -7446,57 +7593,62 @@ void OSD::sched_scrub()
   bool load_is_low = scrub_load_below_threshold();
   dout(20) << "sched_scrub load_is_low=" << (int)load_is_low << dendl;
 
-  OSDService::ScrubJob scrub;
-  if (service.first_scrub_stamp(&scrub)) {
+  OSDService::ScrubJob scrub_job;
+  if (service.first_scrub_stamp(&scrub_job)) {
     do {
-      dout(30) << "sched_scrub examine " << scrub.pgid << " at " << scrub.sched_time << dendl;
+      dout(30) << "sched_scrub examine " << scrub_job.pgid << " at " << scrub_job.sched_time << dendl;
 
-      if (scrub.sched_time > now) {
+      if (scrub_job.sched_time > now) {
        // save ourselves some effort
-       dout(10) << "sched_scrub " << scrub.pgid << " scheduled at " << scrub.sched_time
+       dout(20) << "sched_scrub " << scrub_job.pgid << " scheduled at " << scrub_job.sched_time
                 << " > " << now << dendl;
        break;
       }
 
-      if ((scrub.deadline.is_zero() || scrub.deadline >= now) && !(time_permit && load_is_low)) {
-        dout(10) << __func__ << " not scheduling scrub for " << scrub.pgid << " due to "
+      if ((scrub_job.deadline.is_zero() || scrub_job.deadline >= now) && !(time_permit && load_is_low)) {
+        dout(15) << __func__ << " not scheduling scrub for " << scrub_job.pgid << " due to "
                  << (!time_permit ? "time not permit" : "high load") << dendl;
         continue;
       }
 
-      PGRef pg = _lookup_lock_pg(scrub.pgid);
-      if (!pg)
+      PGRef pg = _lookup_lock_pg(scrub_job.pgid);
+      if (!pg) {
+       dout(20) << __func__ << " pg  " << scrub_job.pgid << " not found" << dendl;
        continue;
+      }
+
       // This has already started, so go on to the next scrub job
-      if (pg->scrubber.active) {
+      if (pg->is_scrub_active()) {
        pg->unlock();
-       dout(30) << __func__ << ": already in progress pgid " << scrub.pgid << dendl;
+       dout(20) << __func__ << ": already in progress pgid " << scrub_job.pgid << dendl;
        continue;
       }
-      // Skip other kinds of scrubing if only explicitly requested repairing is allowed
-      if (allow_requested_repair_only && !pg->scrubber.must_repair) {
+      // Skip other kinds of scrubbing if only explicitly requested repairing is allowed
+      if (allow_requested_repair_only && !pg->m_planned_scrub.must_repair) {
         pg->unlock();
-        dout(10) << __func__ << " skip " << scrub.pgid
+        dout(10) << __func__ << " skip " << scrub_job.pgid
                  << " because repairing is not explicitly requested on it"
                  << dendl;
         continue;
       }
+
       // If it is reserving, let it resolve before going to the next scrub job
-      if (pg->scrubber.local_reserved && !pg->scrubber.active) {
+      if (pg->m_scrubber->is_reserving()) {
        pg->unlock();
-       dout(30) << __func__ << ": reserve in progress pgid " << scrub.pgid << dendl;
+       dout(10) << __func__ << ": reserve in progress pgid " << scrub_job.pgid << dendl;
        break;
       }
-      dout(10) << "sched_scrub scrubbing " << scrub.pgid << " at " << scrub.sched_time
+      dout(15) << "sched_scrub scrubbing " << scrub_job.pgid << " at " << scrub_job.sched_time
               << (pg->get_must_scrub() ? ", explicitly requested" :
                   (load_is_low ? ", load_is_low" : " deadline < now"))
               << dendl;
       if (pg->sched_scrub()) {
        pg->unlock();
+        dout(10) << __func__ << " scheduled a scrub!" << " (~" << scrub_job.pgid << "~)" << dendl;
        break;
       }
       pg->unlock();
-    } while (service.next_scrub_stamp(scrub, &scrub));
+    } while (service.next_scrub_stamp(scrub_job, &scrub_job));
   }
   dout(20) << "sched_scrub done" << dendl;
 }
@@ -7504,20 +7656,20 @@ void OSD::sched_scrub()
 void OSD::resched_all_scrubs()
 {
   dout(10) << __func__ << ": start" << dendl;
-  OSDService::ScrubJob scrub;
-  if (service.first_scrub_stamp(&scrub)) {
+  OSDService::ScrubJob scrub_job;
+  if (service.first_scrub_stamp(&scrub_job)) {
     do {
-      dout(20) << __func__ << ": examine " << scrub.pgid << dendl;
+      dout(20) << __func__ << ": examine " << scrub_job.pgid << dendl;
 
-      PGRef pg = _lookup_lock_pg(scrub.pgid);
+      PGRef pg = _lookup_lock_pg(scrub_job.pgid);
       if (!pg)
        continue;
-      if (!pg->scrubber.must_scrub && !pg->scrubber.need_auto) {
-        dout(20) << __func__ << ": reschedule " << scrub.pgid << dendl;
+      if (!pg->m_planned_scrub.must_scrub && !pg->m_planned_scrub.need_auto) {
+        dout(15) << __func__ << ": reschedule " << scrub_job.pgid << dendl;
         pg->on_info_history_change();
       }
       pg->unlock();
-    } while (service.next_scrub_stamp(scrub, &scrub));
+    } while (service.next_scrub_stamp(scrub_job, &scrub_job));
   }
   dout(10) << __func__ << ": done" << dendl;
 }
@@ -7551,7 +7703,7 @@ MPGStats* OSD::collect_pg_stats()
     }
     pg->get_pg_stats([&](const pg_stat_t& s, epoch_t lec) {
        m->pg_stat[pg->pg_id.pgid] = s;
-       min_last_epoch_clean = min(min_last_epoch_clean, lec);
+       min_last_epoch_clean = std::min(min_last_epoch_clean, lec);
        min_last_epoch_clean_pgs.push_back(pg->pg_id.pgid);
       });
   }
@@ -8088,7 +8240,7 @@ void OSD::_committed_osd_maps(epoch_t first, epoch_t last, MOSDMap *m)
     OSDMapRef newmap = get_map(cur);
     ceph_assert(newmap);  // we just cached it above!
 
-    // start blacklisting messages sent to peers that go down.
+    // start blocklisting messages sent to peers that go down.
     service.pre_publish_map(newmap);
 
     // kill connections to newly down osds
@@ -8251,7 +8403,7 @@ void OSD::_committed_osd_maps(epoch_t first, epoch_t last, MOSDMap *m)
        set<int> avoid_ports;
 #if defined(__FreeBSD__)
         // prevent FreeBSD from grabbing the client_messenger port during
-        // rebinding. In which case a cluster_meesneger will connect also 
+        // rebinding. In which case a cluster_meesneger will connect also
        // to the same port
        client_messenger->get_myaddrs().get_ports(&avoid_ports);
 #endif
@@ -8444,7 +8596,6 @@ bool OSD::advance_pg(
   }
   ceph_assert(pg->is_locked());
   OSDMapRef lastmap = pg->get_osdmap();
-  ceph_assert(lastmap->get_epoch() < osd_epoch);
   set<PGRef> new_pgs;  // any split children
   bool ret = true;
 
@@ -9476,7 +9627,7 @@ void OSD::do_recovery(
       // This is true for the first recovery op and when the previous recovery op
       // has been scheduled in the past. The next recovery op is scheduled after
       // completing the sleep from now.
-      
+
       if (auto now = ceph::real_clock::now();
          service.recovery_schedule_time < now) {
         service.recovery_schedule_time = now;
@@ -9506,7 +9657,7 @@ void OSD::do_recovery(
 #endif
 
     bool do_unfound = pg->start_recovery_ops(reserved_pushes, handle, &started);
-    dout(10) << "do_recovery started " << started << "/" << reserved_pushes 
+    dout(10) << "do_recovery started " << started << "/" << reserved_pushes
             << " on " << *pg << dendl;
 
     if (do_unfound) {
@@ -9599,8 +9750,10 @@ void OSD::enqueue_op(spg_t pg, OpRequestRef&& op, epoch_t epoch)
   const unsigned priority = op->get_req()->get_priority();
   const int cost = op->get_req()->get_cost();
   const uint64_t owner = op->get_req()->get_source().num();
+  const int type = op->get_req()->get_type();
 
   dout(15) << "enqueue_op " << op << " prio " << priority
+           << " type " << type
           << " cost " << cost
           << " latency " << latency
           << " epoch " << epoch
@@ -9608,12 +9761,32 @@ void OSD::enqueue_op(spg_t pg, OpRequestRef&& op, epoch_t epoch)
   op->osd_trace.event("enqueue op");
   op->osd_trace.keyval("priority", priority);
   op->osd_trace.keyval("cost", cost);
+#ifdef HAVE_JAEGER
+  if (op->osd_parent_span) {
+    auto enqueue_span = jaeger_tracing::child_span(__func__, op->osd_parent_span);
+    enqueue_span->Log({
+       {"priority", priority},
+       {"cost", cost},
+       {"epoch", epoch},
+       {"owner", owner},
+       {"type", type}
+       });
+  }
+#endif
   op->mark_queued_for_pg();
   logger->tinc(l_osd_op_before_queue_op_lat, latency);
-  op_shardedwq.queue(
-    OpSchedulerItem(
-      unique_ptr<OpSchedulerItem::OpQueueable>(new PGOpItem(pg, std::move(op))),
-      cost, priority, stamp, owner, epoch));
+  if (type == MSG_OSD_PG_PUSH ||
+      type == MSG_OSD_PG_PUSH_REPLY) {
+    op_shardedwq.queue(
+      OpSchedulerItem(
+        unique_ptr<OpSchedulerItem::OpQueueable>(new PGRecoveryMsg(pg, std::move(op))),
+        cost, priority, stamp, owner, epoch));
+  } else {
+    op_shardedwq.queue(
+      OpSchedulerItem(
+        unique_ptr<OpSchedulerItem::OpQueueable>(new PGOpItem(pg, std::move(op))),
+        cost, priority, stamp, owner, epoch));
+  }
 }
 
 void OSD::enqueue_peering_evt(spg_t pgid, PGPeeringEventRef evt)
@@ -9743,6 +9916,13 @@ const char** OSD::get_tracked_conf_keys() const
     "osd_map_cache_size",
     "osd_pg_epoch_max_lag_factor",
     "osd_pg_epoch_persisted_max_stale",
+    "osd_recovery_sleep",
+    "osd_recovery_sleep_hdd",
+    "osd_recovery_sleep_ssd",
+    "osd_recovery_sleep_hybrid",
+    "osd_recovery_max_active",
+    "osd_recovery_max_active_hdd",
+    "osd_recovery_max_active_ssd",
     // clog & admin clog
     "clog_to_monitors",
     "clog_to_syslog",
@@ -9771,9 +9951,67 @@ void OSD::handle_conf_change(const ConfigProxy& conf,
                             const std::set <std::string> &changed)
 {
   std::lock_guard l{osd_lock};
-  if (changed.count("osd_max_backfills")) {
-    service.local_reserver.set_max(cct->_conf->osd_max_backfills);
-    service.remote_reserver.set_max(cct->_conf->osd_max_backfills);
+
+  if (changed.count("osd_max_backfills") ||
+      changed.count("osd_delete_sleep") ||
+      changed.count("osd_delete_sleep_hdd") ||
+      changed.count("osd_delete_sleep_ssd") ||
+      changed.count("osd_delete_sleep_hybrid") ||
+      changed.count("osd_snap_trim_sleep") ||
+      changed.count("osd_snap_trim_sleep_hdd") ||
+      changed.count("osd_snap_trim_sleep_ssd") ||
+      changed.count("osd_snap_trim_sleep_hybrid") ||
+      changed.count("osd_scrub_sleep") ||
+      changed.count("osd_recovery_sleep") ||
+      changed.count("osd_recovery_sleep_hdd") ||
+      changed.count("osd_recovery_sleep_ssd") ||
+      changed.count("osd_recovery_sleep_hybrid") ||
+      changed.count("osd_recovery_max_active") ||
+      changed.count("osd_recovery_max_active_hdd") ||
+      changed.count("osd_recovery_max_active_ssd")) {
+    if (cct->_conf.get_val<std::string>("osd_op_queue") == "mclock_scheduler" &&
+        cct->_conf.get_val<std::string>("osd_mclock_profile") != "custom") {
+      // Set ceph config option to meet QoS goals
+      // Set high value for recovery max active
+      uint32_t recovery_max_active = 1000;
+      if (cct->_conf->osd_recovery_max_active) {
+        cct->_conf.set_val(
+            "osd_recovery_max_active", std::to_string(recovery_max_active));
+      }
+      if (store_is_rotational) {
+        cct->_conf.set_val(
+            "osd_recovery_max_active_hdd", std::to_string(recovery_max_active));
+      } else {
+        cct->_conf.set_val(
+            "osd_recovery_max_active_ssd", std::to_string(recovery_max_active));
+      }
+      // Set high value for osd_max_backfill
+      cct->_conf.set_val("osd_max_backfills", std::to_string(1000));
+
+      // Disable recovery sleep
+      cct->_conf.set_val("osd_recovery_sleep", std::to_string(0));
+      cct->_conf.set_val("osd_recovery_sleep_hdd", std::to_string(0));
+      cct->_conf.set_val("osd_recovery_sleep_ssd", std::to_string(0));
+      cct->_conf.set_val("osd_recovery_sleep_hybrid", std::to_string(0));
+
+      // Disable delete sleep
+      cct->_conf.set_val("osd_delete_sleep", std::to_string(0));
+      cct->_conf.set_val("osd_delete_sleep_hdd", std::to_string(0));
+      cct->_conf.set_val("osd_delete_sleep_ssd", std::to_string(0));
+      cct->_conf.set_val("osd_delete_sleep_hybrid", std::to_string(0));
+
+      // Disable snap trim sleep
+      cct->_conf.set_val("osd_snap_trim_sleep", std::to_string(0));
+      cct->_conf.set_val("osd_snap_trim_sleep_hdd", std::to_string(0));
+      cct->_conf.set_val("osd_snap_trim_sleep_ssd", std::to_string(0));
+      cct->_conf.set_val("osd_snap_trim_sleep_hybrid", std::to_string(0));
+
+      // Disable scrub sleep
+      cct->_conf.set_val("osd_scrub_sleep", std::to_string(0));
+    } else {
+      service.local_reserver.set_max(cct->_conf->osd_max_backfills);
+      service.remote_reserver.set_max(cct->_conf->osd_max_backfills);
+    }
   }
   if (changed.count("osd_min_recovery_priority")) {
     service.local_reserver.set_min_priority(cct->_conf->osd_min_recovery_priority);
@@ -9858,6 +10096,10 @@ void OSD::handle_conf_change(const ConfigProxy& conf,
     dout(0) << __func__ << ": scrub interval change" << dendl;
   }
   check_config();
+  if (changed.count("osd_asio_thread_count")) {
+    service.poolctx.stop();
+    service.poolctx.start(conf.get_val<std::uint64_t>("osd_asio_thread_count"));
+  }
 }
 
 void OSD::update_log_config()
@@ -9892,7 +10134,7 @@ void OSD::check_config()
                 << cct->_conf->osd_pg_epoch_persisted_max_stale << ")";
   }
   if (cct->_conf->osd_object_clean_region_max_num_intervals < 0) {
-    clog->warn() << "osd_object_clean_region_max_num_intervals (" 
+    clog->warn() << "osd_object_clean_region_max_num_intervals ("
                  << cct->_conf->osd_object_clean_region_max_num_intervals
                 << ") is < 0";
   }
@@ -9904,9 +10146,8 @@ void OSD::get_latest_osdmap()
 {
   dout(10) << __func__ << " -- start" << dendl;
 
-  C_SaferCond cond;
-  service.objecter->wait_for_latest_osdmap(&cond);
-  cond.wait();
+  boost::system::error_code ec;
+  service.objecter->wait_for_latest_osdmap(ceph::async::use_blocked[ec]);
 
   dout(10) << __func__ << " -- finish" << dendl;
 }
@@ -10374,7 +10615,8 @@ OSDShard::OSDShard(
     osdmap_lock{make_mutex(shard_name + "::osdmap_lock")},
     shard_lock_name(shard_name + "::shard_lock"),
     shard_lock{make_mutex(shard_lock_name)},
-    scheduler(ceph::osd::scheduler::make_scheduler(cct)),
+    scheduler(ceph::osd::scheduler::make_scheduler(
+      cct, osd->num_shards, osd->store->is_rotational())),
     context_queue(sdata_wait_lock, sdata_cond)
 {
   dout(0) << "using op scheduler " << *scheduler << dendl;
@@ -10459,21 +10701,54 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
     sdata->context_queue.move_to(oncommits);
   }
 
-  if (sdata->scheduler->empty()) {
+  WorkItem work_item;
+  while (!std::get_if<OpSchedulerItem>(&work_item)) {
+    if (sdata->scheduler->empty()) {
+      if (osd->is_stopping()) {
+        sdata->shard_lock.unlock();
+        for (auto c : oncommits) {
+          dout(10) << __func__ << " discarding in-flight oncommit " << c << dendl;
+          delete c;
+        }
+        return;    // OSD shutdown, discard.
+      }
+      sdata->shard_lock.unlock();
+      handle_oncommits(oncommits);
+      return;
+    }
+
+    work_item = sdata->scheduler->dequeue();
     if (osd->is_stopping()) {
       sdata->shard_lock.unlock();
       for (auto c : oncommits) {
-       dout(10) << __func__ << " discarding in-flight oncommit " << c << dendl;
-       delete c;
+        dout(10) << __func__ << " discarding in-flight oncommit " << c << dendl;
+        delete c;
       }
       return;    // OSD shutdown, discard.
     }
-    sdata->shard_lock.unlock();
-    handle_oncommits(oncommits);
-    return;
-  }
 
-  OpSchedulerItem item = sdata->scheduler->dequeue();
+    // If the work item is scheduled in the future, wait until
+    // the time returned in the dequeue response before retrying.
+    if (auto when_ready = std::get_if<double>(&work_item)) {
+      if (is_smallest_thread_index) {
+        sdata->shard_lock.unlock();
+        handle_oncommits(oncommits);
+        return;
+      }
+      std::unique_lock wait_lock{sdata->sdata_wait_lock};
+      auto future_time = ceph::real_clock::from_double(*when_ready);
+      dout(10) << __func__ << " dequeue future request at " << future_time << dendl;
+      sdata->shard_lock.unlock();
+      ++sdata->waiting_threads;
+      sdata->sdata_cond.wait_until(wait_lock, future_time);
+      --sdata->waiting_threads;
+      wait_lock.unlock();
+      sdata->shard_lock.lock();
+    }
+  } // while
+
+  // Access the stored item
+  auto item = std::move(std::get<OpSchedulerItem>(work_item));
   if (osd->is_stopping()) {
     sdata->shard_lock.unlock();
     for (auto c : oncommits) {
@@ -10722,9 +10997,13 @@ void OSD::ShardedOpWQ::_enqueue(OpSchedulerItem&& item) {
     sdata->scheduler->enqueue(std::move(item));
   }
 
-  if (empty) {
+  {
     std::lock_guard l{sdata->sdata_wait_lock};
-    sdata->sdata_cond.notify_all();
+    if (empty) {
+      sdata->sdata_cond.notify_all();
+    } else if (sdata->waiting_threads) {
+      sdata->sdata_cond.notify_one();
+    }
   }
 }
 
@@ -10756,8 +11035,7 @@ void OSD::ShardedOpWQ::_enqueue_front(OpSchedulerItem&& item)
   sdata->sdata_cond.notify_one();
 }
 
-namespace ceph { 
-namespace osd_cmds { 
+namespace ceph::osd_cmds {
 
 int heap(CephContext& cct, const cmdmap_t& cmdmap, Formatter& f,
         std::ostream& os)
@@ -10766,13 +11044,13 @@ int heap(CephContext& cct, const cmdmap_t& cmdmap, Formatter& f,
         os << "could not issue heap profiler command -- not using tcmalloc!";
         return -EOPNOTSUPP;
   }
-  
+
   string cmd;
   if (!cmd_getval(cmdmap, "heapcmd", cmd)) {
         os << "unable to get value for command \"" << cmd << "\"";
        return -EINVAL;
   }
-  
+
   std::vector<std::string> cmd_vec;
   get_str_vec(cmd, cmd_vec);
 
@@ -10780,10 +11058,10 @@ int heap(CephContext& cct, const cmdmap_t& cmdmap, Formatter& f,
   if (cmd_getval(cmdmap, "value", val)) {
     cmd_vec.push_back(val);
   }
-  
+
   ceph_heap_profiler_handle_command(cmd_vec, os);
-  
+
   return 0;
 }
-}} // namespace ceph::osd_cmds
+
+} // namespace ceph::osd_cmds