]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osd/OSD.h
import quincy beta 17.1.0
[ceph.git] / ceph / src / osd / OSD.h
index bfa511e8b077df2d9864dae429eba84946a5b954..30d0b0b4aef07a78d284fec2a782fc074bdca1c0 100644 (file)
@@ -1,4 +1,4 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
 /*
  * Ceph - scalable distributed file system
@@ -7,9 +7,9 @@
  *
  * This is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
+ * License version 2.1, as published by the Free Software
  * Foundation.  See file COPYING.
- * 
+ *
  */
 
 #ifndef CEPH_OSD_H
 
 #include "msg/Dispatcher.h"
 
-#include "common/Mutex.h"
-#include "common/RWLock.h"
+#include "common/async/context_pool.h"
 #include "common/Timer.h"
 #include "common/WorkQueue.h"
 #include "common/AsyncReserver.h"
 #include "common/ceph_context.h"
+#include "common/config_cacher.h"
 #include "common/zipkin_trace.h"
+#include "common/ceph_timer.h"
 
 #include "mgr/MgrClient.h"
 
 #include "os/ObjectStore.h"
-#include "OSDCap.h" 
-#include "auth/KeyRing.h"
-#include "osd/ClassHandler.h"
 
 #include "include/CompatSet.h"
+#include "include/common_fwd.h"
 
 #include "OpRequest.h"
 #include "Session.h"
 
-#include "osd/PGQueueable.h"
+#include "osd/scheduler/OpScheduler.h"
 
 #include <atomic>
 #include <map>
 #include <memory>
-#include "include/memory.h"
-using namespace std;
+#include <string>
 
 #include "include/unordered_map.h"
 
 #include "common/shared_cache.hpp"
 #include "common/simple_cache.hpp"
-#include "common/sharedptr_registry.hpp"
-#include "common/WeightedPriorityQueue.h"
-#include "common/PrioritizedQueue.h"
-#include "osd/mClockOpClassQueue.h"
-#include "osd/mClockClientQueue.h"
 #include "messages/MOSDOp.h"
-#include "include/Spinlock.h"
 #include "common/EventTrace.h"
+#include "osd/osd_perf_counters.h"
+#include "common/Finisher.h"
+#include "scrubber/osd_scrub_sched.h"
 
 #define CEPH_OSD_PROTOCOL    10 /* cluster internal */
 
+/*
 
-enum {
-  l_osd_first = 10000,
-  l_osd_op_wip,
-  l_osd_op,
-  l_osd_op_inb,
-  l_osd_op_outb,
-  l_osd_op_lat,
-  l_osd_op_process_lat,
-  l_osd_op_prepare_lat,
-  l_osd_op_r,
-  l_osd_op_r_outb,
-  l_osd_op_r_lat,
-  l_osd_op_r_lat_outb_hist,
-  l_osd_op_r_process_lat,
-  l_osd_op_r_prepare_lat,
-  l_osd_op_w,
-  l_osd_op_w_inb,
-  l_osd_op_w_lat,
-  l_osd_op_w_lat_inb_hist,
-  l_osd_op_w_process_lat,
-  l_osd_op_w_prepare_lat,
-  l_osd_op_rw,
-  l_osd_op_rw_inb,
-  l_osd_op_rw_outb,
-  l_osd_op_rw_lat,
-  l_osd_op_rw_lat_inb_hist,
-  l_osd_op_rw_lat_outb_hist,
-  l_osd_op_rw_process_lat,
-  l_osd_op_rw_prepare_lat,
-
-  l_osd_op_before_queue_op_lat,
-  l_osd_op_before_dequeue_op_lat,
-
-  l_osd_sop,
-  l_osd_sop_inb,
-  l_osd_sop_lat,
-  l_osd_sop_w,
-  l_osd_sop_w_inb,
-  l_osd_sop_w_lat,
-  l_osd_sop_pull,
-  l_osd_sop_pull_lat,
-  l_osd_sop_push,
-  l_osd_sop_push_inb,
-  l_osd_sop_push_lat,
-
-  l_osd_pull,
-  l_osd_push,
-  l_osd_push_outb,
-
-  l_osd_rop,
-
-  l_osd_loadavg,
-  l_osd_buf,
-  l_osd_history_alloc_bytes,
-  l_osd_history_alloc_num,
-  l_osd_cached_crc,
-  l_osd_cached_crc_adjusted,
-  l_osd_missed_crc,
-
-  l_osd_pg,
-  l_osd_pg_primary,
-  l_osd_pg_replica,
-  l_osd_pg_stray,
-  l_osd_hb_to,
-  l_osd_map,
-  l_osd_mape,
-  l_osd_mape_dup,
-
-  l_osd_waiting_for_map,
-
-  l_osd_map_cache_hit,
-  l_osd_map_cache_miss,
-  l_osd_map_cache_miss_low,
-  l_osd_map_cache_miss_low_avg,
-  l_osd_map_bl_cache_hit,
-  l_osd_map_bl_cache_miss,
-
-  l_osd_stat_bytes,
-  l_osd_stat_bytes_used,
-  l_osd_stat_bytes_avail,
-
-  l_osd_copyfrom,
-
-  l_osd_tier_promote,
-  l_osd_tier_flush,
-  l_osd_tier_flush_fail,
-  l_osd_tier_try_flush,
-  l_osd_tier_try_flush_fail,
-  l_osd_tier_evict,
-  l_osd_tier_whiteout,
-  l_osd_tier_dirty,
-  l_osd_tier_clean,
-  l_osd_tier_delay,
-  l_osd_tier_proxy_read,
-  l_osd_tier_proxy_write,
-
-  l_osd_agent_wake,
-  l_osd_agent_skip,
-  l_osd_agent_flush,
-  l_osd_agent_evict,
-
-  l_osd_object_ctx_cache_hit,
-  l_osd_object_ctx_cache_total,
-
-  l_osd_op_cache_hit,
-  l_osd_tier_flush_lat,
-  l_osd_tier_promote_lat,
-  l_osd_tier_r_lat,
-
-  l_osd_pg_info,
-  l_osd_pg_fastinfo,
-  l_osd_pg_biginfo,
-
-  l_osd_last,
-};
+  lock ordering for pg map
 
-// RecoveryState perf counters
-enum {
-  rs_first = 20000,
-  rs_initial_latency,
-  rs_started_latency,
-  rs_reset_latency,
-  rs_start_latency,
-  rs_primary_latency,
-  rs_peering_latency,
-  rs_backfilling_latency,
-  rs_waitremotebackfillreserved_latency,
-  rs_waitlocalbackfillreserved_latency,
-  rs_notbackfilling_latency,
-  rs_repnotrecovering_latency,
-  rs_repwaitrecoveryreserved_latency,
-  rs_repwaitbackfillreserved_latency,
-  rs_reprecovering_latency,
-  rs_activating_latency,
-  rs_waitlocalrecoveryreserved_latency,
-  rs_waitremoterecoveryreserved_latency,
-  rs_recovering_latency,
-  rs_recovered_latency,
-  rs_clean_latency,
-  rs_active_latency,
-  rs_replicaactive_latency,
-  rs_stray_latency,
-  rs_getinfo_latency,
-  rs_getlog_latency,
-  rs_waitactingchange_latency,
-  rs_incomplete_latency,
-  rs_down_latency,
-  rs_getmissing_latency,
-  rs_waitupthru_latency,
-  rs_notrecovering_latency,
-  rs_last,
-};
+    PG::lock
+      ShardData::lock
+        OSD::pg_map_lock
+
+  */
 
 class Messenger;
 class Message;
 class MonClient;
-class PerfCounters;
 class ObjectStore;
 class FuseStore;
 class OSDMap;
 class MLog;
 class Objecter;
+class KeyStore;
 
 class Watch;
 class PrimaryLogPG;
 
-class AuthAuthorizeHandlerRegistry;
-
 class TestOpsSocketHook;
-struct C_CompleteSplits;
+struct C_FinishSplits;
 struct C_OpenPGs;
 class LogChannel;
-class CephContext;
-typedef ceph::shared_ptr<ObjectStore::Sequencer> SequencerRef;
-class MOSDOp;
-
-class DeletingState {
-  Mutex lock;
-  Cond cond;
-  enum {
-    QUEUED,
-    CLEARING_DIR,
-    CLEARING_WAITING,
-    DELETING_DIR,
-    DELETED_DIR,
-    CANCELED,
-  } status;
-  bool stop_deleting;
-public:
-  const spg_t pgid;
-  const PGRef old_pg_state;
-  explicit DeletingState(const pair<spg_t, PGRef> &in) :
-    lock("DeletingState::lock"), status(QUEUED), stop_deleting(false),
-    pgid(in.first), old_pg_state(in.second) {
-    }
 
-  /// transition status to CLEARING_WAITING
-  bool pause_clearing() {
-    Mutex::Locker l(lock);
-    assert(status == CLEARING_DIR);
-    if (stop_deleting) {
-      status = CANCELED;
-      cond.Signal();
-      return false;
-    }
-    status = CLEARING_WAITING;
-    return true;
-  } ///< @return false if we should cancel deletion
-
-  /// start or resume the clearing - transition the status to CLEARING_DIR
-  bool start_or_resume_clearing() {
-    Mutex::Locker l(lock);
-    assert(
-      status == QUEUED ||
-      status == DELETED_DIR ||
-      status == CLEARING_WAITING);
-    if (stop_deleting) {
-      status = CANCELED;
-      cond.Signal();
-      return false;
-    }
-    status = CLEARING_DIR;
-    return true;
-  } ///< @return false if we should cancel the deletion
-
-  /// transition status to CLEARING_DIR
-  bool resume_clearing() {
-    Mutex::Locker l(lock);
-    assert(status == CLEARING_WAITING);
-    if (stop_deleting) {
-      status = CANCELED;
-      cond.Signal();
-      return false;
-    }
-    status = CLEARING_DIR;
-    return true;
-  } ///< @return false if we should cancel deletion
-
-  /// transition status to deleting
-  bool start_deleting() {
-    Mutex::Locker l(lock);
-    assert(status == CLEARING_DIR);
-    if (stop_deleting) {
-      status = CANCELED;
-      cond.Signal();
-      return false;
-    }
-    status = DELETING_DIR;
-    return true;
-  } ///< @return false if we should cancel deletion
-
-  /// signal collection removal queued
-  void finish_deleting() {
-    Mutex::Locker l(lock);
-    assert(status == DELETING_DIR);
-    status = DELETED_DIR;
-    cond.Signal();
-  }
-
-  /// try to halt the deletion
-  bool try_stop_deletion() {
-    Mutex::Locker l(lock);
-    stop_deleting = true;
-    /**
-     * If we are in DELETING_DIR or CLEARING_DIR, there are in progress
-     * operations we have to wait for before continuing on.  States
-     * CLEARING_WAITING and QUEUED indicate that the remover will check
-     * stop_deleting before queueing any further operations.  CANCELED
-     * indicates that the remover has already halted.  DELETED_DIR
-     * indicates that the deletion has been fully queued.
-     */
-    while (status == DELETING_DIR || status == CLEARING_DIR)
-      cond.Wait(lock);
-    return status != DELETED_DIR;
-  } ///< @return true if we don't need to recreate the collection
-};
-typedef ceph::shared_ptr<DeletingState> DeletingStateRef;
+class MOSDPGCreate2;
+class MOSDPGNotify;
+class MOSDPGInfo;
+class MOSDPGRemove;
+class MOSDForceRecovery;
+class MMonGetPurgedSnapsReply;
 
 class OSD;
 
 class OSDService {
+  using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;
 public:
   OSD *osd;
   CephContext *cct;
-  SharedPtrRegistry<spg_t, ObjectStore::Sequencer> osr_registry;
-  ceph::shared_ptr<ObjectStore::Sequencer> meta_osr;
-  SharedPtrRegistry<spg_t, DeletingState> deleting_pgs;
+  ObjectStore::CollectionHandle meta_ch;
   const int whoami;
-  ObjectStore *&store;
+  ObjectStore * const store;
   LogClient &log_client;
   LogChannelRef clog;
   PGRecoveryStats &pg_recovery_stats;
@@ -363,74 +112,38 @@ public:
   PerfCounters *&logger;
   PerfCounters *&recoverystate_perf;
   MonClient   *&monc;
-  ThreadPool::BatchWorkQueue<PG> &peering_wq;
-  GenContextWQ recovery_gen_wq;
-  ClassHandler  *&class_handler;
 
-  void enqueue_back(spg_t pgid, PGQueueable qi);
-  void enqueue_front(spg_t pgid, PGQueueable qi);
+  md_config_cacher_t<Option::size_t> osd_max_object_size;
+  md_config_cacher_t<bool> osd_skip_data_digest;
+
+  void enqueue_back(OpSchedulerItem&& qi);
+  void enqueue_front(OpSchedulerItem&& qi);
 
   void maybe_inject_dispatch_delay() {
-    if (g_conf->osd_debug_inject_dispatch_delay_probability > 0) {
+    if (g_conf()->osd_debug_inject_dispatch_delay_probability > 0) {
       if (rand() % 10000 <
-         g_conf->osd_debug_inject_dispatch_delay_probability * 10000) {
+         g_conf()->osd_debug_inject_dispatch_delay_probability * 10000) {
        utime_t t;
-       t.set_from_double(g_conf->osd_debug_inject_dispatch_delay_duration);
+       t.set_from_double(g_conf()->osd_debug_inject_dispatch_delay_duration);
        t.sleep();
       }
     }
   }
 
-private:
-  // -- map epoch lower bound --
-  Mutex pg_epoch_lock;
-  multiset<epoch_t> pg_epochs;
-  map<spg_t,epoch_t> pg_epoch;
-
-public:
-  void pg_add_epoch(spg_t pgid, epoch_t epoch) {
-    Mutex::Locker l(pg_epoch_lock);
-    map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
-    assert(t == pg_epoch.end());
-    pg_epoch[pgid] = epoch;
-    pg_epochs.insert(epoch);
-  }
-  void pg_update_epoch(spg_t pgid, epoch_t epoch) {
-    Mutex::Locker l(pg_epoch_lock);
-    map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
-    assert(t != pg_epoch.end());
-    pg_epochs.erase(pg_epochs.find(t->second));
-    t->second = epoch;
-    pg_epochs.insert(epoch);
-  }
-  void pg_remove_epoch(spg_t pgid) {
-    Mutex::Locker l(pg_epoch_lock);
-    map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
-    if (t != pg_epoch.end()) {
-      pg_epochs.erase(pg_epochs.find(t->second));
-      pg_epoch.erase(t);
-    }
-  }
-  epoch_t get_min_pg_epoch() {
-    Mutex::Locker l(pg_epoch_lock);
-    if (pg_epochs.empty())
-      return 0;
-    else
-      return *pg_epochs.begin();
-  }
+  ceph::signedspan get_mnow();
 
 private:
   // -- superblock --
-  Mutex publish_lock, pre_publish_lock; // pre-publish orders before publish
+  ceph::mutex publish_lock, pre_publish_lock; // pre-publish orders before publish
   OSDSuperblock superblock;
 
 public:
   OSDSuperblock get_superblock() {
-    Mutex::Locker l(publish_lock);
+    std::lock_guard l(publish_lock);
     return superblock;
   }
   void publish_superblock(const OSDSuperblock &block) {
-    Mutex::Locker l(publish_lock);
+    std::lock_guard l(publish_lock);
     superblock = block;
   }
 
@@ -442,24 +155,24 @@ private:
 
 public:
   OSDMapRef get_osdmap() {
-    Mutex::Locker l(publish_lock);
+    std::lock_guard l(publish_lock);
     return osdmap;
   }
   epoch_t get_osdmap_epoch() {
-    Mutex::Locker l(publish_lock);
+    std::lock_guard l(publish_lock);
     return osdmap ? osdmap->get_epoch() : 0;
   }
   void publish_map(OSDMapRef map) {
-    Mutex::Locker l(publish_lock);
+    std::lock_guard l(publish_lock);
     osdmap = map;
   }
 
   /*
-   * osdmap - current published map
-   * next_osdmap - pre_published map that is about to be published.
+   * osdmap - current published std::map
+   * next_osdmap - pre_published std::map that is about to be published.
    *
    * We use the next_osdmap to send messages and initiate connections,
-   * but only if the target is the same instance as the one in the map
+   * but only if the target is the same instance as the one in the std::map
    * epoch the current user is working from (i.e., the result is
    * equivalent to what is in next_osdmap).
    *
@@ -470,193 +183,126 @@ public:
    */
 private:
   OSDMapRef next_osdmap;
-  Cond pre_publish_cond;
+  ceph::condition_variable pre_publish_cond;
+  int pre_publish_waiter = 0;
 
 public:
   void pre_publish_map(OSDMapRef map) {
-    Mutex::Locker l(pre_publish_lock);
+    std::lock_guard l(pre_publish_lock);
     next_osdmap = std::move(map);
   }
 
   void activate_map();
   /// map epochs reserved below
-  map<epoch_t, unsigned> map_reservations;
+  std::map<epoch_t, unsigned> map_reservations;
 
   /// gets ref to next_osdmap and registers the epoch as reserved
   OSDMapRef get_nextmap_reserved() {
-    Mutex::Locker l(pre_publish_lock);
-    if (!next_osdmap)
-      return OSDMapRef();
+    std::lock_guard l(pre_publish_lock);
     epoch_t e = next_osdmap->get_epoch();
-    map<epoch_t, unsigned>::iterator i =
-      map_reservations.insert(make_pair(e, 0)).first;
+    std::map<epoch_t, unsigned>::iterator i =
+      map_reservations.insert(std::make_pair(e, 0)).first;
     i->second++;
     return next_osdmap;
   }
   /// releases reservation on map
   void release_map(OSDMapRef osdmap) {
-    Mutex::Locker l(pre_publish_lock);
-    map<epoch_t, unsigned>::iterator i =
+    std::lock_guard l(pre_publish_lock);
+    std::map<epoch_t, unsigned>::iterator i =
       map_reservations.find(osdmap->get_epoch());
-    assert(i != map_reservations.end());
-    assert(i->second > 0);
+    ceph_assert(i != map_reservations.end());
+    ceph_assert(i->second > 0);
     if (--(i->second) == 0) {
       map_reservations.erase(i);
     }
-    pre_publish_cond.Signal();
+    if (pre_publish_waiter) {
+      pre_publish_cond.notify_all();
+    }
   }
   /// blocks until there are no reserved maps prior to next_osdmap
   void await_reserved_maps() {
-    Mutex::Locker l(pre_publish_lock);
-    assert(next_osdmap);
-    while (true) {
-      map<epoch_t, unsigned>::const_iterator i = map_reservations.cbegin();
-      if (i == map_reservations.cend() || i->first >= next_osdmap->get_epoch()) {
-       break;
-      } else {
-       pre_publish_cond.Wait(pre_publish_lock);
-      }
-    }
+    std::unique_lock l{pre_publish_lock};
+    ceph_assert(next_osdmap);
+    pre_publish_waiter++;
+    pre_publish_cond.wait(l, [this] {
+      auto i = map_reservations.cbegin();
+      return (i == map_reservations.cend() ||
+             i->first >= next_osdmap->get_epoch());
+    });
+    pre_publish_waiter--;
+  }
+  OSDMapRef get_next_osdmap() {
+    std::lock_guard l(pre_publish_lock);
+    return next_osdmap;
   }
 
-private:
-  Mutex peer_map_epoch_lock;
-  map<int, epoch_t> peer_map_epoch;
-public:
-  epoch_t get_peer_epoch(int p);
-  epoch_t note_peer_epoch(int p, epoch_t e);
-  void forget_peer_epoch(int p, epoch_t e);
+  void maybe_share_map(Connection *con,
+                      const OSDMapRef& osdmap,
+                      epoch_t peer_epoch_lb=0);
 
   void send_map(class MOSDMap *m, Connection *con);
-  void send_incremental_map(epoch_t since, Connection *con, OSDMapRef& osdmap);
+  void send_incremental_map(epoch_t since, Connection *con,
+                           const OSDMapRef& osdmap);
   MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to,
                                        OSDSuperblock& superblock);
-  bool should_share_map(entity_name_t name, Connection *con, epoch_t epoch,
-                        const OSDMapRef& osdmap, const epoch_t *sent_epoch_p);
-  void share_map(entity_name_t name, Connection *con, epoch_t epoch,
-                 OSDMapRef& osdmap, epoch_t *sent_epoch_p);
-  void share_map_peer(int peer, Connection *con,
-                      OSDMapRef map = OSDMapRef());
 
   ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch);
-  pair<ConnectionRef,ConnectionRef> get_con_osd_hb(int peer, epoch_t from_epoch);  // (back, front)
+  std::pair<ConnectionRef,ConnectionRef> get_con_osd_hb(int peer, epoch_t from_epoch);  // (back, front)
   void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch);
-  void send_message_osd_cluster(Message *m, Connection *con) {
-    con->send_message(m);
+  void send_message_osd_cluster(std::vector<std::pair<int, Message*>>& messages, epoch_t from_epoch);
+  void send_message_osd_cluster(MessageRef m, Connection *con) {
+    con->send_message2(std::move(m));
   }
   void send_message_osd_cluster(Message *m, const ConnectionRef& con) {
     con->send_message(m);
   }
-  void send_message_osd_client(Message *m, Connection *con) {
-    con->send_message(m);
-  }
   void send_message_osd_client(Message *m, const ConnectionRef& con) {
     con->send_message(m);
   }
-  entity_name_t get_cluster_msgr_name() {
-    return cluster_messenger->get_myname();
-  }
+  entity_name_t get_cluster_msgr_name() const;
 
-private:
-  // -- scrub scheduling --
-  Mutex sched_scrub_lock;
-  int scrubs_pending;
-  int scrubs_active;
 
 public:
-  struct ScrubJob {
-    CephContext* cct;
-    /// pg to be scrubbed
-    spg_t pgid;
-    /// a time scheduled for scrub. but the scrub could be delayed if system
-    /// load is too high or it fails to fall in the scrub hours
-    utime_t sched_time;
-    /// the hard upper bound of scrub time
-    utime_t deadline;
-    ScrubJob() : cct(nullptr) {}
-    explicit ScrubJob(CephContext* cct, const spg_t& pg,
-                     const utime_t& timestamp,
-                     double pool_scrub_min_interval = 0,
-                     double pool_scrub_max_interval = 0, bool must = true);
-    /// order the jobs by sched_time
-    bool operator<(const ScrubJob& rhs) const;
-  };
-  set<ScrubJob> sched_scrub_pg;
-
-  /// @returns the scrub_reg_stamp used for unregister the scrub job
-  utime_t reg_pg_scrub(spg_t pgid, utime_t t, double pool_scrub_min_interval,
-                      double pool_scrub_max_interval, bool must) {
-    ScrubJob scrub(cct, pgid, t, pool_scrub_min_interval, pool_scrub_max_interval,
-                  must);
-    Mutex::Locker l(sched_scrub_lock);
-    sched_scrub_pg.insert(scrub);
-    return scrub.sched_time;
-  }
-  void unreg_pg_scrub(spg_t pgid, utime_t t) {
-    Mutex::Locker l(sched_scrub_lock);
-    size_t removed = sched_scrub_pg.erase(ScrubJob(cct, pgid, t));
-    assert(removed);
-  }
-  bool first_scrub_stamp(ScrubJob *out) {
-    Mutex::Locker l(sched_scrub_lock);
-    if (sched_scrub_pg.empty())
-      return false;
-    set<ScrubJob>::iterator iter = sched_scrub_pg.begin();
-    *out = *iter;
-    return true;
-  }
-  bool next_scrub_stamp(const ScrubJob& next,
-                       ScrubJob *out) {
-    Mutex::Locker l(sched_scrub_lock);
-    if (sched_scrub_pg.empty())
-      return false;
-    set<ScrubJob>::const_iterator iter = sched_scrub_pg.lower_bound(next);
-    if (iter == sched_scrub_pg.cend())
-      return false;
-    ++iter;
-    if (iter == sched_scrub_pg.cend())
-      return false;
-    *out = *iter;
-    return true;
-  }
-
-  void dumps_scrub(Formatter *f) {
-    assert(f != nullptr);
-    Mutex::Locker l(sched_scrub_lock);
-
-    f->open_array_section("scrubs");
-    for (const auto &i: sched_scrub_pg) {
-      f->open_object_section("scrub");
-      f->dump_stream("pgid") << i.pgid;
-      f->dump_stream("sched_time") << i.sched_time;
-      f->dump_stream("deadline") << i.deadline;
-      f->dump_bool("forced", i.sched_time == i.deadline);
-      f->close_section();
-    }
-    f->close_section();
-  }
-
-  bool can_inc_scrubs_pending();
-  bool inc_scrubs_pending();
-  void inc_scrubs_active(bool reserved);
-  void dec_scrubs_pending();
-  void dec_scrubs_active();
 
   void reply_op_error(OpRequestRef op, int err);
-  void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv);
+  void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv,
+                     std::vector<pg_log_op_return_item_t> op_returns);
   void handle_misdirected_op(PG *pg, OpRequestRef op);
 
+ private:
+  /**
+   * The entity that maintains the set of PGs we may scrub (i.e. - those that we
+   * are their primary), and schedules their scrubbing.
+   */
+  ScrubQueue m_scrub_queue;
 
-private:
+ public:
+  ScrubQueue& get_scrub_services() { return m_scrub_queue; }
+
+  /**
+   * A callback used by the ScrubQueue object to initiate a scrub on a specific PG.
+   *
+   * The request might fail for multiple reasons, as ScrubQueue cannot by its own
+   * check some of the PG-specific preconditions and those are checked here. See
+   * attempt_t definition.
+   *
+   * @param pgid to scrub
+   * @param allow_requested_repair_only
+   * @return a Scrub::attempt_t detailing either a success, or the failure reason.
+   */
+  Scrub::schedule_result_t initiate_a_scrub(spg_t pgid, bool allow_requested_repair_only);
+
+
+ private:
   // -- agent shared state --
-  Mutex agent_lock;
-  Cond agent_cond;
-  map<uint64_t, set<PGRef> > agent_queue;
-  set<PGRef>::iterator agent_queue_pos;
+  ceph::mutex agent_lock = ceph::make_mutex("OSDService::agent_lock");
+  ceph::condition_variable agent_cond;
+  std::map<uint64_t, std::set<PGRef> > agent_queue;
+  std::set<PGRef>::iterator agent_queue_pos;
   bool agent_valid_iterator;
   int agent_ops;
   int flush_mode_high_count; //once have one pg with FLUSH_MODE_HIGH then flush objects with high speed
-  set<hobject_t> agent_oids;
+  std::set<hobject_t> agent_oids;
   bool agent_active;
   struct AgentThread : public Thread {
     OSDService *osd;
@@ -667,7 +313,7 @@ private:
     }
   } agent_thread;
   bool agent_stop_flag;
-  Mutex agent_timer_lock;
+  ceph::mutex agent_timer_lock = ceph::make_mutex("OSDService::agent_timer_lock");
   SafeTimer agent_timer;
 
 public:
@@ -678,16 +324,16 @@ public:
     if (!agent_queue.empty() &&
        agent_queue.rbegin()->first < priority)
       agent_valid_iterator = false;  // inserting higher-priority queue
-    set<PGRef>& nq = agent_queue[priority];
+    std::set<PGRef>& nq = agent_queue[priority];
     if (nq.empty())
-      agent_cond.Signal();
+      agent_cond.notify_all();
     nq.insert(pg);
   }
 
   void _dequeue(PG *pg, uint64_t old_priority) {
-    set<PGRef>& oq = agent_queue[old_priority];
-    set<PGRef>::iterator p = oq.find(pg);
-    assert(p != oq.end());
+    std::set<PGRef>& oq = agent_queue[old_priority];
+    std::set<PGRef>::iterator p = oq.find(pg);
+    ceph_assert(p != oq.end());
     if (p == agent_queue_pos)
       ++agent_queue_pos;
     oq.erase(p);
@@ -700,81 +346,81 @@ public:
 
   /// enable agent for a pg
   void agent_enable_pg(PG *pg, uint64_t priority) {
-    Mutex::Locker l(agent_lock);
+    std::lock_guard l(agent_lock);
     _enqueue(pg, priority);
   }
 
   /// adjust priority for an enagled pg
   void agent_adjust_pg(PG *pg, uint64_t old_priority, uint64_t new_priority) {
-    Mutex::Locker l(agent_lock);
-    assert(new_priority != old_priority);
+    std::lock_guard l(agent_lock);
+    ceph_assert(new_priority != old_priority);
     _enqueue(pg, new_priority);
     _dequeue(pg, old_priority);
   }
 
   /// disable agent for a pg
   void agent_disable_pg(PG *pg, uint64_t old_priority) {
-    Mutex::Locker l(agent_lock);
+    std::lock_guard l(agent_lock);
     _dequeue(pg, old_priority);
   }
 
   /// note start of an async (evict) op
   void agent_start_evict_op() {
-    Mutex::Locker l(agent_lock);
+    std::lock_guard l(agent_lock);
     ++agent_ops;
   }
 
   /// note finish or cancellation of an async (evict) op
   void agent_finish_evict_op() {
-    Mutex::Locker l(agent_lock);
-    assert(agent_ops > 0);
+    std::lock_guard l(agent_lock);
+    ceph_assert(agent_ops > 0);
     --agent_ops;
-    agent_cond.Signal();
+    agent_cond.notify_all();
   }
 
   /// note start of an async (flush) op
   void agent_start_op(const hobject_t& oid) {
-    Mutex::Locker l(agent_lock);
+    std::lock_guard l(agent_lock);
     ++agent_ops;
-    assert(agent_oids.count(oid) == 0);
+    ceph_assert(agent_oids.count(oid) == 0);
     agent_oids.insert(oid);
   }
 
   /// note finish or cancellation of an async (flush) op
   void agent_finish_op(const hobject_t& oid) {
-    Mutex::Locker l(agent_lock);
-    assert(agent_ops > 0);
+    std::lock_guard l(agent_lock);
+    ceph_assert(agent_ops > 0);
     --agent_ops;
-    assert(agent_oids.count(oid) == 1);
+    ceph_assert(agent_oids.count(oid) == 1);
     agent_oids.erase(oid);
-    agent_cond.Signal();
+    agent_cond.notify_all();
   }
 
   /// check if we are operating on an object
   bool agent_is_active_oid(const hobject_t& oid) {
-    Mutex::Locker l(agent_lock);
+    std::lock_guard l(agent_lock);
     return agent_oids.count(oid);
   }
 
   /// get count of active agent ops
   int agent_get_num_ops() {
-    Mutex::Locker l(agent_lock);
+    std::lock_guard l(agent_lock);
     return agent_ops;
   }
 
   void agent_inc_high_count() {
-    Mutex::Locker l(agent_lock);
+    std::lock_guard l(agent_lock);
     flush_mode_high_count ++;
   }
 
   void agent_dec_high_count() {
-    Mutex::Locker l(agent_lock);
+    std::lock_guard l(agent_lock);
     flush_mode_high_count --;
   }
 
 private:
   /// throttle promotion attempts
-  std::atomic_uint promote_probability_millis{1000}; ///< probability thousands. one word.
+  std::atomic<unsigned int> promote_probability_millis{1000}; ///< probability thousands. one word.
   PromoteCounter promote_counter;
   utime_t last_recalibrate;
   unsigned long promote_max_objects, promote_max_bytes;
@@ -797,271 +443,331 @@ public:
     promote_counter.finish(bytes);
   }
   void promote_throttle_recalibrate();
+  unsigned get_num_shards() const {
+    return m_objecter_finishers;
+  }
+  Finisher* get_objecter_finisher(int shard) {
+    return objecter_finishers[shard].get();
+  }
 
   // -- Objecter, for tiering reads/writes from/to other OSDs --
-  Objecter *objecter;
-  Finisher objecter_finisher;
+  ceph::async::io_context_pool& poolctx;
+  std::unique_ptr<Objecter> objecter;
+  int m_objecter_finishers;
+  std::vector<std::unique_ptr<Finisher>> objecter_finishers;
 
   // -- Watch --
-  Mutex watch_lock;
+  ceph::mutex watch_lock = ceph::make_mutex("OSDService::watch_lock");
   SafeTimer watch_timer;
   uint64_t next_notif_id;
   uint64_t get_next_id(epoch_t cur_epoch) {
-    Mutex::Locker l(watch_lock);
+    std::lock_guard l(watch_lock);
     return (((uint64_t)cur_epoch) << 32) | ((uint64_t)(next_notif_id++));
   }
 
   // -- Recovery/Backfill Request Scheduling --
-  Mutex recovery_request_lock;
+  ceph::mutex recovery_request_lock = ceph::make_mutex("OSDService::recovery_request_lock");
   SafeTimer recovery_request_timer;
 
   // For async recovery sleep
   bool recovery_needs_sleep = true;
-  utime_t recovery_schedule_time = utime_t();
+  ceph::real_clock::time_point recovery_schedule_time;
 
-  Mutex recovery_sleep_lock;
-  SafeTimer recovery_sleep_timer;
+  // For recovery & scrub & snap
+  ceph::mutex sleep_lock = ceph::make_mutex("OSDService::sleep_lock");
+  SafeTimer sleep_timer;
 
   // -- tids --
   // for ops i issue
-  std::atomic_uint last_tid{0};
+  std::atomic<unsigned int> last_tid{0};
   ceph_tid_t get_tid() {
     return (ceph_tid_t)last_tid++;
   }
 
   // -- backfill_reservation --
   Finisher reserver_finisher;
-  AsyncReserver<spg_t> local_reserver;
-  AsyncReserver<spg_t> remote_reserver;
+  AsyncReserver<spg_t, Finisher> local_reserver;
+  AsyncReserver<spg_t, Finisher> remote_reserver;
+
+  // -- pg merge --
+  ceph::mutex merge_lock = ceph::make_mutex("OSD::merge_lock");
+  std::map<pg_t,eversion_t> ready_to_merge_source;   // pg -> version
+  std::map<pg_t,std::tuple<eversion_t,epoch_t,epoch_t>> ready_to_merge_target;  // pg -> (version,les,lec)
+  std::set<pg_t> not_ready_to_merge_source;
+  std::map<pg_t,pg_t> not_ready_to_merge_target;
+  std::set<pg_t> sent_ready_to_merge_source;
+
+  void set_ready_to_merge_source(PG *pg,
+                                eversion_t version);
+  void set_ready_to_merge_target(PG *pg,
+                                eversion_t version,
+                                epoch_t last_epoch_started,
+                                epoch_t last_epoch_clean);
+  void set_not_ready_to_merge_source(pg_t source);
+  void set_not_ready_to_merge_target(pg_t target, pg_t source);
+  void clear_ready_to_merge(PG *pg);
+  void send_ready_to_merge();
+  void _send_ready_to_merge();
+  void clear_sent_ready_to_merge();
+  void prune_sent_ready_to_merge(const OSDMapRef& osdmap);
 
   // -- pg_temp --
 private:
-  Mutex pg_temp_lock;
-  map<pg_t, vector<int> > pg_temp_wanted;
-  map<pg_t, vector<int> > pg_temp_pending;
+  ceph::mutex pg_temp_lock = ceph::make_mutex("OSDService::pg_temp_lock");
+  struct pg_temp_t {
+    std::vector<int> acting;
+    bool forced = false;
+  };
+  std::map<pg_t, pg_temp_t> pg_temp_wanted;
+  std::map<pg_t, pg_temp_t> pg_temp_pending;
   void _sent_pg_temp();
+  friend std::ostream& operator<<(std::ostream&, const pg_temp_t&);
 public:
-  void queue_want_pg_temp(pg_t pgid, vector<int>& want);
+  void queue_want_pg_temp(pg_t pgid, const std::vector<int>& want,
+                         bool forced = false);
   void remove_want_pg_temp(pg_t pgid);
   void requeue_pg_temp();
   void send_pg_temp();
 
+  ceph::mutex pg_created_lock = ceph::make_mutex("OSDService::pg_created_lock");
+  std::set<pg_t> pg_created;
   void send_pg_created(pg_t pgid);
+  void prune_pg_created();
+  void send_pg_created();
 
-  void queue_for_peering(PG *pg);
+  AsyncReserver<spg_t, Finisher> snap_reserver;
+  void queue_recovery_context(PG *pg, GenContext<ThreadPool::TPHandle&> *c);
+  void queue_for_snap_trim(PG *pg);
+  void queue_for_scrub(PG* pg, Scrub::scrub_prio_t with_priority);
 
-  Mutex snap_sleep_lock;
-  SafeTimer snap_sleep_timer;
+  void queue_scrub_after_repair(PG* pg, Scrub::scrub_prio_t with_priority);
 
-  Mutex scrub_sleep_lock;
-  SafeTimer scrub_sleep_timer;
+  /// queue the message (-> event) that all replicas have reserved scrub resources for us
+  void queue_for_scrub_granted(PG* pg, Scrub::scrub_prio_t with_priority);
 
-  AsyncReserver<spg_t> snap_reserver;
-  void queue_for_snap_trim(PG *pg);
+  /// queue the message (-> event) that some replicas denied our scrub resources request
+  void queue_for_scrub_denied(PG* pg, Scrub::scrub_prio_t with_priority);
 
-  void queue_for_scrub(PG *pg, bool with_high_priority) {
-    unsigned scrub_queue_priority = pg->scrubber.priority;
-    if (with_high_priority && scrub_queue_priority < cct->_conf->osd_client_op_priority) {
-      scrub_queue_priority = cct->_conf->osd_client_op_priority;
-    }
-    enqueue_back(
-      pg->info.pgid,
-      PGQueueable(
-       PGScrub(pg->get_osdmap()->get_epoch()),
-       cct->_conf->osd_scrub_cost,
-       scrub_queue_priority,
-       ceph_clock_now(),
-       entity_inst_t(),
-       pg->get_osdmap()->get_epoch()));
-  }
+  /// Signals either (a) the end of a sleep period, or (b) a recheck of the availability
+  /// of the primary map being created by the backend.
+  void queue_for_scrub_resched(PG* pg, Scrub::scrub_prio_t with_priority);
+
+  /// Signals a change in the number of in-flight recovery writes
+  void queue_scrub_pushes_update(PG* pg, Scrub::scrub_prio_t with_priority);
+
+  /// Signals that all pending updates were applied
+  void queue_scrub_applied_update(PG* pg, Scrub::scrub_prio_t with_priority);
+
+  /// Signals that the selected chunk (objects range) is available for scrubbing
+  void queue_scrub_chunk_free(PG* pg, Scrub::scrub_prio_t with_priority);
+
+  /// The chunk selected is blocked by user operations, and cannot be scrubbed now
+  void queue_scrub_chunk_busy(PG* pg, Scrub::scrub_prio_t with_priority);
+
+  /// The block-range that was locked and prevented the scrubbing - is freed
+  void queue_scrub_unblocking(PG* pg, Scrub::scrub_prio_t with_priority);
+
+  /// Signals that all write OPs are done
+  void queue_scrub_digest_update(PG* pg, Scrub::scrub_prio_t with_priority);
+
+  /// Signals that the the local (Primary's) scrub map is ready
+  void queue_scrub_got_local_map(PG* pg, Scrub::scrub_prio_t with_priority);
+
+  /// Signals that we (the Primary) got all waited-for scrub-maps from our replicas
+  void queue_scrub_got_repl_maps(PG* pg, Scrub::scrub_prio_t with_priority);
+
+  /// Signals that all chunks were handled
+  /// Note: always with high priority, as must be acted upon before the
+  /// next scrub request arrives from the Primary (and the primary is free
+  /// to send the request once the replica's map is received).
+  void queue_scrub_is_finished(PG* pg);
+
+  /// Signals that there are more chunks to handle
+  void queue_scrub_next_chunk(PG* pg, Scrub::scrub_prio_t with_priority);
+
+  /// Signals that we have finished comparing the maps for this chunk
+  /// Note: required, as in Crimson this operation is 'futurized'.
+  void queue_scrub_maps_compared(PG* pg, Scrub::scrub_prio_t with_priority);
+
+  void queue_for_rep_scrub(PG* pg,
+                          Scrub::scrub_prio_t with_high_priority,
+                          unsigned int qu_priority,
+                          Scrub::act_token_t act_token);
+
+  /// Signals a change in the number of in-flight recovery writes
+  void queue_scrub_replica_pushes(PG *pg, Scrub::scrub_prio_t with_priority);
+
+  /// (not in Crimson) Queue a SchedReplica event to be sent to the replica, to
+  /// trigger a re-check of the availability of the scrub map prepared by the
+  /// backend.
+  void queue_for_rep_scrub_resched(PG* pg,
+                                  Scrub::scrub_prio_t with_high_priority,
+                                  unsigned int qu_priority,
+                                  Scrub::act_token_t act_token);
+
+  void queue_for_pg_delete(spg_t pgid, epoch_t e);
+  bool try_finish_pg_delete(PG *pg, unsigned old_pg_num);
 
 private:
   // -- pg recovery and associated throttling --
-  Mutex recovery_lock;
-  list<pair<epoch_t, PGRef> > awaiting_throttle;
+  ceph::mutex recovery_lock = ceph::make_mutex("OSDService::recovery_lock");
+  std::list<std::pair<epoch_t, PGRef> > awaiting_throttle;
+
+  /// queue a scrub-related message for a PG
+  template <class MSG_TYPE>
+  void queue_scrub_event_msg(PG* pg,
+                            Scrub::scrub_prio_t with_priority,
+                            unsigned int qu_priority,
+                            Scrub::act_token_t act_token);
+
+  /// An alternative version of queue_scrub_event_msg(), in which the queuing priority is
+  /// provided by the executing scrub (i.e. taken from PgScrubber::m_flags)
+  template <class MSG_TYPE>
+  void queue_scrub_event_msg(PG* pg, Scrub::scrub_prio_t with_priority);
 
   utime_t defer_recovery_until;
   uint64_t recovery_ops_active;
   uint64_t recovery_ops_reserved;
   bool recovery_paused;
 #ifdef DEBUG_RECOVERY_OIDS
-  map<spg_t, set<hobject_t> > recovery_oids;
+  std::map<spg_t, std::set<hobject_t> > recovery_oids;
 #endif
   bool _recover_now(uint64_t *available_pushes);
   void _maybe_queue_recovery();
   void _queue_for_recovery(
-    pair<epoch_t, PGRef> p, uint64_t reserved_pushes) {
-    assert(recovery_lock.is_locked_by_me());
-    enqueue_back(
-      p.second->info.pgid,
-      PGQueueable(
-       PGRecovery(p.first, reserved_pushes),
-       cct->_conf->osd_recovery_cost,
-       cct->_conf->osd_recovery_priority,
-       ceph_clock_now(),
-       entity_inst_t(),
-       p.first));
-  }
+    std::pair<epoch_t, PGRef> p, uint64_t reserved_pushes);
 public:
   void start_recovery_op(PG *pg, const hobject_t& soid);
   void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
   bool is_recovery_active();
-  void release_reserved_pushes(uint64_t pushes) {
-    Mutex::Locker l(recovery_lock);
-    assert(recovery_ops_reserved >= pushes);
-    recovery_ops_reserved -= pushes;
-    _maybe_queue_recovery();
-  }
+  void release_reserved_pushes(uint64_t pushes);
   void defer_recovery(float defer_for) {
     defer_recovery_until = ceph_clock_now();
     defer_recovery_until += defer_for;
   }
   void pause_recovery() {
-    Mutex::Locker l(recovery_lock);
+    std::lock_guard l(recovery_lock);
     recovery_paused = true;
   }
   bool recovery_is_paused() {
-    Mutex::Locker l(recovery_lock);
+    std::lock_guard l(recovery_lock);
     return recovery_paused;
   }
   void unpause_recovery() {
-    Mutex::Locker l(recovery_lock);
+    std::lock_guard l(recovery_lock);
     recovery_paused = false;
     _maybe_queue_recovery();
   }
   void kick_recovery_queue() {
-    Mutex::Locker l(recovery_lock);
+    std::lock_guard l(recovery_lock);
     _maybe_queue_recovery();
   }
   void clear_queued_recovery(PG *pg) {
-    Mutex::Locker l(recovery_lock);
-    for (list<pair<epoch_t, PGRef> >::iterator i = awaiting_throttle.begin();
-        i != awaiting_throttle.end();
-      ) {
-      if (i->second.get() == pg) {
-       awaiting_throttle.erase(i);
-       return;
-      } else {
-       ++i;
-      }
-    }
+    std::lock_guard l(recovery_lock);
+    awaiting_throttle.remove_if(
+      [pg](decltype(awaiting_throttle)::const_reference awaiting ) {
+       return awaiting.second.get() == pg;
+      });
   }
+
+  unsigned get_target_pg_log_entries() const;
+
   // delayed pg activation
-  void queue_for_recovery(PG *pg, bool front = false) {
-    Mutex::Locker l(recovery_lock);
-    if (front) {
-      awaiting_throttle.push_front(make_pair(pg->get_osdmap()->get_epoch(), pg));
+  void queue_for_recovery(PG *pg) {
+    std::lock_guard l(recovery_lock);
+
+    if (pg->is_forced_recovery_or_backfill()) {
+      awaiting_throttle.push_front(std::make_pair(pg->get_osdmap()->get_epoch(), pg));
     } else {
-      awaiting_throttle.push_back(make_pair(pg->get_osdmap()->get_epoch(), pg));
+      awaiting_throttle.push_back(std::make_pair(pg->get_osdmap()->get_epoch(), pg));
     }
     _maybe_queue_recovery();
   }
   void queue_recovery_after_sleep(PG *pg, epoch_t queued, uint64_t reserved_pushes) {
-    Mutex::Locker l(recovery_lock);
-    _queue_for_recovery(make_pair(queued, pg), reserved_pushes);
+    std::lock_guard l(recovery_lock);
+    _queue_for_recovery(std::make_pair(queued, pg), reserved_pushes);
   }
 
+  void queue_check_readable(spg_t spgid,
+                           epoch_t lpr,
+                           ceph::signedspan delay = ceph::signedspan::zero());
 
   // osd map cache (past osd maps)
-  Mutex map_cache_lock;
+  ceph::mutex map_cache_lock = ceph::make_mutex("OSDService::map_cache_lock");
   SharedLRU<epoch_t, const OSDMap> map_cache;
-  SimpleLRU<epoch_t, bufferlist> map_bl_cache;
-  SimpleLRU<epoch_t, bufferlist> map_bl_inc_cache;
+  SimpleLRU<epoch_t, ceph::buffer::list> map_bl_cache;
+  SimpleLRU<epoch_t, ceph::buffer::list> map_bl_inc_cache;
 
   OSDMapRef try_get_map(epoch_t e);
   OSDMapRef get_map(epoch_t e) {
     OSDMapRef ret(try_get_map(e));
-    assert(ret);
+    ceph_assert(ret);
     return ret;
   }
   OSDMapRef add_map(OSDMap *o) {
-    Mutex::Locker l(map_cache_lock);
+    std::lock_guard l(map_cache_lock);
     return _add_map(o);
   }
   OSDMapRef _add_map(OSDMap *o);
 
-  void add_map_bl(epoch_t e, bufferlist& bl) {
-    Mutex::Locker l(map_cache_lock);
-    return _add_map_bl(e, bl);
-  }
-  void pin_map_bl(epoch_t e, bufferlist &bl);
-  void _add_map_bl(epoch_t e, bufferlist& bl);
-  bool get_map_bl(epoch_t e, bufferlist& bl) {
-    Mutex::Locker l(map_cache_lock);
+  void _add_map_bl(epoch_t e, ceph::buffer::list& bl);
+  bool get_map_bl(epoch_t e, ceph::buffer::list& bl) {
+    std::lock_guard l(map_cache_lock);
     return _get_map_bl(e, bl);
   }
-  bool _get_map_bl(epoch_t e, bufferlist& bl);
+  bool _get_map_bl(epoch_t e, ceph::buffer::list& bl);
 
-  void add_map_inc_bl(epoch_t e, bufferlist& bl) {
-    Mutex::Locker l(map_cache_lock);
-    return _add_map_inc_bl(e, bl);
-  }
-  void pin_map_inc_bl(epoch_t e, bufferlist &bl);
-  void _add_map_inc_bl(epoch_t e, bufferlist& bl);
-  bool get_inc_map_bl(epoch_t e, bufferlist& bl);
+  void _add_map_inc_bl(epoch_t e, ceph::buffer::list& bl);
+  bool get_inc_map_bl(epoch_t e, ceph::buffer::list& bl);
 
-  void clear_map_bl_cache_pins(epoch_t e);
+  /// identify split child pgids over a osdmap interval
+  void identify_splits_and_merges(
+    OSDMapRef old_map,
+    OSDMapRef new_map,
+    spg_t pgid,
+    std::set<std::pair<spg_t,epoch_t>> *new_children,
+    std::set<std::pair<spg_t,epoch_t>> *merge_pgs);
 
   void need_heartbeat_peer_update();
 
-  void pg_stat_queue_enqueue(PG *pg);
-  void pg_stat_queue_dequeue(PG *pg);
-
   void init();
-  void final_init();  
+  void final_init();
   void start_shutdown();
   void shutdown_reserver();
   void shutdown();
 
-private:
-  // split
-  Mutex in_progress_split_lock;
-  map<spg_t, spg_t> pending_splits; // child -> parent
-  map<spg_t, set<spg_t> > rev_pending_splits; // parent -> [children]
-  set<spg_t> in_progress_splits;       // child
-
-public:
-  void _start_split(spg_t parent, const set<spg_t> &children);
-  void start_split(spg_t parent, const set<spg_t> &children) {
-    Mutex::Locker l(in_progress_split_lock);
-    return _start_split(parent, children);
-  }
-  void mark_split_in_progress(spg_t parent, const set<spg_t> &pgs);
-  void complete_split(const set<spg_t> &pgs);
-  void cancel_pending_splits_for_parent(spg_t parent);
-  void _cancel_pending_splits_for_parent(spg_t parent);
-  bool splitting(spg_t pgid);
-  void expand_pg_num(OSDMapRef old_map,
-                    OSDMapRef new_map);
-  void _maybe_split_pgid(OSDMapRef old_map,
-                        OSDMapRef new_map,
-                        spg_t pgid);
-  void init_splits_between(spg_t pgid, OSDMapRef frommap, OSDMapRef tomap);
-
   // -- stats --
-  Mutex stat_lock;
+  ceph::mutex stat_lock = ceph::make_mutex("OSDService::stat_lock");
   osd_stat_t osd_stat;
   uint32_t seq = 0;
 
-  void update_osd_stat(vector<int>& hb_peers);
-  osd_stat_t set_osd_stat(const struct store_statfs_t &stbuf,
-                          vector<int>& hb_peers);
+  void set_statfs(const struct store_statfs_t &stbuf,
+    osd_alert_list_t& alerts);
+  osd_stat_t set_osd_stat(std::vector<int>& hb_peers, int num_pgs);
+  void inc_osd_stat_repaired(void);
+  float compute_adjusted_ratio(osd_stat_t new_stat, float *pratio, uint64_t adjust_used = 0);
   osd_stat_t get_osd_stat() {
-    Mutex::Locker l(stat_lock);
+    std::lock_guard l(stat_lock);
     ++seq;
     osd_stat.up_from = up_epoch;
     osd_stat.seq = ((uint64_t)osd_stat.up_from << 32) + seq;
     return osd_stat;
   }
   uint64_t get_osd_stat_seq() {
-    Mutex::Locker l(stat_lock);
+    std::lock_guard l(stat_lock);
     return osd_stat.seq;
   }
+  void get_hb_pingtime(std::map<int, osd_stat_t::Interfaces> *pp)
+  {
+    std::lock_guard l(stat_lock);
+    *pp = osd_stat.hb_pingtime;
+    return;
+  }
 
   // -- OSD Full Status --
 private:
   friend TestOpsSocketHook;
-  mutable Mutex full_status_lock;
+  mutable ceph::mutex full_status_lock = ceph::make_mutex("OSDService::full_status_lock");
   enum s_names { INVALID = -1, NONE, NEARFULL, BACKFILLFULL, FULL, FAILSAFE } cur_state;  // ascending
   const char *get_full_state_name(s_names s) const {
     switch (s) {
@@ -1073,7 +779,7 @@ private:
     default: return "???";
     }
   }
-  s_names get_full_state(string type) const {
+  s_names get_full_state(std::string type) const {
     if (type == "none")
       return NONE;
     else if (type == "failsafe")
@@ -1087,41 +793,45 @@ private:
     else
       return INVALID;
   }
-  double cur_ratio;  ///< current utilization
+  double cur_ratio, physical_ratio;  ///< current utilization
   mutable int64_t injectfull = 0;
   s_names injectfull_state = NONE;
   float get_failsafe_full_ratio();
-  void check_full_status(float ratio);
-  bool _check_full(s_names type, ostream &ss) const;
+  bool _check_inject_full(DoutPrefixProvider *dpp, s_names type) const;
+  bool _check_full(DoutPrefixProvider *dpp, s_names type) const;
 public:
-  bool check_failsafe_full(ostream &ss) const;
-  bool check_full(ostream &ss) const;
-  bool check_backfill_full(ostream &ss) const;
-  bool check_nearfull(ostream &ss) const;
+  void check_full_status(float ratio, float pratio);
+  s_names recalc_full_state(float ratio, float pratio, std::string &inject);
+  bool _tentative_full(DoutPrefixProvider *dpp, s_names type, uint64_t adjust_used, osd_stat_t);
+  bool check_failsafe_full(DoutPrefixProvider *dpp) const;
+  bool check_full(DoutPrefixProvider *dpp) const;
+  bool tentative_backfill_full(DoutPrefixProvider *dpp, uint64_t adjust_used, osd_stat_t);
+  bool check_backfill_full(DoutPrefixProvider *dpp) const;
+  bool check_nearfull(DoutPrefixProvider *dpp) const;
   bool is_failsafe_full() const;
   bool is_full() const;
   bool is_backfillfull() const;
   bool is_nearfull() const;
   bool need_fullness_update();  ///< osdmap state needs update
   void set_injectfull(s_names type, int64_t count);
-  bool check_osdmap_full(const set<pg_shard_t> &missing_on);
 
 
   // -- epochs --
 private:
-  mutable Mutex epoch_lock; // protects access to boot_epoch, up_epoch, bind_epoch
+  // protects access to boot_epoch, up_epoch, bind_epoch
+  mutable ceph::mutex epoch_lock = ceph::make_mutex("OSDService::epoch_lock");
   epoch_t boot_epoch;  // _first_ epoch we were marked up (after this process started)
   epoch_t up_epoch;    // _most_recent_ epoch we were marked up
   epoch_t bind_epoch;  // epoch we last did a bind to new ip:ports
 public:
   /**
-   * Retrieve the boot_, up_, and bind_ epochs the OSD has set. The params
+   * Retrieve the boot_, up_, and bind_ epochs the OSD has std::set. The params
    * can be NULL if you don't care about them.
    */
   void retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch,
                        epoch_t *_bind_epoch) const;
   /**
-   * Set the boot, up, and bind epochs. Any NULL params will not be set.
+   * Std::set the boot, up, and bind epochs. Any NULL params will not be std::set.
    */
   void set_epochs(const epoch_t *_boot_epoch, const epoch_t *_up_epoch,
                   const epoch_t *_bind_epoch);
@@ -1141,15 +851,32 @@ public:
     return ret;
   }
 
+  void request_osdmap_update(epoch_t e);
+
+  // -- heartbeats --
+  ceph::mutex hb_stamp_lock = ceph::make_mutex("OSDServce::hb_stamp_lock");
+
+  /// osd -> heartbeat stamps
+  std::vector<HeartbeatStampsRef> hb_stamps;
+
+  /// get or create a ref for a peer's HeartbeatStamps
+  HeartbeatStampsRef get_hb_stamps(unsigned osd);
+
+
+  // Timer for readable leases
+  ceph::timer<ceph::mono_clock> mono_timer = ceph::timer<ceph::mono_clock>{ceph::construct_suspended};
+
+  void queue_renew_lease(epoch_t epoch, spg_t spgid);
+
   // -- stopping --
-  Mutex is_stopping_lock;
-  Cond is_stopping_cond;
+  ceph::mutex is_stopping_lock = ceph::make_mutex("OSDService::is_stopping_lock");
+  ceph::condition_variable is_stopping_cond;
   enum {
     NOT_STOPPING,
     PREPARING_TO_STOP,
     STOPPING };
-  std::atomic_int state{NOT_STOPPING};
-  int get_state() {
+  std::atomic<int> state{NOT_STOPPING};
+  int get_state() const {
     return state;
   }
   void set_state(int s) {
@@ -1166,41 +893,208 @@ public:
 
 
 #ifdef PG_DEBUG_REFS
-  Mutex pgid_lock;
-  map<spg_t, int> pgid_tracker;
-  map<spg_t, PG*> live_pgs;
+  ceph::mutex pgid_lock = ceph::make_mutex("OSDService::pgid_lock");
+  std::map<spg_t, int> pgid_tracker;
+  std::map<spg_t, PG*> live_pgs;
   void add_pgid(spg_t pgid, PG *pg);
   void remove_pgid(spg_t pgid, PG *pg);
   void dump_live_pgids();
 #endif
 
-  explicit OSDService(OSD *osd);
-  ~OSDService();
+  explicit OSDService(OSD *osd, ceph::async::io_context_pool& poolctx);
+  ~OSDService() = default;
+};
+
+/*
+
+  Each PG slot includes queues for events that are processing and/or waiting
+  for a PG to be materialized in the slot.
+
+  These are the constraints:
+
+  - client ops must remained ordered by client, regardless of std::map epoch
+  - peering messages/events from peers must remain ordered by peer
+  - peering messages and client ops need not be ordered relative to each other
+
+  - some peering events can create a pg (e.g., notify)
+  - the query peering event can proceed when a PG doesn't exist
+
+  Implementation notes:
+
+  - everybody waits for split.  If the OSD has the parent PG it will instantiate
+    the PGSlot early and mark it waiting_for_split.  Everything will wait until
+    the parent is able to commit the split operation and the child PG's are
+    materialized in the child slots.
+
+  - every event has an epoch property and will wait for the OSDShard to catch
+    up to that epoch.  For example, if we get a peering event from a future
+    epoch, the event will wait in the slot until the local OSD has caught up.
+    (We should be judicious in specifying the required epoch [by, e.g., setting
+    it to the same_interval_since epoch] so that we don't wait for epochs that
+    don't affect the given PG.)
+
+  - we maintain two separate wait lists, *waiting* and *waiting_peering*. The
+    OpSchedulerItem has an is_peering() bool to determine which we use.  Waiting
+    peering events are queued up by epoch required.
+
+  - when we wake a PG slot (e.g., we finished split, or got a newer osdmap, or
+    materialized the PG), we wake *all* waiting items.  (This could be optimized,
+    probably, but we don't bother.)  We always requeue peering items ahead of
+    client ops.
+
+  - some peering events are marked !peering_requires_pg (PGQuery).  if we do
+    not have a PG these are processed immediately (under the shard lock).
+
+  - we do not have a PG present, we check if the slot maps to the current host.
+    if so, we either queue the item and wait for the PG to materialize, or
+    (if the event is a pg creating event like PGNotify), we materialize the PG.
+
+  - when we advance the osdmap on the OSDShard, we scan pg slots and
+    discard any slots with no pg (and not waiting_for_split) that no
+    longer std::map to the current host.
+
+  */
+
+struct OSDShardPGSlot {
+  using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;
+  PGRef pg;                      ///< pg reference
+  std::deque<OpSchedulerItem> to_process; ///< order items for this slot
+  int num_running = 0;          ///< _process threads doing pg lookup/lock
+
+  std::deque<OpSchedulerItem> waiting;   ///< waiting for pg (or map + pg)
+
+  /// waiting for map (peering evt)
+  std::map<epoch_t,std::deque<OpSchedulerItem>> waiting_peering;
+
+  /// incremented by wake_pg_waiters; indicates racing _process threads
+  /// should bail out (their op has been requeued)
+  uint64_t requeue_seq = 0;
+
+  /// waiting for split child to materialize in these epoch(s)
+  std::set<epoch_t> waiting_for_split;
+
+  epoch_t epoch = 0;
+  boost::intrusive::set_member_hook<> pg_epoch_item;
+
+  /// waiting for a merge (source or target) by this epoch
+  epoch_t waiting_for_merge_epoch = 0;
+};
+
+struct OSDShard {
+  const unsigned shard_id;
+  CephContext *cct;
+  OSD *osd;
+
+  std::string shard_name;
+
+  std::string sdata_wait_lock_name;
+  ceph::mutex sdata_wait_lock;
+  ceph::condition_variable sdata_cond;
+  int waiting_threads = 0;
+
+  ceph::mutex osdmap_lock;  ///< protect shard_osdmap updates vs users w/o shard_lock
+  OSDMapRef shard_osdmap;
+
+  OSDMapRef get_osdmap() {
+    std::lock_guard l(osdmap_lock);
+    return shard_osdmap;
+  }
+
+  std::string shard_lock_name;
+  ceph::mutex shard_lock;   ///< protects remaining members below
+
+  /// map of slots for each spg_t.  maintains ordering of items dequeued
+  /// from scheduler while _process thread drops shard lock to acquire the
+  /// pg lock.  stale slots are removed by consume_map.
+  std::unordered_map<spg_t,std::unique_ptr<OSDShardPGSlot>> pg_slots;
+
+  struct pg_slot_compare_by_epoch {
+    bool operator()(const OSDShardPGSlot& l, const OSDShardPGSlot& r) const {
+      return l.epoch < r.epoch;
+    }
+  };
+
+  /// maintain an ordering of pg slots by pg epoch
+  boost::intrusive::multiset<
+    OSDShardPGSlot,
+    boost::intrusive::member_hook<
+      OSDShardPGSlot,
+      boost::intrusive::set_member_hook<>,
+      &OSDShardPGSlot::pg_epoch_item>,
+    boost::intrusive::compare<pg_slot_compare_by_epoch>> pg_slots_by_epoch;
+  int waiting_for_min_pg_epoch = 0;
+  ceph::condition_variable min_pg_epoch_cond;
+
+  /// priority queue
+  ceph::osd::scheduler::OpSchedulerRef scheduler;
+
+  bool stop_waiting = false;
+
+  ContextQueue context_queue;
+
+  void _attach_pg(OSDShardPGSlot *slot, PG *pg);
+  void _detach_pg(OSDShardPGSlot *slot);
+
+  void update_pg_epoch(OSDShardPGSlot *slot, epoch_t epoch);
+  epoch_t get_min_pg_epoch();
+  void wait_min_pg_epoch(epoch_t need);
+
+  /// return newest epoch we are waiting for
+  epoch_t get_max_waiting_epoch();
+
+  /// push osdmap into shard
+  void consume_map(
+    const OSDMapRef& osdmap,
+    unsigned *pushes_to_free);
+
+  int _wake_pg_slot(spg_t pgid, OSDShardPGSlot *slot);
+
+  void identify_splits_and_merges(
+    const OSDMapRef& as_of_osdmap,
+    std::set<std::pair<spg_t,epoch_t>> *split_children,
+    std::set<std::pair<spg_t,epoch_t>> *merge_pgs);
+  void _prime_splits(std::set<std::pair<spg_t,epoch_t>> *pgids);
+  void prime_splits(const OSDMapRef& as_of_osdmap,
+                   std::set<std::pair<spg_t,epoch_t>> *pgids);
+  void prime_merges(const OSDMapRef& as_of_osdmap,
+                   std::set<std::pair<spg_t,epoch_t>> *merge_pgs);
+  void register_and_wake_split_child(PG *pg);
+  void unprime_split_children(spg_t parent, unsigned old_pg_num);
+  void update_scheduler_config();
+  std::string get_scheduler_type();
+
+  OSDShard(
+    int id,
+    CephContext *cct,
+    OSD *osd);
 };
 
 class OSD : public Dispatcher,
            public md_config_obs_t {
+  using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;
+
   /** OSD **/
-  Mutex osd_lock;                      // global lock
+  // global lock
+  ceph::mutex osd_lock = ceph::make_mutex("OSD::osd_lock");
   SafeTimer tick_timer;    // safe timer (osd_lock)
 
   // Tick timer for those stuff that do not need osd_lock
-  Mutex tick_timer_lock;
+  ceph::mutex tick_timer_lock = ceph::make_mutex("OSD::tick_timer_lock");
   SafeTimer tick_timer_without_osd_lock;
+  std::string gss_ktfile_client{};
+
 public:
   // config observer bits
   const char** get_tracked_conf_keys() const override;
-  void handle_conf_change(const struct md_config_t *conf,
+  void handle_conf_change(const ConfigProxy& conf,
                           const std::set <std::string> &changed) override;
   void update_log_config();
   void check_config();
 
 protected:
 
-  static const double OSD_TICK_INTERVAL; // tick interval for tick_timer and tick_timer_without_osd_lock
-
-  AuthAuthorizeHandlerRegistry *authorize_handler_cluster_registry;
-  AuthAuthorizeHandlerRegistry *authorize_handler_service_registry;
+  const double OSD_TICK_INTERVAL = { 1.0 };
+  double get_tick_interval() const;
 
   Messenger   *cluster_messenger;
   Messenger   *client_messenger;
@@ -1209,7 +1103,7 @@ protected:
   MgrClient   mgrc;
   PerfCounters      *logger;
   PerfCounters      *recoverystate_perf;
-  ObjectStore *store;
+  std::unique_ptr<ObjectStore> store;
 #ifdef HAVE_LIBFUSE
   FuseStore *fuse_store = nullptr;
 #endif
@@ -1219,27 +1113,49 @@ protected:
   int whoami;
   std::string dev_path, journal_path;
 
+  ceph_release_t last_require_osd_release{ceph_release_t::unknown};
+
+  int numa_node = -1;
+  size_t numa_cpu_set_size = 0;
+  cpu_set_t numa_cpu_set;
+
   bool store_is_rotational = true;
+  bool journal_is_rotational = true;
 
   ZTracer::Endpoint trace_endpoint;
-  void create_logger();
-  void create_recoverystate_perf();
+  PerfCounters* create_logger();
+  PerfCounters* create_recoverystate_perf();
   void tick();
   void tick_without_osd_lock();
   void _dispatch(Message *m);
   void dispatch_op(OpRequestRef op);
 
-  void check_osdmap_features(ObjectStore *store);
+  void check_osdmap_features();
 
   // asok
   friend class OSDSocketHook;
   class OSDSocketHook *asok_hook;
-  bool asok_command(string admin_command, cmdmap_t& cmdmap, string format, ostream& ss);
+  using PGRefOrError = std::tuple<std::optional<PGRef>, int>;
+    PGRefOrError locate_asok_target(const cmdmap_t& cmdmap,
+                                   std::stringstream& ss, bool only_primary);
+  int asok_route_to_pg(bool only_primary,
+    std::string_view prefix,
+    cmdmap_t cmdmap,
+    Formatter *f,
+    std::stringstream& ss,
+    const bufferlist& inbl,
+    bufferlist& outbl,
+    std::function<void(int, const std::string&, bufferlist&)> on_finish);
+  void asok_command(
+    std::string_view prefix,
+    const cmdmap_t& cmdmap,
+    ceph::Formatter *f,
+    const ceph::buffer::list& inbl,
+    std::function<void(int,const std::string&,ceph::buffer::list&)> on_finish);
 
 public:
-  ClassHandler  *class_handler = nullptr;
   int get_nodeid() { return whoami; }
-  
+
   static ghobject_t get_osdmap_pobject_name(epoch_t epoch) {
     char foo[20];
     snprintf(foo, sizeof(foo), "osdmap.%d", epoch);
@@ -1257,19 +1173,25 @@ public:
        object_t("snapmapper"),
        0)));
   }
+  static ghobject_t make_purged_snaps_oid() {
+    return ghobject_t(hobject_t(
+      sobject_t(
+       object_t("purged_snaps"),
+       0)));
+  }
 
   static ghobject_t make_pg_log_oid(spg_t pg) {
-    stringstream ss;
+    std::stringstream ss;
     ss << "pglog_" << pg;
-    string s;
+    std::string s;
     getline(ss, s);
     return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
   }
-  
+
   static ghobject_t make_pg_biginfo_oid(spg_t pg) {
-    stringstream ss;
+    std::stringstream ss;
     ss << "pginfo_" << pg;
-    string s;
+    std::string s;
     getline(ss, s);
     return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
   }
@@ -1277,6 +1199,19 @@ public:
     hobject_t oid(sobject_t("infos", CEPH_NOSNAP));
     return ghobject_t(oid);
   }
+
+  static ghobject_t make_final_pool_info_oid(int64_t pool) {
+    return ghobject_t(
+      hobject_t(
+       sobject_t(
+         object_t(std::string("final_pool_") + stringify(pool)),
+         CEPH_NOSNAP)));
+  }
+
+  static ghobject_t make_pg_num_history_oid() {
+    return ghobject_t(hobject_t(sobject_t("pg_num_history", CEPH_NOSNAP)));
+  }
+
   static void recursive_remove_collection(CephContext* cct,
                                          ObjectStore *store,
                                          spg_t pgid,
@@ -1285,7 +1220,7 @@ public:
   /**
    * get_osd_initial_compat_set()
    *
-   * Get the initial feature set for this OSD.  Features
+   * Get the initial feature std::set for this OSD.  Features
    * here are automatically upgraded.
    *
    * Return value: Initial osd CompatSet
@@ -1300,12 +1235,15 @@ public:
    * Return value: CompatSet of all supported features
    */
   static CompatSet get_osd_compat_set();
-  
+
 
 private:
   class C_Tick;
   class C_Tick_WithoutOSDLock;
 
+  // -- config settings --
+  float m_osd_pg_epoch_max_lag_factor;
+
   // -- superblock --
   OSDSuperblock superblock;
 
@@ -1341,8 +1279,7 @@ public:
   }
 
 private:
-  std::atomic_int state{STATE_INITIALIZING};
-  bool waiting_for_luminous_mons = false;
+  std::atomic<int> state{STATE_INITIALIZING};
 
 public:
   int get_state() const {
@@ -1372,55 +1309,42 @@ public:
 
 private:
 
-  ThreadPool peering_tp;
   ShardedThreadPool osd_op_tp;
-  ThreadPool disk_tp;
-  ThreadPool command_tp;
 
-  void set_disk_tp_priority();
   void get_latest_osdmap();
 
   // -- sessions --
 private:
-  void dispatch_session_waiting(Session *session, OSDMapRef osdmap);
-  void maybe_share_map(Session *session, OpRequestRef op, OSDMapRef osdmap);
+  void dispatch_session_waiting(const ceph::ref_t<Session>& session, OSDMapRef osdmap);
 
-  Mutex session_waiting_lock;
-  set<Session*> session_waiting_for_map;
+  ceph::mutex session_waiting_lock = ceph::make_mutex("OSD::session_waiting_lock");
+  std::set<ceph::ref_t<Session>> session_waiting_for_map;
 
   /// Caller assumes refs for included Sessions
-  void get_sessions_waiting_for_map(set<Session*> *out) {
-    Mutex::Locker l(session_waiting_lock);
+  void get_sessions_waiting_for_map(std::set<ceph::ref_t<Session>> *out) {
+    std::lock_guard l(session_waiting_lock);
     out->swap(session_waiting_for_map);
   }
-  void register_session_waiting_on_map(Session *session) {
-    Mutex::Locker l(session_waiting_lock);
-    if (session_waiting_for_map.insert(session).second) {
-      session->get();
-    }
+  void register_session_waiting_on_map(const ceph::ref_t<Session>& session) {
+    std::lock_guard l(session_waiting_lock);
+    session_waiting_for_map.insert(session);
   }
-  void clear_session_waiting_on_map(Session *session) {
-    Mutex::Locker l(session_waiting_lock);
-    set<Session*>::iterator i = session_waiting_for_map.find(session);
-    if (i != session_waiting_for_map.end()) {
-      (*i)->put();
-      session_waiting_for_map.erase(i);
-    }
+  void clear_session_waiting_on_map(const ceph::ref_t<Session>& session) {
+    std::lock_guard l(session_waiting_lock);
+    session_waiting_for_map.erase(session);
   }
   void dispatch_sessions_waiting_on_map() {
-    set<Session*> sessions_to_check;
+    std::set<ceph::ref_t<Session>> sessions_to_check;
     get_sessions_waiting_for_map(&sessions_to_check);
-    for (set<Session*>::iterator i = sessions_to_check.begin();
+    for (auto i = sessions_to_check.begin();
         i != sessions_to_check.end();
         sessions_to_check.erase(i++)) {
-      (*i)->session_dispatch_lock.Lock();
-      dispatch_session_waiting(*i, osdmap);
-      (*i)->session_dispatch_lock.Unlock();
-      (*i)->put();
+      std::lock_guard l{(*i)->session_dispatch_lock};
+      dispatch_session_waiting(*i, get_osdmap());
     }
   }
-  void session_handle_reset(Session *session) {
-    Mutex::Locker l(session->session_dispatch_lock);
+  void session_handle_reset(const ceph::ref_t<Session>& session) {
+    std::lock_guard l(session->session_dispatch_lock);
     clear_session_waiting_on_map(session);
 
     session->clear_backoffs();
@@ -1451,6 +1375,9 @@ private:
   void osdmap_subscribe(version_t epoch, bool force_request);
   /** @} monc helpers */
 
+  ceph::mutex osdmap_subscribe_lock = ceph::make_mutex("OSD::osdmap_subscribe_lock");
+  epoch_t latest_subscribed_epoch{0};
+
   // -- heartbeat --
   /// information about a heartbeat peer
   struct HeartbeatInfo {
@@ -1462,32 +1389,78 @@ private:
     utime_t last_rx_front;  ///< last time we got a ping reply on the front side
     utime_t last_rx_back;   ///< last time we got a ping reply on the back side
     epoch_t epoch;      ///< most recent epoch we wanted this peer
+    /// number of connections we send and receive heartbeat pings/replies
+    static constexpr int HEARTBEAT_MAX_CONN = 2;
+    /// history of inflight pings, arranging by timestamp we sent
+    /// send time -> deadline -> remaining replies
+    std::map<utime_t, std::pair<utime_t, int>> ping_history;
+
+    utime_t hb_interval_start;
+    uint32_t hb_average_count = 0;
+    uint32_t hb_index = 0;
+
+    uint32_t hb_total_back = 0;
+    uint32_t hb_min_back = UINT_MAX;
+    uint32_t hb_max_back = 0;
+    std::vector<uint32_t> hb_back_pingtime;
+    std::vector<uint32_t> hb_back_min;
+    std::vector<uint32_t> hb_back_max;
+
+    uint32_t hb_total_front = 0;
+    uint32_t hb_min_front = UINT_MAX;
+    uint32_t hb_max_front = 0;
+    std::vector<uint32_t> hb_front_pingtime;
+    std::vector<uint32_t> hb_front_min;
+    std::vector<uint32_t> hb_front_max;
+
+    bool is_stale(utime_t stale) const {
+      if (ping_history.empty()) {
+        return false;
+      }
+      utime_t oldest_deadline = ping_history.begin()->second.first;
+      return oldest_deadline <= stale;
+    }
+
+    bool is_unhealthy(utime_t now) const {
+      if (ping_history.empty()) {
+        /// we haven't sent a ping yet or we have got all replies,
+        /// in either way we are safe and healthy for now
+        return false;
+      }
 
-    bool is_unhealthy(utime_t cutoff) const {
-      return
-       ! ((last_rx_front > cutoff ||
-           (last_rx_front == utime_t() && (last_tx == utime_t() ||
-                                           first_tx > cutoff))) &&
-          (last_rx_back > cutoff ||
-           (last_rx_back == utime_t() && (last_tx == utime_t() ||
-                                          first_tx > cutoff))));
+      utime_t oldest_deadline = ping_history.begin()->second.first;
+      return now > oldest_deadline;
     }
-    bool is_healthy(utime_t cutoff) const {
-      return last_rx_front > cutoff && last_rx_back > cutoff;
+
+    bool is_healthy(utime_t now) const {
+      if (last_rx_front == utime_t() || last_rx_back == utime_t()) {
+        // only declare to be healthy until we have received the first
+        // replies from both front/back connections
+        return false;
+      }
+      return !is_unhealthy(now);
     }
 
+    void clear_mark_down(Connection *except = nullptr) {
+      if (con_back && con_back != except) {
+       con_back->mark_down();
+       con_back->clear_priv();
+       con_back.reset(nullptr);
+      }
+      if (con_front && con_front != except) {
+       con_front->mark_down();
+       con_front->clear_priv();
+       con_front.reset(nullptr);
+      }
+    }
   };
-  /// state attached to outgoing heartbeat connections
-  struct HeartbeatSession : public RefCountedObject {
-    int peer;
-    explicit HeartbeatSession(int p) : peer(p) {}
-  };
-  Mutex heartbeat_lock;
-  map<int, int> debug_heartbeat_drops_remaining;
-  Cond heartbeat_cond;
+
+  ceph::mutex heartbeat_lock = ceph::make_mutex("OSD::heartbeat_lock");
+  std::map<int, int> debug_heartbeat_drops_remaining;
+  ceph::condition_variable heartbeat_cond;
   bool heartbeat_stop;
-  std::atomic_bool heartbeat_need_update;   
-  map<int,HeartbeatInfo> heartbeat_peers;  ///< map of osd id to HeartbeatInfo
+  std::atomic<bool> heartbeat_need_update;
+  std::map<int,HeartbeatInfo> heartbeat_peers;  ///< map of osd id to HeartbeatInfo
   utime_t last_mon_heartbeat;
   Messenger *hb_front_client_messenger;
   Messenger *hb_back_client_messenger;
@@ -1495,12 +1468,17 @@ private:
   Messenger *hb_back_server_messenger;
   utime_t last_heartbeat_resample;   ///< last time we chose random peers in waiting-for-healthy state
   double daily_loadavg;
-  
+  ceph::mono_time startup_time;
+
+  // Track ping repsonse times using vector as a circular buffer
+  // MUST BE A POWER OF 2
+  const uint32_t hb_vector_size = 16;
+
   void _add_heartbeat_peer(int p);
   void _remove_heartbeat_peer(int p);
   bool heartbeat_reset(Connection *con);
   void maybe_update_heartbeat_peers();
-  void reset_heartbeat_peers();
+  void reset_heartbeat_peers(bool all);
   bool heartbeat_peers_need_update() {
     return heartbeat_need_update.load();
   }
@@ -1516,8 +1494,8 @@ private:
   void need_heartbeat_peer_update();
 
   void heartbeat_kick() {
-    Mutex::Locker l(heartbeat_lock);
-    heartbeat_cond.Signal();
+    std::lock_guard l(heartbeat_lock);
+    heartbeat_cond.notify_all();
   }
 
   struct T_Heartbeat : public Thread {
@@ -1539,12 +1517,12 @@ public:
     bool ms_can_fast_dispatch_any() const override { return true; }
     bool ms_can_fast_dispatch(const Message *m) const override {
       switch (m->get_type()) {
-       case CEPH_MSG_PING:
-       case MSG_OSD_PING:
-          return true;
-       default:
-          return false;
-       }
+      case CEPH_MSG_PING:
+      case MSG_OSD_PING:
+       return true;
+      default:
+       return false;
+      }
     }
     void ms_fast_dispatch(Message *m) override {
       osd->heartbeat_dispatch(m);
@@ -1559,336 +1537,187 @@ public:
     bool ms_handle_refused(Connection *con) override {
       return osd->ms_handle_refused(con);
     }
-    bool ms_verify_authorizer(Connection *con, int peer_type,
-                             int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
-                             bool& isvalid, CryptoKey& session_key) override {
-      isvalid = true;
+    int ms_handle_authentication(Connection *con) override {
       return true;
     }
   } heartbeat_dispatcher;
 
 private:
   // -- waiters --
-  list<OpRequestRef> finished;
-  
-  void take_waiters(list<OpRequestRef>& ls) {
-    assert(osd_lock.is_locked());
+  std::list<OpRequestRef> finished;
+
+  void take_waiters(std::list<OpRequestRef>& ls) {
+    ceph_assert(ceph_mutex_is_locked(osd_lock));
     finished.splice(finished.end(), ls);
   }
   void do_waiters();
-  
+
   // -- op tracking --
   OpTracker op_tracker;
-  void check_ops_in_flight();
-  void test_ops(std::string command, std::string args, ostream& ss);
+  void test_ops(std::string command, std::string args, std::ostream& ss);
   friend class TestOpsSocketHook;
   TestOpsSocketHook *test_ops_hook;
-  friend struct C_CompleteSplits;
+  friend struct C_FinishSplits;
   friend struct C_OpenPGs;
 
-  // -- op queue --
-  enum class io_queue {
-    prioritized,
-    weightedpriority,
-    mclock_opclass,
-    mclock_client,
-  };
-  friend std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q);
-
-  const io_queue op_queue;
-  const unsigned int op_prio_cutoff;
+protected:
 
   /*
    * The ordered op delivery chain is:
    *
-   *   fast dispatch -> pqueue back
-   *                    pqueue front <-> to_process back
+   *   fast dispatch -> scheduler back
+   *                    scheduler front <-> to_process back
    *                                     to_process front  -> RunVis(item)
    *                                                      <- queue_front()
    *
-   * The pqueue is per-shard, and to_process is per pg_slot.  Items can be
-   * pushed back up into to_process and/or pqueue while order is preserved.
+   * The scheduler is per-shard, and to_process is per pg_slot.  Items can be
+   * pushed back up into to_process and/or scheduler while order is preserved.
    *
    * Multiple worker threads can operate on each shard.
    *
-   * Under normal circumstances, num_running == to_proces.size().  There are
+   * Under normal circumstances, num_running == to_process.size().  There are
    * two times when that is not true: (1) when waiting_for_pg == true and
    * to_process is accumulating requests that are waiting for the pg to be
    * instantiated; in that case they will all get requeued together by
    * wake_pg_waiters, and (2) when wake_pg_waiters just ran, waiting_for_pg
    * and already requeued the items.
    */
-  friend class PGQueueable;
+  friend class ceph::osd::scheduler::PGOpItem;
+  friend class ceph::osd::scheduler::PGPeeringItem;
+  friend class ceph::osd::scheduler::PGRecovery;
+  friend class ceph::osd::scheduler::PGRecoveryMsg;
+  friend class ceph::osd::scheduler::PGDelete;
 
   class ShardedOpWQ
-    : public ShardedThreadPool::ShardedWQ<pair<spg_t,PGQueueable>>
+    : public ShardedThreadPool::ShardedWQ<OpSchedulerItem>
   {
-    struct ShardData {
-      Mutex sdata_lock;
-      Cond sdata_cond;
-
-      Mutex sdata_op_ordering_lock;   ///< protects all members below
-
-      OSDMapRef waiting_for_pg_osdmap;
-      struct pg_slot {
-       PGRef pg;                     ///< cached pg reference [optional]
-       list<PGQueueable> to_process; ///< order items for this slot
-       int num_running = 0;          ///< _process threads doing pg lookup/lock
-
-       /// true if pg does/did not exist. if so all new items go directly to
-       /// to_process.  cleared by prune_pg_waiters.
-       bool waiting_for_pg = false;
-
-       /// incremented by wake_pg_waiters; indicates racing _process threads
-       /// should bail out (their op has been requeued)
-       uint64_t requeue_seq = 0;
-      };
-
-      /// map of slots for each spg_t.  maintains ordering of items dequeued
-      /// from pqueue while _process thread drops shard lock to acquire the
-      /// pg lock.  slots are removed only by prune_pg_waiters.
-      unordered_map<spg_t,pg_slot> pg_slots;
-
-      /// priority queue
-      std::unique_ptr<OpQueue< pair<spg_t, PGQueueable>, entity_inst_t>> pqueue;
-
-      void _enqueue_front(pair<spg_t, PGQueueable> item, unsigned cutoff) {
-       unsigned priority = item.second.get_priority();
-       unsigned cost = item.second.get_cost();
-       if (priority >= cutoff)
-         pqueue->enqueue_strict_front(
-           item.second.get_owner(),
-           priority, item);
-       else
-         pqueue->enqueue_front(
-           item.second.get_owner(),
-           priority, cost, item);
-      }
-
-      ShardData(
-       string lock_name, string ordering_lock,
-       uint64_t max_tok_per_prio, uint64_t min_cost, CephContext *cct,
-       io_queue opqueue)
-       : sdata_lock(lock_name.c_str(), false, true, false, cct),
-         sdata_op_ordering_lock(ordering_lock.c_str(), false, true,
-                                false, cct) {
-       if (opqueue == io_queue::weightedpriority) {
-         pqueue = std::unique_ptr
-           <WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>>(
-             new WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>(
-               max_tok_per_prio, min_cost));
-       } else if (opqueue == io_queue::prioritized) {
-         pqueue = std::unique_ptr
-           <PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>>(
-             new PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>(
-               max_tok_per_prio, min_cost));
-       } else if (opqueue == io_queue::mclock_opclass) {
-         pqueue = std::unique_ptr
-           <ceph::mClockOpClassQueue>(new ceph::mClockOpClassQueue(cct));
-       } else if (opqueue == io_queue::mclock_client) {
-         pqueue = std::unique_ptr
-           <ceph::mClockClientQueue>(new ceph::mClockClientQueue(cct));
-       }
-      }
-    }; // struct ShardData
-
-    vector<ShardData*> shard_list;
     OSD *osd;
-    uint32_t num_shards;
 
   public:
-    ShardedOpWQ(uint32_t pnum_shards,
-               OSD *o,
-               time_t ti,
-               time_t si,
+    ShardedOpWQ(OSD *o,
+               ceph::timespan ti,
+               ceph::timespan si,
                ShardedThreadPool* tp)
-      : ShardedThreadPool::ShardedWQ<pair<spg_t,PGQueueable>>(ti, si, tp),
-        osd(o),
-        num_shards(pnum_shards) {
-      for (uint32_t i = 0; i < num_shards; i++) {
-       char lock_name[32] = {0};
-       snprintf(lock_name, sizeof(lock_name), "%s.%d", "OSD:ShardedOpWQ:", i);
-       char order_lock[32] = {0};
-       snprintf(order_lock, sizeof(order_lock), "%s.%d",
-                "OSD:ShardedOpWQ:order:", i);
-       ShardData* one_shard = new ShardData(
-         lock_name, order_lock,
-         osd->cct->_conf->osd_op_pq_max_tokens_per_priority, 
-         osd->cct->_conf->osd_op_pq_min_cost, osd->cct, osd->op_queue);
-       shard_list.push_back(one_shard);
-      }
+      : ShardedThreadPool::ShardedWQ<OpSchedulerItem>(ti, si, tp),
+        osd(o) {
     }
-    ~ShardedOpWQ() override {
-      while (!shard_list.empty()) {
-       delete shard_list.back();
-       shard_list.pop_back();
-      }
-    }
-
-    /// wake any pg waiters after a PG is created/instantiated
-    void wake_pg_waiters(spg_t pgid);
 
-    /// prune ops (and possiblye pg_slots) for pgs that shouldn't be here
-    void prune_pg_waiters(OSDMapRef osdmap, int whoami);
-
-    /// clear cached PGRef on pg deletion
-    void clear_pg_pointer(spg_t pgid);
-
-    /// clear pg_slots on shutdown
-    void clear_pg_slots();
+    void _add_slot_waiter(
+      spg_t token,
+      OSDShardPGSlot *slot,
+      OpSchedulerItem&& qi);
 
     /// try to do some work
-    void _process(uint32_t thread_index, heartbeat_handle_d *hb) override;
+    void _process(uint32_t thread_index, ceph::heartbeat_handle_d *hb) override;
 
     /// enqueue a new item
-    void _enqueue(pair <spg_t, PGQueueable> item) override;
+    void _enqueue(OpSchedulerItem&& item) override;
 
     /// requeue an old item (at the front of the line)
-    void _enqueue_front(pair <spg_t, PGQueueable> item) override;
-      
+    void _enqueue_front(OpSchedulerItem&& item) override;
+
     void return_waiting_threads() override {
-      for(uint32_t i = 0; i < num_shards; i++) {
-       ShardData* sdata = shard_list[i];
-       assert (NULL != sdata); 
-       sdata->sdata_lock.Lock();
-       sdata->sdata_cond.Signal();
-       sdata->sdata_lock.Unlock();
+      for(uint32_t i = 0; i < osd->num_shards; i++) {
+       OSDShard* sdata = osd->shards[i];
+       assert (NULL != sdata);
+       std::scoped_lock l{sdata->sdata_wait_lock};
+       sdata->stop_waiting = true;
+       sdata->sdata_cond.notify_all();
       }
     }
 
-    void dump(Formatter *f) {
-      for(uint32_t i = 0; i < num_shards; i++) {
-       ShardData* sdata = shard_list[i];
-       char lock_name[32] = {0};
-       snprintf(lock_name, sizeof(lock_name), "%s%d", "OSD:ShardedOpWQ:", i);
+    void stop_return_waiting_threads() override {
+      for(uint32_t i = 0; i < osd->num_shards; i++) {
+       OSDShard* sdata = osd->shards[i];
        assert (NULL != sdata);
-       sdata->sdata_op_ordering_lock.Lock();
-       f->open_object_section(lock_name);
-       sdata->pqueue->dump(f);
-       f->close_section();
-       sdata->sdata_op_ordering_lock.Unlock();
+       std::scoped_lock l{sdata->sdata_wait_lock};
+       sdata->stop_waiting = false;
       }
     }
 
-    /// Must be called on ops queued back to front
-    struct Pred {
-      spg_t pgid;
-      list<OpRequestRef> *out_ops;
-      uint64_t reserved_pushes_to_free;
-      Pred(spg_t pg, list<OpRequestRef> *out_ops = 0)
-       : pgid(pg), out_ops(out_ops), reserved_pushes_to_free(0) {}
-      void accumulate(const PGQueueable &op) {
-       reserved_pushes_to_free += op.get_reserved_pushes();
-       if (out_ops) {
-         boost::optional<OpRequestRef> mop = op.maybe_get_op();
-         if (mop)
-           out_ops->push_front(*mop);
-       }
-      }
-      bool operator()(const pair<spg_t, PGQueueable> &op) {
-       if (op.first == pgid) {
-         accumulate(op.second);
-         return true;
-       } else {
-         return false;
-       }
-      }
-      uint64_t get_reserved_pushes_to_free() const {
-       return reserved_pushes_to_free;
+    void dump(ceph::Formatter *f) {
+      for(uint32_t i = 0; i < osd->num_shards; i++) {
+       auto &&sdata = osd->shards[i];
+
+       char queue_name[32] = {0};
+       snprintf(queue_name, sizeof(queue_name), "%s%" PRIu32, "OSD:ShardedOpWQ:", i);
+       ceph_assert(NULL != sdata);
+
+       std::scoped_lock l{sdata->shard_lock};
+       f->open_object_section(queue_name);
+       sdata->scheduler->dump(*f);
+       f->close_section();
       }
-    };
+    }
 
     bool is_shard_empty(uint32_t thread_index) override {
-      uint32_t shard_index = thread_index % num_shards; 
-      ShardData* sdata = shard_list[shard_index];
-      assert(NULL != sdata);
-      Mutex::Locker l(sdata->sdata_op_ordering_lock);
-      return sdata->pqueue->empty();
+      uint32_t shard_index = thread_index % osd->num_shards;
+      auto &&sdata = osd->shards[shard_index];
+      ceph_assert(sdata);
+      std::lock_guard l(sdata->shard_lock);
+      if (thread_index < osd->num_shards) {
+       return sdata->scheduler->empty() && sdata->context_queue.empty();
+      } else {
+       return sdata->scheduler->empty();
+      }
+    }
+
+    void handle_oncommits(std::list<Context*>& oncommits) {
+      for (auto p : oncommits) {
+       p->complete(0);
+      }
     }
   } op_shardedwq;
 
 
-  void enqueue_op(spg_t pg, OpRequestRef& op, epoch_t epoch);
+  void enqueue_op(spg_t pg, OpRequestRef&& op, epoch_t epoch);
   void dequeue_op(
     PGRef pg, OpRequestRef op,
     ThreadPool::TPHandle &handle);
 
-  // -- peering queue --
-  struct PeeringWQ : public ThreadPool::BatchWorkQueue<PG> {
-    list<PG*> peering_queue;
-    OSD *osd;
-    set<PG*> in_use;
-    PeeringWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
-      : ThreadPool::BatchWorkQueue<PG>(
-       "OSD::PeeringWQ", ti, si, tp), osd(o) {}
-
-    void _dequeue(PG *pg) override {
-      for (list<PG*>::iterator i = peering_queue.begin();
-          i != peering_queue.end();
-          ) {
-       if (*i == pg) {
-         peering_queue.erase(i++);
-         pg->put("PeeringWQ");
-       } else {
-         ++i;
-       }
-      }
-    }
-    bool _enqueue(PG *pg) override {
-      pg->get("PeeringWQ");
-      peering_queue.push_back(pg);
-      return true;
-    }
-    bool _empty() override {
-      return peering_queue.empty();
-    }
-    void _dequeue(list<PG*> *out) override;
-    void _process(
-      const list<PG *> &pgs,
-      ThreadPool::TPHandle &handle) override {
-      assert(!pgs.empty());
-      osd->process_peering_events(pgs, handle);
-      for (list<PG *>::const_iterator i = pgs.begin();
-          i != pgs.end();
-          ++i) {
-       (*i)->put("PeeringWQ");
-      }
-    }
-    void _process_finish(const list<PG *> &pgs) override {
-      for (list<PG*>::const_iterator i = pgs.begin();
-          i != pgs.end();
-          ++i) {
-       in_use.erase(*i);
-      }
-    }
-    void _clear() override {
-      assert(peering_queue.empty());
-    }
-  } peering_wq;
-
-  void process_peering_events(
-    const list<PG*> &pg,
-    ThreadPool::TPHandle &handle);
+  void enqueue_peering_evt(
+    spg_t pgid,
+    PGPeeringEventRef ref);
+  void dequeue_peering_evt(
+    OSDShard *sdata,
+    PG *pg,
+    PGPeeringEventRef ref,
+    ThreadPool::TPHandle& handle);
+
+  void dequeue_delete(
+    OSDShard *sdata,
+    PG *pg,
+    epoch_t epoch,
+    ThreadPool::TPHandle& handle);
 
   friend class PG;
+  friend struct OSDShard;
   friend class PrimaryLogPG;
+  friend class PgScrubber;
 
 
  protected:
 
   // -- osd map --
-  OSDMapRef       osdmap;
-  OSDMapRef get_osdmap() {
-    return osdmap;
+  // TODO: switch to std::atomic<OSDMapRef> when C++20 will be available.
+  OSDMapRef       _osdmap;
+  void set_osdmap(OSDMapRef osdmap) {
+    std::atomic_store(&_osdmap, osdmap);
+  }
+  OSDMapRef get_osdmap() const {
+    return std::atomic_load(&_osdmap);
   }
   epoch_t get_osdmap_epoch() const {
+    // XXX: performance?
+    auto osdmap = get_osdmap();
     return osdmap ? osdmap->get_epoch() : 0;
   }
 
-  utime_t         had_map_since;
-  RWLock          map_lock;
-  list<OpRequestRef>  waiting_for_osdmap;
-  deque<utime_t> osd_markdown_log;
+  pool_pg_num_history_t pg_num_history;
+
+  ceph::shared_mutex map_lock = ceph::make_shared_mutex("OSD::map_lock");
+  std::list<OpRequestRef>  waiting_for_osdmap;
+  std::deque<utime_t> osd_markdown_log;
 
   friend struct send_map_on_destruct;
 
@@ -1898,14 +1727,13 @@ private:
   void trim_maps(epoch_t oldest, int nreceived, bool skip_maps);
   void note_down_osd(int osd);
   void note_up_osd(int osd);
-  friend class C_OnMapCommit;
+  friend struct C_OnMapCommit;
 
   bool advance_pg(
-    epoch_t advance_to, PG *pg,
+    epoch_t advance_to,
+    PG *pg,
     ThreadPool::TPHandle &handle,
-    PG::RecoveryCtx *rctx,
-    set<PGRef> *split_pgs
-  );
+    PeeringCtx &rctx);
   void consume_map();
   void activate_map();
 
@@ -1916,74 +1744,64 @@ private:
   OSDMapRef add_map(OSDMap *o) {
     return service.add_map(o);
   }
-  void add_map_bl(epoch_t e, bufferlist& bl) {
-    return service.add_map_bl(e, bl);
-  }
-  void pin_map_bl(epoch_t e, bufferlist &bl) {
-    return service.pin_map_bl(e, bl);
-  }
-  bool get_map_bl(epoch_t e, bufferlist& bl) {
+  bool get_map_bl(epoch_t e, ceph::buffer::list& bl) {
     return service.get_map_bl(e, bl);
   }
-  void add_map_inc_bl(epoch_t e, bufferlist& bl) {
-    return service.add_map_inc_bl(e, bl);
+
+public:
+  // -- shards --
+  std::vector<OSDShard*> shards;
+  uint32_t num_shards = 0;
+
+  void inc_num_pgs() {
+    ++num_pgs;
   }
-  void pin_map_inc_bl(epoch_t e, bufferlist &bl) {
-    return service.pin_map_inc_bl(e, bl);
+  void dec_num_pgs() {
+    --num_pgs;
+  }
+  int get_num_pgs() const {
+    return num_pgs;
   }
 
 protected:
+  ceph::mutex merge_lock = ceph::make_mutex("OSD::merge_lock");
+  /// merge epoch -> target pgid -> source pgid -> pg
+  std::map<epoch_t,std::map<spg_t,std::map<spg_t,PGRef>>> merge_waiters;
+
+  bool add_merge_waiter(OSDMapRef nextmap, spg_t target, PGRef source,
+                       unsigned need);
+
   // -- placement groups --
-  RWLock pg_map_lock; // this lock orders *above* individual PG _locks
-  ceph::unordered_map<spg_t, PG*> pg_map; // protected by pg_map lock
+  std::atomic<size_t> num_pgs = {0};
+
+  std::mutex pending_creates_lock;
+  using create_from_osd_t = std::pair<spg_t, bool /* is primary*/>;
+  std::set<create_from_osd_t> pending_creates_from_osd;
+  unsigned pending_creates_from_mon = 0;
 
-  map<spg_t, list<PG::CephPeeringEvtRef> > peering_wait_for_split;
   PGRecoveryStats pg_recovery_stats;
 
-  PGPool _get_pool(int id, OSDMapRef createmap);
+  PGRef _lookup_pg(spg_t pgid);
+  PGRef _lookup_lock_pg(spg_t pgid);
+  void register_pg(PGRef pg);
+  bool try_finish_pg_delete(PG *pg, unsigned old_pg_num);
 
-  PG   *_lookup_lock_pg_with_map_lock_held(spg_t pgid);
-  PG   *_lookup_lock_pg(spg_t pgid);
+  void _get_pgs(std::vector<PGRef> *v, bool clear_too=false);
+  void _get_pgids(std::vector<spg_t> *v);
 
 public:
-  PG   *lookup_lock_pg(spg_t pgid);
-
-protected:
-  PG   *_open_lock_pg(OSDMapRef createmap,
-                     spg_t pg, bool no_lockdep_check=false);
-  enum res_result {
-    RES_PARENT,    // resurrected a parent
-    RES_SELF,      // resurrected self
-    RES_NONE       // nothing relevant deleting
-  };
-  res_result _try_resurrect_pg(
-    OSDMapRef curmap, spg_t pgid, spg_t *resurrected, PGRef *old_pg_state);
+  PGRef lookup_lock_pg(spg_t pgid);
 
-  PG   *_create_lock_pg(
-    OSDMapRef createmap,
-    spg_t pgid,
-    bool hold_map_lock,
-    bool backfill,
-    int role,
-    vector<int>& up, int up_primary,
-    vector<int>& acting, int acting_primary,
-    pg_history_t history,
-    const PastIntervals& pi,
-    ObjectStore::Transaction& t);
+  std::set<int64_t> get_mapped_pools();
 
+protected:
   PG* _make_pg(OSDMapRef createmap, spg_t pgid);
-  void add_newly_split_pg(PG *pg,
-                         PG::RecoveryCtx *rctx);
 
-  int handle_pg_peering_evt(
-    spg_t pgid,
-    const pg_history_t& orig_history,
-    const PastIntervals& pi,
-    epoch_t epoch,
-    PG::CephPeeringEvtRef evt);
-  
+  bool maybe_wait_for_max_pg(const OSDMapRef& osdmap,
+                            spg_t pgid, bool is_mon_create);
+  void resume_creating_pg();
+
   void load_pgs();
-  void build_past_intervals_parallel();
 
   /// build initial pg history and intervals on create
   void build_initial_pg_history(
@@ -1993,59 +1811,38 @@ protected:
     pg_history_t *h,
     PastIntervals *pi);
 
-  /// project pg history from from to now
-  bool project_pg_history(
-    spg_t pgid, pg_history_t& h, epoch_t from,
-    const vector<int>& lastup,
-    int lastupprimary,
-    const vector<int>& lastacting,
-    int lastactingprimary
-    ); ///< @return false if there was a map gap between from and now
-
-  // this must be called with pg->lock held on any pg addition to pg_map
-  void wake_pg_waiters(PGRef pg) {
-    assert(pg->is_locked());
-    op_shardedwq.wake_pg_waiters(pg->info.pgid);
-  }
   epoch_t last_pg_create_epoch;
 
   void handle_pg_create(OpRequestRef op);
 
   void split_pgs(
     PG *parent,
-    const set<spg_t> &childpgids, set<PGRef> *out_pgs,
+    const std::set<spg_t> &childpgids, std::set<PGRef> *out_pgs,
     OSDMapRef curmap,
     OSDMapRef nextmap,
-    PG::RecoveryCtx *rctx);
+    PeeringCtx &rctx);
+  void _finish_splits(std::set<PGRef>& pgs);
 
   // == monitor interaction ==
-  Mutex mon_report_lock;
+  ceph::mutex mon_report_lock = ceph::make_mutex("OSD::mon_report_lock");
   utime_t last_mon_report;
-  utime_t last_pg_stats_sent;
-
-  /* if our monitor dies, we want to notice it and reconnect.
-   *  So we keep track of when it last acked our stat updates,
-   *  and if too much time passes (and we've been sending
-   *  more updates) then we can call it dead and reconnect
-   *  elsewhere.
-   */
-  utime_t last_pg_stats_ack;
-  float stats_ack_timeout;
-  set<uint64_t> outstanding_pg_stats; // how many stat updates haven't been acked yet
+  Finisher boot_finisher;
 
   // -- boot --
   void start_boot();
   void _got_mon_epochs(epoch_t oldest, epoch_t newest);
   void _preboot(epoch_t oldest, epoch_t newest);
   void _send_boot();
-  void _collect_metadata(map<string,string> *pmeta);
+  void _collect_metadata(std::map<std::string,std::string> *pmeta);
+  void _get_purged_snaps();
+  void handle_get_purged_snaps_reply(MMonGetPurgedSnapsReply *r);
 
   void start_waiting_for_healthy();
   bool _is_healthy();
 
   void send_full_update();
-  
-  friend struct C_OSD_GetVersion;
+
+  friend struct CB_OSD_GetVersion;
 
   // -- alive --
   epoch_t up_thru_wanted;
@@ -2067,79 +1864,28 @@ protected:
   void got_full_map(epoch_t e);
 
   // -- failures --
-  map<int,utime_t> failure_queue;
-  map<int,pair<utime_t,entity_inst_t> > failure_pending;
+  std::map<int,utime_t> failure_queue;
+  std::map<int,std::pair<utime_t,entity_addrvec_t> > failure_pending;
 
   void requeue_failures();
   void send_failures();
-  void send_still_alive(epoch_t epoch, const entity_inst_t &i);
-
-  // -- pg stats --
-  Mutex pg_stat_queue_lock;
-  Cond pg_stat_queue_cond;
-  xlist<PG*> pg_stat_queue;
-  bool osd_stat_updated;
-  uint64_t pg_stat_tid, pg_stat_tid_flushed;
-
-  void send_pg_stats(const utime_t &now);
-  void handle_pg_stats_ack(class MPGStatsAck *ack);
-  void flush_pg_stats();
+  void send_still_alive(epoch_t epoch, int osd, const entity_addrvec_t &addrs);
+  void cancel_pending_failures();
 
   ceph::coarse_mono_clock::time_point last_sent_beacon;
-  Mutex min_last_epoch_clean_lock{"OSD::min_last_epoch_clean_lock"};
+  ceph::mutex min_last_epoch_clean_lock = ceph::make_mutex("OSD::min_last_epoch_clean_lock");
   epoch_t min_last_epoch_clean = 0;
   // which pgs were scanned for min_lec
   std::vector<pg_t> min_last_epoch_clean_pgs;
   void send_beacon(const ceph::coarse_mono_clock::time_point& now);
 
-  void pg_stat_queue_enqueue(PG *pg) {
-    pg_stat_queue_lock.Lock();
-    if (pg->is_primary() && !pg->stat_queue_item.is_on_list()) {
-      pg->get("pg_stat_queue");
-      pg_stat_queue.push_back(&pg->stat_queue_item);
-    }
-    osd_stat_updated = true;
-    pg_stat_queue_lock.Unlock();
-  }
-  void pg_stat_queue_dequeue(PG *pg) {
-    pg_stat_queue_lock.Lock();
-    if (pg->stat_queue_item.remove_myself())
-      pg->put("pg_stat_queue");
-    pg_stat_queue_lock.Unlock();
-  }
-  void clear_pg_stat_queue() {
-    pg_stat_queue_lock.Lock();
-    while (!pg_stat_queue.empty()) {
-      PG *pg = pg_stat_queue.front();
-      pg_stat_queue.pop_front();
-      pg->put("pg_stat_queue");
-    }
-    pg_stat_queue_lock.Unlock();
-  }
-  void clear_outstanding_pg_stats(){
-    Mutex::Locker l(pg_stat_queue_lock);
-    outstanding_pg_stats.clear();
-  }
-
   ceph_tid_t get_tid() {
     return service.get_tid();
   }
 
   // -- generic pg peering --
-  PG::RecoveryCtx create_context();
-  void dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap,
+  void dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap,
                         ThreadPool::TPHandle *handle = NULL);
-  void dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg,
-                                    ThreadPool::TPHandle *handle = NULL);
-  void do_notifies(map<int,
-                      vector<pair<pg_notify_t, PastIntervals> > >&
-                      notify_list,
-                  OSDMapRef map);
-  void do_queries(map<int, map<spg_t,pg_query_t> >& query_map,
-                 OSDMapRef map);
-  void do_infos(map<int,
-                   vector<pair<pg_notify_t, PastIntervals> > >& info_map,
-               OSDMapRef map);
 
   bool require_mon_peer(const Message *m);
   bool require_mon_or_mgr_peer(const Message *m);
@@ -2151,83 +1897,32 @@ protected:
   bool require_self_aliveness(const Message *m, epoch_t alive_since);
   /**
    * Verifies that the OSD who sent the given op has the same
-   * address as in the given map.
+   * address as in the given std::map.
    * @pre op was sent by an OSD using the cluster messenger
    */
-  bool require_same_peer_instance(const Message *m, OSDMapRef& map,
+  bool require_same_peer_instance(const Message *m, const OSDMapRef& map,
                                  bool is_fast_dispatch);
 
   bool require_same_or_newer_map(OpRequestRef& op, epoch_t e,
                                 bool is_fast_dispatch);
 
-  void handle_pg_query(OpRequestRef op);
-  void handle_pg_notify(OpRequestRef op);
-  void handle_pg_log(OpRequestRef op);
-  void handle_pg_info(OpRequestRef op);
-  void handle_pg_trim(OpRequestRef op);
+  void handle_fast_pg_create(MOSDPGCreate2 *m);
+  void handle_pg_query_nopg(const MQuery& q);
+  void handle_fast_pg_notify(MOSDPGNotify *m);
+  void handle_pg_notify_nopg(const MNotifyRec& q);
+  void handle_fast_pg_info(MOSDPGInfo *m);
+  void handle_fast_pg_remove(MOSDPGRemove *m);
 
-  void handle_pg_backfill_reserve(OpRequestRef op);
-  void handle_pg_recovery_reserve(OpRequestRef op);
+public:
+  // used by OSDShard
+  PGRef handle_pg_create_info(const OSDMapRef& osdmap, const PGCreateInfo *info);
+protected:
 
-  void handle_pg_remove(OpRequestRef op);
-  void _remove_pg(PG *pg);
+  void handle_fast_force_recovery(MOSDForceRecovery *m);
 
   // -- commands --
-  struct Command {
-    vector<string> cmd;
-    ceph_tid_t tid;
-    bufferlist indata;
-    ConnectionRef con;
-
-    Command(vector<string>& c, ceph_tid_t t, bufferlist& bl, Connection *co)
-      : cmd(c), tid(t), indata(bl), con(co) {}
-  };
-  list<Command*> command_queue;
-  struct CommandWQ : public ThreadPool::WorkQueue<Command> {
-    OSD *osd;
-    CommandWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
-      : ThreadPool::WorkQueue<Command>("OSD::CommandWQ", ti, si, tp), osd(o) {}
-
-    bool _empty() override {
-      return osd->command_queue.empty();
-    }
-    bool _enqueue(Command *c) override {
-      osd->command_queue.push_back(c);
-      return true;
-    }
-    void _dequeue(Command *pg) override {
-      ceph_abort();
-    }
-    Command *_dequeue() override {
-      if (osd->command_queue.empty())
-       return NULL;
-      Command *c = osd->command_queue.front();
-      osd->command_queue.pop_front();
-      return c;
-    }
-    void _process(Command *c, ThreadPool::TPHandle &) override {
-      osd->osd_lock.Lock();
-      if (osd->is_stopping()) {
-       osd->osd_lock.Unlock();
-       delete c;
-       return;
-      }
-      osd->do_command(c->con.get(), c->tid, c->cmd, c->indata);
-      osd->osd_lock.Unlock();
-      delete c;
-    }
-    void _clear() override {
-      while (!osd->command_queue.empty()) {
-       Command *c = osd->command_queue.front();
-       osd->command_queue.pop_front();
-       delete c;
-      }
-    }
-  } command_wq;
-
-  void handle_command(class MMonCommand *m);
   void handle_command(class MCommand *m);
-  void do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, bufferlist& data);
+
 
   // -- pg recovery --
   void do_recovery(PG *pg, epoch_t epoch_queued, uint64_t pushes_reserved,
@@ -2236,55 +1931,37 @@ protected:
 
   // -- scrubbing --
   void sched_scrub();
+  void resched_all_scrubs();
   bool scrub_random_backoff();
-  bool scrub_load_below_threshold();
-  bool scrub_time_permit(utime_t now);
-
-  // -- removing --
-  struct RemoveWQ :
-    public ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> > {
-    CephContext* cct;
-    ObjectStore *&store;
-    list<pair<PGRef, DeletingStateRef> > remove_queue;
-    RemoveWQ(CephContext* cct, ObjectStore *&o, time_t ti, time_t si,
-            ThreadPool *tp)
-      : ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> >(
-       "OSD::RemoveWQ", ti, si, tp), cct(cct), store(o) {}
-
-    bool _empty() override {
-      return remove_queue.empty();
-    }
-    void _enqueue(pair<PGRef, DeletingStateRef> item) override {
-      remove_queue.push_back(item);
-    }
-    void _enqueue_front(pair<PGRef, DeletingStateRef> item) override {
-      remove_queue.push_front(item);
-    }
-    bool _dequeue(pair<PGRef, DeletingStateRef> item) {
-      ceph_abort();
-    }
-    pair<PGRef, DeletingStateRef> _dequeue() override {
-      assert(!remove_queue.empty());
-      pair<PGRef, DeletingStateRef> item = remove_queue.front();
-      remove_queue.pop_front();
-      return item;
-    }
-    void _process(pair<PGRef, DeletingStateRef>,
-                 ThreadPool::TPHandle &) override;
-    void _clear() override {
-      remove_queue.clear();
-    }
-  } remove_wq;
+
+  // -- status reporting --
+  MPGStats *collect_pg_stats();
+  std::vector<DaemonHealthMetric> get_health_metrics();
+
 
 private:
   bool ms_can_fast_dispatch_any() const override { return true; }
   bool ms_can_fast_dispatch(const Message *m) const override {
     switch (m->get_type()) {
+    case CEPH_MSG_PING:
     case CEPH_MSG_OSD_OP:
     case CEPH_MSG_OSD_BACKOFF:
-    case MSG_OSD_SUBOP:
+    case MSG_OSD_SCRUB2:
+    case MSG_OSD_FORCE_RECOVERY:
+    case MSG_MON_COMMAND:
+    case MSG_OSD_PG_CREATE2:
+    case MSG_OSD_PG_QUERY:
+    case MSG_OSD_PG_QUERY2:
+    case MSG_OSD_PG_INFO:
+    case MSG_OSD_PG_INFO2:
+    case MSG_OSD_PG_NOTIFY:
+    case MSG_OSD_PG_NOTIFY2:
+    case MSG_OSD_PG_LOG:
+    case MSG_OSD_PG_TRIM:
+    case MSG_OSD_PG_REMOVE:
+    case MSG_OSD_BACKFILL_RESERVE:
+    case MSG_OSD_RECOVERY_RESERVE:
     case MSG_OSD_REPOP:
-    case MSG_OSD_SUBOPREPLY:
     case MSG_OSD_REPOPREPLY:
     case MSG_OSD_PG_PUSH:
     case MSG_OSD_PG_PULL:
@@ -2301,61 +1978,30 @@ private:
     case MSG_OSD_REP_SCRUBMAP:
     case MSG_OSD_PG_UPDATE_LOG_MISSING:
     case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
+    case MSG_OSD_PG_RECOVERY_DELETE:
+    case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
+    case MSG_OSD_PG_LEASE:
+    case MSG_OSD_PG_LEASE_ACK:
       return true;
     default:
       return false;
     }
   }
   void ms_fast_dispatch(Message *m) override;
-  void ms_fast_preprocess(Message *m) override;
   bool ms_dispatch(Message *m) override;
-  bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new) override;
-  bool ms_verify_authorizer(Connection *con, int peer_type,
-                           int protocol, bufferlist& authorizer, bufferlist& authorizer_reply,
-                           bool& isvalid, CryptoKey& session_key) override;
   void ms_handle_connect(Connection *con) override;
   void ms_handle_fast_connect(Connection *con) override;
   void ms_handle_fast_accept(Connection *con) override;
+  int ms_handle_authentication(Connection *con) override;
   bool ms_handle_reset(Connection *con) override;
   void ms_handle_remote_reset(Connection *con) override {}
   bool ms_handle_refused(Connection *con) override;
 
-  io_queue get_io_queue() const {
-    if (cct->_conf->osd_op_queue == "debug_random") {
-      static io_queue index_lookup[] = { io_queue::prioritized,
-                                        io_queue::weightedpriority,
-                                        io_queue::mclock_opclass,
-                                        io_queue::mclock_client };
-      srand(time(NULL));
-      unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0]));
-      return index_lookup[which];
-    } else if (cct->_conf->osd_op_queue == "wpq") {
-      return io_queue::weightedpriority;
-    } else if (cct->_conf->osd_op_queue == "mclock_opclass") {
-      return io_queue::mclock_opclass;
-    } else if (cct->_conf->osd_op_queue == "mclock_client") {
-      return io_queue::mclock_client;
-    } else {
-      return io_queue::prioritized;
-    }
-  }
-
-  unsigned int get_io_prio_cut() const {
-    if (cct->_conf->osd_op_queue_cut_off == "debug_random") {
-      srand(time(NULL));
-      return (rand() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
-    } else if (cct->_conf->osd_op_queue_cut_off == "low") {
-      return CEPH_MSG_PRIO_LOW;
-    } else {
-      return CEPH_MSG_PRIO_HIGH;
-    }
-  }
-
  public:
   /* internal and external can point to the same messenger, they will still
    * be cleaned up properly*/
   OSD(CephContext *cct_,
-      ObjectStore *store_,
+      std::unique_ptr<ObjectStore> store_,
       int id,
       Messenger *internal,
       Messenger *external,
@@ -2364,16 +2010,20 @@ private:
       Messenger *hb_front_server,
       Messenger *hb_back_server,
       Messenger *osdc_messenger,
-      MonClient *mc, const std::string &dev, const std::string &jdev);
+      MonClient *mc, const std::string &dev, const std::string &jdev,
+      ceph::async::io_context_pool& poolctx);
   ~OSD() override;
 
   // static bits
-  static int mkfs(CephContext *cct, ObjectStore *store,
-                 const string& dev,
-                 uuid_d fsid, int whoami);
-  /* remove any non-user xattrs from a map of them */
-  void filter_xattrs(map<string, bufferptr>& attrs) {
-    for (map<string, bufferptr>::iterator iter = attrs.begin();
+  static int mkfs(CephContext *cct,
+                 std::unique_ptr<ObjectStore> store,
+                 uuid_d fsid,
+                 int whoami,
+                 std::string osdspec_affinity);
+
+  /* remove any non-user xattrs from a std::map of them */
+  void filter_xattrs(std::map<std::string, ceph::buffer::ptr>& attrs) {
+    for (std::map<std::string, ceph::buffer::ptr>::iterator iter = attrs.begin();
         iter != attrs.end();
         ) {
       if (('_' != iter->first.at(0)) || (iter->first.size() == 1))
@@ -2383,26 +2033,49 @@ private:
   }
 
 private:
-  int mon_cmd_maybe_osd_create(string &cmd);
+  int mon_cmd_maybe_osd_create(std::string &cmd);
   int update_crush_device_class();
   int update_crush_location();
 
-  static int write_meta(ObjectStore *store,
-                       uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami);
+  static int write_meta(CephContext *cct,
+                       ObjectStore *store,
+                       uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami, std::string& osdspec_affinity);
 
-  void handle_pg_scrub(struct MOSDScrub *m, PG* pg);
-  void handle_scrub(struct MOSDScrub *m);
+  void handle_scrub(class MOSDScrub *m);
+  void handle_fast_scrub(class MOSDScrub2 *m);
   void handle_osd_ping(class MOSDPing *m);
 
-  int init_op_flags(OpRequestRef& op);
-
+  size_t get_num_cache_shards();
   int get_num_op_shards();
   int get_num_op_threads();
 
+  float get_osd_recovery_sleep();
+  float get_osd_delete_sleep();
+  float get_osd_snap_trim_sleep();
+
+  int get_recovery_max_active();
+  void maybe_override_max_osd_capacity_for_qos();
+  bool maybe_override_options_for_qos();
+  int run_osd_bench_test(int64_t count,
+                         int64_t bsize,
+                         int64_t osize,
+                         int64_t onum,
+                         double *elapsed,
+                         std::ostream& ss);
+  int mon_cmd_set_config(const std::string &key, const std::string &val);
+  bool unsupported_objstore_for_qos();
+
+  void scrub_purged_snaps();
+  void probe_smart(const std::string& devid, std::ostream& ss);
+
 public:
-  static int peek_meta(ObjectStore *store, string& magic,
-                      uuid_d& cluster_fsid, uuid_d& osd_fsid, int& whoami);
-  
+  static int peek_meta(ObjectStore *store,
+                      std::string *magic,
+                      uuid_d *cluster_fsid,
+                      uuid_d *osd_fsid,
+                      int *whoami,
+                      ceph_release_t *min_osd_release);
+
 
   // startup/shutdown
   int pre_init();
@@ -2410,6 +2083,7 @@ public:
   void final_init();
 
   int enable_disable_fuse(bool stop);
+  int set_numa_affinity();
 
   void suicide(int exitcode);
   int shutdown();
@@ -2422,10 +2096,15 @@ public:
 public:
   OSDService service;
   friend class OSDService;
-};
 
+private:
+  void set_perf_queries(const ConfigPayload &config_payload);
+  MetricPayload get_perf_reports();
 
-std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q);
+  ceph::mutex m_perf_queries_lock = ceph::make_mutex("OSD::m_perf_queries_lock");
+  std::list<OSDPerfMetricQuery> m_perf_queries;
+  std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> m_perf_limits;
+};
 
 
 //compatibility of the executable