]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osd/OSD.cc
update sources to 12.2.7
[ceph.git] / ceph / src / osd / OSD.cc
index e6d18ecd2df6702ee997928f8df63ad72a133df5..54fedcddcabf0624d4820294494ee9ab58691a8b 100644 (file)
 
 #include "common/errno.h"
 #include "common/ceph_argparse.h"
+#include "common/ceph_time.h"
 #include "common/version.h"
 #include "common/io_priority.h"
+#include "common/pick_address.h"
 
 #include "os/ObjectStore.h"
 #ifdef HAVE_LIBFUSE
@@ -61,7 +63,6 @@
 #include "messages/MLog.h"
 
 #include "messages/MGenericMessage.h"
-#include "messages/MPing.h"
 #include "messages/MOSDPing.h"
 #include "messages/MOSDFailure.h"
 #include "messages/MOSDMarkMeDown.h"
@@ -88,6 +89,7 @@
 #include "messages/MOSDPGBackfill.h"
 #include "messages/MBackfillReserve.h"
 #include "messages/MRecoveryReserve.h"
+#include "messages/MOSDForceRecovery.h"
 #include "messages/MOSDECSubOpWrite.h"
 #include "messages/MOSDECSubOpWriteReply.h"
 #include "messages/MOSDECSubOpRead.h"
 #undef dout_prefix
 #define dout_prefix _prefix(_dout, whoami, get_osdmap_epoch())
 
+
 const double OSD::OSD_TICK_INTERVAL = 1.0;
 
 static ostream& _prefix(std::ostream* _dout, int whoami, epoch_t epoch) {
   return *_dout << "osd." << whoami << " " << epoch << " ";
 }
 
-void PGQueueable::RunVis::operator()(const OpRequestRef &op) {
-  return osd->dequeue_op(pg, op, handle);
-}
-
-void PGQueueable::RunVis::operator()(const PGSnapTrim &op) {
-  return pg->snap_trimmer(op.epoch_queued);
-}
-
-void PGQueueable::RunVis::operator()(const PGScrub &op) {
-  return pg->scrub(op.epoch_queued, handle);
-}
-
-void PGQueueable::RunVis::operator()(const PGRecovery &op) {
-  return osd->do_recovery(pg.get(), op.epoch_queued, op.reserved_pushes, handle);
-}
-
 //Initial features in new superblock.
 //Features here are also automatically upgraded
 CompatSet OSD::get_osd_initial_compat_set() {
@@ -202,6 +189,7 @@ CompatSet OSD::get_osd_initial_compat_set() {
   ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_PGMETA);
   ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_MISSING);
   ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_FASTINFO);
+  ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_RECOVERY_DELETES);
   return CompatSet(ceph_osd_feature_compat, ceph_osd_feature_ro_compat,
                   ceph_osd_feature_incompat);
 }
@@ -256,16 +244,21 @@ OSDService::OSDService(OSD *osd) :
   next_notif_id(0),
   recovery_request_lock("OSDService::recovery_request_lock"),
   recovery_request_timer(cct, recovery_request_lock, false),
+  recovery_sleep_lock("OSDService::recovery_sleep_lock"),
+  recovery_sleep_timer(cct, recovery_sleep_lock, false),
   reserver_finisher(cct),
-  local_reserver(&reserver_finisher, cct->_conf->osd_max_backfills,
+  local_reserver(cct, &reserver_finisher, cct->_conf->osd_max_backfills,
                 cct->_conf->osd_min_recovery_priority),
-  remote_reserver(&reserver_finisher, cct->_conf->osd_max_backfills,
+  remote_reserver(cct, &reserver_finisher, cct->_conf->osd_max_backfills,
                  cct->_conf->osd_min_recovery_priority),
   pg_temp_lock("OSDService::pg_temp_lock"),
   snap_sleep_lock("OSDService::snap_sleep_lock"),
   snap_sleep_timer(
     osd->client_messenger->cct, snap_sleep_lock, false /* relax locking */),
-  snap_reserver(&reserver_finisher,
+  scrub_sleep_lock("OSDService::scrub_sleep_lock"),
+  scrub_sleep_timer(
+    osd->client_messenger->cct, scrub_sleep_lock, false /* relax locking */),
+  snap_reserver(cct, &reserver_finisher,
                cct->_conf->osd_max_trimming_pgs),
   recovery_lock("OSDService::recovery_lock"),
   recovery_ops_active(0),
@@ -295,6 +288,41 @@ OSDService::~OSDService()
   delete objecter;
 }
 
+
+
+#ifdef PG_DEBUG_REFS
+void OSDService::add_pgid(spg_t pgid, PG *pg){
+  Mutex::Locker l(pgid_lock);
+  if (!pgid_tracker.count(pgid)) {
+    live_pgs[pgid] = pg;
+  }
+  pgid_tracker[pgid]++;
+}
+void OSDService::remove_pgid(spg_t pgid, PG *pg)
+{
+  Mutex::Locker l(pgid_lock);
+  assert(pgid_tracker.count(pgid));
+  assert(pgid_tracker[pgid] > 0);
+  pgid_tracker[pgid]--;
+  if (pgid_tracker[pgid] == 0) {
+    pgid_tracker.erase(pgid);
+    live_pgs.erase(pgid);
+  }
+}
+void OSDService::dump_live_pgids()
+{
+  Mutex::Locker l(pgid_lock);
+  derr << "live pgids:" << dendl;
+  for (map<spg_t, int>::const_iterator i = pgid_tracker.cbegin();
+       i != pgid_tracker.cend();
+       ++i) {
+    derr << "\t" << *i << dendl;
+    live_pgs[i->first]->dump_live_ids();
+  }
+}
+#endif
+
+
 void OSDService::_start_split(spg_t parent, const set<spg_t> &children)
 {
   for (set<spg_t>::const_iterator i = children.begin();
@@ -478,12 +506,21 @@ void OSDService::start_shutdown()
     Mutex::Locker l(agent_timer_lock);
     agent_timer.shutdown();
   }
+
+  {
+    Mutex::Locker l(recovery_sleep_lock);
+    recovery_sleep_timer.shutdown();
+  }
 }
 
-void OSDService::shutdown()
+void OSDService::shutdown_reserver()
 {
   reserver_finisher.wait_for_empty();
   reserver_finisher.stop();
+}
+
+void OSDService::shutdown()
+{
   {
     Mutex::Locker l(watch_lock);
     watch_timer.shutdown();
@@ -503,6 +540,11 @@ void OSDService::shutdown()
     snap_sleep_timer.shutdown();
   }
 
+  {
+    Mutex::Locker l(scrub_sleep_lock);
+    scrub_sleep_timer.shutdown();
+  }
+
   osdmap = OSDMapRef();
   next_osdmap = OSDMapRef();
 }
@@ -519,6 +561,7 @@ void OSDService::init()
   watch_timer.init();
   agent_timer.init();
   snap_sleep_timer.init();
+  scrub_sleep_timer.init();
 
   agent_thread.create("osd_srv_agent");
 
@@ -542,6 +585,11 @@ void OSDService::activate_map()
   agent_lock.Unlock();
 }
 
+void OSDService::request_osdmap_update(epoch_t e)
+{
+  osd->osdmap_subscribe(e, false);
+}
+
 class AgentTimeoutCB : public Context {
   PGRef pg;
 public:
@@ -711,11 +759,10 @@ float OSDService::get_failsafe_full_ratio()
   return full_ratio;
 }
 
-void OSDService::check_full_status(const osd_stat_t &osd_stat)
+void OSDService::check_full_status(float ratio)
 {
   Mutex::Locker l(full_status_lock);
 
-  float ratio = ((float)osd_stat.kb_used) / ((float)osd_stat.kb);
   cur_ratio = ratio;
 
   // The OSDMap ratios take precendence.  So if the failsafe is .95 and
@@ -732,7 +779,7 @@ void OSDService::check_full_status(const osd_stat_t &osd_stat)
   float full_ratio = std::max(osdmap->get_full_ratio(), backfillfull_ratio);
   float failsafe_ratio = std::max(get_failsafe_full_ratio(), full_ratio);
 
-  if (!osdmap->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
+  if (osdmap->require_osd_release < CEPH_RELEASE_LUMINOUS) {
     // use the failsafe for nearfull and full; the mon isn't using the
     // flags anyway because we're mid-upgrade.
     full_ratio = failsafe_ratio;
@@ -779,11 +826,11 @@ void OSDService::check_full_status(const osd_stat_t &osd_stat)
     dout(10) << __func__ << " " << get_full_state_name(cur_state)
             << " -> " << get_full_state_name(new_state) << dendl;
     if (new_state == FAILSAFE) {
-      clog->error() << "failsafe engaged, dropping updates, now "
+      clog->error() << "full status failsafe engaged, dropping updates, now "
                    << (int)roundf(ratio * 100) << "% full";
     } else if (cur_state == FAILSAFE) {
-      clog->error() << "failsafe disengaged, no longer dropping updates, now "
-                   << (int)roundf(ratio * 100) << "% full";
+      clog->error() << "full status failsafe disengaged, no longer dropping "
+                    << "updates, now " << (int)roundf(ratio * 100) << "% full";
     }
     cur_state = new_state;
   }
@@ -881,15 +928,33 @@ void OSDService::set_injectfull(s_names type, int64_t count)
   injectfull = count;
 }
 
-void OSDService::update_osd_stat(vector<int>& hb_peers)
+osd_stat_t OSDService::set_osd_stat(const struct store_statfs_t &stbuf,
+                                    vector<int>& hb_peers,
+                                   int num_pgs)
 {
-  Mutex::Locker lock(stat_lock);
+  uint64_t bytes = stbuf.total;
+  uint64_t used = bytes - stbuf.available;
+  uint64_t avail = stbuf.available;
 
-  osd_stat.hb_peers.swap(hb_peers);
+  osd->logger->set(l_osd_stat_bytes, bytes);
+  osd->logger->set(l_osd_stat_bytes_used, used);
+  osd->logger->set(l_osd_stat_bytes_avail, avail);
 
-  osd->op_tracker.get_age_ms_histogram(&osd_stat.op_queue_age_hist);
+  {
+    Mutex::Locker l(stat_lock);
+    osd_stat.hb_peers.swap(hb_peers);
+    osd->op_tracker.get_age_ms_histogram(&osd_stat.op_queue_age_hist);
+    osd_stat.kb = bytes >> 10;
+    osd_stat.kb_used = used >> 10;
+    osd_stat.kb_avail = avail >> 10;
+    osd_stat.num_pgs = num_pgs;
+    return osd_stat;
+  }
+}
 
-  // fill in osd stats too
+void OSDService::update_osd_stat(vector<int>& hb_peers)
+{
+  // load osd stats first
   struct store_statfs_t stbuf;
   int r = osd->store->statfs(&stbuf);
   if (r < 0) {
@@ -897,21 +962,11 @@ void OSDService::update_osd_stat(vector<int>& hb_peers)
     return;
   }
 
-  uint64_t bytes = stbuf.total;
-  uint64_t used = bytes - stbuf.available;
-  uint64_t avail = stbuf.available;
-
-  osd_stat.kb = bytes >> 10;
-  osd_stat.kb_used = used >> 10;
-  osd_stat.kb_avail = avail >> 10;
-
-  osd->logger->set(l_osd_stat_bytes, bytes);
-  osd->logger->set(l_osd_stat_bytes_used, used);
-  osd->logger->set(l_osd_stat_bytes_avail, avail);
-
-  dout(20) << "update_osd_stat " << osd_stat << dendl;
-
-  check_full_status(osd_stat);
+  auto new_stat = set_osd_stat(stbuf, hb_peers, osd->get_num_pgs());
+  dout(20) << "update_osd_stat " << new_stat << dendl;
+  assert(new_stat.kb);
+  float ratio = ((float)new_stat.kb_used) / ((float)new_stat.kb);
+  check_full_status(ratio);
 }
 
 bool OSDService::check_osdmap_full(const set<pg_shard_t> &missing_on)
@@ -979,13 +1034,16 @@ pair<ConnectionRef,ConnectionRef> OSDService::get_con_osd_hb(int peer, epoch_t f
 }
 
 
-void OSDService::queue_want_pg_temp(pg_t pgid, vector<int>& want)
+void OSDService::queue_want_pg_temp(pg_t pgid,
+                                   const vector<int>& want,
+                                   bool forced)
 {
   Mutex::Locker l(pg_temp_lock);
-  map<pg_t,vector<int> >::iterator p = pg_temp_pending.find(pgid);
+  auto p = pg_temp_pending.find(pgid);
   if (p == pg_temp_pending.end() ||
-      p->second != want) {
-    pg_temp_wanted[pgid] = want;
+      p->second.acting != want ||
+      forced) {
+    pg_temp_wanted[pgid] = pg_temp_t{want, forced};
   }
 }
 
@@ -998,10 +1056,8 @@ void OSDService::remove_want_pg_temp(pg_t pgid)
 
 void OSDService::_sent_pg_temp()
 {
-  for (map<pg_t,vector<int> >::iterator p = pg_temp_wanted.begin();
-       p != pg_temp_wanted.end();
-       ++p)
-    pg_temp_pending[p->first] = p->second;
+  pg_temp_pending.insert(make_move_iterator(begin(pg_temp_wanted)),
+                        make_move_iterator(end(pg_temp_wanted)));
   pg_temp_wanted.clear();
 }
 
@@ -1018,22 +1074,46 @@ void OSDService::requeue_pg_temp()
           << pg_temp_wanted.size() << dendl;
 }
 
+std::ostream& operator<<(std::ostream& out,
+                        const OSDService::pg_temp_t& pg_temp)
+{
+  out << pg_temp.acting;
+  if (pg_temp.forced) {
+    out << " (forced)";
+  }
+  return out;
+}
+
 void OSDService::send_pg_temp()
 {
   Mutex::Locker l(pg_temp_lock);
   if (pg_temp_wanted.empty())
     return;
   dout(10) << "send_pg_temp " << pg_temp_wanted << dendl;
-  MOSDPGTemp *m = new MOSDPGTemp(osdmap->get_epoch());
-  m->pg_temp = pg_temp_wanted;
-  monc->send_mon_message(m);
+  MOSDPGTemp *ms[2] = {nullptr, nullptr};
+  for (auto& pg_temp : pg_temp_wanted) {
+    auto& m = ms[pg_temp.second.forced];
+    if (!m) {
+      m = new MOSDPGTemp(osdmap->get_epoch());
+      m->forced = pg_temp.second.forced;
+    }
+    m->pg_temp.emplace(pg_temp.first,
+                      pg_temp.second.acting);
+  }
+  for (auto m : ms) {
+    if (m) {
+      monc->send_mon_message(m);
+    }
+  }
   _sent_pg_temp();
 }
 
 void OSDService::send_pg_created(pg_t pgid)
 {
   dout(20) << __func__ << dendl;
-  monc->send_mon_message(new MOSDPGCreated(pgid));
+  if (osdmap->require_osd_release >= CEPH_RELEASE_LUMINOUS) {
+    monc->send_mon_message(new MOSDPGCreated(pgid));
+  }
 }
 
 // --------------------------------------
@@ -1193,10 +1273,12 @@ bool OSDService::can_inc_scrubs_pending()
 
   if (scrubs_pending + scrubs_active < cct->_conf->osd_max_scrubs) {
     dout(20) << __func__ << " " << scrubs_pending << " -> " << (scrubs_pending+1)
-            << " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl;
+            << " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active
+            << ")" << dendl;
     can_inc = true;
   } else {
-    dout(20) << __func__ << scrubs_pending << " + " << scrubs_active << " active >= max " << cct->_conf->osd_max_scrubs << dendl;
+    dout(20) << __func__ << " " << scrubs_pending << " + " << scrubs_active
+            << " active >= max " << cct->_conf->osd_max_scrubs << dendl;
   }
 
   return can_inc;
@@ -1331,7 +1413,8 @@ void OSDService::got_stop_ack()
 MOSDMap *OSDService::build_incremental_map_msg(epoch_t since, epoch_t to,
                                                OSDSuperblock& sblock)
 {
-  MOSDMap *m = new MOSDMap(monc->get_fsid());
+  MOSDMap *m = new MOSDMap(monc->get_fsid(),
+                          osdmap->get_encoding_features());
   m->oldest_map = max_oldest_map;
   m->newest_map = sblock.newest_map;
 
@@ -1371,7 +1454,8 @@ void OSDService::send_incremental_map(epoch_t since, Connection *con,
     OSDSuperblock sblock(get_superblock());
     if (since < sblock.oldest_map) {
       // just send latest full map
-      MOSDMap *m = new MOSDMap(monc->get_fsid());
+      MOSDMap *m = new MOSDMap(monc->get_fsid(),
+                              osdmap->get_encoding_features());
       m->oldest_map = max_oldest_map;
       m->newest_map = sblock.newest_map;
       get_map_bl(to, m->maps[to]);
@@ -1395,12 +1479,19 @@ void OSDService::send_incremental_map(epoch_t since, Connection *con,
 bool OSDService::_get_map_bl(epoch_t e, bufferlist& bl)
 {
   bool found = map_bl_cache.lookup(e, &bl);
-  if (found)
+  if (found) {
+    if (logger)
+      logger->inc(l_osd_map_bl_cache_hit);
     return true;
+  }
+  if (logger)
+    logger->inc(l_osd_map_bl_cache_miss);
   found = store->read(coll_t::meta(),
-                     OSD::get_osdmap_pobject_name(e), 0, 0, bl) >= 0;
-  if (found)
+                     OSD::get_osdmap_pobject_name(e), 0, 0, bl,
+                     CEPH_OSD_OP_FLAG_FADVISE_WILLNEED) >= 0;
+  if (found) {
     _add_map_bl(e, bl);
+  }
   return found;
 }
 
@@ -1408,36 +1499,61 @@ bool OSDService::get_inc_map_bl(epoch_t e, bufferlist& bl)
 {
   Mutex::Locker l(map_cache_lock);
   bool found = map_bl_inc_cache.lookup(e, &bl);
-  if (found)
+  if (found) {
+    if (logger)
+      logger->inc(l_osd_map_bl_cache_hit);
     return true;
+  }
+  if (logger)
+    logger->inc(l_osd_map_bl_cache_miss);
   found = store->read(coll_t::meta(),
-                     OSD::get_inc_osdmap_pobject_name(e), 0, 0, bl) >= 0;
-  if (found)
+                     OSD::get_inc_osdmap_pobject_name(e), 0, 0, bl,
+                     CEPH_OSD_OP_FLAG_FADVISE_WILLNEED) >= 0;
+  if (found) {
     _add_map_inc_bl(e, bl);
+  }
   return found;
 }
 
 void OSDService::_add_map_bl(epoch_t e, bufferlist& bl)
 {
   dout(10) << "add_map_bl " << e << " " << bl.length() << " bytes" << dendl;
+  // cache a contiguous buffer
+  if (bl.get_num_buffers() > 1) {
+    bl.rebuild();
+  }
+  bl.try_assign_to_mempool(mempool::mempool_osd_mapbl);
   map_bl_cache.add(e, bl);
 }
 
 void OSDService::_add_map_inc_bl(epoch_t e, bufferlist& bl)
 {
   dout(10) << "add_map_inc_bl " << e << " " << bl.length() << " bytes" << dendl;
+  // cache a contiguous buffer
+  if (bl.get_num_buffers() > 1) {
+    bl.rebuild();
+  }
+  bl.try_assign_to_mempool(mempool::mempool_osd_mapbl);
   map_bl_inc_cache.add(e, bl);
 }
 
 void OSDService::pin_map_inc_bl(epoch_t e, bufferlist &bl)
 {
   Mutex::Locker l(map_cache_lock);
+  // cache a contiguous buffer
+  if (bl.get_num_buffers() > 1) {
+    bl.rebuild();
+  }
   map_bl_inc_cache.pin(e, bl);
 }
 
 void OSDService::pin_map_bl(epoch_t e, bufferlist &bl)
 {
   Mutex::Locker l(map_cache_lock);
+  // cache a contiguous buffer
+  if (bl.get_num_buffers() > 1) {
+    bl.rebuild();
+  }
   map_bl_cache.pin(e, bl);
 }
 
@@ -1528,6 +1644,10 @@ void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v,
 
 void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op)
 {
+  if (!cct->_conf->osd_debug_misdirected_ops) {
+    return;
+  }
+
   const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
   assert(m->get_type() == CEPH_MSG_OSD_OP);
 
@@ -1575,9 +1695,6 @@ void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op)
               << " to osd." << whoami
               << " not " << pg->acting
               << " in e" << m->get_map_epoch() << "/" << osdmap->get_epoch();
-  if (g_conf->osd_enxio_on_misdirected_op) {
-    reply_op_error(op, -ENXIO);
-  }
 }
 
 void OSDService::enqueue_back(spg_t pgid, PGQueueable qi)
@@ -1641,15 +1758,17 @@ int OSD::mkfs(CephContext *cct, ObjectStore *store, const string &dev,
 
   ret = store->mkfs();
   if (ret) {
-    derr << "OSD::mkfs: ObjectStore::mkfs failed with error " << ret << dendl;
+    derr << "OSD::mkfs: ObjectStore::mkfs failed with error "
+         << cpp_strerror(ret) << dendl;
     goto free_store;
   }
 
-  store->set_cache_shards(cct->_conf->osd_op_num_shards);
+  store->set_cache_shards(1);  // doesn't matter for mkfs!
 
   ret = store->mount();
   if (ret) {
-    derr << "OSD::mkfs: couldn't mount ObjectStore: error " << ret << dendl;
+    derr << "OSD::mkfs: couldn't mount ObjectStore: error "
+         << cpp_strerror(ret) << dendl;
     goto free_store;
   }
 
@@ -1688,7 +1807,7 @@ int OSD::mkfs(CephContext *cct, ObjectStore *store, const string &dev,
     ret = store->apply_transaction(osr.get(), std::move(t));
     if (ret) {
       derr << "OSD::mkfs: error while writing OSD_SUPERBLOCK_GOBJECT: "
-          << "apply_transaction returned " << ret << dendl;
+          << "apply_transaction returned " << cpp_strerror(ret) << dendl;
       goto umount_store;
     }
   }
@@ -1697,9 +1816,10 @@ int OSD::mkfs(CephContext *cct, ObjectStore *store, const string &dev,
     waiter.wait();
   }
 
-  ret = write_meta(store, sb.cluster_fsid, sb.osd_fsid, whoami);
+  ret = write_meta(cct, store, sb.cluster_fsid, sb.osd_fsid, whoami);
   if (ret) {
-    derr << "OSD::mkfs: failed to write fsid file: error " << ret << dendl;
+    derr << "OSD::mkfs: failed to write fsid file: error "
+         << cpp_strerror(ret) << dendl;
     goto umount_store;
   }
 
@@ -1710,7 +1830,7 @@ free_store:
   return ret;
 }
 
-int OSD::write_meta(ObjectStore *store, uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami)
+int OSD::write_meta(CephContext *cct, ObjectStore *store, uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami)
 {
   char val[80];
   int r;
@@ -1730,6 +1850,36 @@ int OSD::write_meta(ObjectStore *store, uuid_d& cluster_fsid, uuid_d& osd_fsid,
   if (r < 0)
     return r;
 
+  string key = cct->_conf->get_val<string>("key");
+  if (key.size()) {
+    r = store->write_meta("osd_key", key);
+    if (r < 0)
+      return r;
+  } else {
+    string keyfile = cct->_conf->get_val<string>("keyfile");
+    if (!keyfile.empty()) {
+      bufferlist keybl;
+      string err;
+      if (keyfile == "-") {
+       static_assert(1024 * 1024 >
+                     (sizeof(CryptoKey) - sizeof(bufferptr) +
+                      sizeof(__u16) + 16 /* AES_KEY_LEN */ + 3 - 1) / 3. * 4.,
+                     "1MB should be enough for a base64 encoded CryptoKey");
+       r = keybl.read_fd(STDIN_FILENO, 1024 * 1024);
+      } else {
+       r = keybl.read_file(keyfile.c_str(), &err);
+      }
+      if (r < 0) {
+       derr << __func__ << " failed to read keyfile " << keyfile << ": "
+            << err << ": " << cpp_strerror(r) << dendl;
+       return r;
+      }
+      r = store->write_meta("osd_key", keybl.to_str());
+      if (r < 0)
+       return r;
+    }
+  }
+
   r = store->write_meta("ready", "ready");
   if (r < 0)
     return r;
@@ -1813,15 +1963,19 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
   clog(log_client.create_channel()),
   whoami(id),
   dev_path(dev), journal_path(jdev),
+  store_is_rotational(store->is_rotational()),
   trace_endpoint("0.0.0.0", 0, "osd"),
   asok_hook(NULL),
   osd_compat(get_osd_compat_set()),
-  osd_tp(cct, "OSD::osd_tp", "tp_osd", cct->_conf->osd_op_threads, "osd_op_threads"),
+  peering_tp(cct, "OSD::peering_tp", "tp_peering",
+            cct->_conf->osd_peering_wq_threads,
+            "osd_peering_tp_threads"),
   osd_op_tp(cct, "OSD::osd_op_tp", "tp_osd_tp",
-    cct->_conf->osd_op_num_threads_per_shard * cct->_conf->osd_op_num_shards),
+           get_num_op_threads()),
   disk_tp(cct, "OSD::disk_tp", "tp_osd_disk", cct->_conf->osd_disk_threads, "osd_disk_threads"),
   command_tp(cct, "OSD::command_tp", "tp_osd_cmd",  1),
   session_waiting_lock("OSD::session_waiting_lock"),
+  osdmap_subscribe_lock("OSD::osdmap_subscribe_lock"),
   heartbeat_lock("OSD::heartbeat_lock"),
   heartbeat_stop(false),
   heartbeat_need_update(true),
@@ -1838,7 +1992,7 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
   op_queue(get_io_queue()),
   op_prio_cutoff(get_io_prio_cut()),
   op_shardedwq(
-    cct->_conf->osd_op_num_shards,
+    get_num_op_shards(),
     this,
     cct->_conf->osd_op_thread_timeout,
     cct->_conf->osd_op_thread_suicide_timeout,
@@ -1847,7 +2001,7 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
     this,
     cct->_conf->osd_op_thread_timeout,
     cct->_conf->osd_op_thread_suicide_timeout,
-    &osd_tp),
+    &peering_tp),
   map_lock("OSD::map_lock"),
   pg_map_lock("OSD::pg_map_lock"),
   last_pg_create_epoch(0),
@@ -1958,30 +2112,48 @@ bool OSD::asok_command(string admin_command, cmdmap_t& cmdmap, string format,
   } else if (admin_command == "flush_journal") {
     store->flush_journal();
   } else if (admin_command == "dump_ops_in_flight" ||
-            admin_command == "ops") {
-    if (!op_tracker.dump_ops_in_flight(f)) {
-      ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
-       Please enable \"osd_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
-    }
-  } else if (admin_command == "dump_blocked_ops") {
-    if (!op_tracker.dump_ops_in_flight(f, true)) {
-      ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
-       Please enable \"osd_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
-    }
-  } else if (admin_command == "dump_historic_ops") {
-    if (!op_tracker.dump_historic_ops(f, false)) {
-      ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
-       Please enable \"osd_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
-    }
-  } else if (admin_command == "dump_historic_ops_by_duration") {
-    if (!op_tracker.dump_historic_ops(f, true)) {
-      ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
-       Please enable \"osd_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
-    }
-  } else if (admin_command == "dump_historic_slow_ops") {
-    if (!op_tracker.dump_historic_slow_ops(f)) {
-      ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
-       Please enable \"osd_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
+             admin_command == "ops" ||
+             admin_command == "dump_blocked_ops" ||
+             admin_command == "dump_historic_ops" ||
+             admin_command == "dump_historic_ops_by_duration" ||
+             admin_command == "dump_historic_slow_ops") {
+
+    const string error_str = "op_tracker tracking is not enabled now, so no ops are tracked currently, \
+even those get stuck. Please enable \"osd_enable_op_tracker\", and the tracker \
+will start to track new ops received afterwards.";
+
+    set<string> filters;
+    vector<string> filter_str;
+    if (cmd_getval(cct, cmdmap, "filterstr", filter_str)) {
+        copy(filter_str.begin(), filter_str.end(),
+           inserter(filters, filters.end()));
+    }
+
+    if (admin_command == "dump_ops_in_flight" ||
+        admin_command == "ops") {
+      if (!op_tracker.dump_ops_in_flight(f, false, filters)) {
+        ss << error_str;
+      }
+    }
+    if (admin_command == "dump_blocked_ops") {
+      if (!op_tracker.dump_ops_in_flight(f, true, filters)) {
+        ss << error_str;
+      }
+    }
+    if (admin_command == "dump_historic_ops") {
+      if (!op_tracker.dump_historic_ops(f, false, filters)) {
+        ss << error_str;
+      }
+    }
+    if (admin_command == "dump_historic_ops_by_duration") {
+      if (!op_tracker.dump_historic_ops(f, true, filters)) {
+        ss << error_str;
+      }
+    }
+    if (admin_command == "dump_historic_slow_ops") {
+      if (!op_tracker.dump_historic_slow_ops(f, filters)) {
+        ss << error_str;
+      }
     }
   } else if (admin_command == "dump_op_pq_state") {
     f->open_object_section("pq");
@@ -1995,7 +2167,7 @@ bool OSD::asok_command(string admin_command, cmdmap_t& cmdmap, string format,
     curmap->get_blacklist(&bl);
     for (list<pair<entity_addr_t,utime_t> >::iterator it = bl.begin();
        it != bl.end(); ++it) {
-      f->open_array_section("entry");
+      f->open_object_section("entry");
       f->open_object_section("entity_addr_t");
       it->first.dump(f);
       f->close_section(); //entity_addr_t
@@ -2026,7 +2198,7 @@ bool OSD::asok_command(string admin_command, cmdmap_t& cmdmap, string format,
     for (list<obj_watch_item_t>::iterator it = watchers.begin();
        it != watchers.end(); ++it) {
 
-      f->open_array_section("watch");
+      f->open_object_section("watch");
 
       f->dump_string("namespace", it->obj.nspace);
       f->dump_string("object", it->obj.oid.name);
@@ -2035,8 +2207,8 @@ bool OSD::asok_command(string admin_command, cmdmap_t& cmdmap, string format,
       it->wi.name.dump(f);
       f->close_section(); //entity_name_t
 
-      f->dump_int("cookie", it->wi.cookie);
-      f->dump_int("timeout", it->wi.timeout_seconds);
+      f->dump_unsigned("cookie", it->wi.cookie);
+      f->dump_unsigned("timeout", it->wi.timeout_seconds);
 
       f->open_object_section("entity_addr_t");
       it->wi.addr.dump(f);
@@ -2130,6 +2302,18 @@ bool OSD::asok_command(string admin_command, cmdmap_t& cmdmap, string format,
       pg->unlock();
     }
     f->close_section();
+  } else if (admin_command == "compact") {
+    dout(1) << "triggering manual compaction" << dendl;
+    auto start = ceph::coarse_mono_clock::now();
+    store->compact();
+    auto end = ceph::coarse_mono_clock::now();
+    auto time_span = chrono::duration_cast<chrono::duration<double>>(end - start);
+    dout(1) << "finished manual compaction in " 
+            << time_span.count()
+            << " seconds" << dendl;
+    f->open_object_section("compact_result");
+    f->dump_float("elapsed_time", time_span.count());
+    f->close_section();
   } else {
     assert(0 == "broken asok registration");
   }
@@ -2184,10 +2368,10 @@ int OSD::enable_disable_fuse(bool stop)
     delete fuse_store;
     fuse_store = NULL;
     r = ::rmdir(mntpath.c_str());
-    if (r < 0)
-      r = -errno;
     if (r < 0) {
-      derr << __func__ << " failed to rmdir " << mntpath << dendl;
+      r = -errno;
+      derr << __func__ << " failed to rmdir " << mntpath << ": "
+           << cpp_strerror(r) << dendl;
       return r;
     }
     return 0;
@@ -2215,6 +2399,38 @@ int OSD::enable_disable_fuse(bool stop)
   return 0;
 }
 
+int OSD::get_num_op_shards()
+{
+  if (cct->_conf->osd_op_num_shards)
+    return cct->_conf->osd_op_num_shards;
+  if (store_is_rotational)
+    return cct->_conf->osd_op_num_shards_hdd;
+  else
+    return cct->_conf->osd_op_num_shards_ssd;
+}
+
+int OSD::get_num_op_threads()
+{
+  if (cct->_conf->osd_op_num_threads_per_shard)
+    return get_num_op_shards() * cct->_conf->osd_op_num_threads_per_shard;
+  if (store_is_rotational)
+    return get_num_op_shards() * cct->_conf->osd_op_num_threads_per_shard_hdd;
+  else
+    return get_num_op_shards() * cct->_conf->osd_op_num_threads_per_shard_ssd;
+}
+
+float OSD::get_osd_recovery_sleep()
+{
+  if (cct->_conf->osd_recovery_sleep)
+    return cct->_conf->osd_recovery_sleep;
+  if (!store_is_rotational && !journal_is_rotational)
+    return cct->_conf->osd_recovery_sleep_ssd;
+  else if (store_is_rotational && !journal_is_rotational)
+    return cct->_conf->get_val<double>("osd_recovery_sleep_hybrid");
+  else
+    return cct->_conf->osd_recovery_sleep_hdd;
+}
+
 int OSD::init()
 {
   CompatSet initial, diff;
@@ -2225,19 +2441,25 @@ int OSD::init()
   tick_timer.init();
   tick_timer_without_osd_lock.init();
   service.recovery_request_timer.init();
+  service.recovery_sleep_timer.init();
 
   // mount.
-  dout(2) << "mounting " << dev_path << " "
-         << (journal_path.empty() ? "(no journal)" : journal_path) << dendl;
+  dout(2) << "init " << dev_path
+         << " (looks like " << (store_is_rotational ? "hdd" : "ssd") << ")"
+         << dendl;
+  dout(2) << "journal " << journal_path << dendl;
   assert(store);  // call pre_init() first!
 
-  store->set_cache_shards(cct->_conf->osd_op_num_shards);
+  store->set_cache_shards(get_num_op_shards());
 
   int r = store->mount();
   if (r < 0) {
     derr << "OSD:init: unable to mount object store" << dendl;
     return r;
   }
+  journal_is_rotational = store->is_journal_rotational();
+  dout(2) << "journal looks like " << (journal_is_rotational ? "hdd" : "ssd")
+          << dendl;
 
   enable_disable_fuse(false);
 
@@ -2365,6 +2587,9 @@ int OSD::init()
 
   clear_temp_objects();
 
+  // initialize osdmap references in sharded wq
+  op_shardedwq.prune_pg_waiters(osdmap, whoami);
+
   // load up pgs (as they previously existed)
   load_pgs();
 
@@ -2438,7 +2663,13 @@ int OSD::init()
   monc->set_log_client(&log_client);
   update_log_config();
 
-  osd_tp.start();
+  peering_tp.start();
+  
+  service.init();
+  service.publish_map(osdmap);
+  service.publish_superblock(superblock);
+  service.max_oldest_map = superblock.oldest_map;
+
   osd_op_tp.start();
   disk_tp.start();
   command_tp.start();
@@ -2455,18 +2686,15 @@ int OSD::init()
     tick_timer_without_osd_lock.add_event_after(cct->_conf->osd_heartbeat_interval, new C_Tick_WithoutOSDLock(this));
   }
 
-  service.init();
-  service.publish_map(osdmap);
-  service.publish_superblock(superblock);
-  service.max_oldest_map = superblock.oldest_map;
-
   osd_lock.Unlock();
 
   r = monc->authenticate();
   if (r < 0) {
+    derr << __func__ << " authentication failed: " << cpp_strerror(r)
+         << dendl;
     osd_lock.Lock(); // locker is going to unlock this on function exit
     if (is_stopping())
-      r =  0;
+      r = 0;
     goto monout;
   }
 
@@ -2474,9 +2702,10 @@ int OSD::init()
     derr << "unable to obtain rotating service keys; retrying" << dendl;
     ++rotating_auth_attempts;
     if (rotating_auth_attempts > g_conf->max_rotating_auth_attempts) {
+        derr << __func__ << " wait_auth_rotating timed out" << dendl;
         osd_lock.Lock(); // make locker happy
         if (!is_stopping()) {
-            r = - ETIMEDOUT;
+            r = -ETIMEDOUT;
         }
         goto monout;
     }
@@ -2484,12 +2713,16 @@ int OSD::init()
 
   r = update_crush_device_class();
   if (r < 0) {
+    derr << __func__ << " unable to update_crush_device_class: "
+        << cpp_strerror(r) << dendl;
     osd_lock.Lock();
     goto monout;
   }
 
   r = update_crush_location();
   if (r < 0) {
+    derr << __func__ << " unable to update_crush_location: "
+         << cpp_strerror(r) << dendl;
     osd_lock.Lock();
     goto monout;
   }
@@ -2525,8 +2758,7 @@ int OSD::init()
 
   return 0;
 monout:
-  mgrc.shutdown();
-  monc->shutdown();
+  exit(1);
 
 out:
   enable_disable_fuse(true);
@@ -2548,26 +2780,38 @@ void OSD::final_init()
                                      "flush the journal to permanent store");
   assert(r == 0);
   r = admin_socket->register_command("dump_ops_in_flight",
-                                    "dump_ops_in_flight", asok_hook,
+                                    "dump_ops_in_flight " \
+                                    "name=filterstr,type=CephString,n=N,req=false",
+                                    asok_hook,
                                     "show the ops currently in flight");
   assert(r == 0);
   r = admin_socket->register_command("ops",
-                                    "ops", asok_hook,
+                                    "ops " \
+                                    "name=filterstr,type=CephString,n=N,req=false",
+                                    asok_hook,
                                     "show the ops currently in flight");
   assert(r == 0);
   r = admin_socket->register_command("dump_blocked_ops",
-                                    "dump_blocked_ops", asok_hook,
+                                    "dump_blocked_ops " \
+                                    "name=filterstr,type=CephString,n=N,req=false",
+                                    asok_hook,
                                     "show the blocked ops currently in flight");
   assert(r == 0);
-  r = admin_socket->register_command("dump_historic_ops", "dump_historic_ops",
+  r = admin_socket->register_command("dump_historic_ops",
+                                     "dump_historic_ops " \
+                                     "name=filterstr,type=CephString,n=N,req=false",
                                     asok_hook,
                                     "show recent ops");
   assert(r == 0);
-  r = admin_socket->register_command("dump_historic_slow_ops", "dump_historic_slow_ops",
+  r = admin_socket->register_command("dump_historic_slow_ops",
+                                     "dump_historic_slow_ops " \
+                                     "name=filterstr,type=CephString,n=N,req=false",
                                     asok_hook,
                                     "show slowest recent ops");
   assert(r == 0);
-  r = admin_socket->register_command("dump_historic_ops_by_duration", "dump_historic_ops_by_duration",
+  r = admin_socket->register_command("dump_historic_ops_by_duration",
+                                     "dump_historic_ops_by_duration " \
+                                     "name=filterstr,type=CephString,n=N,req=false",
                                     asok_hook,
                                     "show slowest recent ops, sorted by duration");
   assert(r == 0);
@@ -2645,6 +2889,12 @@ void OSD::final_init()
                                     "show recent state history");
   assert(r == 0);
 
+  r = admin_socket->register_command("compact", "compact",
+                                    asok_hook,
+                                    "Commpact object store's omap."
+                                     " WARNING: Compaction probably slows your requests");
+  assert(r == 0);
+
   test_ops_hook = new TestOpsSocketHook(&(this->service), this->store);
   // Note: pools are CephString instead of CephPoolname because
   // these commands traditionally support both pool names and numbers
@@ -2764,6 +3014,9 @@ void OSD::create_logger()
   };
 
 
+  // All the basic OSD operation stats are to be considered useful
+  osd_plb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
+
   osd_plb.add_u64(
     l_osd_op_wip, "op_wip",
     "Replication operations currently being processed (primary)");
@@ -2797,7 +3050,7 @@ void OSD::create_logger()
   osd_plb.add_time_avg(
     l_osd_op_r_lat, "op_r_latency",
     "Latency of read operation (including queue time)");
-  osd_plb.add_histogram(
+  osd_plb.add_u64_counter_histogram(
     l_osd_op_r_lat_outb_hist, "op_r_latency_out_bytes_histogram",
     op_hist_x_axis_config, op_hist_y_axis_config,
     "Histogram of operation latency (including queue time) + data read");
@@ -2814,7 +3067,7 @@ void OSD::create_logger()
   osd_plb.add_time_avg(
     l_osd_op_w_lat,  "op_w_latency",
     "Latency of write operation (including queue time)");
-  osd_plb.add_histogram(
+  osd_plb.add_u64_counter_histogram(
     l_osd_op_w_lat_inb_hist, "op_w_latency_in_bytes_histogram",
     op_hist_x_axis_config, op_hist_y_axis_config,
     "Histogram of operation latency (including queue time) + data written");
@@ -2836,11 +3089,11 @@ void OSD::create_logger()
   osd_plb.add_time_avg(
     l_osd_op_rw_lat, "op_rw_latency",
     "Latency of read-modify-write operation (including queue time)");
-  osd_plb.add_histogram(
+  osd_plb.add_u64_counter_histogram(
     l_osd_op_rw_lat_inb_hist, "op_rw_latency_in_bytes_histogram",
     op_hist_x_axis_config, op_hist_y_axis_config,
     "Histogram of rw operation latency (including queue time) + data written");
-  osd_plb.add_histogram(
+  osd_plb.add_u64_counter_histogram(
     l_osd_op_rw_lat_outb_hist, "op_rw_latency_out_bytes_histogram",
     op_hist_x_axis_config, op_hist_y_axis_config,
     "Histogram of rw operation latency (including queue time) + data read");
@@ -2851,6 +3104,15 @@ void OSD::create_logger()
     l_osd_op_rw_prepare_lat, "op_rw_prepare_latency",
     "Latency of read-modify-write operations (excluding queue time and wait for finished)");
 
+  // Now we move on to some more obscure stats, revert to assuming things
+  // are low priority unless otherwise specified.
+  osd_plb.set_prio_default(PerfCountersBuilder::PRIO_DEBUGONLY);
+
+  osd_plb.add_time_avg(l_osd_op_before_queue_op_lat, "op_before_queue_op_lat",
+    "Latency of IO before calling queue(before really queue into ShardedOpWq)"); // client io before queue op_wq latency
+  osd_plb.add_time_avg(l_osd_op_before_dequeue_op_lat, "op_before_dequeue_op_lat",
+    "Latency of IO before calling dequeue_op(already dequeued and get PG lock)"); // client io before dequeue_op latency
+
   osd_plb.add_u64_counter(
     l_osd_sop, "subop", "Suboperations");
   osd_plb.add_u64_counter(
@@ -2905,6 +3167,10 @@ void OSD::create_logger()
   osd_plb.add_u64(
     l_osd_pg_stray, "numpg_stray",
     "Placement groups ready to be deleted from this osd");
+  osd_plb.add_u64(
+    l_osd_pg_removing, "numpg_removing",
+    "Placement groups queued for local deletion", "pgsr",
+    PerfCountersBuilder::PRIO_USEFUL);
   osd_plb.add_u64(
     l_osd_hb_to, "heartbeat_to_peers", "Heartbeat (ping) peers we send to");
   osd_plb.add_u64_counter(l_osd_map, "map_messages", "OSD map messages");
@@ -2914,6 +3180,7 @@ void OSD::create_logger()
   osd_plb.add_u64_counter(
     l_osd_waiting_for_map, "messages_delayed_for_map",
     "Operations waiting for OSD map");
+
   osd_plb.add_u64_counter(
     l_osd_map_cache_hit, "osd_map_cache_hit", "osdmap cache hit");
   osd_plb.add_u64_counter(
@@ -2924,9 +3191,19 @@ void OSD::create_logger()
   osd_plb.add_u64_avg(
     l_osd_map_cache_miss_low_avg, "osd_map_cache_miss_low_avg",
     "osdmap cache miss, avg distance below cache lower bound");
+  osd_plb.add_u64_counter(
+    l_osd_map_bl_cache_hit, "osd_map_bl_cache_hit",
+    "OSDMap buffer cache hits");
+  osd_plb.add_u64_counter(
+    l_osd_map_bl_cache_miss, "osd_map_bl_cache_miss",
+    "OSDMap buffer cache misses");
 
-  osd_plb.add_u64(l_osd_stat_bytes, "stat_bytes", "OSD size");
-  osd_plb.add_u64(l_osd_stat_bytes_used, "stat_bytes_used", "Used space");
+  osd_plb.add_u64(
+    l_osd_stat_bytes, "stat_bytes", "OSD size", "size",
+    PerfCountersBuilder::PRIO_USEFUL);
+  osd_plb.add_u64(
+    l_osd_stat_bytes_used, "stat_bytes_used", "Used space", "used",
+    PerfCountersBuilder::PRIO_USEFUL);
   osd_plb.add_u64(l_osd_stat_bytes_avail, "stat_bytes_avail", "Available space");
 
   osd_plb.add_u64_counter(
@@ -3046,11 +3323,14 @@ int OSD::shutdown()
   set_state(STATE_STOPPING);
 
   // Debugging
-  cct->_conf->set_val("debug_osd", "100");
-  cct->_conf->set_val("debug_journal", "100");
-  cct->_conf->set_val("debug_filestore", "100");
-  cct->_conf->set_val("debug_ms", "100");
-  cct->_conf->apply_changes(NULL);
+  if (cct->_conf->get_val<bool>("osd_debug_shutdown")) {
+    cct->_conf->set_val("debug_osd", "100");
+    cct->_conf->set_val("debug_journal", "100");
+    cct->_conf->set_val("debug_filestore", "100");
+    cct->_conf->set_val("debug_bluestore", "100");
+    cct->_conf->set_val("debug_ms", "100");
+    cct->_conf->apply_changes(NULL);
+  }
 
   // stop MgrClient earlier as it's more like an internal consumer of OSD
   mgrc.shutdown();
@@ -3098,12 +3378,15 @@ int OSD::shutdown()
   cct->get_admin_socket()->unregister_command("dump_watchers");
   cct->get_admin_socket()->unregister_command("dump_reservations");
   cct->get_admin_socket()->unregister_command("get_latest_osdmap");
+  cct->get_admin_socket()->unregister_command("heap");
   cct->get_admin_socket()->unregister_command("set_heap_property");
   cct->get_admin_socket()->unregister_command("get_heap_property");
   cct->get_admin_socket()->unregister_command("dump_objectstore_kv_stats");
+  cct->get_admin_socket()->unregister_command("dump_scrubs");
   cct->get_admin_socket()->unregister_command("calc_objectstore_db_histogram");
   cct->get_admin_socket()->unregister_command("flush_store_cache");
   cct->get_admin_socket()->unregister_command("dump_pgstate_history");
+  cct->get_admin_socket()->unregister_command("compact");
   delete asok_hook;
   asok_hook = NULL;
 
@@ -3115,6 +3398,8 @@ int OSD::shutdown()
   cct->get_admin_socket()->unregister_command("injectdataerr");
   cct->get_admin_socket()->unregister_command("injectmdataerr");
   cct->get_admin_socket()->unregister_command("set_recovery_delay");
+  cct->get_admin_socket()->unregister_command("trigger_scrub");
+  cct->get_admin_socket()->unregister_command("injectfull");
   delete test_ops_hook;
   test_ops_hook = NULL;
 
@@ -3126,9 +3411,9 @@ int OSD::shutdown()
   heartbeat_lock.Unlock();
   heartbeat_thread.join();
 
-  osd_tp.drain();
+  peering_tp.drain();
   peering_wq.clear();
-  osd_tp.stop();
+  peering_tp.stop();
   dout(10) << "osd tp stopped" << dendl;
 
   osd_op_tp.drain();
@@ -3175,6 +3460,8 @@ int OSD::shutdown()
     assert(pg_stat_queue.empty());
   }
 
+  service.shutdown_reserver();
+
   // Remove PGs
 #ifdef PG_DEBUG_REFS
   service.dump_live_pgids();
@@ -3192,7 +3479,9 @@ int OSD::shutdown()
 #ifdef PG_DEBUG_REFS
        p->second->dump_live_ids();
 #endif
-        ceph_abort();
+       if (cct->_conf->osd_shutdown_pgref_assert) {
+         ceph_abort();
+       }
       }
       p->second->unlock();
       p->second->put("PGMap");
@@ -3319,17 +3608,34 @@ int OSD::update_crush_location()
 
 int OSD::update_crush_device_class()
 {
+  if (!cct->_conf->osd_class_update_on_start) {
+    dout(10) << __func__ << " osd_class_update_on_start = false" << dendl;
+    return 0;
+  }
+
   string device_class;
   int r = store->read_meta("crush_device_class", &device_class);
-  if (r < 0)
+  if (r < 0 || device_class.empty()) {
+    device_class = store->get_default_device_class();
+  }
+
+  if (device_class.empty()) {
+    dout(20) << __func__ << " no device class stored locally" << dendl;
     return 0;
+  }
 
   string cmd =
     string("{\"prefix\": \"osd crush set-device-class\", ") +
-    string("\"id\": ") + stringify(whoami) + string(", ") +
-    string("\"class\": \"") + device_class + string("\"}");
-
-  return mon_cmd_maybe_osd_create(cmd);
+    string("\"class\": \"") + device_class + string("\", ") +
+    string("\"ids\": [\"") + stringify(whoami) + string("\"]}");
+
+  r = mon_cmd_maybe_osd_create(cmd);
+  // the above cmd can fail for various reasons, e.g.:
+  //   (1) we are connecting to a pre-luminous monitor
+  //   (2) user manually specify a class other than
+  //       'ceph-disk prepare --crush-device-class'
+  // simply skip result-checking for now
+  return 0;
 }
 
 void OSD::write_superblock(ObjectStore::Transaction& t)
@@ -3644,6 +3950,11 @@ PG *OSD::_lookup_lock_pg(spg_t pgid)
   return pg;
 }
 
+PG *OSD::lookup_lock_pg(spg_t pgid)
+{
+  return _lookup_lock_pg(pgid);
+}
+
 PG *OSD::_lookup_lock_pg_with_map_lock_held(spg_t pgid)
 {
   assert(pg_map.count(pgid));
@@ -3745,6 +4056,24 @@ void OSD::load_pgs()
       pg->upgrade(store);
     }
 
+    if (pg->dne())  {
+      dout(10) << "load_pgs " << *it << " deleting dne" << dendl;
+      pg->ch = nullptr;
+      service.pg_remove_epoch(pg->pg_id);
+      pg->unlock();
+      {
+       // Delete pg
+       RWLock::WLocker l(pg_map_lock);
+       auto p = pg_map.find(pg->get_pgid());
+       assert(p != pg_map.end() && p->second == pg);
+       dout(20) << __func__ << " removed pg " << pg << " from pg_map" << dendl;
+       pg_map.erase(p);
+       pg->put("PGMap");
+      }
+      recursive_remove_collection(cct, store, pgid, *it);
+      continue;
+    }
+
     service.init_splits_between(pg->info.pgid, pg->get_osdmap(), osdmap);
 
     // generate state for PG's current mapping
@@ -3828,6 +4157,11 @@ void OSD::build_past_intervals_parallel()
         ++i) {
       PG *pg = i->second;
 
+      // Ignore PGs only partially created (DNE)
+      if (pg->info.dne()) {
+       continue;
+      }
+
       auto rpib = pg->get_required_past_interval_bounds(
        pg->info,
        superblock.oldest_map);
@@ -4014,6 +4348,11 @@ int OSD::handle_pg_peering_evt(
       ceph_abort();
     }
 
+    const bool is_mon_create =
+      evt->get_event().dynamic_type() == PG::NullEvt::static_type();
+    if (maybe_wait_for_max_pg(pgid, is_mon_create)) {
+      return -EAGAIN;
+    }
     // do we need to resurrect a deleting pg?
     spg_t resurrected;
     PGRef old_pg_state;
@@ -4154,6 +4493,111 @@ int OSD::handle_pg_peering_evt(
   }
 }
 
+bool OSD::maybe_wait_for_max_pg(spg_t pgid, bool is_mon_create)
+{
+  const auto max_pgs_per_osd =
+    (cct->_conf->get_val<uint64_t>("mon_max_pg_per_osd") *
+     cct->_conf->get_val<double>("osd_max_pg_per_osd_hard_ratio"));
+
+  RWLock::RLocker pg_map_locker{pg_map_lock};
+  if (pg_map.size() < max_pgs_per_osd) {
+    return false;
+  }
+  lock_guard<mutex> pending_creates_locker{pending_creates_lock};
+  if (is_mon_create) {
+    pending_creates_from_mon++;
+  } else {
+    bool is_primary = osdmap->get_pg_acting_rank(pgid.pgid, whoami) == 0;
+    pending_creates_from_osd.emplace(pgid.pgid, is_primary);
+  }
+  dout(5) << __func__ << " withhold creation of pg " << pgid
+         << ": " << pg_map.size() << " >= "<< max_pgs_per_osd << dendl;
+  return true;
+}
+
+// to re-trigger a peering, we have to twiddle the pg mapping a little bit,
+// see PG::should_restart_peering(). OSDMap::pg_to_up_acting_osds() will turn
+// to up set if pg_temp is empty. so an empty pg_temp won't work.
+static vector<int32_t> twiddle(const vector<int>& acting) {
+  if (acting.size() > 1) {
+    return {acting[0]};
+  } else {
+    vector<int32_t> twiddled(acting.begin(), acting.end());
+    twiddled.push_back(-1);
+    return twiddled;
+  }
+}
+
+void OSD::resume_creating_pg()
+{
+  bool do_sub_pg_creates = false;
+  bool have_pending_creates = false;
+  {
+    const auto max_pgs_per_osd =
+      (cct->_conf->get_val<uint64_t>("mon_max_pg_per_osd") *
+       cct->_conf->get_val<double>("osd_max_pg_per_osd_hard_ratio"));
+    RWLock::RLocker l(pg_map_lock);
+    if (max_pgs_per_osd <= pg_map.size()) {
+      // this could happen if admin decreases this setting before a PG is removed
+      return;
+    }
+    unsigned spare_pgs = max_pgs_per_osd - pg_map.size();
+    lock_guard<mutex> pending_creates_locker{pending_creates_lock};
+    if (pending_creates_from_mon > 0) {
+      do_sub_pg_creates = true;
+      if (pending_creates_from_mon >= spare_pgs) {
+       spare_pgs = pending_creates_from_mon = 0;
+      } else {
+       spare_pgs -= pending_creates_from_mon;
+       pending_creates_from_mon = 0;
+      }
+    }
+    auto pg = pending_creates_from_osd.cbegin();
+    while (spare_pgs > 0 && pg != pending_creates_from_osd.cend()) {
+      dout(20) << __func__ << " pg " << pg->first << dendl;
+      vector<int> acting;
+      osdmap->pg_to_up_acting_osds(pg->first, nullptr, nullptr, &acting, nullptr);
+      service.queue_want_pg_temp(pg->first, twiddle(acting), true);
+      pg = pending_creates_from_osd.erase(pg);
+      do_sub_pg_creates = true;
+      spare_pgs--;
+    }
+    have_pending_creates = (pending_creates_from_mon > 0 ||
+                           !pending_creates_from_osd.empty());
+  }
+
+  bool do_renew_subs = false;
+  if (do_sub_pg_creates) {
+    if (monc->sub_want("osd_pg_creates", last_pg_create_epoch, 0)) {
+      dout(4) << __func__ << ": resolicit pg creates from mon since "
+             << last_pg_create_epoch << dendl;
+      do_renew_subs = true;
+    }
+  }
+  version_t start = osdmap->get_epoch() + 1;
+  if (have_pending_creates) {
+    // don't miss any new osdmap deleting PGs
+    if (monc->sub_want("osdmap", start, 0)) {
+      dout(4) << __func__ << ": resolicit osdmap from mon since "
+             << start << dendl;
+      do_renew_subs = true;
+    }
+  } else if (do_sub_pg_creates) {
+    // no need to subscribe the osdmap continuously anymore
+    // once the pgtemp and/or mon_subscribe(pg_creates) is sent
+    if (monc->sub_want_increment("osdmap", start, CEPH_SUBSCRIBE_ONETIME)) {
+      dout(4) << __func__ << ": re-subscribe osdmap(onetime) since"
+             << start << dendl;
+      do_renew_subs = true;
+    }
+  }
+
+  if (do_renew_subs) {
+    monc->renew_subs();
+  }
+
+  service.send_pg_temp();
+}
 
 void OSD::build_initial_pg_history(
   spg_t pgid,
@@ -4164,6 +4608,7 @@ void OSD::build_initial_pg_history(
 {
   dout(10) << __func__ << " " << pgid << " created " << created << dendl;
   h->epoch_created = created;
+  h->epoch_pool_created = created;
   h->same_interval_since = created;
   h->same_up_since = created;
   h->same_primary_since = created;
@@ -4211,12 +4656,21 @@ void OSD::build_initial_pg_history(
       &debug);
     if (new_interval) {
       h->same_interval_since = e;
-    }
-    if (up != new_up) {
-      h->same_up_since = e;
-    }
-    if (acting_primary != new_acting_primary) {
-      h->same_primary_since = e;
+      if (up != new_up) {
+        h->same_up_since = e;
+      }
+      if (acting_primary != new_acting_primary) {
+        h->same_primary_since = e;
+      }
+      if (pgid.pgid.is_split(lastmap->get_pg_num(pgid.pgid.pool()),
+                             osdmap->get_pg_num(pgid.pgid.pool()),
+                             nullptr)) {
+        h->last_epoch_split = e;
+      }
+      up = new_up;
+      acting = new_acting;
+      up_primary = new_up_primary;
+      acting_primary = new_acting_primary;
     }
     lastmap = osdmap;
   }
@@ -4306,7 +4760,7 @@ bool OSD::project_pg_history(spg_t pgid, pg_history_t& h, epoch_t from,
       break;
   }
 
-  // base case: these floors should be the creation epoch if we didn't
+  // base case: these floors should be the pg creation epoch if we didn't
   // find any changes.
   if (e == h.epoch_created) {
     if (!h.same_interval_since)
@@ -4526,7 +4980,11 @@ void OSD::handle_osd_ping(MOSDPing *m)
   }
 
   OSDMapRef curmap = service.get_osdmap();
-  assert(curmap);
+  if (!curmap) {
+    heartbeat_lock.Unlock();
+    m->put();
+    return;
+  }
 
   switch (m->op) {
 
@@ -4563,8 +5021,8 @@ void OSD::handle_osd_ping(MOSDPing *m)
 
       Message *r = new MOSDPing(monc->get_fsid(),
                                curmap->get_epoch(),
-                               MOSDPing::PING_REPLY,
-                               m->stamp);
+                               MOSDPing::PING_REPLY, m->stamp,
+                               cct->_conf->osd_heartbeat_min_size);
       m->get_connection()->send_message(r);
 
       if (curmap->is_up(from)) {
@@ -4581,7 +5039,8 @@ void OSD::handle_osd_ping(MOSDPing *m)
        Message *r = new MOSDPing(monc->get_fsid(),
                                  curmap->get_epoch(),
                                  MOSDPing::YOU_DIED,
-                                 m->stamp);
+                                 m->stamp,
+                                 cct->_conf->osd_heartbeat_min_size);
        m->get_connection()->send_message(r);
       }
     }
@@ -4759,14 +5218,14 @@ void OSD::heartbeat()
     dout(30) << "heartbeat sending ping to osd." << peer << dendl;
     i->second.con_back->send_message(new MOSDPing(monc->get_fsid(),
                                          service.get_osdmap()->get_epoch(),
-                                         MOSDPing::PING,
-                                         now));
+                                         MOSDPing::PING, now,
+                                         cct->_conf->osd_heartbeat_min_size));
 
     if (i->second.con_front)
       i->second.con_front->send_message(new MOSDPing(monc->get_fsid(),
                                             service.get_osdmap()->get_epoch(),
-                                                    MOSDPing::PING,
-                                                    now));
+                                            MOSDPing::PING, now,
+                                         cct->_conf->osd_heartbeat_min_size));
   }
 
   logger->set(l_osd_hb_to, heartbeat_peers.size());
@@ -4845,20 +5304,17 @@ void OSD::tick()
 
   if (is_waiting_for_healthy()) {
     start_boot();
+  } else if (is_preboot() &&
+            waiting_for_luminous_mons &&
+            monc->monmap.get_required_features().contains_all(
+              ceph::features::mon::FEATURE_LUMINOUS)) {
+    // mon upgrade finished!
+    start_boot();
   }
 
   do_waiters();
 
   tick_timer.add_event_after(OSD_TICK_INTERVAL, new C_Tick(this));
-
-  if (is_active()) {
-    const auto now = ceph::coarse_mono_clock::now();
-    const auto elapsed = now - last_sent_beacon;
-    if (chrono::duration_cast<chrono::seconds>(elapsed).count() >
-       cct->_conf->osd_beacon_report_interval) {
-      send_beacon(now);
-    }
-  }
 }
 
 void OSD::tick_without_osd_lock()
@@ -4872,6 +5328,7 @@ void OSD::tick_without_osd_lock()
   logger->set(l_osd_cached_crc, buffer::get_cached_crc());
   logger->set(l_osd_cached_crc_adjusted, buffer::get_cached_crc_adjusted());
   logger->set(l_osd_missed_crc, buffer::get_missed_crc());
+  logger->set(l_osd_pg_removing, remove_wq.get_remove_queue_len());
 
   // osd_lock is not being held, which means the OSD state
   // might change when doing the monitor report
@@ -4935,7 +5392,9 @@ void OSD::tick_without_osd_lock()
       // do any pending reports
       send_full_update();
       send_failures();
-      send_pg_stats(now);
+      if (osdmap->require_osd_release < CEPH_RELEASE_LUMINOUS) {
+       send_pg_stats(now);
+      }
     }
     map_lock.put_read();
   }
@@ -4945,9 +5404,24 @@ void OSD::tick_without_osd_lock()
       sched_scrub();
     }
     service.promote_throttle_recalibrate();
+    resume_creating_pg();
+    bool need_send_beacon = false;
+    const auto now = ceph::coarse_mono_clock::now();
+    {
+      // borrow lec lock to pretect last_sent_beacon from changing
+      Mutex::Locker l{min_last_epoch_clean_lock};
+      const auto elapsed = now - last_sent_beacon;
+      if (chrono::duration_cast<chrono::seconds>(elapsed).count() >
+        cct->_conf->osd_beacon_report_interval) {
+        need_send_beacon = true;
+      }
+    }
+    if (need_send_beacon) {
+      send_beacon(now);
+    }
   }
 
-  check_ops_in_flight();
+  mgrc.update_osd_health(get_health_metrics());
   service.kick_recovery_queue();
   tick_timer_without_osd_lock.add_event_after(OSD_TICK_INTERVAL, new C_Tick_WithoutOSDLock(this));
 }
@@ -4998,7 +5472,7 @@ void TestOpsSocketHook::test_ops(OSDService *service, ObjectStore *store,
     if (pool < 0 && isdigit(poolstr[0]))
       pool = atoll(poolstr.c_str());
     if (pool < 0) {
-      ss << "Invalid pool" << poolstr;
+      ss << "Invalid pool '" << poolstr << "''";
       return;
     }
 
@@ -5325,7 +5799,9 @@ void OSD::ms_handle_connect(Connection *con)
       service.send_pg_temp();
       requeue_failures();
       send_failures();
-      send_pg_stats(now);
+      if (osdmap->require_osd_release < CEPH_RELEASE_LUMINOUS) {
+       send_pg_stats(now);
+      }
 
       map_lock.put_read();
       if (is_active()) {
@@ -5448,6 +5924,7 @@ void OSD::start_boot()
   }
   dout(1) << __func__ << dendl;
   set_state(STATE_PREBOOT);
+  waiting_for_luminous_mons = false;
   dout(10) << "start_boot - have maps " << superblock.oldest_map
           << ".." << superblock.newest_map << dendl;
   C_OSD_GetVersion *c = new C_OSD_GetVersion(this);
@@ -5472,18 +5949,28 @@ void OSD::_preboot(epoch_t oldest, epoch_t newest)
   heartbeat();
 
   // if our map within recent history, try to add ourselves to the osdmap.
-  if (osdmap->test_flag(CEPH_OSDMAP_NOUP)) {
+  if (osdmap->get_epoch() == 0) {
+    derr << "waiting for initial osdmap" << dendl;
+  } else if (osdmap->is_destroyed(whoami)) {
+    derr << "osdmap says I am destroyed" << dendl;
+    // provide a small margin so we don't livelock seeing if we
+    // un-destroyed ourselves.
+    if (osdmap->get_epoch() > newest - 1) {
+      exit(0);
+    }
+  } else if (osdmap->test_flag(CEPH_OSDMAP_NOUP) || osdmap->is_noup(whoami)) {
     derr << "osdmap NOUP flag is set, waiting for it to clear" << dendl;
   } else if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
     derr << "osdmap SORTBITWISE OSDMap flag is NOT set; please set it"
         << dendl;
-  } else if (!osdmap->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
+  } else if (osdmap->require_osd_release < CEPH_RELEASE_JEWEL) {
     derr << "osdmap REQUIRE_JEWEL OSDMap flag is NOT set; please set it"
         << dendl;
   } else if (!monc->monmap.get_required_features().contains_all(
               ceph::features::mon::FEATURE_LUMINOUS)) {
     derr << "monmap REQUIRE_LUMINOUS is NOT set; must upgrade all monitors to "
         << "Luminous or later before Luminous OSDs will boot" << dendl;
+    waiting_for_luminous_mons = true;
   } else if (service.need_fullness_update()) {
     derr << "osdmap fullness state needs update" << dendl;
     send_full_update();
@@ -5523,6 +6010,9 @@ void OSD::start_waiting_for_healthy()
   dout(1) << "start_waiting_for_healthy" << dendl;
   set_state(STATE_WAITING_FOR_HEALTHY);
   last_heartbeat_resample = utime_t();
+
+  // subscribe to osdmap updates, in case our peers really are known to be dead
+  osdmap_subscribe(osdmap->get_epoch() + 1, false);
 }
 
 bool OSD::_is_healthy()
@@ -5622,7 +6112,10 @@ void OSD::_collect_metadata(map<string,string> *pm)
 {
   // config info
   (*pm)["osd_data"] = dev_path;
-  (*pm)["osd_journal"] = journal_path;
+  if (store->get_type() == "filestore") {
+    // not applicable for bluestore
+    (*pm)["osd_journal"] = journal_path;
+  }
   (*pm)["front_addr"] = stringify(client_messenger->get_myaddr());
   (*pm)["back_addr"] = stringify(cluster_messenger->get_myaddr());
   (*pm)["hb_front_addr"] = stringify(hb_front_server_messenger->get_myaddr());
@@ -5630,10 +6123,24 @@ void OSD::_collect_metadata(map<string,string> *pm)
 
   // backend
   (*pm)["osd_objectstore"] = store->get_type();
+  (*pm)["rotational"] = store_is_rotational ? "1" : "0";
+  (*pm)["journal_rotational"] = journal_is_rotational ? "1" : "0";
+  (*pm)["default_device_class"] = store->get_default_device_class();
   store->collect_metadata(pm);
 
   collect_sys_info(pm, cct);
 
+  std::string front_iface, back_iface;
+  /*
+  pick_iface(cct,
+      CEPH_PICK_ADDRESS_PUBLIC | CEPH_PICK_ADDRESS_CLUSTER,
+      &front_iface, &back_iface);
+      */
+  (*pm)["front_iface"] = pick_iface(cct,
+      client_messenger->get_myaddr().get_sockaddr_storage());
+  (*pm)["back_iface"] = pick_iface(cct,
+      cluster_messenger->get_myaddr().get_sockaddr_storage());
+
   dout(10) << __func__ << " " << *pm << dendl;
 }
 
@@ -5746,8 +6253,8 @@ void OSD::send_failures()
   utime_t now = ceph_clock_now();
   while (!failure_queue.empty()) {
     int osd = failure_queue.begin()->first;
-    entity_inst_t i = osdmap->get_inst(osd);
     if (!failure_pending.count(osd)) {
+      entity_inst_t i = osdmap->get_inst(osd);
       int failed_for = (int)(double)(now - failure_queue.begin()->second);
       monc->send_mon_message(new MOSDFailure(monc->get_fsid(), i, failed_for,
                                             osdmap->get_epoch()));
@@ -5766,6 +6273,7 @@ void OSD::send_still_alive(epoch_t epoch, const entity_inst_t &i)
 void OSD::send_pg_stats(const utime_t &now)
 {
   assert(map_lock.is_locked());
+  assert(osdmap->require_osd_release < CEPH_RELEASE_LUMINOUS);
   dout(20) << "send_pg_stats" << dendl;
 
   osd_stat_t cur_stat = service.get_osd_stat();
@@ -5917,12 +6425,12 @@ void OSD::send_beacon(const ceph::coarse_mono_clock::time_point& now)
       monmap.get_required_features().contains_all(
         ceph::features::mon::FEATURE_LUMINOUS)) {
     dout(20) << __func__ << " sending" << dendl;
-    last_sent_beacon = now;
     MOSDBeacon* beacon = nullptr;
     {
       Mutex::Locker l{min_last_epoch_clean_lock};
       beacon = new MOSDBeacon(osdmap->get_epoch(), min_last_epoch_clean);
       std::swap(beacon->pgs, min_last_epoch_clean_pgs);
+      last_sent_beacon = now;
     }
     monc->send_mon_message(beacon);
   } else {
@@ -6012,6 +6520,11 @@ COMMAND("list_missing " \
        "name=offset,type=CephString,req=false",
        "list missing objects on this pg, perhaps starting at an offset given in JSON",
        "osd", "r", "cli,rest")
+COMMAND("perf histogram dump "
+        "name=logger,type=CephString,req=false "
+        "name=counter,type=CephString,req=false",
+       "Get histogram data",
+       "osd", "r", "cli,rest")
 
 // tell <osd.n> commands.  Validation of osd.n must be special-cased in client
 COMMAND("version", "report version of OSD", "osd", "r", "cli,rest")
@@ -6020,6 +6533,10 @@ COMMAND("injectargs " \
        "name=injected_args,type=CephString,n=N",
        "inject configuration arguments into running OSD",
        "osd", "rw", "cli,rest")
+COMMAND("config set " \
+       "name=key,type=CephString name=value,type=CephString",
+       "Set a configuration option at runtime (not persistent)",
+       "osd", "rw", "cli,rest")
 COMMAND("cluster_log " \
        "name=level,type=CephChoices,strings=error,warning,info,debug " \
        "name=message,type=CephString,n=N",
@@ -6051,6 +6568,10 @@ COMMAND("dump_pg_recovery_stats", "dump pg recovery statistics",
        "osd", "r", "cli,rest")
 COMMAND("reset_pg_recovery_stats", "reset pg recovery statistics",
        "osd", "rw", "cli,rest")
+COMMAND("compact",
+        "compact object store's omap. "
+        "WARNING: Compaction probably slows your requests",
+        "osd", "rw", "cli,rest")
 };
 
 void OSD::do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, bufferlist& data)
@@ -6130,6 +6651,18 @@ void OSD::do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, buffe
     r = cct->_conf->injectargs(args, &ss);
     osd_lock.Lock();
   }
+  else if (prefix == "config set") {
+    std::string key;
+    std::string val;
+    cmd_getval(cct, cmdmap, "key", key);
+    cmd_getval(cct, cmdmap, "value", val);
+    osd_lock.Unlock();
+    r = cct->_conf->set_val(key, val, true, &ss);
+    if (r == 0) {
+      cct->_conf->apply_changes(nullptr);
+    }
+    osd_lock.Lock();
+  }
   else if (prefix == "cluster_log") {
     vector<string> msg;
     cmd_getval(cct, cmdmap, "message", msg);
@@ -6361,7 +6894,12 @@ void OSD::do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, buffe
   }
 
   else if (prefix == "flush_pg_stats") {
-    flush_pg_stats();
+    if (osdmap->require_osd_release >= CEPH_RELEASE_LUMINOUS) {
+      mgrc.send_pgstats();
+      ds << service.get_osd_stat_seq() << "\n";
+    } else {
+      flush_pg_stats();
+    }
   }
 
   else if (prefix == "heap") {
@@ -6448,6 +6986,30 @@ void OSD::do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, buffe
     pg_recovery_stats.reset();
   }
 
+  else if (prefix == "perf histogram dump") {
+    std::string logger;
+    std::string counter;
+    cmd_getval(cct, cmdmap, "logger", logger);
+    cmd_getval(cct, cmdmap, "counter", counter);
+    if (f) {
+      cct->get_perfcounters_collection()->dump_formatted_histograms(
+          f.get(), false, logger, counter);
+      f->flush(ds);
+    }
+  }
+
+  else if (prefix == "compact") {
+    dout(1) << "triggering manual compaction" << dendl;
+    auto start = ceph::coarse_mono_clock::now();
+    store->compact();
+    auto end = ceph::coarse_mono_clock::now();
+    auto time_span = chrono::duration_cast<chrono::duration<double>>(end - start);
+    dout(1) << "finished manual compaction in "
+            << time_span.count()
+            << " seconds" << dendl;
+    ss << "compacted omap in " << time_span.count() << " seconds";
+  }
+
   else {
     ss << "unrecognized command! " << cmd;
     r = -EINVAL;
@@ -6654,6 +7216,11 @@ bool OSD::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool for
 {
   dout(10) << "OSD::ms_get_authorizer type=" << ceph_entity_type_name(dest_type) << dendl;
 
+  if (is_stopping()) {
+    dout(10) << __func__ << " bailing, we are shutting down" << dendl;
+    return false;
+  }
+
   if (dest_type == CEPH_ENTITY_TYPE_MON)
     return true;
 
@@ -6671,9 +7238,11 @@ bool OSD::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool for
 }
 
 
-bool OSD::ms_verify_authorizer(Connection *con, int peer_type,
-                              int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
-                              bool& isvalid, CryptoKey& session_key)
+bool OSD::ms_verify_authorizer(
+  Connection *con, int peer_type,
+  int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
+  bool& isvalid, CryptoKey& session_key,
+  std::unique_ptr<AuthAuthorizerChallenge> *challenge)
 {
   AuthAuthorizeHandler *authorize_handler = 0;
   switch (peer_type) {
@@ -6700,10 +7269,16 @@ bool OSD::ms_verify_authorizer(Connection *con, int peer_type,
   uint64_t global_id;
   uint64_t auid = CEPH_AUTH_UID_DEFAULT;
 
-  isvalid = authorize_handler->verify_authorizer(
-    cct, monc->rotating_secrets.get(),
-    authorizer_data, authorizer_reply, name, global_id, caps_info, session_key,
-    &auid);
+  RotatingKeyRing *keys = monc->rotating_secrets.get();
+  if (keys) {
+    isvalid = authorize_handler->verify_authorizer(
+      cct, keys,
+      authorizer_data, authorizer_reply, name, global_id, caps_info, session_key,
+      &auid, challenge);
+  } else {
+    dout(10) << __func__ << " no rotating_keys (yet), denied" << dendl;
+    isvalid = false;
+  }
 
   if (isvalid) {
     Session *s = static_cast<Session *>(con->get_priv());
@@ -6822,6 +7397,10 @@ void OSD::_dispatch(Message *m)
     handle_scrub(static_cast<MOSDScrub*>(m));
     break;
 
+  case MSG_OSD_FORCE_RECOVERY:
+    handle_force_recovery(m);
+    break;
+
     // -- need OSDMap --
 
   case MSG_OSD_PG_CREATE:
@@ -6950,6 +7529,25 @@ bool OSD::scrub_time_permit(utime_t now)
   struct tm bdt;
   time_t tt = now.sec();
   localtime_r(&tt, &bdt);
+
+  bool day_permit = false;
+  if (cct->_conf->osd_scrub_begin_week_day < cct->_conf->osd_scrub_end_week_day) {
+    if (bdt.tm_wday >= cct->_conf->osd_scrub_begin_week_day && bdt.tm_wday < cct->_conf->osd_scrub_end_week_day) {
+      day_permit = true;
+    }
+  } else {
+    if (bdt.tm_wday >= cct->_conf->osd_scrub_begin_week_day || bdt.tm_wday < cct->_conf->osd_scrub_end_week_day) {
+      day_permit = true;
+    }
+  }
+
+  if (!day_permit) {
+    dout(20) << __func__ << " should run between week day " << cct->_conf->osd_scrub_begin_week_day
+            << " - " << cct->_conf->osd_scrub_end_week_day
+            << " now " << bdt.tm_wday << " = no" << dendl;
+    return false;
+  }
+
   bool time_permit = false;
   if (cct->_conf->osd_scrub_begin_hour < cct->_conf->osd_scrub_end_hour) {
     if (bdt.tm_hour >= cct->_conf->osd_scrub_begin_hour && bdt.tm_hour < cct->_conf->osd_scrub_end_hour) {
@@ -7011,6 +7609,11 @@ void OSD::sched_scrub()
   if (!service.can_inc_scrubs_pending()) {
     return;
   }
+  if (!cct->_conf->osd_scrub_during_recovery && service.is_recovery_active()) {
+    dout(20) << __func__ << " not scheduling scrubs due to active recovery" << dendl;
+    return;
+  }
+
 
   utime_t now = ceph_clock_now();
   bool time_permit = scrub_time_permit(now);
@@ -7029,11 +7632,6 @@ void OSD::sched_scrub()
        break;
       }
 
-      if (!cct->_conf->osd_scrub_during_recovery && service.is_recovery_active()) {
-        dout(10) << __func__ << "not scheduling scrub of " << scrub.pgid << " due to active recovery ops" << dendl;
-        break;
-      }
-
       if ((scrub.deadline >= now) && !(time_permit && load_is_low)) {
         dout(10) << __func__ << " not scheduling scrub for " << scrub.pgid << " due to "
                  << (!time_permit ? "time not permit" : "high load") << dendl;
@@ -7061,6 +7659,20 @@ void OSD::sched_scrub()
 
 
 
+vector<OSDHealthMetric> OSD::get_health_metrics()
+{
+  vector<OSDHealthMetric> metrics;
+  lock_guard<mutex> pending_creates_locker{pending_creates_lock};
+  auto n_primaries = pending_creates_from_mon;
+  for (const auto& create : pending_creates_from_osd) {
+    if (create.second) {
+      n_primaries++;
+    }
+  }
+  metrics.emplace_back(osd_metric::PENDING_CREATING_PGS, n_primaries);
+  return metrics;
+}
+
 // =====================================================
 // MAP
 
@@ -7133,10 +7745,12 @@ struct C_OnMapApply : public Context {
 
 void OSD::osdmap_subscribe(version_t epoch, bool force_request)
 {
-  OSDMapRef osdmap = service.get_osdmap();
-  if (osdmap->get_epoch() >= epoch)
+  Mutex::Locker l(osdmap_subscribe_lock);
+  if (latest_subscribed_epoch >= epoch && !force_request)
     return;
 
+  latest_subscribed_epoch = MAX(epoch, latest_subscribed_epoch);
+
   if (monc->sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) ||
       force_request) {
     monc->renew_subs();
@@ -7176,7 +7790,8 @@ void OSD::trim_maps(epoch_t oldest, int nreceived, bool skip_maps)
   if (num > 0) {
     service.publish_superblock(superblock);
     write_superblock(t);
-    store->queue_transaction(service.meta_osr.get(), std::move(t), nullptr);
+    int tr = store->queue_transaction(service.meta_osr.get(), std::move(t), nullptr);
+    assert(tr == 0);
   }
   // we should not remove the cached maps
   assert(min <= service.map_cache.cached_key_lower_bound());
@@ -7364,12 +7979,6 @@ void OSD::handle_osd_map(MOSDMap *m)
     rerequest_full_maps();
   }
 
-  if (last <= superblock.newest_map) {
-    dout(10) << " no new maps here, dropping" << dendl;
-    m->put();
-    return;
-  }
-
   if (superblock.oldest_map) {
     // make sure we at least keep pace with incoming maps
     trim_maps(m->oldest_map, last - first + 1, skip_maps);
@@ -7405,6 +8014,10 @@ void OSD::_committed_osd_maps(epoch_t first, epoch_t last, MOSDMap *m)
     return;
   }
   Mutex::Locker l(osd_lock);
+  if (is_stopping()) {
+    dout(10) << __func__ << " bailing, we are shutting down" << dendl;
+    return;
+  }
   map_lock.get_write();
 
   bool do_shutdown = false;
@@ -7444,8 +8057,9 @@ void OSD::_committed_osd_maps(epoch_t first, epoch_t last, MOSDMap *m)
       }
     }
 
-    if (osdmap->test_flag(CEPH_OSDMAP_NOUP) !=
-       newmap->test_flag(CEPH_OSDMAP_NOUP)) {
+    if ((osdmap->test_flag(CEPH_OSDMAP_NOUP) !=
+        newmap->test_flag(CEPH_OSDMAP_NOUP)) ||
+        (osdmap->is_noup(whoami) != newmap->is_noup(whoami))) {
       dout(10) << __func__ << " NOUP flag changed in " << newmap->get_epoch()
               << dendl;
       if (is_booting()) {
@@ -7458,6 +8072,13 @@ void OSD::_committed_osd_maps(epoch_t first, epoch_t last, MOSDMap *m)
        do_restart = true;
       }
     }
+    if (osdmap->require_osd_release < CEPH_RELEASE_LUMINOUS &&
+       newmap->require_osd_release >= CEPH_RELEASE_LUMINOUS) {
+      dout(10) << __func__ << " require_osd_release reached luminous in "
+              << newmap->get_epoch() << dendl;
+      clear_pg_stat_queue();
+      clear_outstanding_pg_stats();
+    }
 
     osdmap = newmap;
     epoch_t up_epoch;
@@ -7513,9 +8134,11 @@ void OSD::_committed_osd_maps(epoch_t first, epoch_t last, MOSDMap *m)
        if (service.is_preparing_to_stop() || service.is_stopping()) {
          service.got_stop_ack();
        } else {
-         clog->warn() << "map e" << osdmap->get_epoch()
-                      << " wrongly marked me down at e"
-                      << osdmap->get_down_at(whoami);
+          clog->warn() << "Monitor daemon marked osd." << whoami << " down, "
+                          "but it is still running";
+          clog->debug() << "map e" << osdmap->get_epoch()
+                        << " wrongly marked me down at e"
+                        << osdmap->get_down_at(whoami);
        }
       } else if (!osdmap->get_addr(whoami).probably_equals(
                   client_messenger->get_myaddr())) {
@@ -7531,7 +8154,7 @@ void OSD::_committed_osd_maps(epoch_t first, epoch_t last, MOSDMap *m)
       } else if (!osdmap->get_hb_back_addr(whoami).probably_equals(
                   hb_back_server_messenger->get_myaddr())) {
        clog->error() << "map e" << osdmap->get_epoch()
-                     << " had wrong hb back addr ("
+                     << " had wrong heartbeat back addr ("
                      << osdmap->get_hb_back_addr(whoami)
                      << " != my " << hb_back_server_messenger->get_myaddr()
                      << ")";
@@ -7539,7 +8162,7 @@ void OSD::_committed_osd_maps(epoch_t first, epoch_t last, MOSDMap *m)
                 !osdmap->get_hb_front_addr(whoami).probably_equals(
                   hb_front_server_messenger->get_myaddr())) {
        clog->error() << "map e" << osdmap->get_epoch()
-                     << " had wrong hb front addr ("
+                     << " had wrong heartbeat front addr ("
                      << osdmap->get_hb_front_addr(whoami)
                      << " != my " << hb_front_server_messenger->get_myaddr()
                      << ")";
@@ -7632,12 +8255,7 @@ void OSD::_committed_osd_maps(epoch_t first, epoch_t last, MOSDMap *m)
     activate_map();
   }
 
-  if (m->newest_map && m->newest_map > last) {
-    dout(10) << " msg say newest map is " << m->newest_map
-            << ", requesting more" << dendl;
-    osdmap_subscribe(osdmap->get_epoch()+1, false);
-  }
-  else if (do_shutdown) {
+  if (do_shutdown) {
     if (network_error) {
       Mutex::Locker l(heartbeat_lock);
       map<int,pair<utime_t,entity_inst_t>>::iterator it =
@@ -7653,6 +8271,11 @@ void OSD::_committed_osd_maps(epoch_t first, epoch_t last, MOSDMap *m)
     dout(0) << __func__ << " shutdown OSD via async signal" << dendl;
     queue_async_signal(SIGINT);
   }
+  else if (m->newest_map && m->newest_map > last) {
+    dout(10) << " msg say newest map is " << m->newest_map
+            << ", requesting more" << dendl;
+    osdmap_subscribe(osdmap->get_epoch()+1, false);
+  }
   else if (is_preboot()) {
     if (m->get_source().is_mon())
       _preboot(m->oldest_map, m->newest_map);
@@ -7726,7 +8349,7 @@ bool OSD::advance_pg(
   epoch_t osd_epoch, PG *pg,
   ThreadPool::TPHandle &handle,
   PG::RecoveryCtx *rctx,
-  set<boost::intrusive_ptr<PG> > *new_pgs)
+  set<PGRef> *new_pgs)
 {
   assert(pg->is_locked());
   epoch_t next_epoch = pg->get_osdmap()->get_epoch() + 1;
@@ -7798,6 +8421,15 @@ void OSD::consume_map()
   assert(osd_lock.is_locked());
   dout(7) << "consume_map version " << osdmap->get_epoch() << dendl;
 
+  /** make sure the cluster is speaking in SORTBITWISE, because we don't
+   *  speak the older sorting version any more. Be careful not to force
+   *  a shutdown if we are merely processing old maps, though.
+   */
+  if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE) && is_active()) {
+    derr << __func__ << " SORTBITWISE flag is not set" << dendl;
+    ceph_abort();
+  }
+
   int num_pg_primary = 0, num_pg_replica = 0, num_pg_stray = 0;
   list<PGRef> to_remove;
 
@@ -7825,6 +8457,16 @@ void OSD::consume_map()
 
       pg->unlock();
     }
+
+    lock_guard<mutex> pending_creates_locker{pending_creates_lock};
+    for (auto pg = pending_creates_from_osd.cbegin();
+        pg != pending_creates_from_osd.cend();) {
+      if (osdmap->get_pg_acting_rank(pg->first, whoami) < 0) {
+       pg = pending_creates_from_osd.erase(pg);
+      } else {
+       ++pg;
+      }
+    }
   }
 
   for (list<PGRef>::iterator i = to_remove.begin();
@@ -7871,6 +8513,7 @@ void OSD::consume_map()
   logger->set(l_osd_pg_primary, num_pg_primary);
   logger->set(l_osd_pg_replica, num_pg_replica);
   logger->set(l_osd_pg_stray, num_pg_stray);
+  logger->set(l_osd_pg_removing, remove_wq.get_remove_queue_len());
 }
 
 void OSD::activate_map()
@@ -7879,11 +8522,6 @@ void OSD::activate_map()
 
   dout(7) << "activate_map version " << osdmap->get_epoch() << dendl;
 
-  if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
-    derr << __func__ << " SORTBITWISE flag is not set" << dendl;
-    ceph_abort();
-  }
-
   if (osdmap->test_flag(CEPH_OSDMAP_FULL)) {
     dout(10) << " osdmap flagged full, doing onetime osdmap subscribe" << dendl;
     osdmap_subscribe(osdmap->get_epoch() + 1, false);
@@ -8031,7 +8669,7 @@ bool OSD::require_same_or_newer_map(OpRequestRef& op, epoch_t epoch,
 
 void OSD::split_pgs(
   PG *parent,
-  const set<spg_t> &childpgids, set<boost::intrusive_ptr<PG> > *out_pgs,
+  const set<spg_t> &childpgids, set<PGRef> *out_pgs,
   OSDMapRef curmap,
   OSDMapRef nextmap,
   PG::RecoveryCtx *rctx)
@@ -8158,7 +8796,6 @@ void OSD::handle_pg_create(OpRequestRef op)
               << dendl;
       continue;
     }
-
     if (handle_pg_peering_evt(
           pgid,
           history,
@@ -8173,8 +8810,13 @@ void OSD::handle_pg_create(OpRequestRef op)
       service.send_pg_created(pgid.pgid);
     }
   }
-  last_pg_create_epoch = m->epoch;
 
+  {
+    lock_guard<mutex> pending_creates_locker{pending_creates_lock};
+    if (pending_creates_from_mon == 0) {
+      last_pg_create_epoch = m->epoch;
+    }
+  }
   maybe_update_heartbeat_peers();
 }
 
@@ -8294,7 +8936,7 @@ void OSD::do_notifies(
       continue;
     }
     service.share_map_peer(it->first, con.get(), curmap);
-    dout(7) << __func__ << " osd " << it->first
+    dout(7) << __func__ << " osd." << it->first
            << " on " << it->second.size() << " PGs" << dendl;
     MOSDPGNotify *m = new MOSDPGNotify(curmap->get_epoch(),
                                       it->second);
@@ -8550,6 +9192,8 @@ void OSD::handle_pg_backfill_reserve(OpRequestRef op)
        m->query_epoch,
        PG::RemoteBackfillReserved()));
   } else if (m->type == MBackfillReserve::REJECT) {
+    // NOTE: this is replica -> primary "i reject your request"
+    //      and also primary -> replica "cancel my previously-granted request"
     evt = PG::CephPeeringEvtRef(
       new PG::CephPeeringEvt(
        m->query_epoch,
@@ -8622,6 +9266,33 @@ void OSD::handle_pg_recovery_reserve(OpRequestRef op)
   pg->unlock();
 }
 
+void OSD::handle_force_recovery(Message *m)
+{
+  MOSDForceRecovery *msg = static_cast<MOSDForceRecovery*>(m);
+  assert(msg->get_type() == MSG_OSD_FORCE_RECOVERY);
+
+  vector<PGRef> local_pgs;
+  local_pgs.reserve(msg->forced_pgs.size());
+
+  {
+    RWLock::RLocker l(pg_map_lock);
+    for (auto& i : msg->forced_pgs) {
+      spg_t locpg;
+      if (osdmap->get_primary_shard(i, &locpg)) {
+       auto pg_map_entry = pg_map.find(locpg);
+       if (pg_map_entry != pg_map.end()) {
+         local_pgs.push_back(pg_map_entry->second);
+       }
+      }
+    }
+  }
+
+  if (local_pgs.size()) {
+    service.adjust_pg_priorities(local_pgs, msg->options);
+  }
+
+  msg->put();
+}
 
 /** PGQuery
  * from primary to replica | stray
@@ -8829,7 +9500,6 @@ void OSD::_remove_pg(PG *pg)
   pg->put("PGMap"); // since we've taken it out of map
 }
 
-
 // =========================================================
 // RECOVERY
 
@@ -8876,23 +9546,100 @@ bool OSDService::_recover_now(uint64_t *available_pushes)
   return true;
 }
 
+
+void OSDService::adjust_pg_priorities(const vector<PGRef>& pgs, int newflags)
+{
+  if (!pgs.size() || !(newflags & (OFR_BACKFILL | OFR_RECOVERY)))
+    return;
+  int newstate = 0;
+
+  if (newflags & OFR_BACKFILL) {
+    newstate = PG_STATE_FORCED_BACKFILL;
+  } else if (newflags & OFR_RECOVERY) {
+    newstate = PG_STATE_FORCED_RECOVERY;
+  }
+
+  // debug output here may get large, don't generate it if debug level is below
+  // 10 and use abbreviated pg ids otherwise
+  if ((cct)->_conf->subsys.should_gather(ceph_subsys_osd, 10)) {
+    stringstream ss;
+
+    for (auto& i : pgs) {
+      ss << i->get_pgid() << " ";
+    }
+
+    dout(10) << __func__ << " working on " << ss.str() << dendl;
+  }
+
+  if (newflags & OFR_CANCEL) {
+    for (auto& i : pgs) {
+      i->lock();
+      i->_change_recovery_force_mode(newstate, true);
+      i->unlock();
+    }
+  } else {
+    for (auto& i : pgs) {
+      // make sure the PG is in correct state before forcing backfill or recovery, or
+      // else we'll make PG keeping FORCE_* flag forever, requiring osds restart
+      // or forcing somehow recovery/backfill.
+      i->lock();
+      int pgstate = i->get_state();
+      if ( ((newstate == PG_STATE_FORCED_RECOVERY) && (pgstate & (PG_STATE_DEGRADED | PG_STATE_RECOVERY_WAIT | PG_STATE_RECOVERING))) ||
+           ((newstate == PG_STATE_FORCED_BACKFILL) && (pgstate & (PG_STATE_DEGRADED | PG_STATE_BACKFILL_WAIT | PG_STATE_BACKFILLING))) )
+        i->_change_recovery_force_mode(newstate, false);
+      i->unlock();
+    }
+  }
+}
+
 void OSD::do_recovery(
   PG *pg, epoch_t queued, uint64_t reserved_pushes,
   ThreadPool::TPHandle &handle)
 {
   uint64_t started = 0;
-  if (cct->_conf->osd_recovery_sleep > 0) {
-    handle.suspend_tp_timeout();
-    pg->unlock();
-    utime_t t;
-    t.set_from_double(cct->_conf->osd_recovery_sleep);
-    t.sleep();
-    dout(20) << __func__ << " slept for " << t << dendl;
-    pg->lock();
-    handle.reset_tp_timeout();
+
+  /*
+   * When the value of osd_recovery_sleep is set greater than zero, recovery
+   * ops are scheduled after osd_recovery_sleep amount of time from the previous
+   * recovery event's schedule time. This is done by adding a
+   * recovery_requeue_callback event, which re-queues the recovery op using
+   * queue_recovery_after_sleep.
+   */
+  float recovery_sleep = get_osd_recovery_sleep();
+  {
+    Mutex::Locker l(service.recovery_sleep_lock);
+    if (recovery_sleep > 0 && service.recovery_needs_sleep) {
+      PGRef pgref(pg);
+      auto recovery_requeue_callback = new FunctionContext([this, pgref, queued, reserved_pushes](int r) {
+        dout(20) << "do_recovery wake up at "
+                 << ceph_clock_now()
+                << ", re-queuing recovery" << dendl;
+       Mutex::Locker l(service.recovery_sleep_lock);
+        service.recovery_needs_sleep = false;
+        service.queue_recovery_after_sleep(pgref.get(), queued, reserved_pushes);
+      });
+
+      // This is true for the first recovery op and when the previous recovery op
+      // has been scheduled in the past. The next recovery op is scheduled after
+      // completing the sleep from now.
+      if (service.recovery_schedule_time < ceph_clock_now()) {
+        service.recovery_schedule_time = ceph_clock_now();
+      }
+      service.recovery_schedule_time += recovery_sleep;
+      service.recovery_sleep_timer.add_event_at(service.recovery_schedule_time,
+                                               recovery_requeue_callback);
+      dout(20) << "Recovery event scheduled at "
+               << service.recovery_schedule_time << dendl;
+      return;
+    }
   }
 
   {
+    {
+      Mutex::Locker l(service.recovery_sleep_lock);
+      service.recovery_needs_sleep = true;
+    }
+
     if (pg->pg_has_reset_since(queued)) {
       goto out;
     }
@@ -8929,9 +9676,27 @@ void OSD::do_recovery(
     if (!more && pg->have_unfound()) {
       pg->discover_all_missing(*rctx.query_map);
       if (rctx.query_map->empty()) {
-       dout(10) << "do_recovery  no luck, giving up on this pg for now" << dendl;
+       string action;
+        if (pg->state_test(PG_STATE_BACKFILLING)) {
+         auto evt = PG::CephPeeringEvtRef(new PG::CephPeeringEvt(
+           queued,
+           queued,
+           PG::DeferBackfill(cct->_conf->osd_recovery_retry_interval)));
+         pg->queue_peering_event(evt);
+         action = "in backfill";
+        } else if (pg->state_test(PG_STATE_RECOVERING)) {
+         auto evt = PG::CephPeeringEvtRef(new PG::CephPeeringEvt(
+           queued,
+           queued,
+           PG::DeferRecovery(cct->_conf->osd_recovery_retry_interval)));
+         pg->queue_peering_event(evt);
+         action = "in recovery";
+       } else {
+         action = "already out of recovery/backfill";
+       }
+       dout(10) << __func__ << ": no luck, giving up on this pg for now (" << action << ")" << dendl;
       } else {
-       dout(10) << "do_recovery  no luck, giving up on this pg for now" << dendl;
+       dout(10) << __func__ << ": no luck, giving up on this pg for now (queue_recovery)" << dendl;
        pg->queue_recovery();
       }
     }
@@ -8985,10 +9750,7 @@ void OSDService::finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue)
 
 bool OSDService::is_recovery_active()
 {
-  if (recovery_ops_active > 0)
-    return true;
-
-  return false;
+  return local_reserver.has_reservation() || remote_reserver.has_reservation();
 }
 
 // =========================================================
@@ -9016,6 +9778,7 @@ void OSD::enqueue_op(spg_t pg, OpRequestRef& op, epoch_t epoch)
   op->osd_trace.keyval("priority", op->get_req()->get_priority());
   op->osd_trace.keyval("cost", op->get_req()->get_cost());
   op->mark_queued_for_pg();
+  logger->tinc(l_osd_op_before_queue_op_lat, latency);
   op_shardedwq.queue(make_pair(pg, PGQueueable(op, epoch)));
 }
 
@@ -9040,6 +9803,8 @@ void OSD::dequeue_op(
           << " " << *(op->get_req())
           << " pg " << *pg << dendl;
 
+  logger->tinc(l_osd_op_before_dequeue_op_lat, latency);
+
   Session *session = static_cast<Session *>(
     op->get_req()->get_connection()->get_priv());
   if (session) {
@@ -9063,27 +9828,28 @@ void OSD::dequeue_op(
 
 struct C_CompleteSplits : public Context {
   OSD *osd;
-  set<boost::intrusive_ptr<PG> > pgs;
-  C_CompleteSplits(OSD *osd, const set<boost::intrusive_ptr<PG> > &in)
+  set<PGRef> pgs;
+  C_CompleteSplits(OSD *osd, const set<PGRef> &in)
     : osd(osd), pgs(in) {}
   void finish(int r) override {
     Mutex::Locker l(osd->osd_lock);
     if (osd->is_stopping())
       return;
     PG::RecoveryCtx rctx = osd->create_context();
-    for (set<boost::intrusive_ptr<PG> >::iterator i = pgs.begin();
+    for (set<PGRef>::iterator i = pgs.begin();
         i != pgs.end();
         ++i) {
       osd->pg_map_lock.get_write();
       (*i)->lock();
-      osd->add_newly_split_pg(&**i, &rctx);
+      PG *pg = i->get();
+      osd->add_newly_split_pg(pg, &rctx);
       if (!((*i)->deleting)) {
         set<spg_t> to_complete;
         to_complete.insert((*i)->info.pgid);
         osd->service.complete_split(to_complete);
       }
       osd->pg_map_lock.put_write();
-      osd->dispatch_context_transaction(rctx, &**i);
+      osd->dispatch_context_transaction(rctx, pg);
       osd->wake_pg_waiters(*i);
       (*i)->unlock();
     }
@@ -9105,7 +9871,7 @@ void OSD::process_peering_events(
   for (list<PG*>::const_iterator i = pgs.begin();
        i != pgs.end();
        ++i) {
-    set<boost::intrusive_ptr<PG> > split_pgs;
+    set<PGRef> split_pgs;
     PG *pg = *i;
     pg->lock_suspend_timeout(handle);
     curmap = service.get_osdmap();
@@ -9148,8 +9914,13 @@ const char** OSD::get_tracked_conf_keys() const
   static const char* KEYS[] = {
     "osd_max_backfills",
     "osd_min_recovery_priority",
-    "osd_op_complaint_time", "osd_op_log_threshold",
-    "osd_op_history_size", "osd_op_history_duration",
+    "osd_max_trimming_pgs",
+    "osd_op_complaint_time",
+    "osd_op_log_threshold",
+    "osd_op_history_size",
+    "osd_op_history_duration",
+    "osd_op_history_slow_op_size",
+    "osd_op_history_slow_op_threshold",
     "osd_enable_op_tracker",
     "osd_map_cache_size",
     "osd_map_max_advance",
@@ -9170,6 +9941,8 @@ const char** OSD::get_tracked_conf_keys() const
     "osd_recovery_delay_start",
     "osd_client_message_size_cap",
     "osd_client_message_cap",
+    "osd_heartbeat_min_size",
+    "osd_heartbeat_interval",
     NULL
   };
   return KEYS;
@@ -9381,6 +10154,7 @@ int OSD::init_op_flags(OpRequestRef& op)
       if (base_pool && base_pool->require_rollback()) {
         if ((iter->op.op != CEPH_OSD_OP_READ) &&
             (iter->op.op != CEPH_OSD_OP_CHECKSUM) &&
+            (iter->op.op != CEPH_OSD_OP_CMPEXT) &&
             (iter->op.op != CEPH_OSD_OP_STAT) &&
             (iter->op.op != CEPH_OSD_OP_ISDIRTY) &&
             (iter->op.op != CEPH_OSD_OP_UNDIRTY) &&
@@ -9530,6 +10304,7 @@ void OSD::PeeringWQ::_dequeue(list<PG*> *out) {
   in_use.insert(out->begin(), out->end());
 }
 
+
 // =============================================================
 
 #undef dout_context
@@ -9542,7 +10317,6 @@ void OSD::ShardedOpWQ::wake_pg_waiters(spg_t pgid)
   uint32_t shard_index = pgid.hash_to_shard(shard_list.size());
   auto sdata = shard_list[shard_index];
   bool queued = false;
-  unsigned pushes_to_free = 0;
   {
     Mutex::Locker l(sdata->sdata_op_ordering_lock);
     auto p = sdata->pg_slots.find(pgid);
@@ -9555,18 +10329,12 @@ void OSD::ShardedOpWQ::wake_pg_waiters(spg_t pgid)
           ++i) {
        sdata->_enqueue_front(make_pair(pgid, *i), osd->op_prio_cutoff);
       }
-      for (auto& q : p->second.to_process) {
-       pushes_to_free += q.get_reserved_pushes();
-      }
       p->second.to_process.clear();
       p->second.waiting_for_pg = false;
       ++p->second.requeue_seq;
       queued = true;
     }
   }
-  if (pushes_to_free > 0) {
-    osd->service.release_reserved_pushes(pushes_to_free);
-  }
   if (queued) {
     sdata->sdata_lock.Lock();
     sdata->sdata_cond.SignalOne();
@@ -9655,10 +10423,10 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
   if (sdata->pqueue->empty()) {
     dout(20) << __func__ << " empty q, waiting" << dendl;
     // optimistically sleep a moment; maybe another work item will come along.
-    sdata->sdata_op_ordering_lock.Unlock();
     osd->cct->get_heartbeat_map()->reset_timeout(hb,
       osd->cct->_conf->threadpool_default_timeout, 0);
     sdata->sdata_lock.Lock();
+    sdata->sdata_op_ordering_lock.Unlock();
     sdata->sdata_cond.WaitInterval(sdata->sdata_lock,
       utime_t(osd->cct->_conf->threadpool_empty_queue_max_wait, 0));
     sdata->sdata_lock.Unlock();
@@ -9922,3 +10690,21 @@ int heap(CephContext& cct, cmdmap_t& cmdmap, Formatter& f, std::ostream& os)
  
 }} // namespace ceph::osd_cmds
 
+
+std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q) {
+  switch(q) {
+  case OSD::io_queue::prioritized:
+    out << "prioritized";
+    break;
+  case OSD::io_queue::weightedpriority:
+    out << "weightedpriority";
+    break;
+  case OSD::io_queue::mclock_opclass:
+    out << "mclock_opclass";
+    break;
+  case OSD::io_queue::mclock_client:
+    out << "mclock_client";
+    break;
+  }
+  return out;
+}