import ceph quincy 17.2.1

diff --git a/ceph/src/osd/PeeringState.cc b/ceph/src/osd/PeeringState.cc
index 47ba7d3614cd71f3b61755b17c2993a3bab9db76..68b8d22259955dfb82455429b25b3ed8abd8fbc3 100644
--- a/ceph/src/osd/PeeringState.cc
+++ b/ceph/src/osd/PeeringState.cc
 #include "messages/MBackfillReserve.h"
 #include "messages/MRecoveryReserve.h"
 #include "messages/MOSDScrubReserve.h"
-#include "messages/MOSDPGInfo.h"
 #include "messages/MOSDPGInfo2.h"
 #include "messages/MOSDPGTrim.h"
 #include "messages/MOSDPGLog.h"
-#include "messages/MOSDPGNotify.h"
 #include "messages/MOSDPGNotify2.h"
-#include "messages/MOSDPGQuery.h"
 #include "messages/MOSDPGQuery2.h"
 #include "messages/MOSDPGLease.h"
 #include "messages/MOSDPGLeaseAck.h"
 #define dout_context cct
 #define dout_subsys ceph_subsys_osd
 
-BufferedRecoveryMessages::BufferedRecoveryMessages(
-  ceph_release_t r,
-  PeeringCtx &ctx)
-  : require_osd_release(r) {
+using std::dec;
+using std::hex;
+using std::make_pair;
+using std::map;
+using std::ostream;
+using std::pair;
+using std::set;
+using std::string;
+using std::stringstream;
+using std::vector;
+
+using ceph::Formatter;
+using ceph::make_message;
+
+BufferedRecoveryMessages::BufferedRecoveryMessages(PeeringCtx &ctx)
   // steal messages from ctx
-  message_map.swap(ctx.message_map);
-}
+  : message_map{std::move(ctx.message_map)}
+{}
 
 void BufferedRecoveryMessages::send_notify(int to, const pg_notify_t &n)
 {
-  if (require_osd_release >= ceph_release_t::octopus) {
-    spg_t pgid(n.info.pgid.pgid, n.to);
-    send_osd_message(to, make_message<MOSDPGNotify2>(pgid, n));
-  } else {
-    send_osd_message(to, make_message<MOSDPGNotify>(n.epoch_sent, vector{n}));
-  }
+  spg_t pgid(n.info.pgid.pgid, n.to);
+  send_osd_message(to, TOPNSPC::make_message<MOSDPGNotify2>(pgid, n));
 }
 
 void BufferedRecoveryMessages::send_query(
@@ -47,15 +51,7 @@ void BufferedRecoveryMessages::send_query(
   spg_t to_spgid,
   const pg_query_t &q)
 {
-  if (require_osd_release >= ceph_release_t::octopus) {
-    send_osd_message(to,
-                    make_message<MOSDPGQuery2>(to_spgid, q));
-  } else {
-    auto m = make_message<MOSDPGQuery>(
-      q.epoch_sent,
-      MOSDPGQuery::pg_list_t{{to_spgid, q}});
-    send_osd_message(to, m);
-  }
+  send_osd_message(to, TOPNSPC::make_message<MOSDPGQuery2>(to_spgid, q));
 }
 
 void BufferedRecoveryMessages::send_info(
@@ -67,31 +63,19 @@ void BufferedRecoveryMessages::send_info(
   std::optional<pg_lease_t> lease,
   std::optional<pg_lease_ack_t> lease_ack)
 {
-  if (require_osd_release >= ceph_release_t::octopus) {
-    send_osd_message(
-      to,
-      make_message<MOSDPGInfo2>(
-       to_spgid,
-       info,
-       cur_epoch,
-       min_epoch,
-       lease,
-       lease_ack)
-      );
-  } else {
-    send_osd_message(
-      to,
-      make_message<MOSDPGInfo>(
-        cur_epoch,
-        vector{pg_notify_t{to_spgid.shard,
-                          info.pgid.shard,
-                          min_epoch, cur_epoch,
-                          info, PastIntervals{}}})
-      );
-  }
+  send_osd_message(
+    to,
+    TOPNSPC::make_message<MOSDPGInfo2>(
+      to_spgid,
+      info,
+      cur_epoch,
+      min_epoch,
+      lease,
+      lease_ack)
+  );
 }
 
-void PGPool::update(CephContext *cct, OSDMapRef map)
+void PGPool::update(OSDMapRef map)
 {
   const pg_pool_t *pi = map->get_pg_pool(id);
   if (!pi) {
@@ -137,6 +121,7 @@ PeeringState::PeeringState(
     pg_whoami(pg_whoami),
     info(spgid),
     pg_log(cct),
+    last_require_osd_release(curmap->require_osd_release),
     missing_loc(spgid, this, dpp, cct),
     machine(this, cct, spgid, dpp, pl, &state_history)
 {
@@ -161,8 +146,7 @@ void PeeringState::begin_block_outgoing() {
   ceph_assert(!messages_pending_flush);
   ceph_assert(orig_ctx);
   ceph_assert(rctx);
-  messages_pending_flush = BufferedRecoveryMessages(
-    orig_ctx->require_osd_release);
+  messages_pending_flush.emplace();
   rctx.emplace(*messages_pending_flush, *orig_ctx);
 }
 
@@ -202,9 +186,7 @@ void PeeringState::check_recovery_sources(const OSDMapRef& osdmap)
   missing_loc.check_recovery_sources(osdmap);
   pl->check_recovery_sources(osdmap);
 
-  for (set<pg_shard_t>::iterator i = peer_log_requested.begin();
-       i != peer_log_requested.end();
-       ) {
+  for (auto i = peer_log_requested.begin(); i != peer_log_requested.end();) {
     if (!osdmap->is_up(i->osd)) {
       psdout(10) << "peer_log_requested removing " << *i << dendl;
       peer_log_requested.erase(i++);
@@ -213,9 +195,8 @@ void PeeringState::check_recovery_sources(const OSDMapRef& osdmap)
     }
   }
 
-  for (set<pg_shard_t>::iterator i = peer_missing_requested.begin();
-       i != peer_missing_requested.end();
-       ) {
+  for (auto i = peer_missing_requested.begin();
+       i != peer_missing_requested.end();) {
     if (!osdmap->is_up(i->osd)) {
       psdout(10) << "peer_missing_requested removing " << *i << dendl;
       peer_missing_requested.erase(i++);
@@ -248,6 +229,16 @@ void PeeringState::update_history(const pg_history_t& new_history)
   pl->on_info_history_change();
 }
 
+hobject_t PeeringState::earliest_backfill() const
+{
+  hobject_t e = hobject_t::get_max();
+  for (const pg_shard_t& bt : get_backfill_targets()) {
+    const pg_info_t &pi = get_peer_info(bt);
+    e = std::min(pi.last_backfill, e);
+  }
+  return e;
+}
+
 void PeeringState::purge_strays()
 {
   if (is_premerge()) {
@@ -261,18 +252,16 @@ void PeeringState::purge_strays()
   psdout(10) << "purge_strays " << stray_set << dendl;
 
   bool removed = false;
-  for (set<pg_shard_t>::iterator p = stray_set.begin();
-       p != stray_set.end();
-       ++p) {
+  for (auto p = stray_set.begin(); p != stray_set.end(); ++p) {
     ceph_assert(!is_acting_recovery_backfill(*p));
     if (get_osdmap()->is_up(p->osd)) {
       psdout(10) << "sending PGRemove to osd." << *p << dendl;
       vector<spg_t> to_remove;
       to_remove.push_back(spg_t(info.pgid.pgid, p->shard));
-      MOSDPGRemove *m = new MOSDPGRemove(
+      auto m = TOPNSPC::make_message<MOSDPGRemove>(
        get_osdmap_epoch(),
        to_remove);
-      pl->send_cluster_message(p->osd, m, get_osdmap_epoch());
+      pl->send_cluster_message(p->osd, std::move(m), get_osdmap_epoch());
     } else {
       psdout(10) << "not sending PGRemove to down osd." << *p << dendl;
     }
@@ -295,11 +284,41 @@ void PeeringState::purge_strays()
   peer_missing_requested.clear();
 }
 
+void PeeringState::query_unfound(Formatter *f, string state)
+{
+  psdout(20) << "Enter PeeringState common QueryUnfound" << dendl;
+  {
+    f->dump_string("state", state);
+    f->dump_bool("available_might_have_unfound", true);
+    f->open_array_section("might_have_unfound");
+    for (auto p = might_have_unfound.begin();
+        p != might_have_unfound.end();
+        ++p) {
+      if (peer_missing.count(*p)) {
+       ; // Ignore already probed OSDs
+      } else {
+        f->open_object_section("osd");
+        f->dump_stream("osd") << *p;
+       if (peer_missing_requested.count(*p)) {
+         f->dump_string("status", "querying");
+        } else if (!get_osdmap()->is_up(p->osd)) {
+         f->dump_string("status", "osd is down");
+        } else {
+         f->dump_string("status", "not queried");
+        }
+        f->close_section();
+      }
+    }
+    f->close_section();
+  }
+  psdout(20) << "Exit PeeringState common QueryUnfound" << dendl;
+  return;
+}
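
A minimal caller sketch for the new query_unfound() helper (illustrative only, not part of the diff; the wrapper name, the `ps` reference, and the "Active" state string are assumptions, while ceph::JSONFormatter and Formatter come from common/Formatter.h):

    #include <iostream>
    #include "common/Formatter.h"
    #include "osd/PeeringState.h"   // path as in the ceph source tree

    void dump_unfound_probe_status(PeeringState &ps) {
      ceph::JSONFormatter f(true);          // pretty-printed JSON
      f.open_object_section("query_unfound");
      ps.query_unfound(&f, "Active");       // state name supplied by the calling state handler
      f.close_section();
      // e.g. {"state":"Active","available_might_have_unfound":true,"might_have_unfound":[...]}
      f.flush(std::cout);
    }

For every OSD in might_have_unfound that has not yet returned a missing set, the dump records whether it is being queried, is down, or has not been asked yet.
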
 
 bool PeeringState::proc_replica_info(
   pg_shard_t from, const pg_info_t &oinfo, epoch_t send_epoch)
 {
-  map<pg_shard_t, pg_info_t>::iterator p = peer_info.find(from);
+  auto p = peer_info.find(from);
   if (p != peer_info.end() && p->second.last_update == oinfo.last_update) {
     psdout(10) << " got dup osd." << from << " info "
               << oinfo << ", identical to ours" << dendl;
@@ -340,20 +359,30 @@ void PeeringState::remove_down_peer_info(const OSDMapRef &osdmap)
 {
   // Remove any downed osds from peer_info
   bool removed = false;
-  map<pg_shard_t, pg_info_t>::iterator p = peer_info.begin();
+  auto p = peer_info.begin();
   while (p != peer_info.end()) {
     if (!osdmap->is_up(p->first.osd)) {
       psdout(10) << " dropping down osd." << p->first << " info " << p->second << dendl;
       peer_missing.erase(p->first);
       peer_log_requested.erase(p->first);
       peer_missing_requested.erase(p->first);
-      peer_purged.erase(p->first);
       peer_info.erase(p++);
       removed = true;
     } else
       ++p;
   }
 
+  // Remove any downed osds from peer_purged so we can re-purge if necessary
+  auto it = peer_purged.begin();
+  while (it != peer_purged.end()) {
+    if (!osdmap->is_up(it->osd)) {
+      psdout(10) << " dropping down osd." << *it << " from peer_purged" << dendl;
+      peer_purged.erase(it++);
+    } else {
+      ++it;
+    }
+  }
+
   // if we removed anyone, update peers (which include peer_info)
   if (removed)
     update_heartbeat_peers();
@@ -375,9 +404,7 @@ void PeeringState::update_heartbeat_peers()
     if (up[i] != CRUSH_ITEM_NONE)
       new_peers.insert(up[i]);
   }
-  for (map<pg_shard_t,pg_info_t>::iterator p = peer_info.begin();
-       p != peer_info.end();
-       ++p) {
+  for (auto p = peer_info.begin(); p != peer_info.end(); ++p) {
     new_peers.insert(p->first.osd);
   }
   pl->update_heartbeat_peers(std::move(new_peers));
@@ -415,7 +442,7 @@ void PeeringState::advance_map(
            << dendl;
 
   update_osdmap_ref(osdmap);
-  pool.update(cct, osdmap);
+  pool.update(osdmap);
 
   AdvMap evt(
     osdmap, lastmap, newup, up_primary,
@@ -424,7 +451,7 @@ void PeeringState::advance_map(
   if (pool.info.last_change == osdmap_ref->get_epoch()) {
     pl->on_pool_change();
   }
-  readable_interval = pool.get_readable_interval();
+  readable_interval = pool.get_readable_interval(cct->_conf);
   last_require_osd_release = osdmap->require_osd_release;
 }
 
@@ -446,8 +473,8 @@ void PeeringState::activate_map(PeeringCtx &rctx)
   }
   write_if_dirty(rctx.transaction);
 
-  if (get_osdmap()->check_new_blacklist_entries()) {
-    pl->check_blacklisted_watchers();
+  if (get_osdmap()->check_new_blocklist_entries()) {
+    pl->check_blocklisted_watchers();
   }
 }
 
@@ -684,6 +711,8 @@ void PeeringState::start_peering_interval(
     // did primary change?
     if (was_old_primary != is_primary()) {
       state_clear(PG_STATE_CLEAN);
+      // queue/dequeue the scrubber
+      pl->on_primary_status_change(was_old_primary, is_primary());
     }
 
     pl->on_role_change();
@@ -707,6 +736,10 @@ void PeeringState::start_peering_interval(
     }
   }
 
+  if (is_primary() && was_old_primary) {
+    pl->reschedule_scrub();
+  }
+
   if (acting.empty() && !up.empty() && up_primary == pg_whoami) {
     psdout(10) << " acting empty, but i am up[0], clearing pg_temp" << dendl;
     pl->queue_want_pg_temp(acting);
@@ -721,14 +754,14 @@ void PeeringState::on_new_interval()
   // initialize features
   acting_features = CEPH_FEATURES_SUPPORTED_DEFAULT;
   upacting_features = CEPH_FEATURES_SUPPORTED_DEFAULT;
-  for (vector<int>::iterator p = acting.begin(); p != acting.end(); ++p) {
+  for (auto p = acting.begin(); p != acting.end(); ++p) {
     if (*p == CRUSH_ITEM_NONE)
       continue;
     uint64_t f = osdmap->get_xinfo(*p).features;
     acting_features &= f;
     upacting_features &= f;
   }
-  for (vector<int>::iterator p = up.begin(); p != up.end(); ++p) {
+  for (auto p = up.begin(); p != up.end(); ++p) {
     if (*p == CRUSH_ITEM_NONE)
       continue;
     upacting_features &= osdmap->get_xinfo(*p).features;
@@ -1008,10 +1041,10 @@ unsigned PeeringState::get_backfill_priority()
   if (state & PG_STATE_FORCED_BACKFILL) {
     ret = OSD_BACKFILL_PRIORITY_FORCED;
   } else {
-    if (acting.size() < pool.info.min_size) {
+    if (actingset.size() < pool.info.min_size) {
       base = OSD_BACKFILL_INACTIVE_PRIORITY_BASE;
       // inactive: no. of replicas < min_size, highest priority since it blocks IO
-      ret = base + (pool.info.min_size - acting.size());
+      ret = base + (pool.info.min_size - actingset.size());
 
     } else if (is_undersized()) {
       // undersized: OSD_BACKFILL_DEGRADED_PRIORITY_BASE + num missing replicas
@@ -1118,7 +1151,7 @@ void PeeringState::send_lease()
     }
     pl->send_cluster_message(
       peer.osd,
-      new MOSDPGLease(epoch,
+      TOPNSPC::make_message<MOSDPGLease>(epoch,
                      spg_t(spgid.pgid, peer.shard),
                      get_lease()),
       epoch);
@@ -1127,12 +1160,7 @@ void PeeringState::send_lease()
 
 void PeeringState::proc_lease(const pg_lease_t& l)
 {
-  if (!HAVE_FEATURE(upacting_features, SERVER_OCTOPUS)) {
-    psdout(20) << __func__ << " no-op, upacting_features 0x" << std::hex
-              << upacting_features << std::dec
-              << " does not include SERVER_OCTOPUS" << dendl;
-    return;
-  }
+  assert(HAVE_FEATURE(upacting_features, SERVER_OCTOPUS));
   if (!is_nonprimary()) {
     psdout(20) << __func__ << " no-op, !nonprimary" << dendl;
     return;
@@ -1174,9 +1202,7 @@ void PeeringState::proc_lease(const pg_lease_t& l)
 
 void PeeringState::proc_lease_ack(int from, const pg_lease_ack_t& a)
 {
-  if (!HAVE_FEATURE(upacting_features, SERVER_OCTOPUS)) {
-    return;
-  }
+  assert(HAVE_FEATURE(upacting_features, SERVER_OCTOPUS));
   auto now = pl->get_mnow();
   bool was_min = false;
   for (unsigned i = 0; i < acting.size(); ++i) {
@@ -1202,9 +1228,7 @@ void PeeringState::proc_lease_ack(int from, const pg_lease_ack_t& a)
 
 void PeeringState::proc_renew_lease()
 {
-  if (!HAVE_FEATURE(upacting_features, SERVER_OCTOPUS)) {
-    return;
-  }
+  assert(HAVE_FEATURE(upacting_features, SERVER_OCTOPUS));
   renew_lease(pl->get_mnow());
   send_lease();
   schedule_renew_lease();
@@ -1232,9 +1256,7 @@ void PeeringState::recalc_readable_until()
 
 bool PeeringState::check_prior_readable_down_osds(const OSDMapRef& map)
 {
-  if (!HAVE_FEATURE(upacting_features, SERVER_OCTOPUS)) {
-    return false;
-  }
+  assert(HAVE_FEATURE(upacting_features, SERVER_OCTOPUS));
   bool changed = false;
   auto p = prior_readable_down_osds.begin();
   while (p != prior_readable_down_osds.end()) {
@@ -1273,9 +1295,7 @@ PastIntervals::PriorSet PeeringState::build_prior()
 {
   if (1) {
     // sanity check
-    for (map<pg_shard_t,pg_info_t>::iterator it = peer_info.begin();
-        it != peer_info.end();
-        ++it) {
+    for (auto it = peer_info.begin(); it != peer_info.end(); ++it) {
       ceph_assert(info.history.last_epoch_started >=
                  it->second.history.last_epoch_started);
     }
@@ -1341,12 +1361,11 @@ bool PeeringState::needs_recovery() const
   }
 
   ceph_assert(!acting_recovery_backfill.empty());
-  set<pg_shard_t>::const_iterator end = acting_recovery_backfill.end();
-  set<pg_shard_t>::const_iterator a = acting_recovery_backfill.begin();
-  for (; a != end; ++a) {
-    if (*a == get_primary()) continue;
-    pg_shard_t peer = *a;
-    map<pg_shard_t, pg_missing_t>::const_iterator pm = peer_missing.find(peer);
+  for (const pg_shard_t& peer : acting_recovery_backfill) {
+    if (peer == get_primary()) {
+      continue;
+    }
+    auto pm = peer_missing.find(peer);
     if (pm == peer_missing.end()) {
       psdout(10) << __func__ << " osd." << peer << " doesn't have missing set"
                 << dendl;
@@ -1369,11 +1388,9 @@ bool PeeringState::needs_backfill() const
 
   // We can assume that only possible osds that need backfill
   // are on the backfill_targets vector nodes.
-  set<pg_shard_t>::const_iterator end = backfill_targets.end();
-  set<pg_shard_t>::const_iterator a = backfill_targets.begin();
-  for (; a != end; ++a) {
-    pg_shard_t peer = *a;
-    map<pg_shard_t, pg_info_t>::const_iterator pi = peer_info.find(peer);
+  for (const pg_shard_t& peer : backfill_targets) {
+    auto pi = peer_info.find(peer);
+    ceph_assert(pi != peer_info.end());
     if (!pi->second.last_backfill.is_max()) {
       psdout(10) << __func__ << " osd." << peer
                 << " has last_backfill " << pi->second.last_backfill << dendl;
@@ -1393,12 +1410,12 @@ bool PeeringState::all_unfound_are_queried_or_lost(
 {
   ceph_assert(is_primary());
 
-  set<pg_shard_t>::const_iterator peer = might_have_unfound.begin();
-  set<pg_shard_t>::const_iterator mend = might_have_unfound.end();
+  auto peer = might_have_unfound.begin();
+  auto mend = might_have_unfound.end();
   for (; peer != mend; ++peer) {
     if (peer_missing.count(*peer))
       continue;
-    map<pg_shard_t, pg_info_t>::const_iterator iter = peer_info.find(*peer);
+    auto iter = peer_info.find(*peer);
     if (iter != peer_info.end() &&
         (iter->second.is_empty() || iter->second.dne()))
       continue;
@@ -1423,7 +1440,7 @@ void PeeringState::reject_reservation()
   pl->unreserve_recovery_space();
   pl->send_cluster_message(
     primary.osd,
-    new MBackfillReserve(
+    TOPNSPC::make_message<MBackfillReserve>(
       MBackfillReserve::REJECT_TOOFULL,
       spg_t(info.pgid.pgid, primary.shard),
       get_osdmap_epoch()),
@@ -1447,11 +1464,8 @@ map<pg_shard_t, pg_info_t>::const_iterator PeeringState::find_best_info(
   /* See doc/dev/osd_internals/last_epoch_started.rst before attempting
    * to make changes to this process.  Also, make sure to update it
    * when you find bugs! */
-  eversion_t min_last_update_acceptable = eversion_t::max();
   epoch_t max_last_epoch_started_found = 0;
-  for (map<pg_shard_t, pg_info_t>::const_iterator i = infos.begin();
-       i != infos.end();
-       ++i) {
+  for (auto i = infos.begin(); i != infos.end(); ++i) {
     if (!cct->_conf->osd_find_best_info_ignore_history_les &&
        max_last_epoch_started_found < i->second.history.last_epoch_started) {
       *history_les_bound = true;
@@ -1463,9 +1477,8 @@ map<pg_shard_t, pg_info_t>::const_iterator PeeringState::find_best_info(
       max_last_epoch_started_found = i->second.last_epoch_started;
     }
   }
-  for (map<pg_shard_t, pg_info_t>::const_iterator i = infos.begin();
-       i != infos.end();
-       ++i) {
+  eversion_t min_last_update_acceptable = eversion_t::max();
+  for (auto i = infos.begin(); i != infos.end(); ++i) {
     if (max_last_epoch_started_found <= i->second.last_epoch_started) {
       if (min_last_update_acceptable > i->second.last_update)
        min_last_update_acceptable = i->second.last_update;
@@ -1474,14 +1487,12 @@ map<pg_shard_t, pg_info_t>::const_iterator PeeringState::find_best_info(
   if (min_last_update_acceptable == eversion_t::max())
     return infos.end();
 
-  map<pg_shard_t, pg_info_t>::const_iterator best = infos.end();
+  auto best = infos.end();
   // find osd with newest last_update (oldest for ec_pool).
   // if there are multiples, prefer
   //  - a longer tail, if it brings another peer into log contiguity
   //  - the current primary
-  for (map<pg_shard_t, pg_info_t>::const_iterator p = infos.begin();
-       p != infos.end();
-       ++p) {
+  for (auto p = infos.begin(); p != infos.end(); ++p) {
     if (restrict_to_up_acting && !is_up(p->first) &&
        !is_acting(p->first))
       continue;
@@ -1564,7 +1575,7 @@ void PeeringState::calc_ec_acting(
 {
   vector<int> want(size, CRUSH_ITEM_NONE);
   map<shard_id_t, set<pg_shard_t> > all_info_by_shard;
-  for (map<pg_shard_t, pg_info_t>::const_iterator i = all_info.begin();
+  for (auto i = all_info.begin();
        i != all_info.end();
        ++i) {
     all_info_by_shard[i->first.shard].insert(i->first);
@@ -1592,7 +1603,7 @@ void PeeringState::calc_ec_acting(
       ss << " selecting acting[i]: " << pg_shard_t(acting[i], shard_id_t(i)) << std::endl;
       want[i] = acting[i];
     } else if (!restrict_to_up_acting) {
-      for (set<pg_shard_t>::iterator j = all_info_by_shard[shard_id_t(i)].begin();
+      for (auto j = all_info_by_shard[shard_id_t(i)].begin();
           j != all_info_by_shard[shard_id_t(i)].end();
           ++j) {
        ceph_assert(j->shard == i);
@@ -1618,33 +1629,20 @@ void PeeringState::calc_ec_acting(
   _want->swap(want);
 }
 
-/**
- * calculate the desired acting set.
- *
- * Choose an appropriate acting set.  Prefer up[0], unless it is
- * incomplete, or another osd has a longer tail that allows us to
- * bring other up nodes up to date.
- */
-void PeeringState::calc_replicated_acting(
+std::pair<map<pg_shard_t, pg_info_t>::const_iterator, eversion_t>
+PeeringState::select_replicated_primary(
   map<pg_shard_t, pg_info_t>::const_iterator auth_log_shard,
   uint64_t force_auth_primary_missing_objects,
-  unsigned size,
-  const vector<int> &acting,
-  const vector<int> &up,
+  const std::vector<int> &up,
   pg_shard_t up_primary,
   const map<pg_shard_t, pg_info_t> &all_info,
-  bool restrict_to_up_acting,
-  vector<int> *want,
-  set<pg_shard_t> *backfill,
-  set<pg_shard_t> *acting_backfill,
   const OSDMapRef osdmap,
   ostream &ss)
 {
   pg_shard_t auth_log_shard_id = auth_log_shard->first;
 
   ss << __func__ << " newest update on osd." << auth_log_shard_id
-     << " with " << auth_log_shard->second
-     << (restrict_to_up_acting ? " restrict_to_up_acting" : "") << std::endl;
+     << " with " << auth_log_shard->second << std::endl;
 
   // select primary
   auto primary = all_info.find(up_primary);
@@ -1652,31 +1650,28 @@ void PeeringState::calc_replicated_acting(
       !primary->second.is_incomplete() &&
       primary->second.last_update >=
         auth_log_shard->second.log_tail) {
-    if (HAVE_FEATURE(osdmap->get_up_osd_features(), SERVER_NAUTILUS)) {
-      auto approx_missing_objects =
-        primary->second.stats.stats.sum.num_objects_missing;
-      auto auth_version = auth_log_shard->second.last_update.version;
-      auto primary_version = primary->second.last_update.version;
-      if (auth_version > primary_version) {
-        approx_missing_objects += auth_version - primary_version;
-      } else {
-        approx_missing_objects += primary_version - auth_version;
-      }
-      if ((uint64_t)approx_missing_objects >
-          force_auth_primary_missing_objects) {
-        primary = auth_log_shard;
-        ss << "up_primary: " << up_primary << ") has approximate "
-           << approx_missing_objects
-           << "(>" << force_auth_primary_missing_objects <<") "
-           << "missing objects, osd." << auth_log_shard_id
-           << " selected as primary instead"
-           << std::endl;
-      } else {
-        ss << "up_primary: " << up_primary << ") selected as primary"
-           << std::endl;
-      }
+    assert(HAVE_FEATURE(osdmap->get_up_osd_features(), SERVER_NAUTILUS));
+    auto approx_missing_objects =
+      primary->second.stats.stats.sum.num_objects_missing;
+    auto auth_version = auth_log_shard->second.last_update.version;
+    auto primary_version = primary->second.last_update.version;
+    if (auth_version > primary_version) {
+      approx_missing_objects += auth_version - primary_version;
+    } else {
+      approx_missing_objects += primary_version - auth_version;
+    }
+    if ((uint64_t)approx_missing_objects >
+        force_auth_primary_missing_objects) {
+      primary = auth_log_shard;
+      ss << "up_primary: " << up_primary << ") has approximate "
+         << approx_missing_objects
+         << "(>" << force_auth_primary_missing_objects <<") "
+         << "missing objects, osd." << auth_log_shard_id
+         << " selected as primary instead"
+         << std::endl;
     } else {
-      ss << "up_primary: " << up_primary << ") selected as primary" << std::endl;
+      ss << "up_primary: " << up_primary << ") selected as primary"
+         << std::endl;
     }
   } else {
     ceph_assert(!auth_log_shard->second.is_incomplete());
@@ -1687,8 +1682,6 @@ void PeeringState::calc_replicated_acting(
 
   ss << __func__ << " primary is osd." << primary->first
      << " with " << primary->second << std::endl;
-  want->push_back(primary->first.osd);
-  acting_backfill->insert(primary->first);
 
   /* We include auth_log_shard->second.log_tail because in GetLog,
    * we will request logs back to the min last_update over our
@@ -1698,6 +1691,39 @@ void PeeringState::calc_replicated_acting(
   eversion_t oldest_auth_log_entry =
     std::min(primary->second.log_tail, auth_log_shard->second.log_tail);
 
+  return std::make_pair(primary, oldest_auth_log_entry);
+}
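
A worked example of the force-auth-primary heuristic in select_replicated_primary() above (illustrative only, not part of the diff; all values are hypothetical, and 100 stands in for the osd_force_auth_primary_missing_objects setting):

    #include <cassert>
    #include <cstdint>

    int main() {
      uint64_t auth_version = 1200;      // auth_log_shard last_update.version
      uint64_t primary_version = 950;    // up_primary's last_update.version
      uint64_t num_objects_missing = 40; // up_primary's stats.sum.num_objects_missing
      uint64_t threshold = 100;          // osd_force_auth_primary_missing_objects (assumed)

      // approx_missing_objects = historical missing + version gap between the logs
      uint64_t approx_missing_objects = num_objects_missing +
        (auth_version > primary_version ? auth_version - primary_version
                                        : primary_version - auth_version);
      // 40 + 250 = 290 > 100, so the authoritative log shard would be selected
      // as primary instead of the up primary.
      assert(approx_missing_objects > threshold);
      return 0;
    }
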
+
+
+/**
+ * calculate the desired acting set.
+ *
+ * Choose an appropriate acting set.  Prefer up[0], unless it is
+ * incomplete, or another osd has a longer tail that allows us to
+ * bring other up nodes up to date.
+ */
+void PeeringState::calc_replicated_acting(
+  map<pg_shard_t, pg_info_t>::const_iterator primary,
+  eversion_t oldest_auth_log_entry,
+  unsigned size,
+  const vector<int> &acting,
+  const vector<int> &up,
+  pg_shard_t up_primary,
+  const map<pg_shard_t, pg_info_t> &all_info,
+  bool restrict_to_up_acting,
+  vector<int> *want,
+  set<pg_shard_t> *backfill,
+  set<pg_shard_t> *acting_backfill,
+  const OSDMapRef osdmap,
+  const PGPool& pool,
+  ostream &ss)
+{
+  ss << __func__ << (restrict_to_up_acting ? " restrict_to_up_acting" : "")
+     << std::endl;
+
+  want->push_back(primary->first.osd);
+  acting_backfill->insert(primary->first);
+
   // select replicas that have log contiguity with primary.
   // prefer up, then acting, then any peer_info osds
   for (auto i : up) {
@@ -1729,7 +1755,7 @@ void PeeringState::calc_replicated_acting(
     // skip up osds we already considered above
     if (acting_cand == primary->first)
       continue;
-    vector<int>::const_iterator up_it = find(up.begin(), up.end(), i);
+    auto up_it = find(up.begin(), up.end(), i);
     if (up_it != up.end())
       continue;
 
@@ -1772,10 +1798,10 @@ void PeeringState::calc_replicated_acting(
     // skip up osds we already considered above
     if (i.first == primary->first)
       continue;
-    vector<int>::const_iterator up_it = find(up.begin(), up.end(), i.first.osd);
+    auto up_it = find(up.begin(), up.end(), i.first.osd);
     if (up_it != up.end())
       continue;
-    vector<int>::const_iterator acting_it = find(
+    auto acting_it = find(
       acting.begin(), acting.end(), i.first.osd);
     if (acting_it != acting.end())
       continue;
@@ -1812,6 +1838,287 @@ void PeeringState::calc_replicated_acting(
   }
 }
 
+// Defines osd preference order: acting set, then larger last_update
+using osd_ord_t = std::tuple<bool, eversion_t>; // <acting, last_update>
+using osd_id_t = int;
+
+class bucket_candidates_t {
+  std::deque<std::pair<osd_ord_t, osd_id_t>> osds;
+  int selected = 0;
+
+public:
+  void add_osd(osd_ord_t ord, osd_id_t osd) {
+    // osds will be added in smallest to largest order
+    assert(osds.empty() || osds.back().first <= ord);
+    osds.push_back(std::make_pair(ord, osd));
+  }
+  osd_id_t pop_osd() {
+    ceph_assert(!is_empty());
+    auto ret = osds.front();
+    osds.pop_front();
+    return ret.second;
+  }
+
+  void inc_selected() { selected++; }
+  unsigned get_num_selected() const { return selected; }
+
+  osd_ord_t get_ord() const {
+    return osds.empty() ? std::make_tuple(false, eversion_t())
+      : osds.front().first;
+  }
+
+  bool is_empty() const { return osds.empty(); }
+
+  bool operator<(const bucket_candidates_t &rhs) const {
+    return std::make_tuple(-selected, get_ord()) <
+      std::make_tuple(-rhs.selected, rhs.get_ord());
+  }
+
+  friend std::ostream &operator<<(std::ostream &, const bucket_candidates_t &);
+};
+
+std::ostream &operator<<(std::ostream &lhs, const bucket_candidates_t &cand)
+{
+  return lhs << "candidates[" << cand.osds << "]";
+}
+
+class bucket_heap_t {
+  using elem_t = std::reference_wrapper<bucket_candidates_t>;
+  std::vector<elem_t> heap;
+
+  // Max heap -- should emit buckets in order of preference
+  struct comp {
+    bool operator()(const elem_t &lhs, const elem_t &rhs) {
+      return lhs.get() < rhs.get();
+    }
+  };
+public:
+  void push_if_nonempty(elem_t e) {
+    if (!e.get().is_empty()) {
+      heap.push_back(e);
+      std::push_heap(heap.begin(), heap.end(), comp());
+    }
+  }
+  elem_t pop() {
+    std::pop_heap(heap.begin(), heap.end(), comp());
+    auto ret = heap.back();
+    heap.pop_back();
+    return ret;
+  }
+
+  bool is_empty() const { return heap.empty(); }
+};
+
+/**
+ * calc_replicated_acting_stretch
+ *
+ * Choose an acting set using as much of the up set as possible, filling
+ * in the remaining slots so as to maximize the number of crush buckets at
+ * level pool.info.peering_crush_bucket_barrier represented.
+ *
+ * Stretch clusters are a bit special: while they have a "size" the
+ * same way as normal pools, if we happen to lose a data center
+ * (we call it a "stretch bucket", but really it'll be a data center or
+ * a cloud availability zone), we don't actually want to shove
+ * 2 DC's worth of replication into a single site -- it won't fit!
+ * So we locally calculate a bucket_max, based
+ * on the targeted number of stretch buckets for the pool and
+ * its size. Then we won't pull more than bucket_max from any
+ * given ancestor even if it leaves us undersized.
+ *
+ * There are two distinct phases: (commented below)
+ */
+void PeeringState::calc_replicated_acting_stretch(
+  map<pg_shard_t, pg_info_t>::const_iterator primary,
+  eversion_t oldest_auth_log_entry,
+  unsigned size,
+  const vector<int> &acting,
+  const vector<int> &up,
+  pg_shard_t up_primary,
+  const map<pg_shard_t, pg_info_t> &all_info,
+  bool restrict_to_up_acting,
+  vector<int> *want,
+  set<pg_shard_t> *backfill,
+  set<pg_shard_t> *acting_backfill,
+  const OSDMapRef osdmap,
+  const PGPool& pool,
+  ostream &ss)
+{
+  ceph_assert(want);
+  ceph_assert(acting_backfill);
+  ceph_assert(backfill);
+  ss << __func__ << (restrict_to_up_acting ? " restrict_to_up_acting" : "")
+     << std::endl;
+
+  auto used = [want](int osd) {
+    return std::find(want->begin(), want->end(), osd) != want->end();
+  };
+
+  auto usable_info = [&](const auto &cur_info) mutable {
+    return !(cur_info.is_incomplete() ||
+            cur_info.last_update < oldest_auth_log_entry);
+  };
+
+  auto osd_info = [&](int osd) mutable -> const pg_info_t & {
+    pg_shard_t cand = pg_shard_t(osd, shard_id_t::NO_SHARD);
+    const pg_info_t &cur_info = all_info.find(cand)->second;
+    return cur_info;
+  };
+
+  auto usable_osd = [&](int osd) mutable {
+    return usable_info(osd_info(osd));
+  };
+
+  std::map<int, bucket_candidates_t> ancestors;
+  auto get_ancestor = [&](int osd) mutable {
+    int ancestor = osdmap->crush->get_parent_of_type(
+      osd,
+      pool.info.peering_crush_bucket_barrier,
+      pool.info.crush_rule);
+    return &ancestors[ancestor];
+  };
+
+  unsigned bucket_max = pool.info.size / pool.info.peering_crush_bucket_target;
+  if (bucket_max * pool.info.peering_crush_bucket_target < pool.info.size) {
+    ++bucket_max; 
+  }
+
+  /* 1) Select all usable osds from the up set as well as the primary
+   *
+   * We also stash any unusable osds from up into backfill.
+   */
+  auto add_required = [&](int osd) {
+    if (!used(osd)) {
+      want->push_back(osd);
+      acting_backfill->insert(
+       pg_shard_t(osd, shard_id_t::NO_SHARD));
+      get_ancestor(osd)->inc_selected();
+    }
+  };
+  add_required(primary->first.osd);
+  ss << " osd " << primary->first.osd << " primary accepted "
+     << osd_info(primary->first.osd) << std::endl;
+  for (auto upcand: up) {
+    auto upshard = pg_shard_t(upcand, shard_id_t::NO_SHARD);
+    auto &curinfo = osd_info(upcand);
+    if (usable_osd(upcand)) {
+      ss << " osd " << upcand << " (up) accepted " << curinfo << std::endl;
+      add_required(upcand);
+    } else {
+      ss << " osd " << upcand << " (up) backfill " << curinfo << std::endl;
+      backfill->insert(upshard);
+      acting_backfill->insert(upshard);
+    }
+  }
+
+  if (want->size() >= pool.info.size) { // non-failed CRUSH mappings are valid
+    ss << " up set sufficient" << std::endl;
+    return;
+  }
+  ss << " up set insufficient, considering remaining osds" << std::endl;
+
+  /* 2) Fill out remaining slots from usable osds in all_info
+   *    while maximizing the number of ancestor nodes at the
+   *    barrier_id crush level.
+   */
+  {
+    std::vector<std::pair<osd_ord_t, osd_id_t>> candidates;
+    /* To do this, we first collect the usable osds into an ordered
+     * list of candidates
+     */
+    auto get_osd_ord = [&](bool is_acting, const pg_info_t &info) -> osd_ord_t {
+      return std::make_tuple(
+       !is_acting /* acting should sort first */,
+       info.last_update);
+    };
+    for (auto &cand : acting) {
+      auto &cand_info = osd_info(cand);
+      if (!used(cand) && usable_info(cand_info)) {
+       ss << " acting candidate " << cand << " " << cand_info << std::endl;
+       candidates.push_back(std::make_pair(get_osd_ord(true, cand_info), cand));
+      }
+    }
+    if (!restrict_to_up_acting) {
+      for (auto &[cand, info] : all_info) {
+       if (!used(cand.osd) && usable_info(info) &&
+           (std::find(acting.begin(), acting.end(), cand.osd)
+            == acting.end())) {
+         ss << " other candidate " << cand << " " << info << std::endl;
+         candidates.push_back(
+           std::make_pair(get_osd_ord(false, info), cand.osd));
+       }
+      }
+    }
+    std::sort(candidates.begin(), candidates.end());
+
+    // We then group these candidates by ancestor
+    std::for_each(candidates.begin(), candidates.end(), [&](auto cand) {
+      get_ancestor(cand.second)->add_osd(cand.first, cand.second);
+    });
+  }
+
+  auto pop_ancestor = [&](auto &ancestor) {
+    ceph_assert(!ancestor.is_empty());
+    auto osd = ancestor.pop_osd();
+
+    ss << " accepting candidate " << osd << std::endl;
+
+    ceph_assert(!used(osd));
+    ceph_assert(usable_osd(osd));
+
+    want->push_back(osd);
+    acting_backfill->insert(
+      pg_shard_t(osd, shard_id_t::NO_SHARD));
+    ancestor.inc_selected();
+  };
+
+  /* Next, we use the ancestors map to grab a descendant of the
+   * peering_crush_mandatory_member if not already represented.
+   *
+   * Note: the check below uses CRUSH_ITEM_NONE as the sentinel for
+   * "no mandatory member configured".
+   */
+  if (pool.info.peering_crush_mandatory_member != CRUSH_ITEM_NONE) {
+    auto aiter = ancestors.find(pool.info.peering_crush_mandatory_member);
+    if (aiter != ancestors.end() &&
+       !aiter->second.get_num_selected()) {
+      ss << " adding required ancestor " << aiter->first << std::endl;
+      ceph_assert(!aiter->second.is_empty()); // wouldn't exist otherwise
+      pop_ancestor(aiter->second);
+    }
+  }
+
+  /* We then place the ancestors in a heap ordered by fewest selected
+   * and then by the ordering token of the next osd */
+  bucket_heap_t aheap;
+  std::for_each(ancestors.begin(), ancestors.end(), [&](auto &anc) {
+    aheap.push_if_nonempty(anc.second);
+  });
+
+  /* and pull from this heap until it's empty or we have enough.
+   * "We have enough" is a sufficient check here for
+   * stretch_set_can_peer() because our heap sorting always
+   * pulls from ancestors with the least number of included OSDs,
+   * so if it is possible to satisfy the bucket_count constraints we
+   * will do so.
+   */
+  while (!aheap.is_empty() && want->size() < pool.info.size) {
+    auto next = aheap.pop();
+    pop_ancestor(next.get());
+    if (next.get().get_num_selected() < bucket_max) {
+      aheap.push_if_nonempty(next);
+    }
+  }
+
+  /* The end result is that we should have as many buckets covered as
+   * possible while respecting up, the primary selection,
+   * the pool size (given bucket count constraints),
+   * and the mandatory member.
+   */
+}
+
+
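
A worked example of the bucket_max rounding in calc_replicated_acting_stretch() above (illustrative only, not part of the diff; the pool values are hypothetical): with size=5 replicas targeted across peering_crush_bucket_target=2 stretch buckets, at most 3 OSDs may be drawn from any one bucket, so losing a whole data center cannot pull all five replicas into the surviving one.

    #include <cassert>

    int main() {
      unsigned size = 5;           // pool.info.size (assumed)
      unsigned bucket_target = 2;  // pool.info.peering_crush_bucket_target (assumed)

      // Same ceil-division as in the function body above.
      unsigned bucket_max = size / bucket_target;
      if (bucket_max * bucket_target < size) {
        ++bucket_max;
      }
      assert(bucket_max == 3);     // never select more than 3 OSDs from one bucket
      return 0;
    }
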
 bool PeeringState::recoverable(const vector<int> &want) const
 {
   unsigned num_want_acting = 0;
@@ -1827,13 +2134,7 @@ bool PeeringState::recoverable(const vector<int> &want) const
   }
 
   if (num_want_acting < pool.info.min_size) {
-    const bool recovery_ec_pool_below_min_size=
-      HAVE_FEATURE(get_osdmap()->get_up_osd_features(), SERVER_OCTOPUS);
-
-    if (pool.info.is_erasure() && !recovery_ec_pool_below_min_size) {
-      psdout(10) << __func__ << " failed, ec recovery below min size not supported by pre-octopus" << dendl;
-      return false;
-    } else if (!cct->_conf.get_val<bool>("osd_allow_recovery_below_min_size")) {
+    if (!cct->_conf.get_val<bool>("osd_allow_recovery_below_min_size")) {
       psdout(10) << __func__ << " failed, recovery below min size not enabled" << dendl;
       return false;
     }
@@ -1878,21 +2179,15 @@ void PeeringState::choose_async_recovery_ec(
     // past the authoritative last_update the same as those equal to it.
     version_t auth_version = auth_info.last_update.version;
     version_t candidate_version = shard_info.last_update.version;
-    if (HAVE_FEATURE(osdmap->get_up_osd_features(), SERVER_NAUTILUS)) {
-      auto approx_missing_objects =
-        shard_info.stats.stats.sum.num_objects_missing;
-      if (auth_version > candidate_version) {
-        approx_missing_objects += auth_version - candidate_version;
-      }
-      if (static_cast<uint64_t>(approx_missing_objects) >
-         cct->_conf.get_val<uint64_t>("osd_async_recovery_min_cost")) {
-        candidates_by_cost.emplace(approx_missing_objects, shard_i);
-      }
-    } else {
-      if (auth_version > candidate_version &&
-          (auth_version - candidate_version) > cct->_conf.get_val<uint64_t>("osd_async_recovery_min_cost")) {
-        candidates_by_cost.insert(make_pair(auth_version - candidate_version, shard_i));
-      }
+    assert(HAVE_FEATURE(osdmap->get_up_osd_features(), SERVER_NAUTILUS));
+    auto approx_missing_objects =
+      shard_info.stats.stats.sum.num_objects_missing;
+    if (auth_version > candidate_version) {
+      approx_missing_objects += auth_version - candidate_version;
+    }
+    if (static_cast<uint64_t>(approx_missing_objects) >
+       cct->_conf.get_val<uint64_t>("osd_async_recovery_min_cost")) {
+      candidates_by_cost.emplace(approx_missing_objects, shard_i);
     }
   }
 
@@ -1938,28 +2233,17 @@ void PeeringState::choose_async_recovery_replicated(
     // logs plus historical missing objects as the cost of recovery
     version_t auth_version = auth_info.last_update.version;
     version_t candidate_version = shard_info.last_update.version;
-    if (HAVE_FEATURE(osdmap->get_up_osd_features(), SERVER_NAUTILUS)) {
-      auto approx_missing_objects =
-        shard_info.stats.stats.sum.num_objects_missing;
-      if (auth_version > candidate_version) {
-        approx_missing_objects += auth_version - candidate_version;
-      } else {
-        approx_missing_objects += candidate_version - auth_version;
-      }
-      if (static_cast<uint64_t>(approx_missing_objects)  >
-         cct->_conf.get_val<uint64_t>("osd_async_recovery_min_cost")) {
-        candidates_by_cost.emplace(approx_missing_objects, shard_i);
-      }
+    assert(HAVE_FEATURE(osdmap->get_up_osd_features(), SERVER_NAUTILUS));
+    auto approx_missing_objects =
+      shard_info.stats.stats.sum.num_objects_missing;
+    if (auth_version > candidate_version) {
+      approx_missing_objects += auth_version - candidate_version;
     } else {
-      size_t approx_entries;
-      if (auth_version > candidate_version) {
-        approx_entries = auth_version - candidate_version;
-      } else {
-        approx_entries = candidate_version - auth_version;
-      }
-      if (approx_entries > cct->_conf.get_val<uint64_t>("osd_async_recovery_min_cost")) {
-        candidates_by_cost.insert(make_pair(approx_entries, shard_i));
-      }
+      approx_missing_objects += candidate_version - auth_version;
+    }
+    if (static_cast<uint64_t>(approx_missing_objects)  >
+       cct->_conf.get_val<uint64_t>("osd_async_recovery_min_cost")) {
+      candidates_by_cost.emplace(approx_missing_objects, shard_i);
     }
   }
 
@@ -1975,19 +2259,22 @@ void PeeringState::choose_async_recovery_replicated(
     vector<int> candidate_want(*want);
     for (auto it = candidate_want.begin(); it != candidate_want.end(); ++it) {
       if (*it == cur_shard.osd) {
-        candidate_want.erase(it);
-       want->swap(candidate_want);
-       async_recovery->insert(cur_shard);
-        break;
+       candidate_want.erase(it);
+       if (pool.info.stretch_set_can_peer(candidate_want, *osdmap, NULL)) {
+         // if we're in stretch mode, we can only remove the osd if it doesn't
+         // break peering limits.
+         want->swap(candidate_want);
+         async_recovery->insert(cur_shard);
+       }
+       break;
       }
     }
   }
+
   psdout(20) << __func__ << " result want=" << *want
             << " async_recovery=" << *async_recovery << dendl;
 }
 
-
-
 /**
  * choose acting
  *
@@ -2012,16 +2299,14 @@ bool PeeringState::choose_acting(pg_shard_t &auth_log_shard_id,
   all_info[pg_whoami] = info;
 
   if (cct->_conf->subsys.should_gather<dout_subsys, 10>()) {
-    for (map<pg_shard_t, pg_info_t>::iterator p = all_info.begin();
-         p != all_info.end();
-         ++p) {
+    for (auto p = all_info.begin(); p != all_info.end(); ++p) {
       psdout(10) << __func__ << " all_info osd." << p->first << " "
                 << p->second << dendl;
     }
   }
 
-  map<pg_shard_t, pg_info_t>::const_iterator auth_log_shard =
-    find_best_info(all_info, restrict_to_up_acting, history_les_bound);
+  auto auth_log_shard = find_best_info(all_info, restrict_to_up_acting,
+                                      history_les_bound);
 
   if (auth_log_shard == all_info.end()) {
     if (up != acting) {
@@ -2043,23 +2328,50 @@ bool PeeringState::choose_acting(pg_shard_t &auth_log_shard_id,
   set<pg_shard_t> want_backfill, want_acting_backfill;
   vector<int> want;
   stringstream ss;
-  if (pool.info.is_replicated())
-    calc_replicated_acting(
+  if (pool.info.is_replicated()) {
+    auto [primary_shard, oldest_log] = select_replicated_primary(
       auth_log_shard,
       cct->_conf.get_val<uint64_t>(
-        "osd_force_auth_primary_missing_objects"),
-      get_osdmap()->get_pg_size(info.pgid.pgid),
-      acting,
+       "osd_force_auth_primary_missing_objects"),
       up,
       up_primary,
       all_info,
-      restrict_to_up_acting,
-      &want,
-      &want_backfill,
-      &want_acting_backfill,
       get_osdmap(),
       ss);
-  else
+    if (pool.info.is_stretch_pool()) {
+      calc_replicated_acting_stretch(
+       primary_shard,
+       oldest_log,
+       get_osdmap()->get_pg_size(info.pgid.pgid),
+       acting,
+       up,
+       up_primary,
+       all_info,
+       restrict_to_up_acting,
+       &want,
+       &want_backfill,
+       &want_acting_backfill,
+       get_osdmap(),
+       pool,
+       ss);
+    } else {
+      calc_replicated_acting(
+       primary_shard,
+       oldest_log,
+       get_osdmap()->get_pg_size(info.pgid.pgid),
+       acting,
+       up,
+       up_primary,
+       all_info,
+       restrict_to_up_acting,
+       &want,
+       &want_backfill,
+       &want_acting_backfill,
+       get_osdmap(),
+       pool,
+       ss);
+    }
+  } else {
     calc_ec_acting(
       auth_log_shard,
       get_osdmap()->get_pg_size(info.pgid.pgid),
@@ -2071,6 +2383,7 @@ bool PeeringState::choose_acting(pg_shard_t &auth_log_shard_id,
       &want_backfill,
       &want_acting_backfill,
       ss);
+  }
   psdout(10) << ss.str() << dendl;
 
   if (!recoverable(want)) {
@@ -2115,6 +2428,7 @@ bool PeeringState::choose_acting(pg_shard_t &auth_log_shard_id,
     }
     return false;
   }
+
   if (request_pg_temp_change_only)
     return true;
   want_acting.clear();
@@ -2138,9 +2452,7 @@ bool PeeringState::choose_acting(pg_shard_t &auth_log_shard_id,
   }
   // Will not change if already set because up would have had to change
   // Verify that nothing in backfill is in stray_set
-  for (set<pg_shard_t>::iterator i = want_backfill.begin();
-      i != want_backfill.end();
-      ++i) {
+  for (auto i = want_backfill.begin(); i != want_backfill.end(); ++i) {
     ceph_assert(stray_set.find(*i) == stray_set.end());
   }
   psdout(10) << "choose_acting want=" << want << " backfill_targets="
@@ -2227,8 +2539,8 @@ bool PeeringState::discover_all_missing(
             << unfound << " unfound"
             << dendl;
 
-  std::set<pg_shard_t>::const_iterator m = might_have_unfound.begin();
-  std::set<pg_shard_t>::const_iterator mend = might_have_unfound.end();
+  auto m = might_have_unfound.begin();
+  auto mend = might_have_unfound.end();
   for (; m != mend; ++m) {
     pg_shard_t peer(*m);
 
@@ -2242,7 +2554,7 @@ bool PeeringState::discover_all_missing(
       continue;
     }
 
-    map<pg_shard_t, pg_info_t>::const_iterator iter = peer_info.find(peer);
+    auto iter = peer_info.find(peer);
     if (iter != peer_info.end() &&
         (iter->second.is_empty() || iter->second.dne())) {
       // ignore empty peers
@@ -2306,9 +2618,7 @@ void PeeringState::build_might_have_unfound()
     pool.info.is_erasure());
 
   // include any (stray) peers
-  for (map<pg_shard_t, pg_info_t>::iterator p = peer_info.begin();
-       p != peer_info.end();
-       ++p)
+  for (auto p = peer_info.begin(); p != peer_info.end(); ++p)
     might_have_unfound.insert(p->first);
 
   psdout(15) << __func__ << ": built " << might_have_unfound << dendl;
@@ -2328,7 +2638,7 @@ void PeeringState::activate(
 
   if (is_primary()) {
     // only update primary last_epoch_started if we will go active
-    if (acting.size() >= pool.info.min_size) {
+    if (acting_set_writeable()) {
       ceph_assert(cct->_conf->osd_find_best_info_ignore_history_les ||
             info.last_epoch_started <= activation_epoch);
       info.last_epoch_started = activation_epoch;
@@ -2400,10 +2710,9 @@ void PeeringState::activate(
     purged.intersection_of(to_trim, info.purged_snaps);
     to_trim.subtract(purged);
 
-    if (HAVE_FEATURE(upacting_features, SERVER_OCTOPUS)) {
-      renew_lease(pl->get_mnow());
-      // do not schedule until we are actually activated
-    }
+    assert(HAVE_FEATURE(upacting_features, SERVER_OCTOPUS));
+    renew_lease(pl->get_mnow());
+    // do not schedule until we are actually activated
 
     // adjust purged_snaps: PG may have been inactive while snaps were pruned
     // from the removed_snaps_queue in the osdmap.  update local purged_snaps
@@ -2412,11 +2721,16 @@ void PeeringState::activate(
     info.purged_snaps.swap(purged);
 
     // start up replicas
+    if (prior_readable_down_osds.empty()) {
+      dout(10) << __func__ << " no prior_readable_down_osds to wait on, clearing ub"
+              << dendl;
+      clear_prior_readable_until_ub();
+    }
     info.history.refresh_prior_readable_until_ub(pl->get_mnow(),
                                                 prior_readable_until_ub);
 
     ceph_assert(!acting_recovery_backfill.empty());
-    for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
+    for (auto i = acting_recovery_backfill.begin();
         i != acting_recovery_backfill.end();
         ++i) {
       if (*i == pg_whoami) continue;
@@ -2426,12 +2740,19 @@ void PeeringState::activate(
 
       psdout(10) << "activate peer osd." << peer << " " << pi << dendl;
 
-      MOSDPGLog *m = 0;
+      #if defined(WITH_SEASTAR)
+      MURef<MOSDPGLog> m;
+      #else
+      MRef<MOSDPGLog> m;
+      #endif
       ceph_assert(peer_missing.count(peer));
       pg_missing_t& pm = peer_missing[peer];
 
       bool needs_past_intervals = pi.dne();
 
+      // Save num_bytes for backfill reservation request, can't be negative
+      peer_bytes[peer] = std::max<int64_t>(0, pi.stats.stats.sum.num_bytes);
+
       if (pi.last_update == info.last_update) {
         // empty log
        if (!pi.last_backfill.is_max())
@@ -2453,7 +2774,7 @@ void PeeringState::activate(
        } else {
          psdout(10) << "activate peer osd." << peer
                     << " is up to date, but sending pg_log anyway" << dendl;
-         m = new MOSDPGLog(
+         m = TOPNSPC::make_message<MOSDPGLog>(
            i->shard, pg_whoami.shard,
            get_osdmap_epoch(), info,
            last_peering_reset);
@@ -2482,15 +2803,13 @@ void PeeringState::activate(
        pi.last_interval_started = info.last_interval_started;
        pi.history = info.history;
        pi.hit_set = info.hit_set;
-        // Save num_bytes for reservation request, can't be negative
-        peer_bytes[peer] = std::max<int64_t>(0, pi.stats.stats.sum.num_bytes);
         pi.stats.stats.clear();
         pi.stats.stats.sum.num_bytes = peer_bytes[peer];
 
        // initialize peer with our purged_snaps.
        pi.purged_snaps = info.purged_snaps;
 
-       m = new MOSDPGLog(
+       m = TOPNSPC::make_message<MOSDPGLog>(
          i->shard, pg_whoami.shard,
          get_osdmap_epoch(), pi,
          last_peering_reset /* epoch to create pg at */);
@@ -2505,7 +2824,7 @@ void PeeringState::activate(
       } else {
        // catch up
        ceph_assert(pg_log.get_tail() <= pi.last_update);
-       m = new MOSDPGLog(
+       m = TOPNSPC::make_message<MOSDPGLog>(
          i->shard, pg_whoami.shard,
          get_osdmap_epoch(), info,
          last_peering_reset /* epoch to create pg at */);
@@ -2521,9 +2840,7 @@ void PeeringState::activate(
 
       // update local version of peer's missing list!
       if (m && pi.last_backfill != hobject_t()) {
-        for (list<pg_log_entry_t>::iterator p = m->log.log.begin();
-             p != m->log.log.end();
-             ++p) {
+        for (auto p = m->log.log.begin(); p != m->log.log.end(); ++p) {
          if (p->soid <= pi.last_backfill &&
              !p->is_error()) {
            if (perform_deletes_during_peering() && p->is_delete()) {
@@ -2539,7 +2856,7 @@ void PeeringState::activate(
        dout(10) << "activate peer osd." << peer << " sending " << m->log
                 << dendl;
        m->lease = get_lease();
-       pl->send_cluster_message(peer.osd, m, get_osdmap_epoch());
+       pl->send_cluster_message(peer.osd, std::move(m), get_osdmap_epoch());
       }
 
       // peer now has
@@ -2558,7 +2875,7 @@ void PeeringState::activate(
 
     // Set up missing_loc
     set<pg_shard_t> complete_shards;
-    for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
+    for (auto i = acting_recovery_backfill.begin();
         i != acting_recovery_backfill.end();
         ++i) {
       psdout(20) << __func__ << " setting up missing_loc from shard " << *i
@@ -2591,7 +2908,7 @@ void PeeringState::activate(
       } else {
         missing_loc.add_source_info(pg_whoami, info, pg_log.get_missing(),
                                    ctx.handle);
-        for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
+        for (auto i = acting_recovery_backfill.begin();
             i != acting_recovery_backfill.end();
             ++i) {
          if (*i == pg_whoami) continue;
@@ -2605,9 +2922,7 @@ void PeeringState::activate(
             ctx.handle);
         }
       }
-      for (map<pg_shard_t, pg_missing_t>::iterator i = peer_missing.begin();
-          i != peer_missing.end();
-          ++i) {
+      for (auto i = peer_missing.begin(); i != peer_missing.end(); ++i) {
        if (is_acting_recovery_backfill(i->first))
          continue;
        ceph_assert(peer_info.count(i->first));
@@ -2634,7 +2949,7 @@ void PeeringState::activate(
     state_set(PG_STATE_ACTIVATING);
     pl->on_activate(std::move(to_trim));
   }
-  if (acting.size() >= pool.info.min_size) {
+  if (acting_set_writeable()) {
     PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(t)};
     pg_log.roll_forward(rollbacker.get());
   }
@@ -2656,33 +2971,24 @@ void PeeringState::share_pg_info()
       peer->second.last_interval_started = info.last_interval_started;
       peer->second.history.merge(info.history);
     }
-    Message* m = nullptr;
-    if (last_require_osd_release >= ceph_release_t::octopus) {
-      m = new MOSDPGInfo2{spg_t{info.pgid.pgid, pg_shard.shard},
+    auto m = TOPNSPC::make_message<MOSDPGInfo2>(spg_t{info.pgid.pgid, pg_shard.shard},
                          info,
                          get_osdmap_epoch(),
                          get_osdmap_epoch(),
-                         get_lease(), {}};
-    } else {
-      m = new MOSDPGInfo{get_osdmap_epoch(),
-                        {pg_notify_t{pg_shard.shard,
-                                     pg_whoami.shard,
-                                     get_osdmap_epoch(),
-                                     get_osdmap_epoch(),
-                                     info,
-                                     past_intervals}}};
-    }
-    pl->send_cluster_message(pg_shard.osd, m, get_osdmap_epoch());
+                         std::optional<pg_lease_t>{get_lease()},
+                         std::nullopt);
+    pl->send_cluster_message(pg_shard.osd, std::move(m), get_osdmap_epoch());
   }
 }
 
 void PeeringState::merge_log(
-  ObjectStore::Transaction& t, pg_info_t &oinfo, pg_log_t &olog,
+  ObjectStore::Transaction& t, pg_info_t &oinfo, pg_log_t&& olog,
   pg_shard_t from)
 {
   PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(t)};
   pg_log.merge_log(
-    oinfo, olog, from, info, rollbacker.get(), dirty_info, dirty_big_info);
+    oinfo, std::move(olog), from, info, rollbacker.get(),
+    dirty_info, dirty_big_info);
 }
 
 void PeeringState::rewind_divergent_log(
@@ -2719,7 +3025,7 @@ void PeeringState::proc_primary_info(
 
 void PeeringState::proc_master_log(
   ObjectStore::Transaction& t, pg_info_t &oinfo,
-  pg_log_t &olog, pg_missing_t& omissing, pg_shard_t from)
+  pg_log_t&& olog, pg_missing_t&& omissing, pg_shard_t from)
 {
   psdout(10) << "proc_master_log for osd." << from << ": "
             << olog << " " << omissing << dendl;
@@ -2729,7 +3035,7 @@ void PeeringState::proc_master_log(
   // make any adjustments to their missing map; we are taking their
   // log to be authoritative (i.e., their entries are by definitely
   // non-divergent).
-  merge_log(t, oinfo, olog, from);
+  merge_log(t, oinfo, std::move(olog), from);
   peer_info[from] = oinfo;
   psdout(10) << " peer osd." << from << " now " << oinfo
             << " " << omissing << dendl;
@@ -2748,13 +3054,13 @@ void PeeringState::proc_master_log(
   ceph_assert(cct->_conf->osd_find_best_info_ignore_history_les ||
         info.last_epoch_started >= info.history.last_epoch_started);
 
-  peer_missing[from].claim(omissing);
+  peer_missing[from].claim(std::move(omissing));
 }
 
 void PeeringState::proc_replica_log(
   pg_info_t &oinfo,
   const pg_log_t &olog,
-  pg_missing_t& omissing,
+  pg_missing_t&& omissing,
   pg_shard_t from)
 {
   psdout(10) << "proc_replica_log for osd." << from << ": "
@@ -2767,15 +3073,14 @@ void PeeringState::proc_replica_log(
             << oinfo << " " << omissing << dendl;
   might_have_unfound.insert(from);
 
-  for (map<hobject_t, pg_missing_item>::const_iterator i =
-        omissing.get_items().begin();
+  for (auto i = omissing.get_items().begin();
        i != omissing.get_items().end();
        ++i) {
     psdout(20) << " after missing " << i->first
               << " need " << i->second.need
               << " have " << i->second.have << dendl;
   }
-  peer_missing[from].claim(omissing);
+  peer_missing[from].claim(std::move(omissing));
 }
 
 void PeeringState::fulfill_info(
@@ -2797,7 +3102,7 @@ void PeeringState::fulfill_log(
   ceph_assert(from == primary);
   ceph_assert(query.type != pg_query_t::INFO);
 
-  MOSDPGLog *mlog = new MOSDPGLog(
+  auto mlog = TOPNSPC::make_message<MOSDPGLog>(
     from.shard, pg_whoami.shard,
     get_osdmap_epoch(),
     info, query_epoch);
@@ -2823,7 +3128,7 @@ void PeeringState::fulfill_log(
 
   psdout(10) << " sending " << mlog->log << " " << mlog->missing << dendl;
 
-  pl->send_cluster_message(from.osd, mlog, get_osdmap_epoch(), true);
+  pl->send_cluster_message(from.osd, std::move(mlog), get_osdmap_epoch(), true);
 }
 
 void PeeringState::fulfill_query(const MQuery& query, PeeringCtxWrapper &rctx)
@@ -2933,6 +3238,8 @@ void PeeringState::split_into(
   child->info.stats.parent_split_bits = split_bits;
   info.stats.stats_invalid = true;
   child->info.stats.stats_invalid = true;
+  child->info.stats.objects_trimmed = 0;
+  child->info.stats.snaptrim_duration = 0.0;
   child->info.last_epoch_started = info.last_epoch_started;
   child->info.last_interval_started = info.last_interval_started;
 
@@ -2952,9 +3259,9 @@ void PeeringState::split_into(
   if (get_primary() != child->get_primary())
     child->info.history.same_primary_since = get_osdmap_epoch();
 
-  child->info.stats.up = up;
+  child->info.stats.up = newup;
   child->info.stats.up_primary = up_primary;
-  child->info.stats.acting = acting;
+  child->info.stats.acting = newacting;
   child->info.stats.acting_primary = primary;
   child->info.stats.mapping_epoch = get_osdmap_epoch();
 
@@ -3166,9 +3473,7 @@ void PeeringState::update_blocked_by()
   info.stats.blocked_by.clear();
   info.stats.blocked_by.resize(keep);
   unsigned pos = 0;
-  for (set<int>::iterator p = blocked_by.begin();
-       p != blocked_by.end() && keep > 0;
-       ++p) {
+  for (auto p = blocked_by.begin(); p != blocked_by.end() && keep > 0; ++p) {
     if (skip > 0 && (rand() % (skip + keep) < skip)) {
       --skip;
     } else {
@@ -3496,11 +3801,12 @@ out:
 }
 
 std::optional<pg_stat_t> PeeringState::prepare_stats_for_publish(
-  bool pg_stats_publish_valid,
-  const pg_stat_t &pg_stats_publish,
+  const std::optional<pg_stat_t> &pg_stats_publish,
   const object_stat_collection_t &unstable_stats)
 {
   if (info.stats.stats.sum.num_scrub_errors) {
+    psdout(10) << __func__ << " inconsistent due to " <<
+      info.stats.stats.sum.num_scrub_errors << " scrub errors" << dendl;
     state_set(PG_STATE_INCONSISTENT);
   } else {
     state_clear(PG_STATE_INCONSISTENT);
@@ -3511,7 +3817,7 @@ std::optional<pg_stat_t> PeeringState::prepare_stats_for_publish(
   if (info.stats.state != state) {
     info.stats.last_change = now;
     // Optimistic estimation: if we just found out a PG is inactive,
-    // assumt it is active till now.
+    // assume it is active till now.
     if (!(state & PG_STATE_ACTIVE) &&
        (info.stats.state & PG_STATE_ACTIVE))
       info.stats.last_active = now;
@@ -3551,9 +3857,9 @@ std::optional<pg_stat_t> PeeringState::prepare_stats_for_publish(
   psdout(20) << __func__ << " reporting purged_snaps "
             << pre_publish.purged_snaps << dendl;
 
-  if (pg_stats_publish_valid && pre_publish == pg_stats_publish &&
+  if (pg_stats_publish && pre_publish == *pg_stats_publish &&
       info.stats.last_fresh > cutoff) {
-    psdout(15) << "publish_stats_to_osd " << pg_stats_publish.reported_epoch
+    psdout(15) << "publish_stats_to_osd " << pg_stats_publish->reported_epoch
               << ": no change since " << info.stats.last_fresh << dendl;
     return std::nullopt;
   } else {
@@ -3575,8 +3881,8 @@ std::optional<pg_stat_t> PeeringState::prepare_stats_for_publish(
     if ((info.stats.state & PG_STATE_UNDERSIZED) == 0)
       info.stats.last_fullsized = now;
 
-    psdout(15) << "publish_stats_to_osd " << pg_stats_publish.reported_epoch
-              << ":" << pg_stats_publish.reported_seq << dendl;
+    psdout(15) << "publish_stats_to_osd " << pre_publish.reported_epoch
+              << ":" << pre_publish.reported_seq << dendl;
     return std::make_optional(std::move(pre_publish));
   }
 }
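
prepare_stats_for_publish now takes the previously published stats as a std::optional<pg_stat_t> instead of a separate validity flag plus struct, and returns std::nullopt when there is nothing new to report. A hedged caller-side sketch of that pattern (the field names below are invented for illustration):

    #include <optional>
    #include <utility>

    struct stats {
      int reported_seq = 0;
      int errors = 0;
    };

    // nullopt means "no change since the last publish, skip it".
    std::optional<stats> prepare(const std::optional<stats>& prev, stats cur) {
      if (prev && prev->errors == cur.errors)
        return std::nullopt;
      cur.reported_seq = prev ? prev->reported_seq + 1 : 1;   // stamp this report
      return cur;
    }

    int main() {
      std::optional<stats> published;                // replaces bool flag + struct
      if (auto next = prepare(published, stats{0, 3}); next)
        published = std::move(*next);
      return published && published->reported_seq == 1 ? 0 : 1;
    }
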
@@ -3587,7 +3893,6 @@ void PeeringState::init(
   const vector<int>& newacting, int new_acting_primary,
   const pg_history_t& history,
   const PastIntervals& pi,
-  bool backfill,
   ObjectStore::Transaction &t)
 {
   psdout(10) << "init role " << role << " up "
@@ -3616,13 +3921,6 @@ void PeeringState::init(
     pg_log.set_missing_may_contain_deletes();
   }
 
-  if (backfill) {
-    psdout(10) << __func__ << ": Setting backfill" << dendl;
-    info.set_last_backfill(hobject_t());
-    info.last_complete = info.last_update;
-    pg_log.mark_log_for_rewrite();
-  }
-
   on_new_interval();
 
   dirty_info = true;
@@ -3635,24 +3933,22 @@ void PeeringState::dump_peering_state(Formatter *f)
   f->dump_string("state", get_pg_state_string());
   f->dump_unsigned("epoch", get_osdmap_epoch());
   f->open_array_section("up");
-  for (vector<int>::const_iterator p = up.begin(); p != up.end(); ++p)
+  for (auto p = up.begin(); p != up.end(); ++p)
     f->dump_unsigned("osd", *p);
   f->close_section();
   f->open_array_section("acting");
-  for (vector<int>::const_iterator p = acting.begin(); p != acting.end(); ++p)
+  for (auto p = acting.begin(); p != acting.end(); ++p)
     f->dump_unsigned("osd", *p);
   f->close_section();
   if (!backfill_targets.empty()) {
     f->open_array_section("backfill_targets");
-    for (set<pg_shard_t>::iterator p = backfill_targets.begin();
-        p != backfill_targets.end();
-        ++p)
+    for (auto p = backfill_targets.begin(); p != backfill_targets.end(); ++p)
       f->dump_stream("shard") << *p;
     f->close_section();
   }
   if (!async_recovery_targets.empty()) {
     f->open_array_section("async_recovery_targets");
-    for (set<pg_shard_t>::iterator p = async_recovery_targets.begin();
+    for (auto p = async_recovery_targets.begin();
         p != async_recovery_targets.end();
         ++p)
       f->dump_stream("shard") << *p;
@@ -3660,7 +3956,7 @@ void PeeringState::dump_peering_state(Formatter *f)
   }
   if (!acting_recovery_backfill.empty()) {
     f->open_array_section("acting_recovery_backfill");
-    for (set<pg_shard_t>::iterator p = acting_recovery_backfill.begin();
+    for (auto p = acting_recovery_backfill.begin();
         p != acting_recovery_backfill.end();
         ++p)
       f->dump_stream("shard") << *p;
@@ -3672,14 +3968,13 @@ void PeeringState::dump_peering_state(Formatter *f)
   f->close_section();
 
   f->open_array_section("peer_info");
-  for (map<pg_shard_t, pg_info_t>::const_iterator p = peer_info.begin();
-       p != peer_info.end();
-       ++p) {
+  for (auto p = peer_info.begin(); p != peer_info.end(); ++p) {
     f->open_object_section("info");
     f->dump_stream("peer") << p->first;
     p->second.dump(f);
     f->close_section();
   }
+  f->close_section();
 }
 
 void PeeringState::update_stats(
@@ -3688,7 +3983,7 @@ void PeeringState::update_stats(
   if (f(info.history, info.stats)) {
     pl->publish_stats_to_osd();
   }
-  pl->on_info_history_change();
+  pl->reschedule_scrub();
 
   if (t) {
     dirty_info = true;
@@ -3696,6 +3991,12 @@ void PeeringState::update_stats(
   }
 }
 
+void PeeringState::update_stats_wo_resched(
+  std::function<void(pg_history_t &, pg_stat_t &)> f)
+{
+  f(info.history, info.stats);
+}
+
 bool PeeringState::append_log_entries_update_missing(
   const mempool::osd_pglog::list<pg_log_entry_t> &entries,
   ObjectStore::Transaction &t, std::optional<eversion_t> trim_to,
@@ -3746,7 +4047,7 @@ void PeeringState::merge_new_log_entries(
   ceph_assert(is_primary());
 
   bool rebuild_missing = append_log_entries_update_missing(entries, t, trim_to, roll_forward_to);
-  for (set<pg_shard_t>::const_iterator i = acting_recovery_backfill.begin();
+  for (auto i = acting_recovery_backfill.begin();
        i != acting_recovery_backfill.end();
        ++i) {
     pg_shard_t peer(*i);
@@ -3808,7 +4109,7 @@ void PeeringState::add_log_entry(const pg_log_entry_t& e, bool applied)
 
 
 void PeeringState::append_log(
-  const vector<pg_log_entry_t>& logv,
+  vector<pg_log_entry_t>&& logv,
   eversion_t trim_to,
   eversion_t roll_forward_to,
   eversion_t mlcod,
@@ -3843,9 +4144,7 @@ void PeeringState::append_log(
     pg_log.skip_rollforward();
   }
 
-  for (vector<pg_log_entry_t>::const_iterator p = logv.begin();
-       p != logv.end();
-       ++p) {
+  for (auto p = logv.begin(); p != logv.end(); ++p) {
     add_log_entry(*p, transaction_applied);
 
     /* We don't want to leave the rollforward artifacts around
@@ -3865,6 +4164,8 @@ void PeeringState::append_log(
 
   psdout(10) << __func__ << " approx pg log length =  "
             << pg_log.get_log().approx_size() << dendl;
+  psdout(10) << __func__ << " dups pg log length =  "
+            << pg_log.get_log().dups.size() << dendl;
   psdout(10) << __func__ << " transaction_applied = "
             << transaction_applied << dendl;
   if (!transaction_applied || async)
@@ -4051,7 +4352,7 @@ void PeeringState::recovery_committed_to(eversion_t version)
       // we are fully up to date.  tell the primary!
       pl->send_cluster_message(
        get_primary().osd,
-       new MOSDPGTrim(
+       TOPNSPC::make_message<MOSDPGTrim>(
          get_osdmap_epoch(),
          spg_t(info.pgid.pgid, primary.shard),
          last_complete_ondisk),
@@ -4085,7 +4386,7 @@ void PeeringState::calc_trim_to()
         cct->_conf->osd_pg_log_trim_max >= cct->_conf->osd_pg_log_trim_min) {
       return;
     }
-    list<pg_log_entry_t>::const_iterator it = pg_log.get_log().log.begin();
+    auto it = pg_log.get_log().log.begin();
     eversion_t new_trim_to;
     for (size_t i = 0; i < num_to_trim; ++i) {
       new_trim_to = it->version;
@@ -4161,7 +4462,7 @@ void PeeringState::apply_op_stats(
   info.stats.stats.add(delta_stats);
   info.stats.stats.floor(0);
 
-  for (set<pg_shard_t>::const_iterator i = get_backfill_targets().begin();
+  for (auto i = get_backfill_targets().begin();
        i != get_backfill_targets().end();
        ++i) {
     pg_shard_t bt = *i;
@@ -4331,6 +4632,13 @@ boost::statechart::result PeeringState::Started::react(const QueryState& q)
   return discard_event();
 }
 
+boost::statechart::result PeeringState::Started::react(const QueryUnfound& q)
+{
+  q.f->dump_string("state", "Started");
+  q.f->dump_bool("available_might_have_unfound", false);
+  return discard_event();
+}
+
 void PeeringState::Started::exit()
 {
   context< PeeringMachine >().log_exit(state_name, enter_time);
@@ -4419,6 +4727,13 @@ boost::statechart::result PeeringState::Reset::react(const QueryState& q)
   return discard_event();
 }
 
+boost::statechart::result PeeringState::Reset::react(const QueryUnfound& q)
+{
+  q.f->dump_string("state", "Reset");
+  q.f->dump_bool("available_might_have_unfound", false);
+  return discard_event();
+}
+
 void PeeringState::Reset::exit()
 {
   context< PeeringMachine >().log_exit(state_name, enter_time);
@@ -4596,9 +4911,7 @@ boost::statechart::result PeeringState::Peering::react(const QueryState& q)
   q.f->close_section();
 
   q.f->open_array_section("probing_osds");
-  for (set<pg_shard_t>::iterator p = prior_set.probe.begin();
-       p != prior_set.probe.end();
-       ++p)
+  for (auto p = prior_set.probe.begin(); p != prior_set.probe.end(); ++p)
     q.f->dump_stream("osd") << *p;
   q.f->close_section();
 
@@ -4606,14 +4919,12 @@ boost::statechart::result PeeringState::Peering::react(const QueryState& q)
     q.f->dump_string("blocked", "peering is blocked due to down osds");
 
   q.f->open_array_section("down_osds_we_would_probe");
-  for (set<int>::iterator p = prior_set.down.begin();
-       p != prior_set.down.end();
-       ++p)
+  for (auto p = prior_set.down.begin(); p != prior_set.down.end(); ++p)
     q.f->dump_int("osd", *p);
   q.f->close_section();
 
   q.f->open_array_section("peering_blocked_by");
-  for (map<int,epoch_t>::iterator p = prior_set.blocked_by.begin();
+  for (auto p = prior_set.blocked_by.begin();
        p != prior_set.blocked_by.end();
        ++p) {
     q.f->open_object_section("osd");
@@ -4636,6 +4947,13 @@ boost::statechart::result PeeringState::Peering::react(const QueryState& q)
   return forward_event();
 }
 
+boost::statechart::result PeeringState::Peering::react(const QueryUnfound& q)
+{
+  q.f->dump_string("state", "Peering");
+  q.f->dump_bool("available_might_have_unfound", false);
+  return discard_event();
+}
+
 void PeeringState::Peering::exit()
 {
 
@@ -4671,13 +4989,13 @@ void PeeringState::Backfilling::backfill_release_reservations()
 {
   DECLARE_LOCALS;
   pl->cancel_local_background_io_reservation();
-  for (set<pg_shard_t>::iterator it = ps->backfill_targets.begin();
+  for (auto it = ps->backfill_targets.begin();
        it != ps->backfill_targets.end();
        ++it) {
     ceph_assert(*it != ps->pg_whoami);
     pl->send_cluster_message(
       it->osd,
-      new MBackfillReserve(
+      TOPNSPC::make_message<MBackfillReserve>(
        MBackfillReserve::RELEASE,
        spg_t(ps->info.pgid.pgid, it->shard),
        ps->get_osdmap_epoch()),
@@ -4801,7 +5119,7 @@ PeeringState::WaitRemoteBackfillReserved::react(const RemoteBackfillReserved &ev
     ceph_assert(*backfill_osd_it != ps->pg_whoami);
     pl->send_cluster_message(
       backfill_osd_it->osd,
-      new MBackfillReserve(
+      TOPNSPC::make_message<MBackfillReserve>(
        MBackfillReserve::REQUEST,
        spg_t(context< PeeringMachine >().spgid.pgid, backfill_osd_it->shard),
        ps->get_osdmap_epoch(),
@@ -4841,7 +5159,7 @@ void PeeringState::WaitRemoteBackfillReserved::retry()
     ceph_assert(*it != ps->pg_whoami);
     pl->send_cluster_message(
       it->osd,
-      new MBackfillReserve(
+      TOPNSPC::make_message<MBackfillReserve>(
        MBackfillReserve::RELEASE,
        spg_t(context< PeeringMachine >().spgid.pgid, it->shard),
        ps->get_osdmap_epoch()),
@@ -4886,11 +5204,11 @@ PeeringState::WaitLocalBackfillReserved::WaitLocalBackfillReserved(my_context ct
   ps->state_set(PG_STATE_BACKFILL_WAIT);
   pl->request_local_background_io_reservation(
     ps->get_backfill_priority(),
-    std::make_shared<PGPeeringEvent>(
+    std::make_unique<PGPeeringEvent>(
       ps->get_osdmap_epoch(),
       ps->get_osdmap_epoch(),
       LocalBackfillReserved()),
-    std::make_shared<PGPeeringEvent>(
+    std::make_unique<PGPeeringEvent>(
       ps->get_osdmap_epoch(),
       ps->get_osdmap_epoch(),
       DeferBackfill(0.0)));
@@ -4916,6 +5234,14 @@ PeeringState::NotBackfilling::NotBackfilling(my_context ctx)
   pl->publish_stats_to_osd();
 }
 
+boost::statechart::result PeeringState::NotBackfilling::react(const QueryUnfound& q)
+{
+  DECLARE_LOCALS;
+
+  ps->query_unfound(q.f, "NotBackfilling");
+  return discard_event();
+}
+
 boost::statechart::result
 PeeringState::NotBackfilling::react(const RemoteBackfillReserved &evt)
 {
@@ -4949,6 +5275,14 @@ PeeringState::NotRecovering::NotRecovering(my_context ctx)
   pl->publish_stats_to_osd();
 }
 
+boost::statechart::result PeeringState::NotRecovering::react(const QueryUnfound& q)
+{
+  DECLARE_LOCALS;
+
+  ps->query_unfound(q.f, "NotRecovering");
+  return discard_event();
+}
+
 void PeeringState::NotRecovering::exit()
 {
   context< PeeringMachine >().log_exit(state_name, enter_time);
@@ -4998,7 +5332,7 @@ PeeringState::RepWaitRecoveryReserved::react(const RemoteRecoveryReserved &evt)
   DECLARE_LOCALS;
   pl->send_cluster_message(
     ps->primary.osd,
-    new MRecoveryReserve(
+    TOPNSPC::make_message<MRecoveryReserve>(
       MRecoveryReserve::GRANT,
       spg_t(ps->info.pgid.pgid, ps->primary.shard),
       ps->get_osdmap_epoch()),
@@ -5043,21 +5377,21 @@ PeeringState::RepNotRecovering::react(const RequestBackfillPrio &evt)
        evt.primary_num_bytes, evt.local_num_bytes)) {
     post_event(RejectTooFullRemoteReservation());
   } else {
-    PGPeeringEventRef preempt;
+    PGPeeringEventURef preempt;
     if (HAVE_FEATURE(ps->upacting_features, RECOVERY_RESERVATION_2)) {
       // older peers will interpret preemption as TOOFULL
-      preempt = std::make_shared<PGPeeringEvent>(
+      preempt = std::make_unique<PGPeeringEvent>(
        pl->get_osdmap_epoch(),
        pl->get_osdmap_epoch(),
        RemoteBackfillPreempted());
     }
     pl->request_remote_recovery_reservation(
       evt.priority,
-      std::make_shared<PGPeeringEvent>(
+      std::make_unique<PGPeeringEvent>(
        pl->get_osdmap_epoch(),
        pl->get_osdmap_epoch(),
         RemoteBackfillReserved()),
-      preempt);
+      std::move(preempt));
   }
   return transit<RepWaitBackfillReserved>();
 }
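
The reservation plumbing above also changes ownership: the grant/preempt callbacks move from PGPeeringEventRef (a shared pointer) to PGPeeringEventURef (a unique pointer), so the optional preempt event has exactly one owner and is std::move'd into the reserver. A small illustrative sketch under those assumptions (the callback type below is invented):

    #include <iostream>
    #include <memory>
    #include <utility>

    struct event { const char* name; };

    void request_reservation(std::unique_ptr<event> on_grant,
                             std::unique_ptr<event> on_preempt) {
      std::cout << "on grant:   " << on_grant->name << "\n";
      if (on_preempt)                       // preemption callback is optional
        std::cout << "on preempt: " << on_preempt->name << "\n";
    }

    int main() {
      auto grant = std::make_unique<event>(event{"RemoteBackfillReserved"});
      std::unique_ptr<event> preempt;       // only set when the peer supports it
      preempt = std::make_unique<event>(event{"RemoteBackfillPreempted"});
      request_reservation(std::move(grant), std::move(preempt));
    }
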
@@ -5071,10 +5405,10 @@ PeeringState::RepNotRecovering::react(const RequestRecoveryPrio &evt)
   // (pre-mimic compat)
   int prio = evt.priority ? evt.priority : ps->get_recovery_priority();
 
-  PGPeeringEventRef preempt;
+  PGPeeringEventURef preempt;
   if (HAVE_FEATURE(ps->upacting_features, RECOVERY_RESERVATION_2)) {
     // older peers can't handle this
-    preempt = std::make_shared<PGPeeringEvent>(
+    preempt = std::make_unique<PGPeeringEvent>(
       ps->get_osdmap_epoch(),
       ps->get_osdmap_epoch(),
       RemoteRecoveryPreempted());
@@ -5082,11 +5416,11 @@ PeeringState::RepNotRecovering::react(const RequestRecoveryPrio &evt)
 
   pl->request_remote_recovery_reservation(
     prio,
-    std::make_shared<PGPeeringEvent>(
+    std::make_unique<PGPeeringEvent>(
       ps->get_osdmap_epoch(),
       ps->get_osdmap_epoch(),
       RemoteRecoveryReserved()),
-    preempt);
+    std::move(preempt));
   return transit<RepWaitRecoveryReserved>();
 }
 
@@ -5106,7 +5440,7 @@ PeeringState::RepWaitBackfillReserved::react(const RemoteBackfillReserved &evt)
 
   pl->send_cluster_message(
       ps->primary.osd,
-      new MBackfillReserve(
+      TOPNSPC::make_message<MBackfillReserve>(
        MBackfillReserve::GRANT,
        spg_t(ps->info.pgid.pgid, ps->primary.shard),
        ps->get_osdmap_epoch()),
@@ -5163,7 +5497,7 @@ PeeringState::RepRecovering::react(const RemoteRecoveryPreempted &)
   pl->unreserve_recovery_space();
   pl->send_cluster_message(
     ps->primary.osd,
-    new MRecoveryReserve(
+    TOPNSPC::make_message<MRecoveryReserve>(
       MRecoveryReserve::REVOKE,
       spg_t(ps->info.pgid.pgid, ps->primary.shard),
       ps->get_osdmap_epoch()),
@@ -5180,7 +5514,7 @@ PeeringState::RepRecovering::react(const BackfillTooFull &)
   pl->unreserve_recovery_space();
   pl->send_cluster_message(
     ps->primary.osd,
-    new MBackfillReserve(
+    TOPNSPC::make_message<MBackfillReserve>(
       MBackfillReserve::REVOKE_TOOFULL,
       spg_t(ps->info.pgid.pgid, ps->primary.shard),
       ps->get_osdmap_epoch()),
@@ -5197,7 +5531,7 @@ PeeringState::RepRecovering::react(const RemoteBackfillPreempted &)
   pl->unreserve_recovery_space();
   pl->send_cluster_message(
     ps->primary.osd,
-    new MBackfillReserve(
+    TOPNSPC::make_message<MBackfillReserve>(
       MBackfillReserve::REVOKE,
       spg_t(ps->info.pgid.pgid, ps->primary.shard),
       ps->get_osdmap_epoch()),
@@ -5250,11 +5584,11 @@ PeeringState::WaitLocalRecoveryReserved::WaitLocalRecoveryReserved(my_context ct
   ps->state_set(PG_STATE_RECOVERY_WAIT);
   pl->request_local_background_io_reservation(
     ps->get_recovery_priority(),
-    std::make_shared<PGPeeringEvent>(
+    std::make_unique<PGPeeringEvent>(
       ps->get_osdmap_epoch(),
       ps->get_osdmap_epoch(),
       LocalRecoveryReserved()),
-    std::make_shared<PGPeeringEvent>(
+    std::make_unique<PGPeeringEvent>(
       ps->get_osdmap_epoch(),
       ps->get_osdmap_epoch(),
       DeferRecovery(0.0)));
@@ -5301,7 +5635,7 @@ PeeringState::WaitRemoteRecoveryReserved::react(const RemoteRecoveryReserved &ev
     ceph_assert(*remote_recovery_reservation_it != ps->pg_whoami);
     pl->send_cluster_message(
       remote_recovery_reservation_it->osd,
-      new MRecoveryReserve(
+      TOPNSPC::make_message<MRecoveryReserve>(
        MRecoveryReserve::REQUEST,
        spg_t(context< PeeringMachine >().spgid.pgid,
              remote_recovery_reservation_it->shard),
@@ -5344,15 +5678,14 @@ void PeeringState::Recovering::release_reservations(bool cancel)
   ceph_assert(cancel || !ps->pg_log.get_missing().have_missing());
 
   // release remote reservations
-  for (set<pg_shard_t>::const_iterator i =
-        context< Active >().remote_shards_to_reserve_recovery.begin();
-        i != context< Active >().remote_shards_to_reserve_recovery.end();
-        ++i) {
+  for (auto i = context< Active >().remote_shards_to_reserve_recovery.begin();
+       i != context< Active >().remote_shards_to_reserve_recovery.end();
+       ++i) {
     if (*i == ps->pg_whoami) // skip myself
       continue;
     pl->send_cluster_message(
       i->osd,
-      new MRecoveryReserve(
+      TOPNSPC::make_message<MRecoveryReserve>(
        MRecoveryReserve::RELEASE,
        spg_t(ps->info.pgid.pgid, i->shard),
        ps->get_osdmap_epoch()),
@@ -5386,6 +5719,7 @@ PeeringState::Recovering::react(const RequestBackfill &evt)
   if (!ps->async_recovery_targets.empty()) {
     pg_shard_t auth_log_shard;
     bool history_les_bound = false;
+    // FIXME: Uh-oh we have to check this return value; choose_acting can fail!
     ps->choose_acting(auth_log_shard, true, &history_les_bound);
   }
   return transit<WaitLocalBackfillReserved>();
@@ -5462,6 +5796,7 @@ PeeringState::Recovered::Recovered(my_context ctx)
                                                 true, &history_les_bound)) {
     ceph_assert(ps->want_acting.size());
   } else if (!ps->async_recovery_targets.empty()) {
+    // FIXME: Uh-oh we have to check this return value; choose_acting can fail!
     ps->choose_acting(auth_log_shard, true, &history_les_bound);
   }
 
@@ -5513,9 +5848,7 @@ set<pg_shard_t> unique_osd_shard_set(const pg_shard_t & skip, const T &in)
 {
   set<int> osds_found;
   set<pg_shard_t> out;
-  for (typename T::const_iterator i = in.begin();
-       i != in.end();
-       ++i) {
+  for (auto i = in.begin(); i != in.end(); ++i) {
     if (*i != skip && !osds_found.count(i->osd)) {
       osds_found.insert(i->osd);
       out.insert(*i);
@@ -5553,7 +5886,7 @@ PeeringState::Active::Active(my_context ctx)
 
   // everyone has to commit/ack before we are truly active
   ps->blocked_by.clear();
-  for (set<pg_shard_t>::iterator p = ps->acting_recovery_backfill.begin();
+  for (auto p = ps->acting_recovery_backfill.begin();
        p != ps->acting_recovery_backfill.end();
        ++p) {
     if (p->shard != ps->pg_whoami.shard) {
@@ -5704,6 +6037,7 @@ boost::statechart::result PeeringState::Active::react(const MNotifyRec& notevt)
     // if so, request pg_temp change to trigger a new interval transition
     pg_shard_t auth_log_shard;
     bool history_les_bound = false;
+    // FIXME: Uh-oh we have to check this return value; choose_acting can fail!
     ps->choose_acting(auth_log_shard, false, &history_les_bound, true);
     if (!ps->want_acting.empty() && ps->want_acting != ps->acting) {
       psdout(10) << "Active: got notify from previous acting member "
@@ -5760,7 +6094,7 @@ boost::statechart::result PeeringState::Active::react(const MLogRec& logevt)
   psdout(10) << "searching osd." << logevt.from
                     << " log for unfound items" << dendl;
   ps->proc_replica_log(
-    logevt.msg->info, logevt.msg->log, logevt.msg->missing, logevt.from);
+    logevt.msg->info, logevt.msg->log, std::move(logevt.msg->missing), logevt.from);
   bool got_missing = ps->search_for_missing(
     ps->peer_info[logevt.from],
     ps->peer_missing[logevt.from],
@@ -5783,7 +6117,7 @@ boost::statechart::result PeeringState::Active::react(const QueryState& q)
 
   {
     q.f->open_array_section("might_have_unfound");
-    for (set<pg_shard_t>::iterator p = ps->might_have_unfound.begin();
+    for (auto p = ps->might_have_unfound.begin();
         p != ps->might_have_unfound.end();
         ++p) {
       q.f->open_object_section("osd");
@@ -5804,7 +6138,7 @@ boost::statechart::result PeeringState::Active::react(const QueryState& q)
   {
     q.f->open_object_section("recovery_progress");
     q.f->open_array_section("backfill_targets");
-    for (set<pg_shard_t>::const_iterator p = ps->backfill_targets.begin();
+    for (auto p = ps->backfill_targets.begin();
         p != ps->backfill_targets.end(); ++p)
       q.f->dump_stream("replica") << *p;
     q.f->close_section();
@@ -5816,6 +6150,14 @@ boost::statechart::result PeeringState::Active::react(const QueryState& q)
   return forward_event();
 }
 
+boost::statechart::result PeeringState::Active::react(const QueryUnfound& q)
+{
+  DECLARE_LOCALS;
+
+  ps->query_unfound(q.f, "Active");
+  return discard_event();
+}
+
 boost::statechart::result PeeringState::Active::react(
   const ActivateCommitted &evt)
 {
@@ -5864,7 +6206,7 @@ boost::statechart::result PeeringState::Active::react(const AllReplicasActivated
        pl->set_not_ready_to_merge_source(pgid);
       }
     }
-  } else if (ps->acting.size() < ps->pool.info.min_size) {
+  } else if (!ps->acting_set_writeable()) {
     ps->state_set(PG_STATE_PEERED);
   } else {
     ps->state_set(PG_STATE_ACTIVE);
@@ -5935,15 +6277,14 @@ void PeeringState::Active::all_activated_and_committed()
   ceph_assert(!ps->acting_recovery_backfill.empty());
   ceph_assert(ps->blocked_by.empty());
 
-  if (HAVE_FEATURE(ps->upacting_features, SERVER_OCTOPUS)) {
-    // this is overkill when the activation is quick, but when it is slow it
-    // is important, because the lease was renewed by the activate itself but we
-    // don't know how long ago that was, and simply scheduling now may leave
-    // a gap in lease coverage.  keep it simple and aggressively renew.
-    ps->renew_lease(pl->get_mnow());
-    ps->send_lease();
-    ps->schedule_renew_lease();
-  }
+  assert(HAVE_FEATURE(ps->upacting_features, SERVER_OCTOPUS));
+  // this is overkill when the activation is quick, but when it is slow it
+  // is important, because the lease was renewed by the activate itself but we
+  // don't know how long ago that was, and simply scheduling now may leave
+  // a gap in lease coverage.  keep it simple and aggressively renew.
+  ps->renew_lease(pl->get_mnow());
+  ps->send_lease();
+  ps->schedule_renew_lease();
 
   // Degraded?
   ps->update_calc_stats();
@@ -6023,7 +6364,7 @@ boost::statechart::result PeeringState::ReplicaActive::react(
     {}, /* lease */
     ps->get_lease_ack());
 
-  if (ps->acting.size() >= ps->pool.info.min_size) {
+  if (ps->acting_set_writeable()) {
     ps->state_set(PG_STATE_ACTIVE);
   } else {
     ps->state_set(PG_STATE_PEERED);
@@ -6042,7 +6383,7 @@ boost::statechart::result PeeringState::ReplicaActive::react(const MLease& l)
   ps->proc_lease(l.lease);
   pl->send_cluster_message(
     ps->get_primary().osd,
-    new MOSDPGLeaseAck(epoch,
+    TOPNSPC::make_message<MOSDPGLeaseAck>(epoch,
                       spg_t(spgid.pgid, ps->get_primary().shard),
                       ps->get_lease_ack()),
     epoch);
@@ -6062,7 +6403,7 @@ boost::statechart::result PeeringState::ReplicaActive::react(const MLogRec& loge
   DECLARE_LOCALS;
   psdout(10) << "received log from " << logevt.from << dendl;
   ObjectStore::Transaction &t = context<PeeringMachine>().get_cur_transaction();
-  ps->merge_log(t, logevt.msg->info, logevt.msg->log, logevt.from);
+  ps->merge_log(t, logevt.msg->info, std::move(logevt.msg->log), logevt.from);
   ceph_assert(ps->pg_log.get_head() == ps->info.last_update);
   if (logevt.msg->lease) {
     ps->proc_lease(*logevt.msg->lease);
@@ -6115,6 +6456,13 @@ boost::statechart::result PeeringState::ReplicaActive::react(const QueryState& q
   return forward_event();
 }
 
+boost::statechart::result PeeringState::ReplicaActive::react(const QueryUnfound& q)
+{
+  q.f->dump_string("state", "ReplicaActive");
+  q.f->dump_bool("available_might_have_unfound", false);
+  return discard_event();
+}
+
 void PeeringState::ReplicaActive::exit()
 {
   context< PeeringMachine >().log_exit(state_name, enter_time);
@@ -6168,7 +6516,7 @@ boost::statechart::result PeeringState::Stray::react(const MLogRec& logevt)
 
     ps->pg_log.reset_backfill();
   } else {
-    ps->merge_log(t, msg->info, msg->log, logevt.from);
+    ps->merge_log(t, msg->info, std::move(msg->log), logevt.from);
   }
   if (logevt.msg->lease) {
     ps->proc_lease(*logevt.msg->lease);
@@ -6253,7 +6601,7 @@ void PeeringState::ToDelete::exit()
   context< PeeringMachine >().log_exit(state_name, enter_time);
   DECLARE_LOCALS;
   // note: on a successful removal, this path doesn't execute. see
-  // _delete_some().
+  // do_delete_work().
   pl->get_perf_logger().dec(l_osd_pg_removing);
 
   pl->cancel_local_background_io_reservation();
@@ -6272,11 +6620,11 @@ PeeringState::WaitDeleteReserved::WaitDeleteReserved(my_context ctx)
   pl->cancel_local_background_io_reservation();
   pl->request_local_background_io_reservation(
     context<ToDelete>().priority,
-    std::make_shared<PGPeeringEvent>(
+    std::make_unique<PGPeeringEvent>(
       ps->get_osdmap_epoch(),
       ps->get_osdmap_epoch(),
       DeleteReserved()),
-    std::make_shared<PGPeeringEvent>(
+    std::make_unique<PGPeeringEvent>(
       ps->get_osdmap_epoch(),
       ps->get_osdmap_epoch(),
       DeleteInterrupted()));
@@ -6305,6 +6653,7 @@ PeeringState::Deleting::Deleting(my_context ctx)
     NamedState(context< PeeringMachine >().state_history, "Started/ToDelete/Deleting")
 {
   context< PeeringMachine >().log_enter(state_name);
+
   DECLARE_LOCALS;
   ps->deleting = true;
   ObjectStore::Transaction &t = context<PeeringMachine>().get_cur_transaction();
@@ -6325,8 +6674,11 @@ boost::statechart::result PeeringState::Deleting::react(
   const DeleteSome& evt)
 {
   DECLARE_LOCALS;
-  pl->do_delete_work(context<PeeringMachine>().get_cur_transaction());
-  return discard_event();
+  std::pair<ghobject_t, bool> p;
+  p = pl->do_delete_work(context<PeeringMachine>().get_cur_transaction(),
+    next);
+  next = p.first;
+  return p.second ? discard_event() : terminate();
 }
 
 void PeeringState::Deleting::exit()
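
do_delete_work now returns a pair: the object to resume from and whether more work remains; the Deleting state keeps that cursor in `next` and terminates once the flag goes false. A self-contained sketch of the same resumable-batch loop (the batch size and cursor type are stand-ins):

    #include <algorithm>
    #include <cstddef>
    #include <iostream>
    #include <tuple>
    #include <utility>
    #include <vector>

    using cursor_t = std::size_t;                      // stand-in for ghobject_t

    // Remove at most `max` objects starting at `next`; return {resume, more?}.
    std::pair<cursor_t, bool> do_delete_batch(std::vector<int>& objs,
                                              cursor_t next, std::size_t max) {
      cursor_t end = std::min(next + max, objs.size());
      for (cursor_t i = next; i < end; ++i)
        objs[i] = -1;                                  // "delete" the object
      return {end, end < objs.size()};
    }

    int main() {
      std::vector<int> objects(10, 1);
      cursor_t next = 0;
      bool more = true;
      while (more)                                     // one DeleteSome per pass
        std::tie(next, more) = do_delete_batch(objects, next, 3);
      std::cout << "deleted " << next << " objects\n";
    }
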
@@ -6354,10 +6706,10 @@ PeeringState::GetInfo::GetInfo(my_context ctx)
 
   prior_set = ps->build_prior();
   ps->prior_readable_down_osds = prior_set.down;
+
   if (ps->prior_readable_down_osds.empty()) {
-    psdout(10) << " no prior_set down osds, clearing prior_readable_until_ub"
+    psdout(10) << " no prior_set down osds, will clear prior_readable_until_ub before activating"
               << dendl;
-    ps->clear_prior_readable_until_ub();
   }
 
   ps->reset_min_peer_features();
@@ -6375,9 +6727,7 @@ void PeeringState::GetInfo::get_infos()
   PastIntervals::PriorSet &prior_set = context< Peering >().prior_set;
 
   ps->blocked_by.clear();
-  for (set<pg_shard_t>::const_iterator it = prior_set.probe.begin();
-       it != prior_set.probe.end();
-       ++it) {
+  for (auto it = prior_set.probe.begin(); it != prior_set.probe.end(); ++it) {
     pg_shard_t peer = *it;
     if (peer == ps->pg_whoami) {
       continue;
@@ -6414,7 +6764,7 @@ boost::statechart::result PeeringState::GetInfo::react(const MNotifyRec& infoevt
 
   DECLARE_LOCALS;
 
-  set<pg_shard_t>::iterator p = peer_info_requested.find(infoevt.from);
+  auto p = peer_info_requested.find(infoevt.from);
   if (p != peer_info_requested.end()) {
     peer_info_requested.erase(p);
     ps->blocked_by.erase(infoevt.from.osd);
@@ -6433,7 +6783,7 @@ boost::statechart::result PeeringState::GetInfo::react(const MNotifyRec& infoevt
       // filter out any osds that got dropped from the probe set from
       // peer_info_requested.  this is less expensive than restarting
       // peering (which would re-probe everyone).
-      set<pg_shard_t>::iterator p = peer_info_requested.begin();
+      auto p = peer_info_requested.begin();
       while (p != peer_info_requested.end()) {
        if (prior_set.probe.count(*p) == 0) {
          psdout(20) << " dropping osd." << *p << " from info_requested, no longer in probe set" << dendl;
@@ -6467,7 +6817,7 @@ boost::statechart::result PeeringState::GetInfo::react(const QueryState& q)
   q.f->dump_stream("enter_time") << enter_time;
 
   q.f->open_array_section("requested_info_from");
-  for (set<pg_shard_t>::iterator p = peer_info_requested.begin();
+  for (auto p = peer_info_requested.begin();
        p != peer_info_requested.end();
        ++p) {
     q.f->open_object_section("osd");
@@ -6485,6 +6835,13 @@ boost::statechart::result PeeringState::GetInfo::react(const QueryState& q)
   return forward_event();
 }
 
+boost::statechart::result PeeringState::GetInfo::react(const QueryUnfound& q)
+{
+  q.f->dump_string("state", "GetInfo");
+  q.f->dump_bool("available_might_have_unfound", false);
+  return discard_event();
+}
+
 void PeeringState::GetInfo::exit()
 {
   context< PeeringMachine >().log_exit(state_name, enter_time);
@@ -6538,7 +6895,7 @@ PeeringState::GetLog::GetLog(my_context ctx)
   // how much log to request?
   eversion_t request_log_from = ps->info.last_update;
   ceph_assert(!ps->acting_recovery_backfill.empty());
-  for (set<pg_shard_t>::iterator p = ps->acting_recovery_backfill.begin();
+  for (auto p = ps->acting_recovery_backfill.begin();
        p != ps->acting_recovery_backfill.end();
        ++p) {
     if (*p == ps->pg_whoami) continue;
@@ -6602,7 +6959,7 @@ boost::statechart::result PeeringState::GetLog::react(const GotLog&)
   if (msg) {
     psdout(10) << "processing master log" << dendl;
     ps->proc_master_log(context<PeeringMachine>().get_cur_transaction(),
-                       msg->info, msg->log, msg->missing,
+                       msg->info, std::move(msg->log), std::move(msg->missing),
                        auth_log_shard);
   }
   ps->start_flush(context< PeeringMachine >().get_cur_transaction());
@@ -6619,6 +6976,13 @@ boost::statechart::result PeeringState::GetLog::react(const QueryState& q)
   return forward_event();
 }
 
+boost::statechart::result PeeringState::GetLog::react(const QueryUnfound& q)
+{
+  q.f->dump_string("state", "GetLog");
+  q.f->dump_bool("available_might_have_unfound", false);
+  return discard_event();
+}
+
 void PeeringState::GetLog::exit()
 {
   context< PeeringMachine >().log_exit(state_name, enter_time);
@@ -6643,7 +7007,7 @@ boost::statechart::result PeeringState::WaitActingChange::react(const AdvMap& ad
   OSDMapRef osdmap = advmap.osdmap;
 
   psdout(10) << "verifying no want_acting " << ps->want_acting << " targets didn't go down" << dendl;
-  for (vector<int>::iterator p = ps->want_acting.begin(); p != ps->want_acting.end(); ++p) {
+  for (auto p = ps->want_acting.begin(); p != ps->want_acting.end(); ++p) {
     if (!osdmap->is_up(*p)) {
       psdout(10) << " want_acting target osd." << *p << " went down, resetting" << dendl;
       post_event(advmap);
@@ -6681,6 +7045,13 @@ boost::statechart::result PeeringState::WaitActingChange::react(const QueryState
   return forward_event();
 }
 
+boost::statechart::result PeeringState::WaitActingChange::react(const QueryUnfound& q)
+{
+  q.f->dump_string("state", "WaitActingChange");
+  q.f->dump_bool("available_might_have_unfound", false);
+  return discard_event();
+}
+
 void PeeringState::WaitActingChange::exit()
 {
   context< PeeringMachine >().log_exit(state_name, enter_time);
@@ -6730,6 +7101,13 @@ boost::statechart::result PeeringState::Down::react(const QueryState& q)
   return forward_event();
 }
 
+boost::statechart::result PeeringState::Down::react(const QueryUnfound& q)
+{
+  q.f->dump_string("state", "Down");
+  q.f->dump_bool("available_might_have_unfound", false);
+  return discard_event();
+}
+
 boost::statechart::result PeeringState::Down::react(const MNotifyRec& infoevt)
 {
   DECLARE_LOCALS;
@@ -6807,6 +7185,13 @@ boost::statechart::result PeeringState::Incomplete::react(
   return forward_event();
 }
 
+boost::statechart::result PeeringState::Incomplete::react(const QueryUnfound& q)
+{
+  q.f->dump_string("state", "Incomplete");
+  q.f->dump_bool("available_might_have_unfound", false);
+  return discard_event();
+}
+
 void PeeringState::Incomplete::exit()
 {
   context< PeeringMachine >().log_exit(state_name, enter_time);
@@ -6831,7 +7216,7 @@ PeeringState::GetMissing::GetMissing(my_context ctx)
   ps->log_weirdness();
   ceph_assert(!ps->acting_recovery_backfill.empty());
   eversion_t since;
-  for (set<pg_shard_t>::iterator i = ps->acting_recovery_backfill.begin();
+  for (auto i = ps->acting_recovery_backfill.begin();
        i != ps->acting_recovery_backfill.end();
        ++i) {
     if (*i == ps->get_primary()) continue;
@@ -6914,7 +7299,10 @@ boost::statechart::result PeeringState::GetMissing::react(const MLogRec& logevt)
   DECLARE_LOCALS;
 
   peer_missing_requested.erase(logevt.from);
-  ps->proc_replica_log(logevt.msg->info, logevt.msg->log, logevt.msg->missing, logevt.from);
+  ps->proc_replica_log(logevt.msg->info,
+                      logevt.msg->log,
+                      std::move(logevt.msg->missing),
+                      logevt.from);
 
   if (peer_missing_requested.empty()) {
     if (ps->need_up_thru) {
@@ -6938,7 +7326,7 @@ boost::statechart::result PeeringState::GetMissing::react(const QueryState& q)
   q.f->dump_stream("enter_time") << enter_time;
 
   q.f->open_array_section("peer_missing_requested");
-  for (set<pg_shard_t>::iterator p = peer_missing_requested.begin();
+  for (auto p = peer_missing_requested.begin();
        p != peer_missing_requested.end();
        ++p) {
     q.f->open_object_section("osd");
@@ -6956,6 +7344,13 @@ boost::statechart::result PeeringState::GetMissing::react(const QueryState& q)
   return forward_event();
 }
 
+boost::statechart::result PeeringState::GetMissing::react(const QueryUnfound& q)
+{
+  q.f->dump_string("state", "GetMissing");
+  q.f->dump_bool("available_might_have_unfound", false);
+  return discard_event();
+}
+
 void PeeringState::GetMissing::exit()
 {
   context< PeeringMachine >().log_exit(state_name, enter_time);
@@ -6987,7 +7382,7 @@ boost::statechart::result PeeringState::WaitUpThru::react(const MLogRec& logevt)
 {
   DECLARE_LOCALS;
   psdout(10) << "Noting missing from osd." << logevt.from << dendl;
-  ps->peer_missing[logevt.from].claim(logevt.msg->missing);
+  ps->peer_missing[logevt.from].claim(std::move(logevt.msg->missing));
   ps->peer_info[logevt.from] = logevt.msg->info;
   return discard_event();
 }
@@ -7002,6 +7397,13 @@ boost::statechart::result PeeringState::WaitUpThru::react(const QueryState& q)
   return forward_event();
 }
 
+boost::statechart::result PeeringState::WaitUpThru::react(const QueryUnfound& q)
+{
+  q.f->dump_string("state", "WaitUpThru");
+  q.f->dump_bool("available_might_have_unfound", false);
+  return discard_event();
+}
+
 void PeeringState::WaitUpThru::exit()
 {
   context< PeeringMachine >().log_exit(state_name, enter_time);
@@ -7090,3 +7492,45 @@ ostream &operator<<(ostream &out, const PeeringState &ps) {
   }
   return out;
 }
+
+std::vector<pg_shard_t> PeeringState::get_replica_recovery_order() const
+{
+  std::vector<std::pair<unsigned int, pg_shard_t>> replicas_by_num_missing,
+    async_by_num_missing;
+  replicas_by_num_missing.reserve(get_acting_recovery_backfill().size() - 1);
+  for (auto &p : get_acting_recovery_backfill()) {
+    if (p == get_primary()) {
+      continue;
+    }
+    auto pm = get_peer_missing().find(p);
+    assert(pm != get_peer_missing().end());
+    auto nm = pm->second.num_missing();
+    if (nm != 0) {
+      if (is_async_recovery_target(p)) {
+       async_by_num_missing.push_back(make_pair(nm, p));
+      } else {
+       replicas_by_num_missing.push_back(make_pair(nm, p));
+      }
+    }
+  }
+  // sort by number of missing objects, in ascending order.
+  auto func = [](const std::pair<unsigned int, pg_shard_t> &lhs,
+                const std::pair<unsigned int, pg_shard_t> &rhs) {
+    return lhs.first < rhs.first;
+  };
+  // acting goes first
+  std::sort(replicas_by_num_missing.begin(), replicas_by_num_missing.end(), func);
+  // then async_recovery_targets
+  std::sort(async_by_num_missing.begin(), async_by_num_missing.end(), func);
+  replicas_by_num_missing.insert(replicas_by_num_missing.end(),
+    async_by_num_missing.begin(), async_by_num_missing.end());
+
+  std::vector<pg_shard_t> ret;
+  ret.reserve(replicas_by_num_missing.size());
+  for (auto p : replicas_by_num_missing) {
+    ret.push_back(p.second);
+  }
+  return ret;
+}
+
+
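
For get_replica_recovery_order above, the effect of the two sorts plus the append is: regular acting replicas come first, ordered by how many objects they are missing, followed by async recovery targets in the same order. A tiny worked example on made-up shards:

    #include <algorithm>
    #include <iostream>
    #include <utility>
    #include <vector>

    int main() {
      using entry = std::pair<unsigned, char>;         // {num_missing, shard id}
      std::vector<entry> acting{{5, 'a'}, {2, 'b'}};   // regular replicas
      std::vector<entry> async{{7, 'c'}, {1, 'd'}};    // async recovery targets
      auto by_missing = [](const entry& l, const entry& r) { return l.first < r.first; };
      std::sort(acting.begin(), acting.end(), by_missing);
      std::sort(async.begin(), async.end(), by_missing);
      acting.insert(acting.end(), async.begin(), async.end());
      for (const auto& [n, shard] : acting)
        std::cout << shard << "(" << n << " missing) ";
      std::cout << "\n";                               // prints: b(2) a(5) d(1) c(7)
    }
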