]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osd/OSD.cc
update sources to 12.2.7
[ceph.git] / ceph / src / osd / OSD.cc
index 426da2440089115065c5c2e191580b1137636ee0..54fedcddcabf0624d4820294494ee9ab58691a8b 100644 (file)
@@ -247,9 +247,9 @@ OSDService::OSDService(OSD *osd) :
   recovery_sleep_lock("OSDService::recovery_sleep_lock"),
   recovery_sleep_timer(cct, recovery_sleep_lock, false),
   reserver_finisher(cct),
-  local_reserver(&reserver_finisher, cct->_conf->osd_max_backfills,
+  local_reserver(cct, &reserver_finisher, cct->_conf->osd_max_backfills,
                 cct->_conf->osd_min_recovery_priority),
-  remote_reserver(&reserver_finisher, cct->_conf->osd_max_backfills,
+  remote_reserver(cct, &reserver_finisher, cct->_conf->osd_max_backfills,
                  cct->_conf->osd_min_recovery_priority),
   pg_temp_lock("OSDService::pg_temp_lock"),
   snap_sleep_lock("OSDService::snap_sleep_lock"),
@@ -258,7 +258,7 @@ OSDService::OSDService(OSD *osd) :
   scrub_sleep_lock("OSDService::scrub_sleep_lock"),
   scrub_sleep_timer(
     osd->client_messenger->cct, scrub_sleep_lock, false /* relax locking */),
-  snap_reserver(&reserver_finisher,
+  snap_reserver(cct, &reserver_finisher,
                cct->_conf->osd_max_trimming_pgs),
   recovery_lock("OSDService::recovery_lock"),
   recovery_ops_active(0),
@@ -1034,13 +1034,16 @@ pair<ConnectionRef,ConnectionRef> OSDService::get_con_osd_hb(int peer, epoch_t f
 }
 
 
-void OSDService::queue_want_pg_temp(pg_t pgid, vector<int>& want)
+void OSDService::queue_want_pg_temp(pg_t pgid,
+                                   const vector<int>& want,
+                                   bool forced)
 {
   Mutex::Locker l(pg_temp_lock);
-  map<pg_t,vector<int> >::iterator p = pg_temp_pending.find(pgid);
+  auto p = pg_temp_pending.find(pgid);
   if (p == pg_temp_pending.end() ||
-      p->second != want) {
-    pg_temp_wanted[pgid] = want;
+      p->second.acting != want ||
+      forced) {
+    pg_temp_wanted[pgid] = pg_temp_t{want, forced};
   }
 }
 
@@ -1053,10 +1056,8 @@ void OSDService::remove_want_pg_temp(pg_t pgid)
 
 void OSDService::_sent_pg_temp()
 {
-  for (map<pg_t,vector<int> >::iterator p = pg_temp_wanted.begin();
-       p != pg_temp_wanted.end();
-       ++p)
-    pg_temp_pending[p->first] = p->second;
+  pg_temp_pending.insert(make_move_iterator(begin(pg_temp_wanted)),
+                        make_move_iterator(end(pg_temp_wanted)));
   pg_temp_wanted.clear();
 }
 
@@ -1073,15 +1074,37 @@ void OSDService::requeue_pg_temp()
           << pg_temp_wanted.size() << dendl;
 }
 
+std::ostream& operator<<(std::ostream& out,
+                        const OSDService::pg_temp_t& pg_temp)
+{
+  out << pg_temp.acting;
+  if (pg_temp.forced) {
+    out << " (forced)";
+  }
+  return out;
+}
+
 void OSDService::send_pg_temp()
 {
   Mutex::Locker l(pg_temp_lock);
   if (pg_temp_wanted.empty())
     return;
   dout(10) << "send_pg_temp " << pg_temp_wanted << dendl;
-  MOSDPGTemp *m = new MOSDPGTemp(osdmap->get_epoch());
-  m->pg_temp = pg_temp_wanted;
-  monc->send_mon_message(m);
+  MOSDPGTemp *ms[2] = {nullptr, nullptr};
+  for (auto& pg_temp : pg_temp_wanted) {
+    auto& m = ms[pg_temp.second.forced];
+    if (!m) {
+      m = new MOSDPGTemp(osdmap->get_epoch());
+      m->forced = pg_temp.second.forced;
+    }
+    m->pg_temp.emplace(pg_temp.first,
+                      pg_temp.second.acting);
+  }
+  for (auto m : ms) {
+    if (m) {
+      monc->send_mon_message(m);
+    }
+  }
   _sent_pg_temp();
 }
 
@@ -1250,10 +1273,12 @@ bool OSDService::can_inc_scrubs_pending()
 
   if (scrubs_pending + scrubs_active < cct->_conf->osd_max_scrubs) {
     dout(20) << __func__ << " " << scrubs_pending << " -> " << (scrubs_pending+1)
-            << " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl;
+            << " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active
+            << ")" << dendl;
     can_inc = true;
   } else {
-    dout(20) << __func__ << scrubs_pending << " + " << scrubs_active << " active >= max " << cct->_conf->osd_max_scrubs << dendl;
+    dout(20) << __func__ << " " << scrubs_pending << " + " << scrubs_active
+            << " active >= max " << cct->_conf->osd_max_scrubs << dendl;
   }
 
   return can_inc;
@@ -1388,7 +1413,8 @@ void OSDService::got_stop_ack()
 MOSDMap *OSDService::build_incremental_map_msg(epoch_t since, epoch_t to,
                                                OSDSuperblock& sblock)
 {
-  MOSDMap *m = new MOSDMap(monc->get_fsid());
+  MOSDMap *m = new MOSDMap(monc->get_fsid(),
+                          osdmap->get_encoding_features());
   m->oldest_map = max_oldest_map;
   m->newest_map = sblock.newest_map;
 
@@ -1428,7 +1454,8 @@ void OSDService::send_incremental_map(epoch_t since, Connection *con,
     OSDSuperblock sblock(get_superblock());
     if (since < sblock.oldest_map) {
       // just send latest full map
-      MOSDMap *m = new MOSDMap(monc->get_fsid());
+      MOSDMap *m = new MOSDMap(monc->get_fsid(),
+                              osdmap->get_encoding_features());
       m->oldest_map = max_oldest_map;
       m->newest_map = sblock.newest_map;
       get_map_bl(to, m->maps[to]);
@@ -1789,7 +1816,7 @@ int OSD::mkfs(CephContext *cct, ObjectStore *store, const string &dev,
     waiter.wait();
   }
 
-  ret = write_meta(store, sb.cluster_fsid, sb.osd_fsid, whoami);
+  ret = write_meta(cct, store, sb.cluster_fsid, sb.osd_fsid, whoami);
   if (ret) {
     derr << "OSD::mkfs: failed to write fsid file: error "
          << cpp_strerror(ret) << dendl;
@@ -1803,7 +1830,7 @@ free_store:
   return ret;
 }
 
-int OSD::write_meta(ObjectStore *store, uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami)
+int OSD::write_meta(CephContext *cct, ObjectStore *store, uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami)
 {
   char val[80];
   int r;
@@ -1823,6 +1850,36 @@ int OSD::write_meta(ObjectStore *store, uuid_d& cluster_fsid, uuid_d& osd_fsid,
   if (r < 0)
     return r;
 
+  string key = cct->_conf->get_val<string>("key");
+  if (key.size()) {
+    r = store->write_meta("osd_key", key);
+    if (r < 0)
+      return r;
+  } else {
+    string keyfile = cct->_conf->get_val<string>("keyfile");
+    if (!keyfile.empty()) {
+      bufferlist keybl;
+      string err;
+      if (keyfile == "-") {
+       static_assert(1024 * 1024 >
+                     (sizeof(CryptoKey) - sizeof(bufferptr) +
+                      sizeof(__u16) + 16 /* AES_KEY_LEN */ + 3 - 1) / 3. * 4.,
+                     "1MB should be enough for a base64 encoded CryptoKey");
+       r = keybl.read_fd(STDIN_FILENO, 1024 * 1024);
+      } else {
+       r = keybl.read_file(keyfile.c_str(), &err);
+      }
+      if (r < 0) {
+       derr << __func__ << " failed to read keyfile " << keyfile << ": "
+            << err << ": " << cpp_strerror(r) << dendl;
+       return r;
+      }
+      r = store->write_meta("osd_key", keybl.to_str());
+      if (r < 0)
+       return r;
+    }
+  }
+
   r = store->write_meta("ready", "ready");
   if (r < 0)
     return r;
@@ -2607,6 +2664,12 @@ int OSD::init()
   update_log_config();
 
   peering_tp.start();
+  
+  service.init();
+  service.publish_map(osdmap);
+  service.publish_superblock(superblock);
+  service.max_oldest_map = superblock.oldest_map;
+
   osd_op_tp.start();
   disk_tp.start();
   command_tp.start();
@@ -2623,11 +2686,6 @@ int OSD::init()
     tick_timer_without_osd_lock.add_event_after(cct->_conf->osd_heartbeat_interval, new C_Tick_WithoutOSDLock(this));
   }
 
-  service.init();
-  service.publish_map(osdmap);
-  service.publish_superblock(superblock);
-  service.max_oldest_map = superblock.oldest_map;
-
   osd_lock.Unlock();
 
   r = monc->authenticate();
@@ -2956,6 +3014,9 @@ void OSD::create_logger()
   };
 
 
+  // All the basic OSD operation stats are to be considered useful
+  osd_plb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
+
   osd_plb.add_u64(
     l_osd_op_wip, "op_wip",
     "Replication operations currently being processed (primary)");
@@ -3043,6 +3104,10 @@ void OSD::create_logger()
     l_osd_op_rw_prepare_lat, "op_rw_prepare_latency",
     "Latency of read-modify-write operations (excluding queue time and wait for finished)");
 
+  // Now we move on to some more obscure stats, revert to assuming things
+  // are low priority unless otherwise specified.
+  osd_plb.set_prio_default(PerfCountersBuilder::PRIO_DEBUGONLY);
+
   osd_plb.add_time_avg(l_osd_op_before_queue_op_lat, "op_before_queue_op_lat",
     "Latency of IO before calling queue(before really queue into ShardedOpWq)"); // client io before queue op_wq latency
   osd_plb.add_time_avg(l_osd_op_before_dequeue_op_lat, "op_before_dequeue_op_lat",
@@ -3102,6 +3167,10 @@ void OSD::create_logger()
   osd_plb.add_u64(
     l_osd_pg_stray, "numpg_stray",
     "Placement groups ready to be deleted from this osd");
+  osd_plb.add_u64(
+    l_osd_pg_removing, "numpg_removing",
+    "Placement groups queued for local deletion", "pgsr",
+    PerfCountersBuilder::PRIO_USEFUL);
   osd_plb.add_u64(
     l_osd_hb_to, "heartbeat_to_peers", "Heartbeat (ping) peers we send to");
   osd_plb.add_u64_counter(l_osd_map, "map_messages", "OSD map messages");
@@ -3129,8 +3198,12 @@ void OSD::create_logger()
     l_osd_map_bl_cache_miss, "osd_map_bl_cache_miss",
     "OSDMap buffer cache misses");
 
-  osd_plb.add_u64(l_osd_stat_bytes, "stat_bytes", "OSD size");
-  osd_plb.add_u64(l_osd_stat_bytes_used, "stat_bytes_used", "Used space");
+  osd_plb.add_u64(
+    l_osd_stat_bytes, "stat_bytes", "OSD size", "size",
+    PerfCountersBuilder::PRIO_USEFUL);
+  osd_plb.add_u64(
+    l_osd_stat_bytes_used, "stat_bytes_used", "Used space", "used",
+    PerfCountersBuilder::PRIO_USEFUL);
   osd_plb.add_u64(l_osd_stat_bytes_avail, "stat_bytes_avail", "Available space");
 
   osd_plb.add_u64_counter(
@@ -3250,11 +3323,14 @@ int OSD::shutdown()
   set_state(STATE_STOPPING);
 
   // Debugging
-  cct->_conf->set_val("debug_osd", "100");
-  cct->_conf->set_val("debug_journal", "100");
-  cct->_conf->set_val("debug_filestore", "100");
-  cct->_conf->set_val("debug_ms", "100");
-  cct->_conf->apply_changes(NULL);
+  if (cct->_conf->get_val<bool>("osd_debug_shutdown")) {
+    cct->_conf->set_val("debug_osd", "100");
+    cct->_conf->set_val("debug_journal", "100");
+    cct->_conf->set_val("debug_filestore", "100");
+    cct->_conf->set_val("debug_bluestore", "100");
+    cct->_conf->set_val("debug_ms", "100");
+    cct->_conf->apply_changes(NULL);
+  }
 
   // stop MgrClient earlier as it's more like an internal consumer of OSD
   mgrc.shutdown();
@@ -3980,6 +4056,24 @@ void OSD::load_pgs()
       pg->upgrade(store);
     }
 
+    if (pg->dne())  {
+      dout(10) << "load_pgs " << *it << " deleting dne" << dendl;
+      pg->ch = nullptr;
+      service.pg_remove_epoch(pg->pg_id);
+      pg->unlock();
+      {
+       // Delete pg
+       RWLock::WLocker l(pg_map_lock);
+       auto p = pg_map.find(pg->get_pgid());
+       assert(p != pg_map.end() && p->second == pg);
+       dout(20) << __func__ << " removed pg " << pg << " from pg_map" << dendl;
+       pg_map.erase(p);
+       pg->put("PGMap");
+      }
+      recursive_remove_collection(cct, store, pgid, *it);
+      continue;
+    }
+
     service.init_splits_between(pg->info.pgid, pg->get_osdmap(), osdmap);
 
     // generate state for PG's current mapping
@@ -4063,6 +4157,11 @@ void OSD::build_past_intervals_parallel()
         ++i) {
       PG *pg = i->second;
 
+      // Ignore PGs only partially created (DNE)
+      if (pg->info.dne()) {
+       continue;
+      }
+
       auto rpib = pg->get_required_past_interval_bounds(
        pg->info,
        superblock.oldest_map);
@@ -4249,6 +4348,11 @@ int OSD::handle_pg_peering_evt(
       ceph_abort();
     }
 
+    const bool is_mon_create =
+      evt->get_event().dynamic_type() == PG::NullEvt::static_type();
+    if (maybe_wait_for_max_pg(pgid, is_mon_create)) {
+      return -EAGAIN;
+    }
     // do we need to resurrect a deleting pg?
     spg_t resurrected;
     PGRef old_pg_state;
@@ -4389,6 +4493,111 @@ int OSD::handle_pg_peering_evt(
   }
 }
 
+bool OSD::maybe_wait_for_max_pg(spg_t pgid, bool is_mon_create)
+{
+  const auto max_pgs_per_osd =
+    (cct->_conf->get_val<uint64_t>("mon_max_pg_per_osd") *
+     cct->_conf->get_val<double>("osd_max_pg_per_osd_hard_ratio"));
+
+  RWLock::RLocker pg_map_locker{pg_map_lock};
+  if (pg_map.size() < max_pgs_per_osd) {
+    return false;
+  }
+  lock_guard<mutex> pending_creates_locker{pending_creates_lock};
+  if (is_mon_create) {
+    pending_creates_from_mon++;
+  } else {
+    bool is_primary = osdmap->get_pg_acting_rank(pgid.pgid, whoami) == 0;
+    pending_creates_from_osd.emplace(pgid.pgid, is_primary);
+  }
+  dout(5) << __func__ << " withhold creation of pg " << pgid
+         << ": " << pg_map.size() << " >= "<< max_pgs_per_osd << dendl;
+  return true;
+}
+
+// to re-trigger a peering, we have to twiddle the pg mapping a little bit,
+// see PG::should_restart_peering(). OSDMap::pg_to_up_acting_osds() will turn
+// to up set if pg_temp is empty. so an empty pg_temp won't work.
+static vector<int32_t> twiddle(const vector<int>& acting) {
+  if (acting.size() > 1) {
+    return {acting[0]};
+  } else {
+    vector<int32_t> twiddled(acting.begin(), acting.end());
+    twiddled.push_back(-1);
+    return twiddled;
+  }
+}
+
+void OSD::resume_creating_pg()
+{
+  bool do_sub_pg_creates = false;
+  bool have_pending_creates = false;
+  {
+    const auto max_pgs_per_osd =
+      (cct->_conf->get_val<uint64_t>("mon_max_pg_per_osd") *
+       cct->_conf->get_val<double>("osd_max_pg_per_osd_hard_ratio"));
+    RWLock::RLocker l(pg_map_lock);
+    if (max_pgs_per_osd <= pg_map.size()) {
+      // this could happen if admin decreases this setting before a PG is removed
+      return;
+    }
+    unsigned spare_pgs = max_pgs_per_osd - pg_map.size();
+    lock_guard<mutex> pending_creates_locker{pending_creates_lock};
+    if (pending_creates_from_mon > 0) {
+      do_sub_pg_creates = true;
+      if (pending_creates_from_mon >= spare_pgs) {
+       spare_pgs = pending_creates_from_mon = 0;
+      } else {
+       spare_pgs -= pending_creates_from_mon;
+       pending_creates_from_mon = 0;
+      }
+    }
+    auto pg = pending_creates_from_osd.cbegin();
+    while (spare_pgs > 0 && pg != pending_creates_from_osd.cend()) {
+      dout(20) << __func__ << " pg " << pg->first << dendl;
+      vector<int> acting;
+      osdmap->pg_to_up_acting_osds(pg->first, nullptr, nullptr, &acting, nullptr);
+      service.queue_want_pg_temp(pg->first, twiddle(acting), true);
+      pg = pending_creates_from_osd.erase(pg);
+      do_sub_pg_creates = true;
+      spare_pgs--;
+    }
+    have_pending_creates = (pending_creates_from_mon > 0 ||
+                           !pending_creates_from_osd.empty());
+  }
+
+  bool do_renew_subs = false;
+  if (do_sub_pg_creates) {
+    if (monc->sub_want("osd_pg_creates", last_pg_create_epoch, 0)) {
+      dout(4) << __func__ << ": resolicit pg creates from mon since "
+             << last_pg_create_epoch << dendl;
+      do_renew_subs = true;
+    }
+  }
+  version_t start = osdmap->get_epoch() + 1;
+  if (have_pending_creates) {
+    // don't miss any new osdmap deleting PGs
+    if (monc->sub_want("osdmap", start, 0)) {
+      dout(4) << __func__ << ": resolicit osdmap from mon since "
+             << start << dendl;
+      do_renew_subs = true;
+    }
+  } else if (do_sub_pg_creates) {
+    // no need to subscribe the osdmap continuously anymore
+    // once the pgtemp and/or mon_subscribe(pg_creates) is sent
+    if (monc->sub_want_increment("osdmap", start, CEPH_SUBSCRIBE_ONETIME)) {
+      dout(4) << __func__ << ": re-subscribe osdmap(onetime) since"
+             << start << dendl;
+      do_renew_subs = true;
+    }
+  }
+
+  if (do_renew_subs) {
+    monc->renew_subs();
+  }
+
+  service.send_pg_temp();
+}
 
 void OSD::build_initial_pg_history(
   spg_t pgid,
@@ -5119,6 +5328,7 @@ void OSD::tick_without_osd_lock()
   logger->set(l_osd_cached_crc, buffer::get_cached_crc());
   logger->set(l_osd_cached_crc_adjusted, buffer::get_cached_crc_adjusted());
   logger->set(l_osd_missed_crc, buffer::get_missed_crc());
+  logger->set(l_osd_pg_removing, remove_wq.get_remove_queue_len());
 
   // osd_lock is not being held, which means the OSD state
   // might change when doing the monitor report
@@ -5194,6 +5404,7 @@ void OSD::tick_without_osd_lock()
       sched_scrub();
     }
     service.promote_throttle_recalibrate();
+    resume_creating_pg();
     bool need_send_beacon = false;
     const auto now = ceph::coarse_mono_clock::now();
     {
@@ -5210,7 +5421,7 @@ void OSD::tick_without_osd_lock()
     }
   }
 
-  check_ops_in_flight();
+  mgrc.update_osd_health(get_health_metrics());
   service.kick_recovery_queue();
   tick_timer_without_osd_lock.add_event_after(OSD_TICK_INTERVAL, new C_Tick_WithoutOSDLock(this));
 }
@@ -5741,8 +5952,12 @@ void OSD::_preboot(epoch_t oldest, epoch_t newest)
   if (osdmap->get_epoch() == 0) {
     derr << "waiting for initial osdmap" << dendl;
   } else if (osdmap->is_destroyed(whoami)) {
-    derr << "osdmap says I am destroyed, exiting" << dendl;
-    exit(0);
+    derr << "osdmap says I am destroyed" << dendl;
+    // provide a small margin so we don't livelock seeing if we
+    // un-destroyed ourselves.
+    if (osdmap->get_epoch() > newest - 1) {
+      exit(0);
+    }
   } else if (osdmap->test_flag(CEPH_OSDMAP_NOUP) || osdmap->is_noup(whoami)) {
     derr << "osdmap NOUP flag is set, waiting for it to clear" << dendl;
   } else if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
@@ -7023,9 +7238,11 @@ bool OSD::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool for
 }
 
 
-bool OSD::ms_verify_authorizer(Connection *con, int peer_type,
-                              int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
-                              bool& isvalid, CryptoKey& session_key)
+bool OSD::ms_verify_authorizer(
+  Connection *con, int peer_type,
+  int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
+  bool& isvalid, CryptoKey& session_key,
+  std::unique_ptr<AuthAuthorizerChallenge> *challenge)
 {
   AuthAuthorizeHandler *authorize_handler = 0;
   switch (peer_type) {
@@ -7057,7 +7274,7 @@ bool OSD::ms_verify_authorizer(Connection *con, int peer_type,
     isvalid = authorize_handler->verify_authorizer(
       cct, keys,
       authorizer_data, authorizer_reply, name, global_id, caps_info, session_key,
-      &auid);
+      &auid, challenge);
   } else {
     dout(10) << __func__ << " no rotating_keys (yet), denied" << dendl;
     isvalid = false;
@@ -7312,6 +7529,25 @@ bool OSD::scrub_time_permit(utime_t now)
   struct tm bdt;
   time_t tt = now.sec();
   localtime_r(&tt, &bdt);
+
+  bool day_permit = false;
+  if (cct->_conf->osd_scrub_begin_week_day < cct->_conf->osd_scrub_end_week_day) {
+    if (bdt.tm_wday >= cct->_conf->osd_scrub_begin_week_day && bdt.tm_wday < cct->_conf->osd_scrub_end_week_day) {
+      day_permit = true;
+    }
+  } else {
+    if (bdt.tm_wday >= cct->_conf->osd_scrub_begin_week_day || bdt.tm_wday < cct->_conf->osd_scrub_end_week_day) {
+      day_permit = true;
+    }
+  }
+
+  if (!day_permit) {
+    dout(20) << __func__ << " should run between week day " << cct->_conf->osd_scrub_begin_week_day
+            << " - " << cct->_conf->osd_scrub_end_week_day
+            << " now " << bdt.tm_wday << " = no" << dendl;
+    return false;
+  }
+
   bool time_permit = false;
   if (cct->_conf->osd_scrub_begin_hour < cct->_conf->osd_scrub_end_hour) {
     if (bdt.tm_hour >= cct->_conf->osd_scrub_begin_hour && bdt.tm_hour < cct->_conf->osd_scrub_end_hour) {
@@ -7423,6 +7659,20 @@ void OSD::sched_scrub()
 
 
 
+vector<OSDHealthMetric> OSD::get_health_metrics()
+{
+  vector<OSDHealthMetric> metrics;
+  lock_guard<mutex> pending_creates_locker{pending_creates_lock};
+  auto n_primaries = pending_creates_from_mon;
+  for (const auto& create : pending_creates_from_osd) {
+    if (create.second) {
+      n_primaries++;
+    }
+  }
+  metrics.emplace_back(osd_metric::PENDING_CREATING_PGS, n_primaries);
+  return metrics;
+}
+
 // =====================================================
 // MAP
 
@@ -8171,6 +8421,15 @@ void OSD::consume_map()
   assert(osd_lock.is_locked());
   dout(7) << "consume_map version " << osdmap->get_epoch() << dendl;
 
+  /** make sure the cluster is speaking in SORTBITWISE, because we don't
+   *  speak the older sorting version any more. Be careful not to force
+   *  a shutdown if we are merely processing old maps, though.
+   */
+  if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE) && is_active()) {
+    derr << __func__ << " SORTBITWISE flag is not set" << dendl;
+    ceph_abort();
+  }
+
   int num_pg_primary = 0, num_pg_replica = 0, num_pg_stray = 0;
   list<PGRef> to_remove;
 
@@ -8198,6 +8457,16 @@ void OSD::consume_map()
 
       pg->unlock();
     }
+
+    lock_guard<mutex> pending_creates_locker{pending_creates_lock};
+    for (auto pg = pending_creates_from_osd.cbegin();
+        pg != pending_creates_from_osd.cend();) {
+      if (osdmap->get_pg_acting_rank(pg->first, whoami) < 0) {
+       pg = pending_creates_from_osd.erase(pg);
+      } else {
+       ++pg;
+      }
+    }
   }
 
   for (list<PGRef>::iterator i = to_remove.begin();
@@ -8244,6 +8513,7 @@ void OSD::consume_map()
   logger->set(l_osd_pg_primary, num_pg_primary);
   logger->set(l_osd_pg_replica, num_pg_replica);
   logger->set(l_osd_pg_stray, num_pg_stray);
+  logger->set(l_osd_pg_removing, remove_wq.get_remove_queue_len());
 }
 
 void OSD::activate_map()
@@ -8252,11 +8522,6 @@ void OSD::activate_map()
 
   dout(7) << "activate_map version " << osdmap->get_epoch() << dendl;
 
-  if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
-    derr << __func__ << " SORTBITWISE flag is not set" << dendl;
-    ceph_abort();
-  }
-
   if (osdmap->test_flag(CEPH_OSDMAP_FULL)) {
     dout(10) << " osdmap flagged full, doing onetime osdmap subscribe" << dendl;
     osdmap_subscribe(osdmap->get_epoch() + 1, false);
@@ -8531,7 +8796,6 @@ void OSD::handle_pg_create(OpRequestRef op)
               << dendl;
       continue;
     }
-
     if (handle_pg_peering_evt(
           pgid,
           history,
@@ -8546,8 +8810,13 @@ void OSD::handle_pg_create(OpRequestRef op)
       service.send_pg_created(pgid.pgid);
     }
   }
-  last_pg_create_epoch = m->epoch;
 
+  {
+    lock_guard<mutex> pending_creates_locker{pending_creates_lock};
+    if (pending_creates_from_mon == 0) {
+      last_pg_create_epoch = m->epoch;
+    }
+  }
   maybe_update_heartbeat_peers();
 }
 
@@ -8667,7 +8936,7 @@ void OSD::do_notifies(
       continue;
     }
     service.share_map_peer(it->first, con.get(), curmap);
-    dout(7) << __func__ << " osd " << it->first
+    dout(7) << __func__ << " osd." << it->first
            << " on " << it->second.size() << " PGs" << dendl;
     MOSDPGNotify *m = new MOSDPGNotify(curmap->get_epoch(),
                                       it->second);
@@ -8923,6 +9192,8 @@ void OSD::handle_pg_backfill_reserve(OpRequestRef op)
        m->query_epoch,
        PG::RemoteBackfillReserved()));
   } else if (m->type == MBackfillReserve::REJECT) {
+    // NOTE: this is replica -> primary "i reject your request"
+    //      and also primary -> replica "cancel my previously-granted request"
     evt = PG::CephPeeringEvtRef(
       new PG::CephPeeringEvt(
        m->query_epoch,
@@ -9229,7 +9500,6 @@ void OSD::_remove_pg(PG *pg)
   pg->put("PGMap"); // since we've taken it out of map
 }
 
-
 // =========================================================
 // RECOVERY
 
@@ -9315,7 +9585,7 @@ void OSDService::adjust_pg_priorities(const vector<PGRef>& pgs, int newflags)
       i->lock();
       int pgstate = i->get_state();
       if ( ((newstate == PG_STATE_FORCED_RECOVERY) && (pgstate & (PG_STATE_DEGRADED | PG_STATE_RECOVERY_WAIT | PG_STATE_RECOVERING))) ||
-           ((newstate == PG_STATE_FORCED_BACKFILL) && (pgstate & (PG_STATE_DEGRADED | PG_STATE_BACKFILL_WAIT | PG_STATE_BACKFILL))) )
+           ((newstate == PG_STATE_FORCED_BACKFILL) && (pgstate & (PG_STATE_DEGRADED | PG_STATE_BACKFILL_WAIT | PG_STATE_BACKFILLING))) )
         i->_change_recovery_force_mode(newstate, false);
       i->unlock();
     }
@@ -9336,33 +9606,40 @@ void OSD::do_recovery(
    * queue_recovery_after_sleep.
    */
   float recovery_sleep = get_osd_recovery_sleep();
-  if (recovery_sleep > 0 && service.recovery_needs_sleep) {
-    PGRef pgref(pg);
-    auto recovery_requeue_callback = new FunctionContext([this, pgref, queued, reserved_pushes](int r) {
-      dout(20) << "do_recovery wake up at "
-               << ceph_clock_now()
-              << ", re-queuing recovery" << dendl;
-      service.recovery_needs_sleep = false;
-      service.queue_recovery_after_sleep(pgref.get(), queued, reserved_pushes);
-    });
+  {
     Mutex::Locker l(service.recovery_sleep_lock);
-
-    // This is true for the first recovery op and when the previous recovery op
-    // has been scheduled in the past. The next recovery op is scheduled after
-    // completing the sleep from now.
-    if (service.recovery_schedule_time < ceph_clock_now()) {
-      service.recovery_schedule_time = ceph_clock_now();
-    }
-    service.recovery_schedule_time += recovery_sleep;
-    service.recovery_sleep_timer.add_event_at(service.recovery_schedule_time,
-                                             recovery_requeue_callback);
-    dout(20) << "Recovery event scheduled at "
-             << service.recovery_schedule_time << dendl;
-    return;
+    if (recovery_sleep > 0 && service.recovery_needs_sleep) {
+      PGRef pgref(pg);
+      auto recovery_requeue_callback = new FunctionContext([this, pgref, queued, reserved_pushes](int r) {
+        dout(20) << "do_recovery wake up at "
+                 << ceph_clock_now()
+                << ", re-queuing recovery" << dendl;
+       Mutex::Locker l(service.recovery_sleep_lock);
+        service.recovery_needs_sleep = false;
+        service.queue_recovery_after_sleep(pgref.get(), queued, reserved_pushes);
+      });
+
+      // This is true for the first recovery op and when the previous recovery op
+      // has been scheduled in the past. The next recovery op is scheduled after
+      // completing the sleep from now.
+      if (service.recovery_schedule_time < ceph_clock_now()) {
+        service.recovery_schedule_time = ceph_clock_now();
+      }
+      service.recovery_schedule_time += recovery_sleep;
+      service.recovery_sleep_timer.add_event_at(service.recovery_schedule_time,
+                                               recovery_requeue_callback);
+      dout(20) << "Recovery event scheduled at "
+               << service.recovery_schedule_time << dendl;
+      return;
+    }
   }
 
   {
-    service.recovery_needs_sleep = true;
+    {
+      Mutex::Locker l(service.recovery_sleep_lock);
+      service.recovery_needs_sleep = true;
+    }
+
     if (pg->pg_has_reset_since(queued)) {
       goto out;
     }
@@ -9400,18 +9677,18 @@ void OSD::do_recovery(
       pg->discover_all_missing(*rctx.query_map);
       if (rctx.query_map->empty()) {
        string action;
-        if (pg->state_test(PG_STATE_BACKFILL)) {
+        if (pg->state_test(PG_STATE_BACKFILLING)) {
          auto evt = PG::CephPeeringEvtRef(new PG::CephPeeringEvt(
            queued,
            queued,
-           PG::CancelBackfill()));
+           PG::DeferBackfill(cct->_conf->osd_recovery_retry_interval)));
          pg->queue_peering_event(evt);
          action = "in backfill";
         } else if (pg->state_test(PG_STATE_RECOVERING)) {
          auto evt = PG::CephPeeringEvtRef(new PG::CephPeeringEvt(
            queued,
            queued,
-           PG::CancelRecovery()));
+           PG::DeferRecovery(cct->_conf->osd_recovery_retry_interval)));
          pg->queue_peering_event(evt);
          action = "in recovery";
        } else {
@@ -10040,7 +10317,6 @@ void OSD::ShardedOpWQ::wake_pg_waiters(spg_t pgid)
   uint32_t shard_index = pgid.hash_to_shard(shard_list.size());
   auto sdata = shard_list[shard_index];
   bool queued = false;
-  unsigned pushes_to_free = 0;
   {
     Mutex::Locker l(sdata->sdata_op_ordering_lock);
     auto p = sdata->pg_slots.find(pgid);
@@ -10053,18 +10329,12 @@ void OSD::ShardedOpWQ::wake_pg_waiters(spg_t pgid)
           ++i) {
        sdata->_enqueue_front(make_pair(pgid, *i), osd->op_prio_cutoff);
       }
-      for (auto& q : p->second.to_process) {
-       pushes_to_free += q.get_reserved_pushes();
-      }
       p->second.to_process.clear();
       p->second.waiting_for_pg = false;
       ++p->second.requeue_seq;
       queued = true;
     }
   }
-  if (pushes_to_free > 0) {
-    osd->service.release_reserved_pushes(pushes_to_free);
-  }
   if (queued) {
     sdata->sdata_lock.Lock();
     sdata->sdata_cond.SignalOne();