git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osd/PrimaryLogPG.cc
import 15.2.0 Octopus source
index 7caed663360c0247d56aa7cc260136929b9b5de4..5332703ba33bc47a04836275d35bcab7daf5de52 100644
@@ -25,6 +25,7 @@
 #include "Session.h"
 #include "objclass/objclass.h"
 
+#include "common/ceph_crypto.h"
 #include "common/errno.h"
 #include "common/scrub_types.h"
 #include "common/perf_counters.h"
@@ -40,7 +41,6 @@
 #include "messages/MOSDPGUpdateLogMissingReply.h"
 #include "messages/MCommandReply.h"
 #include "messages/MOSDScrubReserve.h"
-#include "mds/inode_backtrace.h" // Ugh
 #include "common/EventTrace.h"
 
 #include "common/config.h"
@@ -63,6 +63,8 @@
 #define DOUT_PREFIX_ARGS this, osd->whoami, get_osdmap()
 #undef dout_prefix
 #define dout_prefix _prefix(_dout, this)
+using TOPNSPC::common::cmd_getval;
+
 template <typename T>
 static ostream& _prefix(std::ostream *_dout, T *pg) {
   return pg->gen_prefix(*_dout);
@@ -76,13 +78,7 @@ static ostream& _prefix(std::ostream *_dout, T *pg) {
 
 MEMPOOL_DEFINE_OBJECT_FACTORY(PrimaryLogPG, replicatedpg, osd);
 
-PGLSFilter::PGLSFilter() : cct(nullptr)
-{
-}
-
-PGLSFilter::~PGLSFilter()
-{
-}
+using namespace ceph::osd::scheduler;
 
 /**
  * The CopyCallback class defines an interface for completions to the
@@ -117,12 +113,11 @@ public:
   BlessedGenContext(PrimaryLogPG *pg, GenContext<T> *c, epoch_t e)
     : pg(pg), c(c), e(e) {}
   void finish(T t) override {
-    pg->lock();
+    std::scoped_lock locker{*pg};
     if (pg->pg_has_reset_since(e))
       c.reset();
     else
       c.release()->complete(t);
-    pg->unlock();
   }
   bool sync_finish(T t) {
     // we assume here all blessed/wrapped Contexts can complete synchronously.
@@ -172,12 +167,11 @@ public:
   BlessedContext(PrimaryLogPG *pg, Context *c, epoch_t e)
     : pg(pg), c(c), e(e) {}
   void finish(int r) override {
-    pg->lock();
+    std::scoped_lock locker{*pg};
     if (pg->pg_has_reset_since(e))
       c.reset();
     else
       c.release()->complete(r);
-    pg->unlock();
   }
   bool sync_finish(int r) {
     // we assume here all blessed/wrapped Contexts can complete synchronously.
@@ -224,9 +218,8 @@ class PrimaryLogPG::C_OSD_AppliedRecoveredObject : public Context {
     return true;
   }
   void finish(int r) override {
-    pg->lock();
+    std::scoped_lock locker{*pg};
     pg->_applied_recovered_object(obc);
-    pg->unlock();
   }
 };
 
@@ -254,9 +247,8 @@ class PrimaryLogPG::C_OSD_AppliedRecoveredObjectReplica : public Context {
     return true;
   }
   void finish(int r) override {
-    pg->lock();
+    std::scoped_lock locker{*pg};
     pg->_applied_recovered_object_replica();
-    pg->unlock();
   }
 };
 
@@ -292,6 +284,9 @@ public:
   PrimaryLogPG::CopyResults *results = nullptr;
   PrimaryLogPG::OpContext *ctx;
   OSDOp &osd_op;
+  uint32_t truncate_seq;
+  uint64_t truncate_size;
+  bool have_truncate = false;
 
   CopyFromCallback(PrimaryLogPG::OpContext *ctx, OSDOp &osd_op)
     : ctx(ctx), osd_op(osd_op) {
@@ -302,6 +297,13 @@ public:
     results = results_.get<1>();
     int r = results_.get<0>();
 
+    // Only use truncate_{seq,size} from the original object if the client
+    // did not send us these parameters
+    if (!have_truncate) {
+      truncate_seq = results->truncate_seq;
+      truncate_size = results->truncate_size;
+    }
+
     // for finish_copyfrom
     ctx->user_at_version = results->user_version;
 
@@ -325,6 +327,11 @@ public:
   uint64_t get_data_size() {
     return results->object_size;
   }
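+  // Record the truncate_{seq,size} supplied by the client; when set, they
+  // take precedence over the values read back from the source object.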
+  void set_truncate(uint32_t seq, uint64_t size) {
+    truncate_seq = seq;
+    truncate_size = size;
+    have_truncate = true;
+  }
 };
 
 struct CopyFromFinisher : public PrimaryLogPG::OpFinisher {
@@ -372,10 +379,10 @@ void PrimaryLogPG::on_local_recover(
       derr << __func__ << " " << hoid << " had no clone_snaps" << dendl;
     }
   }
-  if (!is_delete && pg_log.get_missing().is_missing(recovery_info.soid) &&
-      pg_log.get_missing().get_items().find(recovery_info.soid)->second.need > recovery_info.version) {
+  if (!is_delete && recovery_state.get_pg_log().get_missing().is_missing(recovery_info.soid) &&
+      recovery_state.get_pg_log().get_missing().get_items().find(recovery_info.soid)->second.need > recovery_info.version) {
     ceph_assert(is_primary());
-    const pg_log_entry_t *latest = pg_log.get_log().objects.find(recovery_info.soid)->second;
+    const pg_log_entry_t *latest = recovery_state.get_pg_log().get_log().objects.find(recovery_info.soid)->second;
     if (latest->op == pg_log_entry_t::LOST_REVERT &&
        latest->reverting_to == recovery_info.version) {
       dout(10) << " got old revert version " << recovery_info.version
@@ -397,16 +404,11 @@ void PrimaryLogPG::on_local_recover(
   // keep track of active pushes for scrub
   ++active_pushes;
 
-  if (recovery_info.version > pg_log.get_can_rollback_to()) {
-    /* This can only happen during a repair, and even then, it would
-     * be one heck of a race.  If we are repairing the object, the
-     * write in question must be fully committed, so it's not valid
-     * to roll it back anyway (and we'll be rolled forward shortly
-     * anyway) */
-    PGLogEntryHandler h{this, t};
-    pg_log.roll_forward_to(recovery_info.version, &h);
-  }
-  recover_got(recovery_info.soid, recovery_info.version);
+  recovery_state.recover_got(
+    recovery_info.soid,
+    recovery_info.version,
+    is_delete,
+    *t);
 
   if (is_primary()) {
     if (!is_delete) {
@@ -423,9 +425,6 @@ void PrimaryLogPG::on_local_recover(
     t->register_on_applied(new C_OSD_AppliedRecoveredObject(this, obc));
 
     publish_stats_to_osd();
-    ceph_assert(missing_loc.needs_recovery(hoid));
-    if (!is_delete)
-      missing_loc.add_location(hoid, pg_whoami);
     release_backoffs(hoid);
     if (!is_unreadable_object(hoid)) {
       auto unreadable_object_entry = waiting_for_unreadable_object.find(hoid);
@@ -446,10 +445,6 @@ void PrimaryLogPG::on_local_recover(
       this,
       get_osdmap_epoch(),
       info.last_complete));
-
-  // update pg
-  dirty_info = true;
-  write_if_dirty(*t);
 }
 
 void PrimaryLogPG::on_global_recover(
@@ -457,8 +452,7 @@ void PrimaryLogPG::on_global_recover(
   const object_stat_sum_t &stat_diff,
   bool is_delete)
 {
-  info.stats.stats.sum.add(stat_diff);
-  missing_loc.recovered(soid);
+  recovery_state.object_recovered(soid, stat_diff);
   publish_stats_to_osd();
   dout(10) << "pushed " << soid << " to all replicas" << dendl;
   map<hobject_t, ObjectContextRef>::iterator i = recovering.find(soid);
@@ -493,65 +487,24 @@ void PrimaryLogPG::on_global_recover(
   finish_degraded_object(soid);
 }
 
-void PrimaryLogPG::on_peer_recover(
-  pg_shard_t peer,
-  const hobject_t &soid,
-  const ObjectRecoveryInfo &recovery_info)
-{
-  publish_stats_to_osd();
-  // done!
-  peer_missing[peer].got(soid, recovery_info.version);
-  missing_loc.add_location(soid, peer);
-}
-
-void PrimaryLogPG::begin_peer_recover(
-  pg_shard_t peer,
-  const hobject_t soid)
-{
-  peer_missing[peer].revise_have(soid, eversion_t());
-}
-
 void PrimaryLogPG::schedule_recovery_work(
   GenContext<ThreadPool::TPHandle&> *c)
 {
   osd->queue_recovery_context(this, c);
 }
 
-void PrimaryLogPG::send_message_osd_cluster(
-  int peer, Message *m, epoch_t from_epoch)
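+// Drop cached ObjectContexts on a replica for the objects named in this
+// repop's log entries; head and clones share a snapset, so the whole clone
+// range is cleared.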
+void PrimaryLogPG::replica_clear_repop_obc(
+  const vector<pg_log_entry_t> &logv,
+  ObjectStore::Transaction &t)
 {
-  osd->send_message_osd_cluster(peer, m, from_epoch);
-}
-
-void PrimaryLogPG::send_message_osd_cluster(
-  Message *m, Connection *con)
-{
-  osd->send_message_osd_cluster(m, con);
-}
-
-void PrimaryLogPG::send_message_osd_cluster(
-  Message *m, const ConnectionRef& con)
-{
-  osd->send_message_osd_cluster(m, con);
-}
-
-void PrimaryLogPG::on_primary_error(
-  const hobject_t &oid,
-  eversion_t v)
-{
-  dout(0) << __func__ << ": oid " << oid << " version " << v << dendl;
-  primary_failed(oid);
-  primary_error(oid, v);
-  backfill_add_missing(oid, v);
-}
-
-void PrimaryLogPG::backfill_add_missing(
-  const hobject_t &oid,
-  eversion_t v)
-{
-  dout(0) << __func__ << ": oid " << oid << " version " << v << dendl;
-  backfills_in_flight.erase(oid);
-  missing_loc.add_missing(oid, v, eversion_t());
+  for (auto &&e: logv) {
+    /* Have to blast all clones, they share a snapset */
+    object_contexts.clear_range(
+      e.soid.get_object_boundary(), e.soid.get_head());
+    ceph_assert(
+      snapset_contexts.find(e.soid.get_head()) ==
+      snapset_contexts.end());
+  }
 }
 
 bool PrimaryLogPG::should_send_op(
@@ -559,21 +512,23 @@ bool PrimaryLogPG::should_send_op(
   const hobject_t &hoid) {
   if (peer == get_primary())
     return true;
-  ceph_assert(peer_info.count(peer));
+  ceph_assert(recovery_state.has_peer_info(peer));
   bool should_send =
       hoid.pool != (int64_t)info.pgid.pool() ||
       hoid <= last_backfill_started ||
-      hoid <= peer_info[peer].last_backfill;
+      hoid <= recovery_state.get_peer_info(peer).last_backfill;
   if (!should_send) {
-    ceph_assert(is_backfill_targets(peer));
+    ceph_assert(is_backfill_target(peer));
     dout(10) << __func__ << " issue_repop shipping empty opt to osd." << peer
              << ", object " << hoid
              << " beyond std::max(last_backfill_started "
              << ", peer_info[peer].last_backfill "
-             << peer_info[peer].last_backfill << ")" << dendl;
+             << recovery_state.get_peer_info(peer).last_backfill
+            << ")" << dendl;
     return should_send;
   }
-  if (async_recovery_targets.count(peer) && peer_missing[peer].is_missing(hoid)) {
+  if (is_async_recovery_target(peer) &&
+      recovery_state.get_peer_missing(peer).is_missing(hoid)) {
     should_send = false;
     dout(10) << __func__ << " issue_repop shipping empty opt to osd." << peer
              << ", object " << hoid
@@ -600,7 +555,7 @@ PerfCounters *PrimaryLogPG::get_logger()
 
 bool PrimaryLogPG::is_missing_object(const hobject_t& soid) const
 {
-  return pg_log.get_missing().get_items().count(soid);
+  return recovery_state.get_pg_log().get_missing().get_items().count(soid);
 }
 
 void PrimaryLogPG::maybe_kick_recovery(
@@ -608,25 +563,25 @@ void PrimaryLogPG::maybe_kick_recovery(
 {
   eversion_t v;
   bool work_started = false;
-  if (!missing_loc.needs_recovery(soid, &v))
+  if (!recovery_state.get_missing_loc().needs_recovery(soid, &v))
     return;
 
   map<hobject_t, ObjectContextRef>::const_iterator p = recovering.find(soid);
   if (p != recovering.end()) {
     dout(7) << "object " << soid << " v " << v << ", already recovering." << dendl;
-  } else if (missing_loc.is_unfound(soid)) {
+  } else if (recovery_state.get_missing_loc().is_unfound(soid)) {
     dout(7) << "object " << soid << " v " << v << ", is unfound." << dendl;
   } else {
     dout(7) << "object " << soid << " v " << v << ", recovering." << dendl;
     PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
     if (is_missing_object(soid)) {
-      recover_missing(soid, v, cct->_conf->osd_client_op_priority, h);
-    } else if (missing_loc.is_deleted(soid)) {
+      recover_missing(soid, v, CEPH_MSG_PRIO_HIGH, h);
+    } else if (recovery_state.get_missing_loc().is_deleted(soid)) {
       prep_object_replica_deletes(soid, v, h, &work_started);
     } else {
       prep_object_replica_pushes(soid, v, h, &work_started);
     }
-    pgbackend->run_recovery_op(h, cct->_conf->osd_client_op_priority);
+    pgbackend->run_recovery_op(h, CEPH_MSG_PRIO_HIGH);
   }
 }
 
@@ -647,28 +602,28 @@ bool PrimaryLogPG::is_degraded_or_backfilling_object(const hobject_t& soid)
    */
   if (waiting_for_degraded_object.count(soid))
     return true;
-  if (pg_log.get_missing().get_items().count(soid))
+  if (recovery_state.get_pg_log().get_missing().get_items().count(soid))
     return true;
-  ceph_assert(!acting_recovery_backfill.empty());
-  for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
-       i != acting_recovery_backfill.end();
+  ceph_assert(!get_acting_recovery_backfill().empty());
+  for (set<pg_shard_t>::iterator i = get_acting_recovery_backfill().begin();
+       i != get_acting_recovery_backfill().end();
        ++i) {
     if (*i == get_primary()) continue;
     pg_shard_t peer = *i;
-    auto peer_missing_entry = peer_missing.find(peer);
+    auto peer_missing_entry = recovery_state.get_peer_missing().find(peer);
     // If an object is missing on an async_recovery_target, return false.
     // This will not block the op and the object is async recovered later.
-    if (peer_missing_entry != peer_missing.end() &&
+    if (peer_missing_entry != recovery_state.get_peer_missing().end() &&
        peer_missing_entry->second.get_items().count(soid)) {
-      if (async_recovery_targets.count(peer))
+      if (is_async_recovery_target(peer))
        continue;
       else
        return true;
     }
     // Object is degraded if after last_backfill AND
     // we are backfilling it
-    if (is_backfill_targets(peer) &&
-       peer_info[peer].last_backfill <= soid &&
+    if (is_backfill_target(peer) &&
+        recovery_state.get_peer_info(peer).last_backfill <= soid &&
        last_backfill_started >= soid &&
        backfills_in_flight.count(soid))
       return true;
@@ -678,9 +633,9 @@ bool PrimaryLogPG::is_degraded_or_backfilling_object(const hobject_t& soid)
 
 bool PrimaryLogPG::is_degraded_on_async_recovery_target(const hobject_t& soid)
 {
-  for (auto &i: async_recovery_targets) {
-    auto peer_missing_entry = peer_missing.find(i);
-    if (peer_missing_entry != peer_missing.end() &&
+  for (auto &i: get_async_recovery_targets()) {
+    auto peer_missing_entry = recovery_state.get_peer_missing().find(i);
+    if (peer_missing_entry != recovery_state.get_peer_missing().end() &&
         peer_missing_entry->second.get_items().count(soid)) {
       dout(30) << __func__ << " " << soid << dendl;
       return true;
@@ -776,28 +731,28 @@ void PrimaryLogPG::maybe_force_recovery()
                  PG_STATE_BACKFILL_TOOFULL))
     return;
 
-  if (pg_log.get_log().approx_size() <
+  if (recovery_state.get_pg_log().get_log().approx_size() <
       cct->_conf->osd_max_pg_log_entries *
         cct->_conf->osd_force_recovery_pg_log_entries_factor)
     return;
 
   // find the oldest missing object
-  version_t min_version = pg_log.get_log().head.version;
+  version_t min_version = recovery_state.get_pg_log().get_log().head.version;
   hobject_t soid;
-  if (!pg_log.get_missing().get_rmissing().empty()) {
-    min_version = pg_log.get_missing().get_rmissing().begin()->first;
-    soid = pg_log.get_missing().get_rmissing().begin()->second;
+  if (!recovery_state.get_pg_log().get_missing().get_rmissing().empty()) {
+    min_version = recovery_state.get_pg_log().get_missing().get_rmissing().begin()->first;
+    soid = recovery_state.get_pg_log().get_missing().get_rmissing().begin()->second;
   }
-  ceph_assert(!acting_recovery_backfill.empty());
-  for (set<pg_shard_t>::iterator it = acting_recovery_backfill.begin();
-       it != acting_recovery_backfill.end();
+  ceph_assert(!get_acting_recovery_backfill().empty());
+  for (set<pg_shard_t>::iterator it = get_acting_recovery_backfill().begin();
+       it != get_acting_recovery_backfill().end();
        ++it) {
     if (*it == get_primary()) continue;
     pg_shard_t peer = *it;
-    auto it_missing = peer_missing.find(peer);
-    if (it_missing != peer_missing.end() &&
+    auto it_missing = recovery_state.get_peer_missing().find(peer);
+    if (it_missing != recovery_state.get_peer_missing().end() &&
        !it_missing->second.get_rmissing().empty()) {
-      const auto& min_obj = peer_missing[peer].get_rmissing().begin();
+      const auto& min_obj = recovery_state.get_peer_missing(peer).get_rmissing().begin();
       dout(20) << __func__ << " peer " << peer << " min_version " << min_obj->first
                << " oid " << min_obj->second << dendl;
       if (min_version > min_obj->first) {
@@ -812,135 +767,155 @@ void PrimaryLogPG::maybe_force_recovery()
     maybe_kick_recovery(soid);
 }
 
-class PGLSPlainFilter : public PGLSFilter {
-  string val;
-public:
-  int init(bufferlist::const_iterator &params) override
-  {
-    try {
-      decode(xattr, params);
-      decode(val, params);
-    } catch (buffer::error &e) {
-      return -EINVAL;
-    }
-
-    return 0;
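+// Return true if this op may be serviced now; otherwise park it on
+// waiting_for_readable (or, on a non-primary, reject it with -EAGAIN)
+// until our lease is readable again.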
+bool PrimaryLogPG::check_laggy(OpRequestRef& op)
+{
+  if (!HAVE_FEATURE(recovery_state.get_min_upacting_features(),
+                   SERVER_OCTOPUS)) {
+    dout(20) << __func__ << " not all upacting has SERVER_OCTOPUS" << dendl;
+    return true;
   }
-  ~PGLSPlainFilter() override {}
-  bool filter(const hobject_t &obj, bufferlist& xattr_data,
-                      bufferlist& outdata) override;
-};
+  if (state_test(PG_STATE_WAIT)) {
+    dout(10) << __func__ << " PG is WAIT state" << dendl;
+  } else if (!state_test(PG_STATE_LAGGY)) {
+    auto mnow = osd->get_mnow();
+    auto ru = recovery_state.get_readable_until();
+    if (mnow <= ru) {
+      // not laggy
+      return true;
+    }
+    dout(10) << __func__
+            << " mnow " << mnow
+            << " > readable_until " << ru << dendl;
 
-class PGLSParentFilter : public PGLSFilter {
-  inodeno_t parent_ino;
-public:
-  CephContext* cct;
-  explicit PGLSParentFilter(CephContext* cct) : cct(cct) {
-    xattr = "_parent";
-  }
-  int init(bufferlist::const_iterator &params) override
-  {
-    try {
-      decode(parent_ino, params);
-    } catch (buffer::error &e) {
-      return -EINVAL;
+    if (!is_primary()) {
+      osd->reply_op_error(op, -EAGAIN);
+      return false;
     }
-    generic_dout(0) << "parent_ino=" << parent_ino << dendl;
 
-    return 0;
+    // go to laggy state
+    state_set(PG_STATE_LAGGY);
+    publish_stats_to_osd();
   }
-  ~PGLSParentFilter() override {}
-  bool filter(const hobject_t &obj, bufferlist& xattr_data,
-                      bufferlist& outdata) override;
-};
+  dout(10) << __func__ << " not readable" << dendl;
+  waiting_for_readable.push_back(op);
+  op->mark_delayed("waiting for readable");
+  return false;
+}
 
-bool PGLSParentFilter::filter(const hobject_t &obj,
-                              bufferlist& xattr_data, bufferlist& outdata)
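+// Variant of check_laggy() for requeued ops: while the PG is in WAIT or
+// LAGGY state they go to the front of waiting_for_readable so their
+// original ordering is preserved.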
+bool PrimaryLogPG::check_laggy_requeue(OpRequestRef& op)
 {
-  auto iter = xattr_data.cbegin();
-  inode_backtrace_t bt;
-
-  generic_dout(0) << "PGLSParentFilter::filter" << dendl;
-
-  decode(bt, iter);
-
-  vector<inode_backpointer_t>::iterator vi;
-  for (vi = bt.ancestors.begin(); vi != bt.ancestors.end(); ++vi) {
-    generic_dout(0) << "vi->dirino=" << vi->dirino << " parent_ino=" << parent_ino << dendl;
-    if (vi->dirino == parent_ino) {
-      encode(*vi, outdata);
-      return true;
-    }
+  if (!HAVE_FEATURE(recovery_state.get_min_upacting_features(),
+                   SERVER_OCTOPUS)) {
+    return true;
   }
-
+  if (!state_test(PG_STATE_WAIT) && !state_test(PG_STATE_LAGGY)) {
+    return true; // not laggy
+  }
+  dout(10) << __func__ << " not readable" << dendl;
+  waiting_for_readable.push_front(op);
+  op->mark_delayed("waiting for readable");
   return false;
 }
 
-bool PGLSPlainFilter::filter(const hobject_t &obj,
-                             bufferlist& xattr_data, bufferlist& outdata)
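+// Re-evaluate the WAIT/LAGGY states against the current monotonic clock
+// and lease bounds; once the PG is readable again, requeue anything parked
+// on waiting_for_readable.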
+void PrimaryLogPG::recheck_readable()
 {
-  if (val.size() != xattr_data.length())
-    return false;
-
-  if (memcmp(val.c_str(), xattr_data.c_str(), val.size()))
-    return false;
-
-  return true;
+  if (!is_wait() && !is_laggy()) {
+    dout(20) << __func__ << " wasn't wait or laggy" << dendl;
+    return;
+  }
+  auto mnow = osd->get_mnow();
+  bool pub = false;
+  if (is_wait()) {
+    auto prior_readable_until_ub = recovery_state.get_prior_readable_until_ub();
+    if (mnow < prior_readable_until_ub) {
+      dout(10) << __func__ << " still wait (mnow " << mnow
+              << " < prior_readable_until_ub " << prior_readable_until_ub
+              << ")" << dendl;
+    } else {
+      dout(10) << __func__ << " no longer wait (mnow " << mnow
+              << " >= prior_readable_until_ub " << prior_readable_until_ub
+              << ")" << dendl;
+      state_clear(PG_STATE_WAIT);
+      recovery_state.clear_prior_readable_until_ub();
+      pub = true;
+    }
+  }
+  if (is_laggy()) {
+    auto ru = recovery_state.get_readable_until();
+    if (ru == ceph::signedspan::zero()) {
+      dout(10) << __func__ << " still laggy (mnow " << mnow
+              << ", readable_until zero)" << dendl;
+    } else if (mnow >= ru) {
+      dout(10) << __func__ << " still laggy (mnow " << mnow
+              << " >= readable_until " << ru << ")" << dendl;
+    } else {
+      dout(10) << __func__ << " no longer laggy (mnow " << mnow
+              << " < readable_until " << ru << ")" << dendl;
+      state_clear(PG_STATE_LAGGY);
+      pub = true;
+    }
+  }
+  if (pub) {
+    publish_stats_to_osd();
+  }
+  if (!is_laggy() && !is_wait()) {
+    requeue_ops(waiting_for_readable);
+  }
 }
 
-bool PrimaryLogPG::pgls_filter(PGLSFilter *filter, hobject_t& sobj, bufferlist& outdata)
+bool PrimaryLogPG::pgls_filter(const PGLSFilter& filter, const hobject_t& sobj)
 {
   bufferlist bl;
 
   // If filter has expressed an interest in an xattr, load it.
-  if (!filter->get_xattr().empty()) {
+  if (!filter.get_xattr().empty()) {
     int ret = pgbackend->objects_get_attr(
       sobj,
-      filter->get_xattr(),
+      filter.get_xattr(),
       &bl);
-    dout(0) << "getattr (sobj=" << sobj << ", attr=" << filter->get_xattr() << ") returned " << ret << dendl;
+    dout(0) << "getattr (sobj=" << sobj << ", attr=" << filter.get_xattr() << ") returned " << ret << dendl;
     if (ret < 0) {
-      if (ret != -ENODATA || filter->reject_empty_xattr()) {
+      if (ret != -ENODATA || filter.reject_empty_xattr()) {
         return false;
       }
     }
   }
 
-  return filter->filter(sobj, bl, outdata);
+  return filter.filter(sobj, bl);
 }
 
-int PrimaryLogPG::get_pgls_filter(bufferlist::const_iterator& iter, PGLSFilter **pfilter)
+std::pair<int, std::unique_ptr<const PGLSFilter>>
+PrimaryLogPG::get_pgls_filter(bufferlist::const_iterator& iter)
 {
   string type;
-  PGLSFilter *filter;
+  // storing non-const PGLSFilter for the sake of ::init()
+  std::unique_ptr<PGLSFilter> filter;
 
   try {
     decode(type, iter);
   }
   catch (buffer::error& e) {
-    return -EINVAL;
+    return { -EINVAL, nullptr };
   }
 
-  if (type.compare("parent") == 0) {
-    filter = new PGLSParentFilter(cct);
-  } else if (type.compare("plain") == 0) {
-    filter = new PGLSPlainFilter();
+  if (type.compare("plain") == 0) {
+    filter = std::make_unique<PGLSPlainFilter>();
   } else {
     std::size_t dot = type.find(".");
     if (dot == std::string::npos || dot == 0 || dot == type.size() - 1) {
-      return -EINVAL;
+      return { -EINVAL, nullptr };
     }
 
     const std::string class_name = type.substr(0, dot);
     const std::string filter_name = type.substr(dot + 1);
     ClassHandler::ClassData *cls = NULL;
-    int r = osd->class_handler->open_class(class_name, &cls);
+    int r = ClassHandler::get_instance().open_class(class_name, &cls);
     if (r != 0) {
       derr << "Error opening class '" << class_name << "': "
            << cpp_strerror(r) << dendl;
       if (r != -EPERM) // propagate permission error
         r = -EINVAL;
-      return r;
+      return { r, nullptr };
     } else {
       ceph_assert(cls);
     }
@@ -949,15 +924,15 @@ int PrimaryLogPG::get_pgls_filter(bufferlist::const_iterator& iter, PGLSFilter *
     if (class_filter == NULL) {
       derr << "Error finding filter '" << filter_name << "' in class "
            << class_name << dendl;
-      return -EINVAL;
+      return { -EINVAL, nullptr };
     }
-    filter = class_filter->fn();
+    filter.reset(class_filter->fn());
     if (!filter) {
       // Object classes are obliged to return us something, but let's
       // give an error rather than asserting out.
       derr << "Buggy class " << class_name << " failed to construct "
               "filter " << filter_name << dendl;
-      return -EINVAL;
+      return { -EINVAL, nullptr };
     }
   }
 
@@ -966,86 +941,45 @@ int PrimaryLogPG::get_pgls_filter(bufferlist::const_iterator& iter, PGLSFilter *
   if (r < 0) {
     derr << "Error initializing filter " << type << ": "
          << cpp_strerror(r) << dendl;
-    delete filter;
-    return -EINVAL;
+    return { -EINVAL, nullptr };
   } else {
     // Successfully constructed and initialized, return it.
-    *pfilter = filter;
-    return 0;
+    return std::make_pair(0, std::move(filter));
   }
 }
 
 
 // ==========================================================
 
-int PrimaryLogPG::do_command(
-  cmdmap_t cmdmap,
-  ostream& ss,
-  bufferlist& idata,
-  bufferlist& odata,
-  ConnectionRef con,
-  ceph_tid_t tid)
+void PrimaryLogPG::do_command(
+  const string_view& orig_prefix,
+  const cmdmap_t& cmdmap,
+  const bufferlist& idata,
+  std::function<void(int,const std::string&,bufferlist&)> on_finish)
 {
-  string prefix;
   string format;
-
-  cmd_getval(cct, cmdmap, "format", format);
-  boost::scoped_ptr<Formatter> f(Formatter::create(format, "json-pretty", "json"));
-
+  cmd_getval(cmdmap, "format", format);
+  std::unique_ptr<Formatter> f(Formatter::create(
+                                format, "json-pretty", "json-pretty"));
+  int ret = 0;
+  stringstream ss;   // stderr error message stream
+  bufferlist outbl;  // if empty at end, we'll dump formatter as output
+
+  // get final prefix:
+  // - ceph pg <pgid> foo -> prefix=pg, cmd=foo
+  // - ceph tell <pgid> foo -> prefix=foo
+  string prefix(orig_prefix);
   string command;
-  cmd_getval(cct, cmdmap, "cmd", command);
-  if (command == "query") {
+  cmd_getval(cmdmap, "cmd", command);
+  if (command.size()) {
+    prefix = command;
+  }
+
+  if (prefix == "query") {
     f->open_object_section("pg");
-    f->dump_string("state", pg_state_string(get_state()));
     f->dump_stream("snap_trimq") << snap_trimq;
     f->dump_unsigned("snap_trimq_len", snap_trimq.size());
-    f->dump_unsigned("epoch", get_osdmap_epoch());
-    f->open_array_section("up");
-    for (vector<int>::iterator p = up.begin(); p != up.end(); ++p)
-      f->dump_unsigned("osd", *p);
-    f->close_section();
-    f->open_array_section("acting");
-    for (vector<int>::iterator p = acting.begin(); p != acting.end(); ++p)
-      f->dump_unsigned("osd", *p);
-    f->close_section();
-    if (!backfill_targets.empty()) {
-      f->open_array_section("backfill_targets");
-      for (set<pg_shard_t>::iterator p = backfill_targets.begin();
-          p != backfill_targets.end();
-          ++p)
-        f->dump_stream("shard") << *p;
-      f->close_section();
-    }
-    if (!async_recovery_targets.empty()) {
-      f->open_array_section("async_recovery_targets");
-      for (set<pg_shard_t>::iterator p = async_recovery_targets.begin();
-          p != async_recovery_targets.end();
-          ++p)
-        f->dump_stream("shard") << *p;
-      f->close_section();
-    }
-    if (!acting_recovery_backfill.empty()) {
-      f->open_array_section("acting_recovery_backfill");
-      for (set<pg_shard_t>::iterator p = acting_recovery_backfill.begin();
-          p != acting_recovery_backfill.end();
-          ++p)
-        f->dump_stream("shard") << *p;
-      f->close_section();
-    }
-    f->open_object_section("info");
-    _update_calc_stats();
-    info.dump(f.get());
-    f->close_section();
-
-    f->open_array_section("peer_info");
-    for (map<pg_shard_t, pg_info_t>::iterator p = peer_info.begin();
-        p != peer_info.end();
-        ++p) {
-      f->open_object_section("info");
-      f->dump_stream("peer") << p->first;
-      p->second.dump(f.get());
-      f->close_section();
-    }
+    recovery_state.dump_peering_state(f.get());
     f->close_section();
 
     f->open_array_section("recovery_state");
@@ -1058,53 +992,57 @@ int PrimaryLogPG::do_command(
     f->close_section();
 
     f->close_section();
-    f->flush(odata);
-    return 0;
   }
-  else if (command == "mark_unfound_lost") {
+
+  else if (prefix == "mark_unfound_lost") {
     string mulcmd;
-    cmd_getval(cct, cmdmap, "mulcmd", mulcmd);
+    cmd_getval(cmdmap, "mulcmd", mulcmd);
     int mode = -1;
     if (mulcmd == "revert") {
       if (pool.info.is_erasure()) {
        ss << "mode must be 'delete' for ec pool";
-       return -EINVAL;
+       ret = -EINVAL;
+       goto out;
       }
       mode = pg_log_entry_t::LOST_REVERT;
     } else if (mulcmd == "delete") {
       mode = pg_log_entry_t::LOST_DELETE;
     } else {
       ss << "mode must be 'revert' or 'delete'; mark not yet implemented";
-      return -EINVAL;
+      ret = -EINVAL;
+      goto out;
     }
     ceph_assert(mode == pg_log_entry_t::LOST_REVERT ||
-          mode == pg_log_entry_t::LOST_DELETE);
+               mode == pg_log_entry_t::LOST_DELETE);
 
     if (!is_primary()) {
       ss << "not primary";
-      return -EROFS;
+      ret = -EROFS;
+      goto out;
     }
 
-    uint64_t unfound = missing_loc.num_unfound();
+    uint64_t unfound = recovery_state.get_missing_loc().num_unfound();
     if (!unfound) {
       ss << "pg has no unfound objects";
-      return 0;  // make command idempotent
+      goto out;  // make command idempotent
     }
 
-    if (!all_unfound_are_queried_or_lost(get_osdmap())) {
+    if (!recovery_state.all_unfound_are_queried_or_lost(get_osdmap())) {
       ss << "pg has " << unfound
         << " unfound objects but we haven't probed all sources, not marking lost";
-      return -EINVAL;
+      ret = -EINVAL;
+      goto out;
     }
 
-    mark_all_unfound_lost(mode, con, tid);
-    return -EAGAIN;
+    mark_all_unfound_lost(mode, on_finish);
+    return;
   }
-  else if (command == "list_unfound") {
+
+  else if (prefix == "list_unfound") {
     hobject_t offset;
     string offset_json;
     bool show_offset = false;
-    if (cmd_getval(cct, cmdmap, "offset", offset_json)) {
+    if (cmd_getval(cmdmap, "offset", offset_json)) {
       json_spirit::Value v;
       try {
        if (!json_spirit::read(offset_json, v))
@@ -1112,7 +1050,8 @@ int PrimaryLogPG::do_command(
        offset.decode(v);
       } catch (std::runtime_error& e) {
        ss << "error parsing offset: " << e.what();
-       return -EINVAL;
+       ret = -EINVAL;
+       goto out;
       }
       show_offset = true;
     }
@@ -1122,7 +1061,8 @@ int PrimaryLogPG::do_command(
       offset.dump(f.get());
       f->close_section();
     }
-    auto &needs_recovery_map = missing_loc.get_needs_recovery();
+    auto &needs_recovery_map = recovery_state.get_missing_loc()
+      .get_needs_recovery();
     f->dump_int("num_missing", needs_recovery_map.size());
     f->dump_int("num_unfound", get_num_unfound());
     map<hobject_t, pg_missing_item>::const_iterator p =
@@ -1130,8 +1070,10 @@ int PrimaryLogPG::do_command(
     {
       f->open_array_section("objects");
       int32_t num = 0;
-      for (; p != needs_recovery_map.end() && num < cct->_conf->osd_command_max_records; ++p) {
-        if (missing_loc.is_unfound(p->first)) {
+      for (; p != needs_recovery_map.end() &&
+            num < cct->_conf->osd_command_max_records;
+          ++p) {
+        if (recovery_state.get_missing_loc().is_unfound(p->first)) {
          f->open_object_section("object");
          {
            f->open_object_section("oid");
@@ -1141,11 +1083,10 @@ int PrimaryLogPG::do_command(
           p->second.dump(f.get()); // have, need keys
          {
            f->open_array_section("locations");
-            for (set<pg_shard_t>::iterator r =
-                missing_loc.get_locations(p->first).begin();
-                r != missing_loc.get_locations(p->first).end();
-                ++r)
-              f->dump_stream("shard") << *r;
+            for (auto &&r : recovery_state.get_missing_loc().get_locations(
+                  p->first)) {
+              f->dump_stream("shard") << r;
+           }
            f->close_section();
          }
          f->close_section();
@@ -1156,21 +1097,68 @@ int PrimaryLogPG::do_command(
     }
     f->dump_bool("more", p != needs_recovery_map.end());
     f->close_section();
-    f->flush(odata);
-    return 0;
   }
 
-  ss << "unknown pg command " << prefix;
-  return -EINVAL;
+  else if (prefix == "scrub" ||
+          prefix == "deep_scrub") {
+    bool deep = (prefix == "deep_scrub");
+    int64_t time;
+    cmd_getval(cmdmap, "time", time, (int64_t)0);
+
+    if (is_primary()) {
+      const pg_pool_t *p = &pool.info;
+      double pool_scrub_max_interval = 0;
+      double scrub_max_interval;
+      if (deep) {
+        p->opts.get(pool_opts_t::DEEP_SCRUB_INTERVAL, &pool_scrub_max_interval);
+        scrub_max_interval = pool_scrub_max_interval > 0 ?
+          pool_scrub_max_interval : g_conf()->osd_deep_scrub_interval;
+      } else {
+        p->opts.get(pool_opts_t::SCRUB_MAX_INTERVAL, &pool_scrub_max_interval);
+        scrub_max_interval = pool_scrub_max_interval > 0 ?
+          pool_scrub_max_interval : g_conf()->osd_scrub_max_interval;
+      }
+      // Instead of marking must_scrub, force a scheduled scrub
+      utime_t stamp = ceph_clock_now();
+      if (time == 0)
+        stamp -= scrub_max_interval;
+      else
+        stamp -=  (float)time;
+      stamp -= 100.0;  // push back last scrub more for good measure
+      if (deep) {
+        set_last_deep_scrub_stamp(stamp);
+      } else {
+        set_last_scrub_stamp(stamp);
+      }
+      f->open_object_section("result");
+      f->dump_bool("deep", deep);
+      f->dump_stream("stamp") << stamp;
+      f->close_section();
+    } else {
+      ss << "Not primary";
+      ret = -EPERM;
+    }
+    outbl.append(ss.str());
+  }
+
+  else {
+    ret = -ENOSYS;
+    ss << "prefix '" << prefix << "' not implemented";
+  }
+
+ out:
+  if (ret >= 0 && outbl.length() == 0) {
+    f->flush(outbl);
+  }
+  on_finish(ret, ss.str(), outbl);
 }
 
+
 // ==========================================================
 
 void PrimaryLogPG::do_pg_op(OpRequestRef op)
 {
-  // NOTE: this is non-const because we modify the OSDOp.outdata in
-  // place
-  MOSDOp *m = static_cast<MOSDOp *>(op->get_nonconst_req());
+  const MOSDOp *m = static_cast<const MOSDOp *>(op->get_req());
   ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
   dout(10) << "do_pg_op " << *m << dendl;
 
@@ -1178,14 +1166,13 @@ void PrimaryLogPG::do_pg_op(OpRequestRef op)
 
   int result = 0;
   string cname, mname;
-  PGLSFilter *filter = NULL;
-  bufferlist filter_out;
 
   snapid_t snapid = m->get_snapid();
 
   vector<OSDOp> ops = m->ops;
 
   for (vector<OSDOp>::iterator p = ops.begin(); p != ops.end(); ++p) {
+    std::unique_ptr<const PGLSFilter> filter;
     OSDOp& osd_op = *p;
     auto bp = p->indata.cbegin();
     switch (p->op.op) {
@@ -1199,11 +1186,7 @@ void PrimaryLogPG::do_pg_op(OpRequestRef op)
        result = -EINVAL;
        break;
       }
-      if (filter) {
-       delete filter;
-       filter = NULL;
-      }
-      result = get_pgls_filter(bp, &filter);
+      std::tie(result, filter) = get_pgls_filter(bp);
       if (result < 0)
         break;
 
@@ -1267,12 +1250,12 @@ void PrimaryLogPG::do_pg_op(OpRequestRef op)
        }
 
        map<hobject_t, pg_missing_item>::const_iterator missing_iter =
-         pg_log.get_missing().get_items().lower_bound(current);
+         recovery_state.get_pg_log().get_missing().get_items().lower_bound(current);
        vector<hobject_t>::iterator ls_iter = sentries.begin();
        hobject_t _max = hobject_t::get_max();
        while (1) {
          const hobject_t &mcand =
-           missing_iter == pg_log.get_missing().get_items().end() ?
+           missing_iter == recovery_state.get_pg_log().get_missing().get_items().end() ?
            _max :
            missing_iter->first;
          const hobject_t &lcand =
@@ -1317,7 +1300,7 @@ void PrimaryLogPG::do_pg_op(OpRequestRef op)
          if (candidate.get_namespace() == cct->_conf->osd_hit_set_namespace)
            continue;
 
-         if (missing_loc.is_deleted(candidate))
+         if (recovery_state.get_missing_loc().is_deleted(candidate))
            continue;
 
          // skip wrong namespace
@@ -1325,7 +1308,7 @@ void PrimaryLogPG::do_pg_op(OpRequestRef op)
                candidate.get_namespace() != m->get_hobj().nspace)
            continue;
 
-         if (filter && !pgls_filter(filter, candidate, filter_out))
+         if (filter && !pgls_filter(*filter, candidate))
            continue;
 
           dout(20) << "pgnls item 0x" << std::hex
@@ -1342,7 +1325,7 @@ void PrimaryLogPG::do_pg_op(OpRequestRef op)
        }
 
        if (next.is_max() &&
-           missing_iter == pg_log.get_missing().get_items().end() &&
+           missing_iter == recovery_state.get_pg_log().get_missing().get_items().end() &&
            ls_iter == sentries.end()) {
          result = 1;
 
@@ -1354,8 +1337,6 @@ void PrimaryLogPG::do_pg_op(OpRequestRef op)
         }
         dout(10) << "pgnls handle=" << response.handle << dendl;
        encode(response, osd_op.outdata);
-       if (filter)
-         encode(filter_out, osd_op.outdata);
        dout(10) << " pgnls result=" << result << " outdata.length()="
                 << osd_op.outdata.length() << dendl;
       }
@@ -1371,11 +1352,7 @@ void PrimaryLogPG::do_pg_op(OpRequestRef op)
        result = -EINVAL;
        break;
       }
-      if (filter) {
-       delete filter;
-       filter = NULL;
-      }
-      result = get_pgls_filter(bp, &filter);
+      std::tie(result, filter) = get_pgls_filter(bp);
       if (result < 0)
         break;
 
@@ -1423,15 +1400,15 @@ void PrimaryLogPG::do_pg_op(OpRequestRef op)
          break;
        }
 
-       ceph_assert(snapid == CEPH_NOSNAP || pg_log.get_missing().get_items().empty());
+       ceph_assert(snapid == CEPH_NOSNAP || recovery_state.get_pg_log().get_missing().get_items().empty());
 
        map<hobject_t, pg_missing_item>::const_iterator missing_iter =
-         pg_log.get_missing().get_items().lower_bound(current);
+         recovery_state.get_pg_log().get_missing().get_items().lower_bound(current);
        vector<hobject_t>::iterator ls_iter = sentries.begin();
        hobject_t _max = hobject_t::get_max();
        while (1) {
          const hobject_t &mcand =
-           missing_iter == pg_log.get_missing().get_items().end() ?
+           missing_iter == recovery_state.get_pg_log().get_missing().get_items().end() ?
            _max :
            missing_iter->first;
          const hobject_t &lcand =
@@ -1472,24 +1449,22 @@ void PrimaryLogPG::do_pg_op(OpRequestRef op)
          if (candidate.get_namespace() != m->get_hobj().nspace)
            continue;
 
-         if (missing_loc.is_deleted(candidate))
+         if (recovery_state.get_missing_loc().is_deleted(candidate))
            continue;
 
-         if (filter && !pgls_filter(filter, candidate, filter_out))
+         if (filter && !pgls_filter(*filter, candidate))
            continue;
 
          response.entries.push_back(make_pair(candidate.oid,
                                               candidate.get_key()));
        }
        if (next.is_max() &&
-           missing_iter == pg_log.get_missing().get_items().end() &&
+           missing_iter == recovery_state.get_pg_log().get_missing().get_items().end() &&
            ls_iter == sentries.end()) {
          result = 1;
        }
        response.handle = next;
        encode(response, osd_op.outdata);
-       if (filter)
-         encode(filter_out, osd_op.outdata);
        dout(10) << " pgls result=" << result << " outdata.length()="
                 << osd_op.outdata.length() << dendl;
       }
@@ -1542,7 +1517,6 @@ void PrimaryLogPG::do_pg_op(OpRequestRef op)
          }
          if (is_unreadable_object(oid)) {
            wait_for_unreadable_object(oid, op);
-            delete filter;
            return;
          }
          result = osd->store->read(ch, ghobject_t(oid), 0, 0, osd_op.outdata);
@@ -1571,10 +1545,9 @@ void PrimaryLogPG::do_pg_op(OpRequestRef op)
   reply->set_result(result);
   reply->set_reply_versions(info.last_update, info.last_user_version);
   osd->send_message_osd_client(reply, m->get_connection());
-  delete filter;
 }
 
-int PrimaryLogPG::do_scrub_ls(MOSDOp *m, OSDOp *osd_op)
+int PrimaryLogPG::do_scrub_ls(const MOSDOp *m, OSDOp *osd_op)
 {
   if (m->get_pg() != info.pgid.pgid) {
     dout(10) << " scrubls pg=" << m->get_pg() << " != " << info.pgid << dendl;
@@ -1609,105 +1582,6 @@ int PrimaryLogPG::do_scrub_ls(MOSDOp *m, OSDOp *osd_op)
   return r;
 }
 
-void PrimaryLogPG::calc_trim_to()
-{
-  size_t target = cct->_conf->osd_min_pg_log_entries;
-  if (is_degraded() ||
-      state_test(PG_STATE_RECOVERING |
-                 PG_STATE_RECOVERY_WAIT |
-                 PG_STATE_BACKFILLING |
-                 PG_STATE_BACKFILL_WAIT |
-                 PG_STATE_BACKFILL_TOOFULL)) {
-    target = cct->_conf->osd_max_pg_log_entries;
-  }
-
-  eversion_t limit = std::min(
-    min_last_complete_ondisk,
-    pg_log.get_can_rollback_to());
-  if (limit != eversion_t() &&
-      limit != pg_trim_to &&
-      pg_log.get_log().approx_size() > target) {
-    size_t num_to_trim = std::min(pg_log.get_log().approx_size() - target,
-                             cct->_conf->osd_pg_log_trim_max);
-    if (num_to_trim < cct->_conf->osd_pg_log_trim_min &&
-        cct->_conf->osd_pg_log_trim_max >= cct->_conf->osd_pg_log_trim_min) {
-      return;
-    }
-    list<pg_log_entry_t>::const_iterator it = pg_log.get_log().log.begin();
-    eversion_t new_trim_to;
-    for (size_t i = 0; i < num_to_trim; ++i) {
-      new_trim_to = it->version;
-      ++it;
-      if (new_trim_to > limit) {
-        new_trim_to = limit;
-        dout(10) << "calc_trim_to trimming to min_last_complete_ondisk" << dendl;
-        break;
-      }
-    }
-    dout(10) << "calc_trim_to " << pg_trim_to << " -> " << new_trim_to << dendl;
-    pg_trim_to = new_trim_to;
-    assert(pg_trim_to <= pg_log.get_head());
-    assert(pg_trim_to <= min_last_complete_ondisk);
-  }
-}
-
-void PrimaryLogPG::calc_trim_to_aggressive()
-{
-  size_t target = cct->_conf->osd_min_pg_log_entries;
-  if (is_degraded() ||
-      state_test(PG_STATE_RECOVERING |
-                PG_STATE_RECOVERY_WAIT |
-                PG_STATE_BACKFILLING |
-                PG_STATE_BACKFILL_WAIT |
-                PG_STATE_BACKFILL_TOOFULL)) {
-    target = cct->_conf->osd_max_pg_log_entries;
-  }
-  // limit pg log trimming up to the can_rollback_to value
-  eversion_t limit = std::min(
-    pg_log.get_head(),
-    pg_log.get_can_rollback_to());
-  dout(10) << __func__ << " limit = " << limit << dendl;
-
-  if (limit != eversion_t() &&
-      limit != pg_trim_to &&
-      pg_log.get_log().approx_size() > target) {
-    dout(10) << __func__ << " approx pg log length =  "
-             << pg_log.get_log().approx_size() << dendl;
-    uint64_t num_to_trim = std::min<uint64_t>(pg_log.get_log().approx_size() - target,
-                                              cct->_conf->osd_pg_log_trim_max);
-    dout(10) << __func__ << " num_to_trim =  " << num_to_trim << dendl;
-    if (num_to_trim < cct->_conf->osd_pg_log_trim_min &&
-       cct->_conf->osd_pg_log_trim_max >= cct->_conf->osd_pg_log_trim_min) {
-      return;
-    }
-    auto it = pg_log.get_log().log.begin(); // oldest log entry
-    auto rit = pg_log.get_log().log.rbegin();
-    eversion_t by_n_to_keep; // start from tail
-    eversion_t by_n_to_trim = eversion_t::max(); // start from head
-    for (size_t i = 0; it != pg_log.get_log().log.end(); ++it, ++rit) {
-      i++;
-      if (i > target && by_n_to_keep == eversion_t()) {
-        by_n_to_keep = rit->version;
-      }
-      if (i >= num_to_trim && by_n_to_trim == eversion_t::max()) {
-        by_n_to_trim = it->version;
-      }
-      if (by_n_to_keep != eversion_t() &&
-          by_n_to_trim != eversion_t::max()) {
-        break;
-      }
-    }
-
-    if (by_n_to_keep == eversion_t()) {
-      return;
-    }
-
-    pg_trim_to = std::min({by_n_to_keep, by_n_to_trim, limit});
-    dout(10) << __func__ << " pg_trim_to now " << pg_trim_to << dendl;
-    ceph_assert(pg_trim_to <= pg_log.get_head());
-  }
-}
-
 PrimaryLogPG::PrimaryLogPG(OSDService *o, OSDMapRef curmap,
                           const PGPool &_pool,
                           const map<string,string>& ec_profile, spg_t p) :
@@ -1716,12 +1590,11 @@ PrimaryLogPG::PrimaryLogPG(OSDService *o, OSDMapRef curmap,
     PGBackend::build_pg_backend(
       _pool.info, ec_profile, this, coll_t(p), ch, o->store, cct)),
   object_contexts(o->cct, o->cct->_conf->osd_pg_object_context_cache_count),
-  snapset_contexts_lock("PrimaryLogPG::snapset_contexts_lock"),
   new_backfill(false),
   temp_seq(0),
   snap_trimmer_machine(this)
 { 
-  missing_loc.set_backend_predicates(
+  recovery_state.set_backend_predicates(
     pgbackend->get_is_readable_predicate(),
     pgbackend->get_is_recoverable_predicate());
   snap_trimmer_machine.initiate();
@@ -1736,8 +1609,8 @@ void PrimaryLogPG::get_src_oloc(const object_t& oid, const object_locator_t& olo
 
 void PrimaryLogPG::handle_backoff(OpRequestRef& op)
 {
-  const MOSDBackoff *m = static_cast<const MOSDBackoff*>(op->get_req());
-  SessionRef session{static_cast<Session*>(m->get_connection()->get_priv().get())};
+  auto m = op->get_req<MOSDBackoff>();
+  auto session = ceph::ref_cast<Session>(m->get_connection()->get_priv());
   if (!session)
     return;  // drop it.
   hobject_t begin = info.pgid.pgid.get_hobj_start();
@@ -1788,7 +1661,7 @@ void PrimaryLogPG::do_request(
   const Message *m = op->get_req();
   int msg_type = m->get_type();
   if (m->get_connection()->has_feature(CEPH_FEATURE_RADOS_BACKOFF)) {
-    SessionRef session{static_cast<Session*>(m->get_connection()->get_priv().get())};
+    auto session = ceph::ref_cast<Session>(m->get_connection()->get_priv());
     if (!session)
       return;  // drop it.
 
@@ -1835,16 +1708,14 @@ void PrimaryLogPG::do_request(
     }
   }
 
-  if (flushes_in_progress > 0) {
-    dout(20) << flushes_in_progress
-            << " flushes_in_progress pending "
-            << "waiting for flush on " << op << dendl;
+  if (recovery_state.needs_flush()) {
+    dout(20) << "waiting for flush on " << op << dendl;
     waiting_for_flush.push_back(op);
     op->mark_delayed("waiting for flush");
     return;
   }
 
-  ceph_assert(is_peered() && flushes_in_progress == 0);
+  ceph_assert(is_peered() && !recovery_state.needs_flush());
   if (pgbackend->handle_message(op))
     return;
 
@@ -1888,8 +1759,7 @@ void PrimaryLogPG::do_request(
 
   case MSG_OSD_SCRUB_RESERVE:
     {
-      const MOSDScrubReserve *m =
-       static_cast<const MOSDScrubReserve*>(op->get_req());
+      auto m = op->get_req<MOSDScrubReserve>();
       switch (m->type) {
       case MOSDScrubReserve::REQUEST:
        handle_scrub_reserve_request(op);
@@ -1931,14 +1801,9 @@ void PrimaryLogPG::do_request(
 hobject_t PrimaryLogPG::earliest_backfill() const
 {
   hobject_t e = hobject_t::get_max();
-  for (set<pg_shard_t>::iterator i = backfill_targets.begin();
-       i != backfill_targets.end();
-       ++i) {
-    pg_shard_t bt = *i;
-    map<pg_shard_t, pg_info_t>::const_iterator iter = peer_info.find(bt);
-    ceph_assert(iter != peer_info.end());
-    if (iter->second.last_backfill < e)
-      e = iter->second.last_backfill;
+  for (const pg_shard_t& bt : get_backfill_targets()) {
+    const pg_info_t &pi = recovery_state.get_peer_info(bt);
+    e = std::min(pi.last_backfill, e);
   }
   return e;
 }
@@ -1961,8 +1826,7 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
 
   dout(20) << __func__ << ": op " << *m << dendl;
 
-  hobject_t head = m->get_hobj();
-  head.snap = CEPH_NOSNAP;
+  const hobject_t head = m->get_hobj().get_head();
 
   if (!info.pgid.pgid.contains(
        info.pgid.pgid.get_split_bits(pool.info.get_pg_num()), head)) {
@@ -1977,7 +1841,7 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
 
   bool can_backoff =
     m->get_connection()->has_feature(CEPH_FEATURE_RADOS_BACKOFF);
-  SessionRef session;
+  ceph::ref_t<Session> session;
   if (can_backoff) {
     session = static_cast<Session*>(m->get_connection()->get_priv().get());
     if (!session.get()) {
@@ -1997,8 +1861,8 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
     return;
   }
 
-  if (op->rmw_flags == 0) {
-    int r = osd->osd->init_op_flags(op);
+  {
+    int r = op->maybe_init_op_info(*get_osdmap());
     if (r) {
       osd->reply_op_error(op, r);
       return;
@@ -2010,7 +1874,7 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
       op->may_read() &&
       !(op->may_write() || op->may_cache())) {
     // balanced reads; any replica will do
-    if (!(is_primary() || is_replica())) {
+    if (!(is_primary() || is_nonprimary())) {
       osd->handle_misdirected_op(this, op);
       return;
     }
@@ -2022,6 +1886,10 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
     }
   }
 
+  if (!check_laggy(op)) {
+    return;
+  }
+
   if (!op_has_sufficient_caps(op)) {
     osd->reply_op_error(op, -EPERM);
     return;
@@ -2143,7 +2011,8 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
     }
     if (can_backoff &&
        (g_conf()->osd_backoff_on_degraded ||
-        (g_conf()->osd_backoff_on_unfound && missing_loc.is_unfound(head)))) {
+        (g_conf()->osd_backoff_on_unfound &&
+         recovery_state.get_missing_loc().is_unfound(head)))) {
       add_backoff(session, head, head);
       maybe_kick_recovery(head);
     } else {
@@ -2170,6 +2039,9 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
       op->mark_delayed("waiting for scrub");
       return;
     }
+    if (!check_laggy_requeue(op)) {
+      return;
+    }
 
     // blocked on snap?
     if (auto blocked_iter = objects_blocked_on_degraded_snap.find(head);
@@ -2200,17 +2072,19 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
     eversion_t version;
     version_t user_version;
     int return_code = 0;
+    vector<pg_log_op_return_item_t> op_returns;
     bool got = check_in_progress_op(
-      m->get_reqid(), &version, &user_version, &return_code);
+      m->get_reqid(), &version, &user_version, &return_code, &op_returns);
     if (got) {
       dout(3) << __func__ << " dup " << m->get_reqid()
              << " version " << version << dendl;
       if (already_complete(version)) {
-       osd->reply_op_error(op, return_code, version, user_version);
+       osd->reply_op_error(op, return_code, version, user_version, op_returns);
       } else {
        dout(10) << " waiting for " << version << " to commit" << dendl;
         // always queue ondisk waiters, so that we can requeue if needed
-       waiting_for_ondisk[version].emplace_back(op, user_version, return_code);
+       waiting_for_ondisk[version].emplace_back(op, user_version, return_code,
+                                                op_returns);
        op->mark_delayed("waiting for ondisk");
       }
       return;
@@ -2222,12 +2096,8 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
   hobject_t missing_oid;
 
   // kludge around the fact that LIST_SNAPS sets CEPH_SNAPDIR for LIST_SNAPS
-  hobject_t _oid_head;
-  if (m->get_snapid() == CEPH_SNAPDIR) {
-    _oid_head = m->get_hobj().get_head();
-  }
   const hobject_t& oid =
-    m->get_snapid() == CEPH_SNAPDIR ? _oid_head : m->get_hobj();
+    m->get_snapid() == CEPH_SNAPDIR ? head : m->get_hobj();
 
   // make sure LIST_SNAPS is on CEPH_SNAPDIR and nothing else
   for (vector<OSDOp>::iterator p = m->ops.begin(); p != m->ops.end(); ++p) {
@@ -2254,6 +2124,19 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
     return;
   }
 
+  if (!is_primary()) {
+    if (!recovery_state.can_serve_replica_read(oid)) {
+      dout(20) << __func__ << ": oid " << oid
+              << " unstable write on replica, bouncing to primary."
+              << *m << dendl;
+      osd->reply_op_error(op, -EAGAIN);
+      return;
+    } else {
+      dout(20) << __func__ << ": serving replica read on oid" << oid
+              << dendl;
+    }
+  }
+
   int r = find_object_context(
     oid, &obc, can_create,
     m->has_flag(CEPH_OSD_FLAG_MAP_SNAP_CLONE),
@@ -2343,7 +2226,7 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
     }
     dout(20) << __func__ << ": find_object_context got error " << r << dendl;
     if (op->may_write() &&
-       get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
+       get_osdmap()->require_osd_release >= ceph_release_t::kraken) {
       record_write_error(op, oid, nullptr, r);
     } else {
       osd->reply_op_error(op, r);
@@ -2395,13 +2278,14 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
 
   if (r) {
     dout(20) << __func__ << " returned an error: " << r << dendl;
-    close_op_ctx(ctx);
     if (op->may_write() &&
-       get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
-      record_write_error(op, oid, nullptr, r);
+       get_osdmap()->require_osd_release >= ceph_release_t::kraken) {
+      record_write_error(op, oid, nullptr, r,
+                        ctx->op->allows_returnvec() ? ctx : nullptr);
     } else {
       osd->reply_op_error(op, r);
     }
+    close_op_ctx(ctx);
     return;
   }
 
@@ -2455,8 +2339,7 @@ PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_manifest_detail(
   ObjectContextRef obc)
 {
   ceph_assert(obc);
-  if (static_cast<const MOSDOp *>(op->get_req())->get_flags() &
-      CEPH_OSD_FLAG_IGNORE_REDIRECT) {
+  if (op->get_req<MOSDOp>()->get_flags() & CEPH_OSD_FLAG_IGNORE_REDIRECT) {
     dout(20) << __func__ << ": ignoring redirect due to flag" << dendl;
     return cache_result_t::NOOP;
   }
@@ -2468,14 +2351,25 @@ PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_manifest_detail(
     return cache_result_t::NOOP;
   }
 
-  vector<OSDOp> ops = static_cast<const MOSDOp*>(op->get_req())->ops;
+  vector<OSDOp> ops = op->get_req<MOSDOp>()->ops;
   for (vector<OSDOp>::iterator p = ops.begin(); p != ops.end(); ++p) {
     OSDOp& osd_op = *p;
     ceph_osd_op& op = osd_op.op;
     if (op.op == CEPH_OSD_OP_SET_REDIRECT ||
        op.op == CEPH_OSD_OP_SET_CHUNK || 
-       op.op == CEPH_OSD_OP_TIER_PROMOTE ||
-       op.op == CEPH_OSD_OP_UNSET_MANIFEST) {
+       op.op == CEPH_OSD_OP_UNSET_MANIFEST ||
+       op.op == CEPH_OSD_OP_TIER_FLUSH) {
+      return cache_result_t::NOOP;
+    } else if (op.op == CEPH_OSD_OP_TIER_PROMOTE) {
+      bool is_dirty = false;
+      for (auto& p : obc->obs.oi.manifest.chunk_map) {
+       if (p.second.is_dirty()) {
+         is_dirty = true;
+       }
+      }
+      if (is_dirty) {
+       start_flush(OpRequestRef(), obc, true, NULL, std::nullopt);
+      }
       return cache_result_t::NOOP;
     }
   }
@@ -2520,10 +2414,13 @@ PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_manifest_detail(
        op->mark_delayed("waiting for scrub");
        return cache_result_t::BLOCKED_RECOVERY;
       }
+      if (!check_laggy_requeue(op)) {
+       return cache_result_t::BLOCKED_RECOVERY;
+      }
       
       for (auto& p : obc->obs.oi.manifest.chunk_map) {
        if (p.second.is_missing()) {
-         const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
+         auto m = op->get_req<MOSDOp>();
          const object_locator_t oloc = m->get_object_locator();
          promote_object(obc, obc->obs.oi.soid, oloc, op, NULL);
          return cache_result_t::BLOCKED_PROMOTE;
@@ -2537,7 +2434,7 @@ PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_manifest_detail(
        }
       }
       if (all_dirty) {
-       start_flush(OpRequestRef(), obc, true, NULL, boost::none);
+       start_flush(OpRequestRef(), obc, true, NULL, std::nullopt);
       }
       return cache_result_t::NOOP;
     }
@@ -2563,10 +2460,9 @@ struct C_ManifestFlush : public Context {
   void finish(int r) override {
     if (r == -ECANCELED)
       return;
-    pg->lock();
+    std::scoped_lock locker{*pg};
     pg->handle_manifest_flush(oid, tid, r, offset, last_offset, lpr);
     pg->osd->logger->tinc(l_osd_tier_flush_lat, ceph_clock_now() - start);
-    pg->unlock();
   }
 };
 
@@ -2599,7 +2495,7 @@ void PrimaryLogPG::handle_manifest_flush(hobject_t oid, ceph_tid_t tid, int r,
 }
 
 int PrimaryLogPG::start_manifest_flush(OpRequestRef op, ObjectContextRef obc, bool blocking,
-                                      boost::optional<std::function<void()>> &&on_flush)
+                                      std::optional<std::function<void()>> &&on_flush)
 {
   auto p = obc->obs.oi.manifest.chunk_map.begin();
   FlushOpRef manifest_fop(std::make_shared<FlushOp>());
@@ -2663,44 +2559,47 @@ int PrimaryLogPG::do_manifest_flush(OpRequestRef op, ObjectContextRef obc, Flush
     unsigned flags = CEPH_OSD_FLAG_IGNORE_CACHE | CEPH_OSD_FLAG_IGNORE_OVERLAY |
                     CEPH_OSD_FLAG_RWORDERED;
     tgt_length = chunk_data.length();
-    pg_pool_t::fingerprint_t fp_algo_t = pool.info.get_fingerprint_type();
-    if (iter->second.has_reference() &&
-       fp_algo_t != pg_pool_t::TYPE_FINGERPRINT_NONE) {
-      switch (fp_algo_t) {
+    if (pg_pool_t::fingerprint_t fp_algo = pool.info.get_fingerprint_type();
+       iter->second.has_reference() &&
+       fp_algo != pg_pool_t::TYPE_FINGERPRINT_NONE) {
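+      // name the target chunk object by a fingerprint (digest) of the chunk data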
+      object_t fp_oid = [fp_algo, &chunk_data]() -> string {
+        switch (fp_algo) {
        case pg_pool_t::TYPE_FINGERPRINT_SHA1:
-         {
-           sha1_digest_t sha1r = chunk_data.sha1();
-           object_t fp_oid = sha1r.to_str();
-           bufferlist in;
-           if (fp_oid != tgt_soid.oid) {
-             // decrement old chunk's reference count 
-             ObjectOperation dec_op;
-             cls_chunk_refcount_put_op put_call;
-             ::encode(put_call, in);                             
-             dec_op.call("refcount", "chunk_put", in);         
-             // we don't care dec_op's completion. scrub for dedup will fix this.
-             tid = osd->objecter->mutate(
-               tgt_soid.oid, oloc, dec_op, snapc,
-               ceph::real_clock::from_ceph_timespec(obc->obs.oi.mtime),
-               flags, NULL);
-             in.clear();
-           }
-           tgt_soid.oid = fp_oid;
-           iter->second.oid = tgt_soid;
-           // add data op
-           ceph_osd_op osd_op;
-           osd_op.extent.offset = 0;
-           osd_op.extent.length = chunk_data.length();
-           encode(osd_op, in);
-           encode(soid, in);
-           in.append(chunk_data);
-           obj_op.call("cas", "cas_write_or_get", in);
-           break;
-         }
+         return crypto::digest<crypto::SHA1>(chunk_data).to_str();
+       case pg_pool_t::TYPE_FINGERPRINT_SHA256:
+         return crypto::digest<crypto::SHA256>(chunk_data).to_str();
+       case pg_pool_t::TYPE_FINGERPRINT_SHA512:
+         return crypto::digest<crypto::SHA512>(chunk_data).to_str();
        default:
          assert(0 == "unrecognized fingerprint type");
-         break;
-      }
+         return {};
+       }
+      }();
+      bufferlist in;
+      if (fp_oid != tgt_soid.oid) {
+       // decrement old chunk's reference count 
+       ObjectOperation dec_op;
+       cls_chunk_refcount_put_op put_call;
+       put_call.source = soid;
+       ::encode(put_call, in);                             
+       dec_op.call("cas", "chunk_put", in);         
+       // we don't care about dec_op's completion; scrub for dedup will fix this.
+       tid = osd->objecter->mutate(
+         tgt_soid.oid, oloc, dec_op, snapc,
+         ceph::real_clock::from_ceph_timespec(obc->obs.oi.mtime),
+         flags, NULL);
+       in.clear();
+      }
+      tgt_soid.oid = fp_oid;
+      iter->second.oid = tgt_soid;
+      // add data op
+      ceph_osd_op osd_op;
+      osd_op.extent.offset = 0;
+      osd_op.extent.length = chunk_data.length();
+      encode(osd_op, in);
+      encode(soid, in);
+      in.append(chunk_data);
+      obj_op.call("cas", "cas_write_or_get", in);
     } else {
       obj_op.add_data(CEPH_OSD_OP_WRITE, tgt_offset, tgt_length, chunk_data);
     }
@@ -2710,11 +2609,10 @@ int PrimaryLogPG::do_manifest_flush(OpRequestRef op, ObjectContextRef obc, Flush
     fin->last_offset = last_offset;
     manifest_fop->chunks++;
 
-    unsigned n = info.pgid.hash_to_shard(osd->m_objecter_finishers);
     tid = osd->objecter->mutate(
       tgt_soid.oid, oloc, obj_op, snapc,
       ceph::real_clock::from_ceph_timespec(obc->obs.oi.mtime),
-      flags, new C_OnFinisher(fin, osd->objecter_finishers[n]));
+      flags, new C_OnFinisher(fin, osd->get_objecter_finisher(get_pg_shard())));
     fin->tid = tid;
     manifest_fop->io_tids[iter->first] = tid;
 
@@ -2752,15 +2650,20 @@ void PrimaryLogPG::finish_manifest_flush(hobject_t oid, ceph_tid_t tid, int r,
 }
 
 void PrimaryLogPG::record_write_error(OpRequestRef op, const hobject_t &soid,
-                                     MOSDOpReply *orig_reply, int r)
+                                     MOSDOpReply *orig_reply, int r,
+                                     OpContext *ctx_for_op_returns)
 {
   dout(20) << __func__ << " r=" << r << dendl;
   ceph_assert(op->may_write());
-  const osd_reqid_t &reqid = static_cast<const MOSDOp*>(op->get_req())->get_reqid();
+  const osd_reqid_t &reqid = op->get_req<MOSDOp>()->get_reqid();
   mempool::osd_pglog::list<pg_log_entry_t> entries;
   entries.push_back(pg_log_entry_t(pg_log_entry_t::ERROR, soid,
                                   get_next_version(), eversion_t(), 0,
                                   reqid, utime_t(), r));
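+  // carry the per-op return values on the error log entry when the client asked for them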
+  if (ctx_for_op_returns) {
+    entries.back().set_op_returns(*ctx_for_op_returns->ops);
+    dout(20) << __func__ << " op_returns=" << entries.back().op_returns << dendl;
+  }
 
   struct OnComplete {
     PrimaryLogPG *pg;
@@ -2777,13 +2680,8 @@ void PrimaryLogPG::record_write_error(OpRequestRef op, const hobject_t &soid,
       {}
     void operator()() {
       ldpp_dout(pg, 20) << "finished " << __func__ << " r=" << r << dendl;
-      const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
-      int flags = m->get_flags() & (CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+      auto m = op->get_req<MOSDOp>();
       MOSDOpReply *reply = orig_reply.detach();
-      if (reply == nullptr) {
-       reply = new MOSDOpReply(m, r, pg->get_osdmap_epoch(),
-                               flags, true);
-      }
       ldpp_dout(pg, 10) << " sending commit on " << *m << " " << reply << dendl;
       pg->osd->send_message_osd_client(reply, m->get_connection());
     }
@@ -2793,7 +2691,7 @@ void PrimaryLogPG::record_write_error(OpRequestRef op, const hobject_t &soid,
   submit_log_entries(
     entries,
     std::move(lock_manager),
-    boost::optional<std::function<void(void)> >(
+    std::optional<std::function<void(void)> >(
       OnComplete(this, op, orig_reply, r)),
     op,
     r);
@@ -2815,7 +2713,7 @@ PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_cache_detail(
   if (op &&
       op->get_req() &&
       op->get_req()->get_type() == CEPH_MSG_OSD_OP &&
-      (static_cast<const MOSDOp *>(op->get_req())->get_flags() &
+      (op->get_req<MOSDOp>()->get_flags() &
        CEPH_OSD_FLAG_IGNORE_CACHE)) {
     dout(20) << __func__ << ": ignoring cache due to flag" << dendl;
     return cache_result_t::NOOP;
@@ -2863,7 +2761,7 @@ PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_cache_detail(
     missing_oid = obc->obs.oi.soid;
   }
 
-  const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
+  auto m = op->get_req<MOSDOp>();
   const object_locator_t oloc = m->get_object_locator();
 
   if (op->need_skip_handle_cache()) {
@@ -2926,11 +2824,6 @@ PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_cache_detail(
     ceph_abort_msg("unreachable");
     return cache_result_t::NOOP;
 
-  case pg_pool_t::CACHEMODE_FORWARD:
-    // FIXME: this mode allows requests to be reordered.
-    do_cache_redirect(op);
-    return cache_result_t::HANDLED_REDIRECT;
-
   case pg_pool_t::CACHEMODE_READONLY:
     // TODO: clean this case up
     if (!obc.get() && r == -ENOENT) {
@@ -2945,23 +2838,8 @@ PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_cache_detail(
     // crap, there was a failure of some kind
     return cache_result_t::NOOP;
 
-  case pg_pool_t::CACHEMODE_READFORWARD:
-    // Do writeback to the cache tier for writes
-    if (op->may_write() || write_ordered || must_promote) {
-      if (agent_state &&
-         agent_state->evict_mode == TierAgentState::EVICT_MODE_FULL) {
-       dout(20) << __func__ << " cache pool full, waiting" << dendl;
-       block_write_on_full_cache(missing_oid, op);
-       return cache_result_t::BLOCKED_FULL;
-      }
-      promote_object(obc, missing_oid, oloc, op, promote_obc);
-      return cache_result_t::BLOCKED_PROMOTE;
-    }
-
-    // If it is a read, we can read, we need to forward it
-    do_cache_redirect(op);
-    return cache_result_t::HANDLED_REDIRECT;
-
+  case pg_pool_t::CACHEMODE_FORWARD:
+    // this mode is deprecated; proxy instead
   case pg_pool_t::CACHEMODE_PROXY:
     if (!must_promote) {
       if (op->may_write() || op->may_cache() || write_ordered) {
@@ -2982,6 +2860,8 @@ PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_cache_detail(
     promote_object(obc, missing_oid, oloc, op, promote_obc);
     return cache_result_t::BLOCKED_PROMOTE;
 
+  case pg_pool_t::CACHEMODE_READFORWARD:
+    // this mode is deprecated; proxy instead
   case pg_pool_t::CACHEMODE_READPROXY:
     // Do writeback to the cache tier for writes
     if (op->may_write() || write_ordered || must_promote) {
@@ -3065,7 +2945,7 @@ bool PrimaryLogPG::maybe_promote(ObjectContextRef obc,
 
 void PrimaryLogPG::do_cache_redirect(OpRequestRef op)
 {
-  const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
+  auto m = op->get_req<MOSDOp>();
   int flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK);
   MOSDOpReply *reply = new MOSDOpReply(m, -ENOENT, get_osdmap_epoch(),
                                        flags, false);
@@ -3092,16 +2972,14 @@ struct C_ProxyRead : public Context {
   void finish(int r) override {
     if (prdop->canceled)
       return;
-    pg->lock();
+    std::scoped_lock locker{*pg};
     if (prdop->canceled) {
-      pg->unlock();
       return;
     }
     if (last_peering_reset == pg->get_last_peering_reset()) {
       pg->finish_proxy_read(oid, tid, r);
       pg->osd->logger->tinc(l_osd_tier_r_lat, ceph_clock_now() - start);
     }
-    pg->unlock();
   }
 };
 
@@ -3125,9 +3003,8 @@ struct C_ProxyChunkRead : public Context {
   void finish(int r) override {
     if (prdop->canceled)
       return;
-    pg->lock();
+    std::scoped_lock locker{*pg};
     if (prdop->canceled) {
-      pg->unlock();
       return;
     }
     if (last_peering_reset == pg->get_last_peering_reset()) {
@@ -3146,8 +3023,9 @@ struct C_ProxyChunkRead : public Context {
        } else {
          copy_offset = 0;
        }
-       prdop->ops[op_index].outdata.copy_in(copy_offset, obj_op->ops[0].outdata.length(),
-                                            obj_op->ops[0].outdata.c_str());
+       prdop->ops[op_index].outdata.begin(copy_offset).copy_in(
+          obj_op->ops[0].outdata.length(),
+          obj_op->ops[0].outdata.c_str());
       }        
       
       pg->finish_proxy_read(oid, tid, r);
@@ -3156,7 +3034,6 @@ struct C_ProxyChunkRead : public Context {
        delete obj_op;
       }
     }
-    pg->unlock();
   }
 };
 
@@ -3219,11 +3096,10 @@ void PrimaryLogPG::do_proxy_read(OpRequestRef op, ObjectContextRef obc)
 
   C_ProxyRead *fin = new C_ProxyRead(this, soid, get_last_peering_reset(),
                                     prdop);
-  unsigned n = info.pgid.hash_to_shard(osd->m_objecter_finishers);
   ceph_tid_t tid = osd->objecter->read(
     soid.oid, oloc, obj_op,
     m->get_snapid(), NULL,
-    flags, new C_OnFinisher(fin, osd->objecter_finishers[n]),
+    flags, new C_OnFinisher(fin, osd->get_objecter_finisher(get_pg_shard())),
     &prdop->user_version,
     &prdop->data_offset,
     m->get_features());
@@ -3280,7 +3156,7 @@ void PrimaryLogPG::finish_proxy_read(hobject_t oid, ceph_tid_t tid, int r)
 
   osd->logger->inc(l_osd_tier_proxy_read);
 
-  const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
+  auto m = op->get_req<MOSDOp>();
   OpContext *ctx = new OpContext(op, m->get_reqid(), &prdop->ops, this);
   ctx->reply = new MOSDOpReply(m, 0, get_osdmap_epoch(), 0, false);
   ctx->user_at_version = prdop->user_version;
@@ -3363,15 +3239,13 @@ struct C_ProxyWrite_Commit : public Context {
   void finish(int r) override {
     if (pwop->canceled)
       return;
-    pg->lock();
+    std::scoped_lock locker{*pg};
     if (pwop->canceled) {
-      pg->unlock();
       return;
     }
     if (last_peering_reset == pg->get_last_peering_reset()) {
       pg->finish_proxy_write(oid, tid, r);
     }
-    pg->unlock();
   }
 };
 
@@ -3403,6 +3277,10 @@ void PrimaryLogPG::do_proxy_write(OpRequestRef op, ObjectContextRef obc)
   if (!(op->may_write() || op->may_cache())) {
     flags |= CEPH_OSD_FLAG_RWORDERED;
   }
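+  // if the client asked for per-op return values, have the base tier return them as well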
+  if (op->allows_returnvec()) {
+    flags |= CEPH_OSD_FLAG_RETURNVEC;
+  }
+
   dout(10) << __func__ << " Start proxy write for " << *m << dendl;
 
   ProxyWriteOpRef pwop(std::make_shared<ProxyWriteOp>(op, soid, m->ops, m->get_reqid()));
@@ -3414,11 +3292,10 @@ void PrimaryLogPG::do_proxy_write(OpRequestRef op, ObjectContextRef obc)
 
   C_ProxyWrite_Commit *fin = new C_ProxyWrite_Commit(
       this, soid, get_last_peering_reset(), pwop);
-  unsigned n = info.pgid.hash_to_shard(osd->m_objecter_finishers);
   ceph_tid_t tid = osd->objecter->mutate(
     soid.oid, oloc, obj_op, snapc,
     ceph::real_clock::from_ceph_timespec(pwop->mtime),
-    flags, new C_OnFinisher(fin, osd->objecter_finishers[n]),
+    flags, new C_OnFinisher(fin, osd->get_objecter_finisher(get_pg_shard())),
     &pwop->user_version, pwop->reqid);
   fin->tid = tid;
   pwop->objecter_tid = tid;
@@ -3490,29 +3367,34 @@ void PrimaryLogPG::do_proxy_chunked_op(OpRequestRef op, const hobject_t& missing
 
 struct RefCountCallback : public Context {
 public:
-  PrimaryLogPG *pg;
   PrimaryLogPG::OpContext *ctx;
   OSDOp& osd_op;
-  epoch_t last_peering_reset;
+  bool requeue = false;
     
-  RefCountCallback(PrimaryLogPG *pg, PrimaryLogPG::OpContext *ctx,
-                  OSDOp &osd_op, epoch_t lpr) 
-    : pg(pg), ctx(ctx), osd_op(osd_op), last_peering_reset(lpr)
-  {}
+  RefCountCallback(PrimaryLogPG::OpContext *ctx, OSDOp &osd_op)
+    : ctx(ctx), osd_op(osd_op) {}
   void finish(int r) override {
-    pg->lock();
-    if (last_peering_reset == pg->get_last_peering_reset()) {
-      if (r >= 0) {
-       osd_op.rval = 0;
-       pg->execute_ctx(ctx);
-      } else {
-       if (ctx->op) {
-         pg->osd->reply_op_error(ctx->op, r);
-       }
-       pg->close_op_ctx(ctx);
+    // NB: caller must already have pg->lock held
+    ctx->obc->stop_block();
+    ctx->pg->kick_object_context_blocked(ctx->obc);
+    if (r >= 0) {
+      osd_op.rval = 0;
+      ctx->pg->execute_ctx(ctx);
+    } else {
+       // on cancel simply toss op out,
+       // or requeue as requested
+      if (r != -ECANCELED) {
+        if (ctx->op)
+          ctx->pg->osd->reply_op_error(ctx->op, r);
+      } else if (requeue) {
+        if (ctx->op)
+          ctx->pg->requeue_op(ctx->op);
       }
+      ctx->pg->close_op_ctx(ctx);
     }
-    pg->unlock();
+  }
+  void set_requeue(bool rq) {
+    requeue = rq;
   }
 };
 
@@ -3527,8 +3409,45 @@ struct SetManifestFinisher : public PrimaryLogPG::OpFinisher {
   }
 };
 
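+// completion for refcount_manifest(): once the objecter op finishes, take the pg
+// lock, drop the tracked manifest_ops entry (unless cancel_manifest_ops already
+// raced with us), and run the saved RefCountCallback.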
+struct C_SetManifestRefCountDone : public Context {
+  RefCountCallback* cb;
+  hobject_t soid;
+  C_SetManifestRefCountDone(
+    RefCountCallback* cb, hobject_t soid) : cb(cb), soid(soid) {}
+  void finish(int r) override {
+    if (r == -ECANCELED)
+      return;
+    auto pg = cb->ctx->pg;
+    std::scoped_lock locker{*pg};
+    auto it = pg->manifest_ops.find(soid);
+    if (it == pg->manifest_ops.end()) {
+      // raced with cancel_manifest_ops
+      return;
+    }
+    pg->manifest_ops.erase(it);
+    cb->complete(r);
+  }
+};
+
+void PrimaryLogPG::cancel_manifest_ops(bool requeue, vector<ceph_tid_t> *tids)
+{
+  dout(10) << __func__ << dendl;
+  auto p = manifest_ops.begin();
+  while (p != manifest_ops.end()) {
+    auto mop = p->second;
+    // cancel objecter op, if we can
+    if (mop->objecter_tid) {
+      tids->push_back(mop->objecter_tid);
+      mop->objecter_tid = 0;
+    }
+    mop->cb->set_requeue(requeue);
+    mop->cb->complete(-ECANCELED);
+    manifest_ops.erase(p++);
+  }
+}
+
 void PrimaryLogPG::refcount_manifest(ObjectContextRef obc, object_locator_t oloc, hobject_t soid,
-                                     SnapContext snapc, bool get, Context *cb, uint64_t offset)
+                                     SnapContext snapc, bool get, RefCountCallback *cb, uint64_t offset)
 {
   unsigned flags = CEPH_OSD_FLAG_IGNORE_CACHE | CEPH_OSD_FLAG_IGNORE_OVERLAY |
                    CEPH_OSD_FLAG_RWORDERED;                      
@@ -3549,18 +3468,21 @@ void PrimaryLogPG::refcount_manifest(ObjectContextRef obc, object_locator_t oloc
     obj_op.call("cas", "chunk_put", in);         
   }                                                     
   
-  unsigned n = info.pgid.hash_to_shard(osd->m_objecter_finishers);
-  Context *c;
+  Context *c = nullptr;
   if (cb) {
-    c = new C_OnFinisher(cb, osd->objecter_finishers[n]);
-  } else {
-    c = NULL;
+    C_SetManifestRefCountDone *fin =
+      new C_SetManifestRefCountDone(cb, obc->obs.oi.soid);
+    c = new C_OnFinisher(fin, osd->get_objecter_finisher(get_pg_shard()));
   }
 
-  osd->objecter->mutate(
+  auto tid = osd->objecter->mutate(
     soid.oid, oloc, obj_op, snapc,
     ceph::real_clock::from_ceph_timespec(obc->obs.oi.mtime),
     flags, c);
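+  // remember the in-flight op so cancel_manifest_ops() can find it, and block the
+  // object until C_SetManifestRefCountDone fires the callback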
+  if (cb) {
+    manifest_ops[obc->obs.oi.soid] = std::make_shared<ManifestOp>(cb, tid);
+    obc->start_block();
+  }
 }  
 
 void PrimaryLogPG::do_proxy_chunked_read(OpRequestRef op, ObjectContextRef obc, int op_index,
@@ -3618,11 +3540,10 @@ void PrimaryLogPG::do_proxy_chunked_read(OpRequestRef op, ObjectContextRef obc,
   fin->obc = obc;
   fin->req_total_len = req_total_len;
 
-  unsigned n = info.pgid.hash_to_shard(osd->m_objecter_finishers);
   ceph_tid_t tid = osd->objecter->read(
     soid.oid, oloc, obj_op,
     m->get_snapid(), NULL,
-    flags, new C_OnFinisher(fin, osd->objecter_finishers[n]),
+    flags, new C_OnFinisher(fin, osd->get_objecter_finisher(get_pg_shard())),
     &prdop->user_version,
     &prdop->data_offset,
     m->get_features());
@@ -3721,19 +3642,17 @@ void PrimaryLogPG::finish_proxy_write(hobject_t oid, ceph_tid_t tid, int r)
 
   osd->logger->inc(l_osd_tier_proxy_write);
 
-  const MOSDOp *m = static_cast<const MOSDOp*>(pwop->op->get_req());
+  auto m = pwop->op->get_req<MOSDOp>();
   ceph_assert(m != NULL);
 
   if (!pwop->sent_reply) {
     // send commit.
-    MOSDOpReply *reply = pwop->ctx->reply;
-    if (reply)
-      pwop->ctx->reply = NULL;
-    else {
-      reply = new MOSDOpReply(m, r, get_osdmap_epoch(), 0, true);
-      reply->set_reply_versions(eversion_t(), pwop->user_version);
-    }
+    assert(pwop->ctx->reply == nullptr);
+    MOSDOpReply *reply = new MOSDOpReply(m, r, get_osdmap_epoch(), 0,
+                                        true /* we claim it below */);
+    reply->set_reply_versions(eversion_t(), pwop->user_version);
     reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+    reply->claim_op_out_data(pwop->ops);
     dout(10) << " sending commit on " << pwop << " " << reply << dendl;
     osd->send_message_osd_client(reply, m->get_connection());
     pwop->sent_reply = true;
@@ -3849,6 +3768,9 @@ void PrimaryLogPG::promote_object(ObjectContextRef obc,
     }
     return;
   }
+  if (op && !check_laggy_requeue(op)) {
+    return;
+  }
   if (!obc) { // we need to create an ObjectContext
     ceph_assert(missing_oid != hobject_t());
     obc = get_object_context(missing_oid, true);
@@ -3900,7 +3822,12 @@ void PrimaryLogPG::promote_object(ObjectContextRef obc,
 
   if (op)
     wait_for_blocked_object(obc->obs.oi.soid, op);
-  info.stats.stats.sum.num_promote++;
+
+  recovery_state.update_stats(
+    [](auto &history, auto &stats) {
+      stats.stats.sum.num_promote++;
+      return false;
+    });
 }
 
 void PrimaryLogPG::execute_ctx(OpContext *ctx)
@@ -3910,7 +3837,7 @@ void PrimaryLogPG::execute_ctx(OpContext *ctx)
   ctx->reset_obs(ctx->obc);
   ctx->update_log_only = false; // reset in case finish_copyfrom() is re-running execute_ctx
   OpRequestRef op = ctx->op;
-  const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
+  auto m = op->get_req<MOSDOp>();
   ObjectContextRef obc = ctx->obc;
   const hobject_t& soid = obc->obs.oi.soid;
 
@@ -3993,24 +3920,33 @@ void PrimaryLogPG::execute_ctx(OpContext *ctx)
     return;
   }
 
-  bool successful_write = !ctx->op_t->empty() && op->may_write() && result >= 0;
-  // prepare the reply
-  ctx->reply = new MOSDOpReply(m, 0, get_osdmap_epoch(), 0,
-                              successful_write);
-
-  // Write operations aren't allowed to return a data payload because
-  // we can't do so reliably. If the client has to resend the request
-  // and it has already been applied, we will return 0 with no
-  // payload.  Non-deterministic behavior is no good.  However, it is
-  // possible to construct an operation that does a read, does a guard
-  // check (e.g., CMPXATTR), and then a write.  Then we either succeed
-  // with the write, or return a CMPXATTR and the read value.
-  if (successful_write) {
-    // write.  normalize the result code.
-    dout(20) << " zeroing write result code " << result << dendl;
-    result = 0;
+  bool ignore_out_data = false;
+  if (!ctx->op_t->empty() &&
+      op->may_write() &&
+      result >= 0) {
+    // successful update
+    if (ctx->op->allows_returnvec()) {
+      // enforce reasonable bound on the return buffer sizes
+      for (auto& i : *ctx->ops) {
+       if (i.outdata.length() > cct->_conf->osd_max_write_op_reply_len) {
+         dout(10) << __func__ << " op " << i << " outdata overflow" << dendl;
+         result = -EOVERFLOW;  // overall result is overflow
+         i.rval = -EOVERFLOW;
+         i.outdata.clear();
+       }
+      }
+    } else {
+      // legacy behavior -- zero result and return data etc.
+      ignore_out_data = true;
+      result = 0;
+    }
   }
-  ctx->reply->set_result(result);
+
+  // prepare the reply
+  ctx->reply = new MOSDOpReply(m, result, get_osdmap_epoch(), 0,
+                              ignore_out_data);
+  dout(20) << __func__ << " alloc reply " << ctx->reply
+          << " result " << result << dendl;
 
   // read or error?
   if ((ctx->op_t->empty() || result < 0) && !ctx->update_log_only) {
@@ -4027,10 +3963,7 @@ void PrimaryLogPG::execute_ctx(OpContext *ctx)
   ceph_assert(op->may_write() || op->may_cache());
 
   // trim log?
-  if (hard_limit_pglog())
-    calc_trim_to_aggressive();
-  else
-    calc_trim_to();
+  recovery_state.update_trim_to();
 
   // verify that we are doing this in order?
   if (cct->_conf->osd_debug_op_order && m->get_source().is_client() &&
@@ -4060,9 +3993,7 @@ void PrimaryLogPG::execute_ctx(OpContext *ctx)
     // save just what we need from ctx
     MOSDOpReply *reply = ctx->reply;
     ctx->reply = nullptr;
-    reply->claim_op_out_data(*ctx->ops);
     reply->get_header().data_off = (ctx->data_off ? *ctx->data_off : 0);
-    close_op_ctx(ctx);
 
     if (result == -ENOENT) {
       reply->set_enoent_reply_versions(info.last_update,
@@ -4070,7 +4001,9 @@ void PrimaryLogPG::execute_ctx(OpContext *ctx)
     }
     reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
     // append to pg log for dup detection - don't save buffers for now
-    record_write_error(op, soid, reply, result);
+    record_write_error(op, soid, reply, result,
+                      ctx->op->allows_returnvec() ? ctx : nullptr);
+    close_op_ctx(ctx);
     return;
   }
 
@@ -4083,13 +4016,7 @@ void PrimaryLogPG::execute_ctx(OpContext *ctx)
 
       if (m && !ctx->sent_reply) {
        MOSDOpReply *reply = ctx->reply;
-       if (reply)
-         ctx->reply = nullptr;
-       else {
-         reply = new MOSDOpReply(m, 0, get_osdmap_epoch(), 0, true);
-         reply->set_reply_versions(ctx->at_version,
-                                   ctx->user_at_version);
-       }
+       ctx->reply = nullptr;
        reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
        dout(10) << " sending reply on " << *m << " " << reply << dendl;
        osd->send_message_osd_client(reply, m->get_connection());
@@ -4138,18 +4065,11 @@ void PrimaryLogPG::reply_ctx(OpContext *ctx, int r)
   close_op_ctx(ctx);
 }
 
-void PrimaryLogPG::reply_ctx(OpContext *ctx, int r, eversion_t v, version_t uv)
-{
-  if (ctx->op)
-    osd->reply_op_error(ctx->op, r, v, uv);
-  close_op_ctx(ctx);
-}
-
 void PrimaryLogPG::log_op_stats(const OpRequest& op,
                                const uint64_t inb,
                                const uint64_t outb)
 {
-  const MOSDOp* const m = static_cast<const MOSDOp*>(op.get_req());
+  auto m = op.get_req<MOSDOp>();
   const utime_t now = ceph_clock_now();
 
   const utime_t latency = now - m->get_recv_stamp();
@@ -4211,7 +4131,7 @@ void PrimaryLogPG::do_scan(
   OpRequestRef op,
   ThreadPool::TPHandle &handle)
 {
-  const MOSDPGScan *m = static_cast<const MOSDPGScan*>(op->get_req());
+  auto m = op->get_req<MOSDPGScan>();
   ceph_assert(m->get_type() == MSG_OSD_PG_SCAN);
   dout(10) << "do_scan " << *m << dendl;
 
@@ -4228,7 +4148,7 @@ void PrimaryLogPG::do_scan(
            std::make_shared<PGPeeringEvent>(
              get_osdmap_epoch(),
              get_osdmap_epoch(),
-             BackfillTooFull())));
+             PeeringState::BackfillTooFull())));
        return;
       }
 
@@ -4256,7 +4176,7 @@ void PrimaryLogPG::do_scan(
       pg_shard_t from = m->from;
 
       // Check that from is in backfill_targets vector
-      ceph_assert(is_backfill_targets(from));
+      ceph_assert(is_backfill_target(from));
 
       BackfillInterval& bi = peer_backfill_info[from];
       bi.begin = m->begin;
@@ -4269,7 +4189,9 @@ void PrimaryLogPG::do_scan(
 
       if (waiting_on_backfill.erase(from)) {
        if (waiting_on_backfill.empty()) {
-         ceph_assert(peer_backfill_info.size() == backfill_targets.size());
+         ceph_assert(
+           peer_backfill_info.size() ==
+           get_backfill_targets().size());
          finish_recovery_op(hobject_t::get_max());
        }
       } else {
@@ -4284,7 +4206,7 @@ void PrimaryLogPG::do_scan(
 
 void PrimaryLogPG::do_backfill(OpRequestRef op)
 {
-  const MOSDPGBackfill *m = static_cast<const MOSDPGBackfill*>(op->get_req());
+  auto m = op->get_req<MOSDPGBackfill>();
   ceph_assert(m->get_type() == MSG_OSD_PG_BACKFILL);
   dout(10) << "do_backfill " << *m << dendl;
 
@@ -4315,24 +4237,13 @@ void PrimaryLogPG::do_backfill(OpRequestRef op)
     {
       ceph_assert(cct->_conf->osd_kill_backfill_at != 2);
 
-      info.set_last_backfill(m->last_backfill);
-      // During backfill submit_push_data() tracks num_bytes which is needed in case
-      // backfill stops and starts again.  We want to know how many bytes this
-      // pg is consuming on the disk in order to compute amount of new data
-      // reserved to hold backfill if it won't fit.
-      if (m->op == MOSDPGBackfill::OP_BACKFILL_PROGRESS) {
-        dout(25) << __func__ << " primary " << m->stats.stats.sum.num_bytes << " local " << info.stats.stats.sum.num_bytes << dendl;
-        int64_t bytes = info.stats.stats.sum.num_bytes;
-        info.stats = m->stats;
-        info.stats.stats.sum.num_bytes = bytes;
-      } else {
-        dout(20) << __func__ << " final " << m->stats.stats.sum.num_bytes << " replaces local " << info.stats.stats.sum.num_bytes << dendl;
-        info.stats = m->stats;
-      }
-
       ObjectStore::Transaction t;
-      dirty_info = true;
-      write_if_dirty(t);
+      recovery_state.update_backfill_progress(
+       m->last_backfill,
+       m->stats,
+       m->op == MOSDPGBackfill::OP_BACKFILL_PROGRESS,
+       t);
+
       int tr = osd->store->queue_transaction(ch, std::move(t), NULL);
       ceph_assert(tr == 0);
     }
@@ -4398,7 +4309,8 @@ void PrimaryLogPG::do_backfill_remove(OpRequestRef op)
 }
 
 int PrimaryLogPG::trim_object(
-  bool first, const hobject_t &coid, PrimaryLogPG::OpContextUPtr *ctxp)
+  bool first, const hobject_t &coid, snapid_t snap_to_trim,
+  PrimaryLogPG::OpContextUPtr *ctxp)
 {
   *ctxp = NULL;
 
@@ -4442,11 +4354,14 @@ int PrimaryLogPG::trim_object(
   }
 
   set<snapid_t> new_snaps;
+  const OSDMapRef& osdmap = get_osdmap();
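+  // keep only snaps that are neither queued for removal in the osdmap nor the snap being trimmed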
   for (set<snapid_t>::iterator i = old_snaps.begin();
        i != old_snaps.end();
        ++i) {
-    if (!pool.info.is_removed_snap(*i))
+    if (!osdmap->in_removed_snaps_queue(info.pgid.pgid.pool(), *i) &&
+       *i != snap_to_trim) {
       new_snaps.insert(*i);
+    }
   }
 
   vector<snapid_t>::iterator p = snapset.clones.end();
@@ -4629,8 +4544,15 @@ int PrimaryLogPG::trim_object(
     head_obc->obs.oi = object_info_t(head_oid);
     t->remove(head_oid);
   } else {
-    dout(10) << coid << " filtering snapset on " << head_oid << dendl;
-    snapset.filter(pool.info);
+    if (get_osdmap()->require_osd_release < ceph_release_t::octopus) {
+      // filter SnapSet::snaps for the benefit of pre-octopus peers. It is not
+      // certain they still need this, but err on the side of caution.
+      dout(10) << coid << " filtering snapset on " << head_oid << dendl;
+      snapset.filter(pool.info);
+    } else {
+      snapset.snaps.clear();
+    }
     dout(10) << coid << " writing updated snapset on " << head_oid
             << ", snapset is " << snapset << dendl;
     ctx->log.push_back(
@@ -4690,7 +4612,7 @@ void PrimaryLogPG::snap_trimmer_scrub_complete()
 
 void PrimaryLogPG::snap_trimmer(epoch_t queued)
 {
-  if (deleting || pg_has_reset_since(queued)) {
+  if (recovery_state.is_deleting() || pg_has_reset_since(queued)) {
     return;
   }
 
@@ -4881,7 +4803,6 @@ int PrimaryLogPG::do_tmapup_slow(OpContext *ctx, bufferlist::const_iterator& bp,
   newop.op.extent.length = obl.length();
   newop.indata = obl;
   do_osd_ops(ctx, nops);
-  osd_op.outdata.claim(newop.outdata);
   return 0;
 }
 
@@ -4961,7 +4882,7 @@ int PrimaryLogPG::do_tmapup(OpContext *ctx, bufferlist::const_iterator& bp, OSDO
       last_in_key = key;
 
       dout(10) << "tmapup op " << (int)op << " key " << key << dendl;
-         
+
       // skip existing intervening keys
       bool key_exists = false;
       while (have_next && !key_exists) {
@@ -5069,7 +4990,6 @@ int PrimaryLogPG::do_tmapup(OpContext *ctx, bufferlist::const_iterator& bp, OSDO
       newop.op.extent.length = obl.length();
       newop.indata = obl;
       do_osd_ops(ctx, nops);
-      osd_op.outdata.claim(newop.outdata);
     }
   }
   return result;
@@ -5094,14 +5014,14 @@ struct FillInVerifyExtent : public Context {
   ceph_le64 *r;
   int32_t *rval;
   bufferlist *outdatap;
-  boost::optional<uint32_t> maybe_crc;
+  std::optional<uint32_t> maybe_crc;
   uint64_t size;
   OSDService *osd;
   hobject_t soid;
-  __le32 flags;
+  uint32_t flags;
   FillInVerifyExtent(ceph_le64 *r, int32_t *rv, bufferlist *blp,
-                    boost::optional<uint32_t> mc, uint64_t size,
-                    OSDService *osd, hobject_t soid, __le32 flags) :
+                    std::optional<uint32_t> mc, uint64_t size,
+                    OSDService *osd, hobject_t soid, uint32_t flags) :
     r(r), rval(rv), outdatap(blp), maybe_crc(mc),
     size(size), osd(osd), soid(soid), flags(flags) {}
   void finish(int len) override {
@@ -5216,8 +5136,8 @@ struct C_ChecksumRead : public Context {
 
   C_ChecksumRead(PrimaryLogPG *primary_log_pg, OSDOp &osd_op,
                 Checksummer::CSumType csum_type, bufferlist &&init_value_bl,
-                boost::optional<uint32_t> maybe_crc, uint64_t size,
-                OSDService *osd, hobject_t soid, __le32 flags)
+                std::optional<uint32_t> maybe_crc, uint64_t size,
+                OSDService *osd, hobject_t soid, uint32_t flags)
     : primary_log_pg(primary_log_pg), osd_op(osd_op),
       csum_type(csum_type), init_value_bl(std::move(init_value_bl)),
       fill_extent_ctx(new FillInVerifyExtent(&read_length, &osd_op.rval,
@@ -5305,12 +5225,12 @@ int PrimaryLogPG::do_checksum(OpContext *ctx, OSDOp& osd_op,
   bufferlist init_value_bl;
   init_value_bl.substr_of(bl_it->get_bl(), bl_it->get_off(),
                          csum_init_value_size);
-  bl_it->advance(csum_init_value_size);
+  *bl_it += csum_init_value_size;
 
   if (pool.info.is_erasure() && op.checksum.length > 0) {
     // If there is a data digest and it is possible we are reading
     // entire object, pass the digest.
-    boost::optional<uint32_t> maybe_crc;
+    std::optional<uint32_t> maybe_crc;
     if (oi.is_data_digest() && op.checksum.offset == 0 &&
         op.checksum.length >= oi.size) {
       maybe_crc = oi.data_digest;
@@ -5428,8 +5348,8 @@ struct C_ExtentCmpRead : public Context {
   Context *fill_extent_ctx;
 
   C_ExtentCmpRead(PrimaryLogPG *primary_log_pg, OSDOp &osd_op,
-                 boost::optional<uint32_t> maybe_crc, uint64_t size,
-                 OSDService *osd, hobject_t soid, __le32 flags)
+                 std::optional<uint32_t> maybe_crc, uint64_t size,
+                 OSDService *osd, hobject_t soid, uint32_t flags)
     : primary_log_pg(primary_log_pg), osd_op(osd_op),
       fill_extent_ctx(new FillInVerifyExtent(&read_length, &osd_op.rval,
                                             &read_bl, maybe_crc, size,
@@ -5482,7 +5402,7 @@ int PrimaryLogPG::do_extent_cmp(OpContext *ctx, OSDOp& osd_op)
   } else if (pool.info.is_erasure()) {
     // If there is a data digest and it is possible we are reading
     // entire object, pass the digest.
-    boost::optional<uint32_t> maybe_crc;
+    std::optional<uint32_t> maybe_crc;
     if (oi.is_data_digest() && op.checksum.offset == 0 &&
         op.checksum.length >= oi.size) {
       maybe_crc = oi.data_digest;
@@ -5575,7 +5495,7 @@ int PrimaryLogPG::do_read(OpContext *ctx, OSDOp& osd_op) {
   } else if (pool.info.is_erasure()) {
     // The initialisation below is required to silence a false positive
     // -Wmaybe-uninitialized warning
-    boost::optional<uint32_t> maybe_crc = boost::make_optional(false, uint32_t());
+    std::optional<uint32_t> maybe_crc;
     // If there is a data digest and it is possible we are reading
     // entire object, pass the digest.  FillInVerifyExtent will
     // will check the oi.size again.
@@ -5679,72 +5599,20 @@ int PrimaryLogPG::do_sparse_read(OpContext *ctx, OSDOp& osd_op) {
       return r;
     }
 
-    map<uint64_t, uint64_t>::iterator miter;
     bufferlist data_bl;
-    uint64_t last = op.extent.offset;
-    for (miter = m.begin(); miter != m.end(); ++miter) {
-      // verify hole?
-      if (cct->_conf->osd_verify_sparse_read_holes &&
-          last < miter->first) {
-        bufferlist t;
-        uint64_t len = miter->first - last;
-        r = pgbackend->objects_read_sync(soid, last, len, op.flags, &t);
-        if (r < 0) {
-          osd->clog->error() << coll << " " << soid
-                            << " sparse-read failed to read: "
-                            << r;
-        } else if (!t.is_zero()) {
-          osd->clog->error() << coll << " " << soid
-                            << " sparse-read found data in hole "
-                            << last << "~" << len;
-        }
-      }
-
-      bufferlist tmpbl;
-      r = pgbackend->objects_read_sync(soid, miter->first, miter->second,
-                                      op.flags, &tmpbl);
-      if (r == -EIO) {
-        r = rep_repair_primary_object(soid, ctx);
-      }
-      if (r < 0) {
-       return r;
-      }
-
-      // this is usually happen when we get extent that exceeds the actual file
-      // size
-      if (r < (int)miter->second)
-        miter->second = r;
-      total_read += r;
-      dout(10) << "sparse-read " << miter->first << "@" << miter->second
-              << dendl;
-      data_bl.claim_append(tmpbl);
-      last = miter->first + r;
-    }
-
-    // verify trailing hole?
-    if (cct->_conf->osd_verify_sparse_read_holes) {
-      uint64_t end = std::min<uint64_t>(op.extent.offset + op.extent.length,
-                                       oi.size);
-      if (last < end) {
-        bufferlist t;
-        uint64_t len = end - last;
-        r = pgbackend->objects_read_sync(soid, last, len, op.flags, &t);
-        if (r < 0) {
-          osd->clog->error() << coll << " " << soid
-                            << " sparse-read failed to read: " << r;
-        } else if (!t.is_zero()) {
-          osd->clog->error() << coll << " " << soid
-                            << " sparse-read found data in hole "
-                            << last << "~" << len;
-        }
-      }
+    r = pgbackend->objects_readv_sync(soid, std::move(m), op.flags, &data_bl);
+    if (r == -EIO) {
+      r = rep_repair_primary_object(soid, ctx);
+    }
+    if (r < 0) {
+      return r;
     }
 
     // Why does SPARSE_READ need a checksum? In fact, librbd always uses
     // sparse-read. At first there may be few whole objects, but with continued
     // use more and more whole objects exist, so from this point on adding a
     // checksum for sparse-read makes sense.
-    if (total_read == oi.size && oi.is_data_digest()) {
+    if ((uint64_t)r == oi.size && oi.is_data_digest()) {
       uint32_t crc = data_bl.crc32c(-1);
       if (oi.data_digest != crc) {
         osd->clog->error() << info.pgid << std::hex
@@ -5763,7 +5631,7 @@ int PrimaryLogPG::do_sparse_read(OpContext *ctx, OSDOp& osd_op) {
     encode(m, osd_op.outdata); // re-encode since it might be modified
     ::encode_destructively(data_bl, osd_op.outdata);
 
-    dout(10) << " sparse_read got " << total_read << " bytes from object "
+    dout(10) << " sparse_read got " << r << " bytes from object "
             << soid << dendl;
   }
 
@@ -5799,7 +5667,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       }
     }
 
-    // TODO: check endianness (__le32 vs uint32_t, etc.)
+    // TODO: check endianness (ceph_le32 vs uint32_t, etc.)
     // The fields in ceph_osd_op are little-endian (according to the definition in rados.h),
     // but the code in this function seems to treat them as native-endian.  What should the
     // tracepoints do?
@@ -5818,10 +5686,12 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
     case CEPH_OSD_OP_CACHE_TRY_FLUSH:
     case CEPH_OSD_OP_UNDIRTY:
     case CEPH_OSD_OP_COPY_FROM:  // we handle user_version update explicitly
+    case CEPH_OSD_OP_COPY_FROM2:
     case CEPH_OSD_OP_CACHE_PIN:
     case CEPH_OSD_OP_CACHE_UNPIN:
     case CEPH_OSD_OP_SET_REDIRECT:
     case CEPH_OSD_OP_TIER_PROMOTE:
+    case CEPH_OSD_OP_TIER_FLUSH:
       break;
     default:
       if (op.op & CEPH_OSD_OP_MODE_WR)
@@ -5965,10 +5835,10 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        tracepoint(osd, do_osd_op_pre_call, soid.oid.name.c_str(), soid.snap.val, cname.c_str(), mname.c_str());
 
        ClassHandler::ClassData *cls;
-       result = osd->class_handler->open_class(cname, &cls);
+       result = ClassHandler::get_instance().open_class(cname, &cls);
        ceph_assert(result == 0);   // init_op_flags() already verified this works.
 
-       ClassHandler::ClassMethod *method = cls->get_method(mname.c_str());
+       ClassHandler::ClassMethod *method = cls->get_method(mname);
        if (!method) {
          dout(10) << "call method " << cname << "." << mname << " does not exist" << dendl;
          result = -EOPNOTSUPP;
@@ -6036,6 +5906,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
     case CEPH_OSD_OP_UNDIRTY:
       ++ctx->num_write;
+      result = 0;
       {
        tracepoint(osd, do_osd_op_pre_undirty, soid.oid.name.c_str(), soid.snap.val);
        if (oi.is_dirty()) {
@@ -6043,15 +5914,15 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          ctx->modify = true;
          ctx->delta_stats.num_wr++;
        }
-       result = 0;
       }
       break;
 
     case CEPH_OSD_OP_CACHE_TRY_FLUSH:
       ++ctx->num_write;
+      result = 0;
       {
        tracepoint(osd, do_osd_op_pre_try_flush, soid.oid.name.c_str(), soid.snap.val);
-       if (ctx->lock_type != ObjectContext::RWState::RWNONE) {
+       if (ctx->lock_type != RWState::RWNONE) {
          dout(10) << "cache-try-flush without SKIPRWLOCKS flag set" << dendl;
          result = -EINVAL;
          break;
@@ -6070,7 +5941,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          break;
        }
        if (oi.is_dirty()) {
-         result = start_flush(ctx->op, ctx->obc, false, NULL, boost::none);
+         result = start_flush(ctx->op, ctx->obc, false, NULL, std::nullopt);
          if (result == -EINPROGRESS)
            result = -EAGAIN;
        } else {
@@ -6081,9 +5952,10 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
     case CEPH_OSD_OP_CACHE_FLUSH:
       ++ctx->num_write;
+      result = 0;
       {
        tracepoint(osd, do_osd_op_pre_cache_flush, soid.oid.name.c_str(), soid.snap.val);
-       if (ctx->lock_type == ObjectContext::RWState::RWNONE) {
+       if (ctx->lock_type == RWState::RWNONE) {
          dout(10) << "cache-flush with SKIPRWLOCKS flag set" << dendl;
          result = -EINVAL;
          break;
@@ -6103,7 +5975,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        }
        hobject_t missing;
        if (oi.is_dirty()) {
-         result = start_flush(ctx->op, ctx->obc, true, &missing, boost::none);
+         result = start_flush(ctx->op, ctx->obc, true, &missing, std::nullopt);
          if (result == -EINPROGRESS)
            result = -EAGAIN;
        } else {
@@ -6122,6 +5994,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
     case CEPH_OSD_OP_CACHE_EVICT:
       ++ctx->num_write;
+      result = 0;
       {
        tracepoint(osd, do_osd_op_pre_cache_evict, soid.oid.name.c_str(), soid.snap.val);
        if (pool.info.cache_mode == pg_pool_t::CACHEMODE_NONE) {
@@ -6418,7 +6291,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        notify_info_t n;
        n.timeout = timeout;
        n.notify_id = osd->get_next_id(get_osdmap_epoch());
-       n.cookie = op.watch.cookie;
+       n.cookie = op.notify.cookie;
         n.bl = bl;
        ctx->notifies.push_back(n);
 
@@ -6455,6 +6328,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
     case CEPH_OSD_OP_SETALLOCHINT:
       ++ctx->num_write;
+      result = 0;
       {
        tracepoint(osd, do_osd_op_pre_setallochint, soid.oid.name.c_str(), soid.snap.val, op.alloc_hint.expected_object_size, op.alloc_hint.expected_write_size);
        maybe_create_new_object(ctx);
@@ -6464,7 +6338,6 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
         t->set_alloc_hint(soid, op.alloc_hint.expected_object_size,
                           op.alloc_hint.expected_write_size,
                          op.alloc_hint.flags);
-        result = 0;
       }
       break;
 
@@ -6475,6 +6348,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
     case CEPH_OSD_OP_WRITE:
       ++ctx->num_write;
+      result = 0;
       { // write
         __u32 seq = oi.truncate_seq;
        tracepoint(osd, do_osd_op_pre_write, soid.oid.name.c_str(), soid.snap.val, oi.size, seq, op.extent.offset, op.extent.length, op.extent.truncate_size, op.extent.truncate_seq);
@@ -6526,6 +6400,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
              trim.insert(op.extent.truncate_size,
                oi.size - op.extent.truncate_size);
              ctx->modified_ranges.union_of(trim);
+             ctx->clean_regions.mark_data_region_dirty(op.extent.truncate_size, oi.size - op.extent.truncate_size);
            }
            if (op.extent.truncate_size != oi.size) {
               truncate_update_size_and_usage(ctx->delta_stats,
@@ -6575,12 +6450,14 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
         }
        write_update_size_and_usage(ctx->delta_stats, oi, ctx->modified_ranges,
                                    op.extent.offset, op.extent.length);
-
+       ctx->clean_regions.mark_data_region_dirty(op.extent.offset, op.extent.length);
+       dout(10) << "clean_regions modified" << ctx->clean_regions << dendl;
       }
       break;
       
     case CEPH_OSD_OP_WRITEFULL:
       ++ctx->num_write;
+      result = 0;
       { // write full object
        tracepoint(osd, do_osd_op_pre_writefull, soid.oid.name.c_str(), soid.snap.val, oi.size, 0, op.extent.length);
 
@@ -6611,7 +6488,8 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
         } else {
          obs.oi.clear_data_digest();
        }
-
+        ctx->clean_regions.mark_data_region_dirty(0,
+          std::max((uint64_t)op.extent.length, oi.size));
        write_update_size_and_usage(ctx->delta_stats, oi, ctx->modified_ranges,
            0, op.extent.length, true);
       }
@@ -6649,6 +6527,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          interval_set<uint64_t> ch;
          ch.insert(op.extent.offset, op.extent.length);
          ctx->modified_ranges.union_of(ch);
+         ctx->clean_regions.mark_data_region_dirty(op.extent.offset, op.extent.length);
          ctx->delta_stats.num_wr++;
          oi.clear_data_digest();
        } else {
@@ -6658,11 +6537,11 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       break;
     case CEPH_OSD_OP_CREATE:
       ++ctx->num_write;
+      result = 0;
       {
        tracepoint(osd, do_osd_op_pre_create, soid.oid.name.c_str(), soid.snap.val);
-        int flags = le32_to_cpu(op.flags);
        if (obs.exists && !oi.is_whiteout() &&
-           (flags & CEPH_OSD_OP_FLAG_EXCL)) {
+           (op.flags & CEPH_OSD_OP_FLAG_EXCL)) {
           result = -EEXIST; /* this is an exclusive create */
        } else {
          if (osd_op.indata.length()) {
@@ -6677,10 +6556,8 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
            }
            // category is no longer implemented.
          }
-          if (result >= 0) {
-           maybe_create_new_object(ctx);
-           t->nop(soid);
-          }
+         maybe_create_new_object(ctx);
+         t->nop(soid);
        }
       }
       break;
@@ -6696,6 +6573,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        break;
       }
       ++ctx->num_write;
+      result = 0;
       {
        // truncate
        if (!obs.exists || oi.is_whiteout()) {
@@ -6728,7 +6606,10 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          interval_set<uint64_t> trim;
          trim.insert(op.extent.offset, oi.size-op.extent.offset);
          ctx->modified_ranges.union_of(trim);
-       }
+         ctx->clean_regions.mark_data_region_dirty(op.extent.offset, oi.size - op.extent.offset);
+       } else if (oi.size < op.extent.offset) {
+          ctx->clean_regions.mark_data_region_dirty(oi.size, op.extent.offset - oi.size);
+        }
        if (op.extent.offset != oi.size) {
           truncate_update_size_and_usage(ctx->delta_stats,
                                          oi,
@@ -6743,6 +6624,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
     
     case CEPH_OSD_OP_DELETE:
       ++ctx->num_write;
+      result = 0;
       tracepoint(osd, do_osd_op_pre_delete, soid.oid.name.c_str(), soid.snap.val);
       {
        if (oi.has_manifest()) {
@@ -6772,6 +6654,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
     case CEPH_OSD_OP_WATCH:
       ++ctx->num_write;
+      result = 0;
       {
        tracepoint(osd, do_osd_op_pre_watch, soid.oid.name.c_str(), soid.snap.val,
                   op.watch.cookie, op.watch.op);
@@ -6779,6 +6662,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          result = -ENOENT;
          break;
        }
+       result = 0;
         uint64_t cookie = op.watch.cookie;
         entity_name_t entity = ctx->reqid.name;
        ObjectContextRef obc = ctx->obc;
@@ -6860,6 +6744,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
         break;
       }
       ++ctx->num_write;
+      result = 0;
       {
        if (!obs.exists || oi.is_whiteout()) {
          result = -ENOENT;
@@ -6872,7 +6757,6 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          ctx->delta_stats.num_objects_pinned++;
          ctx->delta_stats.num_wr++;
        }
-       result = 0;
       }
       break;
 
@@ -6885,6 +6769,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
         break;
       }
       ++ctx->num_write;
+      result = 0;
       {
        if (!obs.exists || oi.is_whiteout()) {
          result = -ENOENT;
@@ -6897,12 +6782,12 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          ctx->delta_stats.num_objects_pinned--;
          ctx->delta_stats.num_wr++;
        }
-       result = 0;
       }
       break;
 
     case CEPH_OSD_OP_SET_REDIRECT:
       ++ctx->num_write;
+      result = 0;
       {
        if (pool.info.is_tier()) {
          result = -EINVAL;
@@ -6912,7 +6797,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          result = -ENOENT;
          break;
        }
-       if (get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS) {
+       if (get_osdmap()->require_osd_release < ceph_release_t::luminous) {
          result = -EOPNOTSUPP;
          break;
        }
@@ -6951,8 +6836,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          // start
          ctx->op_finishers[ctx->current_osd_subop_num].reset(
            new SetManifestFinisher(osd_op));
-         RefCountCallback *fin = new RefCountCallback(
-           this, ctx, osd_op, get_last_peering_reset());
+         RefCountCallback *fin = new RefCountCallback(ctx, osd_op);
          refcount_manifest(ctx->obc, target_oloc, target, SnapContext(),
                            true, fin, 0);
          result = -EINPROGRESS;
@@ -6970,11 +6854,15 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          oi.manifest.redirect_target = target;
          oi.manifest.type = object_manifest_t::TYPE_REDIRECT;
          t->truncate(soid, 0);
+          ctx->clean_regions.mark_data_region_dirty(0, oi.size);
          if (oi.is_omap() && pool.info.supports_omap()) {
            t->omap_clear(soid);
            obs.oi.clear_omap_digest();
            obs.oi.clear_flag(object_info_t::FLAG_OMAP);
+            ctx->clean_regions.mark_omap_dirty();
          }
+          write_update_size_and_usage(ctx->delta_stats, oi, ctx->modified_ranges,
+           0, oi.size, false);
          ctx->delta_stats.num_bytes -= oi.size;
          oi.size = 0;
          oi.new_object();
@@ -7006,6 +6894,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
     case CEPH_OSD_OP_SET_CHUNK:
       ++ctx->num_write;
+      result = 0;
       {
        if (pool.info.is_tier()) {
          result = -EINVAL;
@@ -7015,7 +6904,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          result = -ENOENT;
          break;
        }
-       if (get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS) {
+       if (get_osdmap()->require_osd_release < ceph_release_t::luminous) {
          result = -EOPNOTSUPP;
          break;
        }
@@ -7072,8 +6961,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          // start
          ctx->op_finishers[ctx->current_osd_subop_num].reset(
            new SetManifestFinisher(osd_op));
-         RefCountCallback *fin = new RefCountCallback(
-           this, ctx, osd_op, get_last_peering_reset());
+         RefCountCallback *fin = new RefCountCallback(ctx, osd_op);
          refcount_manifest(ctx->obc, tgt_oloc, target, SnapContext(),
                            true, fin, src_offset);
          result = -EINPROGRESS;
@@ -7113,6 +7001,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
     case CEPH_OSD_OP_TIER_PROMOTE:
       ++ctx->num_write;
+      result = 0;
       {
        if (pool.info.is_tier()) {
          result = -EINVAL;
@@ -7122,7 +7011,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          result = -ENOENT;
          break;
        }
-       if (get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS) {
+       if (get_osdmap()->require_osd_release < ceph_release_t::luminous) {
          result = -EOPNOTSUPP;
          break;
        }
@@ -7169,8 +7058,50 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
       break;
 
+    case CEPH_OSD_OP_TIER_FLUSH:
+      ++ctx->num_write;
+      result = 0;
+      {
+       if (pool.info.is_tier()) {
+         result = -EINVAL;
+         break;
+       }
+       if (!obs.exists) {
+         result = -ENOENT;
+         break;
+       }
+       if (get_osdmap()->require_osd_release < ceph_release_t::octopus) {
+         result = -EOPNOTSUPP;
+         break;
+       }
+       if (!obs.oi.has_manifest()) {
+         result = 0;
+         break;
+       }
+
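+       // flush only if at least one chunk in the manifest is dirty; otherwise succeed immediately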
+       hobject_t missing;
+       bool is_dirty = false;
+       for (auto& p : ctx->obc->obs.oi.manifest.chunk_map) {
+         if (p.second.is_dirty()) {
+           is_dirty = true;
+           break;
+         }
+       }
+
+       if (is_dirty) {
+         result = start_flush(ctx->op, ctx->obc, true, NULL, std::nullopt);
+         if (result == -EINPROGRESS)
+           result = -EAGAIN;
+       } else {
+         result = 0;
+       }
+      }
+
+      break;
+
     case CEPH_OSD_OP_UNSET_MANIFEST:
       ++ctx->num_write;
+      result = 0;
       {
        if (pool.info.is_tier()) {
          result = -EINVAL;
@@ -7184,7 +7115,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          result = -EOPNOTSUPP;
          break;
        }
-       if (get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS) {
+       if (get_osdmap()->require_osd_release < ceph_release_t::luminous) {
          result = -EOPNOTSUPP;
          break;
        }
@@ -7226,6 +7157,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       
     case CEPH_OSD_OP_SETXATTR:
       ++ctx->num_write;
+      result = 0;
       {
        if (cct->_conf->osd_max_attr_size > 0 &&
            op.xattr.value_len > cct->_conf->osd_max_attr_size) {
@@ -7254,6 +7186,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
     case CEPH_OSD_OP_RMXATTR:
       ++ctx->num_write;
+      result = 0;
       {
        string aname;
        bp.copy(op.xattr.name_len, aname);
@@ -7288,6 +7221,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       break;
 
     case CEPH_OSD_OP_STARTSYNC:
+      result = 0;
       t->nop(soid);
       break;
 
@@ -7304,7 +7238,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        newop.op.op = CEPH_OSD_OP_SYNC_READ;
        newop.op.extent.offset = 0;
        newop.op.extent.length = 0;
-       do_osd_ops(ctx, nops);
+       result = do_osd_ops(ctx, nops);
        osd_op.outdata.claim(newop.outdata);
       }
       break;
@@ -7606,6 +7540,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        break;
       }
       ++ctx->num_write;
+      result = 0;
       {
        maybe_create_new_object(ctx);
        bufferlist to_set_bl;
@@ -7630,6 +7565,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          }
        }
        t->omap_setkeys(soid, to_set_bl);
+       ctx->clean_regions.mark_omap_dirty();
        ctx->delta_stats.num_wr++;
         ctx->delta_stats.num_wr_kb += shift_round_up(to_set_bl.length(), 10);
       }
@@ -7644,9 +7580,11 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        break;
       }
       ++ctx->num_write;
+      result = 0;
       {
        maybe_create_new_object(ctx);
        t->omap_setheader(soid, osd_op.indata);
+       ctx->clean_regions.mark_omap_dirty();
        ctx->delta_stats.num_wr++;
       }
       obs.oi.set_flag(object_info_t::FLAG_OMAP);
@@ -7660,6 +7598,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        break;
       }
       ++ctx->num_write;
+      result = 0;
       {
        if (!obs.exists || oi.is_whiteout()) {
          result = -ENOENT;
@@ -7667,6 +7606,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        }
        if (oi.is_omap()) {
          t->omap_clear(soid);
+         ctx->clean_regions.mark_omap_dirty();
          ctx->delta_stats.num_wr++;
          obs.oi.clear_omap_digest();
          obs.oi.clear_flag(object_info_t::FLAG_OMAP);
@@ -7681,6 +7621,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        break;
       }
       ++ctx->num_write;
+      result = 0;
       {
        if (!obs.exists || oi.is_whiteout()) {
          result = -ENOENT;
@@ -7698,6 +7639,34 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        }
        tracepoint(osd, do_osd_op_pre_omaprmkeys, soid.oid.name.c_str(), soid.snap.val);
        t->omap_rmkeys(soid, to_rm_bl);
+       ctx->clean_regions.mark_omap_dirty();
+       ctx->delta_stats.num_wr++;
+      }
+      obs.oi.clear_omap_digest();
+      break;
+
+    case CEPH_OSD_OP_OMAPRMKEYRANGE:
+      tracepoint(osd, do_osd_op_pre_omaprmkeyrange, soid.oid.name.c_str(), soid.snap.val);
+      if (!pool.info.supports_omap()) {
+       result = -EOPNOTSUPP;
+       break;
+      }
+      ++ctx->num_write;
+      result = 0;
+      {
+       if (!obs.exists || oi.is_whiteout()) {
+         result = -ENOENT;
+         break;
+       }
+       std::string key_begin, key_end;
+       try {
+         decode(key_begin, bp);
+         decode(key_end, bp);
+       } catch (buffer::error& e) {
+         result = -EINVAL;
+         goto fail;
+       }
+       t->omap_rmkeyrange(soid, key_begin, key_end);
        ctx->delta_stats.num_wr++;
       }
       obs.oi.clear_omap_digest();
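
For the new OMAPRMKEYRANGE op, the OSD expects exactly two strings in the op payload, decoded in the order shown above. A hedged sketch of the matching client-side encoding (only the field order is taken from the decode above; the surrounding librados plumbing is omitted, and this assumes the file's usual encode()/bufferlist helpers):

    // Sketch only: build the indata payload for CEPH_OSD_OP_OMAPRMKEYRANGE.
    // The OSD decodes key_begin then key_end and removes the keys in that
    // range via t->omap_rmkeyrange().
    bufferlist indata;
    encode(std::string("user.a"), indata);  // key_begin
    encode(std::string("user.z"), indata);  // key_end
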
@@ -7715,15 +7684,35 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       break;
 
     case CEPH_OSD_OP_COPY_FROM:
+    case CEPH_OSD_OP_COPY_FROM2:
       ++ctx->num_write;
+      result = 0;
       {
        object_t src_name;
        object_locator_t src_oloc;
+       uint32_t truncate_seq = 0;
+       uint64_t truncate_size = 0;
+       bool have_truncate = false;
        snapid_t src_snapid = (uint64_t)op.copy_from.snapid;
        version_t src_version = op.copy_from.src_version;
+
+       if ((op.op == CEPH_OSD_OP_COPY_FROM2) &&
+           (op.copy_from.flags & ~CEPH_OSD_COPY_FROM_FLAGS)) {
+         dout(20) << "invalid copy-from2 flags 0x"
+                 << std::hex << (int)op.copy_from.flags << std::dec << dendl;
+         result = -EINVAL;
+         break;
+       }
        try {
          decode(src_name, bp);
          decode(src_oloc, bp);
+         // check if client sent us truncate_seq and truncate_size
+         if ((op.op == CEPH_OSD_OP_COPY_FROM2) &&
+             (op.copy_from.flags & CEPH_OSD_COPY_FROM_FLAG_TRUNCATE_SEQ)) {
+           decode(truncate_seq, bp);
+           decode(truncate_size, bp);
+           have_truncate = true;
+         }
        }
        catch (buffer::error& e) {
          result = -EINVAL;
@@ -7764,6 +7753,8 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
            break;
          }
          CopyFromCallback *cb = new CopyFromCallback(ctx, osd_op);
+         if (have_truncate)
+           cb->set_truncate(truncate_seq, truncate_size);
           ctx->op_finishers[ctx->current_osd_subop_num].reset(
             new CopyFromFinisher(cb));
          start_copy(cb, ctx->obc, src, src_oloc, src_version,
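
The COPY_FROM2 handling above accepts optional truncate metadata only when the corresponding flag is set. A sketch of the client-side payload layout that the decode above expects, mirroring its field order (src_name, src_oloc, then the optional pair); the variable names are illustrative:

    // Sketch only: indata layout for COPY_FROM2 when
    // CEPH_OSD_COPY_FROM_FLAG_TRUNCATE_SEQ is set in op.copy_from.flags.
    bufferlist indata;
    encode(src_name, indata);          // object_t
    encode(src_oloc, indata);          // object_locator_t
    if (flags & CEPH_OSD_COPY_FROM_FLAG_TRUNCATE_SEQ) {
      encode(truncate_seq, indata);    // uint32_t
      encode(truncate_size, indata);   // uint64_t
    }
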
@@ -7905,8 +7896,10 @@ inline int PrimaryLogPG::_delete_oid(
     interval_set<uint64_t> ch;
     ch.insert(0, oi.size);
     ctx->modified_ranges.union_of(ch);
+    ctx->clean_regions.mark_data_region_dirty(0, oi.size);
   }
 
+  ctx->clean_regions.mark_omap_dirty();
   ctx->delta_stats.num_wr++;
   if (soid.is_snap()) {
     ceph_assert(ctx->obc->ssc->snapset.clone_overlap.count(soid.snap));
@@ -8082,6 +8075,8 @@ int PrimaryLogPG::_rollback_to(OpContext *ctx, ceph_osd_op& op)
       maybe_create_new_object(ctx, true);
       ctx->delta_stats.num_bytes -= obs.oi.size;
       ctx->delta_stats.num_bytes += rollback_to->obs.oi.size;
+      ctx->clean_regions.mark_data_region_dirty(0, std::max(obs.oi.size, rollback_to->obs.oi.size));
+      ctx->clean_regions.mark_omap_dirty();
       obs.oi.size = rollback_to->obs.oi.size;
       if (rollback_to->obs.oi.is_data_digest())
        obs.oi.set_data_digest(rollback_to->obs.oi.data_digest);
@@ -8274,7 +8269,11 @@ void PrimaryLogPG::make_writeable(OpContext *ctx)
   if (snapc.seq > ctx->new_snapset.seq) {
     // update snapset with latest snap context
     ctx->new_snapset.seq = snapc.seq;
-    ctx->new_snapset.snaps = snapc.snaps;
+    if (get_osdmap()->require_osd_release < ceph_release_t::octopus) {
+      ctx->new_snapset.snaps = snapc.snaps;
+    } else {
+      ctx->new_snapset.snaps.clear();
+    }
   }
   dout(20) << "make_writeable " << soid
           << " done, snapset=" << ctx->new_snapset << dendl;
@@ -8303,7 +8302,7 @@ void PrimaryLogPG::write_update_size_and_usage(object_stat_sum_t& delta_stats, o
   if (oi.has_manifest() && oi.manifest.is_chunked()) {
     for (auto &p : oi.manifest.chunk_map) {
       if ((p.first <= offset && p.first + p.second.length > offset) ||
-         (p.first > offset && p.first <= offset + length)) {
+         (p.first > offset && p.first < offset + length)) {
        p.second.clear_flag(chunk_info_t::FLAG_MISSING);
        p.second.set_flag(chunk_info_t::FLAG_DIRTY);
       }
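
The changed overlap test above tightens the upper-bound comparison from <= to <, so a chunk that begins exactly where the write ends is no longer flagged dirty. A small standalone check of the same condition, with chunk_map keys treated as chunk start offsets:

    // A write of [offset, offset+length) dirties chunk [start, start+clen)
    // only if the intervals actually intersect.  With the previous "<=",
    // overlaps(0, 4096, 4096, 4096) was wrongly true even though the write
    // stops at byte 4095.
    bool overlaps(uint64_t offset, uint64_t length,
                  uint64_t start, uint64_t clen)
    {
      return (start <= offset && start + clen > offset) ||
             (start > offset && start < offset + length);
    }
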
@@ -8415,7 +8414,7 @@ void PrimaryLogPG::do_osd_op_effects(OpContext *ctx, const ConnectionRef& conn)
        p != ctx->notify_acks.end();
        ++p) {
     if (p->watch_cookie)
-      dout(10) << "notify_ack " << make_pair(p->watch_cookie.get(), p->notify_id) << dendl;
+      dout(10) << "notify_ack " << make_pair(*(p->watch_cookie), p->notify_id) << dendl;
     else
       dout(10) << "notify_ack " << make_pair("NULL", p->notify_id) << dendl;
     for (map<pair<uint64_t, entity_name_t>, WatchRef>::iterator i =
@@ -8424,7 +8423,7 @@ void PrimaryLogPG::do_osd_op_effects(OpContext *ctx, const ConnectionRef& conn)
         ++i) {
       if (i->first.second != entity) continue;
       if (p->watch_cookie &&
-         p->watch_cookie.get() != i->first.first) continue;
+         *(p->watch_cookie) != i->first.first) continue;
       dout(10) << "acking notify on watch " << i->first << dendl;
       i->second->notify_ack(p->notify_id, p->reply_bl);
     }
@@ -8470,7 +8469,7 @@ int PrimaryLogPG::prepare_transaction(OpContext *ctx)
   int result = do_osd_ops(ctx, *ctx->ops);
   if (result < 0) {
     if (ctx->op->may_write() &&
-       get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
+       get_osdmap()->require_osd_release >= ceph_release_t::kraken) {
       // need to save the error code in the pg log, to detect dup ops,
       // but do nothing else
       ctx->update_log_only = true;
@@ -8483,7 +8482,7 @@ int PrimaryLogPG::prepare_transaction(OpContext *ctx)
     if (ctx->pending_async_reads.empty())
       unstable_stats.add(ctx->delta_stats);
     if (ctx->op->may_write() &&
-       get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
+       get_osdmap()->require_osd_release >= ceph_release_t::kraken) {
       ctx->update_log_only = true;
     }
     return result;
@@ -8492,9 +8491,8 @@ int PrimaryLogPG::prepare_transaction(OpContext *ctx)
   // check for full
   if ((ctx->delta_stats.num_bytes > 0 ||
        ctx->delta_stats.num_objects > 0) &&  // FIXME: keys?
-      (pool.info.has_flag(pg_pool_t::FLAG_FULL) ||
-       get_osdmap()->test_flag(CEPH_OSDMAP_FULL))) {
-    const MOSDOp *m = static_cast<const MOSDOp*>(ctx->op->get_req());
+      pool.info.has_flag(pg_pool_t::FLAG_FULL)) {
+    auto m = ctx->op->get_req<MOSDOp>();
     if (ctx->reqid.name.is_mds() ||   // FIXME: ignore MDS for now
        m->has_flag(CEPH_OSD_FLAG_FULL_FORCE)) {
       dout(20) << __func__ << " full, but proceeding due to FULL_FORCE or MDS"
@@ -8517,12 +8515,13 @@ int PrimaryLogPG::prepare_transaction(OpContext *ctx)
 
   finish_ctx(ctx,
             ctx->new_obs.exists ? pg_log_entry_t::MODIFY :
-            pg_log_entry_t::DELETE);
+            pg_log_entry_t::DELETE,
+            result);
 
   return result;
 }
 
-void PrimaryLogPG::finish_ctx(OpContext *ctx, int log_op_type)
+void PrimaryLogPG::finish_ctx(OpContext *ctx, int log_op_type, int result)
 {
   const hobject_t& soid = ctx->obs->oi.soid;
   dout(20) << __func__ << " " << soid << " " << ctx
@@ -8580,10 +8579,22 @@ void PrimaryLogPG::finish_ctx(OpContext *ctx, int log_op_type)
   }
 
   // append to log
-  ctx->log.push_back(pg_log_entry_t(log_op_type, soid, ctx->at_version,
-                                   ctx->obs->oi.version,
-                                   ctx->user_at_version, ctx->reqid,
-                                   ctx->mtime, 0));
+  ctx->log.push_back(
+    pg_log_entry_t(log_op_type, soid, ctx->at_version,
+                  ctx->obs->oi.version,
+                  ctx->user_at_version, ctx->reqid,
+                  ctx->mtime,
+                  (ctx->op && ctx->op->allows_returnvec()) ? result : 0));
+  if (ctx->op && ctx->op->allows_returnvec()) {
+    // also the per-op values
+    ctx->log.back().set_op_returns(*ctx->ops);
+    dout(20) << __func__ << " op_returns " << ctx->log.back().op_returns
+            << dendl;
+  }
+
+  ctx->log.back().clean_regions = ctx->clean_regions;
+  dout(20) << __func__ << " object " << soid <<  " marks clean_regions " << ctx->log.back().clean_regions << dendl;
+
   if (soid.snap < CEPH_NOSNAP) {
     switch (log_op_type) {
     case pg_log_entry_t::MODIFY:
@@ -8621,18 +8632,15 @@ void PrimaryLogPG::apply_stats(
   const hobject_t &soid,
   const object_stat_sum_t &delta_stats) {
 
-  info.stats.stats.add(delta_stats);
-  info.stats.stats.floor(0);
-
-  for (set<pg_shard_t>::iterator i = backfill_targets.begin();
-       i != backfill_targets.end();
+  recovery_state.apply_op_stats(soid, delta_stats);
+  for (set<pg_shard_t>::const_iterator i = get_backfill_targets().begin();
+       i != get_backfill_targets().end();
        ++i) {
     pg_shard_t bt = *i;
-    pg_info_t& pinfo = peer_info[bt];
-    if (soid <= pinfo.last_backfill)
-      pinfo.stats.stats.add(delta_stats);
-    else if (soid <= last_backfill_started)
+    const pg_info_t& pinfo = recovery_state.get_peer_info(bt);
+    if (soid > pinfo.last_backfill && soid <= last_backfill_started) {
       pending_backfill_updates[soid].stats.add(delta_stats);
+    }
   }
 
   if (is_primary() && scrubber.active) {
@@ -8649,7 +8657,7 @@ void PrimaryLogPG::apply_stats(
 
 void PrimaryLogPG::complete_read_ctx(int result, OpContext *ctx)
 {
-  const MOSDOp *m = static_cast<const MOSDOp*>(ctx->op->get_req());
+  auto m = ctx->op->get_req<MOSDOp>();
   ceph_assert(ctx->async_reads_complete());
 
   for (vector<OSDOp>::iterator p = ctx->ops->begin();
@@ -8660,7 +8668,6 @@ void PrimaryLogPG::complete_read_ctx(int result, OpContext *ctx)
     }
     ctx->bytes_read += p->outdata.length();
   }
-  ctx->reply->claim_op_out_data(*ctx->ops);
   ctx->reply->get_header().data_off = (ctx->data_off ? *ctx->data_off : 0);
 
   MOSDOpReply *reply = ctx->reply;
@@ -8707,12 +8714,11 @@ struct C_Copyfrom : public Context {
   void finish(int r) override {
     if (r == -ECANCELED)
       return;
-    pg->lock();
+    std::scoped_lock l{*pg};
     if (last_peering_reset == pg->get_last_peering_reset()) {
       pg->process_copy_chunk(oid, tid, r);
       cop.reset();
     }
-    pg->unlock();
   }
 };
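
The recurring conversion from pg->lock()/pg->unlock() to std::scoped_lock in these completion callbacks relies on the PG type providing lock()/unlock() (the BasicLockable requirements), so the lock is released on every return path. A minimal self-contained illustration of the same pattern; FakePG is a stand-in, not a Ceph type:

    #include <mutex>

    struct FakePG {                    // stand-in for PrimaryLogPG
      std::mutex m;
      void lock() { m.lock(); }
      void unlock() { m.unlock(); }
    };

    void on_finish(FakePG *pg)
    {
      std::scoped_lock locker{*pg};    // calls pg->lock()
      // ... work under the PG lock ...
    }                                  // pg->unlock() runs at scope exit, even on early return
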
 
@@ -8753,12 +8759,11 @@ struct C_CopyChunk : public Context {
   void finish(int r) override {
     if (r == -ECANCELED)
       return;
-    pg->lock();
+    std::scoped_lock l{*pg};
     if (last_peering_reset == pg->get_last_peering_reset()) {
       pg->process_copy_chunk_manifest(oid, tid, r, offset);
       cop.reset();
     }
-    pg->unlock();
   }
 };
 
@@ -8903,7 +8908,7 @@ int PrimaryLogPG::do_copy_get(OpContext *ctx, bufferlist::const_iterator& bp,
   if (cursor.is_complete()) {
     // include reqids only in the final step.  this is a bit fragile
     // but it works...
-    pg_log.get_log().get_object_reqids(ctx->obc->obs.oi.soid, 10,
+    recovery_state.get_pg_log().get_log().get_object_reqids(ctx->obc->obs.oi.soid, 10,
                                        &reply_obj.reqids,
                                        &reply_obj.reqid_return_codes);
     dout(20) << " got reqids" << dendl;
@@ -8934,20 +8939,16 @@ int PrimaryLogPG::do_copy_get(OpContext *ctx, bufferlist::const_iterator& bp,
 void PrimaryLogPG::fill_in_copy_get_noent(OpRequestRef& op, hobject_t oid,
                                           OSDOp& osd_op)
 {
-  // NOTE: we take non-const ref here for claim_op_out_data below; we must
-  // be careful not to modify anything else that will upset a racing
-  // operator<<
-  MOSDOp *m = static_cast<MOSDOp*>(op->get_nonconst_req());
+  const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
   uint64_t features = m->get_features();
   object_copy_data_t reply_obj;
 
-  pg_log.get_log().get_object_reqids(oid, 10, &reply_obj.reqids,
+  recovery_state.get_pg_log().get_log().get_object_reqids(oid, 10, &reply_obj.reqids,
                                      &reply_obj.reqid_return_codes);
   dout(20) << __func__ << " got reqids " << reply_obj.reqids << dendl;
   encode(reply_obj, osd_op.outdata, features);
   osd_op.rval = -ENOENT;
   MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap_epoch(), 0, false);
-  reply->claim_op_out_data(m->ops);
   reply->set_result(-ENOENT);
   reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
   osd->send_message_osd_client(reply, m->get_connection());
@@ -9052,9 +9053,8 @@ void PrimaryLogPG::_copy_some(ObjectContextRef obc, CopyOpRef cop)
 
   C_Copyfrom *fin = new C_Copyfrom(this, obc->obs.oi.soid,
                                   get_last_peering_reset(), cop);
-  unsigned n = info.pgid.hash_to_shard(osd->m_objecter_finishers);
   gather.set_finisher(new C_OnFinisher(fin,
-                                      osd->objecter_finishers[n]));
+                                      osd->get_objecter_finisher(get_pg_shard())));
 
   ceph_tid_t tid = osd->objecter->read(cop->src.oid, cop->oloc, op,
                                  cop->src.snap, NULL,
@@ -9141,14 +9141,14 @@ void PrimaryLogPG::_copy_some_manifest(ObjectContextRef obc, CopyOpRef cop, uint
     C_CopyChunk *fin = new C_CopyChunk(this, obc->obs.oi.soid,
                                     get_last_peering_reset(), cop);
     fin->offset = obj_offset;
-    unsigned n = info.pgid.hash_to_shard(osd->m_objecter_finishers);
-
-    ceph_tid_t tid = osd->objecter->read(soid.oid, oloc, op,
-                                   sub_cop->src.snap, NULL,
-                                   flags,
-                                   new C_OnFinisher(fin, osd->objecter_finishers[n]),
-                                   // discover the object version if we don't know it yet
-                                   sub_cop->results.user_version ? NULL : &sub_cop->results.user_version);
+
+    ceph_tid_t tid = osd->objecter->read(
+      soid.oid, oloc, op,
+      sub_cop->src.snap, NULL,
+      flags,
+      new C_OnFinisher(fin, osd->get_objecter_finisher(get_pg_shard())),
+      // discover the object version if we don't know it yet
+      sub_cop->results.user_version ? NULL : &sub_cop->results.user_version);
     fin->tid = tid;
     sub_cop->objecter_tid = tid;
     if (last_offset < iter->first) {
@@ -9193,7 +9193,8 @@ void PrimaryLogPG::process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r)
     // verify snap hasn't been deleted
     vector<snapid_t>::iterator p = cop->results.snaps.begin();
     while (p != cop->results.snaps.end()) {
-      if (pool.info.is_removed_snap(*p)) {
+      // make best effort to sanitize snaps/clones.
+      if (get_osdmap()->in_removed_snaps_queue(info.pgid.pgid.pool(), *p)) {
        dout(10) << __func__ << " clone snap " << *p << " has been deleted"
                 << dendl;
        for (vector<snapid_t>::iterator q = p + 1;
@@ -9426,6 +9427,7 @@ void PrimaryLogPG::process_copy_chunk_manifest(hobject_t oid, ceph_tid_t tid, in
                                  p.second->cursor.data_offset, sub_chunk.outdata.length());
       obs.oi.manifest.chunk_map[p.second->cursor.data_offset].clear_flag(chunk_info_t::FLAG_DIRTY); 
       obs.oi.manifest.chunk_map[p.second->cursor.data_offset].clear_flag(chunk_info_t::FLAG_MISSING); 
+      ctx->clean_regions.mark_data_region_dirty(p.second->cursor.data_offset, sub_chunk.outdata.length());
       sub_chunk.outdata.clear();
     }
     obs.oi.clear_data_digest();
@@ -9583,8 +9585,11 @@ void PrimaryLogPG::finish_copyfrom(CopyFromCallback *cb)
     obs.oi.clear_omap_digest();
   }
 
-  obs.oi.truncate_seq = cb->results->truncate_seq;
-  obs.oi.truncate_size = cb->results->truncate_size;
+  obs.oi.truncate_seq = cb->truncate_seq;
+  obs.oi.truncate_size = cb->truncate_size;
+
+  obs.oi.mtime = ceph::real_clock::to_timespec(cb->results->mtime);
+  ctx->mtime = utime_t();
 
   ctx->extra_reqids = cb->results->reqids;
   ctx->extra_reqid_return_codes = cb->results->reqid_return_codes;
@@ -9599,6 +9604,7 @@ void PrimaryLogPG::finish_copyfrom(CopyFromCallback *cb)
   if (cb->results->has_omap) {
     dout(10) << __func__ << " setting omap flag on " << obs.oi.soid << dendl;
     obs.oi.set_flag(object_info_t::FLAG_OMAP);
+    ctx->clean_regions.mark_omap_dirty();
   } else {
     dout(10) << __func__ << " clearing omap flag on " << obs.oi.soid << dendl;
     obs.oi.clear_flag(object_info_t::FLAG_OMAP);
@@ -9608,6 +9614,7 @@ void PrimaryLogPG::finish_copyfrom(CopyFromCallback *cb)
   if (obs.oi.size > 0)
     ch.insert(0, obs.oi.size);
   ctx->modified_ranges.union_of(ch);
+  ctx->clean_regions.mark_data_region_dirty(0, std::max(obs.oi.size, cb->get_data_size()));
 
   if (cb->get_data_size() != obs.oi.size) {
     ctx->delta_stats.num_bytes -= obs.oi.size;
@@ -9633,18 +9640,25 @@ void PrimaryLogPG::finish_promote(int r, CopyResults *results,
 
   if (r != -ENOENT && soid.is_snap()) {
     if (results->snaps.empty()) {
-      // we must have read "snap" content from the head object in
-      // the base pool.  use snap_seq to construct what snaps should
-      // be for this clone (what is was before we evicted the clean
-      // clone from this pool, and what it will be when we flush and
-      // the clone eventually happens in the base pool).
+      // we must have read "snap" content from the head object in the
+      // base pool.  use snap_seq to construct what snaps should be
+      // for this clone (what it was before we evicted the clean clone
+      // from this pool, and what it will be when we flush and the
+      // clone eventually happens in the base pool).  we want to use
+      // snaps in (results->snap_seq,soid.snap]
       SnapSet& snapset = obc->ssc->snapset;
-      vector<snapid_t>::iterator p = snapset.snaps.begin();
-      while (p != snapset.snaps.end() && *p > soid.snap)
-       ++p;
-      while (p != snapset.snaps.end() && *p > results->snap_seq) {
-       results->snaps.push_back(*p);
-       ++p;
+      for (auto p = snapset.clone_snaps.rbegin();
+          p != snapset.clone_snaps.rend();
+          ++p) {
+       for (auto snap : p->second) {
+         if (snap > soid.snap) {
+           continue;
+         }
+         if (snap <= results->snap_seq) {
+           break;
+         }
+         results->snaps.push_back(snap);
+       }
       }
     }
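
The rewritten clone-snap reconstruction above walks clone_snaps from the newest clone down and keeps the snaps in the half-open interval (results->snap_seq, soid.snap]. A worked toy example, assuming the usual newest-first ordering of each clone's snap vector:

    // Toy reproduction of the selection above with plain ints.
    #include <cstdio>
    #include <map>
    #include <vector>

    int main() {
      // clone id -> snaps (newest first); want snaps in (snap_seq, snap]
      std::map<int, std::vector<int>> clone_snaps = {{2, {2, 1}}, {5, {5, 4, 3}}};
      int snap = 4, snap_seq = 1;
      std::vector<int> out;
      for (auto p = clone_snaps.rbegin(); p != clone_snaps.rend(); ++p) {
        for (int s : p->second) {
          if (s > snap) continue;      // newer than this clone object: skip
          if (s <= snap_seq) break;    // at or below snap_seq: done with this clone
          out.push_back(s);
        }
      }
      for (int s : out) printf("%d ", s);  // prints: 4 3 2
      printf("\n");
    }
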
 
@@ -9681,7 +9695,11 @@ void PrimaryLogPG::finish_promote(int r, CopyResults *results,
 
     OpContextUPtr tctx = simple_opc_create(obc);
     tctx->at_version = get_next_version();
-    filter_snapc(tctx->new_snapset.snaps);
+    if (get_osdmap()->require_osd_release < ceph_release_t::octopus) {
+      filter_snapc(tctx->new_snapset.snaps);
+    } else {
+      tctx->new_snapset.snaps.clear();
+    }
     vector<snapid_t> new_clones;
     map<snapid_t, vector<snapid_t>> new_clone_snaps;
     for (vector<snapid_t>::iterator i = tctx->new_snapset.clones.begin();
@@ -9773,16 +9791,22 @@ void PrimaryLogPG::finish_promote(int r, CopyResults *results,
     }
     tctx->new_obs.oi.size = results->object_size;
     tctx->new_obs.oi.user_version = results->user_version;
+    tctx->new_obs.oi.mtime = ceph::real_clock::to_timespec(results->mtime);
+    tctx->mtime = utime_t();
     if (results->is_data_digest()) {
       tctx->new_obs.oi.set_data_digest(results->data_digest);
     } else {
       tctx->new_obs.oi.clear_data_digest();
     }
+    if (results->object_size)
+      tctx->clean_regions.mark_data_region_dirty(0, results->object_size);
     if (results->is_omap_digest()) {
       tctx->new_obs.oi.set_omap_digest(results->omap_digest);
     } else {
       tctx->new_obs.oi.clear_omap_digest();
     }
+    if (results->has_omap)
+        tctx->clean_regions.mark_omap_dirty();
     tctx->new_obs.oi.truncate_seq = results->truncate_seq;
     tctx->new_obs.oi.truncate_size = results->truncate_size;
 
@@ -9803,7 +9827,7 @@ void PrimaryLogPG::finish_promote(int r, CopyResults *results,
     ceph_assert(tctx->new_obs.oi.soid.snap == CEPH_NOSNAP);
     tctx->new_snapset.from_snap_set(
       results->snapset,
-      get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS);
+      get_osdmap()->require_osd_release < ceph_release_t::luminous);
   }
   dout(20) << __func__ << " new_snapset " << tctx->new_snapset << dendl;
 
@@ -9946,19 +9970,18 @@ struct C_Flush : public Context {
   void finish(int r) override {
     if (r == -ECANCELED)
       return;
-    pg->lock();
+    std::scoped_lock locker{*pg};
     if (last_peering_reset == pg->get_last_peering_reset()) {
       pg->finish_flush(oid, tid, r);
       pg->osd->logger->tinc(l_osd_tier_flush_lat, ceph_clock_now() - start);
     }
-    pg->unlock();
   }
 };
 
 int PrimaryLogPG::start_flush(
   OpRequestRef op, ObjectContextRef obc,
   bool blocking, hobject_t *pmissing,
-  boost::optional<std::function<void()>> &&on_flush)
+  std::optional<std::function<void()>> &&on_flush)
 {
   const object_info_t& oi = obc->obs.oi;
   const hobject_t& soid = oi.soid;
@@ -9968,8 +9991,17 @@ int PrimaryLogPG::start_flush(
           << " " << (blocking ? "blocking" : "non-blocking/best-effort")
           << dendl;
 
-  // get a filtered snapset, need to remove removed snaps
-  SnapSet snapset = obc->ssc->snapset.get_filtered(pool.info);
+  bool preoctopus_compat =
+    get_osdmap()->require_osd_release < ceph_release_t::octopus;
+  SnapSet snapset;
+  if (preoctopus_compat) {
+    // for pre-octopus compatibility, filter SnapSet::snaps.  not
+    // certain we need this, but let's be conservative.
+    snapset = obc->ssc->snapset.get_filtered(pool.info);
+  } else {
+    // NOTE: change this to a const ref when we remove this compat code
+    snapset = obc->ssc->snapset;
+  }
 
   // verify there are no (older) dirty clones
   {
@@ -9981,7 +10013,7 @@ int PrimaryLogPG::start_flush(
       hobject_t next = soid;
       next.snap = *p;
       ceph_assert(next.snap < soid.snap);
-      if (pg_log.get_missing().is_missing(next)) {
+      if (recovery_state.get_pg_log().get_missing().is_missing(next)) {
        dout(10) << __func__ << " missing clone is " << next << dendl;
        if (pmissing)
          *pmissing = next;
@@ -10072,8 +10104,7 @@ int PrimaryLogPG::start_flush(
   SnapContext snapc, dsnapc;
   if (snapset.seq != 0) {
     if (soid.snap == CEPH_NOSNAP) {
-      snapc.seq = snapset.seq;
-      snapc.snaps = snapset.snaps;
+      snapc = snapset.get_ssc_as_of(snapset.seq);
     } else {
       snapid_t min_included_snap;
       auto p = snapset.clone_snaps.find(soid.snap);
@@ -10138,20 +10169,24 @@ int PrimaryLogPG::start_flush(
   }
   C_Flush *fin = new C_Flush(this, soid, get_last_peering_reset());
 
-  unsigned n = info.pgid.hash_to_shard(osd->m_objecter_finishers);
   ceph_tid_t tid = osd->objecter->mutate(
     soid.oid, base_oloc, o, snapc,
     ceph::real_clock::from_ceph_timespec(oi.mtime),
     CEPH_OSD_FLAG_IGNORE_OVERLAY | CEPH_OSD_FLAG_ENFORCE_SNAPC,
     new C_OnFinisher(fin,
-                    osd->objecter_finishers[n]));
+                    osd->get_objecter_finisher(get_pg_shard())));
   /* we're under the pg lock and fin->finish() is grabbing that */
   fin->tid = tid;
   fop->objecter_tid = tid;
 
   flush_ops[soid] = fop;
-  info.stats.stats.sum.num_flush++;
-  info.stats.stats.sum.num_flush_kb += shift_round_up(oi.size, 10);
+
+  recovery_state.update_stats(
+    [&oi](auto &history, auto &stats) {
+      stats.stats.sum.num_flush++;
+      stats.stats.sum.num_flush_kb += shift_round_up(oi.size, 10);
+      return false;
+    });
   return -EINPROGRESS;
 }
 
@@ -10187,7 +10222,7 @@ void PrimaryLogPG::finish_flush(hobject_t oid, ceph_tid_t tid, int r)
     }
     if (fop->on_flush) {
       (*(fop->on_flush))();
-      fop->on_flush = boost::none;
+      fop->on_flush = std::nullopt;
     }
     flush_ops.erase(oid);
     return;
@@ -10224,7 +10259,7 @@ int PrimaryLogPG::try_flush_mark_clean(FlushOpRef fop)
     }
     if (fop->on_flush) {
       (*(fop->on_flush))();
-      fop->on_flush = boost::none;
+      fop->on_flush = std::nullopt;
     }
     flush_ops.erase(oid);
     if (fop->blocking)
@@ -10257,7 +10292,7 @@ int PrimaryLogPG::try_flush_mark_clean(FlushOpRef fop)
     osd->logger->inc(l_osd_tier_clean);
     if (fop->on_flush) {
       (*(fop->on_flush))();
-      fop->on_flush = boost::none;
+      fop->on_flush = std::nullopt;
     }
     flush_ops.erase(oid);
     return 0;
@@ -10270,7 +10305,7 @@ int PrimaryLogPG::try_flush_mark_clean(FlushOpRef fop)
   // try to take the lock manually, since we don't
   // have a ctx yet.
   if (ctx->lock_manager.get_lock_type(
-       ObjectContext::RWState::RWWRITE,
+       RWState::RWWRITE,
        oid,
        obc,
        fop->op)) {
@@ -10281,7 +10316,7 @@ int PrimaryLogPG::try_flush_mark_clean(FlushOpRef fop)
     // fop->op is now waiting on the lock; get fop->dup_ops to wait too.
     for (auto op : fop->dup_ops) {
       bool locked = ctx->lock_manager.get_lock_type(
-       ObjectContext::RWState::RWWRITE,
+       RWState::RWWRITE,
        oid,
        obc,
        op);
@@ -10301,7 +10336,7 @@ int PrimaryLogPG::try_flush_mark_clean(FlushOpRef fop)
 
   if (fop->on_flush) {
     ctx->register_on_finish(*(fop->on_flush));
-    fop->on_flush = boost::none;
+    fop->on_flush = std::nullopt;
   }
 
   ctx->at_version = get_next_version();
@@ -10320,6 +10355,7 @@ int PrimaryLogPG::try_flush_mark_clean(FlushOpRef fop)
       t->omap_clear(oid);
       ctx->new_obs.oi.clear_omap_digest();
       ctx->new_obs.oi.clear_flag(object_info_t::FLAG_OMAP);
+      ctx->clean_regions.mark_omap_dirty();
     }
     if (obc->obs.oi.size == chunks_size) { 
       t->truncate(oid, 0);
@@ -10329,6 +10365,7 @@ int PrimaryLogPG::try_flush_mark_clean(FlushOpRef fop)
       truncate_update_size_and_usage(ctx->delta_stats,
                                     ctx->new_obs.oi,
                                     0);
+      ctx->clean_regions.mark_data_region_dirty(0, ctx->new_obs.oi.size);
       ctx->new_obs.oi.new_object();
       for (auto &p : ctx->new_obs.oi.manifest.chunk_map) {
        p.second.clear_flag(chunk_info_t::FLAG_DIRTY);
@@ -10397,7 +10434,7 @@ void PrimaryLogPG::cancel_flush(FlushOpRef fop, bool requeue,
   }
   if (fop->on_flush) {
     (*(fop->on_flush))();
-    fop->on_flush = boost::none;
+    fop->on_flush = std::nullopt;
   }
   flush_ops.erase(fop->obc->obs.oi.soid);
 }
@@ -10442,8 +10479,7 @@ void PrimaryLogPG::repop_all_committed(RepGather *repop)
   repop->all_committed = true;
   if (!repop->rep_aborted) {
     if (repop->v != eversion_t()) {
-      last_update_ondisk = repop->v;
-      last_complete_ondisk = repop->pg_local_last_complete;
+      recovery_state.complete_write(repop->v, repop->pg_local_last_complete);
     }
     eval_repop(repop);
   }
@@ -10454,10 +10490,11 @@ void PrimaryLogPG::op_applied(const eversion_t &applied_version)
   dout(10) << "op_applied version " << applied_version << dendl;
   ceph_assert(applied_version != eversion_t());
   ceph_assert(applied_version <= info.last_update);
-  last_update_applied = applied_version;
+  recovery_state.local_write_applied(applied_version);
   if (is_primary()) {
     if (scrubber.active) {
-      if (last_update_applied >= scrubber.subset_last_update) {
+      if (recovery_state.get_last_update_applied() >=
+       scrubber.subset_last_update) {
        requeue_scrub(ops_blocked_by_scrub());
       }
     } else {
@@ -10468,14 +10505,8 @@ void PrimaryLogPG::op_applied(const eversion_t &applied_version)
 
 void PrimaryLogPG::eval_repop(RepGather *repop)
 {
-  const MOSDOp *m = NULL;
-  if (repop->op)
-    m = static_cast<const MOSDOp *>(repop->op->get_req());
-
-  if (m)
-    dout(10) << "eval_repop " << *repop << dendl;
-  else
-    dout(10) << "eval_repop " << *repop << " (no op)" << dendl;
+  dout(10) << "eval_repop " << *repop
+    << (repop->op && repop->op->get_req<MOSDOp>() ? "" : " (no op)") << dendl;
 
   // ondisk?
   if (repop->all_committed) {
@@ -10495,13 +10526,12 @@ void PrimaryLogPG::eval_repop(RepGather *repop)
           return_code = std::get<2>(i);
         }
         osd->reply_op_error(std::get<0>(i), return_code, repop->v,
-                            std::get<1>(i));
+                            std::get<1>(i), std::get<3>(i));
       }
       waiting_for_ondisk.erase(it);
     }
 
     publish_stats_to_osd();
-    calc_min_last_complete_ondisk();
 
     dout(10) << " removing " << *repop << dendl;
     ceph_assert(!repop_queue.empty());
@@ -10531,18 +10561,6 @@ void PrimaryLogPG::issue_repop(RepGather *repop, OpContext *ctx)
           << dendl;
 
   repop->v = ctx->at_version;
-  if (ctx->at_version > eversion_t()) {
-    for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
-        i != acting_recovery_backfill.end();
-        ++i) {
-      if (*i == get_primary()) continue;
-      pg_info_t &pinfo = peer_info[*i];
-      // keep peer_info up to date
-      if (pinfo.last_complete == pinfo.last_update)
-       pinfo.last_complete = ctx->at_version;
-      pinfo.last_update = ctx->at_version;
-    }
-  }
 
   ctx->op_t->add_obc(ctx->obc);
   if (ctx->clone_obc) {
@@ -10561,41 +10579,17 @@ void PrimaryLogPG::issue_repop(RepGather *repop, OpContext *ctx)
     projected_log.add(entry);
   }
 
-  bool requires_missing_loc = false;
-  for (set<pg_shard_t>::iterator i = async_recovery_targets.begin();
-       i != async_recovery_targets.end();
-       ++i) {
-    if (*i == get_primary() || !peer_missing[*i].is_missing(soid)) continue;
-    requires_missing_loc = true;
-    for (auto &&entry: ctx->log) {
-      peer_missing[*i].add_next_event(entry);
-    }
-  }
-
-  if (requires_missing_loc) {
-    for (auto &&entry: ctx->log) {
-      dout(30) << __func__ << " missing_loc before: "
-               << missing_loc.get_locations(entry.soid) << dendl;
-      missing_loc.add_missing(entry.soid, entry.version,
-                              eversion_t(), entry.is_delete());
-      // clear out missing_loc
-      missing_loc.clear_location(entry.soid);
-      for (auto &i: actingset) {
-        if (!peer_missing[i].is_missing(entry.soid))
-          missing_loc.add_location(entry.soid, i);
-      }
-      dout(30) << __func__ << " missing_loc after: "
-               << missing_loc.get_locations(entry.soid) << dendl;
-    }
-  }
-
+  recovery_state.pre_submit_op(
+    soid,
+    ctx->log,
+    ctx->at_version);
   pgbackend->submit_transaction(
     soid,
     ctx->delta_stats,
     ctx->at_version,
     std::move(ctx->op_t),
-    pg_trim_to,
-    min_last_complete_ondisk,
+    recovery_state.get_pg_trim_to(),
+    recovery_state.get_min_last_complete_ondisk(),
     ctx->log,
     ctx->updated_hset_history,
     on_all_commit,
@@ -10632,7 +10626,7 @@ boost::intrusive_ptr<PrimaryLogPG::RepGather> PrimaryLogPG::new_repop(
   int r,
   ObcLockManager &&manager,
   OpRequestRef &&op,
-  boost::optional<std::function<void(void)> > &&on_complete)
+  std::optional<std::function<void(void)> > &&on_complete)
 {
   RepGather *repop = new RepGather(
     std::move(manager),
@@ -10687,10 +10681,7 @@ void PrimaryLogPG::simple_opc_submit(OpContextUPtr ctx)
   dout(20) << __func__ << " " << repop << dendl;
   issue_repop(repop, ctx.get());
   eval_repop(repop);
-  if (hard_limit_pglog())
-    calc_trim_to_aggressive();
-  else
-    calc_trim_to();
+  recovery_state.update_trim_to();
   repop->put();
 }
 
@@ -10698,7 +10689,7 @@ void PrimaryLogPG::simple_opc_submit(OpContextUPtr ctx)
 void PrimaryLogPG::submit_log_entries(
   const mempool::osd_pglog::list<pg_log_entry_t> &entries,
   ObcLockManager &&manager,
-  boost::optional<std::function<void(void)> > &&_on_complete,
+  std::optional<std::function<void(void)> > &&_on_complete,
   OpRequestRef op,
   int r)
 {
@@ -10712,8 +10703,8 @@ void PrimaryLogPG::submit_log_entries(
   }
 
   boost::intrusive_ptr<RepGather> repop;
-  boost::optional<std::function<void(void)> > on_complete;
-  if (get_osdmap()->require_osd_release >= CEPH_RELEASE_JEWEL) {
+  std::optional<std::function<void(void)> > on_complete;
+  if (get_osdmap()->require_osd_release >= ceph_release_t::jewel) {
     repop = new_repop(
       version,
       r,
@@ -10728,28 +10719,29 @@ void PrimaryLogPG::submit_log_entries(
     [this, entries, repop, on_complete]() {
       ObjectStore::Transaction t;
       eversion_t old_last_update = info.last_update;
-      merge_new_log_entries(entries, t, pg_trim_to, min_last_complete_ondisk);
-
+      recovery_state.merge_new_log_entries(
+       entries, t, recovery_state.get_pg_trim_to(),
+       recovery_state.get_min_last_complete_ondisk());
 
       set<pg_shard_t> waiting_on;
-      for (set<pg_shard_t>::const_iterator i = acting_recovery_backfill.begin();
-          i != acting_recovery_backfill.end();
+      for (set<pg_shard_t>::const_iterator i = get_acting_recovery_backfill().begin();
+          i != get_acting_recovery_backfill().end();
           ++i) {
        pg_shard_t peer(*i);
        if (peer == pg_whoami) continue;
-       ceph_assert(peer_missing.count(peer));
-       ceph_assert(peer_info.count(peer));
-       if (get_osdmap()->require_osd_release >= CEPH_RELEASE_JEWEL) {
+       ceph_assert(recovery_state.get_peer_missing().count(peer));
+       ceph_assert(recovery_state.has_peer_info(peer));
+       if (get_osdmap()->require_osd_release >= ceph_release_t::jewel) {
          ceph_assert(repop);
          MOSDPGUpdateLogMissing *m = new MOSDPGUpdateLogMissing(
            entries,
            spg_t(info.pgid.pgid, i->shard),
            pg_whoami.shard,
            get_osdmap_epoch(),
-           last_peering_reset,
+           get_last_peering_reset(),
            repop->rep_tid,
-           pg_trim_to,
-           min_last_complete_ondisk);
+           recovery_state.get_pg_trim_to(),
+           recovery_state.get_min_last_complete_ondisk());
          osd->send_message_osd_cluster(
            peer.osd, m, get_osdmap_epoch());
          waiting_on.insert(peer);
@@ -10757,7 +10749,7 @@ void PrimaryLogPG::submit_log_entries(
          MOSDPGLog *m = new MOSDPGLog(
            peer.shard, pg_whoami.shard,
            info.last_update.epoch,
-           info, last_peering_reset);
+           info, get_last_peering_reset());
          m->log.log = entries;
          m->log.tail = old_last_update;
          m->log.head = info.last_update;
@@ -10782,7 +10774,7 @@ void PrimaryLogPG::submit_log_entries(
          epoch_t epoch)
          : pg(pg), rep_tid(rep_tid), epoch(epoch) {}
        void finish(int) override {
-         pg->lock();
+         std::scoped_lock l{*pg};
          if (!pg->pg_has_reset_since(epoch)) {
            auto it = pg->log_entry_update_waiting_on.find(rep_tid);
            ceph_assert(it != pg->log_entry_update_waiting_on.end());
@@ -10794,7 +10786,6 @@ void PrimaryLogPG::submit_log_entries(
              pg->log_entry_update_waiting_on.erase(it);
            }
          }
-         pg->unlock();
        }
       };
       t.register_on_commit(
@@ -10804,10 +10795,7 @@ void PrimaryLogPG::submit_log_entries(
       op_applied(info.last_update);
     });
 
-  if (hard_limit_pglog())
-    calc_trim_to_aggressive();
-  else
-    calc_trim_to();
+  recovery_state.update_trim_to();
 }
 
 void PrimaryLogPG::cancel_log_updates()
@@ -10821,13 +10809,12 @@ void PrimaryLogPG::cancel_log_updates()
 
 void PrimaryLogPG::get_watchers(list<obj_watch_item_t> *ls)
 {
-  lock();
+  std::scoped_lock l{*this};
   pair<hobject_t, ObjectContextRef> i;
   while (object_contexts.get_next(i.first, &i)) {
     ObjectContextRef obc(i.second);
     get_obc_watchers(obc, *ls);
   }
-  unlock();
 }
 
 void PrimaryLogPG::get_obc_watchers(ObjectContextRef obc, list<obj_watch_item_t> &pg_watchers)
@@ -10883,10 +10870,10 @@ void PrimaryLogPG::check_blacklisted_obc_watchers(ObjectContextRef obc)
 void PrimaryLogPG::populate_obc_watchers(ObjectContextRef obc)
 {
   ceph_assert(is_active());
-  auto it_objects = pg_log.get_log().objects.find(obc->obs.oi.soid);
+  auto it_objects = recovery_state.get_pg_log().get_log().objects.find(obc->obs.oi.soid);
   ceph_assert((recovering.count(obc->obs.oi.soid) ||
          !is_missing_object(obc->obs.oi.soid)) ||
-        (it_objects != pg_log.get_log().objects.end() && // or this is a revert... see recover_primary()
+        (it_objects != recovery_state.get_pg_log().get_log().objects.end() && // or this is a revert... see recover_primary()
          it_objects->second->op ==
            pg_log_entry_t::LOST_REVERT &&
          it_objects->second->reverting_to ==
@@ -11007,11 +10994,11 @@ ObjectContextRef PrimaryLogPG::get_object_context(
   bool can_create,
   const map<string, bufferlist> *attrs)
 {
-  auto it_objects = pg_log.get_log().objects.find(soid);
+  auto it_objects = recovery_state.get_pg_log().get_log().objects.find(soid);
   ceph_assert(
-    attrs || !pg_log.get_missing().is_missing(soid) ||
+    attrs || !recovery_state.get_pg_log().get_missing().is_missing(soid) ||
     // or this is a revert... see recover_primary()
-    (it_objects != pg_log.get_log().objects.end() &&
+    (it_objects != recovery_state.get_pg_log().get_log().objects.end() &&
       it_objects->second->op ==
       pg_log_entry_t::LOST_REVERT));
   ObjectContextRef obc = object_contexts.lookup(soid);
@@ -11161,14 +11148,9 @@ int PrimaryLogPG::find_object_context(const hobject_t& oid,
     return 0;
   }
 
-  hobject_t head = oid.get_head();
-
   // we want a snap
-  if (!map_snapid_to_clone && pool.info.is_removed_snap(oid.snap)) {
-    dout(10) << __func__ << " snap " << oid.snap << " is removed" << dendl;
-    return -ENOENT;
-  }
 
+  hobject_t head = oid.get_head();
   SnapSetContext *ssc = get_snapset_context(oid, can_create);
   if (!ssc || !(ssc->exists || can_create)) {
     dout(20) << __func__ << " " << oid << " no snapset" << dendl;
@@ -11209,7 +11191,7 @@ int PrimaryLogPG::find_object_context(const hobject_t& oid,
               << " snapset " << ssc->snapset
               << " maps to " << oid << dendl;
 
-      if (pg_log.get_missing().is_missing(oid)) {
+      if (recovery_state.get_pg_log().get_missing().is_missing(oid)) {
        dout(10) << __func__ << " " << oid << " @" << oid.snap
                 << " snapset " << ssc->snapset
                 << " " << oid << " is missing" << dendl;
@@ -11273,7 +11255,7 @@ int PrimaryLogPG::find_object_context(const hobject_t& oid,
   hobject_t soid(oid.oid, oid.get_key(), ssc->snapset.clones[k], oid.get_hash(),
                 info.pgid.pool(), oid.get_namespace());
 
-  if (pg_log.get_missing().is_missing(soid)) {
+  if (recovery_state.get_pg_log().get_missing().is_missing(soid)) {
     dout(20) << __func__ << " " << soid << " missing, try again later"
             << dendl;
     if (pmissing)
@@ -11287,14 +11269,19 @@ int PrimaryLogPG::find_object_context(const hobject_t& oid,
     if (pmissing)
       *pmissing = soid;
     put_snapset_context(ssc);
-    if (is_degraded_or_backfilling_object(soid)) {
-      dout(20) << __func__ << " clone is degraded or backfilling " << soid << dendl;
-      return -EAGAIN;
-    } else if (is_degraded_on_async_recovery_target(soid)) {
-      dout(20) << __func__ << " clone is recovering " << soid << dendl;
-      return -EAGAIN;
+    if (is_primary()) {
+      if (is_degraded_or_backfilling_object(soid)) {
+       dout(20) << __func__ << " clone is degraded or backfilling " << soid << dendl;
+       return -EAGAIN;
+      } else if (is_degraded_on_async_recovery_target(soid)) {
+       dout(20) << __func__ << " clone is recovering " << soid << dendl;
+       return -EAGAIN;
+      } else {
+       dout(20) << __func__ << " missing clone " << soid << dendl;
+       return -ENOENT;
+      }
     } else {
-      dout(20) << __func__ << " missing clone " << soid << dendl;
+      dout(20) << __func__ << " replica missing clone" << soid << dendl;
       return -ENOENT;
     }
   }
@@ -11319,18 +11306,21 @@ int PrimaryLogPG::find_object_context(const hobject_t& oid,
     ceph_assert(!cct->_conf->osd_debug_verify_snaps);
     return -ENOENT;
   }
-  first = p->second.back();
-  last = p->second.front();
-  if (first <= oid.snap) {
-    dout(20) << __func__ << " " << soid << " [" << first << "," << last
-            << "] contains " << oid.snap << " -- HIT " << obc->obs << dendl;
-    *pobc = obc;
-    return 0;
-  } else {
-    dout(20) << __func__ << " " << soid << " [" << first << "," << last
-            << "] does not contain " << oid.snap << " -- DNE" << dendl;
+  if (std::find(p->second.begin(), p->second.end(), oid.snap) ==
+      p->second.end()) {
+    dout(20) << __func__ << " " << soid << " clone_snaps " << p->second
+            << " does not contain " << oid.snap << " -- DNE" << dendl;
+    return -ENOENT;
+  }
+  if (get_osdmap()->in_removed_snaps_queue(info.pgid.pgid.pool(), oid.snap)) {
+    dout(20) << __func__ << " " << soid << " snap " << oid.snap
+            << " in removed_snaps_queue" << " -- DNE" << dendl;
     return -ENOENT;
   }
+  dout(20) << __func__ << " " << soid << " clone_snaps " << p->second
+          << " contains " << oid.snap << " -- HIT " << obc->obs << dendl;
+  *pobc = obc;
+  return 0;
 }
 
 void PrimaryLogPG::object_context_destructor_callback(ObjectContext *obc)
@@ -11481,27 +11471,27 @@ int PrimaryLogPG::recover_missing(
   int priority,
   PGBackend::RecoveryHandle *h)
 {
-  if (missing_loc.is_unfound(soid)) {
+  if (recovery_state.get_missing_loc().is_unfound(soid)) {
     dout(7) << __func__ << " " << soid
            << " v " << v 
            << " but it is unfound" << dendl;
     return PULL_NONE;
   }
 
-  if (missing_loc.is_deleted(soid)) {
+  if (recovery_state.get_missing_loc().is_deleted(soid)) {
     start_recovery_op(soid);
     ceph_assert(!recovering.count(soid));
     recovering.insert(make_pair(soid, ObjectContextRef()));
     epoch_t cur_epoch = get_osdmap_epoch();
-    remove_missing_object(soid, v, new FunctionContext(
+    remove_missing_object(soid, v, new LambdaContext(
      [=](int) {
-       lock();
+       std::scoped_lock locker{*this};
        if (!pg_has_reset_since(cur_epoch)) {
         bool object_missing = false;
-        for (const auto& shard : acting_recovery_backfill) {
+        for (const auto& shard : get_acting_recovery_backfill()) {
           if (shard == pg_whoami)
             continue;
-          if (peer_missing[shard].is_missing(soid)) {
+          if (recovery_state.get_peer_missing(shard).is_missing(soid)) {
             dout(20) << __func__ << ": soid " << soid << " needs to be deleted from replica " << shard << dendl;
             object_missing = true;
             break;
@@ -11519,7 +11509,6 @@ int PrimaryLogPG::recover_missing(
           pgbackend->run_recovery_op(recovery_handle, priority);
         }
        }
-       unlock();
      }));
     return PULL_YES;
   }
@@ -11530,13 +11519,13 @@ int PrimaryLogPG::recover_missing(
   if (soid.snap && soid.snap < CEPH_NOSNAP) {
     // do we have the head?
     hobject_t head = soid.get_head();
-    if (pg_log.get_missing().is_missing(head)) {
+    if (recovery_state.get_pg_log().get_missing().is_missing(head)) {
       if (recovering.count(head)) {
        dout(10) << " missing but already recovering head " << head << dendl;
        return PULL_NONE;
       } else {
        int r = recover_missing(
-         head, pg_log.get_missing().get_items().find(head)->second.need, priority,
+         head, recovery_state.get_pg_log().get_missing().get_items().find(head)->second.need, priority,
          h);
        if (r != PULL_NONE)
          return PULL_HEAD;
@@ -11577,18 +11566,18 @@ void PrimaryLogPG::remove_missing_object(const hobject_t &soid,
   recovery_info.version = v;
 
   epoch_t cur_epoch = get_osdmap_epoch();
-  t.register_on_complete(new FunctionContext(
+  t.register_on_complete(new LambdaContext(
      [=](int) {
-       lock();
+       std::unique_lock locker{*this};
        if (!pg_has_reset_since(cur_epoch)) {
         ObjectStore::Transaction t2;
         on_local_recover(soid, recovery_info, ObjectContextRef(), true, &t2);
         t2.register_on_complete(on_complete);
         int r = osd->store->queue_transaction(ch, std::move(t2), nullptr);
         ceph_assert(r == 0);
-        unlock();
+        locker.unlock();
        } else {
-        unlock();
+        locker.unlock();
         on_complete->complete(-EAGAIN);
        }
      }));
@@ -11619,32 +11608,13 @@ void PrimaryLogPG::finish_degraded_object(const hobject_t oid)
 void PrimaryLogPG::_committed_pushed_object(
   epoch_t epoch, eversion_t last_complete)
 {
-  lock();
+  std::scoped_lock locker{*this};
   if (!pg_has_reset_since(epoch)) {
-    dout(10) << __func__ << " last_complete " << last_complete << " now ondisk" << dendl;
-    last_complete_ondisk = last_complete;
-
-    if (last_complete_ondisk == info.last_update) {
-      if (!is_primary()) {
-        // Either we are a replica or backfill target.
-       // we are fully up to date.  tell the primary!
-       osd->send_message_osd_cluster(
-         get_primary().osd,
-         new MOSDPGTrim(
-           get_osdmap_epoch(),
-           spg_t(info.pgid.pgid, get_primary().shard),
-           last_complete_ondisk),
-         get_osdmap_epoch());
-      } else {
-       calc_min_last_complete_ondisk();
-      }
-    }
-
+    recovery_state.recovery_committed_to(last_complete);
   } else {
-    dout(10) << __func__ << " pg has changed, not touching last_complete_ondisk" << dendl;
+    dout(10) << __func__
+            << " pg has changed, not touching last_complete_ondisk" << dendl;
   }
-
-  unlock();
 }
 
 void PrimaryLogPG::_applied_recovered_object(ObjectContextRef obc)
@@ -11657,7 +11627,7 @@ void PrimaryLogPG::_applied_recovered_object(ObjectContextRef obc)
   --active_pushes;
 
   // requeue an active chunky scrub waiting on recovery ops
-  if (!deleting && active_pushes == 0
+  if (!recovery_state.is_deleting() && active_pushes == 0
       && scrubber.is_chunky_scrub_active()) {
     requeue_scrub(ops_blocked_by_scrub());
   }
@@ -11670,13 +11640,13 @@ void PrimaryLogPG::_applied_recovered_object_replica()
   --active_pushes;
 
   // requeue an active chunky scrub waiting on recovery ops
-  if (!deleting && active_pushes == 0 &&
+  if (!recovery_state.is_deleting() && active_pushes == 0 &&
       scrubber.active_rep_scrub && static_cast<const MOSDRepScrub*>(
        scrubber.active_rep_scrub->get_req())->chunky) {
     auto& op = scrubber.active_rep_scrub;
     osd->enqueue_back(
-      OpQueueItem(
-        unique_ptr<OpQueueItem::OpQueueable>(new PGOpItem(info.pgid, op)),
+      OpSchedulerItem(
+        unique_ptr<OpSchedulerItem::OpQueueable>(new PGOpItem(info.pgid, op)),
        op->get_req()->get_cost(),
        op->get_req()->get_priority(),
        op->get_req()->get_recv_stamp(),
@@ -11686,34 +11656,10 @@ void PrimaryLogPG::_applied_recovered_object_replica()
   }
 }
 
-void PrimaryLogPG::recover_got(hobject_t oid, eversion_t v)
-{
-  dout(10) << "got missing " << oid << " v " << v << dendl;
-  pg_log.recover_got(oid, v, info);
-  if (pg_log.get_log().log.empty()) {
-    dout(10) << "last_complete now " << info.last_complete
-             << " while log is empty" << dendl;
-  } else if (pg_log.get_log().complete_to != pg_log.get_log().log.end()) {
-    dout(10) << "last_complete now " << info.last_complete
-            << " log.complete_to " << pg_log.get_log().complete_to->version
-            << dendl;
-  } else {
-    dout(10) << "last_complete now " << info.last_complete
-            << " log.complete_to at end" << dendl;
-    //below is not true in the repair case.
-    //assert(missing.num_missing() == 0);  // otherwise, complete_to was wrong.
-    ceph_assert(info.last_complete == info.last_update);
-  }
-}
-
-void PrimaryLogPG::primary_failed(const hobject_t &soid)
-{
-  list<pg_shard_t> fl = { pg_whoami };
-  failed_push(fl, soid);
-}
-
-void PrimaryLogPG::failed_push(const list<pg_shard_t> &from,
-  const hobject_t &soid, const eversion_t &need)
+void PrimaryLogPG::on_failed_pull(
+  const set<pg_shard_t> &from,
+  const hobject_t &soid,
+  const eversion_t &v)
 {
   dout(20) << __func__ << ": " << soid << dendl;
   ceph_assert(recovering.count(soid));
@@ -11725,40 +11671,44 @@ void PrimaryLogPG::failed_push(const list<pg_shard_t> &from,
   }
   recovering.erase(soid);
   for (auto&& i : from) {
-    missing_loc.remove_location(soid, i);
-    if (need != eversion_t()) {
-      dout(0) << __func__ << " adding " << soid << " to shard " << i
-              << "'s missing set too" << dendl;
-      auto pm = peer_missing.find(i);
-      if (pm != peer_missing.end())
-        pm->second.add(soid, need, eversion_t(), false);
+    if (i != pg_whoami) { // we'll get it below in primary_error
+      recovery_state.force_object_missing(i, soid, v);
     }
   }
+
   dout(0) << __func__ << " " << soid << " from shard " << from
-         << ", reps on " << missing_loc.get_locations(soid)
-         << " unfound? " << missing_loc.is_unfound(soid) << dendl;
+         << ", reps on " << recovery_state.get_missing_loc().get_locations(soid)
+         << " unfound? " << recovery_state.get_missing_loc().is_unfound(soid)
+         << dendl;
   finish_recovery_op(soid);  // close out this attempt,
+  finish_degraded_object(soid);
+
+  if (from.count(pg_whoami)) {
+    dout(0) << " primary missing oid " << soid << " version " << v << dendl;
+    primary_error(soid, v);
+    backfills_in_flight.erase(soid);
+  }
 }
 
 eversion_t PrimaryLogPG::pick_newest_available(const hobject_t& oid)
 {
   eversion_t v;
   pg_missing_item pmi;
-  bool is_missing = pg_log.get_missing().is_missing(oid, &pmi);
+  bool is_missing = recovery_state.get_pg_log().get_missing().is_missing(oid, &pmi);
   ceph_assert(is_missing);
   v = pmi.have;
   dout(10) << "pick_newest_available " << oid << " " << v << " on osd." << osd->whoami << " (local)" << dendl;
 
-  ceph_assert(!acting_recovery_backfill.empty());
-  for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
-       i != acting_recovery_backfill.end();
+  ceph_assert(!get_acting_recovery_backfill().empty());
+  for (set<pg_shard_t>::iterator i = get_acting_recovery_backfill().begin();
+       i != get_acting_recovery_backfill().end();
        ++i) {
     if (*i == get_primary()) continue;
     pg_shard_t peer = *i;
-    if (!peer_missing[peer].is_missing(oid)) {
+    if (!recovery_state.get_peer_missing(peer).is_missing(oid)) {
       continue;
     }
-    eversion_t h = peer_missing[peer].get_items().at(oid).have;
+    eversion_t h = recovery_state.get_peer_missing(peer).get_items().at(oid).have;
     dout(10) << "pick_newest_available " << oid << " " << h << " on osd." << peer << dendl;
     if (h > v)
       v = h;
@@ -11774,22 +11724,24 @@ void PrimaryLogPG::do_update_log_missing(OpRequestRef &op)
     op->get_req());
   ceph_assert(m->get_type() == MSG_OSD_PG_UPDATE_LOG_MISSING);
   ObjectStore::Transaction t;
-  boost::optional<eversion_t> op_trim_to, op_roll_forward_to;
+  std::optional<eversion_t> op_trim_to, op_roll_forward_to;
   if (m->pg_trim_to != eversion_t())
     op_trim_to = m->pg_trim_to;
   if (m->pg_roll_forward_to != eversion_t())
     op_roll_forward_to = m->pg_roll_forward_to;
 
-  dout(20) << __func__ << " op_trim_to = " << op_trim_to << " op_roll_forward_to = " << op_roll_forward_to << dendl;
+  dout(20) << __func__
+          << " op_trim_to = " << op_trim_to << " op_roll_forward_to = " << op_roll_forward_to << dendl;
 
-  append_log_entries_update_missing(m->entries, t, op_trim_to, op_roll_forward_to);
+  recovery_state.append_log_entries_update_missing(
+    m->entries, t, op_trim_to, op_roll_forward_to);
   eversion_t new_lcod = info.last_complete;
 
-  Context *complete = new FunctionContext(
+  Context *complete = new LambdaContext(
     [=](int) {
       const MOSDPGUpdateLogMissing *msg = static_cast<const MOSDPGUpdateLogMissing*>(
        op->get_req());
-      lock();
+      std::scoped_lock locker{*this};
       if (!pg_has_reset_since(msg->get_epoch())) {
        update_last_complete_ondisk(new_lcod);
        MOSDPGUpdateLogMissingReply *reply =
@@ -11803,10 +11755,9 @@ void PrimaryLogPG::do_update_log_missing(OpRequestRef &op)
        reply->set_priority(CEPH_MSG_PRIO_HIGH);
        msg->get_connection()->send_message(reply);
       }
-      unlock();
     });
 
-  if (get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
+  if (get_osdmap()->require_osd_release >= ceph_release_t::kraken) {
     t.register_on_commit(complete);
   } else {
     /* Hack to work around the fact that ReplicatedBackend sends
@@ -11865,31 +11816,30 @@ void PrimaryLogPG::do_update_log_missing_reply(OpRequestRef &op)
  */
 void PrimaryLogPG::mark_all_unfound_lost(
   int what,
-  ConnectionRef con,
-  ceph_tid_t tid)
+  std::function<void(int,const std::string&,bufferlist&)> on_finish)
 {
   dout(3) << __func__ << " " << pg_log_entry_t::get_op_name(what) << dendl;
   list<hobject_t> oids;
 
   dout(30) << __func__ << ": log before:\n";
-  pg_log.get_log().print(*_dout);
+  recovery_state.get_pg_log().get_log().print(*_dout);
   *_dout << dendl;
 
   mempool::osd_pglog::list<pg_log_entry_t> log_entries;
 
   utime_t mtime = ceph_clock_now();
   map<hobject_t, pg_missing_item>::const_iterator m =
-    missing_loc.get_needs_recovery().begin();
+    recovery_state.get_missing_loc().get_needs_recovery().begin();
   map<hobject_t, pg_missing_item>::const_iterator mend =
-    missing_loc.get_needs_recovery().end();
+    recovery_state.get_missing_loc().get_needs_recovery().end();
 
   ObcLockManager manager;
   eversion_t v = get_next_version();
   v.epoch = get_osdmap_epoch();
-  uint64_t num_unfound = missing_loc.num_unfound();
+  uint64_t num_unfound = recovery_state.get_missing_loc().num_unfound();
   while (m != mend) {
     const hobject_t &oid(m->first);
-    if (!missing_loc.is_unfound(oid)) {
+    if (!recovery_state.get_missing_loc().is_unfound(oid)) {
       // We only care about unfound objects
       ++m;
       continue;
@@ -11925,7 +11875,7 @@ void PrimaryLogPG::mark_all_unfound_lost(
       {
        pg_log_entry_t e(pg_log_entry_t::LOST_DELETE, oid, v, m->second.need,
                         0, osd_reqid_t(), mtime, 0);
-       if (get_osdmap()->require_osd_release >= CEPH_RELEASE_JEWEL) {
+       if (get_osdmap()->require_osd_release >= ceph_release_t::jewel) {
          if (pool.info.require_rollback()) {
            e.mod_desc.try_rmobject(v.version);
          } else {
@@ -11953,19 +11903,23 @@ void PrimaryLogPG::mark_all_unfound_lost(
     }
   }
 
-  info.stats.stats_invalid = true;
+  recovery_state.update_stats(
+    [](auto &history, auto &stats) {
+      stats.stats_invalid = true;
+      return false;
+    });
 
   submit_log_entries(
     log_entries,
     std::move(manager),
-    boost::optional<std::function<void(void)> >(
-      [this, oids, con, num_unfound, tid]() {
-       if (perform_deletes_during_peering()) {
+    std::optional<std::function<void(void)> >(
+      [this, oids, num_unfound, on_finish]() {
+       if (recovery_state.perform_deletes_during_peering()) {
          for (auto oid : oids) {
            // clear old locations - merge_new_log_entries will have
            // handled rebuilding missing_loc for each of these
            // objects if we have the RECOVERY_DELETES flag
-           missing_loc.recovered(oid);
+           recovery_state.object_recovered(oid, object_stat_sum_t());
          }
        }
 
@@ -11975,14 +11929,14 @@ void PrimaryLogPG::mark_all_unfound_lost(
              std::make_shared<PGPeeringEvent>(
              get_osdmap_epoch(),
              get_osdmap_epoch(),
-             DoRecovery())));
+             PeeringState::DoRecovery())));
        } else if (is_backfill_unfound()) {
          queue_peering_event(
            PGPeeringEventRef(
              std::make_shared<PGPeeringEvent>(
              get_osdmap_epoch(),
              get_osdmap_epoch(),
-             RequestBackfill())));
+             PeeringState::RequestBackfill())));
        } else {
          queue_recovery();
        }
@@ -11993,11 +11947,8 @@ void PrimaryLogPG::mark_all_unfound_lost(
        string rs = ss.str();
        dout(0) << "do_command r=" << 0 << " " << rs << dendl;
        osd->clog->info() << rs;
-       if (con) {
-         MCommandReply *reply = new MCommandReply(0, rs);
-         reply->set_tid(tid);
-         con->send_message(reply);
-       }
+       bufferlist empty;
+       on_finish(0, rs, empty);
       }),
     OpRequestRef());
 }
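The rewritten tail of mark_all_unfound_lost above hands completion off to an on_finish callback wrapped in std::optional<std::function<void(void)>> instead of replying on a Connection directly. A minimal, self-contained sketch of that optional-completion idiom (the submit_work name and signature below are illustrative only, not the real submit_log_entries API):

#include <functional>
#include <iostream>
#include <optional>
#include <string>

// Run some work, then fire the optional completion if one was supplied.
void submit_work(const std::string& what,
                 std::optional<std::function<void()>> on_complete)
{
  std::cout << "committing: " << what << "\n";
  // ... persist log entries ...
  if (on_complete)
    (*on_complete)();
}

int main()
{
  submit_work("without callback", std::nullopt);
  submit_work("with callback",
              std::optional<std::function<void()>>(
                [] { std::cout << "marked all unfound objects lost\n"; }));
  return 0;
}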
@@ -12065,11 +12016,7 @@ void PrimaryLogPG::apply_and_flush_repops(bool requeue)
 
 void PrimaryLogPG::on_flushed()
 {
-  ceph_assert(flushes_in_progress > 0);
-  flushes_in_progress--;
-  if (flushes_in_progress == 0) {
-    requeue_ops(waiting_for_flush);
-  }
+  requeue_ops(waiting_for_flush);
   if (!is_peered() || !is_primary()) {
     pair<hobject_t, ObjectContextRef> i;
     while (object_contexts.get_next(i.first, &i)) {
@@ -12079,20 +12026,13 @@ void PrimaryLogPG::on_flushed()
   }
 }
 
-void PrimaryLogPG::on_removal(ObjectStore::Transaction *t)
+void PrimaryLogPG::on_removal(ObjectStore::Transaction &t)
 {
   dout(10) << __func__ << dendl;
 
-  // adjust info to backfill
-  info.set_last_backfill(hobject_t());
-  pg_log.reset_backfill();
-  dirty_info = true;
-
-  // clear log
-  PGLogEntryHandler rollbacker{this, t};
-  pg_log.roll_forward(&rollbacker);
-
   on_shutdown();
+
+  t.register_on_commit(new C_DeleteMore(this, get_osdmap_epoch()));
 }
 
 void PrimaryLogPG::clear_async_reads()
@@ -12116,9 +12056,6 @@ void PrimaryLogPG::on_shutdown()
 {
   dout(10) << __func__ << dendl;
 
-  // handles queue races
-  deleting = true;
-
   if (recovery_queued) {
     recovery_queued = false;
     osd->clear_queued_recovery(this);
@@ -12133,6 +12070,7 @@ void PrimaryLogPG::on_shutdown()
   cancel_copy_ops(false, &tids);
   cancel_flush_ops(false, &tids);
   cancel_proxy_ops(false, &tids);
+  cancel_manifest_ops(false, &tids);
   osd->objecter->op_cancel(tids, -ECANCELED);
 
   apply_and_flush_repops(false);
@@ -12160,8 +12098,22 @@ void PrimaryLogPG::on_shutdown()
   }
 }
 
-void PrimaryLogPG::on_activate()
+void PrimaryLogPG::on_activate_complete()
 {
+  check_local();
+  // waiters
+  if (!recovery_state.needs_flush()) {
+    requeue_ops(waiting_for_peered);
+  } else if (!waiting_for_peered.empty()) {
+    dout(10) << __func__ << " flushes in progress, moving "
+            << waiting_for_peered.size()
+            << " items to waiting_for_flush"
+            << dendl;
+    ceph_assert(waiting_for_flush.empty());
+    waiting_for_flush.swap(waiting_for_peered);
+  }
+
+
   // all clean?
   if (needs_recovery()) {
     dout(10) << "activate not all replicas are up-to-date, queueing recovery" << dendl;
@@ -12170,7 +12122,7 @@ void PrimaryLogPG::on_activate()
        std::make_shared<PGPeeringEvent>(
          get_osdmap_epoch(),
          get_osdmap_epoch(),
-         DoRecovery())));
+         PeeringState::DoRecovery())));
   } else if (needs_backfill()) {
     dout(10) << "activate queueing backfill" << dendl;
     queue_peering_event(
@@ -12178,7 +12130,7 @@ void PrimaryLogPG::on_activate()
        std::make_shared<PGPeeringEvent>(
          get_osdmap_epoch(),
          get_osdmap_epoch(),
-         RequestBackfill())));
+         PeeringState::RequestBackfill())));
   } else {
     dout(10) << "activate all replicas clean, no recovery" << dendl;
     eio_errors_to_process = false;
@@ -12187,22 +12139,22 @@ void PrimaryLogPG::on_activate()
        std::make_shared<PGPeeringEvent>(
          get_osdmap_epoch(),
          get_osdmap_epoch(),
-         AllReplicasRecovered())));
+         PeeringState::AllReplicasRecovered())));
   }
 
   publish_stats_to_osd();
 
-  if (!backfill_targets.empty()) {
+  if (get_backfill_targets().size()) {
     last_backfill_started = earliest_backfill();
     new_backfill = true;
     ceph_assert(!last_backfill_started.is_max());
-    dout(5) << __func__ << ": bft=" << backfill_targets
+    dout(5) << __func__ << ": bft=" << get_backfill_targets()
           << " from " << last_backfill_started << dendl;
-    for (set<pg_shard_t>::iterator i = backfill_targets.begin();
-        i != backfill_targets.end();
+    for (set<pg_shard_t>::const_iterator i = get_backfill_targets().begin();
+        i != get_backfill_targets().end();
         ++i) {
       dout(5) << "target shard " << *i
-            << " from " << peer_info[*i].last_backfill
+            << " from " << recovery_state.get_peer_info(*i).last_backfill
             << dendl;
     }
   }
@@ -12211,17 +12163,7 @@ void PrimaryLogPG::on_activate()
   agent_setup();
 }
 
-void PrimaryLogPG::_on_new_interval()
-{
-  dout(20) << __func__ << " checking missing set deletes flag. missing = " << pg_log.get_missing() << dendl;
-  if (!pg_log.get_missing().may_include_deletes &&
-      get_osdmap()->test_flag(CEPH_OSDMAP_RECOVERY_DELETES)) {
-    pg_log.rebuild_missing_set_with_deletes(osd->store, ch, info);
-  }
-  ceph_assert(pg_log.get_missing().may_include_deletes == get_osdmap()->test_flag(CEPH_OSDMAP_RECOVERY_DELETES));
-}
-
-void PrimaryLogPG::on_change(ObjectStore::Transaction *t)
+void PrimaryLogPG::on_change(ObjectStore::Transaction &t)
 {
   dout(10) << __func__ << dendl;
 
@@ -12240,6 +12182,7 @@ void PrimaryLogPG::on_change(ObjectStore::Transaction *t)
   requeue_ops(waiting_for_peered);
   requeue_ops(waiting_for_flush);
   requeue_ops(waiting_for_active);
+  requeue_ops(waiting_for_readable);
 
   clear_scrub_reserved();
 
@@ -12247,6 +12190,7 @@ void PrimaryLogPG::on_change(ObjectStore::Transaction *t)
   cancel_copy_ops(is_primary(), &tids);
   cancel_flush_ops(is_primary(), &tids);
   cancel_proxy_ops(is_primary(), &tids);
+  cancel_manifest_ops(is_primary(), &tids);
   osd->objecter->op_cancel(tids, -ECANCELED);
 
   // requeue object waiters
@@ -12312,8 +12256,8 @@ void PrimaryLogPG::on_change(ObjectStore::Transaction *t)
   // registered watches.
   context_registry_on_change();
 
-  pgbackend->on_change_cleanup(t);
-  scrubber.cleanup_store(t);
+  pgbackend->on_change_cleanup(&t);
+  scrubber.cleanup_store(&t);
   pgbackend->on_change();
 
   // clear snap_trimmer state
@@ -12331,7 +12275,7 @@ void PrimaryLogPG::on_change(ObjectStore::Transaction *t)
   ceph_assert(objects_blocked_on_degraded_snap.empty());
 }
 
-void PrimaryLogPG::on_role_change()
+void PrimaryLogPG::plpg_on_role_change()
 {
   dout(10) << __func__ << dendl;
   if (get_role() != 0 && hit_set) {
@@ -12340,7 +12284,7 @@ void PrimaryLogPG::on_role_change()
   }
 }
 
-void PrimaryLogPG::on_pool_change()
+void PrimaryLogPG::plpg_on_pool_change()
 {
   dout(10) << __func__ << dendl;
   // requeue cache full waiters just in case the cache_mode is
@@ -12362,7 +12306,6 @@ void PrimaryLogPG::on_pool_change()
 // clear state.  called on recovery completion AND cancellation.
 void PrimaryLogPG::_clear_recovery_state()
 {
-  missing_loc.clear();
 #ifdef DEBUG_RECOVERY_OIDS
   recovering_oids.clear();
 #endif
@@ -12412,40 +12355,13 @@ void PrimaryLogPG::cancel_pull(const hobject_t &soid)
     waiting_for_unreadable_object.erase(soid);
   }
   if (is_missing_object(soid))
-    pg_log.set_last_requested(0); // get recover_primary to start over
+    recovery_state.set_last_requested(0);
   finish_degraded_object(soid);
 }
 
 void PrimaryLogPG::check_recovery_sources(const OSDMapRef& osdmap)
 {
-  /*
-   * check that any peers we are planning to (or currently) pulling
-   * objects from are dealt with.
-   */
-  missing_loc.check_recovery_sources(osdmap);
   pgbackend->check_recovery_sources(osdmap);
-
-  for (set<pg_shard_t>::iterator i = peer_log_requested.begin();
-       i != peer_log_requested.end();
-       ) {
-    if (!osdmap->is_up(i->osd)) {
-      dout(10) << "peer_log_requested removing " << *i << dendl;
-      peer_log_requested.erase(i++);
-    } else {
-      ++i;
-    }
-  }
-
-  for (set<pg_shard_t>::iterator i = peer_missing_requested.begin();
-       i != peer_missing_requested.end();
-       ) {
-    if (!osdmap->is_up(i->osd)) {
-      dout(10) << "peer_missing_requested removing " << *i << dendl;
-      peer_missing_requested.erase(i++);
-    } else {
-      ++i;
-    }
-  }
 }
 
 bool PrimaryLogPG::start_recovery_ops(
@@ -12459,7 +12375,7 @@ bool PrimaryLogPG::start_recovery_ops(
   bool recovery_started = false;
   ceph_assert(is_primary());
   ceph_assert(is_peered());
-  ceph_assert(!is_deleting());
+  ceph_assert(!recovery_state.is_deleting());
 
   ceph_assert(recovery_queued);
   recovery_queued = false;
@@ -12472,16 +12388,17 @@ bool PrimaryLogPG::start_recovery_ops(
     return have_unfound();
   }
 
-  const auto &missing = pg_log.get_missing();
+  const auto &missing = recovery_state.get_pg_log().get_missing();
 
   uint64_t num_unfound = get_num_unfound();
 
-  if (!missing.have_missing()) {
-    info.last_complete = info.last_update;
+  if (!recovery_state.have_missing()) {
+    recovery_state.local_recovery_complete();
   }
 
   if (!missing.have_missing() || // Primary does not have missing
-      all_missing_unfound()) { // or all of the missing objects are unfound.
+      // or all of the missing objects are unfound.
+      recovery_state.all_missing_unfound()) {
     // Recover the replicas.
     started = recover_replicas(max, handle, &recovery_started);
   }
@@ -12500,7 +12417,7 @@ bool PrimaryLogPG::start_recovery_ops(
   bool deferred_backfill = false;
   if (recovering.empty() &&
       state_test(PG_STATE_BACKFILLING) &&
-      !backfill_targets.empty() && started < max &&
+      !get_backfill_targets().empty() && started < max &&
       missing.num_missing() == 0 &&
       waiting_on_backfill.empty()) {
     if (get_osdmap()->test_flag(CEPH_OSDMAP_NOBACKFILL)) {
@@ -12510,7 +12427,7 @@ bool PrimaryLogPG::start_recovery_ops(
               !is_degraded())  {
       dout(10) << "deferring backfill due to NOREBALANCE" << dendl;
       deferred_backfill = true;
-    } else if (!backfill_reserved) {
+    } else if (!recovery_state.is_backfill_reserved()) {
       dout(10) << "deferring backfill due to !backfill_reserved" << dendl;
       if (!backfill_reserving) {
        dout(10) << "queueing RequestBackfill" << dendl;
@@ -12520,7 +12437,7 @@ bool PrimaryLogPG::start_recovery_ops(
            std::make_shared<PGPeeringEvent>(
              get_osdmap_epoch(),
              get_osdmap_epoch(),
-             RequestBackfill())));
+             PeeringState::RequestBackfill())));
       }
       deferred_backfill = true;
     } else {
@@ -12539,10 +12456,10 @@ bool PrimaryLogPG::start_recovery_ops(
   ceph_assert(recovery_ops_active == 0);
 
   dout(10) << __func__ << " needs_recovery: "
-          << missing_loc.get_needs_recovery()
+          << recovery_state.get_missing_loc().get_needs_recovery()
           << dendl;
   dout(10) << __func__ << " missing_loc: "
-          << missing_loc.get_missing_locs()
+          << recovery_state.get_missing_loc().get_missing_locs()
           << dendl;
   int unfound = get_num_unfound();
   if (unfound) {
@@ -12575,7 +12492,7 @@ bool PrimaryLogPG::start_recovery_ops(
           std::make_shared<PGPeeringEvent>(
             get_osdmap_epoch(),
             get_osdmap_epoch(),
-            RequestBackfill())));
+            PeeringState::RequestBackfill())));
     } else {
       dout(10) << "recovery done, no backfill" << dendl;
       eio_errors_to_process = false;
@@ -12585,7 +12502,7 @@ bool PrimaryLogPG::start_recovery_ops(
           std::make_shared<PGPeeringEvent>(
             get_osdmap_epoch(),
             get_osdmap_epoch(),
-            AllReplicasRecovered())));
+            PeeringState::AllReplicasRecovered())));
     }
   } else { // backfilling
     state_clear(PG_STATE_BACKFILLING);
@@ -12598,7 +12515,7 @@ bool PrimaryLogPG::start_recovery_ops(
         std::make_shared<PGPeeringEvent>(
           get_osdmap_epoch(),
           get_osdmap_epoch(),
-          Backfilled())));
+          PeeringState::Backfilled())));
   }
 
   return false;
@@ -12612,7 +12529,7 @@ uint64_t PrimaryLogPG::recover_primary(uint64_t max, ThreadPool::TPHandle &handl
 {
   ceph_assert(is_primary());
 
-  const auto &missing = pg_log.get_missing();
+  const auto &missing = recovery_state.get_pg_log().get_missing();
 
   dout(10) << __func__ << " recovering " << recovering.size()
            << " in pg,"
@@ -12627,14 +12544,14 @@ uint64_t PrimaryLogPG::recover_primary(uint64_t max, ThreadPool::TPHandle &handl
 
   PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
   map<version_t, hobject_t>::const_iterator p =
-    missing.get_rmissing().lower_bound(pg_log.get_log().last_requested);
+    missing.get_rmissing().lower_bound(recovery_state.get_pg_log().get_log().last_requested);
   while (p != missing.get_rmissing().end()) {
     handle.reset_tp_timeout();
     hobject_t soid;
     version_t v = p->first;
 
-    auto it_objects = pg_log.get_log().objects.find(p->second);
-    if (it_objects != pg_log.get_log().objects.end()) {
+    auto it_objects = recovery_state.get_pg_log().get_log().objects.find(p->second);
+    if (it_objects != recovery_state.get_pg_log().get_log().objects.end()) {
       latest = it_objects->second;
       ceph_assert(latest->is_update() || latest->is_delete());
       soid = latest->soid;
@@ -12687,8 +12604,11 @@ uint64_t PrimaryLogPG::recover_primary(uint64_t max, ThreadPool::TPHandle &handl
              ceph_assert(!pool.info.require_rollback());
              t.setattr(coll, ghobject_t(soid), OI_ATTR, b2);
 
-             recover_got(soid, latest->version);
-             missing_loc.add_location(soid, pg_whoami);
+             recovery_state.recover_got(
+               soid,
+               latest->version,
+               false,
+               t);
 
              ++active_pushes;
 
@@ -12715,15 +12635,21 @@ uint64_t PrimaryLogPG::recover_primary(uint64_t max, ThreadPool::TPHandle &handl
            eversion_t alternate_need = latest->reverting_to;
            dout(10) << " need to pull prior_version " << alternate_need << " for revert " << item << dendl;
 
-           for (map<pg_shard_t, pg_missing_t>::iterator p = peer_missing.begin();
-                p != peer_missing.end();
-                ++p)
+           set<pg_shard_t> good_peers;
+           for (auto p = recovery_state.get_peer_missing().begin();
+                p != recovery_state.get_peer_missing().end();
+                ++p) {
              if (p->second.is_missing(soid, need) &&
                  p->second.get_items().at(soid).have == alternate_need) {
-               missing_loc.add_location(soid, p->first);
+               good_peers.insert(p->first);
              }
+           }
+           recovery_state.set_revert_with_targets(
+             soid,
+             good_peers);
            dout(10) << " will pull " << alternate_need << " or " << need
-                    << " from one of " << missing_loc.get_locations(soid)
+                    << " from one of "
+                    << recovery_state.get_missing_loc().get_locations(soid)
                     << dendl;
          }
        }
@@ -12756,7 +12682,7 @@ uint64_t PrimaryLogPG::recover_primary(uint64_t max, ThreadPool::TPHandle &handl
     
     // only advance last_requested if we haven't skipped anything
     if (!skipped)
-      pg_log.set_last_requested(v);
+      recovery_state.set_last_requested(v);
   }
  
   pgbackend->run_recovery_op(h, get_recovery_op_priority());
@@ -12766,28 +12692,16 @@ uint64_t PrimaryLogPG::recover_primary(uint64_t max, ThreadPool::TPHandle &handl
 bool PrimaryLogPG::primary_error(
   const hobject_t& soid, eversion_t v)
 {
-  pg_log.missing_add(soid, v, eversion_t());
-  pg_log.set_last_requested(0);
-  missing_loc.remove_location(soid, pg_whoami);
-  bool uhoh = true;
-  ceph_assert(!acting_recovery_backfill.empty());
-  for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
-       i != acting_recovery_backfill.end();
-       ++i) {
-    if (*i == get_primary()) continue;
-    pg_shard_t peer = *i;
-    if (!peer_missing[peer].is_missing(soid, v)) {
-      missing_loc.add_location(soid, peer);
-      dout(10) << info.pgid << " unexpectedly missing " << soid << " v" << v
-              << ", there should be a copy on shard " << peer << dendl;
-      uhoh = false;
-    }
-  }
+  recovery_state.force_object_missing(pg_whoami, soid, v);
+  bool uhoh = recovery_state.get_missing_loc().is_unfound(soid);
   if (uhoh)
-    osd->clog->error() << info.pgid << " missing primary copy of " << soid << ", unfound";
+    osd->clog->error() << info.pgid << " missing primary copy of "
+                      << soid << ", unfound";
   else
-    osd->clog->error() << info.pgid << " missing primary copy of " << soid
-                        << ", will try copies on " << missing_loc.get_locations(soid);
+    osd->clog->error() << info.pgid << " missing primary copy of "
+                      << soid
+                      << ", will try copies on "
+                      << recovery_state.get_missing_loc().get_locations(soid);
   return uhoh;
 }
 
@@ -12831,6 +12745,24 @@ int PrimaryLogPG::prep_object_replica_pushes(
   ceph_assert(is_primary());
   dout(10) << __func__ << ": on " << soid << dendl;
 
+  if (soid.snap && soid.snap < CEPH_NOSNAP) {
+    // do we have the head and/or snapdir?
+    hobject_t head = soid.get_head();
+    if (recovery_state.get_pg_log().get_missing().is_missing(head)) {
+      if (recovering.count(head)) {
+       dout(10) << " missing but already recovering head " << head << dendl;
+       return 0;
+      } else {
+       int r = recover_missing(
+           head, recovery_state.get_pg_log().get_missing().get_items().find(head)->second.need,
+           get_recovery_op_priority(), h);
+       if (r != PULL_NONE)
+         return 1;
+       return 0;
+      }
+    }
+  }
+
   // NOTE: we know we will get a valid oloc off of disk here.
   ObjectContextRef obc = get_object_context(soid, false);
   if (!obc) {
@@ -12852,11 +12784,6 @@ int PrimaryLogPG::prep_object_replica_pushes(
   ceph_assert(!recovering.count(soid));
   recovering.insert(make_pair(soid, obc));
 
-  /* We need this in case there is an in progress write on the object.  In fact,
-   * the only possible write is an update to the xattr due to a lost_revert --
-   * a client write would be blocked since the object is degraded.
-   * In almost all cases, therefore, this lock should be uncontended.
-   */
   int r = pgbackend->recover_object(
     soid,
     v,
@@ -12865,8 +12792,7 @@ int PrimaryLogPG::prep_object_replica_pushes(
     h);
   if (r < 0) {
     dout(0) << __func__ << " Error " << r << " on oid " << soid << dendl;
-    primary_failed(soid);
-    primary_error(soid, v);
+    on_failed_pull({ pg_whoami }, soid, v);
     return 0;
   }
   return 1;
@@ -12881,21 +12807,21 @@ uint64_t PrimaryLogPG::recover_replicas(uint64_t max, ThreadPool::TPHandle &hand
   PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
 
   // this is FAR from an optimal recovery order.  pretty lame, really.
-  ceph_assert(!acting_recovery_backfill.empty());
+  ceph_assert(!get_acting_recovery_backfill().empty());
    // choose replicas to recover; the replica with the shortest missing list goes first
   // so we can bring it back to normal ASAP
   std::vector<std::pair<unsigned int, pg_shard_t>> replicas_by_num_missing,
     async_by_num_missing;
-  replicas_by_num_missing.reserve(acting_recovery_backfill.size() - 1);
-  for (auto &p: acting_recovery_backfill) {
+  replicas_by_num_missing.reserve(get_acting_recovery_backfill().size() - 1);
+  for (auto &p: get_acting_recovery_backfill()) {
     if (p == get_primary()) {
       continue;
     }
-    auto pm = peer_missing.find(p);
-    ceph_assert(pm != peer_missing.end());
+    auto pm = recovery_state.get_peer_missing().find(p);
+    ceph_assert(pm != recovery_state.get_peer_missing().end());
     auto nm = pm->second.num_missing();
     if (nm != 0) {
-      if (async_recovery_targets.count(p)) {
+      if (is_async_recovery_target(p)) {
         async_by_num_missing.push_back(make_pair(nm, p));
       } else {
         replicas_by_num_missing.push_back(make_pair(nm, p));
@@ -12916,10 +12842,8 @@ uint64_t PrimaryLogPG::recover_replicas(uint64_t max, ThreadPool::TPHandle &hand
   for (auto &replica: replicas_by_num_missing) {
     pg_shard_t &peer = replica.second;
     ceph_assert(peer != get_primary());
-    map<pg_shard_t, pg_missing_t>::const_iterator pm = peer_missing.find(peer);
-    ceph_assert(pm != peer_missing.end());
-    map<pg_shard_t, pg_info_t>::const_iterator pi = peer_info.find(peer);
-    ceph_assert(pi != peer_info.end());
+    auto pm = recovery_state.get_peer_missing().find(peer);
+    ceph_assert(pm != recovery_state.get_peer_missing().end());
     size_t m_sz = pm->second.num_missing();
 
     dout(10) << " peer osd." << peer << " missing " << m_sz << " objects." << dendl;
@@ -12933,14 +12857,16 @@ uint64_t PrimaryLogPG::recover_replicas(uint64_t max, ThreadPool::TPHandle &hand
       handle.reset_tp_timeout();
       const hobject_t soid(p->second);
 
-      if (missing_loc.is_unfound(soid)) {
+      if (recovery_state.get_missing_loc().is_unfound(soid)) {
        dout(10) << __func__ << ": " << soid << " still unfound" << dendl;
        continue;
       }
 
-      if (soid > pi->second.last_backfill) {
+      const pg_info_t &pi = recovery_state.get_peer_info(peer);
+      if (soid > pi.last_backfill) {
        if (!recovering.count(soid)) {
-          derr << __func__ << ": object " << soid << " last_backfill " << pi->second.last_backfill << dendl;
+          derr << __func__ << ": object " << soid << " last_backfill "
+              << pi.last_backfill << dendl;
          derr << __func__ << ": object added to missing set for backfill, but "
               << "is not in recovering, error!" << dendl;
          ceph_abort();
@@ -12953,20 +12879,22 @@ uint64_t PrimaryLogPG::recover_replicas(uint64_t max, ThreadPool::TPHandle &hand
        continue;
       }
 
-      if (missing_loc.is_deleted(soid)) {
+      if (recovery_state.get_missing_loc().is_deleted(soid)) {
        dout(10) << __func__ << ": " << soid << " is a delete, removing" << dendl;
        map<hobject_t,pg_missing_item>::const_iterator r = m.get_items().find(soid);
        started += prep_object_replica_deletes(soid, r->second.need, h, work_started);
        continue;
       }
 
-      if (soid.is_snap() && pg_log.get_missing().is_missing(soid.get_head())) {
+      if (soid.is_snap() &&
+         recovery_state.get_pg_log().get_missing().is_missing(
+           soid.get_head())) {
        dout(10) << __func__ << ": " << soid.get_head()
                 << " still missing on primary" << dendl;
        continue;
       }
 
-      if (pg_log.get_missing().is_missing(soid)) {
+      if (recovery_state.get_pg_log().get_missing().is_missing(soid)) {
        dout(10) << __func__ << ": " << soid << " still missing on primary" << dendl;
        continue;
       }
@@ -12984,15 +12912,10 @@ uint64_t PrimaryLogPG::recover_replicas(uint64_t max, ThreadPool::TPHandle &hand
 hobject_t PrimaryLogPG::earliest_peer_backfill() const
 {
   hobject_t e = hobject_t::get_max();
-  for (set<pg_shard_t>::const_iterator i = backfill_targets.begin();
-       i != backfill_targets.end();
-       ++i) {
-    pg_shard_t peer = *i;
-    map<pg_shard_t, BackfillInterval>::const_iterator iter =
-      peer_backfill_info.find(peer);
+  for (const pg_shard_t& peer : get_backfill_targets()) {
+    const auto iter = peer_backfill_info.find(peer);
     ceph_assert(iter != peer_backfill_info.end());
-    if (iter->second.begin < e)
-      e = iter->second.begin;
+    e = std::min(e, iter->second.begin);
   }
   return e;
 }
@@ -13002,12 +12925,8 @@ bool PrimaryLogPG::all_peer_done() const
   // Primary hasn't got any more objects
   ceph_assert(backfill_info.empty());
 
-  for (set<pg_shard_t>::const_iterator i = backfill_targets.begin();
-       i != backfill_targets.end();
-       ++i) {
-    pg_shard_t bt = *i;
-    map<pg_shard_t, BackfillInterval>::const_iterator piter =
-      peer_backfill_info.find(bt);
+  for (const pg_shard_t& bt : get_backfill_targets()) {
+    const auto piter = peer_backfill_info.find(bt);
     ceph_assert(piter != peer_backfill_info.end());
     const BackfillInterval& pbi = piter->second;
     // See if peer has more to process
@@ -13050,11 +12969,11 @@ uint64_t PrimaryLogPG::recover_backfill(
   ThreadPool::TPHandle &handle, bool *work_started)
 {
   dout(10) << __func__ << " (" << max << ")"
-           << " bft=" << backfill_targets
+           << " bft=" << get_backfill_targets()
           << " last_backfill_started " << last_backfill_started
           << (new_backfill ? " new_backfill":"")
           << dendl;
-  ceph_assert(!backfill_targets.empty());
+  ceph_assert(!get_backfill_targets().empty());
 
   // Initialize from prior backfill state
   if (new_backfill) {
@@ -13063,10 +12982,11 @@ uint64_t PrimaryLogPG::recover_backfill(
     new_backfill = false;
 
     // initialize BackfillIntervals
-    for (set<pg_shard_t>::iterator i = backfill_targets.begin();
-        i != backfill_targets.end();
+    for (set<pg_shard_t>::const_iterator i = get_backfill_targets().begin();
+        i != get_backfill_targets().end();
         ++i) {
-      peer_backfill_info[*i].reset(peer_info[*i].last_backfill);
+      peer_backfill_info[*i].reset(
+       recovery_state.get_peer_info(*i).last_backfill);
     }
     backfill_info.reset(last_backfill_started);
 
@@ -13074,11 +12994,11 @@ uint64_t PrimaryLogPG::recover_backfill(
     pending_backfill_updates.clear();
   }
 
-  for (set<pg_shard_t>::iterator i = backfill_targets.begin();
-       i != backfill_targets.end();
+  for (set<pg_shard_t>::const_iterator i = get_backfill_targets().begin();
+       i != get_backfill_targets().end();
        ++i) {
     dout(10) << "peer osd." << *i
-          << " info " << peer_info[*i]
+          << " info " << recovery_state.get_peer_info(*i)
           << " interval " << peer_backfill_info[*i].begin
           << "-" << peer_backfill_info[*i].end
           << " " << peer_backfill_info[*i].objects.size() << " objects"
@@ -13093,11 +13013,13 @@ uint64_t PrimaryLogPG::recover_backfill(
   vector<boost::tuple<hobject_t, eversion_t, pg_shard_t> > to_remove;
   set<hobject_t> add_to_stat;
 
-  for (set<pg_shard_t>::iterator i = backfill_targets.begin();
-       i != backfill_targets.end();
+  for (set<pg_shard_t>::const_iterator i = get_backfill_targets().begin();
+       i != get_backfill_targets().end();
        ++i) {
     peer_backfill_info[*i].trim_to(
-      std::max(peer_info[*i].last_backfill, last_backfill_started));
+      std::max(
+       recovery_state.get_peer_info(*i).last_backfill,
+       last_backfill_started));
   }
   backfill_info.trim_to(last_backfill_started);
 
@@ -13115,8 +13037,8 @@ uint64_t PrimaryLogPG::recover_backfill(
     dout(20) << "   my backfill interval " << backfill_info << dendl;
 
     bool sent_scan = false;
-    for (set<pg_shard_t>::iterator i = backfill_targets.begin();
-        i != backfill_targets.end();
+    for (set<pg_shard_t>::const_iterator i = get_backfill_targets().begin();
+        i != get_backfill_targets().end();
         ++i) {
       pg_shard_t bt = *i;
       BackfillInterval& pbi = peer_backfill_info[bt];
@@ -13127,7 +13049,7 @@ uint64_t PrimaryLogPG::recover_backfill(
        dout(10) << " scanning peer osd." << bt << " from " << pbi.end << dendl;
        epoch_t e = get_osdmap_epoch();
        MOSDPGScan *m = new MOSDPGScan(
-         MOSDPGScan::OP_SCAN_GET_DIGEST, pg_whoami, e, last_peering_reset,
+         MOSDPGScan::OP_SCAN_GET_DIGEST, pg_whoami, e, get_last_peering_reset(),
          spg_t(info.pgid.pgid, bt.shard),
          pbi.end, hobject_t());
        osd->send_message_osd_cluster(bt.osd, m, get_osdmap_epoch());
@@ -13156,8 +13078,8 @@ uint64_t PrimaryLogPG::recover_backfill(
     if (check < backfill_info.begin) {
 
       set<pg_shard_t> check_targets;
-      for (set<pg_shard_t>::iterator i = backfill_targets.begin();
-          i != backfill_targets.end();
+      for (set<pg_shard_t>::const_iterator i = get_backfill_targets().begin();
+          i != get_backfill_targets().end();
           ++i) {
         pg_shard_t bt = *i;
         BackfillInterval& pbi = peer_backfill_info[bt];
@@ -13189,8 +13111,8 @@ uint64_t PrimaryLogPG::recover_backfill(
       eversion_t& obj_v = backfill_info.objects.begin()->second;
 
       vector<pg_shard_t> need_ver_targs, missing_targs, keep_ver_targs, skip_targs;
-      for (set<pg_shard_t>::iterator i = backfill_targets.begin();
-          i != backfill_targets.end();
+      for (set<pg_shard_t>::const_iterator i = get_backfill_targets().begin();
+          i != get_backfill_targets().end();
           ++i) {
        pg_shard_t bt = *i;
        BackfillInterval& pbi = peer_backfill_info[bt];
@@ -13202,7 +13124,7 @@ uint64_t PrimaryLogPG::recover_backfill(
            keep_ver_targs.push_back(bt);
          }
         } else {
-         pg_info_t& pinfo = peer_info[bt];
+         const pg_info_t& pinfo = recovery_state.get_peer_info(bt);
 
          // Only include peers whose backfill line we've caught up to;
          // otherwise, they only appear to be missing this object
@@ -13255,7 +13177,7 @@ uint64_t PrimaryLogPG::recover_backfill(
       }
       dout(20) << "need_ver_targs=" << need_ver_targs
               << " keep_ver_targs=" << keep_ver_targs << dendl;
-      dout(20) << "backfill_targets=" << backfill_targets
+      dout(20) << "backfill_targets=" << get_backfill_targets()
               << " missing_targs=" << missing_targs
               << " skip_targs=" << skip_targs << dendl;
 
@@ -13331,15 +13253,9 @@ uint64_t PrimaryLogPG::recover_backfill(
        pending_backfill_updates.erase(i++)) {
     dout(20) << " pending_backfill_update " << i->first << dendl;
     ceph_assert(i->first > new_last_backfill);
-    for (set<pg_shard_t>::iterator j = backfill_targets.begin();
-        j != backfill_targets.end();
-        ++j) {
-      pg_shard_t bt = *j;
-      pg_info_t& pinfo = peer_info[bt];
-      //Add stats to all peers that were missing object
-      if (i->first > pinfo.last_backfill)
-        pinfo.stats.add(i->second);
-    }
+    recovery_state.update_complete_backfill_object_stats(
+      i->first,
+      i->second);
     new_last_backfill = i->first;
   }
   dout(10) << "possible new_last_backfill at " << new_last_backfill << dendl;
@@ -13357,33 +13273,29 @@ uint64_t PrimaryLogPG::recover_backfill(
   // If new_last_backfill == MAX, then we will send OP_BACKFILL_FINISH to
   // all the backfill targets.  Otherwise, we will move last_backfill up on
  // those targets that need it and send OP_BACKFILL_PROGRESS to them.
-  for (set<pg_shard_t>::iterator i = backfill_targets.begin();
-       i != backfill_targets.end();
+  for (set<pg_shard_t>::const_iterator i = get_backfill_targets().begin();
+       i != get_backfill_targets().end();
        ++i) {
     pg_shard_t bt = *i;
-    pg_info_t& pinfo = peer_info[bt];
+    const pg_info_t& pinfo = recovery_state.get_peer_info(bt);
 
     if (new_last_backfill > pinfo.last_backfill) {
-      pinfo.set_last_backfill(new_last_backfill);
+      recovery_state.update_peer_last_backfill(bt, new_last_backfill);
       epoch_t e = get_osdmap_epoch();
       MOSDPGBackfill *m = NULL;
       if (pinfo.last_backfill.is_max()) {
         m = new MOSDPGBackfill(
          MOSDPGBackfill::OP_BACKFILL_FINISH,
          e,
-         last_peering_reset,
+         get_last_peering_reset(),
          spg_t(info.pgid.pgid, bt.shard));
         // Use default priority here, must match sub_op priority
-        /* pinfo.stats might be wrong if we did log-based recovery on the
-         * backfilled portion in addition to continuing backfill.
-         */
-        pinfo.stats = info.stats;
         start_recovery_op(hobject_t::get_max());
       } else {
         m = new MOSDPGBackfill(
          MOSDPGBackfill::OP_BACKFILL_PROGRESS,
          e,
-         last_peering_reset,
+         get_last_peering_reset(),
          spg_t(info.pgid.pgid, bt.shard));
         // Use default priority here, must match sub_op priority
       }
@@ -13411,18 +13323,13 @@ int PrimaryLogPG::prep_backfill_object_push(
   ceph_assert(!peers.empty());
 
   backfills_in_flight.insert(oid);
-  for (unsigned int i = 0 ; i < peers.size(); ++i) {
-    map<pg_shard_t, pg_missing_t>::iterator bpm = peer_missing.find(peers[i]);
-    ceph_assert(bpm != peer_missing.end());
-    bpm->second.add(oid, eversion_t(), eversion_t(), false);
-  }
+  recovery_state.prepare_backfill_for_missing(oid, v, peers);
 
   ceph_assert(!recovering.count(oid));
 
   start_recovery_op(oid);
   recovering.insert(make_pair(oid, obc));
 
-  // We need to take the read_lock here in order to flush in-progress writes
   int r = pgbackend->recover_object(
     oid,
     v,
@@ -13431,10 +13338,7 @@ int PrimaryLogPG::prep_backfill_object_push(
     h);
   if (r < 0) {
     dout(0) << __func__ << " Error " << r << " on oid " << oid << dendl;
-    primary_failed(oid);
-    primary_error(oid, v);
-    backfills_in_flight.erase(oid);
-    missing_loc.add_missing(oid, v, eversion_t());
+    on_failed_pull({ pg_whoami }, oid, v);
   }
   return r;
 }
@@ -13457,7 +13361,7 @@ void PrimaryLogPG::update_range(
     dout(10) << __func__<< ": bi is current " << dendl;
     ceph_assert(bi->version == projected_last_update);
   } else if (bi->version >= info.log_tail) {
-    if (pg_log.get_log().empty() && projected_log.empty()) {
+    if (recovery_state.get_pg_log().get_log().empty() && projected_log.empty()) {
       /* Because we don't move log_tail on split, the log might be
        * empty even if log_tail != last_update.  However, the only
        * way to get here with an empty log is if log_tail is actually
@@ -13493,7 +13397,7 @@ void PrimaryLogPG::update_range(
       }
     };
     dout(10) << "scanning pg log first" << dendl;
-    pg_log.get_log().scan_log_after(bi->version, func);
+    recovery_state.get_pg_log().get_log().scan_log_after(bi->version, func);
     dout(10) << "scanning projected log" << dendl;
     projected_log.scan_log_after(bi->version, func);
     bi->version = projected_last_update;
@@ -13559,15 +13463,17 @@ void PrimaryLogPG::check_local()
 {
   dout(10) << __func__ << dendl;
 
-  ceph_assert(info.last_update >= pg_log.get_tail());  // otherwise we need some help!
+  ceph_assert(
+    info.last_update >=
+    recovery_state.get_pg_log().get_tail());  // otherwise we need some help!
 
   if (!cct->_conf->osd_debug_verify_stray_on_activate)
     return;
 
   // just scan the log.
   set<hobject_t> did;
-  for (list<pg_log_entry_t>::const_reverse_iterator p = pg_log.get_log().log.rbegin();
-       p != pg_log.get_log().log.rend();
+  for (list<pg_log_entry_t>::const_reverse_iterator p = recovery_state.get_pg_log().get_log().log.rbegin();
+       p != recovery_state.get_pg_log().get_log().log.rend();
        ++p) {
     if (did.count(p->soid))
       continue;
@@ -13615,11 +13521,11 @@ hobject_t PrimaryLogPG::get_hit_set_archive_object(utime_t start,
   ostringstream ss;
   ss << "hit_set_" << info.pgid.pgid << "_archive_";
   if (using_gmt) {
-    start.gmtime(ss) << "_";
-    end.gmtime(ss);
+    start.gmtime(ss, true /* legacy pre-octopus form */) << "_";
+    end.gmtime(ss, true /* legacy pre-octopus form */);
   } else {
-    start.localtime(ss) << "_";
-    end.localtime(ss);
+    start.localtime(ss, true /* legacy pre-octopus form */) << "_";
+    end.localtime(ss, true /* legacy pre-octopus form */);
   }
   hobject_t hoid(sobject_t(ss.str(), CEPH_NOSNAP), "",
                 info.pgid.ps(), info.pgid.pool(),
@@ -13665,7 +13571,7 @@ void PrimaryLogPG::hit_set_setup()
 void PrimaryLogPG::hit_set_remove_all()
 {
   // If any archives are degraded we skip this
-  for (list<pg_hit_set_info_t>::iterator p = info.hit_set.history.begin();
+  for (auto p = info.hit_set.history.begin();
        p != info.hit_set.history.end();
        ++p) {
     hobject_t aoid = get_hit_set_archive_object(p->begin, p->end, p->using_gmt);
@@ -13678,7 +13584,7 @@ void PrimaryLogPG::hit_set_remove_all()
   }
 
   if (!info.hit_set.history.empty()) {
-    list<pg_hit_set_info_t>::reverse_iterator p = info.hit_set.history.rbegin();
+    auto p = info.hit_set.history.rbegin();
     ceph_assert(p != info.hit_set.history.rend());
     hobject_t oid = get_hit_set_archive_object(p->begin, p->end, p->using_gmt);
     ceph_assert(!is_degraded_or_backfilling_object(oid));
@@ -13694,7 +13600,7 @@ void PrimaryLogPG::hit_set_remove_all()
     simple_opc_submit(std::move(ctx));
   }
 
-  info.hit_set = pg_hit_set_history_t();
+  recovery_state.update_hset(pg_hit_set_history_t());
   if (agent_state) {
     agent_state->discard_hit_sets();
   }
@@ -13762,10 +13668,11 @@ bool PrimaryLogPG::hit_set_apply_log()
   }
 
   dout(20) << __func__ << " " << to << " .. " << info.last_update << dendl;
-  list<pg_log_entry_t>::const_reverse_iterator p = pg_log.get_log().log.rbegin();
-  while (p != pg_log.get_log().log.rend() && p->version > to)
+  list<pg_log_entry_t>::const_reverse_iterator p =
+    recovery_state.get_pg_log().get_log().log.rbegin();
+  while (p != recovery_state.get_pg_log().get_log().log.rend() && p->version > to)
     ++p;
-  while (p != pg_log.get_log().log.rend() && p->version > from) {
+  while (p != recovery_state.get_pg_log().get_log().log.rend() && p->version > from) {
     hit_set->insert(p->soid);
     ++p;
   }
@@ -13784,7 +13691,7 @@ void PrimaryLogPG::hit_set_persist()
 
   // If any archives are degraded we skip this persist request
   // account for the additional entry being added below
-  for (list<pg_hit_set_info_t>::iterator p = info.hit_set.history.begin();
+  for (auto p = info.hit_set.history.begin();
        p != info.hit_set.history.end();
        ++p) {
     hobject_t aoid = get_hit_set_archive_object(p->begin, p->end, p->using_gmt);
@@ -13802,11 +13709,10 @@ void PrimaryLogPG::hit_set_persist()
   // look just at that.  This is necessary because our transactions
   // may include a modify of the new hit_set *and* a delete of the
   // old one, and this may span the backfill boundary.
-  for (set<pg_shard_t>::iterator p = backfill_targets.begin();
-       p != backfill_targets.end();
+  for (set<pg_shard_t>::const_iterator p = get_backfill_targets().begin();
+       p != get_backfill_targets().end();
        ++p) {
-    ceph_assert(peer_info.count(*p));
-    const pg_info_t& pi = peer_info[*p];
+    const pg_info_t& pi = recovery_state.get_peer_info(*p);
     if (pi.last_backfill == hobject_t() ||
        pi.last_backfill.get_hash() == info.pgid.ps()) {
       dout(10) << __func__ << " backfill target osd." << *p
@@ -13881,6 +13787,9 @@ void PrimaryLogPG::hit_set_persist()
   ctx->op_t->create(oid);
   if (bl.length()) {
     ctx->op_t->write(oid, 0, bl.length(), bl, 0);
+    write_update_size_and_usage(ctx->delta_stats, obc->obs.oi, ctx->modified_ranges,
+        0, bl.length());
+    ctx->clean_regions.mark_data_region_dirty(0, bl.length());
   }
   map <string, bufferlist> attrs;
   attrs[OI_ATTR].claim(boi);
@@ -13897,6 +13806,7 @@ void PrimaryLogPG::hit_set_persist()
       ctx->mtime,
       0)
     );
+  ctx->log.back().clean_regions = ctx->clean_regions;
 
   hit_set_trim(ctx, max);
 
@@ -13995,18 +13905,16 @@ void PrimaryLogPG::agent_clear()
 // Return false if no objects operated on since start of object hash space
 bool PrimaryLogPG::agent_work(int start_max, int agent_flush_quota)
 {
-  lock();
+  std::scoped_lock locker{*this};
   if (!agent_state) {
     dout(10) << __func__ << " no agent state, stopping" << dendl;
-    unlock();
     return true;
   }
 
-  ceph_assert(!deleting);
+  ceph_assert(!recovery_state.is_deleting());
 
   if (agent_state->is_idle()) {
     dout(10) << __func__ << " idle, stopping" << dendl;
-    unlock();
     return true;
   }
 
@@ -14152,11 +14060,9 @@ bool PrimaryLogPG::agent_work(int start_max, int agent_flush_quota)
   if (need_delay) {
     ceph_assert(agent_state->delaying == false);
     agent_delay();
-    unlock();
     return false;
   }
   agent_choose_mode();
-  unlock();
   return true;
 }
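agent_work above (like the other callbacks earlier in the diff) now takes the PG lock with std::scoped_lock instead of paired lock()/unlock() calls, which is why the explicit unlock() before each early return disappears. A small standalone sketch of the same RAII pattern, assuming only that the locked type exposes lock()/unlock(); the PGLike type is made up for illustration:

#include <mutex>

// PGLike stands in for a PG: it exposes lock()/unlock() (BasicLockable),
// which is all std::scoped_lock needs when guarding a single object.
struct PGLike {
  std::mutex m;
  void lock()   { m.lock(); }
  void unlock() { m.unlock(); }
  bool has_agent_state = false;
  bool idle = true;
};

bool agent_work_sketch(PGLike& pg)
{
  std::scoped_lock locker{pg};   // held until we leave, however we leave
  if (!pg.has_agent_state)
    return true;                 // early return: no manual unlock needed
  if (pg.idle)
    return true;
  // ... flush / evict work would go here ...
  return false;
}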
 
@@ -14168,7 +14074,7 @@ void PrimaryLogPG::agent_load_hit_sets()
 
   if (agent_state->hit_set_map.size() < info.hit_set.history.size()) {
     dout(10) << __func__ << dendl;
-    for (list<pg_hit_set_info_t>::iterator p = info.hit_set.history.begin();
+    for (auto p = info.hit_set.history.begin();
         p != info.hit_set.history.end(); ++p) {
       if (agent_state->hit_set_map.count(p->begin.sec()) == 0) {
        dout(10) << __func__ << " loading " << p->begin << "-"
@@ -14346,7 +14252,7 @@ bool PrimaryLogPG::agent_maybe_evict(ObjectContextRef& obc, bool after_flush)
 
   auto null_op_req = OpRequestRef();
   if (!ctx->lock_manager.get_lock_type(
-       ObjectContext::RWState::RWWRITE,
+       RWState::RWWRITE,
        obc->obs.oi.soid,
        obc,
        null_op_req)) {
@@ -14401,12 +14307,11 @@ void PrimaryLogPG::agent_delay()
 void PrimaryLogPG::agent_choose_mode_restart()
 {
   dout(20) << __func__ << dendl;
-  lock();
+  std::scoped_lock locker{*this};
   if (agent_state && agent_state->delaying) {
     agent_state->delaying = false;
     agent_choose_mode(true);
   }
-  unlock();
 }
 
 bool PrimaryLogPG::agent_choose_mode(bool restart, OpRequestRef op)
@@ -14568,18 +14473,22 @@ bool PrimaryLogPG::agent_choose_mode(bool restart, OpRequestRef op)
            << " -> "
            << TierAgentState::get_flush_mode_name(flush_mode)
            << dendl;
-    if (flush_mode == TierAgentState::FLUSH_MODE_HIGH) {
-      osd->agent_inc_high_count();
-      info.stats.stats.sum.num_flush_mode_high = 1;
-    } else if (flush_mode == TierAgentState::FLUSH_MODE_LOW) {
-      info.stats.stats.sum.num_flush_mode_low = 1;
-    }
-    if (agent_state->flush_mode == TierAgentState::FLUSH_MODE_HIGH) {
-      osd->agent_dec_high_count();
-      info.stats.stats.sum.num_flush_mode_high = 0;
-    } else if (agent_state->flush_mode == TierAgentState::FLUSH_MODE_LOW) {
-      info.stats.stats.sum.num_flush_mode_low = 0;
-    }
+    recovery_state.update_stats(
+      [=](auto &history, auto &stats) {
+       if (flush_mode == TierAgentState::FLUSH_MODE_HIGH) {
+         osd->agent_inc_high_count();
+         stats.stats.sum.num_flush_mode_high = 1;
+       } else if (flush_mode == TierAgentState::FLUSH_MODE_LOW) {
+         stats.stats.sum.num_flush_mode_low = 1;
+       }
+       if (agent_state->flush_mode == TierAgentState::FLUSH_MODE_HIGH) {
+         osd->agent_dec_high_count();
+         stats.stats.sum.num_flush_mode_high = 0;
+       } else if (agent_state->flush_mode == TierAgentState::FLUSH_MODE_LOW) {
+         stats.stats.sum.num_flush_mode_low = 0;
+       }
+       return false;
+      });
     agent_state->flush_mode = flush_mode;
   }
   if (evict_mode != agent_state->evict_mode) {
@@ -14594,21 +14503,26 @@ bool PrimaryLogPG::agent_choose_mode(bool restart, OpRequestRef op)
        requeue_op(op);
       requeue_ops(waiting_for_flush);
       requeue_ops(waiting_for_active);
+      requeue_ops(waiting_for_readable);
       requeue_ops(waiting_for_scrub);
       requeue_ops(waiting_for_cache_not_full);
       objects_blocked_on_cache_full.clear();
       requeued = true;
     }
-    if (evict_mode == TierAgentState::EVICT_MODE_SOME) {
-      info.stats.stats.sum.num_evict_mode_some = 1;
-    } else if (evict_mode == TierAgentState::EVICT_MODE_FULL) {
-      info.stats.stats.sum.num_evict_mode_full = 1;
-    }
-    if (agent_state->evict_mode == TierAgentState::EVICT_MODE_SOME) {
-      info.stats.stats.sum.num_evict_mode_some = 0;
-    } else if (agent_state->evict_mode == TierAgentState::EVICT_MODE_FULL) {
-      info.stats.stats.sum.num_evict_mode_full = 0;
-    }
+    recovery_state.update_stats(
+      [=](auto &history, auto &stats) {
+       if (evict_mode == TierAgentState::EVICT_MODE_SOME) {
+         stats.stats.sum.num_evict_mode_some = 1;
+       } else if (evict_mode == TierAgentState::EVICT_MODE_FULL) {
+         stats.stats.sum.num_evict_mode_full = 1;
+       }
+       if (agent_state->evict_mode == TierAgentState::EVICT_MODE_SOME) {
+         stats.stats.sum.num_evict_mode_some = 0;
+       } else if (agent_state->evict_mode == TierAgentState::EVICT_MODE_FULL) {
+         stats.stats.sum.num_evict_mode_full = 0;
+       }
+       return false;
+      });
     agent_state->evict_mode = evict_mode;
   }
   uint64_t old_effort = agent_state->evict_effort;
@@ -14688,28 +14602,6 @@ bool PrimaryLogPG::already_complete(eversion_t v)
   return true;
 }
 
-bool PrimaryLogPG::already_ack(eversion_t v)
-{
-  dout(20) << __func__ << ": " << v << dendl;
-  for (xlist<RepGather*>::iterator i = repop_queue.begin();
-       !i.end();
-       ++i) {
-    // skip copy from temp object ops
-    if ((*i)->v == eversion_t()) {
-      dout(20) << __func__ << ": " << **i
-              << " version is empty" << dendl;
-      continue;
-    }
-    if ((*i)->v > v) {
-      dout(20) << __func__ << ": " << **i
-              << " (*i)->v past v" << dendl;
-      break;
-    }
-  }
-  dout(20) << __func__ << ": returning true" << dendl;
-  return true;
-}
-
 
 // ==========================================================================================
 // SCRUB
@@ -14735,13 +14627,13 @@ bool PrimaryLogPG::_range_available_for_scrub(
   return true;
 }
 
-static bool doing_clones(const boost::optional<SnapSet> &snapset,
+static bool doing_clones(const std::optional<SnapSet> &snapset,
                         const vector<snapid_t>::reverse_iterator &curclone) {
-    return snapset && curclone != snapset.get().clones.rend();
+    return snapset && curclone != snapset->clones.rend();
 }
 
 void PrimaryLogPG::log_missing(unsigned missing,
-                       const boost::optional<hobject_t> &head,
+                       const std::optional<hobject_t> &head,
                        LogChannelRef clog,
                        const spg_t &pgid,
                        const char *func,
@@ -14750,21 +14642,21 @@ void PrimaryLogPG::log_missing(unsigned missing,
 {
   ceph_assert(head);
   if (allow_incomplete_clones) {
-    dout(20) << func << " " << mode << " " << pgid << " " << head.get()
-               << " skipped " << missing << " clone(s) in cache tier" << dendl;
+    dout(20) << func << " " << mode << " " << pgid << " " << *head
+            << " skipped " << missing << " clone(s) in cache tier" << dendl;
   } else {
-    clog->info() << mode << " " << pgid << " " << head.get()
-                      << " : " << missing << " missing clone(s)";
+    clog->info() << mode << " " << pgid << " " << *head
+                << " : " << missing << " missing clone(s)";
   }
 }
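The scrub helpers above switch from boost::optional to std::optional: boost::none becomes std::nullopt and .get() becomes operator* / operator->. A tiny self-contained example of the equivalent std::optional usage (SnapSetLike is a stand-in type, not the real SnapSet):

#include <iostream>
#include <optional>
#include <vector>

struct SnapSetLike { std::vector<unsigned> clones; };

int main()
{
  std::optional<SnapSetLike> snapset;        // empty, like the scrub loop start
  snapset = SnapSetLike{{3, 2, 1}};          // a successful "decode"
  if (snapset && !snapset->clones.empty())   // same shape as doing_clones()
    std::cout << "newest clone " << snapset->clones.front() << "\n";
  snapset = std::nullopt;                    // the decode-failure path resets it
  std::cout << std::boolalpha << snapset.has_value() << "\n";  // false
  return 0;
}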
 
-unsigned PrimaryLogPG::process_clones_to(const boost::optional<hobject_t> &head,
-  const boost::optional<SnapSet> &snapset,
+unsigned PrimaryLogPG::process_clones_to(const std::optional<hobject_t> &head,
+  const std::optional<SnapSet> &snapset,
   LogChannelRef clog,
   const spg_t &pgid,
   const char *mode,
   bool allow_incomplete_clones,
-  boost::optional<snapid_t> target,
+  std::optional<snapid_t> target,
   vector<snapid_t>::reverse_iterator *curclone,
   inconsistent_snapset_wrapper &e)
 {
@@ -14773,14 +14665,14 @@ unsigned PrimaryLogPG::process_clones_to(const boost::optional<hobject_t> &head,
   unsigned missing = 0;
 
   // NOTE: clones are in descending order, thus **curclone > target test here
-  hobject_t next_clone(head.get());
+  hobject_t next_clone(*head);
   while(doing_clones(snapset, *curclone) && (!target || **curclone > *target)) {
     ++missing;
     // it is okay to be missing one or more clones in a cache tier.
     // skip higher-numbered clones in the list.
     if (!allow_incomplete_clones) {
       next_clone.snap = **curclone;
-      clog->error() << mode << " " << pgid << " " << head.get()
+      clog->error() << mode << " " << pgid << " " << *head
                         << " : expected clone " << next_clone << " " << missing
                          << " missing";
       ++scrubber.shallow_errors;
@@ -14820,19 +14712,19 @@ unsigned PrimaryLogPG::process_clones_to(const boost::optional<hobject_t> &head,
 void PrimaryLogPG::scrub_snapshot_metadata(
   ScrubMap &scrubmap,
   const map<hobject_t,
-            pair<boost::optional<uint32_t>,
-                 boost::optional<uint32_t>>> &missing_digest)
+            pair<std::optional<uint32_t>,
+                 std::optional<uint32_t>>> &missing_digest)
 {
   dout(10) << __func__ << dendl;
 
   bool repair = state_test(PG_STATE_REPAIR);
   bool deep_scrub = state_test(PG_STATE_DEEP_SCRUB);
   const char *mode = (repair ? "repair": (deep_scrub ? "deep-scrub" : "scrub"));
-  boost::optional<snapid_t> all_clones;   // Unspecified snapid_t or boost::none
+  std::optional<snapid_t> all_clones;   // Unspecified snapid_t or std::nullopt
 
   // traverse in reverse order.
-  boost::optional<hobject_t> head;
-  boost::optional<SnapSet> snapset; // If initialized so will head (above)
+  std::optional<hobject_t> head;
+  std::optional<SnapSet> snapset; // If initialized so will head (above)
   vector<snapid_t>::reverse_iterator curclone; // Defined only if snapset initialized
   unsigned missing = 0;
   inconsistent_snapset_wrapper soid_error, head_error;
@@ -14844,7 +14736,7 @@ void PrimaryLogPG::scrub_snapshot_metadata(
     ceph_assert(!soid.is_snapdir());
     soid_error = inconsistent_snapset_wrapper{soid};
     object_stat_sum_t stat;
-    boost::optional<object_info_t> oi;
+    std::optional<object_info_t> oi;
 
     stat.num_objects++;
 
@@ -14858,7 +14750,7 @@ void PrimaryLogPG::scrub_snapshot_metadata(
 
     // basic checks.
     if (p->second.attrs.count(OI_ATTR) == 0) {
-      oi = boost::none;
+      oi = std::nullopt;
       osd->clog->error() << mode << " " << info.pgid << " " << soid
                        << " : no '" << OI_ATTR << "' attr";
       ++scrubber.shallow_errors;
@@ -14868,9 +14760,9 @@ void PrimaryLogPG::scrub_snapshot_metadata(
       bv.push_back(p->second.attrs[OI_ATTR]);
       try {
       oi = object_info_t(); // Initialize optional<> before decoding into it
-       oi.get().decode(bv);
+       oi->decode(bv);
       } catch (buffer::error& e) {
-       oi = boost::none;
+       oi = std::nullopt;
        osd->clog->error() << mode << " " << info.pgid << " " << soid
                << " : can't decode '" << OI_ATTR << "' attr " << e.what();
        ++scrubber.shallow_errors;
@@ -14891,7 +14783,7 @@ void PrimaryLogPG::scrub_snapshot_metadata(
        ++scrubber.shallow_errors;
       }
 
-      dout(20) << mode << "  " << soid << " " << oi.get() << dendl;
+      dout(20) << mode << "  " << soid << " " << *oi << dendl;
 
       // A clone num_bytes will be added later when we have snapset
       if (!soid.is_snap()) {
@@ -14914,12 +14806,12 @@ void PrimaryLogPG::scrub_snapshot_metadata(
 
     // Check for any problems while processing clones
     if (doing_clones(snapset, curclone)) {
-      boost::optional<snapid_t> target;
+      std::optional<snapid_t> target;
       // Expecting an object with snap for current head
       if (soid.has_snapset() || soid.get_head() != head->get_head()) {
 
        dout(10) << __func__ << " " << mode << " " << info.pgid << " new object "
-                << soid << " while processing " << head.get() << dendl;
+                << soid << " while processing " << *head << dendl;
 
         target = all_clones;
       } else {
@@ -14987,7 +14879,7 @@ void PrimaryLogPG::scrub_snapshot_metadata(
        osd->clog->error() << mode << " " << info.pgid << " " << soid
                          << " : no '" << SS_ATTR << "' attr";
         ++scrubber.shallow_errors;
-       snapset = boost::none;
+       snapset = std::nullopt;
        head_error.set_snapset_missing();
       } else {
        bufferlist bl;
@@ -14995,10 +14887,10 @@ void PrimaryLogPG::scrub_snapshot_metadata(
        auto blp = bl.cbegin();
         try {
          snapset = SnapSet(); // Initialize optional<> before decoding into it
-         decode(snapset.get(), blp);
+         decode(*snapset, blp);
           head_error.ss_bl.push_back(p->second.attrs[SS_ATTR]);
         } catch (buffer::error& e) {
-         snapset = boost::none;
+         snapset = std::nullopt;
           osd->clog->error() << mode << " " << info.pgid << " " << soid
                << " : can't decode '" << SS_ATTR << "' attr " << e.what();
          ++scrubber.shallow_errors;
@@ -15011,7 +14903,7 @@ void PrimaryLogPG::scrub_snapshot_metadata(
        curclone = snapset->clones.rbegin();
 
        if (!snapset->clones.empty()) {
-         dout(20) << "  snapset " << snapset.get() << dendl;
+         dout(20) << "  snapset " << *snapset << dendl;
          if (snapset->seq == 0) {
            osd->clog->error() << mode << " " << info.pgid << " " << soid
                               << " : snaps.seq not set";
@@ -15089,7 +14981,7 @@ void PrimaryLogPG::scrub_snapshot_metadata(
 
   if (doing_clones(snapset, curclone)) {
     dout(10) << __func__ << " " << mode << " " << info.pgid
-            << " No more objects while processing " << head.get() << dendl;
+            << " No more objects while processing " << *head << dendl;
 
     missing += process_clones_to(head, snapset, osd->clog, info.pgid, mode,
                      pool.info.allow_incomplete_clones(), all_clones, &curclone,
@@ -15162,8 +15054,12 @@ void PrimaryLogPG::_scrub_finish()
   const char *mode = (repair ? "repair": (deep_scrub ? "deep-scrub" : "scrub"));
 
   if (info.stats.stats_invalid) {
-    info.stats.stats = scrub_cstat;
-    info.stats.stats_invalid = false;
+    recovery_state.update_stats(
+      [=](auto &history, auto &stats) {
+       stats.stats = scrub_cstat;
+       stats.stats_invalid = false;
+       return false;
+      });
 
     if (agent_state)
       agent_choose_mode();
@@ -15213,15 +15109,19 @@ void PrimaryLogPG::_scrub_finish()
 
     if (repair) {
       ++scrubber.fixed;
-      info.stats.stats = scrub_cstat;
-      info.stats.dirty_stats_invalid = false;
-      info.stats.omap_stats_invalid = false;
-      info.stats.hitset_stats_invalid = false;
-      info.stats.hitset_bytes_stats_invalid = false;
-      info.stats.pin_stats_invalid = false;
-      info.stats.manifest_stats_invalid = false;
+      recovery_state.update_stats(
+       [this](auto &history, auto &stats) {
+         stats.stats = scrub_cstat;
+         stats.dirty_stats_invalid = false;
+         stats.omap_stats_invalid = false;
+         stats.hitset_stats_invalid = false;
+         stats.hitset_bytes_stats_invalid = false;
+         stats.pin_stats_invalid = false;
+         stats.manifest_stats_invalid = false;
+         return false;
+       });
       publish_stats_to_osd();
-      share_pg_info();
+      recovery_state.share_pg_info();
     }
   }
   // Clear object context cache to get repair information
@@ -15229,11 +15129,6 @@ void PrimaryLogPG::_scrub_finish()
     object_contexts.clear();
 }
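_scrub_finish, agent_choose_mode and mark_all_unfound_lost above all stop mutating info.stats directly and instead pass a lambda to recovery_state.update_stats(). A standalone model of that callback style follows; the simplified types and the meaning assigned to the returned bool ("publish afterwards") are assumptions for illustration only, not the actual PeeringState API:

#include <functional>
#include <iostream>

struct History { unsigned same_interval_since = 0; };
struct Stats   { bool stats_invalid = false; };

class RecoveryStateModel {
  History history_;
  Stats stats_;
public:
  // Apply the caller's mutation; here a true return means "publish the
  // updated stats", which is an assumption made for this sketch.
  void update_stats(const std::function<bool(History&, Stats&)>& f) {
    if (f(history_, stats_))
      publish();
  }
  const Stats& stats() const { return stats_; }
private:
  void publish() const { std::cout << "publishing stats\n"; }
};

int main()
{
  RecoveryStateModel rs;
  rs.update_stats([](History&, Stats& s) {
    s.stats_invalid = true;   // same mutation as the lambda in the diff
    return false;             // the diff's lambdas also return false
  });
  std::cout << std::boolalpha << rs.stats().stats_invalid << "\n";  // true
  return 0;
}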
 
-bool PrimaryLogPG::check_osdmap_full(const set<pg_shard_t> &missing_on)
-{
-    return osd->check_osdmap_full(missing_on);
-}
-
 int PrimaryLogPG::rep_repair_primary_object(const hobject_t& soid, OpContext *ctx)
 {
   OpRequestRef op = ctx->op;
@@ -15242,18 +15137,17 @@ int PrimaryLogPG::rep_repair_primary_object(const hobject_t& soid, OpContext *ct
   ceph_assert(is_primary());
 
   dout(10) << __func__ << " " << soid
-          << " peers osd.{" << acting_recovery_backfill << "}" << dendl;
+          << " peers osd.{" << get_acting_recovery_backfill() << "}" << dendl;
 
   if (!is_clean()) {
     block_for_clean(soid, op);
     return -EAGAIN;
   }
 
-  ceph_assert(!pg_log.get_missing().is_missing(soid));
+  ceph_assert(!recovery_state.get_pg_log().get_missing().is_missing(soid));
   auto& oi = ctx->new_obs.oi;
   eversion_t v = oi.version;
 
-  missing_loc.add_missing(soid, v, eversion_t());
   if (primary_error(soid, v)) {
     dout(0) << __func__ << " No other replicas available for " << soid << dendl;
     // XXX: If we knew that there is no down osd which could include this
@@ -15278,7 +15172,7 @@ int PrimaryLogPG::rep_repair_primary_object(const hobject_t& soid, OpContext *ct
          std::make_shared<PGPeeringEvent>(
          get_osdmap_epoch(),
          get_osdmap_epoch(),
-         DoRecovery())));
+         PeeringState::DoRecovery())));
   } else {
     // A prior error must have already cleared clean state and queued recovery
     // or a map change has triggered re-peering.
@@ -15311,7 +15205,7 @@ void PrimaryLogPG::SnapTrimmer::log_exit(const char *state_name, utime_t enter_t
 /* NotTrimming */
 PrimaryLogPG::NotTrimming::NotTrimming(my_context ctx)
   : my_base(ctx), 
-    NamedState(context< SnapTrimmer >().pg, "NotTrimming")
+    NamedState(nullptr, "NotTrimming")
 {
   context< SnapTrimmer >().log_enter(state_name);
 }
@@ -15364,7 +15258,7 @@ boost::statechart::result PrimaryLogPG::WaitReservation::react(const SnapTrimRes
 /* AwaitAsyncWork */
 PrimaryLogPG::AwaitAsyncWork::AwaitAsyncWork(my_context ctx)
   : my_base(ctx),
-    NamedState(context< SnapTrimmer >().pg, "Trimming/AwaitAsyncWork")
+    NamedState(nullptr, "Trimming/AwaitAsyncWork")
 {
   auto *pg = context< SnapTrimmer >().pg;
   context< SnapTrimmer >().log_enter(state_name);
@@ -15405,22 +15299,31 @@ boost::statechart::result PrimaryLogPG::AwaitAsyncWork::react(const DoSnapWork&)
     // Done!
     ldout(pg->cct, 10) << "got ENOENT" << dendl;
 
-    ldout(pg->cct, 10) << "adding snap " << snap_to_trim
-                      << " to purged_snaps"
-                      << dendl;
-    pg->info.purged_snaps.insert(snap_to_trim);
     pg->snap_trimq.erase(snap_to_trim);
-    ldout(pg->cct, 10) << "purged_snaps now "
-                      << pg->info.purged_snaps << ", snap_trimq now "
-                      << pg->snap_trimq << dendl;
 
-    ObjectStore::Transaction t;
-    pg->dirty_big_info = true;
-    pg->write_if_dirty(t);
-    int tr = pg->osd->store->queue_transaction(pg->ch, std::move(t), NULL);
-    ceph_assert(tr == 0);
+    if (pg->snap_trimq_repeat.count(snap_to_trim)) {
+      ldout(pg->cct, 10) << " removing from snap_trimq_repeat" << dendl;
+      pg->snap_trimq_repeat.erase(snap_to_trim);
+    } else {
+      ldout(pg->cct, 10) << "adding snap " << snap_to_trim
+                        << " to purged_snaps"
+                        << dendl;
+      ObjectStore::Transaction t;
+      pg->recovery_state.adjust_purged_snaps(
+       [snap_to_trim](auto &purged_snaps) {
+         purged_snaps.insert(snap_to_trim);
+       });
+      pg->write_if_dirty(t);
+
+      ldout(pg->cct, 10) << "purged_snaps now "
+                        << pg->info.purged_snaps << ", snap_trimq now "
+                        << pg->snap_trimq << dendl;
+
+      int tr = pg->osd->store->queue_transaction(pg->ch, std::move(t), NULL);
+      ceph_assert(tr == 0);
 
-    pg->share_pg_info();
+      pg->recovery_state.share_pg_info();
+    }
     post_event(KickTrim());
     return transit< NotTrimming >();
   }
@@ -15430,7 +15333,7 @@ boost::statechart::result PrimaryLogPG::AwaitAsyncWork::react(const DoSnapWork&)
     // Get next
     ldout(pg->cct, 10) << "AwaitAsyncWork react trimming " << object << dendl;
     OpContextUPtr ctx;
-    int error = pg->trim_object(in_flight.empty(), object, &ctx);
+    int error = pg->trim_object(in_flight.empty(), object, snap_to_trim, &ctx);
     if (error) {
       if (error == -ENOLCK) {
        ldout(pg->cct, 10) << "could not get write lock on obj "