]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osd/OSD.h
import 15.2.0 Octopus source
[ceph.git] / ceph / src / osd / OSD.h
index c87deeb0eecb0b2b266763ea3e25900b9427d743..59f84ed0bf3c7cfb2abaab7b6a246f3f2b4afd2d 100644 (file)
 
 #include "msg/Dispatcher.h"
 
-#include "common/Mutex.h"
-#include "common/RWLock.h"
 #include "common/Timer.h"
 #include "common/WorkQueue.h"
 #include "common/AsyncReserver.h"
 #include "common/ceph_context.h"
 #include "common/config_cacher.h"
 #include "common/zipkin_trace.h"
+#include "common/ceph_timer.h"
 
 #include "mgr/MgrClient.h"
 
 #include "osd/ClassHandler.h"
 
 #include "include/CompatSet.h"
+#include "include/common_fwd.h"
 
 #include "OpRequest.h"
 #include "Session.h"
 
-#include "osd/OpQueueItem.h"
+#include "osd/scheduler/OpScheduler.h"
 
 #include <atomic>
 #include <map>
 #include "common/sharedptr_registry.hpp"
 #include "common/WeightedPriorityQueue.h"
 #include "common/PrioritizedQueue.h"
-#include "osd/mClockOpClassQueue.h"
-#include "osd/mClockClientQueue.h"
 #include "messages/MOSDOp.h"
 #include "common/EventTrace.h"
+#include "osd/osd_perf_counters.h"
 
 #define CEPH_OSD_PROTOCOL    10 /* cluster internal */
 
 
   */
 
-enum {
-  l_osd_first = 10000,
-  l_osd_op_wip,
-  l_osd_op,
-  l_osd_op_inb,
-  l_osd_op_outb,
-  l_osd_op_lat,
-  l_osd_op_process_lat,
-  l_osd_op_prepare_lat,
-  l_osd_op_r,
-  l_osd_op_r_outb,
-  l_osd_op_r_lat,
-  l_osd_op_r_lat_outb_hist,
-  l_osd_op_r_process_lat,
-  l_osd_op_r_prepare_lat,
-  l_osd_op_w,
-  l_osd_op_w_inb,
-  l_osd_op_w_lat,
-  l_osd_op_w_lat_inb_hist,
-  l_osd_op_w_process_lat,
-  l_osd_op_w_prepare_lat,
-  l_osd_op_rw,
-  l_osd_op_rw_inb,
-  l_osd_op_rw_outb,
-  l_osd_op_rw_lat,
-  l_osd_op_rw_lat_inb_hist,
-  l_osd_op_rw_lat_outb_hist,
-  l_osd_op_rw_process_lat,
-  l_osd_op_rw_prepare_lat,
-
-  l_osd_op_before_queue_op_lat,
-  l_osd_op_before_dequeue_op_lat,
-
-  l_osd_sop,
-  l_osd_sop_inb,
-  l_osd_sop_lat,
-  l_osd_sop_w,
-  l_osd_sop_w_inb,
-  l_osd_sop_w_lat,
-  l_osd_sop_pull,
-  l_osd_sop_pull_lat,
-  l_osd_sop_push,
-  l_osd_sop_push_inb,
-  l_osd_sop_push_lat,
-
-  l_osd_pull,
-  l_osd_push,
-  l_osd_push_outb,
-
-  l_osd_rop,
-  l_osd_rbytes,
-
-  l_osd_loadavg,
-  l_osd_cached_crc,
-  l_osd_cached_crc_adjusted,
-  l_osd_missed_crc,
-
-  l_osd_pg,
-  l_osd_pg_primary,
-  l_osd_pg_replica,
-  l_osd_pg_stray,
-  l_osd_pg_removing,
-  l_osd_hb_to,
-  l_osd_map,
-  l_osd_mape,
-  l_osd_mape_dup,
-
-  l_osd_waiting_for_map,
-
-  l_osd_map_cache_hit,
-  l_osd_map_cache_miss,
-  l_osd_map_cache_miss_low,
-  l_osd_map_cache_miss_low_avg,
-  l_osd_map_bl_cache_hit,
-  l_osd_map_bl_cache_miss,
-
-  l_osd_stat_bytes,
-  l_osd_stat_bytes_used,
-  l_osd_stat_bytes_avail,
-
-  l_osd_copyfrom,
-
-  l_osd_tier_promote,
-  l_osd_tier_flush,
-  l_osd_tier_flush_fail,
-  l_osd_tier_try_flush,
-  l_osd_tier_try_flush_fail,
-  l_osd_tier_evict,
-  l_osd_tier_whiteout,
-  l_osd_tier_dirty,
-  l_osd_tier_clean,
-  l_osd_tier_delay,
-  l_osd_tier_proxy_read,
-  l_osd_tier_proxy_write,
-
-  l_osd_agent_wake,
-  l_osd_agent_skip,
-  l_osd_agent_flush,
-  l_osd_agent_evict,
-
-  l_osd_object_ctx_cache_hit,
-  l_osd_object_ctx_cache_total,
-
-  l_osd_op_cache_hit,
-  l_osd_tier_flush_lat,
-  l_osd_tier_promote_lat,
-  l_osd_tier_r_lat,
-
-  l_osd_pg_info,
-  l_osd_pg_fastinfo,
-  l_osd_pg_biginfo,
-
-  l_osd_last,
-};
-
-// RecoveryState perf counters
-enum {
-  rs_first = 20000,
-  rs_initial_latency,
-  rs_started_latency,
-  rs_reset_latency,
-  rs_start_latency,
-  rs_primary_latency,
-  rs_peering_latency,
-  rs_backfilling_latency,
-  rs_waitremotebackfillreserved_latency,
-  rs_waitlocalbackfillreserved_latency,
-  rs_notbackfilling_latency,
-  rs_repnotrecovering_latency,
-  rs_repwaitrecoveryreserved_latency,
-  rs_repwaitbackfillreserved_latency,
-  rs_reprecovering_latency,
-  rs_activating_latency,
-  rs_waitlocalrecoveryreserved_latency,
-  rs_waitremoterecoveryreserved_latency,
-  rs_recovering_latency,
-  rs_recovered_latency,
-  rs_clean_latency,
-  rs_active_latency,
-  rs_replicaactive_latency,
-  rs_stray_latency,
-  rs_getinfo_latency,
-  rs_getlog_latency,
-  rs_waitactingchange_latency,
-  rs_incomplete_latency,
-  rs_down_latency,
-  rs_getmissing_latency,
-  rs_waitupthru_latency,
-  rs_notrecovering_latency,
-  rs_last,
-};
-
 class Messenger;
 class Message;
 class MonClient;
-class PerfCounters;
 class ObjectStore;
 class FuseStore;
 class OSDMap;
@@ -243,7 +89,6 @@ class TestOpsSocketHook;
 struct C_FinishSplits;
 struct C_OpenPGs;
 class LogChannel;
-class CephContext;
 class MOSDOp;
 
 class MOSDPGCreate2;
@@ -252,10 +97,12 @@ class MOSDPGNotify;
 class MOSDPGInfo;
 class MOSDPGRemove;
 class MOSDForceRecovery;
+class MMonGetPurgedSnapsReply;
 
 class OSD;
 
 class OSDService {
+  using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;
 public:
   OSD *osd;
   CephContext *cct;
@@ -272,13 +119,12 @@ public:
   PerfCounters *&logger;
   PerfCounters *&recoverystate_perf;
   MonClient   *&monc;
-  ClassHandler  *&class_handler;
 
   md_config_cacher_t<Option::size_t> osd_max_object_size;
   md_config_cacher_t<bool> osd_skip_data_digest;
 
-  void enqueue_back(OpQueueItem&& qi);
-  void enqueue_front(OpQueueItem&& qi);
+  void enqueue_back(OpSchedulerItem&& qi);
+  void enqueue_front(OpSchedulerItem&& qi);
 
   void maybe_inject_dispatch_delay() {
     if (g_conf()->osd_debug_inject_dispatch_delay_probability > 0) {
@@ -291,6 +137,8 @@ public:
     }
   }
 
+  ceph::signedspan get_mnow();
+
 private:
   // -- superblock --
   ceph::mutex publish_lock, pre_publish_lock; // pre-publish orders before publish
@@ -343,6 +191,7 @@ public:
 private:
   OSDMapRef next_osdmap;
   ceph::condition_variable pre_publish_cond;
+  int pre_publish_waiter = 0;
 
 public:
   void pre_publish_map(OSDMapRef map) {
@@ -375,17 +224,21 @@ public:
     if (--(i->second) == 0) {
       map_reservations.erase(i);
     }
-    pre_publish_cond.notify_all();
+    if (pre_publish_waiter) {
+      pre_publish_cond.notify_all();
+    }
   }
   /// blocks until there are no reserved maps prior to next_osdmap
   void await_reserved_maps() {
     std::unique_lock l{pre_publish_lock};
     ceph_assert(next_osdmap);
+    pre_publish_waiter++;
     pre_publish_cond.wait(l, [this] {
       auto i = map_reservations.cbegin();
       return (i == map_reservations.cend() ||
              i->first >= next_osdmap->get_epoch());
     });
+    pre_publish_waiter--;
   }
   OSDMapRef get_next_osdmap() {
     std::lock_guard l(pre_publish_lock);
@@ -394,37 +247,26 @@ public:
     return next_osdmap;
   }
 
-private:
-  Mutex peer_map_epoch_lock;
-  map<int, epoch_t> peer_map_epoch;
-public:
-  epoch_t get_peer_epoch(int p);
-  epoch_t note_peer_epoch(int p, epoch_t e);
-  void forget_peer_epoch(int p, epoch_t e);
+  void maybe_share_map(Connection *con,
+                      const OSDMapRef& osdmap,
+                      epoch_t peer_epoch_lb=0);
 
   void send_map(class MOSDMap *m, Connection *con);
-  void send_incremental_map(epoch_t since, Connection *con, OSDMapRef& osdmap);
+  void send_incremental_map(epoch_t since, Connection *con,
+                           const OSDMapRef& osdmap);
   MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to,
                                        OSDSuperblock& superblock);
-  bool should_share_map(entity_name_t name, Connection *con, epoch_t epoch,
-                        const OSDMapRef& osdmap, const epoch_t *sent_epoch_p);
-  void share_map(entity_name_t name, Connection *con, epoch_t epoch,
-                 OSDMapRef& osdmap, epoch_t *sent_epoch_p);
-  void share_map_peer(int peer, Connection *con,
-                      OSDMapRef map = OSDMapRef());
 
   ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch);
   pair<ConnectionRef,ConnectionRef> get_con_osd_hb(int peer, epoch_t from_epoch);  // (back, front)
   void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch);
+  void send_message_osd_cluster(std::vector<std::pair<int, Message*>>& messages, epoch_t from_epoch);
   void send_message_osd_cluster(Message *m, Connection *con) {
     con->send_message(m);
   }
   void send_message_osd_cluster(Message *m, const ConnectionRef& con) {
     con->send_message(m);
   }
-  void send_message_osd_client(Message *m, Connection *con) {
-    con->send_message(m);
-  }
   void send_message_osd_client(Message *m, const ConnectionRef& con) {
     con->send_message(m);
   }
@@ -432,7 +274,7 @@ public:
 
 private:
   // -- scrub scheduling --
-  Mutex sched_scrub_lock;
+  ceph::mutex sched_scrub_lock = ceph::make_mutex("OSDService::sched_scrub_lock");
   int scrubs_local;
   int scrubs_remote;
 
@@ -517,14 +359,15 @@ public:
   void dump_scrub_reservations(Formatter *f);
 
   void reply_op_error(OpRequestRef op, int err);
-  void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv);
+  void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv,
+                     vector<pg_log_op_return_item_t> op_returns);
   void handle_misdirected_op(PG *pg, OpRequestRef op);
 
 
 private:
   // -- agent shared state --
-  Mutex agent_lock;
-  Cond agent_cond;
+  ceph::mutex agent_lock = ceph::make_mutex("OSDService::agent_lock");
+  ceph::condition_variable agent_cond;
   map<uint64_t, set<PGRef> > agent_queue;
   set<PGRef>::iterator agent_queue_pos;
   bool agent_valid_iterator;
@@ -541,7 +384,7 @@ private:
     }
   } agent_thread;
   bool agent_stop_flag;
-  Mutex agent_timer_lock;
+  ceph::mutex agent_timer_lock = ceph::make_mutex("OSDService::agent_timer_lock");
   SafeTimer agent_timer;
 
 public:
@@ -554,7 +397,7 @@ public:
       agent_valid_iterator = false;  // inserting higher-priority queue
     set<PGRef>& nq = agent_queue[priority];
     if (nq.empty())
-      agent_cond.Signal();
+      agent_cond.notify_all();
     nq.insert(pg);
   }
 
@@ -603,7 +446,7 @@ public:
     std::lock_guard l(agent_lock);
     ceph_assert(agent_ops > 0);
     --agent_ops;
-    agent_cond.Signal();
+    agent_cond.notify_all();
   }
 
   /// note start of an async (flush) op
@@ -621,7 +464,7 @@ public:
     --agent_ops;
     ceph_assert(agent_oids.count(oid) == 1);
     agent_oids.erase(oid);
-    agent_cond.Signal();
+    agent_cond.notify_all();
   }
 
   /// check if we are operating on an object
@@ -671,14 +514,20 @@ public:
     promote_counter.finish(bytes);
   }
   void promote_throttle_recalibrate();
+  unsigned get_num_shards() const {
+    return m_objecter_finishers;
+  }
+  Finisher* get_objecter_finisher(int shard) {
+    return objecter_finishers[shard].get();
+  }
 
   // -- Objecter, for tiering reads/writes from/to other OSDs --
-  Objecter *objecter;
+  std::unique_ptr<Objecter> objecter;
   int m_objecter_finishers;
-  vector<Finisher*> objecter_finishers;
+  std::vector<std::unique_ptr<Finisher>> objecter_finishers;
 
   // -- Watch --
-  Mutex watch_lock;
+  ceph::mutex watch_lock = ceph::make_mutex("OSDService::watch_lock");
   SafeTimer watch_timer;
   uint64_t next_notif_id;
   uint64_t get_next_id(epoch_t cur_epoch) {
@@ -687,15 +536,15 @@ public:
   }
 
   // -- Recovery/Backfill Request Scheduling --
-  Mutex recovery_request_lock;
+  ceph::mutex recovery_request_lock = ceph::make_mutex("OSDService::recovery_request_lock");
   SafeTimer recovery_request_timer;
 
   // For async recovery sleep
   bool recovery_needs_sleep = true;
-  utime_t recovery_schedule_time = utime_t();
+  ceph::real_clock::time_point recovery_schedule_time;
 
   // For recovery & scrub & snap
-  Mutex sleep_lock;
+  ceph::mutex sleep_lock = ceph::make_mutex("OSDService::sleep_lock");
   SafeTimer sleep_timer;
 
   // -- tids --
@@ -711,7 +560,7 @@ public:
   AsyncReserver<spg_t> remote_reserver;
 
   // -- pg merge --
-  Mutex merge_lock = {"OSD::merge_lock"};
+  ceph::mutex merge_lock = ceph::make_mutex("OSD::merge_lock");
   map<pg_t,eversion_t> ready_to_merge_source;   // pg -> version
   map<pg_t,std::tuple<eversion_t,epoch_t,epoch_t>> ready_to_merge_target;  // pg -> (version,les,lec)
   set<pg_t> not_ready_to_merge_source;
@@ -730,11 +579,11 @@ public:
   void send_ready_to_merge();
   void _send_ready_to_merge();
   void clear_sent_ready_to_merge();
-  void prune_sent_ready_to_merge(OSDMapRef& osdmap);
+  void prune_sent_ready_to_merge(const OSDMapRef& osdmap);
 
   // -- pg_temp --
 private:
-  Mutex pg_temp_lock;
+  ceph::mutex pg_temp_lock = ceph::make_mutex("OSDService::pg_temp_lock");
   struct pg_temp_t {
     vector<int> acting;
     bool forced = false;
@@ -765,7 +614,7 @@ public:
 
 private:
   // -- pg recovery and associated throttling --
-  Mutex recovery_lock;
+  ceph::mutex recovery_lock = ceph::make_mutex("OSDService::recovery_lock");
   list<pair<epoch_t, PGRef> > awaiting_throttle;
 
   utime_t defer_recovery_until;
@@ -812,6 +661,9 @@ public:
        return awaiting.second.get() == pg;
       });
   }
+
+  unsigned get_target_pg_log_entries() const;
+  
   // delayed pg activation
   void queue_for_recovery(PG *pg) {
     std::lock_guard l(recovery_lock);
@@ -828,15 +680,16 @@ public:
     _queue_for_recovery(make_pair(queued, pg), reserved_pushes);
   }
 
+  void queue_check_readable(spg_t spgid,
+                           epoch_t lpr,
+                           ceph::signedspan delay = ceph::signedspan::zero());
+
   // osd map cache (past osd maps)
-  Mutex map_cache_lock;
+  ceph::mutex map_cache_lock = ceph::make_mutex("OSDService::map_cache_lock");
   SharedLRU<epoch_t, const OSDMap> map_cache;
   SimpleLRU<epoch_t, bufferlist> map_bl_cache;
   SimpleLRU<epoch_t, bufferlist> map_bl_inc_cache;
 
-  /// final pg_num values for recently deleted pools
-  map<int64_t,int> deleted_pool_pg_nums;
-
   OSDMapRef try_get_map(epoch_t e);
   OSDMapRef get_map(epoch_t e) {
     OSDMapRef ret(try_get_map(e));
@@ -849,10 +702,6 @@ public:
   }
   OSDMapRef _add_map(OSDMap *o);
 
-  void add_map_bl(epoch_t e, bufferlist& bl) {
-    std::lock_guard l(map_cache_lock);
-    return _add_map_bl(e, bl);
-  }
   void _add_map_bl(epoch_t e, bufferlist& bl);
   bool get_map_bl(epoch_t e, bufferlist& bl) {
     std::lock_guard l(map_cache_lock);
@@ -860,30 +709,9 @@ public:
   }
   bool _get_map_bl(epoch_t e, bufferlist& bl);
 
-  void add_map_inc_bl(epoch_t e, bufferlist& bl) {
-    std::lock_guard l(map_cache_lock);
-    return _add_map_inc_bl(e, bl);
-  }
   void _add_map_inc_bl(epoch_t e, bufferlist& bl);
   bool get_inc_map_bl(epoch_t e, bufferlist& bl);
 
-  /// get last pg_num before a pool was deleted (if any)
-  int get_deleted_pool_pg_num(int64_t pool);
-
-  void store_deleted_pool_pg_num(int64_t pool, int pg_num) {
-    std::lock_guard l(map_cache_lock);
-    deleted_pool_pg_nums[pool] = pg_num;
-  }
-
-  /// get pgnum from newmap or, if pool was deleted, last map pool existed in
-  int get_possibly_deleted_pool_pg_num(OSDMapRef newmap,
-                                      int64_t pool) {
-    if (newmap->have_pg_pool(pool)) {
-      return newmap->get_pg_num(pool);
-    }
-    return get_deleted_pool_pg_num(pool);
-  }
-
   /// identify split child pgids over a osdmap interval
   void identify_splits_and_merges(
     OSDMapRef old_map,
@@ -901,7 +729,7 @@ public:
   void shutdown();
 
   // -- stats --
-  Mutex stat_lock;
+  ceph::mutex stat_lock = ceph::make_mutex("OSDService::stat_lock");
   osd_stat_t osd_stat;
   uint32_t seq = 0;
 
@@ -931,7 +759,7 @@ public:
   // -- OSD Full Status --
 private:
   friend TestOpsSocketHook;
-  mutable Mutex full_status_lock;
+  mutable ceph::mutex full_status_lock = ceph::make_mutex("OSDService::full_status_lock");
   enum s_names { INVALID = -1, NONE, NEARFULL, BACKFILLFULL, FULL, FAILSAFE } cur_state;  // ascending
   const char *get_full_state_name(s_names s) const {
     switch (s) {
@@ -978,12 +806,12 @@ public:
   bool is_nearfull() const;
   bool need_fullness_update();  ///< osdmap state needs update
   void set_injectfull(s_names type, int64_t count);
-  bool check_osdmap_full(const set<pg_shard_t> &missing_on);
 
 
   // -- epochs --
 private:
-  mutable Mutex epoch_lock; // protects access to boot_epoch, up_epoch, bind_epoch
+  // protects access to boot_epoch, up_epoch, bind_epoch
+  mutable ceph::mutex epoch_lock = ceph::make_mutex("OSDService::epoch_lock");
   epoch_t boot_epoch;  // _first_ epoch we were marked up (after this process started)
   epoch_t up_epoch;    // _most_recent_ epoch we were marked up
   epoch_t bind_epoch;  // epoch we last did a bind to new ip:ports
@@ -1017,9 +845,24 @@ public:
 
   void request_osdmap_update(epoch_t e);
 
+  // -- heartbeats --
+  ceph::mutex hb_stamp_lock = ceph::make_mutex("OSDServce::hb_stamp_lock");
+
+  /// osd -> heartbeat stamps
+  vector<HeartbeatStampsRef> hb_stamps;
+
+  /// get or create a ref for a peer's HeartbeatStamps
+  HeartbeatStampsRef get_hb_stamps(unsigned osd);
+
+
+  // Timer for readable leases
+  ceph::timer<ceph::mono_clock> mono_timer = ceph::timer<ceph::mono_clock>{ceph::construct_suspended};
+
+  void queue_renew_lease(epoch_t epoch, spg_t spgid);
+
   // -- stopping --
-  Mutex is_stopping_lock;
-  Cond is_stopping_cond;
+  ceph::mutex is_stopping_lock = ceph::make_mutex("OSDService::is_stopping_lock");
+  ceph::condition_variable is_stopping_cond;
   enum {
     NOT_STOPPING,
     PREPARING_TO_STOP,
@@ -1042,7 +885,7 @@ public:
 
 
 #ifdef PG_DEBUG_REFS
-  Mutex pgid_lock;
+  ceph::mutex pgid_lock = ceph::make_mutex("OSDService::pgid_lock");
   map<spg_t, int> pgid_tracker;
   map<spg_t, PG*> live_pgs;
   void add_pgid(spg_t pgid, PG *pg);
@@ -1051,18 +894,9 @@ public:
 #endif
 
   explicit OSDService(OSD *osd);
-  ~OSDService();
+  ~OSDService() = default;
 };
 
-
-enum class io_queue {
-  prioritized,
-  weightedpriority,
-  mclock_opclass,
-  mclock_client,
-};
-
-
 /*
 
   Each PG slot includes queues for events that are processing and/or waiting
@@ -1092,7 +926,7 @@ enum class io_queue {
     don't affect the given PG.)
 
   - we maintain two separate wait lists, *waiting* and *waiting_peering*. The
-    OpQueueItem has an is_peering() bool to determine which we use.  Waiting
+    OpSchedulerItem has an is_peering() bool to determine which we use.  Waiting
     peering events are queued up by epoch required.
 
   - when we wake a PG slot (e.g., we finished split, or got a newer osdmap, or
@@ -1114,14 +948,15 @@ enum class io_queue {
   */
 
 struct OSDShardPGSlot {
+  using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;
   PGRef pg;                      ///< pg reference
-  deque<OpQueueItem> to_process; ///< order items for this slot
+  deque<OpSchedulerItem> to_process; ///< order items for this slot
   int num_running = 0;          ///< _process threads doing pg lookup/lock
 
-  deque<OpQueueItem> waiting;   ///< waiting for pg (or map + pg)
+  deque<OpSchedulerItem> waiting;   ///< waiting for pg (or map + pg)
 
   /// waiting for map (peering evt)
-  map<epoch_t,deque<OpQueueItem>> waiting_peering;
+  map<epoch_t,deque<OpSchedulerItem>> waiting_peering;
 
   /// incremented by wake_pg_waiters; indicates racing _process threads
   /// should bail out (their op has been requeued)
@@ -1148,7 +983,6 @@ struct OSDShard {
   ceph::mutex sdata_wait_lock;
   ceph::condition_variable sdata_cond;
 
-  string osdmap_lock_name;
   ceph::mutex osdmap_lock;  ///< protect shard_osdmap updates vs users w/o shard_lock
   OSDMapRef shard_osdmap;
 
@@ -1161,7 +995,7 @@ struct OSDShard {
   ceph::mutex shard_lock;   ///< protects remaining members below
 
   /// map of slots for each spg_t.  maintains ordering of items dequeued
-  /// from pqueue while _process thread drops shard lock to acquire the
+  /// from scheduler while _process thread drops shard lock to acquire the
   /// pg lock.  stale slots are removed by consume_map.
   unordered_map<spg_t,unique_ptr<OSDShardPGSlot>> pg_slots;
 
@@ -1183,25 +1017,12 @@ struct OSDShard {
   ceph::condition_variable min_pg_epoch_cond;
 
   /// priority queue
-  std::unique_ptr<OpQueue<OpQueueItem, uint64_t>> pqueue;
+  ceph::osd::scheduler::OpSchedulerRef scheduler;
 
   bool stop_waiting = false;
 
   ContextQueue context_queue;
 
-  void _enqueue_front(OpQueueItem&& item, unsigned cutoff) {
-    unsigned priority = item.get_priority();
-    unsigned cost = item.get_cost();
-    if (priority >= cutoff)
-      pqueue->enqueue_strict_front(
-       item.get_owner(),
-       priority, std::move(item));
-    else
-      pqueue->enqueue_front(
-       item.get_owner(),
-       priority, cost, std::move(item));
-  }
-
   void _attach_pg(OSDShardPGSlot *slot, PG *pg);
   void _detach_pg(OSDShardPGSlot *slot);
 
@@ -1214,7 +1035,7 @@ struct OSDShard {
 
   /// push osdmap into shard
   void consume_map(
-    OSDMapRef& osdmap,
+    const OSDMapRef& osdmap,
     unsigned *pushes_to_free);
 
   void _wake_pg_slot(spg_t pgid, OSDShardPGSlot *slot);
@@ -1234,44 +1055,20 @@ struct OSDShard {
   OSDShard(
     int id,
     CephContext *cct,
-    OSD *osd,
-    uint64_t max_tok_per_prio, uint64_t min_cost,
-    io_queue opqueue)
-    : shard_id(id),
-      cct(cct),
-      osd(osd),
-      shard_name(string("OSDShard.") + stringify(id)),
-      sdata_wait_lock_name(shard_name + "::sdata_wait_lock"),
-      sdata_wait_lock{make_mutex(sdata_wait_lock_name)},
-      osdmap_lock_name(shard_name + "::osdmap_lock"),
-      osdmap_lock{make_mutex(osdmap_lock_name)},
-      shard_lock_name(shard_name + "::shard_lock"),
-      shard_lock{make_mutex(shard_lock_name)},
-      context_queue(sdata_wait_lock, sdata_cond) {
-    if (opqueue == io_queue::weightedpriority) {
-      pqueue = std::make_unique<
-       WeightedPriorityQueue<OpQueueItem,uint64_t>>(
-         max_tok_per_prio, min_cost);
-    } else if (opqueue == io_queue::prioritized) {
-      pqueue = std::make_unique<
-       PrioritizedQueue<OpQueueItem,uint64_t>>(
-         max_tok_per_prio, min_cost);
-    } else if (opqueue == io_queue::mclock_opclass) {
-      pqueue = std::make_unique<ceph::mClockOpClassQueue>(cct);
-    } else if (opqueue == io_queue::mclock_client) {
-      pqueue = std::make_unique<ceph::mClockClientQueue>(cct);
-    }
-  }
+    OSD *osd);
 };
 
 class OSD : public Dispatcher,
            public md_config_obs_t {
+  using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;
+
   /** OSD **/
-  Mutex osd_lock;          // global lock
+  // global lock
+  ceph::mutex osd_lock = ceph::make_mutex("OSD::osd_lock");
   SafeTimer tick_timer;    // safe timer (osd_lock)
 
   // Tick timer for those stuff that do not need osd_lock
-  Mutex tick_timer_lock;
+  ceph::mutex tick_timer_lock = ceph::make_mutex("OSD::tick_timer_lock");
   SafeTimer tick_timer_without_osd_lock;
   std::string gss_ktfile_client{};
 
@@ -1305,7 +1102,7 @@ protected:
   int whoami;
   std::string dev_path, journal_path;
 
-  int last_require_osd_release = 0;
+  ceph_release_t last_require_osd_release{ceph_release_t::unknown};
 
   int numa_node = -1;
   size_t numa_cpu_set_size = 0;
@@ -1327,11 +1124,14 @@ protected:
   // asok
   friend class OSDSocketHook;
   class OSDSocketHook *asok_hook;
-  bool asok_command(std::string_view admin_command, const cmdmap_t& cmdmap,
-                   std::string_view format, std::ostream& ss);
+  void asok_command(
+    std::string_view prefix,
+    const cmdmap_t& cmdmap,
+    Formatter *f,
+    const bufferlist& inbl,
+    std::function<void(int,const std::string&,bufferlist&)> on_finish);
 
 public:
-  ClassHandler  *class_handler = nullptr;
   int get_nodeid() { return whoami; }
   
   static ghobject_t get_osdmap_pobject_name(epoch_t epoch) {
@@ -1351,6 +1151,12 @@ public:
        object_t("snapmapper"),
        0)));
   }
+  static ghobject_t make_purged_snaps_oid() {
+    return ghobject_t(hobject_t(
+      sobject_t(
+       object_t("purged_snaps"),
+       0)));
+  }
 
   static ghobject_t make_pg_log_oid(spg_t pg) {
     stringstream ss;
@@ -1482,43 +1288,40 @@ public:
 private:
 
   ShardedThreadPool osd_op_tp;
-  ThreadPool command_tp;
 
   void get_latest_osdmap();
 
   // -- sessions --
 private:
-  void dispatch_session_waiting(SessionRef session, OSDMapRef osdmap);
-  void maybe_share_map(Session *session, OpRequestRef op, OSDMapRef osdmap);
+  void dispatch_session_waiting(const ceph::ref_t<Session>& session, OSDMapRef osdmap);
 
-  Mutex session_waiting_lock;
-  set<SessionRef> session_waiting_for_map;
+  ceph::mutex session_waiting_lock = ceph::make_mutex("OSD::session_waiting_lock");
+  set<ceph::ref_t<Session>> session_waiting_for_map;
 
   /// Caller assumes refs for included Sessions
-  void get_sessions_waiting_for_map(set<SessionRef> *out) {
+  void get_sessions_waiting_for_map(set<ceph::ref_t<Session>> *out) {
     std::lock_guard l(session_waiting_lock);
     out->swap(session_waiting_for_map);
   }
-  void register_session_waiting_on_map(SessionRef session) {
+  void register_session_waiting_on_map(const ceph::ref_t<Session>& session) {
     std::lock_guard l(session_waiting_lock);
     session_waiting_for_map.insert(session);
   }
-  void clear_session_waiting_on_map(SessionRef session) {
+  void clear_session_waiting_on_map(const ceph::ref_t<Session>& session) {
     std::lock_guard l(session_waiting_lock);
     session_waiting_for_map.erase(session);
   }
   void dispatch_sessions_waiting_on_map() {
-    set<SessionRef> sessions_to_check;
+    set<ceph::ref_t<Session>> sessions_to_check;
     get_sessions_waiting_for_map(&sessions_to_check);
     for (auto i = sessions_to_check.begin();
         i != sessions_to_check.end();
         sessions_to_check.erase(i++)) {
       std::lock_guard l{(*i)->session_dispatch_lock};
-      SessionRef session = *i;
-      dispatch_session_waiting(session, osdmap);
+      dispatch_session_waiting(*i, get_osdmap());
     }
   }
-  void session_handle_reset(SessionRef session) {
+  void session_handle_reset(const ceph::ref_t<Session>& session) {
     std::lock_guard l(session->session_dispatch_lock);
     clear_session_waiting_on_map(session);
 
@@ -1550,7 +1353,7 @@ private:
   void osdmap_subscribe(version_t epoch, bool force_request);
   /** @} monc helpers */
 
-  Mutex osdmap_subscribe_lock;
+  ceph::mutex osdmap_subscribe_lock = ceph::make_mutex("OSD::osdmap_subscribe_lock");
   epoch_t latest_subscribed_epoch{0};
 
   // -- heartbeat --
@@ -1615,15 +1418,24 @@ private:
       }
       return !is_unhealthy(now);
     }
+
+    void clear_mark_down(Connection *except = nullptr) {
+      if (con_back && con_back != except) {
+       con_back->mark_down();
+       con_back->clear_priv();
+       con_back.reset(nullptr);
+      }
+      if (con_front && con_front != except) {
+       con_front->mark_down();
+       con_front->clear_priv();
+       con_front.reset(nullptr);
+      }
+    }
   };
-  /// state attached to outgoing heartbeat connections
-  struct HeartbeatSession : public RefCountedObject {
-    int peer;
-    explicit HeartbeatSession(int p) : peer(p) {}
-  };
-  Mutex heartbeat_lock;
+
+  ceph::mutex heartbeat_lock = ceph::make_mutex("OSD::heartbeat_lock");
   map<int, int> debug_heartbeat_drops_remaining;
-  Cond heartbeat_cond;
+  ceph::condition_variable heartbeat_cond;
   bool heartbeat_stop;
   std::atomic<bool> heartbeat_need_update;   
   map<int,HeartbeatInfo> heartbeat_peers;  ///< map of osd id to HeartbeatInfo
@@ -1634,6 +1446,7 @@ private:
   Messenger *hb_back_server_messenger;
   utime_t last_heartbeat_resample;   ///< last time we chose random peers in waiting-for-healthy state
   double daily_loadavg;
+  ceph::mono_time startup_time;
 
   // Track ping repsonse times using vector as a circular buffer
   // MUST BE A POWER OF 2
@@ -1660,7 +1473,7 @@ private:
 
   void heartbeat_kick() {
     std::lock_guard l(heartbeat_lock);
-    heartbeat_cond.Signal();
+    heartbeat_cond.notify_all();
   }
 
   struct T_Heartbeat : public Thread {
@@ -1705,18 +1518,6 @@ public:
     int ms_handle_authentication(Connection *con) override {
       return true;
     }
-    bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer) override {
-      // some pre-nautilus OSDs get confused if you include an
-      // authorizer but they are not expecting it.  do not try to authorize
-      // heartbeat connections until all OSDs are nautilus.
-      if (osd->get_osdmap()->require_osd_release >= CEPH_RELEASE_NAUTILUS) {
-       return osd->ms_get_authorizer(dest_type, authorizer);
-      }
-      return false;
-    }
-    KeyStore *ms_get_auth1_authorizer_keystore() override {
-      return osd->ms_get_auth1_authorizer_keystore();
-    }
   } heartbeat_dispatcher;
 
 private:
@@ -1724,7 +1525,7 @@ private:
   list<OpRequestRef> finished;
   
   void take_waiters(list<OpRequestRef>& ls) {
-    ceph_assert(osd_lock.is_locked());
+    ceph_assert(ceph_mutex_is_locked(osd_lock));
     finished.splice(finished.end(), ls);
   }
   void do_waiters();
@@ -1737,24 +1538,18 @@ private:
   friend struct C_FinishSplits;
   friend struct C_OpenPGs;
 
-  // -- op queue --
-  friend std::ostream& operator<<(std::ostream& out, const io_queue& q);
-
-  const io_queue op_queue;
-public:
-  const unsigned int op_prio_cutoff;
 protected:
 
   /*
    * The ordered op delivery chain is:
    *
-   *   fast dispatch -> pqueue back
-   *                    pqueue front <-> to_process back
+   *   fast dispatch -> scheduler back
+   *                    scheduler front <-> to_process back
    *                                     to_process front  -> RunVis(item)
    *                                                      <- queue_front()
    *
-   * The pqueue is per-shard, and to_process is per pg_slot.  Items can be
-   * pushed back up into to_process and/or pqueue while order is preserved.
+   * The scheduler is per-shard, and to_process is per pg_slot.  Items can be
+   * pushed back up into to_process and/or scheduler while order is preserved.
    *
    * Multiple worker threads can operate on each shard.
    *
@@ -1765,13 +1560,13 @@ protected:
    * wake_pg_waiters, and (2) when wake_pg_waiters just ran, waiting_for_pg
    * and already requeued the items.
    */
-  friend class PGOpItem;
-  friend class PGPeeringItem;
-  friend class PGRecovery;
-  friend class PGDelete;
+  friend class ceph::osd::scheduler::PGOpItem;
+  friend class ceph::osd::scheduler::PGPeeringItem;
+  friend class ceph::osd::scheduler::PGRecovery;
+  friend class ceph::osd::scheduler::PGDelete;
 
   class ShardedOpWQ
-    : public ShardedThreadPool::ShardedWQ<OpQueueItem>
+    : public ShardedThreadPool::ShardedWQ<OpSchedulerItem>
   {
     OSD *osd;
 
@@ -1780,23 +1575,23 @@ protected:
                time_t ti,
                time_t si,
                ShardedThreadPool* tp)
-      : ShardedThreadPool::ShardedWQ<OpQueueItem>(ti, si, tp),
+      : ShardedThreadPool::ShardedWQ<OpSchedulerItem>(ti, si, tp),
         osd(o) {
     }
 
     void _add_slot_waiter(
       spg_t token,
       OSDShardPGSlot *slot,
-      OpQueueItem&& qi);
+      OpSchedulerItem&& qi);
 
     /// try to do some work
     void _process(uint32_t thread_index, heartbeat_handle_d *hb) override;
 
     /// enqueue a new item
-    void _enqueue(OpQueueItem&& item) override;
+    void _enqueue(OpSchedulerItem&& item) override;
 
     /// requeue an old item (at the front of the line)
-    void _enqueue_front(OpQueueItem&& item) override;
+    void _enqueue_front(OpSchedulerItem&& item) override;
       
     void return_waiting_threads() override {
       for(uint32_t i = 0; i < osd->num_shards; i++) {
@@ -1827,7 +1622,7 @@ protected:
 
        std::scoped_lock l{sdata->shard_lock};
        f->open_object_section(queue_name);
-       sdata->pqueue->dump(f);
+       sdata->scheduler->dump(*f);
        f->close_section();
       }
     }
@@ -1838,9 +1633,9 @@ protected:
       ceph_assert(sdata);
       std::lock_guard l(sdata->shard_lock);
       if (thread_index < osd->num_shards) {
-       return sdata->pqueue->empty() && sdata->context_queue.empty();
+       return sdata->scheduler->empty() && sdata->context_queue.empty();
       } else {
-       return sdata->pqueue->empty();
+       return sdata->scheduler->empty();
       }
     }
 
@@ -1860,9 +1655,6 @@ protected:
   void enqueue_peering_evt(
     spg_t pgid,
     PGPeeringEventRef ref);
-  void enqueue_peering_evt_front(
-    spg_t pgid,
-    PGPeeringEventRef ref);
   void dequeue_peering_evt(
     OSDShard *sdata,
     PG *pg,
@@ -1883,18 +1675,23 @@ protected:
  protected:
 
   // -- osd map --
-  OSDMapRef       osdmap;
-  OSDMapRef get_osdmap() {
-    return osdmap;
+  // TODO: switch to std::atomic<OSDMapRef> when C++20 will be available.
+  OSDMapRef       _osdmap;
+  void set_osdmap(OSDMapRef osdmap) {
+    std::atomic_store(&_osdmap, osdmap);
+  }
+  OSDMapRef get_osdmap() const {
+    return std::atomic_load(&_osdmap);
   }
   epoch_t get_osdmap_epoch() const {
+    // XXX: performance?
+    auto osdmap = get_osdmap();
     return osdmap ? osdmap->get_epoch() : 0;
   }
 
   pool_pg_num_history_t pg_num_history;
 
-  utime_t         had_map_since;
-  RWLock          map_lock;
+  ceph::shared_mutex map_lock = ceph::make_shared_mutex("OSD::map_lock");
   list<OpRequestRef>  waiting_for_osdmap;
   deque<utime_t> osd_markdown_log;
 
@@ -1912,7 +1709,7 @@ protected:
     epoch_t advance_to,
     PG *pg,
     ThreadPool::TPHandle &handle,
-    PG::RecoveryCtx *rctx);
+    PeeringCtx &rctx);
   void consume_map();
   void activate_map();
 
@@ -1923,15 +1720,9 @@ protected:
   OSDMapRef add_map(OSDMap *o) {
     return service.add_map(o);
   }
-  void add_map_bl(epoch_t e, bufferlist& bl) {
-    return service.add_map_bl(e, bl);
-  }
   bool get_map_bl(epoch_t e, bufferlist& bl) {
     return service.get_map_bl(e, bl);
   }
-  void add_map_inc_bl(epoch_t e, bufferlist& bl) {
-    return service.add_map_inc_bl(e, bl);
-  }
 
 public:
   // -- shards --
@@ -1949,7 +1740,7 @@ public:
   }
 
 protected:
-  Mutex merge_lock = {"OSD::merge_lock"};
+  ceph::mutex merge_lock = ceph::make_mutex("OSD::merge_lock");
   /// merge epoch -> target pgid -> source pgid -> pg
   map<epoch_t,map<spg_t,map<spg_t,PGRef>>> merge_waiters;
 
@@ -1960,7 +1751,7 @@ protected:
   std::atomic<size_t> num_pgs = {0};
 
   std::mutex pending_creates_lock;
-  using create_from_osd_t = std::pair<pg_t, bool /* is primary*/>;
+  using create_from_osd_t = std::pair<spg_t, bool /* is primary*/>;
   std::set<create_from_osd_t> pending_creates_from_osd;
   unsigned pending_creates_from_mon = 0;
 
@@ -2005,11 +1796,11 @@ protected:
     const set<spg_t> &childpgids, set<PGRef> *out_pgs,
     OSDMapRef curmap,
     OSDMapRef nextmap,
-    PG::RecoveryCtx *rctx);
+    PeeringCtx &rctx);
   void _finish_splits(set<PGRef>& pgs);
 
   // == monitor interaction ==
-  Mutex mon_report_lock;
+  ceph::mutex mon_report_lock = ceph::make_mutex("OSD::mon_report_lock");
   utime_t last_mon_report;
   Finisher boot_finisher;
 
@@ -2019,6 +1810,8 @@ protected:
   void _preboot(epoch_t oldest, epoch_t newest);
   void _send_boot();
   void _collect_metadata(map<string,string> *pmeta);
+  void _get_purged_snaps();
+  void handle_get_purged_snaps_reply(MMonGetPurgedSnapsReply *r);
 
   void start_waiting_for_healthy();
   bool _is_healthy();
@@ -2056,7 +1849,7 @@ protected:
   void cancel_pending_failures();
 
   ceph::coarse_mono_clock::time_point last_sent_beacon;
-  Mutex min_last_epoch_clean_lock{"OSD::min_last_epoch_clean_lock"};
+  ceph::mutex min_last_epoch_clean_lock = ceph::make_mutex("OSD::min_last_epoch_clean_lock");
   epoch_t min_last_epoch_clean = 0;
   // which pgs were scanned for min_lec
   std::vector<pg_t> min_last_epoch_clean_pgs;
@@ -2066,22 +1859,12 @@ protected:
     return service.get_tid();
   }
 
+  double scrub_sleep_time(bool must_scrub);
+
   // -- generic pg peering --
-  PG::RecoveryCtx create_context();
-  void dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap,
+  PeeringCtx create_context();
+  void dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap,
                         ThreadPool::TPHandle *handle = NULL);
-  void dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg,
-                                    ThreadPool::TPHandle *handle = NULL);
-  void discard_context(PG::RecoveryCtx &ctx);
-  void do_notifies(map<int,
-                      vector<pair<pg_notify_t, PastIntervals> > >&
-                      notify_list,
-                  OSDMapRef map);
-  void do_queries(map<int, map<spg_t,pg_query_t> >& query_map,
-                 OSDMapRef map);
-  void do_infos(map<int,
-                   vector<pair<pg_notify_t, PastIntervals> > >& info_map,
-               OSDMapRef map);
 
   bool require_mon_peer(const Message *m);
   bool require_mon_or_mgr_peer(const Message *m);
@@ -2096,7 +1879,7 @@ protected:
    * address as in the given map.
    * @pre op was sent by an OSD using the cluster messenger
    */
-  bool require_same_peer_instance(const Message *m, OSDMapRef& map,
+  bool require_same_peer_instance(const Message *m, const OSDMapRef& map,
                                  bool is_fast_dispatch);
 
   bool require_same_or_newer_map(OpRequestRef& op, epoch_t e,
@@ -2118,64 +1901,7 @@ protected:
   void handle_fast_force_recovery(MOSDForceRecovery *m);
 
   // -- commands --
-  struct Command {
-    vector<string> cmd;
-    ceph_tid_t tid;
-    bufferlist indata;
-    ConnectionRef con;
-
-    Command(vector<string>& c, ceph_tid_t t, bufferlist& bl, Connection *co)
-      : cmd(c), tid(t), indata(bl), con(co) {}
-  };
-  list<Command*> command_queue;
-  struct CommandWQ : public ThreadPool::WorkQueue<Command> {
-    OSD *osd;
-    CommandWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
-      : ThreadPool::WorkQueue<Command>("OSD::CommandWQ", ti, si, tp), osd(o) {}
-
-    bool _empty() override {
-      return osd->command_queue.empty();
-    }
-    bool _enqueue(Command *c) override {
-      osd->command_queue.push_back(c);
-      return true;
-    }
-    void _dequeue(Command *pg) override {
-      ceph_abort();
-    }
-    Command *_dequeue() override {
-      if (osd->command_queue.empty())
-       return NULL;
-      Command *c = osd->command_queue.front();
-      osd->command_queue.pop_front();
-      return c;
-    }
-    void _process(Command *c, ThreadPool::TPHandle &) override {
-      osd->osd_lock.lock();
-      if (osd->is_stopping()) {
-       osd->osd_lock.unlock();
-       delete c;
-       return;
-      }
-      osd->do_command(c->con.get(), c->tid, c->cmd, c->indata);
-      osd->osd_lock.unlock();
-      delete c;
-    }
-    void _clear() override {
-      while (!osd->command_queue.empty()) {
-       Command *c = osd->command_queue.front();
-       osd->command_queue.pop_front();
-       delete c;
-      }
-    }
-  } command_wq;
-
-  void handle_command(class MMonCommand *m);
   void handle_command(class MCommand *m);
-  void do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, bufferlist& data);
-  int _do_command(
-    Connection *con, cmdmap_t& cmdmap, ceph_tid_t tid, bufferlist& data,
-    bufferlist& odata, stringstream& ss, stringstream& ds);
 
 
   // -- pg recovery --
@@ -2207,8 +1933,11 @@ private:
     case MSG_MON_COMMAND:
     case MSG_OSD_PG_CREATE2:
     case MSG_OSD_PG_QUERY:
+    case MSG_OSD_PG_QUERY2:
     case MSG_OSD_PG_INFO:
+    case MSG_OSD_PG_INFO2:
     case MSG_OSD_PG_NOTIFY:
+    case MSG_OSD_PG_NOTIFY2:
     case MSG_OSD_PG_LOG:
     case MSG_OSD_PG_TRIM:
     case MSG_OSD_PG_REMOVE:
@@ -2233,6 +1962,8 @@ private:
     case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
     case MSG_OSD_PG_RECOVERY_DELETE:
     case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
+    case MSG_OSD_PG_LEASE:
+    case MSG_OSD_PG_LEASE_ACK:
       return true;
     default:
       return false;
@@ -2240,49 +1971,14 @@ private:
   }
   void ms_fast_dispatch(Message *m) override;
   bool ms_dispatch(Message *m) override;
-  bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer) override;
   void ms_handle_connect(Connection *con) override;
   void ms_handle_fast_connect(Connection *con) override;
   void ms_handle_fast_accept(Connection *con) override;
   int ms_handle_authentication(Connection *con) override;
-  KeyStore *ms_get_auth1_authorizer_keystore() override;
   bool ms_handle_reset(Connection *con) override;
   void ms_handle_remote_reset(Connection *con) override {}
   bool ms_handle_refused(Connection *con) override;
 
-  io_queue get_io_queue() const {
-    if (cct->_conf->osd_op_queue == "debug_random") {
-      static io_queue index_lookup[] = { io_queue::prioritized,
-                                        io_queue::weightedpriority,
-                                        io_queue::mclock_opclass,
-                                        io_queue::mclock_client };
-      srand(time(NULL));
-      unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0]));
-      return index_lookup[which];
-    } else if (cct->_conf->osd_op_queue == "prioritized") {
-      return io_queue::prioritized;
-    } else if (cct->_conf->osd_op_queue == "mclock_opclass") {
-      return io_queue::mclock_opclass;
-    } else if (cct->_conf->osd_op_queue == "mclock_client") {
-      return io_queue::mclock_client;
-    } else {
-      // default / catch-all is 'wpq'
-      return io_queue::weightedpriority;
-    }
-  }
-
-  unsigned int get_io_prio_cut() const {
-    if (cct->_conf->osd_op_queue_cut_off == "debug_random") {
-      srand(time(NULL));
-      return (rand() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
-    } else if (cct->_conf->osd_op_queue_cut_off == "high") {
-      return CEPH_MSG_PRIO_HIGH;
-    } else {
-      // default / catch-all is 'low'
-      return CEPH_MSG_PRIO_LOW;
-    }
-  }
-
  public:
   /* internal and external can point to the same messenger, they will still
    * be cleaned up properly*/
@@ -2326,8 +2022,7 @@ private:
   void handle_fast_scrub(struct MOSDScrub2 *m);
   void handle_osd_ping(class MOSDPing *m);
 
-  int init_op_flags(OpRequestRef& op);
-
+  size_t get_num_cache_shards();
   int get_num_op_shards();
   int get_num_op_threads();
 
@@ -2335,6 +2030,9 @@ private:
   float get_osd_delete_sleep();
   float get_osd_snap_trim_sleep();
 
+  int get_recovery_max_active();
+
+  void scrub_purged_snaps();
   void probe_smart(const string& devid, ostream& ss);
 
 public:
@@ -2343,7 +2041,7 @@ public:
                       uuid_d *cluster_fsid,
                       uuid_d *osd_fsid,
                       int *whoami,
-                      int *min_osd_release);
+                      ceph_release_t *min_osd_release);
   
 
   // startup/shutdown
@@ -2367,20 +2065,15 @@ public:
   friend class OSDService;
 
 private:
-  void set_perf_queries(
-      const std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> &queries);
-  void get_perf_reports(
-      std::map<OSDPerfMetricQuery, OSDPerfMetricReport> *reports);
+  void set_perf_queries(const ConfigPayload &config_payload);
+  MetricPayload get_perf_reports();
 
-  Mutex m_perf_queries_lock = {"OSD::m_perf_queries_lock"};
+  ceph::mutex m_perf_queries_lock = ceph::make_mutex("OSD::m_perf_queries_lock");
   std::list<OSDPerfMetricQuery> m_perf_queries;
   std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> m_perf_limits;
 };
 
 
-std::ostream& operator<<(std::ostream& out, const io_queue& q);
-
-
 //compatibility of the executable
 extern const CompatSet::Feature ceph_osd_feature_compat[];
 extern const CompatSet::Feature ceph_osd_feature_ro_compat[];