]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osd/PG.cc
update sources to v12.2.5
[ceph.git] / ceph / src / osd / PG.cc
index e753310534bd86bac4155092a9de1d1f30451655..1b5107f7be71ed0fac08af3c460f0a16307021d5 100644 (file)
@@ -55,6 +55,8 @@
 #include "messages/MOSDSubOpReply.h"
 #include "messages/MOSDRepOpReply.h"
 #include "messages/MOSDRepScrubMap.h"
+#include "messages/MOSDPGRecoveryDelete.h"
+#include "messages/MOSDPGRecoveryDeleteReply.h"
 
 #include "common/BackTrace.h"
 #include "common/EventTrace.h"
@@ -223,6 +225,7 @@ void PG::dump_live_ids()
 }
 #endif
 
+
 void PGPool::update(OSDMapRef map)
 {
   const pg_pool_t *pi = map->get_pg_pool(id);
@@ -294,7 +297,8 @@ PG::PG(OSDService *o, OSDMapRef curmap,
   dirty_info(false), dirty_big_info(false),
   info(p),
   info_struct_v(0),
-  coll(p), pg_log(cct),
+  coll(p),
+  pg_log(cct),
   pgmeta_oid(p.make_pgmeta_oid()),
   missing_loc(this),
   past_intervals(
@@ -572,9 +576,13 @@ bool PG::search_for_missing(
 bool PG::MissingLoc::readable_with_acting(
   const hobject_t &hoid,
   const set<pg_shard_t> &acting) const {
-  if (!needs_recovery(hoid)) return true;
+  if (!needs_recovery(hoid))
+    return true;
+  if (is_deleted(hoid))
+    return false;
   auto missing_loc_entry = missing_loc.find(hoid);
-  if (missing_loc_entry == missing_loc.end()) return false;
+  if (missing_loc_entry == missing_loc.end())
+    return false;
   const set<pg_shard_t> &locs = missing_loc_entry->second;
   ldout(pg->cct, 10) << __func__ << ": locs:" << locs << dendl;
   set<pg_shard_t> have_acting;
@@ -600,6 +608,8 @@ void PG::MissingLoc::add_batch_sources_info(
       handle->reset_tp_timeout();
       loop = 0;
     }
+    if (i->second.is_delete())
+      continue;
     missing_loc[i->first].insert(sources.begin(), sources.end());
     missing_loc_sources.insert(sources.begin(), sources.end());
   }
@@ -623,6 +633,12 @@ bool PG::MissingLoc::add_source_info(
       handle->reset_tp_timeout();
       loop = 0;
     }
+    if (p->second.is_delete()) {
+      ldout(pg->cct, 10) << __func__ << " " << soid
+                        << " delete, ignoring source" << dendl;
+      found_missing = true;
+      continue;
+    }
     if (oinfo.last_update < need) {
       ldout(pg->cct, 10) << "search_for_missing " << soid << " " << need
                         << " also missing on osd." << fromosd
@@ -994,7 +1010,7 @@ void PG::clear_primary_state()
 PG::Scrubber::Scrubber()
  : reserved(false), reserve_failed(false),
    epoch_start(0),
-   active(false), queue_snap_trim(false),
+   active(false),
    waiting_on(0), shallow_errors(0), deep_errors(0), fixed(0),
    must_scrub(false), must_deep_scrub(false), must_repair(false),
    auto_repair(false),
@@ -1628,6 +1644,7 @@ void PG::activate(ObjectStore::Transaction& t,
       dout(10) << "activate peer osd." << peer << " " << pi << dendl;
 
       MOSDPGLog *m = 0;
+      assert(peer_missing.count(peer));
       pg_missing_t& pm = peer_missing[peer];
 
       bool needs_past_intervals = pi.dne();
@@ -1677,7 +1694,7 @@ void PG::activate(ObjectStore::Transaction& t,
         * behind.
         */
        // backfill
-       osd->clog->info() << info.pgid << " starting backfill to osd." << peer
+       osd->clog->debug() << info.pgid << " starting backfill to osd." << peer
                         << " from (" << pi.log_tail << "," << pi.last_update
                          << "] " << pi.last_backfill
                         << " to " << info.last_update;
@@ -1724,12 +1741,18 @@ void PG::activate(ObjectStore::Transaction& t,
       if (m && pi.last_backfill != hobject_t()) {
         for (list<pg_log_entry_t>::iterator p = m->log.log.begin();
              p != m->log.log.end();
-             ++p)
+             ++p) {
          if (p->soid <= pi.last_backfill &&
-             !p->is_error())
-           pm.add_next_event(*p);
+             !p->is_error()) {
+           if (perform_deletes_during_peering() && p->is_delete()) {
+             pm.rm(p->soid, p->version);
+           } else {
+             pm.add_next_event(*p);
+           }
+         }
+       }
       }
-      
+
       if (m) {
        dout(10) << "activate peer osd." << peer << " sending " << m->log << dendl;
        //m->log.print(cout);
@@ -1753,6 +1776,7 @@ void PG::activate(ObjectStore::Transaction& t,
     for (set<pg_shard_t>::iterator i = actingbackfill.begin();
         i != actingbackfill.end();
         ++i) {
+      dout(20) << __func__ << " setting up missing_loc from shard " << *i << " " << dendl;
       if (*i == get_primary()) {
        missing_loc.add_active_missing(missing);
         if (!missing.have_missing())
@@ -1809,14 +1833,13 @@ void PG::activate(ObjectStore::Transaction& t,
 
       build_might_have_unfound();
 
-      state_set(PG_STATE_DEGRADED);
       if (have_unfound())
        discover_all_missing(query_map);
     }
 
-    // degraded?
+    // num_objects_degraded if calculated should reflect this too, unless no
+    // missing and we are about to go clean.
     if (get_osdmap()->get_pg_size(info.pgid.pgid) > actingset.size()) {
-      state_set(PG_STATE_DEGRADED);
       state_set(PG_STATE_UNDERSIZED);
     }
 
@@ -1856,8 +1879,10 @@ bool PG::op_has_sufficient_caps(OpRequestRef& op)
                             op->need_write_cap(),
                             op->classes());
 
-  dout(20) << "op_has_sufficient_caps pool=" << pool.id << " (" << pool.name
-                  << " " << req->get_hobj().nspace
+  dout(20) << "op_has_sufficient_caps "
+           << "session=" << session
+           << " pool=" << pool.id << " (" << pool.name
+           << " " << req->get_hobj().nspace
           << ") owner=" << pool.auid
           << " need_read_cap=" << op->need_read_cap()
           << " need_write_cap=" << op->need_write_cap()
@@ -1906,6 +1931,12 @@ void PG::_activate_committed(epoch_t epoch, epoch_t activation_epoch)
     // waiters
     if (flushes_in_progress == 0) {
       requeue_ops(waiting_for_peered);
+    } else if (!waiting_for_peered.empty()) {
+      dout(10) << __func__ << " flushes in progress, moving "
+              << waiting_for_peered.size() << " items to waiting_for_flush"
+              << dendl;
+      assert(waiting_for_flush.empty());
+      waiting_for_flush.swap(waiting_for_peered);
     }
   }
 
@@ -1927,6 +1958,14 @@ void PG::all_activated_and_committed()
   assert(!actingbackfill.empty());
   assert(blocked_by.empty());
 
+  // Degraded?
+  _update_calc_stats();
+  if (info.stats.stats.sum.num_objects_degraded) {
+    state_set(PG_STATE_DEGRADED);
+  } else {
+    state_clear(PG_STATE_DEGRADED);
+  }
+
   queue_peering_event(
     CephPeeringEvtRef(
       std::make_shared<CephPeeringEvt>(
@@ -1949,7 +1988,7 @@ bool PG::requeue_scrub(bool high_priority)
   }
 }
 
-void PG::queue_recovery(bool front)
+void PG::queue_recovery()
 {
   if (!is_primary() || !is_peered()) {
     dout(10) << "queue_recovery -- not primary or not peered " << dendl;
@@ -1959,7 +1998,7 @@ void PG::queue_recovery(bool front)
   } else {
     dout(10) << "queue_recovery -- queuing" << dendl;
     recovery_queued = true;
-    osd->queue_for_recovery(this, front);
+    osd->queue_for_recovery(this);
   }
 }
 
@@ -2003,80 +2042,88 @@ struct C_PG_FinishRecovery : public Context {
 
 void PG::mark_clean()
 {
-  // only mark CLEAN if we have the desired number of replicas AND we
-  // are not remapped.
-  if (actingset.size() == get_osdmap()->get_pg_size(info.pgid.pgid) &&
-      up == acting)
+  if (actingset.size() == get_osdmap()->get_pg_size(info.pgid.pgid)) {
+    state_clear(PG_STATE_FORCED_BACKFILL | PG_STATE_FORCED_RECOVERY);
     state_set(PG_STATE_CLEAN);
-
-  // NOTE: this is actually a bit premature: we haven't purged the
-  // strays yet.
-  info.history.last_epoch_clean = get_osdmap()->get_epoch();
-  info.history.last_interval_clean = info.history.same_interval_since;
-
-  past_intervals.clear();
-  dirty_big_info = true;
-
-  if (is_active()) {
-    /* The check is needed because if we are below min_size we're not
-     * actually active */
-    kick_snap_trim();
+    info.history.last_epoch_clean = get_osdmap()->get_epoch();
+    info.history.last_interval_clean = info.history.same_interval_since;
+    past_intervals.clear();
+    dirty_big_info = true;
+    dirty_info = true;
   }
 
-  dirty_info = true;
+  kick_snap_trim();
 }
 
-unsigned PG::get_recovery_priority()
+void PG::_change_recovery_force_mode(int new_mode, bool clear)
 {
-  // a higher value -> a higher priority
-
-  int pool_recovery_priority = 0;
-  pool.info.opts.get(pool_opts_t::RECOVERY_PRIORITY, &pool_recovery_priority);
+  if (!deleting) {
+    // we can't and shouldn't do anything if the PG is being deleted locally
+    if (clear) {
+      state_clear(new_mode);
+    } else {
+      state_set(new_mode);
+    }
+    publish_stats_to_osd();
+  }
+}
 
-  int ret = OSD_RECOVERY_PRIORITY_BASE + pool_recovery_priority;
+inline int PG::clamp_recovery_priority(int priority)
+{
+  static_assert(OSD_RECOVERY_PRIORITY_MIN < OSD_RECOVERY_PRIORITY_MAX, "Invalid priority range");
+  static_assert(OSD_RECOVERY_PRIORITY_MIN >= 0, "Priority range must match unsigned type");
 
   // Clamp to valid range
-  if (ret > OSD_RECOVERY_PRIORITY_MAX) {
-    ret = OSD_RECOVERY_PRIORITY_MAX;
-  } else if (ret < OSD_RECOVERY_PRIORITY_MIN) {
-    ret = OSD_RECOVERY_PRIORITY_MIN;
+  if (priority > OSD_RECOVERY_PRIORITY_MAX) {
+    return OSD_RECOVERY_PRIORITY_MAX;
+  } else if (priority < OSD_RECOVERY_PRIORITY_MIN) {
+    return OSD_RECOVERY_PRIORITY_MIN;
+  } else {
+    return priority;
   }
+}
 
-  static_assert(OSD_RECOVERY_PRIORITY_MIN < OSD_RECOVERY_PRIORITY_MAX, "Invalid priority range");
-  static_assert(OSD_RECOVERY_PRIORITY_MIN >= 0, "Priority range must match unsigned type");
+unsigned PG::get_recovery_priority()
+{
+  // a higher value -> a higher priority
+  int ret = 0;
 
+  if (state & PG_STATE_FORCED_RECOVERY) {
+    ret = OSD_RECOVERY_PRIORITY_FORCED;
+  } else {
+    pool.info.opts.get(pool_opts_t::RECOVERY_PRIORITY, &ret);
+    ret = clamp_recovery_priority(OSD_RECOVERY_PRIORITY_BASE + ret);
+  }
+  dout(20) << __func__ << " recovery priority for " << *this << " is " << ret << ", state is " << state << dendl;
   return static_cast<unsigned>(ret);
 }
 
 unsigned PG::get_backfill_priority()
 {
   // a higher value -> a higher priority
-
   int ret = OSD_BACKFILL_PRIORITY_BASE;
-  if (acting.size() < pool.info.min_size) {
-    // inactive: no. of replicas < min_size, highest priority since it blocks IO
-    ret = OSD_BACKFILL_INACTIVE_PRIORITY_BASE + (pool.info.min_size - acting.size());
+  if (state & PG_STATE_FORCED_BACKFILL) {
+    ret = OSD_RECOVERY_PRIORITY_FORCED;
+  } else {
+    if (acting.size() < pool.info.min_size) {
+      // inactive: no. of replicas < min_size, highest priority since it blocks IO
+      ret = OSD_BACKFILL_INACTIVE_PRIORITY_BASE + (pool.info.min_size - acting.size());
 
-  } else if (is_undersized()) {
-    // undersized: OSD_BACKFILL_DEGRADED_PRIORITY_BASE + num missing replicas
-    assert(pool.info.size > actingset.size());
-    ret = OSD_BACKFILL_DEGRADED_PRIORITY_BASE + (pool.info.size - actingset.size());
+    } else if (is_undersized()) {
+      // undersized: OSD_BACKFILL_DEGRADED_PRIORITY_BASE + num missing replicas
+      assert(pool.info.size > actingset.size());
+      ret = OSD_BACKFILL_DEGRADED_PRIORITY_BASE + (pool.info.size - actingset.size());
 
-  } else if (is_degraded()) {
-    // degraded: baseline degraded
-    ret = OSD_BACKFILL_DEGRADED_PRIORITY_BASE;
-  }
+    } else if (is_degraded()) {
+      // degraded: baseline degraded
+      ret = OSD_BACKFILL_DEGRADED_PRIORITY_BASE;
+    }
 
-  // Adjust with pool's recovery priority
-  int pool_recovery_priority = 0;
-  pool.info.opts.get(pool_opts_t::RECOVERY_PRIORITY, &pool_recovery_priority);
-  ret += pool_recovery_priority;
+    // Adjust with pool's recovery priority
+    int pool_recovery_priority = 0;
+    pool.info.opts.get(pool_opts_t::RECOVERY_PRIORITY, &pool_recovery_priority);
 
-  // Clamp to valid range
-  if (ret > OSD_RECOVERY_PRIORITY_MAX) {
-    ret = OSD_RECOVERY_PRIORITY_MAX;
-  } else if (ret < OSD_RECOVERY_PRIORITY_MIN) {
-    ret = OSD_RECOVERY_PRIORITY_MIN;
+    ret = clamp_recovery_priority(pool_recovery_priority + ret);
   }
 
   return static_cast<unsigned>(ret);
@@ -2196,6 +2243,10 @@ void PG::split_into(pg_t child_pgid, PG *child, unsigned split_bits)
     // in the future).
     info.set_last_backfill(hobject_t());
     child->info.set_last_backfill(hobject_t());
+    // restarting backfill implies that the missing set is empty,
+    // since it is only used for objects prior to last_backfill
+    pg_log.reset_backfill();
+    child->pg_log.reset_backfill();
   }
 
   child->info.stats = info.stats;
@@ -2528,99 +2579,150 @@ void PG::_update_calc_stats()
   info.stats.ondisk_log_size = info.stats.log_size;
   info.stats.log_start = pg_log.get_tail();
   info.stats.ondisk_log_start = pg_log.get_tail();
+  info.stats.snaptrimq_len = snap_trimq.size();
 
-  // If actingset is larger then upset we will have misplaced,
-  // so we will report based on actingset size.
-
-  // If upset is larger then we will have degraded,
-  // so we will report based on upset size.
-
-  // If target is the largest of them all, it will contribute to
-  // the degraded count because num_object_copies is
-  // computed using target and eventual used to get degraded total.
+  unsigned num_shards = get_osdmap()->get_pg_size(info.pgid.pgid);
 
-  unsigned target = get_osdmap()->get_pg_size(info.pgid.pgid);
-  unsigned nrep = MAX(actingset.size(), upset.size());
+  // In rare case that upset is too large (usually transient), use as target
+  // for calculations below.
+  unsigned target = std::max(num_shards, (unsigned)upset.size());
+  // Not sure this could ever happen, that actingset > upset
+  // which only matters if actingset > num_shards.
+  unsigned nrep = std::max(actingset.size(), upset.size());
   // calc num_object_copies
   info.stats.stats.calc_copies(MAX(target, nrep));
   info.stats.stats.sum.num_objects_degraded = 0;
   info.stats.stats.sum.num_objects_unfound = 0;
   info.stats.stats.sum.num_objects_misplaced = 0;
-  if ((is_degraded() || is_undersized() || !is_clean()) && is_peered()) {
-    // NOTE: we only generate copies, degraded, misplaced and unfound
+  if ((is_remapped() || is_undersized() || !is_clean()) && (is_peered() || is_activating())) {
+    dout(20) << __func__ << " actingset " << actingset << " upset "
+             << upset << " actingbackfill " << actingbackfill << dendl;
+    dout(20) << __func__ << " acting " << acting << " up " << up << dendl;
+
+    assert(!actingbackfill.empty());
+
+    // NOTE: we only generate degraded, misplaced and unfound
     // values for the summation, not individual stat categories.
     int64_t num_objects = info.stats.stats.sum.num_objects;
 
-    // Total sum of all missing
-    int64_t missing = 0;
-    // Objects that have arrived backfilled to up OSDs (not in acting)
-    int64_t backfilled = 0;
-    // A misplaced object is not stored on the correct OSD
-    int64_t misplaced = 0;
-    // Total of object copies/shards found
-    int64_t object_copies = 0;
-
-    // num_objects_missing on each peer
-    for (map<pg_shard_t, pg_info_t>::iterator pi =
-        peer_info.begin();
-        pi != peer_info.end();
-        ++pi) {
-      map<pg_shard_t, pg_missing_t>::const_iterator pm =
-        peer_missing.find(pi->first);
-      if (pm != peer_missing.end()) {
-        pi->second.stats.stats.sum.num_objects_missing =
-          pm->second.num_missing();
-      }
+    // Objects missing from up nodes, sorted by # objects.
+    boost::container::flat_set<pair<int64_t,pg_shard_t>> missing_target_objects;
+    // Objects missing from nodes not in up, sort by # objects
+    boost::container::flat_set<pair<int64_t,pg_shard_t>> acting_source_objects;
+
+    int64_t missing;
+
+    // Primary first
+    missing = pg_log.get_missing().num_missing();
+    assert(actingbackfill.count(pg_whoami));
+    if (upset.count(pg_whoami)) {
+      missing_target_objects.insert(make_pair(missing, pg_whoami));
+    } else {
+      acting_source_objects.insert(make_pair(missing, pg_whoami));
     }
+    info.stats.stats.sum.num_objects_missing_on_primary = missing;
 
-    assert(!actingbackfill.empty());
-    for (set<pg_shard_t>::iterator i = actingbackfill.begin();
-        i != actingbackfill.end();
-        ++i) {
-      const pg_shard_t &p = *i;
-
-      bool in_up = (upset.find(p) != upset.end());
-      bool in_acting = (actingset.find(p) != actingset.end());
-      assert(in_up || in_acting);
-
-      // in acting                  Compute total objects excluding num_missing
-      // in acting and not in up    Compute misplaced objects excluding num_missing
-      // in up and not in acting    Compute total objects already backfilled
-      if (in_acting) {
-        unsigned osd_missing;
-        // primary handling
-        if (p == pg_whoami) {
-          osd_missing = pg_log.get_missing().num_missing();
-          info.stats.stats.sum.num_objects_missing_on_primary =
-              osd_missing;
-          object_copies += num_objects; // My local (primary) count
-        } else {
-          assert(peer_missing.count(p));
-          osd_missing = peer_missing[p].num_missing();
-          object_copies += peer_info[p].stats.stats.sum.num_objects;
+    // All other peers
+    for (auto& peer : peer_info) {
+      // Ignore other peers until we add code to look at detailed missing
+      // information. (recovery)
+      if (!actingbackfill.count(peer.first)) {
+       continue;
+      }
+      missing = 0;
+      // Backfill targets always track num_objects accurately
+      // all other peers track missing accurately.
+      if (is_backfill_targets(peer.first)) {
+       missing = std::max((int64_t)0, num_objects - peer.second.stats.stats.sum.num_objects);
+      } else {
+       if (peer_missing.count(peer.first)) {
+         missing = peer_missing[peer.first].num_missing();
+       } else {
+         dout(20) << __func__ << " no peer_missing found for " << peer.first << dendl;
         }
-        missing += osd_missing;
-        // Count non-missing objects not in up as misplaced
-        if (!in_up && num_objects > osd_missing)
-         misplaced += num_objects - osd_missing;
+      }
+      if (upset.count(peer.first)) {
+       missing_target_objects.insert(make_pair(missing, peer.first));
       } else {
-        assert(in_up && !in_acting);
+       acting_source_objects.insert(make_pair(missing, peer.first));
+      }
+      peer.second.stats.stats.sum.num_objects_missing = missing;
+    }
 
-        // If this peer has more objects then it should, ignore them
-        backfilled += MIN(num_objects, peer_info[p].stats.stats.sum.num_objects);
+    if (pool.info.is_replicated()) {
+      // Add to missing_target_objects up to target elements (num_objects missing)
+      assert(target >= missing_target_objects.size());
+      unsigned needed = target - missing_target_objects.size();
+      for (; needed; --needed)
+       missing_target_objects.insert(make_pair(num_objects, pg_shard_t(pg_shard_t::NO_OSD)));
+    } else {
+      for (unsigned i = 0 ; i < num_shards; ++i) {
+        shard_id_t shard(i);
+       bool found = false;
+       for (const auto& t : missing_target_objects) {
+         if (std::get<1>(t).shard == shard) {
+           found = true;
+           break;
+         }
+       }
+       if (!found)
+         missing_target_objects.insert(make_pair(num_objects, pg_shard_t(pg_shard_t::NO_OSD,shard)));
       }
     }
 
-    // Any objects that have been backfilled to up OSDs can deducted from misplaced
-    misplaced = MAX(0, misplaced - backfilled);
+    for (const auto& item : missing_target_objects)
+      dout(20) << __func__ << " missing shard " << std::get<1>(item) << " missing= " << std::get<0>(item) << dendl;
+    for (const auto& item : acting_source_objects)
+      dout(20) << __func__ << " acting shard " << std::get<1>(item) << " missing= " << std::get<0>(item) << dendl;
+
+    // A misplaced object is not stored on the correct OSD
+    int64_t misplaced = 0;
+    // a degraded objects has fewer replicas or EC shards than the pool specifies.
+    int64_t degraded = 0;
+
+    for (auto m = missing_target_objects.rbegin();
+        m != missing_target_objects.rend(); ++m) {
+
+      int64_t extra_missing = -1;
 
-    // Deduct computed total missing on acting nodes
-    object_copies -= missing;
-    // Include computed backfilled objects on up nodes
-    object_copies += backfilled;
-    // a degraded objects has fewer replicas or EC shards than the
-    // pool specifies.  num_object_copies will never be smaller than target * num_copies.
-    int64_t degraded = MAX(0, info.stats.stats.sum.num_object_copies - object_copies);
+      if (pool.info.is_replicated()) {
+       if (!acting_source_objects.empty()) {
+         auto extra_copy = acting_source_objects.begin();
+         extra_missing = std::get<0>(*extra_copy);
+          acting_source_objects.erase(extra_copy);
+       }
+      } else { // Erasure coded
+       // Use corresponding shard
+       for (const auto& a : acting_source_objects) {
+         if (std::get<1>(a).shard == std::get<1>(*m).shard) {
+           extra_missing = std::get<0>(a);
+           acting_source_objects.erase(a);
+           break;
+         }
+       }
+      }
+
+      if (extra_missing >= 0 && std::get<0>(*m) >= extra_missing) {
+       // We don't know which of the objects on the target
+       // are part of extra_missing so assume are all degraded.
+       misplaced += std::get<0>(*m) - extra_missing;
+       degraded += extra_missing;
+      } else {
+       // 1. extra_missing == -1, more targets than sources so degraded
+       // 2. extra_missing > std::get<0>(m), so that we know that some extra_missing
+       //    previously degraded are now present on the target.
+       degraded += std::get<0>(*m);
+      }
+    }
+    // If there are still acting that haven't been accounted for
+    // then they are misplaced
+    for (const auto& a : acting_source_objects) {
+      int64_t extra_misplaced = std::max((int64_t)0, num_objects - std::get<0>(a));
+      dout(20) << __func__ << " extra acting misplaced " << extra_misplaced << dendl;
+      misplaced += extra_misplaced;
+    }
+    dout(20) << __func__ << " degraded " << degraded << dendl;
+    dout(20) << __func__ << " misplaced " << misplaced << dendl;
 
     info.stats.stats.sum.num_objects_degraded = degraded;
     info.stats.stats.sum.num_objects_unfound = get_num_unfound();
@@ -2684,6 +2786,11 @@ void PG::publish_stats_to_osd()
   }
 
   _update_calc_stats();
+  if (info.stats.stats.sum.num_objects_degraded) {
+    state_set(PG_STATE_DEGRADED);
+  } else {
+    state_clear(PG_STATE_DEGRADED);
+  }
   _update_blocked_by();
 
   bool publish = false;
@@ -3155,7 +3262,7 @@ void PG::append_log(
   auto last = logv.rbegin();
   if (is_primary() && last != logv.rend()) {
     projected_log.skip_can_rollback_to_to_head();
-    projected_log.trim(cct, last->version, nullptr);
+    projected_log.trim(cct, last->version, nullptr, nullptr, nullptr);
   }
 
   if (transaction_applied && roll_forward_to > pg_log.get_can_rollback_to()) {
@@ -3277,6 +3384,17 @@ void PG::read_state(ObjectStore *store, bufferlist &bl)
 
   last_written_info = info;
 
+  // if we are upgrading from jewel, we need to force rebuild of
+  // missing set.  v9 was fastinfo, added v11.0.2-331-g1d5dc29a13
+  // (before kraken).  persisted missing set was circa
+  // v11.0.0-866-gb0e239da95 (a bit earlier, also before kraken).
+  // v8 was pre-jewel (per-pg meta object).
+  bool force_rebuild_missing = info_struct_v < 9;
+  if (force_rebuild_missing) {
+    dout(10) << __func__ << " detected upgrade from jewel, force_rebuild_missing"
+            << dendl;
+  }
+
   ostringstream oss;
   pg_log.read_log_and_missing(
     store,
@@ -3284,11 +3402,18 @@ void PG::read_state(ObjectStore *store, bufferlist &bl)
     info_struct_v < 8 ? coll_t::meta() : coll,
     ghobject_t(info_struct_v < 8 ? OSD::make_pg_log_oid(pg_id) : pgmeta_oid),
     info,
+    force_rebuild_missing,
     oss,
     cct->_conf->osd_ignore_stale_divergent_priors,
     cct->_conf->osd_debug_verify_missing_on_start);
   if (oss.tellp())
-    osd->clog->error() << oss.rdbuf();
+    osd->clog->error() << oss.str();
+
+  if (force_rebuild_missing) {
+    dout(10) << __func__ << " forced rebuild of missing got "
+            << pg_log.get_missing()
+            << dendl;
+  }
 
   // log any weirdness
   log_weirdness();
@@ -3309,8 +3434,8 @@ void PG::log_weirdness()
     // sloppy check
     if ((pg_log.get_log().log.begin()->version <= pg_log.get_tail()))
       osd->clog->error() << info.pgid
-                       << " log bound mismatch, info (" << pg_log.get_tail() << ","
-                       << pg_log.get_head() << "]"
+                       << " log bound mismatch, info (tail,head] ("
+                       << pg_log.get_tail() << "," << pg_log.get_head() << "]"
                        << " actual ["
                        << pg_log.get_log().log.begin()->version << ","
                         << pg_log.get_log().log.rbegin()->version << "]";
@@ -3345,6 +3470,7 @@ void PG::update_snap_map(
        try {
          ::decode(snaps, p);
        } catch (...) {
+         derr << __func__ << " decode snaps failure on " << *i << dendl;
          snaps.clear();
        }
        set<snapid_t> _snaps(snaps.begin(), snaps.end());
@@ -3555,13 +3681,14 @@ bool PG::sched_scrub()
   bool ret = true;
   if (!scrubber.reserved) {
     assert(scrubber.reserved_peers.empty());
-    if (osd->inc_scrubs_pending()) {
-      dout(20) << "sched_scrub: reserved locally, reserving replicas" << dendl;
+    if ((cct->_conf->osd_scrub_during_recovery || !osd->is_recovery_active()) &&
+         osd->inc_scrubs_pending()) {
+      dout(20) << __func__ << ": reserved locally, reserving replicas" << dendl;
       scrubber.reserved = true;
       scrubber.reserved_peers.insert(pg_whoami);
       scrub_reserve_replicas();
     } else {
-      dout(20) << "sched_scrub: failed to reserve locally" << dendl;
+      dout(20) << __func__ << ": failed to reserve locally" << dendl;
       ret = false;
     }
   }
@@ -3592,7 +3719,7 @@ bool PG::sched_scrub()
        } else {
          osd->clog->error() << "osd." << osd->whoami
                             << " pg " << info.pgid
-                            << " Regular scrub request, losing deep-scrub details";
+                            << " Regular scrub request, deep-scrub details will be lost";
        }
       }
       queue_scrub();
@@ -3741,7 +3868,13 @@ void PG::handle_scrub_reserve_request(OpRequestRef op)
             << dendl;
     return;
   }
-  scrubber.reserved = osd->inc_scrubs_pending();
+  if ((cct->_conf->osd_scrub_during_recovery || !osd->is_recovery_active()) &&
+      osd->inc_scrubs_pending()) {
+    scrubber.reserved = true;
+  } else {
+    dout(20) << __func__ << ": failed to reserve remotely" << dendl;
+    scrubber.reserved = false;
+  }
   if (op->get_req()->get_type() == MSG_OSD_SCRUB_RESERVE) {
     const MOSDScrubReserve *m =
       static_cast<const MOSDScrubReserve*>(op->get_req());
@@ -3815,21 +3948,21 @@ void PG::reject_reservation()
     get_osdmap()->get_epoch());
 }
 
-void PG::schedule_backfill_full_retry()
+void PG::schedule_backfill_retry(float delay)
 {
   Mutex::Locker lock(osd->recovery_request_lock);
   osd->recovery_request_timer.add_event_after(
-    cct->_conf->osd_backfill_retry_interval,
+    delay,
     new QueuePeeringEvt<RequestBackfill>(
       this, get_osdmap()->get_epoch(),
       RequestBackfill()));
 }
 
-void PG::schedule_recovery_full_retry()
+void PG::schedule_recovery_retry(float delay)
 {
   Mutex::Locker lock(osd->recovery_request_lock);
   osd->recovery_request_timer.add_event_after(
-    cct->_conf->osd_recovery_retry_interval,
+    delay,
     new QueuePeeringEvt<DoRecovery>(
       this, get_osdmap()->get_epoch(),
       DoRecovery()));
@@ -3959,6 +4092,9 @@ void PG::_scan_snaps(ScrubMap &smap)
        continue;
       }
       head = hoid.get_head();
+      // Make sure head_exists is correct for is_legacy() check
+      if (hoid.is_head())
+       snapset.head_exists = true;
       continue;
     }
     if (hoid.snap < CEPH_MAXSNAP) {
@@ -4024,16 +4160,77 @@ void PG::_scan_snaps(ScrubMap &smap)
                            << "...repaired";
        }
        snap_mapper.add_oid(hoid, obj_snaps, &_t);
-       r = osd->store->apply_transaction(osr.get(), std::move(t));
-       if (r != 0) {
-         derr << __func__ << ": apply_transaction got " << cpp_strerror(r)
-              << dendl;
+
+       // wait for repair to apply to avoid confusing other bits of the system.
+       {
+         Cond my_cond;
+         Mutex my_lock("PG::_scan_snaps my_lock");
+         int r = 0;
+         bool done;
+         t.register_on_applied_sync(
+           new C_SafeCond(&my_lock, &my_cond, &done, &r));
+         r = osd->store->apply_transaction(osr.get(), std::move(t));
+         if (r != 0) {
+           derr << __func__ << ": apply_transaction got " << cpp_strerror(r)
+                << dendl;
+         } else {
+           my_lock.Lock();
+           while (!done)
+             my_cond.Wait(my_lock);
+           my_lock.Unlock();
+         }
        }
       }
     }
   }
 }
 
+void PG::_repair_oinfo_oid(ScrubMap &smap)
+{
+  for (map<hobject_t, ScrubMap::object>::reverse_iterator i = smap.objects.rbegin();
+       i != smap.objects.rend();
+       ++i) {
+    const hobject_t &hoid = i->first;
+    ScrubMap::object &o = i->second;
+
+    bufferlist bl;
+    if (o.attrs.find(OI_ATTR) == o.attrs.end()) {
+      continue;
+    }
+    bl.push_back(o.attrs[OI_ATTR]);
+    object_info_t oi;
+    try {
+      oi.decode(bl);
+    } catch(...) {
+      continue;
+    }
+    if (oi.soid != hoid) {
+      ObjectStore::Transaction t;
+      OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
+      osd->clog->error() << "osd." << osd->whoami
+                           << " found object info error on pg "
+                           << info.pgid
+                           << " oid " << hoid << " oid in object info: "
+                           << oi.soid
+                           << "...repaired";
+      // Fix object info
+      oi.soid = hoid;
+      bl.clear();
+      ::encode(oi, bl, get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
+
+      bufferptr bp(bl.c_str(), bl.length());
+      o.attrs[OI_ATTR] = bp;
+
+      t.setattr(coll, ghobject_t(hoid), OI_ATTR, bl);
+      int r = osd->store->apply_transaction(osr.get(), std::move(t));
+      if (r != 0) {
+       derr << __func__ << ": apply_transaction got " << cpp_strerror(r)
+            << dendl;
+      }
+    }
+  }
+}
+
 /*
  * build a scrub map over a chunk without releasing the lock
  * only used by chunky scrub
@@ -4066,6 +4263,7 @@ int PG::build_scrub_map_chunk(
   get_pgbackend()->be_scan_list(map, ls, deep, seed, handle);
   _scan_rollback_obs(rollback_obs, handle);
   _scan_snaps(map);
+  _repair_oinfo_oid(map);
 
   dout(20) << __func__ << " done" << dendl;
   return 0;
@@ -4100,9 +4298,16 @@ void PG::repair_object(
   eversion_t v;
   bufferlist bv;
   bv.push_back(po.attrs[OI_ATTR]);
-  object_info_t oi(bv);
+  object_info_t oi;
+  try {
+    bufferlist::iterator bliter = bv.begin();
+    ::decode(oi, bliter);
+  } catch (...) {
+    dout(0) << __func__ << ": Need version of replica, bad object_info_t: " << soid << dendl;
+    assert(0);
+  }
   if (bad_peer != primary) {
-    peer_missing[bad_peer].add(soid, oi.version, eversion_t());
+    peer_missing[bad_peer].add(soid, oi.version, eversion_t(), false);
   } else {
     // We should only be scrubbing if the PG is clean.
     assert(waiting_for_unreadable_object.empty());
@@ -4405,7 +4610,7 @@ void PG::chunky_scrub(ThreadPool::TPHandle &handle)
          const char *mode = (repair ? "repair": (deep_scrub ? "deep-scrub" : "scrub"));
          stringstream oss;
          oss << info.pgid.pgid << " " << mode << " starts" << std::endl;
-         osd->clog->info(oss);
+         osd->clog->debug(oss);
        }
 
        scrubber.seed = -1;
@@ -4611,6 +4816,11 @@ void PG::chunky_scrub(ThreadPool::TPHandle &handle)
         scrubber.state = PG::Scrubber::INACTIVE;
         done = true;
 
+       if (!snap_trimq.empty()) {
+         dout(10) << "scrub finished, requeuing snap_trimmer" << dendl;
+         snap_trimmer_scrub_complete();
+       }
+
         break;
 
       default:
@@ -4635,11 +4845,6 @@ void PG::scrub_clear_state()
 
   requeue_ops(waiting_for_scrub);
 
-  if (scrubber.queue_snap_trim) {
-    dout(10) << "scrub finished, requeuing snap_trimmer" << dendl;
-    snap_trimmer_scrub_complete();
-  }
-
   scrubber.reset();
 
   // type-specific state clear
@@ -4844,7 +5049,7 @@ void PG::scrub_finish()
     if (total_errors)
       osd->clog->error(oss);
     else
-      osd->clog->info(oss);
+      osd->clog->debug(oss);
   }
 
   // finish up
@@ -4941,25 +5146,41 @@ void PG::share_pg_info()
 
 bool PG::append_log_entries_update_missing(
   const mempool::osd_pglog::list<pg_log_entry_t> &entries,
-  ObjectStore::Transaction &t)
+  ObjectStore::Transaction &t, boost::optional<eversion_t> trim_to,
+  boost::optional<eversion_t> roll_forward_to)
 {
   assert(!entries.empty());
   assert(entries.begin()->version > info.last_update);
 
   PGLogEntryHandler rollbacker{this, &t};
+  if (roll_forward_to) {
+    pg_log.roll_forward(&rollbacker);
+  }
   bool invalidate_stats =
     pg_log.append_new_log_entries(info.last_backfill,
                                  info.last_backfill_bitwise,
                                  entries,
                                  &rollbacker);
+
+  if (roll_forward_to && entries.rbegin()->soid > info.last_backfill) {
+    pg_log.roll_forward(&rollbacker);
+  }
+  if (roll_forward_to && *roll_forward_to > pg_log.get_can_rollback_to()) {
+    pg_log.roll_forward_to(*roll_forward_to, &rollbacker);
+    last_rollback_info_trimmed_to_applied = *roll_forward_to;
+  }
+
   info.last_update = pg_log.get_head();
 
   if (pg_log.get_missing().num_missing() == 0) {
     // advance last_complete since nothing else is missing!
     info.last_complete = info.last_update;
   }
-
   info.stats.stats_invalid = info.stats.stats_invalid || invalidate_stats;
+
+  dout(20) << __func__ << "trim_to bool = " << bool(trim_to) << " trim_to = " << (trim_to ? *trim_to : eversion_t()) << dendl;
+  if (trim_to)
+    pg_log.trim(*trim_to, info);
   dirty_info = true;
   write_if_dirty(t);
   return invalidate_stats;
@@ -4968,12 +5189,14 @@ bool PG::append_log_entries_update_missing(
 
 void PG::merge_new_log_entries(
   const mempool::osd_pglog::list<pg_log_entry_t> &entries,
-  ObjectStore::Transaction &t)
+  ObjectStore::Transaction &t,
+  boost::optional<eversion_t> trim_to,
+  boost::optional<eversion_t> roll_forward_to)
 {
   dout(10) << __func__ << " " << entries << dendl;
   assert(is_primary());
 
-  bool rebuild_missing = append_log_entries_update_missing(entries, t);
+  bool rebuild_missing = append_log_entries_update_missing(entries, t, trim_to, roll_forward_to);
   for (set<pg_shard_t>::const_iterator i = actingbackfill.begin();
        i != actingbackfill.end();
        ++i) {
@@ -4982,6 +5205,7 @@ void PG::merge_new_log_entries(
     assert(peer_missing.count(peer));
     assert(peer_info.count(peer));
     pg_missing_t& pmissing(peer_missing[peer]);
+    dout(20) << __func__ << " peer_missing for " << peer << " = " << pmissing << dendl;
     pg_info_t& pinfo(peer_info[peer]);
     bool invalidate_stats = PGLog::append_log_entries_update_missing(
       pinfo.last_backfill,
@@ -5212,6 +5436,7 @@ void PG::start_peering_interval(
   pg_shard_t old_acting_primary = get_primary();
   pg_shard_t old_up_primary = up_primary;
   bool was_old_primary = is_primary();
+  bool was_old_replica = is_replica();
 
   acting.swap(oldacting);
   up.swap(oldup);
@@ -5330,9 +5555,11 @@ void PG::start_peering_interval(
   actingbackfill.clear();
   scrub_queued = false;
 
-  // reset primary state?
+  // reset primary/replica state?
   if (was_old_primary || is_primary()) {
     osd->remove_want_pg_temp(info.pgid.pgid);
+  } else if (was_old_replica || is_replica()) {
+    osd->remove_want_pg_temp(info.pgid.pgid);
   }
   clear_primary_state();
 
@@ -5409,8 +5636,6 @@ void PG::on_new_interval()
     upacting_features &= osdmap->get_xinfo(*p).features;
   }
 
-  assert(osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE));
-
   _on_new_interval();
 }
 
@@ -5419,38 +5644,16 @@ void PG::proc_primary_info(ObjectStore::Transaction &t, const pg_info_t &oinfo)
   assert(!is_primary());
 
   update_history(oinfo.history);
+  if (!info.stats.stats_invalid && info.stats.stats.sum.num_scrub_errors) {
+    info.stats.stats.sum.num_scrub_errors = 0;
+    info.stats.stats.sum.num_shallow_scrub_errors = 0;
+    info.stats.stats.sum.num_deep_scrub_errors = 0;
+    dirty_info = true;
+  }
 
-  if (last_complete_ondisk.epoch >= info.history.last_epoch_started) {
-    // DEBUG: verify that the snaps are empty in snap_mapper
-    if (cct->_conf->osd_debug_verify_snaps_on_info) {
-      interval_set<snapid_t> p;
-      p.union_of(oinfo.purged_snaps, info.purged_snaps);
-      p.subtract(info.purged_snaps);
-      if (!p.empty()) {
-       for (interval_set<snapid_t>::iterator i = p.begin();
-            i != p.end();
-            ++i) {
-         for (snapid_t snap = i.get_start();
-              snap != i.get_len() + i.get_start();
-              ++snap) {
-           vector<hobject_t> hoids;
-           int r = snap_mapper.get_next_objects_to_trim(snap, 1, &hoids);
-           if (r != 0 && r != -ENOENT) {
-             derr << __func__ << ": snap_mapper get_next_object_to_trim returned "
-                  << cpp_strerror(r) << dendl;
-             ceph_abort();
-           } else if (r != -ENOENT) {
-             assert(!hoids.empty());
-             derr << __func__ << ": snap_mapper get_next_object_to_trim returned "
-                  << cpp_strerror(r) << " for object "
-                  << hoids[0] << " on snap " << snap
-                  << " which should have been fully trimmed " << dendl;
-             ceph_abort();
-           }
-         }
-       }
-      }
-    }
+  if (!(info.purged_snaps == oinfo.purged_snaps)) {
+    dout(10) << __func__ << " updating purged_snaps to " << oinfo.purged_snaps
+            << dendl;
     info.purged_snaps = oinfo.purged_snaps;
     dirty_info = true;
     dirty_big_info = true;
@@ -5463,6 +5666,8 @@ ostream& operator<<(ostream& out, const PG& pg)
       << " " << pg.up;
   if (pg.acting != pg.up)
     out << "/" << pg.acting;
+  if (pg.is_ec_pg())
+    out << "p" << pg.get_primary();
   out << " r=" << pg.get_role();
   out << " lpr=" << pg.get_last_peering_reset();
 
@@ -5579,12 +5784,21 @@ bool PG::can_discard_replica_op(OpRequestRef& op)
   const T *m = static_cast<const T *>(op->get_req());
   assert(m->get_type() == MSGTYPE);
 
+  int from = m->get_source().num();
+
+  // if a repop is replied after a replica goes down in a new osdmap, and
+  // before the pg advances to this new osdmap, the repop replies before this
+  // repop can be discarded by that replica OSD, because the primary resets the
+  // connection to it when handling the new osdmap marking it down, and also
+  // resets the messenger sesssion when the replica reconnects. to avoid the
+  // out-of-order replies, the messages from that replica should be discarded.
+  if (osd->get_osdmap()->is_down(from))
+    return true;
   /* Mostly, this overlaps with the old_peering_msg
    * condition.  An important exception is pushes
    * sent by replicas not in the acting set, since
    * if such a replica goes down it does not cause
    * a new interval. */
-  int from = m->get_source().num();
   if (get_osdmap()->get_down_at(from) >= m->map_epoch)
     return true;
 
@@ -5646,6 +5860,11 @@ bool PG::can_discard_request(OpRequestRef& op)
     return can_discard_replica_op<MOSDSubOpReply, MSG_OSD_SUBOPREPLY>(op);
   case MSG_OSD_REPOPREPLY:
     return can_discard_replica_op<MOSDRepOpReply, MSG_OSD_REPOPREPLY>(op);
+  case MSG_OSD_PG_RECOVERY_DELETE:
+    return can_discard_replica_op<MOSDPGRecoveryDelete, MSG_OSD_PG_RECOVERY_DELETE>(op);
+
+  case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
+    return can_discard_replica_op<MOSDPGRecoveryDeleteReply, MSG_OSD_PG_RECOVERY_DELETE_REPLY>(op);
 
   case MSG_OSD_EC_WRITE:
     return can_discard_replica_op<MOSDECSubOpWrite, MSG_OSD_EC_WRITE>(op);
@@ -5810,6 +6029,10 @@ void PG::handle_create(RecoveryCtx *rctx)
   recovery_state.handle_event(evt, rctx);
   ActMap evt2;
   recovery_state.handle_event(evt2, rctx);
+
+  rctx->on_applied->add(make_lambda_context([this]() {
+    update_store_with_options();
+  }));
 }
 
 void PG::handle_query_state(Formatter *f)
@@ -5823,7 +6046,7 @@ void PG::update_store_with_options()
 {
   auto r = osd->store->set_collection_opts(coll, pool.info.opts);
   if(r < 0 && r != -EOPNOTSUPP) {
-    derr << __func__ << "set_collection_opts returns error:" << r << dendl;
+    derr << __func__ << " set_collection_opts returns error:" << r << dendl;
   }
 }
 
@@ -5833,10 +6056,8 @@ void PG::update_store_on_load()
     // legacy filestore didn't store collection bit width; fix.
     int bits = osd->store->collection_bits(coll);
     if (bits < 0) {
-      if (coll.is_meta())
-       bits = 0;
-      else
-       bits = info.pgid.get_split_bits(pool.info.get_pg_num());
+      assert(!coll.is_meta()); // otherwise OSD::load_pgs() did a bad thing
+      bits = info.pgid.get_split_bits(pool.info.get_pg_num());
       lderr(cct) << __func__ << " setting bit width to " << bits << dendl;
       ObjectStore::Transaction t;
       t.collection_set_bits(coll, bits);
@@ -6262,10 +6483,77 @@ PG::RecoveryState::Backfilling::Backfilling(my_context ctx)
   pg->queue_recovery();
   pg->state_clear(PG_STATE_BACKFILL_TOOFULL);
   pg->state_clear(PG_STATE_BACKFILL_WAIT);
-  pg->state_set(PG_STATE_BACKFILL);
+  pg->state_set(PG_STATE_BACKFILLING);
   pg->publish_stats_to_osd();
 }
 
+boost::statechart::result
+PG::RecoveryState::Backfilling::react(const DeferBackfill &c)
+{
+  PG *pg = context< RecoveryMachine >().pg;
+  ldout(pg->cct, 10) << "defer backfill, retry delay " << c.delay << dendl;
+  pg->osd->local_reserver.cancel_reservation(pg->info.pgid);
+
+  pg->state_set(PG_STATE_BACKFILL_WAIT);
+  pg->state_clear(PG_STATE_BACKFILLING);
+
+  for (set<pg_shard_t>::iterator it = pg->backfill_targets.begin();
+       it != pg->backfill_targets.end();
+       ++it) {
+    assert(*it != pg->pg_whoami);
+    ConnectionRef con = pg->osd->get_con_osd_cluster(
+      it->osd, pg->get_osdmap()->get_epoch());
+    if (con) {
+      pg->osd->send_message_osd_cluster(
+        new MBackfillReserve(
+         MBackfillReserve::REJECT,
+         spg_t(pg->info.pgid.pgid, it->shard),
+         pg->get_osdmap()->get_epoch()),
+       con.get());
+    }
+  }
+
+
+  if (!pg->waiting_on_backfill.empty()) {
+    pg->waiting_on_backfill.clear();
+    pg->finish_recovery_op(hobject_t::get_max());
+  }
+
+  pg->schedule_backfill_retry(c.delay);
+  return transit<NotBackfilling>();
+}
+
+boost::statechart::result
+PG::RecoveryState::Backfilling::react(const UnfoundBackfill &c)
+{
+  PG *pg = context< RecoveryMachine >().pg;
+  ldout(pg->cct, 10) << "backfill has unfound, can't continue" << dendl;
+  pg->osd->local_reserver.cancel_reservation(pg->info.pgid);
+
+  pg->state_set(PG_STATE_BACKFILL_UNFOUND);
+  pg->state_clear(PG_STATE_BACKFILLING);
+
+  for (set<pg_shard_t>::iterator it = pg->backfill_targets.begin();
+       it != pg->backfill_targets.end();
+       ++it) {
+    assert(*it != pg->pg_whoami);
+    ConnectionRef con = pg->osd->get_con_osd_cluster(
+      it->osd, pg->get_osdmap()->get_epoch());
+    if (con) {
+      pg->osd->send_message_osd_cluster(
+        new MBackfillReserve(
+         MBackfillReserve::REJECT,
+         spg_t(pg->info.pgid.pgid, it->shard),
+         pg->get_osdmap()->get_epoch()),
+       con.get());
+    }
+  }
+
+  pg->waiting_on_backfill.clear();
+
+  return transit<NotBackfilling>();
+}
+
 boost::statechart::result
 PG::RecoveryState::Backfilling::react(const RemoteReservationRejected &)
 {
@@ -6289,10 +6577,12 @@ PG::RecoveryState::Backfilling::react(const RemoteReservationRejected &)
     }
   }
 
-  pg->waiting_on_backfill.clear();
-  pg->finish_recovery_op(hobject_t::get_max());
+  if (!pg->waiting_on_backfill.empty()) {
+    pg->waiting_on_backfill.clear();
+    pg->finish_recovery_op(hobject_t::get_max());
+  }
 
-  pg->schedule_backfill_full_retry();
+  pg->schedule_backfill_retry(pg->cct->_conf->osd_recovery_retry_interval);
   return transit<NotBackfilling>();
 }
 
@@ -6302,7 +6592,8 @@ void PG::RecoveryState::Backfilling::exit()
   PG *pg = context< RecoveryMachine >().pg;
   pg->backfill_reserved = false;
   pg->backfill_reserving = false;
-  pg->state_clear(PG_STATE_BACKFILL);
+  pg->state_clear(PG_STATE_BACKFILLING);
+  pg->state_clear(PG_STATE_FORCED_BACKFILL | PG_STATE_FORCED_RECOVERY);
   utime_t dur = ceph_clock_now() - enter_time;
   pg->osd->recoverystate_perf->tinc(rs_backfilling_latency, dur);
 }
@@ -6385,7 +6676,7 @@ PG::RecoveryState::WaitRemoteBackfillReserved::react(const RemoteReservationReje
   pg->state_set(PG_STATE_BACKFILL_TOOFULL);
   pg->publish_stats_to_osd();
 
-  pg->schedule_backfill_full_retry();
+  pg->schedule_backfill_retry(pg->cct->_conf->osd_recovery_retry_interval);
 
   return transit<NotBackfilling>();
 }
@@ -6403,7 +6694,10 @@ PG::RecoveryState::WaitLocalBackfillReserved::WaitLocalBackfillReserved(my_conte
     new QueuePeeringEvt<LocalBackfillReserved>(
       pg, pg->get_osdmap()->get_epoch(),
       LocalBackfillReserved()),
-    pg->get_backfill_priority());
+    pg->get_backfill_priority(),
+    new QueuePeeringEvt<DeferBackfill>(
+      pg, pg->get_osdmap()->get_epoch(),
+      DeferBackfill(0.0)));
   pg->publish_stats_to_osd();
 }
 
@@ -6441,6 +6735,7 @@ void PG::RecoveryState::NotBackfilling::exit()
 {
   context< RecoveryMachine >().log_exit(state_name, enter_time);
   PG *pg = context< RecoveryMachine >().pg;
+  pg->state_clear(PG_STATE_BACKFILL_UNFOUND);
   utime_t dur = ceph_clock_now() - enter_time;
   pg->osd->recoverystate_perf->tinc(rs_notbackfilling_latency, dur);
 }
@@ -6459,6 +6754,7 @@ void PG::RecoveryState::NotRecovering::exit()
 {
   context< RecoveryMachine >().log_exit(state_name, enter_time);
   PG *pg = context< RecoveryMachine >().pg;
+  pg->state_clear(PG_STATE_RECOVERY_UNFOUND);
   utime_t dur = ceph_clock_now() - enter_time;
   pg->osd->recoverystate_perf->tinc(rs_notrecovering_latency, dur);
 }
@@ -6471,6 +6767,15 @@ PG::RecoveryState::RepNotRecovering::RepNotRecovering(my_context ctx)
   context< RecoveryMachine >().log_enter(state_name);
 }
 
+boost::statechart::result
+PG::RecoveryState::RepNotRecovering::react(const RejectRemoteReservation &evt)
+{
+  PG *pg = context< RecoveryMachine >().pg;
+  pg->reject_reservation();
+  post_event(RemoteReservationRejected());
+  return discard_event();
+}
+
 void PG::RecoveryState::RepNotRecovering::exit()
 {
   context< RecoveryMachine >().log_exit(state_name, enter_time);
@@ -6509,6 +6814,15 @@ PG::RecoveryState::RepWaitRecoveryReserved::react(const RemoteRecoveryReserved &
   return transit<RepRecovering>();
 }
 
+boost::statechart::result
+PG::RecoveryState::RepWaitRecoveryReserved::react(
+  const RemoteReservationCanceled &evt)
+{
+  PG *pg = context< RecoveryMachine >().pg;
+  pg->osd->remote_reserver.cancel_reservation(pg->info.pgid);
+  return transit<RepNotRecovering>();
+}
+
 void PG::RecoveryState::RepWaitRecoveryReserved::exit()
 {
   context< RecoveryMachine >().log_exit(state_name, enter_time);
@@ -6535,12 +6849,12 @@ PG::RecoveryState::RepNotRecovering::react(const RequestBackfillPrio &evt)
       (rand()%1000 < (pg->cct->_conf->osd_debug_reject_backfill_probability*1000.0))) {
     ldout(pg->cct, 10) << "backfill reservation rejected: failure injection"
                       << dendl;
-    post_event(RemoteReservationRejected());
+    post_event(RejectRemoteReservation());
   } else if (!pg->cct->_conf->osd_debug_skip_full_check_in_backfill_reservation &&
       pg->osd->check_backfill_full(ss)) {
     ldout(pg->cct, 10) << "backfill reservation rejected: "
                       << ss.str() << dendl;
-    post_event(RemoteReservationRejected());
+    post_event(RejectRemoteReservation());
   } else {
     pg->osd->remote_reserver.request_reservation(
       pg->info.pgid,
@@ -6569,15 +6883,13 @@ PG::RecoveryState::RepWaitBackfillReserved::react(const RemoteBackfillReserved &
       (rand()%1000 < (pg->cct->_conf->osd_debug_reject_backfill_probability*1000.0))) {
     ldout(pg->cct, 10) << "backfill reservation rejected after reservation: "
                       << "failure injection" << dendl;
-    pg->osd->remote_reserver.cancel_reservation(pg->info.pgid);
-    post_event(RemoteReservationRejected());
+    post_event(RejectRemoteReservation());
     return discard_event();
   } else if (!pg->cct->_conf->osd_debug_skip_full_check_in_backfill_reservation &&
             pg->osd->check_backfill_full(ss)) {
     ldout(pg->cct, 10) << "backfill reservation rejected after reservation: "
                       << ss.str() << dendl;
-    pg->osd->remote_reserver.cancel_reservation(pg->info.pgid);
-    post_event(RemoteReservationRejected());
+    post_event(RejectRemoteReservation());
     return discard_event();
   } else {
     pg->osd->send_message_osd_cluster(
@@ -6592,10 +6904,30 @@ PG::RecoveryState::RepWaitBackfillReserved::react(const RemoteBackfillReserved &
 }
 
 boost::statechart::result
-PG::RecoveryState::RepWaitBackfillReserved::react(const RemoteReservationRejected &evt)
+PG::RecoveryState::RepWaitBackfillReserved::react(
+  const RejectRemoteReservation &evt)
 {
   PG *pg = context< RecoveryMachine >().pg;
   pg->reject_reservation();
+  post_event(RemoteReservationRejected());
+  return discard_event();
+}
+
+boost::statechart::result
+PG::RecoveryState::RepWaitBackfillReserved::react(
+  const RemoteReservationRejected &evt)
+{
+  PG *pg = context< RecoveryMachine >().pg;
+  pg->osd->remote_reserver.cancel_reservation(pg->info.pgid);
+  return transit<RepNotRecovering>();
+}
+
+boost::statechart::result
+PG::RecoveryState::RepWaitBackfillReserved::react(
+  const RemoteReservationCanceled &evt)
+{
+  PG *pg = context< RecoveryMachine >().pg;
+  pg->osd->remote_reserver.cancel_reservation(pg->info.pgid);
   return transit<RepNotRecovering>();
 }
 
@@ -6661,7 +6993,10 @@ PG::RecoveryState::WaitLocalRecoveryReserved::WaitLocalRecoveryReserved(my_conte
     new QueuePeeringEvt<LocalRecoveryReserved>(
       pg, pg->get_osdmap()->get_epoch(),
       LocalRecoveryReserved()),
-    pg->get_recovery_priority());
+    pg->get_recovery_priority(),
+    new QueuePeeringEvt<DeferRecovery>(
+      pg, pg->get_osdmap()->get_epoch(),
+      DeferRecovery(0.0)));
   pg->publish_stats_to_osd();
 }
 
@@ -6670,7 +7005,7 @@ PG::RecoveryState::WaitLocalRecoveryReserved::react(const RecoveryTooFull &evt)
 {
   PG *pg = context< RecoveryMachine >().pg;
   pg->state_set(PG_STATE_RECOVERY_TOOFULL);
-  pg->schedule_recovery_full_retry();
+  pg->schedule_recovery_retry(pg->cct->_conf->osd_recovery_retry_interval);
   return transit<NotRecovering>();
 }
 
@@ -6732,14 +7067,15 @@ PG::RecoveryState::Recovering::Recovering(my_context ctx)
   pg->state_clear(PG_STATE_RECOVERY_WAIT);
   pg->state_clear(PG_STATE_RECOVERY_TOOFULL);
   pg->state_set(PG_STATE_RECOVERING);
+  assert(!pg->state_test(PG_STATE_ACTIVATING));
   pg->publish_stats_to_osd();
   pg->queue_recovery();
 }
 
-void PG::RecoveryState::Recovering::release_reservations()
+void PG::RecoveryState::Recovering::release_reservations(bool cancel)
 {
   PG *pg = context< RecoveryMachine >().pg;
-  assert(!pg->pg_log.get_missing().have_missing());
+  assert(cancel || !pg->pg_log.get_missing().have_missing());
 
   // release remote reservations
   for (set<pg_shard_t>::const_iterator i =
@@ -6766,7 +7102,9 @@ PG::RecoveryState::Recovering::react(const AllReplicasRecovered &evt)
 {
   PG *pg = context< RecoveryMachine >().pg;
   pg->state_clear(PG_STATE_RECOVERING);
+  pg->state_clear(PG_STATE_FORCED_RECOVERY);
   release_reservations();
+  pg->osd->local_reserver.cancel_reservation(pg->info.pgid);
   return transit<Recovered>();
 }
 
@@ -6775,8 +7113,37 @@ PG::RecoveryState::Recovering::react(const RequestBackfill &evt)
 {
   PG *pg = context< RecoveryMachine >().pg;
   pg->state_clear(PG_STATE_RECOVERING);
+  pg->state_clear(PG_STATE_FORCED_RECOVERY);
   release_reservations();
-  return transit<WaitRemoteBackfillReserved>();
+  pg->osd->local_reserver.cancel_reservation(pg->info.pgid);
+  // XXX: Is this needed?
+  pg->publish_stats_to_osd();
+  return transit<WaitLocalBackfillReserved>();
+}
+
+boost::statechart::result
+PG::RecoveryState::Recovering::react(const DeferRecovery &evt)
+{
+  PG *pg = context< RecoveryMachine >().pg;
+  ldout(pg->cct, 10) << "defer recovery, retry delay " << evt.delay << dendl;
+  pg->state_clear(PG_STATE_RECOVERING);
+  pg->state_set(PG_STATE_RECOVERY_WAIT);
+  pg->osd->local_reserver.cancel_reservation(pg->info.pgid);
+  release_reservations(true);
+  pg->schedule_recovery_retry(evt.delay);
+  return transit<NotRecovering>();
+}
+
+boost::statechart::result
+PG::RecoveryState::Recovering::react(const UnfoundRecovery &evt)
+{
+  PG *pg = context< RecoveryMachine >().pg;
+  ldout(pg->cct, 10) << "recovery has unfound, can't continue" << dendl;
+  pg->state_set(PG_STATE_RECOVERY_UNFOUND);
+  pg->state_clear(PG_STATE_RECOVERING);
+  pg->osd->local_reserver.cancel_reservation(pg->info.pgid);
+  release_reservations(true);
+  return transit<NotRecovering>();
 }
 
 void PG::RecoveryState::Recovering::exit()
@@ -6796,7 +7163,6 @@ PG::RecoveryState::Recovered::Recovered(my_context ctx)
   context< RecoveryMachine >().log_enter(state_name);
 
   PG *pg = context< RecoveryMachine >().pg;
-  pg->osd->local_reserver.cancel_reservation(pg->info.pgid);
 
   assert(!pg->needs_recovery());
 
@@ -6805,7 +7171,7 @@ PG::RecoveryState::Recovered::Recovered(my_context ctx)
   assert(!pg->actingbackfill.empty());
   if (pg->get_osdmap()->get_pg_size(pg->info.pgid.pgid) <=
       pg->actingbackfill.size()) {
-    pg->state_clear(PG_STATE_DEGRADED);
+    pg->state_clear(PG_STATE_FORCED_BACKFILL | PG_STATE_FORCED_RECOVERY);
     pg->publish_stats_to_osd();
   }
 
@@ -6842,11 +7208,14 @@ PG::RecoveryState::Clean::Clean(my_context ctx)
     ceph_abort();
   }
   pg->finish_recovery(*context< RecoveryMachine >().get_on_safe_context_list());
-  pg->mark_clean();
+
+  if (pg->is_active()) {
+    pg->mark_clean();
+  }
 
   pg->share_pg_info();
   pg->publish_stats_to_osd();
-
+  pg->requeue_ops(pg->waiting_for_clean_to_primary_repair);
 }
 
 void PG::RecoveryState::Clean::exit()
@@ -6946,16 +7315,11 @@ boost::statechart::result PG::RecoveryState::Active::react(const AdvMap& advmap)
       pg->get_osdmap()->get_pg_size(pg->info.pgid.pgid)) {
     if (pg->get_osdmap()->get_pg_size(pg->info.pgid.pgid) <= pg->actingset.size()) {
       pg->state_clear(PG_STATE_UNDERSIZED);
-      if (pg->needs_recovery()) {
-       pg->state_set(PG_STATE_DEGRADED);
-      } else {
-       pg->state_clear(PG_STATE_DEGRADED);
-      }
     } else {
       pg->state_set(PG_STATE_UNDERSIZED);
-      pg->state_set(PG_STATE_DEGRADED);
     }
-    need_publish = true; // degraded may have changed
+    // degraded changes will be detected by call from publish_stats_to_osd()
+    need_publish = true;
   }
 
   // if we haven't reported our PG stats in a long time, do so now.
@@ -6990,9 +7354,12 @@ boost::statechart::result PG::RecoveryState::Active::react(const ActMap&)
       pg->all_unfound_are_queried_or_lost(pg->get_osdmap())) {
     if (pg->cct->_conf->osd_auto_mark_unfound_lost) {
       pg->osd->clog->error() << pg->info.pgid.pgid << " has " << unfound
-                           << " objects unfound and apparently lost, would automatically marking lost but NOT IMPLEMENTED";
+                           << " objects unfound and apparently lost, would automatically "
+                           << "mark these objects lost but this feature is not yet implemented "
+                           << "(osd_auto_mark_unfound_lost)";
     } else
-      pg->osd->clog->error() << pg->info.pgid.pgid << " has " << unfound << " objects unfound and apparently lost";
+      pg->osd->clog->error() << pg->info.pgid.pgid << " has "
+                             << unfound << " objects unfound and apparently lost";
   }
 
   if (pg->is_active()) {
@@ -7068,9 +7435,10 @@ boost::statechart::result PG::RecoveryState::Active::react(const MLogRec& logevt
     pg->peer_missing[logevt.from],
     logevt.from,
     context< RecoveryMachine >().get_recovery_ctx());
-  if (pg->is_peered() &&
-      got_missing)
-    pg->queue_recovery();
+  // If there are missing AND we are "fully" active then start recovery now
+  if (got_missing && pg->state_test(PG_STATE_ACTIVE)) {
+    post_event(DoRecovery());
+  }
   return discard_event();
 }
 
@@ -7161,6 +7529,13 @@ boost::statechart::result PG::RecoveryState::Active::react(const AllReplicasActi
   // waiters
   if (pg->flushes_in_progress == 0) {
     pg->requeue_ops(pg->waiting_for_peered);
+  } else if (!pg->waiting_for_peered.empty()) {
+    ldout(pg->cct, 10) << __func__ << " flushes in progress, moving "
+                      << pg->waiting_for_peered.size()
+                      << " items to waiting_for_flush"
+                      << dendl;
+    assert(pg->waiting_for_flush.empty());
+    pg->waiting_for_flush.swap(pg->waiting_for_peered);
   }
 
   pg->on_activate();
@@ -7859,18 +8234,23 @@ PG::RecoveryState::GetMissing::GetMissing(my_context ctx)
        ++i) {
     if (*i == pg->get_primary()) continue;
     const pg_info_t& pi = pg->peer_info[*i];
+    // reset this so to make sure the pg_missing_t is initialized and
+    // has the correct semantics even if we don't need to get a
+    // missing set from a shard. This way later additions due to
+    // lost+unfound delete work properly.
+    pg->peer_missing[*i].may_include_deletes = !pg->perform_deletes_during_peering();
 
     if (pi.is_empty())
       continue;                                // no pg data, nothing divergent
 
     if (pi.last_update < pg->pg_log.get_tail()) {
       ldout(pg->cct, 10) << " osd." << *i << " is not contiguous, will restart backfill" << dendl;
-      pg->peer_missing[*i];
+      pg->peer_missing[*i].clear();
       continue;
     }
     if (pi.last_backfill == hobject_t()) {
       ldout(pg->cct, 10) << " osd." << *i << " will fully backfill; can infer empty missing set" << dendl;
-      pg->peer_missing[*i];
+      pg->peer_missing[*i].clear();
       continue;
     }
 
@@ -7881,7 +8261,7 @@ PG::RecoveryState::GetMissing::GetMissing(my_context ctx)
       // FIXME: we can do better here.  if last_update==last_complete we
       //        can infer the rest!
       ldout(pg->cct, 10) << " osd." << *i << " has no missing, identical log" << dendl;
-      pg->peer_missing[*i];
+      pg->peer_missing[*i].clear();
       continue;
     }