update ceph source to reef 18.1.2
ceph/src/crimson/osd/pg.cc
index 7fd940f1e76a5733b1fab1a48e4e2f8bd123a601..3d5bb20d408b33c0b3eb995e3d9a0ac8d8ffe5b8 100644
 #include <fmt/format.h>
 #include <fmt/ostream.h>
 
+#include "common/hobject_fmt.h"
+
 #include "messages/MOSDOp.h"
 #include "messages/MOSDOpReply.h"
 #include "messages/MOSDRepOp.h"
 #include "messages/MOSDRepOpReply.h"
 
 #include "osd/OSDMap.h"
+#include "osd/osd_types_fmt.h"
 
 #include "os/Transaction.h"
 
 #include "crimson/osd/ops_executer.h"
 #include "crimson/osd/osd_operations/osdop_params.h"
 #include "crimson/osd/osd_operations/peering_event.h"
+#include "crimson/osd/osd_operations/background_recovery.h"
+#include "crimson/osd/osd_operations/snaptrim_event.h"
 #include "crimson/osd/pg_recovery.h"
 #include "crimson/osd/replicated_recovery_backend.h"
+#include "crimson/osd/watch.h"
 
 using std::ostream;
 using std::set;
@@ -58,6 +64,17 @@ std::ostream& operator<<(std::ostream& out, const signedspan& d)
 }
 }
 
+template <typename T>
+struct fmt::formatter<std::optional<T>> : fmt::formatter<T> {
+  template <typename FormatContext>
+  auto format(const std::optional<T>& v, FormatContext& ctx) const {
+    if (v.has_value()) {
+      return fmt::formatter<T>::format(*v, ctx);
+    }
+    return fmt::format_to(ctx.out(), "<null>");
+  }
+};
+
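For reference, a minimal usage sketch of the specialization above (assuming it
is visible in the translation unit, and that the fmt version in use does not
already ship a conflicting std::optional formatter):

    std::optional<int> present = 42;
    std::optional<int> absent;
    // parsing is inherited from fmt::formatter<int>, so format specs apply
    // to the wrapped value: prints "<null> 2a"
    fmt::print("{} {:x}\n", absent, present);
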
 namespace crimson::osd {
 
 using crimson::common::local_conf;
@@ -91,9 +108,8 @@ PG::PG(
     pg_whoami{pg_shard},
     coll_ref{coll_ref},
     pgmeta_oid{pgid.make_pgmeta_oid()},
-    osdmap_gate("PG::osdmap_gate", std::nullopt),
+    osdmap_gate("PG::osdmap_gate"),
     shard_services{shard_services},
-    osdmap{osdmap},
     backend(
       PGBackend::create(
        pgid.pgid,
@@ -101,7 +117,8 @@ PG::PG(
        pool,
        coll_ref,
        shard_services,
-       profile)),
+       profile,
+       *this)),
     recovery_backend(
       std::make_unique<ReplicatedRecoveryBackend>(
        *this, shard_services, coll_ref, backend.get())),
@@ -119,6 +136,23 @@ PG::PG(
       osdmap,
       this,
       this),
+    obc_registry{
+      local_conf()},
+    obc_loader{
+      obc_registry,
+      *backend.get(),
+      *this},
+    osdriver(
+      &shard_services.get_store(),
+      coll_ref,
+      pgid.make_pgmeta_oid()),
+    snap_mapper(
+      this->shard_services.get_cct(),
+      &osdriver,
+      pgid.ps(),
+      pgid.get_split_bits(pool.get_pg_num()),
+      pgid.pool(),
+      pgid.shard),
     wait_for_active_blocker(this)
 {
   peering_state.set_backend_predicates(
@@ -129,21 +163,38 @@ PG::PG(
 
 PG::~PG() {}
 
+void PG::check_blocklisted_watchers()
+{
+  logger().debug("{}", __func__);
+  obc_registry.for_each([this](ObjectContextRef obc) {
+    assert(obc);
+    for (const auto& [key, watch] : obc->watchers) {
+      assert(watch->get_pg() == this);
+      const auto& ea = watch->get_peer_addr();
+      logger().debug("watch: Found {} cookie {}. Checking entity_addr_t {}",
+                     watch->get_entity(), watch->get_cookie(), ea);
+      if (get_osdmap()->is_blocklisted(ea)) {
+        logger().info("watch: Found blocklisted watcher for {}", ea);
+        watch->do_watch_timeout();
+      }
+    }
+  });
+}
+
 bool PG::try_flush_or_schedule_async() {
-  logger().debug("PG::try_flush_or_schedule_async: do_transaction...");
-  (void)shard_services.get_store().do_transaction(
-    coll_ref,
-    ObjectStore::Transaction()).then(
-      [this, epoch=get_osdmap_epoch()]() {
-       return shard_services.start_operation<LocalPeeringEvent>(
-         this,
-         shard_services,
-         pg_whoami,
-         pgid,
-         epoch,
-         epoch,
-         PeeringState::IntervalFlush());
-      });
+  logger().debug("PG::try_flush_or_schedule_async: flush ...");
+  (void)shard_services.get_store().flush(
+    coll_ref
+  ).then(
+    [this, epoch=get_osdmap_epoch()]() {
+      return shard_services.start_operation<LocalPeeringEvent>(
+       this,
+       pg_whoami,
+       pgid,
+       epoch,
+       epoch,
+       PeeringState::IntervalFlush());
+    });
   return false;
 }
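
The flush() call here replaces submitting an empty transaction as a commit
barrier. A rough sketch of the ordering this relies on, assuming
FuturizedStore::Shard::flush() resolves only once every transaction previously
queued on the collection has committed (names below are illustrative):

    store.flush(coll).then([] {
      // safe point: all prior do_transaction() calls on `coll` are durable,
      // so the IntervalFlush peering event observes a stable state
    });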
 
@@ -172,11 +223,16 @@ pg_stat_t PG::get_stats() const
 void PG::queue_check_readable(epoch_t last_peering_reset, ceph::timespan delay)
 {
   // handle the peering event in the background
+  logger().debug(
+    "{}: PG::queue_check_readable lpr: {}, delay: {}",
+    *this, last_peering_reset, delay);
   check_readable_timer.cancel();
   check_readable_timer.set_callback([last_peering_reset, this] {
+    logger().debug(
+      "{}: PG::queue_check_readable callback lpr: {}",
+      *this, last_peering_reset);
     (void) shard_services.start_operation<LocalPeeringEvent>(
       this,
-      shard_services,
       pg_whoami,
       pgid,
       last_peering_reset,
@@ -194,11 +250,16 @@ void PG::recheck_readable()
   if (peering_state.state_test(PG_STATE_WAIT)) {
     auto prior_readable_until_ub = peering_state.get_prior_readable_until_ub();
     if (mnow < prior_readable_until_ub) {
-      logger().info("{} will wait (mnow {} < prior_readable_until_ub {})",
-                   __func__, mnow, prior_readable_until_ub);
+      logger().info(
+       "{}: {} will wait (mnow {} < prior_readable_until_ub {})",
+       *this, __func__, mnow, prior_readable_until_ub);
+      queue_check_readable(
+       peering_state.get_last_peering_reset(),
+       prior_readable_until_ub - mnow);
     } else {
-      logger().info("{} no longer wait (mnow {} >= prior_readable_until_ub {})",
-                   __func__, mnow, prior_readable_until_ub);
+      logger().info(
+       "{}:{} no longer waiting (mnow {} >= prior_readable_until_ub {})",
+       *this, __func__, mnow, prior_readable_until_ub);
       peering_state.state_clear(PG_STATE_WAIT);
       peering_state.clear_prior_readable_until_ub();
       changed = true;
@@ -207,14 +268,17 @@ void PG::recheck_readable()
   if (peering_state.state_test(PG_STATE_LAGGY)) {
     auto readable_until = peering_state.get_readable_until();
     if (readable_until == readable_until.zero()) {
-      logger().info("{} still laggy (mnow {}, readable_until zero)",
-                   __func__, mnow);
+      logger().info(
+       "{}:{} still laggy (mnow {}, readable_until zero)",
+       *this, __func__, mnow);
     } else if (mnow >= readable_until) {
-      logger().info("{} still laggy (mnow {} >= readable_until {})",
-                   __func__, mnow, readable_until);
+      logger().info(
+       "{}:{} still laggy (mnow {} >= readable_until {})",
+       *this, __func__, mnow, readable_until);
     } else {
-      logger().info("{} no longer laggy (mnow {} < readable_until {})",
-                   __func__, mnow, readable_until);
+      logger().info(
+       "{}:{} no longer laggy (mnow {} < readable_until {})",
+       *this, __func__, mnow, readable_until);
       peering_state.state_clear(PG_STATE_LAGGY);
       changed = true;
     }
@@ -230,12 +294,13 @@ void PG::recheck_readable()
 
 unsigned PG::get_target_pg_log_entries() const
 {
-  const unsigned num_pgs = shard_services.get_pg_num();
-  const unsigned target =
-    local_conf().get_val<uint64_t>("osd_target_pg_log_entries_per_osd");
+  const unsigned local_num_pgs = shard_services.get_num_local_pgs();
+  const unsigned local_target =
+    local_conf().get_val<uint64_t>("osd_target_pg_log_entries_per_osd") /
+    seastar::smp::count;
   const unsigned min_pg_log_entries =
     local_conf().get_val<uint64_t>("osd_min_pg_log_entries");
-  if (num_pgs > 0 && target > 0) {
+  if (local_num_pgs > 0 && local_target > 0) {
     // target an even spread of our budgeted log entries across all
     // PGs.  note that while we only get to control the entry count
     // for primary PGs, we'll normally be responsible for a mix of
@@ -243,7 +308,7 @@ unsigned PG::get_target_pg_log_entries() const
     // will work out.
     const unsigned max_pg_log_entries =
       local_conf().get_val<uint64_t>("osd_max_pg_log_entries");
-    return std::clamp(target / num_pgs,
+    return std::clamp(local_target / local_num_pgs,
                      min_pg_log_entries,
                      max_pg_log_entries);
   } else {
@@ -252,21 +317,33 @@ unsigned PG::get_target_pg_log_entries() const
   }
 }
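
A worked example of the per-reactor budgeting above (the min/max values shown
are the assumed defaults): with osd_target_pg_log_entries_per_osd = 300000 and
10 reactors, local_target = 30000; with 30 PGs on this shard each primary
targets clamp(30000 / 30, osd_min_pg_log_entries, osd_max_pg_log_entries):

    const unsigned local_target = 300000 / 10;        // 30000 per reactor
    const unsigned per_pg = std::clamp(local_target / 30,
                                       250u,          // assumed default min
                                       10000u);       // assumed default max
    // per_pg == 1000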
 
-void PG::on_activate(interval_set<snapid_t>)
+void PG::on_removal(ceph::os::Transaction &t) {
+  t.register_on_commit(
+    new LambdaContext(
+      [this](int r) {
+      ceph_assert(r == 0);
+      (void)shard_services.start_operation<LocalPeeringEvent>(
+        this, pg_whoami, pgid, float(0.001), get_osdmap_epoch(),
+        get_osdmap_epoch(), PeeringState::DeleteSome());
+  }));
+}
+
+void PG::on_activate(interval_set<snapid_t> snaps)
 {
+  logger().debug("{}: {} snaps={}", *this, __func__, snaps);
+  snap_trimq = std::move(snaps);
   projected_last_update = peering_state.get_info().last_update;
 }
 
 void PG::on_activate_complete()
 {
-  wait_for_active_blocker.on_active();
+  wait_for_active_blocker.unblock();
 
   if (peering_state.needs_recovery()) {
     logger().info("{}: requesting recovery",
                   __func__);
     (void) shard_services.start_operation<LocalPeeringEvent>(
       this,
-      shard_services,
       pg_whoami,
       pgid,
       float(0.001),
@@ -278,7 +355,6 @@ void PG::on_activate_complete()
                   __func__);
     (void) shard_services.start_operation<LocalPeeringEvent>(
       this,
-      shard_services,
       pg_whoami,
       pgid,
       float(0.001),
@@ -290,7 +366,6 @@ void PG::on_activate_complete()
                   " for pg: {}", __func__, pgid);
     (void) shard_services.start_operation<LocalPeeringEvent>(
       this,
-      shard_services,
       pg_whoami,
       pgid,
       float(0.001),
@@ -299,7 +374,6 @@ void PG::on_activate_complete()
       PeeringState::AllReplicasRecovered{});
   }
   publish_stats_to_osd();
-  backend->on_activate_complete();
 }
 
 void PG::prepare_write(pg_info_t &info,
@@ -331,7 +405,7 @@ void PG::prepare_write(pg_info_t &info,
   }
   pglog.write_log_and_missing(
     t, &km, coll_ref->get_cid(), pgmeta_oid,
-    peering_state.get_pool().info.require_rollback());
+    peering_state.get_pgpool().info.require_rollback());
   if (!km.empty()) {
     t.omap_setkeys(coll_ref->get_cid(), pgmeta_oid, km);
   }
@@ -343,9 +417,126 @@ void PG::prepare_write(pg_info_t &info,
 std::pair<ghobject_t, bool>
 PG::do_delete_work(ceph::os::Transaction &t, ghobject_t _next)
 {
-  // TODO
-  shard_services.dec_pg_num();
-  return {_next, false};
+  logger().info("removing pg {}", pgid);
+  auto fut = interruptor::make_interruptible(
+    shard_services.get_store().list_objects(
+      coll_ref,
+      _next,
+      ghobject_t::get_max(),
+      local_conf()->osd_target_transaction_size));
+
+  auto [objs_to_rm, next] = fut.get();
+  if (objs_to_rm.empty()) {
+    logger().info("all objs removed, removing coll for {}", pgid);
+    t.remove(coll_ref->get_cid(), pgmeta_oid);
+    t.remove_collection(coll_ref->get_cid());
+    (void) shard_services.get_store().do_transaction(
+      coll_ref, std::move(t)).then([this] {
+      return shard_services.remove_pg(pgid);
+    });
+    return {next, false};
+  } else {
+    for (auto &obj : objs_to_rm) {
+      if (obj == pgmeta_oid) {
+        continue;
+      }
+      logger().trace("pg {}, removing obj {}", pgid, obj);
+      t.remove(coll_ref->get_cid(), obj);
+    }
+    t.register_on_commit(
+      new LambdaContext([this](int r) {
+      ceph_assert(r == 0);
+      logger().trace("triggering more pg delete {}", pgid);
+      (void) shard_services.start_operation<LocalPeeringEvent>(
+        this,
+        pg_whoami,
+        pgid,
+        float(0.001),
+        get_osdmap_epoch(),
+        get_osdmap_epoch(),
+        PeeringState::DeleteSome{});
+    }));
+    return {next, true};
+  }
+}
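
do_delete_work removes at most osd_target_transaction_size objects per
DeleteSome pass and requeues itself from the commit callback until the listing
comes back empty; only then is the collection itself dropped. The shape of
that loop as a self-contained sketch (Store, Collection and Transaction are
hypothetical stand-ins, not the crimson API):

    seastar::future<> remove_in_batches(Store& store, Collection coll) {
      return seastar::repeat([&store, coll] {
        return store.list_some(coll).then([&store, coll](auto objs) {
          if (objs.empty()) {
            // nothing left: drop the collection and stop iterating
            return store.remove_collection(coll).then([] {
              return seastar::stop_iteration::yes;
            });
          }
          Transaction t;
          for (const auto& o : objs) {
            t.remove(coll, o);       // one bounded batch per transaction
          }
          return store.submit(coll, std::move(t)).then([] {
            return seastar::stop_iteration::no;  // list the next batch
          });
        });
      });
    }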
+
+Context *PG::on_clean()
+{
+  // Not needed yet (will be needed for IO unblocking)
+  return nullptr;
+}
+
+void PG::on_active_actmap()
+{
+  logger().debug("{}: {} snap_trimq={}", *this, __func__, snap_trimq);
+  peering_state.state_clear(PG_STATE_SNAPTRIM_ERROR);
+  // loops until snap_trimq is empty or SNAPTRIM_ERROR.
+  std::ignore = seastar::do_until(
+    [this] { return snap_trimq.empty()
+                    || peering_state.state_test(PG_STATE_SNAPTRIM_ERROR);
+    },
+    [this] {
+      peering_state.state_set(PG_STATE_SNAPTRIM);
+      publish_stats_to_osd();
+      const auto to_trim = snap_trimq.range_start();
+      snap_trimq.erase(to_trim);
+      const auto needs_pause = !snap_trimq.empty();
+      return seastar::repeat([to_trim, needs_pause, this] {
+        logger().debug("{}: going to start SnapTrimEvent, to_trim={}",
+                       *this, to_trim);
+        return shard_services.start_operation<SnapTrimEvent>(
+          this,
+          snap_mapper,
+          to_trim,
+          needs_pause
+        ).second.handle_error(
+          crimson::ct_error::enoent::handle([this] {
+            logger().error("{}: saw ENOENT, trimming stopped", *this);
+            peering_state.state_set(PG_STATE_SNAPTRIM_ERROR);
+            publish_stats_to_osd();
+            return seastar::make_ready_future<seastar::stop_iteration>(
+              seastar::stop_iteration::yes);
+          }), crimson::ct_error::eagain::handle([this] {
+            logger().info("{}: saw EAGAIN, trimming restarted", *this);
+            return seastar::make_ready_future<seastar::stop_iteration>(
+              seastar::stop_iteration::no);
+          })
+        );
+      }).then([this, trimmed=to_trim] {
+        logger().debug("{}: trimmed snap={}", *this, trimmed);
+      });
+    }).finally([this] {
+      logger().debug("{}: PG::on_active_actmap() finished trimming",
+                     *this);
+      peering_state.state_clear(PG_STATE_SNAPTRIM);
+      peering_state.state_clear(PG_STATE_SNAPTRIM_ERROR);
+      publish_stats_to_osd();
+    });
+}
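
The trimming loop above nests two seastar idioms: do_until advances
snap_trimq one snap per outer iteration, while repeat retries the same snap
whenever the event completes with EAGAIN. Stripped of PG state, the pattern
looks like this (process() is a hypothetical operation reporting whether to
retry):

    seastar::future<bool> process(int item);   // assumed: true means retry

    seastar::future<> drain(std::queue<int>& q) {
      return seastar::do_until(
        [&q] { return q.empty(); },        // outer: stop when queue drains
        [&q] {
          const int item = q.front();
          q.pop();
          return seastar::repeat([item] {  // inner: retry one item
            return process(item).then([](bool retry) {
              return retry ? seastar::stop_iteration::no
                           : seastar::stop_iteration::yes;
            });
          });
        });
    }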
+
+void PG::on_active_advmap(const OSDMapRef &osdmap)
+{
+  const auto new_removed_snaps = osdmap->get_new_removed_snaps();
+  if (auto it = new_removed_snaps.find(get_pgid().pool());
+      it != new_removed_snaps.end()) {
+    bool bad = false;
+    for (auto j : it->second) {
+      if (snap_trimq.intersects(j.first, j.second)) {
+       decltype(snap_trimq) added, overlap;
+       added.insert(j.first, j.second);
+       overlap.intersection_of(snap_trimq, added);
+        logger().error("{}: {} removed_snaps already contains {}",
+                       *this, __func__, overlap);
+       bad = true;
+       snap_trimq.union_of(added);
+      } else {
+       snap_trimq.insert(j.first, j.second);
+      }
+    }
+    logger().info("{}: {} new removed snaps {}, snap_trimq now {}",
+                  *this, __func__, it->second, snap_trimq);
+    assert(!bad || local_conf().get_val<bool>("osd_debug_verify_cached_snaps"));
+  }
 }
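
The merge above leans on ceph's interval_set; the operations it uses behave
roughly as follows (values illustrative, intervals are [start, start+len)):

    interval_set<snapid_t> trimq, added, overlap;
    trimq.insert(snapid_t(10), snapid_t(5));   // trimq covers [10, 15)
    added.insert(snapid_t(12), snapid_t(2));   // added covers [12, 14)
    bool hit = trimq.intersects(snapid_t(12), snapid_t(2));   // true
    overlap.intersection_of(trimq, added);     // overlap == [12, 14)
    trimq.union_of(added);                     // no-op here: already covered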
 
 void PG::scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type)
@@ -380,7 +571,7 @@ void PG::log_state_exit(
     events);
 }
 
-ceph::signedspan PG::get_mnow()
+ceph::signedspan PG::get_mnow() const
 {
   return shard_services.get_mnow();
 }
@@ -397,7 +588,6 @@ void PG::schedule_renew_lease(epoch_t last_peering_reset, ceph::timespan delay)
   renew_lease_timer.set_callback([last_peering_reset, this] {
     (void) shard_services.start_operation<LocalPeeringEvent>(
       this,
-      shard_services,
       pg_whoami,
       pgid,
       last_peering_reset,
@@ -422,7 +612,7 @@ void PG::init(
     new_acting_primary, history, pi, t);
 }
 
-seastar::future<> PG::read_state(crimson::os::FuturizedStore* store)
+seastar::future<> PG::read_state(crimson::os::FuturizedStore::Shard* store)
 {
   if (__builtin_expect(stopping, false)) {
     return seastar::make_exception_future<>(
@@ -459,7 +649,6 @@ seastar::future<> PG::read_state(crimson::os::FuturizedStore* store)
     epoch_t epoch = get_osdmap_epoch();
     (void) shard_services.start_operation<LocalPeeringEvent>(
        this,
-       shard_services,
        pg_whoami,
        pgid,
        epoch,
@@ -470,49 +659,61 @@ seastar::future<> PG::read_state(crimson::os::FuturizedStore* store)
   });
 }
 
-void PG::do_peering_event(
+PG::interruptible_future<> PG::do_peering_event(
   PGPeeringEvent& evt, PeeringCtx &rctx)
 {
   if (peering_state.pg_has_reset_since(evt.get_epoch_requested()) ||
       peering_state.pg_has_reset_since(evt.get_epoch_sent())) {
     logger().debug("{} ignoring {} -- pg has reset", __func__, evt.get_desc());
+    return interruptor::now();
   } else {
     logger().debug("{} handling {} for pg: {}", __func__, evt.get_desc(), pgid);
-    peering_state.handle_event(
-      evt.get_event(),
-      &rctx);
-    peering_state.write_if_dirty(rctx.transaction);
+    // all peering event handling needs to run in a dedicated seastar::thread,
+    // so that event processing can freely perform I/O, e.g. in PG::on_removal
+    // and PG::on_new_interval
+    return interruptor::async([this, &evt, &rctx] {
+      peering_state.handle_event(
+        evt.get_event(),
+        &rctx);
+      peering_state.write_if_dirty(rctx.transaction);
+    });
   }
 }
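
interruptor::async here (like the plain seastar::async used below for
handle_advance_map) runs its body on a stackful seastar::thread, which is
what lets handlers invoked from handle_event wait on I/O synchronously. The
general shape, with compute() standing in for some asynchronous operation:

    seastar::future<int> compute();   // assumed async operation

    seastar::future<int> wrapped() {
      return seastar::async([] {
        // inside a seastar::thread, get() suspends this fiber without
        // blocking the reactor
        int v = compute().get();
        return v + 1;
      });
    }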
 
-void PG::handle_advance_map(
+seastar::future<> PG::handle_advance_map(
   cached_map_t next_map, PeeringCtx &rctx)
 {
-  vector<int> newup, newacting;
-  int up_primary, acting_primary;
-  next_map->pg_to_up_acting_osds(
-    pgid.pgid,
-    &newup, &up_primary,
-    &newacting, &acting_primary);
-  peering_state.advance_map(
-    next_map,
-    peering_state.get_osdmap(),
-    newup,
-    up_primary,
-    newacting,
-    acting_primary,
-    rctx);
-  osdmap_gate.got_map(next_map->get_epoch());
+  return seastar::async([this, next_map=std::move(next_map), &rctx] {
+    vector<int> newup, newacting;
+    int up_primary, acting_primary;
+    next_map->pg_to_up_acting_osds(
+      pgid.pgid,
+      &newup, &up_primary,
+      &newacting, &acting_primary);
+    peering_state.advance_map(
+      next_map,
+      peering_state.get_osdmap(),
+      newup,
+      up_primary,
+      newacting,
+      acting_primary,
+      rctx);
+    osdmap_gate.got_map(next_map->get_epoch());
+  });
 }
 
-void PG::handle_activate_map(PeeringCtx &rctx)
+seastar::future<> PG::handle_activate_map(PeeringCtx &rctx)
 {
-  peering_state.activate_map(rctx);
+  return seastar::async([this, &rctx] {
+    peering_state.activate_map(rctx);
+  });
 }
 
-void PG::handle_initialize(PeeringCtx &rctx)
+seastar::future<> PG::handle_initialize(PeeringCtx &rctx)
 {
-  peering_state.handle_event(PeeringState::Initialize{}, &rctx);
+  return seastar::async([this, &rctx] {
+    peering_state.handle_event(PeeringState::Initialize{}, &rctx);
+  });
 }
 
 
@@ -542,40 +743,13 @@ std::ostream& operator<<(std::ostream& os, const PG& pg)
   return os;
 }
 
-void PG::WaitForActiveBlocker::dump_detail(Formatter *f) const
-{
-  f->dump_stream("pgid") << pg->pgid;
-}
-
-void PG::WaitForActiveBlocker::on_active()
-{
-  p.set_value();
-  p = {};
-}
-
-blocking_future<> PG::WaitForActiveBlocker::wait()
-{
-  if (pg->peering_state.is_active()) {
-    return make_blocking_future(seastar::now());
-  } else {
-    return make_blocking_future(p.get_shared_future());
-  }
-}
-
-seastar::future<> PG::WaitForActiveBlocker::stop()
-{
-  p.set_exception(crimson::common::system_shutdown_exception());
-  return seastar::now();
-}
-
 std::tuple<PG::interruptible_future<>,
            PG::interruptible_future<>>
 PG::submit_transaction(
-  const OpInfo& op_info,
-  const std::vector<OSDOp>& ops,
   ObjectContextRef&& obc,
   ceph::os::Transaction&& txn,
-  osd_op_params_t&& osd_op_p)
+  osd_op_params_t&& osd_op_p,
+  std::vector<pg_log_entry_t>&& log_entries)
 {
   if (__builtin_expect(stopping, false)) {
     return {seastar::make_exception_future<>(
@@ -584,26 +758,8 @@ PG::submit_transaction(
   }
 
   epoch_t map_epoch = get_osdmap_epoch();
+  ceph_assert(!has_reset_since(osd_op_p.at_version.epoch));
 
-  if (__builtin_expect(osd_op_p.at_version.epoch != map_epoch, false)) {
-    throw crimson::common::actingset_changed(is_primary());
-  }
-
-  std::vector<pg_log_entry_t> log_entries;
-  log_entries.emplace_back(obc->obs.exists ?
-                     pg_log_entry_t::MODIFY : pg_log_entry_t::DELETE,
-                   obc->obs.oi.soid, osd_op_p.at_version, obc->obs.oi.version,
-                   osd_op_p.user_modify ? osd_op_p.at_version.version : 0,
-                   osd_op_p.req_id, osd_op_p.mtime,
-                    op_info.allows_returnvec() && !ops.empty() ? ops.back().rval.code : 0);
-  // TODO: refactor the submit_transaction
-  if (op_info.allows_returnvec()) {
-    // also the per-op values are recorded in the pg log
-    log_entries.back().set_op_returns(ops);
-    logger().debug("{} op_returns: {}",
-                   __func__, log_entries.back().op_returns);
-  }
-  log_entries.back().clean_regions = std::move(osd_op_p.clean_regions);
   peering_state.pre_submit_op(obc->obs.oi.soid, log_entries, osd_op_p.at_version);
   peering_state.append_log_with_trim_to_updated(std::move(log_entries), osd_op_p.at_version,
                                                txn, true, false);
@@ -628,19 +784,6 @@ PG::submit_transaction(
   }));
 }
 
-void PG::fill_op_params_bump_pg_version(
-  osd_op_params_t& osd_op_p,
-  const bool user_modify)
-{
-  osd_op_p.at_version = next_version();
-  osd_op_p.pg_trim_to = get_pg_trim_to();
-  osd_op_p.min_last_complete_ondisk = get_min_last_complete_ondisk();
-  osd_op_p.last_complete = get_info().last_complete;
-  if (user_modify) {
-    osd_op_p.user_at_version = osd_op_p.at_version.version;
-  }
-}
-
 PG::interruptible_future<> PG::repair_object(
   const hobject_t& oid,
   eversion_t& v) 
@@ -661,13 +804,12 @@ PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<Ret>>
 PG::do_osd_ops_execute(
   seastar::lw_shared_ptr<OpsExecuter> ox,
   std::vector<OSDOp>& ops,
-  const OpInfo &op_info,
   SuccessFunc&& success_func,
   FailureFunc&& failure_func)
 {
   assert(ox);
   auto rollbacker = ox->create_rollbacker([this] (auto& obc) {
-    return reload_obc(obc).handle_error_interruptible(
+    return obc_loader.reload_obc(obc).handle_error_interruptible(
       load_obc_ertr::assert_all{"can't live with object state messed up"});
   });
   auto failure_func_ptr = seastar::make_lw_shared(std::move(failure_func));
@@ -677,26 +819,56 @@ PG::do_osd_ops_execute(
       ox->get_target(),
       ceph_osd_op_name(osd_op.op.op));
     return ox->execute_op(osd_op);
-  }).safe_then_interruptible([this, ox, &op_info, &ops] {
+  }).safe_then_interruptible([this, ox, &ops] {
     logger().debug(
       "do_osd_ops_execute: object {} all operations successful",
       ox->get_target());
-    peering_state.apply_op_stats(ox->get_target(), ox->get_stats());
+    // check for full
+    if ((ox->delta_stats.num_bytes > 0 ||
+      ox->delta_stats.num_objects > 0) &&
+      get_pgpool().info.has_flag(pg_pool_t::FLAG_FULL)) {
+      const auto& m = ox->get_message();
+      if (m.get_reqid().name.is_mds() ||   // FIXME: ignore MDS for now
+        m.has_flag(CEPH_OSD_FLAG_FULL_FORCE)) {
+        logger().info(" full, but proceeding due to FULL_FORCE or MDS");
+      } else if (m.has_flag(CEPH_OSD_FLAG_FULL_TRY)) {
+        // they tried, they failed.
+        logger().info(" full, replying to FULL_TRY op");
+        if (get_pgpool().info.has_flag(pg_pool_t::FLAG_FULL_QUOTA))
+          return interruptor::make_ready_future<OpsExecuter::rep_op_fut_tuple>(
+            seastar::now(),
+            OpsExecuter::osd_op_ierrorator::future<>(
+              crimson::ct_error::edquot::make()));
+        else
+          return interruptor::make_ready_future<OpsExecuter::rep_op_fut_tuple>(
+            seastar::now(),
+            OpsExecuter::osd_op_ierrorator::future<>(
+              crimson::ct_error::enospc::make()));
+      } else {
+        // drop request
+        logger().info(" full, dropping request (bad client)");
+        return interruptor::make_ready_future<OpsExecuter::rep_op_fut_tuple>(
+          seastar::now(),
+          OpsExecuter::osd_op_ierrorator::future<>(
+            crimson::ct_error::eagain::make()));
+      }
+    }
     return std::move(*ox).flush_changes_n_do_ops_effects(
-      [this, &op_info, &ops] (auto&& txn,
-                              auto&& obc,
-                              auto&& osd_op_p,
-                              bool user_modify) {
+      ops,
+      snap_mapper,
+      osdriver,
+      [this] (auto&& txn,
+              auto&& obc,
+              auto&& osd_op_p,
+              auto&& log_entries) {
        logger().debug(
          "do_osd_ops_execute: object {} submitting txn",
          obc->get_oid());
-        fill_op_params_bump_pg_version(osd_op_p, user_modify);
        return submit_transaction(
-          op_info,
-          ops,
           std::move(obc),
           std::move(txn),
-          std::move(osd_op_p));
+          std::move(osd_op_p),
+          std::move(log_entries));
     });
   }).safe_then_unpack_interruptible(
     [success_func=std::move(success_func), rollbacker, this, failure_func_ptr]
@@ -732,32 +904,102 @@ PG::do_osd_ops_execute(
     (const std::error_code& e) mutable {
     return PG::do_osd_ops_iertr::make_ready_future<pg_rep_op_fut_t<Ret>>(
         seastar::now(),
+        e.value() == ENOENT ? (*failure_func_ptr)(e) :
         rollbacker.rollback_obc_if_modified(e).then_interruptible(
           [e, failure_func_ptr] {
           return (*failure_func_ptr)(e);
         }));
   }));
 }
+seastar::future<> PG::submit_error_log(
+  Ref<MOSDOp> m,
+  const OpInfo &op_info,
+  ObjectContextRef obc,
+  const std::error_code e,
+  ceph_tid_t rep_tid,
+  eversion_t &version)
+{
+  const osd_reqid_t &reqid = m->get_reqid();
+  mempool::osd_pglog::list<pg_log_entry_t> log_entries;
+  log_entries.push_back(pg_log_entry_t(pg_log_entry_t::ERROR,
+                                       obc->obs.oi.soid,
+                                       next_version(),
+                                       eversion_t(), 0,
+                                       reqid, utime_t(),
+                                       -e.value()));
+  if (op_info.allows_returnvec()) {
+    log_entries.back().set_op_returns(m->ops);
+  }
+  ceph_assert(is_primary());
+  if (!log_entries.empty()) {
+    ceph_assert(log_entries.rbegin()->version >= projected_last_update);
+    version = projected_last_update = log_entries.rbegin()->version;
+  }
+  ceph::os::Transaction t;
+  peering_state.merge_new_log_entries(
+    log_entries, t, peering_state.get_pg_trim_to(),
+    peering_state.get_min_last_complete_ondisk());
+
+    set<pg_shard_t> waiting_on;
+    for (auto &i : get_acting_recovery_backfill()) {
+      pg_shard_t peer(i);
+      if (peer == pg_whoami) continue;
+      ceph_assert(peering_state.get_peer_missing().count(peer));
+      ceph_assert(peering_state.has_peer_info(peer));
+      auto log_m = crimson::make_message<MOSDPGUpdateLogMissing>(
+                   log_entries,
+                   spg_t(peering_state.get_info().pgid.pgid, i.shard),
+                   pg_whoami.shard,
+                   get_osdmap_epoch(),
+                   get_last_peering_reset(),
+                   rep_tid,
+                   peering_state.get_pg_trim_to(),
+                   peering_state.get_min_last_complete_ondisk());
+      send_cluster_message(peer.osd, std::move(log_m), get_osdmap_epoch());
+      waiting_on.insert(peer);
+    }
+    waiting_on.insert(pg_whoami);
+    log_entry_update_waiting_on.insert(
+      std::make_pair(rep_tid, log_update_t{std::move(waiting_on)}));
+    return shard_services.get_store().do_transaction(
+      get_collection_ref(), std::move(t))
+      .then([this] {
+        peering_state.update_trim_to();
+        return seastar::now();
+    });
+}
 
 PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<MURef<MOSDOpReply>>>
 PG::do_osd_ops(
   Ref<MOSDOp> m,
+  crimson::net::ConnectionRef conn,
   ObjectContextRef obc,
-  const OpInfo &op_info)
+  const OpInfo &op_info,
+  const SnapContext& snapc)
 {
   if (__builtin_expect(stopping, false)) {
     throw crimson::common::system_shutdown_exception();
   }
   return do_osd_ops_execute<MURef<MOSDOpReply>>(
     seastar::make_lw_shared<OpsExecuter>(
-      Ref<PG>{this}, std::move(obc), op_info, *m),
+      Ref<PG>{this}, obc, op_info, *m, conn, snapc),
     m->ops,
-    op_info,
-    [this, m, rvec = op_info.allows_returnvec()] {
+    [this, m, obc, may_write = op_info.may_write(),
+     may_read = op_info.may_read(), rvec = op_info.allows_returnvec()] {
       // TODO: should stop at the first op which returns a negative retval,
       //       cmpext uses it for returning the index of first unmatched byte
       int result = m->ops.empty() ? 0 : m->ops.back().rval.code;
-      if (result > 0 && !rvec) {
+      if (may_read && result >= 0) {
+        for (auto &osdop : m->ops) {
+          if (osdop.rval < 0 && !(osdop.op.flags & CEPH_OSD_OP_FLAG_FAILOK)) {
+            result = osdop.rval.code;
+            break;
+          }
+        }
+      } else if (result > 0 && may_write && !rvec) {
+        result = 0;
+      } else if (result < 0 && (m->ops.empty() ?
+        0 : m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK)) {
         result = 0;
       }
       auto reply = crimson::make_message<MOSDOpReply>(m.get(),
@@ -770,17 +1012,80 @@ PG::do_osd_ops(
         "do_osd_ops: {} - object {} sending reply",
         *m,
         m->get_hobj());
+      if (obc->obs.exists) {
+        reply->set_reply_versions(peering_state.get_info().last_update,
+          obc->obs.oi.user_version);
+      } else {
+        reply->set_reply_versions(peering_state.get_info().last_update,
+          peering_state.get_info().last_user_version);
+      }
       return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>(
         std::move(reply));
     },
-    [m, this] (const std::error_code& e) {
-      auto reply = crimson::make_message<MOSDOpReply>(
-        m.get(), -e.value(), get_osdmap_epoch(), 0, false);
-      reply->set_enoent_reply_versions(
-        peering_state.get_info().last_update,
-        peering_state.get_info().last_user_version);
-      return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>(std::move(reply));
+    [m, &op_info, obc, this] (const std::error_code& e) {
+    return seastar::do_with(eversion_t(), [m, &op_info, obc, e, this](auto &version) {
+      auto fut = seastar::now();
+      epoch_t epoch = get_osdmap_epoch();
+      ceph_tid_t rep_tid = shard_services.get_tid();
+      auto last_complete = peering_state.get_info().last_complete;
+      if (op_info.may_write()) {
+        fut = submit_error_log(m, op_info, obc, e, rep_tid, version);
+      }
+      return fut.then([m, e, epoch, &op_info, rep_tid, &version, last_complete,  this] {
+        auto log_reply = [m, e, this] {
+          auto reply = crimson::make_message<MOSDOpReply>(
+            m.get(), -e.value(), get_osdmap_epoch(), 0, false);
+          if (m->ops.empty() ? 0 :
+            m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK) {
+            reply->set_result(0);
+          }
+          // For all ops except for CMPEXT, the correct error value is encoded
+          // in e.value(). For CMPEXT, osdop.rval has the actual error value.
+          if (e.value() == ct_error::cmp_fail_error_value) {
+            assert(!m->ops.empty());
+            for (auto &osdop : m->ops) {
+              if (osdop.rval < 0) {
+                reply->set_result(osdop.rval);
+                break;
+              }
+            }
+          }
+          reply->set_enoent_reply_versions(
+          peering_state.get_info().last_update,
+          peering_state.get_info().last_user_version);
+          reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+          return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>(
+            std::move(reply));
+       };
+
+        if (!peering_state.pg_has_reset_since(epoch) && op_info.may_write()) {
+          auto it = log_entry_update_waiting_on.find(rep_tid);
+          ceph_assert(it != log_entry_update_waiting_on.end());
+          auto it2 = it->second.waiting_on.find(pg_whoami);
+          ceph_assert(it2 != it->second.waiting_on.end());
+          it->second.waiting_on.erase(it2);
+
+          if (it->second.waiting_on.empty()) {
+            log_entry_update_waiting_on.erase(it);
+            if (version != eversion_t()) {
+              peering_state.complete_write(version, last_complete);
+            }
+            return log_reply();
+          } else {
+            return it->second.all_committed.get_shared_future()
+              .then([this, &version, last_complete, log_reply = std::move(log_reply)] {
+              if (version != eversion_t()) {
+                peering_state.complete_write(version, last_complete);
+              }
+              return log_reply();
+            });
+          }
+        } else {
+          return log_reply();
+        }
+      });
     });
+  });
 }
 
 PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<>>
@@ -788,17 +1093,28 @@ PG::do_osd_ops(
   ObjectContextRef obc,
   std::vector<OSDOp>& ops,
   const OpInfo &op_info,
-  const do_osd_ops_params_t& msg_params,
+  const do_osd_ops_params_t &&msg_params,
   do_osd_ops_success_func_t success_func,
   do_osd_ops_failure_func_t failure_func)
 {
-  return do_osd_ops_execute<void>(
-    seastar::make_lw_shared<OpsExecuter>(
-      Ref<PG>{this}, std::move(obc), op_info, msg_params),
-    ops,
-    std::as_const(op_info),
-    std::move(success_func),
-    std::move(failure_func));
+  // This overload is generally used for internal client requests,
+  // use an empty SnapContext.
+  return seastar::do_with(
+    std::move(msg_params),
+    [=, this, &ops, &op_info](auto &msg_params) {
+    return do_osd_ops_execute<void>(
+      seastar::make_lw_shared<OpsExecuter>(
+        Ref<PG>{this},
+        std::move(obc),
+        op_info,
+        msg_params,
+        msg_params.get_connection(),
+        SnapContext{}
+      ),
+      ops,
+      std::move(success_func),
+      std::move(failure_func));
+  });
 }
 
 PG::interruptible_future<MURef<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
@@ -816,8 +1132,11 @@ PG::interruptible_future<MURef<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
     auto reply = crimson::make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
                                            CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
                                            false);
+    reply->claim_op_out_data(m->ops);
+    reply->set_reply_versions(peering_state.get_info().last_update,
+      peering_state.get_info().last_user_version);
     return seastar::make_ready_future<MURef<MOSDOpReply>>(std::move(reply));
-  }).handle_exception_type_interruptible([=](const crimson::osd::error& e) {
+  }).handle_exception_type_interruptible([=, this](const crimson::osd::error& e) {
     auto reply = crimson::make_message<MOSDOpReply>(
       m.get(), -e.code().value(), get_osdmap_epoch(), 0, false);
     reply->set_enoent_reply_versions(peering_state.get_info().last_update,
@@ -844,190 +1163,22 @@ RWState::State PG::get_lock_type(const OpInfo &op_info)
   }
 }
 
-std::optional<hobject_t> PG::resolve_oid(
-  const SnapSet &ss,
-  const hobject_t &oid)
+void PG::check_blocklisted_obc_watchers(
+  ObjectContextRef &obc)
 {
-  if (oid.snap > ss.seq) {
-    return oid.get_head();
-  } else {
-    // which clone would it be?
-    auto clone = std::upper_bound(
-      begin(ss.clones), end(ss.clones),
-      oid.snap);
-    if (clone == end(ss.clones)) {
-      // Doesn't exist, > last clone, < ss.seq
-      return std::nullopt;
-    }
-    auto citer = ss.clone_snaps.find(*clone);
-    // TODO: how do we want to handle this kind of logic error?
-    ceph_assert(citer != ss.clone_snaps.end());
-
-    if (std::find(
-         citer->second.begin(),
-         citer->second.end(),
-         *clone) == citer->second.end()) {
-      return std::nullopt;
-    } else {
-      auto soid = oid;
-      soid.snap = *clone;
-      return std::optional<hobject_t>(soid);
+  if (obc->watchers.empty()) {
+    for (auto &[src, winfo] : obc->obs.oi.watchers) {
+      auto watch = crimson::osd::Watch::create(
+        obc, winfo, src.second, this);
+      watch->disconnect();
+      auto [it, emplaced] = obc->watchers.emplace(src, std::move(watch));
+      assert(emplaced);
+      logger().debug("added watch for obj {}, client {}",
+        obc->get_oid(), src.second);
     }
   }
 }
 
-template<RWState::State State>
-PG::load_obc_iertr::future<>
-PG::with_head_obc(ObjectContextRef obc, bool existed, with_obc_func_t&& func)
-{
-  logger().debug("{} {}", __func__, obc->get_oid());
-  assert(obc->is_head());
-  obc->append_to(obc_set_accessing);
-  return obc->with_lock<State, IOInterruptCondition>(
-    [existed=existed, obc=obc, func=std::move(func), this] {
-    auto loaded = load_obc_iertr::make_ready_future<ObjectContextRef>(obc);
-    if (existed) {
-      logger().debug("with_head_obc: found {} in cache", obc->get_oid());
-    } else {
-      logger().debug("with_head_obc: cache miss on {}", obc->get_oid());
-      loaded = obc->with_promoted_lock<State, IOInterruptCondition>([this, obc] {
-        return load_head_obc(obc);
-      });
-    }
-    return loaded.safe_then_interruptible([func = std::move(func)](auto obc) {
-      return std::move(func)(std::move(obc));
-    });
-  }).finally([this, pgref=boost::intrusive_ptr<PG>{this}, obc=std::move(obc)] {
-    logger().debug("with_head_obc: released {}", obc->get_oid());
-    obc->remove_from(obc_set_accessing);
-  });
-}
-
-template<RWState::State State>
-PG::load_obc_iertr::future<>
-PG::with_head_obc(hobject_t oid, with_obc_func_t&& func)
-{
-  auto [obc, existed] =
-    shard_services.obc_registry.get_cached_obc(std::move(oid));
-  return with_head_obc<State>(std::move(obc), existed, std::move(func));
-}
-
-template<RWState::State State>
-PG::interruptible_future<>
-PG::with_existing_head_obc(ObjectContextRef obc, with_obc_func_t&& func)
-{
-  constexpr bool existed = true;
-  return with_head_obc<State>(
-    std::move(obc), existed, std::move(func)
-  ).handle_error_interruptible(load_obc_ertr::assert_all{"can't happen"});
-}
-
-template<RWState::State State>
-PG::load_obc_iertr::future<>
-PG::with_clone_obc(hobject_t oid, with_obc_func_t&& func)
-{
-  assert(!oid.is_head());
-  return with_head_obc<RWState::RWREAD>(oid.get_head(),
-    [oid, func=std::move(func), this](auto head) -> load_obc_iertr::future<> {
-    auto coid = resolve_oid(head->get_ro_ss(), oid);
-    if (!coid) {
-      // TODO: return crimson::ct_error::enoent::make();
-      logger().error("with_clone_obc: {} clone not found", coid);
-      return load_obc_ertr::make_ready_future<>();
-    }
-    auto [clone, existed] = shard_services.obc_registry.get_cached_obc(*coid);
-    return clone->template with_lock<State>(
-      [coid=*coid, existed=existed,
-       head=std::move(head), clone=std::move(clone),
-       func=std::move(func), this]() -> load_obc_iertr::future<> {
-      auto loaded = load_obc_iertr::make_ready_future<ObjectContextRef>(clone);
-      if (existed) {
-        logger().debug("with_clone_obc: found {} in cache", coid);
-      } else {
-        logger().debug("with_clone_obc: cache miss on {}", coid);
-        loaded = clone->template with_promoted_lock<State>(
-          [coid, clone, head, this] {
-          return backend->load_metadata(coid).safe_then_interruptible(
-            [coid, clone=std::move(clone), head=std::move(head)](auto md) mutable {
-            clone->set_clone_state(std::move(md->os), std::move(head));
-            return clone;
-          });
-        });
-      }
-      return loaded.safe_then_interruptible([func = std::move(func)](auto clone) {
-        return std::move(func)(std::move(clone));
-      });
-    });
-  });
-}
-
-// explicitly instantiate the used instantiations
-template PG::load_obc_iertr::future<>
-PG::with_head_obc<RWState::RWNONE>(hobject_t, with_obc_func_t&&);
-
-template<RWState::State State>
-PG::interruptible_future<>
-PG::with_existing_clone_obc(ObjectContextRef clone, with_obc_func_t&& func)
-{
-  assert(clone);
-  assert(clone->get_head_obc());
-  assert(!clone->get_oid().is_head());
-  return with_existing_head_obc<RWState::RWREAD>(clone->get_head_obc(),
-    [clone=std::move(clone), func=std::move(func)] ([[maybe_unused]] auto head) {
-    assert(head == clone->get_head_obc());
-    return clone->template with_lock<State>(
-      [clone=std::move(clone), func=std::move(func)] {
-      return std::move(func)(std::move(clone));
-    });
-  });
-}
-
-PG::load_obc_iertr::future<crimson::osd::ObjectContextRef>
-PG::load_head_obc(ObjectContextRef obc)
-{
-  return backend->load_metadata(obc->get_oid()).safe_then_interruptible(
-    [obc=std::move(obc)](auto md)
-    -> load_obc_ertr::future<crimson::osd::ObjectContextRef> {
-    const hobject_t& oid = md->os.oi.soid;
-    logger().debug(
-      "load_head_obc: loaded obs {} for {}", md->os.oi, oid);
-    if (!md->ss) {
-      logger().error(
-        "load_head_obc: oid {} missing snapset", oid);
-      return crimson::ct_error::object_corrupted::make();
-    }
-    obc->set_head_state(std::move(md->os), std::move(*(md->ss)));
-    logger().debug(
-      "load_head_obc: returning obc {} for {}",
-      obc->obs.oi, obc->obs.oi.soid);
-    return load_obc_ertr::make_ready_future<
-      crimson::osd::ObjectContextRef>(obc);
-  });
-}
-
-PG::load_obc_iertr::future<>
-PG::reload_obc(crimson::osd::ObjectContext& obc) const
-{
-  assert(obc.is_head());
-  return backend->load_metadata(obc.get_oid()).safe_then_interruptible<false>([&obc](auto md)
-    -> load_obc_ertr::future<> {
-    logger().debug(
-      "{}: reloaded obs {} for {}",
-      __func__,
-      md->os.oi,
-      obc.get_oid());
-    if (!md->ss) {
-      logger().error(
-        "{}: oid {} missing snapset",
-        __func__,
-        obc.get_oid());
-      return crimson::ct_error::object_corrupted::make();
-    }
-    obc.set_head_state(std::move(md->os), std::move(*(md->ss)));
-    return load_obc_ertr::now();
-  });
-}
-
 PG::load_obc_iertr::future<>
 PG::with_locked_obc(const hobject_t &hobj,
                     const OpInfo &op_info,
@@ -1037,50 +1188,22 @@ PG::with_locked_obc(const hobject_t &hobj,
     throw crimson::common::system_shutdown_exception();
   }
   const hobject_t oid = get_oid(hobj);
+  auto wrapper = [f=std::move(f), this](auto obc) {
+    check_blocklisted_obc_watchers(obc);
+    return f(obc);
+  };
   switch (get_lock_type(op_info)) {
   case RWState::RWREAD:
-    if (oid.is_head()) {
-      return with_head_obc<RWState::RWREAD>(oid, std::move(f));
-    } else {
-      return with_clone_obc<RWState::RWREAD>(oid, std::move(f));
-    }
+      return obc_loader.with_obc<RWState::RWREAD>(oid, std::move(wrapper));
   case RWState::RWWRITE:
-    if (oid.is_head()) {
-      return with_head_obc<RWState::RWWRITE>(oid, std::move(f));
-    } else {
-      return with_clone_obc<RWState::RWWRITE>(oid, std::move(f));
-    }
+      return obc_loader.with_obc<RWState::RWWRITE>(oid, std::move(wrapper));
   case RWState::RWEXCL:
-    if (oid.is_head()) {
-      return with_head_obc<RWState::RWEXCL>(oid, std::move(f));
-    } else {
-      return with_clone_obc<RWState::RWEXCL>(oid, std::move(f));
-    }
+      return obc_loader.with_obc<RWState::RWEXCL>(oid, std::move(wrapper));
   default:
     ceph_abort();
   };
 }
 
-template <RWState::State State>
-PG::interruptible_future<>
-PG::with_locked_obc(ObjectContextRef obc, with_obc_func_t &&f)
-{
-  // TODO: a question from rebase: do we really need such checks when
-  // the interruptible stuff is being used?
-  if (__builtin_expect(stopping, false)) {
-    throw crimson::common::system_shutdown_exception();
-  }
-  if (obc->is_head()) {
-    return with_existing_head_obc<State>(obc, std::move(f));
-  } else {
-    return with_existing_clone_obc<State>(obc, std::move(f));
-  }
-}
-
-// explicitly instantiate the used instantiations
-template PG::interruptible_future<>
-PG::with_locked_obc<RWState::RWEXCL>(ObjectContextRef, with_obc_func_t&&);
-
 PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
 {
   if (__builtin_expect(stopping, false)) {
@@ -1088,6 +1211,7 @@ PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
        crimson::common::system_shutdown_exception());
   }
 
+  logger().debug("{}: {}", __func__, *req);
   if (can_discard_replica_op(*req)) {
     return seastar::now();
   }
@@ -1098,8 +1222,13 @@ PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
   auto p = req->logbl.cbegin();
   std::vector<pg_log_entry_t> log_entries;
   decode(log_entries, p);
-  peering_state.append_log(std::move(log_entries), req->pg_trim_to,
-      req->version, req->min_last_complete_ondisk, txn, !txn.empty(), false);
+  log_operation(std::move(log_entries),
+                req->pg_trim_to,
+                req->version,
+                req->min_last_complete_ondisk,
+                !txn.empty(),
+                txn,
+                false);
   logger().debug("PG::handle_rep_op: do_transaction...");
   return interruptor::make_interruptible(shard_services.get_store().do_transaction(
        coll_ref, std::move(txn))).then_interruptible(
@@ -1114,14 +1243,142 @@ PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
     });
 }
 
-void PG::handle_rep_op_reply(crimson::net::ConnectionRef conn,
-                            const MOSDRepOpReply& m)
+void PG::log_operation(
+  std::vector<pg_log_entry_t>&& logv,
+  const eversion_t &trim_to,
+  const eversion_t &roll_forward_to,
+  const eversion_t &min_last_complete_ondisk,
+  bool transaction_applied,
+  ObjectStore::Transaction &txn,
+  bool async) {
+  logger().debug("{}", __func__);
+  if (is_primary()) {
+    ceph_assert(trim_to <= peering_state.get_last_update_ondisk());
+  }
+  /* TODO: when we add snap mapper and projected log support,
+   * we'll likely want to update them here.
+   *
+   * See src/osd/PrimaryLogPG.h:log_operation for how classic
+   * handles these cases.
+   */
+#if 0
+  if (transaction_applied) {
+    //TODO:
+    //update_snap_map(logv, t);
+  }
+  auto last = logv.rbegin();
+  if (is_primary() && last != logv.rend()) {
+    projected_log.skip_can_rollback_to_to_head();
+    projected_log.trim(cct, last->version, nullptr, nullptr, nullptr);
+  }
+#endif
+  if (!is_primary()) { // && !is_ec_pg()
+    replica_clear_repop_obc(logv);
+  }
+  peering_state.append_log(std::move(logv),
+                           trim_to,
+                           roll_forward_to,
+                           min_last_complete_ondisk,
+                           txn,
+                           !txn.empty(),
+                           false);
+}
+
+void PG::replica_clear_repop_obc(
+  const std::vector<pg_log_entry_t> &logv) {
+    logger().debug("{} clearing {} entries", __func__, logv.size());
+    for (auto &&e: logv) {
+      logger().debug(" {} get_object_boundary(from): {} "
+                     " head version(to): {}",
+                     e.soid,
+                     e.soid.get_object_boundary(),
+                     e.soid.get_head());
+    /* Have to blast all clones, they share a snapset */
+    obc_registry.clear_range(
+      e.soid.get_object_boundary(), e.soid.get_head());
+  }
+}
+
+void PG::handle_rep_op_reply(const MOSDRepOpReply& m)
 {
   if (!can_discard_replica_op(m)) {
     backend->got_rep_op_reply(m);
   }
 }
 
+PG::interruptible_future<> PG::do_update_log_missing(
+  Ref<MOSDPGUpdateLogMissing> m,
+  crimson::net::ConnectionRef conn)
+{
+  if (__builtin_expect(stopping, false)) {
+    return seastar::make_exception_future<>(
+      crimson::common::system_shutdown_exception());
+  }
+
+  ceph_assert(m->get_type() == MSG_OSD_PG_UPDATE_LOG_MISSING);
+  ObjectStore::Transaction t;
+  std::optional<eversion_t> op_trim_to, op_roll_forward_to;
+  if (m->pg_trim_to != eversion_t())
+    op_trim_to = m->pg_trim_to;
+  if (m->pg_roll_forward_to != eversion_t())
+    op_roll_forward_to = m->pg_roll_forward_to;
+  logger().debug("op_trim_to = {}, op_roll_forward_to = {}",
+    op_trim_to, op_roll_forward_to);
+
+  peering_state.append_log_entries_update_missing(
+    m->entries, t, op_trim_to, op_roll_forward_to);
+
+  return interruptor::make_interruptible(shard_services.get_store().do_transaction(
+    coll_ref, std::move(t))).then_interruptible(
+    [m, conn, lcod=peering_state.get_info().last_complete, this] {
+    if (!peering_state.pg_has_reset_since(m->get_epoch())) {
+      peering_state.update_last_complete_ondisk(lcod);
+      auto reply =
+        crimson::make_message<MOSDPGUpdateLogMissingReply>(
+          spg_t(peering_state.get_info().pgid.pgid, get_primary().shard),
+          pg_whoami.shard,
+          m->get_epoch(),
+          m->min_epoch,
+          m->get_tid(),
+          lcod);
+      reply->set_priority(CEPH_MSG_PRIO_HIGH);
+      return conn->send(std::move(reply));
+    }
+    return seastar::now();
+  });
+}
+
+
+PG::interruptible_future<> PG::do_update_log_missing_reply(
+  Ref<MOSDPGUpdateLogMissingReply> m)
+{
+  logger().debug("{}: got reply from {}", __func__, m->get_from());
+
+  auto it = log_entry_update_waiting_on.find(m->get_tid());
+  if (it != log_entry_update_waiting_on.end()) {
+    if (it->second.waiting_on.count(m->get_from())) {
+      it->second.waiting_on.erase(m->get_from());
+      if (m->last_complete_ondisk != eversion_t()) {
+        peering_state.update_peer_last_complete_ondisk(
+         m->get_from(), m->last_complete_ondisk);
+      }
+    } else {
+      logger().error("{} : {} got reply {} from shard {} we are not waiting for",
+        __func__, peering_state.get_info().pgid, *m, m->get_from());
+    }
+
+    if (it->second.waiting_on.empty()) {
+      it->second.all_committed.set_value();
+      it->second.all_committed = {};
+      log_entry_update_waiting_on.erase(it);
+    }
+  } else {
+    logger().error("{} : {} got reply {} on unknown tid {}",
+      __func__, peering_state.get_info().pgid, *m, m->get_tid());
+  }
+  return seastar::now();
+}
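
all_committed above follows the seastar::shared_promise pattern: every waiter
takes get_shared_future(), a single set_value() wakes them all, and
reassignment re-arms the promise for the next tid. In isolation:

    seastar::shared_promise<> all_committed;

    // waiter side: any number of continuations can chain off the same promise
    auto fut = all_committed.get_shared_future().then([] {
      // runs once the last expected ack arrives
    });

    // signaling side: resolve every outstanding waiter, then reset for reuse
    all_committed.set_value();
    all_committed = {};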
+
 bool PG::old_peering_msg(
   const epoch_t reply_epoch,
   const epoch_t query_epoch) const
@@ -1181,15 +1438,53 @@ seastar::future<> PG::stop()
 }
 
 void PG::on_change(ceph::os::Transaction &t) {
-  logger().debug("{}, {}", __func__, *this);
-  for (auto& obc : obc_set_accessing) {
-    obc.interrupt(::crimson::common::actingset_changed(is_primary()));
-  }
+  logger().debug("{} {}:", *this, __func__);
+  context_registry_on_change();
+  obc_loader.notify_on_change(is_primary());
   recovery_backend->on_peering_interval_change(t);
-  backend->on_actingset_changed({ is_primary() });
+  backend->on_actingset_changed(is_primary());
+  wait_for_active_blocker.unblock();
+  if (is_primary()) {
+    logger().debug("{} {}: requeueing", *this, __func__);
+    client_request_orderer.requeue(shard_services, this);
+  } else {
+    logger().debug("{} {}: dropping requests", *this, __func__);
+    client_request_orderer.clear_and_cancel();
+  }
+}
+
+void PG::context_registry_on_change() {
+    obc_registry.for_each([](ObjectContextRef obc) {
+      assert(obc);
+      for (auto j = obc->watchers.begin();
+           j != obc->watchers.end();
+           j = obc->watchers.erase(j)) {
+        j->second->discard_state();
+      }
+  });
 }
 
 bool PG::can_discard_op(const MOSDOp& m) const {
+  if (m.get_map_epoch() <
+      peering_state.get_info().history.same_primary_since) {
+    logger().debug("{}: primary changed after epoch {}, dropping {}",
+                   __func__, m.get_map_epoch(), m);
+    return true;
+  }
+
+  if ((m.get_flags() & (CEPH_OSD_FLAG_BALANCE_READS |
+                        CEPH_OSD_FLAG_LOCALIZE_READS))
+    && !is_primary()
+    && (m.get_map_epoch() <
+        peering_state.get_info().history.same_interval_since))
+    {
+      // Note: the Objecter will resend on interval change without the primary
+      // changing if it actually sent to a replica.  If the primary hasn't
+      // changed since the send epoch, we got it, and we're primary, it won't
+      // have resent even if the interval did change as it sent it to the primary
+      // (us).
+      return true;
+    }
   return __builtin_expect(m.get_map_epoch()
       < peering_state.get_info().history.same_primary_since, false);
 }
@@ -1223,7 +1518,7 @@ bool PG::is_degraded_or_backfilling_object(const hobject_t& soid) const {
   return false;
 }
 
-PG::interruptible_future<std::tuple<bool, int>>
+PG::interruptible_future<std::optional<PG::complete_op_t>>
 PG::already_complete(const osd_reqid_t& reqid)
 {
   eversion_t version;
@@ -1233,11 +1528,15 @@ PG::already_complete(const osd_reqid_t& reqid)
 
   if (peering_state.get_pg_log().get_log().get_request(
        reqid, &version, &user_version, &ret, &op_returns)) {
-    return backend->request_committed(reqid, version).then([ret] {
-      return seastar::make_ready_future<std::tuple<bool, int>>(true, ret);
+    complete_op_t dupinfo{
+      user_version,
+      version,
+      ret};
+    return backend->request_committed(reqid, version).then([dupinfo] {
+      return seastar::make_ready_future<std::optional<complete_op_t>>(dupinfo);
     });
   } else {
-    return seastar::make_ready_future<std::tuple<bool, int>>(false, 0);
+    return seastar::make_ready_future<std::optional<complete_op_t>>(std::nullopt);
   }
 }