]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/osd/pg.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / crimson / osd / pg.cc
index c65ad15ae433f3ebb00ad16b78769f47ffb3f74f..0f01c160783d280851bd25ee6bb69be22486345e 100644 (file)
 
 #include "messages/MOSDOp.h"
 #include "messages/MOSDOpReply.h"
-#include "messages/MOSDPGInfo.h"
-#include "messages/MOSDPGLog.h"
-#include "messages/MOSDPGNotify.h"
-#include "messages/MOSDPGQuery.h"
 #include "messages/MOSDRepOp.h"
 #include "messages/MOSDRepOpReply.h"
 
@@ -27,6 +23,7 @@
 
 #include "os/Transaction.h"
 
+#include "crimson/common/exception.h"
 #include "crimson/net/Connection.h"
 #include "crimson/net/Messenger.h"
 #include "crimson/os/cyanstore/cyan_store.h"
 #include "crimson/osd/pg_meta.h"
 #include "crimson/osd/pg_backend.h"
 #include "crimson/osd/ops_executer.h"
+#include "crimson/osd/osd_operations/osdop_params.h"
 #include "crimson/osd/osd_operations/peering_event.h"
+#include "crimson/osd/pg_recovery.h"
+#include "crimson/osd/replicated_recovery_backend.h"
 
 namespace {
   seastar::logger& logger() {
@@ -97,12 +97,16 @@ PG::PG(
        coll_ref,
        shard_services,
        profile)),
+    recovery_backend(
+      std::make_unique<ReplicatedRecoveryBackend>(
+       *this, shard_services, coll_ref, backend.get())),
+    recovery_handler(
+      std::make_unique<PGRecovery>(this)),
     peering_state(
       shard_services.get_cct(),
       pg_shard,
       pgid,
       PGPool(
-       shard_services.get_cct(),
        osdmap,
        pgid.pool(),
        pool,
@@ -142,7 +146,7 @@ void PG::queue_check_readable(epoch_t last_peering_reset, ceph::timespan delay)
   // handle the peering event in the background
   check_readable_timer.cancel();
   check_readable_timer.set_callback([last_peering_reset, this] {
-    shard_services.start_operation<LocalPeeringEvent>(
+    (void) shard_services.start_operation<LocalPeeringEvent>(
       this,
       shard_services,
       pg_whoami,
@@ -230,7 +234,9 @@ void PG::on_activate_complete()
   wait_for_active_blocker.on_active();
 
   if (peering_state.needs_recovery()) {
-    shard_services.start_operation<LocalPeeringEvent>(
+    logger().info("{}: requesting recovery",
+                  __func__);
+    (void) shard_services.start_operation<LocalPeeringEvent>(
       this,
       shard_services,
       pg_whoami,
@@ -239,7 +245,9 @@ void PG::on_activate_complete()
       get_osdmap_epoch(),
       PeeringState::DoRecovery{});
   } else if (peering_state.needs_backfill()) {
-    shard_services.start_operation<LocalPeeringEvent>(
+    logger().info("{}: requesting backfill",
+                  __func__);
+    (void) shard_services.start_operation<LocalPeeringEvent>(
       this,
       shard_services,
       pg_whoami,
@@ -248,7 +256,9 @@ void PG::on_activate_complete()
       get_osdmap_epoch(),
       PeeringState::RequestBackfill{});
   } else {
-    shard_services.start_operation<LocalPeeringEvent>(
+    logger().debug("{}: no need to recover or backfill, AllReplicasRecovered",
+                  " for pg: {}", __func__, pgid);
+    (void) shard_services.start_operation<LocalPeeringEvent>(
       this,
       shard_services,
       pg_whoami,
@@ -257,6 +267,7 @@ void PG::on_activate_complete()
       get_osdmap_epoch(),
       PeeringState::AllReplicasRecovered{});
   }
+  backend->on_activate_complete();
 }
 
 void PG::prepare_write(pg_info_t &info,
@@ -297,11 +308,29 @@ void PG::prepare_write(pg_info_t &info,
   }
 }
 
-ghobject_t PG::do_delete_work(ceph::os::Transaction &t,
-  ghobject_t _next)
+std::pair<ghobject_t, bool>
+PG::do_delete_work(ceph::os::Transaction &t, ghobject_t _next)
 {
   // TODO
   shard_services.dec_pg_num();
+  return {_next, false};
+}
+
+void PG::scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type)
+{
+  // TODO: should update the stats upon finishing the scrub
+  peering_state.update_stats([scrub_level, this](auto& history, auto& stats) {
+    const utime_t now = ceph_clock_now();
+    history.last_scrub = peering_state.get_info().last_update;
+    history.last_scrub_stamp = now;
+    history.last_clean_scrub_stamp = now;
+    if (scrub_level == scrub_level_t::deep) {
+      history.last_deep_scrub = history.last_scrub;
+      history.last_deep_scrub_stamp = now;
+    }
+    // yes, please publish the stats
+    return true;
+  });
 }
 
 void PG::log_state_enter(const char *state) {
@@ -334,7 +363,7 @@ void PG::schedule_renew_lease(epoch_t last_peering_reset, ceph::timespan delay)
   // handle the peering event in the background
   renew_lease_timer.cancel();
   renew_lease_timer.set_callback([last_peering_reset, this] {
-    shard_services.start_operation<LocalPeeringEvent>(
+    (void) shard_services.start_operation<LocalPeeringEvent>(
       this,
       shard_services,
       pg_whoami,
@@ -364,8 +393,15 @@ void PG::init(
 
 seastar::future<> PG::read_state(crimson::os::FuturizedStore* store)
 {
-  return PGMeta{store, pgid}.load(
-  ).then([this, store](pg_info_t pg_info, PastIntervals past_intervals) {
+  if (__builtin_expect(stopping, false)) {
+    return seastar::make_exception_future<>(
+       crimson::common::system_shutdown_exception());
+  }
+
+  return seastar::do_with(PGMeta(store, pgid), [] (auto& pg_meta) {
+    return pg_meta.load();
+  }).then([this, store](auto&& ret) {
+    auto [pg_info, past_intervals] = std::move(ret);
     return peering_state.init_from_disk_state(
        std::move(pg_info),
        std::move(past_intervals),
@@ -390,7 +426,7 @@ seastar::future<> PG::read_state(crimson::os::FuturizedStore* store)
     peering_state.set_role(rr);
 
     epoch_t epoch = get_osdmap_epoch();
-    shard_services.start_operation<LocalPeeringEvent>(
+    (void) shard_services.start_operation<LocalPeeringEvent>(
        this,
        shard_services,
        pg_whoami,
@@ -417,7 +453,7 @@ void PG::do_peering_event(
   PGPeeringEvent& evt, PeeringCtx &rctx)
 {
   if (!peering_state.pg_has_reset_since(evt.get_epoch_requested())) {
-    logger().debug("{} handling {}", __func__, evt.get_desc());
+    logger().debug("{} handling {} for pg: {}", __func__, evt.get_desc(), pgid);
     do_peering_event(evt.get_event(), rctx);
   } else {
     logger().debug("{} ignoring {} -- pg has reset", __func__, evt.get_desc());
@@ -461,6 +497,19 @@ void PG::print(ostream& out) const
   out << peering_state << " ";
 }
 
+void PG::dump_primary(Formatter* f)
+{
+  peering_state.dump_peering_state(f);
+
+  f->open_array_section("recovery_state");
+  PeeringState::QueryState q(f);
+  peering_state.handle_event(q, 0);
+  f->close_section();
+
+  // TODO: snap_trimq
+  // TODO: scrubber state
+  // TODO: agent state
+}
 
 std::ostream& operator<<(std::ostream& os, const PG& pg)
 {
@@ -489,36 +538,143 @@ blocking_future<> PG::WaitForActiveBlocker::wait()
   }
 }
 
-seastar::future<> PG::submit_transaction(ObjectContextRef&& obc,
+seastar::future<> PG::WaitForActiveBlocker::stop()
+{
+  p.set_exception(crimson::common::system_shutdown_exception());
+  return seastar::now();
+}
+
+seastar::future<> PG::submit_transaction(const OpInfo& op_info,
+                                        const std::vector<OSDOp>& ops,
+                                        ObjectContextRef&& obc,
                                         ceph::os::Transaction&& txn,
-                                        const MOSDOp& req)
+                                        const osd_op_params_t& osd_op_p)
 {
+  if (__builtin_expect(stopping, false)) {
+    return seastar::make_exception_future<>(
+       crimson::common::system_shutdown_exception());
+  }
+
   epoch_t map_epoch = get_osdmap_epoch();
-  eversion_t at_version{map_epoch, projected_last_update.version + 1};
+
+  if (__builtin_expect(osd_op_p.at_version.epoch != map_epoch, false)) {
+    throw crimson::common::actingset_changed(is_primary());
+  }
+
+  std::vector<pg_log_entry_t> log_entries;
+  log_entries.emplace_back(obc->obs.exists ?
+                     pg_log_entry_t::MODIFY : pg_log_entry_t::DELETE,
+                   obc->obs.oi.soid, osd_op_p.at_version, obc->obs.oi.version,
+                   osd_op_p.user_modify ? osd_op_p.at_version.version : 0,
+                   osd_op_p.req->get_reqid(), osd_op_p.req->get_mtime(),
+                    op_info.allows_returnvec() && !ops.empty() ? ops.back().rval.code : 0);
+  // TODO: refactor the submit_transaction
+  if (op_info.allows_returnvec()) {
+    // also the per-op values are recorded in the pg log
+    log_entries.back().set_op_returns(ops);
+    logger().debug("{} op_returns: {}",
+                   __func__, log_entries.back().op_returns);
+  }
+  log_entries.back().clean_regions = std::move(osd_op_p.clean_regions);
+  peering_state.pre_submit_op(obc->obs.oi.soid, log_entries, osd_op_p.at_version);
+  peering_state.append_log_with_trim_to_updated(std::move(log_entries), osd_op_p.at_version,
+                                               txn, true, false);
+
   return backend->mutate_object(peering_state.get_acting_recovery_backfill(),
                                std::move(obc),
                                std::move(txn),
-                               req,
+                               std::move(osd_op_p),
                                peering_state.get_last_peering_reset(),
                                map_epoch,
-                               at_version).then([this](auto acked) {
+                               std::move(log_entries)).then(
+    [this, last_complete=peering_state.get_info().last_complete,
+      at_version=osd_op_p.at_version](auto acked) {
     for (const auto& peer : acked) {
       peering_state.update_peer_last_complete_ondisk(
         peer.shard, peer.last_complete_ondisk);
     }
+    peering_state.complete_write(at_version, last_complete);
     return seastar::now();
   });
 }
 
+osd_op_params_t&& PG::fill_op_params_bump_pg_version(
+  osd_op_params_t&& osd_op_p,
+  Ref<MOSDOp> m,
+  const bool user_modify)
+{
+  osd_op_p.req = std::move(m);
+  osd_op_p.at_version = next_version();
+  osd_op_p.pg_trim_to = get_pg_trim_to();
+  osd_op_p.min_last_complete_ondisk = get_min_last_complete_ondisk();
+  osd_op_p.last_complete = get_info().last_complete;
+  if (user_modify) {
+    osd_op_p.user_at_version = osd_op_p.at_version.version;
+  }
+  return std::move(osd_op_p);
+}
+
+seastar::future<Ref<MOSDOpReply>> PG::handle_failed_op(
+  const std::error_code& e,
+  ObjectContextRef obc,
+  const OpsExecuter& ox,
+  const MOSDOp& m) const
+{
+  // Oops, an operation had failed. do_osd_ops() altogether with
+  // OpsExecuter already dropped the ObjectStore::Transaction if
+  // there was any. However, this is not enough to completely
+  // rollback as we gave OpsExecuter the very single copy of `obc`
+  // we maintain and we did it for both reading and writing.
+  // Now all modifications must be reverted.
+  //
+  // Let's just reload from the store. Evicting from the shared
+  // LRU would be tricky as next MOSDOp (the one at `get_obc`
+  // phase) could actually already finished the lookup. Fortunately,
+  // this is supposed to live on cold  paths, so performance is not
+  // a concern -- simplicity wins.
+  //
+  // The conditional's purpose is to efficiently handle hot errors
+  // which may appear as a result of e.g. CEPH_OSD_OP_CMPXATTR or
+  // CEPH_OSD_OP_OMAP_CMP. These are read-like ops and clients
+  // typically append them before any write. If OpsExecuter hasn't
+  // seen any modifying operation, `obc` is supposed to be kept
+  // unchanged.
+  assert(e.value() > 0);
+  const bool need_reload_obc = ox.has_seen_write();
+  logger().debug(
+    "{}: {} - object {} got error code {}, {}; need_reload_obc {}",
+    __func__,
+    m,
+    obc->obs.oi.soid,
+    e.value(),
+    e.message(),
+    need_reload_obc);
+  return (need_reload_obc ? reload_obc(*obc)
+                          : load_obc_ertr::now()
+  ).safe_then([&e, &m, obc = std::move(obc), this] {
+    auto reply = make_message<MOSDOpReply>(
+      &m, -e.value(), get_osdmap_epoch(), 0, false);
+    reply->set_enoent_reply_versions(
+      peering_state.get_info().last_update,
+      peering_state.get_info().last_user_version);
+    return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
+  }, load_obc_ertr::assert_all{ "can't live with object state messed up" });
+}
+
 seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops(
   Ref<MOSDOp> m,
-  ObjectContextRef obc)
+  ObjectContextRef obc,
+  const OpInfo &op_info)
 {
+  if (__builtin_expect(stopping, false)) {
+    throw crimson::common::system_shutdown_exception();
+  }
+
   using osd_op_errorator = OpsExecuter::osd_op_errorator;
   const auto oid = m->get_snapid() == CEPH_SNAPDIR ? m->get_hobj().get_head()
                                                    : m->get_hobj();
-  auto ox =
-    std::make_unique<OpsExecuter>(obc, *this/* as const& */, m);
+  auto ox = std::make_unique<OpsExecuter>(
+    obc, op_info, get_pool().info, get_backend(), *m);
   return crimson::do_for_each(
     m->ops, [obc, m, ox = ox.get()](OSDOp& osd_op) {
     logger().debug(
@@ -526,73 +682,86 @@ seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops(
       *m,
       obc->obs.oi.soid,
       ceph_osd_op_name(osd_op.op.op));
-    return ox->execute_osd_op(osd_op);
-  }).safe_then([this, obc, m, ox = ox.get()] {
+    return ox->execute_op(osd_op);
+  }).safe_then([this, obc, m, ox = ox.get(), &op_info] {
     logger().debug(
       "do_osd_ops: {} - object {} all operations successful",
       *m,
       obc->obs.oi.soid);
-    return std::move(*ox).submit_changes(
-      [this, m] (auto&& txn, auto&& obc) -> osd_op_errorator::future<> {
-        // XXX: the entire lambda could be scheduled conditionally. ::if_then()?
-        if (txn.empty()) {
-         logger().debug(
-           "do_osd_ops: {} - object {} txn is empty, bypassing mutate",
-           *m,
-           obc->obs.oi.soid);
-          return osd_op_errorator::now();
-        } else {
-         logger().debug(
-           "do_osd_ops: {} - object {} submitting txn",
-           *m,
-           obc->obs.oi.soid);
-          return submit_transaction(std::move(obc), std::move(txn), *m);
-        }
+    return std::move(*ox).flush_changes(
+      [m] (auto&& obc) -> osd_op_errorator::future<> {
+       logger().debug(
+         "do_osd_ops: {} - object {} txn is empty, bypassing mutate",
+         *m,
+         obc->obs.oi.soid);
+        return osd_op_errorator::now();
+      },
+      [this, m, &op_info] (auto&& txn,
+                          auto&& obc,
+                          auto&& osd_op_p,
+                           bool user_modify) -> osd_op_errorator::future<> {
+       logger().debug(
+         "do_osd_ops: {} - object {} submitting txn",
+         *m,
+         obc->obs.oi.soid);
+        auto filled_osd_op_p = fill_op_params_bump_pg_version(
+          std::move(osd_op_p),
+          std::move(m),
+          user_modify);
+       return submit_transaction(
+          op_info,
+          filled_osd_op_p.req->ops,
+          std::move(obc),
+          std::move(txn),
+          std::move(filled_osd_op_p));
       });
-  }).safe_then([m, obc, this, ox_deleter = std::move(ox)] {
-    auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
-                                           0, false);
+  }).safe_then([this,
+                m,
+                obc,
+                rvec = op_info.allows_returnvec()] {
+    // TODO: should stop at the first op which returns a negative retval,
+    //       cmpext uses it for returning the index of first unmatched byte
+    int result = m->ops.empty() ? 0 : m->ops.back().rval.code;
+    if (result > 0 && !rvec) {
+      result = 0;
+    }
+    auto reply = make_message<MOSDOpReply>(m.get(),
+                                           result,
+                                           get_osdmap_epoch(),
+                                           0,
+                                           false);
     reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
     logger().debug(
       "do_osd_ops: {} - object {} sending reply",
       *m,
       obc->obs.oi.soid);
     return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
-  }, OpsExecuter::osd_op_errorator::all_same_way([=] (const std::error_code& e) {
-    assert(e.value() > 0);
-    logger().debug(
-      "do_osd_ops: {} - object {} got error code {}, {}",
-      *m,
-      obc->obs.oi.soid,
-      e.value(),
-      e.message());
-    auto reply = make_message<MOSDOpReply>(
-      m.get(), -e.value(), get_osdmap_epoch(), 0, false);
-    reply->set_enoent_reply_versions(peering_state.get_info().last_update,
-                                    peering_state.get_info().last_user_version);
-    return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
-  })).handle_exception_type([=,&oid](const crimson::osd::error& e) {
+  }, osd_op_errorator::all_same_way([ox = ox.get(),
+                                     m,
+                                     obc,
+                                     this] (const std::error_code& e) {
+    return handle_failed_op(e, std::move(obc), *ox, *m);
+  })).handle_exception_type([ox_deleter = std::move(ox),
+                             m,
+                             obc,
+                             this] (const crimson::osd::error& e) {
     // we need this handler because throwing path which aren't errorated yet.
-    logger().debug(
-      "do_osd_ops: {} - object {} got unhandled exception {} ({})",
-      *m,
-      obc->obs.oi.soid,
-      e.code(),
-      e.what());
-    auto reply = make_message<MOSDOpReply>(
-      m.get(), -e.code().value(), get_osdmap_epoch(), 0, false);
-    reply->set_enoent_reply_versions(peering_state.get_info().last_update,
-                                    peering_state.get_info().last_user_version);
-    return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
+    logger().debug("encountered the legacy error handling path!");
+    return handle_failed_op(e.code(), std::move(obc), *ox_deleter, *m);
   });
 }
 
 seastar::future<Ref<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
 {
-  auto ox = std::make_unique<OpsExecuter>(*this/* as const& */, m);
+  if (__builtin_expect(stopping, false)) {
+    throw crimson::common::system_shutdown_exception();
+  }
+
+  auto ox = std::make_unique<PgOpsExecuter>(std::as_const(*this),
+                                            std::as_const(*m));
   return seastar::do_for_each(m->ops, [ox = ox.get()](OSDOp& osd_op) {
     logger().debug("will be handling pg op {}", ceph_osd_op_name(osd_op.op.op));
-    return ox->execute_pg_op(osd_op);
+    return ox->execute_op(osd_op);
   }).then([m, this, ox = std::move(ox)] {
     auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
                                            CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
@@ -607,23 +776,24 @@ seastar::future<Ref<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
   });
 }
 
-std::pair<hobject_t, RWState::State> PG::get_oid_and_lock(
-  const MOSDOp &m,
-  const OpInfo &op_info)
+hobject_t PG::get_oid(const MOSDOp &m)
+{
+  return (m.get_snapid() == CEPH_SNAPDIR ?
+          m.get_hobj().get_head() :
+          m.get_hobj());
+}
+
+RWState::State PG::get_lock_type(const OpInfo &op_info)
 {
-  auto oid = m.get_snapid() == CEPH_SNAPDIR ?
-    m.get_hobj().get_head() : m.get_hobj();
 
-  RWState::State lock_type = RWState::RWNONE;
   if (op_info.rwordered() && op_info.may_read()) {
-    lock_type = RWState::RWState::RWEXCL;
+    return RWState::RWEXCL;
   } else if (op_info.rwordered()) {
-    lock_type = RWState::RWState::RWWRITE;
+    return RWState::RWWRITE;
   } else {
     ceph_assert(op_info.may_read());
-    lock_type = RWState::RWState::RWREAD;
+    return RWState::RWREAD;
   }
-  return std::make_pair(oid, lock_type);
 }
 
 std::optional<hobject_t> PG::resolve_oid(
@@ -658,139 +828,170 @@ std::optional<hobject_t> PG::resolve_oid(
   }
 }
 
-PG::load_obc_ertr::future<
-  std::pair<crimson::osd::ObjectContextRef, bool>>
-PG::get_or_load_clone_obc(hobject_t oid, ObjectContextRef head)
+template<RWState::State State>
+PG::load_obc_ertr::future<>
+PG::with_head_obc(hobject_t oid, with_obc_func_t&& func)
 {
-  ceph_assert(!oid.is_head());
-  using ObjectContextRef = crimson::osd::ObjectContextRef;
-  auto coid = resolve_oid(head->get_ro_ss(), oid);
-  if (!coid) {
-    return load_obc_ertr::make_ready_future<
-      std::pair<crimson::osd::ObjectContextRef, bool>>(
-       std::make_pair(ObjectContextRef(), true)
-      );
-  }
-  auto [obc, existed] = shard_services.obc_registry.get_cached_obc(*coid);
-  if (existed) {
-    return load_obc_ertr::make_ready_future<
-      std::pair<crimson::osd::ObjectContextRef, bool>>(
-       std::make_pair(obc, true)
-      );
-  } else {
-    bool got = obc->maybe_get_excl();
-    ceph_assert(got);
-    return backend->load_metadata(*coid).safe_then(
-      [oid, obc=std::move(obc), head](auto &&md) mutable {
-       obc->set_clone_state(std::move(md->os), std::move(head));
-       return load_obc_ertr::make_ready_future<
-         std::pair<crimson::osd::ObjectContextRef, bool>>(
-           std::make_pair(obc, false)
-         );
+  assert(oid.is_head());
+  auto [obc, existed] = shard_services.obc_registry.get_cached_obc(oid);
+  return obc->with_lock<State>(
+    [oid=std::move(oid), existed=existed, obc=std::move(obc),
+     func=std::move(func), this] {
+    auto loaded = load_obc_ertr::make_ready_future<ObjectContextRef>(obc);
+    if (existed) {
+      logger().debug("with_head_obc: found {} in cache", oid);
+    } else {
+      logger().debug("with_head_obc: cache miss on {}", oid);
+      loaded = obc->with_promoted_lock<State>([this, obc] {
+        return load_head_obc(obc);
       });
-  }
+    }
+    return loaded.safe_then([func=std::move(func)](auto obc) {
+      return func(std::move(obc));
+    });
+  });
 }
 
-PG::load_obc_ertr::future<
-  std::pair<crimson::osd::ObjectContextRef, bool>>
-PG::get_or_load_head_obc(hobject_t oid)
+template<RWState::State State>
+PG::load_obc_ertr::future<>
+PG::with_clone_obc(hobject_t oid, with_obc_func_t&& func)
 {
-  ceph_assert(oid.is_head());
-  auto [obc, existed] = shard_services.obc_registry.get_cached_obc(oid);
-  if (existed) {
+  assert(!oid.is_head());
+  return with_head_obc<RWState::RWREAD>(oid.get_head(),
+    [oid, func=std::move(func), this](auto head) -> load_obc_ertr::future<> {
+    auto coid = resolve_oid(head->get_ro_ss(), oid);
+    if (!coid) {
+      // TODO: return crimson::ct_error::enoent::make();
+      logger().error("with_clone_obc: {} clone not found", coid);
+      return load_obc_ertr::make_ready_future<>();
+    }
+    auto [clone, existed] = shard_services.obc_registry.get_cached_obc(*coid);
+    return clone->template with_lock<State>(
+      [coid=*coid, existed=existed,
+       head=std::move(head), clone=std::move(clone),
+       func=std::move(func), this]() -> load_obc_ertr::future<> {
+      auto loaded = load_obc_ertr::make_ready_future<ObjectContextRef>(clone);
+      if (existed) {
+        logger().debug("with_clone_obc: found {} in cache", coid);
+      } else {
+        logger().debug("with_clone_obc: cache miss on {}", coid);
+        loaded = clone->template with_promoted_lock<State>(
+          [coid, clone, head, this] {
+          return backend->load_metadata(coid).safe_then(
+            [coid, clone=std::move(clone), head=std::move(head)](auto md) mutable {
+            clone->set_clone_state(std::move(md->os), std::move(head));
+            return clone;
+          });
+        });
+      }
+      return loaded.safe_then([func=std::move(func)](auto clone) {
+        return func(std::move(clone));
+      });
+    });
+  });
+}
+
+// explicitly instantiate the used instantiations
+template PG::load_obc_ertr::future<>
+PG::with_head_obc<RWState::RWNONE>(hobject_t, with_obc_func_t&&);
+
+PG::load_obc_ertr::future<crimson::osd::ObjectContextRef>
+PG::load_head_obc(ObjectContextRef obc)
+{
+  hobject_t oid = obc->get_oid();
+  return backend->load_metadata(oid).safe_then([obc=std::move(obc)](auto md)
+    -> load_obc_ertr::future<crimson::osd::ObjectContextRef> {
+    const hobject_t& oid = md->os.oi.soid;
     logger().debug(
-      "{}: found {} in cache",
-      __func__,
-      oid);
+      "load_head_obc: loaded obs {} for {}", md->os.oi, oid);
+    if (!md->ss) {
+      logger().error(
+        "load_head_obc: oid {} missing snapset", oid);
+      return crimson::ct_error::object_corrupted::make();
+    }
+    obc->set_head_state(std::move(md->os), std::move(*(md->ss)));
+    logger().debug(
+      "load_head_obc: returning obc {} for {}",
+      obc->obs.oi, obc->obs.oi.soid);
     return load_obc_ertr::make_ready_future<
-      std::pair<crimson::osd::ObjectContextRef, bool>>(
-       std::make_pair(std::move(obc), true)
-      );
-  } else {
+      crimson::osd::ObjectContextRef>(obc);
+  });
+}
+
+PG::load_obc_ertr::future<>
+PG::reload_obc(crimson::osd::ObjectContext& obc) const
+{
+  assert(obc.is_head());
+  return backend->load_metadata(obc.get_oid()).safe_then([&obc](auto md)
+    -> load_obc_ertr::future<> {
     logger().debug(
-      "{}: cache miss on {}",
+      "{}: reloaded obs {} for {}",
       __func__,
-      oid);
-    bool got = obc->maybe_get_excl();
-    ceph_assert(got);
-    return backend->load_metadata(oid).safe_then(
-      [oid, obc=std::move(obc)](auto md) ->
-        load_obc_ertr::future<
-          std::pair<crimson::osd::ObjectContextRef, bool>>
-      {
-       logger().debug(
-         "{}: loaded obs {} for {}",
-         __func__,
-         md->os.oi,
-         oid);
-       if (!md->ss) {
-         logger().error(
-           "{}: oid {} missing snapset",
-           __func__,
-           oid);
-         return crimson::ct_error::object_corrupted::make();
-       }
-       obc->set_head_state(std::move(md->os), std::move(*(md->ss)));
-         logger().debug(
-           "{}: returning obc {} for {}",
-           __func__,
-           obc->obs.oi,
-           obc->obs.oi.soid);
-         return load_obc_ertr::make_ready_future<
-           std::pair<crimson::osd::ObjectContextRef, bool>>(
-             std::make_pair(obc, false)
-           );
-      });
-  }
+      md->os.oi,
+      obc.get_oid());
+    if (!md->ss) {
+      logger().error(
+        "{}: oid {} missing snapset",
+        __func__,
+        obc.get_oid());
+      return crimson::ct_error::object_corrupted::make();
+    }
+    obc.set_head_state(std::move(md->os), std::move(*(md->ss)));
+    return load_obc_ertr::now();
+  });
 }
 
-PG::load_obc_ertr::future<crimson::osd::ObjectContextRef>
-PG::get_locked_obc(
-  Operation *op, const hobject_t &oid, RWState::State type)
-{
-  return get_or_load_head_obc(oid.get_head()).safe_then(
-    [this, op, oid, type](auto p) -> load_obc_ertr::future<ObjectContextRef>{
-      auto &[head_obc, head_existed] = p;
-      if (oid.is_head()) {
-       if (head_existed) {
-         return head_obc->get_lock_type(op, type).then([head_obc=head_obc] {
-           ceph_assert(head_obc->loaded);
-           return load_obc_ertr::make_ready_future<ObjectContextRef>(head_obc);
-         });
-       } else {
-         head_obc->degrade_excl_to(type);
-         return load_obc_ertr::make_ready_future<ObjectContextRef>(head_obc);
-       }
-      } else {
-       return head_obc->get_lock_type(op, RWState::RWREAD).then(
-         [this, head_obc=head_obc, oid] {
-           ceph_assert(head_obc->loaded);
-           return get_or_load_clone_obc(oid, head_obc);
-         }).safe_then([head_obc=head_obc, op, oid, type](auto p) {
-             auto &[obc, existed] = p;
-             if (existed) {
-               return load_obc_ertr::future<>(
-                 obc->get_lock_type(op, type)).safe_then([obc=obc] {
-                 ceph_assert(obc->loaded);
-                 return load_obc_ertr::make_ready_future<ObjectContextRef>(obc);
-               });
-             } else {
-               obc->degrade_excl_to(type);
-               return load_obc_ertr::make_ready_future<ObjectContextRef>(obc);
-             }
-         }).safe_then([head_obc=head_obc](auto obc) {
-           head_obc->put_lock_type(RWState::RWREAD);
-           return load_obc_ertr::make_ready_future<ObjectContextRef>(obc);
-         });
-      }
-    });
+PG::load_obc_ertr::future<>
+PG::with_locked_obc(Ref<MOSDOp> &m, const OpInfo &op_info,
+                   Operation *op, PG::with_obc_func_t &&f)
+{
+  if (__builtin_expect(stopping, false)) {
+    throw crimson::common::system_shutdown_exception();
+  }
+  const hobject_t oid = get_oid(*m);
+  switch (get_lock_type(op_info)) {
+  case RWState::RWREAD:
+    if (oid.is_head()) {
+      return with_head_obc<RWState::RWREAD>(oid, std::move(f));
+    } else {
+      return with_clone_obc<RWState::RWREAD>(oid, std::move(f));
+    }
+  case RWState::RWWRITE:
+    if (oid.is_head()) {
+      return with_head_obc<RWState::RWWRITE>(oid, std::move(f));
+    } else {
+      return with_clone_obc<RWState::RWWRITE>(oid, std::move(f));
+    }
+  case RWState::RWEXCL:
+    if (oid.is_head()) {
+      return with_head_obc<RWState::RWWRITE>(oid, std::move(f));
+    } else {
+      return with_clone_obc<RWState::RWWRITE>(oid, std::move(f));
+    }
+  default:
+    ceph_abort();
+  };
 }
 
 seastar::future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
 {
+  if (__builtin_expect(stopping, false)) {
+    return seastar::make_exception_future<>(
+       crimson::common::system_shutdown_exception());
+  }
+
+  if (can_discard_replica_op(*req)) {
+    return seastar::now();
+  }
+
   ceph::os::Transaction txn;
   auto encoded_txn = req->get_data().cbegin();
   decode(txn, encoded_txn);
+  auto p = req->logbl.cbegin();
+  std::vector<pg_log_entry_t> log_entries;
+  decode(log_entries, p);
+  peering_state.append_log(std::move(log_entries), req->pg_trim_to,
+      req->version, req->min_last_complete_ondisk, txn, !txn.empty(), false);
   return shard_services.get_store().do_transaction(coll_ref, std::move(txn))
     .then([req, lcod=peering_state.get_info().last_complete, this] {
       peering_state.update_last_complete_ondisk(lcod);
@@ -803,10 +1004,99 @@ seastar::future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
     });
 }
 
-void PG::handle_rep_op_reply(crimson::net::Connection* conn,
+void PG::handle_rep_op_reply(crimson::net::ConnectionRef conn,
                             const MOSDRepOpReply& m)
 {
-  backend->got_rep_op_reply(m);
+  if (!can_discard_replica_op(m)) {
+    backend->got_rep_op_reply(m);
+  }
+}
+
+template <typename MsgType>
+bool PG::can_discard_replica_op(const MsgType& m) const
+{
+  // if a repop is replied after a replica goes down in a new osdmap, and
+  // before the pg advances to this new osdmap, the repop replies before this
+  // repop can be discarded by that replica OSD, because the primary resets the
+  // connection to it when handling the new osdmap marking it down, and also
+  // resets the messenger sesssion when the replica reconnects. to avoid the
+  // out-of-order replies, the messages from that replica should be discarded.
+  const auto osdmap = peering_state.get_osdmap();
+  const int from_osd = m.get_source().num();
+  if (osdmap->is_down(from_osd)) {
+    return true;
+  }
+  // Mostly, this overlaps with the old_peering_msg
+  // condition.  An important exception is pushes
+  // sent by replicas not in the acting set, since
+  // if such a replica goes down it does not cause
+  // a new interval.
+  if (osdmap->get_down_at(from_osd) >= m.map_epoch) {
+    return true;
+  }
+  // same pg?
+  //  if pg changes *at all*, we reset and repeer!
+  if (epoch_t lpr = peering_state.get_last_peering_reset();
+      lpr > m.map_epoch) {
+    logger().debug("{}: pg changed {} after {}, dropping",
+                   __func__, get_info().history, m.map_epoch);
+    return true;
+  }
+  return false;
+}
+
+seastar::future<> PG::stop()
+{
+  logger().info("PG {} {}", pgid, __func__);
+  stopping = true;
+  return osdmap_gate.stop().then([this] {
+    return wait_for_active_blocker.stop();
+  }).then([this] {
+    return recovery_handler->stop();
+  }).then([this] {
+    return recovery_backend->stop();
+  }).then([this] {
+    return backend->stop();
+  });
+}
+
+void PG::on_change(ceph::os::Transaction &t) {
+  recovery_backend->on_peering_interval_change(t);
+  backend->on_actingset_changed({ is_primary() });
+}
+
+bool PG::can_discard_op(const MOSDOp& m) const {
+  return __builtin_expect(m.get_map_epoch()
+      < peering_state.get_info().history.same_primary_since, false);
+}
+
+bool PG::is_degraded_or_backfilling_object(const hobject_t& soid) const {
+  /* The conditions below may clear (on_local_recover, before we queue
+   * the transaction) before we actually requeue the degraded waiters
+   * in on_global_recover after the transaction completes.
+   */
+  if (peering_state.get_pg_log().get_missing().get_items().count(soid))
+    return true;
+  ceph_assert(!get_acting_recovery_backfill().empty());
+  for (auto& peer : get_acting_recovery_backfill()) {
+    if (peer == get_primary()) continue;
+    auto peer_missing_entry = peering_state.get_peer_missing().find(peer);
+    // If an object is missing on an async_recovery_target, return false.
+    // This will not block the op and the object is async recovered later.
+    if (peer_missing_entry != peering_state.get_peer_missing().end() &&
+       peer_missing_entry->second.get_items().count(soid)) {
+       return true;
+    }
+    // Object is degraded if after last_backfill AND
+    // we are backfilling it
+    if (is_backfill_target(peer) &&
+        peering_state.get_peer_info(peer).last_backfill <= soid &&
+       recovery_handler->backfill_state->get_last_backfill_started() >= soid &&
+       recovery_backend->is_recovering(soid)) {
+      return true;
+    }
+  }
+  return false;
 }
 
 }