]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/osd/pg.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / crimson / osd / pg.cc
index 0f01c160783d280851bd25ee6bb69be22486345e..7fd940f1e76a5733b1fab1a48e4e2f8bd123a601 100644 (file)
@@ -1,5 +1,5 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
 
 #include "pg.h"
 
 #include "crimson/osd/pg_recovery.h"
 #include "crimson/osd/replicated_recovery_backend.h"
 
+using std::ostream;
+using std::set;
+using std::string;
+using std::vector;
+
 namespace {
   seastar::logger& logger() {
     return crimson::get_logger(ceph_subsys_osd);
@@ -125,6 +130,7 @@ PG::PG(
 PG::~PG() {}
 
 bool PG::try_flush_or_schedule_async() {
+  logger().debug("PG::try_flush_or_schedule_async: do_transaction...");
   (void)shard_services.get_store().do_transaction(
     coll_ref,
     ObjectStore::Transaction()).then(
@@ -141,6 +147,28 @@ bool PG::try_flush_or_schedule_async() {
   return false;
 }
 
+void PG::publish_stats_to_osd()
+{
+  if (!is_primary())
+    return;
+  if (auto new_pg_stats = peering_state.prepare_stats_for_publish(
+        pg_stats,
+        object_stat_collection_t());
+      new_pg_stats.has_value()) {
+    pg_stats = std::move(new_pg_stats);
+  }
+}
+
+void PG::clear_publish_stats()
+{
+  pg_stats.reset();
+}
+
+pg_stat_t PG::get_stats() const
+{
+  return pg_stats.value_or(pg_stat_t{});
+}
+
 void PG::queue_check_readable(epoch_t last_peering_reset, ceph::timespan delay)
 {
   // handle the peering event in the background
@@ -241,6 +269,7 @@ void PG::on_activate_complete()
       shard_services,
       pg_whoami,
       pgid,
+      float(0.001),
       get_osdmap_epoch(),
       get_osdmap_epoch(),
       PeeringState::DoRecovery{});
@@ -252,6 +281,7 @@ void PG::on_activate_complete()
       shard_services,
       pg_whoami,
       pgid,
+      float(0.001),
       get_osdmap_epoch(),
       get_osdmap_epoch(),
       PeeringState::RequestBackfill{});
@@ -263,10 +293,12 @@ void PG::on_activate_complete()
       shard_services,
       pg_whoami,
       pgid,
+      float(0.001),
       get_osdmap_epoch(),
       get_osdmap_epoch(),
       PeeringState::AllReplicasRecovered{});
   }
+  publish_stats_to_osd();
   backend->on_activate_complete();
 }
 
@@ -383,12 +415,11 @@ void PG::init(
   const vector<int>& newacting, int new_acting_primary,
   const pg_history_t& history,
   const PastIntervals& pi,
-  bool backfill,
   ObjectStore::Transaction &t)
 {
   peering_state.init(
     role, newup, new_up_primary, newacting,
-    new_acting_primary, history, pi, backfill, t);
+    new_acting_primary, history, pi, t);
 }
 
 seastar::future<> PG::read_state(crimson::os::FuturizedStore* store)
@@ -398,7 +429,7 @@ seastar::future<> PG::read_state(crimson::os::FuturizedStore* store)
        crimson::common::system_shutdown_exception());
   }
 
-  return seastar::do_with(PGMeta(store, pgid), [] (auto& pg_meta) {
+  return seastar::do_with(PGMeta(*store, pgid), [] (auto& pg_meta) {
     return pg_meta.load();
   }).then([this, store](auto&& ret) {
     auto [pg_info, past_intervals] = std::move(ret);
@@ -439,24 +470,18 @@ seastar::future<> PG::read_state(crimson::os::FuturizedStore* store)
   });
 }
 
-void PG::do_peering_event(
-  const boost::statechart::event_base &evt,
-  PeeringCtx &rctx)
-{
-  peering_state.handle_event(
-    evt,
-    &rctx);
-  peering_state.write_if_dirty(rctx.transaction);
-}
-
 void PG::do_peering_event(
   PGPeeringEvent& evt, PeeringCtx &rctx)
 {
-  if (!peering_state.pg_has_reset_since(evt.get_epoch_requested())) {
-    logger().debug("{} handling {} for pg: {}", __func__, evt.get_desc(), pgid);
-    do_peering_event(evt.get_event(), rctx);
-  } else {
+  if (peering_state.pg_has_reset_since(evt.get_epoch_requested()) ||
+      peering_state.pg_has_reset_since(evt.get_epoch_sent())) {
     logger().debug("{} ignoring {} -- pg has reset", __func__, evt.get_desc());
+  } else {
+    logger().debug("{} handling {} for pg: {}", __func__, evt.get_desc(), pgid);
+    peering_state.handle_event(
+      evt.get_event(),
+      &rctx);
+    peering_state.write_if_dirty(rctx.transaction);
   }
 }
 
@@ -487,8 +512,7 @@ void PG::handle_activate_map(PeeringCtx &rctx)
 
 void PG::handle_initialize(PeeringCtx &rctx)
 {
-  PeeringState::Initialize evt;
-  peering_state.handle_event(evt, &rctx);
+  peering_state.handle_event(PeeringState::Initialize{}, &rctx);
 }
 
 
@@ -544,15 +568,19 @@ seastar::future<> PG::WaitForActiveBlocker::stop()
   return seastar::now();
 }
 
-seastar::future<> PG::submit_transaction(const OpInfo& op_info,
-                                        const std::vector<OSDOp>& ops,
-                                        ObjectContextRef&& obc,
-                                        ceph::os::Transaction&& txn,
-                                        const osd_op_params_t& osd_op_p)
+std::tuple<PG::interruptible_future<>,
+           PG::interruptible_future<>>
+PG::submit_transaction(
+  const OpInfo& op_info,
+  const std::vector<OSDOp>& ops,
+  ObjectContextRef&& obc,
+  ceph::os::Transaction&& txn,
+  osd_op_params_t&& osd_op_p)
 {
   if (__builtin_expect(stopping, false)) {
-    return seastar::make_exception_future<>(
-       crimson::common::system_shutdown_exception());
+    return {seastar::make_exception_future<>(
+              crimson::common::system_shutdown_exception()),
+            seastar::now()};
   }
 
   epoch_t map_epoch = get_osdmap_epoch();
@@ -566,7 +594,7 @@ seastar::future<> PG::submit_transaction(const OpInfo& op_info,
                      pg_log_entry_t::MODIFY : pg_log_entry_t::DELETE,
                    obc->obs.oi.soid, osd_op_p.at_version, obc->obs.oi.version,
                    osd_op_p.user_modify ? osd_op_p.at_version.version : 0,
-                   osd_op_p.req->get_reqid(), osd_op_p.req->get_mtime(),
+                   osd_op_p.req_id, osd_op_p.mtime,
                     op_info.allows_returnvec() && !ops.empty() ? ops.back().rval.code : 0);
   // TODO: refactor the submit_transaction
   if (op_info.allows_returnvec()) {
@@ -580,13 +608,15 @@ seastar::future<> PG::submit_transaction(const OpInfo& op_info,
   peering_state.append_log_with_trim_to_updated(std::move(log_entries), osd_op_p.at_version,
                                                txn, true, false);
 
-  return backend->mutate_object(peering_state.get_acting_recovery_backfill(),
-                               std::move(obc),
-                               std::move(txn),
-                               std::move(osd_op_p),
-                               peering_state.get_last_peering_reset(),
-                               map_epoch,
-                               std::move(log_entries)).then(
+  auto [submitted, all_completed] = backend->mutate_object(
+      peering_state.get_acting_recovery_backfill(),
+      std::move(obc),
+      std::move(txn),
+      std::move(osd_op_p),
+      peering_state.get_last_peering_reset(),
+      map_epoch,
+      std::move(log_entries));
+  return std::make_tuple(std::move(submitted), all_completed.then_interruptible(
     [this, last_complete=peering_state.get_info().last_complete,
       at_version=osd_op_p.at_version](auto acked) {
     for (const auto& peer : acked) {
@@ -595,15 +625,13 @@ seastar::future<> PG::submit_transaction(const OpInfo& op_info,
     }
     peering_state.complete_write(at_version, last_complete);
     return seastar::now();
-  });
+  }));
 }
 
-osd_op_params_t&& PG::fill_op_params_bump_pg_version(
-  osd_op_params_t&& osd_op_p,
-  Ref<MOSDOp> m,
+void PG::fill_op_params_bump_pg_version(
+  osd_op_params_t& osd_op_p,
   const bool user_modify)
 {
-  osd_op_p.req = std::move(m);
   osd_op_p.at_version = next_version();
   osd_op_p.pg_trim_to = get_pg_trim_to();
   osd_op_p.min_last_complete_ondisk = get_min_last_complete_ondisk();
@@ -611,147 +639,169 @@ osd_op_params_t&& PG::fill_op_params_bump_pg_version(
   if (user_modify) {
     osd_op_p.user_at_version = osd_op_p.at_version.version;
   }
-  return std::move(osd_op_p);
 }
 
-seastar::future<Ref<MOSDOpReply>> PG::handle_failed_op(
-  const std::error_code& e,
-  ObjectContextRef obc,
-  const OpsExecuter& ox,
-  const MOSDOp& m) const
+PG::interruptible_future<> PG::repair_object(
+  const hobject_t& oid,
+  eversion_t& v) 
 {
-  // Oops, an operation had failed. do_osd_ops() altogether with
-  // OpsExecuter already dropped the ObjectStore::Transaction if
-  // there was any. However, this is not enough to completely
-  // rollback as we gave OpsExecuter the very single copy of `obc`
-  // we maintain and we did it for both reading and writing.
-  // Now all modifications must be reverted.
-  //
-  // Let's just reload from the store. Evicting from the shared
-  // LRU would be tricky as next MOSDOp (the one at `get_obc`
-  // phase) could actually already finished the lookup. Fortunately,
-  // this is supposed to live on cold  paths, so performance is not
-  // a concern -- simplicity wins.
-  //
-  // The conditional's purpose is to efficiently handle hot errors
-  // which may appear as a result of e.g. CEPH_OSD_OP_CMPXATTR or
-  // CEPH_OSD_OP_OMAP_CMP. These are read-like ops and clients
-  // typically append them before any write. If OpsExecuter hasn't
-  // seen any modifying operation, `obc` is supposed to be kept
-  // unchanged.
-  assert(e.value() > 0);
-  const bool need_reload_obc = ox.has_seen_write();
-  logger().debug(
-    "{}: {} - object {} got error code {}, {}; need_reload_obc {}",
-    __func__,
-    m,
-    obc->obs.oi.soid,
-    e.value(),
-    e.message(),
-    need_reload_obc);
-  return (need_reload_obc ? reload_obc(*obc)
-                          : load_obc_ertr::now()
-  ).safe_then([&e, &m, obc = std::move(obc), this] {
-    auto reply = make_message<MOSDOpReply>(
-      &m, -e.value(), get_osdmap_epoch(), 0, false);
-    reply->set_enoent_reply_versions(
-      peering_state.get_info().last_update,
-      peering_state.get_info().last_user_version);
-    return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
-  }, load_obc_ertr::assert_all{ "can't live with object state messed up" });
+  // see also PrimaryLogPG::rep_repair_primary_object()
+  assert(is_primary());
+  logger().debug("{}: {} peers osd.{}", __func__, oid, get_acting_recovery_backfill());
+  // Add object to PG's missing set if it isn't there already
+  assert(!get_local_missing().is_missing(oid));
+  peering_state.force_object_missing(pg_whoami, oid, v);
+  auto [op, fut] = get_shard_services().start_operation<UrgentRecovery>(
+    oid, v, this, get_shard_services(), get_osdmap_epoch());
+  return std::move(fut);
 }
 
-seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops(
-  Ref<MOSDOp> m,
-  ObjectContextRef obc,
-  const OpInfo &op_info)
+template <class Ret, class SuccessFunc, class FailureFunc>
+PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<Ret>>
+PG::do_osd_ops_execute(
+  seastar::lw_shared_ptr<OpsExecuter> ox,
+  std::vector<OSDOp>& ops,
+  const OpInfo &op_info,
+  SuccessFunc&& success_func,
+  FailureFunc&& failure_func)
 {
-  if (__builtin_expect(stopping, false)) {
-    throw crimson::common::system_shutdown_exception();
-  }
-
-  using osd_op_errorator = OpsExecuter::osd_op_errorator;
-  const auto oid = m->get_snapid() == CEPH_SNAPDIR ? m->get_hobj().get_head()
-                                                   : m->get_hobj();
-  auto ox = std::make_unique<OpsExecuter>(
-    obc, op_info, get_pool().info, get_backend(), *m);
-  return crimson::do_for_each(
-    m->ops, [obc, m, ox = ox.get()](OSDOp& osd_op) {
+  assert(ox);
+  auto rollbacker = ox->create_rollbacker([this] (auto& obc) {
+    return reload_obc(obc).handle_error_interruptible(
+      load_obc_ertr::assert_all{"can't live with object state messed up"});
+  });
+  auto failure_func_ptr = seastar::make_lw_shared(std::move(failure_func));
+  return interruptor::do_for_each(ops, [ox](OSDOp& osd_op) {
     logger().debug(
-      "do_osd_ops: {} - object {} - handling op {}",
-      *m,
-      obc->obs.oi.soid,
+      "do_osd_ops_execute: object {} - handling op {}",
+      ox->get_target(),
       ceph_osd_op_name(osd_op.op.op));
     return ox->execute_op(osd_op);
-  }).safe_then([this, obc, m, ox = ox.get(), &op_info] {
+  }).safe_then_interruptible([this, ox, &op_info, &ops] {
     logger().debug(
-      "do_osd_ops: {} - object {} all operations successful",
-      *m,
-      obc->obs.oi.soid);
-    return std::move(*ox).flush_changes(
-      [m] (auto&& obc) -> osd_op_errorator::future<> {
-       logger().debug(
-         "do_osd_ops: {} - object {} txn is empty, bypassing mutate",
-         *m,
-         obc->obs.oi.soid);
-        return osd_op_errorator::now();
-      },
-      [this, m, &op_info] (auto&& txn,
-                          auto&& obc,
-                          auto&& osd_op_p,
-                           bool user_modify) -> osd_op_errorator::future<> {
+      "do_osd_ops_execute: object {} all operations successful",
+      ox->get_target());
+    peering_state.apply_op_stats(ox->get_target(), ox->get_stats());
+    return std::move(*ox).flush_changes_n_do_ops_effects(
+      [this, &op_info, &ops] (auto&& txn,
+                              auto&& obc,
+                              auto&& osd_op_p,
+                              bool user_modify) {
        logger().debug(
-         "do_osd_ops: {} - object {} submitting txn",
-         *m,
-         obc->obs.oi.soid);
-        auto filled_osd_op_p = fill_op_params_bump_pg_version(
-          std::move(osd_op_p),
-          std::move(m),
-          user_modify);
+         "do_osd_ops_execute: object {} submitting txn",
+         obc->get_oid());
+        fill_op_params_bump_pg_version(osd_op_p, user_modify);
        return submit_transaction(
           op_info,
-          filled_osd_op_p.req->ops,
+          ops,
           std::move(obc),
           std::move(txn),
-          std::move(filled_osd_op_p));
-      });
-  }).safe_then([this,
-                m,
-                obc,
-                rvec = op_info.allows_returnvec()] {
-    // TODO: should stop at the first op which returns a negative retval,
-    //       cmpext uses it for returning the index of first unmatched byte
-    int result = m->ops.empty() ? 0 : m->ops.back().rval.code;
-    if (result > 0 && !rvec) {
-      result = 0;
-    }
-    auto reply = make_message<MOSDOpReply>(m.get(),
-                                           result,
-                                           get_osdmap_epoch(),
-                                           0,
-                                           false);
-    reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
-    logger().debug(
-      "do_osd_ops: {} - object {} sending reply",
-      *m,
-      obc->obs.oi.soid);
-    return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
-  }, osd_op_errorator::all_same_way([ox = ox.get(),
-                                     m,
-                                     obc,
-                                     this] (const std::error_code& e) {
-    return handle_failed_op(e, std::move(obc), *ox, *m);
-  })).handle_exception_type([ox_deleter = std::move(ox),
-                             m,
-                             obc,
-                             this] (const crimson::osd::error& e) {
-    // we need this handler because throwing path which aren't errorated yet.
-    logger().debug("encountered the legacy error handling path!");
-    return handle_failed_op(e.code(), std::move(obc), *ox_deleter, *m);
-  });
+          std::move(osd_op_p));
+    });
+  }).safe_then_unpack_interruptible(
+    [success_func=std::move(success_func), rollbacker, this, failure_func_ptr]
+    (auto submitted_fut, auto all_completed_fut) mutable {
+    return PG::do_osd_ops_iertr::make_ready_future<pg_rep_op_fut_t<Ret>>(
+        std::move(submitted_fut),
+        all_completed_fut.safe_then_interruptible_tuple(
+          std::move(success_func),
+          crimson::ct_error::object_corrupted::handle(
+            [rollbacker, this] (const std::error_code& e) mutable {
+            // this is a path for EIO. it's special because we want to fix the obejct
+            // and try again. that is, the layer above `PG::do_osd_ops` is supposed to
+            // restart the execution.
+            return rollbacker.rollback_obc_if_modified(e).then_interruptible(
+              [obc=rollbacker.get_obc(), this] {
+              return repair_object(obc->obs.oi.soid,
+                                   obc->obs.oi.version).then_interruptible([] {
+                return do_osd_ops_iertr::future<Ret>{crimson::ct_error::eagain::make()};
+              });
+            });
+          }), OpsExecuter::osd_op_errorator::all_same_way(
+            [rollbacker, failure_func_ptr]
+            (const std::error_code& e) mutable {
+            return rollbacker.rollback_obc_if_modified(e).then_interruptible(
+              [e, failure_func_ptr] {
+              return (*failure_func_ptr)(e);
+            });
+          })
+        )
+      );
+  }, OpsExecuter::osd_op_errorator::all_same_way(
+    [rollbacker, failure_func_ptr]
+    (const std::error_code& e) mutable {
+    return PG::do_osd_ops_iertr::make_ready_future<pg_rep_op_fut_t<Ret>>(
+        seastar::now(),
+        rollbacker.rollback_obc_if_modified(e).then_interruptible(
+          [e, failure_func_ptr] {
+          return (*failure_func_ptr)(e);
+        }));
+  }));
 }
 
-seastar::future<Ref<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
+PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<MURef<MOSDOpReply>>>
+PG::do_osd_ops(
+  Ref<MOSDOp> m,
+  ObjectContextRef obc,
+  const OpInfo &op_info)
+{
+  if (__builtin_expect(stopping, false)) {
+    throw crimson::common::system_shutdown_exception();
+  }
+  return do_osd_ops_execute<MURef<MOSDOpReply>>(
+    seastar::make_lw_shared<OpsExecuter>(
+      Ref<PG>{this}, std::move(obc), op_info, *m),
+    m->ops,
+    op_info,
+    [this, m, rvec = op_info.allows_returnvec()] {
+      // TODO: should stop at the first op which returns a negative retval,
+      //       cmpext uses it for returning the index of first unmatched byte
+      int result = m->ops.empty() ? 0 : m->ops.back().rval.code;
+      if (result > 0 && !rvec) {
+        result = 0;
+      }
+      auto reply = crimson::make_message<MOSDOpReply>(m.get(),
+                                             result,
+                                             get_osdmap_epoch(),
+                                             0,
+                                             false);
+      reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+      logger().debug(
+        "do_osd_ops: {} - object {} sending reply",
+        *m,
+        m->get_hobj());
+      return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>(
+        std::move(reply));
+    },
+    [m, this] (const std::error_code& e) {
+      auto reply = crimson::make_message<MOSDOpReply>(
+        m.get(), -e.value(), get_osdmap_epoch(), 0, false);
+      reply->set_enoent_reply_versions(
+        peering_state.get_info().last_update,
+        peering_state.get_info().last_user_version);
+      return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>(std::move(reply));
+    });
+}
+
+PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<>>
+PG::do_osd_ops(
+  ObjectContextRef obc,
+  std::vector<OSDOp>& ops,
+  const OpInfo &op_info,
+  const do_osd_ops_params_t& msg_params,
+  do_osd_ops_success_func_t success_func,
+  do_osd_ops_failure_func_t failure_func)
+{
+  return do_osd_ops_execute<void>(
+    seastar::make_lw_shared<OpsExecuter>(
+      Ref<PG>{this}, std::move(obc), op_info, msg_params),
+    ops,
+    std::as_const(op_info),
+    std::move(success_func),
+    std::move(failure_func));
+}
+
+PG::interruptible_future<MURef<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
 {
   if (__builtin_expect(stopping, false)) {
     throw crimson::common::system_shutdown_exception();
@@ -759,28 +809,26 @@ seastar::future<Ref<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
 
   auto ox = std::make_unique<PgOpsExecuter>(std::as_const(*this),
                                             std::as_const(*m));
-  return seastar::do_for_each(m->ops, [ox = ox.get()](OSDOp& osd_op) {
+  return interruptor::do_for_each(m->ops, [ox = ox.get()](OSDOp& osd_op) {
     logger().debug("will be handling pg op {}", ceph_osd_op_name(osd_op.op.op));
     return ox->execute_op(osd_op);
-  }).then([m, this, ox = std::move(ox)] {
-    auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
+  }).then_interruptible([m, this, ox = std::move(ox)] {
+    auto reply = crimson::make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
                                            CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
                                            false);
-    return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
-  }).handle_exception_type([=](const crimson::osd::error& e) {
-    auto reply = make_message<MOSDOpReply>(
+    return seastar::make_ready_future<MURef<MOSDOpReply>>(std::move(reply));
+  }).handle_exception_type_interruptible([=](const crimson::osd::error& e) {
+    auto reply = crimson::make_message<MOSDOpReply>(
       m.get(), -e.code().value(), get_osdmap_epoch(), 0, false);
     reply->set_enoent_reply_versions(peering_state.get_info().last_update,
                                     peering_state.get_info().last_user_version);
-    return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
+    return seastar::make_ready_future<MURef<MOSDOpReply>>(std::move(reply));
   });
 }
 
-hobject_t PG::get_oid(const MOSDOp &m)
+hobject_t PG::get_oid(const hobject_t& hobj)
 {
-  return (m.get_snapid() == CEPH_SNAPDIR ?
-          m.get_hobj().get_head() :
-          m.get_hobj());
+  return hobj.snap == CEPH_SNAPDIR ? hobj.get_head() : hobj;
 }
 
 RWState::State PG::get_lock_type(const OpInfo &op_info)
@@ -829,36 +877,58 @@ std::optional<hobject_t> PG::resolve_oid(
 }
 
 template<RWState::State State>
-PG::load_obc_ertr::future<>
-PG::with_head_obc(hobject_t oid, with_obc_func_t&& func)
+PG::load_obc_iertr::future<>
+PG::with_head_obc(ObjectContextRef obc, bool existed, with_obc_func_t&& func)
 {
-  assert(oid.is_head());
-  auto [obc, existed] = shard_services.obc_registry.get_cached_obc(oid);
-  return obc->with_lock<State>(
-    [oid=std::move(oid), existed=existed, obc=std::move(obc),
-     func=std::move(func), this] {
-    auto loaded = load_obc_ertr::make_ready_future<ObjectContextRef>(obc);
+  logger().debug("{} {}", __func__, obc->get_oid());
+  assert(obc->is_head());
+  obc->append_to(obc_set_accessing);
+  return obc->with_lock<State, IOInterruptCondition>(
+    [existed=existed, obc=obc, func=std::move(func), this] {
+    auto loaded = load_obc_iertr::make_ready_future<ObjectContextRef>(obc);
     if (existed) {
-      logger().debug("with_head_obc: found {} in cache", oid);
+      logger().debug("with_head_obc: found {} in cache", obc->get_oid());
     } else {
-      logger().debug("with_head_obc: cache miss on {}", oid);
-      loaded = obc->with_promoted_lock<State>([this, obc] {
+      logger().debug("with_head_obc: cache miss on {}", obc->get_oid());
+      loaded = obc->with_promoted_lock<State, IOInterruptCondition>([this, obc] {
         return load_head_obc(obc);
       });
     }
-    return loaded.safe_then([func=std::move(func)](auto obc) {
-      return func(std::move(obc));
+    return loaded.safe_then_interruptible([func = std::move(func)](auto obc) {
+      return std::move(func)(std::move(obc));
     });
+  }).finally([this, pgref=boost::intrusive_ptr<PG>{this}, obc=std::move(obc)] {
+    logger().debug("with_head_obc: released {}", obc->get_oid());
+    obc->remove_from(obc_set_accessing);
   });
 }
 
 template<RWState::State State>
-PG::load_obc_ertr::future<>
+PG::load_obc_iertr::future<>
+PG::with_head_obc(hobject_t oid, with_obc_func_t&& func)
+{
+  auto [obc, existed] =
+    shard_services.obc_registry.get_cached_obc(std::move(oid));
+  return with_head_obc<State>(std::move(obc), existed, std::move(func));
+}
+
+template<RWState::State State>
+PG::interruptible_future<>
+PG::with_existing_head_obc(ObjectContextRef obc, with_obc_func_t&& func)
+{
+  constexpr bool existed = true;
+  return with_head_obc<State>(
+    std::move(obc), existed, std::move(func)
+  ).handle_error_interruptible(load_obc_ertr::assert_all{"can't happen"});
+}
+
+template<RWState::State State>
+PG::load_obc_iertr::future<>
 PG::with_clone_obc(hobject_t oid, with_obc_func_t&& func)
 {
   assert(!oid.is_head());
   return with_head_obc<RWState::RWREAD>(oid.get_head(),
-    [oid, func=std::move(func), this](auto head) -> load_obc_ertr::future<> {
+    [oid, func=std::move(func), this](auto head) -> load_obc_iertr::future<> {
     auto coid = resolve_oid(head->get_ro_ss(), oid);
     if (!coid) {
       // TODO: return crimson::ct_error::enoent::make();
@@ -869,37 +939,54 @@ PG::with_clone_obc(hobject_t oid, with_obc_func_t&& func)
     return clone->template with_lock<State>(
       [coid=*coid, existed=existed,
        head=std::move(head), clone=std::move(clone),
-       func=std::move(func), this]() -> load_obc_ertr::future<> {
-      auto loaded = load_obc_ertr::make_ready_future<ObjectContextRef>(clone);
+       func=std::move(func), this]() -> load_obc_iertr::future<> {
+      auto loaded = load_obc_iertr::make_ready_future<ObjectContextRef>(clone);
       if (existed) {
         logger().debug("with_clone_obc: found {} in cache", coid);
       } else {
         logger().debug("with_clone_obc: cache miss on {}", coid);
         loaded = clone->template with_promoted_lock<State>(
           [coid, clone, head, this] {
-          return backend->load_metadata(coid).safe_then(
+          return backend->load_metadata(coid).safe_then_interruptible(
             [coid, clone=std::move(clone), head=std::move(head)](auto md) mutable {
             clone->set_clone_state(std::move(md->os), std::move(head));
             return clone;
           });
         });
       }
-      return loaded.safe_then([func=std::move(func)](auto clone) {
-        return func(std::move(clone));
+      return loaded.safe_then_interruptible([func = std::move(func)](auto clone) {
+        return std::move(func)(std::move(clone));
       });
     });
   });
 }
 
 // explicitly instantiate the used instantiations
-template PG::load_obc_ertr::future<>
+template PG::load_obc_iertr::future<>
 PG::with_head_obc<RWState::RWNONE>(hobject_t, with_obc_func_t&&);
 
-PG::load_obc_ertr::future<crimson::osd::ObjectContextRef>
+template<RWState::State State>
+PG::interruptible_future<>
+PG::with_existing_clone_obc(ObjectContextRef clone, with_obc_func_t&& func)
+{
+  assert(clone);
+  assert(clone->get_head_obc());
+  assert(!clone->get_oid().is_head());
+  return with_existing_head_obc<RWState::RWREAD>(clone->get_head_obc(),
+    [clone=std::move(clone), func=std::move(func)] ([[maybe_unused]] auto head) {
+    assert(head == clone->get_head_obc());
+    return clone->template with_lock<State>(
+      [clone=std::move(clone), func=std::move(func)] {
+      return std::move(func)(std::move(clone));
+    });
+  });
+}
+
+PG::load_obc_iertr::future<crimson::osd::ObjectContextRef>
 PG::load_head_obc(ObjectContextRef obc)
 {
-  hobject_t oid = obc->get_oid();
-  return backend->load_metadata(oid).safe_then([obc=std::move(obc)](auto md)
+  return backend->load_metadata(obc->get_oid()).safe_then_interruptible(
+    [obc=std::move(obc)](auto md)
     -> load_obc_ertr::future<crimson::osd::ObjectContextRef> {
     const hobject_t& oid = md->os.oi.soid;
     logger().debug(
@@ -918,11 +1005,11 @@ PG::load_head_obc(ObjectContextRef obc)
   });
 }
 
-PG::load_obc_ertr::future<>
+PG::load_obc_iertr::future<>
 PG::reload_obc(crimson::osd::ObjectContext& obc) const
 {
   assert(obc.is_head());
-  return backend->load_metadata(obc.get_oid()).safe_then([&obc](auto md)
+  return backend->load_metadata(obc.get_oid()).safe_then_interruptible<false>([&obc](auto md)
     -> load_obc_ertr::future<> {
     logger().debug(
       "{}: reloaded obs {} for {}",
@@ -941,14 +1028,15 @@ PG::reload_obc(crimson::osd::ObjectContext& obc) const
   });
 }
 
-PG::load_obc_ertr::future<>
-PG::with_locked_obc(Ref<MOSDOp> &m, const OpInfo &op_info,
-                   Operation *op, PG::with_obc_func_t &&f)
+PG::load_obc_iertr::future<>
+PG::with_locked_obc(const hobject_t &hobj,
+                    const OpInfo &op_info,
+                    with_obc_func_t &&f)
 {
   if (__builtin_expect(stopping, false)) {
     throw crimson::common::system_shutdown_exception();
   }
-  const hobject_t oid = get_oid(*m);
+  const hobject_t oid = get_oid(hobj);
   switch (get_lock_type(op_info)) {
   case RWState::RWREAD:
     if (oid.is_head()) {
@@ -964,16 +1052,36 @@ PG::with_locked_obc(Ref<MOSDOp> &m, const OpInfo &op_info,
     }
   case RWState::RWEXCL:
     if (oid.is_head()) {
-      return with_head_obc<RWState::RWWRITE>(oid, std::move(f));
+      return with_head_obc<RWState::RWEXCL>(oid, std::move(f));
     } else {
-      return with_clone_obc<RWState::RWWRITE>(oid, std::move(f));
+      return with_clone_obc<RWState::RWEXCL>(oid, std::move(f));
     }
   default:
     ceph_abort();
   };
 }
 
-seastar::future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
+template <RWState::State State>
+PG::interruptible_future<>
+PG::with_locked_obc(ObjectContextRef obc, with_obc_func_t &&f)
+{
+  // TODO: a question from rebase: do we really need such checks when
+  // the interruptible stuff is being used?
+  if (__builtin_expect(stopping, false)) {
+    throw crimson::common::system_shutdown_exception();
+  }
+  if (obc->is_head()) {
+    return with_existing_head_obc<State>(obc, std::move(f));
+  } else {
+    return with_existing_clone_obc<State>(obc, std::move(f));
+  }
+}
+
+// explicitly instantiate the used instantiations
+template PG::interruptible_future<>
+PG::with_locked_obc<RWState::RWEXCL>(ObjectContextRef, with_obc_func_t&&);
+
+PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
 {
   if (__builtin_expect(stopping, false)) {
     return seastar::make_exception_future<>(
@@ -992,15 +1100,17 @@ seastar::future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
   decode(log_entries, p);
   peering_state.append_log(std::move(log_entries), req->pg_trim_to,
       req->version, req->min_last_complete_ondisk, txn, !txn.empty(), false);
-  return shard_services.get_store().do_transaction(coll_ref, std::move(txn))
-    .then([req, lcod=peering_state.get_info().last_complete, this] {
+  logger().debug("PG::handle_rep_op: do_transaction...");
+  return interruptor::make_interruptible(shard_services.get_store().do_transaction(
+       coll_ref, std::move(txn))).then_interruptible(
+      [req, lcod=peering_state.get_info().last_complete, this] {
       peering_state.update_last_complete_ondisk(lcod);
       const auto map_epoch = get_osdmap_epoch();
-      auto reply = make_message<MOSDRepOpReply>(
+      auto reply = crimson::make_message<MOSDRepOpReply>(
         req.get(), pg_whoami, 0,
        map_epoch, req->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
       reply->set_last_complete_ondisk(lcod);
-      return shard_services.send_to_osd(req->from.osd, reply, map_epoch);
+      return shard_services.send_to_osd(req->from.osd, std::move(reply), map_epoch);
     });
 }
 
@@ -1012,8 +1122,20 @@ void PG::handle_rep_op_reply(crimson::net::ConnectionRef conn,
   }
 }
 
-template <typename MsgType>
-bool PG::can_discard_replica_op(const MsgType& m) const
+bool PG::old_peering_msg(
+  const epoch_t reply_epoch,
+  const epoch_t query_epoch) const
+{
+  if (const epoch_t lpr = peering_state.get_last_peering_reset();
+      lpr > reply_epoch || lpr > query_epoch) {
+    logger().debug("{}: pg changed {} lpr {}, reply_epoch {}, query_epoch {}",
+                   __func__, get_info().history, lpr, reply_epoch, query_epoch);
+    return true;
+  }
+  return false;
+}
+
+bool PG::can_discard_replica_op(const Message& m, epoch_t m_map_epoch) const
 {
   // if a repop is replied after a replica goes down in a new osdmap, and
   // before the pg advances to this new osdmap, the repop replies before this
@@ -1031,24 +1153,22 @@ bool PG::can_discard_replica_op(const MsgType& m) const
   // sent by replicas not in the acting set, since
   // if such a replica goes down it does not cause
   // a new interval.
-  if (osdmap->get_down_at(from_osd) >= m.map_epoch) {
+  if (osdmap->get_down_at(from_osd) >= m_map_epoch) {
     return true;
   }
   // same pg?
   //  if pg changes *at all*, we reset and repeer!
-  if (epoch_t lpr = peering_state.get_last_peering_reset();
-      lpr > m.map_epoch) {
-    logger().debug("{}: pg changed {} after {}, dropping",
-                   __func__, get_info().history, m.map_epoch);
-    return true;
-  }
-  return false;
+  return old_peering_msg(m_map_epoch, m_map_epoch);
 }
 
 seastar::future<> PG::stop()
 {
   logger().info("PG {} {}", pgid, __func__);
   stopping = true;
+  cancel_local_background_io_reservation();
+  cancel_remote_recovery_reservation();
+  check_readable_timer.cancel();
+  renew_lease_timer.cancel();
   return osdmap_gate.stop().then([this] {
     return wait_for_active_blocker.stop();
   }).then([this] {
@@ -1061,6 +1181,10 @@ seastar::future<> PG::stop()
 }
 
 void PG::on_change(ceph::os::Transaction &t) {
+  logger().debug("{}, {}", __func__, *this);
+  for (auto& obc : obc_set_accessing) {
+    obc.interrupt(::crimson::common::actingset_changed(is_primary()));
+  }
   recovery_backend->on_peering_interval_change(t);
   backend->on_actingset_changed({ is_primary() });
 }
@@ -1099,4 +1223,22 @@ bool PG::is_degraded_or_backfilling_object(const hobject_t& soid) const {
   return false;
 }
 
+PG::interruptible_future<std::tuple<bool, int>>
+PG::already_complete(const osd_reqid_t& reqid)
+{
+  eversion_t version;
+  version_t user_version;
+  int ret;
+  std::vector<pg_log_op_return_item_t> op_returns;
+
+  if (peering_state.get_pg_log().get_log().get_request(
+       reqid, &version, &user_version, &ret, &op_returns)) {
+    return backend->request_committed(reqid, version).then([ret] {
+      return seastar::make_ready_future<std::tuple<bool, int>>(true, ret);
+    });
+  } else {
+    return seastar::make_ready_future<std::tuple<bool, int>>(false, 0);
+  }
+}
+
 }